Skip to content

Commit

Permalink
Merge pull request #22399 from teskje/storage-managed-compute-dependencies
Browse files Browse the repository at this point in the history

Use storage-managed collections for compute controller introspection
  • Loading branch information
teskje authored Oct 18, 2023
2 parents 314c6e9 + 88096d3 commit e47e437
Show file tree
Hide file tree
Showing 16 changed files with 224 additions and 244 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 9 additions & 41 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ use crate::coord::ConnMeta;
use crate::subscribe::ActiveSubscribe;
use mz_catalog::builtin::{
MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES,
MZ_CLUSTERS, MZ_CLUSTER_LINKS, MZ_CLUSTER_REPLICAS, MZ_CLUSTER_REPLICA_HEARTBEATS,
MZ_CLUSTER_REPLICA_METRICS, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICA_STATUSES, MZ_COLUMNS,
MZ_COMMENTS, MZ_COMPUTE_DEPENDENCIES, MZ_CONNECTIONS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES,
MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_INDEXES, MZ_INDEX_COLUMNS, MZ_INTERNAL_CLUSTER_REPLICAS,
MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCES, MZ_LIST_TYPES, MZ_MAP_TYPES,
MZ_MATERIALIZED_VIEWS, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS, MZ_POSTGRES_SOURCES,
MZ_PSEUDO_TYPES, MZ_ROLES, MZ_ROLE_MEMBERS, MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS,
MZ_SOURCES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS,
MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPES, MZ_TYPE_PG_METADATA, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
MZ_CLUSTERS, MZ_CLUSTER_LINKS, MZ_CLUSTER_REPLICAS, MZ_CLUSTER_REPLICA_METRICS,
MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICA_STATUSES, MZ_COLUMNS, MZ_COMMENTS, MZ_CONNECTIONS,
MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_INDEXES, MZ_INDEX_COLUMNS,
MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCES,
MZ_LIST_TYPES, MZ_MAP_TYPES, MZ_MATERIALIZED_VIEWS, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS,
MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES, MZ_ROLES, MZ_ROLE_MEMBERS, MZ_SCHEMAS, MZ_SECRETS,
MZ_SESSIONS, MZ_SINKS, MZ_SOURCES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPES, MZ_TYPE_PG_METADATA, MZ_VIEWS,
MZ_WEBHOOKS_SOURCES,
};

/// An update to a built-in table.
Expand Down Expand Up @@ -1168,22 +1168,6 @@ impl CatalogState {
})
}

/// Builds the builtin-table update that records a replica heartbeat.
///
/// The update targets the table resolved from `MZ_CLUSTER_REPLICA_HEARTBEATS`
/// and carries a two-column row: the replica ID rendered as a string, followed
/// by `last_heartbeat` converted into a timestamp-with-time-zone datum. `diff`
/// is forwarded unchanged.
///
/// # Panics
///
/// Panics with "must fit" if `last_heartbeat` cannot be converted into the
/// datum's timestamp representation.
pub fn pack_replica_heartbeat_update(
    &self,
    id: ReplicaId,
    last_heartbeat: DateTime<Utc>,
    diff: Diff,
) -> BuiltinTableUpdate {
    // Resolve the target table first, matching the original evaluation order.
    let table_id = self.resolve_builtin_table(&MZ_CLUSTER_REPLICA_HEARTBEATS);

    // The string datum borrows from this binding, so it must outlive `columns`.
    let replica = id.to_string();
    let columns = [
        Datum::String(&replica),
        Datum::TimestampTz(last_heartbeat.try_into().expect("must fit")),
    ];

    BuiltinTableUpdate {
        id: table_id,
        row: Row::pack_slice(&columns),
        diff,
    }
}

pub fn pack_storage_usage_update(
&self,
VersionedStorageUsage::V1(event): &VersionedStorageUsage,
Expand Down Expand Up @@ -1388,22 +1372,6 @@ impl CatalogState {
row
}

/// Builds the builtin-table update that records one compute dependency edge.
///
/// The update targets the table resolved from `MZ_COMPUTE_DEPENDENCIES` and
/// carries a two-column row: `object_id` and `dependency_id`, both rendered as
/// strings, in that order. `diff` is forwarded unchanged.
pub fn pack_compute_dependency_update(
    &self,
    object_id: GlobalId,
    dependency_id: GlobalId,
    diff: Diff,
) -> BuiltinTableUpdate {
    // Resolve the target table first, matching the original evaluation order.
    let table_id = self.resolve_builtin_table(&MZ_COMPUTE_DEPENDENCIES);

    // The string datums borrow from these bindings, so they must outlive `columns`.
    let object = object_id.to_string();
    let dependency = dependency_id.to_string();
    let columns = [Datum::String(&object), Datum::String(&dependency)];

    BuiltinTableUpdate {
        id: table_id,
        row: Row::pack_slice(&columns),
        diff,
    }
}

pub fn pack_comment_update(
&self,
object_id: CommentObjectId,
Expand Down
3 changes: 0 additions & 3 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ use std::sync::{atomic, Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use chrono::{DateTime, Utc};
use derivative::Derivative;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
Expand Down Expand Up @@ -569,8 +568,6 @@ pub struct Config {
/// Soft-state metadata about a compute replica.
///
/// Both fields are optional; the derived `Default` leaves them unset (`None`).
#[derive(Clone, Default, Debug, Eq, PartialEq)]
pub struct ReplicaMetadata {
/// The last time we heard from this replica (possibly rounded), or `None`
/// if no heartbeat has been recorded yet.
pub last_heartbeat: Option<DateTime<Utc>>,
/// The last known CPU and memory metrics, or `None` if none have been
/// recorded yet.
pub metrics: Option<Vec<ServiceProcessMetrics>>,
}
Expand Down
14 changes: 2 additions & 12 deletions src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,20 +546,10 @@ impl Coordinator {
}

async fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
if let Some(Some(ReplicaMetadata {
last_heartbeat,
metrics,
})) = self.transient_replica_metadata.insert(replica_id, None)
if let Some(Some(ReplicaMetadata { metrics })) =
self.transient_replica_metadata.insert(replica_id, None)
{
let mut updates = vec![];
if let Some(last_heartbeat) = last_heartbeat {
let retraction = self.catalog().state().pack_replica_heartbeat_update(
replica_id,
last_heartbeat,
-1,
);
updates.push(retraction);
}
if let Some(metrics) = metrics {
let retraction = self
.catalog()
Expand Down
47 changes: 0 additions & 47 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
use std::collections::{BTreeMap, BTreeSet};
use std::time::{Duration, Instant};

use chrono::DurationRound;
use mz_controller::clusters::ClusterEvent;
use mz_controller::ControllerResponse;
use mz_ore::now::EpochMillis;
Expand Down Expand Up @@ -267,40 +266,6 @@ impl Coordinator {
}
}
}
ControllerResponse::ComputeReplicaHeartbeat(replica_id, when) => {
let replica_status_interval = chrono::Duration::seconds(60);
let new = when
.duration_trunc(replica_status_interval)
.expect("Time coarsening should not fail");
let hb = match self
.transient_replica_metadata
.entry(replica_id)
.or_insert_with(|| Some(Default::default()))
{
// `None` is the tombstone for a removed replica
None => return,
Some(md) => &mut md.last_heartbeat,
};
let old = std::mem::replace(hb, Some(new));

if old.as_ref() != Some(&new) {
let retraction = old.map(|old| {
self.catalog()
.state()
.pack_replica_heartbeat_update(replica_id, old, -1)
});
let insertion = self
.catalog()
.state()
.pack_replica_heartbeat_update(replica_id, new, 1);
let updates = if let Some(retraction) = retraction {
vec![retraction, insertion]
} else {
vec![insertion]
};
self.buffer_builtin_table_updates(updates);
}
}
ControllerResponse::ComputeReplicaMetrics(replica_id, new) => {
let m = match self
.transient_replica_metadata
Expand Down Expand Up @@ -333,18 +298,6 @@ impl Coordinator {
self.buffer_builtin_table_updates(updates);
}
}
ControllerResponse::ComputeDependencyUpdate {
id,
dependencies,
diff,
} => {
let state = self.catalog().state();
let updates = dependencies
.into_iter()
.map(|dep_id| state.pack_compute_dependency_update(id, dep_id, diff))
.collect();
self.buffer_builtin_table_updates(updates);
}
}
}

Expand Down
35 changes: 22 additions & 13 deletions src/adapter/src/coord/statement_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use mz_ore::{cast::CastFrom, now::EpochMillis};
use mz_repr::adt::array::ArrayDimension;
use mz_repr::{Datum, Diff, Row, RowPacker};
use mz_sql::plan::Params;
use mz_storage_client::controller::IntrospectionType;
use qcell::QCell;
use rand::SeedableRng;
use rand::{distributions::Bernoulli, prelude::Distribution, thread_rng};
Expand Down Expand Up @@ -118,21 +119,29 @@ impl Coordinator {
}

pub(crate) async fn drain_statement_log(&mut self) {
let pending_session_events =
std::mem::take(&mut self.statement_logging.pending_session_events);
let pending_prepared_statement_events =
std::mem::take(&mut self.statement_logging.pending_prepared_statement_events);
let pending_statement_execution_events =
let session_updates = std::mem::take(&mut self.statement_logging.pending_session_events)
.into_iter()
.map(|update| (update, 1))
.collect();
let prepared_statement_updates =
std::mem::take(&mut self.statement_logging.pending_prepared_statement_events)
.into_iter()
.map(|update| (update, 1))
.collect();
let statement_execution_updates =
std::mem::take(&mut self.statement_logging.pending_statement_execution_events);

self.controller
.storage
.send_statement_log_updates(
pending_statement_execution_events,
pending_prepared_statement_events,
pending_session_events,
)
.await;
use IntrospectionType::*;
for (type_, updates) in [
(SessionHistory, session_updates),
(PreparedStatementHistory, prepared_statement_updates),
(StatementExecutionHistory, statement_execution_updates),
] {
self.controller
.storage
.record_introspection_updates(type_, updates)
.await;
}
}

/// Returns any statement logging events needed for a particular
Expand Down
14 changes: 8 additions & 6 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1556,9 +1556,10 @@ pub static MZ_OBJECT_DEPENDENCIES: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTabl
.with_column("referenced_object_id", ScalarType::String.nullable(false)),
is_retained_metrics_object: false,
});
pub static MZ_COMPUTE_DEPENDENCIES: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
pub static MZ_COMPUTE_DEPENDENCIES: Lazy<BuiltinSource> = Lazy::new(|| BuiltinSource {
name: "mz_compute_dependencies",
schema: MZ_INTERNAL_SCHEMA,
data_source: Some(IntrospectionType::ComputeDependencies),
desc: RelationDesc::empty()
.with_column("object_id", ScalarType::String.nullable(false))
.with_column("dependency_id", ScalarType::String.nullable(false)),
Expand Down Expand Up @@ -1995,9 +1996,10 @@ pub static MZ_CLUSTER_REPLICA_SIZES: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTa
is_retained_metrics_object: true,
});

pub static MZ_CLUSTER_REPLICA_HEARTBEATS: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
pub static MZ_CLUSTER_REPLICA_HEARTBEATS: Lazy<BuiltinSource> = Lazy::new(|| BuiltinSource {
name: "mz_cluster_replica_heartbeats",
schema: MZ_INTERNAL_SCHEMA,
data_source: Some(IntrospectionType::ComputeReplicaHeartbeats),
desc: RelationDesc::empty()
.with_column("replica_id", ScalarType::String.nullable(false))
.with_column(
Expand Down Expand Up @@ -5259,7 +5261,6 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
Builtin::Table(&MZ_KAFKA_CONNECTIONS),
Builtin::Table(&MZ_KAFKA_SOURCES),
Builtin::Table(&MZ_OBJECT_DEPENDENCIES),
Builtin::Table(&MZ_COMPUTE_DEPENDENCIES),
Builtin::Table(&MZ_DATABASES),
Builtin::Table(&MZ_SCHEMAS),
Builtin::Table(&MZ_COLUMNS),
Expand Down Expand Up @@ -5292,7 +5293,6 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
Builtin::Table(&MZ_CLUSTER_REPLICA_METRICS),
Builtin::Table(&MZ_CLUSTER_REPLICA_SIZES),
Builtin::Table(&MZ_CLUSTER_REPLICA_STATUSES),
Builtin::Table(&MZ_CLUSTER_REPLICA_HEARTBEATS),
Builtin::Table(&MZ_INTERNAL_CLUSTER_REPLICAS),
Builtin::Table(&MZ_AUDIT_EVENTS),
Builtin::Table(&MZ_STORAGE_USAGE_BY_SHARD),
Expand Down Expand Up @@ -5351,8 +5351,6 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
Builtin::View(&MZ_SCHEDULING_PARKS_HISTOGRAM),
Builtin::View(&MZ_COMPUTE_DELAYS_HISTOGRAM_PER_WORKER),
Builtin::View(&MZ_COMPUTE_DELAYS_HISTOGRAM),
Builtin::View(&MZ_COMPUTE_ERROR_COUNTS_PER_WORKER),
Builtin::View(&MZ_COMPUTE_ERROR_COUNTS),
Builtin::View(&MZ_SHOW_SOURCES),
Builtin::View(&MZ_SHOW_SINKS),
Builtin::View(&MZ_SHOW_MATERIALIZED_VIEWS),
Expand Down Expand Up @@ -5437,6 +5435,10 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
Builtin::View(&MZ_STORAGE_USAGE),
Builtin::Source(&MZ_FRONTIERS),
Builtin::View(&MZ_GLOBAL_FRONTIERS),
Builtin::Source(&MZ_COMPUTE_DEPENDENCIES),
Builtin::View(&MZ_COMPUTE_ERROR_COUNTS_PER_WORKER),
Builtin::View(&MZ_COMPUTE_ERROR_COUNTS),
Builtin::Source(&MZ_CLUSTER_REPLICA_HEARTBEATS),
Builtin::Index(&MZ_SHOW_DATABASES_IND),
Builtin::Index(&MZ_SHOW_SCHEMAS_IND),
Builtin::Index(&MZ_SHOW_CONNECTIONS_IND),
Expand Down
1 change: 1 addition & 0 deletions src/compute-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ async-stream = "0.3.3"
async-trait = "0.1.68"
bytesize = "1.1.0"
chrono = { version = "0.4.23", default-features = false, features = ["std"] }
crossbeam-channel = "0.5.8"
differential-dataflow = "0.12.0"
futures = "0.3.25"
http = "0.2.8"
Expand Down
Loading

0 comments on commit e47e437

Please sign in to comment.