Commit 5aa1c98

Merge pull request #30408 from ggevay/refresh_compaction

Fix Persist compaction for `REFRESH` MVs

ggevay authored and ParkMyCar committed Nov 11, 2024
1 parent fa08738 commit 5aa1c98

Showing 18 changed files with 1,586 additions and 74 deletions.
10 changes: 5 additions & 5 deletions src/adapter/src/catalog/transact.rs

@@ -19,7 +19,7 @@ use mz_adapter_types::dyncfgs::{
 };
 use mz_audit_log::{
     CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, IdFullNameV1, IdNameV1,
-    ObjectType, SchedulingDecisionsWithReasonsV1, VersionedEvent, VersionedStorageUsage,
+    ObjectType, SchedulingDecisionsWithReasonsV2, VersionedEvent, VersionedStorageUsage,
 };
 use mz_catalog::builtin::BuiltinLog;
 use mz_catalog::durable::{NetworkPolicy, Transaction};
@@ -294,7 +294,7 @@ impl ReplicaCreateDropReason {
         self,
     ) -> (
         CreateOrDropClusterReplicaReasonV1,
-        Option<SchedulingDecisionsWithReasonsV1>,
+        Option<SchedulingDecisionsWithReasonsV2>,
     ) {
         let (reason, scheduling_policies) = match self {
             ReplicaCreateDropReason::Manual => (CreateOrDropClusterReplicaReasonV1::Manual, None),
@@ -952,8 +952,8 @@ impl Catalog {
         }) = &config.location
         {
             let (reason, scheduling_policies) = reason.into_audit_log();
-            let details = EventDetails::CreateClusterReplicaV2(
-                mz_audit_log::CreateClusterReplicaV2 {
+            let details = EventDetails::CreateClusterReplicaV3(
+                mz_audit_log::CreateClusterReplicaV3 {
                     cluster_id: cluster_id.to_string(),
                     cluster_name: cluster.name.clone(),
                     replica_id: Some(id.to_string()),
@@ -1443,7 +1443,7 @@ impl Catalog {
 
         let (reason, scheduling_policies) = reason.into_audit_log();
         let details =
-            EventDetails::DropClusterReplicaV2(mz_audit_log::DropClusterReplicaV2 {
+            EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
                 cluster_id: cluster_id.to_string(),
                 cluster_name: cluster.name.clone(),
                 replica_id: Some(replica_id.to_string()),
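The pattern behind these mechanical changes: audit-log events are durable, so instead of mutating an existing event struct, the PR adds a new numbered version (`CreateClusterReplicaV3`, `DropClusterReplicaV3`) and switches the writer over to it. Below is a minimal sketch of that versioning idea, assuming `serde` (with the `derive` feature) and `serde_json` as dependencies; the stand-in types here are simplified illustrations, not the actual `mz_audit_log` definitions.

```rust
// Sketch only: stand-in types illustrating versioned, append-only audit events.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct CreateClusterReplicaV2 {
    cluster_id: String,
}

#[derive(Serialize, Deserialize)]
struct CreateClusterReplicaV3 {
    cluster_id: String,
    // V3 exists so this field can carry the V2-generation scheduling reasons;
    // `String` here is a stand-in for `SchedulingDecisionsWithReasonsV2`.
    scheduling_policies: Option<String>,
}

#[derive(Serialize, Deserialize)]
enum EventDetails {
    // Old variants stay deserializable forever; new events use the newest one.
    CreateClusterReplicaV2(CreateClusterReplicaV2),
    CreateClusterReplicaV3(CreateClusterReplicaV3),
}

fn main() {
    let event = EventDetails::CreateClusterReplicaV3(CreateClusterReplicaV3 {
        cluster_id: "u1".into(), // hypothetical ID for illustration
        scheduling_policies: None,
    });
    // Serializes with the variant name as the tag, so readers of historical
    // events can keep decoding old versions unchanged.
    println!("{}", serde_json::to_string(&event).unwrap());
}
```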
164 changes: 113 additions & 51 deletions src/adapter/src/coord/cluster_scheduling.rs

@@ -9,13 +9,13 @@
 
 use crate::coord::{Coordinator, Message};
 use itertools::Itertools;
-use mz_audit_log::SchedulingDecisionsWithReasonsV1;
+use mz_audit_log::SchedulingDecisionsWithReasonsV2;
 use mz_catalog::memory::objects::{CatalogItem, ClusterVariant, ClusterVariantManaged};
 use mz_controller_types::ClusterId;
 use mz_ore::collections::CollectionExt;
-use mz_ore::soft_panic_or_log;
+use mz_ore::{soft_assert_or_log, soft_panic_or_log};
 use mz_repr::adt::interval::Interval;
-use mz_repr::GlobalId;
+use mz_repr::{GlobalId, TimestampManipulation};
 use mz_sql::catalog::CatalogCluster;
 use mz_sql::plan::{AlterClusterPlanStrategy, ClusterSchedule};
 use std::time::{Duration, Instant};
@@ -48,36 +48,50 @@ pub struct RefreshDecision {
     /// Whether the ON REFRESH policy wants a certain cluster to be On.
     cluster_on: bool,
     /// Objects that currently need a refresh on the cluster (taking into account the rehydration
-    /// time estimate).
+    /// time estimate), and therefore should keep the cluster On.
     objects_needing_refresh: Vec<GlobalId>,
+    /// Objects for which we estimate that they currently need Persist compaction, and therefore
+    /// should keep the cluster On.
+    objects_needing_compaction: Vec<GlobalId>,
     /// The HYDRATION TIME ESTIMATE setting of the cluster.
     hydration_time_estimate: Duration,
 }
 
 impl SchedulingDecision {
-    pub fn reasons_to_audit_log_reasons<'a, I>(reasons: I) -> SchedulingDecisionsWithReasonsV1
+    pub fn reasons_to_audit_log_reasons<'a, I>(reasons: I) -> SchedulingDecisionsWithReasonsV2
     where
         I: IntoIterator<Item = &'a SchedulingDecision>,
     {
-        SchedulingDecisionsWithReasonsV1 {
+        SchedulingDecisionsWithReasonsV2 {
             on_refresh: reasons
                 .into_iter()
                 .filter_map(|r| match r {
                     SchedulingDecision::Refresh(RefreshDecision {
                         cluster_on,
-                        objects_needing_refresh: mvs_needing_refresh,
+                        objects_needing_refresh,
+                        objects_needing_compaction,
                         hydration_time_estimate,
                     }) => {
+                        soft_assert_or_log!(
+                            !cluster_on
+                                || !objects_needing_refresh.is_empty()
+                                || !objects_needing_compaction.is_empty(),
+                            "`cluster_on = true` should have an explanation"
+                        );
                        let mut hydration_time_estimate_str = String::new();
                         mz_repr::strconv::format_interval(
                             &mut hydration_time_estimate_str,
                             Interval::from_duration(hydration_time_estimate).expect(
                                 "planning ensured that this is convertible back to Interval",
                             ),
                         );
-                        Some(mz_audit_log::RefreshDecisionWithReasonV1 {
+                        Some(mz_audit_log::RefreshDecisionWithReasonV2 {
                             decision: (*cluster_on).into(),
-                            objects_needing_refresh: mvs_needing_refresh
+                            objects_needing_refresh: objects_needing_refresh
                                 .iter()
                                 .map(|id| id.to_string())
                                 .collect(),
+                            objects_needing_compaction: objects_needing_compaction
+                                .iter()
+                                .map(|id| id.to_string())
+                                .collect(),
@@ -105,8 +119,13 @@ impl Coordinator {
     fn check_refresh_policy(&self) {
         let start_time = Instant::now();
 
-        // Collect the smallest REFRESH MV write frontiers per cluster.
-        let mut refresh_mv_write_frontiers = Vec::new();
+        // Collect information about REFRESH MVs:
+        // - cluster
+        // - hydration_time_estimate of the cluster
+        // - MV's id
+        // - MV's write frontier
+        // - MV's refresh schedule
+        let mut refresh_mv_infos = Vec::new();
         for cluster in self.catalog().clusters() {
             if let ClusterVariant::Managed(ref config) = cluster.config.variant {
                 match config.schedule {
@@ -123,23 +142,21 @@
                                 if let CatalogItem::MaterializedView(mv) =
                                     self.catalog().get_entry(id).item()
                                 {
-                                    if mv.refresh_schedule.is_some() {
+                                    mv.refresh_schedule.clone().map(|refresh_schedule| {
                                         let (_since, write_frontier) = self
                                             .controller
                                             .storage
-                                            .collection_frontiers(*id)
+                                            .collection_frontiers(*id)
                                             .expect("the storage controller should know about MVs that exist in the catalog");
-                                        Some((*id, write_frontier))
-                                    } else {
-                                        None
-                                    }
+                                        (*id, write_frontier, refresh_schedule)
+                                    })
                                 } else {
                                     None
                                 }
                             })
                             .collect_vec();
-                        debug!(%cluster.id, ?refresh_mv_write_frontiers, "check_refresh_policy");
-                        refresh_mv_write_frontiers.push((cluster.id, hydration_time_estimate, mvs));
+                        debug!(%cluster.id, ?refresh_mv_infos, "check_refresh_policy");
+                        refresh_mv_infos.push((cluster.id, hydration_time_estimate, mvs));
                     }
                 }
             }
@@ -152,42 +169,87 @@ impl Coordinator {
         let internal_cmd_tx = self.internal_cmd_tx.clone();
         let check_scheduling_policies_seconds_cloned =
             self.metrics.check_scheduling_policies_seconds.clone();
+        let compaction_estimate = self
+            .catalog()
+            .system_config()
+            .cluster_refresh_mv_compaction_estimate()
+            .try_into()
+            .expect("should be configured to a reasonable value");
         mz_ore::task::spawn(|| "refresh policy get ts and make decisions", async move {
             let task_start_time = Instant::now();
             let local_read_ts = ts_oracle.read_ts().await;
-            debug!(%local_read_ts, ?refresh_mv_write_frontiers, "check_refresh_policy background task");
-            let decisions = refresh_mv_write_frontiers
+            debug!(%local_read_ts, ?refresh_mv_infos, "check_refresh_policy background task");
+            let decisions = refresh_mv_infos
                 .into_iter()
-                .map(
-                    |(cluster_id, hydration_time_estimate, refresh_mv_write_frontiers)| {
-                        // We are just checking that
-                        // write_frontier < local_read_ts + hydration_time_estimate
-                        let hydration_estimate = &hydration_time_estimate
-                            .try_into()
-                            .expect("checked during planning");
-                        let local_read_ts_adjusted =
-                            local_read_ts.step_forward_by(hydration_estimate);
-                        let mvs_needing_refresh = refresh_mv_write_frontiers
-                            .into_iter()
-                            .filter_map(|(id, frontier)| {
-                                if frontier.less_than(&local_read_ts_adjusted) {
-                                    Some(id)
-                                } else {
-                                    None
-                                }
-                            })
-                            .collect_vec();
-                        let cluster_on = !mvs_needing_refresh.is_empty();
-                        (
-                            cluster_id,
-                            SchedulingDecision::Refresh(RefreshDecision {
-                                cluster_on,
-                                objects_needing_refresh: mvs_needing_refresh,
-                                hydration_time_estimate,
-                            }),
-                        )
-                    },
-                )
+                .map(|(cluster_id, hydration_time_estimate, refresh_mv_info)| {
+                    // 1. check that
+                    // write_frontier < local_read_ts + hydration_time_estimate
+                    let hydration_estimate = &hydration_time_estimate
+                        .try_into()
+                        .expect("checked during planning");
+                    let local_read_ts_adjusted = local_read_ts.step_forward_by(hydration_estimate);
+                    let mvs_needing_refresh = refresh_mv_info
+                        .iter()
+                        .cloned()
+                        .filter_map(|(id, frontier, _refresh_schedule)| {
+                            if frontier.less_than(&local_read_ts_adjusted) {
+                                Some(id)
+                            } else {
+                                None
+                            }
+                        })
+                        .collect_vec();
+
+                    // 2. check that
+                    // prev_refresh + compaction_estimate > local_read_ts
+                    let mvs_needing_compaction = refresh_mv_info
+                        .into_iter()
+                        .filter_map(|(id, frontier, refresh_schedule)| {
+                            let frontier = frontier.as_option();
+                            // `prev_refresh` will be None in two cases:
+                            // 1. When there is no previous refresh, because we haven't yet had
+                            // the first refresh. In this case, there is no need to schedule
+                            // time now for compaction.
+                            // 2. In the niche case where a `REFRESH EVERY` MV's write frontier
+                            // is empty. In this case, it's not impossible that there would be a
+                            // need for compaction. But I can't see any easy way to correctly
+                            // handle this case, because we don't have any info handy about when
+                            // the last refresh happened in wall clock time, because the
+                            // frontiers have no relation to wall clock time. So, we'll not
+                            // schedule any compaction time.
+                            // (Note that `REFRESH AT` MVs with empty frontiers, which is a more
+                            // common case, are fine, because `last_refresh` will return
+                            // Some(...) for them.)
+                            let prev_refresh = match frontier {
+                                Some(frontier) => frontier.round_down_minus_1(&refresh_schedule),
+                                None => refresh_schedule.last_refresh(),
+                            };
+                            prev_refresh
+                                .map(|prev_refresh| {
+                                    if prev_refresh.step_forward_by(&compaction_estimate)
+                                        > local_read_ts
+                                    {
+                                        Some(id)
+                                    } else {
+                                        None
+                                    }
+                                })
+                                .flatten()
+                        })
+                        .collect_vec();
+
+                    let cluster_on =
+                        !mvs_needing_refresh.is_empty() || !mvs_needing_compaction.is_empty();
+                    (
+                        cluster_id,
+                        SchedulingDecision::Refresh(RefreshDecision {
+                            cluster_on,
+                            objects_needing_refresh: mvs_needing_refresh,
+                            objects_needing_compaction: mvs_needing_compaction,
+                            hydration_time_estimate,
+                        }),
+                    )
+                })
                .collect();
             if let Err(e) = internal_cmd_tx.send(Message::SchedulingDecisions(vec![(
                 REFRESH_POLICY_NAME,
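The heart of the fix is check 2 above: even after all refreshes have completed, the cluster must stay On for an estimated window so Persist can compact what the refresh wrote. A self-contained sketch of the two per-MV checks follows, with plain `u64` timestamps standing in for the real frontier, refresh-schedule, and timestamp-oracle types; the function names and values are illustrative, not the PR's API.

```rust
/// Check 1: the MV still needs its refresh if its write frontier has not yet
/// passed the local read timestamp advanced by the hydration time estimate.
fn needs_refresh(write_frontier: u64, local_read_ts: u64, hydration_estimate: u64) -> bool {
    write_frontier < local_read_ts + hydration_estimate
}

/// Check 2 (the fix): after the previous refresh, keep the cluster On for
/// `compaction_estimate` so Persist can compact the refresh's output.
fn needs_compaction(prev_refresh: Option<u64>, local_read_ts: u64, compaction_estimate: u64) -> bool {
    match prev_refresh {
        // No previous refresh (or it cannot be determined): nothing to compact yet.
        None => false,
        Some(prev) => prev + compaction_estimate > local_read_ts,
    }
}

fn main() {
    let (local_read_ts, hydration_estimate, compaction_estimate) = (120, 10, 30);
    // Frontier 50 lags behind 120 + 10, so a refresh is still outstanding.
    assert!(needs_refresh(50, local_read_ts, hydration_estimate));
    // Last refresh at 100: 100 + 30 > 120, so the cluster stays On for compaction...
    assert!(needs_compaction(Some(100), local_read_ts, compaction_estimate));
    // ...but by read timestamp 140 the compaction window has passed.
    assert!(!needs_compaction(Some(100), 140, compaction_estimate));
    println!("decision checks hold");
}
```

The cluster is kept On if either check fires for any of its REFRESH MVs; the decision plus both reason lists then flow into the V2 audit-log structs added below.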
56 changes: 56 additions & 0 deletions src/audit-log/src/lib.rs

@@ -157,9 +157,11 @@ pub enum EventDetails {
     #[serde(rename = "CreateComputeReplicaV1")] // historical name
     CreateClusterReplicaV1(CreateClusterReplicaV1),
     CreateClusterReplicaV2(CreateClusterReplicaV2),
+    CreateClusterReplicaV3(CreateClusterReplicaV3),
     #[serde(rename = "DropComputeReplicaV1")] // historical name
     DropClusterReplicaV1(DropClusterReplicaV1),
     DropClusterReplicaV2(DropClusterReplicaV2),
+    DropClusterReplicaV3(DropClusterReplicaV3),
     CreateSourceSinkV1(CreateSourceSinkV1),
     CreateSourceSinkV2(CreateSourceSinkV2),
     CreateSourceSinkV3(CreateSourceSinkV3),
@@ -267,6 +269,17 @@ pub struct DropClusterReplicaV2 {
     pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
+pub struct DropClusterReplicaV3 {
+    pub cluster_id: String,
+    pub cluster_name: String,
+    pub replica_id: Option<String>,
+    pub replica_name: String,
+    pub reason: CreateOrDropClusterReplicaReasonV1,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
 pub struct CreateClusterReplicaV1 {
     pub cluster_id: String,
@@ -296,6 +309,21 @@ pub struct CreateClusterReplicaV2 {
     pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
+pub struct CreateClusterReplicaV3 {
+    pub cluster_id: String,
+    pub cluster_name: String,
+    pub replica_id: Option<String>,
+    pub replica_name: String,
+    pub logical_size: String,
+    pub disk: bool,
+    pub billed_as: Option<String>,
+    pub internal: bool,
+    pub reason: CreateOrDropClusterReplicaReasonV1,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
 #[serde(rename_all = "kebab-case")]
 pub enum CreateOrDropClusterReplicaReasonV1 {
@@ -313,6 +341,15 @@ pub struct SchedulingDecisionsWithReasonsV1 {
     pub on_refresh: RefreshDecisionWithReasonV1,
 }
 
+/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
+/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
+/// can be settings of the policy as well as other information about the state of the system.)
+#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
+pub struct SchedulingDecisionsWithReasonsV2 {
+    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
+    pub on_refresh: RefreshDecisionWithReasonV2,
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
 pub struct RefreshDecisionWithReasonV1 {
     pub decision: SchedulingDecisionV1,
@@ -323,6 +360,19 @@ pub struct RefreshDecisionWithReasonV1 {
     pub hydration_time_estimate: String,
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
+pub struct RefreshDecisionWithReasonV2 {
+    pub decision: SchedulingDecisionV1,
+    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
+    /// time estimate), and therefore should keep the cluster On.
+    pub objects_needing_refresh: Vec<String>,
+    /// Objects for which we estimate that they currently need Persist compaction, and therefore
+    /// should keep the cluster On.
+    pub objects_needing_compaction: Vec<String>,
+    /// The HYDRATION TIME ESTIMATE setting of the cluster.
+    pub hydration_time_estimate: String,
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
 #[serde(rename_all = "kebab-case")]
 pub enum SchedulingDecisionV1 {
@@ -520,12 +570,18 @@ impl EventDetails {
             EventDetails::CreateClusterReplicaV2(v) => {
                 serde_json::to_value(v).expect("must serialize")
             }
+            EventDetails::CreateClusterReplicaV3(v) => {
+                serde_json::to_value(v).expect("must serialize")
+            }
             EventDetails::DropClusterReplicaV1(v) => {
                 serde_json::to_value(v).expect("must serialize")
             }
             EventDetails::DropClusterReplicaV2(v) => {
                 serde_json::to_value(v).expect("must serialize")
             }
+            EventDetails::DropClusterReplicaV3(v) => {
+                serde_json::to_value(v).expect("must serialize")
+            }
             EventDetails::IdFullNameV1(v) => serde_json::to_value(v).expect("must serialize"),
             EventDetails::RenameClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
             EventDetails::RenameClusterReplicaV1(v) => {
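For illustration, here is roughly what the new V2 reason payload serializes to. This is a sketch using a simplified mirror of `RefreshDecisionWithReasonV2` (assuming `serde` with the `derive` feature and `serde_json` as dependencies; `decision` is flattened to a string standing in for `SchedulingDecisionV1`, and the ID and interval values are made up):

```rust
use serde::Serialize;

// Simplified mirror of the struct above; field names match the diff.
#[derive(Serialize)]
struct RefreshDecisionWithReasonV2 {
    decision: String, // stand-in for SchedulingDecisionV1 ("on" / "off")
    objects_needing_refresh: Vec<String>,
    objects_needing_compaction: Vec<String>,
    hydration_time_estimate: String,
}

fn main() {
    // A cluster kept On purely for compaction: no refresh is outstanding,
    // but one MV's previous refresh output still needs Persist compaction.
    let reason = RefreshDecisionWithReasonV2 {
        decision: "on".to_string(),
        objects_needing_refresh: vec![],
        objects_needing_compaction: vec!["u123".to_string()], // hypothetical ID
        hydration_time_estimate: "01:00:00".to_string(),      // example interval
    };
    println!("{}", serde_json::to_string_pretty(&reason).unwrap());
}
```

The separate `objects_needing_compaction` list is what makes the audit log distinguish "On because a refresh is due" from "On because compaction is still running" — the observable change that motivated bumping the reason structs to V2 and the replica events to V3.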