Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ct: get comments and SHOW working #29769

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ impl Coordinator {
let as_of = read_holds.least_valid_read();
df_desc.set_as_of(as_of.clone());

// TODO(ct): HACKs
// The MV optimizer is hardcoded to output a PersistSinkConnection.
// Sniff it and swap for the ContinualTaskSink. If/when we split out a
// Continual Task optimizer, this won't be necessary, and in the
// meantime, it seems undesirable to burden the MV optimizer with a
// configuration for this.
for sink in df_desc.sink_exports.values_mut() {
match &mut sink.connection {
ComputeSinkConnection::Persist(PersistSinkConnection {
Expand Down
9 changes: 9 additions & 0 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7650,6 +7650,15 @@ ON mz_internal.mz_sink_status_history (sink_id)",
is_retained_metrics_object: false,
};

pub const MZ_SHOW_CONTINUAL_TASKS_IND: BuiltinIndex = BuiltinIndex {
name: "mz_show_continual_tasks_ind",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::INDEX_MZ_SHOW_CONTINUAL_TASKS_OID,
sql: "IN CLUSTER mz_catalog_server
ON mz_internal.mz_show_continual_tasks (id)",
is_retained_metrics_object: false,
};

// In both `mz_source_statistics` and `mz_sink_statistics` we cast the `SUM` of
// uint8's to `uint8` instead of leaving them as `numeric`. This is because we want to
// save index space, and we don't expect the sum to be > 2^63
Expand Down
3 changes: 1 addition & 2 deletions src/catalog/src/durable/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,8 +1152,7 @@ impl ItemValue {
}
Some("CONTINUAL") => {
assert_eq!(tokens.next(), Some("TASK"));
// TODO(ct): CatalogItemType::ContinualTask
CatalogItemType::MaterializedView
CatalogItemType::ContinualTask
}
Some("INDEX") => CatalogItemType::Index,
Some("TYPE") => CatalogItemType::Type,
Expand Down
1 change: 1 addition & 0 deletions src/pgrepr-consts/src/oid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,3 +747,4 @@ pub const INDEX_MZ_WALLCLOCK_GLOBAL_LAG_RECENT_HISTORY_IND_OID: u32 = 17024;
pub const TABLE_MZ_KAFKA_SOURCE_TABLES_OID: u32 = 17025;
pub const TABLE_MZ_CONTINUAL_TASKS_OID: u32 = 17026;
pub const VIEW_MZ_SHOW_CONTINUAL_TASKS_OID: u32 = 17027;
pub const INDEX_MZ_SHOW_CONTINUAL_TASKS_OID: u32 = 17028;
1 change: 1 addition & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ Table
Tables
Tail
Task
Tasks
Temp
Temporary
Text
Expand Down
5 changes: 5 additions & 0 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5143,6 +5143,7 @@ pub enum CommentObjectType<T: AstInfo> {
Schema { name: T::SchemaName },
Cluster { name: T::ClusterName },
ClusterReplica { name: QualifiedReplica },
ContinualTask { name: T::ItemName },
}

impl<T: AstInfo> AstDisplay for CommentObjectType<T> {
Expand Down Expand Up @@ -5214,6 +5215,10 @@ impl<T: AstInfo> AstDisplay for CommentObjectType<T> {
f.write_str("CLUSTER REPLICA ");
f.write_node(name);
}
ContinualTask { name } => {
f.write_str("CONTINUAL TASK ");
f.write_node(name);
}
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8796,6 +8796,7 @@ impl<'a> Parser<'a> {
DATABASES,
SCHEMAS,
SUBSOURCES,
CONTINUAL,
])? {
TABLES => ObjectType::Table,
VIEWS => ObjectType::View,
Expand Down Expand Up @@ -8826,6 +8827,14 @@ impl<'a> Parser<'a> {
DATABASES => ObjectType::Database,
SCHEMAS => ObjectType::Schema,
SUBSOURCES => ObjectType::Subsource,
CONTINUAL => {
if self.parse_keyword(TASKS) {
ObjectType::ContinualTask
} else {
self.prev_token();
return None;
}
}
_ => unreachable!(),
},
)
Expand Down Expand Up @@ -9005,6 +9014,7 @@ impl<'a> Parser<'a> {
DATABASE,
SCHEMA,
CLUSTER,
CONTINUAL,
])? {
TABLE => {
let name = self.parse_raw_name()?;
Expand Down Expand Up @@ -9072,6 +9082,11 @@ impl<'a> Parser<'a> {
let name = self.parse_column_name()?;
CommentObjectType::Column { name }
}
CONTINUAL => {
self.expect_keyword(TASK)?;
let name = self.parse_raw_name()?;
CommentObjectType::ContinualTask { name }
}
_ => unreachable!(),
};

Expand Down
7 changes: 5 additions & 2 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6681,7 +6681,6 @@ pub fn plan_comment(
}
}

// TODO(ct): Add ::ContinualTask variant.
let (object_id, column_pos) = match &object {
com_ty @ CommentObjectType::Table { name }
| com_ty @ CommentObjectType::View { name }
Expand All @@ -6691,7 +6690,8 @@ pub fn plan_comment(
| com_ty @ CommentObjectType::Connection { name }
| com_ty @ CommentObjectType::Source { name }
| com_ty @ CommentObjectType::Sink { name }
| com_ty @ CommentObjectType::Secret { name } => {
| com_ty @ CommentObjectType::Secret { name }
| com_ty @ CommentObjectType::ContinualTask { name } => {
let item = scx.get_item_by_resolved_name(name)?;
match (com_ty, item.item_type()) {
(CommentObjectType::Table { .. }, CatalogItemType::Table) => {
Expand Down Expand Up @@ -6721,6 +6721,9 @@ pub fn plan_comment(
(CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
(CommentObjectId::Secret(item.id()), None)
}
(CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
(CommentObjectId::ContinualTask(item.id()), None)
}
(com_ty, cat_ty) => {
let expected_type = match com_ty {
CommentObjectType::Table { .. } => ObjectType::Table,
Expand Down
36 changes: 36 additions & 0 deletions test/sqllogictest/ct_show.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

mode cockroach

simple conn=mz_system,user=mz_system
ALTER SYSTEM SET enable_create_continual_task = true
----
COMPLETE 0

statement ok
CREATE TABLE input (key INT)

statement ok
CREATE CONTINUAL TASK ct (key INT) ON INPUT input AS (
INSERT INTO ct SELECT * FROM input;
)

simple conn=mz_system,user=mz_system
ALTER SYSTEM SET enable_comment TO true;
----
COMPLETE 0

statement ok
COMMENT ON CONTINUAL TASK ct is 'foo'

query TTT
SHOW CONTINUAL TASKS;
----
ct quickstart foo
Loading