Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

automatically retry returning data in syncv3 #652

Merged
merged 3 commits into from
Jan 4, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions src/api/client/sync/v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,33 @@ pub(crate) async fn sync_events_route(
// Setup watchers, so if there's no response, we can wait for them
let watcher = services.sync.watch(sender_user, sender_device);

let response = build_sync_events(&services, &body).await?;
if body.body.full_state
|| !(response.rooms.is_empty()
&& response.presence.is_empty()
&& response.account_data.is_empty()
&& response.device_lists.is_empty()
&& response.to_device.is_empty())
{
return Ok(response);
}

// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let default = Duration::from_secs(30);
let duration = cmp::min(body.body.timeout.unwrap_or(default), default);
_ = tokio::time::timeout(duration, watcher).await;

// Retry returning data
build_sync_events(&services, &body).await
}

pub(crate) async fn build_sync_events(
services: &Services,
body: &Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
let (sender_user, sender_device) = body.sender();

let next_batch = services.globals.current_count()?;
let next_batch_string = next_batch.to_string();

Expand Down Expand Up @@ -163,7 +190,7 @@ pub(crate) async fn sync_events_route(
.map(ToOwned::to_owned)
.broad_filter_map(|room_id| {
load_joined_room(
&services,
services,
sender_user,
sender_device,
room_id.clone(),
Expand Down Expand Up @@ -196,7 +223,7 @@ pub(crate) async fn sync_events_route(
.rooms_left(sender_user)
.broad_filter_map(|(room_id, _)| {
handle_left_room(
&services,
services,
since,
room_id.clone(),
sender_user,
Expand Down Expand Up @@ -242,7 +269,7 @@ pub(crate) async fn sync_events_route(
let presence_updates: OptionFuture<_> = services
.globals
.allow_local_presence()
.then(|| process_presence_updates(&services, since, sender_user))
.then(|| process_presence_updates(services, since, sender_user))
.into();

let account_data = services
Expand Down Expand Up @@ -292,7 +319,7 @@ pub(crate) async fn sync_events_route(
.stream()
.broad_filter_map(|user_id| async move {
let no_shared_encrypted_room =
!share_encrypted_room(&services, sender_user, &user_id, None).await;
!share_encrypted_room(services, sender_user, &user_id, None).await;
no_shared_encrypted_room.then_some(user_id)
})
.ready_fold(HashSet::new(), |mut device_list_left, user_id| {
Expand Down Expand Up @@ -327,21 +354,6 @@ pub(crate) async fn sync_events_route(
to_device: ToDevice { events: to_device_events },
};

// TODO: Retry the endpoint instead of returning
if !full_state
&& response.rooms.is_empty()
&& response.presence.is_empty()
&& response.account_data.is_empty()
&& response.device_lists.is_empty()
&& response.to_device.is_empty()
{
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let default = Duration::from_secs(30);
let duration = cmp::min(body.body.timeout.unwrap_or(default), default);
_ = tokio::time::timeout(duration, watcher).await;
}

Ok(response)
}

Expand Down
Loading