Skip to content

Commit

Permalink
storage: add DataSourceOther::Shard
Browse files Browse the repository at this point in the history
Splitting this out of MaterializeInc#24138, which exposes the catalog shard via SQL,
because there are some open questions to resolve for the adapter pieces,
but the storage part is ready to go. Bonus, this neatly splits the
needed code reviewers.

The catalog is already a persist shard with the usual `SourceData, (),
Timestamp, Diff` type, maintained entirely outside storage. Model this
as a storage collection which happens to be backed by a persist shard.
We could copy this shard into one managed entirely by storage, but as a
performance and complexity optimization, we don't.
  • Loading branch information
danhhz committed Sep 27, 2024
1 parent 30ab0ea commit dba0752
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 10 deletions.
4 changes: 3 additions & 1 deletion src/storage-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ pub enum DataSourceOther {
TableWrites,
/// Compute maintains, i.e. it is a `MATERIALIZED VIEW`.
Compute,
/// A shard maintained outside of storage.
Shard(ShardId),
}

/// Describes a request to create a source.
Expand Down Expand Up @@ -732,7 +734,7 @@ impl DataSource {
pub fn in_txns(&self) -> bool {
match self {
DataSource::Other(DataSourceOther::TableWrites) => true,
DataSource::Other(DataSourceOther::Compute)
DataSource::Other(DataSourceOther::Compute | DataSourceOther::Shard(_))
| DataSource::Ingestion(_)
| DataSource::IngestionExport { .. }
| DataSource::Introspection(_)
Expand Down
21 changes: 16 additions & 5 deletions src/storage-client/src/storage_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -809,9 +809,10 @@ where
let dependencies = match &data_source {
DataSource::Introspection(_)
| DataSource::Webhook
| DataSource::Other(DataSourceOther::TableWrites)
| DataSource::Progress
| DataSource::Other(DataSourceOther::Compute) => Vec::new(),
| DataSource::Other(
DataSourceOther::TableWrites | DataSourceOther::Compute | DataSourceOther::Shard(_),
)
| DataSource::Progress => Vec::new(),
DataSource::IngestionExport { ingestion_id, .. } => {
// Ingestion exports depend on their primary source's remap
// collection.
Expand Down Expand Up @@ -1378,6 +1379,15 @@ where
.into_iter()
.map(|(id, description)| {
let data_shard = storage_metadata.get_collection_shard::<T>(id)?;
// For a shard managed outside storage, sanity check that the `CollectionMetdata::data_shard` agrees with the one declared in `DataSourceOther::Shard`.
match &description.data_source {
DataSource::Other(DataSourceOther::Shard(x)) if x != data_shard => {
let err =
format!("shard DataSource {x} did not match metadata {data_shard}");
return Err(StorageError::InvalidUsage(err));
}
_ => {}
};

let get_shard = |id| -> Result<ShardId, StorageError<T>> {
let shard = storage_metadata.get_collection_shard::<T>(id)?;
Expand Down Expand Up @@ -1473,7 +1483,7 @@ where
| DataSource::Webhook
| DataSource::Ingestion(_)
| DataSource::Progress
| DataSource::Other(DataSourceOther::Compute) => {},
| DataSource::Other(DataSourceOther::Compute | DataSourceOther::Shard(_)) => {},
DataSource::Other(DataSourceOther::TableWrites) => {
let register_ts = register_ts.expect("caller should have provided a register_ts when creating a table");
if since_handle.since().elements() == &[T::minimum()] && !migrated_storage_collections.contains(&id) {
Expand Down Expand Up @@ -1652,7 +1662,8 @@ where
}
self_collections.insert(id, collection_state);
}
DataSource::Progress | DataSource::Other(DataSourceOther::Compute) => {
DataSource::Progress
| DataSource::Other(DataSourceOther::Compute | DataSourceOther::Shard(_)) => {
self_collections.insert(id, collection_state);
}
DataSource::Ingestion(_) => {
Expand Down
10 changes: 6 additions & 4 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,8 @@ where
self.collections.insert(id, collection_state);
table_registers.push((id, write));
}
DataSource::Progress | DataSource::Other(DataSourceOther::Compute) => {
DataSource::Progress
| DataSource::Other(DataSourceOther::Compute | DataSourceOther::Shard(_)) => {
debug!(data_source = ?collection_state.data_source, meta = ?metadata, "not registering {} with a controller persist worker", id);
self.collections.insert(id, collection_state);
}
Expand Down Expand Up @@ -2941,9 +2942,10 @@ where
let dependency = match &data_source {
DataSource::Introspection(_)
| DataSource::Webhook
| DataSource::Other(DataSourceOther::TableWrites)
| DataSource::Progress
| DataSource::Other(DataSourceOther::Compute) => vec![],
| DataSource::Other(
DataSourceOther::TableWrites | DataSourceOther::Compute | DataSourceOther::Shard(_),
)
| DataSource::Progress => vec![],
DataSource::IngestionExport { ingestion_id, .. } => {
// Ingestion exports depend on their primary source's remap
// collection.
Expand Down

0 comments on commit dba0752

Please sign in to comment.