Add REFRESH options for MVs -- Compute

ggevay committed Jan 4, 2024
1 parent 3a23184 · commit a8fc4b0
Showing 28 changed files with 1,195 additions and 84 deletions.
35 changes: 33 additions & 2 deletions src/adapter/src/coord.rs
@@ -161,6 +161,7 @@ use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{flags, AdapterNotice, TimestampProvider};
use mz_catalog::builtin::BUILTINS;
use mz_catalog::durable::DurableCatalogState;
use mz_expr::refresh_schedule::RefreshSchedule;

use self::statement_logging::{StatementLogging, StatementLoggingId};

@@ -1558,9 +1559,23 @@ impl Coordinator {
.clone();

// Timestamp selection
let as_of = self.bootstrap_materialized_view_as_of(&df_desc, mview.cluster_id);
let as_of = self.bootstrap_materialized_view_as_of(
&df_desc,
mview.cluster_id,
&mview.refresh_schedule,
);
df_desc.set_as_of(as_of);

// If we have a refresh schedule with a last refresh, then set the `until` to just past the last refresh.
let until = mview
.refresh_schedule
.as_ref()
.and_then(|s| s.last_refresh())
.and_then(|r| r.try_step_forward());
if let Some(until) = until {
df_desc.until.meet_assign(&Antichain::from_elem(until));
}

let df_meta = self
.catalog()
.try_get_dataflow_metainfo(&entry.id())
@@ -1932,6 +1947,7 @@ impl Coordinator {
internal_view_id,
mv.desc.iter_names().cloned().collect(),
mv.non_null_assertions.clone(),
mv.refresh_schedule.clone(),
debug_name,
optimizer_config.clone(),
);
@@ -2154,6 +2170,7 @@ impl Coordinator {
&self,
dataflow: &DataflowDescription<Plan>,
cluster_id: ComputeInstanceId,
refresh_schedule: &Option<RefreshSchedule>,
) -> Antichain<Timestamp> {
// All inputs must be readable at the chosen `as_of`, so it must be at least the join of
// the `since`s of all dependencies.
@@ -2172,12 +2189,26 @@
let write_frontier = self.storage_write_frontier(*sink_id);

// Things go wrong if we try to create a dataflow with `as_of = []`, so avoid that.
let as_of = if write_frontier.is_empty() {
let mut as_of = if write_frontier.is_empty() {
min_as_of.clone()
} else {
min_as_of.join(write_frontier)
};

// If we have a RefreshSchedule, then round up the `as_of` to the next refresh.
// Note that in many cases the `as_of` will already be at this refresh, because the
// `write_frontier` will usually already be there. However, it can happen that we restart
// after the MV was created in the catalog but before its upper was initialized in persist.
if let Some(refresh_schedule) = &refresh_schedule {
if let Some(rounded_up_ts) =
refresh_schedule.round_up_timestamp(*as_of.as_option().expect("as_of is non-empty"))
{
as_of = Antichain::from_elem(rounded_up_ts);
} else {
// We are past the last refresh. Let's not move the as_of.
}
}

tracing::info!(
export_ids = %dataflow.display_export_ids(),
%cluster_id,
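The timestamp selection above is easiest to follow with concrete numbers. Below is a minimal, self-contained sketch of the same logic using plain u64 timestamps; SimpleSchedule, its round_up_timestamp arithmetic, and choose_as_of_and_until are illustrative stand-ins invented for this sketch, not the real mz_expr::refresh_schedule::RefreshSchedule API.

// A simplified model: refresh every `interval`, aligned to `aligned_to`,
// with an optional final refresh. An assumption-laden stand-in for
// `RefreshSchedule`, for illustration only.
struct SimpleSchedule {
    aligned_to: u64,
    interval: u64,
    last_refresh: Option<u64>,
}

impl SimpleSchedule {
    // Round `ts` up to the next refresh time, or `None` if `ts` is already
    // past the last refresh.
    fn round_up_timestamp(&self, ts: u64) -> Option<u64> {
        let rounded = if ts <= self.aligned_to {
            self.aligned_to
        } else {
            let steps = (ts - self.aligned_to + self.interval - 1) / self.interval;
            self.aligned_to + steps * self.interval
        };
        match self.last_refresh {
            Some(last) if rounded > last => None,
            _ => Some(rounded),
        }
    }
}

fn choose_as_of_and_until(
    min_as_of: u64,              // join of the inputs' `since`s
    write_frontier: Option<u64>, // `None` models an empty write frontier
    schedule: Option<&SimpleSchedule>,
) -> (u64, Option<u64>) {
    // As in `bootstrap_materialized_view_as_of`: start from the join of the
    // inputs' `since`s and the sink's write frontier...
    let mut as_of = match write_frontier {
        None => min_as_of,
        Some(wf) => min_as_of.max(wf),
    };
    // ...then round up to the next refresh, leaving `as_of` alone when we
    // are already past the last refresh.
    if let Some(schedule) = schedule {
        if let Some(rounded) = schedule.round_up_timestamp(as_of) {
            as_of = rounded;
        }
    }
    // `until` is one past the last refresh (`try_step_forward` in the real
    // code): times strictly before `until` can still be produced, so the
    // last refresh itself stays readable.
    let until = schedule
        .and_then(|s| s.last_refresh)
        .and_then(|last| last.checked_add(1));
    (as_of, until)
}

fn main() {
    let schedule = SimpleSchedule { aligned_to: 0, interval: 100, last_refresh: Some(500) };
    // Restart with inputs readable from 250 and upper at 260: the first
    // refresh we can still compute is 300.
    assert_eq!(choose_as_of_and_until(250, Some(260), Some(&schedule)), (300, Some(501)));
    // Past the last refresh: `as_of` is left where it is.
    assert_eq!(choose_as_of_and_until(700, Some(700), Some(&schedule)), (700, Some(501)));
}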
145 changes: 97 additions & 48 deletions src/adapter/src/coord/command_handler.rs
@@ -22,6 +22,7 @@ use mz_compute_client::protocol::response::PeekResponse;
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_repr::role_id::RoleId;
use mz_repr::Timestamp;
use mz_sql::ast::{
CopyRelation, CopyStatement, InsertSource, Query, Raw, SetExpr, Statement, SubscribeStatement,
};
@@ -39,7 +40,7 @@ use mz_sql::session::user::User;
use mz_sql::session::vars::{
EndTransactionAction, OwnedVarInput, Value, Var, STATEMENT_LOGGING_SAMPLE_RATE,
};
use mz_sql_parser::ast::CreateMaterializedViewStatement;
use mz_sql_parser::ast::{CreateMaterializedViewStatement, ExplainPlanStatement, Explainee};
use mz_storage_types::sources::Timeline;
use opentelemetry::trace::TraceContextExt;
use tokio::sync::{mpsc, oneshot, watch};
@@ -691,54 +692,12 @@ impl Coordinator {
))),

Statement::CreateMaterializedView(mut cmvs) => {
// (This won't be the same timestamp as the system table inserts, unfortunately.)
let mz_now = if cmvs
.with_options
.iter()
.any(materialized_view_option_contains_temporal)
let mz_now = match self
.resolve_mz_now_for_create_materialized_view(&cmvs, &resolved_ids)
.await
{
let timeline_context =
match self.validate_timeline_context(resolved_ids.0.clone()) {
Ok(tc) => tc,
Err(e) => return ctx.retire(Err(e)),
};

// We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
// but even in the TimestampIndependent case.
// Note that we didn't accurately decide whether we are TimestampDependent
// or TimestampIndependent, because for this we'd need to also check whether
// `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
// However, this doesn't matter here, as we are just going to default to
// EpochMilliseconds in both cases.
let timeline = timeline_context
.timeline()
.unwrap_or(&Timeline::EpochMilliseconds);
Some(self.get_timestamp_oracle(timeline).read_ts().await)
// TODO: It might be good to take into account `least_valid_read` in addition to
// the oracle's `read_ts`, but there are two problems:
// 1. At this point, we don't know which indexes would be used. We could do an
// overestimation here by grabbing the ids of all indexes that are on ids
// involved in the query. (We'd have to recursively follow view definitions,
// similarly to `validate_timeline_context`.)
// 2. For a peek, when the `least_valid_read` is later than the oracle's
// `read_ts`, then the peek doesn't return before it completes at the chosen
// timestamp. However, for a CREATE MATERIALIZED VIEW statement, it's not clear
// whether we want to make it block until the chosen time. If it doesn't block,
// then the initial refresh wouldn't be linearized with the CREATE MATERIALIZED
// VIEW statement.
//
// Note: The Adapter is usually keeping a read hold of all objects at the oracle
// read timestamp, so `least_valid_read` usually won't actually be later than
// the oracle's `read_ts`. (see `Coordinator::advance_timelines`)
//
// Note 2: If we choose a timestamp here that is earlier than
// `least_valid_read`, that is somewhat bad, but not catastrophic: The only
// bad thing that happens is that we won't perform that refresh that was
// specified to be at `mz_now()` (which is usually the initial refresh)
// (similarly to how we don't perform refreshes that were specified to be in the
// past).
} else {
None
Ok(mz_now) => mz_now,
Err(e) => return ctx.retire(Err(e)),
};

let owned_catalog = self.owned_catalog();
Expand Down Expand Up @@ -769,6 +728,44 @@ impl Coordinator {
}
}

Statement::ExplainPlan(ExplainPlanStatement {
stage,
config_flags,
format,
explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
}) => {
let mut cmvs = *box_cmvs;
let mz_now = match self
.resolve_mz_now_for_create_materialized_view(&cmvs, &resolved_ids)
.await
{
Ok(mz_now) => mz_now,
Err(e) => return ctx.retire(Err(e)),
};

let owned_catalog = self.owned_catalog();
let catalog = owned_catalog.for_session(ctx.session());

purify_create_materialized_view_options(
catalog,
mz_now,
&mut cmvs,
&mut resolved_ids,
);

let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
stage,
config_flags,
format,
explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
});

match self.plan_statement(ctx.session(), purified_stmt, &params, &resolved_ids) {
Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
Err(e) => ctx.retire(Err(e)),
}
}

// All other statements are handled immediately.
_ => match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) {
Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
Expand All @@ -777,6 +774,58 @@ impl Coordinator {
}
}

async fn resolve_mz_now_for_create_materialized_view(
&self,
cmvs: &CreateMaterializedViewStatement<Aug>,
resolved_ids: &ResolvedIds,
) -> Result<Option<Timestamp>, AdapterError> {
// (This won't be the same timestamp as the system table inserts, unfortunately.)
if cmvs
.with_options
.iter()
.any(materialized_view_option_contains_temporal)
{
let timeline_context = self.validate_timeline_context(resolved_ids.0.clone())?;

// We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
// but even in the TimestampIndependent case.
// Note that we didn't accurately decide whether we are TimestampDependent
// or TimestampIndependent, because for this we'd need to also check whether
// `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
// However, this doesn't matter here, as we are just going to default to
// EpochMilliseconds in both cases.
let timeline = timeline_context
.timeline()
.unwrap_or(&Timeline::EpochMilliseconds);
Ok(Some(self.get_timestamp_oracle(timeline).read_ts().await))
// TODO: It might be good to take into account `least_valid_read` in addition to
// the oracle's `read_ts`, but there are two problems:
// 1. At this point, we don't know which indexes would be used. We could do an
// overestimation here by grabbing the ids of all indexes that are on ids
// involved in the query. (We'd have to recursively follow view definitions,
// similarly to `validate_timeline_context`.)
// 2. For a peek, when the `least_valid_read` is later than the oracle's
// `read_ts`, then the peek doesn't return before it completes at the chosen
// timestamp. However, for a CREATE MATERIALIZED VIEW statement, it's not clear
// whether we want to make it block until the chosen time. If it doesn't block,
// then the initial refresh wouldn't be linearized with the CREATE MATERIALIZED
// VIEW statement.
//
// Note: The Adapter is usually keeping a read hold of all objects at the oracle
// read timestamp, so `least_valid_read` usually won't actually be later than
// the oracle's `read_ts`. (see `Coordinator::advance_timelines`)
//
// Note 2: If we choose a timestamp here that is earlier than
// `least_valid_read`, that is somewhat bad, but not catastrophic: The only
// bad thing that happens is that we won't perform that refresh that was
// specified to be at `mz_now()` (which is usually the initial refresh)
// (similarly to how we don't perform refreshes that were specified to be in the
// past).
} else {
Ok(None)
}
}

/// Instruct the dataflow layer to cancel any ongoing, interactive work for
/// the named `conn_id` if the correct secret key is specified.
///
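The helper hoisted out above exists so that CREATE MATERIALIZED VIEW and EXPLAIN ... CREATE MATERIALIZED VIEW resolve mz_now() identically: read the timestamp oracle once, and only if some WITH option actually contains a temporal expression, then substitute the resolved constant during purification. A toy sketch of that flow, where RefreshAt, TimestampOracle, and purify_refresh_options are invented stand-ins for the real mz_sql and oracle types:

// Invented stand-ins for illustration; not the real AST or oracle types.
#[derive(Debug, PartialEq)]
enum RefreshAt {
    MzNow,         // REFRESH AT mz_now()
    Constant(u64), // REFRESH AT <timestamp>
}

trait TimestampOracle {
    fn read_ts(&self) -> u64;
}

// Resolve `mz_now()` at most once, and only when some option needs it, then
// rewrite every temporal option to the resolved constant so the statement
// becomes deterministic from here on.
fn purify_refresh_options(options: &mut [RefreshAt], oracle: &impl TimestampOracle) {
    let mz_now = if options.iter().any(|o| matches!(o, RefreshAt::MzNow)) {
        Some(oracle.read_ts())
    } else {
        None
    };
    for opt in options.iter_mut() {
        if matches!(opt, RefreshAt::MzNow) {
            *opt = RefreshAt::Constant(mz_now.expect("resolved above"));
        }
    }
}

struct FixedOracle(u64);
impl TimestampOracle for FixedOracle {
    fn read_ts(&self) -> u64 {
        self.0
    }
}

fn main() {
    let mut options = vec![RefreshAt::MzNow, RefreshAt::Constant(42)];
    purify_refresh_options(&mut options, &FixedOracle(1000));
    assert_eq!(options, vec![RefreshAt::Constant(1000), RefreshAt::Constant(42)]);
}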
3 changes: 2 additions & 1 deletion src/adapter/src/coord/sequencer/inner.rs
@@ -3127,7 +3127,7 @@ impl Coordinator {
target_cluster_id: ClusterId,
broken: bool,
non_null_assertions: Vec<usize>,
_refresh_schedule: Option<RefreshSchedule>,
refresh_schedule: Option<RefreshSchedule>,
explain_config: &mz_repr::explain::ExplainConfig,
_root_dispatch: tracing::Dispatch,
) -> Result<
@@ -3168,6 +3168,7 @@
internal_view_id,
column_names.clone(),
non_null_assertions,
refresh_schedule,
debug_name,
optimizer_config,
);
59 changes: 44 additions & 15 deletions src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
@@ -7,14 +7,17 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use differential_dataflow::lattice::Lattice;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::memory::objects::{CatalogItem, MaterializedView};
use mz_expr::CollectionPlan;
use mz_ore::soft_panic_or_log;
use mz_ore::tracing::OpenTelemetryContext;
use mz_sql::catalog::CatalogError;
use mz_sql::names::{ObjectId, ResolvedIds};
use mz_sql::plan;
use mz_storage_client::controller::{CollectionDescription, DataSource, DataSourceOther};
use timely::progress::Antichain;

use crate::command::ExecuteResponse;
use crate::coord::sequencer::inner::return_if_err;
@@ -97,21 +100,12 @@ impl Coordinator {
let plan::CreateMaterializedViewPlan {
materialized_view:
plan::MaterializedView {
expr,
cluster_id,
refresh_schedule,
..
expr, cluster_id, ..
},
ambiguous_columns,
..
} = &plan;

if refresh_schedule.is_some() {
return Err(AdapterError::Unsupported(
"REFRESH options other than ON COMMIT",
));
}

// Validate any references in the materialized view's expression. We do
// this on the unoptimized plan to better reflect what the user typed.
// We want to reject queries that depend on log sources, for example,
@@ -167,6 +161,7 @@
column_names,
cluster_id,
non_null_assertions,
refresh_schedule,
..
},
..
@@ -193,6 +188,7 @@
internal_view_id,
column_names.clone(),
non_null_assertions.clone(),
refresh_schedule.clone(),
debug_name,
optimizer_config,
);
@@ -285,6 +281,38 @@
)
.collect::<Vec<_>>();

// Timestamp selection
let as_of = {
// Normally, `as_of` should be the least_valid_read.
let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
let mut as_of = self.least_valid_read(&id_bundle);
// But for MVs with non-trivial REFRESH schedules, it's important to set the
// `as_of` to the first refresh. This is because we'd like queries on the MV to
// block until the first refresh (rather than to show an empty MV).
if let Some(refresh_schedule) = &refresh_schedule {
if let Some(as_of_ts) = as_of.as_option() {
let Some(rounded_up_ts) = refresh_schedule.round_up_timestamp(*as_of_ts) else {
return Err(AdapterError::MaterializedViewWouldNeverRefresh(
refresh_schedule.last_refresh().expect("if round_up_timestamp returned None, then there should be a last refresh"),
*as_of_ts
));
};
as_of = Antichain::from_elem(rounded_up_ts);
} else {
// The `as_of` should never be empty, because then the MV would be unreadable.
soft_panic_or_log!("creating a materialized view with an empty `as_of`");
}
}
as_of
};

// If we have a refresh schedule with a last refresh, then set the `until` to just past the last refresh.
// (If `try_step_forward` fails, there is no need to set an `until`: no data can exist beyond the last
// refresh time, because there are no representable times past it.)
let until = refresh_schedule
.and_then(|s| s.last_refresh())
.and_then(|r| r.try_step_forward());

let transact_result = self
.catalog_transact_with_side_effects(Some(ctx.session()), ops, |coord| async {
// Save plan structures.
@@ -298,10 +326,11 @@
let output_desc = global_lir_plan.desc().clone();
let (mut df_desc, df_meta) = global_lir_plan.unapply();

// Timestamp selection
let id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id);
let since = coord.least_valid_read(&id_bundle);
df_desc.set_as_of(since.clone());
df_desc.set_as_of(as_of.clone());

if let Some(until) = until {
df_desc.until.meet_assign(&Antichain::from_elem(until));
}

// Emit notices.
coord.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
@@ -323,7 +352,7 @@ impl Coordinator {
CollectionDescription {
desc: output_desc,
data_source: DataSource::Other(DataSourceOther::Compute),
since: Some(since),
since: Some(as_of),
status_collection_id: None,
},
)],
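One more worked example, for the `until` tightening in this file (and the matching bootstrap code in coord.rs): `until` is a frontier past which the dataflow need not produce data, so setting it to one step past the last refresh lets the dataflow wind down while keeping the final refresh itself readable. The sketch below models the Antichain meet for totally ordered timestamps only, with None standing in for the empty (unbounded) frontier; meet_until is a name invented for this sketch.

// Meet of two `until` frontiers, restricted to totally ordered times.
// `None` models the empty antichain, which for `until` means "no bound";
// the meet of two concrete bounds is the smaller (tighter) one.
fn meet_until(existing: Option<u64>, new_bound: Option<u64>) -> Option<u64> {
    match (existing, new_bound) {
        (None, b) => b,
        (a, None) => a,
        (Some(a), Some(b)) => Some(a.min(b)),
    }
}

fn main() {
    let last_refresh: u64 = 500;
    // `try_step_forward` modeled as checked_add(1): if the last refresh sits
    // at the maximum representable time, nothing can exist past it, so no
    // `until` bound is needed at all.
    let until_bound = last_refresh.checked_add(1);
    // The default `until` is unbounded; tightening it to `last_refresh + 1`
    // lets the dataflow stop past the final refresh while the refresh at 500
    // itself remains readable (500 < 501).
    assert_eq!(meet_until(None, until_bound), Some(501));
    // An already-tighter bound survives the meet.
    assert_eq!(meet_until(Some(300), until_bound), Some(300));
}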
(The remaining 24 changed files are not shown.)
