From c0eb76df0c6e3ee8eeefccd1b36e123f828de0e0 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Tue, 6 Feb 2024 12:16:27 +0100 Subject: [PATCH] compute: fix advancement of replica-targeted subscribe frontiers This commit changes how the compute controller handles frontier updates for replica-targeted subscribes. Previously, each replica could advance the global frontier of a subscribe, targeted or not. With this commit, only the targeted replica can cause a global frontier advancement. This fixes the following error scenario: * A non-targeted replica advances to the empty frontier. * The compute controller downgrades the subscribe's global frontier to the empty frontier. * The compute controller sends an `AllowCompaction([])` command to all replicas. * The targeted replica receives the `AllowCompaction([])` and considers the subscribe dropped. * The compute controller never receives a subscribe response from the targeted replica. --- src/compute-client/src/controller/instance.rs | 60 ++++++++++++------- test/testdrive/replica-targeting.td | 33 ++++++++++ 2 files changed, 70 insertions(+), 23 deletions(-) diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index ae1234757a6ba..f95da6a3d4162 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -1199,6 +1199,25 @@ where /// Panics if any of the `updates` references an absent collection. /// Panics if any of the `updates` regresses an existing write frontier. #[tracing::instrument(level = "debug", skip(self))] + fn update_write_frontiers( + &mut self, + replica_id: ReplicaId, + updates: &[(GlobalId, Antichain)], + ) { + // Apply advancements of replica frontiers. + self.update_replica_write_frontiers(replica_id, updates); + + // Apply advancements of global collection frontiers. + self.maybe_update_global_write_frontiers(updates); + } + + /// Apply replica write frontier updates. + /// + /// # Panics + /// + /// Panics if any of the `updates` references an absent collection. + /// Panics if any of the `updates` regresses an existing replica write frontier. + #[tracing::instrument(level = "debug", skip(self))] fn update_replica_write_frontiers( &mut self, replica_id: ReplicaId, @@ -1239,17 +1258,6 @@ where self.storage_controller .update_read_capabilities(&mut storage_read_capability_changes); } - - // Apply advancements of global collection frontiers to the controller state. - let global_updates: Vec<_> = updates - .iter() - .filter(|(id, new_upper)| { - let collection = self.compute.expect_collection(*id); - PartialOrder::less_than(&collection.write_frontier, new_upper) - }) - .cloned() - .collect(); - self.update_global_write_frontiers(&global_updates); } /// Remove frontier tracking state for the given replica. @@ -1277,12 +1285,13 @@ where /// Apply global write frontier updates. /// + /// Frontier regressions are gracefully ignored. + /// /// # Panics /// /// Panics if any of the `updates` references an absent collection. - /// Panics if any of the `updates` regresses an existing write frontier. #[tracing::instrument(level = "debug", skip(self))] - fn update_global_write_frontiers(&mut self, updates: &[(GlobalId, Antichain)]) { + fn maybe_update_global_write_frontiers(&mut self, updates: &[(GlobalId, Antichain)]) { // Compute and apply read capability downgrades that result from collection frontier // advancements. let mut read_capability_changes = BTreeMap::new(); @@ -1292,11 +1301,9 @@ where let old_upper = std::mem::replace(&mut collection.write_frontier, new_upper.clone()); let old_since = &collection.implied_capability; - // Safety check against frontier regressions. - assert!( - PartialOrder::less_equal(&old_upper, new_upper), - "global frontier regression: {old_upper:?} -> {new_upper:?}, collection={id}", - ); + if !PartialOrder::less_than(&old_upper, new_upper) { + continue; // frontier has not advanced + } let new_since = match &collection.read_policy { Some(read_policy) => { @@ -1495,7 +1502,7 @@ where self.compute .update_hydration_status(id, replica_id, &new_frontier); - self.update_replica_write_frontiers(replica_id, &[(id, new_frontier)]); + self.update_write_frontiers(replica_id, &[(id, new_frontier)]); } fn handle_peek_response( @@ -1541,9 +1548,9 @@ where return None; } - // Always apply write frontier updates. Even if the subscribe is not tracked anymore, there - // might still be replicas reading from its inputs, so we need to track the frontiers until - // all replicas have advanced to the empty one. + // Always apply replica write frontier updates. Even if the subscribe is not tracked + // anymore, there might still be replicas reading from its inputs, so we need to track the + // frontiers until all replicas have advanced to the empty one. let write_frontier = match &response { SubscribeResponse::Batch(batch) => batch.upper.clone(), SubscribeResponse::DroppedAt(_) => Antichain::new(), @@ -1551,7 +1558,7 @@ where self.compute .update_hydration_status(subscribe_id, replica_id, &write_frontier); - self.update_replica_write_frontiers(replica_id, &[(subscribe_id, write_frontier)]); + self.update_replica_write_frontiers(replica_id, &[(subscribe_id, write_frontier.clone())]); // If the subscribe is not tracked, or targets a different replica, there is nothing to do. let mut subscribe = self.compute.subscribes.get(&subscribe_id)?.clone(); @@ -1560,6 +1567,13 @@ where return None; } + // Apply a global frontier update. + // If this is a replica-targeted subscribe, it is important that we advance the global + // frontier only based on responses from the targeted replica. Otherwise, another replica + // could advance to the empty frontier, making us drop the subscribe on the targeted + // replica prematurely. + self.maybe_update_global_write_frontiers(&[(subscribe_id, write_frontier)]); + match response { SubscribeResponse::Batch(batch) => { let upper = batch.upper; diff --git a/test/testdrive/replica-targeting.td b/test/testdrive/replica-targeting.td index 4234fefc0d308..2ec8852e5f40b 100644 --- a/test/testdrive/replica-targeting.td +++ b/test/testdrive/replica-targeting.td @@ -73,3 +73,36 @@ $ set-regex match=\d{13} replacement= > FETCH c 1 4 > COMMIT + +# Test that replica-targeted subscribes work when the subscribed collection +# advances to the empty frontier. Regression test for #24981. + +> DROP CLUSTER test CASCADE +> CREATE CLUSTER test SIZE '4-4', REPLICATION FACTOR 4 +> SET cluster_replica = r1 + +> CREATE MATERIALIZED VIEW mv AS SELECT 1 + +> BEGIN +> DECLARE c CURSOR FOR SUBSCRIBE mv +> FETCH c + 1 1 +> COMMIT + +# We want to provoke the case where a non-targeted replica returns a response +# first, so try multiple times to be sure. + +> BEGIN +> DECLARE c CURSOR FOR SUBSCRIBE mv +> FETCH c + 1 1 +> COMMIT + +> BEGIN +> DECLARE c CURSOR FOR SUBSCRIBE mv +> FETCH c + 1 1 +> COMMIT + +# Cleanup +> DROP CLUSTER test CASCADE