Skip to content

Commit

Permalink
Replica expiration applied in controller (#29996)
Browse files Browse the repository at this point in the history
Apply the replica expiration configuration only once per replica: capture
the value in the replica task and encode it in the create-instance
command, which is sent exactly once per replica. If the value has changed
when the controller restarts, reconciliation will cause a replica restart.

This fixes a bug where configuration localized to a specific replica
could not be applied.

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly
considered. ([trigger-ci for additional test/nightly
runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [ ] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Oct 15, 2024
1 parent aadace9 commit 1d66921
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 57 deletions.
2 changes: 2 additions & 0 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -918,9 +918,11 @@ where
epoch: ClusterStartupEpoch::new(self.envd_epoch, 0),
});

// Send a placeholder instance configuration for the replica task to fill in.
let dummy_logging_config = Default::default();
self.send(ComputeCommand::CreateInstance(InstanceConfig {
logging: dummy_logging_config,
expiration_offset: None,
}));

loop {
Expand Down
38 changes: 26 additions & 12 deletions src/compute-client/src/controller/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::time::Duration;
use anyhow::bail;
use mz_build_info::BuildInfo;
use mz_cluster_client::client::{ClusterReplicaLocation, ClusterStartupEpoch, TimelyConfig};
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
use mz_dyncfg::ConfigSet;
use mz_ore::channel::InstrumentedUnboundedSender;
use mz_ore::retry::Retry;
Expand Down Expand Up @@ -79,6 +80,8 @@ where
// the replica.
let (command_tx, command_rx) = unbounded_channel();

let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&dyncfg);

let task = mz_ore::task::spawn(
|| format!("active-replication-replica-{id}"),
ReplicaTask {
Expand All @@ -90,6 +93,7 @@ where
epoch,
metrics: metrics.clone(),
dyncfg,
expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
}
.run(),
);
Expand Down Expand Up @@ -139,6 +143,8 @@ struct ReplicaTask<T> {
metrics: ReplicaMetrics,
/// Dynamic system configuration.
dyncfg: Arc<ConfigSet>,
/// The offset to use for replica expiration, if any.
expiration_offset: Option<Duration>,
}

impl<T> ReplicaTask<T>
Expand Down Expand Up @@ -255,18 +261,26 @@ where
/// Most `ComputeCommand`s are independent of the target replica, but some
/// contain replica-specific fields that must be adjusted before sending.
fn specialize_command(&self, command: &mut ComputeCommand<T>) {
if let ComputeCommand::CreateInstance(InstanceConfig { logging }) = command {
*logging = self.config.logging.clone();
}

if let ComputeCommand::CreateTimely { config, epoch } = command {
*config = TimelyConfig {
workers: self.config.location.workers,
process: 0,
addresses: self.config.location.dataflow_addrs.clone(),
arrangement_exert_proportionality: self.config.arrangement_exert_proportionality,
};
*epoch = self.epoch;
match command {
ComputeCommand::CreateTimely { config, epoch } => {
*config = TimelyConfig {
workers: self.config.location.workers,
process: 0,
addresses: self.config.location.dataflow_addrs.clone(),
arrangement_exert_proportionality: self
.config
.arrangement_exert_proportionality,
};
*epoch = self.epoch;
}
ComputeCommand::CreateInstance(InstanceConfig {
logging,
expiration_offset,
}) => {
*logging = self.config.logging.clone();
*expiration_offset = self.expiration_offset;
}
_ => {}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/compute-client/src/protocol/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ message ProtoComputeCommand {

message ProtoInstanceConfig {
logging.ProtoLoggingConfig logging = 1;
optional mz_proto.ProtoDuration expiration_offset = 2;
}

message ProtoIndexTarget {
Expand Down
8 changes: 7 additions & 1 deletion src/compute-client/src/protocol/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

//! Compute protocol commands.
use std::time::Duration;

use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::flat_plan::FlatPlan;
Expand Down Expand Up @@ -368,14 +370,17 @@ impl Arbitrary for ComputeCommand<mz_repr::Timestamp> {
/// for anything in this struct.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct InstanceConfig {
/// TODO(database-issues#7533): Add documentation.
/// Specification of introspection logging.
pub logging: LoggingConfig,
/// The offset relative to the replica startup at which it should expire. None disables feature.
pub expiration_offset: Option<Duration>,
}

impl RustType<ProtoInstanceConfig> for InstanceConfig {
fn into_proto(&self) -> ProtoInstanceConfig {
ProtoInstanceConfig {
logging: Some(self.logging.into_proto()),
expiration_offset: self.expiration_offset.into_proto(),
}
}

Expand All @@ -384,6 +389,7 @@ impl RustType<ProtoInstanceConfig> for InstanceConfig {
logging: proto
.logging
.into_rust_if_some("ProtoCreateInstance::logging")?,
expiration_offset: proto.expiration_offset.into_rust()?,
})
}
}
Expand Down
1 change: 0 additions & 1 deletion src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ pub const COPY_TO_S3_MULTIPART_PART_SIZE_BYTES: Config<usize> = Config::new(
/// Used in temporal filters to drop diffs generated at timestamps beyond the expiration time.
///
/// Disabled by default. Once set, cannot be disabled again during the lifetime of a replica.
/// When set multiple times, existing replicas only accept strictly decreasing offsets.
pub const COMPUTE_REPLICA_EXPIRATION_OFFSET: Config<Duration> = Config::new(
"compute_replica_expiration_offset",
Duration::ZERO,
Expand Down
71 changes: 28 additions & 43 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,52 +308,34 @@ impl ComputeState {
// Remember the maintenance interval locally to avoid reading it from the config set on
// every server iteration.
self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
}

// Set replica expiration.
{
let offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.worker_config);
if offset.is_zero() {
if !self.replica_expiration.is_empty() {
warn!(
current_replica_expiration = ?self.replica_expiration,
"replica_expiration: cannot disable once expiration is enabled",
);
}
} else {
let offset: EpochMillis = offset
.as_millis()
.try_into()
.expect("duration must fit within u64");
let replica_expiration_millis = self.init_system_time + offset;
let replica_expiration = Timestamp::from(replica_expiration_millis);

// We only allow updating `replica_expiration` to an earlier time. Allowing it to be
// updated to a later time could be surprising since existing dataflows would still
// panic at their original expiration.
if !self.replica_expiration.less_equal(&replica_expiration) {
info!(
offset = %offset,
replica_expiration_millis = %replica_expiration_millis,
replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
"updating replica_expiration",
);
self.replica_expiration = Antichain::from_elem(replica_expiration);
} else {
warn!(
new_offset = %offset,
current_replica_expiration_millis = %replica_expiration_millis,
current_replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
"replica_expiration: ignoring new offset as it is greater than current value",
);
}
}
/// Apply the provided replica expiration `offset` by converting it to a frontier relative to
/// the replica's initialization system time.
///
/// Only expected to be called once when creating the instance. Guards against calling it
/// multiple times by checking if the local expiration time is set.
pub fn apply_expiration_offset(&mut self, offset: Duration) {
if self.replica_expiration.is_empty() {
let offset: EpochMillis = offset
.as_millis()
.try_into()
.expect("duration must fit within u64");
let replica_expiration_millis = self.init_system_time + offset;
let replica_expiration = Timestamp::from(replica_expiration_millis);

info!(
offset = %offset,
replica_expiration_millis = %replica_expiration_millis,
replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
"setting replica expiration",
);
self.replica_expiration = Antichain::from_elem(replica_expiration);

// Record the replica expiration in the metrics.
if let Some(expiration) = self.replica_expiration.as_option() {
self.worker_metrics
.replica_expiration_timestamp_seconds
.set(expiration.into());
}
self.worker_metrics
.replica_expiration_timestamp_seconds
.set(replica_expiration.into());
}
}

Expand Down Expand Up @@ -435,6 +417,9 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
fn handle_create_instance(&mut self, config: InstanceConfig) {
// Ensure the state is consistent with the config before we initialize anything.
self.compute_state.apply_worker_config();
if let Some(offset) = config.expiration_offset {
self.compute_state.apply_expiration_offset(offset);
}

self.initialize_logging(config.logging);
}
Expand Down

0 comments on commit 1d66921

Please sign in to comment.