From 8424cd3164048cfafe10bdfae7f994203871e638 Mon Sep 17 00:00:00 2001
From: Parker Timmerman
Date: Fri, 10 Jan 2025 12:22:26 -0500
Subject: [PATCH] start, actually treat CREATE TABLE ... FROM WEBHOOK as a table

* changes in planning to use the Table datatype
* update tests to assert previously untested behavior
---
 .../materialize/checks/all_checks/webhook.py |   4 +-
 src/adapter/src/catalog/state.rs             |  36 ++++--
 src/adapter/src/coord/command_handler.rs     |  93 ++++++++-------
 src/adapter/src/coord/consistency.rs         |  11 +-
 src/adapter/src/coord/sequencer/inner.rs     | 109 +++++++++++-------
 src/catalog/src/memory/objects.rs            |  23 ++--
 src/sql/src/plan.rs                          |  10 +-
 src/sql/src/plan/statement/ddl.rs            |  62 +++++++---
 test/testdrive/webhook.td                    |  21 ++++
 9 files changed, 251 insertions(+), 118 deletions(-)

diff --git a/misc/python/materialize/checks/all_checks/webhook.py b/misc/python/materialize/checks/all_checks/webhook.py
index 06a7f926105ad..623096dfdb54b 100644
--- a/misc/python/materialize/checks/all_checks/webhook.py
+++ b/misc/python/materialize/checks/all_checks/webhook.py
@@ -125,7 +125,7 @@ def validate(self) -> Testdrive:
 
 class WebhookTable(Check):
     def _can_run(self, e: Executor) -> bool:
-        return self.base_version >= MzVersion.parse_mz("v0.128.0-dev")
+        return self.base_version >= MzVersion.parse_mz("v0.130.0-dev")
 
     def initialize(self) -> Testdrive:
         return Testdrive(
@@ -164,7 +164,7 @@ def validate(self) -> Testdrive:
                 anotha_one!
                 threeeeeee
 
-                > SHOW CREATE SOURCE webhook_table_text
+                > SHOW CREATE TABLE webhook_table_text
                 materialize.public.webhook_table_text "CREATE TABLE \\"materialize\\".\\"public\\".\\"webhook_table_text\\" FROM WEBHOOK BODY FORMAT TEXT"
                 """
             )
diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs
index 343f430598f9d..23d34cfc233d8 100644
--- a/src/adapter/src/catalog/state.rs
+++ b/src/adapter/src/catalog/state.rs
@@ -1071,6 +1071,21 @@ impl CatalogState {
                     },
                     timeline,
                 },
+                mz_sql::plan::DataSourceDesc::Webhook {
+                    validate_using,
+                    body_format,
+                    headers,
+                    cluster_id,
+                } => TableDataSource::DataSource {
+                    desc: DataSourceDesc::Webhook {
+                        validate_using,
+                        body_format,
+                        headers,
+                        cluster_id: cluster_id
+                            .expect("Webhook Tables must have a cluster_id set"),
+                    },
+                    timeline,
+                },
                 _ => {
                     return Err((
                         AdapterError::Unstructured(anyhow::anyhow!(
@@ -1123,13 +1138,20 @@ impl CatalogState {
                         validate_using,
                         body_format,
                         headers,
-                    } => DataSourceDesc::Webhook {
-                        validate_using,
-                        body_format,
-                        headers,
-                        cluster_id: in_cluster
-                            .expect("webhook sources must use an existing cluster"),
-                    },
+                        cluster_id,
+                    } => {
+                        mz_ore::soft_assert_or_log!(
+                            cluster_id.is_none(),
+                            "cluster_id set at Source level for Webhooks"
+                        );
+                        DataSourceDesc::Webhook {
+                            validate_using,
+                            body_format,
+                            headers,
+                            cluster_id: in_cluster
+                                .expect("webhook sources must use an existing cluster"),
+                        }
+                    }
                 },
                 desc: source.desc,
                 global_id,
diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs
index 9ef5b8c4baf2b..4a33d05308d6f 100644
--- a/src/adapter/src/coord/command_handler.rs
+++ b/src/adapter/src/coord/command_handler.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use futures::future::LocalBoxFuture;
 use futures::FutureExt;
 use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
-use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source};
+use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
 use mz_catalog::SYSTEM_CONN_ID;
 use mz_ore::task;
 use mz_ore::tracing::OpenTelemetryContext;
@@ -1317,51 +1317,66 @@ impl Coordinator {
             return Err(name);
         };
 
-        let (body_format, header_tys, validator, global_id) = match entry.item() {
+        // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
+        let (data_source, desc, global_id) = match entry.item() {
             CatalogItem::Source(Source {
-                data_source:
-                    DataSourceDesc::Webhook {
-                        validate_using,
-                        body_format,
-                        headers,
-                        ..
-                    },
+                data_source: data_source @ DataSourceDesc::Webhook { .. },
                 desc,
                 global_id,
                 ..
-            }) => {
-                // Assert we have one column for the body, and how ever many are required for
-                // the headers.
-                let num_columns = headers.num_columns() + 1;
-                mz_ore::soft_assert_or_log!(
-                    desc.arity() <= num_columns,
-                    "expected at most {} columns, but got {}",
-                    num_columns,
-                    desc.arity()
-                );
-
-                // Double check that the body column of the webhook source matches the type
-                // we're about to deserialize as.
-                let body_column = desc
-                    .get_by_name(&"body".into())
-                    .map(|(_idx, ty)| ty.clone())
-                    .ok_or(name.clone())?;
-                assert!(!body_column.nullable, "webhook body column is nullable!?");
-                assert_eq!(body_column.scalar_type, ScalarType::from(*body_format));
-
-                // Create a validator that can be called to validate a webhook request.
-                let validator = validate_using.as_ref().map(|v| {
-                    let validation = v.clone();
-                    AppendWebhookValidator::new(
-                        validation,
-                        coord.caching_secrets_reader.clone(),
-                    )
-                });
-                (*body_format, headers.clone(), validator, *global_id)
-            }
+            }) => (data_source, desc, *global_id),
+            CatalogItem::Table(
+                table @ Table {
+                    desc,
+                    data_source:
+                        TableDataSource::DataSource {
+                            desc: data_source @ DataSourceDesc::Webhook { .. },
+                            ..
+                        },
+                    ..
+                },
+            ) => (data_source, desc, table.global_id_writes()),
             _ => return Err(name),
         };
 
+        let DataSourceDesc::Webhook {
+            validate_using,
+            body_format,
+            headers,
+            ..
+        } = data_source
+        else {
+            mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
+            return Err(name);
+        };
+        let body_format = body_format.clone();
+        let header_tys = headers.clone();
+
+        // Assert we have one column for the body, and however many are required for
+        // the headers.
+        let num_columns = headers.num_columns() + 1;
+        mz_ore::soft_assert_or_log!(
+            desc.arity() <= num_columns,
+            "expected at most {} columns, but got {}",
+            num_columns,
+            desc.arity()
+        );
+
+        // Double check that the body column of the webhook source matches the type
+        // we're about to deserialize as.
+        let body_column = desc
+            .get_by_name(&"body".into())
+            .map(|(_idx, ty)| ty.clone())
+            .ok_or(name.clone())?;
+        assert!(!body_column.nullable, "webhook body column is nullable!?");
+        assert_eq!(body_column.scalar_type, ScalarType::from(body_format));
+
+        // Create a validator that can be called to validate a webhook request.
+        let validator = validate_using.as_ref().map(|v| {
+            let validation = v.clone();
+            AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
+        });
+
         // Get a channel so we can queue updates to be written.
         let row_tx = coord
             .controller
diff --git a/src/adapter/src/coord/consistency.rs b/src/adapter/src/coord/consistency.rs
index 4f5b7b1858874..9235c5e8abcef 100644
--- a/src/adapter/src/coord/consistency.rs
+++ b/src/adapter/src/coord/consistency.rs
@@ -12,7 +12,7 @@
 use super::Coordinator;
 use crate::catalog::consistency::CatalogInconsistencies;
 use mz_adapter_types::connection::ConnectionIdType;
-use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source};
+use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
 use mz_controller_types::{ClusterId, ReplicaId};
 use mz_ore::instrument;
 use mz_repr::{CatalogItemId, GlobalId};
@@ -117,8 +117,13 @@ impl Coordinator {
                 .try_get_entry(id)
                 .map(|entry| entry.item())
                 .and_then(|item| {
-                    let CatalogItem::Source(Source { data_source, .. }) = &item else {
-                        return None;
+                    let data_source = match &item {
+                        CatalogItem::Source(Source { data_source, .. }) => data_source,
+                        CatalogItem::Table(Table {
+                            data_source: TableDataSource::DataSource { desc, .. },
+                            ..
+                        }) => desc,
+                        _ => return None,
                     };
                     Some(matches!(data_source, DataSourceDesc::Webhook { .. }))
                 })
diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs
index d2afff2f0575a..c442721f67426 100644
--- a/src/adapter/src/coord/sequencer/inner.rs
+++ b/src/adapter/src/coord/sequencer/inner.rs
@@ -51,6 +51,7 @@ use mz_sql::names::{
 };
 use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
 use mz_sql::pure::{generate_subsource_statements, PurifiedSourceExport};
+use mz_storage_types::instances::StorageInstanceId;
 use mz_storage_types::sinks::StorageSinkDesc;
 use timely::progress::Timestamp as TimelyTimestamp;
 // Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
@@ -1048,6 +1049,20 @@ impl Coordinator {
                 },
                 timeline,
             },
+            plan::DataSourceDesc::Webhook {
+                validate_using,
+                body_format,
+                headers,
+                cluster_id,
+            } => TableDataSource::DataSource {
+                desc: DataSourceDesc::Webhook {
+                    validate_using,
+                    body_format,
+                    headers,
+                    cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
+                },
+                timeline,
+            },
             o => {
                 unreachable!("CREATE TABLE data source got {:?}", o)
             }
@@ -1075,7 +1090,7 @@ impl Coordinator {
         // The table data_source determines whether this table will be written to
         // by environmentd (e.g. with INSERT INTO statements) or by the storage layer
         // (e.g. a source-fed table).
-        match table.data_source {
+        let (collections, register_ts, read_policies) = match table.data_source {
             TableDataSource::TableWrites { defaults: _ } => {
                 // Determine the initial validity for the table.
                 let register_ts = coord.get_local_write_ts().await.timestamp;
@@ -1097,27 +1112,15 @@ impl Coordinator {
                 }
                 let collection_desc = CollectionDescription::for_table(table.desc.clone());
-                let storage_metadata = coord.catalog.state().storage_metadata();
-                coord
-                    .controller
-                    .storage
-                    .create_collections(
-                        storage_metadata,
-                        Some(register_ts),
-                        vec![(global_id, collection_desc)],
-                    )
-                    .await
-                    .unwrap_or_terminate("cannot fail to create collections");
-                coord.apply_local_write(register_ts).await;
+                let collections = vec![(global_id, collection_desc)];
 
-                coord
-                    .initialize_storage_read_policies(
-                        btreeset![table_id],
-                        table
-                            .custom_logical_compaction_window
-                            .unwrap_or(CompactionWindow::Default),
-                    )
-                    .await;
+                let compaction_window = table
+                    .custom_logical_compaction_window
+                    .unwrap_or(CompactionWindow::Default);
+                let read_policies =
+                    BTreeMap::from([(compaction_window, btreeset! { table_id })]);
+
+                (collections, Some(register_ts), read_policies)
             }
             TableDataSource::DataSource {
                 desc: data_source,
@@ -1159,34 +1162,62 @@ impl Coordinator {
                             status_collection_id,
                             timeline: Some(timeline.clone()),
                         };
-                        let storage_metadata = coord.catalog.state().storage_metadata();
-                        coord
-                            .controller
-                            .storage
-                            .create_collections(
-                                storage_metadata,
-                                None,
-                                vec![(global_id, collection_desc)],
-                            )
-                            .await
-                            .unwrap_or_terminate("cannot fail to create collections");
+                        let collections = vec![(global_id, collection_desc)];
 
                         let read_policies = coord
                             .catalog()
                             .state()
                             .source_compaction_windows(vec![table_id]);
-                        for (compaction_window, storage_policies) in read_policies {
-                            coord
-                                .initialize_storage_read_policies(
-                                    storage_policies,
-                                    compaction_window,
-                                )
-                                .await;
+
+                        (collections, None, read_policies)
+                    }
+                    DataSourceDesc::Webhook { .. } => {
+                        if let Some(url) =
+                            coord.catalog().state().try_get_webhook_url(&table_id)
+                        {
+                            ctx.session()
+                                .add_notice(AdapterNotice::WebhookSourceCreated { url })
+                        }
+
+                        let collection_desc = CollectionDescription {
+                            desc: table.desc.clone(),
+                            data_source: DataSource::Webhook,
+                            since: None,
+                            status_collection_id: None,
+                            timeline: Some(timeline.clone()),
+                        };
+                        let collections = vec![(global_id, collection_desc)];
+                        let read_policies = coord
+                            .catalog()
+                            .state()
+                            .source_compaction_windows(vec![table_id]);
+
+                        (collections, None, read_policies)
                     }
                     _ => unreachable!("CREATE TABLE data source got {:?}", data_source),
                 }
             }
+        };
+
+        // Create the collections.
+        let storage_metadata = coord.catalog.state().storage_metadata();
+        coord
+            .controller
+            .storage
+            .create_collections(storage_metadata, register_ts, collections)
+            .await
+            .unwrap_or_terminate("cannot fail to create collections");
+
+        // Mark the register timestamp as completed.
+        if let Some(register_ts) = register_ts {
+            coord.apply_local_write(register_ts).await;
+        }
+
+        // Initialize the Read Policies.
+        for (compaction_window, storage_policies) in read_policies {
+            coord
+                .initialize_storage_read_policies(storage_policies, compaction_window)
+                .await;
         }
     })
     .await;
diff --git a/src/catalog/src/memory/objects.rs b/src/catalog/src/memory/objects.rs
index 91d10e7686c7b..f3dd7fe72493d 100644
--- a/src/catalog/src/memory/objects.rs
+++ b/src/catalog/src/memory/objects.rs
@@ -1082,14 +1082,21 @@ impl Source {
                 validate_using,
                 body_format,
                 headers,
-            } => DataSourceDesc::Webhook {
-                validate_using,
-                body_format,
-                headers,
-                cluster_id: plan
-                    .in_cluster
-                    .expect("webhook sources must be given a cluster ID"),
-            },
+                cluster_id,
+            } => {
+                mz_ore::soft_assert_or_log!(
+                    cluster_id.is_none(),
+                    "cluster_id set at Source level for Webhooks"
+                );
+                DataSourceDesc::Webhook {
+                    validate_using,
+                    body_format,
+                    headers,
+                    cluster_id: plan
+                        .in_cluster
+                        .expect("webhook sources must be given a cluster ID"),
+                }
+            }
             },
             desc: plan.source.desc,
             global_id,
diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs
index e8534788df346..29f70f77711ad 100644
--- a/src/sql/src/plan.rs
+++ b/src/sql/src/plan.rs
@@ -61,6 +61,7 @@ use mz_storage_types::connections::{
     AwsPrivatelinkConnection, CsrConnection, KafkaConnection, MySqlConnection, PostgresConnection,
     SshConnection,
 };
+use mz_storage_types::instances::StorageInstanceId;
 use mz_storage_types::sinks::{
     S3SinkFormat, SinkEnvelope, SinkPartitionStrategy, StorageSinkConnection,
 };
@@ -272,9 +273,10 @@ impl Plan {
             StatementKind::CreateSchema => &[PlanKind::CreateSchema],
             StatementKind::CreateSecret => &[PlanKind::CreateSecret],
             StatementKind::CreateSink => &[PlanKind::CreateSink],
-            StatementKind::CreateSource
-            | StatementKind::CreateSubsource
-            | StatementKind::CreateWebhookSource => &[PlanKind::CreateSource],
+            StatementKind::CreateSource | StatementKind::CreateSubsource => {
+                &[PlanKind::CreateSource]
+            }
+            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
             StatementKind::CreateTable => &[PlanKind::CreateTable],
             StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
             StatementKind::CreateType => &[PlanKind::CreateType],
@@ -1440,6 +1442,8 @@ pub enum DataSourceDesc {
         validate_using: Option<WebhookValidation>,
         body_format: WebhookBodyFormat,
         headers: WebhookHeaders,
+        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
+        cluster_id: Option<StorageInstanceId>,
     },
 }
diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs
index 1783266c53730..2d3f678e9cb4f 100644
--- a/src/sql/src/plan/statement/ddl.rs
+++ b/src/sql/src/plan/statement/ddl.rs
@@ -505,10 +505,9 @@ pub fn plan_create_webhook_source(
         body_format,
         include_headers,
         validate_using,
+        is_table,
         // We resolved `in_cluster` above, so we want to ignore it here.
         in_cluster: _,
-        // Whether or not we created a webhook as a source or table is irrelevant here.
-        is_table: _,
     } = stmt;
 
     let validate_using = validate_using
@@ -630,22 +629,51 @@ pub fn plan_create_webhook_source(
     // such, we always use a default of EpochMilliseconds.
     let timeline = Timeline::EpochMilliseconds;
 
-    Ok(Plan::CreateSource(CreateSourcePlan {
-        name,
-        source: Source {
-            create_sql,
-            data_source: DataSourceDesc::Webhook {
-                validate_using,
-                body_format,
-                headers,
+    let plan = if is_table {
+        let data_source = DataSourceDesc::Webhook {
+            validate_using,
+            body_format,
+            headers,
+            cluster_id: Some(in_cluster),
+        };
+        let data_source = TableDataSource::DataSource {
+            desc: data_source,
+            timeline,
+        };
+        Plan::CreateTable(CreateTablePlan {
+            name,
+            if_not_exists,
+            table: Table {
+                create_sql,
+                desc,
+                temporary: false,
+                compaction_window: None,
+                data_source,
             },
-            desc,
-            compaction_window: None,
-        },
-        if_not_exists,
-        timeline,
-        in_cluster: Some(in_cluster),
-    }))
+        })
+    } else {
+        let data_source = DataSourceDesc::Webhook {
+            validate_using,
+            body_format,
+            headers,
+            // Important: The cluster is set at the `Source` level.
+            cluster_id: None,
+        };
+        Plan::CreateSource(CreateSourcePlan {
+            name,
+            source: Source {
+                create_sql,
+                data_source,
+                desc,
+                compaction_window: None,
+            },
+            if_not_exists,
+            timeline,
+            in_cluster: Some(in_cluster),
+        })
+    };
+
+    Ok(plan)
 }
 
 pub fn plan_create_source(
diff --git a/test/testdrive/webhook.td b/test/testdrive/webhook.td
index 36632500fe2c4..53a1ad3796178 100644
--- a/test/testdrive/webhook.td
+++ b/test/testdrive/webhook.td
@@ -23,6 +23,12 @@ name nullable type comment
 ------------------------------
 body false text ""
 
+> SHOW CREATE SOURCE webhook_text;
+materialize.public.webhook_text "CREATE SOURCE \"materialize\".\"public\".\"webhook_text\" IN CLUSTER \"webhook_cluster\" FROM WEBHOOK BODY FORMAT TEXT"
+
+> SELECT name, type FROM mz_objects WHERE name = 'webhook_text';
+webhook_text source
+
 $ webhook-append database=materialize schema=public name=webhook_text
 a
 
@@ -548,3 +554,18 @@ aaa_1
 
 > SELECT * FROM webhook_as_table;
 aaa_1
+
+> SHOW CREATE TABLE webhook_as_table;
+materialize.public.webhook_as_table "CREATE TABLE \"materialize\".\"public\".\"webhook_as_table\" FROM WEBHOOK BODY FORMAT TEXT"
+
+> SHOW COLUMNS FROM webhook_as_table;
+name nullable type comment
+------------------------------
+body false text ""
+
+> SHOW TABLES
+not_a_webhook ""
+webhook_as_table ""
+
+> SELECT name, type FROM mz_objects WHERE name = 'webhook_as_table';
+webhook_as_table table
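
A minimal end-to-end sketch of what this patch enables, in the same testdrive idiom as test/testdrive/webhook.td. This is not part of the patch: `my_hook` is an illustrative name, and the expected outputs assume the behavior asserted by the new tests above (the table is created on the session's active cluster, with the `cluster_id` recorded in the plan as shown in ddl.rs):

> CREATE TABLE my_hook FROM WEBHOOK BODY FORMAT TEXT;

$ webhook-append database=materialize schema=public name=my_hook
hello_world

> SELECT * FROM my_hook;
hello_world

> SELECT name, type FROM mz_objects WHERE name = 'my_hook';
my_hook table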