Skip to content

Commit

Permalink
Merge pull request #22324 from guswynn/namespaced-errors
Browse files Browse the repository at this point in the history
  • Loading branch information
guswynn authored Oct 17, 2023
2 parents 31bde54 + f7de406 commit 58c5323
Show file tree
Hide file tree
Showing 28 changed files with 1,397 additions and 490 deletions.
1 change: 1 addition & 0 deletions src/adapter/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub fn storage_config(config: &SystemVars) -> StorageParameters {
.storage_shrink_upsert_unused_buffers_by_ratio(),
truncate_statement_log: config.truncate_statement_log(),
statement_logging_retention_time_seconds: config.statement_logging_retention().as_secs(),
record_namespaced_errors: config.storage_record_source_sink_namespaced_errors(),
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/sql/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,13 @@ const STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES: ServerVar<usize> = ServerVar {
internal: true
};

/// Controls whether namespaced errors are recorded in the status history tables.
///
/// Declared as an internal `bool` variable whose value here is `true`.
const STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS: ServerVar<bool> = ServerVar {
name: UncasedStr::new("storage_record_source_sink_namespaced_errors"),
value: &true,
description: "Whether or not to record namespaced errors in the status history tables",
internal: true,
};

/// Controls [`mz_persist_client::cfg::DynamicConfig::stats_audit_percent`].
const PERSIST_STATS_AUDIT_PERCENT: ServerVar<usize> = ServerVar {
name: UncasedStr::new("persist_stats_audit_percent"),
Expand Down Expand Up @@ -2436,6 +2443,7 @@ impl SystemVars {
.with_var(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
.with_var(&PERSIST_SINK_MINIMUM_BATCH_UPDATES)
.with_var(&STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES)
.with_var(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
.with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF)
.with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER)
.with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_CLAMP)
Expand Down Expand Up @@ -3029,6 +3037,11 @@ impl SystemVars {
*self.expect_value(&STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES)
}

/// Returns the current value of the `storage_record_source_sink_namespaced_errors`
/// configuration parameter.
pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
    let enabled = self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS);
    *enabled
}

/// Returns the `persist_stats_audit_percent` configuration parameter.
pub fn persist_stats_audit_percent(&self) -> usize {
*self.expect_value(&PERSIST_STATS_AUDIT_PERCENT)
Expand Down Expand Up @@ -4804,6 +4817,7 @@ pub fn is_storage_config_var(name: &str) -> bool {
|| name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
|| name == STORAGE_DATAFLOW_DELAY_SOURCES_PAST_REHYDRATION.name()
|| name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
|| name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
|| is_upsert_rocksdb_config_var(name)
|| is_persist_config_var(name)
|| is_tracing_var(name)
Expand Down Expand Up @@ -4841,6 +4855,7 @@ fn is_persist_config_var(name: &str) -> bool {
|| name == CRDB_TCP_USER_TIMEOUT.name()
|| name == PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
|| name == STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
|| name == STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES.name()
|| name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.name()
|| name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.name()
|| name == PERSIST_NEXT_LISTEN_BATCH_RETRYER_CLAMP.name()
Expand Down
201 changes: 176 additions & 25 deletions src/storage-client/src/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};

use chrono::{DateTime, NaiveDateTime, Utc};
use mz_repr::{Datum, GlobalId, RelationDesc, Row, ScalarType};
use once_cell::sync::Lazy;
Expand All @@ -15,8 +17,9 @@ pub fn pack_status_row(
collection_id: GlobalId,
status_name: &str,
error: Option<&str>,
hints: &BTreeSet<String>,
namespaced_errors: &BTreeMap<String, &String>,
ts: u64,
hint: Option<&str>,
) -> Row {
let timestamp = NaiveDateTime::from_timestamp_opt(
(ts / 1000)
Expand All @@ -41,13 +44,27 @@ pub fn pack_status_row(
let mut packer = row.packer();
packer.extend([timestamp, collection_id, status, error]);

match hint {
Some(hint) => {
let metadata = vec![("hint", Datum::String(hint))];
packer.push_dict(metadata);
}
None => packer.push(Datum::Null),
};
if !hints.is_empty() || !namespaced_errors.is_empty() {
packer.push_dict_with(|dict_packer| {
// The `hints` and `namespaced` keys are pushed in sorted order,
// as are the entries of the BTree collections they each contain.
if !hints.is_empty() {
dict_packer.push(Datum::String("hints"));
dict_packer.push_list(hints.iter().map(|s| Datum::String(s)));
}
if !namespaced_errors.is_empty() {
dict_packer.push(Datum::String("namespaced"));
dict_packer.push_dict(
namespaced_errors
.iter()
.map(|(k, v)| (k.as_str(), Datum::String(v))),
);
}
});
} else {
packer.push(Datum::Null);
}

row
}

Expand Down Expand Up @@ -138,7 +155,14 @@ mod tests {
let hint = "hint message";
let id = GlobalId::User(1);
let status = "dropped";
let row = pack_status_row(id, status, Some(error_message), 1000, Some(hint));
let row = pack_status_row(
id,
status,
Some(error_message),
&BTreeSet::from([hint.to_string()]),
&Default::default(),
1000,
);

for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
Expand All @@ -151,14 +175,22 @@ mod tests {
assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status));
assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));

let details = row
.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>();

assert_eq!(details.len(), 1);
let hint_datum = &details[0];

assert_eq!(hint_datum.0, "hints");
assert_eq!(
row.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>(),
vec![("hint", Datum::String(hint))]
hint_datum.1.unwrap_list().iter().next().unwrap(),
Datum::String(hint)
);
}

Expand All @@ -167,7 +199,14 @@ mod tests {
let error_message = "error message";
let id = GlobalId::User(1);
let status = "dropped";
let row = pack_status_row(id, status, Some(error_message), 1000, None);
let row = pack_status_row(
id,
status,
Some(error_message),
&Default::default(),
&Default::default(),
1000,
);

for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
Expand All @@ -188,7 +227,14 @@ mod tests {
let id = GlobalId::User(1);
let status = "dropped";
let hint = "hint message";
let row = pack_status_row(id, status, None, 1000, Some(hint));
let row = pack_status_row(
id,
status,
None,
&BTreeSet::from([hint.to_string()]),
&Default::default(),
1000,
);

for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
Expand All @@ -201,14 +247,119 @@ mod tests {
assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status));
assert_eq!(row.iter().nth(3).unwrap(), Datum::Null);

let details = row
.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>();

assert_eq!(details.len(), 1);
let hint_datum = &details[0];

assert_eq!(hint_datum.0, "hints");
assert_eq!(
hint_datum.1.unwrap_list().iter().next().unwrap(),
Datum::String(hint)
);
}

#[mz_ore::test]
fn test_row_with_namespaced() {
let error_message = "error message";
let id = GlobalId::User(1);
let status = "dropped";
let row = pack_status_row(
id,
status,
Some(error_message),
&Default::default(),
&BTreeMap::from([("thing".to_string(), &"error".to_string())]),
1000,
);

for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}

for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}

assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status));
assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));

let details = row
.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>();

assert_eq!(details.len(), 1);
let ns_datum = &details[0];

assert_eq!(ns_datum.0, "namespaced");
assert_eq!(
ns_datum.1.unwrap_map().iter().next().unwrap(),
("thing", Datum::String("error"))
);
}

#[mz_ore::test]
fn test_row_with_everything() {
let error_message = "error message";
let hint = "hint message";
let id = GlobalId::User(1);
let status = "dropped";
let row = pack_status_row(
id,
status,
Some(error_message),
&BTreeSet::from([hint.to_string()]),
&BTreeMap::from([("thing".to_string(), &"error".to_string())]),
1000,
);

for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}

for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}

assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status));
assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));

let details = row
.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>();

assert_eq!(details.len(), 2);
// These are always sorted
let hint_datum = &details[0];
let ns_datum = &details[1];

assert_eq!(hint_datum.0, "hints");
assert_eq!(
hint_datum.1.unwrap_list().iter().next().unwrap(),
Datum::String(hint)
);

assert_eq!(ns_datum.0, "namespaced");
assert_eq!(
row.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>(),
vec![("hint", Datum::String(hint))]
ns_datum.1.unwrap_map().iter().next().unwrap(),
("thing", Datum::String("error"))
);
}
}
19 changes: 16 additions & 3 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1631,7 +1631,14 @@ where
self.introspection_ids[&IntrospectionType::SourceStatusHistory];
let mut updates = vec![];
for id in pending_source_drops.drain(..) {
let status_row = healthcheck::pack_status_row(id, "dropped", None, (self.now)(), None);
let status_row = healthcheck::pack_status_row(
id,
"dropped",
None,
&Default::default(),
&Default::default(),
(self.now)(),
);
updates.push((status_row, 1));
}

Expand All @@ -1651,8 +1658,14 @@ where
{
let mut sink_statistics = self.sink_statistics.lock().expect("poisoned");
for id in pending_sink_drops.drain(..) {
let status_row =
healthcheck::pack_status_row(id, "dropped", None, (self.now)(), None);
let status_row = healthcheck::pack_status_row(
id,
"dropped",
None,
&Default::default(),
&Default::default(),
(self.now)(),
);
updates.push((status_row, 1));

sink_statistics.remove(&id);
Expand Down
1 change: 1 addition & 0 deletions src/storage-types/src/parameters.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ message ProtoStorageParameters {
uint64 statement_logging_retention_time_seconds = 15;
ProtoPgSourceTcpTimeouts pg_source_tcp_timeouts = 16;
mz_proto.ProtoDuration pg_source_snapshot_statement_timeout = 17;
bool record_namespaced_errors = 18;
}


Expand Down
Loading

0 comments on commit 58c5323

Please sign in to comment.