Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(timeline): maintain aggregations when an event is deduplicated #4576

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId};

use crate::timeline::{PollState, TimelineItemContent};

#[derive(Clone, Debug)]
pub(crate) enum Aggregation {
PollResponse {
sender: OwnedUserId,
timestamp: MilliSecondsSinceUnixEpoch,
answers: Vec<String>,
},

PollEnd {
end_date: MilliSecondsSinceUnixEpoch,
},
}

fn poll_state_from_item(
content: &mut TimelineItemContent,
) -> Result<&mut PollState, AggregationError> {
match content {
TimelineItemContent::Poll(state) => Ok(state),
c => Err(AggregationError::InvalidType {
expected: "a poll".to_owned(),
actual: c.debug_string().to_owned(),
}),
}
}

impl Aggregation {
pub fn apply(&self, content: &mut TimelineItemContent) -> Result<(), AggregationError> {
match self {
Aggregation::PollResponse { sender, timestamp, answers } => {
poll_state_from_item(content)?.add_response(
sender.clone(),
*timestamp,
answers.clone(),
);
}
Aggregation::PollEnd { end_date } => {
let poll_state = poll_state_from_item(content)?;
if !poll_state.end(*end_date) {
return Err(AggregationError::PollAlreadyEnded);
}
}
}
Ok(())
}
}

#[derive(Clone, Debug, Default)]
pub(crate) struct Aggregations {
stashed: HashMap<OwnedEventId, Vec<Aggregation>>,
}

impl Aggregations {
pub fn clear(&mut self) {
self.stashed.clear();
}

pub fn add(&mut self, event_id: OwnedEventId, aggregation: Aggregation) {
self.stashed.entry(event_id).or_default().push(aggregation);
}

pub fn apply(
&self,
event_id: &EventId,
content: &mut TimelineItemContent,
) -> Result<bool, AggregationError> {
let Some(aggregations) = self.stashed.get(event_id) else {
return Ok(false);
};
for a in aggregations {
a.apply(content)?;
}
Ok(true)
}
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum AggregationError {
#[error("trying to end a poll twice")]
PollAlreadyEnded,

#[error("trying to apply an aggregation of one type to an invalid target: expected {expected}, actual {actual}")]
InvalidType { expected: String, actual: String },
}
3 changes: 3 additions & 0 deletions crates/matrix-sdk-ui/src/timeline/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,13 @@ use crate::{
unable_to_decrypt_hook::UtdHookManager,
};

mod aggregations;
mod observable_items;
mod read_receipts;
mod state;

pub(super) use aggregations::*;

/// Data associated to the current timeline focus.
#[derive(Debug)]
enum TimelineFocusData<P: RoomDataProvider> {
Expand Down
110 changes: 43 additions & 67 deletions crates/matrix-sdk-ui/src/timeline/controller/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::{
collections::HashMap,
future::Future,
num::NonZeroUsize,
sync::{Arc, RwLock},
Expand All @@ -28,12 +27,8 @@ use matrix_sdk::{
use ruma::events::receipt::ReceiptEventContent;
use ruma::{
events::{
poll::{
unstable_response::UnstablePollResponseEventContent,
unstable_start::NewUnstablePollStartEventContentWithoutRelation,
},
relation::Replacement,
room::message::RoomMessageEventContentWithoutRelation,
poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation,
relation::Replacement, room::message::RoomMessageEventContentWithoutRelation,
AnySyncEphemeralRoomEvent, AnySyncTimelineEvent,
},
push::Action,
Expand All @@ -44,6 +39,7 @@ use ruma::{
use tracing::{debug, instrument, trace, warn};

use super::{
aggregations::Aggregations,
observable_items::{
AllRemoteEvents, ObservableItems, ObservableItemsTransaction,
ObservableItemsTransactionEntry,
Expand All @@ -60,11 +56,11 @@ use crate::{
Flow, HandleEventResult, TimelineEventContext, TimelineEventHandler, TimelineEventKind,
TimelineItemPosition,
},
event_item::{PollState, RemoteEventOrigin, ResponseData},
event_item::RemoteEventOrigin,
item::TimelineUniqueId,
reactions::Reactions,
reactions::{PendingReaction, Reactions},
traits::RoomDataProvider,
Profile, TimelineItem, TimelineItemKind,
Profile, ReactionStatus, TimelineItem, TimelineItemKind,
},
unable_to_decrypt_hook::UtdHookManager,
};
Expand Down Expand Up @@ -794,7 +790,39 @@ impl TimelineStateTransaction<'_> {
if let Some(event_meta) = self.items.all_remote_events().get(event_index) {
// Fetch the `timeline_item_index` associated to the remote event.
if let Some(timeline_item_index) = event_meta.timeline_item_index {
let _removed_timeline_item = self.items.remove(timeline_item_index);
let removed_timeline_item = self.items.remove(timeline_item_index);

// Restash reactions, if there were any.
if let Some(event_item) = removed_timeline_item.as_event() {
if let Some(event_id) = event_item.event_id() {
let target_event = event_id.to_owned();

for (key, user_to_reaction) in event_item.reactions().iter() {
for (user_id, reaction_info) in user_to_reaction {
let reaction_event = match &reaction_info.status {
ReactionStatus::LocalToLocal(_)
| ReactionStatus::LocalToRemote(_) => continue,
ReactionStatus::RemoteToRemote(event) => event,
};

let pending_reaction = PendingReaction {
key: key.to_owned(),
sender_id: user_id.to_owned(),
timestamp: reaction_info.timestamp,
};

self.meta
.reactions
.pending
.entry(target_event.clone())
.or_default()
.insert(reaction_event.to_owned(), pending_reaction);
}
}
}
}

// TODO restash poll responses/ends, edits. etc.?
}

// Now we can remove the remote event.
Expand Down Expand Up @@ -937,58 +965,6 @@ impl TimelineStateTransaction<'_> {
}
}

/// Cache holding poll response and end events handled before their poll start
/// event has been handled.
#[derive(Clone, Debug, Default)]
pub(in crate::timeline) struct PendingPollEvents {
/// Responses to a poll (identified by the poll's start event id).
responses: HashMap<OwnedEventId, Vec<ResponseData>>,

/// Mapping of a poll (identified by its start event's id) to its end date.
end_dates: HashMap<OwnedEventId, MilliSecondsSinceUnixEpoch>,
}

impl PendingPollEvents {
pub(crate) fn add_response(
&mut self,
start_event_id: &EventId,
sender: &UserId,
timestamp: MilliSecondsSinceUnixEpoch,
content: &UnstablePollResponseEventContent,
) {
self.responses.entry(start_event_id.to_owned()).or_default().push(ResponseData {
sender: sender.to_owned(),
timestamp,
answers: content.poll_response.answers.clone(),
});
}

pub(crate) fn clear(&mut self) {
self.end_dates.clear();
self.responses.clear();
}

/// Mark a poll as finished by inserting its poll date.
pub(crate) fn mark_as_ended(
&mut self,
start_event_id: &EventId,
timestamp: MilliSecondsSinceUnixEpoch,
) {
self.end_dates.insert(start_event_id.to_owned(), timestamp);
}

/// Dumps all response and end events present in the cache that belong to
/// the given start_event_id into the given poll_state.
pub(crate) fn apply_pending(&mut self, start_event_id: &EventId, poll_state: &mut PollState) {
if let Some(pending_responses) = self.responses.remove(start_event_id) {
poll_state.response_data.extend(pending_responses);
}
if let Some(pending_end) = self.end_dates.remove(start_event_id) {
poll_state.end_event_timestamp = Some(pending_end);
}
}
}

#[derive(Clone)]
pub(in crate::timeline) enum PendingEditKind {
RoomMessage(Replacement<RoomMessageEventContentWithoutRelation>),
Expand Down Expand Up @@ -1064,8 +1040,8 @@ pub(in crate::timeline) struct TimelineMetadata {
/// stashing pending reactions.
pub reactions: Reactions,

/// Associated poll events received before their original poll start event.
pub pending_poll_events: PendingPollEvents,
/// Aggregation metadata and pending aggregations.
pub aggregations: Aggregations,

/// Edit events received before the related event they're editing.
pub pending_edits: RingBuffer<PendingEdit>,
Expand Down Expand Up @@ -1104,7 +1080,7 @@ impl TimelineMetadata {
own_user_id,
next_internal_id: Default::default(),
reactions: Default::default(),
pending_poll_events: Default::default(),
aggregations: Default::default(),
pending_edits: RingBuffer::new(MAX_NUM_STASHED_PENDING_EDITS),
fully_read_event: Default::default(),
// It doesn't make sense to set this to false until we fill the `fully_read_event`
Expand All @@ -1122,7 +1098,7 @@ impl TimelineMetadata {
// Note: we don't clear the next internal id to avoid bad cases of stale unique
// ids across timeline clears.
self.reactions.clear();
self.pending_poll_events.clear();
self.aggregations.clear();
self.pending_edits.clear();
self.fully_read_event = None;
// We forgot about the fully read marker right above, so wait for a new one
Expand Down
Loading
Loading