diff --git a/src/adapter/src/active_compute_sink.rs b/src/adapter/src/active_compute_sink.rs index d413923338d8e..ab60993b75220 100644 --- a/src/adapter/src/active_compute_sink.rs +++ b/src/adapter/src/active_compute_sink.rs @@ -13,6 +13,7 @@ use std::cmp::Ordering; use std::collections::BTreeSet; use std::iter; +use anyhow::anyhow; use itertools::Itertools; use mz_adapter_types::connection::ConnectionId; use mz_compute_client::protocol::response::SubscribeBatch; @@ -48,14 +49,14 @@ impl ActiveComputeSink { pub fn connection_id(&self) -> &ConnectionId { match &self { ActiveComputeSink::Subscribe(subscribe) => &subscribe.conn_id, - ActiveComputeSink::CopyTo(copy_to) => &copy_to.conn_id, + ActiveComputeSink::CopyTo(copy_to) => copy_to.ctx.session().conn_id(), } } pub fn user(&self) -> &User { match &self { ActiveComputeSink::Subscribe(subscribe) => &subscribe.user, - ActiveComputeSink::CopyTo(copy_to) => &copy_to.user, + ActiveComputeSink::CopyTo(copy_to) => copy_to.ctx.session().user(), } } @@ -65,6 +66,34 @@ impl ActiveComputeSink { ActiveComputeSink::CopyTo(copy_to) => &copy_to.depends_on, } } + + /// Consumes the `ActiveComputeSink` and sends a response if required. 
+ pub fn drop_with_reason(self, reason: ComputeSinkRemovalReason) { + match self { + ActiveComputeSink::Subscribe(subscribe) => { + let message = match reason { + ComputeSinkRemovalReason::Finished => return, + ComputeSinkRemovalReason::Canceled => PeekResponseUnary::Canceled, + ComputeSinkRemovalReason::DependencyDropped(d) => PeekResponseUnary::Error( + format!("subscribe has been terminated because underlying {d} was dropped"), + ), + }; + subscribe.send(message); + } + ActiveComputeSink::CopyTo(copy_to) => { + let message = match reason { + ComputeSinkRemovalReason::Finished => return, + ComputeSinkRemovalReason::Canceled => Err(AdapterError::Canceled), + ComputeSinkRemovalReason::DependencyDropped(d) => { + Err(AdapterError::Unstructured(anyhow!( + "copy has been terminated because underlying {d} was dropped" + ))) + } + }; + copy_to.process_response(message); + } + } + } } /// A description of an active subscribe from coord's perspective @@ -363,13 +392,9 @@ pub enum ComputeSinkRemovalReason { /// A description of an active copy to from coord's perspective. #[derive(Debug)] pub struct ActiveCopyTo { - /// The user of the session that created the subscribe. - pub user: User, - /// The connection id of the session that created the subscribe. - pub conn_id: ConnectionId, /// The context about the COPY TO statement getting executed. /// Used to eventually call `ctx.retire` on. - pub ctx: Option<ExecuteContext>, + pub ctx: ExecuteContext, /// The cluster that the copy to is running on. pub cluster_id: ClusterId, /// All `GlobalId`s that the copy to's expression depends on. @@ -377,12 +402,7 @@ pub struct ActiveCopyTo { } impl ActiveCopyTo { - pub(crate) fn process_response(&mut self, response: Result<ExecuteResponse, AdapterError>) { - // TODO(mouli): refactor so that process_response takes `self` instead of `&mut self`. Then, - // we can make `ctx` not optional, and get rid of the `user` and `conn_id` as well. - // Using an option to get an owned value after take to call `retire` on it. 
- if let Some(ctx) = self.ctx.take() { - let _ = ctx.retire(response); - } + pub(crate) fn process_response(self, response: Result<ExecuteResponse, AdapterError>) { + let _ = self.ctx.retire(response); } } diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 1c4a40fd21e30..92646d644bb66 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -44,6 +44,7 @@ use mz_sql::session::vars::{ use mz_storage_client::controller::ExportDescription; use mz_storage_types::connections::inline::IntoInlineConnection; use mz_storage_types::controller::StorageError; +use mz_storage_types::instances::StorageInstanceId; use mz_storage_types::read_policy::ReadPolicy; use mz_storage_types::sources::GenericSourceConnection; use serde_json::json; @@ -557,7 +558,8 @@ impl Coordinator { self.drop_storage_sinks(storage_sinks_to_drop); } if !compute_sinks_to_drop.is_empty() { - self.drop_compute_sinks(compute_sinks_to_drop).await; + self.drop_compute_sinks_with_reason(compute_sinks_to_drop) + .await; } if !peeks_to_drop.is_empty() { for (dropped_name, uuid) in peeks_to_drop { @@ -738,21 +740,17 @@ impl Coordinator { } } - pub(crate) async fn drop_compute_sinks( + /// This method drops the compute sink, by calling `compute.drop_collections`. + /// This should only be called after removing the corresponding sink entry from + /// `self.active_compute_sinks`. 
+ pub(crate) fn drop_compute_sinks( &mut self, - sinks: impl IntoIterator<Item = (GlobalId, ComputeSinkRemovalReason)>, + sink_and_cluster_ids: impl IntoIterator<Item = (GlobalId, StorageInstanceId)>, ) { let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new(); - for (sink_id, reason) in sinks { - let cluster_id = match self.active_compute_sinks.get(&sink_id) { - None => { - tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink"); - continue; - } - Some(s) => s.cluster_id(), - }; - - self.remove_active_sink(sink_id, reason.clone()).await; + for (sink_id, cluster_id) in sink_and_cluster_ids { + // The active sink should have already been removed from the state + assert!(!self.active_compute_sinks.contains_key(&sink_id)); if !self .controller @@ -780,6 +778,25 @@ impl Coordinator { } } + /// Drops active compute sinks and might send appropriate response back + /// depending upon the reason. + pub(crate) async fn drop_compute_sinks_with_reason( + &mut self, + sinks: impl IntoIterator<Item = (GlobalId, ComputeSinkRemovalReason)>, + ) { + let mut sink_cluster_id_map = BTreeMap::new(); + for (sink_id, reason) in sinks { + if let Some(sink) = self.remove_active_sink(sink_id).await { + sink_cluster_id_map.insert(sink_id, sink.cluster_id()); + sink.drop_with_reason(reason); + } else { + tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink"); + continue; + } + } + self.drop_compute_sinks(sink_cluster_id_map.into_iter()); + } + pub(crate) fn drop_storage_sinks(&mut self, sinks: Vec<GlobalId>) { for id in &sinks { self.drop_storage_read_policy(id); diff --git a/src/adapter/src/coord/message_handler.rs b/src/adapter/src/coord/message_handler.rs index 9d2bdaa47ea82..2dcecebbe4fd7 100644 --- a/src/adapter/src/coord/message_handler.rs +++ b/src/adapter/src/coord/message_handler.rs @@ -349,7 +349,7 @@ impl Coordinator { Some(ActiveComputeSink::Subscribe(active_subscribe)) => { let finished = active_subscribe.process_response(response); if finished { - self.drop_compute_sinks([( + self.drop_compute_sinks_with_reason([( sink_id, ComputeSinkRemovalReason::Finished, )]) @@ -362,23 
+362,15 @@ impl Coordinator { } } ControllerResponse::CopyToResponse(sink_id, response) => { - match self.active_compute_sinks.get_mut(&sink_id) { + match self.remove_active_sink(sink_id).await { Some(ActiveComputeSink::CopyTo(active_copy_to)) => { let response = match response { Ok(n) => Ok(ExecuteResponse::Copied(usize::cast_from(n))), Err(error) => Err(AdapterError::Unstructured(error)), }; - // Ideally it would be better to get an owned active_copy_to here - // and have `process_response` take a `self` instead of `&mut self` - // and consume the object along with the `ctx` it holds. - // But if we remove the entry from `active_compute_sinks` here, - // the following `drop_compute_sinks` will not drop the compute sinks - // since it does expect the entry there. - // TODO (mouli): refactor `drop_compute_sinks` so that we can get - // an owned value here. + let sink_and_cluster_id = (sink_id, active_copy_to.cluster_id); active_copy_to.process_response(response); - self.drop_compute_sinks([(sink_id, ComputeSinkRemovalReason::Finished)]) - .await; + self.drop_compute_sinks([sink_and_cluster_id]); } _ => { tracing::error!(%sink_id, "received CopyToResponse for nonexistent copy to"); diff --git a/src/adapter/src/coord/sequencer/inner/peek.rs b/src/adapter/src/coord/sequencer/inner/peek.rs index 2f8db3018bc81..8d56c3aa9c46c 100644 --- a/src/adapter/src/coord/sequencer/inner/peek.rs +++ b/src/adapter/src/coord/sequencer/inner/peek.rs @@ -842,9 +842,7 @@ impl Coordinator { // Callback for the active copy to. let active_copy_to = ActiveCopyTo { - user: ctx.session().user().clone(), - conn_id: ctx.session().conn_id().clone(), - ctx: Some(ctx), + ctx, cluster_id, depends_on: validity.dependency_ids.clone(), }; diff --git a/src/adapter/src/coord/sql.rs b/src/adapter/src/coord/sql.rs index b4ec75f6177c4..eb6731e744680 100644 --- a/src/adapter/src/coord/sql.rs +++ b/src/adapter/src/coord/sql.rs @@ -10,7 +10,6 @@ //! Various utility methods used by the [`Coordinator`]. 
Ideally these are all //! put in more meaningfully named modules. -use anyhow::anyhow; use mz_adapter_types::connection::ConnectionId; use mz_ore::now::EpochMillis; use mz_repr::{GlobalId, ScalarType}; @@ -25,7 +24,7 @@ use crate::coord::appends::BuiltinTableAppendNotify; use crate::coord::Coordinator; use crate::session::{Session, TransactionStatus}; use crate::util::describe; -use crate::{metrics, AdapterError, ExecuteContext, ExecuteResponse, PeekResponseUnary}; +use crate::{metrics, AdapterError, ExecuteContext, ExecuteResponse}; impl Coordinator { pub(crate) fn plan_statement( @@ -272,16 +271,12 @@ impl Coordinator { .iter() .map(|sink_id| (*sink_id, reason.clone())) .collect::<Vec<_>>(); - self.drop_compute_sinks(drop_sinks).await; + self.drop_compute_sinks_with_reason(drop_sinks).await; } /// Handle removing metadata associated with a SUBSCRIBE or a COPY TO query. #[tracing::instrument(level = "debug", skip(self))] - pub(crate) async fn remove_active_sink( - &mut self, - id: GlobalId, - reason: ComputeSinkRemovalReason, - ) { + pub(crate) async fn remove_active_sink(&mut self, id: GlobalId) -> Option<ActiveComputeSink> { if let Some(sink) = self.active_compute_sinks.remove(&id) { let session_type = metrics::session_type_label_value(sink.user()); @@ -291,48 +286,29 @@ impl Coordinator { .drop_sinks .remove(&id); - match sink { + match &sink { ActiveComputeSink::Subscribe(active_subscribe) => { let update = self.catalog() .state() - .pack_subscribe_update(id, &active_subscribe, -1); + .pack_subscribe_update(id, active_subscribe, -1); self.builtin_table_update().blocking(vec![update]).await; self.metrics .active_subscribes .with_label_values(&[session_type]) .dec(); - - let message = match reason { - ComputeSinkRemovalReason::Finished => return, - ComputeSinkRemovalReason::Canceled => PeekResponseUnary::Canceled, - ComputeSinkRemovalReason::DependencyDropped(d) => { - PeekResponseUnary::Error(format!( - "subscribe has been terminated because underlying {d} was dropped" - )) - } - }; - 
active_subscribe.send(message); } - ActiveComputeSink::CopyTo(mut active_copy_to) => { + ActiveComputeSink::CopyTo(_) => { self.metrics .active_copy_tos .with_label_values(&[session_type]) .dec(); - - let message = match reason { - ComputeSinkRemovalReason::Finished => return, - ComputeSinkRemovalReason::Canceled => Err(AdapterError::Canceled), - ComputeSinkRemovalReason::DependencyDropped(d) => { - Err(AdapterError::Unstructured(anyhow!( - "copy has been terminated because underlying {d} was dropped" - ))) - } - }; - active_copy_to.process_response(message); } } + Some(sink) + } else { + None } } }