Skip to content

Commit

Permalink
Add a feature flag for aggressive read hold downgrades
Browse files Browse the repository at this point in the history
The flag is on by default and meant as a measure for derisking the
rollout to prod, so we can easily revert back to the old behavior if we
encounter issues.
  • Loading branch information
teskje committed Feb 1, 2024
1 parent de34308 commit 5ac76e8
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,7 @@ impl Coordinator {
let scheduling_config = flags::orchestrator_scheduling_config(system_config);
let merge_effort = system_config.default_idle_arrangement_merge_effort();
let exert_prop = system_config.default_arrangement_exert_proportionality();
let aggressive_downgrades = system_config.enable_compute_aggressive_readhold_downgrades();
self.controller.compute.update_configuration(compute_config);
self.controller.storage.update_parameters(storage_config);
self.controller
Expand All @@ -1397,6 +1398,8 @@ impl Coordinator {
.set_default_idle_arrangement_merge_effort(merge_effort);
self.controller
.set_default_arrangement_exert_proportionality(exert_prop);
self.controller
.set_enable_compute_aggressive_readhold_downgrades(aggressive_downgrades);

let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
Default::default();
Expand Down
12 changes: 12 additions & 0 deletions src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ pub struct ComputeController<T> {
response_rx: crossbeam_channel::Receiver<ComputeControllerResponse<T>>,
/// Response sender that's passed to new `Instance`s.
response_tx: crossbeam_channel::Sender<ComputeControllerResponse<T>>,

/// Whether to aggressively downgrade read holds for sink dataflows.
///
/// This flag exists to derisk the rollout of the aggressive downgrading approach.
/// TODO(teskje): Remove this after a couple weeks.
enable_aggressive_readhold_downgrades: bool,
}

impl<T: Timestamp> ComputeController<T> {
Expand All @@ -162,6 +168,7 @@ impl<T: Timestamp> ComputeController<T> {
introspection: Introspection::new(),
response_rx,
response_tx,
enable_aggressive_readhold_downgrades: true,
}
}

Expand Down Expand Up @@ -241,6 +248,10 @@ impl<T: Timestamp> ComputeController<T> {
self.default_arrangement_exert_proportionality = value;
}

pub fn set_enable_aggressive_readhold_downgrades(&mut self, value: bool) {
self.enable_aggressive_readhold_downgrades = value;
}

/// Returns the read and write frontiers for each collection.
pub fn collection_frontiers(&self) -> BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)> {
let collections = self.instances.values().flat_map(|i| i.collections_iter());
Expand Down Expand Up @@ -290,6 +301,7 @@ where
self.metrics.for_instance(id),
self.response_tx.clone(),
self.introspection.tx.clone(),
self.enable_aggressive_readhold_downgrades,
),
);

Expand Down
9 changes: 8 additions & 1 deletion src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ pub(super) struct Instance<T> {
replica_epochs: BTreeMap<ReplicaId, u64>,
/// The registry the controller uses to report metrics.
metrics: InstanceMetrics,
/// Whether to aggressively downgrade read holds for sink dataflows.
///
/// This flag exists to derisk the rollout of the aggressive downgrading approach.
/// TODO(teskje): Remove this after a couple weeks.
enable_aggressive_readhold_downgrades: bool,
}

impl<T: Timestamp> Instance<T> {
Expand Down Expand Up @@ -219,7 +224,7 @@ impl<T: Timestamp> Instance<T> {
let mut state =
CollectionState::new(as_of.clone(), storage_dependencies, compute_dependencies);
// If the collection is write-only, clear its read policy to reflect that.
if write_only {
if write_only && self.enable_aggressive_readhold_downgrades {
state.read_policy = None;
}
self.collections.insert(id, state);
Expand Down Expand Up @@ -490,6 +495,7 @@ where
metrics: InstanceMetrics,
response_tx: crossbeam_channel::Sender<ComputeControllerResponse<T>>,
introspection_tx: crossbeam_channel::Sender<IntrospectionUpdates>,
enable_aggressive_readhold_downgrades: bool,
) -> Self {
let collections = arranged_logs
.iter()
Expand All @@ -514,6 +520,7 @@ where
envd_epoch,
replica_epochs: Default::default(),
metrics,
enable_aggressive_readhold_downgrades,
};

instance.send(ComputeCommand::CreateTimely {
Expand Down
5 changes: 5 additions & 0 deletions src/controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ impl<T: Timestamp> Controller<T> {
.set_default_arrangement_exert_proportionality(value);
}

pub fn set_enable_compute_aggressive_readhold_downgrades(&mut self, value: bool) {
self.compute
.set_enable_aggressive_readhold_downgrades(value);
}

/// Returns the connection context installed in the controller.
///
/// This is purely a helper, and can be obtained from `self.storage`.
Expand Down
7 changes: 7 additions & 0 deletions src/sql/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2153,6 +2153,13 @@ feature_flags!(
internal: true,
enable_for_item_parsing: false,
},
{
name: enable_compute_aggressive_readhold_downgrades,
desc: "let the compute controller aggressively downgrade read holds for sink dataflows",
default: true,
internal: true,
enable_for_item_parsing: false,
},
);

/// Returns a new ConfigSet containing every `Config` in Materialize.
Expand Down

0 comments on commit 5ac76e8

Please sign in to comment.