adapter,controller: add replica metrics history #29254

Merged
Changes from 2 commits
49 changes: 47 additions & 2 deletions src/catalog/src/builtin.rs
@@ -50,6 +50,7 @@ use mz_sql::session::user::{
MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER_NAME, SYSTEM_USER_NAME,
};
use mz_storage_client::controller::IntrospectionType;
use mz_storage_client::healthcheck::REPLICA_METRICS_HISTORY_DESC;
use mz_storage_client::healthcheck::{
MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_PREPARED_STATEMENT_HISTORY_DESC,
MZ_SESSION_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC,
@@ -3218,10 +3219,10 @@ pub static MZ_AWS_CONNECTIONS: LazyLock<BuiltinTable> = LazyLock::new(|| Builtin
access: vec![PUBLIC_SELECT],
});

// TODO(teskje) Remove this table in favor of `MZ_CLUSTER_REPLICA_METRICS_HISTORY`, once the latter
// has been backfilled to 30 days worth of data.
pub static MZ_CLUSTER_REPLICA_METRICS: LazyLock<BuiltinTable> = LazyLock::new(|| BuiltinTable {
name: "mz_cluster_replica_metrics",
// TODO[btv] - make this public once we work out whether and how to fuse it with
// the corresponding Storage tables.
schema: MZ_INTERNAL_SCHEMA,
oid: oid::TABLE_MZ_CLUSTER_REPLICA_METRICS_OID,
desc: RelationDesc::builder()
@@ -3235,6 +3236,17 @@ pub static MZ_CLUSTER_REPLICA_METRICS: LazyLock<BuiltinTable> = LazyLock::new(||
access: vec![PUBLIC_SELECT],
});

pub static MZ_CLUSTER_REPLICA_METRICS_HISTORY: LazyLock<BuiltinSource> =
LazyLock::new(|| BuiltinSource {
name: "mz_cluster_replica_metrics_history",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_CLUSTER_REPLICA_METRICS_HISTORY_OID,
data_source: IntrospectionType::ReplicaMetricsHistory,
desc: REPLICA_METRICS_HISTORY_DESC.clone(),
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
});
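The new source surfaces the raw per-process metrics samples under mz_internal. As a hedged sketch (the column names are the ones this diff packs and selects: replica_id, process_id, cpu_nano_cores, memory_bytes, disk_bytes, occurred_at; exact types may differ), the most recent samples could be inspected with:

-- Illustrative query against the new history source; not part of this diff.
SELECT replica_id, process_id, cpu_nano_cores, memory_bytes, disk_bytes, occurred_at
FROM mz_internal.mz_cluster_replica_metrics_history
ORDER BY occurred_at DESC
LIMIT 10;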

pub static MZ_CLUSTER_REPLICA_FRONTIERS: LazyLock<BuiltinSource> =
LazyLock::new(|| BuiltinSource {
name: "mz_cluster_replica_frontiers",
@@ -5361,6 +5373,27 @@ FROM
access: vec![PUBLIC_SELECT],
});

pub static MZ_CLUSTER_REPLICA_UTILIZATION_HISTORY: LazyLock<BuiltinView> =
LazyLock::new(|| BuiltinView {
name: "mz_cluster_replica_utilization_history",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::VIEW_MZ_CLUSTER_REPLICA_UTILIZATION_HISTORY_OID,
column_defs: None,
sql: "
SELECT
r.id AS replica_id,
m.process_id,
m.cpu_nano_cores::float8 / s.cpu_nano_cores * 100 AS cpu_percent,
m.memory_bytes::float8 / s.memory_bytes * 100 AS memory_percent,
m.disk_bytes::float8 / s.disk_bytes * 100 AS disk_percent,
m.occurred_at
FROM
mz_catalog.mz_cluster_replicas AS r
JOIN mz_catalog.mz_cluster_replica_sizes AS s ON r.size = s.size
JOIN mz_internal.mz_cluster_replica_metrics_history AS m ON m.replica_id = r.id",
access: vec![PUBLIC_SELECT],
});
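The view converts the raw counters into percentages of the replica size's allocation by joining against mz_cluster_replica_sizes. A hedged usage sketch, not part of this diff, assuming standard Materialize SELECT syntax over the columns the view defines:

-- Illustrative only: peak memory utilization per replica over the last day.
SELECT replica_id, max(memory_percent) AS peak_memory_percent
FROM mz_internal.mz_cluster_replica_utilization_history
WHERE occurred_at > now() - INTERVAL '1 day'
GROUP BY replica_id;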

pub static MZ_DATAFLOW_OPERATOR_PARENTS_PER_WORKER: LazyLock<BuiltinView> =
LazyLock::new(|| BuiltinView {
name: "mz_dataflow_operator_parents_per_worker",
@@ -7630,6 +7663,15 @@ ON mz_internal.mz_cluster_replica_metrics (replica_id)",
is_retained_metrics_object: true,
};

pub const MZ_CLUSTER_REPLICA_METRICS_HISTORY_IND: BuiltinIndex = BuiltinIndex {
name: "mz_cluster_replica_metrics_history_ind",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::INDEX_MZ_CLUSTER_REPLICA_METRICS_HISTORY_IND_OID,
sql: "IN CLUSTER mz_catalog_server
ON mz_internal.mz_cluster_replica_metrics_history (replica_id)",
is_retained_metrics_object: false,
};
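The index arranges the history by replica_id on the mz_catalog_server cluster, mirroring the existing mz_cluster_replica_metrics_ind. A hedged example of the lookup shape it is presumably meant to serve (the replica id 'u1' is a placeholder, not taken from this PR):

-- Hypothetical point lookup by replica_id; 'u1' is a placeholder value.
SELECT process_id, cpu_nano_cores, memory_bytes, disk_bytes, occurred_at
FROM mz_internal.mz_cluster_replica_metrics_history
WHERE replica_id = 'u1';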

pub const MZ_CLUSTER_REPLICA_HISTORY_IND: BuiltinIndex = BuiltinIndex {
name: "mz_cluster_replica_history_ind",
schema: MZ_INTERNAL_SCHEMA,
@@ -8051,6 +8093,7 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::Table(&MZ_SSH_TUNNEL_CONNECTIONS),
Builtin::Table(&MZ_CLUSTER_REPLICAS),
Builtin::Table(&MZ_CLUSTER_REPLICA_METRICS),
Builtin::Source(&MZ_CLUSTER_REPLICA_METRICS_HISTORY),
Builtin::Table(&MZ_CLUSTER_REPLICA_SIZES),
Builtin::Table(&MZ_CLUSTER_REPLICA_STATUSES),
Builtin::Table(&MZ_INTERNAL_CLUSTER_REPLICAS),
@@ -8087,6 +8130,7 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::View(&MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER),
Builtin::View(&MZ_DATAFLOW_OPERATOR_REACHABILITY),
Builtin::View(&MZ_CLUSTER_REPLICA_UTILIZATION),
Builtin::View(&MZ_CLUSTER_REPLICA_UTILIZATION_HISTORY),
Builtin::View(&MZ_DATAFLOW_OPERATOR_PARENTS_PER_WORKER),
Builtin::View(&MZ_DATAFLOW_OPERATOR_PARENTS),
Builtin::View(&MZ_COMPUTE_EXPORTS),
@@ -8288,6 +8332,7 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::Index(&MZ_CLUSTER_REPLICA_SIZES_IND),
Builtin::Index(&MZ_CLUSTER_REPLICA_STATUSES_IND),
Builtin::Index(&MZ_CLUSTER_REPLICA_METRICS_IND),
Builtin::Index(&MZ_CLUSTER_REPLICA_METRICS_HISTORY_IND),
Builtin::Index(&MZ_CLUSTER_REPLICA_HISTORY_IND),
Builtin::Index(&MZ_OBJECT_LIFETIMES_IND),
Builtin::Index(&MZ_OBJECT_DEPENDENCIES_IND),
64 changes: 56 additions & 8 deletions src/controller/src/lib.rs
@@ -48,12 +48,14 @@ use mz_persist_client::cache::PersistClientCache
use mz_persist_client::PersistLocation;
use mz_persist_types::Codec64;
use mz_proto::RustType;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_repr::{Datum, GlobalId, Row, TimestampManipulation};
use mz_service::secrets::SecretsReaderCliArgs;
use mz_storage_client::client::{
ProtoStorageCommand, ProtoStorageResponse, StorageCommand, StorageResponse,
};
use mz_storage_client::controller::{StorageController, StorageMetadata, StorageTxn};
use mz_storage_client::controller::{
IntrospectionType, StorageController, StorageMetadata, StorageTxn,
};
use mz_storage_client::storage_collections::{self, StorageCollections};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
@@ -175,6 +177,8 @@ pub struct Controller<T: Timestamp = mz_repr::Timestamp> {
metrics_rx: Peekable<UnboundedReceiverStream<(ReplicaId, Vec<ServiceProcessMetrics>)>>,
/// Periodic notification to record frontiers.
frontiers_ticker: Interval,
/// A function providing the current wallclock time.
now: NowFn,

/// The URL for Persist PubSub.
persist_pubsub_url: String,
@@ -246,6 +250,7 @@ impl<T: ComputeControllerTimestamp> Controller<T> {
metrics_tx: _,
metrics_rx: _,
frontiers_ticker: _,
now: _,
persist_pubsub_url: _,
secrets_args: _,
unfulfilled_watch_sets_by_object: _,
@@ -534,11 +539,7 @@
Readiness::NotReady => Ok(None),
Readiness::Storage => self.process_storage_response(storage_metadata).await,
Readiness::Compute => self.process_compute_response().await,
Readiness::Metrics => Ok(self
.metrics_rx
.next()
.await
.map(|(id, metrics)| ControllerResponse::ComputeReplicaMetrics(id, metrics))),
Readiness::Metrics => self.process_replica_metrics().await,
Readiness::Frontiers => {
self.record_frontiers().await;
Ok(None)
@@ -588,6 +589,52 @@
(!(finished.is_empty())).then(|| ControllerResponse::WatchSetFinished(finished))
}

async fn process_replica_metrics(
&mut self,
) -> Result<Option<ControllerResponse<T>>, anyhow::Error> {
let Some((id, metrics)) = self.metrics_rx.next().await else {
return Ok(None);
};

self.record_replica_metrics(id, &metrics).await;
Ok(Some(ControllerResponse::ComputeReplicaMetrics(id, metrics)))
}

async fn record_replica_metrics(
&mut self,
replica_id: ReplicaId,
metrics: &[ServiceProcessMetrics],
) {
if self.read_only() {
return;
}

let now = mz_ore::now::to_datetime((self.now)());
let now_tz = now.try_into().expect("must fit");

let replica_id = replica_id.to_string();
let mut row = Row::default();
let updates = metrics
.iter()
.zip(0..)
.map(|(m, process_id)| {
row.packer().extend(&[
Datum::String(&replica_id),
Datum::UInt64(process_id),
m.cpu_nano_cores.into(),
m.memory_bytes.into(),
m.disk_usage_bytes.into(),
Datum::TimestampTz(now_tz),
]);
(row.clone(), 1)
})
.collect();

self.storage
.append_introspection_updates(IntrospectionType::ReplicaMetricsHistory, updates)
.await;
}
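record_replica_metrics packs one row per replica process, all stamped with the same wallclock occurred_at, and appends them to the ReplicaMetricsHistory introspection collection (skipped in read-only mode). A hedged sketch of what the appended data looks like when read back, assuming PostgreSQL-compatible DISTINCT ON is available:

-- Illustrative only: the latest appended sample for each replica process.
SELECT DISTINCT ON (replica_id, process_id)
    replica_id, process_id, cpu_nano_cores, memory_bytes, disk_bytes, occurred_at
FROM mz_internal.mz_cluster_replica_metrics_history
ORDER BY replica_id, process_id, occurred_at DESC;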

async fn record_frontiers(&mut self) {
let compute_frontiers = self.compute.collection_frontiers();
self.storage.record_frontiers(compute_frontiers).await;
@@ -679,7 +726,7 @@
config.build_info,
config.persist_location,
config.persist_clients,
config.now,
config.now.clone(),
Arc::clone(&txns_metrics),
envd_epoch,
read_only,
@@ -721,6 +768,7 @@
metrics_tx,
metrics_rx: UnboundedReceiverStream::new(metrics_rx).peekable(),
frontiers_ticker,
now: config.now,
persist_pubsub_url: config.persist_pubsub_url,
secrets_args: config.secrets_args,
unfulfilled_watch_sets_by_object: BTreeMap::new(),