Skip to content

Commit

Permalink
ct: establish CREATE CONTINUAL TASK plumbing
Browse files Browse the repository at this point in the history
  • Loading branch information
danhhz committed Sep 6, 2024
1 parent 04206f5 commit 74a4630
Show file tree
Hide file tree
Showing 22 changed files with 205 additions and 28 deletions.
7 changes: 4 additions & 3 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ use mz_sql::names::{
ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
Plan, PlanContext,
CreateConnectionPlan, CreateContinualTaskPlan, CreateIndexPlan, CreateMaterializedViewPlan,
CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
CreateViewPlan, Params, Plan, PlanContext,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
Expand Down Expand Up @@ -942,6 +942,7 @@ impl CatalogState {
initial_as_of,
})
}
Plan::CreateContinualTask(CreateContinualTaskPlan {}) => todo!("WIP"),
Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
create_sql: index.create_sql,
on: index.on,
Expand Down
5 changes: 5 additions & 0 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ pub enum ExecuteResponse {
CreatedViews,
/// The requested materialized view was created.
CreatedMaterializedView,
/// The requested continual task was created.
CreatedContinualTask,
/// The requested type was created.
CreatedType,
/// The requested prepared statement was removed.
Expand Down Expand Up @@ -488,6 +490,7 @@ impl TryInto<ExecuteResponse> for ExecuteResponseKind {
ExecuteResponseKind::CreatedMaterializedView => {
Ok(ExecuteResponse::CreatedMaterializedView)
}
ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
ExecuteResponseKind::Deallocate => Err(()),
ExecuteResponseKind::DeclaredCursor => Ok(ExecuteResponse::DeclaredCursor),
Expand Down Expand Up @@ -549,6 +552,7 @@ impl ExecuteResponse {
CreatedView { .. } => Some("CREATE VIEW".into()),
CreatedViews { .. } => Some("CREATE VIEWS".into()),
CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()),
CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
CreatedType => Some("CREATE TYPE".into()),
Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
DeclaredCursor => Some("DECLARE CURSOR".into()),
Expand Down Expand Up @@ -637,6 +641,7 @@ impl ExecuteResponse {
CreateTable => &[CreatedTable],
CreateView => &[CreatedView],
CreateMaterializedView => &[CreatedMaterializedView],
CreateContinualTask => &[CreatedContinualTask],
CreateIndex => &[CreatedIndex],
CreateType => &[CreatedType],
PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/catalog_serving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>(
| Plan::CreateRole(_)
| Plan::CreateCluster(_)
| Plan::CreateClusterReplica(_)
| Plan::CreateContinualTask(_)
| Plan::CreateSource(_)
| Plan::CreateSources(_)
| Plan::CreateSecret(_)
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ impl Coordinator {
| Statement::CreateDatabase(_)
| Statement::CreateIndex(_)
| Statement::CreateMaterializedView(_)
| Statement::CreateContinualTask(_)
| Statement::CreateRole(_)
| Statement::CreateSchema(_)
| Statement::CreateSecret(_)
Expand Down
6 changes: 6 additions & 0 deletions src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ impl Coordinator {
self.sequence_create_materialized_view(ctx, plan, resolved_ids)
.await;
}
Plan::CreateContinualTask(plan) => {
let res = self
.sequence_create_continual_task(ctx.session(), plan, resolved_ids)
.await;
ctx.retire(res);
}
Plan::CreateIndex(plan) => {
self.sequence_create_index(ctx, plan, resolved_ids).await;
}
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ use crate::util::{viewable_variables, ClientTransmitter, ResultExt};
use crate::{guard_write_critical_section, PeekResponseUnary, ReadHolds};

mod cluster;
mod create_continual_task;
mod create_index;
mod create_materialized_view;
mod create_view;
Expand Down
29 changes: 29 additions & 0 deletions src/adapter/src/coord/sequencer/inner/create_continual_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use mz_ore::instrument;
use mz_sql::names::ResolvedIds;
use mz_sql::plan;

use crate::command::ExecuteResponse;
use crate::coord::Coordinator;
use crate::error::AdapterError;
use crate::session::Session;

impl Coordinator {
#[instrument]
pub(crate) async fn sequence_create_continual_task(
&mut self,
session: &Session,
plan: plan::CreateContinualTaskPlan,
resolved_ids: ResolvedIds,
) -> Result<ExecuteResponse, AdapterError> {
todo!("WIP {:?}", (session, plan, resolved_ids));
}
}
1 change: 1 addition & 0 deletions src/adapter/src/statement_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl From<&ExecuteResponse> for StatementEndedExecutionReason {
| ExecuteResponse::CreatedView
| ExecuteResponse::CreatedViews
| ExecuteResponse::CreatedMaterializedView
| ExecuteResponse::CreatedContinualTask
| ExecuteResponse::CreatedType
| ExecuteResponse::Deallocate { .. }
| ExecuteResponse::DeclaredCursor
Expand Down
5 changes: 5 additions & 0 deletions src/catalog/src/durable/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,11 @@ impl ItemValue {
assert_eq!(tokens.next(), Some("VIEW"));
CatalogItemType::MaterializedView
}
Some("CONTINUAL") => {
assert_eq!(tokens.next(), Some("TASK"));
// TODO(ct): CatalogItemType::ContinualTask
CatalogItemType::MaterializedView
}
Some("INDEX") => CatalogItemType::Index,
Some("TYPE") => CatalogItemType::Type,
Some("FUNCTION") => CatalogItemType::Func,
Expand Down
3 changes: 3 additions & 0 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,9 @@ where
};
ComputeSinkConnection::Persist(conn)
}
ComputeSinkConnection::ContinualTask(conn) => {
todo!("WIP {:?}", conn);
}
ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
ComputeSinkConnection::CopyToS3Oneshot(conn) => {
ComputeSinkConnection::CopyToS3Oneshot(conn)
Expand Down
4 changes: 4 additions & 0 deletions src/compute-types/src/sinks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ message ProtoComputeSinkConnection {
google.protobuf.Empty subscribe = 1;
ProtoPersistSinkConnection persist = 2;
ProtoCopyToS3OneshotSinkConnection copy_to_s3_oneshot = 3;
ProtoContinualTaskConnection continual_task = 4;
}
}

Expand All @@ -46,6 +47,9 @@ message ProtoPersistSinkConnection {
mz_storage_types.controller.ProtoCollectionMetadata storage_metadata = 2;
}

message ProtoContinualTaskConnection {
}

message ProtoCopyToS3OneshotSinkConnection {
mz_storage_types.sinks.ProtoS3UploadInfo upload_info = 1;
mz_storage_types.connections.aws.ProtoAwsConnection aws_connection = 2;
Expand Down
25 changes: 25 additions & 0 deletions src/compute-types/src/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ pub enum ComputeSinkConnection<S: 'static = ()> {
Subscribe(SubscribeSinkConnection),
/// TODO(#25239): Add documentation.
Persist(PersistSinkConnection<S>),
/// TODO(#25239): Add documentation.
ContinualTask(ContinualTaskConnection<S>),
/// A compute sink to do a oneshot copy to s3.
CopyToS3Oneshot(CopyToS3OneshotSinkConnection),
}
Expand All @@ -129,6 +131,7 @@ impl<S> ComputeSinkConnection<S> {
match self {
ComputeSinkConnection::Subscribe(_) => "subscribe",
ComputeSinkConnection::Persist(_) => "persist",
ComputeSinkConnection::ContinualTask(_) => "continual_task",
ComputeSinkConnection::CopyToS3Oneshot(_) => "copy_to_s3_oneshot",
}
}
Expand All @@ -150,6 +153,9 @@ impl RustType<ProtoComputeSinkConnection> for ComputeSinkConnection<CollectionMe
kind: Some(match self {
ComputeSinkConnection::Subscribe(_) => Kind::Subscribe(()),
ComputeSinkConnection::Persist(persist) => Kind::Persist(persist.into_proto()),
ComputeSinkConnection::ContinualTask(continual_task) => {
Kind::ContinualTask(continual_task.into_proto())
}
ComputeSinkConnection::CopyToS3Oneshot(s3) => {
Kind::CopyToS3Oneshot(s3.into_proto())
}
Expand All @@ -165,6 +171,9 @@ impl RustType<ProtoComputeSinkConnection> for ComputeSinkConnection<CollectionMe
Ok(match kind {
Kind::Subscribe(_) => ComputeSinkConnection::Subscribe(SubscribeSinkConnection {}),
Kind::Persist(persist) => ComputeSinkConnection::Persist(persist.into_rust()?),
Kind::ContinualTask(continual_task) => {
ComputeSinkConnection::ContinualTask(continual_task.into_rust()?)
}
Kind::CopyToS3Oneshot(s3) => ComputeSinkConnection::CopyToS3Oneshot(s3.into_rust()?),
})
}
Expand Down Expand Up @@ -243,3 +252,19 @@ impl RustType<ProtoPersistSinkConnection> for PersistSinkConnection<CollectionMe
})
}
}

#[allow(missing_docs)] // WIP
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ContinualTaskConnection<S> {
_phantom: std::marker::PhantomData<S>,
}

impl RustType<ProtoContinualTaskConnection> for ContinualTaskConnection<CollectionMetadata> {
fn into_proto(&self) -> ProtoContinualTaskConnection {
todo!("WIP");
}

fn from_proto(proto: ProtoContinualTaskConnection) -> Result<Self, TryFromProtoError> {
todo!("WIP {:?}", proto);
}
}
4 changes: 4 additions & 0 deletions src/compute/src/render/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ where
let region_name = match sink.connection {
ComputeSinkConnection::Subscribe(_) => format!("SubscribeSink({:?})", sink_id),
ComputeSinkConnection::Persist(_) => format!("PersistSink({:?})", sink_id),
ComputeSinkConnection::ContinualTask(_) => {
format!("ContinualTask({:?})", sink_id)
}
ComputeSinkConnection::CopyToS3Oneshot(_) => {
format!("CopyToS3OneshotSink({:?})", sink_id)
}
Expand Down Expand Up @@ -178,6 +181,7 @@ where
match connection {
ComputeSinkConnection::Subscribe(connection) => Box::new(connection.clone()),
ComputeSinkConnection::Persist(connection) => Box::new(connection.clone()),
ComputeSinkConnection::ContinualTask(connection) => Box::new(connection.clone()),
ComputeSinkConnection::CopyToS3Oneshot(connection) => Box::new(connection.clone()),
}
}
1 change: 1 addition & 0 deletions src/compute/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub(crate) mod continual_task;
mod copy_to_s3_oneshot;
mod correction;
mod persist_sink;
Expand Down
52 changes: 52 additions & 0 deletions src/compute/src/sink/continual_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::any::Any;
use std::rc::Rc;

use differential_dataflow::Collection;
use mz_compute_types::sinks::{ComputeSinkDesc, ContinualTaskConnection};
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use timely::dataflow::Scope;
use timely::progress::Antichain;

use crate::compute_state::ComputeState;
use crate::render::sinks::SinkRender;
use crate::render::StartSignal;

impl<G> SinkRender<G> for ContinualTaskConnection<CollectionMetadata>
where
G: Scope<Timestamp = Timestamp>,
{
fn render_sink(
&self,
compute_state: &mut ComputeState,
sink: &ComputeSinkDesc<CollectionMetadata>,
sink_id: GlobalId,
as_of: Antichain<Timestamp>,
start_signal: StartSignal,
oks: Collection<G, Row, Diff>,
errs: Collection<G, DataflowError, Diff>,
) -> Option<Rc<dyn Any>> {
todo!(
"WIP {:?}",
(
std::any::type_name_of_val(&compute_state),
sink,
sink_id,
as_of,
std::any::type_name_of_val(&start_signal),
std::any::type_name_of_val(&oks),
std::any::type_name_of_val(&errs),
)
);
}
}
1 change: 1 addition & 0 deletions src/environmentd/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,7 @@ async fn execute_stmt<S: ResultSender>(
| ExecuteResponse::CreatedView { .. }
| ExecuteResponse::CreatedViews { .. }
| ExecuteResponse::CreatedMaterializedView { .. }
| ExecuteResponse::CreatedContinualTask { .. }
| ExecuteResponse::CreatedType
| ExecuteResponse::Comment
| ExecuteResponse::Deleted(_)
Expand Down
1 change: 1 addition & 0 deletions src/pgwire/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1719,6 +1719,7 @@ where
| ExecuteResponse::CreatedIndex { .. }
| ExecuteResponse::CreatedIntrospectionSubscribe
| ExecuteResponse::CreatedMaterializedView { .. }
| ExecuteResponse::CreatedContinualTask { .. }
| ExecuteResponse::CreatedRole
| ExecuteResponse::CreatedSchema { .. }
| ExecuteResponse::CreatedSecret { .. }
Expand Down
22 changes: 16 additions & 6 deletions src/sql/src/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use mz_repr::{ColumnName, GlobalId};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit_mut::{self, VisitMut};
use mz_sql_parser::ast::{
CreateConnectionStatement, CreateIndexStatement, CreateMaterializedViewStatement,
CreateSecretStatement, CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
MutRecBlock, Op, Query, Statement, TableFactor, UnresolvedItemName, UnresolvedSchemaName,
Value, ViewDefinition,
CreateConnectionStatement, CreateContinualTaskStatement, CreateIndexStatement,
CreateMaterializedViewStatement, CreateSecretStatement, CreateSinkStatement,
CreateSourceStatement, CreateSubsourceStatement, CreateTableFromSourceStatement,
CreateTableStatement, CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement,
CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior, MutRecBlock, Op, Query, Statement,
TableFactor, UnresolvedItemName, UnresolvedSchemaName, Value, ViewDefinition,
};

use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
Expand Down Expand Up @@ -412,6 +412,16 @@ pub fn create_statement(
*if_exists = IfExistsBehavior::Error;
}

Statement::CreateContinualTask(CreateContinualTaskStatement {
name,
columns,
input,
stmts,
in_cluster,
}) => {
todo!("WIP {:?}", (name, columns, input, stmts, in_cluster));
}

Statement::CreateIndex(CreateIndexStatement {
name: _,
in_cluster: _,
Expand Down
6 changes: 6 additions & 0 deletions src/sql/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub enum Plan {
CreateTable(CreateTablePlan),
CreateView(CreateViewPlan),
CreateMaterializedView(CreateMaterializedViewPlan),
CreateContinualTask(CreateContinualTaskPlan),
CreateIndex(CreateIndexPlan),
CreateType(CreateTypePlan),
Comment(CommentPlan),
Expand Down Expand Up @@ -251,6 +252,7 @@ impl Plan {
StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
StatementKind::CreateIndex => &[PlanKind::CreateIndex],
StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
StatementKind::CreateRole => &[PlanKind::CreateRole],
StatementKind::CreateSchema => &[PlanKind::CreateSchema],
StatementKind::CreateSecret => &[PlanKind::CreateSecret],
Expand Down Expand Up @@ -319,6 +321,7 @@ impl Plan {
Plan::CreateTable(_) => "create table",
Plan::CreateView(_) => "create view",
Plan::CreateMaterializedView(_) => "create materialized view",
Plan::CreateContinualTask(_) => "create continual task",
Plan::CreateIndex(_) => "create index",
Plan::CreateType(_) => "create type",
Plan::Comment(_) => "comment",
Expand Down Expand Up @@ -696,6 +699,9 @@ pub struct CreateMaterializedViewPlan {
pub ambiguous_columns: bool,
}

#[derive(Debug, Clone)]
pub struct CreateContinualTaskPlan {}

#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
pub name: QualifiedItemName,
Expand Down
Loading

0 comments on commit 74a4630

Please sign in to comment.