Skip to content

Commit

Permalink
fixup! ct: add ::ContinualTask various adapter enums
Browse files Browse the repository at this point in the history
  • Loading branch information
danhhz committed Sep 19, 2024
1 parent 4e276b6 commit c5daf0f
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 14 deletions.
5 changes: 5 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,11 @@ impl Catalog {
.filter(|role| role.is_user())
}

pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
self.entries()
.filter(|entry| entry.is_continual_task() && entry.id().is_user())
}

pub fn system_privileges(&self) -> &PrivilegeMap {
&self.state.system_privileges
}
Expand Down
27 changes: 22 additions & 5 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use mz_catalog::builtin::{
MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTERS, MZ_CLUSTER_REPLICAS,
MZ_CLUSTER_REPLICA_METRICS, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICA_STATUSES,
MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_COLUMNS, MZ_COMMENTS, MZ_CONNECTIONS,
MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS,
MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS,
MZ_HISTORY_RETENTION_STRATEGIES, MZ_INDEXES, MZ_INDEX_COLUMNS, MZ_INTERNAL_CLUSTER_REPLICAS,
MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCES, MZ_LIST_TYPES, MZ_MAP_TYPES,
MZ_MATERIALIZED_VIEWS, MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES,
Expand Down Expand Up @@ -53,7 +53,7 @@ use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::refresh_schedule::RefreshEvery;
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker, ScalarType, Timestamp};
use mz_sql::ast::{CreateIndexStatement, Statement, UnresolvedItemName};
use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
use mz_sql::catalog::{
CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
TypeCategory,
Expand Down Expand Up @@ -1359,13 +1359,30 @@ impl CatalogState {
})
.into_element()
.ast;
let query_string = "TODO(ct)";
let query_string = match &create_stmt {
Statement::CreateContinualTask(stmt) => {
let mut query_string = String::new();
for stmt in &stmt.stmts {
let s = match stmt {
ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
};
if query_string.is_empty() {
query_string = s;
} else {
query_string.push_str("; ");
query_string.push_str(&s);
}
}
query_string
}
_ => unreachable!(),
};

let mut updates = Vec::new();

updates.push(BuiltinTableUpdate {
// TODO(ct): MZ_CONTINUAL_TASKS
id: &*MZ_MATERIALIZED_VIEWS,
id: &*MZ_CONTINUAL_TASKS,
row: Row::pack_slice(&[
Datum::String(&id.to_string()),
Datum::UInt32(oid),
Expand Down
16 changes: 11 additions & 5 deletions src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use mz_sql::names::ResolvedDatabaseSpecifier;
use mz_sql::plan::ConnectionDetails;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::{
self, SystemVars, Var, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS,
self, SystemVars, Var, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS, MAX_CONTINUAL_TASKS,
MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS, MAX_MATERIALIZED_VIEWS,
MAX_MYSQL_CONNECTIONS, MAX_OBJECTS_PER_SCHEMA, MAX_POSTGRES_CONNECTIONS,
MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
Expand Down Expand Up @@ -1321,6 +1321,7 @@ impl Coordinator {
let mut new_objects_per_schema = BTreeMap::new();
let mut new_secrets = 0;
let mut new_roles = 0;
let mut new_continual_tasks = 0;
for op in ops {
match op {
Op::CreateDatabase { .. } => {
Expand Down Expand Up @@ -1387,8 +1388,7 @@ impl Coordinator {
new_secrets += 1;
}
CatalogItem::ContinualTask(_) => {
// TODO(ct): Give CTs their own limit?
new_materialized_views += 1;
new_continual_tasks += 1;
}
CatalogItem::Log(_)
| CatalogItem::View(_)
Expand Down Expand Up @@ -1463,8 +1463,7 @@ impl Coordinator {
new_secrets -= 1;
}
CatalogItem::ContinualTask(_) => {
// TODO(ct): Give CTs their own limit?
new_materialized_views -= 1;
new_continual_tasks -= 1;
}
CatalogItem::Log(_)
| CatalogItem::View(_)
Expand Down Expand Up @@ -1702,6 +1701,13 @@ impl Coordinator {
"role",
MAX_ROLES.name(),
)?;
self.validate_resource_limit(
self.catalog().user_continual_tasks().count(),
new_continual_tasks,
SystemVars::max_continual_tasks,
"continual_task",
MAX_CONTINUAL_TASKS.name(),
)?;
Ok(())
}

Expand Down
1 change: 0 additions & 1 deletion src/adapter/src/coord/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ impl Coordinator {
CatalogItem::ContinualTask(ContinualTask { raw_expr, .. }) => {
// See comment in MaterializedView
timelines.insert(TimelineContext::TimestampDependent);
// TODO(ct): optimized_expr?
ids.extend(raw_expr.depends_on());
}
CatalogItem::Table(table) => {
Expand Down
51 changes: 51 additions & 0 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2410,6 +2410,30 @@ pub static MZ_TYPES: LazyLock<BuiltinTable> = LazyLock::new(|| BuiltinTable {
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
});
pub static MZ_CONTINUAL_TASKS: LazyLock<BuiltinTable> = LazyLock::new(|| BuiltinTable {
name: "mz_continual_tasks",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::TABLE_MZ_CONTINUAL_TASKS_OID,
desc: RelationDesc::builder()
.with_column("id", ScalarType::String.nullable(false))
.with_column("oid", ScalarType::Oid.nullable(false))
.with_column("schema_id", ScalarType::String.nullable(false))
.with_column("name", ScalarType::String.nullable(false))
.with_column("cluster_id", ScalarType::String.nullable(false))
.with_column("definition", ScalarType::String.nullable(false))
.with_column("owner_id", ScalarType::String.nullable(false))
.with_column(
"privileges",
ScalarType::Array(Box::new(ScalarType::MzAclItem)).nullable(false),
)
.with_column("create_sql", ScalarType::String.nullable(false))
.with_column("redacted_create_sql", ScalarType::String.nullable(false))
.with_key(vec![0])
.with_key(vec![1])
.finish(),
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
});
/// PostgreSQL-specific metadata about types that doesn't make sense to expose
/// in the `mz_types` table as part of our public, stable API.
pub static MZ_TYPE_PG_METADATA: LazyLock<BuiltinTable> = LazyLock::new(|| BuiltinTable {
Expand Down Expand Up @@ -6739,6 +6763,31 @@ ORDER BY 1, 2"#,
access: vec![PUBLIC_SELECT],
});

pub static MZ_SHOW_CONTINUAL_TASKS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
name: "mz_show_continual_tasks",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::VIEW_MZ_SHOW_CONTINUAL_TASKS_OID,
column_defs: None,
sql: "
WITH comments AS (
SELECT id, comment
FROM mz_internal.mz_comments
WHERE object_type = 'continual-task' AND object_sub_id IS NULL
)
SELECT
cts.id as id,
cts.name,
clusters.name AS cluster,
schema_id,
cluster_id,
COALESCE(comments.comment, '') as comment
FROM
mz_internal.mz_continual_tasks AS cts
JOIN mz_catalog.mz_clusters AS clusters ON clusters.id = cts.cluster_id
LEFT JOIN comments ON cts.id = comments.id",
access: vec![PUBLIC_SELECT],
});

pub static MZ_SHOW_ROLE_MEMBERS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
name: "mz_show_role_members",
schema: MZ_INTERNAL_SCHEMA,
Expand Down Expand Up @@ -8190,6 +8239,7 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::Table(&MZ_COMMENTS),
Builtin::Table(&MZ_WEBHOOKS_SOURCES),
Builtin::Table(&MZ_HISTORY_RETENTION_STRATEGIES),
Builtin::Table(&MZ_CONTINUAL_TASKS),
Builtin::View(&MZ_RELATIONS),
Builtin::View(&MZ_OBJECT_OID_ALIAS),
Builtin::View(&MZ_OBJECTS),
Expand Down Expand Up @@ -8252,6 +8302,7 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::View(&MZ_SHOW_SINKS),
Builtin::View(&MZ_SHOW_MATERIALIZED_VIEWS),
Builtin::View(&MZ_SHOW_INDEXES),
Builtin::View(&MZ_SHOW_CONTINUAL_TASKS),
Builtin::View(&MZ_CLUSTER_REPLICA_HISTORY),
Builtin::View(&MZ_TIMEZONE_NAMES),
Builtin::View(&MZ_TIMEZONE_ABBREVIATIONS),
Expand Down
5 changes: 5 additions & 0 deletions src/catalog/src/memory/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1799,6 +1799,11 @@ impl CatalogEntry {
matches!(self.item(), CatalogItem::Index(_))
}

/// Reports whether this catalog entry is a continual task.
pub fn is_continual_task(&self) -> bool {
matches!(self.item(), CatalogItem::ContinualTask(_))
}

/// Reports whether this catalog entry can be treated as a relation, it can produce rows.
pub fn is_relation(&self) -> bool {
mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
Expand Down
2 changes: 2 additions & 0 deletions src/pgrepr-consts/src/oid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,3 +744,5 @@ pub const SOURCE_MZ_WALLCLOCK_LAG_HISTORY_OID: u32 = 17021;
pub const VIEW_MZ_WALLCLOCK_GLOBAL_LAG_HISTORY_OID: u32 = 17022;
pub const VIEW_MZ_WALLCLOCK_GLOBAL_LAG_RECENT_HISTORY_OID: u32 = 17023;
pub const INDEX_MZ_WALLCLOCK_GLOBAL_LAG_RECENT_HISTORY_IND_OID: u32 = 17024;
pub const TABLE_MZ_CONTINUAL_TASKS_OID: u32 = 17025;
pub const VIEW_MZ_SHOW_CONTINUAL_TASKS_OID: u32 = 17026;
7 changes: 6 additions & 1 deletion src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3163,6 +3163,9 @@ pub enum ShowObjectType<T: AstInfo> {
RoleMembership {
role: Option<T::RoleName>,
},
ContinualTask {
in_cluster: Option<T::ClusterName>,
},
}
/// `SHOW <object>S`
///
Expand Down Expand Up @@ -3204,6 +3207,7 @@ impl<T: AstInfo> AstDisplay for ShowObjectsStatement<T> {
ShowObjectType::Privileges { .. } => "PRIVILEGES",
ShowObjectType::DefaultPrivileges { .. } => "DEFAULT PRIVILEGES",
ShowObjectType::RoleMembership { .. } => "ROLE MEMBERSHIP",
ShowObjectType::ContinualTask { .. } => "CONTINUAL TASK",
});

if let ShowObjectType::Index { on_object, .. } = &self.object_type {
Expand All @@ -3228,7 +3232,8 @@ impl<T: AstInfo> AstDisplay for ShowObjectsStatement<T> {
ShowObjectType::MaterializedView { in_cluster }
| ShowObjectType::Index { in_cluster, .. }
| ShowObjectType::Sink { in_cluster }
| ShowObjectType::Source { in_cluster } => {
| ShowObjectType::Source { in_cluster }
| ShowObjectType::ContinualTask { in_cluster } => {
if let Some(cluster) = in_cluster {
f.write_str(" IN CLUSTER ");
f.write_node(cluster);
Expand Down
3 changes: 1 addition & 2 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7241,8 +7241,7 @@ impl<'a> Parser<'a> {
}
ObjectType::ContinualTask => {
let in_cluster = self.parse_optional_in_cluster()?;
// TODO(ct): ShowObjectType::ContinualTask
ShowObjectType::MaterializedView { in_cluster }
ShowObjectType::ContinualTask { in_cluster }
}
ObjectType::Index => {
let on_object = if self.parse_one_of_keywords(&[ON]).is_some() {
Expand Down
32 changes: 32 additions & 0 deletions src/sql/src/plan/statement/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ pub fn show_objects<'a>(
assert_none!(from, "parser should reject from");
show_role_membership(scx, role, filter)
}
ShowObjectType::ContinualTask { in_cluster } => {
show_continual_tasks(scx, from, in_cluster, filter)
}
}
}

Expand Down Expand Up @@ -820,6 +823,35 @@ pub fn show_role_membership<'a>(
)
}

fn show_continual_tasks<'a>(
scx: &'a StatementContext<'a>,
from: Option<ResolvedSchemaName>,
in_cluster: Option<ResolvedClusterName>,
filter: Option<ShowStatementFilter<Aug>>,
) -> Result<ShowSelect<'a>, PlanError> {
let schema_spec = scx.resolve_optional_schema(&from)?;
let mut where_clause = format!("schema_id = '{schema_spec}'");

if let Some(cluster) = in_cluster {
write!(where_clause, " AND cluster_id = '{}'", cluster.id)
.expect("write on string cannot fail");
}

let query = format!(
"SELECT name, cluster, comment
FROM mz_internal.mz_show_continual_tasks
WHERE {where_clause}"
);

ShowSelect::new(
scx,
query,
filter,
None,
Some(&["name", "cluster", "comment"]),
)
}

/// An intermediate result when planning a `SHOW` query.
///
/// Can be interrogated for its columns, or converted into a proper [`Plan`].
Expand Down
6 changes: 6 additions & 0 deletions src/sql/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,7 @@ impl SystemVars {
&MAX_OBJECTS_PER_SCHEMA,
&MAX_SECRETS,
&MAX_ROLES,
&MAX_CONTINUAL_TASKS,
&MAX_RESULT_SIZE,
&MAX_COPY_FROM_SIZE,
&ALLOWED_CLUSTER_REPLICA_SIZES,
Expand Down Expand Up @@ -1690,6 +1691,11 @@ impl SystemVars {
*self.expect_value(&MAX_ROLES)
}

/// Returns the value of the `max_continual_tasks` configuration parameter.
pub fn max_continual_tasks(&self) -> u32 {
*self.expect_value(&MAX_CONTINUAL_TASKS)
}

/// Returns the value of the `max_result_size` configuration parameter.
pub fn max_result_size(&self) -> u64 {
self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
Expand Down
7 changes: 7 additions & 0 deletions src/sql/src/session/vars/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,13 @@ pub static MAX_ROLES: VarDefinition = VarDefinition::new(
true,
);

pub static MAX_CONTINUAL_TASKS: VarDefinition = VarDefinition::new(
"max_continual_tasks",
value!(u32; 100),
"The maximum number of continual_tasks in the region, across all schemas (Materialize).",
true,
);

// Cloud environmentd is configured with 4 GiB of RAM, so 1 GiB is a good heuristic for a single
// query.
//
Expand Down

0 comments on commit c5daf0f

Please sign in to comment.