Skip to content

Commit

Permalink
adapter: refactor ActiveCopyTo to have a non-optional ctx (#25311)
Browse files Browse the repository at this point in the history
adapter: refactor ActiveCopyTo to have a non-optional ctx
  • Loading branch information
Mouli Mukherjee authored Feb 16, 2024
1 parent 73e6ab5 commit 10b13b3
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 75 deletions.
48 changes: 34 additions & 14 deletions src/adapter/src/active_compute_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::iter;

use anyhow::anyhow;
use itertools::Itertools;
use mz_adapter_types::connection::ConnectionId;
use mz_compute_client::protocol::response::SubscribeBatch;
Expand Down Expand Up @@ -48,14 +49,14 @@ impl ActiveComputeSink {
pub fn connection_id(&self) -> &ConnectionId {
match &self {
ActiveComputeSink::Subscribe(subscribe) => &subscribe.conn_id,
ActiveComputeSink::CopyTo(copy_to) => &copy_to.conn_id,
ActiveComputeSink::CopyTo(copy_to) => copy_to.ctx.session().conn_id(),
}
}

pub fn user(&self) -> &User {
match &self {
ActiveComputeSink::Subscribe(subscribe) => &subscribe.user,
ActiveComputeSink::CopyTo(copy_to) => &copy_to.user,
ActiveComputeSink::CopyTo(copy_to) => copy_to.ctx.session().user(),
}
}

Expand All @@ -65,6 +66,34 @@ impl ActiveComputeSink {
ActiveComputeSink::CopyTo(copy_to) => &copy_to.depends_on,
}
}

/// Consumes the `ActiveComputeSink` and sends a response if required.
pub fn drop_with_reason(self, reason: ComputeSinkRemovalReason) {
match self {
ActiveComputeSink::Subscribe(subscribe) => {
let message = match reason {
ComputeSinkRemovalReason::Finished => return,
ComputeSinkRemovalReason::Canceled => PeekResponseUnary::Canceled,
ComputeSinkRemovalReason::DependencyDropped(d) => PeekResponseUnary::Error(
format!("subscribe has been terminated because underlying {d} was dropped"),
),
};
subscribe.send(message);
}
ActiveComputeSink::CopyTo(copy_to) => {
let message = match reason {
ComputeSinkRemovalReason::Finished => return,
ComputeSinkRemovalReason::Canceled => Err(AdapterError::Canceled),
ComputeSinkRemovalReason::DependencyDropped(d) => {
Err(AdapterError::Unstructured(anyhow!(
"copy has been terminated because underlying {d} was dropped"
)))
}
};
copy_to.process_response(message);
}
}
}
}

/// A description of an active subscribe from coord's perspective
Expand Down Expand Up @@ -363,26 +392,17 @@ pub enum ComputeSinkRemovalReason {
/// A description of an active copy to from coord's perspective.
#[derive(Debug)]
pub struct ActiveCopyTo {
/// The user of the session that created the subscribe.
pub user: User,
/// The connection id of the session that created the subscribe.
pub conn_id: ConnectionId,
/// The context about the COPY TO statement getting executed.
/// Used to eventually call `ctx.retire` on.
pub ctx: Option<ExecuteContext>,
pub ctx: ExecuteContext,
/// The cluster that the copy to is running on.
pub cluster_id: ClusterId,
/// All `GlobalId`s that the copy to's expression depends on.
pub depends_on: BTreeSet<GlobalId>,
}

impl ActiveCopyTo {
pub(crate) fn process_response(&mut self, response: Result<ExecuteResponse, AdapterError>) {
// TODO(mouli): refactor so that process_response takes `self` instead of `&mut self`. Then,
// we can make `ctx` not optional, and get rid of the `user` and `conn_id` as well.
// Using an option to get an owned value after take to call `retire` on it.
if let Some(ctx) = self.ctx.take() {
let _ = ctx.retire(response);
}
pub(crate) fn process_response(self, response: Result<ExecuteResponse, AdapterError>) {
let _ = self.ctx.retire(response);
}
}
43 changes: 30 additions & 13 deletions src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use mz_sql::session::vars::{
use mz_storage_client::controller::ExportDescription;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::GenericSourceConnection;
use serde_json::json;
Expand Down Expand Up @@ -557,7 +558,8 @@ impl Coordinator {
self.drop_storage_sinks(storage_sinks_to_drop);
}
if !compute_sinks_to_drop.is_empty() {
self.drop_compute_sinks(compute_sinks_to_drop).await;
self.drop_compute_sinks_with_reason(compute_sinks_to_drop)
.await;
}
if !peeks_to_drop.is_empty() {
for (dropped_name, uuid) in peeks_to_drop {
Expand Down Expand Up @@ -738,21 +740,17 @@ impl Coordinator {
}
}

pub(crate) async fn drop_compute_sinks(
/// This method drops the compute sink, by calling `compute.drop_collections`.
/// This should only be called after removing the corresponding sink entry from
/// `self.active_compute_sinks`.
pub(crate) fn drop_compute_sinks(
&mut self,
sinks: impl IntoIterator<Item = (GlobalId, ComputeSinkRemovalReason)>,
sink_and_cluster_ids: impl IntoIterator<Item = (GlobalId, StorageInstanceId)>,
) {
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
for (sink_id, reason) in sinks {
let cluster_id = match self.active_compute_sinks.get(&sink_id) {
None => {
tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
continue;
}
Some(s) => s.cluster_id(),
};

self.remove_active_sink(sink_id, reason.clone()).await;
for (sink_id, cluster_id) in sink_and_cluster_ids {
// The active sink should have already been removed from the state
assert!(!self.active_compute_sinks.contains_key(&sink_id));

if !self
.controller
Expand Down Expand Up @@ -780,6 +778,25 @@ impl Coordinator {
}
}

/// Drops active compute sinks and might send appropriate response back
/// depending upon the reason.
pub(crate) async fn drop_compute_sinks_with_reason(
&mut self,
sinks: impl IntoIterator<Item = (GlobalId, ComputeSinkRemovalReason)>,
) {
let mut sink_cluster_id_map = BTreeMap::new();
for (sink_id, reason) in sinks {
if let Some(sink) = self.remove_active_sink(sink_id).await {
sink_cluster_id_map.insert(sink_id, sink.cluster_id());
sink.drop_with_reason(reason);
} else {
tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
continue;
}
}
self.drop_compute_sinks(sink_cluster_id_map.into_iter());
}

pub(crate) fn drop_storage_sinks(&mut self, sinks: Vec<GlobalId>) {
for id in &sinks {
self.drop_storage_read_policy(id);
Expand Down
16 changes: 4 additions & 12 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl Coordinator {
Some(ActiveComputeSink::Subscribe(active_subscribe)) => {
let finished = active_subscribe.process_response(response);
if finished {
self.drop_compute_sinks([(
self.drop_compute_sinks_with_reason([(
sink_id,
ComputeSinkRemovalReason::Finished,
)])
Expand All @@ -362,23 +362,15 @@ impl Coordinator {
}
}
ControllerResponse::CopyToResponse(sink_id, response) => {
match self.active_compute_sinks.get_mut(&sink_id) {
match self.remove_active_sink(sink_id).await {
Some(ActiveComputeSink::CopyTo(active_copy_to)) => {
let response = match response {
Ok(n) => Ok(ExecuteResponse::Copied(usize::cast_from(n))),
Err(error) => Err(AdapterError::Unstructured(error)),
};
// Ideally it would be better to get an owned active_copy_to here
// and have `process_response` take a `self` instead of `&mut self`
// and consume the object along with the `ctx` it holds.
// But if we remove the entry from `active_compute_sinks` here,
// the following `drop_compute_sinks` will not drop the compute sinks
// since it does expect the entry there.
// TODO (mouli): refactor `drop_compute_sinks` so that we can get
// an owned value here.
let sink_and_cluster_id = (sink_id, active_copy_to.cluster_id);
active_copy_to.process_response(response);
self.drop_compute_sinks([(sink_id, ComputeSinkRemovalReason::Finished)])
.await;
self.drop_compute_sinks([sink_and_cluster_id]);
}
_ => {
tracing::error!(%sink_id, "received CopyToResponse for nonexistent copy to");
Expand Down
4 changes: 1 addition & 3 deletions src/adapter/src/coord/sequencer/inner/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,9 +842,7 @@ impl Coordinator {

// Callback for the active copy to.
let active_copy_to = ActiveCopyTo {
user: ctx.session().user().clone(),
conn_id: ctx.session().conn_id().clone(),
ctx: Some(ctx),
ctx,
cluster_id,
depends_on: validity.dependency_ids.clone(),
};
Expand Down
42 changes: 9 additions & 33 deletions src/adapter/src/coord/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
//! Various utility methods used by the [`Coordinator`]. Ideally these are all
//! put in more meaningfully named modules.
use anyhow::anyhow;
use mz_adapter_types::connection::ConnectionId;
use mz_ore::now::EpochMillis;
use mz_repr::{GlobalId, ScalarType};
Expand All @@ -25,7 +24,7 @@ use crate::coord::appends::BuiltinTableAppendNotify;
use crate::coord::Coordinator;
use crate::session::{Session, TransactionStatus};
use crate::util::describe;
use crate::{metrics, AdapterError, ExecuteContext, ExecuteResponse, PeekResponseUnary};
use crate::{metrics, AdapterError, ExecuteContext, ExecuteResponse};

impl Coordinator {
pub(crate) fn plan_statement(
Expand Down Expand Up @@ -272,16 +271,12 @@ impl Coordinator {
.iter()
.map(|sink_id| (*sink_id, reason.clone()))
.collect::<Vec<_>>();
self.drop_compute_sinks(drop_sinks).await;
self.drop_compute_sinks_with_reason(drop_sinks).await;
}

/// Handle removing metadata associated with a SUBSCRIBE or a COPY TO query.
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) async fn remove_active_sink(
&mut self,
id: GlobalId,
reason: ComputeSinkRemovalReason,
) {
pub(crate) async fn remove_active_sink(&mut self, id: GlobalId) -> Option<ActiveComputeSink> {
if let Some(sink) = self.active_compute_sinks.remove(&id) {
let session_type = metrics::session_type_label_value(sink.user());

Expand All @@ -291,48 +286,29 @@ impl Coordinator {
.drop_sinks
.remove(&id);

match sink {
match &sink {
ActiveComputeSink::Subscribe(active_subscribe) => {
let update =
self.catalog()
.state()
.pack_subscribe_update(id, &active_subscribe, -1);
.pack_subscribe_update(id, active_subscribe, -1);
self.builtin_table_update().blocking(vec![update]).await;

self.metrics
.active_subscribes
.with_label_values(&[session_type])
.dec();

let message = match reason {
ComputeSinkRemovalReason::Finished => return,
ComputeSinkRemovalReason::Canceled => PeekResponseUnary::Canceled,
ComputeSinkRemovalReason::DependencyDropped(d) => {
PeekResponseUnary::Error(format!(
"subscribe has been terminated because underlying {d} was dropped"
))
}
};
active_subscribe.send(message);
}
ActiveComputeSink::CopyTo(mut active_copy_to) => {
ActiveComputeSink::CopyTo(_) => {
self.metrics
.active_copy_tos
.with_label_values(&[session_type])
.dec();

let message = match reason {
ComputeSinkRemovalReason::Finished => return,
ComputeSinkRemovalReason::Canceled => Err(AdapterError::Canceled),
ComputeSinkRemovalReason::DependencyDropped(d) => {
Err(AdapterError::Unstructured(anyhow!(
"copy has been terminated because underlying {d} was dropped"
)))
}
};
active_copy_to.process_response(message);
}
}
Some(sink)
} else {
None
}
}
}

0 comments on commit 10b13b3

Please sign in to comment.