diff --git a/src/adapter/src/flags.rs b/src/adapter/src/flags.rs index 73577c3dbc5ac..ae5f0dc3e225b 100644 --- a/src/adapter/src/flags.rs +++ b/src/adapter/src/flags.rs @@ -122,6 +122,7 @@ pub fn storage_config(config: &SystemVars) -> StorageParameters { .storage_shrink_upsert_unused_buffers_by_ratio(), truncate_statement_log: config.truncate_statement_log(), statement_logging_retention_time_seconds: config.statement_logging_retention().as_secs(), + record_namespaced_errors: config.storage_record_source_sink_namespaced_errors(), } } diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index d0a5626129f24..3291174243970 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1086,6 +1086,13 @@ const STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES: ServerVar = ServerVar { internal: true }; +const STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS: ServerVar = ServerVar { + name: UncasedStr::new("storage_record_source_sink_namespaced_errors"), + value: &true, + description: "Whether or not to record namespaced errors in the status history tables", + internal: true, +}; + /// Controls [`mz_persist_client::cfg::DynamicConfig::stats_audit_percent`]. const PERSIST_STATS_AUDIT_PERCENT: ServerVar = ServerVar { name: UncasedStr::new("persist_stats_audit_percent"), @@ -2436,6 +2443,7 @@ impl SystemVars { .with_var(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO) .with_var(&PERSIST_SINK_MINIMUM_BATCH_UPDATES) .with_var(&STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES) + .with_var(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS) .with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF) .with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER) .with_var(&PERSIST_NEXT_LISTEN_BATCH_RETRYER_CLAMP) @@ -3029,6 +3037,11 @@ impl SystemVars { *self.expect_value(&STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES) } + /// Returns the `storage_record_source_sink_namespaced_errors` configuration parameter. 
+    pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
+        *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
+    }
+
     /// Returns the `persist_stats_audit_percent` configuration parameter.
     pub fn persist_stats_audit_percent(&self) -> usize {
         *self.expect_value(&PERSIST_STATS_AUDIT_PERCENT)
     }
@@ -4804,6 +4817,7 @@ pub fn is_storage_config_var(name: &str) -> bool {
         || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
         || name == STORAGE_DATAFLOW_DELAY_SOURCES_PAST_REHYDRATION.name()
         || name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
+        || name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
         || is_upsert_rocksdb_config_var(name)
         || is_persist_config_var(name)
         || is_tracing_var(name)
diff --git a/src/storage-client/src/healthcheck.rs b/src/storage-client/src/healthcheck.rs
index 9d676d4894f64..de03000281e13 100644
--- a/src/storage-client/src/healthcheck.rs
+++ b/src/storage-client/src/healthcheck.rs
@@ -7,6 +7,8 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
+use std::collections::{BTreeMap, BTreeSet}; + use chrono::{DateTime, NaiveDateTime, Utc}; use mz_repr::{Datum, GlobalId, RelationDesc, Row, ScalarType}; use once_cell::sync::Lazy; @@ -15,8 +17,9 @@ pub fn pack_status_row( collection_id: GlobalId, status_name: &str, error: Option<&str>, + hints: &BTreeSet, + namespaced_errors: &BTreeMap, ts: u64, - hint: Option<&str>, ) -> Row { let timestamp = NaiveDateTime::from_timestamp_opt( (ts / 1000) @@ -41,13 +44,27 @@ pub fn pack_status_row( let mut packer = row.packer(); packer.extend([timestamp, collection_id, status, error]); - match hint { - Some(hint) => { - let metadata = vec![("hint", Datum::String(hint))]; - packer.push_dict(metadata); - } - None => packer.push(Datum::Null), - }; + if !hints.is_empty() || !namespaced_errors.is_empty() { + packer.push_dict_with(|dict_packer| { + // `hint` and `namespaced` are ordered, + // as well as the BTree's they each contain. + if !hints.is_empty() { + dict_packer.push(Datum::String("hints")); + dict_packer.push_list(hints.iter().map(|s| Datum::String(s))); + } + if !namespaced_errors.is_empty() { + dict_packer.push(Datum::String("namespaced")); + dict_packer.push_dict( + namespaced_errors + .iter() + .map(|(k, v)| (k.as_str(), Datum::String(v))), + ); + } + }); + } else { + packer.push(Datum::Null); + } + row } @@ -138,7 +155,14 @@ mod tests { let hint = "hint message"; let id = GlobalId::User(1); let status = "dropped"; - let row = pack_status_row(id, status, Some(error_message), 1000, Some(hint)); + let row = pack_status_row( + id, + status, + Some(error_message), + &BTreeSet::from([hint.to_string()]), + &Default::default(), + 1000, + ); for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) { assert!(datum.is_instance_of(column_type)); @@ -151,14 +175,22 @@ mod tests { assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string())); assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status)); assert_eq!(row.iter().nth(3).unwrap(), 
Datum::String(error_message)); + + let details = row + .iter() + .nth(4) + .unwrap() + .unwrap_map() + .iter() + .collect::>(); + + assert_eq!(details.len(), 1); + let hint_datum = &details[0]; + + assert_eq!(hint_datum.0, "hints"); assert_eq!( - row.iter() - .nth(4) - .unwrap() - .unwrap_map() - .iter() - .collect::>(), - vec![("hint", Datum::String(hint))] + hint_datum.1.unwrap_list().iter().next().unwrap(), + Datum::String(hint) ); } @@ -167,7 +199,14 @@ mod tests { let error_message = "error message"; let id = GlobalId::User(1); let status = "dropped"; - let row = pack_status_row(id, status, Some(error_message), 1000, None); + let row = pack_status_row( + id, + status, + Some(error_message), + &Default::default(), + &Default::default(), + 1000, + ); for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) { assert!(datum.is_instance_of(column_type)); @@ -188,7 +227,14 @@ mod tests { let id = GlobalId::User(1); let status = "dropped"; let hint = "hint message"; - let row = pack_status_row(id, status, None, 1000, Some(hint)); + let row = pack_status_row( + id, + status, + None, + &BTreeSet::from([hint.to_string()]), + &Default::default(), + 1000, + ); for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) { assert!(datum.is_instance_of(column_type)); @@ -201,14 +247,119 @@ mod tests { assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string())); assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status)); assert_eq!(row.iter().nth(3).unwrap(), Datum::Null); + + let details = row + .iter() + .nth(4) + .unwrap() + .unwrap_map() + .iter() + .collect::>(); + + assert_eq!(details.len(), 1); + let hint_datum = &details[0]; + + assert_eq!(hint_datum.0, "hints"); + assert_eq!( + hint_datum.1.unwrap_list().iter().next().unwrap(), + Datum::String(hint) + ); + } + + #[mz_ore::test] + fn test_row_with_namespaced() { + let error_message = "error message"; + let id = GlobalId::User(1); + let status = 
"dropped"; + let row = pack_status_row( + id, + status, + Some(error_message), + &Default::default(), + &BTreeMap::from([("thing".to_string(), &"error".to_string())]), + 1000, + ); + + for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) { + assert!(datum.is_instance_of(column_type)); + } + + for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) { + assert!(datum.is_instance_of(column_type)); + } + + assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string())); + assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status)); + assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message)); + + let details = row + .iter() + .nth(4) + .unwrap() + .unwrap_map() + .iter() + .collect::>(); + + assert_eq!(details.len(), 1); + let ns_datum = &details[0]; + + assert_eq!(ns_datum.0, "namespaced"); + assert_eq!( + ns_datum.1.unwrap_map().iter().next().unwrap(), + ("thing", Datum::String("error")) + ); + } + + #[mz_ore::test] + fn test_row_with_everything() { + let error_message = "error message"; + let hint = "hint message"; + let id = GlobalId::User(1); + let status = "dropped"; + let row = pack_status_row( + id, + status, + Some(error_message), + &BTreeSet::from([hint.to_string()]), + &BTreeMap::from([("thing".to_string(), &"error".to_string())]), + 1000, + ); + + for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) { + assert!(datum.is_instance_of(column_type)); + } + + for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) { + assert!(datum.is_instance_of(column_type)); + } + + assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string())); + assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status)); + assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message)); + + let details = row + .iter() + .nth(4) + .unwrap() + .unwrap_map() + .iter() + .collect::>(); + + assert_eq!(details.len(), 2); + // These are 
always sorted + let hint_datum = &details[0]; + let ns_datum = &details[1]; + + assert_eq!(hint_datum.0, "hints"); + assert_eq!( + hint_datum.1.unwrap_list().iter().next().unwrap(), + Datum::String(hint) + ); + + assert_eq!(ns_datum.0, "namespaced"); assert_eq!( - row.iter() - .nth(4) - .unwrap() - .unwrap_map() - .iter() - .collect::>(), - vec![("hint", Datum::String(hint))] + ns_datum.1.unwrap_map().iter().next().unwrap(), + ("thing", Datum::String("error")) ); } } diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index c7a7af40ab478..2762059c871d7 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -1631,7 +1631,14 @@ where self.introspection_ids[&IntrospectionType::SourceStatusHistory]; let mut updates = vec![]; for id in pending_source_drops.drain(..) { - let status_row = healthcheck::pack_status_row(id, "dropped", None, (self.now)(), None); + let status_row = healthcheck::pack_status_row( + id, + "dropped", + None, + &Default::default(), + &Default::default(), + (self.now)(), + ); updates.push((status_row, 1)); } @@ -1651,8 +1658,14 @@ where { let mut sink_statistics = self.sink_statistics.lock().expect("poisoned"); for id in pending_sink_drops.drain(..) 
{ - let status_row = - healthcheck::pack_status_row(id, "dropped", None, (self.now)(), None); + let status_row = healthcheck::pack_status_row( + id, + "dropped", + None, + &Default::default(), + &Default::default(), + (self.now)(), + ); updates.push((status_row, 1)); sink_statistics.remove(&id); diff --git a/src/storage-types/src/parameters.proto b/src/storage-types/src/parameters.proto index 1e03b13819d39..474ce4a5ae6ce 100644 --- a/src/storage-types/src/parameters.proto +++ b/src/storage-types/src/parameters.proto @@ -34,6 +34,7 @@ message ProtoStorageParameters { uint64 statement_logging_retention_time_seconds = 15; ProtoPgSourceTcpTimeouts pg_source_tcp_timeouts = 16; mz_proto.ProtoDuration pg_source_snapshot_statement_timeout = 17; + bool record_namespaced_errors = 18; } diff --git a/src/storage-types/src/parameters.rs b/src/storage-types/src/parameters.rs index e99108ddcf334..360b86b2604a1 100644 --- a/src/storage-types/src/parameters.rs +++ b/src/storage-types/src/parameters.rs @@ -24,7 +24,7 @@ include!(concat!(env!("OUT_DIR"), "/mz_storage_types.parameters.rs")); /// /// Parameters can be set (`Some`) or unset (`None`). /// Unset parameters should be interpreted to mean "use the previous value". -#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct StorageParameters { /// Persist client configuration. pub persist: PersistParameters, @@ -55,6 +55,34 @@ pub struct StorageParameters { /// How long entries in the statement log should be retained, in seconds. /// Ignored if `truncate_statement_log` is false. pub statement_logging_retention_time_seconds: u64, + /// Whether or not to record errors by namespace in the `details` + /// column of the status history tables. + pub record_namespaced_errors: bool, +} + +// Implement `Default` manually, so that the default can match the +// LD default. This is not strictly necessary, but improves clarity. 
+impl Default for StorageParameters { + fn default() -> Self { + Self { + persist: Default::default(), + pg_source_tcp_timeouts: Default::default(), + pg_source_snapshot_statement_timeout: Default::default(), + keep_n_source_status_history_entries: Default::default(), + keep_n_sink_status_history_entries: Default::default(), + upsert_rocksdb_tuning_config: Default::default(), + finalize_shards: Default::default(), + tracing: Default::default(), + upsert_auto_spill_config: Default::default(), + storage_dataflow_max_inflight_bytes_config: Default::default(), + grpc_client: Default::default(), + delay_sources_past_rehydration: Default::default(), + shrink_upsert_unused_buffers_by_ratio: Default::default(), + truncate_statement_log: Default::default(), + statement_logging_retention_time_seconds: Default::default(), + record_namespaced_errors: true, + } + } } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -141,6 +169,7 @@ impl StorageParameters { shrink_upsert_unused_buffers_by_ratio, statement_logging_retention_time_seconds, truncate_statement_log, + record_namespaced_errors, }: StorageParameters, ) { self.persist.update(persist); @@ -160,6 +189,7 @@ impl StorageParameters { self.shrink_upsert_unused_buffers_by_ratio = shrink_upsert_unused_buffers_by_ratio; self.statement_logging_retention_time_seconds = statement_logging_retention_time_seconds; self.truncate_statement_log = truncate_statement_log; + self.record_namespaced_errors = record_namespaced_errors; } } @@ -191,6 +221,7 @@ impl RustType for StorageParameters { ), truncate_statement_log: self.truncate_statement_log, statement_logging_retention_time_seconds: self.statement_logging_retention_time_seconds, + record_namespaced_errors: self.record_namespaced_errors, } } @@ -238,6 +269,7 @@ impl RustType for StorageParameters { truncate_statement_log: proto.truncate_statement_log, statement_logging_retention_time_seconds: proto .statement_logging_retention_time_seconds, + record_namespaced_errors: 
proto.record_namespaced_errors, }) } } diff --git a/src/storage/src/decode/mod.rs b/src/storage/src/decode/mod.rs index c8476bc7ae3e8..691237e009805 100644 --- a/src/storage/src/decode/mod.rs +++ b/src/storage/src/decode/mod.rs @@ -42,8 +42,9 @@ use crate::decode::avro::AvroDecoderState; use crate::decode::csv::CsvDecoderState; use crate::decode::metrics::DecodeMetrics; use crate::decode::protobuf::ProtobufDecoderState; +use crate::healthcheck::HealthStatusUpdate; use crate::render::sources::OutputIndex; -use crate::source::types::{DecodeResult, HealthStatus, HealthStatusUpdate, SourceOutput}; +use crate::source::types::{DecodeResult, SourceOutput}; mod avro; mod csv; @@ -499,13 +500,7 @@ where }); let health = transient_errors.map(|err: Rc| { - let halt_status = HealthStatusUpdate { - update: HealthStatus::StalledWithError { - error: err.display_with_causes().to_string(), - hint: None, - }, - should_halt: true, - }; + let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None); (0, halt_status) }); diff --git a/src/storage/src/healthcheck.rs b/src/storage/src/healthcheck.rs index 2f59909e99526..8bbb0ba1835ad 100644 --- a/src/storage/src/healthcheck.rs +++ b/src/storage/src/healthcheck.rs @@ -10,7 +10,10 @@ //! 
Healthcheck common use std::any::Any; +use std::cell::RefCell; +use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet}; +use std::fmt; use std::fmt::Debug; use std::rc::Rc; use std::sync::Arc; @@ -20,6 +23,7 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::Hashable; use mz_ore::cast::CastFrom; use mz_ore::now::NowFn; +use mz_persist_client::cache::PersistClientCache; use mz_persist_client::PersistLocation; use mz_persist_client::{Diagnostics, PersistClient, ShardId}; use mz_persist_types::codec_impls::UnitSchema; @@ -31,27 +35,36 @@ use timely::dataflow::operators::{Enter, Map}; use timely::dataflow::scopes::Child; use timely::dataflow::{Scope, Stream}; use timely::progress::Antichain; -use tracing::{info, trace, warn}; +use tracing::{error, info, trace}; -use crate::internal_control::InternalStorageCommand; +use crate::internal_control::{InternalCommandSender, InternalStorageCommand}; pub async fn write_to_persist( collection_id: GlobalId, new_status: &str, new_error: Option<&str>, + hints: &BTreeSet, + namespaced_errors: &BTreeMap, now: NowFn, client: &PersistClient, status_shard: ShardId, relation_desc: &RelationDesc, - hint: Option<&str>, ) { let now_ms = now(); let row = mz_storage_client::healthcheck::pack_status_row( collection_id, new_status, new_error, + hints, + // `pack_status_row` requires keys sorted by their string representation. + // This is technically an few extra allocations, but its relatively rare so + // we don't worry about it. Sorting it in a vec like with `Itertools::sorted` + // would also cost allocations. + &namespaced_errors + .iter() + .map(|(ns, val)| (ns.to_string(), val)) + .collect(), now_ms, - hint, ); let mut handle = client @@ -118,16 +131,174 @@ pub(crate) struct ObjectHealthConfig { pub(crate) persist_location: PersistLocation, } -struct HealthState<'a, O> { +/// The namespace of the update. The `Ord` impl matter here, later variants are +/// displayed over earlier ones. 
+/// +/// Some namespaces (referred to as "sidechannels") can come from any worker_id, +/// and `Running` statuses from them do not mark the entire object as running. +/// +/// Ensure you update `is_sidechannel` when adding variants. +#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum StatusNamespace { + /// A normal status namespaces. Any `Running` status from any worker will mark the object + /// `Running`. + Generator, + TestScript, + Kafka, + Postgres, + Ssh, + Upsert, + Decode, + Internal, +} + +impl StatusNamespace { + fn is_sidechannel(&self) -> bool { + matches!(self, StatusNamespace::Ssh) + } +} + +impl fmt::Display for StatusNamespace { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use StatusNamespace::*; + match self { + Generator => write!(f, "generator"), + TestScript => write!(f, "testscript"), + Kafka => write!(f, "kafka"), + Postgres => write!(f, "postgres"), + Ssh => write!(f, "ssh"), + Upsert => write!(f, "upsert"), + Decode => write!(f, "decode"), + Internal => write!(f, "internal"), + } + } +} + +struct PerWorkerHealthStatus { + pub(crate) errors_by_worker: Vec>, +} + +impl PerWorkerHealthStatus { + fn merge_update( + &mut self, + mut worker: usize, + namespace: StatusNamespace, + update: HealthStatusUpdate, + only_greater: bool, + ) { + if namespace.is_sidechannel() { + worker = 0; + } + + let errors = &mut self.errors_by_worker[worker]; + match errors.entry(namespace) { + Entry::Vacant(v) => { + v.insert(update); + } + Entry::Occupied(mut o) => { + if !only_greater || o.get() < &update { + o.insert(update); + } + } + } + } + + fn decide_status(&self) -> OverallStatus { + let mut output_status = OverallStatus::Starting; + let mut namespaced_errors: BTreeMap = BTreeMap::new(); + let mut hints: BTreeSet = BTreeSet::new(); + + for status in self.errors_by_worker.iter() { + use HealthStatusUpdate::*; + for (ns, ns_status) in status.iter() { + if let Stalled { error, hint, .. 
} = ns_status { + if Some(error) > namespaced_errors.get(ns).as_deref() { + namespaced_errors.insert(*ns, error.to_string()); + } + + if let Some(hint) = hint { + hints.insert(hint.to_string()); + } + } + + if !ns.is_sidechannel() && matches!(ns_status, HealthStatusUpdate::Running) { + output_status = OverallStatus::Running; + } + } + } + + if !namespaced_errors.is_empty() { + // Pick the most important error. + let (ns, err) = namespaced_errors.last_key_value().unwrap(); + output_status = OverallStatus::Stalled { + error: format!("{}: {}", ns, err), + hints, + namespaced_errors, + } + } + + output_status + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum OverallStatus { + Starting, + Running, + Stalled { + error: String, + hints: BTreeSet, + namespaced_errors: BTreeMap, + }, +} + +impl OverallStatus { + /// The user-readable name of the status state. + pub(crate) fn name(&self) -> &'static str { + match self { + OverallStatus::Starting => "starting", + OverallStatus::Running => "running", + OverallStatus::Stalled { .. } => "stalled", + } + } + + /// The user-readable error string, if there is one. + pub(crate) fn error(&self) -> Option<&str> { + match self { + OverallStatus::Starting | OverallStatus::Running => None, + OverallStatus::Stalled { error, .. } => Some(error), + } + } + + /// A set of namespaced errors, if there are any. + pub(crate) fn errors(&self) -> Option<&BTreeMap> { + match self { + OverallStatus::Starting | OverallStatus::Running => None, + OverallStatus::Stalled { + namespaced_errors, .. + } => Some(namespaced_errors), + } + } + + /// A set of hints, if there are any. + pub(crate) fn hints(&self) -> Option<&BTreeSet> { + match self { + OverallStatus::Starting | OverallStatus::Running => None, + OverallStatus::Stalled { hints, .. 
} => Some(hints), + } + } +} + +struct HealthState<'a> { id: GlobalId, persist_details: Option<(ShardId, &'a PersistClient)>, - healths: Vec>, + healths: PerWorkerHealthStatus, schema: &'static RelationDesc, - last_reported_status: Option, - halt_with: Option, + last_reported_status: Option, + halt_with: Option<(StatusNamespace, HealthStatusUpdate)>, } -impl<'a, O: Clone> HealthState<'a, O> { +impl<'a> HealthState<'a> { fn new( id: GlobalId, status_shard: Option, @@ -135,7 +306,7 @@ impl<'a, O: Clone> HealthState<'a, O> { persist_clients: &'a BTreeMap, schema: &'static RelationDesc, worker_count: usize, - ) -> HealthState<'a, O> { + ) -> HealthState<'a> { let persist_details = match (status_shard, persist_clients.get(&persist_location)) { (Some(shard), Some(persist_client)) => Some((shard, persist_client)), _ => None, @@ -144,7 +315,9 @@ impl<'a, O: Clone> HealthState<'a, O> { HealthState { id, persist_details, - healths: vec![None; worker_count], + healths: PerWorkerHealthStatus { + errors_by_worker: vec![Default::default(); worker_count], + }, schema, last_reported_status: None, halt_with: None, @@ -152,32 +325,91 @@ impl<'a, O: Clone> HealthState<'a, O> { } } -/// A trait that lets a user of the `health_operator` specify a health status object. -/// -/// In addition to these methods, heath statuses should: -/// - Be cloneable/exhangeable so they can be moved around in timely. -/// - `Debug`-printable. -/// - `Ord`, where the order increases in severity. The state returned by `starting()` should -/// be the minimum. -// TODO(guswynn): the implementors of this trait are probably merge-able. -pub(crate) trait HealthStatus: timely::ExchangeData + Debug + Ord { - /// The user-readable name of the status state. - fn name(&self) -> &'static str; - /// The user-readable error string, if there is one. - fn error(&self) -> Option<&str>; - /// A hint for solving the error, if there is one. 
- fn hint(&self) -> Option<&str>; - /// Whether or not we should halt the dataflow instances and restart it. - fn should_halt(&self) -> bool; - /// Whether or not we can transition from a state (or lack of one). +/// A trait that lets a user configure the `health_operator` with custom +/// behavior. This is mostly useful for testing, and the [`DefaultWriter`] +/// should be the correct implementation for everyone. +#[async_trait::async_trait(?Send)] +pub trait HealthOperator { + /// Record a new status. + async fn record_new_status( + &self, + collection_id: GlobalId, + new_status: &str, + new_error: Option<&str>, + hints: &BTreeSet, + namespaced_errors: &BTreeMap, + now: NowFn, + // TODO(guswynn): not urgent: + // Ideally this would be entirely included in the `DefaultWriter`, but that + // requires a fairly heavy change to the `health_operator`, which hardcodes + // some use of persist. For now we just leave it and ignore it in tests. + client: &PersistClient, + status_shard: ShardId, + relation_desc: &RelationDesc, + ); + async fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>); + + /// Optionally override the chosen worker index. Default is semi-random. + /// Only useful for tests. + fn chosen_worker(&self) -> Option { + None + } +} + +/// A default `HealthOperator` for use in normal cases. 
+pub struct DefaultWriter(pub Rc>); + +#[async_trait::async_trait(?Send)] +impl HealthOperator for DefaultWriter { + async fn record_new_status( + &self, + collection_id: GlobalId, + new_status: &str, + new_error: Option<&str>, + hints: &BTreeSet, + namespaced_errors: &BTreeMap, + now: NowFn, + client: &PersistClient, + status_shard: ShardId, + relation_desc: &RelationDesc, + ) { + write_to_persist( + collection_id, + new_status, + new_error, + hints, + namespaced_errors, + now, + client, + status_shard, + relation_desc, + ) + .await + } + + async fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) { + self.0 + .borrow_mut() + .broadcast(InternalStorageCommand::SuspendAndRestart { + // Suspend and restart is expected to operate on the primary object and + // not any of the sub-objects + id, + reason: format!("{:?}", error), + }); + } +} + +/// A health message consumed by the `health_operator`. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub struct HealthStatusMessage { + /// The index of the object this message describes. /// - /// Each time this returns `true`, a new status message will be communicated to the user. - /// Note that messages that are identical except for `should_halt` don't need to be - /// able to transition between each other, that is handled separately. - fn can_transition_from(&self, other: Option<&Self>) -> bool; - /// A instance of a status that specifies that the object is starting. Used when the dataflow - /// starts up. - fn starting() -> Self; + /// Useful for sub-objects like sub-sources. + pub index: usize, + /// The namespace of the health update. + pub namespace: StatusNamespace, + /// The update itself. 
+ pub update: HealthStatusUpdate, } /// Writes updates that come across `health_stream` to the collection's status shards, as identified @@ -187,10 +419,11 @@ pub(crate) trait HealthStatus: timely::ExchangeData + Debug + Ord { /// /// The `OutputIndex` values that come across `health_stream` must be a strict subset of thosema, /// `configs`'s keys. -pub(crate) fn health_operator<'g, G, O>( +pub(crate) fn health_operator<'g, G, P>( scope: &Child<'g, G, mz_repr::Timestamp>, - storage_state: &crate::storage_state::StorageState, - // A set of id's that should be marked as `HealthStatus::starting()` during startup. + persist_clients: Arc, + now: NowFn, + // A set of id's that should be marked as `HealthStatusUpdate::starting()` during startup. mark_starting: BTreeSet, // An id that is allowed to halt the dataflow. Others are ignored, and panic during debug // mode. @@ -198,29 +431,33 @@ pub(crate) fn health_operator<'g, G, O>( // A description of the object type we are writing status updates about. Used in log lines. object_type: &'static str, // An indexed stream of health updates. Indexes are configured in `configs`. - health_stream: &Stream, + health_stream: &Stream, // A configuration per _index_. Indexes support things like subsources, and allow the // `health_operator` to understand where each sub-object should have its status written down // at. configs: BTreeMap, + // An impl of `HealthOperator` that configures the output behavior of this operator. 
+ health_operator_impl: P, ) -> Rc where G: Scope, - O: HealthStatus, + P: HealthOperator + 'static, { // Derived config options let healthcheck_worker_id = scope.index(); let worker_count = scope.peers(); - let now = storage_state.now.clone(); - let persist_clients = Arc::clone(&storage_state.persist_clients); - let internal_cmd_tx = Rc::clone(&storage_state.internal_cmd_tx); // Inject the originating worker id to each item before exchanging to the chosen worker let health_stream = health_stream.map(move |status| (healthcheck_worker_id, status)); - // We'll route all the work to a single arbitrary worker; - // there's not much to do, and we need a global view. - let chosen_worker_id = usize::cast_from(configs.keys().next().hashed()) % worker_count; + let chosen_worker_id = if let Some(index) = health_operator_impl.chosen_worker() { + index + } else { + // We'll route all the work to a single arbitrary worker; + // there's not much to do, and we need a global view. + usize::cast_from(configs.keys().next().hashed()) % worker_count + }; + let is_active_worker = chosen_worker_id == healthcheck_worker_id; let operator_name = format!("healthcheck({})", healthcheck_worker_id); @@ -291,7 +528,7 @@ where )| { ( output_idx, - HealthState::::new( + HealthState::new( id, status_shard, persist_location, @@ -309,18 +546,20 @@ where for state in health_states.values_mut() { if mark_starting.contains(&state.id) { if let Some((status_shard, persist_client)) = state.persist_details { - let status = O::starting(); - write_to_persist( - state.id, - status.name(), - status.error(), - now.clone(), - persist_client, - status_shard, - state.schema, - status.hint(), - ) - .await; + let status = OverallStatus::Starting; + health_operator_impl + .record_new_status( + state.id, + status.name(), + status.error(), + status.hints().unwrap_or(&BTreeSet::new()), + status.errors().unwrap_or(&BTreeMap::new()), + now.clone(), + persist_client, + status_shard, + state.schema, + ) + .await; 
state.last_reported_status = Some(status); } @@ -331,7 +570,15 @@ where let mut outputs_seen = BTreeSet::new(); while let Some(event) = input.next_mut().await { if let AsyncEvent::Data(_cap, rows) = event { - for (worker_id, (output_index, health_event)) in rows.drain(..) { + for ( + worker_id, + HealthStatusMessage { + index: output_index, + namespace: ns, + update: health_event, + }, + ) in rows.drain(..) + { let HealthState { id, healths, @@ -348,22 +595,17 @@ where let new_round = outputs_seen.insert(output_index); if !is_active_worker { - warn!( + error!( "Health messages for {object_type} {id} passed to \ an unexpected worker id: {healthcheck_worker_id}" ) } if health_event.should_halt() { - *halt_with = Some(health_event.clone()); + *halt_with = Some((ns.clone(), health_event.clone())); } - let update = Some(health_event); - // Keep the max of the messages in each round; this ensures that errors don't - // get lost while also letting us frequently update to the newest status. - if new_round || &healths[worker_id] < &update { - healths[worker_id] = update; - } + healths.merge_update(worker_id, ns, health_event, !new_round); } let mut halt_with_outer = None; @@ -380,30 +622,31 @@ where .get_mut(&output_index) .expect("known to exist"); - let overall_status = healths.iter().filter_map(Option::as_ref).max(); + let new_status = healths.decide_status(); - if let Some(new_status) = overall_status { - if new_status.can_transition_from(last_reported_status.as_ref()) { - info!( - "Health transition for {object_type} {id}: \ - {last_reported_status:?} -> {new_status:?}" - ); - if let Some((status_shard, persist_client)) = persist_details { - write_to_persist( + if Some(&new_status) != last_reported_status.as_ref() { + info!( + "Health transition for {object_type} {id}: \ + {last_reported_status:?} -> {:?}", + Some(&new_status) + ); + if let Some((status_shard, persist_client)) = persist_details { + health_operator_impl + .record_new_status( *id, new_status.name(), 
new_status.error(), + new_status.hints().unwrap_or(&BTreeSet::new()), + new_status.errors().unwrap_or(&BTreeMap::new()), now.clone(), persist_client, *status_shard, schema, - new_status.hint(), ) .await; - } - - *last_reported_status = Some(new_status.clone()); } + + *last_reported_status = Some(new_status.clone()); } // Set halt with if None. @@ -432,14 +675,7 @@ where halt_with, SUSPEND_AND_RESTART_DELAY ); tokio::time::sleep(SUSPEND_AND_RESTART_DELAY).await; - internal_cmd_tx.borrow_mut().broadcast( - InternalStorageCommand::SuspendAndRestart { - // Suspend and restart is expected to operate on the primary object and - // not any of the sub-objects - id, - reason: format!("{:?}", halt_with), - }, - ); + health_operator_impl.send_halt(id, halt_with).await; } } } @@ -447,3 +683,631 @@ where Rc::new(button.press_on_drop()) } + +use serde::{Deserialize, Serialize}; + +/// NB: we derive Ord here, so the enum order matters. Generally, statuses later in the list +/// take precedence over earlier ones: so if one worker is stalled, we'll consider the entire +/// source to be stalled. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum HealthStatusUpdate { + Running, + Stalled { + error: String, + hint: Option, + should_halt: bool, + }, +} + +impl HealthStatusUpdate { + /// Generates a running [`HealthStatusUpdate`]. + pub(crate) fn running() -> Self { + HealthStatusUpdate::Running + } + + /// Generates a non-halting [`HealthStatusUpdate`] with `update`. + pub(crate) fn stalled(error: String, hint: Option) -> Self { + HealthStatusUpdate::Stalled { + error, + hint, + should_halt: false, + } + } + + /// Generates a halting [`HealthStatusUpdate`] with `update`. + pub(crate) fn halting(error: String, hint: Option) -> Self { + HealthStatusUpdate::Stalled { + error, + hint, + should_halt: true, + } + } + + /// Whether or not we should halt the dataflow instances and restart it. 
+ pub(crate) fn should_halt(&self) -> bool { + match self { + HealthStatusUpdate::Running => false, + HealthStatusUpdate::Stalled { should_halt, .. } => *should_halt, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use itertools::Itertools; + + // Actual timely tests for `health_operator`. + + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // Reports undefined behavior + fn test_health_operator_basic() { + use Step::*; + + // Test 2 inputs across 2 workers. + health_operator_runner( + 2, + 2, + vec![ + AssertStatus(vec![ + // Assert both inputs started. + StatusToAssert { + collection_index: 0, + status: "starting".to_string(), + ..Default::default() + }, + StatusToAssert { + collection_index: 1, + status: "starting".to_string(), + ..Default::default() + }, + ]), + // Update and assert one is running. + Update(TestUpdate { + worker_id: 1, + namespace: StatusNamespace::TestScript, + input_index: 0, + update: HealthStatusUpdate::running(), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "running".to_string(), + ..Default::default() + }]), + // Assert the other can be stalled by 1 worker. + // + // TODO(guswynn): ideally we could push these updates + // at the same time, but because they are coming from separately + // workers, they could end up in different rounds, causing flakes. + // For now, we just do this. 
+ Update(TestUpdate { + worker_id: 1, + namespace: StatusNamespace::TestScript, + input_index: 1, + update: HealthStatusUpdate::running(), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 1, + status: "running".to_string(), + ..Default::default() + }]), + Update(TestUpdate { + worker_id: 0, + namespace: StatusNamespace::TestScript, + input_index: 1, + update: HealthStatusUpdate::stalled("uhoh".to_string(), None), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 1, + status: "stalled".to_string(), + error: Some("testscript: uhoh".to_string()), + errors: Some("testscript: uhoh".to_string()), + ..Default::default() + }]), + // And that it can recover. + Update(TestUpdate { + worker_id: 0, + namespace: StatusNamespace::TestScript, + input_index: 1, + update: HealthStatusUpdate::running(), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 1, + status: "running".to_string(), + ..Default::default() + }]), + ], + ); + } + + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // Reports undefined behavior + fn test_health_operator_namespaces() { + use Step::*; + + // Test 2 inputs across 2 workers. + health_operator_runner( + 2, + 1, + vec![ + AssertStatus(vec![ + // Assert both inputs started. + StatusToAssert { + collection_index: 0, + status: "starting".to_string(), + ..Default::default() + }, + ]), + // Assert that we merge namespaced errors correctly. + // + // Note that these all happen on the same worker id. 
+ Update(TestUpdate { + worker_id: 0, + namespace: StatusNamespace::TestScript, + input_index: 0, + update: HealthStatusUpdate::stalled("uhoh".to_string(), None), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "stalled".to_string(), + error: Some("testscript: uhoh".to_string()), + errors: Some("testscript: uhoh".to_string()), + ..Default::default() + }]), + Update(TestUpdate { + worker_id: 0, + namespace: StatusNamespace::Kafka, + input_index: 0, + update: HealthStatusUpdate::stalled("uhoh".to_string(), None), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "stalled".to_string(), + error: Some("kafka: uhoh".to_string()), + errors: Some("testscript: uhoh, kafka: uhoh".to_string()), + ..Default::default() + }]), + // And that it can recover. + Update(TestUpdate { + worker_id: 0, + namespace: StatusNamespace::Kafka, + input_index: 0, + update: HealthStatusUpdate::running(), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "stalled".to_string(), + error: Some("testscript: uhoh".to_string()), + errors: Some("testscript: uhoh".to_string()), + ..Default::default() + }]), + Update(TestUpdate { + worker_id: 0, + namespace: StatusNamespace::TestScript, + input_index: 0, + update: HealthStatusUpdate::running(), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "running".to_string(), + ..Default::default() + }]), + ], + ); + } + + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // Reports undefined behavior + fn test_health_operator_namespace_side_channel() { + use Step::*; + + health_operator_runner( + 2, + 1, + vec![ + AssertStatus(vec![ + // Assert both inputs started. + StatusToAssert { + collection_index: 0, + status: "starting".to_string(), + ..Default::default() + }, + ]), + // Assert that sidechannel namespaces don't downgrade the status + // + // Note that these all happen on the same worker id. 
+ Update(TestUpdate { + worker_id: 0, + namespace: StatusNamespace::Ssh, + input_index: 0, + update: HealthStatusUpdate::stalled("uhoh".to_string(), None), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "stalled".to_string(), + error: Some("ssh: uhoh".to_string()), + errors: Some("ssh: uhoh".to_string()), + ..Default::default() + }]), + // Note this is from a different work, but it merges as if its from + // the same worker. + Update(TestUpdate { + worker_id: 1, + namespace: StatusNamespace::Ssh, + input_index: 0, + update: HealthStatusUpdate::stalled("uhoh2".to_string(), None), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "stalled".to_string(), + error: Some("ssh: uhoh2".to_string()), + errors: Some("ssh: uhoh2".to_string()), + ..Default::default() + }]), + Update(TestUpdate { + worker_id: 0, + namespace: StatusNamespace::Ssh, + input_index: 0, + update: HealthStatusUpdate::running(), + }), + // We haven't starting running yet, as a `Default` namespace hasn't told us. + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "starting".to_string(), + ..Default::default() + }]), + Update(TestUpdate { + worker_id: 0, + namespace: StatusNamespace::TestScript, + input_index: 0, + update: HealthStatusUpdate::running(), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "running".to_string(), + ..Default::default() + }]), + ], + ); + } + + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // Reports undefined behavior + fn test_health_operator_hints() { + use Step::*; + + health_operator_runner( + 2, + 1, + vec![ + AssertStatus(vec![ + // Assert both inputs started. + StatusToAssert { + collection_index: 0, + status: "starting".to_string(), + ..Default::default() + }, + ]), + // Note that these all happen across worker ids. 
+ Update(TestUpdate { + worker_id: 0, + namespace: StatusNamespace::TestScript, + input_index: 0, + update: HealthStatusUpdate::stalled( + "uhoh".to_string(), + Some("hint1".to_string()), + ), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "stalled".to_string(), + error: Some("testscript: uhoh".to_string()), + errors: Some("testscript: uhoh".to_string()), + hint: Some("hint1".to_string()), + }]), + Update(TestUpdate { + worker_id: 1, + namespace: StatusNamespace::TestScript, + input_index: 0, + update: HealthStatusUpdate::stalled( + "uhoh2".to_string(), + Some("hint2".to_string()), + ), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "stalled".to_string(), + // Note the error sorts later so we just use that. + error: Some("testscript: uhoh2".to_string()), + errors: Some("testscript: uhoh2".to_string()), + hint: Some("hint1, hint2".to_string()), + }]), + // Update one of the hints + Update(TestUpdate { + worker_id: 1, + namespace: StatusNamespace::TestScript, + input_index: 0, + update: HealthStatusUpdate::stalled( + "uhoh2".to_string(), + Some("hint3".to_string()), + ), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "stalled".to_string(), + // Note the error sorts later so we just use that. + error: Some("testscript: uhoh2".to_string()), + errors: Some("testscript: uhoh2".to_string()), + hint: Some("hint1, hint3".to_string()), + }]), + // Assert recovery. + Update(TestUpdate { + worker_id: 0, + namespace: StatusNamespace::TestScript, + input_index: 0, + update: HealthStatusUpdate::running(), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "stalled".to_string(), + // Note the error sorts later so we just use that. 
+ error: Some("testscript: uhoh2".to_string()), + errors: Some("testscript: uhoh2".to_string()), + hint: Some("hint3".to_string()), + }]), + Update(TestUpdate { + worker_id: 1, + namespace: StatusNamespace::TestScript, + input_index: 0, + update: HealthStatusUpdate::running(), + }), + AssertStatus(vec![StatusToAssert { + collection_index: 0, + status: "running".to_string(), + ..Default::default() + }]), + ], + ); + } + + // The below is ALL test infrastructure for the above + + use timely::dataflow::operators::exchange::Exchange; + use timely::dataflow::Scope; + use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + // Doesn't really matter which we use + use mz_storage_client::healthcheck::MZ_SOURCE_STATUS_HISTORY_DESC; + + /// A status to assert. + #[derive(Debug, Clone, Default, PartialEq, Eq)] + struct StatusToAssert { + collection_index: usize, + status: String, + error: Option, + errors: Option, + hint: Option, + } + + /// An update to push into the operator. + /// Can come from any worker, and from any input. + #[derive(Debug, Clone)] + struct TestUpdate { + worker_id: u64, + namespace: StatusNamespace, + input_index: usize, + update: HealthStatusUpdate, + } + + #[derive(Debug, Clone)] + enum Step { + /// Insert a new health update. + Update(TestUpdate), + /// Assert a set of outputs. 
Note that these should + /// have unique `collection_index`'s + AssertStatus(Vec), + } + + struct TestWriter { + sender: UnboundedSender, + input_mapping: BTreeMap, + } + + #[async_trait::async_trait(?Send)] + impl HealthOperator for TestWriter { + async fn record_new_status( + &self, + collection_id: GlobalId, + new_status: &str, + new_error: Option<&str>, + hints: &BTreeSet, + namespaced_errors: &BTreeMap, + _now: NowFn, + _client: &PersistClient, + _status_shard: ShardId, + _relation_desc: &RelationDesc, + ) { + let _ = self.sender.send(StatusToAssert { + collection_index: *self.input_mapping.get(&collection_id).unwrap(), + status: new_status.to_string(), + error: new_error.map(str::to_string), + errors: if !namespaced_errors.is_empty() { + Some( + namespaced_errors + .iter() + .map(|(ns, err)| format!("{}: {}", ns, err)) + .join(", "), + ) + } else { + None + }, + hint: if !hints.is_empty() { + Some(hints.iter().join(", ")) + } else { + None + }, + }); + } + + async fn send_halt( + &self, + _id: GlobalId, + _error: Option<(StatusNamespace, HealthStatusUpdate)>, + ) { + // Not yet unit-tested + unimplemented!() + } + + fn chosen_worker(&self) -> Option { + // We input and assert outputs on the first worker. + Some(0) + } + } + + /// Setup a `health_operator` with a set number of workers and inputs, and the + /// steps on the first worker. 
+ fn health_operator_runner(workers: usize, inputs: usize, steps: Vec) { + let tokio_runtime = tokio::runtime::Runtime::new().unwrap(); + let tokio_handle = tokio_runtime.handle().clone(); + + let inputs: BTreeMap = (0..inputs) + .map(|index| (GlobalId::User(u64::cast_from(index)), index)) + .collect(); + + timely::execute::execute( + timely::execute::Config { + communication: timely::CommunicationConfig::Process(workers), + worker: Default::default(), + }, + move |worker| { + let steps = steps.clone(); + let inputs = inputs.clone(); + + let _tokio_guard = tokio_handle.enter(); + let (in_tx, in_rx) = unbounded_channel(); + let (out_tx, mut out_rx) = unbounded_channel(); + + worker.dataflow::<(), _, _>(|root_scope| { + root_scope + .clone() + .scoped::("gus", |scope| { + let input = producer(root_scope.clone(), in_rx); + Box::leak(Box::new(health_operator( + scope, + Arc::new(PersistClientCache::new_no_metrics()), + mz_ore::now::SYSTEM_TIME.clone(), + inputs.keys().copied().collect(), + *inputs.first_key_value().unwrap().0, + "source_test", + &input, + inputs + .iter() + .map(|(id, index)| { + ( + *index, + ObjectHealthConfig { + id: *id, + schema: &*MZ_SOURCE_STATUS_HISTORY_DESC, + status_shard: Some(ShardId::new()), + persist_location: PersistLocation::new_in_mem(), + }, + ) + }) + .collect(), + TestWriter { + sender: out_tx, + input_mapping: inputs, + }, + ))); + }); + }); + + // We arbitrarily do all the testing on the first worker. + if worker.index() == 0 { + use Step::*; + for step in steps { + match step { + Update(update) => { + let _ = in_tx.send(update); + } + AssertStatus(mut statuses) => loop { + match out_rx.try_recv() { + Err(_) => { + worker.step(); + // This makes testing easier. 
+ std::thread::sleep(std::time::Duration::from_millis(50)); + } + Ok(update) => { + let pos = statuses + .iter() + .position(|s| { + s.collection_index == update.collection_index + }) + .unwrap(); + + let status_to_assert = &statuses[pos]; + assert_eq!(&update, status_to_assert); + + statuses.remove(pos); + if statuses.is_empty() { + break; + } + } + } + }, + } + } + + // Assert that nothing is left in the channel. + assert!(out_rx.try_recv().is_err()); + } + }, + ) + .unwrap(); + } + + /// Produces (input_index, HealthStatusUpdate)'s based on the input channel. + /// + /// Only the first worker is used, all others immediately drop their capabilities and channels. + /// After the channel is empty on the first worker, then the frontier will go to []. + /// Also ensures that updates are routed to the correct worker based on the `TestUpdate` + /// using an exchange. + fn producer>( + scope: G, + mut input: UnboundedReceiver, + ) -> Stream { + let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope.clone()); + let (mut output_handle, output) = iterator.new_output(); + + let index = scope.index(); + iterator.build(|mut caps| async move { + // We input and assert outputs on the first worker. 
+ if index != 0 { + return; + } + let mut capability = Some(caps.pop().unwrap()); + while let Some(element) = input.recv().await { + output_handle + .give( + capability.as_ref().unwrap(), + ( + element.worker_id, + element.input_index, + element.namespace, + element.update, + ), + ) + .await; + } + + capability.take(); + }); + + let output = output.exchange(|d| d.0).map(|d| HealthStatusMessage { + index: d.1, + namespace: d.2, + update: d.3, + }); + + output + } +} diff --git a/src/storage/src/internal_control.rs b/src/storage/src/internal_control.rs index 801a9ffbd6b39..5c5247da84004 100644 --- a/src/storage/src/internal_control.rs +++ b/src/storage/src/internal_control.rs @@ -43,6 +43,9 @@ pub struct DataflowParameters { /// Configuration ratio to shrink upsert buffers. /// Defaults to 0, which means no shrinking will happen. pub shrink_upsert_unused_buffers_by_ratio: usize, + /// Whether or not to record errors by namespace in the `details` + /// column of the status history tables. + pub record_namespaced_errors: bool, } impl DataflowParameters { @@ -63,6 +66,8 @@ impl DataflowParameters { storage_dataflow_max_inflight_bytes_config: Default::default(), delay_sources_past_rehydration: Default::default(), shrink_upsert_unused_buffers_by_ratio: Default::default(), + // Note the default is true in `vars.rs` as well. + record_namespaced_errors: true, } } /// Update the `DataflowParameters` with new configuration. 
@@ -75,6 +80,7 @@ impl DataflowParameters { storage_dataflow_max_inflight_bytes_config: mz_storage_types::parameters::StorageMaxInflightBytesConfig, delay_sources_past_rehydration: bool, shrink_upsert_unused_buffers_by_ratio: usize, + record_namespaced_errors: bool, ) { self.pg_source_tcp_timeouts = pg_source_tcp_timeouts; self.pg_source_snapshot_statement_timeout = pg_source_snapshot_statement_timeout; @@ -84,6 +90,7 @@ impl DataflowParameters { storage_dataflow_max_inflight_bytes_config; self.delay_sources_past_rehydration = delay_sources_past_rehydration; self.shrink_upsert_unused_buffers_by_ratio = shrink_upsert_unused_buffers_by_ratio; + self.record_namespaced_errors = record_namespaced_errors; } } @@ -140,6 +147,8 @@ pub enum InternalStorageCommand { delay_sources_past_rehydration: bool, /// Configuration ratio to shrink upsert buffers by shrink_upsert_unused_buffers_by_ratio: usize, + /// Configuration for status history. + record_namespaced_errors: bool, }, } diff --git a/src/storage/src/render/mod.rs b/src/storage/src/render/mod.rs index 6cf6ccff1153f..013245d25ab6a 100644 --- a/src/storage/src/render/mod.rs +++ b/src/storage/src/render/mod.rs @@ -199,6 +199,7 @@ use std::collections::BTreeMap; use std::rc::Rc; +use std::sync::Arc; use mz_ore::error::ErrorExt; use mz_repr::{GlobalId, Row}; @@ -211,7 +212,8 @@ use timely::dataflow::Scope; use timely::progress::Antichain; use timely::worker::Worker as TimelyWorker; -use crate::source::types::{HealthStatus, HealthStatusUpdate, SourcePersistSinkMetrics}; +use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace}; +use crate::source::types::SourcePersistSinkMetrics; use crate::storage_state::StorageState; mod debezium; @@ -297,14 +299,13 @@ pub fn build_ingestion_dataflow( tokens.push(token); let sink_health = errors.map(|err: Rc| { - let halt_status = HealthStatusUpdate { - update: HealthStatus::StalledWithError { - error: err.display_with_causes().to_string(), - hint: None, - }, - 
should_halt: true, - }; - (0, halt_status) + let halt_status = + HealthStatusUpdate::halting(err.display_with_causes().to_string(), None); + HealthStatusMessage { + index: 0, + namespace: StatusNamespace::Internal, + update: halt_status, + } }); health_streams.push(sink_health.leave()); use mz_storage_client::healthcheck::MZ_SOURCE_STATUS_HISTORY_DESC; @@ -326,7 +327,8 @@ pub fn build_ingestion_dataflow( let health_stream = root_scope.concatenate(health_streams); let health_token = crate::healthcheck::health_operator( into_time_scope, - storage_state, + Arc::clone(&storage_state.persist_clients), + storage_state.now.clone(), resume_uppers .iter() .filter_map(|(id, frontier)| { @@ -339,6 +341,7 @@ pub fn build_ingestion_dataflow( "source", &health_stream, health_configs, + crate::healthcheck::DefaultWriter(Rc::clone(&storage_state.internal_cmd_tx)), ); tokens.push(health_token); @@ -397,12 +400,14 @@ pub fn build_export_dataflow( // `health_operator` has to do internally. let health_token = crate::healthcheck::health_operator( scope, - storage_state, + Arc::clone(&storage_state.persist_clients), + storage_state.now.clone(), [id].into_iter().collect(), id, "sink", &health_stream, health_configs, + crate::healthcheck::DefaultWriter(Rc::clone(&storage_state.internal_cmd_tx)), ); tokens.push(health_token); diff --git a/src/storage/src/render/sinks.rs b/src/storage/src/render/sinks.rs index 3d7e8404ae2eb..03116d7a09382 100644 --- a/src/storage/src/render/sinks.rs +++ b/src/storage/src/render/sinks.rs @@ -27,12 +27,12 @@ use mz_storage_types::errors::DataflowError; use mz_storage_types::sinks::{ MetadataFilled, SinkEnvelope, StorageSinkConnection, StorageSinkDesc, }; -use timely::dataflow::operators::{Leave, Map}; +use timely::dataflow::operators::Leave; use timely::dataflow::scopes::Child; use timely::dataflow::{Scope, Stream}; use tracing::warn; -use crate::sink::SinkStatus; +use crate::healthcheck::HealthStatusMessage; use crate::storage_state::StorageState; /// 
_Renders_ complete _differential_ [`Collection`]s @@ -44,7 +44,7 @@ pub(crate) fn render_sink<'g, G: Scope>( tokens: &mut Vec>, sink_id: GlobalId, sink: &StorageSinkDesc, -) -> Stream { +) -> Stream { let sink_render = get_sink_render_for(&sink.connection); let (ok_collection, err_collection, source_token) = persist_source::persist_source( @@ -76,7 +76,7 @@ pub(crate) fn render_sink<'g, G: Scope>( tokens.push(sink_token); } - health.map(|status| (0, status)).leave() + health.leave() } #[allow(clippy::borrowed_box)] @@ -245,7 +245,7 @@ where sink_id: GlobalId, sinked_collection: Collection, Option), Diff>, err_collection: Collection, - ) -> (Stream, Option>) + ) -> (Stream, Option>) where G: Scope; } diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs index 2603a31f2d24a..b3d6fbfa3fc38 100644 --- a/src/storage/src/render/sources.rs +++ b/src/storage/src/render/sources.rs @@ -33,14 +33,15 @@ use mz_timely_util::operator::CollectionExt; use mz_timely_util::order::refine_antichain; use serde::{Deserialize, Serialize}; use timely::dataflow::operators::generic::operator::empty; -use timely::dataflow::operators::{Concat, ConnectLoop, Exchange, Feedback, Leave, OkErr}; +use timely::dataflow::operators::{Concat, ConnectLoop, Exchange, Feedback, Leave, Map, OkErr}; use timely::dataflow::scopes::{Child, Scope}; use timely::dataflow::Stream; use timely::progress::{Antichain, Timestamp as _}; use crate::decode::{render_decode_cdcv2, render_decode_delimited}; +use crate::healthcheck::{HealthStatusMessage, StatusNamespace}; use crate::render::upsert::UpsertKey; -use crate::source::types::{DecodeResult, HealthStatusUpdate, SourceOutput}; +use crate::source::types::{DecodeResult, SourceOutput}; use crate::source::{self, RawSourceCreationConfig, SourceCreationParams}; /// A type-level enum that holds one of two types of sources depending on their message type @@ -84,7 +85,7 @@ pub fn render_source<'g, G: Scope>( Collection, Row, Diff>, Collection, 
DataflowError, Diff>, )>, - Stream, + Stream, Rc, ) { // Tokens that we should return from the method. @@ -261,7 +262,7 @@ fn render_source_stream( Collection, Collection, Vec>, - Stream, + Stream, ) where G: Scope, @@ -549,7 +550,11 @@ where None => upsert.as_collection(), }; - (upsert.leave(), health_update.leave()) + (upsert.leave(), health_update.map(|(index, update)| HealthStatusMessage { + index, + namespace: StatusNamespace::Upsert, + update + }).leave()) }, ); @@ -577,7 +582,13 @@ where ( envelope_ok, envelope_err, - decode_health.concat(&envelope_health), + decode_health + .map(|(index, update)| HealthStatusMessage { + index, + namespace: StatusNamespace::Decode, + update, + }) + .concat(&envelope_health), ) } }; diff --git a/src/storage/src/render/upsert.rs b/src/storage/src/render/upsert.rs index 4b659eed9d003..084a4ff37e710 100644 --- a/src/storage/src/render/upsert.rs +++ b/src/storage/src/render/upsert.rs @@ -39,12 +39,13 @@ use timely::dataflow::{Scope, ScopeParent, Stream}; use timely::order::{PartialOrder, TotalOrder}; use timely::progress::{Antichain, Timestamp}; +use crate::healthcheck::HealthStatusUpdate; use crate::render::sources::OutputIndex; use crate::render::upsert::types::{ upsert_bincode_opts, AutoSpillBackend, InMemoryHashMap, RocksDBParams, UpsertState, UpsertStateBackend, }; -use crate::source::types::{HealthStatus, HealthStatusUpdate, UpsertMetrics}; +use crate::source::types::UpsertMetrics; use crate::storage_state::StorageInstanceContext; mod rocksdb; @@ -728,13 +729,7 @@ async fn process_upsert_state_error( >, health_cap: &Capability<::Timestamp>, ) { - let update = HealthStatusUpdate { - update: HealthStatus::StalledWithError { - error: e.context(context).to_string_with_causes(), - hint: None, - }, - should_halt: true, - }; + let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None); health_output.give(health_cap, (0, update)).await; std::future::pending::<()>().await; unreachable!("pending future 
never returns"); diff --git a/src/storage/src/sink/healthcheck.rs b/src/storage/src/sink/healthcheck.rs deleted file mode 100644 index 46ef0ffad3823..0000000000000 --- a/src/storage/src/sink/healthcheck.rs +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright Materialize, Inc. and contributors. All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -//! Healthchecks for sinks -use std::fmt::Display; - -/// Identify the state a worker for a given source can be at a point in time -#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, PartialOrd, Ord)] -pub enum SinkStatus { - /// Intended to be the state while the `clusterd` process is initializing itself - /// Pushed by the Healthchecker on creation. - Starting, - /// State indicating the sink is running fine. Pushed automatically as long - /// as rows are being consumed. - Running, - /// Represents a stall in the export process that might get resolved. - /// Existing data is still available and queryable. - Stalled { - /// Error string used to populate the `error` column in the `mz_sink_status_history` table. - error: String, - /// Optional hint string which if present, will be added to the `details` column in - /// the `mz_sink_status_history` table. - hint: Option, - }, - // Managed by the storage controller. - // Dropped, -} - -impl crate::healthcheck::HealthStatus for SinkStatus { - fn name(&self) -> &'static str { - self.name() - } - fn error(&self) -> Option<&str> { - self.error() - } - fn hint(&self) -> Option<&str> { - self.hint() - } - fn should_halt(&self) -> bool { - matches!(self, Self::Stalled { .. 
}) - } - fn can_transition_from(&self, other: Option<&Self>) -> bool { - Self::can_transition(other, self) - } - fn starting() -> Self { - Self::Starting - } -} - -impl SinkStatus { - fn name(&self) -> &'static str { - match self { - SinkStatus::Starting => "starting", - SinkStatus::Running => "running", - SinkStatus::Stalled { .. } => "stalled", - } - } - - fn error(&self) -> Option<&str> { - match self { - SinkStatus::Stalled { error, .. } => Some(&*error), - SinkStatus::Starting => None, - SinkStatus::Running => None, - } - } - - fn hint(&self) -> Option<&str> { - match self { - SinkStatus::Stalled { error: _, hint } => hint.as_deref(), - SinkStatus::Starting => None, - SinkStatus::Running => None, - } - } - - fn can_transition(old_status: Option<&SinkStatus>, new_status: &SinkStatus) -> bool { - match old_status { - None => true, - // All other states can transition freely to any other state - Some( - old @ SinkStatus::Starting - | old @ SinkStatus::Running - | old @ SinkStatus::Stalled { .. 
}, - ) => old != new_status, - } - } -} - -impl Display for SinkStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.name()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn stalled() -> SinkStatus { - SinkStatus::Stalled { - error: "".into(), - hint: None, - } - } - - #[mz_ore::test] - fn test_can_transition() { - let test_cases = [ - // Allowed transitions - ( - Some(SinkStatus::Starting), - vec![SinkStatus::Running, stalled()], - true, - ), - ( - Some(SinkStatus::Running), - vec![SinkStatus::Starting, stalled()], - true, - ), - ( - Some(stalled()), - vec![SinkStatus::Starting, SinkStatus::Running], - true, - ), - ( - None, - vec![SinkStatus::Starting, SinkStatus::Running, stalled()], - true, - ), - // Forbidden transitions - ( - Some(SinkStatus::Starting), - vec![SinkStatus::Starting], - false, - ), - (Some(SinkStatus::Running), vec![SinkStatus::Running], false), - (Some(stalled()), vec![stalled()], false), - ]; - - for test_case in test_cases { - run_test(test_case) - } - - fn run_test(test_case: (Option, Vec, bool)) { - let (from_status, to_status, allowed) = test_case; - for status in to_status { - assert_eq!( - allowed, - SinkStatus::can_transition(from_status.as_ref(), &status), - "Bad can_transition: {from_status:?} -> {status:?}; expected allowed: {allowed:?}" - ); - } - } - } -} diff --git a/src/storage/src/sink/kafka.rs b/src/storage/src/sink/kafka.rs index 244abdae94fab..dcc4eca3c42fd 100644 --- a/src/storage/src/sink/kafka.rs +++ b/src/storage/src/sink/kafka.rs @@ -59,8 +59,9 @@ use timely::PartialOrder; use tokio::sync::Mutex; use tracing::{debug, error, info, warn}; +use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace}; use crate::render::sinks::SinkRender; -use crate::sink::{KafkaBaseMetrics, SinkStatus}; +use crate::sink::KafkaBaseMetrics; use crate::statistics::{SinkStatisticsMetrics, StorageStatistics}; use crate::storage_state::StorageState; @@ -96,7 +97,7 
@@ where // TODO(benesch): errors should stream out through the sink, // if we figure out a protocol for that. _err_collection: Collection, - ) -> (Stream, Option>) + ) -> (Stream, Option>) where G: Scope, { @@ -370,8 +371,8 @@ struct HealthOutputHandle { handle: Mutex< mz_timely_util::builder_async::AsyncOutputHandle< mz_repr::Timestamp, - Vec, - TeeCore>, + Vec, + TeeCore>, >, >, } @@ -891,22 +892,30 @@ impl KafkaSinkState { progress_emitted } - async fn update_status(&self, status: SinkStatus) { + async fn update_status(&self, status: HealthStatusUpdate) { update_status(&self.healthchecker, status).await; } - /// Report a SinkStatus::Stalled and then halt with the same message. + /// Report a stalled HealthStatusUpdate and then halt with the same message. pub async fn halt_on_err(&self, result: Result) -> T { halt_on_err(&self.healthchecker, result).await } } -async fn update_status(healthchecker: &HealthOutputHandle, status: SinkStatus) { +async fn update_status(healthchecker: &HealthOutputHandle, status: HealthStatusUpdate) { healthchecker .handle .lock() .await - .give(&healthchecker.health_cap, status) + .give( + &healthchecker.health_cap, + HealthStatusMessage { + // sinks only have 1 logical object. 
+ index: 0, + namespace: StatusNamespace::Kafka, + update: status, + }, + ) .await; } @@ -935,10 +944,7 @@ async fn halt_on_err(healthchecker: &HealthOutputHandle, result: Result( metrics: KafkaBaseMetrics, sink_statistics: StorageStatistics, connection_context: ConnectionContext, -) -> (Stream, Rc) +) -> (Stream, Rc) where G: Scope, { @@ -1055,7 +1061,7 @@ pub fn produce_to_kafka( metrics: KafkaBaseMetrics, sink_statistics: StorageStatistics, connection_context: ConnectionContext, -) -> (Stream, Rc) +) -> (Stream, Rc) where G: Scope, { @@ -1128,7 +1134,7 @@ where s.maybe_update_progress(&gate); } - s.update_status(SinkStatus::Running).await; + s.update_status(HealthStatusUpdate::running()).await; while let Some(event) = input.next_mut().await { match event { diff --git a/src/storage/src/sink/mod.rs b/src/storage/src/sink/mod.rs index 94d2d3f2eaf52..661d1a40e0eb9 100644 --- a/src/storage/src/sink/mod.rs +++ b/src/storage/src/sink/mod.rs @@ -9,10 +9,8 @@ //! Moving data to external systems -mod healthcheck; mod kafka; pub mod metrics; -pub use healthcheck::SinkStatus; pub(crate) use metrics::KafkaBaseMetrics; pub use metrics::SinkBaseMetrics; diff --git a/src/storage/src/source/generator.rs b/src/storage/src/source/generator.rs index 75e8f8818571c..0812a3994ead8 100644 --- a/src/storage/src/source/generator.rs +++ b/src/storage/src/source/generator.rs @@ -25,7 +25,8 @@ use timely::dataflow::operators::ToStream; use timely::dataflow::{Scope, Stream}; use timely::progress::Antichain; -use crate::source::types::{HealthStatus, HealthStatusUpdate, SourceRender}; +use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace}; +use crate::source::types::SourceRender; use crate::source::{RawSourceCreationConfig, SourceMessage, SourceReaderError}; mod auction; @@ -73,6 +74,8 @@ impl SourceRender for LoadGeneratorSourceConnection { type Value = Row; type Time = MzOffset; + const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Generator; + fn render>( 
self, scope: &mut G, @@ -90,7 +93,7 @@ impl SourceRender for LoadGeneratorSourceConnection { Diff, >, Option>, - Stream, + Stream, Rc, ) { let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone()); @@ -157,7 +160,12 @@ impl SourceRender for LoadGeneratorSourceConnection { } }); - let status = [(0, HealthStatusUpdate::status(HealthStatus::Running))].to_stream(scope); + let status = [HealthStatusMessage { + index: 0, + namespace: Self::STATUS_NAMESPACE.clone(), + update: HealthStatusUpdate::running(), + }] + .to_stream(scope); ( stream.as_collection(), None, diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index cf8995d2abbf0..0fba899b806b9 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -52,8 +52,9 @@ use timely::PartialOrder; use tokio::sync::Notify; use tracing::{error, info, trace, warn}; +use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace}; use crate::source::kafka::metrics::KafkaPartitionMetrics; -use crate::source::types::{HealthStatus, HealthStatusUpdate, SourceReaderMetrics, SourceRender}; +use crate::source::types::{SourceReaderMetrics, SourceRender}; use crate::source::{RawSourceCreationConfig, SourceMessage, SourceReaderError}; mod metrics; @@ -93,7 +94,7 @@ pub struct KafkaSourceReader { /// The metadata columns requested by the user metadata_columns: Vec, /// The latest status detected by the metadata refresh thread. 
- health_status: Arc>>, + health_status: Arc>>, /// Per partition capabilities used to produce messages partition_capabilities: BTreeMap, } @@ -128,6 +129,8 @@ impl SourceRender for KafkaSourceConnection { type Value = Option>; type Time = Partitioned; + const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka; + fn render>>( self, scope: &mut G, @@ -146,7 +149,7 @@ impl SourceRender for KafkaSourceConnection { Diff, >, Option>, - Stream, + Stream, Rc, ) { let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone()); @@ -162,7 +165,8 @@ impl SourceRender for KafkaSourceConnection { let mut start_offsets: BTreeMap<_, i64> = self .start_offsets.clone() .into_iter() - .filter(|(pid, _offset)| config.responsible_for(pid)) + .filter(|(pid, _offset)| config.responsible_for(pid) + ) .map(|(k, v)| (k, v)) .collect(); @@ -266,17 +270,21 @@ impl SourceRender for KafkaSourceConnection { let consumer = match consumer { Ok(consumer) => Arc::new(consumer), Err(e) => { - let update = HealthStatusUpdate { - update: HealthStatus::StalledWithError { - error: format!( - "failed creating kafka consumer: {}", - e.display_with_causes() - ), - hint: None, - }, - should_halt: true, - }; - health_output.give(&health_cap, (0, update)).await; + let update = HealthStatusUpdate::halting( + format!( + "failed creating kafka consumer: {}", + e.display_with_causes() + ), + None + ); + health_output.give( + &health_cap, + HealthStatusMessage { + index:0, + namespace: Self::STATUS_NAMESPACE.clone(), + update + } + ).await; // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed. // Returning would incorrectly present to the remap operator as progress to the // empty frontier which would be incorrectly recorded to the remap shard. 
@@ -349,14 +357,14 @@ impl SourceRender for KafkaSourceConnection { num_workers = config.worker_count, "kafka metadata thread: updated partition metadata info", ); - *status_report.lock().unwrap() = Some(HealthStatus::Running); + *status_report.lock().unwrap() = Some(HealthStatusUpdate::running()); } Err(e) => { *status_report.lock().unwrap() = - Some(HealthStatus::StalledWithError { - error: format!("{}", e.display_with_causes()), - hint: None, - }); + Some(HealthStatusUpdate::stalled( + format!("{}", e.display_with_causes()), + None, + )); } } thread::park_timeout(metadata_refresh_frequency); @@ -484,7 +492,7 @@ impl SourceRender for KafkaSourceConnection { let part_upper_ts = Partitioned::with_partition(pid, MzOffset::from(watermarks.high)); // This is the moment at which we have discovered a new partition - // and we need to make sure we produce its initial snapshot at a + // and we need to make sure we produce its initial snapshot at a, // single timestamp so that the source transitions from no data // from this partition to all the data of this partition. 
We do // this by initializing the data capability to the starting offset @@ -518,12 +526,18 @@ impl SourceRender for KafkaSourceConnection { "kafka error when polling consumer for source: {} topic: {} : {}", reader.source_name, reader.topic_name, e ); - let status = - HealthStatusUpdate::status(HealthStatus::StalledWithError { - error, - hint: None, - }); - health_output.give(&health_cap, (0, status)).await; + let status = HealthStatusUpdate::stalled( + error, + None, + ); + health_output.give( + &health_cap, + HealthStatusMessage { + index:0, + namespace: Self::STATUS_NAMESPACE.clone(), + update: status, + } + ).await; } Ok(message) => { let (message, ts) = @@ -561,16 +575,20 @@ impl SourceRender for KafkaSourceConnection { .get(&pid) .expect("partition known to be installed"); - let status = HealthStatus::StalledWithError { - error: format!( + let status = HealthStatusUpdate::stalled( + format!( "error consuming from source: {} topic: {topic}: partition:\ {pid} last processed offset: {last_offset} : {err}", config.name ), - hint: None, - }; + None, + ); health_output - .give(&health_cap, (0, HealthStatusUpdate::status(status))) + .give(&health_cap, HealthStatusMessage{ + index: 0, + namespace: Self::STATUS_NAMESPACE.clone(), + update: status, + }) .await; } } @@ -603,7 +621,11 @@ impl SourceRender for KafkaSourceConnection { let status = reader.health_status.lock().unwrap().take(); if let Some(status) = status { health_output - .give(&health_cap, (0, HealthStatusUpdate::status(status))) + .give(&health_cap, HealthStatusMessage{ + index: 0, + namespace: Self::STATUS_NAMESPACE.clone(), + update: status, + }) .await; } diff --git a/src/storage/src/source/postgres.rs b/src/storage/src/source/postgres.rs index 71183e6ad6b1f..10f2e3e1fd172 100644 --- a/src/storage/src/source/postgres.rs +++ b/src/storage/src/source/postgres.rs @@ -103,7 +103,8 @@ use tokio_postgres::error::SqlState; use tokio_postgres::types::PgLsn; use tokio_postgres::{Client, SimpleQueryMessage, 
SimpleQueryRow}; -use crate::source::types::{HealthStatus, HealthStatusUpdate, SourceRender}; +use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace}; +use crate::source::types::SourceRender; use crate::source::{RawSourceCreationConfig, SourceMessage, SourceReaderError}; mod metrics; @@ -115,6 +116,8 @@ impl SourceRender for PostgresSourceConnection { type Value = Row; type Time = MzOffset; + const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Postgres; + /// Render the ingestion dataflow. This function only connects things together and contains no /// actual processing logic. fn render>( @@ -127,7 +130,7 @@ impl SourceRender for PostgresSourceConnection { ) -> ( Collection, SourceReaderError>), Diff>, Option>, - Stream, + Stream, Rc, ) { // Determined which collections need to be snapshot and which already have been. @@ -193,24 +196,23 @@ impl SourceRender for PostgresSourceConnection { }); let health = snapshot_err.concat(&repl_err).flat_map(move |err| { - let update = HealthStatus::StalledWithError { - error: err.display_with_causes().to_string(), - hint: None, - }; // This update will cause the dataflow to restart - let halt_status = HealthStatusUpdate { - update: update.clone(), - should_halt: true, - }; - let mut statuses = vec![(0, halt_status)]; + let err_string = err.display_with_causes().to_string(); + let update = HealthStatusUpdate::halting(err_string.clone(), None); + let mut statuses = vec![HealthStatusMessage { + index: 0, + namespace: Self::STATUS_NAMESPACE.clone(), + update, + }]; // But we still want to report the transient error for all subsources statuses.extend(subsource_outputs.iter().map(|index| { - let status = HealthStatusUpdate { - update: update.clone(), - should_halt: false, - }; - (*index, status) + let status = HealthStatusUpdate::stalled(err_string.clone(), None); + HealthStatusMessage { + index: *index, + namespace: Self::STATUS_NAMESPACE.clone(), + update: status, + } })); statuses }); diff --git 
a/src/storage/src/source/source_reader_pipeline.rs b/src/storage/src/source/source_reader_pipeline.rs index a8b7b8e7e681a..95b8bfefeb8a0 100644 --- a/src/storage/src/source/source_reader_pipeline.rs +++ b/src/storage/src/source/source_reader_pipeline.rs @@ -75,12 +75,11 @@ use timely::PartialOrder; use tokio::sync::mpsc::UnboundedReceiver; use tracing::{info, trace}; -use crate::render::sources::OutputIndex; +use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate}; use crate::source::metrics::SourceBaseMetrics; use crate::source::reclock::{ReclockBatch, ReclockError, ReclockFollower, ReclockOperator}; use crate::source::types::{ - HealthStatus, HealthStatusUpdate, MaybeLength, SourceMessage, SourceMetrics, SourceOutput, - SourceReaderError, SourceRender, + MaybeLength, SourceMessage, SourceMetrics, SourceOutput, SourceReaderError, SourceRender, }; use crate::statistics::{SourceStatisticsMetrics, StorageStatistics}; @@ -179,7 +178,7 @@ pub fn create_raw_source<'g, G: Scope, C>( Collection, SourceOutput, Diff>, Collection, SourceError, Diff>, )>, - Stream, + Stream, Option>, ) where @@ -260,7 +259,7 @@ fn source_render_operator( Diff, >, Stream, - Stream, + Stream, Rc, ) where @@ -304,20 +303,23 @@ where }; for ((output_index, message), _, _) in data.iter() { let status = match message { - Ok(_) => HealthStatusUpdate::status(HealthStatus::Running), - Err(ref error) => HealthStatusUpdate::status(HealthStatus::StalledWithError { - error: error.inner.to_string(), - hint: None, - }), + Ok(_) => HealthStatusUpdate::running(), + Err(ref error) => HealthStatusUpdate::stalled(error.inner.to_string(), None), }; let statuses: &mut Vec<_> = statuses_by_idx.entry(*output_index).or_default(); - let status = (*output_index, status); + let mut status = HealthStatusMessage { + index: *output_index, + namespace: C::STATUS_NAMESPACE.clone(), + update: status, + }; if statuses.last() != Some(&status) { statuses.push(status.clone()); // The global status contains the most 
recent update of the subsources - statuses.push((0, status.1)); + + status.index = 0; + statuses.push(status); } match message { diff --git a/src/storage/src/source/testscript.rs b/src/storage/src/source/testscript.rs index 8c5c5e9e7a9fa..8819dda32c28a 100644 --- a/src/storage/src/source/testscript.rs +++ b/src/storage/src/source/testscript.rs @@ -21,7 +21,8 @@ use timely::dataflow::operators::ToStream; use timely::dataflow::{Scope, Stream}; use timely::progress::Antichain; -use crate::source::types::{HealthStatus, HealthStatusUpdate, SourceRender}; +use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace}; +use crate::source::types::SourceRender; use crate::source::{RawSourceCreationConfig, SourceMessage, SourceReaderError}; #[derive(serde::Serialize, serde::Deserialize, Clone)] @@ -47,6 +48,8 @@ impl SourceRender for TestScriptSourceConnection { type Value = Option>; type Time = MzOffset; + const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Generator; + fn render>( self, scope: &mut G, @@ -64,7 +67,7 @@ impl SourceRender for TestScriptSourceConnection { Diff, >, Option>, - Stream, + Stream, Rc, ) { let mut builder = AsyncOperatorBuilder::new(config.name, scope.clone()); @@ -100,7 +103,12 @@ impl SourceRender for TestScriptSourceConnection { futures::future::pending::<()>().await; }); - let status = [(0, HealthStatusUpdate::status(HealthStatus::Running))].to_stream(scope); + let status = [HealthStatusMessage { + index: 0, + namespace: Self::STATUS_NAMESPACE.clone(), + update: HealthStatusUpdate::running(), + }] + .to_stream(scope); ( stream.as_collection(), None, diff --git a/src/storage/src/source/types.rs b/src/storage/src/source/types.rs index 0752ad2c4f3a4..af17c0a20f67d 100644 --- a/src/storage/src/source/types.rs +++ b/src/storage/src/source/types.rs @@ -33,7 +33,7 @@ use serde::{Deserialize, Serialize}; use timely::dataflow::{Scope, Stream}; use timely::progress::Antichain; -use crate::render::sources::OutputIndex; +use 
crate::healthcheck::{HealthStatusMessage, StatusNamespace}; use crate::source::metrics::{SourceBaseMetrics, UpsertSharedMetrics}; use crate::source::RawSourceCreationConfig; @@ -42,6 +42,7 @@ pub trait SourceRender { type Key: timely::Data + MaybeLength; type Value: timely::Data + MaybeLength; type Time: SourceTimestamp; + const STATUS_NAMESPACE: StatusNamespace; /// Renders the source in the provided timely scope. /// @@ -85,86 +86,11 @@ pub trait SourceRender { Diff, >, Option>, - Stream, + Stream, Rc, ); } -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] -pub struct HealthStatusUpdate { - pub update: HealthStatus, - pub should_halt: bool, -} - -/// NB: we derive Ord here, so the enum order matters. Generally, statuses later in the list -/// take precedence over earlier ones: so if one worker is stalled, we'll consider the entire -/// source to be stalled. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] -pub enum HealthStatus { - Starting, - Running, - StalledWithError { error: String, hint: Option }, -} - -impl HealthStatus { - pub fn name(&self) -> &'static str { - match self { - HealthStatus::Starting => "starting", - HealthStatus::Running => "running", - HealthStatus::StalledWithError { .. } => "stalled", - } - } - - pub fn error(&self) -> Option<&str> { - match self { - HealthStatus::Starting | HealthStatus::Running => None, - HealthStatus::StalledWithError { error, .. } => Some(error), - } - } - - pub fn hint(&self) -> Option<&str> { - match self { - HealthStatus::Starting | HealthStatus::Running => None, - HealthStatus::StalledWithError { error: _, hint } => hint.as_deref(), - } - } -} - -impl HealthStatusUpdate { - /// Generates a non-halting [`HealthStatusUpdate`] with `update`. 
- pub(crate) fn status(update: HealthStatus) -> Self { - HealthStatusUpdate { - update, - should_halt: false, - } - } -} - -impl crate::healthcheck::HealthStatus for HealthStatusUpdate { - fn name(&self) -> &'static str { - self.update.name() - } - fn error(&self) -> Option<&str> { - self.update.error() - } - fn hint(&self) -> Option<&str> { - self.update.hint() - } - fn should_halt(&self) -> bool { - self.should_halt - } - fn can_transition_from(&self, other: Option<&Self>) -> bool { - if let Some(other) = other { - self.update != other.update - } else { - true - } - } - fn starting() -> Self { - Self::status(HealthStatus::Starting) - } -} - /// Source-agnostic wrapper for messages. Each source must implement a /// conversion to Message. #[derive(Debug, Clone)] diff --git a/src/storage/src/storage_state.rs b/src/storage/src/storage_state.rs index c6dcfba4a6280..7c27a8f8e1042 100644 --- a/src/storage/src/storage_state.rs +++ b/src/storage/src/storage_state.rs @@ -873,6 +873,7 @@ impl<'w, A: Allocate> Worker<'w, A> { auto_spill_config, delay_sources_past_rehydration, shrink_upsert_unused_buffers_by_ratio, + record_namespaced_errors, } => self.storage_state.dataflow_parameters.update( pg_source_tcp_timeouts, pg_source_snapshot_statement_timeout, @@ -881,6 +882,7 @@ impl<'w, A: Allocate> Worker<'w, A> { storage_dataflow_max_inflight_bytes_config, delay_sources_past_rehydration, shrink_upsert_unused_buffers_by_ratio, + record_namespaced_errors, ), } } @@ -1203,6 +1205,7 @@ impl StorageState { delay_sources_past_rehydration: params.delay_sources_past_rehydration, shrink_upsert_unused_buffers_by_ratio: params .shrink_upsert_unused_buffers_by_ratio, + record_namespaced_errors: params.record_namespaced_errors, }) } } diff --git a/test/cluster/pg-snapshot-resumption/03-ensure-source-down.td b/test/cluster/pg-snapshot-resumption/03-ensure-source-down.td index 039284622539c..f64e046e43cd0 100644 --- a/test/cluster/pg-snapshot-resumption/03-ensure-source-down.td +++ 
b/test/cluster/pg-snapshot-resumption/03-ensure-source-down.td @@ -15,6 +15,6 @@ mz_sources.name =ANY (ARRAY['mz_source', 't1', 'ten']) AND status = 'stalled'; -t1 "recoverable errors should crash the process during snapshots" -ten "recoverable errors should crash the process during snapshots" -mz_source "recoverable errors should crash the process during snapshots" +t1 "postgres: recoverable errors should crash the process during snapshots" +ten "postgres: recoverable errors should crash the process during snapshots" +mz_source "postgres: recoverable errors should crash the process during snapshots" diff --git a/test/cluster/sink-failure/02-ensure-sink-down.td b/test/cluster/sink-failure/02-ensure-sink-down.td index 6c76873bc8c08..215ca5d8c05bc 100644 --- a/test/cluster/sink-failure/02-ensure-sink-down.td +++ b/test/cluster/sink-failure/02-ensure-sink-down.td @@ -15,4 +15,4 @@ mz_sinks.name = 'snk' AND status = 'stalled'; -snk "synthetic error" +snk "kafka: synthetic error" diff --git a/test/pg-cdc/pg-cdc.td b/test/pg-cdc/pg-cdc.td index bed6cda86427f..db211d3018e02 100644 --- a/test/pg-cdc/pg-cdc.td +++ b/test/pg-cdc/pg-cdc.td @@ -713,8 +713,8 @@ INSERT INTO pk_table VALUES (99999); # should be "materialize.public.pk_table" > SELECT error FROM mz_internal.mz_source_statuses where name in ('enum_source', 'mz_source'); -"publication \"mz_source\" does not exist" -"publication \"mz_source\" does not exist" +"postgres: publication \"mz_source\" does not exist" +"postgres: publication \"mz_source\" does not exist" > DROP SOURCE enum_source CASCADE; > DROP SOURCE "mz_source" CASCADE; diff --git a/test/source-sink-errors/mzcompose.py b/test/source-sink-errors/mzcompose.py index 4cdfea10756ec..4ebeaf127cf92 100644 --- a/test/source-sink-errors/mzcompose.py +++ b/test/source-sink-errors/mzcompose.py @@ -111,7 +111,7 @@ def assert_error(self, c: Composition, error: str, hint: str) -> None: c.testdrive( dedent( f""" - > SELECT bool_or(error ~* '{error}'), 
bool_or(details::json#>>'{{hint}}' ~* '{hint}') + > SELECT bool_or(error ~* '{error}'), bool_or(details::json#>>'{{hints,0}}' ~* '{hint}') FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_sinks.id = sink_id WHERE name = 'kafka_sink' and status = 'stalled' @@ -207,11 +207,11 @@ def assert_error(self, c: Composition, error: str) -> None: # Sinks generally halt after receiving an error, which means that they may alternate # between `stalled` and `starting`. Instead of relying on the current status, we # check that there is a stalled status with the expected error. - > SELECT bool_or(error ~* '{error}') + > SELECT bool_or(error ~* '{error}'), bool_or(details->'namespaced'->>'kafka' ~* '{error}') FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_sinks.id = sink_id WHERE name = 'sink1' and status = 'stalled' - true + true true """ ) ) diff --git a/test/upsert/mzcompose.py b/test/upsert/mzcompose.py index 87f9bea9307d5..babed05cfdd3e 100644 --- a/test/upsert/mzcompose.py +++ b/test/upsert/mzcompose.py @@ -242,15 +242,15 @@ def workflow_failpoint(c: Composition) -> None: for failpoint in [ ( "fail_merge_snapshot_chunk", - "Failed to rehydrate state: Error merging snapshot values", + "upsert: Failed to rehydrate state: Error merging snapshot values", ), ( "fail_state_multi_put", - "Failed to update records in state: Error putting values into state", + "upsert: Failed to update records in state: Error putting values into state", ), ( "fail_state_multi_get", - "Failed to fetch records from state: Error getting values from state", + "upsert: Failed to fetch records from state: Error getting values from state", ), ]: run_one_failpoint(c, failpoint[0], failpoint[1])