adapter,storage: expose the catalog shard to SQL #24138

Closed · wants to merge 2 commits
1 change: 1 addition & 0 deletions doc/user/content/sql/system-catalog/mz_internal.md
@@ -1139,6 +1139,7 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys
[`timestamp with time zone`]: /sql/types/timestamp

<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_activity_log_thinned -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_catalog_raw -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_cluster_workload_classes -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_compute_error_counts_raw_unified -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_continual_tasks -->
50 changes: 37 additions & 13 deletions src/adapter/src/catalog/apply.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;

use futures::future;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_catalog::builtin::{
Builtin, BuiltinLog, BuiltinTable, BuiltinView, BUILTIN_LOG_LOOKUP, BUILTIN_LOOKUP,
@@ -28,9 +29,9 @@ use mz_catalog::durable::objects::{
use mz_catalog::durable::{CatalogError, DurableCatalogError};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
Role, Schema, Source, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
TemporaryItem, Type, UpdateFrom,
CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc,
DataSourceIntrospectionDesc, Database, Func, Index, Log, Role, Schema, Source, StateDiff,
StateUpdate, StateUpdateKind, Table, TableDataSource, TemporaryItem, Type, UpdateFrom,
};
use mz_catalog::SYSTEM_CONN_ID;
use mz_compute_client::controller::ComputeReplicaConfig;
@@ -710,24 +711,47 @@ impl CatalogState {
)];
acl_items.extend_from_slice(&coll.access);

let mut custom_logical_compaction_window =
coll.is_retained_metrics_object.then(|| {
self.system_config()
.metrics_retention()
.try_into()
.expect("invalid metrics retention")
});
let mut timeline = Timeline::EpochMilliseconds;

// The catalog builtin source does not use the epoch
// milliseconds timeline, but instead uses its own timeline that
// starts at time 0 and advances by 1 on each catalog change. So
// we need a custom compaction policy that lags by 1 rather than
// the default 1000. We also set the timeline to
// `External("mz_catalog")`, which ensures that we don't use the
// timestamp oracle when selecting a timestamp for queries
// involving the catalog builtin source, and that joining the
// catalog builtin source with collections in the EpochMillis
// timeline fails with an obvious error.
//
// TODO: move the catalog shard to the EpochMillis timeline.
if matches!(coll.data_source, DataSourceIntrospectionDesc::Catalog) {
// NOTE(benesch): this logical compaction window determines
// the read policy not just for introspection queries but
// also for bootstrapping the catalog itself.
custom_logical_compaction_window =
Some(CompactionWindow::Duration(Timestamp::new(1)));
timeline = Timeline::External("mz_catalog".into());
}

self.insert_item(
id,
coll.oid,
name.clone(),
CatalogItem::Source(Source {
create_sql: None,
data_source: DataSourceDesc::Introspection(coll.data_source),
data_source: DataSourceDesc::Introspection(coll.data_source.clone()),
desc: coll.desc.clone(),
timeline: Timeline::EpochMilliseconds,
timeline,
resolved_ids: ResolvedIds(BTreeSet::new()),
custom_logical_compaction_window: coll.is_retained_metrics_object.then(
|| {
self.system_config()
.metrics_retention()
.try_into()
.expect("invalid metrics retention")
},
),
custom_logical_compaction_window,
is_retained_metrics_object: coll.is_retained_metrics_object,
available_source_references: None,
}),
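The comment in `apply.rs` above hinges on the interaction between a collection's timeline and its compaction window: the read frontier lags the write frontier by the window, measured in the collection's own notion of time. Below is a minimal sketch, using simplified stand-in types rather than Materialize's actual `Timestamp` or `CompactionWindow`, of why a lag of 1 in the catalog timeline (which advances by 1 per catalog change) is the analogue of the default 1000ms lag in the epoch-milliseconds timeline:

```rust
// Simplified stand-ins for the real types; this is a sketch of the
// read-policy arithmetic, not Materialize's implementation.
#[derive(Clone, Copy)]
struct Timestamp(u64);

#[derive(Clone, Copy)]
enum CompactionWindow {
    Duration(Timestamp),
}

/// The earliest readable time implied by a compaction window, given the
/// collection's current write frontier.
fn read_frontier(write_frontier: Timestamp, window: CompactionWindow) -> Timestamp {
    match window {
        CompactionWindow::Duration(lag) => Timestamp(write_frontier.0.saturating_sub(lag.0)),
    }
}

fn main() {
    // In the EpochMilliseconds timeline, frontiers are wall-clock millis,
    // so the default 1000ms window retains one second of history.
    let epoch_ms = read_frontier(
        Timestamp(1_700_000_000_000),
        CompactionWindow::Duration(Timestamp(1_000)),
    );
    // In the catalog timeline, time advances by 1 per catalog change, so a
    // window of 1 retains exactly one prior catalog version; a window of
    // 1000 would hold back compaction across the last 1000 changes.
    let catalog = read_frontier(Timestamp(42), CompactionWindow::Duration(Timestamp(1)));
    println!(
        "epoch-ms read frontier: {}, catalog read frontier: {}",
        epoch_ms.0, catalog.0
    );
}
```

In both timelines the window retains one unit of recent history in the collection's own clock: one second of wall-clock time versus one prior catalog version.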
25 changes: 24 additions & 1 deletion src/adapter/src/catalog/open.rs
@@ -19,7 +19,7 @@ use futures::future::{BoxFuture, FutureExt};
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::builtin::{
Builtin, BuiltinTable, Fingerprint, BUILTINS, BUILTIN_CLUSTERS, BUILTIN_CLUSTER_REPLICAS,
BUILTIN_PREFIXES, BUILTIN_ROLES, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION,
BUILTIN_PREFIXES, BUILTIN_ROLES, MZ_CATALOG_RAW, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION,
RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
};
use mz_catalog::config::StateConfig;
@@ -59,6 +59,7 @@ use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::controller::StorageController;
use mz_storage_client::controller::StorageTxn;
use timely::Container;
use tracing::{error, info, warn, Instrument};
use uuid::Uuid;
@@ -521,8 +522,30 @@ impl Catalog {
let mut state = self.state.clone();

let mut storage = self.storage().await;
let catalog_shard_id = storage.shard_id();
let mut txn = storage.transaction().await?;

// The `mz_catalog_raw` builtin source requires special handling. This
// builtin source exposes the contents of the catalog shard itself. We
// need to install the collection ID -> shard ID mapping manually (i.e.,
// we can't let the storage controller generate a shard ID for the
// collection) because the catalog shard has already been created with a
// deterministically chosen shard ID.
{
let collection_id = self.resolve_builtin_storage_collection(&MZ_CATALOG_RAW);
match txn.get_collection_metadata().get(&collection_id) {
None => {
txn.insert_collection_metadata([(collection_id, catalog_shard_id)].into())
.map_err(mz_catalog::durable::DurableCatalogError::from)?;
}
Some(x) => {
// A mapping for `mz_catalog_raw` already exists. Ensure it is
// what we expect as a sanity check.
assert_eq!(x, &catalog_shard_id)
}
}
}

storage_controller
.initialize_state(&mut txn, collections, storage_collections_to_drop)
.await
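The special-case block in `open.rs` above is an insert-or-verify pattern: on first boot it records the deterministically chosen catalog shard ID, and on every later boot it asserts that the recorded mapping is unchanged. A minimal sketch of the same pattern, with hypothetical simplified ID types standing in for Materialize's real `CollectionId` and `ShardId`:

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in ID types for illustration only.
type CollectionId = u64;
type ShardId = &'static str;

/// Installs a deterministic collection -> shard mapping exactly once;
/// on later calls, verifies the recorded mapping instead of overwriting it.
fn ensure_mapping(
    metadata: &mut BTreeMap<CollectionId, ShardId>,
    collection: CollectionId,
    expected_shard: ShardId,
) {
    match metadata.get(&collection) {
        // First boot: record the pre-existing shard ID, rather than
        // letting anything mint a fresh shard for the collection.
        None => {
            metadata.insert(collection, expected_shard);
        }
        // Later boots: the mapping must be exactly the shard we expect.
        Some(shard) => assert_eq!(shard, &expected_shard),
    }
}

fn main() {
    let mut metadata = BTreeMap::new();
    ensure_mapping(&mut metadata, 7, "s_catalog");
    // Idempotent: a second call verifies rather than overwrites.
    ensure_mapping(&mut metadata, 7, "s_catalog");
    println!("{metadata:?}");
}
```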
9 changes: 6 additions & 3 deletions src/adapter/src/coord.rs
@@ -103,7 +103,7 @@ use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, Cluste
use mz_catalog::durable::OpenableDurableCatalogState;
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
DataSourceDesc, TableDataSource,
DataSourceDesc, DataSourceIntrospectionDesc, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::controller::error::InstanceMissing;
@@ -2448,8 +2448,11 @@ impl Coordinator {
(DataSource::Webhook, Some(source_status_collection_id))
}
DataSourceDesc::Progress => (DataSource::Progress, None),
DataSourceDesc::Introspection(introspection) => {
(DataSource::Introspection(introspection), None)
DataSourceDesc::Introspection(DataSourceIntrospectionDesc::Storage(
introspection,
)) => (DataSource::Introspection(introspection), None),
DataSourceDesc::Introspection(DataSourceIntrospectionDesc::Catalog) => {
(DataSource::Other, None)
}
};
CollectionDescription {
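The `coord.rs` change splits introspection sources into two variants: storage-managed introspection sources keep flowing to `DataSource::Introspection`, while the catalog source maps to `DataSource::Other`, presumably because the catalog shard is written by the catalog itself rather than by the storage controller. A minimal sketch of that dispatch, with hypothetical stand-in payload types:

```rust
// Stand-in enums for illustration; the real types live in
// mz_catalog::memory::objects and the storage controller.
#[derive(Debug, Clone)]
enum DataSourceIntrospectionDesc {
    Storage(&'static str), // stands in for the storage introspection type
    Catalog,
}

#[derive(Debug)]
enum DataSource {
    Introspection(&'static str),
    Other,
}

fn to_controller_data_source(desc: DataSourceIntrospectionDesc) -> DataSource {
    match desc {
        // Storage-managed introspection keeps its controller-side variant.
        DataSourceIntrospectionDesc::Storage(kind) => DataSource::Introspection(kind),
        // The catalog source's shard is populated elsewhere, so the
        // controller sees it as an externally written collection.
        DataSourceIntrospectionDesc::Catalog => DataSource::Other,
    }
}

fn main() {
    let storage = DataSourceIntrospectionDesc::Storage("compute_errors");
    println!("{:?}", to_controller_data_source(storage));
    println!("{:?}", to_controller_data_source(DataSourceIntrospectionDesc::Catalog));
}
```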