REFRESH options for MVs -- Compute #23819

Merged · 1 commit · Jan 4, 2024
35 changes: 33 additions & 2 deletions src/adapter/src/coord.rs
@@ -161,6 +161,7 @@ use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{flags, AdapterNotice, TimestampProvider};
use mz_catalog::builtin::BUILTINS;
use mz_catalog::durable::DurableCatalogState;
use mz_expr::refresh_schedule::RefreshSchedule;

use self::statement_logging::{StatementLogging, StatementLoggingId};

@@ -1558,9 +1559,23 @@ impl Coordinator {
.clone();

// Timestamp selection
let as_of = self.bootstrap_materialized_view_as_of(&df_desc, mview.cluster_id);
let as_of = self.bootstrap_materialized_view_as_of(
&df_desc,
mview.cluster_id,
&mview.refresh_schedule,
);
df_desc.set_as_of(as_of);

// If the refresh schedule has a last refresh, then set the `until` to one past the last refresh.
let until = mview
.refresh_schedule
.as_ref()
.and_then(|s| s.last_refresh())
.and_then(|r| r.try_step_forward());
if let Some(until) = until {
df_desc.until.meet_assign(&Antichain::from_elem(until));
}

let df_meta = self
.catalog()
.try_get_dataflow_metainfo(&entry.id())
@@ -1932,6 +1947,7 @@ impl Coordinator {
internal_view_id,
mv.desc.iter_names().cloned().collect(),
mv.non_null_assertions.clone(),
mv.refresh_schedule.clone(),
debug_name,
optimizer_config.clone(),
);
@@ -2154,6 +2170,7 @@ impl Coordinator {
&self,
dataflow: &DataflowDescription<Plan>,
cluster_id: ComputeInstanceId,
refresh_schedule: &Option<RefreshSchedule>,
) -> Antichain<Timestamp> {
// All inputs must be readable at the chosen `as_of`, so it must be at least the join of
// the `since`s of all dependencies.
@@ -2172,12 +2189,26 @@ impl Coordinator {
let write_frontier = self.storage_write_frontier(*sink_id);

// Things go wrong if we try to create a dataflow with `as_of = []`, so avoid that.
let as_of = if write_frontier.is_empty() {
let mut as_of = if write_frontier.is_empty() {
min_as_of.clone()
} else {
min_as_of.join(write_frontier)
};

// If we have a RefreshSchedule, then round up the `as_of` to the next refresh.
// Note that in many cases the `as_of` would already be at this refresh, because the `write_frontier` will
// usually already be there. However, it can happen that we restart after the MV was created in the catalog
// but before its upper was initialized in persist.
if let Some(refresh_schedule) = &refresh_schedule {
if let Some(rounded_up_ts) =
refresh_schedule.round_up_timestamp(*as_of.as_option().expect("as_of is non-empty"))
{
as_of = Antichain::from_elem(rounded_up_ts);
} else {
// We are past the last refresh. Let's not move the as_of.
}
}

tracing::info!(
export_ids = %dataflow.display_export_ids(),
%cluster_id,
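
For intuition about the two frontiers chosen in the coord.rs changes above, here is a minimal, self-contained sketch assuming a simple periodic schedule with an optional last refresh. The struct, its field names, and the alignment math are illustrative stand-ins, not the real `RefreshSchedule` from `mz_expr::refresh_schedule`, and plain `u64` values stand in for `mz_repr::Timestamp`.

```rust
// Toy schedule: periodic refreshes aligned to `aligned_to`, with an optional hard stop.
struct ToyRefreshSchedule {
    /// Refresh period in milliseconds.
    every_ms: u64,
    /// Timestamp the periodic refreshes are aligned to.
    aligned_to: u64,
    /// No refreshes happen after this time, if set.
    last_refresh: Option<u64>,
}

impl ToyRefreshSchedule {
    /// Smallest refresh time at or after `ts`, or `None` if `ts` is already past
    /// the last refresh (mirrors `RefreshSchedule::round_up_timestamp`).
    fn round_up_timestamp(&self, ts: u64) -> Option<u64> {
        let offset = ts.saturating_sub(self.aligned_to);
        let rounded = self.aligned_to + offset.div_ceil(self.every_ms) * self.every_ms;
        match self.last_refresh {
            Some(last) if rounded > last => None,
            _ => Some(rounded),
        }
    }

    /// Mirrors `RefreshSchedule::last_refresh`.
    fn last_refresh(&self) -> Option<u64> {
        self.last_refresh
    }
}

fn main() {
    let schedule = ToyRefreshSchedule { every_ms: 3_600_000, aligned_to: 0, last_refresh: Some(86_400_000) };
    // Bootstrap rounds the as_of (write frontier joined with the inputs' sinces)
    // up to the next refresh time.
    let as_of = schedule.round_up_timestamp(5_000_000); // Some(7_200_000)
    // The `until` is one tick past the last refresh: nothing changes after it,
    // so the dataflow never needs to produce output beyond that time.
    let until = schedule.last_refresh().map(|last| last + 1); // Some(86_400_001)
    println!("as_of = {as_of:?}, until = {until:?}");
}
```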
145 changes: 97 additions & 48 deletions src/adapter/src/coord/command_handler.rs
@@ -22,6 +22,7 @@ use mz_compute_client::protocol::response::PeekResponse;
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_repr::role_id::RoleId;
use mz_repr::Timestamp;
use mz_sql::ast::{
CopyRelation, CopyStatement, InsertSource, Query, Raw, SetExpr, Statement, SubscribeStatement,
};
@@ -39,7 +40,7 @@ use mz_sql::session::user::User;
use mz_sql::session::vars::{
EndTransactionAction, OwnedVarInput, Value, Var, STATEMENT_LOGGING_SAMPLE_RATE,
};
use mz_sql_parser::ast::CreateMaterializedViewStatement;
use mz_sql_parser::ast::{CreateMaterializedViewStatement, ExplainPlanStatement, Explainee};
use mz_storage_types::sources::Timeline;
use opentelemetry::trace::TraceContextExt;
use tokio::sync::{mpsc, oneshot, watch};
@@ -691,54 +692,12 @@ impl Coordinator {
))),

Statement::CreateMaterializedView(mut cmvs) => {
// (This won't be the same timestamp as the system table inserts, unfortunately.)
let mz_now = if cmvs
.with_options
.iter()
.any(materialized_view_option_contains_temporal)
let mz_now = match self
.resolve_mz_now_for_create_materialized_view(&cmvs, &resolved_ids)
.await
{
let timeline_context =
match self.validate_timeline_context(resolved_ids.0.clone()) {
Ok(tc) => tc,
Err(e) => return ctx.retire(Err(e)),
};

// We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
// but even in the TimestampIndependent case.
// Note that we didn't accurately decide whether we are TimestampDependent
// or TimestampIndependent, because for this we'd need to also check whether
// `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
// However, this doesn't matter here, as we are just going to default to
// EpochMilliseconds in both cases.
let timeline = timeline_context
.timeline()
.unwrap_or(&Timeline::EpochMilliseconds);
Some(self.get_timestamp_oracle(timeline).read_ts().await)
// TODO: It might be good to take into account `least_valid_read` in addition to
// the oracle's `read_ts`, but there are two problems:
// 1. At this point, we don't know which indexes would be used. We could do an
// overestimation here by grabbing the ids of all indexes that are on ids
// involved in the query. (We'd have to recursively follow view definitions,
// similarly to `validate_timeline_context`.)
// 2. For a peek, when the `least_valid_read` is later than the oracle's
// `read_ts`, then the peek doesn't return before it completes at the chosen
// timestamp. However, for a CREATE MATERIALIZED VIEW statement, it's not clear
// whether we want to make it block until the chosen time. If it doesn't block,
// then the initial refresh wouldn't be linearized with the CREATE MATERIALIZED
// VIEW statement.
//
// Note: The Adapter is usually keeping a read hold of all objects at the oracle
// read timestamp, so `least_valid_read` usually won't actually be later than
// the oracle's `read_ts`. (see `Coordinator::advance_timelines`)
//
// Note 2: If we choose a timestamp here that is earlier than
// `least_valid_read`, that is somewhat bad, but not catastrophic: The only
// bad thing that happens is that we won't perform that refresh that was
// specified to be at `mz_now()` (which is usually the initial refresh)
// (similarly to how we don't perform refreshes that were specified to be in the
// past).
} else {
None
Ok(mz_now) => mz_now,
Err(e) => return ctx.retire(Err(e)),
};

let owned_catalog = self.owned_catalog();
@@ -769,6 +728,44 @@ impl Coordinator {
}
}

Statement::ExplainPlan(ExplainPlanStatement {
stage,
config_flags,
format,
explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
}) => {
let mut cmvs = *box_cmvs;
let mz_now = match self
.resolve_mz_now_for_create_materialized_view(&cmvs, &resolved_ids)
.await
{
Ok(mz_now) => mz_now,
Err(e) => return ctx.retire(Err(e)),
};

let owned_catalog = self.owned_catalog();
let catalog = owned_catalog.for_session(ctx.session());

purify_create_materialized_view_options(
catalog,
mz_now,
&mut cmvs,
&mut resolved_ids,
);

let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
stage,
config_flags,
format,
explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
});

match self.plan_statement(ctx.session(), purified_stmt, &params, &resolved_ids) {
Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
Err(e) => ctx.retire(Err(e)),
}
}

// All other statements are handled immediately.
_ => match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) {
Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
@@ -777,6 +774,58 @@ impl Coordinator {
}
}

async fn resolve_mz_now_for_create_materialized_view(
&self,
cmvs: &CreateMaterializedViewStatement<Aug>,
resolved_ids: &ResolvedIds,
) -> Result<Option<Timestamp>, AdapterError> {
// (This won't be the same timestamp as the system table inserts, unfortunately.)
if cmvs
.with_options
.iter()
.any(materialized_view_option_contains_temporal)
{
let timeline_context = self.validate_timeline_context(resolved_ids.0.clone())?;

// We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
// but even in the TimestampIndependent case.
// Note that we didn't accurately decide whether we are TimestampDependent
// or TimestampIndependent, because for this we'd need to also check whether
// `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
// However, this doesn't matter here, as we are just going to default to
// EpochMilliseconds in both cases.
let timeline = timeline_context
.timeline()
.unwrap_or(&Timeline::EpochMilliseconds);
Ok(Some(self.get_timestamp_oracle(timeline).read_ts().await))
// TODO: It might be good to take into account `least_valid_read` in addition to
// the oracle's `read_ts`, but there are two problems:
// 1. At this point, we don't know which indexes would be used. We could do an
// overestimation here by grabbing the ids of all indexes that are on ids
// involved in the query. (We'd have to recursively follow view definitions,
// similarly to `validate_timeline_context`.)
// 2. For a peek, when the `least_valid_read` is later than the oracle's
// `read_ts`, then the peek doesn't return before it completes at the chosen
// timestamp. However, for a CREATE MATERIALIZED VIEW statement, it's not clear
// whether we want to make it block until the chosen time. If it doesn't block,
// then the initial refresh wouldn't be linearized with the CREATE MATERIALIZED
// VIEW statement.
//
// Note: The Adapter is usually keeping a read hold of all objects at the oracle
// read timestamp, so `least_valid_read` usually won't actually be later than
// the oracle's `read_ts`. (see `Coordinator::advance_timelines`)
//
// Note 2: If we choose a timestamp here that is earlier than
// `least_valid_read`, that is somewhat bad, but not catastrophic: The only
// bad thing that happens is that we won't perform that refresh that was
// specified to be at `mz_now()` (which is usually the initial refresh)
// (similarly to how we don't perform refreshes that were specified to be in the
// past).
} else {
Ok(None)
}
}

/// Instruct the dataflow layer to cancel any ongoing, interactive work for
/// the named `conn_id` if the correct secret key is specified.
///
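
For illustration of why both the `CREATE MATERIALIZED VIEW` and the `EXPLAIN ... CREATE MATERIALIZED VIEW` paths above need the resolved `mz_now` before purification, here is a rough sketch, under the assumption (not confirmed by this diff) that `purify_create_materialized_view_options` substitutes the constant oracle read timestamp for `mz_now()` occurrences in the REFRESH options before planning. The enum and function below are toy stand-ins, not the real AST or purification code.

```rust
// Hypothetical sketch only: toy option representation, not the real AST types.
#[derive(Debug)]
enum ToyRefreshOptionExpr {
    MzNow,         // a refresh option written in terms of `mz_now()`
    Constant(u64), // an already-constant timestamp
}

fn purify_refresh_options(options: &mut [ToyRefreshOptionExpr], mz_now: Option<u64>) {
    for expr in options.iter_mut() {
        if matches!(expr, ToyRefreshOptionExpr::MzNow) {
            // `mz_now` is `Some` exactly when some option contains a temporal
            // expression, which is the invariant the handler above relies on.
            *expr = ToyRefreshOptionExpr::Constant(mz_now.expect("resolved when an option is temporal"));
        }
    }
}

fn main() {
    let mut options = vec![ToyRefreshOptionExpr::MzNow, ToyRefreshOptionExpr::Constant(0)];
    purify_refresh_options(&mut options, Some(1_700_000_000_000));
    println!("{options:?}"); // [Constant(1700000000000), Constant(0)]
}
```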
3 changes: 2 additions & 1 deletion src/adapter/src/coord/sequencer/inner.rs
@@ -3127,7 +3127,7 @@ impl Coordinator {
target_cluster_id: ClusterId,
broken: bool,
non_null_assertions: Vec<usize>,
_refresh_schedule: Option<RefreshSchedule>,
refresh_schedule: Option<RefreshSchedule>,
explain_config: &mz_repr::explain::ExplainConfig,
_root_dispatch: tracing::Dispatch,
) -> Result<
@@ -3168,6 +3168,7 @@ impl Coordinator {
internal_view_id,
column_names.clone(),
non_null_assertions,
refresh_schedule,
debug_name,
optimizer_config,
);
59 changes: 44 additions & 15 deletions src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
@@ -7,14 +7,17 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use differential_dataflow::lattice::Lattice;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::memory::objects::{CatalogItem, MaterializedView};
use mz_expr::CollectionPlan;
use mz_ore::soft_panic_or_log;
use mz_ore::tracing::OpenTelemetryContext;
use mz_sql::catalog::CatalogError;
use mz_sql::names::{ObjectId, ResolvedIds};
use mz_sql::plan;
use mz_storage_client::controller::{CollectionDescription, DataSource, DataSourceOther};
use timely::progress::Antichain;

use crate::command::ExecuteResponse;
use crate::coord::sequencer::inner::return_if_err;
@@ -97,21 +100,12 @@ impl Coordinator {
let plan::CreateMaterializedViewPlan {
materialized_view:
plan::MaterializedView {
expr,
cluster_id,
refresh_schedule,
..
expr, cluster_id, ..
},
ambiguous_columns,
..
} = &plan;

if refresh_schedule.is_some() {
return Err(AdapterError::Unsupported(
"REFRESH options other than ON COMMIT",
));
}

// Validate any references in the materialized view's expression. We do
// this on the unoptimized plan to better reflect what the user typed.
// We want to reject queries that depend on log sources, for example,
@@ -167,6 +161,7 @@ impl Coordinator {
column_names,
cluster_id,
non_null_assertions,
refresh_schedule,
..
},
..
@@ -193,6 +188,7 @@ impl Coordinator {
internal_view_id,
column_names.clone(),
non_null_assertions.clone(),
refresh_schedule.clone(),
debug_name,
optimizer_config,
);
@@ -285,6 +281,38 @@ impl Coordinator {
)
.collect::<Vec<_>>();

// Timestamp selection
let as_of = {
// Normally, `as_of` should be the least_valid_read.
let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
let mut as_of = self.least_valid_read(&id_bundle);
// But for MVs with non-trivial REFRESH schedules, it's important to set the
// `as_of` to the first refresh. This is because we'd like queries on the MV to
// block until the first refresh (rather than to show an empty MV).
if let Some(refresh_schedule) = &refresh_schedule {
if let Some(as_of_ts) = as_of.as_option() {
let Some(rounded_up_ts) = refresh_schedule.round_up_timestamp(*as_of_ts) else {
return Err(AdapterError::MaterializedViewWouldNeverRefresh(
refresh_schedule.last_refresh().expect("if round_up_timestamp returned None, then there should be a last refresh"),
*as_of_ts
));
};
as_of = Antichain::from_elem(rounded_up_ts);
} else {
// The `as_of` should never be empty, because then the MV would be unreadable.
soft_panic_or_log!("creating a materialized view with an empty `as_of`");
}
}
as_of
};

// If the refresh schedule has a last refresh, then set the `until` to one past the last refresh.
// (If `try_step_forward` fails, there is no need to set an `until`: no data can appear beyond the last
// refresh time, because there are no timestamps after it.)
let until = refresh_schedule
.and_then(|s| s.last_refresh())
.and_then(|r| r.try_step_forward());

let transact_result = self
.catalog_transact_with_side_effects(Some(ctx.session()), ops, |coord| async {
// Save plan structures.
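
Putting the creation-time pieces together, the following is a small sketch of the frontier selection in the hunk above, with toy `u64` timestamps, a closure standing in for `RefreshSchedule::round_up_timestamp`, and a hypothetical placeholder for `AdapterError::MaterializedViewWouldNeverRefresh`; it is not the real adapter code.

```rust
// Toy sketch of the creation-time timestamp selection; not the actual implementation.
#[derive(Debug)]
struct WouldNeverRefresh {
    last_refresh: u64,
    earliest_possible_read: u64,
}

fn creation_frontiers(
    least_valid_read: u64,
    round_up_timestamp: impl Fn(u64) -> Option<u64>, // stands in for `RefreshSchedule::round_up_timestamp`
    last_refresh: Option<u64>,
) -> Result<(u64, Option<u64>), WouldNeverRefresh> {
    // The `as_of` is the first refresh at or after the least valid read of the
    // inputs, so queries on the MV block until the first refresh rather than
    // observing an empty collection.
    let as_of = round_up_timestamp(least_valid_read).ok_or_else(|| WouldNeverRefresh {
        last_refresh: last_refresh.expect("round_up_timestamp only fails past the last refresh"),
        earliest_possible_read: least_valid_read,
    })?;
    // The `until` is one past the last refresh (when there is one): no data can
    // change after that time, so the dataflow never needs to produce output past it.
    let until = last_refresh.map(|last| last + 1);
    Ok((as_of, until))
}

fn main() {
    // Hourly schedule ending at t = 86_400_000, as in the earlier sketch.
    let every = 3_600_000;
    let last = Some(86_400_000);
    let round_up = |ts: u64| {
        let rounded = ts.div_ceil(every) * every;
        if last.is_some_and(|l| rounded > l) { None } else { Some(rounded) }
    };
    println!("{:?}", creation_frontiers(5_000_000, round_up, last));
    // Ok((7200000, Some(86400001)))
}
```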
@@ -298,10 +326,11 @@ impl Coordinator {
let output_desc = global_lir_plan.desc().clone();
let (mut df_desc, df_meta) = global_lir_plan.unapply();

// Timestamp selection
let id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id);
let since = coord.least_valid_read(&id_bundle);
df_desc.set_as_of(since.clone());
df_desc.set_as_of(as_of.clone());

if let Some(until) = until {
df_desc.until.meet_assign(&Antichain::from_elem(until));
}

// Emit notices.
coord.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
@@ -323,7 +352,7 @@ impl Coordinator {
CollectionDescription {
desc: output_desc,
data_source: DataSource::Other(DataSourceOther::Compute),
since: Some(since),
since: Some(as_of),
status_collection_id: None,
},
)],