diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index 9e9b490a58a16..6ed0def246f71 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -918,9 +918,11 @@ where epoch: ClusterStartupEpoch::new(self.envd_epoch, 0), }); + // Send a placeholder instance configuration for the replica task to fill in. let dummy_logging_config = Default::default(); self.send(ComputeCommand::CreateInstance(InstanceConfig { logging: dummy_logging_config, + expiration_offset: None, })); loop { diff --git a/src/compute-client/src/controller/replica.rs b/src/compute-client/src/controller/replica.rs index d3e1494f92bd0..959390960fcd4 100644 --- a/src/compute-client/src/controller/replica.rs +++ b/src/compute-client/src/controller/replica.rs @@ -15,6 +15,7 @@ use std::time::Duration; use anyhow::bail; use mz_build_info::BuildInfo; use mz_cluster_client::client::{ClusterReplicaLocation, ClusterStartupEpoch, TimelyConfig}; +use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET; use mz_dyncfg::ConfigSet; use mz_ore::channel::InstrumentedUnboundedSender; use mz_ore::retry::Retry; @@ -79,6 +80,8 @@ where // the replica. let (command_tx, command_rx) = unbounded_channel(); + let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&dyncfg); + let task = mz_ore::task::spawn( || format!("active-replication-replica-{id}"), ReplicaTask { @@ -90,6 +93,7 @@ where epoch, metrics: metrics.clone(), dyncfg, + expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset), } .run(), ); @@ -139,6 +143,8 @@ struct ReplicaTask { metrics: ReplicaMetrics, /// Dynamic system configuration. dyncfg: Arc, + /// The offset to use for replica expiration, if any. + expiration_offset: Option, } impl ReplicaTask @@ -255,18 +261,26 @@ where /// Most `ComputeCommand`s are independent of the target replica, but some /// contain replica-specific fields that must be adjusted before sending. fn specialize_command(&self, command: &mut ComputeCommand) { - if let ComputeCommand::CreateInstance(InstanceConfig { logging }) = command { - *logging = self.config.logging.clone(); - } - - if let ComputeCommand::CreateTimely { config, epoch } = command { - *config = TimelyConfig { - workers: self.config.location.workers, - process: 0, - addresses: self.config.location.dataflow_addrs.clone(), - arrangement_exert_proportionality: self.config.arrangement_exert_proportionality, - }; - *epoch = self.epoch; + match command { + ComputeCommand::CreateTimely { config, epoch } => { + *config = TimelyConfig { + workers: self.config.location.workers, + process: 0, + addresses: self.config.location.dataflow_addrs.clone(), + arrangement_exert_proportionality: self + .config + .arrangement_exert_proportionality, + }; + *epoch = self.epoch; + } + ComputeCommand::CreateInstance(InstanceConfig { + logging, + expiration_offset, + }) => { + *logging = self.config.logging.clone(); + *expiration_offset = self.expiration_offset; + } + _ => {} } } diff --git a/src/compute-client/src/protocol/command.proto b/src/compute-client/src/protocol/command.proto index ef700e6428dc4..1be164cbea166 100644 --- a/src/compute-client/src/protocol/command.proto +++ b/src/compute-client/src/protocol/command.proto @@ -50,6 +50,7 @@ message ProtoComputeCommand { message ProtoInstanceConfig { logging.ProtoLoggingConfig logging = 1; + optional mz_proto.ProtoDuration expiration_offset = 2; } message ProtoIndexTarget { diff --git a/src/compute-client/src/protocol/command.rs b/src/compute-client/src/protocol/command.rs index c4edf663a3c6b..c957de351d1d4 100644 --- a/src/compute-client/src/protocol/command.rs +++ b/src/compute-client/src/protocol/command.rs @@ -9,6 +9,8 @@ //! Compute protocol commands. +use std::time::Duration; + use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig}; use mz_compute_types::dataflows::DataflowDescription; use mz_compute_types::plan::flat_plan::FlatPlan; @@ -368,14 +370,17 @@ impl Arbitrary for ComputeCommand { /// for anything in this struct. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Arbitrary)] pub struct InstanceConfig { - /// TODO(database-issues#7533): Add documentation. + /// Specification of introspection logging. pub logging: LoggingConfig, + /// The offset relative to the replica startup at which it should expire. None disables feature. + pub expiration_offset: Option, } impl RustType for InstanceConfig { fn into_proto(&self) -> ProtoInstanceConfig { ProtoInstanceConfig { logging: Some(self.logging.into_proto()), + expiration_offset: self.expiration_offset.into_proto(), } } @@ -384,6 +389,7 @@ impl RustType for InstanceConfig { logging: proto .logging .into_rust_if_some("ProtoCreateInstance::logging")?, + expiration_offset: proto.expiration_offset.into_rust()?, }) } } diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs index 856dc1eb7657b..5b4835f04508c 100644 --- a/src/compute-types/src/dyncfgs.rs +++ b/src/compute-types/src/dyncfgs.rs @@ -126,7 +126,6 @@ pub const COPY_TO_S3_MULTIPART_PART_SIZE_BYTES: Config = Config::new( /// Used in temporal filters to drop diffs generated at timestamps beyond the expiration time. /// /// Disabled by default. Once set, cannot be disabled again during the lifetime of a replica. -/// When set multiple times, existing replicas only accept strictly decreasing offsets. pub const COMPUTE_REPLICA_EXPIRATION_OFFSET: Config = Config::new( "compute_replica_expiration_offset", Duration::ZERO, diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index 1d4b1a8ae4f78..51dc2628b9e0d 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -308,52 +308,34 @@ impl ComputeState { // Remember the maintenance interval locally to avoid reading it from the config set on // every server iteration. self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config); + } - // Set replica expiration. - { - let offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.worker_config); - if offset.is_zero() { - if !self.replica_expiration.is_empty() { - warn!( - current_replica_expiration = ?self.replica_expiration, - "replica_expiration: cannot disable once expiration is enabled", - ); - } - } else { - let offset: EpochMillis = offset - .as_millis() - .try_into() - .expect("duration must fit within u64"); - let replica_expiration_millis = self.init_system_time + offset; - let replica_expiration = Timestamp::from(replica_expiration_millis); - - // We only allow updating `replica_expiration` to an earlier time. Allowing it to be - // updated to a later time could be surprising since existing dataflows would still - // panic at their original expiration. - if !self.replica_expiration.less_equal(&replica_expiration) { - info!( - offset = %offset, - replica_expiration_millis = %replica_expiration_millis, - replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis), - "updating replica_expiration", - ); - self.replica_expiration = Antichain::from_elem(replica_expiration); - } else { - warn!( - new_offset = %offset, - current_replica_expiration_millis = %replica_expiration_millis, - current_replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis), - "replica_expiration: ignoring new offset as it is greater than current value", - ); - } - } + /// Apply the provided replica expiration `offset` by converting it to a frontier relative to + /// the replica's initialization system time. + /// + /// Only expected to be called once when creating the instance. Guards against calling it + /// multiple times by checking if the local expiration time is set. + pub fn apply_expiration_offset(&mut self, offset: Duration) { + if self.replica_expiration.is_empty() { + let offset: EpochMillis = offset + .as_millis() + .try_into() + .expect("duration must fit within u64"); + let replica_expiration_millis = self.init_system_time + offset; + let replica_expiration = Timestamp::from(replica_expiration_millis); + + info!( + offset = %offset, + replica_expiration_millis = %replica_expiration_millis, + replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis), + "setting replica expiration", + ); + self.replica_expiration = Antichain::from_elem(replica_expiration); // Record the replica expiration in the metrics. - if let Some(expiration) = self.replica_expiration.as_option() { - self.worker_metrics - .replica_expiration_timestamp_seconds - .set(expiration.into()); - } + self.worker_metrics + .replica_expiration_timestamp_seconds + .set(replica_expiration.into()); } } @@ -435,6 +417,9 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { fn handle_create_instance(&mut self, config: InstanceConfig) { // Ensure the state is consistent with the config before we initialize anything. self.compute_state.apply_worker_config(); + if let Some(offset) = config.expiration_offset { + self.compute_state.apply_expiration_offset(offset); + } self.initialize_logging(config.logging); }