Bust _membership_stream_cache cache when current state changes #17732

Merged
4 changes: 3 additions & 1 deletion synapse/storage/_base.py
@@ -86,7 +86,9 @@ def process_replication_position(  # noqa: B027 (no-op by design)
    """

    def _invalidate_state_caches(
        self,
        room_id: str,
        members_changed: Collection[str],
    ) -> None:
        """Invalidates caches that are based on the current state, but does
        not stream invalidations down replication.
5 changes: 5 additions & 0 deletions synapse/storage/databases/main/cache.py
@@ -219,6 +219,8 @@ def process_replication_rows(
                room_id = row.keys[0]
                members_changed = set(row.keys[1:])
                self._invalidate_state_caches(room_id, members_changed)
                for user_id in members_changed:
                    self._membership_stream_cache.entity_has_changed(user_id, token)  # type: ignore[attr-defined]
@MadLittleMods (Contributor, Author) commented on Sep 18, 2024:
Kinda weird to just stick this here (same with the others in process_replication_rows). Better way to organize this?

MadLittleMods marked this conversation as resolved.
            elif row.cache_func == PURGE_HISTORY_CACHE_NAME:
                if row.keys is None:
                    raise Exception(
@@ -236,6 +238,7 @@ def process_replication_rows(
                room_id = row.keys[0]
                self._invalidate_caches_for_room_events(room_id)
                self._invalidate_caches_for_room(room_id)
@MadLittleMods (Contributor, Author) commented on lines 242 to 243, Dec 19, 2024:
Separate from this PR, and in line with the fact that these code paths need to be cleaned up: there is obvious duplication here, because we call self._invalidate_caches_for_room_events(room_id) inside self._invalidate_caches_for_room(room_id) as well.

                self._membership_stream_cache.all_entities_changed(token)  # type: ignore[attr-defined]
A maintainer (Member) commented:
We don't actually insert anything into current_state_delta_stream during room deletion, so these are basically no-ops (as reading from the DB won't return anything new).

Given how frequently rooms are deleted nowadays (as we delete rooms where everyone has left after N days, at least on matrix.org), I'm minded to remove these and leave a note. Otherwise I'm worried about how this will affect perf.

@MadLittleMods (Contributor, Author) replied on Dec 19, 2024:

Using the same logic, why do we call self._invalidate_caches_for_room(room_id) at all (just above)?

Perhaps we consider the caches which only affect the room_id to be fine, but we also have plenty that clear all of the keys in there (None), which seems just as bad.

The maintainer (Member) replied:

> Using the same logic, why do we call self._invalidate_caches_for_room(room_id) at all (just above)?

We do delete a lot of stuff when deleting a room (obviously), and so those caches do contain different data than what is in the DB. So invalidation makes sense in that case?

For current_state_delta_stream, nothing actually gets inserted, and so the _curr_state_delta_stream_cache cache wouldn't contain a different answer than what is in the DB.

@MadLittleMods (Contributor, Author) replied on Dec 19, 2024:

So you're saying that we don't delete from current_state_delta_stream when deleting a room. And we don't use _curr_state_delta_stream_cache for anything except when fetching current_state_delta_stream so there is no need to bust the cache at the moment in this scenario.

That looks correct:

# Other tables that we might want to consider clearing out include:
#
# - event_reports
# Given that these are intended for abuse management my initial
# inclination is to leave them in place.
#
# - current_state_delta_stream

But _membership_stream_cache is used for current_state_delta_stream and room_memberships, and we do purge from room_memberships when deleting a room, so it feels like we need to keep this one around.


Overall, it feels very brittle to comment out _curr_state_delta_stream_cache here because the guarantee we're assuming is bound to change in the future. I can update if you insist.

And given we're only busting the _curr_state_delta_stream_cache for that specific room_id, it doesn't feel disruptive in that case.

The maintainer (Member) replied:

The issue is that self._membership_stream_cache.all_entities_changed(token) will essentially clear that cache. Given we do now purge rooms frequently on matrix.org I worry it will have a noticeable impact on the cache hit ratio. Ideally we'd make it so that we didn't have to clear the entire cache, but that starts getting fiddly quickly. Hence trying to figure out if we really need to clear the cache.

> But _membership_stream_cache is used for current_state_delta_stream and room_memberships and we do purge from room_memberships when deleting a room so it feels like we need to keep this one around.

Note that the stream change caches effectively only cache the absence of changes, i.e. "nothing has changed between tokens X and Y and so don't query the database". Hence, I think, if we remove rows from the DB we don't need to clear the cache as at worst it will report that something has changed at token X, at which point it will query the DB and discover nothing is there?
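The "caches the absence of changes" behaviour described above can be sketched in a few lines. This is a deliberately minimal model, not Synapse's actual `StreamChangeCache` (the class and method names below are illustrative only); it shows why a stale "changed" answer is harmless (one wasted DB query) while a wrong "not changed" answer would be a correctness bug:

```python
class MiniStreamChangeCache:
    """Toy model of a stream change cache: answers "has this entity
    possibly changed since stream position X?" conservatively."""

    def __init__(self, earliest_known_stream_pos: int) -> None:
        # At or before this position we have no information, so we must
        # assume everything has changed.
        self._earliest_known_stream_pos = earliest_known_stream_pos
        self._entity_to_pos: dict[str, int] = {}

    def entity_has_changed(self, entity: str, stream_pos: int) -> None:
        # Record the latest stream position at which the entity changed.
        self._entity_to_pos[entity] = max(
            self._entity_to_pos.get(entity, 0), stream_pos
        )

    def has_entity_changed(self, entity: str, stream_pos: int) -> bool:
        if stream_pos <= self._earliest_known_stream_pos:
            # Too far back to know anything; caller must hit the DB.
            return True
        # Entities we have never seen haven't changed within the window
        # we track, so the caller can skip the DB query entirely.
        return (
            self._entity_to_pos.get(entity, self._earliest_known_stream_pos)
            > stream_pos
        )


cache = MiniStreamChangeCache(earliest_known_stream_pos=1)
cache.entity_has_changed("@user:example.com", 5)

# Changed after position 3 -> "maybe changed", so query the DB.
assert cache.has_entity_changed("@user:example.com", 3)
# No change recorded after position 5 -> safe to skip the DB.
assert not cache.has_entity_changed("@user:example.com", 5)
# If the underlying rows are later deleted from the DB, the cache may
# still answer "changed"; the follow-up query just returns nothing.
```

This is why deleting rows (as room purging does) needn't invalidate the cache: the cached answer only ever errs on the side of an unnecessary query.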

@MadLittleMods (Contributor, Author) replied:

I've commented out self._membership_stream_cache.all_entities_changed(token) when we delete a room and added a comment with context on why we think this is safe.

Since the self._curr_state_delta_stream_cache.entity_has_changed(room_id, token) call only invalidates the specific room being deleted, I've left it as-is. I don't think it has the same overarching impact that the all_entities_changed variants do.

            else:
                self._attempt_to_invalidate_cache(row.cache_func, row.keys)

@@ -275,6 +278,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
            self._attempt_to_invalidate_cache(
                "get_sliding_sync_rooms_for_user", None
            )
            self._membership_stream_cache.entity_has_changed(data.state_key, token)  # type: ignore[attr-defined]
        elif data.type == EventTypes.RoomEncryption:
            self._attempt_to_invalidate_cache(
                "get_room_encryption", (data.room_id,)
@@ -291,6 +295,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
        # Similar to the above, but the entire caches are invalidated. This is
        # unfortunate for the membership caches, but should recover quickly.
        self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token)  # type: ignore[attr-defined]
        self._membership_stream_cache.all_entities_changed(token)  # type: ignore[attr-defined]
        self._attempt_to_invalidate_cache("get_rooms_for_user", None)
        self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
        self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))
11 changes: 11 additions & 0 deletions synapse/util/caches/stream_change_cache.py
@@ -314,6 +314,17 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
        self._entity_to_key[entity] = stream_pos
        self._evict()

    def all_entities_changed(self, stream_pos: int) -> None:
        """
        Mark all entities as changed. This is useful when the cache is
        invalidated and any of the entities may have changed.
        """
        # All entities are at the same stream position now.
        self._cache = SortedDict({stream_pos: set(self._entity_to_key.keys())})
        self._entity_to_key = {
            entity: stream_pos for entity in self._entity_to_key.keys()
        }

erikjohnston marked this conversation as resolved.

    def _evict(self) -> None:
        """
        Ensure the cache has not exceeded the maximum size.
16 changes: 16 additions & 0 deletions tests/util/test_stream_change_cache.py
@@ -251,3 +251,19 @@ def test_max_pos(self) -> None:

        # Unknown entities will return None
        self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), None)

    def test_all_entities_changed(self) -> None:
        """
        `StreamChangeCache.all_entities_changed(...)` will mark all entities as changed.
        """
        # Note: the redaction in this page collapsed the three distinct test
        # user IDs; distinct placeholder IDs are used here so the test
        # actually exercises three entities.
        cache = StreamChangeCache("#test", 1, max_size=10)

        cache.entity_has_changed("[email protected]", 2)
        cache.entity_has_changed("[email protected]", 3)
        cache.entity_has_changed("[email protected]", 4)

        cache.all_entities_changed(5)

        self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), 5)
        self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), 5)
        self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), 5)