add cancellation for graceful reconfiguration
jubrad committed Aug 14, 2024
1 parent 6c300bd commit cf965d9
Showing 6 changed files with 237 additions and 91 deletions.
4 changes: 4 additions & 0 deletions src/adapter/src/coord.rs
@@ -1021,6 +1021,10 @@ pub struct ConnMeta {
#[serde(skip)]
deferred_lock: Option<OwnedMutexGuard<()>>,

/// Cluster reconfigurations that will need to be
/// cleaned up when the current transaction is cleared
pending_cluster_alters: BTreeSet<ClusterId>,

/// Channel on which to send notices to a session.
#[serde(skip)]
notice_tx: mpsc::UnboundedSender<AdapterNotice>,
3 changes: 3 additions & 0 deletions src/adapter/src/coord/command_handler.rs
@@ -318,6 +318,7 @@ impl Coordinator {
secret_key,
notice_tx,
drop_sinks: BTreeSet::new(),
pending_cluster_alters: BTreeSet::new(),
connected_at: self.now(),
user,
application_name,
@@ -1214,6 +1215,8 @@ impl Coordinator {
self.cancel_pending_peeks(&conn_id);
self.cancel_pending_watchsets(&conn_id);
self.cancel_compute_sinks_for_conn(&conn_id).await;
self.cancel_cluster_reconfigurations_for_conn(&conn_id)
.await;
if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
let _ = tx.send(true);
}
62 changes: 61 additions & 1 deletion src/adapter/src/coord/ddl.rs
@@ -55,7 +55,7 @@ use serde_json::json;
use tracing::{event, info_span, warn, Instrument, Level};

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{DropObjectInfo, Op, TransactionResult};
use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
use crate::coord::appends::BuiltinTableAppendNotify;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::{Coordinator, ReplicaMetadata};
@@ -929,13 +929,51 @@ impl Coordinator {
}
}

/// Drops all pending replicas for a set of clusters
/// that are undergoing reconfiguration.
pub async fn drop_reconfiguration_replicas(&mut self, cluster_ids: BTreeSet<ClusterId>) {
let pending_replica_drops_by_cluster: Vec<Vec<DropObjectInfo>> = cluster_ids
.iter()
.map(|c| {
self.catalog()
.get_cluster(c.clone())
.replicas()
.filter_map(|r| match r.config.location {
ReplicaLocation::Managed(ref l) if l.pending => {
Some(DropObjectInfo::ClusterReplica((
c.clone(),
r.replica_id,
ReplicaCreateDropReason::Manual,
)))
}
_ => None,
})
.collect::<Vec<DropObjectInfo>>()
})
.collect();
for cluster_replicas in pending_replica_drops_by_cluster {
self.catalog_transact(None, vec![Op::DropObjects(cluster_replicas)])
.await
.unwrap_or_terminate("cannot fail to drop replicas");
}
}

/// Cancels all active compute sinks for the identified connection.
#[mz_ore::instrument(level = "debug")]
pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
.await
}

/// Cancels all active cluster reconfigurations for the identified connection.
#[mz_ore::instrument(level = "debug")]
pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
&mut self,
conn_id: &ConnectionId,
) {
self.retire_cluster_reconfigurations_for_conn(conn_id).await
}

/// Retires all active compute sinks for the identified connection with the
/// specified reason.
#[mz_ore::instrument(level = "debug")]
@@ -955,6 +993,28 @@
self.retire_compute_sinks(drop_sinks).await;
}

/// Cleans up pending cluster reconfigurations for the identified connection.
#[mz_ore::instrument(level = "debug")]
pub(crate) async fn retire_cluster_reconfigurations_for_conn(
&mut self,
conn_id: &ConnectionId,
) {
let reconfiguring_clusters = self
.active_conns
.get(conn_id)
.expect("must exist for active session")
.pending_cluster_alters
.clone();
self.drop_reconfiguration_replicas(reconfiguring_clusters)
.await;

self.active_conns
.get_mut(conn_id)
.expect("must exist for active session")
.pending_cluster_alters
.clear();
}

pub(crate) fn drop_storage_sinks(&mut self, sinks: Vec<GlobalId>) {
for id in &sinks {
self.drop_storage_read_policy(id);
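The pattern across these changes is simple per-connection bookkeeping: sequencing a graceful ALTER records the cluster in the connection's ConnMeta, and cancellation (or clearing the transaction) drops the cluster's pending replicas and clears that record. Below is a minimal standalone sketch of the flow, using hypothetical stand-ins for Materialize's ClusterId, catalog, and ConnMeta types; the real coordinator is async and applies Op::DropObjects through a catalog transaction.

use std::collections::{BTreeMap, BTreeSet};

// Hypothetical stand-ins for the real identifier types.
type ClusterId = u64;
type ReplicaId = u64;

/// Per-connection metadata, mirroring the `pending_cluster_alters`
/// field added to `ConnMeta` above.
#[derive(Default)]
struct ConnMeta {
    pending_cluster_alters: BTreeSet<ClusterId>,
}

/// A toy catalog: each cluster maps to replicas, with `pending = true`
/// for replicas created by a not-yet-finalized graceful reconfiguration.
#[derive(Default)]
struct Catalog {
    replicas: BTreeMap<ClusterId, Vec<(ReplicaId, bool)>>,
}

impl Catalog {
    /// Drop every pending replica of the given clusters, as
    /// `drop_reconfiguration_replicas` does via `DropObjectInfo` ops.
    fn drop_pending_replicas(&mut self, cluster_ids: &BTreeSet<ClusterId>) {
        for id in cluster_ids {
            if let Some(replicas) = self.replicas.get_mut(id) {
                replicas.retain(|&(_, pending)| !pending);
            }
        }
    }
}

/// Mirrors `retire_cluster_reconfigurations_for_conn`: drop the pending
/// replicas recorded for this connection, then clear the bookkeeping.
fn retire_cluster_reconfigurations(conn: &mut ConnMeta, catalog: &mut Catalog) {
    catalog.drop_pending_replicas(&conn.pending_cluster_alters);
    conn.pending_cluster_alters.clear();
}

fn main() {
    let mut catalog = Catalog::default();
    // Cluster 1 has one live replica (10) and one pending replica (11)
    // created by an in-flight graceful ALTER.
    catalog.replicas.insert(1, vec![(10, false), (11, true)]);

    let mut conn = ConnMeta::default();
    // Sequencing the graceful ALTER records the cluster on the connection.
    conn.pending_cluster_alters.insert(1);

    // Cancellation (or clearing the transaction) retires the reconfiguration.
    retire_cluster_reconfigurations(&mut conn, &mut catalog);
    assert_eq!(catalog.replicas[&1], vec![(10, false)]);
    assert!(conn.pending_cluster_alters.is_empty());
}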
18 changes: 16 additions & 2 deletions src/adapter/src/coord/sequencer/inner/cluster.rs
@@ -73,8 +73,7 @@ impl Staged for ClusterStage {
}

fn cancel_enabled(&self) -> bool {
// Cluster create and alter are not yet cancelable
false
true
}
}

@@ -245,6 +244,14 @@ impl Coordinator {
.await?;
return match alter_followup {
FinalizationNeeded::In(duration) => {
// For non-backgrounded graceful alters,
// store the cluster_id in the ConnMeta
// to allow for cancellation.
self.active_conns
.get_mut(session.conn_id())
.expect("There must be an active connection")
.pending_cluster_alters
.insert(cluster_id.clone());
let span = Span::current();
let new_config_managed = new_config_managed.clone();
Ok(StageResult::Handle(mz_ore::task::spawn(
@@ -410,6 +417,13 @@ impl Coordinator {
},
});
self.catalog_transact(Some(session), ops).await?;
// Remove the cluster being altered from the ConnMeta's
// pending_cluster_alters set.
self.active_conns
.get_mut(session.conn_id())
.expect("There must be an active connection")
.pending_cluster_alters
.remove(&cluster_id);
Ok(StageResult::Response(ExecuteResponse::AlteredObject(
ObjectType::Cluster,
)))
1 change: 1 addition & 0 deletions src/adapter/src/coord/sql.rs
@@ -207,6 +207,7 @@ impl Coordinator {
self.staged_cancellation.remove(conn_id);
self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished)
.await;
self.retire_cluster_reconfigurations_for_conn(conn_id).await;

// Release this transaction's compaction hold on collections.
if let Some(txn_reads) = self.txn_read_holds.remove(conn_id) {