Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[copy_from]: Initial implementation, add OneshotSource and OneshotFormat, support appending Batches to Tables #30942

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ impl ExecuteResponse {
&[AlteredSystemConfiguration]
}
Close => &[ClosedCursor],
PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom],
PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
PlanKind::Comment => &[ExecuteResponseKind::Comment],
CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
Expand Down
11 changes: 11 additions & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ pub enum Message {
conn_id: ConnectionId,
},
LinearizeReads,
StagedBatches {
conn_id: ConnectionId,
table_id: CatalogItemId,
batches: Vec<Result<ProtoBatch, String>>,
},
StorageUsageSchedule,
StorageUsageFetch,
StorageUsageUpdate(ShardsUsageReferenced),
Expand Down Expand Up @@ -354,6 +359,7 @@ impl Message {
Message::ClusterEvent(_) => "cluster_event",
Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
Message::LinearizeReads => "linearize_reads",
Message::StagedBatches { .. } => "staged_batches",
Message::StorageUsageSchedule => "storage_usage_schedule",
Message::StorageUsageFetch => "storage_usage_fetch",
Message::StorageUsageUpdate(_) => "storage_usage_update",
Expand Down Expand Up @@ -1659,6 +1665,10 @@ pub struct Coordinator {
active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
/// A map from active webhooks to their invalidation handle.
active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
/// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
/// to stage Batches in Persist that we will then link into the shard.
active_copies: BTreeMap<ConnectionId, ExecuteContext>,

/// A map from connection ids to a watch channel that is set to `true` if the connection
/// received a cancel request.
staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
Expand Down Expand Up @@ -4147,6 +4157,7 @@ pub fn serve(
serialized_ddl: LockedVecDeque::new(),
active_compute_sinks: BTreeMap::new(),
active_webhooks: BTreeMap::new(),
active_copies: BTreeMap::new(),
staged_cancellation: BTreeMap::new(),
introspection_subscribes: BTreeMap::new(),
write_locks: BTreeMap::new(),
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,7 @@ impl Coordinator {
self.cancel_compute_sinks_for_conn(&conn_id).await;
self.cancel_cluster_reconfigurations_for_conn(&conn_id)
.await;
self.cancel_pending_copy(&conn_id);
if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
let _ = tx.send(true);
}
Expand Down Expand Up @@ -1273,6 +1274,7 @@ impl Coordinator {
.dec();
self.cancel_pending_peeks(conn.conn_id());
self.cancel_pending_watchsets(&conn_id);
self.cancel_pending_copy(&conn_id);
self.end_session_for_statement_logging(conn.uuid());

// Queue the builtin table update, but do not wait for it to complete. We explicitly do
Expand Down
7 changes: 7 additions & 0 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ impl Coordinator {
Message::LinearizeReads => {
self.message_linearize_reads().boxed_local().await;
}
Message::StagedBatches {
conn_id,
table_id,
batches,
} => {
self.commit_staged_batches(conn_id, table_id, batches);
}
Message::StorageUsageSchedule => {
self.schedule_storage_usage_collection().boxed_local().await;
}
Expand Down
36 changes: 21 additions & 15 deletions src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ use mz_repr::{CatalogItemId, Diff, GlobalId};
use mz_sql::catalog::CatalogError;
use mz_sql::names::ResolvedIds;
use mz_sql::plan::{
self, AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, CreateSourcePlanBundle,
FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan,
self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
CreateSourcePlanBundle, FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql_parser::ast::{Raw, Statement};
use mz_storage_client::client::TableData;
use mz_storage_types::connections::inline::IntoInlineConnection;
use std::sync::Arc;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -359,18 +360,23 @@ impl Coordinator {
self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
.await;
}
Plan::CopyFrom(plan) => {
let (tx, _, session, ctx_extra) = ctx.into_parts();
tx.send(
Ok(ExecuteResponse::CopyFrom {
id: plan.id,
columns: plan.columns,
params: plan.params,
ctx_extra,
}),
session,
);
}
Plan::CopyFrom(plan) => match plan.source {
CopyFromSource::Stdin => {
let (tx, _, session, ctx_extra) = ctx.into_parts();
tx.send(
Ok(ExecuteResponse::CopyFrom {
id: plan.id,
columns: plan.columns,
params: plan.params,
ctx_extra,
}),
session,
);
}
CopyFromSource::Url(_) => {
self.sequence_copy_from(ctx, plan, target_cluster).await;
}
},
Plan::ExplainPlan(plan) => {
self.sequence_explain_plan(ctx, plan, target_cluster).await;
}
Expand Down Expand Up @@ -827,7 +833,7 @@ impl Coordinator {

session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
id: plan.id,
rows: plan.updates,
rows: TableData::Rows(plan.updates),
}]))?;
if !plan.returning.is_empty() {
let finishing = RowSetFinishing {
Expand Down
181 changes: 181 additions & 0 deletions src/adapter/src/coord/sequencer/inner/copy_from.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use mz_adapter_types::connection::ConnectionId;
use mz_ore::cast::CastInto;
use mz_persist_client::batch::ProtoBatch;
use mz_repr::{CatalogItemId, Datum, RowArena};
use mz_sql::plan::{self, CopyFromSource, HirScalarExpr};
use mz_sql::session::metadata::SessionMetadata;
use mz_storage_client::client::TableData;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use smallvec::SmallVec;
use url::Url;

use crate::coord::sequencer::inner::return_if_err;
use crate::coord::{Coordinator, TargetCluster};
use crate::optimize::dataflows::{prep_scalar_expr, EvalTime, ExprPrepStyle};
use crate::session::{TransactionOps, WriteOp};
use crate::{AdapterError, ExecuteContext, ExecuteResponse};

impl Coordinator {
pub(crate) async fn sequence_copy_from(
&mut self,
ctx: ExecuteContext,
plan: plan::CopyFromPlan,
target_cluster: TargetCluster,
) {
let plan::CopyFromPlan {
id,
source,
columns: _,
params: _,
} = plan;

let from_expr = match source {
CopyFromSource::Url(from_expr) => from_expr,
CopyFromSource::Stdin => {
unreachable!("COPY FROM STDIN should be handled elsewhere")
}
};

let eval_url = |from: HirScalarExpr| -> Result<Url, AdapterError> {
let style = ExprPrepStyle::OneShot {
logical_time: EvalTime::NotAvailable,
session: ctx.session(),
catalog_state: self.catalog().state(),
};
let mut from = from.lower_uncorrelated()?;
prep_scalar_expr(&mut from, style)?;

let temp_storage = RowArena::new();
let eval_result = from.eval(&[], &temp_storage)?;

if eval_result == Datum::Null {
coord_bail!("COPY FROM target value cannot be NULL");
}
ParkMyCar marked this conversation as resolved.
Show resolved Hide resolved
let eval_string = match eval_result {
Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
Datum::String(url_str) => url_str,
other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
};

Url::parse(eval_string)
.map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")))
ParkMyCar marked this conversation as resolved.
Show resolved Hide resolved
};
let url = return_if_err!(eval_url(from_expr), ctx);

let dest_table = self
.catalog()
.get_entry(&id)
.table()
.expect("TODO SHOULD BE A TABLE");
ParkMyCar marked this conversation as resolved.
Show resolved Hide resolved

let collection_id = dest_table.global_id_writes();
let (_, ingestion_id) = self.transient_id_gen.allocate_id();
let request = OneshotIngestionRequest {
source: mz_storage_types::oneshot_sources::ContentSource::Http { url },
format: mz_storage_types::oneshot_sources::ContentFormat::Csv,
};

let cluster_id = self
.catalog()
.resolve_target_cluster(target_cluster, ctx.session())
.expect("TODO do this in planning")
ParkMyCar marked this conversation as resolved.
Show resolved Hide resolved
.id;

// When we finish staging the Batches in Persist, we'll send a command
// to the Coordinator.
let command_tx = self.internal_cmd_tx.clone();
let conn_id = ctx.session().conn_id().clone();
let closure = Box::new(move |batches| {
let _ = command_tx.send(crate::coord::Message::StagedBatches {
conn_id,
table_id: id,
batches,
});
});
// Stash the execute context so we can cancel the COPY.
self.active_copies
.insert(ctx.session().conn_id().clone(), ctx);
Comment on lines +108 to +110
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this has been discussed, but if we cancel after the batch has been staged, then will we leak the batch in persist?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would leak the batches. If we cancel the request we could spawn a task that will wait for the response and clean them up, but concurrently Persist is also working on a leaked blob detector so this shouldn't be too much of an issue


let _result = self
.controller
.storage
.create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
.await;
}

/// Handle the batches that `clusterd` staged in Persist for an active
/// `COPY FROM`, either committing them to the table or failing the statement.
///
/// If the connection's [`ExecuteContext`] is no longer in `active_copies` the
/// COPY was canceled, so the staged results are dropped. Otherwise a single
/// error among the results fails the whole operation; on success the batches
/// are staged as a transaction write op and committed when the session is
/// retired.
pub(crate) fn commit_staged_batches(
    &mut self,
    conn_id: ConnectionId,
    table_id: CatalogItemId,
    batches: Vec<Result<ProtoBatch, String>>,
) {
    // A missing entry means this COPY was canceled before the batches landed.
    let Some(mut ctx) = self.active_copies.remove(&conn_id) else {
        tracing::warn!(?conn_id, "got response for canceled COPY FROM");
        return;
    };

    // Partition the results into successfully staged batches and errors,
    // accumulating a (saturating) total row count as we go.
    let mut staged_batches = SmallVec::with_capacity(batches.len());
    let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
    let mut total_rows = 0u64;

    for result in batches {
        match result {
            Ok(batch) => {
                let rows_in_batch = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
                total_rows = total_rows.saturating_add(rows_in_batch);
                staged_batches.push(batch);
            }
            Err(err) => all_errors.push(err),
        }
    }

    // Any error fails the entire operation; report the last one.
    if let Some(error) = all_errors.pop() {
        tracing::warn!(?error, ?all_errors, "failed COPY FROM");

        // TODO(parkmycar): Cleanup the existing ProtoBatches to prevent leaking them.
        // TODO(parkmycar): Carry structured errors all the way through.

        ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
            "COPY FROM: {error}"
        ))));
        return;
    }

    // Stage a WriteOp, then when the Session is retired we complete the
    // transaction, which handles acquiring the write lock for `table_id`,
    // advancing the timestamps of the staged batches, and waiting for
    // everything to complete before sending a response to the client.
    let write_op = WriteOp {
        id: table_id,
        rows: TableData::Batches(staged_batches),
    };
    match ctx
        .session_mut()
        .add_transaction_ops(TransactionOps::Writes(vec![write_op]))
    {
        Ok(_) => ctx.retire(Ok(ExecuteResponse::Copied(total_rows.cast_into()))),
        Err(err) => ctx.retire(Err(err)),
    }
}

/// Cancel any active `COPY FROM` statements/oneshot ingestions.
#[mz_ore::instrument(level = "debug")]
pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
    // TODO(parkmycar): Also cancel the dataflow running on clusterd.
    // Dropping the entry is what marks the COPY as canceled; any later
    // `StagedBatches` message for this connection will be discarded.
    let maybe_ctx = self.active_copies.remove(conn_id);
    if let Some(ctx) = maybe_ctx {
        ctx.retire(Err(AdapterError::Canceled));
    }
}
}
9 changes: 6 additions & 3 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6333,9 +6333,12 @@ impl<'a> Parser<'a> {
)
.map_no_statement_parser_err();
}
self.expect_keyword(STDIN)
.map_parser_err(StatementKind::Copy)?;
(CopyDirection::From, CopyTarget::Stdin)
if self.parse_keyword(STDIN) {
(CopyDirection::From, CopyTarget::Stdin)
} else {
let url_expr = self.parse_expr().map_parser_err(StatementKind::Copy)?;
(CopyDirection::From, CopyTarget::Expr(url_expr))
}
}
TO => {
if self.parse_keyword(STDOUT) {
Expand Down
11 changes: 11 additions & 0 deletions src/sql/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,10 +930,21 @@ pub struct ShowColumnsPlan {
/// Plan for a `COPY ... FROM ...` statement.
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Catalog id of the destination table the copied data is written into.
    pub id: CatalogItemId,
    /// Where the data to copy comes from (stdin or a remote URL).
    pub source: CopyFromSource,
    /// Column indexes to populate — presumably positions within the
    /// destination table's relation description; confirm against the planner.
    pub columns: Vec<usize>,
    /// Format parameters (e.g. CSV options) used to decode the incoming data.
    pub params: CopyFormatParams<'static>,
}

/// Describes where the data for a `COPY ... FROM` statement originates.
#[derive(Debug)]
pub enum CopyFromSource {
    /// Copying from a file local to the user, transmitted via pgwire.
    Stdin,
    /// A remote resource, e.g. S3.
    ///
    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
    Url(HirScalarExpr),
}

#[derive(Debug, Clone)]
pub struct CopyToPlan {
/// The select query plan whose data will be copied to destination uri.
Expand Down
Loading