From 5ac76e87867299401b6287804aead58a7d63b760 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Wed, 31 Jan 2024 10:31:17 +0100 Subject: [PATCH] Add a feature flag for aggressive read hold downgrades The flag is on by default and meant as a measure for derisking the rollout to prod, so we can easily revert back to the old behavior if we encounter issues. --- src/adapter/src/coord.rs | 3 +++ src/compute-client/src/controller.rs | 12 ++++++++++++ src/compute-client/src/controller/instance.rs | 9 ++++++++- src/controller/src/lib.rs | 5 +++++ src/sql/src/session/vars.rs | 7 +++++++ 5 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 1f0b22400886b..05d914e284409 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -1389,6 +1389,7 @@ impl Coordinator { let scheduling_config = flags::orchestrator_scheduling_config(system_config); let merge_effort = system_config.default_idle_arrangement_merge_effort(); let exert_prop = system_config.default_arrangement_exert_proportionality(); + let aggressive_downgrades = system_config.enable_compute_aggressive_readhold_downgrades(); self.controller.compute.update_configuration(compute_config); self.controller.storage.update_parameters(storage_config); self.controller @@ -1397,6 +1398,8 @@ impl Coordinator { .set_default_idle_arrangement_merge_effort(merge_effort); self.controller .set_default_arrangement_exert_proportionality(exert_prop); + self.controller + .set_enable_compute_aggressive_readhold_downgrades(aggressive_downgrades); let mut policies_to_set: BTreeMap = Default::default(); diff --git a/src/compute-client/src/controller.rs b/src/compute-client/src/controller.rs index cf7e640efd700..9160e3a11415c 100644 --- a/src/compute-client/src/controller.rs +++ b/src/compute-client/src/controller.rs @@ -138,6 +138,12 @@ pub struct ComputeController { response_rx: crossbeam_channel::Receiver>, /// Response sender that's passed to new `Instance`s. response_tx: crossbeam_channel::Sender>, + + /// Whether to aggressively downgrade read holds for sink dataflows. + /// + /// This flag exists to derisk the rollout of the aggressive downgrading approach. + /// TODO(teskje): Remove this after a couple weeks. + enable_aggressive_readhold_downgrades: bool, } impl ComputeController { @@ -162,6 +168,7 @@ impl ComputeController { introspection: Introspection::new(), response_rx, response_tx, + enable_aggressive_readhold_downgrades: true, } } @@ -241,6 +248,10 @@ impl ComputeController { self.default_arrangement_exert_proportionality = value; } + pub fn set_enable_aggressive_readhold_downgrades(&mut self, value: bool) { + self.enable_aggressive_readhold_downgrades = value; + } + /// Returns the read and write frontiers for each collection. pub fn collection_frontiers(&self) -> BTreeMap, Antichain)> { let collections = self.instances.values().flat_map(|i| i.collections_iter()); @@ -290,6 +301,7 @@ where self.metrics.for_instance(id), self.response_tx.clone(), self.introspection.tx.clone(), + self.enable_aggressive_readhold_downgrades, ), ); diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index 226d3c08a804a..e9a0db414edd9 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -166,6 +166,11 @@ pub(super) struct Instance { replica_epochs: BTreeMap, /// The registry the controller uses to report metrics. metrics: InstanceMetrics, + /// Whether to aggressively downgrade read holds for sink dataflows. + /// + /// This flag exists to derisk the rollout of the aggressive downgrading approach. + /// TODO(teskje): Remove this after a couple weeks. + enable_aggressive_readhold_downgrades: bool, } impl Instance { @@ -219,7 +224,7 @@ impl Instance { let mut state = CollectionState::new(as_of.clone(), storage_dependencies, compute_dependencies); // If the collection is write-only, clear its read policy to reflect that. - if write_only { + if write_only && self.enable_aggressive_readhold_downgrades { state.read_policy = None; } self.collections.insert(id, state); @@ -490,6 +495,7 @@ where metrics: InstanceMetrics, response_tx: crossbeam_channel::Sender>, introspection_tx: crossbeam_channel::Sender, + enable_aggressive_readhold_downgrades: bool, ) -> Self { let collections = arranged_logs .iter() @@ -514,6 +520,7 @@ where envd_epoch, replica_epochs: Default::default(), metrics, + enable_aggressive_readhold_downgrades, }; instance.send(ComputeCommand::CreateTimely { diff --git a/src/controller/src/lib.rs b/src/controller/src/lib.rs index 0da02090e4dc9..a8f492267c095 100644 --- a/src/controller/src/lib.rs +++ b/src/controller/src/lib.rs @@ -184,6 +184,11 @@ impl Controller { .set_default_arrangement_exert_proportionality(value); } + pub fn set_enable_compute_aggressive_readhold_downgrades(&mut self, value: bool) { + self.compute + .set_enable_aggressive_readhold_downgrades(value); + } + /// Returns the connection context installed in the controller. /// /// This is purely a helper, and can be obtained from `self.storage`. diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 0ce704156cb2f..9759657149056 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -2153,6 +2153,13 @@ feature_flags!( internal: true, enable_for_item_parsing: false, }, + { + name: enable_compute_aggressive_readhold_downgrades, + desc: "let the compute controller aggressively downgrade read holds for sink dataflows", + default: true, + internal: true, + enable_for_item_parsing: false, + }, ); /// Returns a new ConfigSet containing every `Config` in Materialize.