From dba07525a6a60a4c8bdfe56c499ee1cb5d449997 Mon Sep 17 00:00:00 2001 From: Daniel Harrison <52528+danhhz@users.noreply.github.com> Date: Fri, 27 Sep 2024 11:43:27 -0700 Subject: [PATCH] storage: add DataSourceOther::Shard Splitting this out of #24138, which exposes the catalog shard via SQL, because there are some open questions to resolve for the adapter pieces, but the storage part is ready to go. Bonus, this neatly splits the needed code reviewers. The catalog is already a persist shard with the usual `SourceData, (), Timestamp, Diff` type, maintained entirely outside storage. Model this as a storage collection which happens to be backed by a persist shard. We could copy this shard into one managed entirely by storage, but as a performance and complexity optimization, we don't. --- src/storage-client/src/controller.rs | 4 +++- src/storage-client/src/storage_collections.rs | 21 ++++++++++++++----- src/storage-controller/src/lib.rs | 10 +++++---- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index 266931e3c3808..4c50e3f216184 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -130,6 +130,8 @@ pub enum DataSourceOther { TableWrites, /// Compute maintains, i.e. it is a `MATERIALIZED VIEW`. Compute, + /// A shard maintained outside of storage. + Shard(ShardId), } /// Describes a request to create a source. @@ -732,7 +734,7 @@ impl DataSource { pub fn in_txns(&self) -> bool { match self { DataSource::Other(DataSourceOther::TableWrites) => true, - DataSource::Other(DataSourceOther::Compute) + DataSource::Other(DataSourceOther::Compute | DataSourceOther::Shard(_)) | DataSource::Ingestion(_) | DataSource::IngestionExport { .. 
} | DataSource::Introspection(_) diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs index 34407a348f414..c3f9eee2c7899 100644 --- a/src/storage-client/src/storage_collections.rs +++ b/src/storage-client/src/storage_collections.rs @@ -809,9 +809,10 @@ where let dependencies = match &data_source { DataSource::Introspection(_) | DataSource::Webhook - | DataSource::Other(DataSourceOther::TableWrites) - | DataSource::Progress - | DataSource::Other(DataSourceOther::Compute) => Vec::new(), + | DataSource::Other( + DataSourceOther::TableWrites | DataSourceOther::Compute | DataSourceOther::Shard(_), + ) + | DataSource::Progress => Vec::new(), DataSource::IngestionExport { ingestion_id, .. } => { // Ingestion exports depend on their primary source's remap // collection. @@ -1378,6 +1379,15 @@ where .into_iter() .map(|(id, description)| { let data_shard = storage_metadata.get_collection_shard::(id)?; + // For a shard managed outside storage, sanity check that the `CollectionMetadata::data_shard` agrees with the one declared in `DataSourceOther::Shard`. 
+ match &description.data_source { + DataSource::Other(DataSourceOther::Shard(x)) if x != data_shard => { + let err = + format!("shard DataSource {x} did not match metadata {data_shard}"); + return Err(StorageError::InvalidUsage(err)); + } + _ => {} + }; let get_shard = |id| -> Result> { let shard = storage_metadata.get_collection_shard::(id)?; @@ -1473,7 +1483,7 @@ where | DataSource::Webhook | DataSource::Ingestion(_) | DataSource::Progress - | DataSource::Other(DataSourceOther::Compute) => {}, + | DataSource::Other(DataSourceOther::Compute | DataSourceOther::Shard(_)) => {}, DataSource::Other(DataSourceOther::TableWrites) => { let register_ts = register_ts.expect("caller should have provided a register_ts when creating a table"); if since_handle.since().elements() == &[T::minimum()] && !migrated_storage_collections.contains(&id) { @@ -1652,7 +1662,8 @@ where } self_collections.insert(id, collection_state); } - DataSource::Progress | DataSource::Other(DataSourceOther::Compute) => { + DataSource::Progress + | DataSource::Other(DataSourceOther::Compute | DataSourceOther::Shard(_)) => { self_collections.insert(id, collection_state); } DataSource::Ingestion(_) => { diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index fe3e9fe0db304..7e825d35a82fb 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -721,7 +721,8 @@ where self.collections.insert(id, collection_state); table_registers.push((id, write)); } - DataSource::Progress | DataSource::Other(DataSourceOther::Compute) => { + DataSource::Progress + | DataSource::Other(DataSourceOther::Compute | DataSourceOther::Shard(_)) => { debug!(data_source = ?collection_state.data_source, meta = ?metadata, "not registering {} with a controller persist worker", id); self.collections.insert(id, collection_state); } @@ -2941,9 +2942,10 @@ where let dependency = match &data_source { DataSource::Introspection(_) | DataSource::Webhook - | 
DataSource::Other(DataSourceOther::TableWrites) - | DataSource::Progress - | DataSource::Other(DataSourceOther::Compute) => vec![], + | DataSource::Other( + DataSourceOther::TableWrites | DataSourceOther::Compute | DataSourceOther::Shard(_), + ) + | DataSource::Progress => vec![], DataSource::IngestionExport { ingestion_id, .. } => { // Ingestion exports depend on their primary source's remap // collection.