From c7ea71804b8e4359b3f1956c5efdb13c4d70a764 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Wed, 22 Jan 2025 17:23:25 +0100 Subject: [PATCH 1/5] refactor(timeline): always unstash related pending reactions, even if the event has been redacted --- .../src/timeline/event_handler.rs | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index 6759a2e525..a7dee922f2 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -1326,30 +1326,30 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { &mut self, content: &TimelineItemContent, ) -> Option { + let event_id = self.ctx.flow.event_id()?; + let reactions = self.meta.reactions.pending.remove(event_id)?; + // Drop pending reactions if the message is redacted. if let TimelineItemContent::RedactedMessage = content { return None; } - self.ctx.flow.event_id().and_then(|event_id| { - let reactions = self.meta.reactions.pending.remove(event_id)?; - let mut bundled = ReactionsByKeyBySender::default(); + let mut bundled = ReactionsByKeyBySender::default(); - for (reaction_event_id, reaction) in reactions { - let group: &mut IndexMap = - bundled.entry(reaction.key).or_default(); + for (reaction_event_id, reaction) in reactions { + let group: &mut IndexMap = + bundled.entry(reaction.key).or_default(); - group.insert( - reaction.sender_id, - ReactionInfo { - timestamp: reaction.timestamp, - status: ReactionStatus::RemoteToRemote(reaction_event_id), - }, - ); - } + group.insert( + reaction.sender_id, + ReactionInfo { + timestamp: reaction.timestamp, + status: ReactionStatus::RemoteToRemote(reaction_event_id), + }, + ); + } - Some(bundled) - }) + Some(bundled) } } From b09557b149220d573cc63b8956c9b6c54497312e Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Wed, 22 Jan 2025 18:31:38 +0100 Subject: [PATCH 2/5] test(timeline): use the MatrixMockServer more --- .../tests/integration/timeline/reactions.rs | 81 ++++++------------- 1 file changed, 26 insertions(+), 55 deletions(-) diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs b/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs index 23dc827b24..c8b092f871 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs @@ -17,21 +17,13 @@ use std::{sync::Mutex, time::Duration}; use assert_matches2::{assert_let, assert_matches}; use eyeball_im::VectorDiff; use futures_util::{FutureExt as _, StreamExt as _}; -use matrix_sdk::test_utils::{logged_in_client_with_server, mocks::MatrixMockServer}; -use matrix_sdk_test::{ - async_test, event_factory::EventFactory, mocks::mock_encryption_state, JoinedRoomBuilder, - SyncResponseBuilder, ALICE, -}; +use matrix_sdk::test_utils::mocks::MatrixMockServer; +use matrix_sdk_test::{async_test, event_factory::EventFactory, JoinedRoomBuilder, ALICE}; use matrix_sdk_ui::timeline::{ReactionStatus, RoomExt as _}; use ruma::{event_id, events::room::message::RoomMessageEventContent, room_id}; use serde_json::json; use stream_assert::assert_pending; -use wiremock::{ - matchers::{header, method, path_regex}, - Mock, ResponseTemplate, -}; - -use crate::mock_sync; +use wiremock::ResponseTemplate; #[async_test] async fn test_abort_before_being_sent() { @@ -186,21 +178,15 @@ async fn test_redact_failed() { // This test checks that if a reaction redaction failed, then we re-insert the // reaction after displaying it was removed. - let room_id = room_id!("!a98sd12bjh:example.org"); - let (client, server) = logged_in_client_with_server().await; + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; let user_id = client.user_id().unwrap(); - // Make the test aware of the room. - let mut sync_builder = SyncResponseBuilder::new(); - sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); - - mock_sync(&server, sync_builder.build_json_sync_response(), None).await; - let _response = client.sync_once(Default::default()).await.unwrap(); - server.reset().await; + let room_id = room_id!("!a98sd12bjh:example.org"); + let room = server.sync_joined_room(&client, room_id).await; - mock_encryption_state(&server, false).await; + server.mock_room_state_encryption().plain().mount().await; - let room = client.get_room(room_id).unwrap(); let timeline = room.timeline().await.unwrap(); let (initial_items, mut stream) = timeline.subscribe_batched().await; @@ -209,15 +195,14 @@ async fn test_redact_failed() { let f = EventFactory::new(); let event_id = event_id!("$1"); - sync_builder.add_joined_room( - JoinedRoomBuilder::new(room_id) - .add_timeline_event(f.text_msg("hello").sender(&ALICE).event_id(event_id)) - .add_timeline_event(f.reaction(event_id, "😆").sender(user_id)), - ); - - mock_sync(&server, sync_builder.build_json_sync_response(), None).await; - let _response = client.sync_once(Default::default()).await.unwrap(); - server.reset().await; + server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id) + .add_timeline_event(f.text_msg("hello").sender(&ALICE).event_id(event_id)) + .add_timeline_event(f.reaction(event_id, "😆").sender(user_id)), + ) + .await; assert_let!(Some(timeline_updates) = stream.next().await); assert_eq!(timeline_updates.len(), 3); @@ -239,15 +224,7 @@ async fn test_redact_failed() { assert!(date_divider.is_date_divider()); // Now, redact the annotation we previously added. - - Mock::given(method("PUT")) - .and(path_regex(r"^/_matrix/client/r0/rooms/.*/redact/.*?/.*?")) - .and(header("authorization", "Bearer 1234")) - .respond_with(ResponseTemplate::new(500)) - .expect(1) - .named("redact") - .mount(&server) - .await; + server.mock_room_redact().error500().named("redact").mock_once().mount().await; // We toggle the reaction, which fails with an error. timeline.toggle_reaction(&item_id, "😆").await.unwrap_err(); @@ -272,21 +249,15 @@ async fn test_local_reaction_to_local_echo() { // This test checks that if a reaction redaction failed, then we re-insert the // reaction after displaying it was removed. - let room_id = room_id!("!a98sd12bjh:example.org"); - let (client, server) = logged_in_client_with_server().await; + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; let user_id = client.user_id().unwrap(); - // Make the test aware of the room. - let mut sync_builder = SyncResponseBuilder::new(); - sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); - - mock_sync(&server, sync_builder.build_json_sync_response(), None).await; - let _response = client.sync_once(Default::default()).await.unwrap(); - server.reset().await; + let room_id = room_id!("!a98sd12bjh:example.org"); + let room = server.sync_joined_room(&client, room_id).await; - mock_encryption_state(&server, false).await; + server.mock_room_state_encryption().plain().mount().await; - let room = client.get_room(room_id).unwrap(); let timeline = room.timeline().await.unwrap(); let (initial_items, mut stream) = timeline.subscribe_batched().await; @@ -295,9 +266,9 @@ async fn test_local_reaction_to_local_echo() { // Add a duration to the response, so we can check other things in the // meanwhile. let next_event_id = Mutex::new(0); - Mock::given(method("PUT")) - .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*")) - .and(header("authorization", "Bearer 1234")) + + server + .mock_room_send() .respond_with(move |_req: &wiremock::Request| { let mut next_event_id = next_event_id.lock().unwrap(); let event_id = *next_event_id; @@ -312,7 +283,7 @@ async fn test_local_reaction_to_local_echo() { tmp }) - .mount(&server) + .mount() .await; // Send a local event. From ce6742777efcf8cf0d38aa391c3605d7e367ead4 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Wed, 22 Jan 2025 20:09:24 +0100 Subject: [PATCH 3/5] fix(timeline): when removing an item, re-stash its associated reactions --- .../src/timeline/controller/state.rs | 38 +++++++++- .../matrix-sdk-ui/src/timeline/tests/mod.rs | 10 +++ .../src/timeline/tests/reactions.rs | 76 ++++++++++++++++++- 3 files changed, 119 insertions(+), 5 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state.rs b/crates/matrix-sdk-ui/src/timeline/controller/state.rs index 75bd8512f8..b40b608d8c 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/state.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/state.rs @@ -62,9 +62,9 @@ use crate::{ }, event_item::{PollState, RemoteEventOrigin, ResponseData}, item::TimelineUniqueId, - reactions::Reactions, + reactions::{PendingReaction, Reactions}, traits::RoomDataProvider, - Profile, TimelineItem, TimelineItemKind, + Profile, ReactionStatus, TimelineItem, TimelineItemKind, }, unable_to_decrypt_hook::UtdHookManager, }; @@ -794,7 +794,39 @@ impl TimelineStateTransaction<'_> { if let Some(event_meta) = self.items.all_remote_events().get(event_index) { // Fetch the `timeline_item_index` associated to the remote event. if let Some(timeline_item_index) = event_meta.timeline_item_index { - let _removed_timeline_item = self.items.remove(timeline_item_index); + let removed_timeline_item = self.items.remove(timeline_item_index); + + // Restash reactions, if there were any. + if let Some(event_item) = removed_timeline_item.as_event() { + if let Some(event_id) = event_item.event_id() { + let target_event = event_id.to_owned(); + + for (key, user_to_reaction) in event_item.reactions().iter() { + for (user_id, reaction_info) in user_to_reaction { + let reaction_event = match &reaction_info.status { + ReactionStatus::LocalToLocal(_) + | ReactionStatus::LocalToRemote(_) => continue, + ReactionStatus::RemoteToRemote(event) => event, + }; + + let pending_reaction = PendingReaction { + key: key.to_owned(), + sender_id: user_id.to_owned(), + timestamp: reaction_info.timestamp, + }; + + self.meta + .reactions + .pending + .entry(target_event.clone()) + .or_default() + .insert(reaction_event.to_owned(), pending_reaction); + } + } + } + } + + // TODO restash poll responses/ends, edits. etc.? } // Now we can remove the remote event. diff --git a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs index 2c23569b28..cf9e3d41eb 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs @@ -81,8 +81,10 @@ mod redaction; mod shields; mod virt; +/// A timeline instance used only for testing purposes in unit tests. struct TestTimeline { controller: TimelineController, + /// An [`EventFactory`] that can be used for creating events in this /// timeline. pub factory: EventFactory, @@ -241,6 +243,14 @@ impl TestTimeline { async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) { self.controller.handle_room_send_queue_update(update).await } + + async fn handle_event_update( + &self, + diffs: Vec>, + origin: RemoteEventOrigin, + ) { + self.controller.handle_remote_events_with_diffs(diffs, origin).await; + } } type ReadReceiptMap = diff --git a/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs b/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs index 78ca0d69b5..098341025c 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs @@ -18,13 +18,14 @@ use assert_matches2::{assert_let, assert_matches}; use eyeball_im::VectorDiff; use futures_core::Stream; use futures_util::{FutureExt as _, StreamExt as _}; -use matrix_sdk::deserialized_responses::TimelineEvent; +use imbl::vector; +use matrix_sdk::{assert_next_matches_with_timeout, deserialized_responses::TimelineEvent}; use matrix_sdk_test::{async_test, event_factory::EventFactory, sync_timeline_event, ALICE, BOB}; use ruma::{ event_id, events::AnyMessageLikeEventContent, server_name, uint, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, }; -use stream_assert::assert_next_matches; +use stream_assert::{assert_next_matches, assert_pending}; use tokio::time::timeout; use crate::timeline::{ @@ -232,3 +233,74 @@ async fn send_first_message( (item_id, event_id, position) } + +#[async_test] +async fn test_reinserted_item_keeps_reactions() { + // This test checks that after deduplicating events, the reactions attached to + // the deduplicated event are not lost. + let timeline = TestTimeline::new(); + + let f = EventFactory::new().sender(*ALICE); + + // We receive an initial update with one event and a reaction to this event. + let reaction_target = event_id!("$1"); + let target_event = f.text_msg("hey").sender(&BOB).event_id(reaction_target).into_event(); + let reaction_event = f + .reaction(reaction_target, REACTION_KEY) + .sender(&ALICE) + .event_id(event_id!("$2")) + .into_event(); + + let mut stream = timeline.subscribe_events().await; + + timeline + .handle_event_update( + vec![VectorDiff::Append { values: vector![target_event.clone(), reaction_event] }], + RemoteEventOrigin::Sync, + ) + .await; + + // Get the event. + assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value: item } => { + assert_eq!(item.content().as_message().unwrap().body(), "hey"); + assert!(item.reactions().is_empty()); + }); + + // Get the reaction. + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 0, value: item } => { + assert_eq!(item.content().as_message().unwrap().body(), "hey"); + let reactions = item.reactions(); + assert_eq!(reactions.len(), 1); + reactions.get(REACTION_KEY).unwrap().get(*ALICE).unwrap(); + }); + + // And that's it for now. + assert_pending!(stream); + + // Then the event is removed and reinserted. This sequences of update is + // possible if the event cache decided to deduplicate a given event. + timeline + .handle_event_update( + vec![ + VectorDiff::Remove { index: 0 }, + VectorDiff::Insert { index: 0, value: target_event }, + ], + RemoteEventOrigin::Sync, + ) + .await; + + // The duplicate event is removed… + assert_next_matches_with_timeout!(stream, VectorDiff::Remove { index: 0 }); + + // …And reinserted. + assert_next_matches_with_timeout!(stream, VectorDiff::Insert { index: 0, value: item } => { + assert_eq!(item.content().as_message().unwrap().body(), "hey"); + // And it still includes the reaction from Alice. + let reactions = item.reactions(); + assert_eq!(reactions.len(), 1); + reactions.get(REACTION_KEY).unwrap().get(*ALICE).unwrap(); + }); + + // No other updates. + assert_pending!(stream); +} From 69715a9ba459bb3effc779f4e2035bc50dd6d28c Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 23 Jan 2025 17:15:50 +0100 Subject: [PATCH 4/5] refactor(timeline): keep poll aggregations around forever forever = the lifetime of a timeline. --- .../src/timeline/controller/aggregations.rs | 83 +++++++++++++++++++ .../src/timeline/controller/mod.rs | 3 + .../src/timeline/controller/state.rs | 72 ++-------------- .../src/timeline/event_handler.rs | 59 +++++++------ .../src/timeline/event_item/content/mod.rs | 7 +- .../src/timeline/event_item/content/polls.rs | 30 +++---- .../src/timeline/event_item/mod.rs | 1 - 7 files changed, 141 insertions(+), 114 deletions(-) create mode 100644 crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs diff --git a/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs b/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs new file mode 100644 index 0000000000..f055effddd --- /dev/null +++ b/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs @@ -0,0 +1,83 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId}; + +use crate::timeline::PollState; + +#[derive(Clone, Debug)] +pub(crate) enum Aggregation { + PollResponse { + sender: OwnedUserId, + timestamp: MilliSecondsSinceUnixEpoch, + answers: Vec, + }, + + PollEnd { + end_date: MilliSecondsSinceUnixEpoch, + }, +} + +impl Aggregation { + pub fn apply_poll(&self, poll_state: &mut PollState) -> Result<(), AggregationError> { + match self { + Aggregation::PollResponse { sender, timestamp, answers } => { + poll_state.add_response(sender.clone(), *timestamp, answers.clone()); + } + Aggregation::PollEnd { end_date } => { + if !poll_state.end(*end_date) { + return Err(AggregationError::PollAlreadyEnded); + } + } + } + Ok(()) + } +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct Aggregations { + stashed: HashMap>, +} + +impl Aggregations { + pub fn clear(&mut self) { + self.stashed.clear(); + } + + pub fn add(&mut self, event_id: OwnedEventId, aggregation: Aggregation) { + self.stashed.entry(event_id).or_default().push(aggregation); + } + + pub fn apply_poll( + &self, + event_id: &EventId, + poll_state: &mut PollState, + ) -> Result<(), AggregationError> { + let Some(aggregations) = self.stashed.get(event_id) else { + return Ok(()); + }; + for a in aggregations { + a.apply_poll(poll_state)?; + } + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum AggregationError { + #[error("trying to end a poll twice")] + PollAlreadyEnded, +} diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index e5f963d152..66ab79af56 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -85,10 +85,13 @@ use crate::{ unable_to_decrypt_hook::UtdHookManager, }; +mod aggregations; mod observable_items; mod read_receipts; mod state; +pub(super) use aggregations::*; + /// Data associated to the current timeline focus. #[derive(Debug)] enum TimelineFocusData { diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state.rs b/crates/matrix-sdk-ui/src/timeline/controller/state.rs index b40b608d8c..d4f6a3721b 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/state.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/state.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::{ - collections::HashMap, future::Future, num::NonZeroUsize, sync::{Arc, RwLock}, @@ -28,12 +27,8 @@ use matrix_sdk::{ use ruma::events::receipt::ReceiptEventContent; use ruma::{ events::{ - poll::{ - unstable_response::UnstablePollResponseEventContent, - unstable_start::NewUnstablePollStartEventContentWithoutRelation, - }, - relation::Replacement, - room::message::RoomMessageEventContentWithoutRelation, + poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation, + relation::Replacement, room::message::RoomMessageEventContentWithoutRelation, AnySyncEphemeralRoomEvent, AnySyncTimelineEvent, }, push::Action, @@ -44,6 +39,7 @@ use ruma::{ use tracing::{debug, instrument, trace, warn}; use super::{ + aggregations::Aggregations, observable_items::{ AllRemoteEvents, ObservableItems, ObservableItemsTransaction, ObservableItemsTransactionEntry, @@ -60,7 +56,7 @@ use crate::{ Flow, HandleEventResult, TimelineEventContext, TimelineEventHandler, TimelineEventKind, TimelineItemPosition, }, - event_item::{PollState, RemoteEventOrigin, ResponseData}, + event_item::RemoteEventOrigin, item::TimelineUniqueId, reactions::{PendingReaction, Reactions}, traits::RoomDataProvider, @@ -969,58 +965,6 @@ impl TimelineStateTransaction<'_> { } } -/// Cache holding poll response and end events handled before their poll start -/// event has been handled. -#[derive(Clone, Debug, Default)] -pub(in crate::timeline) struct PendingPollEvents { - /// Responses to a poll (identified by the poll's start event id). - responses: HashMap>, - - /// Mapping of a poll (identified by its start event's id) to its end date. - end_dates: HashMap, -} - -impl PendingPollEvents { - pub(crate) fn add_response( - &mut self, - start_event_id: &EventId, - sender: &UserId, - timestamp: MilliSecondsSinceUnixEpoch, - content: &UnstablePollResponseEventContent, - ) { - self.responses.entry(start_event_id.to_owned()).or_default().push(ResponseData { - sender: sender.to_owned(), - timestamp, - answers: content.poll_response.answers.clone(), - }); - } - - pub(crate) fn clear(&mut self) { - self.end_dates.clear(); - self.responses.clear(); - } - - /// Mark a poll as finished by inserting its poll date. - pub(crate) fn mark_as_ended( - &mut self, - start_event_id: &EventId, - timestamp: MilliSecondsSinceUnixEpoch, - ) { - self.end_dates.insert(start_event_id.to_owned(), timestamp); - } - - /// Dumps all response and end events present in the cache that belong to - /// the given start_event_id into the given poll_state. - pub(crate) fn apply_pending(&mut self, start_event_id: &EventId, poll_state: &mut PollState) { - if let Some(pending_responses) = self.responses.remove(start_event_id) { - poll_state.response_data.extend(pending_responses); - } - if let Some(pending_end) = self.end_dates.remove(start_event_id) { - poll_state.end_event_timestamp = Some(pending_end); - } - } -} - #[derive(Clone)] pub(in crate::timeline) enum PendingEditKind { RoomMessage(Replacement), @@ -1096,8 +1040,8 @@ pub(in crate::timeline) struct TimelineMetadata { /// stashing pending reactions. pub reactions: Reactions, - /// Associated poll events received before their original poll start event. - pub pending_poll_events: PendingPollEvents, + /// Aggregation metadata and pending aggregations. + pub aggregations: Aggregations, /// Edit events received before the related event they're editing. pub pending_edits: RingBuffer, @@ -1136,7 +1080,7 @@ impl TimelineMetadata { own_user_id, next_internal_id: Default::default(), reactions: Default::default(), - pending_poll_events: Default::default(), + aggregations: Default::default(), pending_edits: RingBuffer::new(MAX_NUM_STASHED_PENDING_EDITS), fully_read_event: Default::default(), // It doesn't make sense to set this to false until we fill the `fully_read_event` @@ -1154,7 +1098,7 @@ impl TimelineMetadata { // Note: we don't clear the next internal id to avoid bad cases of stale unique // ids across timeline clears. self.reactions.clear(); - self.pending_poll_events.clear(); + self.aggregations.clear(); self.pending_edits.clear(); self.fully_read_event = None; // We forgot about the fully read marker right above, so wait for a new one diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index a7dee922f2..767a50e22c 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -53,8 +53,8 @@ use tracing::{debug, error, field::debug, info, instrument, trace, warn}; use super::{ algorithms::{rfind_event_by_id, rfind_event_item}, controller::{ - ObservableItemsTransaction, ObservableItemsTransactionEntry, PendingEdit, PendingEditKind, - TimelineMetadata, TimelineStateTransaction, + Aggregation, ObservableItemsTransaction, ObservableItemsTransactionEntry, PendingEdit, + PendingEditKind, TimelineMetadata, TimelineStateTransaction, }, date_dividers::DateDividerAdjuster, event_item::{ @@ -870,7 +870,9 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { if let Some(event_id) = self.ctx.flow.event_id() { // Applying the cache to remote events only because local echoes // don't have an event ID that could be referenced by responses yet. - self.meta.pending_poll_events.apply_pending(event_id, &mut poll_state); + if let Err(err) = self.meta.aggregations.apply_poll(event_id, &mut poll_state) { + warn!("discarding poll aggregations: {err}"); + } } let edit_json = edit_json.flatten(); @@ -879,13 +881,18 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { } fn handle_poll_response(&mut self, c: UnstablePollResponseEventContent) { - let Some((item_pos, item)) = rfind_event_by_id(self.items, &c.relates_to.event_id) else { - self.meta.pending_poll_events.add_response( - &c.relates_to.event_id, - &self.ctx.sender, - self.ctx.timestamp, - &c, - ); + let start_event_id = c.relates_to.event_id; + + self.meta.aggregations.add( + start_event_id.clone(), + Aggregation::PollResponse { + sender: self.ctx.sender.clone(), + timestamp: self.ctx.timestamp, + answers: c.poll_response.answers.clone(), + }, + ); + + let Some((item_pos, item)) = rfind_event_by_id(self.items, &start_event_id) else { return; }; @@ -893,14 +900,10 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { return; }; - let new_item = item.with_content( - TimelineItemContent::Poll(poll_state.add_response( - &self.ctx.sender, - self.ctx.timestamp, - &c, - )), - None, - ); + let mut new_poll = poll_state.clone(); + new_poll.add_response(self.ctx.sender.clone(), self.ctx.timestamp, c.poll_response.answers); + + let new_item = item.with_content(TimelineItemContent::Poll(new_poll), None); trace!("Adding poll response."); self.items.replace(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); @@ -908,8 +911,12 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { } fn handle_poll_end(&mut self, c: UnstablePollEndEventContent) { - let Some((item_pos, item)) = rfind_event_by_id(self.items, &c.relates_to.event_id) else { - self.meta.pending_poll_events.mark_as_ended(&c.relates_to.event_id, self.ctx.timestamp); + let start_event_id = c.relates_to.event_id; + + let aggregation = Aggregation::PollEnd { end_date: self.ctx.timestamp }; + self.meta.aggregations.add(start_event_id.clone(), aggregation.clone()); + + let Some((item_pos, item)) = rfind_event_by_id(self.items, &start_event_id) else { return; }; @@ -917,17 +924,19 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { return; }; - match poll_state.end(self.ctx.timestamp) { - Ok(poll_state) => { - let new_item = item.with_content(TimelineItemContent::Poll(poll_state), None); + let mut poll_state = poll_state.clone(); + match aggregation.apply_poll(&mut poll_state) { + Ok(()) => { trace!("Ending poll."); + let new_item = item.with_content(TimelineItemContent::Poll(poll_state), None); self.items .replace(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); self.result.items_updated += 1; } - Err(_) => { - info!("Got multiple poll end events, discarding"); + + Err(err) => { + warn!("discarding poll end: {err}"); } } } diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/content/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/content/mod.rs index 9cd8321323..5fcb22f8d5 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/content/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/content/mod.rs @@ -69,11 +69,8 @@ mod polls; pub use pinned_events::RoomPinnedEventsChange; -pub(in crate::timeline) use self::{ - message::{ - extract_bundled_edit_event_json, extract_poll_edit_content, extract_room_msg_edit_content, - }, - polls::ResponseData, +pub(in crate::timeline) use self::message::{ + extract_bundled_edit_event_json, extract_poll_edit_content, extract_room_msg_edit_content, }; pub use self::{ message::{InReplyToDetails, Message, RepliedToEvent}, diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/content/polls.rs b/crates/matrix-sdk-ui/src/timeline/event_item/content/polls.rs index b758caf124..f1e225687a 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/content/polls.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/content/polls.rs @@ -20,14 +20,13 @@ use ruma::{ events::poll::{ compile_unstable_poll_results, start::PollKind, - unstable_response::UnstablePollResponseEventContent, unstable_start::{ NewUnstablePollStartEventContent, NewUnstablePollStartEventContentWithoutRelation, UnstablePollStartContentBlock, }, PollResponseData, }, - MilliSecondsSinceUnixEpoch, OwnedUserId, UserId, + MilliSecondsSinceUnixEpoch, OwnedUserId, }; /// Holds the state of a poll. @@ -89,30 +88,23 @@ impl PollState { } pub(crate) fn add_response( - &self, - sender: &UserId, + &mut self, + sender: OwnedUserId, timestamp: MilliSecondsSinceUnixEpoch, - content: &UnstablePollResponseEventContent, - ) -> Self { - let mut clone = self.clone(); - clone.response_data.push(ResponseData { - sender: sender.to_owned(), - timestamp, - answers: content.poll_response.answers.clone(), - }); - clone + answers: Vec, + ) { + self.response_data.push(ResponseData { sender, timestamp, answers }); } /// Marks the poll as ended. /// - /// If the poll has already ended, returns `Err(())`. - pub(crate) fn end(&self, timestamp: MilliSecondsSinceUnixEpoch) -> Result { + /// Returns false if the poll was already ended, true otherwise. + pub(crate) fn end(&mut self, timestamp: MilliSecondsSinceUnixEpoch) -> bool { if self.end_event_timestamp.is_none() { - let mut clone = self.clone(); - clone.end_event_timestamp = Some(timestamp); - Ok(clone) + self.end_event_timestamp = Some(timestamp); + true } else { - Err(()) + false } } diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index 1284fdf2b9..aeaeb90531 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -44,7 +44,6 @@ mod remote; pub(super) use self::{ content::{ extract_bundled_edit_event_json, extract_poll_edit_content, extract_room_msg_edit_content, - ResponseData, }, local::LocalEventTimelineItem, remote::{RemoteEventOrigin, RemoteEventTimelineItem}, From 140ab9ac2b05602fd8b57d358d4549d9861582d4 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 23 Jan 2025 17:53:54 +0100 Subject: [PATCH 5/5] refactor(timeline): apply aggregation to `TimelineItemContent` [CONSIDER SQUASHING] --- .../src/timeline/controller/aggregations.rs | 38 +++++++++--- .../src/timeline/event_handler.rs | 62 +++++++++---------- 2 files changed, 59 insertions(+), 41 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs b/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs index f055effddd..4da4d5e09d 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId}; -use crate::timeline::PollState; +use crate::timeline::{PollState, TimelineItemContent}; #[derive(Clone, Debug)] pub(crate) enum Aggregation { @@ -31,13 +31,30 @@ pub(crate) enum Aggregation { }, } +fn poll_state_from_item( + content: &mut TimelineItemContent, +) -> Result<&mut PollState, AggregationError> { + match content { + TimelineItemContent::Poll(state) => Ok(state), + c => Err(AggregationError::InvalidType { + expected: "a poll".to_owned(), + actual: c.debug_string().to_owned(), + }), + } +} + impl Aggregation { - pub fn apply_poll(&self, poll_state: &mut PollState) -> Result<(), AggregationError> { + pub fn apply(&self, content: &mut TimelineItemContent) -> Result<(), AggregationError> { match self { Aggregation::PollResponse { sender, timestamp, answers } => { - poll_state.add_response(sender.clone(), *timestamp, answers.clone()); + poll_state_from_item(content)?.add_response( + sender.clone(), + *timestamp, + answers.clone(), + ); } Aggregation::PollEnd { end_date } => { + let poll_state = poll_state_from_item(content)?; if !poll_state.end(*end_date) { return Err(AggregationError::PollAlreadyEnded); } @@ -61,18 +78,18 @@ impl Aggregations { self.stashed.entry(event_id).or_default().push(aggregation); } - pub fn apply_poll( + pub fn apply( &self, event_id: &EventId, - poll_state: &mut PollState, - ) -> Result<(), AggregationError> { + content: &mut TimelineItemContent, + ) -> Result { let Some(aggregations) = self.stashed.get(event_id) else { - return Ok(()); + return Ok(false); }; for a in aggregations { - a.apply_poll(poll_state)?; + a.apply(content)?; } - Ok(()) + Ok(true) } } @@ -80,4 +97,7 @@ impl Aggregations { pub(crate) enum AggregationError { #[error("trying to end a poll twice")] PollAlreadyEnded, + + #[error("trying to apply an aggregation of one type to an invalid target: expected {expected}, actual {actual}")] + InvalidType { expected: String, actual: String }, } diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index 767a50e22c..ac68ffefb7 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -865,49 +865,53 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { .or(pending_edit) .unzip(); - let mut poll_state = PollState::new(c, edit_content); + let poll_state = PollState::new(c, edit_content); + let mut content = TimelineItemContent::Poll(poll_state); if let Some(event_id) = self.ctx.flow.event_id() { // Applying the cache to remote events only because local echoes // don't have an event ID that could be referenced by responses yet. - if let Err(err) = self.meta.aggregations.apply_poll(event_id, &mut poll_state) { + if let Err(err) = self.meta.aggregations.apply(event_id, &mut content) { warn!("discarding poll aggregations: {err}"); } } let edit_json = edit_json.flatten(); - self.add_item(TimelineItemContent::Poll(poll_state), edit_json); + self.add_item(content, edit_json); } fn handle_poll_response(&mut self, c: UnstablePollResponseEventContent) { let start_event_id = c.relates_to.event_id; - self.meta.aggregations.add( - start_event_id.clone(), - Aggregation::PollResponse { - sender: self.ctx.sender.clone(), - timestamp: self.ctx.timestamp, - answers: c.poll_response.answers.clone(), - }, - ); - - let Some((item_pos, item)) = rfind_event_by_id(self.items, &start_event_id) else { - return; + let aggregation = Aggregation::PollResponse { + sender: self.ctx.sender.clone(), + timestamp: self.ctx.timestamp, + answers: c.poll_response.answers, }; + self.meta.aggregations.add(start_event_id.clone(), aggregation.clone()); - let TimelineItemContent::Poll(poll_state) = item.content() else { + let Some((item_pos, item)) = rfind_event_by_id(self.items, &start_event_id) else { return; }; - let mut new_poll = poll_state.clone(); - new_poll.add_response(self.ctx.sender.clone(), self.ctx.timestamp, c.poll_response.answers); - - let new_item = item.with_content(TimelineItemContent::Poll(new_poll), None); - - trace!("Adding poll response."); - self.items.replace(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); - self.result.items_updated += 1; + let mut new_content = item.content().clone(); + match aggregation.apply(&mut new_content) { + Ok(()) => { + trace!("adding poll response."); + self.items.replace( + item_pos, + TimelineItem::new( + item.with_content(new_content, None), + item.internal_id.clone(), + ), + ); + self.result.items_updated += 1; + } + Err(err) => { + warn!("discarding poll response: {err}"); + } + } } fn handle_poll_end(&mut self, c: UnstablePollEndEventContent) { @@ -920,21 +924,15 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { return; }; - let TimelineItemContent::Poll(poll_state) = item.content() else { - return; - }; - - let mut poll_state = poll_state.clone(); - - match aggregation.apply_poll(&mut poll_state) { + let mut new_content = item.content().clone(); + match aggregation.apply(&mut new_content) { Ok(()) => { trace!("Ending poll."); - let new_item = item.with_content(TimelineItemContent::Poll(poll_state), None); + let new_item = item.with_content(new_content, None); self.items .replace(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); self.result.items_updated += 1; } - Err(err) => { warn!("discarding poll end: {err}"); }