diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs
index 19fd3127be019..4ff0fbedad69a 100644
--- a/src/adapter/src/coord.rs
+++ b/src/adapter/src/coord.rs
@@ -1001,6 +1001,10 @@ pub struct ConnMeta {
     /// any, is cleared.
     drop_sinks: BTreeSet<GlobalId>,
 
+    /// Cluster reconfigurations that will need to be
+    /// cleaned up when the current transaction is cleared.
+    pending_cluster_alters: BTreeSet<ClusterId>,
+
     /// Channel on which to send notices to a session.
     #[serde(skip)]
     notice_tx: mpsc::UnboundedSender<AdapterNotice>,
diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs
index 9df9c71c485cd..6f95003babfff 100644
--- a/src/adapter/src/coord/command_handler.rs
+++ b/src/adapter/src/coord/command_handler.rs
@@ -314,6 +314,7 @@ impl Coordinator {
             secret_key,
             notice_tx,
             drop_sinks: BTreeSet::new(),
+            pending_cluster_alters: BTreeSet::new(),
             connected_at: self.now(),
             user,
             application_name,
@@ -1045,6 +1046,8 @@ impl Coordinator {
         self.cancel_pending_peeks(&conn_id);
         self.cancel_pending_watchsets(&conn_id);
         self.cancel_compute_sinks_for_conn(&conn_id).await;
+        self.cancel_cluster_reconfigurations_for_conn(&conn_id)
+            .await;
         if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
             let _ = tx.send(true);
         }
diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs
index 4f89942d83ff0..5f338f178c8a6 100644
--- a/src/adapter/src/coord/ddl.rs
+++ b/src/adapter/src/coord/ddl.rs
@@ -55,7 +55,7 @@ use serde_json::json;
 use tracing::{event, info_span, warn, Instrument, Level};
 
 use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
-use crate::catalog::{DropObjectInfo, Op, TransactionResult};
+use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
 use crate::coord::appends::BuiltinTableAppendNotify;
 use crate::coord::timeline::{TimelineContext, TimelineState};
 use crate::coord::{Coordinator, ReplicaMetadata};
@@ -916,6 +916,35 @@ impl Coordinator {
         }
     }
 
+    /// Drops all pending replicas for a set of clusters
+    /// that are undergoing reconfiguration.
+    pub async fn drop_reconfiguration_replicas(&mut self, cluster_ids: BTreeSet<ClusterId>) {
+        let pending_replica_drops: Vec<Vec<DropObjectInfo>> = cluster_ids
+            .iter()
+            .map(|c| {
+                self.catalog()
+                    .get_cluster(c.clone())
+                    .replicas()
+                    .filter_map(|r| match r.config.location {
+                        ReplicaLocation::Managed(ref l) if l.pending => {
+                            Some(DropObjectInfo::ClusterReplica((
+                                c.clone(),
+                                r.replica_id,
+                                ReplicaCreateDropReason::Manual,
+                            )))
+                        }
+                        _ => None,
+                    })
+                    .collect::<Vec<_>>()
+            })
+            .collect();
+        for replica_drops in pending_replica_drops {
+            self.catalog_transact(None, vec![Op::DropObjects(replica_drops)])
+                .await
+                .unwrap_or_terminate("cannot fail to drop replicas");
+        }
+    }
+
     /// Cancels all active compute sinks for the identified connection.
     #[mz_ore::instrument(level = "debug")]
     pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
@@ -923,6 +952,15 @@
             .await
     }
 
+    /// Cancels all active cluster reconfigurations for the identified connection.
+    #[mz_ore::instrument(level = "debug")]
+    pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
+        &mut self,
+        conn_id: &ConnectionId,
+    ) {
+        self.retire_cluster_reconfigurations_for_conn(conn_id).await
+    }
+
     /// Retires all active compute sinks for the identified connection with the
     /// specified reason.
     #[mz_ore::instrument(level = "debug")]
@@ -942,6 +980,27 @@ impl Coordinator {
         self.retire_compute_sinks(drop_sinks).await;
     }
 
+    /// Cleans up pending cluster reconfigurations for the identified connection.
+    #[mz_ore::instrument(level = "debug")]
+    pub(crate) async fn retire_cluster_reconfigurations_for_conn(
+        &mut self,
+        conn_id: &ConnectionId,
+    ) {
+        let reconfiguring_clusters = self
+            .active_conns
+            .get(conn_id)
+            .expect("must exist for active session")
+            .pending_cluster_alters
+            .clone();
+        self.drop_reconfiguration_replicas(reconfiguring_clusters)
+            .await;
+        self.active_conns
+            .get_mut(conn_id)
+            .expect("must exist for active session")
+            .pending_cluster_alters
+            .clear();
+    }
+
     pub(crate) fn drop_storage_sinks(&mut self, sinks: Vec<GlobalId>) {
         for id in &sinks {
             self.drop_storage_read_policy(id);
diff --git a/src/adapter/src/coord/sequencer/inner/cluster.rs b/src/adapter/src/coord/sequencer/inner/cluster.rs
index 093cc973689f1..b218cd6d0890f 100644
--- a/src/adapter/src/coord/sequencer/inner/cluster.rs
+++ b/src/adapter/src/coord/sequencer/inner/cluster.rs
@@ -74,8 +74,7 @@ impl Staged for ClusterStage {
     }
 
     fn cancel_enabled(&self) -> bool {
-        // Cluster create and alter are not yet cancelable
-        false
+        true
     }
 }
 
@@ -245,6 +244,14 @@ impl Coordinator {
                     .await?;
                 return match alter_followup {
                     NeedsFinalization::In(duration) => {
+                        // For non-backgrounded graceful alters,
+                        // store the cluster_id in the ConnMeta
+                        // to allow for cancellation.
+                        self.active_conns
+                            .get_mut(session.conn_id())
+                            .expect("There must be an active connection")
+                            .pending_cluster_alters
+                            .insert(cluster_id.clone());
                         let span = Span::current();
                         Ok(StageResult::Handle(mz_ore::task::spawn(
                             || "Finalize Alter Cluster",
@@ -392,7 +402,14 @@ impl Coordinator {
                 variant: ClusterVariant::Managed(new_config),
             },
         });
-        self.catalog_transact(Some(session), ops).await?;
+        self.catalog_transact(Some(session), ops.clone()).await?;
+        // Remove the cluster being altered from the ConnMeta
+        // pending_cluster_alters BTreeSet.
+        self.active_conns
+            .get_mut(session.conn_id())
+            .expect("There must be an active connection")
+            .pending_cluster_alters
+            .remove(&cluster_id);
         Ok(StageResult::Response(ExecuteResponse::AlteredObject(
             ObjectType::Cluster,
         )))
diff --git a/src/adapter/src/coord/sql.rs b/src/adapter/src/coord/sql.rs
index 01ebf1f71b84b..b1da09e7830e4 100644
--- a/src/adapter/src/coord/sql.rs
+++ b/src/adapter/src/coord/sql.rs
@@ -201,6 +201,7 @@ impl Coordinator {
         self.staged_cancellation.remove(conn_id);
         self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished)
             .await;
+        self.retire_cluster_reconfigurations_for_conn(conn_id).await;
 
         // Release this transaction's compaction hold on collections.
         if let Some(txn_reads) = self.txn_read_holds.remove(conn_id) {
diff --git a/test/cloudtest/test_managed_cluster.py b/test/cloudtest/test_managed_cluster.py
index 448c78c5a7b57..da84ce1767f74 100644
--- a/test/cloudtest/test_managed_cluster.py
+++ b/test/cloudtest/test_managed_cluster.py
@@ -9,8 +9,15 @@
 
 import time
 from textwrap import dedent
+from threading import Thread
+from typing import Any
 
-from materialize.cloudtest.app.materialize_application import MaterializeApplication
+from pg8000 import Connection
+
+from materialize.cloudtest.app.materialize_application import (
+    LOGGER,
+    MaterializeApplication,
+)
 from materialize.cloudtest.util.cluster import cluster_pod_name
 from materialize.cloudtest.util.wait import wait
 
@@ -207,8 +214,8 @@ def assert_replica_names(names, allow_pending=False):
         user="mz_system",
     )
 
-    from threading import Thread
 
+def test_graceful_reconfiguration(mz: MaterializeApplication) -> None:
     mz.environmentd.sql(
         """
         ALTER SYSTEM SET enable_graceful_cluster_reconfiguration = true;
@@ -264,3 +271,64 @@ def gracefully_alter():
         )
         == (["2"],)
     ), "Cluster should use new config after alter completes"
+
+    # Validate cancellation of alter cluster
+    mz.environmentd.sql(
+        """
+        DROP CLUSTER IF EXISTS cluster1 CASCADE;
+        CREATE CLUSTER cluster1 ( SIZE = '1');
+        """,
+        port="internal",
+        user="mz_system",
+    )
+
+    # We need a persistent connection that we can later issue a cancel backend to
+    conn = mz.environmentd.sql_conn(
+        port="internal",
+        user="mz_system",
+    )
+
+    def query_with_conn(sql: str, conn: Connection) -> list[list[Any]]:
+        """Execute a SQL query against the service and return results."""
+        with conn.cursor() as cursor:
+            LOGGER.info(f"> {sql}")
+            cursor.execute(sql)
+            return cursor.fetchall()
+
+    pid = query_with_conn("select pg_backend_pid();", conn)[0][0]
+    thread = Thread(
+        target=query_with_conn,
+        args=[
+            """
+            ALTER CLUSTER cluster1 SET (SIZE = '2') WITH ( WAIT FOR '5s')
+            """,
+            conn,
+        ],
+    )
+    thread.start()
+    time.sleep(1)
+    mz.environmentd.sql(
+        f"select pg_cancel_backend({pid});",
+        port="internal",
+        user="mz_system",
+    )
+    time.sleep(1)
+
+    assert (
+        mz.environmentd.sql_query(
+            """
+            SELECT mz_cluster_replicas.name, mz_cluster_replicas.pending_reconfiguration
+            FROM mz_cluster_replicas, mz_clusters WHERE
+            mz_cluster_replicas.cluster_id = mz_clusters.id AND mz_clusters.name='cluster1';
+            """
+        )
+        == (["r1", False],)
+    ), "No pending replicas should be present after alter cancellation"
+    assert (
+        mz.environmentd.sql_query(
+            """
+            SELECT size FROM mz_clusters WHERE name='cluster1';
+            """
+        )
+        == (["1"],)
+    ), "Cluster should not have been updated if the alter was canceled"