Skip to content

Commit

Permalink
Return some room timeline data in Sliding Sync
Browse files Browse the repository at this point in the history
  • Loading branch information
MadLittleMods committed Jun 17, 2024
1 parent a548543 commit 079194c
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 30 deletions.
202 changes: 191 additions & 11 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,25 @@
#
#
import logging
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple

import attr
from immutabledict import immutabledict

from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
from synapse.events import EventBase
from synapse.storage.roommember import RoomsForUser
from synapse.types import (
PersistedEventPosition,
Requester,
RoomStreamToken,
StreamKeyType,
StreamToken,
UserID,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
from synapse.types.state import StateFilter
from synapse.types.state import StateFilter, StateKey
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -82,6 +85,18 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) ->
return membership != Membership.LEAVE or sender != user_id


# We can't freeze this class because we want to update it in place with the
# de-duplicated data.
@attr.s(slots=True, auto_attribs=True)
class RoomSyncConfig:
"""
Holds the config for what data we should fetch for a room in the sync response.
"""

timeline_limit: int
required_state: Set[StateKey]


class SlidingSyncHandler:
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
Expand Down Expand Up @@ -201,6 +216,7 @@ async def current_sync_for_user(

# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
relevant_room_map: Dict[str, RoomSyncConfig] = {}
if sync_config.lists:
# Get all of the room IDs that the user should be able to see in the sync
# response
Expand All @@ -225,29 +241,66 @@ async def current_sync_for_user(
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
for range in list_config.ranges:
room_id_set = {
room_id
for room_id, _ in sorted_room_info[range[0] : range[1]]
}

ops.append(
SlidingSyncResult.SlidingWindowList.Operation(
op=OperationType.SYNC,
range=range,
room_ids=[
room_id
for room_id, _ in sorted_room_info[
range[0] : range[1]
]
],
room_ids=list(room_id_set),
)
)

# Update the relevant room map
for room_id in room_id_set:
if relevant_room_map.get(room_id) is not None:
# Take the highest timeline limit
if (
relevant_room_map[room_id].timeline_limit
< list_config.timeline_limit
):
relevant_room_map[room_id].timeline_limit = (
list_config.timeline_limit
)

# Union the required state
relevant_room_map[room_id].required_state.update(
list_config.required_state
)
else:
relevant_room_map[room_id] = RoomSyncConfig(
timeline_limit=list_config.timeline_limit,
required_state=set(list_config.required_state),
)

lists[list_key] = SlidingSyncResult.SlidingWindowList(
count=len(sorted_room_info),
ops=ops,
)

# TODO: if (sync_config.room_subscriptions):

# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
for room_id, room_sync_config in relevant_room_map.items():
room_sync_result = await self.get_room_sync_data(
user=sync_config.user,
room_id=room_id,
room_sync_config=room_sync_config,
rooms_for_user_membership_at_to_token=sync_room_map[room_id],
from_token=from_token,
to_token=to_token,
)

rooms[room_id] = room_sync_result

return SlidingSyncResult(
next_pos=to_token,
lists=lists,
# TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
rooms={},
rooms=rooms,
extensions={},
)

Expand Down Expand Up @@ -665,3 +718,130 @@ async def sort_rooms(
# We want descending order
reverse=True,
)

async def get_room_sync_data(
self,
user: UserID,
room_id: str,
room_sync_config: RoomSyncConfig,
rooms_for_user_membership_at_to_token: RoomsForUser,
from_token: Optional[StreamToken],
to_token: StreamToken,
) -> SlidingSyncResult.RoomResult:
"""
Fetch room data for a room.
We fetch data according to the token range (> `from_token` and <= `to_token`).
Args:
user: User to fetch data for
room_id: The room ID to fetch data for
room_sync_config: Config for what data we should fetch for a room in the
sync response.
rooms_for_user_membership_at_to_token: Membership information for the user
in the room at the time of `to_token`.
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
"""

timeline_events: List[EventBase] = []
limited = False
# We want to use `to_token` (vs `from_token`) because we look backwards from the
# `to_token` up to the `timeline_limit` and we might not reach `from_token`
# before we hit the limit. We will update the room stream position once we've
# fetched the events.
prev_batch_token = to_token
if room_sync_config.timeline_limit > 0:
timeline_events, new_room_key = await self.store.paginate_room_events(
room_id=room_id,
# We're going to paginate backwards from the `to_token`
from_key=to_token.room_key,
to_key=from_token.room_key if from_token is not None else None,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=room_sync_config.timeline_limit + 1,
event_filter=None,
)

# We want to return the events in ascending order (the last event is the
# most recent).
timeline_events.reverse()

timeline_events = await filter_events_for_client(
self.storage_controllers,
user.to_string(),
timeline_events,
is_peeking=rooms_for_user_membership_at_to_token.membership
!= Membership.JOIN,
filter_send_to_client=True,
)

# Determine our `limited` status
if len(timeline_events) > room_sync_config.timeline_limit:
limited = True
# Get rid of that extra "+ 1" event because we only used it to determine
# if we hit the limit or not
timeline_events = timeline_events[-room_sync_config.timeline_limit :]
assert timeline_events[0].internal_metadata.stream_ordering
new_room_key = RoomStreamToken(
stream=timeline_events[0].internal_metadata.stream_ordering - 1
)

prev_batch_token = prev_batch_token.copy_and_replace(
StreamKeyType.ROOM, new_room_key
)

# Figure out any stripped state events for invite/knocks
stripped_state: List[EventBase] = []
if rooms_for_user_membership_at_to_token.membership in {
Membership.INVITE,
Membership.KNOCK,
}:
invite_or_knock_event = await self.store.get_event(
rooms_for_user_membership_at_to_token.event_id
)

stripped_state = []
if invite_or_knock_event.membership == Membership.INVITE:
stripped_state = invite_or_knock_event.unsigned.get(
"invite_room_state", []
)
elif invite_or_knock_event.membership == Membership.KNOCK:
stripped_state = invite_or_knock_event.unsigned.get(
"knock_room_state", []
)

stripped_state.append(invite_or_knock_event)

return SlidingSyncResult.RoomResult(
# TODO: Dummy value
name="TODO",
# TODO: Dummy value
avatar=None,
# TODO: Dummy value
heroes=None,
# Since we can't determine whether we've already sent a room down this
# Sliding Sync connection before (we plan to add this optimization in the
# future), we're always returning the requested room state instead of
# updates.
initial=True,
# TODO: Dummy value
required_state=[],
timeline=timeline_events,
# TODO: Dummy value
is_dm=False,
stripped_state=stripped_state,
prev_batch=prev_batch_token,
limited=limited,
# TODO: Dummy values
joined_count=0,
invited_count=0,
# TODO: These are just dummy values. We could potentially just remove these
# since notifications can only really be done correctly on the client anyway
# (encrypted rooms).
notification_count=0,
highlight_count=0,
# TODO: Dummy value
num_live=0,
)
Loading

0 comments on commit 079194c

Please sign in to comment.