Skip to content

Commit

Permalink
add cancelation for graceful reconfiguration
Browse files Browse the repository at this point in the history
  • Loading branch information
jubrad committed Jul 14, 2024
1 parent de57b2b commit cad2501
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 6 deletions.
4 changes: 4 additions & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,10 @@ pub struct ConnMeta {
/// any, is cleared.
drop_sinks: BTreeSet<GlobalId>,

/// Cluster reconfigurations that will need to be
/// cleaned up when the current transaction is cleared
pending_cluster_alters: BTreeSet<ClusterId>,

/// Channel on which to send notices to a session.
#[serde(skip)]
notice_tx: mpsc::UnboundedSender<AdapterNotice>,
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ impl Coordinator {
secret_key,
notice_tx,
drop_sinks: BTreeSet::new(),
pending_cluster_alters: BTreeSet::new(),
connected_at: self.now(),
user,
application_name,
Expand Down Expand Up @@ -1045,6 +1046,8 @@ impl Coordinator {
self.cancel_pending_peeks(&conn_id);
self.cancel_pending_watchsets(&conn_id);
self.cancel_compute_sinks_for_conn(&conn_id).await;
self.cancel_cluster_reconfigurations_for_conn(&conn_id)
.await;
if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
let _ = tx.send(true);
}
Expand Down
61 changes: 60 additions & 1 deletion src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use serde_json::json;
use tracing::{event, info_span, warn, Instrument, Level};

use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{DropObjectInfo, Op, TransactionResult};
use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
use crate::coord::appends::BuiltinTableAppendNotify;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::{Coordinator, ReplicaMetadata};
Expand Down Expand Up @@ -916,13 +916,51 @@ impl Coordinator {
}
}

/// Drops all pending replicas for a set of clusters
/// that are undergoing reconfiguration.
pub async fn drop_reconfiguration_replicas(&mut self, cluster_ids: BTreeSet<ClusterId>) {
let pending_replica_drops: Vec<Vec<DropObjectInfo>> = cluster_ids
.iter()
.map(|c| {
self.catalog()
.get_cluster(c.clone())
.replicas()
.filter_map(|r| match r.config.location {
ReplicaLocation::Managed(ref l) if l.pending => {
Some(DropObjectInfo::ClusterReplica((
c.clone(),
r.replica_id,
ReplicaCreateDropReason::Manual,
)))
}
_ => None,
})
.collect::<Vec<DropObjectInfo>>()
})
.collect();
for replica_drops in pending_replica_drops {
self.catalog_transact(None, vec![Op::DropObjects(replica_drops)])
.await
.unwrap_or_terminate("cannot fail to drop replicas");
}
}

/// Cancels all active compute sinks for the identified connection.
#[mz_ore::instrument(level = "debug")]
pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
.await
}

/// Cancels all active cluster reconfigurations sinks for the identified connection.
#[mz_ore::instrument(level = "debug")]
pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
&mut self,
conn_id: &ConnectionId,
) {
self.retire_cluster_reconfigurations_for_conn(conn_id).await
}

/// Retires all active compute sinks for the identified connection with the
/// specified reason.
#[mz_ore::instrument(level = "debug")]
Expand All @@ -942,6 +980,27 @@ impl Coordinator {
self.retire_compute_sinks(drop_sinks).await;
}

/// Cleans pending cluster reconfiguraiotns for the identified connection
#[mz_ore::instrument(level = "debug")]
pub(crate) async fn retire_cluster_reconfigurations_for_conn(
&mut self,
conn_id: &ConnectionId,
) {
let reconfiguring_clusters = self
.active_conns
.get(conn_id)
.expect("must exist for active session")
.pending_cluster_alters
.clone();
self.drop_reconfiguration_replicas(reconfiguring_clusters)
.await;
self.active_conns
.get_mut(conn_id)
.expect("must exist for active session")
.pending_cluster_alters
.clear();
}

pub(crate) fn drop_storage_sinks(&mut self, sinks: Vec<GlobalId>) {
for id in &sinks {
self.drop_storage_read_policy(id);
Expand Down
23 changes: 20 additions & 3 deletions src/adapter/src/coord/sequencer/inner/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ impl Staged for ClusterStage {
}

fn cancel_enabled(&self) -> bool {
// Cluster create and alter are not yet cancelable
false
true
}
}

Expand Down Expand Up @@ -245,6 +244,17 @@ impl Coordinator {
.await?;
return match alter_followup {
NeedsFinalization::In(duration) => {
// For non backgrounded graceful alters,
// store the cluster_id in the ConnMeta
// to allow for cancellation.
// For non backgrounded graceful alters,
// store the cluster_id in the ConnMeta
// to allow for cancellation.
self.active_conns
.get_mut(session.conn_id())
.expect("There must be an active connection")
.pending_cluster_alters
.insert(cluster_id.clone());
let span = Span::current();
Ok(StageResult::Handle(mz_ore::task::spawn(
|| "Finalize Alter Cluster",
Expand Down Expand Up @@ -392,7 +402,14 @@ impl Coordinator {
variant: ClusterVariant::Managed(new_config),
},
});
self.catalog_transact(Some(session), ops).await?;
self.catalog_transact(Some(session), ops.clone()).await?;
// Remove the cluster being altered from the ConnMeta
// pending_cluster_alters BTreeSet
self.active_conns
.get_mut(session.conn_id())
.expect("There must be an active connection")
.pending_cluster_alters
.remove(&cluster_id);
Ok(StageResult::Response(ExecuteResponse::AlteredObject(
ObjectType::Cluster,
)))
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ impl Coordinator {
self.staged_cancellation.remove(conn_id);
self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished)
.await;
self.retire_cluster_reconfigurations_for_conn(conn_id).await;

// Release this transaction's compaction hold on collections.
if let Some(txn_reads) = self.txn_read_holds.remove(conn_id) {
Expand Down
72 changes: 70 additions & 2 deletions test/cloudtest/test_managed_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@

import time
from textwrap import dedent
from threading import Thread
from typing import Any

from materialize.cloudtest.app.materialize_application import MaterializeApplication
from pg8000 import Connection

from materialize.cloudtest.app.materialize_application import (
LOGGER,
MaterializeApplication,
)
from materialize.cloudtest.util.cluster import cluster_pod_name
from materialize.cloudtest.util.wait import wait

Expand Down Expand Up @@ -207,8 +214,8 @@ def assert_replica_names(names, allow_pending=False):
user="mz_system",
)

from threading import Thread

def test_graceful_reconfiguration(mz: MaterializeApplication) -> None:
mz.environmentd.sql(
"""
ALTER SYSTEM SET enable_graceful_cluster_reconfiguration = true;
Expand Down Expand Up @@ -264,3 +271,64 @@ def gracefully_alter():
)
== (["2"],)
), "Cluster should use new config after alter completes"

# Validate cancelation of alter cluster
mz.environmentd.sql(
"""
DROP CLUSTER IF EXISTS cluster1 CASCADE;
CREATE CLUSTER cluster1 ( SIZE = '1');
""",
port="internal",
user="mz_system",
)

# We need persistent connection that we can later issue a cancel backend to
conn = mz.environmentd.sql_conn(
port="internal",
user="mz_system",
)

def query_with_conn(sql: str, conn: Connection) -> list[list[Any]]:
"""Execute a SQL query against the service and return results."""
with conn.cursor() as cursor:
LOGGER.info(f"> {sql}")
cursor.execute(sql)
return cursor.fetchall()

pid = query_with_conn("select pg_backend_pid();", conn)[0][0]
thread = Thread(
target=query_with_conn,
args=[
"""
ALTER CLUSTER cluster1 SET (SIZE = '2') WITH ( WAIT FOR '5s')
""",
conn,
],
)
thread.start()
time.sleep(1)
mz.environmentd.sql(
f"select pg_cancel_backend({pid});",
port="internal",
user="mz_system",
)
time.sleep(1)

assert (
mz.environmentd.sql_query(
"""
SELECT mz_cluster_replicas.name, mz_cluster_replicas.pending_reconfiguration
FROM mz_cluster_replicas, mz_clusters WHERE
mz_cluster_replicas.cluster_id = mz_clusters.id AND mz_clusters.name='cluster1';
"""
)
== (["r1", False],)
), "No pending replicas should be present after alter cancelation"
assert (
mz.environmentd.sql_query(
"""
SELECT size FROM mz_clusters WHERE name='cluster1';
"""
)
== (["1"],)
), "Cluster should not have updated if canceled during alter"

0 comments on commit cad2501

Please sign in to comment.