Commit 8424cd3

start, actually treat CREATE TABLE ... FROM WEBHOOK as a table
* changes in planning to use the Table datatype
* update tests to assert previously missing issues
ParkMyCar committed Jan 10, 2025
1 parent b0fdc53 commit 8424cd3
Showing 9 changed files with 251 additions and 118 deletions.
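
Note on the shape of the change: `CREATE TABLE ... FROM WEBHOOK` is now planned and cataloged as a Table whose data source happens to be a webhook, instead of masquerading as a Source, and the plan-level webhook descriptor gains a `cluster_id` that must be `Some` for webhook tables and `None` for webhook sources. A minimal Rust sketch of that invariant, with simplified stand-in types (`PlanDataSourceDesc`, `webhook_cluster`, and the `ClusterId` alias are illustrative, not the real mz_sql/mz_catalog API):

    type ClusterId = u64; // placeholder for mz_controller_types::ClusterId

    // Sketch only: the real variant also carries validation, body format,
    // and header settings; only the cluster_id rule is shown here.
    enum PlanDataSourceDesc {
        Webhook { cluster_id: Option<ClusterId> },
    }

    fn webhook_cluster(
        desc: PlanDataSourceDesc,
        in_cluster: Option<ClusterId>,
        is_table: bool,
    ) -> ClusterId {
        let PlanDataSourceDesc::Webhook { cluster_id } = desc;
        if is_table {
            // CREATE TABLE ... FROM WEBHOOK: the plan itself names the cluster.
            cluster_id.expect("Webhook Tables must have a cluster_id set")
        } else {
            // CREATE SOURCE ... FROM WEBHOOK: the descriptor must not name one;
            // the cluster comes from the plan's `in_cluster` instead.
            assert!(cluster_id.is_none(), "cluster_id set at Source level for Webhooks");
            in_cluster.expect("webhook sources must use an existing cluster")
        }
    }

    fn main() {
        let table = PlanDataSourceDesc::Webhook { cluster_id: Some(7) };
        assert_eq!(webhook_cluster(table, None, true), 7);
    }
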
misc/python/materialize/checks/all_checks/webhook.py (+2 −2)
@@ -125,7 +125,7 @@ def validate(self) -> Testdrive:

class WebhookTable(Check):
def _can_run(self, e: Executor) -> bool:
- return self.base_version >= MzVersion.parse_mz("v0.128.0-dev")
+ return self.base_version >= MzVersion.parse_mz("v0.130.0-dev")

def initialize(self) -> Testdrive:
return Testdrive(
@@ -164,7 +164,7 @@ def validate(self) -> Testdrive:
anotha_one!
threeeeeee
- > SHOW CREATE SOURCE webhook_table_text
+ > SHOW CREATE TABLE webhook_table_text
materialize.public.webhook_table_text "CREATE TABLE \\"materialize\\".\\"public\\".\\"webhook_table_text\\" FROM WEBHOOK BODY FORMAT TEXT"
"""
)
src/adapter/src/catalog/state.rs (+29 −7)
@@ -1071,6 +1071,21 @@ impl CatalogState {
},
timeline,
},
+ mz_sql::plan::DataSourceDesc::Webhook {
+     validate_using,
+     body_format,
+     headers,
+     cluster_id,
+ } => TableDataSource::DataSource {
+     desc: DataSourceDesc::Webhook {
+         validate_using,
+         body_format,
+         headers,
+         cluster_id: cluster_id
+             .expect("Webhook Tables must have a cluster_id set"),
+     },
+     timeline,
+ },
_ => {
return Err((
AdapterError::Unstructured(anyhow::anyhow!(
@@ -1123,13 +1138,20 @@ impl CatalogState {
validate_using,
body_format,
headers,
- } => DataSourceDesc::Webhook {
-     validate_using,
-     body_format,
-     headers,
-     cluster_id: in_cluster
-         .expect("webhook sources must use an existing cluster"),
- },
+     cluster_id,
+ } => {
+     mz_ore::soft_assert_or_log!(
+         cluster_id.is_none(),
+         "cluster_id set at Source level for Webhooks"
+     );
+     DataSourceDesc::Webhook {
+         validate_using,
+         body_format,
+         headers,
+         cluster_id: in_cluster
+             .expect("webhook sources must use an existing cluster"),
+     }
+ }
},
desc: source.desc,
global_id,
src/adapter/src/coord/command_handler.rs (+54 −39)
@@ -20,7 +20,7 @@ use std::sync::Arc;
use futures::future::LocalBoxFuture;
use futures::FutureExt;
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
- use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source};
+ use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
use mz_catalog::SYSTEM_CONN_ID;
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
@@ -1317,51 +1317,66 @@ impl Coordinator {
return Err(name);
};

- let (body_format, header_tys, validator, global_id) = match entry.item() {
+ // Webhooks can be created with `CREATE SOURCE` or `CREATE TABLE`.
+ let (data_source, desc, global_id) = match entry.item() {
CatalogItem::Source(Source {
-     data_source:
-         DataSourceDesc::Webhook {
-             validate_using,
-             body_format,
-             headers,
-             ..
-         },
+     data_source: data_source @ DataSourceDesc::Webhook { .. },
desc,
global_id,
..
- }) => {
-     // Assert we have one column for the body, and how ever many are required for
-     // the headers.
-     let num_columns = headers.num_columns() + 1;
-     mz_ore::soft_assert_or_log!(
-         desc.arity() <= num_columns,
-         "expected at most {} columns, but got {}",
-         num_columns,
-         desc.arity()
-     );
-
-     // Double check that the body column of the webhook source matches the type
-     // we're about to deserialize as.
-     let body_column = desc
-         .get_by_name(&"body".into())
-         .map(|(_idx, ty)| ty.clone())
-         .ok_or(name.clone())?;
-     assert!(!body_column.nullable, "webhook body column is nullable!?");
-     assert_eq!(body_column.scalar_type, ScalarType::from(*body_format));
-
-     // Create a validator that can be called to validate a webhook request.
-     let validator = validate_using.as_ref().map(|v| {
-         let validation = v.clone();
-         AppendWebhookValidator::new(
-             validation,
-             coord.caching_secrets_reader.clone(),
-         )
-     });
-     (*body_format, headers.clone(), validator, *global_id)
- }
+ }) => (data_source, desc, *global_id),
+ CatalogItem::Table(
+     table @ Table {
+         desc,
+         data_source:
+             TableDataSource::DataSource {
+                 desc: data_source @ DataSourceDesc::Webhook { .. },
+                 ..
+             },
+         ..
+     },
+ ) => (data_source, desc, table.global_id_writes()),
_ => return Err(name),
};

+ let DataSourceDesc::Webhook {
+     validate_using,
+     body_format,
+     headers,
+     ..
+ } = data_source
+ else {
+     mz_ore::soft_panic_or_log!("programming error! checked above for webhook");
+     return Err(name);
+ };
+ let body_format = body_format.clone();
+ let header_tys = headers.clone();
+
+ // Assert we have one column for the body, and how ever many are required for
+ // the headers.
+ let num_columns = headers.num_columns() + 1;
+ mz_ore::soft_assert_or_log!(
+     desc.arity() <= num_columns,
+     "expected at most {} columns, but got {}",
+     num_columns,
+     desc.arity()
+ );
+
+ // Double check that the body column of the webhook source matches the type
+ // we're about to deserialize as.
+ let body_column = desc
+     .get_by_name(&"body".into())
+     .map(|(_idx, ty)| ty.clone())
+     .ok_or(name.clone())?;
+ assert!(!body_column.nullable, "webhook body column is nullable!?");
+ assert_eq!(body_column.scalar_type, ScalarType::from(body_format));
+
+ // Create a validator that can be called to validate a webhook request.
+ let validator = validate_using.as_ref().map(|v| {
+     let validation = v.clone();
+     AppendWebhookValidator::new(validation, coord.caching_secrets_reader.clone())
+ });
+
// Get a channel so we can queue updates to be written.
let row_tx = coord
.controller
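
Note: the rewritten lookup above funnels both catalog item kinds into a single `(data_source, desc, global_id)` tuple using `@` bindings, then runs the body/header checks once, instead of doing all the work inside the Source arm. A stripped-down sketch of that pattern under hand-rolled stand-in types (`Item`, `DataSource`, and `webhook_parts` are hypothetical, not the real catalog enums):

    enum DataSource {
        Webhook { header_columns: usize },
        Other,
    }

    enum Item {
        Source { data_source: DataSource, arity: usize },
        Table { data_source: DataSource, arity: usize },
    }

    // Returns the webhook descriptor and column count for either item kind,
    // or None when the item is not webhook-backed (mirroring `return Err(name)`).
    fn webhook_parts(item: &Item) -> Option<(&DataSource, usize)> {
        match item {
            Item::Source { data_source: ds @ DataSource::Webhook { .. }, arity }
            | Item::Table { data_source: ds @ DataSource::Webhook { .. }, arity } => {
                let DataSource::Webhook { header_columns } = ds else { unreachable!() };
                // Shared sanity check: one column for the body, plus however
                // many the headers require.
                (*arity <= header_columns + 1).then_some((ds, *arity))
            }
            _ => None,
        }
    }

    fn main() {
        let item = Item::Table {
            data_source: DataSource::Webhook { header_columns: 2 },
            arity: 3,
        };
        assert!(webhook_parts(&item).is_some());
    }
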
src/adapter/src/coord/consistency.rs (+8 −3)
@@ -12,7 +12,7 @@
use super::Coordinator;
use crate::catalog::consistency::CatalogInconsistencies;
use mz_adapter_types::connection::ConnectionIdType;
- use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source};
+ use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::instrument;
use mz_repr::{CatalogItemId, GlobalId};
@@ -117,8 +117,13 @@ impl Coordinator {
.try_get_entry(id)
.map(|entry| entry.item())
.and_then(|item| {
- let CatalogItem::Source(Source { data_source, .. }) = &item else {
-     return None;
+ let data_source = match &item {
+     CatalogItem::Source(Source { data_source, .. }) => data_source,
+     CatalogItem::Table(Table {
+         data_source: TableDataSource::DataSource { desc, .. },
+         ..
+     }) => desc,
+     _ => return None,
};
Some(matches!(data_source, DataSourceDesc::Webhook { .. }))
})
src/adapter/src/coord/sequencer/inner.rs (+70 −39)
@@ -51,6 +51,7 @@ use mz_sql::names::{
};
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
use mz_sql::pure::{generate_subsource_statements, PurifiedSourceExport};
+ use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::sinks::StorageSinkDesc;
use timely::progress::Timestamp as TimelyTimestamp;
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
@@ -1048,6 +1049,20 @@ impl Coordinator {
},
timeline,
},
+ plan::DataSourceDesc::Webhook {
+     validate_using,
+     body_format,
+     headers,
+     cluster_id,
+ } => TableDataSource::DataSource {
+     desc: DataSourceDesc::Webhook {
+         validate_using,
+         body_format,
+         headers,
+         cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
+     },
+     timeline,
+ },
o => {
unreachable!("CREATE TABLE data source got {:?}", o)
}
@@ -1075,7 +1090,7 @@
// The table data_source determines whether this table will be written to
// by environmentd (e.g. with INSERT INTO statements) or by the storage layer
// (e.g. a source-fed table).
- match table.data_source {
+ let (collections, register_ts, read_policies) = match table.data_source {
TableDataSource::TableWrites { defaults: _ } => {
// Determine the initial validity for the table.
let register_ts = coord.get_local_write_ts().await.timestamp;
@@ -1097,27 +1112,15 @@
}

let collection_desc = CollectionDescription::for_table(table.desc.clone());
- let storage_metadata = coord.catalog.state().storage_metadata();
- coord
-     .controller
-     .storage
-     .create_collections(
-         storage_metadata,
-         Some(register_ts),
-         vec![(global_id, collection_desc)],
-     )
-     .await
-     .unwrap_or_terminate("cannot fail to create collections");
- coord.apply_local_write(register_ts).await;
+ let collections = vec![(global_id, collection_desc)];

- coord
-     .initialize_storage_read_policies(
-         btreeset![table_id],
-         table
-             .custom_logical_compaction_window
-             .unwrap_or(CompactionWindow::Default),
-     )
-     .await;
+ let compaction_window = table
+     .custom_logical_compaction_window
+     .unwrap_or(CompactionWindow::Default);
+ let read_policies =
+     BTreeMap::from([(compaction_window, btreeset! { table_id })]);
+
+ (collections, Some(register_ts), read_policies)
}
TableDataSource::DataSource {
desc: data_source,
@@ -1159,34 +1162,62 @@
status_collection_id,
timeline: Some(timeline.clone()),
};
- let storage_metadata = coord.catalog.state().storage_metadata();
- coord
-     .controller
-     .storage
-     .create_collections(
-         storage_metadata,
-         None,
-         vec![(global_id, collection_desc)],
-     )
-     .await
-     .unwrap_or_terminate("cannot fail to create collections");

+ let collections = vec![(global_id, collection_desc)];
let read_policies = coord
.catalog()
.state()
.source_compaction_windows(vec![table_id]);
- for (compaction_window, storage_policies) in read_policies {
-     coord
-         .initialize_storage_read_policies(
-             storage_policies,
-             compaction_window,
-         )
-         .await;
- }
+
+ (collections, None, read_policies)
}
+ DataSourceDesc::Webhook { .. } => {
+     if let Some(url) =
+         coord.catalog().state().try_get_webhook_url(&table_id)
+     {
+         ctx.session()
+             .add_notice(AdapterNotice::WebhookSourceCreated { url })
+     }
+
+     let collection_desc = CollectionDescription {
+         desc: table.desc.clone(),
+         data_source: DataSource::Webhook,
+         since: None,
+         status_collection_id: None,
+         timeline: Some(timeline.clone()),
+     };
+     let collections = vec![(global_id, collection_desc)];
+     let read_policies = coord
+         .catalog()
+         .state()
+         .source_compaction_windows(vec![table_id]);
+
+     (collections, None, read_policies)
+ }
_ => unreachable!("CREATE TABLE data source got {:?}", data_source),
}
}
- }
+ };
+
+ // Create the collections.
+ let storage_metadata = coord.catalog.state().storage_metadata();
+ coord
+     .controller
+     .storage
+     .create_collections(storage_metadata, register_ts, collections)
+     .await
+     .unwrap_or_terminate("cannot fail to create collections");
+
+ // Mark the register timestamp as completed.
+ if let Some(register_ts) = register_ts {
+     coord.apply_local_write(register_ts).await;
+ }
+
+ // Initialize the Read Policies.
+ for (compaction_window, storage_policies) in read_policies {
+     coord
+         .initialize_storage_read_policies(storage_policies, compaction_window)
+         .await;
+ }
})
.await;
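
Note: the restructuring above follows a describe-then-apply shape: each arm of `match table.data_source` now returns a `(collections, register_ts, read_policies)` triple, and the storage-controller call, the local-write apply, and the read-policy initialization each run exactly once afterwards. A self-contained sketch of that control flow with placeholder types (only ordinary table writes take a register timestamp, which linearizes the creation against INSERTs):

    use std::collections::BTreeMap;

    type GlobalId = u64; // placeholders for the real mz_repr/controller types
    type Timestamp = u64;
    type CompactionWindow = u32;
    struct CollectionDescription;

    enum TableDataSource {
        TableWrites,
        IngestionExport,
        Webhook,
    }

    fn sequence_create_table(data_source: TableDataSource, global_id: GlobalId) {
        let (collections, register_ts, read_policies): (
            Vec<(GlobalId, CollectionDescription)>,
            Option<Timestamp>,
            BTreeMap<CompactionWindow, Vec<GlobalId>>,
        ) = match data_source {
            // environmentd-written tables take a local write timestamp.
            TableDataSource::TableWrites => (
                vec![(global_id, CollectionDescription)],
                Some(get_local_write_ts()),
                BTreeMap::from([(0, vec![global_id])]),
            ),
            // Storage-fed tables (ingestion exports, webhooks) register
            // without one and use source compaction windows.
            TableDataSource::IngestionExport | TableDataSource::Webhook => (
                vec![(global_id, CollectionDescription)],
                None,
                BTreeMap::from([(0, vec![global_id])]),
            ),
        };

        // One storage call, one optional write-apply, one read-policy pass.
        create_collections(register_ts, collections);
        if let Some(ts) = register_ts {
            apply_local_write(ts);
        }
        for (window, ids) in read_policies {
            initialize_storage_read_policies(ids, window);
        }
    }

    fn get_local_write_ts() -> Timestamp { 0 }
    fn create_collections(_ts: Option<Timestamp>, _cs: Vec<(GlobalId, CollectionDescription)>) {}
    fn apply_local_write(_ts: Timestamp) {}
    fn initialize_storage_read_policies(_ids: Vec<GlobalId>, _window: CompactionWindow) {}
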
src/catalog/src/memory/objects.rs (+15 −8)
@@ -1082,14 +1082,21 @@ impl Source {
validate_using,
body_format,
headers,
- } => DataSourceDesc::Webhook {
-     validate_using,
-     body_format,
-     headers,
-     cluster_id: plan
-         .in_cluster
-         .expect("webhook sources must be given a cluster ID"),
- },
+     cluster_id,
+ } => {
+     mz_ore::soft_assert_or_log!(
+         cluster_id.is_none(),
+         "cluster_id set at Source level for Webhooks"
+     );
+     DataSourceDesc::Webhook {
+         validate_using,
+         body_format,
+         headers,
+         cluster_id: plan
+             .in_cluster
+             .expect("webhook sources must be given a cluster ID"),
+     }
+ }
},
desc: plan.source.desc,
global_id,
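
Note: this hunk and the matching one in src/adapter/src/catalog/state.rs guard the new invariant with `mz_ore::soft_assert_or_log!` rather than a hard assert; as I understand the mz_ore convention, soft asserts fail loudly in debug/test builds but only log in production, so a stray `cluster_id` on a webhook source cannot take down environmentd. A hand-rolled approximation of that behavior:

    // Approximation only: the real macro lives in mz_ore and reports through
    // its logging/soft-assertion machinery rather than eprintln!.
    macro_rules! soft_assert_or_log {
        ($cond:expr, $($msg:tt)+) => {
            if !$cond {
                if cfg!(debug_assertions) {
                    panic!($($msg)+); // fail loudly while developing/testing
                } else {
                    eprintln!($($msg)+); // log and keep serving in production
                }
            }
        };
    }

    fn main() {
        let cluster_id: Option<u64> = None; // a webhook *source* leaves this unset
        soft_assert_or_log!(cluster_id.is_none(), "cluster_id set at Source level for Webhooks");
    }
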