[copy_from]: Initial implementation, add OneshotSource and OneshotFormat, support appending Batches to Tables #30942

Open · wants to merge 11 commits into base: main
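The recurring type in this diff is `TableData`, which replaces the bare `Vec<(Row, Diff)>` and `TimestamplessUpdate` plumbing so that a table write can carry either loose rows or batches that `clusterd` has already staged in Persist. A minimal sketch of its shape, inferred from how it is used below — the real definition lives in `mz_storage_client::client` and may differ:

```rust
use smallvec::SmallVec; // [dependencies] smallvec = "1"

// Placeholders standing in for mz_repr::{Row, Diff} and
// mz_persist_client::batch::ProtoBatch; the real types are richer.
#[derive(Debug, Clone)]
pub struct Row;
pub type Diff = i64;
#[derive(Debug, Clone)]
pub struct ProtoBatch;

/// Data destined for a table: either rows that still need to be
/// timestamped and written, or batches already staged in Persist that
/// only need to be linked into the table's shard.
#[derive(Debug, Clone)]
pub enum TableData {
    Rows(Vec<(Row, Diff)>),
    Batches(SmallVec<[ProtoBatch; 1]>),
}

impl TableData {
    /// Used below to skip empty appends when computing `modified_tables`.
    pub fn is_empty(&self) -> bool {
        match self {
            TableData::Rows(rows) => rows.is_empty(),
            TableData::Batches(batches) => batches.is_empty(),
        }
    }
}
```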
159 changes: 124 additions & 35 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions deny.toml
@@ -48,10 +48,8 @@ skip = [
{ name = "windows-sys", version = "0.52.0" },
# Newer versions of crates like `tempfile` are held back by crates like `atty`.
# This is very Unfortunate as we don't actually use these platforms.
{ name = "hermit-abi", version = "0.1.6" },
{ name = "hermit-abi", version = "0.2.6" },
{ name = "redox_syscall", version = "0.2.10" },
{ name = "linux-raw-sys", version = "0.3.4" },
{ name = "rustix", version = "0.38.21" },

# Will require updating many crates
@@ -118,6 +116,11 @@ skip = [
{ name = "sync_wrapper", version = "0.1.2" },

{ name = "memmap2", version = "0.5.4" },

# Part of the upgrade to reqwest 0.12
{ name = "reqwest", version = "0.11.24" },
{ name = "rustls-pemfile", version = "1.0.4" },
{ name = "winreg", version = "0.50.0" }
]

# Use `tracing` instead.
@@ -187,7 +190,6 @@ wrappers = [
"bindgen",
"bstr",
"console",
"criterion",
"dynfmt",
"findshlibs",
"insta",
2 changes: 1 addition & 1 deletion src/adapter/src/command.rs
@@ -598,7 +598,7 @@ impl ExecuteResponse
&[AlteredSystemConfiguration]
}
Close => &[ClosedCursor],
PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom],
PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied],
PlanKind::CopyTo => &[ExecuteResponseKind::Copied],
PlanKind::Comment => &[ExecuteResponseKind::Comment],
CommitTransaction => &[TransactionCommitted, TransactionRolledBack],
22 changes: 14 additions & 8 deletions src/adapter/src/coord.rs
@@ -124,6 +124,7 @@ use mz_ore::vec::VecExt;
use mz_ore::{
assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
@@ -144,7 +145,7 @@ use mz_sql::session::user::User;
use mz_sql::session::vars::SystemVars;
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::ExplainStage;
use mz_storage_client::client::TimestamplessUpdate;
use mz_storage_client::client::TableData;
use mz_storage_client::controller::{CollectionDescription, DataSource};
use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection};
use mz_storage_types::connections::Connection as StorageConnection;
@@ -253,6 +254,11 @@ pub enum Message {
conn_id: ConnectionId,
},
LinearizeReads,
StagedBatches {
conn_id: ConnectionId,
table_id: CatalogItemId,
batches: Vec<Result<ProtoBatch, String>>,
},
StorageUsageSchedule,
StorageUsageFetch,
StorageUsageUpdate(ShardsUsageReferenced),
@@ -353,6 +359,7 @@ impl Message {
Message::ClusterEvent(_) => "cluster_event",
Message::CancelPendingPeeks { .. } => "cancel_pending_peeks",
Message::LinearizeReads => "linearize_reads",
Message::StagedBatches { .. } => "staged_batches",
Message::StorageUsageSchedule => "storage_usage_schedule",
Message::StorageUsageFetch => "storage_usage_fetch",
Message::StorageUsageUpdate(_) => "storage_usage_update",
@@ -1658,6 +1665,10 @@ pub struct Coordinator {
active_compute_sinks: BTreeMap<GlobalId, ActiveComputeSink>,
/// A map from active webhooks to their invalidation handle.
active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
/// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
/// to stage Batches in Persist that we will then link into the shard.
active_copies: BTreeMap<ConnectionId, ExecuteContext>,

/// A map from connection ids to a watch channel that is set to `true` if the connection
/// received a cancel request.
staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
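`active_copies` is a park/resume table: `sequence_copy_from` parks the client's `ExecuteContext` under its connection id, and the later `StagedBatches` message (or a cancellation) removes and resumes it. A toy, self-contained model of the pattern with placeholder types — not the Coordinator's actual code:

```rust
use std::collections::BTreeMap;

// Stand-ins for ConnectionId, ExecuteContext, and ProtoBatch.
type ConnectionId = u32;
struct ExecuteContext { /* client response channel, statement logging, ... */ }
struct ProtoBatch;

struct Coordinator {
    active_copies: BTreeMap<ConnectionId, ExecuteContext>,
}

impl Coordinator {
    /// Park the context until clusterd reports the batches it staged.
    fn start_copy(&mut self, conn_id: ConnectionId, ctx: ExecuteContext) {
        self.active_copies.insert(conn_id, ctx);
    }

    /// Resume the parked context once batches arrive. If the entry is
    /// gone, the COPY was cancelled and the batches must not be linked
    /// into the table shard.
    fn commit_staged_batches(
        &mut self,
        conn_id: ConnectionId,
        batches: Vec<Result<ProtoBatch, String>>,
    ) {
        let Some(_ctx) = self.active_copies.remove(&conn_id) else {
            return;
        };
        // ... append the Ok batches to the table, or surface the first
        // error to the client through `_ctx` ...
        let _ = batches;
    }

    /// Called from both the cancellation and connection-teardown paths.
    fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
        if let Some(_ctx) = self.active_copies.remove(conn_id) {
            // ... retire `_ctx` with a cancellation response ...
        }
    }
}
```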
@@ -2218,13 +2229,7 @@ impl Coordinator {
);
let appends = appends
.into_iter()
.map(|(id, updates)| {
let updates = updates
.into_iter()
.map(|(row, diff)| TimestamplessUpdate { row, diff })
.collect();
(id, updates)
})
.map(|(id, updates)| (id, vec![TableData::Rows(updates)]))
.collect();
let fut = self
.controller
@@ -4152,6 +4157,7 @@ pub fn serve(
serialized_ddl: LockedVecDeque::new(),
active_compute_sinks: BTreeMap::new(),
active_webhooks: BTreeMap::new(),
active_copies: BTreeMap::new(),
staged_cancellation: BTreeMap::new(),
introspection_subscribes: BTreeMap::new(),
write_locks: BTreeMap::new(),
51 changes: 31 additions & 20 deletions src/adapter/src/coord/appends.rs
@@ -21,12 +21,13 @@ use mz_ore::metrics::MetricsFutureExt;
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument};
use mz_repr::{CatalogItemId, Diff, Row, Timestamp};
use mz_repr::{CatalogItemId, Timestamp};
use mz_sql::names::ResolvedIds;
use mz_sql::plan::Plan;
use mz_sql::session::metadata::SessionMetadata;
use mz_storage_client::client::TimestamplessUpdate;
use mz_storage_client::client::TableData;
use mz_timestamp_oracle::WriteTimestamp;
use smallvec::SmallVec;
use tokio::sync::{oneshot, Notify, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore};
use tracing::{debug_span, info, warn, Instrument, Span};

@@ -109,7 +110,7 @@ pub struct DeferredPlan {
#[derive(Debug)]
pub struct DeferredWrite {
pub span: Span,
pub writes: BTreeMap<CatalogItemId, Vec<(Row, i64)>>,
pub writes: BTreeMap<CatalogItemId, SmallVec<[TableData; 1]>>,
pub pending_txn: PendingTxn,
}

@@ -129,7 +130,7 @@ pub(crate) enum PendingWriteTxn {
User {
span: Span,
/// List of all write operations within the transaction.
writes: BTreeMap<CatalogItemId, Vec<(Row, Diff)>>,
writes: BTreeMap<CatalogItemId, SmallVec<[TableData; 1]>>,
/// If they exist, should contain locks for each [`CatalogItemId`] in `writes`.
write_locks: Option<WriteLocks>,
/// Inner transaction.
@@ -447,7 +448,7 @@ impl Coordinator {
.await
.unwrap_or_terminate("unable to confirm leadership");

let mut appends: BTreeMap<CatalogItemId, Vec<(Row, Diff)>> = BTreeMap::new();
let mut appends: BTreeMap<CatalogItemId, SmallVec<[TableData; 1]>> = BTreeMap::new();
let mut responses = Vec::with_capacity(validated_writes.len());
let mut notifies = Vec::new();

@@ -465,14 +466,14 @@
},
} => {
assert_none!(write_locks, "should have merged together all locks above");
for (id, rows) in writes {
for (id, table_data) in writes {
// If the table that some write was targeting has been deleted while the
// write was waiting, then the write will be ignored and we respond to the
// client that the write was successful. This is only possible if the write
// and the delete were concurrent. Therefore, we are free to order the
// write before the delete without violating any consistency guarantees.
if self.catalog().try_get_entry(&id).is_some() {
appends.entry(id).or_default().extend(rows);
appends.entry(id).or_default().extend(table_data);
}
}
if let Some(id) = ctx.extra().contents() {
@@ -483,10 +484,8 @@
}
PendingWriteTxn::System { updates, source } => {
for update in updates {
appends
.entry(update.id)
.or_default()
.push((update.row, update.diff));
let data = TableData::Rows(vec![(update.row, update.diff)]);
appends.entry(update.id).or_default().push(data);
}
// Once the write completes we notify any waiters.
if let BuiltinTableUpdateSource::Internal(tx) = source {
@@ -496,21 +495,33 @@
}
}

for (_, updates) in &mut appends {
differential_dataflow::consolidation::consolidate(updates);
}
// Add table advancements for all tables.
for table in self.catalog().entries().filter(|entry| entry.is_table()) {
appends.entry(table.id()).or_default();
}
let appends: Vec<_> = appends

// Consolidate all Rows for a given table.
[Review comment · Contributor] So we're only consolidating the "raw" rows we might have; batched data is passed through untouched, yes?

[Reply · Member, Author] Exactly, I left a comment describing as much.

let mut all_appends = Vec::with_capacity(appends.len());
for (item_id, table_data) in appends.into_iter() {
let mut all_rows = Vec::new();
let mut all_data = Vec::new();
for data in table_data {
match data {
TableData::Rows(rows) => all_rows.extend(rows),
TableData::Batches(_) => all_data.push(data),
}
}
differential_dataflow::consolidation::consolidate(&mut all_rows);
all_data.push(TableData::Rows(all_rows));

// TODO(parkmycar): Use SmallVec throughout.
all_appends.push((item_id, all_data));
}

let appends: Vec<_> = all_appends
.into_iter()
.map(|(id, updates)| {
let gid = self.catalog().get_entry(&id).latest_global_id();
let updates: Vec<_> = updates
.into_iter()
.map(|(row, diff)| TimestamplessUpdate { row, diff })
.collect();
(gid, updates)
})
.collect();
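What the consolidation pass buys: insertions and retractions of the same row are summed, rows with a net diff of zero disappear, and `TableData::Batches` values bypass the pass untouched. A standalone illustration of the semantics of `differential_dataflow::consolidation::consolidate`, using `String` in place of `Row`:

```rust
/// Sum the diffs of identical records and drop records whose net diff
/// is zero, mirroring differential_dataflow::consolidation::consolidate.
fn consolidate(updates: &mut Vec<(String, i64)>) {
    updates.sort();
    let mut consolidated: Vec<(String, i64)> = Vec::new();
    for (row, diff) in updates.drain(..) {
        match consolidated.last_mut() {
            Some((prev, d)) if *prev == row => *d += diff,
            _ => consolidated.push((row, diff)),
        }
    }
    consolidated.retain(|(_, d)| *d != 0);
    *updates = consolidated;
}

fn main() {
    let mut updates = vec![
        ("a".to_string(), 1),
        ("b".to_string(), 1),
        ("a".to_string(), -1), // retraction cancels the insertion above
        ("b".to_string(), 1),
    ];
    consolidate(&mut updates);
    assert_eq!(updates, vec![("b".to_string(), 2)]);
}
```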
@@ -519,7 +530,7 @@
let modified_tables: Vec<_> = appends
.iter()
.filter_map(|(id, updates)| {
if id.is_user() && !updates.is_empty() {
if id.is_user() && !updates.iter().all(|u| u.is_empty()) {
Some(id)
} else {
None
2 changes: 2 additions & 0 deletions src/adapter/src/coord/command_handler.rs
@@ -1239,6 +1239,7 @@ impl Coordinator {
self.cancel_compute_sinks_for_conn(&conn_id).await;
self.cancel_cluster_reconfigurations_for_conn(&conn_id)
.await;
self.cancel_pending_copy(&conn_id);
if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
let _ = tx.send(true);
}
@@ -1273,6 +1274,7 @@
.dec();
self.cancel_pending_peeks(conn.conn_id());
self.cancel_pending_watchsets(&conn_id);
self.cancel_pending_copy(&conn_id);
self.end_session_for_statement_logging(conn.uuid());

// Queue the builtin table update, but do not wait for it to complete. We explicitly do
7 changes: 7 additions & 0 deletions src/adapter/src/coord/message_handler.rs
@@ -112,6 +112,13 @@ impl Coordinator {
Message::LinearizeReads => {
self.message_linearize_reads().boxed_local().await;
}
Message::StagedBatches {
conn_id,
table_id,
batches,
} => {
self.commit_staged_batches(conn_id, table_id, batches);
}
Message::StorageUsageSchedule => {
self.schedule_storage_usage_collection().boxed_local().await;
}
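Each staged batch arrives as a `Result`, since any individual worker can fail while fetching or writing its slice of the source data. One plausible way for `commit_staged_batches` to collapse them — not necessarily the PR's exact error handling — is to `collect` into `Result<Vec<_>, _>`, which keeps every batch or short-circuits on the first error:

```rust
struct ProtoBatch; // placeholder for mz_persist_client::batch::ProtoBatch

fn collapse(batches: Vec<Result<ProtoBatch, String>>) -> Result<Vec<ProtoBatch>, String> {
    // Ok with every batch, or the first Err encountered.
    batches.into_iter().collect()
}

fn main() {
    let ok: Vec<Result<ProtoBatch, String>> = vec![Ok(ProtoBatch), Ok(ProtoBatch)];
    assert!(collapse(ok).is_ok());

    let bad = vec![Ok(ProtoBatch), Err("failed to fetch object".to_string())];
    assert_eq!(collapse(bad).unwrap_err(), "failed to fetch object");
}
```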
36 changes: 21 additions & 15 deletions src/adapter/src/coord/sequencer.rs
@@ -22,12 +22,13 @@ use mz_repr::{CatalogItemId, Diff, GlobalId};
use mz_sql::catalog::CatalogError;
use mz_sql::names::ResolvedIds;
use mz_sql::plan::{
self, AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, CreateSourcePlanBundle,
FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan,
self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
CreateSourcePlanBundle, FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql_parser::ast::{Raw, Statement};
use mz_storage_client::client::TableData;
use mz_storage_types::connections::inline::IntoInlineConnection;
use std::sync::Arc;
use tokio::sync::oneshot;
@@ -359,18 +360,23 @@ impl Coordinator {
self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
.await;
}
Plan::CopyFrom(plan) => {
let (tx, _, session, ctx_extra) = ctx.into_parts();
tx.send(
Ok(ExecuteResponse::CopyFrom {
id: plan.id,
columns: plan.columns,
params: plan.params,
ctx_extra,
}),
session,
);
}
Plan::CopyFrom(plan) => match plan.source {
CopyFromSource::Stdin => {
let (tx, _, session, ctx_extra) = ctx.into_parts();
tx.send(
Ok(ExecuteResponse::CopyFrom {
id: plan.id,
columns: plan.columns,
params: plan.params,
ctx_extra,
}),
session,
);
}
CopyFromSource::Url(_) => {
self.sequence_copy_from(ctx, plan, target_cluster).await;
}
},
Plan::ExplainPlan(plan) => {
self.sequence_explain_plan(ctx, plan, target_cluster).await;
}
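`CopyFromSource` is what lets sequencing tell the two paths apart: `COPY ... FROM STDIN` keeps the old client-driven flow where rows stream in over the pgwire COPY protocol, while a URL source goes to the new `sequence_copy_from`, which asks a cluster to fetch and stage the data. A guess at the enum's shape based only on the match arms above — the real definition in `mz_sql::plan` may carry more variants or richer data:

```rust
/// Where the data for `COPY INTO <table> FROM ...` comes from.
pub enum CopyFromSource {
    /// The client streams rows over the pgwire COPY protocol.
    Stdin,
    /// `clusterd` fetches the data from a URL (the new path in this PR).
    Url(String),
}
```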
@@ -827,7 +833,7 @@ impl Coordinator {

session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
id: plan.id,
rows: plan.updates,
rows: TableData::Rows(plan.updates),
}]))?;
if !plan.returning.is_empty() {
let finishing = RowSetFinishing {
6 changes: 4 additions & 2 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -52,6 +52,7 @@ use mz_sql::names::{
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
use mz_sql::pure::{generate_subsource_statements, PurifiedSourceExport};
use mz_storage_types::sinks::StorageSinkDesc;
use smallvec::SmallVec;
use timely::progress::Timestamp as TimelyTimestamp;
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_adapter_types::connection::ConnectionId;
@@ -117,6 +118,7 @@ use crate::util::{viewable_variables, ClientTransmitter, ResultExt};
use crate::{PeekResponseUnary, ReadHolds};

mod cluster;
mod copy_from;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
@@ -2198,10 +2200,10 @@ impl Coordinator {
},
};

let mut collected_writes: BTreeMap<CatalogItemId, Vec<_>> = BTreeMap::new();
let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
for WriteOp { id, rows } in writes {
let total_rows = collected_writes.entry(id).or_default();
total_rows.extend(rows);
total_rows.push(rows);
}

self.submit_write(PendingWriteTxn::User {
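`SmallVec<[TableData; 1]>` stores the common case — exactly one `TableData` per table — inline without a heap allocation, spilling to the heap only when a transaction writes to the same table more than once. A quick illustration with the `smallvec` crate:

```rust
use smallvec::SmallVec;

fn main() {
    let mut writes: SmallVec<[&str; 1]> = SmallVec::new();
    writes.push("first write");
    assert!(!writes.spilled()); // one element lives inline

    writes.push("second write");
    assert!(writes.spilled()); // a second element forces a heap spill
}
```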