Skip to content

Commit

Permalink
Add REFRESH options for materialized views
Browse files Browse the repository at this point in the history
  • Loading branch information
ggevay committed Dec 12, 2023
1 parent 626f59a commit f421c70
Show file tree
Hide file tree
Showing 45 changed files with 1,757 additions and 53 deletions.
2 changes: 1 addition & 1 deletion misc/python/materialize/checks/all_checks/null_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def validate(self) -> Testdrive:
<null> <null> <null>
> SHOW CREATE MATERIALIZED VIEW null_value_view2;
materialize.public.null_value_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"null_value_view2\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"f2\\", NULL FROM \\"materialize\\".\\"public\\".\\"null_value_table\\" WHERE \\"f1\\" IS NULL OR \\"f1\\" IS NOT NULL OR \\"f1\\" = NULL"
materialize.public.null_value_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"null_value_view2\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"f2\\", NULL FROM \\"materialize\\".\\"public\\".\\"null_value_table\\" WHERE \\"f1\\" IS NULL OR \\"f1\\" IS NOT NULL OR \\"f1\\" = NULL"
> SELECT * FROM null_value_view2;
<null> <null> <null>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def validate(self) -> Testdrive:
dedent(
"""
> SHOW CREATE MATERIALIZED VIEW string_bytea_types_view1;
materialize.public.string_bytea_types_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"string_bytea_types_view1\\" IN CLUSTER \\"default\\" AS SELECT \\"text_col\\", \\"bytea_col\\", 'това'::\\"pg_catalog\\".\\"text\\", '\\\\xAAAA'::\\"pg_catalog\\".\\"bytea\\" FROM \\"materialize\\".\\"public\\".\\"text_bytea_types_table\\" WHERE \\"text_col\\" >= ''::\\"pg_catalog\\".\\"text\\" AND \\"bytea_col\\" >= ''::\\"pg_catalog\\".\\"bytea\\""
materialize.public.string_bytea_types_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"string_bytea_types_view1\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"text_col\\", \\"bytea_col\\", 'това'::\\"pg_catalog\\".\\"text\\", '\\\\xAAAA'::\\"pg_catalog\\".\\"bytea\\" FROM \\"materialize\\".\\"public\\".\\"text_bytea_types_table\\" WHERE \\"text_col\\" >= ''::\\"pg_catalog\\".\\"text\\" AND \\"bytea_col\\" >= ''::\\"pg_catalog\\".\\"bytea\\""
> SELECT text_col, text, LENGTH(bytea_col), LENGTH(bytea) FROM string_bytea_types_view1;
aaaa това 2 2
Expand Down
12 changes: 6 additions & 6 deletions misc/python/materialize/checks/all_checks/top_k.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ def validate(self) -> Testdrive:
dedent(
"""
> SHOW CREATE MATERIALIZED VIEW basic_topk_view1;
materialize.public.basic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view1\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2"
materialize.public.basic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view1\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2"
> SELECT * FROM basic_topk_view1;
2 32
3 48
> SHOW CREATE MATERIALIZED VIEW basic_topk_view2;
materialize.public.basic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view2\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2"
materialize.public.basic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view2\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2"
> SELECT * FROM basic_topk_view2;
1 16
Expand Down Expand Up @@ -123,14 +123,14 @@ def validate(self) -> Testdrive:
dedent(
"""
> SHOW CREATE MATERIALIZED VIEW monotonic_topk_view1;
materialize.public.monotonic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view1\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2"
materialize.public.monotonic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view1\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2"
> SELECT * FROM monotonic_topk_view1;
E 5
D 4
> SHOW CREATE MATERIALIZED VIEW monotonic_topk_view2;
materialize.public.monotonic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view2\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2"
materialize.public.monotonic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view2\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2"
> SELECT * FROM monotonic_topk_view2;
A 1
Expand Down Expand Up @@ -186,13 +186,13 @@ def validate(self) -> Testdrive:
dedent(
"""
> SHOW CREATE MATERIALIZED VIEW monotonic_top1_view1;
materialize.public.monotonic_top1_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view1\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 1"
materialize.public.monotonic_top1_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view1\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 1"
> SELECT * FROM monotonic_top1_view1;
D 5
> SHOW CREATE MATERIALIZED VIEW monotonic_top1_view2;
materialize.public.monotonic_top1_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view2\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 1"
materialize.public.monotonic_top1_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view2\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 1"
> SELECT * FROM monotonic_top1_view2;
A 1
Expand Down
21 changes: 18 additions & 3 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,12 @@ impl CatalogState {
diff: Diff,
) -> Vec<BuiltinTableUpdate> {
let create_stmt = mz_sql::parse::parse(&view.create_sql)
.unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", view.create_sql))
.unwrap_or_else(|e| {
panic!(
"create_sql cannot be invalid: `{}` --- error: `{}`",
view.create_sql, e
)
})
.into_element()
.ast;
let query = match &create_stmt {
Expand Down Expand Up @@ -777,7 +782,12 @@ impl CatalogState {
diff: Diff,
) -> Vec<BuiltinTableUpdate> {
let create_stmt = mz_sql::parse::parse(&mview.create_sql)
.unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", mview.create_sql))
.unwrap_or_else(|e| {
panic!(
"create_sql cannot be invalid: `{}` --- error: `{}`",
mview.create_sql, e
)
})
.into_element()
.ast;
let query = match &create_stmt {
Expand Down Expand Up @@ -875,7 +885,12 @@ impl CatalogState {
let mut updates = vec![];

let create_stmt = mz_sql::parse::parse(&index.create_sql)
.unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", index.create_sql))
.unwrap_or_else(|e| {
panic!(
"create_sql cannot be invalid: `{}` --- error: `{}`",
index.create_sql, e
)
})
.into_element()
.ast;

Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1904,6 +1904,7 @@ mod builtin_migration_tests {
resolved_ids: ResolvedIds(BTreeSet::from_iter(resolved_ids)),
cluster_id: ClusterId::User(1),
non_null_assertions: vec![],
refresh_schedule: None,
})
}
SimplifiedItem::Index { on } => {
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,7 @@ impl CatalogState {
resolved_ids,
cluster_id: materialized_view.cluster_id,
non_null_assertions: materialized_view.non_null_assertions,
refresh_schedule: materialized_view.refresh_schedule,
})
}
Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,7 @@ impl Coordinator {
internal_view_id,
mv.desc.iter_names().cloned().collect(),
mv.non_null_assertions.clone(),
mv.refresh_schedule.clone(),
debug_name,
optimizer_config.clone(),
);
Expand Down
92 changes: 89 additions & 3 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,21 @@ use mz_sql::ast::{
CopyRelation, CopyStatement, InsertSource, Query, Raw, SetExpr, Statement, SubscribeStatement,
};
use mz_sql::catalog::RoleAttributes;
use mz_sql::names::{PartialItemName, ResolvedIds};
use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
use mz_sql::plan::{
AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan, TransactionType,
};
use mz_sql::pure::{
materialized_view_option_contains_temporal, purify_create_materialized_view_options,
};
use mz_sql::rbac;
use mz_sql::rbac::CREATE_ITEM_USAGE;
use mz_sql::session::user::User;
use mz_sql::session::vars::{
EndTransactionAction, OwnedVarInput, Var, STATEMENT_LOGGING_SAMPLE_RATE,
};
use mz_sql_parser::ast::CreateMaterializedViewStatement;
use mz_storage_types::sources::Timeline;
use opentelemetry::trace::TraceContextExt;
use tokio::sync::{mpsc, oneshot, watch};
use tracing::{debug_span, Instrument};
Expand All @@ -53,7 +58,7 @@ use crate::notice::AdapterNotice;
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{AppendWebhookResponse, AppendWebhookValidator};
use crate::{catalog, metrics, ExecuteContext};
use crate::{catalog, metrics, ExecuteContext, TimelineContext};

use super::ExecuteContextExtra;

Expand Down Expand Up @@ -614,7 +619,7 @@ impl Coordinator {
let catalog = self.catalog();
let catalog = catalog.for_session(ctx.session());
let original_stmt = stmt.clone();
let (stmt, resolved_ids) = match mz_sql::names::resolve(&catalog, stmt) {
let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, stmt) {
Ok(resolved) => resolved,
Err(e) => return ctx.retire(Err(e.into())),
};
Expand Down Expand Up @@ -679,6 +684,87 @@ impl Coordinator {
"CREATE SUBSOURCE statements",
))),

Statement::CreateMaterializedView(mut cmvs) => {
// (This won't be the same timestamp as the system table inserts, unfortunately.)
let mz_now = if cmvs
.with_options
.iter()
.any(materialized_view_option_contains_temporal)
{
let timeline_context =
match self.validate_timeline_context(resolved_ids.0.clone()) {
Ok(tc) => tc,
Err(e) => return ctx.retire(Err(e)),
};
let timeline = match timeline_context {
TimelineContext::TimelineDependent(timeline) => timeline,
TimelineContext::TimestampDependent
| TimelineContext::TimestampIndependent => {
// We default to EpochMilliseconds, similarly to `determine_timestamp_for`.
// Note that we didn't accurately decide whether we are TimestampDependent
// or TimestampIndependent, because for this we'd need to also check whether
// `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
// However, this doesn't matter here, as we are just going to default to
// EpochMilliseconds in both cases.
Timeline::EpochMilliseconds
}
};
Some(self.get_timestamp_oracle(&timeline).read_ts().await)
// TODO: It might be good to take into account `least_valid_read` in addition to
// the oracle's `read_ts`, but there are two problems:
// 1. At this point, we don't know which indexes would be used. We could do an
// overestimation here by grabbing the ids of all indexes that are on ids
// involved in the query. (We'd have to recursively follow view definitions,
// similarly to `validate_timeline_context`.)
// 2. For a peak, when the `least_valid_read` is later than the oracle's
// `read_ts`, then the peak doesn't return before it completes at the chosen
// timestamp. However, for a CRATE MATERIALIZED VIEW statement, it's not clear
// whether we want to make it block until the chosen time. If it doesn't block,
// then the initial refresh wouldn't be linearized with the CREATE MATERIALIZED
// VIEW statement.
//
// Note: The Adapter is usually keeping a read hold of all objects at the oracle
// read timestamp, so `least_valid_read` usually won't actually be later than
// the oracle's `read_ts`. (see `Coordinator::advance_timelines`)
//
// Note 2: If we choose a timestamp here that is earlier than
// `least_valid_read`, that is somewhat bad, but not catastrophic: The only
// bad thing that happens is that we won't perform that refresh that was
// specified to be at `mz_now()` (which is usually the initial refresh)
// (similarly to how we don't perform refreshes that were specified to be in the
// past).
} else {
None
};

let owned_catalog = self.owned_catalog();
let catalog = owned_catalog.for_session(ctx.session());

purify_create_materialized_view_options(
catalog,
mz_now,
&mut cmvs,
&mut resolved_ids,
);

let purified_stmt =
Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
if_exists: cmvs.if_exists,
name: cmvs.name,
columns: cmvs.columns,
in_cluster: cmvs.in_cluster,
query: cmvs.query,
with_options: cmvs.with_options,
});

// (Purifying CreateMaterializedView doesn't happen async, so no need to send
// `Message::PurifiedStatementReady` here.)
match self.plan_statement(ctx.session(), purified_stmt, &params, &resolved_ids) {
Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
Err(e) => ctx.retire(Err(e)),
}
}

// All other statements are handled immediately.
_ => match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) {
Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
Expand Down
Loading

0 comments on commit f421c70

Please sign in to comment.