From a8fc4b042684dfb3241e81a3445d5c1aab6b485f Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Mon, 11 Dec 2023 19:11:53 +0100 Subject: [PATCH] Add REFRESH options for MVs -- Compute --- src/adapter/src/coord.rs | 35 +- src/adapter/src/coord/command_handler.rs | 145 +++++--- src/adapter/src/coord/sequencer/inner.rs | 3 +- .../inner/create_materialized_view.rs | 59 +++- src/adapter/src/error.rs | 23 +- src/adapter/src/optimize/materialized_view.rs | 6 + src/adapter/src/optimize/subscribe.rs | 4 + src/compute-client/src/controller/instance.rs | 1 + src/compute-types/build.rs | 1 + src/compute-types/src/sinks.proto | 2 + src/compute-types/src/sinks.rs | 17 +- src/compute/src/sink/mod.rs | 1 + src/compute/src/sink/persist_sink.rs | 12 +- src/compute/src/sink/refresh.rs | 130 ++++++++ src/expr/build.rs | 3 + src/expr/src/refresh_schedule.proto | 24 ++ src/expr/src/refresh_schedule.rs | 313 ++++++++++++++++++ src/ore/src/vec.rs | 2 + src/repr/build.rs | 1 + src/repr/src/adt/interval.rs | 21 ++ src/repr/src/lib.rs | 2 +- src/repr/src/timestamp.proto | 16 + src/repr/src/timestamp.rs | 15 + src/sql-parser/src/ast/defs/statement.rs | 2 + src/sql/src/plan/statement/ddl.rs | 6 +- test/sqllogictest/materialized_views.slt | 256 +++++++++++++- .../materialized-view-refresh-options.td | 178 ++++++++++ test/testdrive/materialized-views.td | 1 - 28 files changed, 1195 insertions(+), 84 deletions(-) create mode 100644 src/compute/src/sink/refresh.rs create mode 100644 src/expr/src/refresh_schedule.proto create mode 100644 src/repr/src/timestamp.proto create mode 100644 test/testdrive/materialized-view-refresh-options.td diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 6f6583e40ce99..e6cda2374a4aa 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -161,6 +161,7 @@ use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter}; use crate::{flags, AdapterNotice, TimestampProvider}; use mz_catalog::builtin::BUILTINS; use 
mz_catalog::durable::DurableCatalogState; +use mz_expr::refresh_schedule::RefreshSchedule; use self::statement_logging::{StatementLogging, StatementLoggingId}; @@ -1558,9 +1559,23 @@ impl Coordinator { .clone(); // Timestamp selection - let as_of = self.bootstrap_materialized_view_as_of(&df_desc, mview.cluster_id); + let as_of = self.bootstrap_materialized_view_as_of( + &df_desc, + mview.cluster_id, + &mview.refresh_schedule, + ); df_desc.set_as_of(as_of); + // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh. + let until = mview + .refresh_schedule + .as_ref() + .and_then(|s| s.last_refresh()) + .and_then(|r| r.try_step_forward()); + if let Some(until) = until { + df_desc.until.meet_assign(&Antichain::from_elem(until)); + } + let df_meta = self .catalog() .try_get_dataflow_metainfo(&entry.id()) @@ -1932,6 +1947,7 @@ impl Coordinator { internal_view_id, mv.desc.iter_names().cloned().collect(), mv.non_null_assertions.clone(), + mv.refresh_schedule.clone(), debug_name, optimizer_config.clone(), ); @@ -2154,6 +2170,7 @@ impl Coordinator { &self, dataflow: &DataflowDescription, cluster_id: ComputeInstanceId, + refresh_schedule: &Option, ) -> Antichain { // All inputs must be readable at the chosen `as_of`, so it must be at least the join of // the `since`s of all dependencies. @@ -2172,12 +2189,26 @@ impl Coordinator { let write_frontier = self.storage_write_frontier(*sink_id); // Things go wrong if we try to create a dataflow with `as_of = []`, so avoid that. - let as_of = if write_frontier.is_empty() { + let mut as_of = if write_frontier.is_empty() { min_as_of.clone() } else { min_as_of.join(write_frontier) }; + // If we have a RefreshSchedule, then round up the `as_of` to the next refresh. + // Note that in many cases the `as_of` would already be at this refresh, because the `write_frontier` will be + // usually there. 
However, it can happen that we restart after the MV was created in the catalog but before + // its upper was initialized in persist. + if let Some(refresh_schedule) = &refresh_schedule { + if let Some(rounded_up_ts) = + refresh_schedule.round_up_timestamp(*as_of.as_option().expect("as_of is non-empty")) + { + as_of = Antichain::from_elem(rounded_up_ts); + } else { + // We are past the last refresh. Let's not move the as_of. + } + } + tracing::info!( export_ids = %dataflow.display_export_ids(), %cluster_id, diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index a97338f16cb67..18d97cfc2e302 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -22,6 +22,7 @@ use mz_compute_client::protocol::response::PeekResponse; use mz_ore::task; use mz_ore::tracing::OpenTelemetryContext; use mz_repr::role_id::RoleId; +use mz_repr::Timestamp; use mz_sql::ast::{ CopyRelation, CopyStatement, InsertSource, Query, Raw, SetExpr, Statement, SubscribeStatement, }; @@ -39,7 +40,7 @@ use mz_sql::session::user::User; use mz_sql::session::vars::{ EndTransactionAction, OwnedVarInput, Value, Var, STATEMENT_LOGGING_SAMPLE_RATE, }; -use mz_sql_parser::ast::CreateMaterializedViewStatement; +use mz_sql_parser::ast::{CreateMaterializedViewStatement, ExplainPlanStatement, Explainee}; use mz_storage_types::sources::Timeline; use opentelemetry::trace::TraceContextExt; use tokio::sync::{mpsc, oneshot, watch}; @@ -691,54 +692,12 @@ impl Coordinator { ))), Statement::CreateMaterializedView(mut cmvs) => { - // (This won't be the same timestamp as the system table inserts, unfortunately.) 
- let mz_now = if cmvs - .with_options - .iter() - .any(materialized_view_option_contains_temporal) + let mz_now = match self + .resolve_mz_now_for_create_materialized_view(&cmvs, &resolved_ids) + .await { - let timeline_context = - match self.validate_timeline_context(resolved_ids.0.clone()) { - Ok(tc) => tc, - Err(e) => return ctx.retire(Err(e)), - }; - - // We default to EpochMilliseconds, similarly to `determine_timestamp_for`, - // but even in the TimestampIndependent case. - // Note that we didn't accurately decide whether we are TimestampDependent - // or TimestampIndependent, because for this we'd need to also check whether - // `query.contains_temporal()`, similarly to how `peek_stage_validate` does. - // However, this doesn't matter here, as we are just going to default to - // EpochMilliseconds in both cases. - let timeline = timeline_context - .timeline() - .unwrap_or(&Timeline::EpochMilliseconds); - Some(self.get_timestamp_oracle(timeline).read_ts().await) - // TODO: It might be good to take into account `least_valid_read` in addition to - // the oracle's `read_ts`, but there are two problems: - // 1. At this point, we don't know which indexes would be used. We could do an - // overestimation here by grabbing the ids of all indexes that are on ids - // involved in the query. (We'd have to recursively follow view definitions, - // similarly to `validate_timeline_context`.) - // 2. For a peek, when the `least_valid_read` is later than the oracle's - // `read_ts`, then the peek doesn't return before it completes at the chosen - // timestamp. However, for a CRATE MATERIALIZED VIEW statement, it's not clear - // whether we want to make it block until the chosen time. If it doesn't block, - // then the initial refresh wouldn't be linearized with the CREATE MATERIALIZED - // VIEW statement. 
- // - // Note: The Adapter is usually keeping a read hold of all objects at the oracle - // read timestamp, so `least_valid_read` usually won't actually be later than - // the oracle's `read_ts`. (see `Coordinator::advance_timelines`) - // - // Note 2: If we choose a timestamp here that is earlier than - // `least_valid_read`, that is somewhat bad, but not catastrophic: The only - // bad thing that happens is that we won't perform that refresh that was - // specified to be at `mz_now()` (which is usually the initial refresh) - // (similarly to how we don't perform refreshes that were specified to be in the - // past). - } else { - None + Ok(mz_now) => mz_now, + Err(e) => return ctx.retire(Err(e)), }; let owned_catalog = self.owned_catalog(); @@ -769,6 +728,44 @@ impl Coordinator { } } + Statement::ExplainPlan(ExplainPlanStatement { + stage, + config_flags, + format, + explainee: Explainee::CreateMaterializedView(box_cmvs, broken), + }) => { + let mut cmvs = *box_cmvs; + let mz_now = match self + .resolve_mz_now_for_create_materialized_view(&cmvs, &resolved_ids) + .await + { + Ok(mz_now) => mz_now, + Err(e) => return ctx.retire(Err(e)), + }; + + let owned_catalog = self.owned_catalog(); + let catalog = owned_catalog.for_session(ctx.session()); + + purify_create_materialized_view_options( + catalog, + mz_now, + &mut cmvs, + &mut resolved_ids, + ); + + let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement { + stage, + config_flags, + format, + explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken), + }); + + match self.plan_statement(ctx.session(), purified_stmt, ¶ms, &resolved_ids) { + Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await, + Err(e) => ctx.retire(Err(e)), + } + } + // All other statements are handled immediately. 
_ => match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) { Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await, @@ -777,6 +774,58 @@ impl Coordinator { } } + async fn resolve_mz_now_for_create_materialized_view( + &self, + cmvs: &CreateMaterializedViewStatement, + resolved_ids: &ResolvedIds, + ) -> Result<Option<Timestamp>, AdapterError> { + // (This won't be the same timestamp as the system table inserts, unfortunately.) + if cmvs + .with_options + .iter() + .any(materialized_view_option_contains_temporal) + { + let timeline_context = self.validate_timeline_context(resolved_ids.0.clone())?; + + // We default to EpochMilliseconds, similarly to `determine_timestamp_for`, + // but even in the TimestampIndependent case. + // Note that we didn't accurately decide whether we are TimestampDependent + // or TimestampIndependent, because for this we'd need to also check whether + // `query.contains_temporal()`, similarly to how `peek_stage_validate` does. + // However, this doesn't matter here, as we are just going to default to + // EpochMilliseconds in both cases. + let timeline = timeline_context + .timeline() + .unwrap_or(&Timeline::EpochMilliseconds); + Ok(Some(self.get_timestamp_oracle(timeline).read_ts().await)) + // TODO: It might be good to take into account `least_valid_read` in addition to + // the oracle's `read_ts`, but there are two problems: + // 1. At this point, we don't know which indexes would be used. We could do an + // overestimation here by grabbing the ids of all indexes that are on ids + // involved in the query. (We'd have to recursively follow view definitions, + // similarly to `validate_timeline_context`.) + // 2. For a peek, when the `least_valid_read` is later than the oracle's + // `read_ts`, then the peek doesn't return before it completes at the chosen + // timestamp. However, for a CREATE MATERIALIZED VIEW statement, it's not clear + // whether we want to make it block until the chosen time. 
If it doesn't block, + // then the initial refresh wouldn't be linearized with the CREATE MATERIALIZED + // VIEW statement. + // + // Note: The Adapter is usually keeping a read hold of all objects at the oracle + // read timestamp, so `least_valid_read` usually won't actually be later than + // the oracle's `read_ts`. (see `Coordinator::advance_timelines`) + // + // Note 2: If we choose a timestamp here that is earlier than + // `least_valid_read`, that is somewhat bad, but not catastrophic: The only + // bad thing that happens is that we won't perform that refresh that was + // specified to be at `mz_now()` (which is usually the initial refresh) + // (similarly to how we don't perform refreshes that were specified to be in the + // past). + } else { + Ok(None) + } + } + /// Instruct the dataflow layer to cancel any ongoing, interactive work for /// the named `conn_id` if the correct secret key is specified. /// diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 81ee2c0f16edd..c4a8bb722a5f1 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -3127,7 +3127,7 @@ impl Coordinator { target_cluster_id: ClusterId, broken: bool, non_null_assertions: Vec, - _refresh_schedule: Option, + refresh_schedule: Option, explain_config: &mz_repr::explain::ExplainConfig, _root_dispatch: tracing::Dispatch, ) -> Result< @@ -3168,6 +3168,7 @@ impl Coordinator { internal_view_id, column_names.clone(), non_null_assertions, + refresh_schedule, debug_name, optimizer_config, ); diff --git a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs index a9129147a639b..df560f8269804 100644 --- a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs +++ b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs @@ -7,14 +7,17 @@ // the Business Source License, use of this software will be 
governed // by the Apache License, Version 2.0. +use differential_dataflow::lattice::Lattice; use mz_adapter_types::compaction::CompactionWindow; use mz_catalog::memory::objects::{CatalogItem, MaterializedView}; use mz_expr::CollectionPlan; +use mz_ore::soft_panic_or_log; use mz_ore::tracing::OpenTelemetryContext; use mz_sql::catalog::CatalogError; use mz_sql::names::{ObjectId, ResolvedIds}; use mz_sql::plan; use mz_storage_client::controller::{CollectionDescription, DataSource, DataSourceOther}; +use timely::progress::Antichain; use crate::command::ExecuteResponse; use crate::coord::sequencer::inner::return_if_err; @@ -97,21 +100,12 @@ impl Coordinator { let plan::CreateMaterializedViewPlan { materialized_view: plan::MaterializedView { - expr, - cluster_id, - refresh_schedule, - .. + expr, cluster_id, .. }, ambiguous_columns, .. } = &plan; - if refresh_schedule.is_some() { - return Err(AdapterError::Unsupported( - "REFRESH options other than ON COMMIT", - )); - } - // Validate any references in the materialized view's expression. We do // this on the unoptimized plan to better reflect what the user typed. // We want to reject queries that depend on log sources, for example, @@ -167,6 +161,7 @@ impl Coordinator { column_names, cluster_id, non_null_assertions, + refresh_schedule, .. }, .. @@ -193,6 +188,7 @@ impl Coordinator { internal_view_id, column_names.clone(), non_null_assertions.clone(), + refresh_schedule.clone(), debug_name, optimizer_config, ); @@ -285,6 +281,38 @@ impl Coordinator { ) .collect::>(); + // Timestamp selection + let as_of = { + // Normally, `as_of` should be the least_valid_read. + let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id); + let mut as_of = self.least_valid_read(&id_bundle); + // But for MVs with non-trivial REFRESH schedules, it's important to set the + // `as_of` to the first refresh. This is because we'd like queries on the MV to + // block until the first refresh (rather than to show an empty MV). 
+ if let Some(refresh_schedule) = &refresh_schedule { + if let Some(as_of_ts) = as_of.as_option() { + let Some(rounded_up_ts) = refresh_schedule.round_up_timestamp(*as_of_ts) else { + return Err(AdapterError::MaterializedViewWouldNeverRefresh( + refresh_schedule.last_refresh().expect("if round_up_timestamp returned None, then there should be a last refresh"), + *as_of_ts + )); + }; + as_of = Antichain::from_elem(rounded_up_ts); + } else { + // The `as_of` should never be empty, because then the MV would be unreadable. + soft_panic_or_log!("creating a materialized view with an empty `as_of`"); + } + } + as_of + }; + + // If we have a refresh schedule that has a last refresh, then set the `until` to the last refresh. + // (If the `try_step_forward` fails, then no need to set an `until`, because it's not possible to get any data + // beyond that last refresh time, because there are no times beyond that time.) + let until = refresh_schedule + .and_then(|s| s.last_refresh()) + .and_then(|r| r.try_step_forward()); + let transact_result = self .catalog_transact_with_side_effects(Some(ctx.session()), ops, |coord| async { // Save plan structures. @@ -298,10 +326,11 @@ impl Coordinator { let output_desc = global_lir_plan.desc().clone(); let (mut df_desc, df_meta) = global_lir_plan.unapply(); - // Timestamp selection - let id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id); - let since = coord.least_valid_read(&id_bundle); - df_desc.set_as_of(since.clone()); + df_desc.set_as_of(as_of.clone()); + + if let Some(until) = until { + df_desc.until.meet_assign(&Antichain::from_elem(until)); + } // Emit notices. 
coord.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices); @@ -323,7 +352,7 @@ impl Coordinator { CollectionDescription { desc: output_desc, data_source: DataSource::Other(DataSourceOther::Compute), - since: Some(since), + since: Some(as_of), status_collection_id: None, }, )], diff --git a/src/adapter/src/error.rs b/src/adapter/src/error.rs index 96cd38569150f..101f9aae6551b 100644 --- a/src/adapter/src/error.rs +++ b/src/adapter/src/error.rs @@ -22,7 +22,7 @@ use mz_ore::str::StrExt; use mz_pgwire_common::{ErrorResponse, Severity}; use mz_repr::adt::timestamp::TimestampError; use mz_repr::explain::ExplainError; -use mz_repr::NotNullViolation; +use mz_repr::{NotNullViolation, Timestamp}; use mz_sql::plan::PlanError; use mz_sql::rbac; use mz_sql::session::vars::VarError; @@ -215,6 +215,10 @@ pub enum AdapterError { InvalidAlter(&'static str, PlanError), /// An error occurred while validating a connection. ConnectionValidation(ConnectionValidationError), + /// We refuse to create the materialized view, because it would never be refreshed, so it would + /// never be queryable. This can happen when the only specified refreshes are further back in + /// the past than the initial compaction window of the materialized view. 
+ MaterializedViewWouldNeverRefresh(Timestamp, Timestamp), } impl AdapterError { @@ -301,6 +305,14 @@ impl AdapterError { AdapterError::InvalidAlter(_, e) => e.detail(), AdapterError::Optimizer(e) => e.detail(), AdapterError::ConnectionValidation(e) => e.detail(), + AdapterError::MaterializedViewWouldNeverRefresh(last_refresh, earliest_possible) => { + Some(format!( + "The specified last refresh is at {}, while the earliest possible time to compute the materialized \ + view is {}.", + last_refresh, + earliest_possible, + )) + } _ => None, } } @@ -476,6 +488,8 @@ impl AdapterError { AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST, AdapterError::InvalidAlter(_, _) => SqlState::FEATURE_NOT_SUPPORTED, AdapterError::ConnectionValidation(_) => SqlState::SYSTEM_ERROR, + // `DATA_EXCEPTION`, similarly to `AbsurdSubscribeBounds`. + AdapterError::MaterializedViewWouldNeverRefresh(_, _) => SqlState::DATA_EXCEPTION, } } @@ -689,6 +703,13 @@ impl fmt::Display for AdapterError { write!(f, "invalid ALTER {t}: {e}") } AdapterError::ConnectionValidation(e) => e.fmt(f), + AdapterError::MaterializedViewWouldNeverRefresh(_, _) => { + write!( + f, + "all the specified refreshes of the materialized view would be too far in the past, and thus they \ + would never happen" + ) + } } } } diff --git a/src/adapter/src/optimize/materialized_view.rs b/src/adapter/src/optimize/materialized_view.rs index 688a6b8f444a1..36f0e21a84e92 100644 --- a/src/adapter/src/optimize/materialized_view.rs +++ b/src/adapter/src/optimize/materialized_view.rs @@ -30,6 +30,7 @@ use std::sync::Arc; use mz_compute_types::dataflows::BuildDesc; use mz_compute_types::plan::Plan; use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, PersistSinkConnection}; +use mz_expr::refresh_schedule::RefreshSchedule; use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr}; use mz_repr::explain::trace_plan; use mz_repr::{ColumnName, GlobalId, RelationDesc}; @@ -65,6 +66,8 @@ pub struct 
Optimizer { /// Output columns that are asserted to be not null in the `CREATE VIEW` /// statement. non_null_assertions: Vec, + /// Refresh schedule, e.g., `REFRESH EVERY '1 day'` + refresh_schedule: Option, /// A human-readable name exposed internally (useful for debugging). debug_name: String, // Optimizer config. @@ -79,6 +82,7 @@ impl Optimizer { internal_view_id: GlobalId, column_names: Vec, non_null_assertions: Vec, + refresh_schedule: Option, debug_name: String, config: OptimizerConfig, ) -> Self { @@ -90,6 +94,7 @@ impl Optimizer { internal_view_id, column_names, non_null_assertions, + refresh_schedule, debug_name, config, } @@ -218,6 +223,7 @@ impl Optimize for Optimizer { with_snapshot: true, up_to: Antichain::default(), non_null_assertions: self.non_null_assertions.clone(), + refresh_schedule: self.refresh_schedule.clone(), }; let df_meta = df_builder.build_sink_dataflow_into( diff --git a/src/adapter/src/optimize/subscribe.rs b/src/adapter/src/optimize/subscribe.rs index f8d97afb2c065..f1538620df6ef 100644 --- a/src/adapter/src/optimize/subscribe.rs +++ b/src/adapter/src/optimize/subscribe.rs @@ -188,6 +188,8 @@ impl Optimize for Optimizer { up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(), // No `FORCE NOT NULL` for subscribes non_null_assertions: vec![], + // No `REFRESH` for subscribes + refresh_schedule: None, }; let mut df_builder = @@ -229,6 +231,8 @@ impl Optimize for Optimizer { up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(), // No `FORCE NOT NULL` for subscribes non_null_assertions: vec![], + // No `REFRESH` for subscribes + refresh_schedule: None, }; let mut df_builder = diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index 53e5409b3e5cd..d95a8c72626a5 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -821,6 +821,7 @@ where with_snapshot: se.with_snapshot, up_to: se.up_to, 
non_null_assertions: se.non_null_assertions, + refresh_schedule: se.refresh_schedule, }; sink_exports.insert(id, desc); } diff --git a/src/compute-types/build.rs b/src/compute-types/build.rs index 016a2a702b4b8..fcf471e2ee4de 100644 --- a/src/compute-types/build.rs +++ b/src/compute-types/build.rs @@ -93,6 +93,7 @@ fn main() { .extern_path(".mz_ccsr.config", "::mz_ccsr") .extern_path(".mz_expr.id", "::mz_expr") .extern_path(".mz_expr.linear", "::mz_expr") + .extern_path(".mz_expr.refresh_schedule", "::mz_expr::refresh_schedule") .extern_path(".mz_expr.relation", "::mz_expr") .extern_path(".mz_expr.scalar", "::mz_expr") .extern_path(".mz_kafka_util.addr", "::mz_kafka_util") diff --git a/src/compute-types/src/sinks.proto b/src/compute-types/src/sinks.proto index 834b7bbc5762b..546fb520cac09 100644 --- a/src/compute-types/src/sinks.proto +++ b/src/compute-types/src/sinks.proto @@ -12,6 +12,7 @@ import "google/protobuf/empty.proto"; import "repr/src/antichain.proto"; import "repr/src/global_id.proto"; +import "expr/src/refresh_schedule.proto"; import "repr/src/relation_and_scalar.proto"; import "storage-types/src/controller.proto"; @@ -24,6 +25,7 @@ message ProtoComputeSinkDesc { bool with_snapshot = 4; mz_repr.antichain.ProtoU64Antichain up_to = 5; repeated uint64 non_null_assertions = 6; + mz_expr.refresh_schedule.ProtoRefreshSchedule refresh_schedule = 7; } message ProtoComputeSinkConnection { diff --git a/src/compute-types/src/sinks.rs b/src/compute-types/src/sinks.rs index 23c1fcc806c0f..a62db94b82538 100644 --- a/src/compute-types/src/sinks.rs +++ b/src/compute-types/src/sinks.rs @@ -9,8 +9,9 @@ //! Types for describing dataflow sinks. 
+use mz_expr::refresh_schedule::RefreshSchedule; use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError}; -use mz_repr::{GlobalId, RelationDesc}; +use mz_repr::{GlobalId, RelationDesc, Timestamp}; use mz_storage_types::controller::CollectionMetadata; use proptest::prelude::{any, Arbitrary, BoxedStrategy, Strategy}; use proptest_derive::Arbitrary; @@ -21,16 +22,17 @@ include!(concat!(env!("OUT_DIR"), "/mz_compute_types.sinks.rs")); /// A sink for updates to a relational collection. #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct ComputeSinkDesc { +pub struct ComputeSinkDesc { pub from: GlobalId, pub from_desc: RelationDesc, pub connection: ComputeSinkConnection, pub with_snapshot: bool, pub up_to: Antichain, pub non_null_assertions: Vec, + pub refresh_schedule: Option, } -impl Arbitrary for ComputeSinkDesc { +impl Arbitrary for ComputeSinkDesc { type Strategy = BoxedStrategy; type Parameters = (); @@ -40,8 +42,9 @@ impl Arbitrary for ComputeSinkDesc { any::(), any::>(), any::(), - proptest::collection::vec(any::(), 1..4), + proptest::collection::vec(any::(), 1..4), proptest::collection::vec(any::(), 0..4), + proptest::option::of(any::()), ) .prop_map( |( @@ -51,6 +54,7 @@ impl Arbitrary for ComputeSinkDesc { with_snapshot, up_to_frontier, non_null_assertions, + refresh_schedule, )| { ComputeSinkDesc { from, @@ -59,6 +63,7 @@ impl Arbitrary for ComputeSinkDesc { with_snapshot, up_to: Antichain::from(up_to_frontier), non_null_assertions, + refresh_schedule, } }, ) @@ -66,7 +71,7 @@ impl Arbitrary for ComputeSinkDesc { } } -impl RustType for ComputeSinkDesc { +impl RustType for ComputeSinkDesc { fn into_proto(&self) -> ProtoComputeSinkDesc { ProtoComputeSinkDesc { connection: Some(self.connection.into_proto()), @@ -75,6 +80,7 @@ impl RustType for ComputeSinkDesc for ComputeSinkDesc SinkRender for PersistSinkConnection where @@ -59,7 +60,13 @@ where where G: Scope, { - let desired_collection = 
sinked_collection.map(Ok).concat(&err_collection.map(Err)); + let mut desired_collection = sinked_collection.map(Ok).concat(&err_collection.map(Err)); + + // If a `RefreshSchedule` was specified, round up timestamps. + if let Some(refresh_schedule) = &sink.refresh_schedule { + desired_collection = apply_refresh(desired_collection, refresh_schedule.clone()); + } + if sink.up_to != Antichain::default() { unimplemented!( "UP TO is not supported for persist sinks yet, and shouldn't have been accepted during parsing/planning" @@ -102,7 +109,6 @@ where None, // no MFP None, // no flow control ); - use differential_dataflow::AsCollection; let persist_collection = ok_stream .as_collection() .map(Ok) diff --git a/src/compute/src/sink/refresh.rs b/src/compute/src/sink/refresh.rs new file mode 100644 index 0000000000000..2c12d079045ec --- /dev/null +++ b/src/compute/src/sink/refresh.rs @@ -0,0 +1,130 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use differential_dataflow::{AsCollection, Collection, Data}; +use mz_expr::refresh_schedule::RefreshSchedule; +use mz_ore::soft_panic_or_log; +use mz_repr::{Diff, Timestamp}; +use mz_timely_util::buffer::ConsolidateBuffer; +use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; +use timely::dataflow::Scope; +use timely::progress::Antichain; + +/// This is for REFRESH options on materialized views. It adds an operator that rounds up the +/// timestamps of data and frontiers to the time of the next refresh. See +/// `doc/developer/design/20231027_refresh_every_mv.md`. +/// +/// Note that this currently only works with 1-dim timestamps. 
(This is not an issue for WMR, +/// because iteration numbers should disappear by the time the data gets to the Persist sink.) +pub(crate) fn apply_refresh<G, D>( + coll: Collection<G, D, Diff>, + refresh_schedule: RefreshSchedule, +) -> Collection<G, D, Diff> +where + G: Scope<Timestamp = Timestamp>, + D: Data, +{ + // We need to disconnect the reachability graph and manage capabilities manually, because we'd + // like to round up frontiers as well as data: as soon as our input frontier passes a refresh + // time, we'll round it up to the next refresh time. + let mut builder = OperatorBuilder::new("apply_refresh".to_string(), coll.scope()); + let (mut output_buf, output_stream) = builder.new_output(); + let mut input = builder.new_input_connection(&coll.inner, Pipeline, vec![Antichain::new()]); + builder.build(move |capabilities| { + // This capability directly controls this operator's output frontier (because we have + // disconnected the input above). We wrap it in an Option so we can drop it to advance to + // the empty output frontier when the last refresh is done. (We must be careful that we only + // ever emit output updates at times that are at or beyond this capability.) + let mut capability = capabilities.into_iter().next(); // (We have 1 input.) + let mut buffer = Vec::new(); + move |frontiers| { + let mut output_handle_core = output_buf.activate(); + let mut output_buf = ConsolidateBuffer::new(&mut output_handle_core, 0); + input.for_each(|input_cap, data| { + // Note that we can't use `input_cap` to get an output session because we might have + // advanced our output frontier already beyond the frontier of this capability. + + // `capability` will be None if we are past the last refresh. We have made sure to + // not receive any data that is after the last refresh by setting the `until` of the + // dataflow to the last refresh. + let Some(capability) = capability.as_mut() else { + soft_panic_or_log!( + "should have a capability if we received data. 
input_cap: {:?}, frontier: {:?}", + input_cap.time(), + frontiers[0].frontier() + ); + return; + }; + + data.swap(&mut buffer); + let mut cached_ts: Option = None; + let mut cached_rounded_up_data_ts = None; + for (d, ts, r) in buffer.drain(..) { + let rounded_up_data_ts = { + // We cache the rounded up timestamp for the last seen timestamp, + // because the rounding up has a non-negligible cost. Caching for + // just the 1 last timestamp helps already, because in some common + // cases, we'll be seeing a lot of identical timestamps, e.g., + // during a rehydration, or when we have much more than 1000 records + // during a single second. + if cached_ts != Some(ts) { + cached_ts = Some(ts); + cached_rounded_up_data_ts = refresh_schedule.round_up_timestamp(ts); + } + cached_rounded_up_data_ts + }; + match rounded_up_data_ts { + Some(rounded_up_ts) => { + output_buf.give_at(capability, (d, rounded_up_ts, r)); + } + None => { + // This record is after the last refresh, which is not possible because + // we set the dataflow `until` to the last refresh. + soft_panic_or_log!("Received data after the last refresh"); + } + } + } + }); + + // Round up the frontier. + // Note that `round_up_timestamp` is monotonic. This is needed to ensure that the + // timestamp (t) of any received data that has a larger timestamp than the original + // frontier (f) will get rounded up to a time that is at least at the rounded up + // frontier. In other words, monotonicity ensures that + // when `t >= f` then `round_up_timestamp(t) >= round_up_timestamp(f)`. + match frontiers[0].frontier().as_option() { // (We have only 1 input, so only 1 frontier.) + Some(ts) => { + match refresh_schedule.round_up_timestamp(*ts) { + Some(rounded_up_ts) => { + capability + .as_mut() + .expect("capability must exist if frontier is <= last refresh") + .downgrade(&rounded_up_ts); + } + None => { + // We are past the last refresh. Drop the capability to signal that we + // are done. 
+ capability = None; + // We can only get here if we see the frontier advancing to a time after + // the last refresh, but not empty, which would be a bug somewhere in + // the `until` handling. + soft_panic_or_log!("frontier advancements after the `until` should be suppressed"); + } + } + } + None => { + capability = None; + } + } + } + }); + + output_stream.as_collection() +} diff --git a/src/expr/build.rs b/src/expr/build.rs index 24afb8b363bd1..1dda6892cd548 100644 --- a/src/expr/build.rs +++ b/src/expr/build.rs @@ -86,6 +86,7 @@ fn main() { .extern_path(".mz_repr.adt.array", "::mz_repr::adt::array") .extern_path(".mz_repr.adt.char", "::mz_repr::adt::char") .extern_path(".mz_repr.adt.datetime", "::mz_repr::adt::datetime") + .extern_path(".mz_repr.adt.interval", "::mz_repr::adt::interval") .extern_path(".mz_repr.adt.numeric", "::mz_repr::adt::numeric") .extern_path(".mz_repr.adt.range", "::mz_repr::adt::range") .extern_path(".mz_repr.adt.regex", "::mz_repr::adt::regex") @@ -94,6 +95,7 @@ fn main() { .extern_path(".mz_repr.global_id", "::mz_repr::global_id") .extern_path(".mz_repr.relation_and_scalar", "::mz_repr") .extern_path(".mz_repr.row", "::mz_repr") + .extern_path(".mz_repr.timestamp", "::mz_repr::timestamp") .extern_path(".mz_repr.strconv", "::mz_repr::strconv") .type_attribute(".", "#[allow(missing_docs)]") .btree_map(["."]) @@ -101,6 +103,7 @@ fn main() { &[ "expr/src/id.proto", "expr/src/linear.proto", + "expr/src/refresh_schedule.proto", "expr/src/relation.proto", "expr/src/relation/func.proto", "expr/src/scalar.proto", diff --git a/src/expr/src/refresh_schedule.proto b/src/expr/src/refresh_schedule.proto new file mode 100644 index 0000000000000..3a3d1afe3f20b --- /dev/null +++ b/src/expr/src/refresh_schedule.proto @@ -0,0 +1,24 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. +syntax = "proto3"; + +package mz_expr.refresh_schedule; + +import "proto/src/proto.proto"; +import "repr/src/timestamp.proto"; + +message ProtoRefreshSchedule { + repeated ProtoRefreshEvery everies = 1; + repeated mz_repr.timestamp.ProtoTimestamp ats = 2; +} + +message ProtoRefreshEvery { + mz_proto.ProtoDuration interval = 1; + mz_repr.timestamp.ProtoTimestamp aligned_to = 2; +} diff --git a/src/expr/src/refresh_schedule.rs b/src/expr/src/refresh_schedule.rs index 84cd33f055583..a20222b0ce37d 100644 --- a/src/expr/src/refresh_schedule.rs +++ b/src/expr/src/refresh_schedule.rs @@ -7,11 +7,17 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use proptest::arbitrary::{any, Arbitrary}; +use proptest::prelude::{BoxedStrategy, Strategy}; use std::time::Duration; +use mz_proto::IntoRustIfSome; +use mz_proto::{ProtoType, RustType, TryFromProtoError}; use mz_repr::Timestamp; use serde::{Deserialize, Serialize}; +include!(concat!(env!("OUT_DIR"), "/mz_expr.refresh_schedule.rs")); + #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub struct RefreshSchedule { // `REFRESH EVERY`s @@ -27,6 +33,36 @@ impl RefreshSchedule { ats: Vec::new(), } } + + /// Rounds up the timestamp to the time of the next refresh. + /// Returns None if there is no next refresh. + /// Note that this fn is monotonic. + pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Option { + let next_every = self + .everies + .iter() + .map(|refresh_every| refresh_every.round_up_timestamp(timestamp)) + .min(); + let next_at = self + .ats + .iter() + .filter(|at| **at >= timestamp) + .min() + .cloned(); + // Take the min of `next_every` and `next_at`, but with considering None to be bigger than + // any Some. 
Note: Simply `min(next_every, next_at)` wouldn't do what we want, because None + // is smaller than any Some. + next_every.into_iter().chain(next_at).min() + } + + /// Returns the time of the last refresh. Returns None if there is no last refresh (e.g., for a periodic refresh). + pub fn last_refresh(&self) -> Option { + if self.everies.is_empty() { + self.ats.iter().max().cloned() + } else { + None + } + } } #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] @@ -34,3 +70,280 @@ pub struct RefreshEvery { pub interval: Duration, pub aligned_to: Timestamp, } + +impl RefreshEvery { + /// Rounds up the timestamp to the time of the next refresh, according to the given periodic refresh schedule. + /// It saturates, i.e., if the rounding would make it overflow, then it returns the maximum possible timestamp. + /// + /// # Panics + /// - if the refresh interval converted to milliseconds cast to u64 overflows; + /// - if the interval is 0. + pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Timestamp { + let RefreshEvery { + interval, + aligned_to, + } = self; + let interval = u64::try_from(interval.as_millis()).unwrap(); + // Rounds up `x` to the nearest multiple of `interval`. + let round_up_to_multiple_of_interval = |x: u64| -> u64 { + assert_ne!(x, 0); + (((x - 1) / interval) + 1).saturating_mul(interval) + }; + // Rounds down `x` to the nearest multiple of `interval`. + let round_down_to_multiple_of_interval = |x: u64| -> u64 { x / interval * interval }; + let result = + if timestamp > *aligned_to { + Timestamp::new(u64::from(aligned_to).saturating_add( + round_up_to_multiple_of_interval(u64::from(timestamp) - u64::from(aligned_to)), + )) + } else { + // Note: `timestamp == aligned_to` has to be handled here, because in the other branch + // `x - 1` would underflow. + // + // Also, no need to check for overflows here, since all the numbers are either between + // `timestamp` and `aligned_to`, or not greater than `aligned_to.internal - self.internal`. 
+ Timestamp::new( + u64::from(aligned_to) + - round_down_to_multiple_of_interval( + u64::from(aligned_to) - u64::from(timestamp), + ), + ) + }; + // TODO: Downgrade these to non-logging soft asserts when we have built more confidence in the code. + assert!(u64::from(result) >= u64::from(timestamp)); + assert!(u64::from(result) - u64::from(timestamp) <= interval); + result + } +} + +impl RustType for RefreshSchedule { + fn into_proto(&self) -> ProtoRefreshSchedule { + ProtoRefreshSchedule { + everies: self.everies.into_proto(), + ats: self.ats.into_proto(), + } + } + + fn from_proto(proto: ProtoRefreshSchedule) -> Result { + Ok(RefreshSchedule { + everies: proto.everies.into_rust()?, + ats: proto.ats.into_rust()?, + }) + } +} + +impl RustType for RefreshEvery { + fn into_proto(&self) -> ProtoRefreshEvery { + ProtoRefreshEvery { + interval: Some(self.interval.into_proto()), + aligned_to: Some(self.aligned_to.into_proto()), + } + } + + fn from_proto(proto: ProtoRefreshEvery) -> Result { + Ok(RefreshEvery { + interval: proto + .interval + .into_rust_if_some("ProtoRefreshEvery::interval")?, + aligned_to: proto + .aligned_to + .into_rust_if_some("ProtoRefreshEvery::aligned_to")?, + }) + } +} + +impl Arbitrary for RefreshSchedule { + type Strategy = BoxedStrategy; + type Parameters = (); + + fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { + ( + proptest::collection::vec(any::(), 0..4), + proptest::collection::vec(any::(), 0..4), + ) + .prop_map(|(everies, ats)| RefreshSchedule { everies, ats }) + .boxed() + } +} + +impl Arbitrary for RefreshEvery { + type Strategy = BoxedStrategy; + type Parameters = (); + + fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { + (any::(), any::()) + .prop_map(|(interval, aligned_to)| RefreshEvery { + interval, + aligned_to, + }) + .boxed() + } +} + +#[cfg(test)] +mod tests { + use crate::refresh_schedule::{RefreshEvery, RefreshSchedule}; + use mz_repr::adt::interval::Interval; + use mz_repr::Timestamp; + use 
std::str::FromStr; + + #[mz_ore::test] + fn test_round_up_timestamp() { + let ts = |t: u64| Timestamp::new(t); + let test = |schedule: RefreshSchedule| { + move |expected_ts: Option, input_ts| { + assert_eq!( + expected_ts.map(ts), + schedule.round_up_timestamp(ts(input_ts)) + ) + } + }; + { + let schedule = RefreshSchedule { + everies: vec![], + ats: vec![ts(123), ts(456)], + }; + let test = test(schedule); + test(Some(123), 0); + test(Some(123), 50); + test(Some(123), 122); + test(Some(123), 123); + test(Some(456), 124); + test(Some(456), 130); + test(Some(456), 455); + test(Some(456), 456); + test(None, 457); + test(None, 12345678); + test(None, u64::MAX - 1000); + test(None, u64::MAX - 1); + test(None, u64::MAX); + } + { + let schedule = RefreshSchedule { + everies: vec![RefreshEvery { + interval: Interval::from_str("100 milliseconds") + .unwrap() + .duration() + .unwrap(), + aligned_to: ts(500), + }], + ats: vec![], + }; + let test = test(schedule); + test(Some(0), 0); + test(Some(100), 1); + test(Some(100), 2); + test(Some(100), 99); + test(Some(100), 100); + test(Some(200), 101); + test(Some(200), 102); + test(Some(200), 199); + test(Some(200), 200); + test(Some(300), 201); + test(Some(400), 400); + test(Some(500), 401); + test(Some(500), 450); + test(Some(500), 499); + test(Some(500), 500); + test(Some(600), 501); + test(Some(600), 599); + test(Some(600), 600); + test(Some(700), 601); + test(Some(5434532600), 5434532599); + test(Some(5434532600), 5434532600); + test(Some(5434532700), 5434532601); + test(Some(u64::MAX), u64::MAX - 1); + test(Some(u64::MAX), u64::MAX); + } + { + let schedule = RefreshSchedule { + everies: vec![RefreshEvery { + interval: Interval::from_str("100 milliseconds") + .unwrap() + .duration() + .unwrap(), + aligned_to: ts(542), + }], + ats: vec![], + }; + let test = test(schedule); + test(Some(42), 0); + test(Some(42), 1); + test(Some(42), 41); + test(Some(42), 42); + test(Some(142), 43); + test(Some(442), 441); + test(Some(442), 
442); + test(Some(542), 443); + test(Some(542), 541); + test(Some(542), 542); + test(Some(642), 543); + test(Some(u64::MAX), u64::MAX - 1); + test(Some(u64::MAX), u64::MAX); + } + { + let schedule = RefreshSchedule { + everies: vec![ + RefreshEvery { + interval: Interval::from_str("100 milliseconds") + .unwrap() + .duration() + .unwrap(), + aligned_to: ts(400), + }, + RefreshEvery { + interval: Interval::from_str("100 milliseconds") + .unwrap() + .duration() + .unwrap(), + aligned_to: ts(542), + }, + ], + ats: vec![ts(2), ts(300), ts(400), ts(471), ts(541), ts(123456)], + }; + let test = test(schedule); + test(Some(0), 0); + test(Some(2), 1); + test(Some(2), 2); + test(Some(42), 3); + test(Some(42), 41); + test(Some(42), 42); + test(Some(100), 43); + test(Some(100), 99); + test(Some(100), 100); + test(Some(142), 101); + test(Some(142), 141); + test(Some(142), 142); + test(Some(200), 143); + test(Some(300), 243); + test(Some(300), 299); + test(Some(300), 300); + test(Some(342), 301); + test(Some(400), 343); + test(Some(400), 399); + test(Some(400), 400); + test(Some(442), 401); + test(Some(442), 441); + test(Some(442), 442); + test(Some(471), 443); + test(Some(471), 470); + test(Some(471), 471); + test(Some(500), 472); + test(Some(500), 472); + test(Some(500), 500); + test(Some(541), 501); + test(Some(541), 540); + test(Some(541), 541); + test(Some(542), 542); + test(Some(600), 543); + test(Some(65500), 65454); + test(Some(87842), 87831); + test(Some(123442), 123442); + test(Some(123456), 123443); + test(Some(123456), 123456); + test(Some(123500), 123457); + test(Some(u64::MAX), u64::MAX - 1); + test(Some(u64::MAX), u64::MAX); + } + } +} diff --git a/src/ore/src/vec.rs b/src/ore/src/vec.rs index 0e79a66a7bf3a..0d7ccd4b2542e 100644 --- a/src/ore/src/vec.rs +++ b/src/ore/src/vec.rs @@ -191,6 +191,8 @@ where /// This struct is created by [`VecExt::drain_filter_swapping`]. /// See its documentation for more. 
/// +/// Warning: The vector is modified only if the iterator is consumed! +/// /// # Example /// /// ``` diff --git a/src/repr/build.rs b/src/repr/build.rs index b4c9f1e6bd1b6..cb31d1888daaa 100644 --- a/src/repr/build.rs +++ b/src/repr/build.rs @@ -92,6 +92,7 @@ fn main() { "repr/src/relation_and_scalar.proto", "repr/src/role_id.proto", "repr/src/url.proto", + "repr/src/timestamp.proto", "repr/src/adt/array.proto", "repr/src/adt/char.proto", "repr/src/adt/date.proto", diff --git a/src/repr/src/adt/interval.rs b/src/repr/src/adt/interval.rs index 4c194de9bbc60..6dc0db99c48ca 100644 --- a/src/repr/src/adt/interval.rs +++ b/src/repr/src/adt/interval.rs @@ -16,6 +16,7 @@ use anyhow::{anyhow, bail}; use mz_proto::{RustType, TryFromProtoError}; use num_traits::CheckedMul; use once_cell::sync::Lazy; +use proptest::prelude::{any, Arbitrary, BoxedStrategy, Strategy}; use serde::{Deserialize, Serialize}; use crate::adt::datetime::DateTimeField; @@ -376,6 +377,11 @@ impl Interval { i128::from(self.micros) } + /// Computes the total number of milliseconds in the interval. Discards fractional milliseconds! + pub fn as_milliseconds(&self) -> i128 { + self.as_microseconds() / 1000 + } + /// Converts this `Interval`'s duration into `chrono::Duration`. 
pub fn duration_as_chrono(&self) -> chrono::Duration { use chrono::Duration; @@ -770,6 +776,21 @@ impl fmt::Display for Interval { } } +impl Arbitrary for Interval { + type Strategy = BoxedStrategy; + type Parameters = (); + + fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { + (any::(), any::(), any::()) + .prop_map(|(months, days, micros)| Interval { + months, + days, + micros, + }) + .boxed() + } +} + #[cfg(test)] mod test { use super::*; diff --git a/src/repr/src/lib.rs b/src/repr/src/lib.rs index d5df57cf98139..fefbce67a7764 100644 --- a/src/repr/src/lib.rs +++ b/src/repr/src/lib.rs @@ -96,7 +96,6 @@ mod relation; mod relation_and_scalar; mod row; mod scalar; -mod timestamp; pub mod adt; pub mod antichain; @@ -107,6 +106,7 @@ pub mod namespaces; pub mod role_id; pub mod stats; pub mod strconv; +pub mod timestamp; pub mod url; pub mod user; diff --git a/src/repr/src/timestamp.proto b/src/repr/src/timestamp.proto new file mode 100644 index 0000000000000..917a3e4c3b5c5 --- /dev/null +++ b/src/repr/src/timestamp.proto @@ -0,0 +1,16 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +syntax = "proto3"; + +package mz_repr.timestamp; + +message ProtoTimestamp { + uint64 internal = 1; +} diff --git a/src/repr/src/timestamp.rs b/src/repr/src/timestamp.rs index f37ba045e7047..ca22ae7874ab1 100644 --- a/src/repr/src/timestamp.rs +++ b/src/repr/src/timestamp.rs @@ -12,12 +12,15 @@ use std::num::TryFromIntError; use std::time::Duration; use dec::TryFromDecimalError; +use mz_proto::{RustType, TryFromProtoError}; use proptest_derive::Arbitrary; use serde::{Deserialize, Serialize, Serializer}; use crate::adt::numeric::Numeric; use crate::strconv::parse_timestamp; +include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs")); + /// System-wide timestamp type. #[derive( Clone, @@ -36,6 +39,18 @@ pub struct Timestamp { internal: u64, } +impl RustType for Timestamp { + fn into_proto(&self) -> ProtoTimestamp { + ProtoTimestamp { + internal: self.into(), + } + } + + fn from_proto(proto: ProtoTimestamp) -> Result { + Ok(Timestamp::new(proto.internal)) + } +} + pub trait TimestampManipulation: timely::progress::Timestamp + timely::order::TotalOrder diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index 476ea6cccea9e..5f16bd53d0d26 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -3408,6 +3408,8 @@ impl AstDisplay for ExplainStage { } impl_display!(ExplainStage); +/// What is being explained. +/// The bools mean whether this is an EXPLAIN BROKEN. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Explainee { View(T::ItemName), diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 1b5726a8d16b4..5f4cf97355fe7 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -2141,6 +2141,10 @@ pub fn plan_create_materialized_view( // See `Timestamp::round_up`. 
sql_bail!("REFRESH interval must not involve units larger than days"); } + let interval = interval.duration()?; + if u64::try_from(interval.as_millis()).is_err() { + sql_bail!("REFRESH interval too large"); + } let mut aligned_to = match aligned_to { Some(aligned_to) => aligned_to, @@ -2178,7 +2182,7 @@ pub fn plan_create_materialized_view( .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?; refresh_schedule.everies.push(RefreshEvery { - interval: interval.duration()?, + interval, aligned_to: aligned_to_const, }); } diff --git a/test/sqllogictest/materialized_views.slt b/test/sqllogictest/materialized_views.slt index 0539f4229f260..6d0a7f48e9cb4 100644 --- a/test/sqllogictest/materialized_views.slt +++ b/test/sqllogictest/materialized_views.slt @@ -750,13 +750,12 @@ CREATE MATERIALIZED VIEW mv_bad WITH (ASSERT NOT NULL x REFRESH EVERY '8 seconds query error Expected literal string, found ASSERT CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY ASSERT NOT NULL x) AS SELECT * FROM t2; -# Test that we call `transform_ast::transform`. (This has an `Expr::Nested`, which needs to be desugared, or we panic.) 
-query error db error: ERROR: REFRESH options other than ON COMMIT are not supported -CREATE MATERIALIZED VIEW mv_desugar1 WITH (REFRESH AT (mz_now())) AS SELECT * FROM t2; +query error db error: ERROR: invalid input syntax for type interval: Overflows maximum days; cannot exceed 2147483647/\-2147483648 days: "213503982001" +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '213503982001' days) AS SELECT * FROM t2; -# Same with ALIGNED TO -query error db error: ERROR: REFRESH options other than ON COMMIT are not supported -CREATE MATERIALIZED VIEW mv_desugar2 WITH (REFRESH EVERY '1 day' ALIGNED TO (mz_now())) AS SELECT * FROM t2; +# This tests that we don't forget to purify EXPLAIN CREATE MATERIALIZED VIEW +statement ok +EXPLAIN CREATE MATERIALIZED VIEW mv_explain WITH (REFRESH EVERY '2 seconds') AS SELECT * FROM t2; statement ok CREATE MATERIALIZED VIEW mv_on_commit WITH (REFRESH ON COMMIT) AS SELECT * FROM t2; @@ -768,3 +767,248 @@ FROM mv_on_commit; 7000 8000 NULL 4000 NULL 6000 1000 2000 3000 + +# Test that we call `transform_ast::transform`. (This has an `Expr::Nested`, which needs to be desugared, or we panic.) +statement ok +CREATE MATERIALIZED VIEW mv_desugar1 WITH (REFRESH AT (mz_now())) AS SELECT * FROM t2; + +# Same with ALIGNED TO +statement ok +CREATE MATERIALIZED VIEW mv_desugar2 WITH (REFRESH EVERY '1 day' ALIGNED TO (mz_now())) AS SELECT * FROM t2; + +## REFRESH options together with ASSERT NOT NULL options + +statement ok +INSERT INTO t2 VALUES (10, 11, 12); + +statement ok +CREATE MATERIALIZED VIEW mv_assertion_plus_refresh_every WITH (ASSERT NOT NULL x, REFRESH EVERY '8 seconds') AS SELECT * FROM t2; + +# There should be a refresh immediately when creating the MV. This refresh should already see what we just inserted. +query III +SELECT * FROM mv_assertion_plus_refresh_every ORDER BY x; +---- +1 2 3 +4 NULL 6 +7 8 NULL +10 11 12 + +statement ok +INSERT INTO t2 VALUES (NULL, -1, -2); + +# This insert shouldn't be visible yet. 
+query III
+SELECT * FROM mv_assertion_plus_refresh_every ORDER BY x;
+----
+1 2 3
+4 NULL 6
+7 8 NULL
+10 11 12
+
+# Sleep for the refresh interval, so that we get a refresh.
+# Actually, we sleep one more second than the refresh interval, because we don't have real-time recency guarantees:
+# When we query the MV at a particular wall clock time `t`, there is no guarantee that we see a refresh that
+# happened at, say, `t - 1ms`. Note that the test was actually failing sometimes when it was sleeping for only 8s.
+# A proper solution might be to add `AS OF now()` to this SELECT, but calling `now()` seems to not be currently allowed
+# in `AS OF`.
+statement ok
+SELECT mz_unsafe.mz_sleep(8+1);
+
+# Now we should see the NULL that should error out the MV.
+query error db error: ERROR: Evaluation error: column "x" must not be null
+SELECT * FROM mv_assertion_plus_refresh_every ORDER BY x;
+
+## Check `REFRESH AT greatest(<some timestamp>, mz_now())`, because this is an idiom that we are recommending to users.
+# Insert something into the underlying table.
+statement ok
+INSERT INTO t2 VALUES (30, 30, 30);
+
+statement ok
+CREATE MATERIALIZED VIEW mv_greatest
+WITH (REFRESH AT greatest('1990-01-04 11:00', mz_now()))
+AS SELECT * FROM t2;
+
+query III
+SELECT * FROM mv_greatest;
+----
+NULL -1 -2
+4 NULL 6
+7 8 NULL
+1 2 3
+10 11 12
+30 30 30
+
+## If there is no creation refresh, then a query should block until the first refresh.
+
+# Save the current time.
+statement ok
+CREATE TABLE start_time(t timestamp);
+
+statement ok
+INSERT INTO start_time VALUES (now());
+
+# Create an MV whose first refresh is 5 sec after its creation.
+statement ok
+CREATE MATERIALIZED VIEW mv_no_creation_refresh
+WITH (REFRESH EVERY '100000 sec' ALIGNED TO mz_now()::string::int8 + 5000)
+AS SELECT * FROM t2;
+
+# Insert something into the underlying table.
+statement ok
+INSERT INTO t2 VALUES (100, 100, 100);
+
+# Query it.
+# - The query should block until the first refresh.
+# - The newly inserted stuff should be visible. +query III +SELECT * FROM mv_no_creation_refresh; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 +100 100 100 + +# Verify that at least 5 seconds have passed. +query B +SELECT now() - (SELECT * from start_time) >= INTERVAL '5 sec'; +---- +true + +## Check ALIGNED TO in the far past + +# Save the current time. +statement ok +DELETE FROM start_time; + +statement ok +INSERT INTO start_time VALUES (now()); + +statement ok +CREATE MATERIALIZED VIEW mv_aligned_to_past +WITH (REFRESH EVERY '10000 ms' ALIGNED TO mz_now()::text::int8 - 10*10000 + 3000) +AS SELECT * FROM t2; + +query III +SELECT * FROM mv_aligned_to_past; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 +100 100 100 + +# Verify that at least 3 seconds have passed. +query B +SELECT now() - (SELECT * from start_time) >= INTERVAL '3 sec'; +---- +true + +## Check ALIGNED TO in the far future + +# Save the current time. +statement ok +DELETE FROM start_time; + +statement ok +INSERT INTO start_time VALUES (now()); + +statement ok +CREATE MATERIALIZED VIEW mv_aligned_to_future +WITH (REFRESH EVERY '10000 ms' ALIGNED TO mz_now()::text::int8 + 10*10000 + 3000) +AS SELECT * FROM t2; + +query III +SELECT * FROM mv_aligned_to_future; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 +100 100 100 + +# Verify that at least 3 seconds have passed. 
+query B +SELECT now() - (SELECT * from start_time) >= INTERVAL '3 sec'; +---- +true + +## Constant query in an MV with REFRESH options +statement ok +CREATE MATERIALIZED VIEW const_mv +WITH (REFRESH EVERY '1 day') +AS SELECT 1; + +query I +SELECT * FROM const_mv +---- +1 + +## We should be able to immediately query a constant MV even if the first refresh is in the future +statement ok +CREATE MATERIALIZED VIEW const_mv2 +WITH (REFRESH AT '3000-01-01 23:59') +AS SELECT 2; + +query I +SELECT * FROM const_mv2 +---- +2 + +## MV that has refreshes only in the past +query error db error: ERROR: all the specified refreshes of the materialized view would be too far in the past, and thus they would never happen +CREATE MATERIALIZED VIEW mv_no_refresh +WITH (REFRESH AT '2000-01-01 10:00') +AS SELECT * FROM t2; + +## Query MV after the last refresh +statement ok +CREATE MATERIALIZED VIEW mv3 +WITH (REFRESH AT mz_now()::text::int8 + 2000, REFRESH AT mz_now()::text::int8 + 4000) +AS SELECT * FROM t2; + +# Wait until the first refresh +query III +SELECT * FROM mv3; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 +100 100 100 + +# Wait until the second refresh, which is the last one +query III +SELECT * FROM mv3; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 +100 100 100 + +# This insert will happen after the last refresh. +statement ok +INSERT INTO t2 VALUES (70, 70, 70); + +# We should be able to query the MV after the last refresh, and the newly inserted data shouldn't be visible. +query III +SELECT * FROM mv3; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 +100 100 100 diff --git a/test/testdrive/materialized-view-refresh-options.td b/test/testdrive/materialized-view-refresh-options.td new file mode 100644 index 0000000000000..6ee54fd624d62 --- /dev/null +++ b/test/testdrive/materialized-view-refresh-options.td @@ -0,0 +1,178 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. 
+# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +$ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}/materialize +ALTER SYSTEM SET enable_refresh_every_mvs = true + +> CREATE DATABASE materialized_view_refresh_options; +> SET DATABASE = materialized_view_refresh_options; + +> CREATE TABLE t1(x int); + +> INSERT INTO t1 VALUES (1); + +# This refresh interval needs to be not too small and not too big. See the constraints in comments below. +> CREATE MATERIALIZED VIEW mv1 + WITH (REFRESH EVERY '8sec') + AS SELECT x+x as x2 FROM t1; + +> INSERT INTO t1 VALUES (3); + +# The following will not immediately return the recently inserted values, but Testdrive will wait. +# Warning: This test assumes that Testdrive's timeout is greater than the above refresh interval. +> SELECT * FROM mv1; +2 +6 + +> INSERT INTO t1 VALUES (4); + +# What we just inserted shouldn't be in the mv yet, because we are just after a refresh (because the previous SELECT +# returned correct results only after a refresh). +# Warning: this test assumes that the above INSERT completes within the above refresh interval. If we have some +# transient infrastructure problem that makes the INSERT really slow, then this test will fail. +> SELECT * FROM mv1; +2 +6 + +> SELECT * FROM mv1; +2 +6 +8 + +# Check that I can query it together with other objects, even between refreshes, and that data added later than the last +# refresh in other objects is reflected in the result. 
+> CREATE MATERIALIZED VIEW mv2 + WITH (REFRESH = EVERY '10000sec') + AS SELECT x+x as x2 FROM t1; + +> CREATE TABLE t2(y int); + +> INSERT INTO t2 VALUES (100); + +> (SELECT * from mv2) UNION (SELECT * FROM t2); +2 +6 +8 +100 + +# The following DELETE shouldn't affect mv2, because mv2 has a very large refresh interval. +> DELETE FROM t1; + +> (SELECT * from mv2) UNION (SELECT * FROM t2); +2 +6 +8 +100 + +# Check that there is an implicit refresh immediately at the creation of the MV, even if it's REFRESH EVERY. +> CREATE MATERIALIZED VIEW mv3 + WITH (REFRESH EVERY '10000sec') + AS SELECT y+y as y2 FROM t2; + +> SELECT * FROM mv3; +200 + +# Check that mz_now() occurring in the original statement works. This tests that after we purify away `mz_now()`, we +# also remove it from `resolved_ids`. Importantly, this has to be a Testdrive test, and not an slt test, because slt +# doesn't do the "the in-memory state of the catalog does not match its on-disk state" check. +# +# Also tests that planning uses `cast_to` with `CastContext::Implicit` (instead of `type_as`) when planning the +# REFRESH AT. +> CREATE MATERIALIZED VIEW mv4 + WITH (REFRESH AT mz_now()::string::int8 + 2000) + AS SELECT y*y as y2 FROM t2; + +> SELECT * FROM mv4; +10000 + +## Check turning the replica off and on + +> CREATE CLUSTER refresh_cluster SIZE = '1', REPLICATION FACTOR = 1; +> SET cluster = refresh_cluster; +> CREATE MATERIALIZED VIEW mv5 + WITH (REFRESH EVERY '8 sec' ALIGNED TO mz_now()::text::int8 + 5000) + AS SELECT 3*y as y2 FROM t2; +> SET cluster = default; + +> SELECT * FROM mv5; +300 + +> INSERT INTO t2 VALUES (102); + +# Wait until the insert is reflected, so that we are just after a refresh. This is important, because otherwise the +# below `SET (REPLICATION FACTOR 0)` and the `SELECT` after that might straddle a refresh, in which case the `SELECT` +# would hang forever. 
+> SELECT * FROM mv5; +300 +306 + +> ALTER CLUSTER refresh_cluster SET (REPLICATION FACTOR 0); + +> SELECT * FROM mv5; +300 +306 + +> ALTER CLUSTER refresh_cluster SET (REPLICATION FACTOR 1); + +> SELECT * FROM mv5; +300 +306 + +> INSERT INTO t2 VALUES (110); + +# Wait until the insert is reflected, so we are just after a refresh. +> SELECT * FROM mv5; +300 +306 +330 + +# Turn off the cluster, and insert something, and then sleep through a scheduled refresh. +> ALTER CLUSTER refresh_cluster SET (REPLICATION FACTOR 0); + +> INSERT INTO t2 VALUES (120); + +# (See the explanation for the `+1` in materialized_views.slt at a similar `mz_sleep`.) +> SELECT mz_unsafe.mz_sleep(8+1); + + +# Turn it back on, and check that we recover. Data that were added while we slept should be visible now. +> ALTER CLUSTER refresh_cluster SET (REPLICATION FACTOR 1); + +> SELECT * FROM mv5; +300 +306 +330 +360 + +# REFRESH AT + REFRESH EVERY + +> CREATE TABLE t3(x int); +> INSERT INTO t3 VALUES (1); + +> CREATE MATERIALIZED VIEW mv6 WITH (REFRESH AT mz_now()::text::int8 + 6000, REFRESH EVERY '8 seconds') AS SELECT * FROM t3; +> SELECT * FROM mv6 +1 + +> INSERT INTO t3 VALUES (2); +> SELECT * FROM mv6 +1 +2 + +> SELECT mz_unsafe.mz_sleep(8+1); + + +> INSERT INTO t3 VALUES (3); +> SELECT * FROM mv6 +1 +2 +3 + +######## Cleanup ######## +> DROP DATABASE materialized_view_refresh_options CASCADE; +> DROP CLUSTER refresh_cluster CASCADE; diff --git a/test/testdrive/materialized-views.td b/test/testdrive/materialized-views.td index 2ca880da6485a..200a049f9ab44 100644 --- a/test/testdrive/materialized-views.td +++ b/test/testdrive/materialized-views.td @@ -1,4 +1,3 @@ - # Copyright Materialize, Inc. and contributors. All rights reserved. # # Use of this software is governed by the Business Source License