From aab3672037ee9ea71c54c89c86d9c0cc35ae184f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 8 Jan 2025 10:11:09 -0600 Subject: [PATCH] Bust `_membership_stream_cache` cache when current state changes (#17732) This is particularly a problem in a state reset scenario where the membership might change without a corresponding event. This PR is targeting a scenario where a state reset happens which causes room membership to change. Previously, the cache would just hold onto stale data and now we properly bust the cache in this scenario. We have a few tests for these scenarios which you can see are now fixed because we can remove the `FIXME` where we were previously manually busting the cache in the test itself. This is a general Synapse thing so by it's nature it helps out Sliding Sync. Fix https://github.com/element-hq/synapse/issues/17368 Prerequisite for https://github.com/element-hq/synapse/issues/17929 --- Match when are busting `_curr_state_delta_stream_cache` --- changelog.d/17732.bugfix | 1 + synapse/storage/_base.py | 4 ++- synapse/storage/databases/main/cache.py | 36 +++++++++++++++++++ synapse/storage/databases/main/events.py | 15 +++++++- synapse/util/caches/stream_change_cache.py | 9 +++++ .../client/sliding_sync/test_sliding_sync.py | 18 ---------- tests/storage/test_stream.py | 6 ---- tests/util/test_stream_change_cache.py | 25 +++++++++++++ 8 files changed, 88 insertions(+), 26 deletions(-) create mode 100644 changelog.d/17732.bugfix diff --git a/changelog.d/17732.bugfix b/changelog.d/17732.bugfix new file mode 100644 index 00000000000..572c13fc573 --- /dev/null +++ b/changelog.d/17732.bugfix @@ -0,0 +1 @@ +Fix membership caches not updating in state reset scenarios. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e14d711c764..7251e72e3ad 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -86,7 +86,9 @@ def process_replication_position( # noqa: B027 (no-op by design) """ def _invalidate_state_caches( - self, room_id: str, members_changed: Collection[str] + self, + room_id: str, + members_changed: Collection[str], ) -> None: """Invalidates caches that are based on the current state, but does not stream invalidations down replication. diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 707d18de78a..f364464c23a 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -219,6 +219,11 @@ def process_replication_rows( room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) + self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined] + room_id, token + ) + for user_id in members_changed: + self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined] elif row.cache_func == PURGE_HISTORY_CACHE_NAME: if row.keys is None: raise Exception( @@ -236,6 +241,35 @@ def process_replication_rows( room_id = row.keys[0] self._invalidate_caches_for_room_events(room_id) self._invalidate_caches_for_room(room_id) + self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined] + room_id, token + ) + # Note: This code is commented out to improve cache performance. + # While uncommenting would provide complete correctness, our + # automatic forgotten room purge logic (see + # `forgotten_room_retention_period`) means this would frequently + # clear the entire cache (effectively) and probably have a noticable + # impact on the cache hit ratio. + # + # Not updating the cache here is safe because: + # + # 1. `_membership_stream_cache` is only used to indicate the + # *absence* of changes, i.e. "nothing has changed between tokens + # X and Y and so return early and don't query the database". + # 2. `_membership_stream_cache` is used when we query data from + # `current_state_delta_stream` and `room_memberships` but since + # nothing new is written to the database for those tables when + # purging/deleting a room (only deleting rows), there is nothing + # changed to care about. + # + # At worst, the cache might indicate a change at token X, at which + # point, we will query the database and discover nothing is there. + # + # Ideally, we would make it so that we could clear the cache on a + # more granular level but that's a bit complex and fiddly to do with + # room membership. + # + # self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) @@ -275,6 +309,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: self._attempt_to_invalidate_cache( "get_sliding_sync_rooms_for_user", None ) + self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined] elif data.type == EventTypes.RoomEncryption: self._attempt_to_invalidate_cache( "get_room_encryption", (data.room_id,) @@ -291,6 +326,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: # Similar to the above, but the entire caches are invalidated. This is # unfortunate for the membership caches, but should recover quickly. self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] + self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("get_room_type", (data.room_id,)) self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,)) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index dd6ac909e9f..a23aaf50962 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1605,7 +1605,13 @@ def _update_current_state_txn( room_id delta_state: Deltas that are going to be used to update the `current_state_events` table. Changes to the current state of the room. - stream_id: TODO + stream_id: This is expected to be the minimum `stream_ordering` for the + batch of events that we are persisting; which means we do not end up in a + situation where workers see events before the `current_state_delta` updates. + FIXME: However, this function also gets called with next upcoming + `stream_ordering` when we re-sync the state of a partial stated room (see + `update_current_state(...)`) which may be "correct" but it would be good to + nail down what exactly is the expected value here. sliding_sync_table_changes: Changes to the `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables derived from the given `delta_state` (see @@ -1908,6 +1914,13 @@ def _update_current_state_txn( stream_id, ) + for user_id in members_to_cache_bust: + txn.call_after( + self.store._membership_stream_cache.entity_has_changed, + user_id, + stream_id, + ) + # Invalidate the various caches self.store._invalidate_state_caches_and_stream( txn, room_id, members_to_cache_bust diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 03503abe0f6..5ac8643eefc 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -314,6 +314,15 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None: self._entity_to_key[entity] = stream_pos self._evict() + def all_entities_changed(self, stream_pos: int) -> None: + """ + Mark all entities as changed. This is useful when the cache is invalidated and + there may be some potential change for all of the entities. + """ + self._cache.clear() + self._entity_to_key.clear() + self._earliest_known_stream_pos = stream_pos + def _evict(self) -> None: """ Ensure the cache has not exceeded the maximum size. diff --git a/tests/rest/client/sliding_sync/test_sliding_sync.py b/tests/rest/client/sliding_sync/test_sliding_sync.py index 578cb384cd7..f3cf2111ec1 100644 --- a/tests/rest/client/sliding_sync/test_sliding_sync.py +++ b/tests/rest/client/sliding_sync/test_sliding_sync.py @@ -1169,12 +1169,6 @@ def test_state_reset_room_comes_down_incremental_sync(self) -> None: self.persistence.persist_event(join_rule_event, join_rule_context) ) - # FIXME: We're manually busting the cache since - # https://github.com/element-hq/synapse/issues/17368 is not solved yet - self.store._membership_stream_cache.entity_has_changed( - user1_id, join_rule_event_pos.stream - ) - # Ensure that the state reset worked and only user2 is in the room now users_in_room = self.get_success(self.store.get_users_in_room(room_id1)) self.assertIncludes(set(users_in_room), {user2_id}, exact=True) @@ -1322,12 +1316,6 @@ def test_state_reset_previously_room_comes_down_incremental_sync_with_filters( self.persistence.persist_event(join_rule_event, join_rule_context) ) - # FIXME: We're manually busting the cache since - # https://github.com/element-hq/synapse/issues/17368 is not solved yet - self.store._membership_stream_cache.entity_has_changed( - user1_id, join_rule_event_pos.stream - ) - # Ensure that the state reset worked and only user2 is in the room now users_in_room = self.get_success(self.store.get_users_in_room(space_room_id)) self.assertIncludes(set(users_in_room), {user2_id}, exact=True) @@ -1506,12 +1494,6 @@ def test_state_reset_never_room_incremental_sync_with_filters( self.persistence.persist_event(join_rule_event, join_rule_context) ) - # FIXME: We're manually busting the cache since - # https://github.com/element-hq/synapse/issues/17368 is not solved yet - self.store._membership_stream_cache.entity_has_changed( - user1_id, join_rule_event_pos.stream - ) - # Ensure that the state reset worked and only user2 is in the room now users_in_room = self.get_success(self.store.get_users_in_room(space_room_id)) self.assertIncludes(set(users_in_room), {user2_id}, exact=True) diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 38a56419f37..0f58dc8a0a6 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -1209,12 +1209,6 @@ def test_state_reset2(self) -> None: self.persistence.persist_event(join_rule_event, join_rule_context) ) - # FIXME: We're manually busting the cache since - # https://github.com/element-hq/synapse/issues/17368 is not solved yet - self.store._membership_stream_cache.entity_has_changed( - user1_id, join_rule_event_pos.stream - ) - after_reset_token = self.event_sources.get_current_token() membership_changes = self.get_success( diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index c41f5706af5..9254bff79b5 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -255,3 +255,28 @@ def test_max_pos(self) -> None: # Unknown entities will return None self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), None) + + def test_all_entities_changed(self) -> None: + """ + `StreamChangeCache.all_entities_changed(...)` will mark all entites as changed. + """ + cache = StreamChangeCache("#test", 1, max_size=10) + + cache.entity_has_changed("user@foo.com", 2) + cache.entity_has_changed("bar@baz.net", 3) + cache.entity_has_changed("user@elsewhere.org", 4) + + cache.all_entities_changed(5) + + # Everything should be marked as changed before the stream position where the + # change occurred. + self.assertTrue(cache.has_entity_changed("user@foo.com", 4)) + self.assertTrue(cache.has_entity_changed("bar@baz.net", 4)) + self.assertTrue(cache.has_entity_changed("user@elsewhere.org", 4)) + + # Nothing should be marked as changed at/after the stream position where the + # change occurred. In other words, nothing has changed since the stream position + # 5. + self.assertFalse(cache.has_entity_changed("user@foo.com", 5)) + self.assertFalse(cache.has_entity_changed("bar@baz.net", 5)) + self.assertFalse(cache.has_entity_changed("user@elsewhere.org", 5))