Merge pull request #24487 from teskje/downgrade-readholds-aggressive
compute-client: aggressively downgrade read holds for sink dataflows
teskje authored Feb 1, 2024
2 parents 932f8dd + 5ac76e8 · commit 6ca7078
Showing 11 changed files with 384 additions and 124 deletions.

src/adapter/src/coord.rs (11 changes: 8 additions & 3 deletions)

@@ -1389,6 +1389,7 @@ impl Coordinator {
         let scheduling_config = flags::orchestrator_scheduling_config(system_config);
         let merge_effort = system_config.default_idle_arrangement_merge_effort();
         let exert_prop = system_config.default_arrangement_exert_proportionality();
+        let aggressive_downgrades = system_config.enable_compute_aggressive_readhold_downgrades();
         self.controller.compute.update_configuration(compute_config);
         self.controller.storage.update_parameters(storage_config);
         self.controller
@@ -1397,6 +1398,8 @@ impl Coordinator {
             .set_default_idle_arrangement_merge_effort(merge_effort);
         self.controller
             .set_default_arrangement_exert_proportionality(exert_prop);
+        self.controller
+            .set_enable_compute_aggressive_readhold_downgrades(aggressive_downgrades);
 
         let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
             Default::default();
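
These two hunks follow the usual pattern for wiring a system variable through to the compute controller: read the flag when system configuration is synced, then push it down through a delegating setter. A minimal, self-contained sketch of that pattern follows; the structs below are simplified stand-ins, not the real Materialize types.

```rust
/// Stand-in for the synced system configuration (assumed shape).
struct SystemConfig {
    enable_compute_aggressive_readhold_downgrades: bool,
}

/// Stand-in for the compute controller, which caches the flag.
struct ComputeController {
    enable_aggressive_readhold_downgrades: bool,
}

impl ComputeController {
    fn set_enable_aggressive_readhold_downgrades(&mut self, value: bool) {
        self.enable_aggressive_readhold_downgrades = value;
    }
}

/// Stand-in for the top-level controller that owns the compute controller.
struct Controller {
    compute: ComputeController,
}

impl Controller {
    /// Mirrors the delegating setter the coordinator calls in the diff.
    fn set_enable_compute_aggressive_readhold_downgrades(&mut self, value: bool) {
        self.compute.set_enable_aggressive_readhold_downgrades(value);
    }
}

fn main() {
    let system_config = SystemConfig {
        enable_compute_aggressive_readhold_downgrades: false,
    };
    let mut controller = Controller {
        compute: ComputeController {
            enable_aggressive_readhold_downgrades: true, // the default in this PR
        },
    };

    // As in the coordinator's config-sync path above:
    let aggressive_downgrades = system_config.enable_compute_aggressive_readhold_downgrades;
    controller.set_enable_compute_aggressive_readhold_downgrades(aggressive_downgrades);

    assert!(!controller.compute.enable_aggressive_readhold_downgrades);
}
```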

src/adapter/src/coord.rs (continued):

@@ -2644,20 +2647,22 @@ impl Coordinator {
     }
 
     /// Call into the compute controller to install a finalized dataflow, and
-    /// initialize the read policies for its exported objects.
+    /// initialize the read policies for its exported readable objects.
     pub(crate) async fn ship_dataflow(
         &mut self,
         dataflow: DataflowDescription<Plan>,
         instance: ComputeInstanceId,
     ) {
-        let export_ids = dataflow.export_ids().collect();
+        // We must only install read policies for indexes, not for sinks.
+        // Sinks are write-only compute collections that don't have read policies.
+        let index_ids = dataflow.exported_index_ids().collect();
 
         self.controller
             .active_compute()
             .create_dataflow(instance, dataflow)
             .unwrap_or_terminate("dataflow creation cannot fail");
 
-        self.initialize_compute_read_policies(export_ids, instance, CompactionWindow::Default)
+        self.initialize_compute_read_policies(index_ids, instance, CompactionWindow::Default)
             .await;
     }
 }
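
The `ship_dataflow` change hinges on the split between a dataflow's exports: index exports are readable and receive read policies, while sink exports are write-only and must not. A self-contained sketch of that split, using hypothetical simplified stand-ins for `DataflowDescription`, `export_ids`, and `exported_index_ids`:

```rust
// Simplified stand-ins for the real Materialize types (hypothetical).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct GlobalId(u64);

struct DataflowDescription {
    index_exports: Vec<GlobalId>, // readable: may carry read policies
    sink_exports: Vec<GlobalId>,  // write-only: no read policies
}

impl DataflowDescription {
    /// All exported collections, indexes and sinks alike.
    fn export_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.index_exports.iter().chain(&self.sink_exports).copied()
    }

    /// Only the readable (index) exports; the set `ship_dataflow` now
    /// passes to `initialize_compute_read_policies`.
    fn exported_index_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.index_exports.iter().copied()
    }
}

fn main() {
    let dataflow = DataflowDescription {
        index_exports: vec![GlobalId(1)],
        sink_exports: vec![GlobalId(2)],
    };
    // Before this change, read policies were initialized for all exports...
    assert_eq!(dataflow.export_ids().count(), 2);
    // ...after it, only for the index exports.
    assert_eq!(dataflow.exported_index_ids().count(), 1);
}
```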

src/adapter/src/coord/ddl.rs (28 changes: 10 additions & 18 deletions)

@@ -758,18 +758,14 @@ impl Coordinator {
                 continue;
             }
 
-            if self.drop_compute_read_policy(&sink.global_id) {
-                by_cluster
-                    .entry(sink.cluster_id)
-                    .or_default()
-                    .push(sink.global_id);
+            by_cluster
+                .entry(sink.cluster_id)
+                .or_default()
+                .push(sink.global_id);
 
-                // Mark the sink as dropped so we don't try to drop it again.
-                if let Some(sink) = self.active_subscribes.get_mut(&sink.global_id) {
-                    sink.dropping = true;
-                }
-            } else {
-                tracing::error!("Instructed to drop a compute sink that isn't one");
-            }
+            // Mark the sink as dropped so we don't try to drop it again.
+            if let Some(sink) = self.active_subscribes.get_mut(&sink.global_id) {
+                sink.dropping = true;
+            }
         }
         let mut compute = self.controller.active_compute();
@@ -817,12 +813,8 @@ impl Coordinator {
         let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
         let mut source_ids = Vec::new();
        for (cluster_id, id) in mviews {
-            if self.drop_compute_read_policy(&id) {
-                by_cluster.entry(cluster_id).or_default().push(id);
-                source_ids.push(id);
-            } else {
-                tracing::error!("Instructed to drop a materialized view that isn't one");
-            }
+            by_cluster.entry(cluster_id).or_default().push(id);
+            source_ids.push(id);
         }
 
         // Drop compute sinks.
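
With sinks and materialized views no longer carrying compute read policies, the `drop_compute_read_policy` gate is gone and both drop paths group the doomed collections by cluster unconditionally. The `entry(...).or_default().push(...)` idiom used for `by_cluster` works like this (hypothetical `u64` ids stand in for the real `ClusterId`/`GlobalId` types):

```rust
use std::collections::BTreeMap;

fn main() {
    // (cluster_id, sink_id) pairs, as in the sink drop path (hypothetical values).
    let sinks = [(1u64, 10u64), (1, 11), (2, 12)];

    let mut by_cluster: BTreeMap<u64, Vec<u64>> = BTreeMap::new();
    for (cluster_id, sink_id) in sinks {
        // Create the per-cluster Vec on first use, then append.
        by_cluster.entry(cluster_id).or_default().push(sink_id);
    }

    // One drop command can then be issued per cluster.
    assert_eq!(by_cluster[&1], vec![10, 11]);
    assert_eq!(by_cluster[&2], vec![12]);
}
```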

src/adapter/src/util.rs (13 changes: 12 additions & 1 deletion)

@@ -10,7 +10,8 @@
 use std::fmt::Debug;
 
 use mz_compute_client::controller::error::{
-    CollectionUpdateError, DataflowCreationError, InstanceMissing, PeekError, SubscribeTargetError,
+    CollectionUpdateError, DataflowCreationError, InstanceMissing, PeekError, ReadPolicyError,
+    SubscribeTargetError,
 };
 use mz_controller_types::ClusterId;
 use mz_ore::tracing::OpenTelemetryContext;
@@ -378,6 +379,16 @@ impl ShouldHalt for PeekError {
     }
 }
 
+impl ShouldHalt for ReadPolicyError {
+    fn should_halt(&self) -> bool {
+        match self {
+            ReadPolicyError::InstanceMissing(_)
+            | ReadPolicyError::CollectionMissing(_)
+            | ReadPolicyError::WriteOnlyCollection(_) => false,
+        }
+    }
+}
+
 impl ShouldHalt for SubscribeTargetError {
     fn should_halt(&self) -> bool {
         match self {
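
The new `ShouldHalt` impl classifies every `ReadPolicyError` variant as recoverable: none of them implies corrupted controller state. A sketch of how such an impl is consumed, with the trait and the handler simplified from the real adapter code (assumed shapes, not the actual API):

```rust
use std::fmt::Debug;

// Simplified version of the adapter's `ShouldHalt` trait (assumed shape).
trait ShouldHalt {
    /// Whether the error indicates a condition the process cannot recover from.
    fn should_halt(&self) -> bool;
}

// Mirror of the new error type, with ids flattened to `u64` for the sketch.
#[derive(Debug)]
enum ReadPolicyError {
    InstanceMissing(u64),
    CollectionMissing(u64),
    WriteOnlyCollection(u64),
}

impl ShouldHalt for ReadPolicyError {
    fn should_halt(&self) -> bool {
        // None of these variants implies corrupted coordinator state,
        // so the process keeps running, exactly as in the diff above.
        match self {
            ReadPolicyError::InstanceMissing(_)
            | ReadPolicyError::CollectionMissing(_)
            | ReadPolicyError::WriteOnlyCollection(_) => false,
        }
    }
}

// Hypothetical consumer, loosely modeled on the adapter's error handling.
fn handle<E: ShouldHalt + Debug>(err: E) {
    if err.should_halt() {
        panic!("unrecoverable controller error: {err:?}");
    } else {
        eprintln!("recoverable controller error: {err:?}");
    }
}

fn main() {
    handle(ReadPolicyError::WriteOnlyCollection(7));
}
```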

src/compute-client/src/controller.rs (29 changes: 24 additions & 5 deletions)

@@ -54,8 +54,8 @@ use uuid::Uuid;
 
 use crate::controller::error::{
     CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
-    InstanceExists, InstanceMissing, PeekError, ReplicaCreationError, ReplicaDropError,
-    SubscribeTargetError,
+    InstanceExists, InstanceMissing, PeekError, ReadPolicyError, ReplicaCreationError,
+    ReplicaDropError, SubscribeTargetError,
 };
 use crate::controller::instance::{ActiveInstance, Instance};
 use crate::controller::replica::ReplicaConfig;
@@ -138,6 +138,12 @@ pub struct ComputeController<T> {
     response_rx: crossbeam_channel::Receiver<ComputeControllerResponse<T>>,
     /// Response sender that's passed to new `Instance`s.
     response_tx: crossbeam_channel::Sender<ComputeControllerResponse<T>>,
+
+    /// Whether to aggressively downgrade read holds for sink dataflows.
+    ///
+    /// This flag exists to derisk the rollout of the aggressive downgrading approach.
+    /// TODO(teskje): Remove this after a couple weeks.
+    enable_aggressive_readhold_downgrades: bool,
 }
 
 impl<T: Timestamp> ComputeController<T> {
@@ -162,6 +168,7 @@ impl<T: Timestamp> ComputeController<T> {
             introspection: Introspection::new(),
             response_rx,
             response_tx,
+            enable_aggressive_readhold_downgrades: true,
         }
     }
 
@@ -241,6 +248,10 @@ impl<T: Timestamp> ComputeController<T> {
         self.default_arrangement_exert_proportionality = value;
     }
 
+    pub fn set_enable_aggressive_readhold_downgrades(&mut self, value: bool) {
+        self.enable_aggressive_readhold_downgrades = value;
+    }
+
     /// Returns the read and write frontiers for each collection.
     pub fn collection_frontiers(&self) -> BTreeMap<GlobalId, (Antichain<T>, Antichain<T>)> {
         let collections = self.instances.values().flat_map(|i| i.collections_iter());
@@ -290,6 +301,7 @@ where
                 self.metrics.for_instance(id),
                 self.response_tx.clone(),
                 self.introspection.tx.clone(),
+                self.enable_aggressive_readhold_downgrades,
             ),
         );
 
@@ -566,11 +578,14 @@ where
     /// capability is already ahead of it.
     ///
     /// Identifiers not present in `policies` retain their existing read policies.
+    ///
+    /// It is an error to attempt to set a read policy for a collection that is not readable in the
+    /// context of compute. At this time, only indexes are readable compute collections.
     pub fn set_read_policy(
         &mut self,
         instance_id: ComputeInstanceId,
         policies: Vec<(GlobalId, ReadPolicy<T>)>,
-    ) -> Result<(), CollectionUpdateError> {
+    ) -> Result<(), ReadPolicyError> {
         self.instance(instance_id)?.set_read_policy(policies)?;
         Ok(())
     }
@@ -674,7 +689,11 @@ pub struct CollectionState<T> {
     /// The implicit capability associated with collection creation.
     implied_capability: Antichain<T>,
     /// The policy to use to downgrade `self.implied_capability`.
-    read_policy: ReadPolicy<T>,
+    ///
+    /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
+    /// collections, the `implied_capability` is only required for maintaining read holds on the
+    /// inputs, so we can immediately downgrade it to the `write_frontier`.
+    read_policy: Option<ReadPolicy<T>>,
 
     /// Storage identifiers on which this collection depends.
     storage_dependencies: Vec<GlobalId>,
@@ -731,7 +750,7 @@ impl<T: Timestamp> CollectionState<T> {
             dropped: false,
             read_capabilities,
             implied_capability: since.clone(),
-            read_policy: ReadPolicy::ValidFrom(since),
+            read_policy: Some(ReadPolicy::ValidFrom(since)),
             storage_dependencies,
             compute_dependencies,
             write_frontier: upper,
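
The switch to `Option<ReadPolicy<T>>` is the heart of this PR: a readable collection's implied capability follows its read policy, while a write-only sink's capability exists only to hold back its inputs and can chase the write frontier directly. The real controller works with `Antichain<T>` frontiers and separately tracked read capabilities; the sketch below flattens frontiers to plain `u64` timestamps and uses a lag-based policy as a stand-in for the real `ReadPolicy` (all types here are simplified assumptions):

```rust
// A stand-in for `ReadPolicy<T>`: keep reads valid up to `lag` ticks behind writes.
enum ReadPolicy {
    LagWriteFrontier { lag: u64 },
}

struct CollectionState {
    implied_capability: u64,         // since frontier held by the controller
    write_frontier: u64,             // how far the collection has been written
    read_policy: Option<ReadPolicy>, // `None` marks a write-only collection (sink)
}

impl CollectionState {
    /// Where the implied capability may be downgraded to right now.
    fn capability_target(&self) -> u64 {
        match &self.read_policy {
            // Readable collection: respect the read policy.
            Some(ReadPolicy::LagWriteFrontier { lag }) => {
                self.write_frontier.saturating_sub(*lag)
            }
            // Write-only collection: nothing can read it, so the capability is
            // only needed to hold back the inputs; chase the write frontier.
            None => self.write_frontier,
        }
    }

    fn downgrade(&mut self) {
        // Capabilities only ever move forward.
        self.implied_capability = self.implied_capability.max(self.capability_target());
    }
}

fn main() {
    let mut sink = CollectionState {
        implied_capability: 0,
        write_frontier: 100,
        read_policy: None,
    };
    sink.downgrade();
    // The sink's read hold (and thus the holds on its inputs) advances aggressively.
    assert_eq!(sink.implied_capability, 100);
}
```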

src/compute-client/src/controller/error.rs (27 changes: 27 additions & 0 deletions)

@@ -190,6 +190,33 @@ impl From<CollectionMissing> for CollectionUpdateError {
     }
 }
 
+/// Errors arising during collection read policy assignment.
+#[derive(Error, Debug)]
+pub enum ReadPolicyError {
+    #[error("instance does not exist: {0}")]
+    InstanceMissing(ComputeInstanceId),
+    #[error("collection does not exist: {0}")]
+    CollectionMissing(GlobalId),
+    #[error("collection is write-only: {0}")]
+    WriteOnlyCollection(GlobalId),
+}
+
+impl From<InstanceMissing> for ReadPolicyError {
+    fn from(error: InstanceMissing) -> Self {
+        Self::InstanceMissing(error.0)
+    }
+}
+
+impl From<instance::ReadPolicyError> for ReadPolicyError {
+    fn from(error: instance::ReadPolicyError) -> Self {
+        use instance::ReadPolicyError::*;
+        match error {
+            CollectionMissing(id) => Self::CollectionMissing(id),
+            WriteOnlyCollection(id) => Self::WriteOnlyCollection(id),
+        }
+    }
+}
+
 // Errors arising during subscribe target assignment.
 #[derive(Error, Debug)]
 pub enum SubscribeTargetError {
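
The two `From` impls let `ComputeController::set_read_policy` use `?` on both the instance lookup (which yields `InstanceMissing`) and the instance-level call (which yields `instance::ReadPolicyError`), funneling both into the one public error type. A self-contained sketch of that layering; the free functions are hypothetical stand-ins for the controller methods, and ids are flattened to `u64`:

```rust
// Instance-level error, private to an `instance` module (as in the PR).
mod instance {
    #[derive(Debug)]
    pub enum ReadPolicyError {
        CollectionMissing(u64),
        WriteOnlyCollection(u64),
    }
}

// Controller-level lookup error.
#[derive(Debug)]
pub struct InstanceMissing(pub u64);

// Public error type, mirroring the new `ReadPolicyError` in `error.rs`.
#[derive(Debug)]
pub enum ReadPolicyError {
    InstanceMissing(u64),
    CollectionMissing(u64),
    WriteOnlyCollection(u64),
}

impl From<InstanceMissing> for ReadPolicyError {
    fn from(error: InstanceMissing) -> Self {
        Self::InstanceMissing(error.0)
    }
}

impl From<instance::ReadPolicyError> for ReadPolicyError {
    fn from(error: instance::ReadPolicyError) -> Self {
        use instance::ReadPolicyError::*;
        match error {
            CollectionMissing(id) => Self::CollectionMissing(id),
            WriteOnlyCollection(id) => Self::WriteOnlyCollection(id),
        }
    }
}

// Hypothetical stand-ins for the controller methods, to show how `?` composes.
fn lookup_instance(id: u64) -> Result<(), InstanceMissing> {
    if id == 0 { Err(InstanceMissing(id)) } else { Ok(()) }
}

fn instance_set_read_policy(collection: u64) -> Result<(), instance::ReadPolicyError> {
    Err(instance::ReadPolicyError::WriteOnlyCollection(collection))
}

fn set_read_policy(instance_id: u64, collection: u64) -> Result<(), ReadPolicyError> {
    lookup_instance(instance_id)?;         // InstanceMissing  -> ReadPolicyError
    instance_set_read_policy(collection)?; // instance error   -> ReadPolicyError
    Ok(())
}

fn main() {
    let err = set_read_policy(1, 42).unwrap_err();
    println!("{err:?}"); // WriteOnlyCollection(42)
}
```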
(Diffs for the remaining 6 changed files were not loaded.)
