From f421c70ff88bc8cb1857615921daea00d15da7e7 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Mon, 11 Dec 2023 19:11:53 +0100 Subject: [PATCH] Add REFRESH options for materialized views --- .../checks/all_checks/null_value.py | 2 +- .../checks/all_checks/text_bytea_types.py | 2 +- .../materialize/checks/all_checks/top_k.py | 12 +- .../src/catalog/builtin_table_updates.rs | 21 +- src/adapter/src/catalog/open.rs | 1 + src/adapter/src/catalog/state.rs | 1 + src/adapter/src/coord.rs | 1 + src/adapter/src/coord/command_handler.rs | 92 ++++- src/adapter/src/coord/sequencer/inner.rs | 35 +- src/adapter/src/optimize/materialized_view.rs | 6 + src/adapter/src/optimize/subscribe.rs | 4 + src/catalog/src/memory/objects.rs | 2 + src/compute-client/src/controller/instance.rs | 1 + src/compute-types/build.rs | 1 + src/compute-types/src/sinks.proto | 2 + src/compute-types/src/sinks.rs | 32 +- src/compute/src/sink/persist_sink.rs | 98 ++++- src/ore/src/vec.rs | 2 + src/repr/build.rs | 2 + src/repr/src/adt/interval.rs | 35 ++ src/repr/src/lib.rs | 1 + src/repr/src/refresh_schedule.proto | 24 ++ src/repr/src/refresh_schedule.rs | 250 +++++++++++++ src/repr/src/timestamp.proto | 16 + src/repr/src/timestamp.rs | 64 ++++ src/sql-lexer/src/keywords.txt | 3 + src/sql-parser/src/ast/defs/ddl.rs | 3 + src/sql-parser/src/ast/defs/statement.rs | 51 ++- src/sql-parser/src/parser.rs | 60 ++- src/sql-parser/tests/testdata/ddl | 14 + src/sql/src/catalog.rs | 4 +- src/sql/src/names.rs | 19 +- src/sql/src/plan.rs | 3 + src/sql/src/plan/expr.rs | 22 ++ src/sql/src/plan/statement.rs | 3 +- src/sql/src/plan/statement/ddl.rs | 130 ++++++- src/sql/src/plan/statement/dml.rs | 2 + src/sql/src/plan/with_options.rs | 29 +- src/sql/src/pure.rs | 265 ++++++++++++- src/sql/src/session/vars.rs | 7 + test/sqllogictest/materialized_views.slt | 349 +++++++++++++++++- test/sqllogictest/rename.slt | 2 +- test/testdrive/materializations.td | 2 +- .../materialized-view-refresh-options.td | 134 +++++++ 
test/testdrive/materialized-views.td | 1 - 45 files changed, 1757 insertions(+), 53 deletions(-) create mode 100644 src/repr/src/refresh_schedule.proto create mode 100644 src/repr/src/refresh_schedule.rs create mode 100644 src/repr/src/timestamp.proto create mode 100644 test/testdrive/materialized-view-refresh-options.td diff --git a/misc/python/materialize/checks/all_checks/null_value.py b/misc/python/materialize/checks/all_checks/null_value.py index afb90baaaf0a3..523062d9f29ed 100644 --- a/misc/python/materialize/checks/all_checks/null_value.py +++ b/misc/python/materialize/checks/all_checks/null_value.py @@ -70,7 +70,7 @@ def validate(self) -> Testdrive: > SHOW CREATE MATERIALIZED VIEW null_value_view2; - materialize.public.null_value_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"null_value_view2\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"f2\\", NULL FROM \\"materialize\\".\\"public\\".\\"null_value_table\\" WHERE \\"f1\\" IS NULL OR \\"f1\\" IS NOT NULL OR \\"f1\\" = NULL" + materialize.public.null_value_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"null_value_view2\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"f2\\", NULL FROM \\"materialize\\".\\"public\\".\\"null_value_table\\" WHERE \\"f1\\" IS NULL OR \\"f1\\" IS NOT NULL OR \\"f1\\" = NULL" > SELECT * FROM null_value_view2; diff --git a/misc/python/materialize/checks/all_checks/text_bytea_types.py b/misc/python/materialize/checks/all_checks/text_bytea_types.py index e8207c060adaf..b3d920b692337 100644 --- a/misc/python/materialize/checks/all_checks/text_bytea_types.py +++ b/misc/python/materialize/checks/all_checks/text_bytea_types.py @@ -53,7 +53,7 @@ def validate(self) -> Testdrive: dedent( """ > SHOW CREATE MATERIALIZED VIEW string_bytea_types_view1; - materialize.public.string_bytea_types_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"string_bytea_types_view1\\" IN CLUSTER \\"default\\" AS SELECT 
\\"text_col\\", \\"bytea_col\\", 'това'::\\"pg_catalog\\".\\"text\\", '\\\\xAAAA'::\\"pg_catalog\\".\\"bytea\\" FROM \\"materialize\\".\\"public\\".\\"text_bytea_types_table\\" WHERE \\"text_col\\" >= ''::\\"pg_catalog\\".\\"text\\" AND \\"bytea_col\\" >= ''::\\"pg_catalog\\".\\"bytea\\"" + materialize.public.string_bytea_types_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"string_bytea_types_view1\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"text_col\\", \\"bytea_col\\", 'това'::\\"pg_catalog\\".\\"text\\", '\\\\xAAAA'::\\"pg_catalog\\".\\"bytea\\" FROM \\"materialize\\".\\"public\\".\\"text_bytea_types_table\\" WHERE \\"text_col\\" >= ''::\\"pg_catalog\\".\\"text\\" AND \\"bytea_col\\" >= ''::\\"pg_catalog\\".\\"bytea\\"" > SELECT text_col, text, LENGTH(bytea_col), LENGTH(bytea) FROM string_bytea_types_view1; aaaa това 2 2 diff --git a/misc/python/materialize/checks/all_checks/top_k.py b/misc/python/materialize/checks/all_checks/top_k.py index a54944cccfe4a..9d3284e645ea6 100644 --- a/misc/python/materialize/checks/all_checks/top_k.py +++ b/misc/python/materialize/checks/all_checks/top_k.py @@ -60,14 +60,14 @@ def validate(self) -> Testdrive: dedent( """ > SHOW CREATE MATERIALIZED VIEW basic_topk_view1; - materialize.public.basic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view1\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2" + materialize.public.basic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view1\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2" > SELECT * FROM basic_topk_view1; 2 32 3 48 > SHOW CREATE 
MATERIALIZED VIEW basic_topk_view2; - materialize.public.basic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view2\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2" + materialize.public.basic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view2\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2" > SELECT * FROM basic_topk_view2; 1 16 @@ -123,14 +123,14 @@ def validate(self) -> Testdrive: dedent( """ > SHOW CREATE MATERIALIZED VIEW monotonic_topk_view1; - materialize.public.monotonic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view1\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2" + materialize.public.monotonic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view1\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2" > SELECT * FROM monotonic_topk_view1; E 5 D 4 > SHOW CREATE MATERIALIZED VIEW monotonic_topk_view2; - materialize.public.monotonic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view2\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2" 
+ materialize.public.monotonic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view2\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2" > SELECT * FROM monotonic_topk_view2; A 1 @@ -186,13 +186,13 @@ def validate(self) -> Testdrive: dedent( """ > SHOW CREATE MATERIALIZED VIEW monotonic_top1_view1; - materialize.public.monotonic_top1_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view1\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 1" + materialize.public.monotonic_top1_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view1\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 1" > SELECT * FROM monotonic_top1_view1; D 5 > SHOW CREATE MATERIALIZED VIEW monotonic_top1_view2; - materialize.public.monotonic_top1_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view2\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 1" + materialize.public.monotonic_top1_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view2\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS 
FIRST LIMIT 1" > SELECT * FROM monotonic_top1_view2; A 1 diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index b1853976a5de2..a06e443537fe0 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ b/src/adapter/src/catalog/builtin_table_updates.rs @@ -735,7 +735,12 @@ impl CatalogState { diff: Diff, ) -> Vec { let create_stmt = mz_sql::parse::parse(&view.create_sql) - .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", view.create_sql)) + .unwrap_or_else(|e| { + panic!( + "create_sql cannot be invalid: `{}` --- error: `{}`", + view.create_sql, e + ) + }) .into_element() .ast; let query = match &create_stmt { @@ -777,7 +782,12 @@ impl CatalogState { diff: Diff, ) -> Vec { let create_stmt = mz_sql::parse::parse(&mview.create_sql) - .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", mview.create_sql)) + .unwrap_or_else(|e| { + panic!( + "create_sql cannot be invalid: `{}` --- error: `{}`", + mview.create_sql, e + ) + }) .into_element() .ast; let query = match &create_stmt { @@ -875,7 +885,12 @@ impl CatalogState { let mut updates = vec![]; let create_stmt = mz_sql::parse::parse(&index.create_sql) - .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", index.create_sql)) + .unwrap_or_else(|e| { + panic!( + "create_sql cannot be invalid: `{}` --- error: `{}`", + index.create_sql, e + ) + }) .into_element() .ast; diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index 4de5a0f5cae56..d0c13ae46692a 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -1904,6 +1904,7 @@ mod builtin_migration_tests { resolved_ids: ResolvedIds(BTreeSet::from_iter(resolved_ids)), cluster_id: ClusterId::User(1), non_null_assertions: vec![], + refresh_schedule: None, }) } SimplifiedItem::Index { on } => { diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index afaa3862c099b..d943ddd006889 100644 --- 
a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -927,6 +927,7 @@ impl CatalogState { resolved_ids, cluster_id: materialized_view.cluster_id, non_null_assertions: materialized_view.non_null_assertions, + refresh_schedule: materialized_view.refresh_schedule, }) } Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index { diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 9e85bdf54d49b..ac5f56dcabf54 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -1711,6 +1711,7 @@ impl Coordinator { internal_view_id, mv.desc.iter_names().cloned().collect(), mv.non_null_assertions.clone(), + mv.refresh_schedule.clone(), debug_name, optimizer_config.clone(), ); diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index 3a222bd6d1166..eafda297cd1a5 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -27,16 +27,21 @@ use mz_sql::ast::{ CopyRelation, CopyStatement, InsertSource, Query, Raw, SetExpr, Statement, SubscribeStatement, }; use mz_sql::catalog::RoleAttributes; -use mz_sql::names::{PartialItemName, ResolvedIds}; +use mz_sql::names::{Aug, PartialItemName, ResolvedIds}; use mz_sql::plan::{ AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan, TransactionType, }; +use mz_sql::pure::{ + materialized_view_option_contains_temporal, purify_create_materialized_view_options, +}; use mz_sql::rbac; use mz_sql::rbac::CREATE_ITEM_USAGE; use mz_sql::session::user::User; use mz_sql::session::vars::{ EndTransactionAction, OwnedVarInput, Var, STATEMENT_LOGGING_SAMPLE_RATE, }; +use mz_sql_parser::ast::CreateMaterializedViewStatement; +use mz_storage_types::sources::Timeline; use opentelemetry::trace::TraceContextExt; use tokio::sync::{mpsc, oneshot, watch}; use tracing::{debug_span, Instrument}; @@ -53,7 +58,7 @@ use crate::notice::AdapterNotice; use crate::session::{Session, TransactionOps, 
TransactionStatus}; use crate::util::{ClientTransmitter, ResultExt}; use crate::webhook::{AppendWebhookResponse, AppendWebhookValidator}; -use crate::{catalog, metrics, ExecuteContext}; +use crate::{catalog, metrics, ExecuteContext, TimelineContext}; use super::ExecuteContextExtra; @@ -614,7 +619,7 @@ impl Coordinator { let catalog = self.catalog(); let catalog = catalog.for_session(ctx.session()); let original_stmt = stmt.clone(); - let (stmt, resolved_ids) = match mz_sql::names::resolve(&catalog, stmt) { + let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, stmt) { Ok(resolved) => resolved, Err(e) => return ctx.retire(Err(e.into())), }; @@ -679,6 +684,87 @@ impl Coordinator { "CREATE SUBSOURCE statements", ))), + Statement::CreateMaterializedView(mut cmvs) => { + // (This won't be the same timestamp as the system table inserts, unfortunately.) + let mz_now = if cmvs + .with_options + .iter() + .any(materialized_view_option_contains_temporal) + { + let timeline_context = + match self.validate_timeline_context(resolved_ids.0.clone()) { + Ok(tc) => tc, + Err(e) => return ctx.retire(Err(e)), + }; + let timeline = match timeline_context { + TimelineContext::TimelineDependent(timeline) => timeline, + TimelineContext::TimestampDependent + | TimelineContext::TimestampIndependent => { + // We default to EpochMilliseconds, similarly to `determine_timestamp_for`. + // Note that we didn't accurately decide whether we are TimestampDependent + // or TimestampIndependent, because for this we'd need to also check whether + // `query.contains_temporal()`, similarly to how `peek_stage_validate` does. + // However, this doesn't matter here, as we are just going to default to + // EpochMilliseconds in both cases. + Timeline::EpochMilliseconds + } + }; + Some(self.get_timestamp_oracle(&timeline).read_ts().await) + // TODO: It might be good to take into account `least_valid_read` in addition to + // the oracle's `read_ts`, but there are two problems: + // 1. 
At this point, we don't know which indexes would be used. We could do an + // overestimation here by grabbing the ids of all indexes that are on ids + // involved in the query. (We'd have to recursively follow view definitions, + // similarly to `validate_timeline_context`.) + // 2. For a peek, when the `least_valid_read` is later than the oracle's + // `read_ts`, then the peek doesn't return before it completes at the chosen + // timestamp. However, for a CREATE MATERIALIZED VIEW statement, it's not clear + // whether we want to make it block until the chosen time. If it doesn't block, + // then the initial refresh wouldn't be linearized with the CREATE MATERIALIZED + // VIEW statement. + // + // Note: The Adapter is usually keeping a read hold of all objects at the oracle + // read timestamp, so `least_valid_read` usually won't actually be later than + // the oracle's `read_ts`. (see `Coordinator::advance_timelines`) + // + // Note 2: If we choose a timestamp here that is earlier than + // `least_valid_read`, that is somewhat bad, but not catastrophic: The only + // bad thing that happens is that we won't perform that refresh that was + // specified to be at `mz_now()` (which is usually the initial refresh) + // (similarly to how we don't perform refreshes that were specified to be in the + // past). + } else { + None + }; + + let owned_catalog = self.owned_catalog(); + let catalog = owned_catalog.for_session(ctx.session()); + + purify_create_materialized_view_options( + catalog, + mz_now, + &mut cmvs, + &mut resolved_ids, + ); + + let purified_stmt = + Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> { + if_exists: cmvs.if_exists, + name: cmvs.name, + columns: cmvs.columns, + in_cluster: cmvs.in_cluster, + query: cmvs.query, + with_options: cmvs.with_options, + }); + + // (Purifying CreateMaterializedView doesn't happen async, so no need to send + // `Message::PurifiedStatementReady` here.) 
+ match self.plan_statement(ctx.session(), purified_stmt, &params, &resolved_ids) { + Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await, + Err(e) => ctx.retire(Err(e)), + } + } + // All other statements are handled immediately. + _ => match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) { + Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await, diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 6f13447528bdf..dee5259e486a8 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -35,6 +35,7 @@ use mz_repr::explain::json::json_string; use mz_repr::explain::{ ExplainFormat, ExprHumanizer, ExprHumanizerExt, TransientItem, UsedIndexes, }; +use mz_repr::refresh_schedule::RefreshSchedule; use mz_repr::role_id::RoleId; use mz_repr::{ColumnName, Datum, Diff, GlobalId, RelationDesc, Row, RowArena, Timestamp}; use mz_sql::ast::{ExplainStage, IndexOptionName}; @@ -964,6 +965,7 @@ impl Coordinator { column_names, cluster_id, non_null_assertions, + refresh_schedule, }, replace: _, drop_ids, @@ -1013,6 +1015,7 @@ impl Coordinator { internal_view_id, column_names.clone(), non_null_assertions.clone(), + refresh_schedule.clone(), debug_name, optimizer_config, ); @@ -1041,6 +1044,7 @@ impl Coordinator { resolved_ids, cluster_id, non_null_assertions, + refresh_schedule: refresh_schedule.clone(), }), owner_id: *session.current_role_id(), }); @@ -1065,9 +1069,28 @@ impl Coordinator { let mut df_desc = global_lir_plan.unapply().0; // Timestamp selection - let id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id); - let since = coord.least_valid_read(&id_bundle); - df_desc.set_as_of(since.clone()); + let as_of = { + // Normally, `since` should be the least_valid_read. 
+ let id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id); + let mut as_of = coord.least_valid_read(&id_bundle); + // But for MVs with non-trivial REFRESH schedules, it's important to set the + // `since` to the first refresh. This is because we'd like queries on the MV to + // block until the first refresh (rather than to show an empty MV). + if let Some(refresh_schedule) = refresh_schedule { + if let Some(ts) = as_of.as_option() { + let mut rounded_up_as_of = Antichain::new(); + if let Some(rounded_up_ts) = refresh_schedule.round_up_timestamp(*ts) { + rounded_up_as_of.insert(rounded_up_ts); + } else { + // No next refresh. `as_of` will be the empty antichain, so the + // dataflow can terminate quickly. + } + as_of = rounded_up_as_of; + } + } + as_of + }; + df_desc.set_as_of(as_of.clone()); // Announce the creation of the materialized view source. coord @@ -1080,7 +1103,7 @@ impl Coordinator { CollectionDescription { desc: output_desc, data_source: DataSource::Other(DataSourceOther::Compute), - since: Some(since), + since: Some(as_of), status_collection_id: None, }, )], @@ -3309,6 +3332,7 @@ impl Coordinator { cluster_id, broken, non_null_assertions, + refresh_schedule, } => { // Please see the docs on `explain_query_optimizer_pipeline` above. 
self.explain_create_materialized_view_optimizer_pipeline( @@ -3318,6 +3342,7 @@ impl Coordinator { cluster_id, broken, non_null_assertions, + refresh_schedule, &config, root_dispatch, ) @@ -3649,6 +3674,7 @@ impl Coordinator { target_cluster_id: ClusterId, broken: bool, non_null_assertions: Vec, + refresh_schedule: Option, explain_config: &mz_repr::explain::ExplainConfig, _root_dispatch: tracing::Dispatch, ) -> Result< @@ -3689,6 +3715,7 @@ impl Coordinator { internal_view_id, column_names.clone(), non_null_assertions, + refresh_schedule, debug_name, optimizer_config, ); diff --git a/src/adapter/src/optimize/materialized_view.rs b/src/adapter/src/optimize/materialized_view.rs index 7dd69dffeefce..bd0fe9d414f98 100644 --- a/src/adapter/src/optimize/materialized_view.rs +++ b/src/adapter/src/optimize/materialized_view.rs @@ -32,6 +32,7 @@ use mz_compute_types::plan::Plan; use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, PersistSinkConnection}; use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr}; use mz_repr::explain::trace_plan; +use mz_repr::refresh_schedule::RefreshSchedule; use mz_repr::{ColumnName, GlobalId, RelationDesc}; use mz_sql::plan::HirRelationExpr; use mz_transform::dataflow::DataflowMetainfo; @@ -65,6 +66,8 @@ pub struct Optimizer { /// Output columns that are asserted to be not null in the `CREATE VIEW` /// statement. non_null_assertions: Vec, + /// Refresh schedule, e.g., `REFRESH EVERY '1 day'` + refresh_schedule: Option, /// A human-readable name exposed internally (useful for debugging). debug_name: String, // Optimizer config. 
@@ -79,6 +82,7 @@ impl Optimizer { internal_view_id: GlobalId, column_names: Vec, non_null_assertions: Vec, + refresh_schedule: Option, debug_name: String, config: OptimizerConfig, ) -> Self { @@ -90,6 +94,7 @@ impl Optimizer { internal_view_id, column_names, non_null_assertions, + refresh_schedule, debug_name, config, } @@ -221,6 +226,7 @@ impl Optimize for Optimizer { with_snapshot: true, up_to: Antichain::default(), non_null_assertions: self.non_null_assertions.clone(), + refresh_schedule: self.refresh_schedule.clone(), }; let df_meta = df_builder.build_sink_dataflow_into( diff --git a/src/adapter/src/optimize/subscribe.rs b/src/adapter/src/optimize/subscribe.rs index fed6b1eb1f931..1d245b21cf682 100644 --- a/src/adapter/src/optimize/subscribe.rs +++ b/src/adapter/src/optimize/subscribe.rs @@ -183,6 +183,8 @@ impl Optimize for Optimizer { up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(), // No `FORCE NOT NULL` for subscribes non_null_assertions: vec![], + // No `REFRESH EVERY` for subscribes + refresh_schedule: None, }; let mut df_builder = @@ -223,6 +225,8 @@ impl Optimize for Optimizer { up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(), // No `FORCE NOT NULL` for subscribes non_null_assertions: vec![], + // No `REFRESH EVERY` for subscribes + refresh_schedule: None, }; let mut df_builder = diff --git a/src/catalog/src/memory/objects.rs b/src/catalog/src/memory/objects.rs index deee438e89168..be19ab85eb93a 100644 --- a/src/catalog/src/memory/objects.rs +++ b/src/catalog/src/memory/objects.rs @@ -29,6 +29,7 @@ use mz_controller_types::{ClusterId, ReplicaId}; use mz_expr::{CollectionPlan, MirScalarExpr, OptimizedMirRelationExpr}; use mz_ore::collections::CollectionExt; use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap}; +use mz_repr::refresh_schedule::RefreshSchedule; use mz_repr::role_id::RoleId; use mz_repr::{GlobalId, RelationDesc}; use mz_sql::ast::display::AstDisplay; @@ -678,6 +679,7 @@ pub struct MaterializedView { pub 
resolved_ids: ResolvedIds, pub cluster_id: ClusterId, pub non_null_assertions: Vec, + pub refresh_schedule: Option, } #[derive(Debug, Clone, Serialize)] diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index e337e7abb35a8..8c5b909d277cd 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -820,6 +820,7 @@ where with_snapshot: se.with_snapshot, up_to: se.up_to, non_null_assertions: se.non_null_assertions, + refresh_schedule: se.refresh_schedule, }; sink_exports.insert(id, desc); } diff --git a/src/compute-types/build.rs b/src/compute-types/build.rs index 016a2a702b4b8..a8869dd81044d 100644 --- a/src/compute-types/build.rs +++ b/src/compute-types/build.rs @@ -102,6 +102,7 @@ fn main() { .extern_path(".mz_repr.adt.regex", "::mz_repr::adt::regex") .extern_path(".mz_repr.antichain", "::mz_repr::antichain") .extern_path(".mz_repr.global_id", "::mz_repr::global_id") + .extern_path(".mz_repr.refresh_schedule", "::mz_repr::refresh_schedule") .extern_path(".mz_repr.relation_and_scalar", "::mz_repr") .extern_path(".mz_repr.explain", "::mz_repr") .extern_path(".mz_repr.row", "::mz_repr") diff --git a/src/compute-types/src/sinks.proto b/src/compute-types/src/sinks.proto index 834b7bbc5762b..ccb7c5cf70e91 100644 --- a/src/compute-types/src/sinks.proto +++ b/src/compute-types/src/sinks.proto @@ -12,6 +12,7 @@ import "google/protobuf/empty.proto"; import "repr/src/antichain.proto"; import "repr/src/global_id.proto"; +import "repr/src/refresh_schedule.proto"; import "repr/src/relation_and_scalar.proto"; import "storage-types/src/controller.proto"; @@ -24,6 +25,7 @@ message ProtoComputeSinkDesc { bool with_snapshot = 4; mz_repr.antichain.ProtoU64Antichain up_to = 5; repeated uint64 non_null_assertions = 6; + mz_repr.refresh_schedule.ProtoRefreshSchedule refresh_schedule = 7; } message ProtoComputeSinkConnection { diff --git a/src/compute-types/src/sinks.rs 
b/src/compute-types/src/sinks.rs index 23c1fcc806c0f..edea0e4627c9c 100644 --- a/src/compute-types/src/sinks.rs +++ b/src/compute-types/src/sinks.rs @@ -10,7 +10,9 @@ //! Types for describing dataflow sinks. use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError}; -use mz_repr::{GlobalId, RelationDesc}; +use mz_repr::adt::interval::Interval; +use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule}; +use mz_repr::{GlobalId, RelationDesc, Timestamp}; use mz_storage_types::controller::CollectionMetadata; use proptest::prelude::{any, Arbitrary, BoxedStrategy, Strategy}; use proptest_derive::Arbitrary; @@ -21,27 +23,43 @@ include!(concat!(env!("OUT_DIR"), "/mz_compute_types.sinks.rs")); /// A sink for updates to a relational collection. #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct ComputeSinkDesc { +pub struct ComputeSinkDesc { pub from: GlobalId, pub from_desc: RelationDesc, pub connection: ComputeSinkConnection, pub with_snapshot: bool, pub up_to: Antichain, pub non_null_assertions: Vec, + pub refresh_schedule: Option, } -impl Arbitrary for ComputeSinkDesc { +impl Arbitrary for ComputeSinkDesc { type Strategy = BoxedStrategy; type Parameters = (); fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { + let interval_strategy: BoxedStrategy = ( + any::(), + any::(), + proptest::collection::vec(any::(), 1..4), + ) + .prop_map(|(interval, aligned_to, ats)| RefreshSchedule { + everies: vec![RefreshEvery { + interval, + aligned_to, + }], + ats, + }) + .boxed(); + ( any::(), any::(), any::>(), any::(), - proptest::collection::vec(any::(), 1..4), + proptest::collection::vec(any::(), 1..4), proptest::collection::vec(any::(), 0..4), + proptest::option::of(interval_strategy), ) .prop_map( |( @@ -51,6 +69,7 @@ impl Arbitrary for ComputeSinkDesc { with_snapshot, up_to_frontier, non_null_assertions, + refresh_schedule, )| { ComputeSinkDesc { from, @@ -59,6 +78,7 @@ impl Arbitrary for ComputeSinkDesc { with_snapshot, up_to: 
Antichain::from(up_to_frontier), non_null_assertions, + refresh_schedule, } }, ) @@ -66,7 +86,7 @@ impl Arbitrary for ComputeSinkDesc { } } -impl RustType for ComputeSinkDesc { +impl RustType for ComputeSinkDesc { fn into_proto(&self) -> ProtoComputeSinkDesc { ProtoComputeSinkDesc { connection: Some(self.connection.into_proto()), @@ -75,6 +95,7 @@ impl RustType for ComputeSinkDesc for ComputeSinkDesc SinkRender for PersistSinkConnection where G: Scope, { + #[allow(clippy::as_conversions)] fn render_continuous_sink( &self, compute_state: &mut ComputeState, @@ -58,7 +62,13 @@ where where G: Scope, { - let desired_collection = sinked_collection.map(Ok).concat(&err_collection.map(Err)); + let mut desired_collection = sinked_collection.map(Ok).concat(&err_collection.map(Err)); + + // If `REFRESH EVERY` was specified, round up timestamps. + if let Some(refresh_schedule) = &sink.refresh_schedule { + desired_collection = round_up(desired_collection, refresh_schedule.clone()); + } + if sink.up_to != Antichain::default() { unimplemented!( "UP TO is not supported for persist sinks yet, and shouldn't have been accepted during parsing/planning" @@ -1181,3 +1191,87 @@ where let token = Rc::new(shutdown_button.press_on_drop()); (output_stream, token) } + +/// This is for REFRESH options on materialized views. It adds an operator that rounds up the +/// timestamps of data and frontiers to the time of the next refresh. See +/// `doc/developer/design/20231027_refresh_every_mv.md`. +/// +/// Note that this currently only works with 1-dim timestamps. (This is not an issue for WMR, +/// because iteration numbers should disappear by the time the data gets to the Persist sink.) 
+fn round_up( + coll: Collection, Diff>, + refresh_schedule: RefreshSchedule, +) -> Collection, Diff> +where + G: Scope, +{ + let mut builder = OperatorBuilder::new("round_up".to_string(), coll.scope().clone()); + let (mut output_buf, output_stream) = builder.new_output(); + let mut input = builder.new_input_connection(&coll.inner, Pipeline, vec![Antichain::new()]); + builder.build(move |capabilities| { + let mut capability = Some(capabilities.into_element()); + let mut buffer = Vec::new(); + move |frontiers| { + let ac = frontiers.into_element(); + match ac.frontier().to_owned().into_option() { + Some(ts) => { + match refresh_schedule.round_up_timestamp(ts) { + Some(rounded_up_ts) => { + capability.as_mut().unwrap().downgrade(&rounded_up_ts); + } + None => { + // We are past the last refresh. Drop the capability to signal that we + // are done. + capability = None; + } + } + } + None => { + capability = None; + } + } + if let Some(capability) = &capability { + input.for_each(|_cap, data| { + data.swap(&mut buffer); + let mut cached_ts: Option = None; + let mut cached_rounded_up_ts = None; + buffer + .drain_filter_swapping(|(_d, ts, _r)| { + let rounded_up_ts = { + // We cache the rounded up timestamp for the last seen timestamp, + // because the rounding up has a non-negligible cost. Caching for + // just the 1 last timestamp helps already, because in some common + // cases, we'll be seeing a lot of identical timestamps, e.g., + // during a rehydration, or when we have much more than 1000 records + // during a single second. + let some_ts_clone = Some(ts.clone()); + if cached_ts != some_ts_clone { + cached_ts = some_ts_clone; + cached_rounded_up_ts = refresh_schedule.round_up_timestamp(*ts); + } + cached_rounded_up_ts + }; + match rounded_up_ts { + Some(rounded_up_ts) => { + *ts = rounded_up_ts; + false + } + None => { + // This record is after the last refresh, so drop it. 
+ true + } + } + }) + .for_each(drop); // consume the iterator to apply the filtering + output_buf + .activate() + .session(&capability) + .give_container(&mut buffer); + }); + } + } + }); + + use differential_dataflow::AsCollection; + output_stream.as_collection() +} diff --git a/src/ore/src/vec.rs b/src/ore/src/vec.rs index 0e79a66a7bf3a..0d7ccd4b2542e 100644 --- a/src/ore/src/vec.rs +++ b/src/ore/src/vec.rs @@ -191,6 +191,8 @@ where /// This struct is created by [`VecExt::drain_filter_swapping`]. /// See its documentation for more. /// +/// Warning: The vector is modified only if the iterator is consumed! +/// /// # Example /// /// ``` diff --git a/src/repr/build.rs b/src/repr/build.rs index b4c9f1e6bd1b6..2c4033750eb5c 100644 --- a/src/repr/build.rs +++ b/src/repr/build.rs @@ -92,6 +92,8 @@ fn main() { "repr/src/relation_and_scalar.proto", "repr/src/role_id.proto", "repr/src/url.proto", + "repr/src/refresh_schedule.proto", + "repr/src/timestamp.proto", "repr/src/adt/array.proto", "repr/src/adt/char.proto", "repr/src/adt/date.proto", diff --git a/src/repr/src/adt/interval.rs b/src/repr/src/adt/interval.rs index 4c194de9bbc60..021b1f64e3d64 100644 --- a/src/repr/src/adt/interval.rs +++ b/src/repr/src/adt/interval.rs @@ -16,6 +16,7 @@ use anyhow::{anyhow, bail}; use mz_proto::{RustType, TryFromProtoError}; use num_traits::CheckedMul; use once_cell::sync::Lazy; +use proptest::prelude::{any, Arbitrary, BoxedStrategy, Strategy}; use serde::{Deserialize, Serialize}; use crate::adt::datetime::DateTimeField; @@ -376,6 +377,25 @@ impl Interval { i128::from(self.micros) } + /// Computes the total number of milliseconds in the interval. Discards fractional millisecond! + pub fn as_milliseconds(&self) -> i128 { + // unwrap is safe because i32::MAX/i32::MIN number of months will not overflow an i128 when + // converted to milliseconds. 
+ Self::convert_date_time_unit( + DateTimeField::Month, + DateTimeField::Milliseconds, + i128::from(self.months), + ).unwrap() + + // unwrap is safe because i32::MAX/i32::MIN number of days will not overflow an i128 when + // converted to milliseconds. + Self::convert_date_time_unit( + DateTimeField::Day, + DateTimeField::Milliseconds, + i128::from(self.days), + ).unwrap() + + i128::from(self.micros) / 1000 + } + /// Converts this `Interval`'s duration into `chrono::Duration`. pub fn duration_as_chrono(&self) -> chrono::Duration { use chrono::Duration; @@ -770,6 +790,21 @@ impl fmt::Display for Interval { } } +impl Arbitrary for Interval { + type Strategy = BoxedStrategy; + type Parameters = (); + + fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { + (any::(), any::(), any::()) + .prop_map(|(months, days, micros)| Interval { + months, + days, + micros, + }) + .boxed() + } +} + #[cfg(test)] mod test { use super::*; diff --git a/src/repr/src/lib.rs b/src/repr/src/lib.rs index d5df57cf98139..3eb7036bf1650 100644 --- a/src/repr/src/lib.rs +++ b/src/repr/src/lib.rs @@ -104,6 +104,7 @@ pub mod explain; pub mod fixed_length; pub mod global_id; pub mod namespaces; +pub mod refresh_schedule; pub mod role_id; pub mod stats; pub mod strconv; diff --git a/src/repr/src/refresh_schedule.proto b/src/repr/src/refresh_schedule.proto new file mode 100644 index 0000000000000..16c48b5cc7d7d --- /dev/null +++ b/src/repr/src/refresh_schedule.proto @@ -0,0 +1,24 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+syntax = "proto3"; + +package mz_repr.refresh_schedule; + +import "repr/src/adt/interval.proto"; +import "repr/src/timestamp.proto"; + +message ProtoRefreshSchedule { + repeated ProtoRefreshEvery everies = 1; + repeated mz_repr.timestamp.ProtoTimestamp ats = 2; +} + +message ProtoRefreshEvery { + mz_repr.adt.interval.ProtoInterval interval = 1; + mz_repr.timestamp.ProtoTimestamp aligned_to = 2; +} diff --git a/src/repr/src/refresh_schedule.rs b/src/repr/src/refresh_schedule.rs new file mode 100644 index 0000000000000..29d4a8cea4eb3 --- /dev/null +++ b/src/repr/src/refresh_schedule.rs @@ -0,0 +1,250 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::adt::interval::Interval; +use crate::Timestamp; +use mz_proto::IntoRustIfSome; +use mz_proto::{ProtoType, RustType, TryFromProtoError}; +use serde::{Deserialize, Serialize}; + +include!(concat!(env!("OUT_DIR"), "/mz_repr.refresh_schedule.rs")); + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct RefreshSchedule { + // `REFRESH EVERY`s + pub everies: Vec, + // `REFRESH AT`s + pub ats: Vec, +} + +impl RefreshSchedule { + pub fn empty() -> RefreshSchedule { + RefreshSchedule { + everies: Vec::new(), + ats: Vec::new(), + } + } + + /// Rounds up the timestamp to the time of the next refresh. + /// Returns None if there is no next refresh. + pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Option { + let next_every = self + .everies + .iter() + // We use a `filter_map` here to simply discard such refreshes where the timestamp + // overflowed, since the system would never reach that time anyway. 
+ .filter_map(|refresh_every| timestamp.round_up(refresh_every)) + .min(); + let next_at = self + .ats + .iter() + .filter(|at| **at >= timestamp) + .min() + .cloned(); + // Take the min of `next_every` and `next_at`, but with considering None to be bigger than + // any Some. Note: Simply `min(next_every, next_at)` wouldn't do what we want, because None + // is smaller than any Some. + next_every.into_iter().chain(next_at).min() + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct RefreshEvery { + pub interval: Interval, + pub aligned_to: Timestamp, +} + +impl RustType for RefreshSchedule { + fn into_proto(&self) -> ProtoRefreshSchedule { + ProtoRefreshSchedule { + everies: self.everies.into_proto(), + ats: self.ats.into_proto(), + } + } + + fn from_proto(proto: ProtoRefreshSchedule) -> Result { + Ok(RefreshSchedule { + everies: proto.everies.into_rust()?, + ats: proto.ats.into_rust()?, + }) + } +} + +impl RustType for RefreshEvery { + fn into_proto(&self) -> ProtoRefreshEvery { + ProtoRefreshEvery { + interval: Some(self.interval.into_proto()), + aligned_to: Some(self.aligned_to.into_proto()), + } + } + + fn from_proto(proto: ProtoRefreshEvery) -> Result { + Ok(RefreshEvery { + interval: proto + .interval + .into_rust_if_some("ProtoRefreshEvery::interval")?, + aligned_to: proto + .aligned_to + .into_rust_if_some("ProtoRefreshEvery::aligned_to")?, + }) + } +} + +#[cfg(test)] +mod tests { + use crate::adt::interval::Interval; + use crate::refresh_schedule::{RefreshEvery, RefreshSchedule}; + use crate::Timestamp; + use std::str::FromStr; + + #[mz_ore::test] + fn test_round_up_timestamp() { + let ts = |t: u64| Timestamp::new(t); + let test = |schedule: RefreshSchedule| { + move |expected_ts: Option, input_ts| { + assert_eq!( + expected_ts.map(ts), + schedule.round_up_timestamp(ts(input_ts)) + ) + } + }; + { + let schedule = RefreshSchedule { + everies: vec![], + ats: vec![ts(123), ts(456)], + }; + let test = test(schedule); + 
test(Some(123), 0); + test(Some(123), 50); + test(Some(123), 122); + test(Some(123), 123); + test(Some(456), 124); + test(Some(456), 130); + test(Some(456), 455); + test(Some(456), 456); + test(None, 457); + test(None, 12345678); + test(None, u64::MAX - 1000); + test(None, u64::MAX - 1); + test(None, u64::MAX); + } + { + let schedule = RefreshSchedule { + everies: vec![RefreshEvery { + interval: Interval::from_str("100 milliseconds").unwrap(), + aligned_to: ts(500), + }], + ats: vec![], + }; + let test = test(schedule); + test(Some(0), 0); + test(Some(100), 1); + test(Some(100), 2); + test(Some(100), 99); + test(Some(100), 100); + test(Some(200), 101); + test(Some(200), 102); + test(Some(200), 199); + test(Some(200), 200); + test(Some(300), 201); + test(Some(400), 400); + test(Some(500), 401); + test(Some(500), 450); + test(Some(500), 499); + test(Some(500), 500); + test(Some(600), 501); + test(Some(600), 599); + test(Some(600), 600); + test(Some(700), 601); + test(Some(5434532600), 5434532599); + test(Some(5434532600), 5434532600); + test(Some(5434532700), 5434532601); + test(None, u64::MAX); + } + { + let schedule = RefreshSchedule { + everies: vec![RefreshEvery { + interval: Interval::from_str("100 milliseconds").unwrap(), + aligned_to: ts(542), + }], + ats: vec![], + }; + let test = test(schedule); + test(Some(42), 0); + test(Some(42), 1); + test(Some(42), 41); + test(Some(42), 42); + test(Some(142), 43); + test(Some(442), 441); + test(Some(442), 442); + test(Some(542), 443); + test(Some(542), 541); + test(Some(542), 542); + test(Some(642), 543); + test(None, u64::MAX); + } + { + let schedule = RefreshSchedule { + everies: vec![ + RefreshEvery { + interval: Interval::from_str("100 milliseconds").unwrap(), + aligned_to: ts(400), + }, + RefreshEvery { + interval: Interval::from_str("100 milliseconds").unwrap(), + aligned_to: ts(542), + }, + ], + ats: vec![ts(2), ts(300), ts(400), ts(471), ts(541), ts(123456)], + }; + let test = test(schedule); + test(Some(0), 0); 
+ test(Some(2), 1); + test(Some(2), 2); + test(Some(42), 3); + test(Some(42), 41); + test(Some(42), 42); + test(Some(100), 43); + test(Some(100), 99); + test(Some(100), 100); + test(Some(142), 101); + test(Some(142), 141); + test(Some(142), 142); + test(Some(200), 143); + test(Some(300), 243); + test(Some(300), 299); + test(Some(300), 300); + test(Some(342), 301); + test(Some(400), 343); + test(Some(400), 399); + test(Some(400), 400); + test(Some(442), 401); + test(Some(442), 441); + test(Some(442), 442); + test(Some(471), 443); + test(Some(471), 470); + test(Some(471), 471); + test(Some(500), 472); + test(Some(500), 472); + test(Some(500), 500); + test(Some(541), 501); + test(Some(541), 540); + test(Some(541), 541); + test(Some(542), 542); + test(Some(600), 543); + test(Some(65500), 65454); + test(Some(87842), 87831); + test(Some(123442), 123442); + test(Some(123456), 123443); + test(Some(123456), 123456); + test(Some(123500), 123457); + test(None, u64::MAX); + } + } +} diff --git a/src/repr/src/timestamp.proto b/src/repr/src/timestamp.proto new file mode 100644 index 0000000000000..917a3e4c3b5c5 --- /dev/null +++ b/src/repr/src/timestamp.proto @@ -0,0 +1,16 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +syntax = "proto3"; + +package mz_repr.timestamp; + +message ProtoTimestamp { + uint64 internal = 1; +} diff --git a/src/repr/src/timestamp.rs b/src/repr/src/timestamp.rs index f37ba045e7047..e3c888d2ae4e4 100644 --- a/src/repr/src/timestamp.rs +++ b/src/repr/src/timestamp.rs @@ -12,12 +12,16 @@ use std::num::TryFromIntError; use std::time::Duration; use dec::TryFromDecimalError; +use mz_proto::{RustType, TryFromProtoError}; use proptest_derive::Arbitrary; use serde::{Deserialize, Serialize, Serializer}; use crate::adt::numeric::Numeric; +use crate::refresh_schedule::RefreshEvery; use crate::strconv::parse_timestamp; +include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs")); + /// System-wide timestamp type. #[derive( Clone, @@ -36,6 +40,18 @@ pub struct Timestamp { internal: u64, } +impl RustType for Timestamp { + fn into_proto(&self) -> ProtoTimestamp { + ProtoTimestamp { + internal: self.into(), + } + } + + fn from_proto(proto: ProtoTimestamp) -> Result { + Ok(Timestamp::new(proto.internal)) + } +} + pub trait TimestampManipulation: timely::progress::Timestamp + timely::order::TotalOrder @@ -182,6 +198,54 @@ impl Timestamp { pub fn step_back(&self) -> Option { self.checked_sub(1) } + + /// Rounds up the timestamp to the time of the next refresh, according to the given periodic + /// refresh schedule. + /// + /// Returns None if an overflow happens. + pub fn round_up( + &self, + RefreshEvery { + interval, + aligned_to, + }: &RefreshEvery, + ) -> Option { + // Planning ensured that + // - interval.months == 0, so we don't need to deal with months being of variable size. + // - The interval can be max 27 days, so the cast to u64 won't overflow. + // - The interval is positive, so the cast to u64 won't underflow. + assert_eq!(interval.months, 0); + let interval: u64 = interval.as_milliseconds().try_into().unwrap(); + // Rounds up `x` to the nearest multiple of `interval`. 
+ let round_up_to_multiple_of_interval = |x: u64| -> Option { + assert_ne!(x, 0); + (((x - 1) / interval).checked_add(1)?).checked_mul(interval) + }; + // Rounds down `x` to the nearest multiple of `interval`. + let round_down_to_multiple_of_interval = |x: u64| -> u64 { x / interval * interval }; + let result = if self > aligned_to { + Self { + internal: aligned_to + .internal + .checked_add(round_up_to_multiple_of_interval( + self.internal - aligned_to.internal, + )?)?, + } + } else { + // Note: `self == aligned_to` has to be handled here, because in the other branch + // `x - 1` would underflow. + // + // Also, no need to check for overflows here, since all the numbers are either between + // `self` and `aligned_to`, or not greater than `aligned_to.internal - self.internal`. + Self { + internal: aligned_to.internal + - round_down_to_multiple_of_interval(aligned_to.internal - self.internal), + } + }; + assert!(result.internal >= self.internal); + assert!(result.internal - self.internal <= interval); + Some(result) + } } impl From for Timestamp { diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index 16c7e31891873..03debeb632cb2 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -27,6 +27,7 @@ Access Add Addresses Aggregate +Aligned All Alter And @@ -92,6 +93,7 @@ Create Createcluster Createdb Createrole +Creation Cross Csv Current @@ -132,6 +134,7 @@ Enforced Envelope Error Escape +Every Except Execute Exists diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index a7f74436ea6fa..2f35160cc3be8 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -30,12 +30,15 @@ use crate::ast::{AstInfo, Expr, Ident, OrderByExpr, UnresolvedItemName, WithOpti pub enum MaterializedViewOptionName { /// The `ASSERT NOT NULL [=] ` option. AssertNotNull, + /// The `REFRESH [=] ...` option. 
+ Refresh, } impl AstDisplay for MaterializedViewOptionName { fn fmt(&self, f: &mut AstFormatter) { match self { MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"), + MaterializedViewOptionName::Refresh => f.write_str("REFRESH"), } } } diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index 3eb4b93b783b6..23393badee12c 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -3129,6 +3129,7 @@ pub enum WithOptionValue { // Special cases. ClusterReplicas(Vec>), ConnectionKafkaBroker(KafkaBroker), + Refresh(RefreshOptionValue), } impl AstDisplay for WithOptionValue { @@ -3137,7 +3138,9 @@ impl AstDisplay for WithOptionValue { // When adding branches to this match statement, think about whether it is OK for us to collect // the value as part of our telemetry. Check the data management policy to be sure! match self { - WithOptionValue::Value(_) | WithOptionValue::Sequence(_) => { + WithOptionValue::Value(_) + | WithOptionValue::Sequence(_) + | WithOptionValue::Refresh(_) => { // These are redact-aware. 
} WithOptionValue::DataType(_) @@ -3179,11 +3182,57 @@ impl AstDisplay for WithOptionValue { WithOptionValue::ConnectionKafkaBroker(broker) => { f.write_node(broker); } + WithOptionValue::Refresh(RefreshOptionValue::OnCommit) => { + f.write_str("ON COMMIT"); + } + WithOptionValue::Refresh(RefreshOptionValue::AtCreation) => { + f.write_str("AT CREATION"); + } + WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time })) => { + f.write_str("AT "); + f.write_node(time); + } + WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue { + interval, + aligned_to, + })) => { + f.write_str("EVERY '"); + f.write_str(interval); + f.write_str("'"); + if let Some(aligned_to) = aligned_to { + f.write_str(" ALIGNED TO "); + f.write_node(aligned_to) + } + } } } } impl_display_t!(WithOptionValue); +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub enum RefreshOptionValue { + OnCommit, + AtCreation, + At(RefreshAtOptionValue), + Every(RefreshEveryOptionValue), +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct RefreshAtOptionValue { + // We need an Expr because we want to support `mz_now()`. + pub time: Expr, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct RefreshEveryOptionValue { + // The following is a String and not an IntervalValue, because that starts with the keyword + // INTERVAL, but that is not needed here, since the only thing that can come here is an + // interval, so no need to indicate this with an extra keyword. + pub interval: String, + // We need an Expr because we want to support `mz_now()`. 
+ pub aligned_to: Option>, +} + #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum TransactionMode { AccessMode(TransactionAccessMode), diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 96bac7ef4d94a..3c144112e3da7 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -3218,8 +3218,15 @@ impl<'a> Parser<'a> { fn parse_materialized_view_option_name( &mut self, ) -> Result { - self.expect_keywords(&[ASSERT, NOT, NULL])?; - Ok(MaterializedViewOptionName::AssertNotNull) + let name = match self.expect_one_of_keywords(&[ASSERT, REFRESH])? { + ASSERT => { + self.expect_keywords(&[NOT, NULL])?; + MaterializedViewOptionName::AssertNotNull + } + REFRESH => MaterializedViewOptionName::Refresh, + _ => unreachable!(), + }; + Ok(name) } fn parse_materialized_view_option( @@ -4074,6 +4081,55 @@ impl<'a> Parser<'a> { } else { Ok(WithOptionValue::Ident(ident!("secret"))) } + } else if self.parse_keyword(ON) { + if self.parse_keyword(COMMIT) { + Ok(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)) + } else { + // This is needed when a user supplies `on` as a value to a completely different + // option (not even a REFRESH option). + Ok(WithOptionValue::Ident(ident!("on"))) + } + } else if self.parse_keyword(AT) { + if self.parse_keyword(CREATION) { + Ok(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) + } else if let Some(expr) = self.maybe_parse(Parser::parse_expr) { + Ok(WithOptionValue::Refresh(RefreshOptionValue::At( + RefreshAtOptionValue { time: expr }, + ))) + } else { + Ok(WithOptionValue::Ident(ident!("at"))) + } + } else if self.parse_keyword(EVERY) { + match self.maybe_parse(Parser::parse_value) { + Some(Value::String(interval)) => { + let aligned_to = if self.parse_keywords(&[ALIGNED, TO]) { + Some(self.parse_expr()?) 
+ } else { + None + }; + Ok(WithOptionValue::Refresh(RefreshOptionValue::Every( + RefreshEveryOptionValue { + interval, + aligned_to, + }, + ))) + } + Some(v @ Value::Interval(_)) => { + parser_err!( + self, + self.peek_prev_pos(), + format!("Invalid value for REFRESH EVERY: `{v}`. The value should be a string parseable as an interval, e.g., '1 day'. The INTERVAL keyword should NOT be present!") + ) + } + Some(v) => { + parser_err!( + self, + self.peek_prev_pos(), + format!("Invalid value for REFRESH EVERY: `{v}`. The value should be a string parseable as an interval, e.g., '1 day'.") + ) + } + None => Ok(WithOptionValue::Ident(ident!("every"))), + } } else if let Some(value) = self.maybe_parse(Parser::parse_value) { Ok(WithOptionValue::Value(value)) } else if let Some(ident) = self.maybe_parse(Parser::parse_identifier) { diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 79458599c6b65..8f764936f9730 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -381,6 +381,20 @@ CREATE MATERIALIZED VIEW v IN CLUSTER [1] AS SELECT 1 => CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, with_options: [] }) +parse-statement +CREATE MATERIALIZED VIEW v WITH (REFRESH EVERY '1 day', ASSERT NOT NULL x) AS SELECT * FROM t; +---- +CREATE MATERIALIZED VIEW v WITH (REFRESH = EVERY '1 day', ASSERT NOT NULL = x) AS SELECT * FROM t +=> +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: 
[Wildcard], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("t")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, with_options: [MaterializedViewOption { name: Refresh, value: Some(Refresh(Every(RefreshEveryOptionValue { interval: "1 day", aligned_to: None }))) }, MaterializedViewOption { name: AssertNotNull, value: Some(Ident(Ident("x"))) }] }) + +parse-statement +CREATE OR REPLACE MATERIALIZED VIEW v IN CLUSTER [1] WITH (REFRESH EVERY '1 day' ALIGNED TO '2023-12-11 11:00', ASSERT NOT NULL x, REFRESH AT mz_now(), REFRESH ON COMMIT, REFRESH = AT CREATION) AS SELECT * FROM t; +---- +CREATE OR REPLACE MATERIALIZED VIEW v IN CLUSTER [1] WITH (REFRESH = EVERY '1 day' ALIGNED TO '2023-12-11 11:00', ASSERT NOT NULL = x, REFRESH = AT mz_now(), REFRESH = ON COMMIT, REFRESH = AT CREATION) AS SELECT * FROM t +=> +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Replace, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Wildcard], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("t")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, with_options: [MaterializedViewOption { name: Refresh, value: Some(Refresh(Every(RefreshEveryOptionValue { interval: "1 day", aligned_to: Some(Value(String("2023-12-11 11:00"))) }))) }, MaterializedViewOption { name: AssertNotNull, value: Some(Ident(Ident("x"))) }, MaterializedViewOption { name: Refresh, value: Some(Refresh(At(RefreshAtOptionValue { time: Function(Function { name: Name(UnresolvedItemName([Ident("mz_now")])), args: Args { args: [], order_by: [] }, filter: None, over: None, distinct: false }) }))) }, MaterializedViewOption { name: Refresh, value: 
Some(Refresh(OnCommit)) }, MaterializedViewOption { name: Refresh, value: Some(Refresh(AtCreation)) }] }) + parse-statement roundtrip CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL a, ASSERT NOT NULL = b) AS SELECT 1 ---- diff --git a/src/sql/src/catalog.rs b/src/sql/src/catalog.rs index c40d1d71c64e8..785eacc1c18f5 100644 --- a/src/sql/src/catalog.rs +++ b/src/sql/src/catalog.rs @@ -196,7 +196,9 @@ pub trait SessionCatalog: fmt::Debug + ExprHumanizer + Send + Sync + ConnectionR cluster_replica_name: &'b QualifiedReplica, ) -> Result<&dyn CatalogClusterReplica<'a>, CatalogError>; - /// Resolves a partially-specified item name. + /// Resolves a partially-specified item name, that is NOT a function or + /// type. (For resolving functions or types, please use resolve_function + /// or resolve_type.) /// /// If the partial name has a database component, it searches only the /// specified database; otherwise, it searches the active database. If the diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index b3f41dc0fda27..3300892a5bd65 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -34,7 +34,8 @@ use crate::ast::visit_mut::VisitMut; use crate::ast::{ self, AstInfo, Cte, CteBlock, CteMutRec, DocOnIdentifier, GrantTargetSpecification, GrantTargetSpecificationInner, Ident, MutRecBlock, ObjectType, Query, Raw, RawClusterName, - RawDataType, RawItemName, Statement, UnresolvedItemName, UnresolvedObjectName, + RawDataType, RawItemName, RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, + Statement, UnresolvedItemName, UnresolvedObjectName, }; use crate::catalog::{ CatalogError, CatalogItem, CatalogItemType, CatalogTypeDetails, SessionCatalog, @@ -1783,6 +1784,22 @@ impl<'a> Fold for NameResolver<'a> { .collect(), ), ConnectionKafkaBroker(broker) => ConnectionKafkaBroker(self.fold_kafka_broker(broker)), + Refresh(refresh) => match refresh { + RefreshOptionValue::OnCommit => Refresh(RefreshOptionValue::OnCommit), + 
RefreshOptionValue::AtCreation => Refresh(RefreshOptionValue::AtCreation), + RefreshOptionValue::At(RefreshAtOptionValue { time }) => { + Refresh(RefreshOptionValue::At(RefreshAtOptionValue { + time: self.fold_expr(time), + })) + } + RefreshOptionValue::Every(RefreshEveryOptionValue { + interval, + aligned_to, + }) => Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue { + interval, + aligned_to: aligned_to.map(|e| self.fold_expr(e)), + })), + }, } } diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 357fb48054dae..df1b0607554bb 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -39,6 +39,7 @@ use mz_ore::now::{self, NOW_ZERO}; use mz_pgcopy::CopyFormatParams; use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem}; use mz_repr::explain::{ExplainConfig, ExplainFormat}; +use mz_repr::refresh_schedule::RefreshSchedule; use mz_repr::role_id::RoleId; use mz_repr::{ColumnName, Diff, GlobalId, RelationDesc, Row, ScalarType}; use mz_sql_parser::ast::{ @@ -861,6 +862,7 @@ pub enum ExplaineeStatement { /// Broken flag (see [`ExplaineeStatement::broken()`]). broken: bool, non_null_assertions: Vec, + refresh_schedule: Option, }, /// The object to be explained is a CREATE INDEX. CreateIndex { @@ -1431,6 +1433,7 @@ pub struct MaterializedView { pub column_names: Vec, pub cluster_id: ClusterId, pub non_null_assertions: Vec, + pub refresh_schedule: Option, } #[derive(Clone, Debug)] diff --git a/src/sql/src/plan/expr.rs b/src/sql/src/plan/expr.rs index d1d25b937a44d..5b2f2c0b9a7de 100644 --- a/src/sql/src/plan/expr.rs +++ b/src/sql/src/plan/expr.rs @@ -3032,6 +3032,28 @@ impl HirScalarExpr { } }) } + + /// Attempts to simplify this expression to a literal MzTimestamp. + /// + /// Returns `None` if this expression cannot be simplified, e.g. because it + /// contains non-literal values. + /// + /// TODO: Make this (and the other similar fns above) return Result, so that we can show the + /// error when it fails. (E.g., there can be non-trivial cast errors.) 
+ /// + /// # Panics + /// + /// Panics if this expression does not have type [`ScalarType::MzTimestamp`]. + pub fn into_literal_mz_timestamp(self) -> Option { + self.simplify_to_literal().and_then(|row| { + let datum = row.unpack_first(); + if datum.is_null() { + None + } else { + Some(datum.unwrap_mz_timestamp()) + } + }) + } } impl VisitChildren for HirScalarExpr { diff --git a/src/sql/src/plan/statement.rs b/src/sql/src/plan/statement.rs index 8deec1d0a7f46..22c7a804b771e 100644 --- a/src/sql/src/plan/statement.rs +++ b/src/sql/src/plan/statement.rs @@ -240,7 +240,8 @@ pub fn describe( /// Planning is a pure, synchronous function and so requires that the provided /// `stmt` does does not depend on any external state. Statements that rely on /// external state must remove that state prior to calling this function via -/// [`crate::pure::purify_statement`]. +/// [`crate::pure::purify_statement`] or +/// [`crate::pure::purify_create_materialized_view_options`]. /// /// TODO: sinks do not currently obey this rule, which is a bug /// diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index a06e33a262c81..9c6862fa1120b 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -15,6 +15,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Write; use std::iter; +use std::str::FromStr; use std::time::Duration; use itertools::{Either, Itertools}; @@ -25,8 +26,10 @@ use mz_ore::cast::{CastFrom, TryCastFrom}; use mz_ore::collections::HashSet; use mz_ore::str::StrExt; use mz_proto::RustType; +use mz_repr::adt::interval::Interval; use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap}; use mz_repr::adt::system::Oid; +use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule}; use mz_repr::role_id::RoleId; use mz_repr::{strconv, ColumnName, ColumnType, GlobalId, RelationDesc, RelationType, ScalarType}; use mz_sql_parser::ast::display::comma_separated; @@ -39,8 +42,8 @@ use mz_sql_parser::ast::{ 
CreateConnectionOption, CreateConnectionOptionName, CreateConnectionType, CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName, DeferredItemName, DocOnIdentifier, DocOnSchema, DropOwnedStatement, MaterializedViewOption, - MaterializedViewOptionName, SetRoleVar, UnresolvedItemName, UnresolvedObjectName, - UnresolvedSchemaName, Value, + MaterializedViewOptionName, RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, + SetRoleVar, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, }; use mz_sql_parser::ident; use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection}; @@ -94,7 +97,7 @@ use crate::names::{ use crate::normalize::{self, ident}; use crate::plan::error::PlanError; use crate::plan::expr::ColumnRef; -use crate::plan::query::{scalar_type_from_catalog, ExprContext, QueryLifetime}; +use crate::plan::query::{plan_expr, scalar_type_from_catalog, ExprContext, QueryLifetime}; use crate::plan::scope::Scope; use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS}; use crate::plan::statement::{scl, StatementContext, StatementDesc}; @@ -118,6 +121,7 @@ use crate::plan::{ WebhookHeaders, WebhookValidation, }; use crate::session::vars; +use crate::session::vars::ENABLE_REFRESH_EVERY_MVS; mod connection; @@ -2086,9 +2090,125 @@ pub fn plan_create_materialized_view( let MaterializedViewOptionExtracted { assert_not_null, + refresh, seen: _, }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?; + let refresh_schedule = { + let mut refresh_schedule = RefreshSchedule::empty(); + let mut on_commits_seen = 0; + for refresh_option_value in refresh { + if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) { + scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?; + } + match refresh_option_value { + RefreshOptionValue::OnCommit => { + on_commits_seen += 1; + } + RefreshOptionValue::AtCreation => { + unreachable!("REFRESH AT 
CREATION should have been purified away") + } + RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => { + transform_ast::transform(scx, &mut time)?; // Desugar the expression + let ecx = &ExprContext { + qcx: &QueryContext::root(scx, QueryLifetime::OneShot), + name: "REFRESH AT", + scope: &Scope::empty(), + relation_type: &RelationType::empty(), + allow_aggregates: false, + allow_subqueries: false, + allow_parameters: false, + allow_windows: false, + }; + let hir = plan_expr(ecx, &time)?.cast_to( + ecx, + CastContext::Implicit, + &ScalarType::MzTimestamp, + )?; + // (mz_now was purified away to a literal earlier) + let timestamp = hir + .into_literal_mz_timestamp(). + ok_or_else(|| sql_err!( + "REFRESH AT argument must be an expression that can be simplified and/or cast \ + to a constant whose type is mz_timestamp (calling mz_now() is allowed)" + ))?; + refresh_schedule.ats.push(timestamp); + } + RefreshOptionValue::Every(RefreshEveryOptionValue { + interval, + aligned_to, + }) => { + let interval = Interval::from_str(interval.as_str())?; + if interval.as_microseconds() <= 0 { + sql_bail!("REFRESH interval must be positive; got: {}", interval); + } + if interval.as_microseconds() > Interval::new(0, 27, 0).as_microseconds() { + // This limitation is because we want Intervals to be cleanly convertable + // to a unix epoch timestamp difference. When it's at least 1 month, then + // this is not true anymore, because months have variable lengths. + // See `Timestamp::round_up`. + sql_bail!( + "REFRESH interval too big: {}. Currently, only intervals not larger than 27 days are supported.", + interval + ); + } + + let mut aligned_to = + aligned_to.expect("ALIGNED TO should have been filled in by purification"); + + // Desugar the `aligned_to` expression + transform_ast::transform(scx, &mut aligned_to)?; + + let ecx = &ExprContext { + qcx: &QueryContext::root(scx, QueryLifetime::OneShot), + name: "REFRESH EVERY ... 
ALIGNED TO", + scope: &Scope::empty(), + relation_type: &RelationType::empty(), + allow_aggregates: false, + allow_subqueries: false, + allow_parameters: false, + allow_windows: false, + }; + let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to( + ecx, + CastContext::Implicit, + &ScalarType::MzTimestamp, + )?; + // (mz_now was purified away to a literal earlier) + let aligned_to_const = aligned_to_hir + .into_literal_mz_timestamp() + .ok_or_else(|| + sql_err!( + "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified and/or cast \ + to a constant whose type is mz_timestamp (calling mz_now() is allowed)" + ))?; + + refresh_schedule.everies.push(RefreshEvery { + interval, + aligned_to: aligned_to_const, + }); + } + } + } + + if on_commits_seen > 1 { + sql_bail!("REFRESH ON COMMIT can be given only once"); + } + if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::empty() { + sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options"); + } + // Note: Seeing no REFRESH options at all (not even REFRESH ON COMMIT) should be acceptable: + // even though purification inserts REFRESH ON COMMIT if no other REFRESH option was given, + // we can't rely on this behavior in planning, because this won't happen for old + // materialized views that were created before this feature was introduced. 
+ + if refresh_schedule == RefreshSchedule::empty() { + None + } else { + Some(refresh_schedule) + } + }; + if !assert_not_null.is_empty() { scx.require_feature_flag(&crate::session::vars::ENABLE_ASSERT_NOT_NULL)?; } @@ -2180,6 +2300,7 @@ pub fn plan_create_materialized_view( column_names, cluster_id, non_null_assertions, + refresh_schedule, }, replace, drop_ids, @@ -2190,7 +2311,8 @@ pub fn plan_create_materialized_view( generate_extracted_config!( MaterializedViewOption, - (AssertNotNull, Ident, AllowMultiple) + (AssertNotNull, Ident, AllowMultiple), + (Refresh, RefreshOptionValue, AllowMultiple) ); pub fn describe_create_sink( diff --git a/src/sql/src/plan/statement/dml.rs b/src/sql/src/plan/statement/dml.rs index fc6dc777c145e..985f8dc21235e 100644 --- a/src/sql/src/plan/statement/dml.rs +++ b/src/sql/src/plan/statement/dml.rs @@ -371,6 +371,7 @@ pub fn plan_explain_plan( column_names, cluster_id, non_null_assertions, + refresh_schedule, .. }, .. @@ -386,6 +387,7 @@ pub fn plan_explain_plan( cluster_id, broken, non_null_assertions, + refresh_schedule, }) } Explainee::CreateIndex(mut stmt, broken) => { diff --git a/src/sql/src/plan/with_options.rs b/src/sql/src/plan/with_options.rs index d222319d10251..a04c6873ed57c 100644 --- a/src/sql/src/plan/with_options.rs +++ b/src/sql/src/plan/with_options.rs @@ -10,7 +10,7 @@ //! Provides tooling to handle `WITH` options. 
use mz_repr::{strconv, GlobalId}; -use mz_sql_parser::ast::{Ident, KafkaBroker, ReplicaDefinition}; +use mz_sql_parser::ast::{Ident, KafkaBroker, RefreshOptionValue, ReplicaDefinition}; use mz_storage_types::connections::StringOrSecret; use serde::{Deserialize, Serialize}; use std::time::Duration; @@ -432,9 +432,12 @@ impl, T: AstInfo + std::fmt::Debug> TryFromValue sql_bail!( + | WithOptionValue::ConnectionKafkaBroker(_) + | WithOptionValue::Refresh(_) => sql_bail!( "incompatible value types: cannot convert {} to {}", match v { + WithOptionValue::Value(_) => unreachable!(), + WithOptionValue::Ident(_) => unreachable!(), WithOptionValue::Sequence(_) => "sequences", WithOptionValue::Item(_) => "object references", WithOptionValue::UnresolvedItemName(_) => "object names", @@ -442,7 +445,7 @@ impl, T: AstInfo + std::fmt::Debug> TryFromValue "data types", WithOptionValue::ClusterReplicas(_) => "cluster replicas", WithOptionValue::ConnectionKafkaBroker(_) => "connection kafka brokers", - _ => unreachable!(), + WithOptionValue::Refresh(_) => "refresh option values", }, V::name() ), @@ -509,3 +512,23 @@ impl ImpliedValue for Vec> { sql_bail!("must provide a kafka broker") } } + +impl TryFromValue> for RefreshOptionValue { + fn try_from_value(v: WithOptionValue) -> Result { + if let WithOptionValue::Refresh(r) = v { + Ok(r) + } else { + sql_bail!("cannot use value `{}` for a refresh option", v) + } + } + + fn name() -> String { + "refresh option value".to_string() + } +} + +impl ImpliedValue for RefreshOptionValue { + fn implied_value() -> Result { + sql_bail!("must provide a refresh option value") + } +} diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index d3484f0c78741..155cd26e35ec4 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -24,16 +24,20 @@ use mz_ore::iter::IteratorExt; use mz_ore::str::StrExt; use mz_postgres_util::replication::WalLevel; use mz_proto::RustType; -use mz_repr::{strconv, GlobalId}; +use mz_repr::{strconv, GlobalId, Timestamp}; 
use mz_sql_parser::ast::display::AstDisplay; +use mz_sql_parser::ast::visit::{visit_function, Visit}; +use mz_sql_parser::ast::visit_mut::{visit_expr_mut, VisitMut}; use mz_sql_parser::ast::{ AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn, - CreateSinkConnection, CreateSinkStatement, CreateSubsourceOption, CreateSubsourceOptionName, - CsrConfigOption, CsrConfigOptionName, CsrConnection, CsrSeedAvro, CsrSeedProtobuf, - CsrSeedProtobufSchema, DbzMode, DeferredItemName, DocOnIdentifier, DocOnSchema, Envelope, - Ident, KafkaConfigOption, KafkaConfigOptionName, KafkaConnection, KafkaSourceConnection, - PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy, Statement, - UnresolvedItemName, + CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkStatement, + CreateSubsourceOption, CreateSubsourceOptionName, CsrConfigOption, CsrConfigOptionName, + CsrConnection, CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DbzMode, DeferredItemName, + DocOnIdentifier, DocOnSchema, Envelope, Expr, Function, FunctionArgs, Ident, KafkaConfigOption, + KafkaConfigOptionName, KafkaConnection, KafkaSourceConnection, MaterializedViewOption, + MaterializedViewOptionName, PgConfigOption, PgConfigOptionName, RawItemName, + ReaderSchemaSelectionStrategy, RefreshAtOptionValue, RefreshEveryOptionValue, + RefreshOptionValue, Statement, UnresolvedItemName, }; use mz_storage_types::connections::inline::IntoInlineConnection; use mz_storage_types::connections::{Connection, ConnectionContext}; @@ -55,7 +59,10 @@ use crate::ast::{ }; use crate::catalog::{CatalogItemType, ErsatzCatalog, SessionCatalog}; use crate::kafka_util::KafkaConfigOptionExtracted; -use crate::names::{Aug, ResolvedColumnName, ResolvedItemName}; +use crate::names::{ + Aug, FullItemName, PartialItemName, ResolvedColumnName, ResolvedDataType, ResolvedIds, + ResolvedItemName, +}; use crate::plan::error::PlanError; use 
crate::plan::statement::ddl::load_generator_ast_to_generator; use crate::plan::StatementContext; @@ -1450,3 +1457,245 @@ async fn compile_proto( message_name, }) } + +const MZ_NOW_NAME: &str = "mz_now"; +const MZ_NOW_SCHEMA: &str = "mz_catalog"; + +/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if +/// references to ids appear or disappear during the purification. +pub fn purify_create_materialized_view_options( + catalog: impl SessionCatalog, + mz_now: Option, + cmvs: &mut CreateMaterializedViewStatement, + resolved_ids: &mut ResolvedIds, +) { + // 0. Preparations: + // Prepare an expression that calls `mz_now()`, which we can insert in various later steps. + let (mz_now_id, mz_now_expr) = { + let item = catalog + .resolve_function(&PartialItemName { + database: None, + schema: Some(MZ_NOW_SCHEMA.to_string()), + item: MZ_NOW_NAME.to_string(), + }) + .expect("we should be able to resolve mz_now"); + ( + item.id(), + Expr::Function(Function { + name: ResolvedItemName::Item { + id: item.id(), + qualifiers: item.name().qualifiers.clone(), + full_name: catalog.resolve_full_name(item.name()), + print_id: false, + }, + args: FunctionArgs::Args { + args: Vec::new(), + order_by: Vec::new(), + }, + filter: None, + over: None, + distinct: false, + }), + ) + }; + // Prepare the `mz_timestamp` type. + let (mz_timestamp_id, mz_timestamp_type) = { + let item = catalog + .resolve_type(&PartialItemName { + database: None, + schema: Some("pg_catalog".to_string()), // todo: Why is this not `mz_catalog`? + item: "mz_timestamp".to_string(), + }) + .expect("mz_timestamp should exist"); + let full_name = catalog.resolve_full_name(item.name()); + ( + item.id(), + ResolvedDataType::Named { + id: item.id(), + qualifiers: item.name().qualifiers.clone(), + full_name, + modifiers: vec![], + print_id: true, + }, + ) + }; + + let mut introduced_mz_timestamp = false; + + for option in cmvs.with_options.iter_mut() { + // 1. 
Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`. + if matches!( + option.value, + Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) + ) { + option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At( + RefreshAtOptionValue { + time: mz_now_expr.clone(), + }, + ))); + } + + // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`. + if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every( + RefreshEveryOptionValue { aligned_to, .. }, + ))) = &mut option.value + { + if aligned_to.is_none() { + *aligned_to = Some(mz_now_expr.clone()); + } + } + + // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW + // statement. (This has to happen after the above steps, which might introduce `mz_now()`.) + match &mut option.value { + Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { + time, + }))) => { + let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone()); + visitor.visit_expr_mut(time); + introduced_mz_timestamp |= visitor.introduced_mz_timestamp; + } + Some(WithOptionValue::Refresh(RefreshOptionValue::Every( + RefreshEveryOptionValue { + interval: _, + aligned_to: Some(aligned_to), + }, + ))) => { + let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone()); + visitor.visit_expr_mut(aligned_to); + introduced_mz_timestamp |= visitor.introduced_mz_timestamp; + } + _ => {} + } + } + + // 4. If the user didn't give any REFRESH option, then default to ON COMMIT. + if !cmvs.with_options.iter().any(|o| { + matches!( + o, + MaterializedViewOption { + value: Some(WithOptionValue::Refresh(..)), + .. + } + ) + }) { + cmvs.with_options.push(MaterializedViewOption { + name: MaterializedViewOptionName::Refresh, + value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)), + }) + } + + // 5. 
Attend to `resolved_ids`: The purification might have + // - added references to `mz_timestamp`; + // - removed references to `mz_now`. + if introduced_mz_timestamp { + resolved_ids.0.insert(mz_timestamp_id); + } + // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()` + // remaining in the main query expression of the MV, so let's visit the entire statement to look + // for `mz_now()` everywhere. + let mut visitor = ExprContainsTemporalVisitor::new(); + visitor.visit_create_materialized_view_statement(cmvs); + if !visitor.contains_temporal { + resolved_ids.0.remove(&mz_now_id); + } +} + +/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will involve +/// after purification. +pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption) -> bool { + match &mvo.value { + Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => { + let mut visitor = ExprContainsTemporalVisitor::new(); + visitor.visit_expr(time); + visitor.contains_temporal + } + Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue { + interval: _, + aligned_to: Some(aligned_to), + }))) => { + let mut visitor = ExprContainsTemporalVisitor::new(); + visitor.visit_expr(aligned_to); + visitor.contains_temporal + } + Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue { + interval: _, + aligned_to: None, + }))) => { + // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the + // `ALIGNED TO` to `mz_now()`. + true + } + Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => { + // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`. + true + } + _ => false, + } +} + +/// Determines whether the AST involves `mz_now()`. 
+struct ExprContainsTemporalVisitor { + pub contains_temporal: bool, +} + +impl ExprContainsTemporalVisitor { + pub fn new() -> ExprContainsTemporalVisitor { + ExprContainsTemporalVisitor { + contains_temporal: false, + } + } +} + +impl Visit<'_, Aug> for ExprContainsTemporalVisitor { + fn visit_function(&mut self, func: &Function) { + self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME; + visit_function(self, func); + } +} + +struct MzNowPurifierVisitor { + pub mz_now: Option, + pub mz_timestamp_type: ResolvedDataType, + pub introduced_mz_timestamp: bool, +} + +impl MzNowPurifierVisitor { + pub fn new( + mz_now: Option, + mz_timestamp_type: ResolvedDataType, + ) -> MzNowPurifierVisitor { + MzNowPurifierVisitor { + mz_now, + mz_timestamp_type, + introduced_mz_timestamp: false, + } + } +} + +impl VisitMut<'_, Aug> for MzNowPurifierVisitor { + fn visit_expr_mut(&mut self, expr: &'_ mut Expr) { + match expr { + Expr::Function(Function { + name: + ResolvedItemName::Item { + full_name: FullItemName { item, .. }, + .. + }, + .. + }) if item == &MZ_NOW_NAME.to_string() => { + let mz_now = self.mz_now.expect( + "we should have chosen a timestamp if the expression contains mz_now()", + ); + // We substitute `mz_now()` with number + a cast to `mz_timestamp`. The cast is to + // not alter the type of the expression. 
+ *expr = Expr::Cast { + expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))), + data_type: self.mz_timestamp_type.clone(), + }; + self.introduced_mz_timestamp = true; + } + _ => visit_expr_mut(self, expr), + } + } +} diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index bd7a9c8001729..0fd67381a35a7 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -2080,6 +2080,13 @@ feature_flags!( internal: true, enable_for_item_parsing: true, }, + { + name: enable_refresh_every_mvs, + desc: "REFRESH EVERY materialized views", + default: false, + internal: true, + enable_for_item_parsing: true, + }, ); /// Represents the input to a variable. diff --git a/test/sqllogictest/materialized_views.slt b/test/sqllogictest/materialized_views.slt index 31c8649acaf2b..a9ff8719fe613 100644 --- a/test/sqllogictest/materialized_views.slt +++ b/test/sqllogictest/materialized_views.slt @@ -370,14 +370,14 @@ query TT colnames SHOW CREATE MATERIALIZED VIEW mv ---- name create_sql -materialize.public.mv CREATE␠MATERIALIZED␠VIEW␠"materialize"."public"."mv"␠IN␠CLUSTER␠"default"␠AS␠SELECT␠1 +materialize.public.mv CREATE␠MATERIALIZED␠VIEW␠"materialize"."public"."mv"␠IN␠CLUSTER␠"default"␠WITH␠(REFRESH␠=␠ON␠COMMIT)␠AS␠SELECT␠1 # Test: SHOW CREATE MATERIALIZED VIEW as mz_support simple conn=mz_introspection,user=mz_support SHOW CREATE MATERIALIZED VIEW mv ---- -materialize.public.mv,CREATE MATERIALIZED VIEW "materialize"."public"."mv" IN CLUSTER "default" AS SELECT 1 +materialize.public.mv,CREATE MATERIALIZED VIEW "materialize"."public"."mv" IN CLUSTER "default" WITH (REFRESH = ON COMMIT) AS SELECT 1 COMPLETE 1 # Test: SHOW MATERIALIZED VIEWS @@ -645,7 +645,348 @@ SELECT * FROM mv_assertion_at_begin ORDER BY x; 4 NULL 6 7 8 NULL -# More Cleanup +# ------------------------------------------------------------------ +# REFRESH options (see also in materialized-view-refresh-options.td) +# 
------------------------------------------------------------------ +# Planning/parsing errors + +# Should be disabled by default. +query error db error: ERROR: REFRESH EVERY materialized views is not supported +CREATE MATERIALIZED VIEW mv_bad WITH (ASSERT NOT NULL x, REFRESH EVERY '8 seconds') AS SELECT * FROM t2; + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_refresh_every_mvs = true +---- +COMPLETE 0 + +query error invalid REFRESH: cannot use value `5` for a refresh option +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH 5) AS SELECT 1; + +query error db error: ERROR: REFRESH ON COMMIT can be given only once +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH ON COMMIT, REFRESH ON COMMIT) AS SELECT 1; + +query error db error: ERROR: REFRESH ON COMMIT is not compatible with any of the other REFRESH options +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH ON COMMIT, REFRESH EVERY '1 day') AS SELECT 1; + +query error db error: ERROR: REFRESH AT does not support implicitly casting from record\(f1: integer,f2: integer\) to mz_timestamp +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT row(1,2)) AS SELECT 1; + +query error db error: ERROR: REFRESH AT argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT 'aaaa') AS SELECT 1; + +query error db error: ERROR: column "ccc" does not exist +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT ccc) AS SELECT 1 as ccc; + +query error db error: ERROR: REFRESH AT argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT now()) AS SELECT 1; + +query error db error: ERROR: REFRESH AT argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH 
(REFRESH AT now()::mz_timestamp) AS SELECT 1; + +query error db error: ERROR: greatest types mz_timestamp and timestamp with time zone cannot be matched +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT greatest(mz_now(), now())) AS SELECT 1; + +query error db error: ERROR: REFRESH AT argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT greatest(mz_now(), now()::mz_timestamp)) AS SELECT 1; + +query error db error: ERROR: aggregate functions are not allowed in REFRESH AT \(function pg_catalog\.sum\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT sum(5)) AS SELECT 1; + +query error db error: ERROR: REFRESH AT does not allow subqueries +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT (SELECT 1)) AS SELECT 1; + +query error db error: ERROR: window functions are not allowed in REFRESH AT \(function pg_catalog\.lag\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT lag(7) OVER ()) AS SELECT 1; + +query error Invalid value for REFRESH EVERY: `42`\. The value should be a string parseable as an interval, e\.g\., '1 day'\. +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY 42) AS SELECT 1; + +query error db error: ERROR: invalid input syntax for type interval: unknown units dayy: "1 dayy" +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 dayy') AS SELECT 1; + +query error Invalid value for REFRESH EVERY: `INTERVAL '1 day'`\. The value should be a string parseable as an interval, e\.g\., '1 day'\. The INTERVAL keyword should NOT be present! +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY INTERVAL '1 day') AS SELECT 1; + +query error db error: ERROR: REFRESH interval must be positive; got: \-00:01:00 +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '-1 minutes') AS SELECT 1; + +query error db error: ERROR: REFRESH interval too big: 28 days\. Currently, only intervals not larger than 27 days are supported\. 
+CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '28 days') AS SELECT 1; + +query error db error: ERROR: REFRESH EVERY \.\.\. ALIGNED TO argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO now()) AS SELECT 1; + +query error db error: ERROR: REFRESH EVERY \.\.\. ALIGNED TO argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO now()::mz_timestamp) AS SELECT 1; + +query error db error: ERROR: greatest types mz_timestamp and timestamp with time zone cannot be matched +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO greatest(mz_now(), now())) AS SELECT 1; + +query error db error: ERROR: REFRESH EVERY \.\.\. ALIGNED TO argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO greatest(mz_now(), now()::mz_timestamp)) AS SELECT 1; + +query error db error: ERROR: aggregate functions are not allowed in REFRESH EVERY \.\.\. ALIGNED TO \(function pg_catalog\.sum\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO sum(5)) AS SELECT 1; + +query error db error: ERROR: REFRESH EVERY \.\.\. ALIGNED TO does not allow subqueries +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO (SELECT 1)) AS SELECT 1; + +query error db error: ERROR: window functions are not allowed in REFRESH EVERY \.\.\. 
ALIGNED TO \(function pg_catalog\.lag\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO lag(7) OVER ()) AS SELECT 1; + +query error db error: ERROR: invalid REFRESH: cannot use value `every` for a refresh option +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY) AS SELECT * FROM t2; + +query error db error: ERROR: invalid REFRESH: cannot use value `every` for a refresh option +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY, ASSERT NOT NULL x) AS SELECT * FROM t2; + +query error Expected right parenthesis, found REFRESH +CREATE MATERIALIZED VIEW mv_bad WITH (ASSERT NOT NULL x REFRESH EVERY '8 seconds') AS SELECT * FROM t2; + +query error Expected right parenthesis, found ASSERT +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY ASSERT NOT NULL x) AS SELECT * FROM t2; + +# Test that we call `transform_ast::transform`. (This has an `Expr::Nested`, which needs to be desugared, or we panic.) +statement ok +CREATE MATERIALIZED VIEW mv_desugar1 WITH (REFRESH AT (mz_now())) AS SELECT * FROM t2; + +# Same with ALIGNED TO +statement ok +CREATE MATERIALIZED VIEW mv_desugar2 WITH (REFRESH EVERY '1 day' ALIGNED TO (mz_now())) AS SELECT * FROM t2; + +## REFRESH options together with ASSERT NOT NULL options + +statement ok +INSERT INTO t2 VALUES (10, 11, 12); + +statement ok +CREATE MATERIALIZED VIEW mv_assertion_plus_refresh_every WITH (ASSERT NOT NULL x, REFRESH EVERY '8 seconds') AS SELECT * FROM t2; + +# There should be a refresh immediately when creating the MV. This refresh should already see what we just inserted. +query III +SELECT * FROM mv_assertion_plus_refresh_every ORDER BY x; +---- +1 2 3 +4 NULL 6 +7 8 NULL +10 11 12 + +statement ok +INSERT INTO t2 VALUES (NULL, -1, -2); + +# This insert shouldn't be visible yet. +query III +SELECT * FROM mv_assertion_plus_refresh_every ORDER BY x; +---- +1 2 3 +4 NULL 6 +7 8 NULL +10 11 12 + +# Sleep for the refresh interval, so that we get a refresh. 
+statement ok +SELECT mz_unsafe.mz_sleep(8); + +# Now we should see the NULL that should error out the MV. +query error db error: ERROR: Evaluation error: column 1 must not be null +SELECT * FROM mv_assertion_plus_refresh_every ORDER BY x; + +## Check `REFRESH AT greatest(, mz_now())`, because this is an idiom that we are recommending to users. +# Insert something into the underlying table. +statement ok +INSERT INTO t2 VALUES (30, 30, 30); + +statement ok +CREATE MATERIALIZED VIEW mv_greatest +WITH (REFRESH AT greatest('1990-01-04 11:00', mz_now())) +AS SELECT * FROM t2; + +query III +SELECT * FROM mv_greatest; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 + +## If there is no creation refresh, then a query should block until the first refresh. + +# Save the current time. +statement ok +CREATE TABLE start_time(t timestamp); + +statement ok +INSERT INTO start_time VALUES (now()); + +# Create an MV whose first refresh is 5 sec after its creation. +statement ok +CREATE MATERIALIZED VIEW mv_no_creation_refresh +WITH (REFRESH EVERY '100000 sec' ALIGNED TO mz_now()::string::int8 + 5000) +AS SELECT * FROM t2; + +# Insert something into the underlying table. statement ok -DROP TABLE t2 CASCADE +INSERT INTO t2 VALUES (100, 100, 100); + +# Query it. +# - The query should block until the first refresh. +# - The newly inserted stuff should be visible. +query III +SELECT * FROM mv_no_creation_refresh; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 +100 100 100 + +# Verify that at least 5 seconds have passed. +query B +SELECT now() - (SELECT * from start_time) >= INTERVAL '5 sec'; +---- +true + +## Check ALIGNED TO in the far past + +# Save the current time. 
+statement ok +DELETE FROM start_time; + +statement ok +INSERT INTO start_time VALUES (now()); + +statement ok +CREATE MATERIALIZED VIEW mv_aligned_to_past +WITH (REFRESH EVERY '10000 ms' ALIGNED TO mz_now()::text::int8 - 10*10000 + 3000) +AS SELECT * FROM t2; + +query III +SELECT * FROM mv_aligned_to_past; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 +100 100 100 + +# Verify that at least 3 seconds have passed. +query B +SELECT now() - (SELECT * from start_time) >= INTERVAL '3 sec'; +---- +true + +## Check ALIGNED TO in the far future + +# Save the current time. +statement ok +DELETE FROM start_time; + +statement ok +INSERT INTO start_time VALUES (now()); + +statement ok +CREATE MATERIALIZED VIEW mv_aligned_to_future +WITH (REFRESH EVERY '10000 ms' ALIGNED TO mz_now()::text::int8 + 10*10000 + 3000) +AS SELECT * FROM t2; + +query III +SELECT * FROM mv_aligned_to_future; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 +100 100 100 + +# Verify that at least 3 seconds have passed. +query B +SELECT now() - (SELECT * from start_time) >= INTERVAL '3 sec'; +---- +true + +## Constant query in an MV with REFRESH options +statement ok +CREATE MATERIALIZED VIEW const_mv +WITH (REFRESH EVERY '1 day') +AS SELECT 1; + +query I +SELECT * FROM const_mv +---- +1 + +## We should be able to immediately query a constant MV even if the first refresh is in the future +statement ok +CREATE MATERIALIZED VIEW const_mv2 +WITH (REFRESH AT '3000-01-01 23:59') +AS SELECT 2; + +query I +SELECT * FROM const_mv2 +---- +2 + +## MV that has refreshes only in the past +statement ok +CREATE MATERIALIZED VIEW mv_no_refresh +WITH (REFRESH AT '2000-01-01 10:00') +AS SELECT * FROM t2; + +# (Be sure to NOT include the timestamp in the expected error msg, because that would vary by each test run.) 
+query error db error: ERROR: Timestamp \( +SELECT * FROM mv_no_refresh; + +## Query MV after the last refresh +statement ok +CREATE MATERIALIZED VIEW mv3 +WITH (REFRESH AT mz_now()::text::int8 + 2000, REFRESH AT mz_now()::text::int8 + 4000) +AS SELECT * FROM t2; + +# Wait until the first refresh +query III +SELECT * FROM mv3; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 +100 100 100 + +# Wait until the second refresh, which is the last one +query III +SELECT * FROM mv3; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 +100 100 100 + +# This insert will happen after the last refresh. +statement ok +INSERT INTO t2 VALUES (70, 70, 70); + +# We should be able to query the MV after the last refresh, and the newly inserted data shouldn't be visible. +query III +SELECT * FROM mv3; +---- +NULL -1 -2 +4 NULL 6 +7 8 NULL +1 2 3 +10 11 12 +30 30 30 +100 100 100 diff --git a/test/sqllogictest/rename.slt b/test/sqllogictest/rename.slt index 57f2dd40156d5..dc0c0113da491 100644 --- a/test/sqllogictest/rename.slt +++ b/test/sqllogictest/rename.slt @@ -159,7 +159,7 @@ query TT SHOW CREATE MATERIALIZED VIEW grand_friend.mv1; ---- materialize.grand_friend.mv1 -CREATE MATERIALIZED VIEW "materialize"."grand_friend"."mv1" IN CLUSTER "default" AS SELECT "x" FROM "materialize"."enemy"."v1" +CREATE MATERIALIZED VIEW "materialize"."grand_friend"."mv1" IN CLUSTER "default" WITH (REFRESH = ON COMMIT) AS SELECT "x" FROM "materialize"."enemy"."v1" statement ok CREATE TABLE a1.t (y text); diff --git a/test/testdrive/materializations.td b/test/testdrive/materializations.td index 4c4ce7bf852a4..e52128c0f3fc1 100644 --- a/test/testdrive/materializations.td +++ b/test/testdrive/materializations.td @@ -79,7 +79,7 @@ data_view > SHOW CREATE MATERIALIZED VIEW test1 name create_sql ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
-materialize.public.test1 "CREATE MATERIALIZED VIEW \"materialize\".\"public\".\"test1\" IN CLUSTER \"default\" AS SELECT \"b\", \"pg_catalog\".\"sum\"(\"a\") FROM \"materialize\".\"public\".\"data\" GROUP BY \"b\"" +materialize.public.test1 "CREATE MATERIALIZED VIEW \"materialize\".\"public\".\"test1\" IN CLUSTER \"default\" WITH (REFRESH = ON COMMIT) AS SELECT \"b\", \"pg_catalog\".\"sum\"(\"a\") FROM \"materialize\".\"public\".\"data\" GROUP BY \"b\"" # Materialized view can be built on a not-materialized view. > CREATE MATERIALIZED VIEW test2 AS diff --git a/test/testdrive/materialized-view-refresh-options.td b/test/testdrive/materialized-view-refresh-options.td new file mode 100644 index 0000000000000..254a4c7513f29 --- /dev/null +++ b/test/testdrive/materialized-view-refresh-options.td @@ -0,0 +1,134 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +$ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}/materialize +ALTER SYSTEM SET enable_refresh_every_mvs = true + +> CREATE TABLE t1(x int); + +> INSERT INTO t1 VALUES (1); + +# This refresh interval needs to be not too small and not too big. See the constraints in comments below. +> CREATE MATERIALIZED VIEW mv1 + WITH (REFRESH EVERY '8sec') + AS SELECT x+x as x2 FROM t1; + +> INSERT INTO t1 VALUES (3); + +# The following will not immediately return the recently inserted values, but Testdrive will wait. +# Warning: This test assumes that Testdrive's timeout is greater than the above refresh interval. 
+> SELECT * FROM mv1; +2 +6 + +> INSERT INTO t1 VALUES (4); + +# What we just inserted shouldn't be in the mv yet, because we are just after a refresh (because the previous SELECT +# returned correct results only after a refresh). +# Warning: this test assumes that the above INSERT completes within the above refresh interval. If we have some +# transient infrastructure problem that makes the INSERT really slow, then this test will fail. +> SELECT * FROM mv1; +2 +6 + +> SELECT * FROM mv1; +2 +6 +8 + +# Check that I can query it together with other objects, even between refreshes, and that data added later than the last +# refresh in other objects is reflected in the result. +> CREATE MATERIALIZED VIEW mv2 + WITH (REFRESH = EVERY '10000sec') + AS SELECT x+x as x2 FROM t1; + +> CREATE TABLE t2(y int); + +> INSERT INTO t2 VALUES (100); + +> (SELECT * from mv2) UNION (SELECT * FROM t2); +2 +6 +8 +100 + +# The following DELETE shouldn't affect mv2, because mv2 has a very large refresh interval. +> DELETE FROM t1; + +> (SELECT * from mv2) UNION (SELECT * FROM t2); +2 +6 +8 +100 + +# Check that there is an implicit refresh immediately at the creation of the MV, even if it's REFRESH EVERY. +> CREATE MATERIALIZED VIEW mv3 + WITH (REFRESH EVERY '10000sec') + AS SELECT y+y as y2 FROM t2; + +> SELECT * FROM mv3; +200 + +# Check that mz_now() occurring in the original statement works. This tests that after we purify away `mz_now()`, we +# also remove it from `resolved_ids`. Importantly, this has to be a Testdrive test, and not an slt test, because slt +# doesn't do the "the in-memory state of the catalog does not match its on-disk state" check. +# +# Also tests that planning uses `cast_to` with `CastContext::Implicit` (instead of `type_as`) when planning the +# REFRESH AT. 
+> CREATE MATERIALIZED VIEW mv4 + WITH (REFRESH AT mz_now()::string::int8 + 2000) + AS SELECT y*y as y2 FROM t2; + +> SELECT * FROM mv4; +10000 + +## Check turning the replica off and on + +> CREATE CLUSTER refresh_cluster SIZE = '1', REPLICATION FACTOR = 1; +> SET cluster = refresh_cluster; +> CREATE MATERIALIZED VIEW mv5 + WITH (REFRESH EVERY '8 sec' ALIGNED TO mz_now()::text::int8 + 5000) + AS SELECT 3*y as y2 FROM t2; +> SET cluster = default; + +> SELECT * FROM mv5; +300 + +> ALTER CLUSTER refresh_cluster SET (REPLICATION FACTOR 0); + +> SELECT * FROM mv5; +300 + +> ALTER CLUSTER refresh_cluster SET (REPLICATION FACTOR 1); + +> SELECT * FROM mv5; +300 + +> INSERT INTO t2 VALUES (110); + +# Wait until the insert is reflected, so we are after a refresh. +> SELECT * FROM mv5; +300 +330 + +# Turn off the cluster, and insert something, and then sleep through a scheduled refresh. +> ALTER CLUSTER refresh_cluster SET (REPLICATION FACTOR 0); + +> INSERT INTO t2 VALUES (120); + +> SELECT mz_unsafe.mz_sleep(8); + + +# Turn it back on, and check that we recover. Data that were added while we slept should be visible now. +> ALTER CLUSTER refresh_cluster SET (REPLICATION FACTOR 1); + +> SELECT * FROM mv5; +300 +330 +360 diff --git a/test/testdrive/materialized-views.td b/test/testdrive/materialized-views.td index 2ca880da6485a..200a049f9ab44 100644 --- a/test/testdrive/materialized-views.td +++ b/test/testdrive/materialized-views.td @@ -1,4 +1,3 @@ - # Copyright Materialize, Inc. and contributors. All rights reserved. # # Use of this software is governed by the Business Source License