diff --git a/Cargo.lock b/Cargo.lock index 150e685911820..a4165aeed834e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1173,7 +1173,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40723b8fb387abc38f4f4a37c09073622e41dd12327033091ef8950659e6dc0c" dependencies = [ "memchr", - "regex-automata 0.4.7", "serde", ] @@ -1882,23 +1881,25 @@ dependencies = [ [[package]] name = "csv-async" -version = "1.2.6" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71933d3f2d0481d5111cb2817b15b6961961458ec58adf8008194e6c850046f4" +checksum = "d37fe5b0d07f4a8260ce1e9a81413e88f459af0f2dfc55c15e96868a2f99c0f0" dependencies = [ - "bstr 1.10.0", "cfg-if", "csv-core", "futures", + "itoa", + "ryu", + "serde", "tokio", "tokio-stream", ] [[package]] name = "csv-core" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" dependencies = [ "memchr", ] @@ -4235,7 +4236,7 @@ dependencies = [ "mz-ore", "open", "openssl-probe", - "reqwest", + "reqwest 0.12.4", "rpassword", "security-framework", "semver", @@ -4330,7 +4331,7 @@ dependencies = [ "rand_chacha", "rdkafka", "regex", - "reqwest", + "reqwest 0.11.24", "semver", "serde", "serde_json", @@ -4515,7 +4516,7 @@ dependencies = [ "postgres", "prometheus", "proxy-header", - "reqwest", + "reqwest 0.11.24", "semver", "tempfile", "tokio", @@ -4691,7 +4692,7 @@ dependencies = [ "proptest", "proptest-derive", "prost-build", - "reqwest", + "reqwest 0.11.24", "serde", "serde_json", "tokio", @@ -4708,7 +4709,7 @@ dependencies = [ "chrono", "mz-frontegg-auth", "mz-frontegg-client", - "reqwest", + "reqwest 0.11.24", "serde", "thiserror", "tokio", @@ -5184,7 +5185,7 @@ dependencies = [ "rdkafka", "rdkafka-sys", "regex", - "reqwest", + "reqwest 0.11.24", "rlimit", "semver", "sentry", @@ -5327,7 +5328,7 @@ dependencies = [ "prost", "prost-build", "prost-types", - "reqwest", + "reqwest 0.11.24", "serde", "serde_json", "sha2", @@ -5360,7 +5361,7 @@ dependencies = [ "mz-ore", "mz-repr", "prometheus", - "reqwest", + "reqwest 0.11.24", "reqwest-middleware", "reqwest-retry", "serde", @@ -5379,7 +5380,7 @@ dependencies = [ "jsonwebtoken", "mz-frontegg-auth", "mz-ore", - "reqwest", + "reqwest 0.11.24", "serde", "serde_json", "thiserror", @@ -5404,7 +5405,7 @@ dependencies = [ "mz-frontegg-auth", "mz-ore", "openssl", - "reqwest", + "reqwest 0.11.24", "serde", "serde_json", "tokio", @@ -5560,7 +5561,7 @@ dependencies = [ name = "mz-metabase" version = "0.0.0" dependencies = [ - "reqwest", + "reqwest 0.11.24", "serde", "workspace-hack", ] @@ -5634,7 +5635,7 @@ dependencies = [ "flate2", "hex", "hex-literal", - "reqwest", + "reqwest 0.11.24", "sha2", "tar", "walkdir", @@ -5765,7 +5766,7 @@ dependencies = [ "mz-prof-http", "prometheus", "rand", - "reqwest", + "reqwest 0.11.24", "serde", "serde_json", "sha2", @@ -6545,7 +6546,7 @@ dependencies = [ "protobuf-native", "rdkafka", "regex", - "reqwest", + "reqwest 0.11.24", "serde", "serde_json", "static_assertions", @@ -6644,7 +6645,7 @@ dependencies = [ "mz-tracing", "postgres-protocol", "regex", - "reqwest", + "reqwest 0.11.24", "serde_json", "shell-words", "tempfile", @@ -6814,6 +6815,7 @@ dependencies = [ "rdkafka", "serde", "serde_json", + "smallvec", "static_assertions", "timely", "tokio", @@ -6872,7 +6874,9 @@ dependencies = [ "arrow", "async-stream", "aws-types", 
+ "bytes", "bytesize", + "csv-async", "differential-dataflow", "futures", "http 1.1.0", @@ -6891,12 +6895,16 @@ dependencies = [ "parquet", "prometheus", "proptest", + "reqwest 0.11.24", "sentry", "serde", "thiserror", "timely", "tokio", + "tokio-stream", + "tokio-util", "tracing", + "url", "uuid", "workspace-hack", ] @@ -7044,7 +7052,7 @@ dependencies = [ "rand", "rdkafka", "regex", - "reqwest", + "reqwest 0.11.24", "semver", "serde", "serde_json", @@ -8693,7 +8701,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -8701,12 +8709,56 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-util", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", + "winreg 0.50.0", +] + +[[package]] +name = "reqwest" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" +dependencies = [ + "base64 0.22.0", + "bytes", + "encoding_rs", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.4.1", + "hyper-tls 0.6.0", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile 2.2.0", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 0.1.2", + "tokio", + "tokio-native-tls", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "winreg", + "winreg 0.52.0", ] [[package]] @@ -8717,7 +8769,7 @@ dependencies = [ "anyhow", "async-trait", "http 0.2.9", - "reqwest", + "reqwest 0.11.24", "serde", "task-local-extensions", "thiserror", @@ -8734,7 +8786,7 @@ dependencies = [ "futures", "http 0.2.9", "hyper 0.14.27", - "reqwest", + "reqwest 0.11.24", "reqwest-middleware", "retry-policies", "task-local-extensions", @@ -8888,6 +8940,21 @@ dependencies = [ "base64 0.21.5", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2bf47e6ff922db3825eb750c4e2ff784c6ff8fb9e13046ef6a1d1c5401b0b37" + [[package]] name = "rustversion" version = "1.0.9" @@ -9033,7 +9100,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24fc91c898e0487ff3e471d0849bbaf7d38a00ff5e3531009d386b0bab9b6b12" dependencies = [ "async-trait", - "reqwest", + "reqwest 0.11.24", "serde", "serde_json", "thiserror", @@ -9067,7 +9134,7 @@ checksum = "17ad137b9df78294b98cab1a650bef237cc6c950e82e5ce164655e674d07c5cc" dependencies = [ "httpdate", "native-tls", - "reqwest", + "reqwest 0.11.24", "sentry-backtrace", "sentry-contexts", "sentry-core", @@ -10878,9 +10945,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.19" +version = "0.4.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fe9756085a84584ee9457a002b7cdfe0bfff169f45d2591d8be1345a6780e35" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" dependencies = [ "cfg-if", "js-sys", @@ -10917,11 +10984,24 @@ version = "0.2.87" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "wasm-streams" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e072d4e72f700fb3443d8fe94a39315df013eef1104903cdb0a2abd322bbecd" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" -version = "0.3.51" +version = "0.3.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e828417b379f3df7111d3a2a9e5753706cae29c41f7c4029ee9fd77f3e09e582" +checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" dependencies = [ "js-sys", "wasm-bindgen", @@ -11223,6 +11303,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "winreg" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "workspace-hack" version = "0.0.0" @@ -11244,7 +11334,6 @@ dependencies = [ "axum-core", "bitflags 2.4.1", "bstr 0.2.14", - "bstr 1.10.0", "byteorder", "bytes", "camino", @@ -11258,13 +11347,13 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "crypto-common", + "csv-async", "debugid", "dec", "digest", "either", "flate2", "form_urlencoded", - "futures", "futures-channel", "futures-core", "futures-executor", @@ -11325,7 +11414,7 @@ dependencies = [ "regex", "regex-automata 0.4.7", "regex-syntax 0.8.3", - "reqwest", + "reqwest 0.11.24", "ring", "rustix", "schemars", diff --git a/deny.toml b/deny.toml index 71c35233c19b0..d80dfacb981a2 100644 --- a/deny.toml +++ b/deny.toml @@ -48,10 +48,8 @@ skip = [ { name = "windows-sys", version = "0.52.0" }, # Newer versions of crates like `tempfile` are held back by crates like `atty`. # This is very Unfortunate as we don't actually use these platforms. - { name = "hermit-abi", version = "0.1.6" }, { name = "hermit-abi", version = "0.2.6" }, { name = "redox_syscall", version = "0.2.10" }, - { name = "linux-raw-sys", version = "0.3.4" }, { name = "rustix", version = "0.38.21" }, # Will require updating many crates @@ -118,6 +116,11 @@ skip = [ { name = "sync_wrapper", version = "0.1.2" }, { name = "memmap2", version = "0.5.4" }, + + # Part of the upgrade to reqwest 0.12 + { name = "reqwest", version = "0.11.24" }, + { name = "rustls-pemfile", version = "1.0.4" }, + { name = "winreg", version = "0.50.0" } ] # Use `tracing` instead. 
@@ -187,7 +190,6 @@ wrappers = [ "bindgen", "bstr", "console", - "criterion", "dynfmt", "findshlibs", "insta", diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index ebfa65b7acffb..be5364ef2830d 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -598,7 +598,7 @@ impl ExecuteResponse { &[AlteredSystemConfiguration] } Close => &[ClosedCursor], - PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom], + PlanKind::CopyFrom => &[ExecuteResponseKind::CopyFrom, ExecuteResponseKind::Copied], PlanKind::CopyTo => &[ExecuteResponseKind::Copied], PlanKind::Comment => &[ExecuteResponseKind::Comment], CommitTransaction => &[TransactionCommitted, TransactionRolledBack], diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 002bf4dd6416f..8713990e62500 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -124,6 +124,7 @@ use mz_ore::vec::VecExt; use mz_ore::{ assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack, }; +use mz_persist_client::batch::ProtoBatch; use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient}; use mz_repr::explain::{ExplainConfig, ExplainFormat}; use mz_repr::global_id::TransientIdGen; @@ -144,7 +145,7 @@ use mz_sql::session::user::User; use mz_sql::session::vars::SystemVars; use mz_sql_parser::ast::display::AstDisplay; use mz_sql_parser::ast::ExplainStage; -use mz_storage_client::client::TimestamplessUpdate; +use mz_storage_client::client::TableData; use mz_storage_client::controller::{CollectionDescription, DataSource}; use mz_storage_types::connections::inline::{IntoInlineConnection, ReferencedConnection}; use mz_storage_types::connections::Connection as StorageConnection; @@ -253,6 +254,11 @@ pub enum Message { conn_id: ConnectionId, }, LinearizeReads, + StagedBatches { + conn_id: ConnectionId, + table_id: CatalogItemId, + batches: Vec>, + }, StorageUsageSchedule, StorageUsageFetch, StorageUsageUpdate(ShardsUsageReferenced), @@ -353,6 +359,7 @@ impl Message { Message::ClusterEvent(_) => "cluster_event", Message::CancelPendingPeeks { .. } => "cancel_pending_peeks", Message::LinearizeReads => "linearize_reads", + Message::StagedBatches { .. } => "staged_batches", Message::StorageUsageSchedule => "storage_usage_schedule", Message::StorageUsageFetch => "storage_usage_fetch", Message::StorageUsageUpdate(_) => "storage_usage_update", @@ -1658,6 +1665,10 @@ pub struct Coordinator { active_compute_sinks: BTreeMap, /// A map from active webhooks to their invalidation handle. active_webhooks: BTreeMap, + /// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd` + /// to stage Batches in Persist that we will then link into the shard. + active_copies: BTreeMap, + /// A map from connection ids to a watch channel that is set to `true` if the connection /// received a cancel request. 
staged_cancellation: BTreeMap, watch::Receiver)>, @@ -2218,13 +2229,7 @@ impl Coordinator { ); let appends = appends .into_iter() - .map(|(id, updates)| { - let updates = updates - .into_iter() - .map(|(row, diff)| TimestamplessUpdate { row, diff }) - .collect(); - (id, updates) - }) + .map(|(id, updates)| (id, vec![TableData::Rows(updates)])) .collect(); let fut = self .controller @@ -4152,6 +4157,7 @@ pub fn serve( serialized_ddl: LockedVecDeque::new(), active_compute_sinks: BTreeMap::new(), active_webhooks: BTreeMap::new(), + active_copies: BTreeMap::new(), staged_cancellation: BTreeMap::new(), introspection_subscribes: BTreeMap::new(), write_locks: BTreeMap::new(), diff --git a/src/adapter/src/coord/appends.rs b/src/adapter/src/coord/appends.rs index 3f89f8d44956b..b4e21929bf0e5 100644 --- a/src/adapter/src/coord/appends.rs +++ b/src/adapter/src/coord/appends.rs @@ -21,12 +21,13 @@ use mz_ore::metrics::MetricsFutureExt; use mz_ore::task; use mz_ore::tracing::OpenTelemetryContext; use mz_ore::{assert_none, instrument}; -use mz_repr::{CatalogItemId, Diff, Row, Timestamp}; +use mz_repr::{CatalogItemId, Timestamp}; use mz_sql::names::ResolvedIds; use mz_sql::plan::Plan; use mz_sql::session::metadata::SessionMetadata; -use mz_storage_client::client::TimestamplessUpdate; +use mz_storage_client::client::TableData; use mz_timestamp_oracle::WriteTimestamp; +use smallvec::SmallVec; use tokio::sync::{oneshot, Notify, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore}; use tracing::{debug_span, info, warn, Instrument, Span}; @@ -109,7 +110,7 @@ pub struct DeferredPlan { #[derive(Debug)] pub struct DeferredWrite { pub span: Span, - pub writes: BTreeMap>, + pub writes: BTreeMap>, pub pending_txn: PendingTxn, } @@ -129,7 +130,7 @@ pub(crate) enum PendingWriteTxn { User { span: Span, /// List of all write operations within the transaction. - writes: BTreeMap>, + writes: BTreeMap>, /// If they exist, should contain locks for each [`CatalogItemId`] in `writes`. write_locks: Option, /// Inner transaction. @@ -447,7 +448,7 @@ impl Coordinator { .await .unwrap_or_terminate("unable to confirm leadership"); - let mut appends: BTreeMap> = BTreeMap::new(); + let mut appends: BTreeMap> = BTreeMap::new(); let mut responses = Vec::with_capacity(validated_writes.len()); let mut notifies = Vec::new(); @@ -465,14 +466,14 @@ impl Coordinator { }, } => { assert_none!(write_locks, "should have merged together all locks above"); - for (id, rows) in writes { + for (id, table_data) in writes { // If the table that some write was targeting has been deleted while the // write was waiting, then the write will be ignored and we respond to the // client that the write was successful. This is only possible if the write // and the delete were concurrent. Therefore, we are free to order the // write before the delete without violating any consistency guarantees. if self.catalog().try_get_entry(&id).is_some() { - appends.entry(id).or_default().extend(rows); + appends.entry(id).or_default().extend(table_data); } } if let Some(id) = ctx.extra().contents() { @@ -483,10 +484,8 @@ impl Coordinator { } PendingWriteTxn::System { updates, source } => { for update in updates { - appends - .entry(update.id) - .or_default() - .push((update.row, update.diff)); + let data = TableData::Rows(vec![(update.row, update.diff)]); + appends.entry(update.id).or_default().push(data); } // Once the write completes we notify any waiters. 
if let BuiltinTableUpdateSource::Internal(tx) = source { @@ -496,21 +495,34 @@ impl Coordinator { } } - for (_, updates) in &mut appends { - differential_dataflow::consolidation::consolidate(updates); - } // Add table advancements for all tables. for table in self.catalog().entries().filter(|entry| entry.is_table()) { appends.entry(table.id()).or_default(); } - let appends: Vec<_> = appends + + // Consolidate all Rows for a given table. We do not consolidate the + // staged batches, that's up to whoever staged them. + let mut all_appends = Vec::with_capacity(appends.len()); + for (item_id, table_data) in appends.into_iter() { + let mut all_rows = Vec::new(); + let mut all_data = Vec::new(); + for data in table_data { + match data { + TableData::Rows(rows) => all_rows.extend(rows), + TableData::Batches(_) => all_data.push(data), + } + } + differential_dataflow::consolidation::consolidate(&mut all_rows); + all_data.push(TableData::Rows(all_rows)); + + // TODO(parkmycar): Use SmallVec throughout. + all_appends.push((item_id, all_data)); + } + + let appends: Vec<_> = all_appends .into_iter() .map(|(id, updates)| { let gid = self.catalog().get_entry(&id).latest_global_id(); - let updates: Vec<_> = updates - .into_iter() - .map(|(row, diff)| TimestamplessUpdate { row, diff }) - .collect(); (gid, updates) }) .collect(); @@ -519,7 +531,7 @@ impl Coordinator { let modified_tables: Vec<_> = appends .iter() .filter_map(|(id, updates)| { - if id.is_user() && !updates.is_empty() { + if id.is_user() && !updates.iter().all(|u| u.is_empty()) { Some(id) } else { None diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index 9ef5b8c4baf2b..cfc732b953eda 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -1239,6 +1239,7 @@ impl Coordinator { self.cancel_compute_sinks_for_conn(&conn_id).await; self.cancel_cluster_reconfigurations_for_conn(&conn_id) .await; + self.cancel_pending_copy(&conn_id); if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) { let _ = tx.send(true); } @@ -1273,6 +1274,7 @@ impl Coordinator { .dec(); self.cancel_pending_peeks(conn.conn_id()); self.cancel_pending_watchsets(&conn_id); + self.cancel_pending_copy(&conn_id); self.end_session_for_statement_logging(conn.uuid()); // Queue the builtin table update, but do not wait for it to complete. 
We explicitly do diff --git a/src/adapter/src/coord/message_handler.rs b/src/adapter/src/coord/message_handler.rs index b3d34d24b5998..e0f93c810f182 100644 --- a/src/adapter/src/coord/message_handler.rs +++ b/src/adapter/src/coord/message_handler.rs @@ -112,6 +112,13 @@ impl Coordinator { Message::LinearizeReads => { self.message_linearize_reads().boxed_local().await; } + Message::StagedBatches { + conn_id, + table_id, + batches, + } => { + self.commit_staged_batches(conn_id, table_id, batches); + } Message::StorageUsageSchedule => { self.schedule_storage_usage_collection().boxed_local().await; } diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 3ba9ff5920eaa..e01e47bfb6036 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -22,12 +22,13 @@ use mz_repr::{CatalogItemId, Diff, GlobalId}; use mz_sql::catalog::CatalogError; use mz_sql::names::ResolvedIds; use mz_sql::plan::{ - self, AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, CreateSourcePlanBundle, - FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan, + self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan, + CreateSourcePlanBundle, FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan, }; use mz_sql::rbac; use mz_sql::session::metadata::SessionMetadata; use mz_sql_parser::ast::{Raw, Statement}; +use mz_storage_client::client::TableData; use mz_storage_types::connections::inline::IntoInlineConnection; use std::sync::Arc; use tokio::sync::oneshot; @@ -359,18 +360,23 @@ impl Coordinator { self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max) .await; } - Plan::CopyFrom(plan) => { - let (tx, _, session, ctx_extra) = ctx.into_parts(); - tx.send( - Ok(ExecuteResponse::CopyFrom { - id: plan.id, - columns: plan.columns, - params: plan.params, - ctx_extra, - }), - session, - ); - } + Plan::CopyFrom(plan) => match plan.source { + CopyFromSource::Stdin => { + let (tx, _, session, ctx_extra) = ctx.into_parts(); + tx.send( + Ok(ExecuteResponse::CopyFrom { + id: plan.id, + columns: plan.columns, + params: plan.params, + ctx_extra, + }), + session, + ); + } + CopyFromSource::Url(_) => { + self.sequence_copy_from(ctx, plan, target_cluster).await; + } + }, Plan::ExplainPlan(plan) => { self.sequence_explain_plan(ctx, plan, target_cluster).await; } @@ -827,7 +833,7 @@ impl Coordinator { session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp { id: plan.id, - rows: plan.updates, + rows: TableData::Rows(plan.updates), }]))?; if !plan.returning.is_empty() { let finishing = RowSetFinishing { diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index d2afff2f0575a..3e3f5b8fc7278 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -52,6 +52,7 @@ use mz_sql::names::{ use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext}; use mz_sql::pure::{generate_subsource_statements, PurifiedSourceExport}; use mz_storage_types::sinks::StorageSinkDesc; +use smallvec::SmallVec; use timely::progress::Timestamp as TimelyTimestamp; // Import `plan` module, but only import select elements to avoid merge conflicts on use statements. 
use mz_adapter_types::connection::ConnectionId; @@ -117,6 +118,7 @@ use crate::util::{viewable_variables, ClientTransmitter, ResultExt}; use crate::{PeekResponseUnary, ReadHolds}; mod cluster; +mod copy_from; mod create_continual_task; mod create_index; mod create_materialized_view; @@ -2198,10 +2200,10 @@ impl Coordinator { }, }; - let mut collected_writes: BTreeMap> = BTreeMap::new(); + let mut collected_writes: BTreeMap> = BTreeMap::new(); for WriteOp { id, rows } in writes { let total_rows = collected_writes.entry(id).or_default(); - total_rows.extend(rows); + total_rows.push(rows); } self.submit_write(PendingWriteTxn::User { diff --git a/src/adapter/src/coord/sequencer/inner/copy_from.rs b/src/adapter/src/coord/sequencer/inner/copy_from.rs new file mode 100644 index 0000000000000..1606a8a159e0f --- /dev/null +++ b/src/adapter/src/coord/sequencer/inner/copy_from.rs @@ -0,0 +1,185 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use mz_adapter_types::connection::ConnectionId; +use mz_ore::cast::CastInto; +use mz_persist_client::batch::ProtoBatch; +use mz_repr::{CatalogItemId, Datum, RowArena}; +use mz_sql::plan::{self, CopyFromSource, HirScalarExpr}; +use mz_sql::session::metadata::SessionMetadata; +use mz_storage_client::client::TableData; +use mz_storage_types::oneshot_sources::OneshotIngestionRequest; +use smallvec::SmallVec; +use url::Url; + +use crate::coord::sequencer::inner::return_if_err; +use crate::coord::{Coordinator, TargetCluster}; +use crate::optimize::dataflows::{prep_scalar_expr, EvalTime, ExprPrepStyle}; +use crate::session::{TransactionOps, WriteOp}; +use crate::{AdapterError, ExecuteContext, ExecuteResponse}; + +impl Coordinator { + pub(crate) async fn sequence_copy_from( + &mut self, + ctx: ExecuteContext, + plan: plan::CopyFromPlan, + target_cluster: TargetCluster, + ) { + let plan::CopyFromPlan { + id, + source, + columns: _, + params: _, + } = plan; + + let from_expr = match source { + CopyFromSource::Url(from_expr) => from_expr, + CopyFromSource::Stdin => { + unreachable!("COPY FROM STDIN should be handled elsewhere") + } + }; + + let eval_url = |from: HirScalarExpr| -> Result { + let style = ExprPrepStyle::OneShot { + logical_time: EvalTime::NotAvailable, + session: ctx.session(), + catalog_state: self.catalog().state(), + }; + let mut from = from.lower_uncorrelated()?; + prep_scalar_expr(&mut from, style)?; + + // TODO(cf3): Add structured errors for the below uses of `coord_bail!` + // and AdapterError::Unstructured. + let temp_storage = RowArena::new(); + let eval_result = from.eval(&[], &temp_storage)?; + let eval_string = match eval_result { + Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"), + Datum::String(url_str) => url_str, + other => coord_bail!("programming error! COPY FROM target cannot be {other}"), + }; + + Url::parse(eval_string) + .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}"))) + }; + let url = return_if_err!(eval_url(from_expr), ctx); + + // We check in planning that we're copying into a Table, but be defensive. 
+ let Some(dest_table) = self.catalog().get_entry(&id).table() else { + let typ = self.catalog().get_entry(&id).item().typ(); + let msg = format!("programming error: expected a Table found {typ:?}"); + return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg)))); + }; + + let collection_id = dest_table.global_id_writes(); + let (_, ingestion_id) = self.transient_id_gen.allocate_id(); + let request = OneshotIngestionRequest { + source: mz_storage_types::oneshot_sources::ContentSource::Http { url }, + format: mz_storage_types::oneshot_sources::ContentFormat::Csv, + }; + + let target_cluster = match self + .catalog() + .resolve_target_cluster(target_cluster, ctx.session()) + { + Ok(cluster) => cluster, + Err(err) => { + return ctx.retire(Err(err)); + } + }; + let cluster_id = target_cluster.id; + + // When we finish staging the Batches in Persist, we'll send a command + // to the Coordinator. + let command_tx = self.internal_cmd_tx.clone(); + let conn_id = ctx.session().conn_id().clone(); + let closure = Box::new(move |batches| { + let _ = command_tx.send(crate::coord::Message::StagedBatches { + conn_id, + table_id: id, + batches, + }); + }); + // Stash the execute context so we can cancel the COPY. + self.active_copies + .insert(ctx.session().conn_id().clone(), ctx); + + let _result = self + .controller + .storage + .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure) + .await; + } + + pub(crate) fn commit_staged_batches( + &mut self, + conn_id: ConnectionId, + table_id: CatalogItemId, + batches: Vec>, + ) { + let Some(mut ctx) = self.active_copies.remove(&conn_id) else { + tracing::warn!(?conn_id, "got response for canceled COPY FROM"); + return; + }; + + let mut all_batches = SmallVec::with_capacity(batches.len()); + let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len()); + let mut row_count = 0u64; + + for maybe_batch in batches { + match maybe_batch { + Ok(batch) => { + let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0); + all_batches.push(batch); + row_count = row_count.saturating_add(count); + } + Err(err) => all_errors.push(err), + } + } + + // If we got any errors we need to fail the whole operation. + if let Some(error) = all_errors.pop() { + tracing::warn!(?error, ?all_errors, "failed COPY FROM"); + + // TODO(cf1): Cleanup the existing ProtoBatches to prevent leaking them. + // TODO(cf2): Carry structured errors all the way through. + + ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!( + "COPY FROM: {error}" + )))); + + return; + } + + // Stage a WriteOp, then when the Session is retired we complete the + // transaction, which handles acquiring the write lock for `table_id`, + // advancing the timestamps of the staged batches, and waiting for + // everything to complete before sending a response to the client. + let stage_write = ctx + .session_mut() + .add_transaction_ops(TransactionOps::Writes(vec![WriteOp { + id: table_id, + rows: TableData::Batches(all_batches), + }])); + + if let Err(err) = stage_write { + ctx.retire(Err(err)); + } else { + ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into()))); + } + } + + /// Cancel any active `COPY FROM` statements/oneshot ingestions. + #[mz_ore::instrument(level = "debug")] + pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) { + // TODO(cf1): Also cancel the dataflow running on clusterd. 
+ if let Some(ctx) = self.active_copies.remove(conn_id) { + ctx.retire(Err(AdapterError::Canceled)); + } + } +} diff --git a/src/adapter/src/session.rs b/src/adapter/src/session.rs index ad3a0d08ab6d8..8594e1bdcd1b7 100644 --- a/src/adapter/src/session.rs +++ b/src/adapter/src/session.rs @@ -29,7 +29,7 @@ use mz_ore::now::{EpochMillis, NowFn}; use mz_pgwire_common::Format; use mz_repr::role_id::RoleId; use mz_repr::user::ExternalUserMetadata; -use mz_repr::{CatalogItemId, Datum, Diff, Row, RowIterator, ScalarType, TimestampManipulation}; +use mz_repr::{CatalogItemId, Datum, Row, RowIterator, ScalarType, TimestampManipulation}; use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode}; use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc}; use mz_sql::session::metadata::SessionMetadata; @@ -42,6 +42,7 @@ pub use mz_sql::session::vars::{ SERVER_MINOR_VERSION, SERVER_PATCH_VERSION, }; use mz_sql_parser::ast::TransactionIsolationLevel; +use mz_storage_client::client::TableData; use mz_storage_types::sources::Timeline; use qcell::{QCell, QCellOwner}; use rand::Rng; @@ -1447,7 +1448,7 @@ pub struct WriteOp { /// The target table. pub id: CatalogItemId, /// The data rows. - pub rows: Vec<(Row, Diff)>, + pub rows: TableData, } /// Whether a transaction requires linearization. diff --git a/src/mz/Cargo.toml b/src/mz/Cargo.toml index 340f18cafc756..949f96dbbf321 100644 --- a/src/mz/Cargo.toml +++ b/src/mz/Cargo.toml @@ -25,7 +25,7 @@ mz-ore = { path = "../ore", features = ["async", "cli", "test"] } open = "3.2.0" openssl-probe = "0.1.2" hyper = "1.4.1" -reqwest = { version = "0.11", features = ["blocking", "json"] } +reqwest = { version = "0.12", features = ["blocking", "json", "default-tls", "charset", "http2"], default-features = false } rpassword = "7.2.0" semver = "1.0.16" serde = { version = "1.0.152", features = ["derive"] } diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 16c92ab92fe3a..1164e70259272 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -6333,9 +6333,12 @@ impl<'a> Parser<'a> { ) .map_no_statement_parser_err(); } - self.expect_keyword(STDIN) - .map_parser_err(StatementKind::Copy)?; - (CopyDirection::From, CopyTarget::Stdin) + if self.parse_keyword(STDIN) { + (CopyDirection::From, CopyTarget::Stdin) + } else { + let url_expr = self.parse_expr().map_parser_err(StatementKind::Copy)?; + (CopyDirection::From, CopyTarget::Expr(url_expr)) + } } TO => { if self.parse_keyword(STDOUT) { diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index e8534788df346..61bf5deef2dbc 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -930,10 +930,21 @@ pub struct ShowColumnsPlan { #[derive(Debug)] pub struct CopyFromPlan { pub id: CatalogItemId, + pub source: CopyFromSource, pub columns: Vec, pub params: CopyFormatParams<'static>, } +#[derive(Debug)] +pub enum CopyFromSource { + /// Copying from a file local to the user, transmitted via pgwire. + Stdin, + /// A remote resource, e.g. S3. + /// + /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource. + Url(HirScalarExpr), +} + #[derive(Debug, Clone)] pub struct CopyToPlan { /// The select query plan whose data will be copied to destination uri. 
diff --git a/src/sql/src/plan/statement/dml.rs b/src/sql/src/plan/statement/dml.rs index 9ee68720d7a22..232906ed47793 100644 --- a/src/sql/src/plan/statement/dml.rs +++ b/src/sql/src/plan/statement/dml.rs @@ -53,7 +53,6 @@ use crate::normalize; use crate::plan::query::{plan_expr, plan_up_to, ExprContext, QueryLifetime}; use crate::plan::scope::Scope; use crate::plan::statement::{ddl, StatementContext, StatementDesc}; -use crate::plan::with_options; use crate::plan::{ self, side_effecting_func, transform_ast, CopyToPlan, CreateSinkPlan, ExplainPushdownPlan, ExplainSinkSchemaPlan, ExplainTimestampPlan, @@ -62,7 +61,8 @@ use crate::plan::{ query, CopyFormat, CopyFromPlan, ExplainPlanPlan, InsertPlan, MutationKind, Params, Plan, PlanError, QueryContext, ReadThenWritePlan, SelectPlan, SubscribeFrom, SubscribePlan, }; -use crate::session::vars; +use crate::plan::{with_options, CopyFromSource}; +use crate::session::vars::{self, ENABLE_COPY_FROM_REMOTE}; // TODO(benesch): currently, describing a `SELECT` or `INSERT` query // plans the whole query to determine its shape and parameter types, @@ -1101,6 +1101,7 @@ fn plan_copy_to_expr( fn plan_copy_from( scx: &StatementContext, + target: &CopyTarget, table_name: ResolvedItemName, columns: Vec, format: CopyFormat, @@ -1113,6 +1114,32 @@ fn plan_copy_from( } } + let source = match target { + CopyTarget::Stdin => CopyFromSource::Stdin, + CopyTarget::Expr(from) => { + scx.require_feature_flag(&ENABLE_COPY_FROM_REMOTE)?; + + // Converting the expr to an HirScalarExpr + let mut from_expr = from.clone(); + transform_ast::transform(scx, &mut from_expr)?; + let relation_type = RelationDesc::empty(); + let ecx = &ExprContext { + qcx: &QueryContext::root(scx, QueryLifetime::OneShot), + name: "COPY FROM target", + scope: &Scope::empty(), + relation_type: relation_type.typ(), + allow_aggregates: false, + allow_subqueries: false, + allow_parameters: false, + allow_windows: false, + }; + let from = plan_expr(ecx, &from_expr)?.type_as(ecx, &ScalarType::String)?; + + CopyFromSource::Url(from) + } + CopyTarget::Stdout => bail_never_supported!("COPY FROM {} not supported", target), + }; + let params = match format { CopyFormat::Text => { only_available_with_csv(options.quote, "quote")?; @@ -1148,6 +1175,7 @@ fn plan_copy_from( let (id, _, columns) = query::plan_copy_from(scx, table_name, columns)?; Ok(Plan::CopyFrom(CopyFromPlan { id, + source, columns, params, })) @@ -1224,9 +1252,10 @@ pub fn plan_copy( )?), } } - (CopyDirection::From, CopyTarget::Stdin) => match relation { + (CopyDirection::From, target) => match relation { CopyRelation::Named { name, columns } => plan_copy_from( scx, + target, name, columns, format.unwrap_or(CopyFormat::Text), diff --git a/src/sql/src/rbac.rs b/src/sql/src/rbac.rs index 2f57bde11cd1f..fabb8603ceb10 100644 --- a/src/sql/src/rbac.rs +++ b/src/sql/src/rbac.rs @@ -838,6 +838,7 @@ fn generate_rbac_requirements( } Plan::CopyFrom(plan::CopyFromPlan { id, + source: _, columns: _, params: _, }) => RbacRequirements { diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 3d1cccbb5c9f6..c0caa8d08d91a 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -2182,6 +2182,12 @@ feature_flags!( default: false, enable_for_item_parsing: true, }, + { + name: enable_copy_from_remote, + desc: "Whether to allow COPY FROM .", + default: false, + enable_for_item_parsing: false, + }, ); impl From<&super::SystemVars> for OptimizerFeatures { diff --git 
a/src/storage-client/BUILD.bazel b/src/storage-client/BUILD.bazel index 93bf765e34809..37394bc4d4fa2 100644 --- a/src/storage-client/BUILD.bazel +++ b/src/storage-client/BUILD.bazel @@ -131,6 +131,7 @@ filegroup( "src/client.proto", "src/statistics.proto", "//src/cluster-client:all_protos", + "//src/persist-client:all_protos", "//src/proto:all_protos", "//src/repr:all_protos", "//src/storage-types:all_protos", diff --git a/src/storage-client/Cargo.toml b/src/storage-client/Cargo.toml index 33aa35bbf2f98..79d85d88d5e3d 100644 --- a/src/storage-client/Cargo.toml +++ b/src/storage-client/Cargo.toml @@ -44,6 +44,7 @@ rdkafka = { version = "0.29.0", features = [ ] } serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.125" } +smallvec = { version = "1.10.0", features = ["serde", "union"] } static_assertions = "1.1" timely = "0.15.1" tokio = { version = "1.38.0", features = [ diff --git a/src/storage-client/src/client.proto b/src/storage-client/src/client.proto index a7745d2cf350f..992ffd0150fe8 100644 --- a/src/storage-client/src/client.proto +++ b/src/storage-client/src/client.proto @@ -15,11 +15,14 @@ package mz_storage_client.client; import "cluster-client/src/client.proto"; import "google/protobuf/empty.proto"; +import "persist-client/src/batch.proto"; import "proto/src/chrono.proto"; import "proto/src/proto.proto"; import "repr/src/antichain.proto"; import "repr/src/global_id.proto"; import "storage-client/src/statistics.proto"; +import "storage-types/src/controller.proto"; +import "storage-types/src/oneshot_sources.proto"; import "storage-types/src/parameters.proto"; import "storage-types/src/sinks.proto"; import "storage-types/src/sources.proto"; @@ -45,6 +48,13 @@ message ProtoRunIngestionCommand { mz_storage_types.sources.ProtoIngestionDescription description = 2; } +message ProtoRunOneshotIngestionCommand { + mz_repr.global_id.ProtoGlobalId ingestion_id = 1; + mz_repr.global_id.ProtoGlobalId collection_id = 2; + mz_storage_types.controller.ProtoCollectionMetadata storage_metadata = 3; + mz_storage_types.oneshot_sources.ProtoOneshotIngestionRequest request = 4; +} + message ProtoCreateSources { repeated ProtoRunIngestionCommand sources = 1; } @@ -84,6 +94,7 @@ message ProtoStorageCommand { google.protobuf.Empty allow_writes = 7; ProtoRunSinks run_sinks = 4; mz_storage_types.parameters.ProtoStorageParameters update_configuration = 5; + ProtoRunOneshotIngestionCommand oneshot_ingestion = 10; } } @@ -121,10 +132,27 @@ message ProtoStorageResponse { repeated mz_repr.global_id.ProtoGlobalId ids = 1; } + message ProtoStagedBatches { + message BatchResult { + oneof value { + mz_persist_client.batch.ProtoBatch batch = 1; + string error = 2; + } + } + + message Inner { + mz_repr.global_id.ProtoGlobalId id = 1; + repeated BatchResult batches = 2; + } + + repeated Inner batches = 1; + } + oneof kind { ProtoFrontierUppersKind frontier_uppers = 1; ProtoDroppedIds dropped_ids = 2; ProtoStatisticsUpdates stats = 3; ProtoStatusUpdates status_updates = 4; + ProtoStagedBatches staged_batches = 5; } } diff --git a/src/storage-client/src/client.rs b/src/storage-client/src/client.rs index bdf8c87b9742f..791dea68e1ba9 100644 --- a/src/storage-client/src/client.rs +++ b/src/storage-client/src/client.rs @@ -21,11 +21,14 @@ use std::iter; use async_trait::async_trait; use differential_dataflow::lattice::Lattice; use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig}; +use mz_ore::assert_none; +use mz_persist_client::batch::ProtoBatch; use 
mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError}; use mz_repr::{Diff, GlobalId, Row}; use mz_service::client::{GenericClient, Partitionable, PartitionedState}; use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream}; use mz_storage_types::controller::CollectionMetadata; +use mz_storage_types::oneshot_sources::OneshotIngestionRequest; use mz_storage_types::parameters::StorageParameters; use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc}; use mz_storage_types::sources::IngestionDescription; @@ -33,6 +36,7 @@ use mz_timely_util::progress::any_antichain; use proptest::prelude::{any, Arbitrary}; use proptest::strategy::{BoxedStrategy, Strategy, Union}; use serde::{Deserialize, Serialize}; +use smallvec::SmallVec; use timely::progress::frontier::{Antichain, MutableAntichain}; use timely::PartialOrder; use tonic::{Request, Status as TonicStatus, Streaming}; @@ -119,6 +123,12 @@ pub enum StorageCommand { UpdateConfiguration(StorageParameters), /// Run the enumerated sources, each associated with its identifier. RunIngestions(Vec), + /// Run a dataflow which will ingest data from an external source and only __stage__ it in + /// Persist. + /// + /// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is + /// responsible for linking the staged data into a shard. + RunOneshotIngestion(RunOneshotIngestionCommand), /// Enable compaction in storage-managed collections. /// /// Each entry in the vector names a collection and provides a frontier after which @@ -137,7 +147,10 @@ impl StorageCommand { | AllowWrites | UpdateConfiguration(_) | AllowCompaction(_) => false, - RunIngestions(_) | RunSinks(_) => true, + // TODO(cf2): multi-replica oneshot ingestions. At the moment returning + // true here means we can't run `COPY FROM` on multi-replica clusters, this + // should be easy enough to support though. + RunIngestions(_) | RunSinks(_) | RunOneshotIngestion(_) => true, } } } @@ -184,6 +197,47 @@ impl RustType for RunIngestionCommand { } } +/// A command that starts ingesting the given ingestion description +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct RunOneshotIngestionCommand { + /// The ID of the ingestion dataflow. + pub ingestion_id: GlobalId, + /// The ID of collection we'll stage batches for. + pub collection_id: GlobalId, + /// Metadata for the collection we'll stage batches for. + pub collection_meta: CollectionMetadata, + /// Details for the oneshot ingestion. 
+ pub request: OneshotIngestionRequest, +} + +impl RustType for RunOneshotIngestionCommand { + fn into_proto(&self) -> ProtoRunOneshotIngestionCommand { + ProtoRunOneshotIngestionCommand { + ingestion_id: Some(self.ingestion_id.into_proto()), + collection_id: Some(self.collection_id.into_proto()), + storage_metadata: Some(self.collection_meta.into_proto()), + request: Some(self.request.into_proto()), + } + } + + fn from_proto(proto: ProtoRunOneshotIngestionCommand) -> Result { + Ok(RunOneshotIngestionCommand { + ingestion_id: proto + .ingestion_id + .into_rust_if_some("ProtoRunOneshotIngestionCommand::ingestion_id")?, + collection_id: proto + .collection_id + .into_rust_if_some("ProtoRunOneshotIngestionCommand::collection_id")?, + collection_meta: proto + .storage_metadata + .into_rust_if_some("ProtoRunOneshotIngestionCommand::storage_metadata")?, + request: proto + .request + .into_rust_if_some("ProtoRunOneshotIngestionCommand::request")?, + }) + } +} + impl RustType for RunSinkCommand { fn into_proto(&self) -> ProtoRunSinkCommand { ProtoRunSinkCommand { @@ -246,6 +300,9 @@ impl RustType for StorageCommand { StorageCommand::RunIngestions(sources) => CreateSources(ProtoCreateSources { sources: sources.into_proto(), }), + StorageCommand::RunOneshotIngestion(oneshot) => { + OneshotIngestion(oneshot.into_proto()) + } StorageCommand::RunSinks(sinks) => RunSinks(ProtoRunSinks { sinks: sinks.into_proto(), }), @@ -277,6 +334,9 @@ impl RustType for StorageCommand { Some(RunSinks(ProtoRunSinks { sinks })) => { Ok(StorageCommand::RunSinks(sinks.into_rust()?)) } + Some(OneshotIngestion(oneshot)) => { + Ok(StorageCommand::RunOneshotIngestion(oneshot.into_rust()?)) + } None => Err(TryFromProtoError::missing_field( "ProtoStorageCommand::kind", )), @@ -543,6 +603,8 @@ pub enum StorageResponse { FrontierUppers(Vec<(GlobalId, Antichain)>), /// Punctuation indicates that no more responses will be transmitted for the specified ids DroppedIds(BTreeSet), + /// Batches that have been staged in Persist and maybe will be linked into a shard. + StagedBatches(BTreeMap>>), /// A list of statistics updates, currently only for sources. 
StatisticsUpdates(Vec, Vec), @@ -554,7 +616,9 @@ pub enum StorageResponse { impl RustType for StorageResponse { fn into_proto(&self) -> ProtoStorageResponse { use proto_storage_response::Kind::*; - use proto_storage_response::{ProtoDroppedIds, ProtoStatisticsUpdates, ProtoStatusUpdates}; + use proto_storage_response::{ + ProtoDroppedIds, ProtoStagedBatches, ProtoStatisticsUpdates, ProtoStatusUpdates, + }; ProtoStorageResponse { kind: Some(match self { StorageResponse::FrontierUppers(traces) => FrontierUppers(traces.into_proto()), @@ -576,6 +640,29 @@ impl RustType for StorageResponse { StorageResponse::StatusUpdates(updates) => StatusUpdates(ProtoStatusUpdates { updates: updates.into_proto(), }), + StorageResponse::StagedBatches(staged) => { + let batches = staged + .into_iter() + .map(|(collection_id, batches)| { + let batches = batches + .into_iter() + .map(|result| { + use proto_storage_response::proto_staged_batches::batch_result::Value; + let value = match result { + Ok(batch) => Value::Batch(batch.clone()), + Err(err) => Value::Error(err.clone()), + }; + proto_storage_response::proto_staged_batches::BatchResult { value: Some(value) } + }) + .collect(); + proto_storage_response::proto_staged_batches::Inner { + id: Some(collection_id.into_proto()), + batches, + } + }) + .collect(); + StagedBatches(ProtoStagedBatches { batches }) + } }), } } @@ -605,6 +692,35 @@ impl RustType for StorageResponse { Some(StatusUpdates(ProtoStatusUpdates { updates })) => { Ok(StorageResponse::StatusUpdates(updates.into_rust()?)) } + Some(StagedBatches(staged)) => { + let batches: BTreeMap<_, _> = staged + .batches + .into_iter() + .map(|inner| { + let id = inner + .id + .into_rust_if_some("ProtoStagedBatches::Inner::id")?; + + let mut batches = Vec::with_capacity(inner.batches.len()); + for maybe_batch in inner.batches { + use proto_storage_response::proto_staged_batches::batch_result::Value; + + let value = maybe_batch.value.ok_or_else(|| { + TryFromProtoError::missing_field("BatchResult::value") + })?; + let batch = match value { + Value::Batch(batch) => Ok(batch), + Value::Error(err) => Err(err), + }; + batches.push(batch); + } + + Ok::<_, TryFromProtoError>((id, batches)) + }) + .collect::>()?; + + Ok(StorageResponse::StagedBatches(batches)) + } None => Err(TryFromProtoError::missing_field( "ProtoStorageResponse::kind", )), @@ -638,6 +754,8 @@ pub struct PartitionedStorageState { /// Upper frontiers for sources and sinks, both unioned across all partitions and from each /// individual partition. uppers: BTreeMap, Vec>>)>, + /// Staged batches from oneshot sources that will get appended by `environmentd`. 
+ oneshot_source_responses: BTreeMap>>>, } impl Partitionable, StorageResponse> @@ -651,6 +769,7 @@ where PartitionedStorageState { parts, uppers: BTreeMap::new(), + oneshot_source_responses: BTreeMap::new(), } } } @@ -681,7 +800,8 @@ where StorageCommand::InitializationComplete | StorageCommand::AllowWrites | StorageCommand::UpdateConfiguration(_) - | StorageCommand::AllowCompaction(_) => {} + | StorageCommand::AllowCompaction(_) + | StorageCommand::RunOneshotIngestion(_) => {} }; } @@ -805,6 +925,37 @@ where StorageResponse::StatusUpdates(updates) => { Some(Ok(StorageResponse::StatusUpdates(updates))) } + StorageResponse::StagedBatches(batches) => { + let mut finished_batches = BTreeMap::new(); + + for (collection_id, batches) in batches { + tracing::info!(%shard_id, %collection_id, "got batch"); + + let entry = self + .oneshot_source_responses + .entry(collection_id) + .or_default(); + let novel = entry.insert(shard_id, batches); + assert_none!(novel, "Duplicate oneshot source response"); + + // Check if we've received responses from all shards. + if entry.len() == self.parts { + let entry = self + .oneshot_source_responses + .remove(&collection_id) + .expect("checked above"); + let all_batches: Vec<_> = entry.into_values().flatten().collect(); + + finished_batches.insert(collection_id, all_batches); + } + } + + if !finished_batches.is_empty() { + Some(Ok(StorageResponse::StagedBatches(finished_batches))) + } else { + None + } + } } } } @@ -820,11 +971,32 @@ pub struct Update { #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] /// A batch of updates to be fed to a local input; however, the input must /// determine the most appropriate timestamps to use. +/// +/// TODO(cf2): Can we remove this and use only on [`TableData`]. pub struct TimestamplessUpdate { pub row: Row, pub diff: Diff, } +#[derive(Debug, Clone, PartialEq)] +pub enum TableData { + /// Rows that still need to be persisted and appended. + /// + /// The contained [`Row`]s are _not_ consolidated. + Rows(Vec<(Row, Diff)>), + /// Batches already staged in Persist ready to be appended. 
+ Batches(SmallVec<[ProtoBatch; 1]>), +} + +impl TableData { + pub fn is_empty(&self) -> bool { + match self { + TableData::Rows(rows) => rows.is_empty(), + TableData::Batches(batches) => batches.is_empty(), + } + } +} + impl RustType for (GlobalId, Antichain) { fn into_proto(&self) -> ProtoTrace { ProtoTrace { diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index c8d9b61973217..346deb7b41294 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -32,6 +32,7 @@ use mz_cluster_client::client::ClusterReplicaLocation; use mz_cluster_client::metrics::WallclockLagMetrics; use mz_cluster_client::ReplicaId; use mz_ore::collections::CollectionExt; +use mz_persist_client::batch::ProtoBatch; use mz_persist_client::read::{Cursor, ReadHandle}; use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats}; use mz_persist_types::schema::SchemaId; @@ -41,6 +42,7 @@ use mz_storage_types::configuration::StorageConfiguration; use mz_storage_types::connections::inline::InlinedConnection; use mz_storage_types::controller::{CollectionMetadata, StorageError}; use mz_storage_types::instances::StorageInstanceId; +use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback}; use mz_storage_types::parameters::StorageParameters; use mz_storage_types::read_holds::{ReadHold, ReadHoldError}; use mz_storage_types::read_policy::ReadPolicy; @@ -54,7 +56,7 @@ use timely::progress::Timestamp as TimelyTimestamp; use timely::progress::{Antichain, Timestamp}; use tokio::sync::{mpsc, oneshot}; -use crate::client::{AppendOnlyUpdate, StatusUpdate, TimestamplessUpdate}; +use crate::client::{AppendOnlyUpdate, StatusUpdate, TableData}; use crate::statistics::WebhookStatistics; #[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, PartialOrd, Ord)] @@ -508,6 +510,16 @@ pub trait StorageController: Debug { exports: Vec<(GlobalId, ExportDescription)>, ) -> Result<(), StorageError>; + /// Create a oneshot ingestion. + async fn create_oneshot_ingestion( + &mut self, + ingestion_id: GlobalId, + collection_id: GlobalId, + instance_id: StorageInstanceId, + request: OneshotIngestionRequest, + result_tx: OneshotResultCallback, + ) -> Result<(), StorageError>; + /// Alter the sink identified by the given id to match the provided `ExportDescription`. async fn alter_export( &mut self, @@ -582,7 +594,7 @@ pub trait StorageController: Debug { &mut self, write_ts: Self::Timestamp, advance_to: Self::Timestamp, - commands: Vec<(GlobalId, Vec)>, + commands: Vec<(GlobalId, Vec)>, ) -> Result< tokio::sync::oneshot::Receiver>>, StorageError, diff --git a/src/storage-controller/src/history.rs b/src/storage-controller/src/history.rs index 9d8f8732e8225..549fde90a845c 100644 --- a/src/storage-controller/src/history.rs +++ b/src/storage-controller/src/history.rs @@ -71,6 +71,9 @@ impl CommandHistory { RunIngestions(x) => metrics.run_ingestions_count.add(x.len().cast_into()), RunSinks(x) => metrics.run_sinks_count.add(x.len().cast_into()), AllowCompaction(x) => metrics.allow_compaction_count.add(x.len().cast_into()), + RunOneshotIngestion(_) => { + // TODO(cf2): Add metrics for oneshot ingestions. + } } } } @@ -89,6 +92,7 @@ impl CommandHistory { // this scenario, we only want to send the most recent definition of the object. let mut final_ingestions = BTreeMap::new(); let mut final_sinks = BTreeMap::new(); + let mut final_oneshot_ingestions = BTreeMap::new(); // Collect only the final configuration. 
// Note that this means the final configuration is applied to all objects installed on the @@ -111,11 +115,15 @@ impl CommandHistory { final_sinks.extend(cmds.into_iter().map(|c| (c.id, c))); } AllowCompaction(updates) => final_compactions.extend(updates), + RunOneshotIngestion(oneshot) => { + final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot); + } } } let mut run_ingestions = Vec::new(); let mut run_sinks = Vec::new(); + let mut run_oneshot_ingestions = Vec::new(); let mut allow_compaction = Vec::new(); // Discard ingestions that have been dropped, keep the rest. @@ -147,6 +155,10 @@ impl CommandHistory { run_sinks.push(sink); } + // TODO(cf1): Add a CancelOneshotIngestion command similar to CancelPeek + // that will compact/reduce away the RunOneshotIngestion. + run_oneshot_ingestions.extend(final_oneshot_ingestions.into_values()); + // Reconstitute the commands as a compact history. // // When we update `metrics`, we need to be careful to not transiently report incorrect @@ -180,6 +192,16 @@ impl CommandHistory { self.commands.push(StorageCommand::RunSinks(run_sinks)); } + // TODO(cf1): Add a CancelOneshotIngestion command, make sure we prevent + // re-sending commands for ingestions that we've already responded to. + if !run_oneshot_ingestions.is_empty() { + self.commands.extend( + run_oneshot_ingestions + .into_iter() + .map(|oneshot| StorageCommand::RunOneshotIngestion(oneshot)), + ); + } + let count = u64::cast_from(allow_compaction.len()); self.metrics.allow_compaction_count.set(count); if !allow_compaction.is_empty() { diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index 37114f8ce2834..c5285ef17853a 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -35,6 +35,7 @@ use mz_ore::metrics::MetricsRegistry; use mz_ore::now::{EpochMillis, NowFn}; use mz_ore::task::AbortOnDropHandle; use mz_ore::{assert_none, instrument, soft_panic_or_log}; +use mz_persist_client::batch::ProtoBatch; use mz_persist_client::cache::PersistClientCache; use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT; use mz_persist_client::read::ReadHandle; @@ -50,8 +51,8 @@ use mz_repr::adt::interval::Interval; use mz_repr::adt::timestamp::CheckedTimestamp; use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row, TimestampManipulation}; use mz_storage_client::client::{ - ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand, RunSinkCommand, Status, - StatusUpdate, StorageCommand, StorageResponse, TimestamplessUpdate, + ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand, RunOneshotIngestionCommand, + RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse, TableData, }; use mz_storage_client::controller::{ BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState, @@ -72,6 +73,7 @@ use mz_storage_types::connections::inline::InlinedConnection; use mz_storage_types::connections::ConnectionContext; use mz_storage_types::controller::{AlterError, CollectionMetadata, StorageError, TxnsCodecRow}; use mz_storage_types::instances::StorageInstanceId; +use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback}; use mz_storage_types::parameters::StorageParameters; use mz_storage_types::read_holds::{ReadHold, ReadHoldError}; use mz_storage_types::read_policy::ReadPolicy; @@ -160,6 +162,9 @@ pub struct Controller + Tim /// Channel for receiving table handle drops. 
#[derivative(Debug = "ignore")] pending_table_handle_drops_rx: mpsc::UnboundedReceiver, + /// Closures that can be used to send responses from oneshot ingestions. + #[derivative(Debug = "ignore")] + pending_oneshot_ingestions: BTreeMap>, /// Interface for managed collections pub(crate) collection_manager: collection_mgmt::CollectionManager, @@ -1346,6 +1351,46 @@ where Ok(()) } + /// Create a oneshot ingestion. + async fn create_oneshot_ingestion( + &mut self, + ingestion_id: GlobalId, + collection_id: GlobalId, + instance_id: StorageInstanceId, + request: OneshotIngestionRequest, + result_tx: OneshotResultCallback, + ) -> Result<(), StorageError> { + let collection_meta = self + .collections + .get(&collection_id) + .ok_or_else(|| StorageError::IdentifierMissing(collection_id))? + .collection_metadata + .clone(); + let instance = self.instances.get_mut(&instance_id).ok_or_else(|| { + StorageError::ExportInstanceMissing { + storage_instance_id: instance_id, + export_id: ingestion_id, + } + })?; + let oneshot_cmd = RunOneshotIngestionCommand { + ingestion_id, + collection_id, + collection_meta, + request, + }; + + if !self.read_only { + instance.send(StorageCommand::RunOneshotIngestion(oneshot_cmd)); + let novel = self + .pending_oneshot_ingestions + .insert(ingestion_id, result_tx); + assert!(novel.is_none()); + Ok(()) + } else { + Err(StorageError::ReadOnly) + } + } + async fn alter_export( &mut self, id: GlobalId, @@ -1734,7 +1779,7 @@ where &mut self, write_ts: Self::Timestamp, advance_to: Self::Timestamp, - commands: Vec<(GlobalId, Vec)>, + commands: Vec<(GlobalId, Vec)>, ) -> Result< tokio::sync::oneshot::Receiver>>, StorageError, @@ -2066,6 +2111,16 @@ where } self.record_status_updates(updates); } + Some(StorageResponse::StagedBatches(batches)) => { + for (collection_id, batches) in batches { + match self.pending_oneshot_ingestions.remove(&collection_id) { + Some(sender) => (sender)(batches), + // TODO(cf2): When we support running COPY FROM on multiple + // replicas we can probably just ignore the case of `None`. + None => mz_ore::soft_panic_or_log!("no sender for {collection_id}!"), + } + } + } } // IDs of sources that were dropped whose statuses should be updated. 
@@ -2523,6 +2578,7 @@ where pending_compaction_commands: vec![], pending_table_handle_drops_tx, pending_table_handle_drops_rx, + pending_oneshot_ingestions: BTreeMap::default(), collection_manager, introspection_ids, introspection_tokens, diff --git a/src/storage-controller/src/persist_handles.rs b/src/storage-controller/src/persist_handles.rs index ebe15fbfb987a..2d95eeb4c1809 100644 --- a/src/storage-controller/src/persist_handles.rs +++ b/src/storage-controller/src/persist_handles.rs @@ -25,7 +25,7 @@ use mz_persist_client::write::WriteHandle; use mz_persist_client::ShardId; use mz_persist_types::Codec64; use mz_repr::{Diff, GlobalId, TimestampManipulation}; -use mz_storage_client::client::{TimestamplessUpdate, Update}; +use mz_storage_client::client::{TableData, Update}; use mz_storage_types::controller::{InvalidUpper, TxnsCodecRow}; use mz_storage_types::sources::SourceData; use mz_txn_wal::txns::{Tidy, TxnsHandle}; @@ -74,7 +74,7 @@ enum PersistTableWriteCmd { Append { write_ts: T, advance_to: T, - updates: Vec<(GlobalId, Vec)>, + updates: Vec<(GlobalId, Vec)>, tx: tokio::sync::oneshot::Sender>>, }, Shutdown, @@ -223,7 +223,7 @@ impl PersistTableWrite &self, write_ts: T, advance_to: T, - updates: Vec<(GlobalId, Vec)>, + updates: Vec<(GlobalId, Vec)>, ) -> tokio::sync::oneshot::Receiver>> { let (tx, rx) = tokio::sync::oneshot::channel(); if updates.is_empty() { @@ -380,7 +380,7 @@ impl TxnsTableWorker)>, + updates: Vec<(GlobalId, Vec)>, tx: tokio::sync::oneshot::Sender>>, ) { debug!( @@ -416,13 +416,27 @@ impl TxnsTableWorker { + for (row, diff) in updates { + let () = txn.write(data_id, SourceData(Ok(row)), (), diff).await; + } + } + TableData::Batches(batches) => { + for batch in batches { + let () = txn.write_batch(data_id, batch); + } + } + } } } // Sneak in any txns shard tidying from previous commits. diff --git a/src/storage-controller/src/persist_handles/read_only_table_worker.rs b/src/storage-controller/src/persist_handles/read_only_table_worker.rs index ec02223f3c272..afb75d6349405 100644 --- a/src/storage-controller/src/persist_handles/read_only_table_worker.rs +++ b/src/storage-controller/src/persist_handles/read_only_table_worker.rs @@ -18,7 +18,7 @@ use futures::FutureExt; use mz_persist_client::write::WriteHandle; use mz_persist_types::Codec64; use mz_repr::{Diff, GlobalId, TimestampManipulation}; -use mz_storage_client::client::Update; +use mz_storage_client::client::{TableData, Update}; use mz_storage_types::controller::InvalidUpper; use mz_storage_types::sources::SourceData; use timely::progress::{Antichain, Timestamp}; @@ -198,10 +198,22 @@ where // than nothing. old_span.follows_from(span.id()); } - let updates_with_ts = updates_no_ts.into_iter().map(|x| Update { - row: x.row, - timestamp: write_ts.clone(), - diff: x.diff, + let updates_with_ts = updates_no_ts.into_iter().flat_map(|x| match x { + TableData::Rows(rows) => { + let iter = rows.into_iter().map(|(row, diff)| Update { + row, + timestamp: write_ts.clone(), + diff, + }); + itertools::Either::Left(iter) + } + TableData::Batches(_) => { + // TODO(cf1): Handle Batches of updates in ReadOnlyTableWorker. 
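Expanding on that TODO for readers of this section: `TableData` itself is defined in storage-client elsewhere in this change. Its shape, inferred from the `Rows`/`Batches` matches in the persist_handles hunks, is approximately the following; the container types are assumptions, not copied from the real definition.

use mz_persist_client::batch::ProtoBatch;
use mz_repr::{Diff, Row};
use smallvec::SmallVec;

// Approximate sketch of the enum the table workers now receive.
pub enum TableData {
    /// Rows the table worker still needs to stage and append itself.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in Persist (e.g. by a oneshot ingestion), which the
    /// txn worker links in directly via `txn.write_batch`.
    Batches(SmallVec<[ProtoBatch; 1]>),
}

The read-only worker's handling of the `Batches` case continues below and, for now, only logs and drops them.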
+ mz_ore::soft_panic_or_log!( + "handle Batches of updates in the ReadOnlyTableWorker" + ); + itertools::Either::Right(std::iter::empty()) + } }); updates.extend(updates_with_ts); old_new_upper.join_assign(&Antichain::from_elem(advance_to.clone())); diff --git a/src/storage-operators/Cargo.toml b/src/storage-operators/Cargo.toml index c87a81705e5a5..b5f54425f68ee 100644 --- a/src/storage-operators/Cargo.toml +++ b/src/storage-operators/Cargo.toml @@ -14,8 +14,10 @@ anyhow = "1.0.66" arrow = { version = "53.3.0", default-features = false } async-stream = "0.3.3" aws-types = "1.1.1" +bytes = "1.3.0" bytesize = "1.1.0" differential-dataflow = "0.13.2" +csv-async = { version = "1.3.0", features = ["tokio"] } futures = "0.3.25" http = "1.1.0" mz-aws-util = { path = "../aws-util" } @@ -33,12 +35,16 @@ mz-txn-wal = { path = "../txn-wal" } parquet = { version = "53.3.0", default-features = false, features = ["arrow", "snap"] } prometheus = { version = "0.13.3", default-features = false } proptest = { version = "1.6.0", default-features = false, features = ["std"] } +reqwest = { version = "0.11.13", features = ["stream"] } sentry = { version = "0.29.1" } serde = { version = "1.0.152", features = ["derive"] } timely = "0.15.1" thiserror = "1.0.37" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] } +tokio-stream = "0.1.11" +tokio-util = { version = "0.7.4", features = ["io"] } tracing = "0.1.37" +url = "2.3.1" uuid = { version = "1.7.0", features = ["v4"] } workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } diff --git a/src/storage-operators/src/lib.rs b/src/storage-operators/src/lib.rs index 040b0104b15b2..8a78eed1605e6 100644 --- a/src/storage-operators/src/lib.rs +++ b/src/storage-operators/src/lib.rs @@ -10,6 +10,7 @@ //! Shared Storage dataflow operators pub mod metrics; +pub mod oneshot_source; pub mod persist_source; pub mod s3_oneshot_sink; pub mod stats; diff --git a/src/storage-operators/src/oneshot_source.rs b/src/storage-operators/src/oneshot_source.rs new file mode 100644 index 0000000000000..1bf6d67faedc5 --- /dev/null +++ b/src/storage-operators/src/oneshot_source.rs @@ -0,0 +1,880 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! "Oneshot" sources are a one-time ingestion of data from an external system, unlike traditional +//! sources, they __do not__ run continuously. Oneshot sources are generally used for `COPY FROM` +//! SQL statements. +//! +//! The implementation of reading and parsing data is behind the [`OneshotSource`] and +//! [`OneshotFormat`] traits, respectively. Users looking to add new sources or formats, should +//! only need to add new implementations for these traits. +//! +//! * [`OneshotSource`] is an interface for listing and reading from an external system, e.g. an +//! HTTP server. +//! * [`OneshotFormat`] is an interface for how to parallelize and parse data, e.g. CSV. +//! +//! Given a [`OneshotSource`] and a [`OneshotFormat`] we build a dataflow structured like the +//! following: +//! +//! ```text +//! ┏━━━━━━━━━━━━━━━┓ +//! ┃ Discover ┃ +//! ┃ objects ┃ +//! ┗━━━━━━━┯━━━━━━━┛ +//! ┌───< Distribute >───┐ +//! │ │ +//! ┏━━━━━v━━━━┓ ┏━━━━━v━━━━┓ +//! ┃ Split ┃ ... ┃ Split ┃ +//! 
┃ Work 1 ┃ ┃ Work n ┃ +//! ┗━━━━━┯━━━━┛ ┗━━━━━┯━━━━┛ +//! │ │ +//! ├───< Distribute >───┤ +//! │ │ +//! ┏━━━━━v━━━━┓ ┏━━━━━v━━━━┓ +//! ┃ Fetch ┃ ... ┃ Fetch ┃ +//! ┃ Work 1 ┃ ┃ Work n ┃ +//! ┗━━━━━┯━━━━┛ ┗━━━━━┯━━━━┛ +//! │ │ +//! ├───< Distribute >───┤ +//! │ │ +//! ┏━━━━━v━━━━┓ ┏━━━━━v━━━━┓ +//! ┃ Decode ┃ ... ┃ Decode ┃ +//! ┃ Chunk 1 ┃ ┃ Chunk n ┃ +//! ┗━━━━━┯━━━━┛ ┗━━━━━┯━━━━┛ +//! │ │ +//! │ │ +//! ┏━━━━━v━━━━┓ ┏━━━━━v━━━━┓ +//! ┃ Stage ┃ ... ┃ Stage ┃ +//! ┃ Batch 1 ┃ ┃ Batch n ┃ +//! ┗━━━━━┯━━━━┛ ┗━━━━━┯━━━━┛ +//! │ │ +//! └─────────┬──────────┘ +//! ┏━━━━━v━━━━┓ +//! ┃ Result ┃ +//! ┃ Callback ┃ +//! ┗━━━━━━━━━━┛ +//! ``` +//! + +use std::fmt; +use std::sync::Arc; + +use bytes::Bytes; +use differential_dataflow::Hashable; +use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; +use mz_ore::cast::CastFrom; +use mz_persist_client::batch::ProtoBatch; +use mz_persist_client::cache::PersistClientCache; +use mz_persist_client::Diagnostics; +use mz_persist_types::codec_impls::UnitSchema; +use mz_repr::{Diff, GlobalId, Row, Timestamp}; +use mz_storage_types::controller::CollectionMetadata; +use mz_storage_types::oneshot_sources::{ContentFormat, ContentSource, OneshotIngestionRequest}; +use mz_storage_types::sources::SourceData; +use mz_timely_util::builder_async::{ + Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, +}; +use mz_timely_util::pact::Distribute; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use std::collections::LinkedList; +use std::fmt::{Debug, Display}; +use std::future::Future; +use timely::container::CapacityContainerBuilder; +use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::{Scope, Stream as TimelyStream}; +use timely::progress::Antichain; +use tracing::info; +use url::Url; + +use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest}; +use crate::oneshot_source::http_source::{HttpChecksum, HttpOneshotSource}; + +pub mod csv; +pub mod http_source; + +/// Render a dataflow to do a "oneshot" ingestion. +/// +/// Roughly the operators we render do the following: +/// +/// 1. Discover objects with a [`OneshotSource`]. +/// 2. Split objects into separate units of work based on the [`OneshotFormat`]. +/// 3. Fetch individual units of work (aka fetch byte blobs) with the +/// [`OneshotFormat`] and [`OneshotSource`]. +/// 4. Decode the fetched byte blobs into [`Row`]s. +/// 5. Stage the [`Row`]s into Persist returning [`ProtoBatch`]es. +/// +/// TODO(cf3): Benchmark combining operators 3, 4, and 5. Currently we keep them +/// separate for the [`CsvDecoder`]. CSV decoding is hard to do in parallel so we +/// currently have a single worker Fetch an entire file, and then distributes +/// chunks for parallel Decoding. We should benchmark if this is actually faster +/// than just a single worker both fetching and decoding. 
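Before the operators themselves, a hedged sketch of the `worker_callback` contract implied by `render_completion_operator` further down: each worker reports at most one staged batch, `None` if it staged nothing, or an error string. The handler body here is purely illustrative.

let worker_callback = |result: Result<Option<ProtoBatch>, String>| match result {
    // This worker staged a batch that can later be linked into the shard.
    Ok(Some(batch)) => tracing::info!(?batch, "worker staged a batch"),
    // This worker participated but had nothing to stage.
    Ok(None) => tracing::info!("worker had no data to stage"),
    // Something failed; the error was already stringified by the dataflow.
    Err(err) => tracing::error!(%err, "oneshot ingestion failed"),
};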
+/// +pub fn render( + scope: G, + persist_clients: Arc, + collection_id: GlobalId, + collection_meta: CollectionMetadata, + request: OneshotIngestionRequest, + worker_callback: F, +) -> Vec +where + G: Scope, + F: FnOnce(Result, String>) -> () + 'static, +{ + let OneshotIngestionRequest { source, format } = request; + + let source = match source { + ContentSource::Http { url } => { + let source = HttpOneshotSource::new(reqwest::Client::default(), url); + SourceKind::Http(source) + } + }; + let format = match format { + ContentFormat::Csv => { + let format = CsvDecoder::default(); + FormatKind::Csv(format) + } + }; + + // Discover what objects are available to copy. + let (objects_stream, discover_token) = + render_discover_objects(scope.clone(), collection_id, source.clone()); + // Split the objects into individual units of work. + let (work_stream, split_token) = render_split_work( + scope.clone(), + collection_id, + &objects_stream, + source.clone(), + format.clone(), + ); + // Fetch each unit of work, returning chunks of records. + let (records_stream, fetch_token) = render_fetch_work( + scope.clone(), + collection_id, + source.clone(), + format.clone(), + &work_stream, + ); + // Parse chunks of records into Rows. + let (rows_stream, decode_token) = + render_decode_chunk(scope.clone(), format.clone(), &records_stream); + // Stage the Rows in Persist. + let (batch_stream, batch_token) = render_stage_batches_operator( + scope.clone(), + collection_id, + &collection_meta, + persist_clients, + &rows_stream, + ); + + // Collect all results together and notify the upstream of whether or not we succeeded. + render_completion_operator(scope, &batch_stream, worker_callback); + + let tokens = vec![ + discover_token, + split_token, + fetch_token, + decode_token, + batch_token, + ]; + + tokens +} + +/// Render an operator that using a [`OneshotSource`] will discover what objects are available +/// for fetching. +pub fn render_discover_objects( + scope: G, + collection_id: GlobalId, + source: S, +) -> ( + TimelyStream>, + PressOnDropButton, +) +where + G: Scope, + S: OneshotSource + 'static, +{ + // Only a single worker is responsible for discovering objects. + let worker_id = scope.index(); + let num_workers = scope.peers(); + let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers; + let is_active_worker = worker_id == active_worker_id; + + let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone()); + + let (start_handle, start_stream) = builder.new_output::>(); + + let shutdown = builder.build(move |caps| async move { + let [start_cap] = caps.try_into().unwrap(); + + if !is_active_worker { + return; + } + + info!(%collection_id, %worker_id, "CopyFrom Leader Discover"); + + let work = source.list().await.context("list"); + match work { + Ok(objects) => objects + .into_iter() + .for_each(|object| start_handle.give(&start_cap, Ok(object))), + Err(err) => start_handle.give(&start_cap, Err(err)), + } + }); + + (start_stream, shutdown.press_on_drop()) +} + +/// Render an operator that given a stream of [`OneshotSource::Object`]s will split them into units +/// of work based on the provided [`OneshotFormat`]. 
+pub fn render_split_work( + scope: G, + collection_id: GlobalId, + objects: &TimelyStream>, + source: S, + format: F, +) -> ( + TimelyStream, StorageErrorX>>, + PressOnDropButton, +) +where + G: Scope, + S: OneshotSource + Send + Sync + 'static, + F: OneshotFormat + Send + Sync + 'static, +{ + let worker_id = scope.index(); + let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone()); + + let (request_handle, request_stream) = builder.new_output::>(); + let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle); + + let shutdown = builder.build(move |caps| async move { + let [_objects_cap] = caps.try_into().unwrap(); + + info!(%collection_id, %worker_id, "CopyFrom Split Work"); + + while let Some(event) = objects_handle.next().await { + let (capability, maybe_objects) = match event { + AsyncEvent::Data(cap, req) => (cap, req), + AsyncEvent::Progress(_) => continue, + }; + + // Nest the `split_work(...)` method in an async-block so we can use the `?` + // without returning from the entire operator, and so we can add context. + let result = async { + let mut requests = Vec::new(); + + for maybe_object in maybe_objects { + // Return early if the upstream Discover step failed. + let (object, checksum) = maybe_object?; + + let format_ = format.clone(); + let source_ = source.clone(); + let work_requests = mz_ore::task::spawn(|| "split-work", async move { + format_.split_work(source_.clone(), object, checksum).await + }) + .await + .expect("failed to spawn task")?; + + requests.extend(work_requests); + } + + Ok::<_, StorageErrorX>(requests) + } + .await + .context("split"); + + match result { + Ok(requests) => requests + .into_iter() + .for_each(|req| request_handle.give(&capability, Ok(req))), + Err(err) => request_handle.give(&capability, Err(err)), + } + } + }); + + (request_stream, shutdown.press_on_drop()) +} + +/// Render an operator that given a stream [`OneshotFormat::WorkRequest`]s will fetch chunks of the +/// remote [`OneshotSource::Object`] and return a stream of [`OneshotFormat::RecordChunk`]s that +/// can be decoded into [`Row`]s. +pub fn render_fetch_work( + scope: G, + collection_id: GlobalId, + source: S, + format: F, + work_requests: &TimelyStream, StorageErrorX>>, +) -> ( + TimelyStream>, + PressOnDropButton, +) +where + G: Scope, + S: OneshotSource + Sync + 'static, + F: OneshotFormat + Sync + 'static, +{ + let worker_id = scope.index(); + let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone()); + + let (record_handle, record_stream) = builder.new_output::>(); + let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle); + + let shutdown = builder.build(move |caps| async move { + let [_work_cap] = caps.try_into().unwrap(); + + info!(%collection_id, %worker_id, "CopyFrom Fetch Work"); + + while let Some(event) = work_requests_handle.next().await { + let (capability, maybe_requests) = match event { + AsyncEvent::Data(cap, req) => (cap, req), + AsyncEvent::Progress(_) => continue, + }; + + // Wrap our work in a block to capture `?`. + let result = async { + let mut record_chunks = Vec::new(); + + // Process each stream of work, one at a time. 
+ for maybe_request in maybe_requests { + let request = maybe_request?; + + let mut work_stream = format.fetch_work(&source, request); + while let Some(result) = work_stream.next().await { + let record_chunk = result.context("fetch worker")?; + record_chunks.push(record_chunk); + } + } + + Ok::<_, StorageErrorX>(record_chunks) + } + .await + .context("fetch work"); + + match result { + Ok(record_chunks) => record_chunks + .into_iter() + .for_each(|chunk| record_handle.give(&capability, Ok(chunk))), + Err(err) => record_handle.give(&capability, Err(err)), + } + } + }); + + (record_stream, shutdown.press_on_drop()) +} + +/// Render an operator that given a stream of [`OneshotFormat::RecordChunk`]s will decode these +/// chunks into a stream of [`Row`]s. +pub fn render_decode_chunk( + scope: G, + format: F, + record_chunks: &TimelyStream>, +) -> ( + TimelyStream>, + PressOnDropButton, +) +where + G: Scope, + F: OneshotFormat + 'static, +{ + let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone()); + + let (row_handle, row_stream) = builder.new_output::>(); + let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle); + + let shutdown = builder.build(move |caps| async move { + let [_row_cap] = caps.try_into().unwrap(); + + while let Some(event) = record_chunk_handle.next().await { + let (capability, maybe_chunks) = match event { + AsyncEvent::Data(cap, data) => (cap, data), + AsyncEvent::Progress(_) => continue, + }; + + let result = async { + let mut rows = Vec::new(); + for maybe_chunk in maybe_chunks { + let chunk = maybe_chunk?; + format.decode_chunk(chunk, &mut rows)?; + } + Ok::<_, StorageErrorX>(rows) + } + .await + .context("decode chunk"); + + match result { + Ok(rows) => rows + .into_iter() + .for_each(|row| row_handle.give(&capability, Ok(row))), + Err(err) => row_handle.give(&capability, Err(err)), + } + } + }); + + (row_stream, shutdown.press_on_drop()) +} + +/// Render an operator that given a stream of [`Row`]s will stage them in Persist and return a +/// stream of [`ProtoBatch`]es that can later be linked into a shard. +pub fn render_stage_batches_operator( + scope: G, + collection_id: GlobalId, + collection_meta: &CollectionMetadata, + persist_clients: Arc, + rows_stream: &TimelyStream>, +) -> ( + TimelyStream>, + PressOnDropButton, +) +where + G: Scope, +{ + let persist_location = collection_meta.persist_location.clone(); + let shard_id = collection_meta.data_shard; + let collection_desc = collection_meta.relation_desc.clone(); + + let mut builder = + AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone()); + + let (proto_batch_handle, proto_batch_stream) = + builder.new_output::>(); + let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle); + + let shutdown = builder.build(move |caps| async move { + let [proto_batch_cap] = caps.try_into().unwrap(); + + // Open a Persist handle that we can use to stage a batch. 
+ let persist_client = persist_clients + .open(persist_location) + .await + .expect("failed to open Persist client"); + let persist_diagnostics = Diagnostics { + shard_name: collection_id.to_string(), + handle_purpose: "CopyFrom::stage_batches".to_string(), + }; + let write_handle = persist_client + .open_writer::( + shard_id, + Arc::new(collection_desc), + Arc::new(UnitSchema), + persist_diagnostics, + ) + .await + .expect("could not open Persist shard"); + + // Create a batch using the minimum timestamp since these batches will + // get sent back to `environmentd` and their timestamps re-written + // before being finally appended. + let lower = mz_repr::Timestamp::MIN; + let upper = Antichain::from_elem(lower.step_forward()); + + let mut batch_builder = write_handle.builder(Antichain::from_elem(lower)); + + while let Some(event) = rows_handle.next().await { + let AsyncEvent::Data(_, row_batch) = event else { + continue; + }; + + // Pull Rows off our stream and stage them into a Batch. + for maybe_row in row_batch { + match maybe_row { + // Happy path, add the Row to our batch! + Ok(row) => { + let data = SourceData(Ok(row)); + batch_builder + .add(&data, &(), &lower, &1) + .await + .expect("failed to add Row to batch"); + } + // Sad path, something upstream hit an error. + Err(err) => { + // Clean up our in-progress batch so we don't leak data. + let batch = batch_builder + .finish(upper) + .await + .expect("failed to cleanup batch"); + batch.delete().await; + + // Pass on the error. + proto_batch_handle + .give(&proto_batch_cap, Err(err).context("stage batches")); + return; + } + } + } + } + + let batch = batch_builder + .finish(upper) + .await + .expect("failed to create Batch"); + + // Turn out Batch into a ProtoBatch that will later be linked in to + // the shard. + // + // Note: By turning this into a ProtoBatch, the onus is now on us to + // cleanup the Batch if it's never linked into the shard. + // + // TODO(cf2): Make sure these batches get cleaned up if another + // worker encounters an error. + let proto_batch = batch.into_transmittable_batch(); + proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch)); + }); + + (proto_batch_stream, shutdown.press_on_drop()) +} + +/// Render an operator that given a stream of [`ProtoBatch`]es will call our `worker_callback` to +/// report the results upstream. +pub fn render_completion_operator( + scope: G, + results_stream: &TimelyStream>, + worker_callback: F, +) where + G: Scope, + F: FnOnce(Result, String>) -> () + 'static, +{ + let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone()); + let mut results_input = builder.new_disconnected_input(results_stream, Pipeline); + + builder.build(move |_| async move { + let result = async move { + let mut maybe_payload: Option = None; + + while let Some(event) = results_input.next().await { + if let AsyncEvent::Data(_cap, results) = event { + let [result] = results + .try_into() + .expect("only 1 event on the result stream"); + + // TODO(cf2): Lift this restriction. + if maybe_payload.is_some() { + panic!("expected only one batch!"); + } + + maybe_payload = Some(result.map_err(|e| e.to_string())?); + } + } + + Ok(maybe_payload) + } + .await; + + // Report to the caller of our final status. + worker_callback(result); + }); +} + +/// Defines a remote system that we can fetch data from for a "one time" ingestion. +pub trait OneshotSource: Clone + Send { + /// An individual unit within the source, e.g. a file. 
+ type Object: Debug + Clone + Send + Serialize + DeserializeOwned + 'static; + /// Checksum for a [`Self::Object`]. + type Checksum: Debug + Clone + Send + Serialize + DeserializeOwned + 'static; + + /// Returns all of the objects for this source. + fn list<'a>( + &'a self, + ) -> impl Future, StorageErrorX>> + Send; + + /// Resturns a stream of the data for a specific object. + fn get<'s>( + &'s self, + object: Self::Object, + checksum: Self::Checksum, + range: Option>, + ) -> BoxStream<'s, Result>; +} + +/// An enum wrapper around [`OneshotSource`]s. +/// +/// An alternative to this wrapper would be to use `Box`, but that requires +/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper +/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our source +/// types. +#[derive(Clone)] +pub(crate) enum SourceKind { + Http(HttpOneshotSource), +} + +impl OneshotSource for SourceKind { + type Object = ObjectKind; + type Checksum = ChecksumKind; + + async fn list<'a>(&'a self) -> Result, StorageErrorX> { + match self { + SourceKind::Http(http) => { + let objects = http.list().await.context("http")?; + let objects = objects + .into_iter() + .map(|(object, checksum)| { + (ObjectKind::Http(object), ChecksumKind::Http(checksum)) + }) + .collect(); + Ok(objects) + } + } + } + + fn get<'s>( + &'s self, + object: Self::Object, + checksum: Self::Checksum, + range: Option>, + ) -> BoxStream<'s, Result> { + match (self, object, checksum) { + (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => { + http.get(object, checksum, range) + .map(|result| result.context("http")) + .boxed() + } + } + } +} + +/// Enum wrapper for [`OneshotSource::Object`], see [`SourceKind`] for more details. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) enum ObjectKind { + Http(Url), +} + +/// Enum wrapper for [`OneshotSource::Checksum`], see [`SourceKind`] for more details. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) enum ChecksumKind { + Http(HttpChecksum), +} + +/// Defines a format that we fetch for a "one time" ingestion. +pub trait OneshotFormat: Clone { + /// A single unit of work for decoding this format, e.g. a single Parquet RowGroup. + type WorkRequest: Debug + Clone + Send + Serialize + DeserializeOwned + 'static + where + S: OneshotSource; + /// A chunk of records in this format that can be decoded into Rows. + type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static; + + /// Given an upstream object, defines how we should parse this object in parallel. + /// + /// Note: It's totally fine to not process an object in parallel, and just return a single + /// [`Self::WorkRequest`] here. + fn split_work( + &self, + source: S, + object: S::Object, + checksum: S::Checksum, + ) -> impl Future>, StorageErrorX>> + Send; + + /// Given a work request, fetch data from the [`OneshotSource`] and return it in a format that + /// can later be decoded. + fn fetch_work<'a, S: OneshotSource + Sync>( + &'a self, + source: &'a S, + request: Self::WorkRequest, + ) -> BoxStream<'a, Result>; + + /// Decode a chunk of records into [`Row`]s. + fn decode_chunk( + &self, + chunk: Self::RecordChunk, + rows: &mut Vec, + ) -> Result; +} + +/// An enum wrapper around [`OneshotFormat`]s. +/// +/// An alternative to this wrapper would be to use `Box`, but that requires +/// making the trait object safe and it's easier to just wrap it in an enum. 
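To make the `OneshotSource` contract above concrete, here is a hedged sketch of a trivial in-memory source (not part of the patch). The generic parameters in the trait signatures were lost in the listing above, so the signatures below are a best-effort reconstruction, and the `RangeInclusive<usize>` range type is an assumption.

// Hypothetical single-object source, useful only for illustration or tests.
#[derive(Clone)]
struct StaticSource {
    name: String,
    data: Bytes,
}

impl OneshotSource for StaticSource {
    // One "object" identified by name, with no meaningful checksum.
    type Object = String;
    type Checksum = ();

    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
        Ok(vec![(self.name.clone(), ())])
    }

    fn get<'s>(
        &'s self,
        _object: Self::Object,
        _checksum: Self::Checksum,
        _range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
        // Emit the whole payload as a single chunk; a real source would stream.
        futures::stream::once(futures::future::ready(Ok(self.data.clone()))).boxed()
    }
}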
Also, this wrapper +/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our format +/// types. +#[derive(Clone)] +pub(crate) enum FormatKind { + Csv(CsvDecoder), +} + +impl OneshotFormat for FormatKind { + type WorkRequest + = RequestKind + where + S: OneshotSource; + type RecordChunk = RecordChunkKind; + + async fn split_work( + &self, + source: S, + object: S::Object, + checksum: S::Checksum, + ) -> Result>, StorageErrorX> { + match self { + FormatKind::Csv(csv) => { + let work = csv + .split_work(source, object, checksum) + .await + .context("csv")? + .into_iter() + .map(RequestKind::Csv) + .collect(); + Ok(work) + } + } + } + + fn fetch_work<'a, S: OneshotSource + Sync>( + &'a self, + source: &'a S, + request: Self::WorkRequest, + ) -> BoxStream<'a, Result> { + match (self, request) { + (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv + .fetch_work(source, request) + .map_ok(RecordChunkKind::Csv) + .map(|result| result.context("csv")) + .boxed(), + } + } + + fn decode_chunk( + &self, + chunk: Self::RecordChunk, + rows: &mut Vec, + ) -> Result { + match (self, chunk) { + (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => { + csv.decode_chunk(chunk, rows).context("csv") + } + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub(crate) enum RequestKind { + Csv(CsvWorkRequest), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub(crate) enum RecordChunkKind { + Csv(CsvRecord), +} + +/// Experimental Error Type. +/// +/// The goal of this type is to combine concepts from both `thiserror` and +/// `anyhow`. Having "stongly typed" errors from `thiserror` is useful for +/// determining what action to take and tracking the context of an error like +/// `anyhow` is useful for determining where an error came from. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct StorageErrorX { + kind: StorageErrorXKind, + context: LinkedList, +} + +impl fmt::Display for StorageErrorX { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "error: {}", self.kind)?; + writeln!(f, "causes: {:?}", self.context)?; + Ok(()) + } +} + +/// Experimental Error Type, see [`StorageErrorX`]. 
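Since the stated intent is `anyhow`-style context layered on typed error kinds, a small hedged usage sketch (the helper function and its URL handling are illustrative, not part of the patch):

// Any error convertible into `StorageErrorXKind` (see the From impls below, e.g.
// for `reqwest::Error`) can be given human-readable context at each call site.
async fn fetch_bytes(url: &str) -> Result<bytes::Bytes, StorageErrorX> {
    let response = reqwest::get(url).await.context("sending GET request")?;
    let body = response.bytes().await.context("reading response body")?;
    Ok(body)
}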
+#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)] +pub enum StorageErrorXKind { + #[error("csv decoding error: {0}")] + CsvDecoding(Arc), + #[error("reqwest error: {0}")] + Reqwest(Arc), + #[error("invalid reqwest header: {0}")] + InvalidHeader(Arc), + #[error("something went wrong: {0}")] + Generic(String), +} + +impl From for StorageErrorXKind { + fn from(err: csv_async::Error) -> Self { + StorageErrorXKind::CsvDecoding(err.to_string().into()) + } +} + +impl From for StorageErrorXKind { + fn from(err: reqwest::Error) -> Self { + StorageErrorXKind::Reqwest(err.to_string().into()) + } +} + +impl From for StorageErrorXKind { + fn from(err: reqwest::header::ToStrError) -> Self { + StorageErrorXKind::InvalidHeader(err.to_string().into()) + } +} + +impl StorageErrorXKind { + pub fn with_context(self, context: C) -> StorageErrorX { + StorageErrorX { + kind: self, + context: LinkedList::from([context.to_string()]), + } + } + + pub fn generic(error: C) -> StorageErrorXKind { + StorageErrorXKind::Generic(error.to_string()) + } +} + +impl From for StorageErrorX +where + E: Into, +{ + fn from(err: E) -> Self { + StorageErrorX { + kind: err.into(), + context: LinkedList::new(), + } + } +} + +trait StorageErrorXContext { + fn context(self, context: C) -> Result + where + C: Display; +} + +impl StorageErrorXContext for Result +where + E: Into, +{ + fn context(self, context: C) -> Result + where + C: Display, + { + match self { + Ok(val) => Ok(val), + Err(kind) => Err(StorageErrorX { + kind: kind.into(), + context: LinkedList::from([context.to_string()]), + }), + } + } +} + +impl StorageErrorXContext for Result { + fn context(self, context: C) -> Result + where + C: Display, + { + match self { + Ok(val) => Ok(val), + Err(mut e) => { + e.context.push_back(context.to_string()); + Err(e) + } + } + } +} diff --git a/src/storage-operators/src/oneshot_source/csv.rs b/src/storage-operators/src/oneshot_source/csv.rs new file mode 100644 index 0000000000000..3aacffad4b4d6 --- /dev/null +++ b/src/storage-operators/src/oneshot_source/csv.rs @@ -0,0 +1,108 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! CSV to Row Decoder. + +use std::fmt::Debug; +use std::io; + +use futures::stream::{BoxStream, StreamExt}; +use futures::TryStreamExt; +use mz_repr::fixed_length::FromDatumIter; +use mz_repr::{Datum, Row}; +use serde::{Deserialize, Serialize}; +use tokio_util::io::StreamReader; + +use crate::oneshot_source::{OneshotFormat, OneshotSource, StorageErrorX, StorageErrorXKind}; + +#[derive(Default, Clone)] +pub struct CsvDecoder; + +/// Instructions on how to parse a single CSV file. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CsvWorkRequest { + object: O, + checksum: C, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CsvRecord { + bytes: Vec, + ranges: Vec>, +} + +impl OneshotFormat for CsvDecoder { + type WorkRequest + = CsvWorkRequest + where + S: OneshotSource; + type RecordChunk = CsvRecord; + + async fn split_work( + &self, + _source: S, + object: S::Object, + checksum: S::Checksum, + ) -> Result>, StorageErrorX> { + // Decoding a CSV in parallel is hard. 
+ // + // TODO(cf3): If necessary, we can get a 2x speedup by parsing a CSV + // from the start and end in parallel, and meeting in the middle. + // + // See for general parallelization strategies. + let request = CsvWorkRequest { object, checksum }; + Ok(vec![request]) + } + + fn fetch_work<'a, S: OneshotSource + Sync>( + &'a self, + source: &'a S, + request: Self::WorkRequest, + ) -> BoxStream<'a, Result> { + let CsvWorkRequest { object, checksum } = request; + + let raw_byte_stream = source + .get(object, checksum, None) + .map_err(|e| io::Error::new(io::ErrorKind::Interrupted, format!("{e:?}"))); + let stream_reader = StreamReader::new(raw_byte_stream); + + csv_async::AsyncReader::from_reader(stream_reader) + .into_byte_records() + .map_ok(|record| { + let bytes = record.as_slice().to_vec(); + let ranges = (0..record.len()) + .map(|idx| record.range(idx).expect("known to exist")) + .collect(); + CsvRecord { bytes, ranges } + }) + .map_err(|err| StorageErrorXKind::from(err).with_context("csv decoding")) + .boxed() + } + + fn decode_chunk( + &self, + chunk: Self::RecordChunk, + rows: &mut Vec, + ) -> Result { + let CsvRecord { bytes, ranges } = chunk; + + // TODO(cf1): Get a RelationDesc here to parse the proper types. + let datums = ranges.into_iter().map(|range| { + let slice = bytes.get(range).expect("known to exist"); + let s = std::str::from_utf8(slice).expect("valid UTF-8"); + Datum::String(s) + }); + + let mut row = Row::default(); + row.from_datum_iter(datums); + rows.push(row); + + Ok(1) + } +} diff --git a/src/storage-operators/src/oneshot_source/http_source.rs b/src/storage-operators/src/oneshot_source/http_source.rs new file mode 100644 index 0000000000000..fcb81944ef009 --- /dev/null +++ b/src/storage-operators/src/oneshot_source/http_source.rs @@ -0,0 +1,110 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Generic HTTP oneshot source that will fetch a file from the public internet. + +use bytes::Bytes; +use futures::stream::{BoxStream, StreamExt}; +use futures::TryStreamExt; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use url::Url; + +use crate::oneshot_source::{OneshotSource, StorageErrorX, StorageErrorXContext}; + +/// Generic oneshot source that fetches a file from a URL on the public internet. +#[derive(Clone)] +pub struct HttpOneshotSource { + client: Client, + origin: Url, +} + +impl HttpOneshotSource { + pub fn new(client: Client, origin: Url) -> Self { + HttpOneshotSource { client, origin } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum HttpChecksum { + /// No checksumming is requested. + None, + /// The HTTP [`ETag`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag) header. + ETag(String), + /// The HTTP [`Last-Modified`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified) header. + LastModified(String), +} + +impl OneshotSource for HttpOneshotSource { + type Object = Url; + type Checksum = HttpChecksum; + + async fn list<'a>(&'a self) -> Result, StorageErrorX> { + // TODO(cf3): Support listing files from a directory index. + + // Submit a HEAD request so we can discover metadata about the file. 
+ let response = self + .client + .head(self.origin.clone()) + .send() + .await + .context("HEAD request")?; + + let get_header = |name: &reqwest::header::HeaderName| { + let header = response.headers().get(name)?; + match header.to_str() { + Err(e) => { + tracing::warn!("failed to deserialize header '{name}', err: {e}"); + None + } + Ok(value) => Some(value), + } + }; + + // Get a checksum from the content. + let checksum = if let Some(etag) = get_header(&reqwest::header::ETAG) { + HttpChecksum::ETag(etag.to_string()) + } else if let Some(last_modified) = get_header(&reqwest::header::LAST_MODIFIED) { + let last_modified = last_modified.to_string(); + HttpChecksum::LastModified(last_modified.to_string()) + } else { + HttpChecksum::None + }; + + // TODO(cf1): We should probably check the content-type as well. At least for advisory purposes. + + Ok(vec![(self.origin.clone(), checksum)]) + } + + fn get<'s>( + &'s self, + object: Self::Object, + _checksum: Self::Checksum, + _range: Option>, + ) -> BoxStream<'s, Result> { + // TODO(cf1): Support the range param. + // TODO(cf1): Validate our checksum. + + let initial_response = async move { + let response = self + .client + .get(object.to_owned()) + .send() + .await + .context("get")?; + let bytes_stream = response.bytes_stream().err_into(); + + Ok::<_, StorageErrorX>(bytes_stream) + }; + + futures::stream::once(initial_response) + .try_flatten() + .boxed() + } +} diff --git a/src/storage-types/BUILD.bazel b/src/storage-types/BUILD.bazel index fc8f88dff561e..bd409e300f777 100644 --- a/src/storage-types/BUILD.bazel +++ b/src/storage-types/BUILD.bazel @@ -155,6 +155,7 @@ filegroup( "src/controller.proto", "src/errors.proto", "src/instances.proto", + "src/oneshot_sources.proto", "src/parameters.proto", "src/sinks.proto", "src/sources.proto", diff --git a/src/storage-types/build.rs b/src/storage-types/build.rs index c04f9541fe8f3..6c45b1ffb7c21 100644 --- a/src/storage-types/build.rs +++ b/src/storage-types/build.rs @@ -61,6 +61,7 @@ fn main() { "storage-types/src/connections.proto", "storage-types/src/errors.proto", "storage-types/src/instances.proto", + "storage-types/src/oneshot_sources.proto", "storage-types/src/parameters.proto", "storage-types/src/sinks.proto", "storage-types/src/sources.proto", diff --git a/src/storage-types/src/lib.rs b/src/storage-types/src/lib.rs index 6526e81e6fc2f..683bedd209c96 100644 --- a/src/storage-types/src/lib.rs +++ b/src/storage-types/src/lib.rs @@ -16,6 +16,7 @@ pub mod controller; pub mod dyncfgs; pub mod errors; pub mod instances; +pub mod oneshot_sources; pub mod parameters; pub mod read_holds; pub mod read_policy; diff --git a/src/storage-types/src/oneshot_sources.proto b/src/storage-types/src/oneshot_sources.proto new file mode 100644 index 0000000000000..41e228a375706 --- /dev/null +++ b/src/storage-types/src/oneshot_sources.proto @@ -0,0 +1,28 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +syntax = "proto3"; + +package mz_storage_types.oneshot_sources; + +message ProtoOneshotIngestionRequest { + oneof source { + ProtoHttpContentSource http = 1; + } + + oneof format { + ProtoCsvContentFormat csv = 2; + } +} + +message ProtoHttpContentSource { + string url = 1; +} + +message ProtoCsvContentFormat {} diff --git a/src/storage-types/src/oneshot_sources.rs b/src/storage-types/src/oneshot_sources.rs new file mode 100644 index 0000000000000..4b798817c9959 --- /dev/null +++ b/src/storage-types/src/oneshot_sources.rs @@ -0,0 +1,114 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Types for oneshot sources. + +use mz_proto::{IntoRustIfSome, RustType}; +use mz_timely_util::builder_async::PressOnDropButton; + +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::UnboundedReceiver; +use url::Url; + +include!(concat!( + env!("OUT_DIR"), + "/mz_storage_types.oneshot_sources.rs" +)); + +/// Callback type used to send the result of a oneshot source. +pub type OneshotResultCallback = + Box>) -> () + Send + 'static>; + +pub struct OneshotIngestionDescription { + /// Tokens for the running dataflows. + pub tokens: Vec, + /// Receiving end of the channel the dataflow uses to report results. + pub results: UnboundedReceiver, String>>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub struct OneshotIngestionRequest { + pub source: ContentSource, + pub format: ContentFormat, +} + +impl RustType for OneshotIngestionRequest { + fn into_proto(&self) -> ProtoOneshotIngestionRequest { + ProtoOneshotIngestionRequest { + source: Some(self.source.into_proto()), + format: Some(self.format.into_proto()), + } + } + + fn from_proto( + proto: ProtoOneshotIngestionRequest, + ) -> Result { + let source = proto + .source + .into_rust_if_some("ProtoOneshotIngestionRequest::source")?; + let format = proto + .format + .into_rust_if_some("ProtoOneshotIngestionRequest::format")?; + + Ok(OneshotIngestionRequest { source, format }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub enum ContentSource { + Http { url: Url }, +} + +impl RustType for ContentSource { + fn into_proto(&self) -> proto_oneshot_ingestion_request::Source { + match self { + ContentSource::Http { url } => { + proto_oneshot_ingestion_request::Source::Http(ProtoHttpContentSource { + url: url.to_string(), + }) + } + } + } + + fn from_proto( + proto: proto_oneshot_ingestion_request::Source, + ) -> Result { + match proto { + proto_oneshot_ingestion_request::Source::Http(source) => { + let url = Url::parse(&source.url).expect("failed to roundtrip Url"); + Ok(ContentSource::Http { url }) + } + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +pub enum ContentFormat { + Csv, +} + +impl RustType for ContentFormat { + fn into_proto(&self) -> proto_oneshot_ingestion_request::Format { + match self { + ContentFormat::Csv => { + proto_oneshot_ingestion_request::Format::Csv(ProtoCsvContentFormat::default()) + } + } + } + + fn from_proto( + proto: proto_oneshot_ingestion_request::Format, + ) -> Result { + match proto { + proto_oneshot_ingestion_request::Format::Csv(ProtoCsvContentFormat {}) => { + Ok(ContentFormat::Csv) + } + } + } +} diff --git 
a/src/storage/src/internal_control.rs b/src/storage/src/internal_control.rs index 904cf68bd5ee1..49f93d8dbd1dd 100644 --- a/src/storage/src/internal_control.rs +++ b/src/storage/src/internal_control.rs @@ -12,6 +12,7 @@ use std::time::Instant; use mz_repr::{GlobalId, Row}; use mz_rocksdb::config::SharedWriteBufferManager; use mz_storage_types::controller::CollectionMetadata; +use mz_storage_types::oneshot_sources::OneshotIngestionRequest; use mz_storage_types::parameters::StorageParameters; use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc}; use mz_storage_types::sources::IngestionDescription; @@ -82,6 +83,18 @@ pub enum InternalStorageCommand { /// have already been durably ingested. source_resume_uppers: BTreeMap>, }, + /// Render a oneshot ingestion dataflow that fetches data from an external system and stages + /// batches in Persist, that can later be appended to the shard. + RunOneshotIngestion { + /// ID of the running dataflow that is doing the ingestion. + ingestion_id: GlobalId, + /// ID of the collection we'll create batches for. + collection_id: GlobalId, + /// Metadata of the collection we'll create batches for. + collection_meta: CollectionMetadata, + /// Description of the oneshot ingestion. + request: OneshotIngestionRequest, + }, /// Render a sink dataflow. RunSinkDataflow( GlobalId, diff --git a/src/storage/src/render.rs b/src/storage/src/render.rs index 0c04e3049e041..dc8b290b3086e 100644 --- a/src/storage/src/render.rs +++ b/src/storage/src/render.rs @@ -205,6 +205,7 @@ use mz_ore::error::ErrorExt; use mz_repr::{GlobalId, Row}; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::dyncfgs; +use mz_storage_types::oneshot_sources::{OneshotIngestionDescription, OneshotIngestionRequest}; use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc}; use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceConnection}; use mz_timely_util::antichain::AntichainExt; @@ -489,3 +490,40 @@ pub fn build_export_dataflow( }) }); } + +pub(crate) fn build_oneshot_ingestion_dataflow( + timely_worker: &mut TimelyWorker, + storage_state: &mut StorageState, + ingestion_id: GlobalId, + collection_id: GlobalId, + collection_meta: CollectionMetadata, + description: OneshotIngestionRequest, +) { + let (results_tx, results_rx) = tokio::sync::mpsc::unbounded_channel(); + let callback = move |result| { + // TODO(cf3): Do we care if the receiver has gone away? + // + // Persist is working on cleaning up leaked blobs, we could also use `OneshotReceiverExt` + // here, but that might run into the infamous async-Drop problem. 
+ let _ = results_tx.send(result); + }; + + let tokens = timely_worker.dataflow(|scope| { + mz_storage_operators::oneshot_source::render( + scope.clone(), + Arc::clone(&storage_state.persist_clients), + collection_id, + collection_meta, + description, + callback, + ) + }); + let ingestion_description = OneshotIngestionDescription { + tokens, + results: results_rx, + }; + + storage_state + .oneshot_ingestions + .insert(ingestion_id, ingestion_description); +} diff --git a/src/storage/src/storage_state.rs b/src/storage/src/storage_state.rs index 19049058903a7..90bea535be72c 100644 --- a/src/storage/src/storage_state.rs +++ b/src/storage/src/storage_state.rs @@ -86,6 +86,7 @@ use mz_ore::now::NowFn; use mz_ore::tracing::TracingHandle; use mz_ore::vec::VecExt; use mz_ore::{soft_assert_or_log, soft_panic_or_log}; +use mz_persist_client::batch::ProtoBatch; use mz_persist_client::cache::PersistClientCache; use mz_repr::{GlobalId, Timestamp}; use mz_rocksdb::config::SharedWriteBufferManager; @@ -95,6 +96,7 @@ use mz_storage_client::client::{ use mz_storage_types::configuration::StorageConfiguration; use mz_storage_types::connections::ConnectionContext; use mz_storage_types::controller::CollectionMetadata; +use mz_storage_types::oneshot_sources::OneshotIngestionDescription; use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc}; use mz_storage_types::sources::IngestionDescription; use mz_storage_types::AlterCompatible; @@ -213,6 +215,7 @@ impl<'w, A: Allocate> Worker<'w, A> { reported_frontiers: BTreeMap::new(), ingestions: BTreeMap::new(), exports: BTreeMap::new(), + oneshot_ingestions: BTreeMap::new(), now, timely_worker_index: timely_worker.index(), timely_worker_peers: timely_worker.peers(), @@ -274,6 +277,8 @@ pub struct StorageState { pub ingestions: BTreeMap>, /// Descriptions of each installed export. pub exports: BTreeMap>, + /// Descriptions of oneshot ingestions that are currently running. + pub oneshot_ingestions: BTreeMap>, /// Undocumented pub now: NowFn, /// Index of the associated timely dataflow worker. 
@@ -448,6 +453,7 @@ impl<'w, A: Allocate> Worker<'w, A> { } self.report_frontier_progress(&response_tx); + self.process_oneshot_ingestions(&response_tx); // Report status updates if any are present if self.storage_state.object_status_updates.borrow().len() > 0 { @@ -699,6 +705,21 @@ impl<'w, A: Allocate> Worker<'w, A> { source_resume_uppers, ); } + InternalStorageCommand::RunOneshotIngestion { + ingestion_id, + collection_id, + collection_meta, + request, + } => { + crate::render::build_oneshot_ingestion_dataflow( + self.timely_worker, + &mut self.storage_state, + ingestion_id, + collection_id, + collection_meta, + request, + ); + } InternalStorageCommand::RunSinkDataflow(sink_id, sink_description) => { info!( "worker {}/{} trying to (re-)start sink {sink_id}", @@ -839,6 +860,39 @@ impl<'w, A: Allocate> Worker<'w, A> { let _ = response_tx.send(response); } + fn process_oneshot_ingestions(&mut self, response_tx: &ResponseSender) { + use tokio::sync::mpsc::error::TryRecvError; + + let mut to_remove = vec![]; + + for (ingestion_id, ingestion_state) in &mut self.storage_state.oneshot_ingestions { + loop { + match ingestion_state.results.try_recv() { + Ok(result) => { + let response = match result { + Ok(maybe_batch) => maybe_batch.into_iter().map(Result::Ok).collect(), + Err(err) => vec![Err(err)], + }; + let staged_batches = BTreeMap::from([(*ingestion_id, response)]); + let _ = response_tx.send(StorageResponse::StagedBatches(staged_batches)); + } + Err(TryRecvError::Empty) => { + break; + } + Err(TryRecvError::Disconnected) => { + to_remove.push(*ingestion_id); + break; + } + } + } + } + + for ingestion_id in to_remove { + tracing::info!(?ingestion_id, "removing oneshot ingestion"); + self.storage_state.oneshot_ingestions.remove(&ingestion_id); + } + } + /// Extract commands until `InitializationComplete`, and make the worker /// reflect those commands. If the worker can not be made to reflect the /// commands, return an error. @@ -900,6 +954,12 @@ impl<'w, A: Allocate> Worker<'w, A> { } } } + StorageCommand::RunOneshotIngestion(oneshot) => { + info!(%worker_id, ?oneshot, "reconcile: received RunOneshotIngestion command"); + // TODO(cf1): Handle CancelOneshotIngestion, clean out stale oneshot + // ingestions from our state. Possibly here we respond to the client + // with a cancelation to make sure the client doesn't wait forever. 
+ } StorageCommand::RunSinks(exports) => { info!(%worker_id, ?exports, "reconcile: received RunSinks command"); @@ -1025,7 +1085,8 @@ impl<'w, A: Allocate> Worker<'w, A> { StorageCommand::InitializationComplete | StorageCommand::AllowWrites | StorageCommand::UpdateConfiguration(_) - | StorageCommand::AllowCompaction(_) => (), + | StorageCommand::AllowCompaction(_) + | StorageCommand::RunOneshotIngestion(_) => (), } } @@ -1162,6 +1223,18 @@ impl StorageState { } } } + StorageCommand::RunOneshotIngestion(oneshot) => { + if self.timely_worker_index == 0 { + self.internal_cmd_tx.borrow_mut().broadcast( + InternalStorageCommand::RunOneshotIngestion { + ingestion_id: oneshot.ingestion_id, + collection_id: oneshot.collection_id, + collection_meta: oneshot.collection_meta, + request: oneshot.request, + }, + ); + } + } StorageCommand::RunSinks(exports) => { for export in exports { // Remember the sink description to facilitate possible diff --git a/src/txn-wal/src/txn_write.rs b/src/txn-wal/src/txn_write.rs index 6b126422b4f39..ae0810766668b 100644 --- a/src/txn-wal/src/txn_write.rs +++ b/src/txn-wal/src/txn_write.rs @@ -20,7 +20,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use mz_ore::cast::CastFrom; use mz_ore::instrument; -use mz_persist_client::batch::Batch; +use mz_persist_client::batch::{Batch, ProtoBatch}; use mz_persist_client::ShardId; use mz_persist_types::txn::{TxnsCodec, TxnsEntry}; use mz_persist_types::{Codec, Codec64, Opaque, StepForward}; @@ -36,6 +36,7 @@ use crate::txns::{Tidy, TxnsHandle}; #[derive(Debug)] pub(crate) struct TxnWrite { pub(crate) batches: Vec>, + pub(crate) staged: Vec, pub(crate) writes: Vec<(K, V, D)>, } @@ -43,6 +44,7 @@ impl TxnWrite { /// Merges the staged writes in `other` into this. pub fn merge(&mut self, other: Self) { self.batches.extend(other.batches); + self.staged.extend(other.staged); self.writes.extend(other.writes); } } @@ -51,6 +53,7 @@ impl Default for TxnWrite { fn default() -> Self { Self { batches: Vec::default(), + staged: Vec::default(), writes: Vec::default(), } } @@ -92,6 +95,13 @@ where .push((key, val, diff)) } + /// Stage a [`Batch`] to the in-progress txn. + /// + /// The timestamp will be assigned at commit time. + pub fn write_batch(&mut self, data_id: &ShardId, batch: ProtoBatch) { + self.writes.entry(*data_id).or_default().staged.push(batch) + } + /// Commit this transaction at `commit_ts`. 
/// /// This either atomically commits all staged writes or, if that's no longer @@ -170,8 +180,10 @@ where let commit_ts = commit_ts.clone(); txn_batches_updates.push(async move { let mut batches = updates - .batches + .staged .into_iter() + .map(|staged| data_write.batch_from_transmittable_batch(staged)) + .chain(updates.batches.into_iter()) .map(|mut batch| { batch .rewrite_ts( @@ -182,6 +194,7 @@ where batch.into_transmittable_batch() }) .collect::>(); + if !updates.writes.is_empty() { let mut batch = data_write.builder(Antichain::from_elem(T::minimum())); for (k, v, d) in updates.writes.iter() { @@ -302,6 +315,7 @@ where .collect(); let txn_write = TxnWrite { writes: Vec::new(), + staged: Vec::new(), batches, }; self.writes.insert(data_write.shard_id(), txn_write); diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 0c5d9cc6ade1a..5d20aff942b2a 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -29,8 +29,7 @@ aws-smithy-runtime = { version = "1.3.1", default-features = false, features = [ aws-smithy-types = { version = "1.1.8", default-features = false, features = ["byte-stream-poll-next", "http-body-0-4-x", "http-body-1-x", "rt-tokio", "test-util"] } axum = { version = "0.7.5", features = ["ws"] } axum-core = { version = "0.4.3", default-features = false, features = ["tracing"] } -bstr-6f8ce4dd05d13bba = { package = "bstr", version = "0.2.14" } -bstr-dff4ba8e3ae991db = { package = "bstr", version = "1.10.0" } +bstr = { version = "0.2.14" } byteorder = { version = "1.5.0" } bytes = { version = "1.4.0", features = ["serde"] } chrono = { version = "0.4.35", default-features = false, features = ["clock", "serde"] } @@ -42,13 +41,13 @@ crossbeam-deque = { version = "0.8.3" } crossbeam-epoch = { version = "0.9.13" } crossbeam-utils = { version = "0.8.20" } crypto-common = { version = "0.1.3", default-features = false, features = ["std"] } +csv-async = { version = "1.3.0", features = ["tokio"] } debugid = { version = "0.8.0", default-features = false, features = ["serde"] } dec = { version = "0.4.8", default-features = false, features = ["serde"] } digest = { version = "0.10.6", features = ["mac", "std"] } either = { version = "1.8.0", features = ["serde"] } flate2 = { version = "1.0.24", features = ["zlib"] } form_urlencoded = { version = "1.2.1" } -futures = { version = "0.3.25" } futures-channel = { version = "0.3.30", features = ["sink"] } futures-core = { version = "0.3.30" } futures-executor = { version = "0.3.25" } @@ -105,9 +104,9 @@ rand = { version = "0.8.5", features = ["small_rng"] } rand_chacha = { version = "0.3.0" } rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka.git", features = ["cmake-build", "libz-static", "ssl-vendored", "zstd"] } regex = { version = "1.10.5" } -regex-automata = { version = "0.4.7", default-features = false, features = ["dfa-onepass", "dfa-search", "hybrid", "meta", "nfa", "perf", "unicode"] } +regex-automata = { version = "0.4.7", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa", "perf", "unicode"] } regex-syntax = { version = "0.8.3" } -reqwest = { version = "0.11.24", features = ["blocking", "json", "multipart", "native-tls-vendored"] } +reqwest = { version = "0.11.24", features = ["blocking", "json", "multipart", "native-tls-vendored", "stream"] } schemars = { version = "0.8.11", features = ["uuid1"] } scopeguard = { version = "1.1.0" } semver = { version = "1.0.23", features = ["serde"] } @@ -160,8 +159,7 @@ aws-smithy-runtime = { version = 
"1.3.1", default-features = false, features = [ aws-smithy-types = { version = "1.1.8", default-features = false, features = ["byte-stream-poll-next", "http-body-0-4-x", "http-body-1-x", "rt-tokio", "test-util"] } axum = { version = "0.7.5", features = ["ws"] } axum-core = { version = "0.4.3", default-features = false, features = ["tracing"] } -bstr-6f8ce4dd05d13bba = { package = "bstr", version = "0.2.14" } -bstr-dff4ba8e3ae991db = { package = "bstr", version = "1.10.0" } +bstr = { version = "0.2.14" } byteorder = { version = "1.5.0" } bytes = { version = "1.4.0", features = ["serde"] } cc = { version = "1.1.28", default-features = false, features = ["parallel"] } @@ -174,13 +172,13 @@ crossbeam-deque = { version = "0.8.3" } crossbeam-epoch = { version = "0.9.13" } crossbeam-utils = { version = "0.8.20" } crypto-common = { version = "0.1.3", default-features = false, features = ["std"] } +csv-async = { version = "1.3.0", features = ["tokio"] } debugid = { version = "0.8.0", default-features = false, features = ["serde"] } dec = { version = "0.4.8", default-features = false, features = ["serde"] } digest = { version = "0.10.6", features = ["mac", "std"] } either = { version = "1.8.0", features = ["serde"] } flate2 = { version = "1.0.24", features = ["zlib"] } form_urlencoded = { version = "1.2.1" } -futures = { version = "0.3.25" } futures-channel = { version = "0.3.30", features = ["sink"] } futures-core = { version = "0.3.30" } futures-executor = { version = "0.3.25" } @@ -238,9 +236,9 @@ rand = { version = "0.8.5", features = ["small_rng"] } rand_chacha = { version = "0.3.0" } rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka.git", features = ["cmake-build", "libz-static", "ssl-vendored", "zstd"] } regex = { version = "1.10.5" } -regex-automata = { version = "0.4.7", default-features = false, features = ["dfa-onepass", "dfa-search", "hybrid", "meta", "nfa", "perf", "unicode"] } +regex-automata = { version = "0.4.7", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa", "perf", "unicode"] } regex-syntax = { version = "0.8.3" } -reqwest = { version = "0.11.24", features = ["blocking", "json", "multipart", "native-tls-vendored"] } +reqwest = { version = "0.11.24", features = ["blocking", "json", "multipart", "native-tls-vendored", "stream"] } schemars = { version = "0.8.11", features = ["uuid1"] } scopeguard = { version = "1.1.0" } semver = { version = "1.0.23", features = ["serde"] }