Skip to content

Commit

Permalink
test: add tests for mz_wallclock_lag_history
Browse files Browse the repository at this point in the history
Also adds a dyncfg to configure the refresh interval, to avoid having
the test take several minutes.
  • Loading branch information
teskje committed Sep 12, 2024
1 parent 8dfaf3d commit 807d26e
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/compute-client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ rust_library(
"//src/build-info:mz_build_info",
"//src/cluster-client:mz_cluster_client",
"//src/compute-types:mz_compute_types",
"//src/controller-types:mz_controller_types",
"//src/dyncfg:mz_dyncfg",
"//src/dyncfgs:mz_dyncfgs",
"//src/expr:mz_expr",
Expand Down Expand Up @@ -77,6 +78,7 @@ rust_test(
"//src/build-info:mz_build_info",
"//src/cluster-client:mz_cluster_client",
"//src/compute-types:mz_compute_types",
"//src/controller-types:mz_controller_types",
"//src/dyncfg:mz_dyncfg",
"//src/dyncfgs:mz_dyncfgs",
"//src/expr:mz_expr",
Expand Down Expand Up @@ -105,6 +107,7 @@ rust_doc_test(
"//src/build-info:mz_build_info",
"//src/cluster-client:mz_cluster_client",
"//src/compute-types:mz_compute_types",
"//src/controller-types:mz_controller_types",
"//src/dyncfg:mz_dyncfg",
"//src/dyncfgs:mz_dyncfgs",
"//src/expr:mz_expr",
Expand Down
1 change: 1 addition & 0 deletions src/compute-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ http = "1.1.0"
mz-build-info = { path = "../build-info" }
mz-cluster-client = { path = "../cluster-client" }
mz-compute-types = { path = "../compute-types" }
mz-controller-types = { path = "../controller-types" }
mz-dyncfg = { path = "../dyncfg" }
mz-dyncfgs = { path = "../dyncfgs" }
mz-expr = { path = "../expr" }
Expand Down
6 changes: 4 additions & 2 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use mz_compute_types::plan::flat_plan::FlatPlan;
use mz_compute_types::plan::LirId;
use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, PersistSinkConnection};
use mz_compute_types::sources::SourceInstanceDesc;
use mz_controller_types::dyncfgs::WALLCLOCK_LAG_REFRESH_INTERVAL;
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
Expand Down Expand Up @@ -492,8 +493,9 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
/// This method is invoked by `ComputeController::maintain`, which we expect to be called once
/// per second during normal operation.
fn refresh_wallclock_lag(&mut self) {
let refresh_introspection =
!self.read_only && self.wallclock_lag_last_refresh.elapsed() >= Duration::from_secs(60);
let refresh_introspection = !self.read_only
&& self.wallclock_lag_last_refresh.elapsed()
>= WALLCLOCK_LAG_REFRESH_INTERVAL.get(&self.dyncfg);
let mut introspection_updates = refresh_introspection.then(Vec::new);

let now = mz_ore::now::to_datetime((self.now)());
Expand Down
11 changes: 10 additions & 1 deletion src/controller-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@ pub const CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL: Config<Dura
"The interval at which to attempt to retry cleaning up replicas from past generations.",
);

/// The interval at which to refresh wallclock lag introspection.
pub const WALLCLOCK_LAG_REFRESH_INTERVAL: Config<Duration> = Config::new(
"wallclock_lag_refresh_interval",
Duration::from_secs(60),
"The interval at which to refresh wallclock lag introspection.",
);

/// Adds the full set of all controller `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs.add(&CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL)
configs
.add(&CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL)
.add(&WALLCLOCK_LAG_REFRESH_INTERVAL)
}
3 changes: 3 additions & 0 deletions src/storage-controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ rust_library(
deps = [
"//src/build-info:mz_build_info",
"//src/cluster-client:mz_cluster_client",
"//src/controller-types:mz_controller_types",
"//src/dyncfgs:mz_dyncfgs",
"//src/ore:mz_ore",
"//src/persist-client:mz_persist_client",
Expand Down Expand Up @@ -70,6 +71,7 @@ rust_test(
deps = [
"//src/build-info:mz_build_info",
"//src/cluster-client:mz_cluster_client",
"//src/controller-types:mz_controller_types",
"//src/dyncfgs:mz_dyncfgs",
"//src/ore:mz_ore",
"//src/persist-client:mz_persist_client",
Expand All @@ -94,6 +96,7 @@ rust_doc_test(
deps = [
"//src/build-info:mz_build_info",
"//src/cluster-client:mz_cluster_client",
"//src/controller-types:mz_controller_types",
"//src/dyncfgs:mz_dyncfgs",
"//src/ore:mz_ore",
"//src/persist-client:mz_persist_client",
Expand Down
1 change: 1 addition & 0 deletions src/storage-controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ futures = "0.3.25"
itertools = { version = "0.10.5" }
mz-build-info = { path = "../build-info" }
mz-cluster-client = { path = "../cluster-client" }
mz-controller-types = { path = "../controller-types" }
mz-dyncfgs = { path = "../dyncfgs" }
mz-ore = { path = "../ore", features = ["async", "chrono", "tracing_"] }
mz-persist-client = { path = "../persist-client" }
Expand Down
6 changes: 4 additions & 2 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use itertools::Itertools;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_controller_types::dyncfgs::WALLCLOCK_LAG_REFRESH_INTERVAL;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
Expand Down Expand Up @@ -3818,8 +3819,9 @@ where
/// This method is invoked by `ComputeController::maintain`, which we expect to be called once
/// per second during normal operation.
fn update_wallclock_lag_introspection(&mut self) {
let refresh_introspection =
!self.read_only && self.wallclock_lag_last_refresh.elapsed() >= Duration::from_secs(60);
let refresh_introspection = !self.read_only
&& self.wallclock_lag_last_refresh.elapsed()
>= WALLCLOCK_LAG_REFRESH_INTERVAL.get(self.config.config_set());
let mut introspection_updates = refresh_introspection.then(Vec::new);

let now = mz_ore::now::to_datetime((self.now)());
Expand Down
98 changes: 98 additions & 0 deletions test/testdrive/wallclock-lag.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Test the contents of `mz_wallclock_lag_history`.
#
# These tests rely on testdrive's retry feature, as `mz_wallclock_lag_history`
# is only refreshed periodically, so data is likely not immediately available.

$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}

$ postgres-execute connection=mz_system
ALTER SYSTEM SET wallclock_lag_refresh_interval = '1s'

> CREATE CLUSTER storage SIZE '1'
> CREATE CLUSTER compute SIZE '1', REPLICATION FACTOR 2

# Set up a bunch of frontiered objects and test that their wallclock lags get
# reported and are reasonably small.

> CREATE SOURCE src IN CLUSTER storage FROM LOAD GENERATOR counter

> CREATE TABLE tbl (a int)

> CREATE VIEW src_plus_tbl AS SELECT counter + a AS a FROM src, tbl
> CREATE INDEX idx IN CLUSTER compute ON src_plus_tbl (a)
> CREATE MATERIALIZED VIEW mv IN CLUSTER compute AS SELECT * FROM src_plus_tbl

> CREATE MATERIALIZED VIEW mv_const IN CLUSTER compute AS SELECT 1
> CREATE DEFAULT INDEX idx_const IN CLUSTER compute ON mv_const

> CREATE CONNECTION kafka_conn
TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
> CREATE CONNECTION csr_conn
TO CONFLUENT SCHEMA REGISTRY (URL '${testdrive.schema-registry-url}')
> CREATE SINK snk
IN CLUSTER storage
FROM mv
INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE DEBEZIUM

> SELECT DISTINCT ON(o.name, r.name)
o.name, r.name, l.lag > 0, l.lag < '5s'
FROM mz_internal.mz_wallclock_lag_history l
JOIN mz_objects o ON o.id = l.object_id
LEFT JOIN mz_cluster_replicas r ON r.id = l.replica_id
WHERE l.object_id LIKE 'u%'
ORDER BY o.name, r.name, l.occurred_at DESC
idx r1 true true
idx r2 true true
idx_const r1 false true
idx_const r2 false true
mv r1 true true
mv r2 true true
mv <null> true true
mv_const r1 false true
mv_const r2 false true
mv_const <null> false true
snk <null> true true
src <null> true true
src_progress <null> true true
tbl <null> true true

> SELECT DISTINCT ON(o.name)
o.name, l.lag > 0, l.lag < '5s'
FROM mz_internal.mz_wallclock_global_lag_history l
JOIN mz_objects o ON o.id = l.object_id
WHERE l.object_id LIKE 'u%'
ORDER BY o.name, l.occurred_at DESC
idx true true
idx_const false true
mv true true
mv_const false true
snk true true
src true true
src_progress true true
tbl true true

> SELECT DISTINCT ON(o.name)
o.name, l.lag > 0, l.lag < '5s'
FROM mz_internal.mz_wallclock_global_lag_recent_history l
JOIN mz_objects o ON o.id = l.object_id
WHERE l.object_id LIKE 'u%'
ORDER BY o.name, l.occurred_at DESC
idx true true
idx_const false true
mv true true
mv_const false true
snk true true
src true true
src_progress true true
tbl true true

0 comments on commit 807d26e

Please sign in to comment.