adapter,storage: expose the catalog shard to SQL

Adapter's catalog is now stored in a persist shard with our usual
`<SourceData, (), Timestamp, i64>` types. As a result, we can present
this shard for introspection to the rest of the system.

A number of internal tables contain information derived entirely from
the catalog, which is the source of truth. For these tables, when the
catalog changes, we issue a second write to the storage controller's
Append API.

This PR sets us up to start replacing those tables with `VIEW`s. Not
only does this save us the second write's complexity and performance
hit, it also reduces the chance of discrepancies when catalog changes
happen in multiple places (Pv2).
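
As a rough sketch of where this could go (not part of this change), a
view over the new `mz_internal.mz_catalog_raw` source could replace one
of the derived tables. The view name below is made up, and the JSON
paths simply mirror the `StorageCollectionMetadata` example output
further down:

```
-- Hypothetical sketch only: reconstruct the object-to-shard mapping directly
-- from the catalog shard. The view name is invented; the JSON paths follow
-- the StorageCollectionMetadata example output below.
CREATE VIEW storage_shards_from_catalog AS
SELECT
    data -> 'key' -> 'id' AS object_id,      -- e.g. {"value": {"User": 1}}
    data -> 'value' ->> 'shard' AS shard_id  -- e.g. "sc6df7783-..."
FROM mz_internal.mz_catalog_raw
WHERE data ->> 'kind' = 'StorageCollectionMetadata';
```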

It also happens to be extremely cool for debugging. Now that the catalog
also contains the storage controller state, we can introspect that
state, too:

```
materialize=> COPY (SUBSCRIBE TO (select data from mz_internal.mz_catalog_raw where data->>'kind' = 'UnfinalizedShard') as of 1) TO STDOUT;
13	1	{"key":{"shard":"sc6df7783-69cb-4b31-9b45-98c8e2799076"},"kind":"UnfinalizedShard"}

materialize=> COPY (SUBSCRIBE TO (select data from mz_internal.mz_catalog_raw where data->>'kind' = 'StorageCollectionMetadata') as of 1) TO STDOUT;
6	1	{"key":{"id":{"value":{"System":450}}},"kind":"StorageCollectionMetadata","value":{"shard":"s04d31384-3a30-48d4-8ca8-6d35464ebd56"}}
...
6	1	{"key":{"id":{"value":{"System":487}}},"kind":"StorageCollectionMetadata","value":{"shard":"s9b99714a-0f13-4653-a6e6-92cf0eab50a8"}}
12	1	{"key":{"id":{"value":{"User":1}}},"kind":"StorageCollectionMetadata","value":{"shard":"sc6df7783-69cb-4b31-9b45-98c8e2799076"}}
13	-1	{"key":{"id":{"value":{"User":1}}},"kind":"StorageCollectionMetadata","value":{"shard":"sc6df7783-69cb-4b31-9b45-98c8e2799076"}}
```
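
For a quick point-in-time look, an ordinary query over the same source
works as well. The following is only a sketch; it relies on nothing
beyond the `data` JSONB column this PR defines, grouping catalog
entries by their `kind` discriminant:

```
-- Sketch: count catalog entries by kind, using the single JSONB column
-- ("data") that backs mz_internal.mz_catalog_raw.
SELECT data ->> 'kind' AS kind, count(*) AS entries
FROM mz_internal.mz_catalog_raw
GROUP BY 1
ORDER BY 2 DESC;
```
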
danhhz committed Apr 10, 2024
1 parent 8744741 commit 1bacb83
Showing 13 changed files with 105 additions and 35 deletions.
31 changes: 28 additions & 3 deletions src/adapter/src/catalog.rs
@@ -31,15 +31,15 @@ use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::{EventDetails, EventType, FullNameV1, IdFullNameV1, ObjectType, VersionedEvent};
use mz_build_info::DUMMY_BUILD_INFO;
use mz_catalog::builtin::{
BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType, BUILTINS,
BUILTIN_PREFIXES, MZ_INTROSPECTION_CLUSTER,
BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinSourceType, BuiltinTable, BuiltinType,
BUILTINS, BUILTIN_PREFIXES, MZ_INTROSPECTION_CLUSTER,
};
use mz_catalog::config::{ClusterReplicaSizeMap, Config, StateConfig};
use mz_catalog::durable::{test_bootstrap_args, DurableCatalogState, Transaction};
use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, Cluster, ClusterConfig, ClusterReplica, ClusterReplicaProcessStatus,
Database, Role, Schema,
DataSourceDesc, Database, Role, Schema,
};
use mz_catalog::SYSTEM_CONN_ID;
use mz_compute_types::dataflows::DataflowDescription;
@@ -182,6 +182,31 @@ impl Catalog {
let mut storage = self.storage().await;
let mut txn = storage.transaction().await?;

// WIP we communicate to the storage controller that the mz_catalog_raw
// collection already has a shard_id (that it can reuse as an
// optimization instead of needlessly copying it), but this happens much
// later in create_collections and we invent and write down the shard
// ids here. figure out how we want to model this
for e in self.entries() {
let shard_id = match &e.item {
CatalogItem::Source(x) => match x.data_source {
DataSourceDesc::Introspection(BuiltinSourceType::Catalog) => {
self.state.config().shard_id.to_string()
}
_ => continue,
},
_ => continue,
};
use mz_storage_client::controller::StorageTxn;
match txn.get_collection_metadata().get(&e.id()) {
Some(x) => assert_eq!(x, &shard_id),
None => {
txn.insert_collection_metadata([(e.id(), shard_id)].into())
.map_err(mz_catalog::durable::DurableCatalogError::from)?;
}
}
}

storage_controller
.initialize_state(
&mut txn,
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
@@ -227,6 +227,7 @@ impl Catalog {
timestamp_interval: Duration::from_secs(1),
now: config.now.clone(),
connection_context: config.connection_context,
shard_id: storage.shard_id(),
},
cluster_replica_sizes: config.cluster_replica_sizes,
availability_zones: config.availability_zones,
2 changes: 2 additions & 0 deletions src/adapter/src/catalog/state.rs
@@ -21,6 +21,7 @@ use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_catalog::durable::Transaction;
use mz_persist_types::ShardId;
use mz_sql::session::metadata::SessionMetadata;
use once_cell::sync::Lazy;
use serde::Serialize;
@@ -174,6 +175,7 @@ impl CatalogState {
connection_context: ConnectionContext::for_tests(Arc::new(
InMemorySecretsController::new(),
)),
shard_id: ShardId::new(),
},
cluster_replica_sizes: Default::default(),
availability_zones: Default::default(),
8 changes: 6 additions & 2 deletions src/adapter/src/coord.rs
@@ -165,7 +165,7 @@ use crate::statement_logging::StatementEndedExecutionReason;
use crate::util::{ClientTransmitter, CompletedClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{flags, AdapterNotice, ReadHolds, TimestampProvider};
use mz_catalog::builtin::BUILTINS;
use mz_catalog::builtin::{BuiltinSourceType, BUILTINS};
use mz_catalog::durable::OpenableDurableCatalogState;
use mz_ore::future::TimeoutError;
use mz_timestamp_oracle::postgres_oracle::{
@@ -1947,9 +1947,13 @@ impl Coordinator {
(DataSource::Webhook, Some(source_status_collection_id))
}
DataSourceDesc::Progress => (DataSource::Progress, None),
DataSourceDesc::Introspection(introspection) => {
DataSourceDesc::Introspection(BuiltinSourceType::Storage(introspection)) => {
(DataSource::Introspection(*introspection), None)
}
DataSourceDesc::Introspection(BuiltinSourceType::Catalog) => (
DataSource::Other(DataSourceOther::Shard(catalog.state().config().shard_id)),
None,
),
};
CollectionDescription {
desc: source.desc.clone(),
55 changes: 37 additions & 18 deletions src/catalog/src/builtin.rs
@@ -130,13 +130,19 @@ pub struct BuiltinTable {
pub access: Vec<MzAclItem>,
}

#[derive(Clone, Copy, Debug, Hash, Serialize)]
pub enum BuiltinSourceType {
Storage(IntrospectionType),
Catalog,
}

#[derive(Clone, Debug, Hash, Serialize)]
pub struct BuiltinSource {
pub name: &'static str,
pub schema: &'static str,
pub oid: u32,
pub desc: RelationDesc,
pub data_source: IntrospectionType,
pub data_source: BuiltinSourceType,
/// Whether the source's retention policy is controlled by
/// the system variable `METRICS_RETENTION`
pub is_retained_metrics_object: bool,
@@ -1974,7 +1980,7 @@ pub static MZ_COMPUTE_DEPENDENCIES: Lazy<BuiltinSource> = Lazy::new(|| BuiltinSo
name: "mz_compute_dependencies",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_COMPUTE_DEPENDENCIES_OID,
data_source: IntrospectionType::ComputeDependencies,
data_source: BuiltinSourceType::Storage(IntrospectionType::ComputeDependencies),
desc: RelationDesc::empty()
.with_column("object_id", ScalarType::String.nullable(false))
.with_column("dependency_id", ScalarType::String.nullable(false)),
@@ -1985,7 +1991,7 @@ pub static MZ_COMPUTE_HYDRATION_STATUSES: Lazy<BuiltinSource> = Lazy::new(|| Bui
name: "mz_compute_hydration_statuses",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_COMPUTE_HYDRATION_STATUSES_OID,
data_source: IntrospectionType::ComputeHydrationStatus,
data_source: BuiltinSourceType::Storage(IntrospectionType::ComputeHydrationStatus),
desc: RelationDesc::empty()
.with_column("object_id", ScalarType::String.nullable(false))
.with_column("replica_id", ScalarType::String.nullable(false))
@@ -1998,7 +2004,7 @@ pub static MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER: Lazy<BuiltinSource
name: "mz_compute_operator_hydration_statuses_per_worker",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER_OID,
data_source: IntrospectionType::ComputeOperatorHydrationStatus,
data_source: BuiltinSourceType::Storage(IntrospectionType::ComputeOperatorHydrationStatus),
desc: RelationDesc::empty()
.with_column("object_id", ScalarType::String.nullable(false))
.with_column("physical_plan_node_id", ScalarType::UInt64.nullable(false))
@@ -2009,6 +2015,16 @@ pub static MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER: Lazy<BuiltinSource
access: vec![PUBLIC_SELECT],
});

pub static MZ_CATALOG_RAW: Lazy<BuiltinSource> = Lazy::new(|| BuiltinSource {
name: "mz_catalog_raw",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_CATALOG_RAW_OID,
data_source: BuiltinSourceType::Catalog,
desc: crate::durable::persist::desc(),
is_retained_metrics_object: false,
access: vec![MONITOR_SELECT],
});

pub static MZ_DATABASES: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
name: "mz_databases",
schema: MZ_CATALOG_SCHEMA,
@@ -2515,7 +2531,7 @@ pub static MZ_CLUSTER_REPLICA_HEARTBEATS: Lazy<BuiltinSource> = Lazy::new(|| Bui
name: "mz_cluster_replica_heartbeats",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_CLUSTER_REPLICA_HEARTBEATS_OID,
data_source: IntrospectionType::ComputeReplicaHeartbeats,
data_source: BuiltinSourceType::Storage(IntrospectionType::ComputeReplicaHeartbeats),
desc: RelationDesc::empty()
.with_column("replica_id", ScalarType::String.nullable(false))
.with_column(
@@ -2548,7 +2564,7 @@ pub static MZ_SOURCE_STATUS_HISTORY: Lazy<BuiltinSource> = Lazy::new(|| BuiltinS
name: "mz_source_status_history",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_SOURCE_STATUS_HISTORY_OID,
data_source: IntrospectionType::SourceStatusHistory,
data_source: BuiltinSourceType::Storage(IntrospectionType::SourceStatusHistory),
desc: MZ_SOURCE_STATUS_HISTORY_DESC.clone(),
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
@@ -2559,7 +2575,9 @@ pub static MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY: Lazy<BuiltinSource> =
name: "mz_aws_privatelink_connection_status_history",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_OID,
data_source: IntrospectionType::PrivatelinkConnectionStatusHistory,
data_source: BuiltinSourceType::Storage(
IntrospectionType::PrivatelinkConnectionStatusHistory,
),
desc: MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC.clone(),
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
@@ -2602,7 +2620,7 @@ pub static MZ_STATEMENT_EXECUTION_HISTORY: Lazy<BuiltinSource> = Lazy::new(|| Bu
name: "mz_statement_execution_history",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_STATEMENT_EXECUTION_HISTORY_OID,
data_source: IntrospectionType::StatementExecutionHistory,
data_source: BuiltinSourceType::Storage(IntrospectionType::StatementExecutionHistory),
desc: MZ_STATEMENT_EXECUTION_HISTORY_DESC.clone(),
is_retained_metrics_object: false,
access: vec![MONITOR_SELECT],
@@ -2627,7 +2645,7 @@ pub static MZ_PREPARED_STATEMENT_HISTORY: Lazy<BuiltinSource> = Lazy::new(|| Bui
name: "mz_prepared_statement_history",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_PREPARED_STATEMENT_HISTORY_OID,
data_source: IntrospectionType::PreparedStatementHistory,
data_source: BuiltinSourceType::Storage(IntrospectionType::PreparedStatementHistory),
desc: MZ_PREPARED_STATEMENT_HISTORY_DESC.clone(),
is_retained_metrics_object: false,
access: vec![MONITOR_SELECT],
Expand All @@ -2638,7 +2656,7 @@ pub static MZ_SQL_TEXT: Lazy<BuiltinSource> = Lazy::new(|| BuiltinSource {
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_SQL_TEXT_OID,
desc: MZ_SQL_TEXT_DESC.clone(),
data_source: IntrospectionType::SqlText,
data_source: BuiltinSourceType::Storage(IntrospectionType::SqlText),
is_retained_metrics_object: false,
access: vec![MONITOR_SELECT],
});
@@ -2688,7 +2706,7 @@ pub static MZ_SESSION_HISTORY: Lazy<BuiltinSource> = Lazy::new(|| BuiltinSource
name: "mz_session_history",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_SESSION_HISTORY_OID,
data_source: IntrospectionType::SessionHistory,
data_source: BuiltinSourceType::Storage(IntrospectionType::SessionHistory),
desc: MZ_SESSION_HISTORY_DESC.clone(),
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
@@ -2772,7 +2790,7 @@ pub static MZ_STATEMENT_LIFECYCLE_HISTORY: Lazy<BuiltinSource> = Lazy::new(|| Bu
"occurred_at",
ScalarType::TimestampTz { precision: None }.nullable(false),
),
data_source: IntrospectionType::StatementLifecycleHistory,
data_source: BuiltinSourceType::Storage(IntrospectionType::StatementLifecycleHistory),
is_retained_metrics_object: false,
// TODO[btv]: Maybe this should be public instead of
// `MONITOR_REDACTED`, but since that would be a backwards-compatible
@@ -2860,7 +2878,7 @@ pub static MZ_SINK_STATUS_HISTORY: Lazy<BuiltinSource> = Lazy::new(|| BuiltinSou
name: "mz_sink_status_history",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_SINK_STATUS_HISTORY_OID,
data_source: IntrospectionType::SinkStatusHistory,
data_source: BuiltinSourceType::Storage(IntrospectionType::SinkStatusHistory),
desc: MZ_SINK_STATUS_HISTORY_DESC.clone(),
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
@@ -2977,7 +2995,7 @@ pub static MZ_CLUSTER_REPLICA_FRONTIERS: Lazy<BuiltinSource> = Lazy::new(|| Buil
name: "mz_cluster_replica_frontiers",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_CLUSTER_REPLICA_FRONTIERS_OID,
data_source: IntrospectionType::ReplicaFrontiers,
data_source: BuiltinSourceType::Storage(IntrospectionType::ReplicaFrontiers),
desc: RelationDesc::empty()
.with_column("object_id", ScalarType::String.nullable(false))
.with_column("replica_id", ScalarType::String.nullable(false))
@@ -2990,7 +3008,7 @@ pub static MZ_FRONTIERS: Lazy<BuiltinSource> = Lazy::new(|| BuiltinSource {
name: "mz_frontiers",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_FRONTIERS_OID,
data_source: IntrospectionType::Frontiers,
data_source: BuiltinSourceType::Storage(IntrospectionType::Frontiers),
desc: RelationDesc::empty()
.with_column("object_id", ScalarType::String.nullable(false))
.with_column("read_frontier", ScalarType::MzTimestamp.nullable(true))
@@ -3106,7 +3124,7 @@ pub static MZ_SOURCE_STATISTICS_RAW: Lazy<BuiltinSource> = Lazy::new(|| BuiltinS
name: "mz_source_statistics_raw",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_SOURCE_STATISTICS_RAW_OID,
data_source: IntrospectionType::StorageSourceStatistics,
data_source: BuiltinSourceType::Storage(IntrospectionType::StorageSourceStatistics),
desc: MZ_SOURCE_STATISTICS_RAW_DESC.clone(),
is_retained_metrics_object: true,
access: vec![PUBLIC_SELECT],
@@ -3115,7 +3133,7 @@ pub static MZ_SINK_STATISTICS_RAW: Lazy<BuiltinSource> = Lazy::new(|| BuiltinSou
name: "mz_sink_statistics_raw",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_SINK_STATISTICS_RAW_OID,
data_source: IntrospectionType::StorageSinkStatistics,
data_source: BuiltinSourceType::Storage(IntrospectionType::StorageSinkStatistics),
desc: MZ_SINK_STATISTICS_RAW_DESC.clone(),
is_retained_metrics_object: true,
access: vec![PUBLIC_SELECT],
@@ -3125,7 +3143,7 @@ pub static MZ_STORAGE_SHARDS: Lazy<BuiltinSource> = Lazy::new(|| BuiltinSource {
name: "mz_storage_shards",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_STORAGE_SHARDS_OID,
data_source: IntrospectionType::ShardMapping,
data_source: BuiltinSourceType::Storage(IntrospectionType::ShardMapping),
desc: RelationDesc::empty()
.with_column("object_id", ScalarType::String.nullable(false))
.with_column("shard_id", ScalarType::String.nullable(false)),
@@ -7109,6 +7127,7 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
Builtin::Source(&MZ_COMPUTE_DEPENDENCIES),
Builtin::Source(&MZ_COMPUTE_HYDRATION_STATUSES),
Builtin::Source(&MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER),
Builtin::Source(&MZ_CATALOG_RAW),
Builtin::View(&MZ_HYDRATION_STATUSES),
Builtin::View(&MZ_MATERIALIZATION_LAG),
Builtin::View(&MZ_COMPUTE_ERROR_COUNTS_PER_WORKER),
6 changes: 5 additions & 1 deletion src/catalog/src/durable.rs
@@ -15,6 +15,7 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use mz_persist_types::ShardId;
use uuid::Uuid;

use mz_audit_log::{VersionedEvent, VersionedStorageUsage};
@@ -45,7 +46,7 @@ mod error;
pub mod initialize;
mod metrics;
pub mod objects;
mod persist;
pub(crate) mod persist;
mod transaction;
mod upgrade;

@@ -278,6 +279,9 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
let id = id.into_element();
Ok(ReplicaId::System(id))
}

/// WIP
fn shard_id(&self) -> ShardId;
}

/// Creates an openable durable catalog state implemented using persist.
12 changes: 7 additions & 5 deletions src/catalog/src/durable/persist.rs
@@ -78,7 +78,7 @@ const CATALOG_SHARD_NAME: &str = "catalog";
const UPGRADE_SHARD_NAME: &str = "catalog_upgrade";

/// Seed used to generate the persist shard ID for the catalog.
const CATALOG_SEED: usize = 1;
pub const CATALOG_SEED: usize = 1;
/// Seed used to generate the catalog upgrade shard ID.
///
/// All state that gets written to persist is tagged with the version of the code that wrote that
@@ -735,9 +735,7 @@ impl UnopenedPersistCatalogState {
let since_handle = persist_client
.open_critical_since(
catalog_shard_id,
// TODO: We may need to use a different critical reader
// id for this if we want to be able to introspect it via SQL.
PersistClient::CONTROLLER_CRITICAL_SINCE,
PersistClient::CATALOG_CRITICAL_SINCE,
Diagnostics {
shard_name: CATALOG_SHARD_NAME.to_string(),
handle_purpose: "durable catalog state critical since".to_string(),
@@ -1413,6 +1411,10 @@ impl DurableCatalogState for PersistCatalogState {

Ok(events)
}

fn shard_id(&self) -> ShardId {
self.shard_id
}
}

/// Deterministically generate an ID for the given `organization_id` and `seed`.
@@ -1425,7 +1427,7 @@ fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {

/// Returns the schema of the `Row`s/`SourceData`s stored in the persist
/// shard backing the catalog.
fn desc() -> RelationDesc {
pub(crate) fn desc() -> RelationDesc {
RelationDesc::empty().with_column("data", ScalarType::Jsonb.nullable(false))
}

5 changes: 2 additions & 3 deletions src/catalog/src/memory/objects.rs
@@ -49,7 +49,6 @@ use mz_sql::plan::{
use mz_sql::rbac;
use mz_sql::session::vars::OwnedVarInput;
use mz_sql_parser::ast::ClusterScheduleOptionValue;
use mz_storage_client::controller::IntrospectionType;
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::sinks::{KafkaSinkFormat, SinkEnvelope, StorageSinkConnection};
use mz_storage_types::sources::{
@@ -61,7 +60,7 @@ use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use tracing::debug;

use crate::builtin::{MZ_INTROSPECTION_CLUSTER, MZ_SYSTEM_CLUSTER};
use crate::builtin::{BuiltinSourceType, MZ_INTROSPECTION_CLUSTER, MZ_SYSTEM_CLUSTER};
use crate::durable;

#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
@@ -389,7 +388,7 @@ pub enum DataSourceDesc {
/// Receives data from some other source
Source,
/// Receives introspection data from an internal system
Introspection(IntrospectionType),
Introspection(BuiltinSourceType),
/// Receives data from the source's reclocking/remapping operations.
Progress,
/// Receives data from HTTP requests.
(Remaining changed files not shown.)
