From 1e4ee254ac5cd630bb5274a339d93be2692f09dd Mon Sep 17 00:00:00 2001
From: Roshan Jobanputra
Date: Tue, 15 Oct 2024 12:57:25 -0400
Subject: [PATCH] storage/sources: Allow user-specified column names in CREATE TABLE FROM SOURCE statements

---
 src/sql-parser/src/ast/defs/statement.rs | 34 +++++++--
 src/sql-parser/src/parser.rs             | 89 +++++++++++++++++++++++-
 src/sql-parser/tests/testdata/ddl        | 24 ++++---
 src/sql/src/normalize.rs                 | 10 +--
 src/sql/src/plan/statement/ddl.rs        | 35 ++++++----
 src/sql/src/pure.rs                      | 40 ++++++++---
 test/testdrive/source-tables.td          | 15 ++++
 7 files changed, 207 insertions(+), 40 deletions(-)

diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs
index a52c8b7f24d53..15b99aa2df7ba 100644
--- a/src/sql-parser/src/ast/defs/statement.rs
+++ b/src/sql-parser/src/ast/defs/statement.rs
@@ -1614,12 +1614,30 @@ pub struct TableFromSourceOption<T: AstInfo> {
 }
 impl_display_for_with_option!(TableFromSourceOption);
 
+/// `CREATE TABLE .. FROM SOURCE` columns specification
+/// can have 3 states:
+/// Before purification they can be `NotSpecified` or `Named`
+/// by the user to specify the column names to use.
+/// After purification they can be in any of the 3 states.
+/// For some source types we define the columns during purification
+/// and for others the columns are defined during planning based
+/// on the encoding option of the source.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum TableFromSourceColumns<T: AstInfo> {
+    /// The user did not specify which columns to use.
+    NotSpecified,
+    /// The user requested the named columns. Only compatible
+    /// with source types that allow user-specified column names.
+    Named(Vec<Ident>),
+    /// Columns defined during purification for some source types.
+    Defined(Vec<ColumnDef<T>>),
+}
+
 /// `CREATE TABLE .. FROM SOURCE`
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct CreateTableFromSourceStatement<T: AstInfo> {
-    /// Table name
     pub name: UnresolvedItemName,
-    pub columns: Vec<ColumnDef<T>>,
+    pub columns: TableFromSourceColumns<T>,
     pub constraints: Vec<TableConstraint<T>>,
     pub if_not_exists: bool,
     pub source: T::ItemName,
@@ -1649,10 +1667,18 @@ impl<T: AstInfo> AstDisplay for CreateTableFromSourceStatement<T> {
             f.write_str("IF NOT EXISTS ");
         }
         f.write_node(name);
-        if !columns.is_empty() || !constraints.is_empty() {
+        if !matches!(columns, TableFromSourceColumns::NotSpecified) || !constraints.is_empty() {
             f.write_str(" (");
-            f.write_node(&display::comma_separated(columns));
+            match columns {
+                TableFromSourceColumns::NotSpecified => unreachable!(),
+                TableFromSourceColumns::Named(columns) => {
+                    f.write_node(&display::comma_separated(columns))
+                }
+                TableFromSourceColumns::Defined(columns) => {
+                    f.write_node(&display::comma_separated(columns))
+                }
+            };
             if !constraints.is_empty() {
                 f.write_str(", ");
                 f.write_node(&display::comma_separated(constraints));
diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs
index 31a6f0adcc3e9..6eef2fab3a9ec 100644
--- a/src/sql-parser/src/parser.rs
+++ b/src/sql-parser/src/parser.rs
@@ -4476,9 +4476,7 @@ impl<'a> Parser<'a> {
         self.expect_keyword(TABLE)?;
         let if_not_exists = self.parse_if_not_exists()?;
         let table_name = self.parse_item_name()?;
-        // Columns are not specified by users, but are populated in this
-        // statement after purification, so Optional is used.
-        let (columns, constraints) = self.parse_columns(Optional)?;
+        let (columns, constraints) = self.parse_table_from_source_columns()?;
 
         self.expect_keywords(&[FROM, SOURCE])?;
 
@@ -4575,6 +4573,91 @@ impl<'a> Parser<'a> {
         Ok(option)
     }
 
+    fn parse_table_from_source_columns(
+        &mut self,
+    ) -> Result<(TableFromSourceColumns<Raw>, Vec<TableConstraint<Raw>>), ParserError> {
+        let mut constraints = vec![];
+
+        if !self.consume_token(&Token::LParen) {
+            return Ok((TableFromSourceColumns::NotSpecified, constraints));
+        }
+        if self.consume_token(&Token::RParen) {
+            // Tables with zero columns are a PostgreSQL extension.
+            return Ok((TableFromSourceColumns::NotSpecified, constraints));
+        }
+
+        let mut column_names = vec![];
+        let mut column_defs = vec![];
+        loop {
+            if let Some(constraint) = self.parse_optional_table_constraint()? {
+                constraints.push(constraint);
+            } else if let Some(column_name) = self.consume_identifier()? {
+                let next_token = self.peek_token();
+                match next_token {
+                    Some(Token::Comma) | Some(Token::RParen) => {
+                        column_names.push(column_name);
+                    }
+                    _ => {
+                        let data_type = self.parse_data_type()?;
+                        let collation = if self.parse_keyword(COLLATE) {
+                            Some(self.parse_item_name()?)
+                        } else {
+                            None
+                        };
+                        let mut options = vec![];
+                        loop {
+                            match self.peek_token() {
+                                None | Some(Token::Comma) | Some(Token::RParen) => break,
+                                _ => options.push(self.parse_column_option_def()?),
+                            }
+                        }
+
+                        column_defs.push(ColumnDef {
+                            name: column_name,
+                            data_type,
+                            collation,
+                            options,
+                        });
+                    }
+                }
+            } else {
+                return self.expected(
+                    self.peek_pos(),
+                    "column name or constraint definition",
+                    self.peek_token(),
+                );
+            }
+            if self.consume_token(&Token::Comma) {
+                // Continue.
+            } else if self.consume_token(&Token::RParen) {
+                break;
+            } else {
+                return self.expected(
+                    self.peek_pos(),
+                    "',' or ')' after column definition",
+                    self.peek_token(),
+                );
+            }
+        }
+        if !column_defs.is_empty() && !column_names.is_empty() {
+            return parser_err!(
+                self,
+                self.peek_prev_pos(),
+                "cannot mix column definitions and column names"
+            );
+        }
+
+        let columns = match column_defs.is_empty() {
+            true => match column_names.is_empty() {
+                true => TableFromSourceColumns::NotSpecified,
+                false => TableFromSourceColumns::Named(column_names),
+            },
+            false => TableFromSourceColumns::Defined(column_defs),
+        };
+
+        Ok((columns, constraints))
+    }
+
     fn parse_columns(
         &mut self,
         optional: IsOptional,
diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl
index d8bfa8d45ff47..4145f0442f564 100644
--- a/src/sql-parser/tests/testdata/ddl
+++ b/src/sql-parser/tests/testdata/ddl
@@ -253,21 +253,29 @@ CREATE TABLE t (c int4, d int4) FROM SOURCE foo (REFERENCE bar)
 ----
 CREATE TABLE t (c int4, d int4) FROM SOURCE foo (REFERENCE = bar)
 =>
-CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [ColumnDef { name: Ident("c"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }, ColumnDef { name: Ident("d"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("bar")])), with_options: [], include_metadata: [], format: None, envelope: None })
+CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: Defined([ColumnDef { name: Ident("c"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }, ColumnDef { name: Ident("d"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }]), constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("bar")])), with_options: [], include_metadata: [], format: None, envelope: None })
 
 parse-statement
 CREATE TABLE t (c, d) FROM SOURCE foo (REFERENCE bar)
 ----
-error: Expected a data type name, found comma
-CREATE TABLE t (c, d) FROM SOURCE foo (REFERENCE bar)
-                 ^
+CREATE TABLE t (c, d) FROM SOURCE foo (REFERENCE = bar)
+=>
+CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: Named([Ident("c"), Ident("d")]), constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("bar")])), with_options: [], include_metadata: [], format: None, envelope: None })
+
+
+parse-statement
+CREATE TABLE t (c, d int4) FROM SOURCE foo (REFERENCE bar)
+----
+error: cannot mix column definitions and column names
+CREATE TABLE t (c, d int4) FROM SOURCE foo (REFERENCE bar)
+                         ^
 
 parse-statement
 CREATE TABLE t FROM SOURCE foo
 ----
 CREATE TABLE t FROM SOURCE foo
 =>
-CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: None, with_options: [], include_metadata: [], format: None, envelope: None })
+CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: NotSpecified, constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: None, with_options: [], include_metadata: [], format: None, envelope: None })
 
 parse-statement
 CREATE TABLE t FROM SOURCE foo (OPTION)
@@ -281,21 +289,21 @@ CREATE TABLE t FROM SOURCE foo (REFERENCE = baz)
 ----
 CREATE TABLE t FROM SOURCE foo (REFERENCE = baz)
 =>
-CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [], include_metadata: [], format: None, envelope: None })
+CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: NotSpecified, constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [], include_metadata: [], format: None, envelope: None })
 
 parse-statement
 CREATE TABLE t FROM SOURCE foo (REFERENCE = baz) WITH (TEXT COLUMNS (bam))
 ----
 CREATE TABLE t FROM SOURCE foo (REFERENCE = baz) WITH (TEXT COLUMNS = (bam))
 =>
-CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [TableFromSourceOption { name: TextColumns, value: Some(Sequence([Ident(Ident("bam"))])) }], include_metadata: [], format: None, envelope: None })
+CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: NotSpecified, constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [TableFromSourceOption { name: TextColumns, value: Some(Sequence([Ident(Ident("bam"))])) }], include_metadata: [], format: None, envelope: None })
 
 parse-statement
 CREATE TABLE t FROM SOURCE foo (REFERENCE = baz) FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE UPSERT
 ----
 CREATE TABLE t FROM SOURCE foo (REFERENCE = baz) FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE UPSERT
 =>
-CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [], include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: Some(CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" }), value: CsrSeedProtobufSchema { schema: "123", message_name: "M" } }) } }))), envelope: Some(Upsert { value_decode_err_policy: [] }) })
+CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: NotSpecified, constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [], include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: Some(CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" }), value: CsrSeedProtobufSchema { schema: "123", message_name: "M" } }) } }))), envelope: Some(Upsert { value_decode_err_policy: [] }) })
 
 parse-statement
 CREATE DATABASE IF EXISTS foo
diff --git a/src/sql/src/normalize.rs b/src/sql/src/normalize.rs
index 9f29349794166..dfd264b8c5458 100644
--- a/src/sql/src/normalize.rs
+++ b/src/sql/src/normalize.rs
@@ -26,8 +26,8 @@ use mz_sql_parser::ast::{
     CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
     CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
     CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
-    MutRecBlock, Op, Query, Statement, TableFactor, UnresolvedItemName, UnresolvedSchemaName,
-    Value, ViewDefinition,
+    MutRecBlock, Op, Query, Statement, TableFactor, TableFromSourceColumns, UnresolvedItemName,
+    UnresolvedSchemaName, Value, ViewDefinition,
 };
 
 use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
@@ -312,8 +312,10 @@ pub fn create_statement(
         }) => {
             *name = allocate_name(name)?;
             let mut normalizer = QueryNormalizer::new();
-            for c in columns {
-                normalizer.visit_column_def_mut(c);
+            if let TableFromSourceColumns::Defined(columns) = columns {
+                for c in columns {
+                    normalizer.visit_column_def_mut(c);
+                }
             }
             if let Some(err) = normalizer.err {
                 return Err(err);
diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs
index 30e790b168ed4..bd07a04ca0a83 100644
--- a/src/sql/src/plan/statement/ddl.rs
+++ b/src/sql/src/plan/statement/ddl.rs
@@ -72,9 +72,10 @@ use mz_sql_parser::ast::{
     PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
     RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
     ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
-    Statement, TableConstraint, TableFromSourceOption, TableFromSourceOptionName, TableOption,
-    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
-    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
+    Statement, TableConstraint, TableFromSourceColumns, TableFromSourceOption,
+    TableFromSourceOptionName, TableOption, TableOptionName, UnresolvedDatabaseName,
+    UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition,
+    WithOptionValue,
 };
 use mz_sql_parser::ident;
 use mz_sql_parser::parser::StatementParseResult;
@@ -1689,17 +1690,22 @@ pub fn plan_create_table_from_source(
     let source_connection = &source_item.source_desc()?.expect("is source").connection;
 
     // Some source-types (e.g. postgres, mysql, multi-output load-gen sources) define a value_schema
-    // during purification and populate the `columns` and `constraints` fields for the statement,
+    // during purification and define the `columns` and `constraints` fields for the statement,
    // whereas other source-types (e.g. kafka, single-output load-gen sources) do not, so instead
     // we use the source connection's default schema.
-    let (key_desc, value_desc) = if !columns.is_empty() || !constraints.is_empty() {
-        let desc = plan_source_export_desc(scx, name, columns, constraints)?;
-        (None, desc)
-    } else {
-        let key_desc = source_connection.default_key_desc();
-        let value_desc = source_connection.default_value_desc();
-        (Some(key_desc), value_desc)
-    };
+    let (key_desc, value_desc) =
+        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
+            let columns = match columns {
+                TableFromSourceColumns::Defined(columns) => columns,
+                _ => unreachable!(),
+            };
+            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
+            (None, desc)
+        } else {
+            let key_desc = source_connection.default_key_desc();
+            let value_desc = source_connection.default_value_desc();
+            (Some(key_desc), value_desc)
+        };
 
     let metadata_columns_desc = match &details {
         SourceExportDetails::Kafka(KafkaSourceExportDetails {
@@ -1708,7 +1714,7 @@ pub fn plan_create_table_from_source(
         _ => vec![],
     };
 
-    let (desc, envelope, encoding) = apply_source_envelope_encoding(
+    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
         scx,
         &envelope,
         format,
@@ -1719,6 +1725,9 @@ pub fn plan_create_table_from_source(
         metadata_columns_desc,
         source_connection,
     )?;
+    if let TableFromSourceColumns::Named(col_names) = columns {
+        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
+    }
 
     let data_source = DataSourceDesc::IngestionExport {
         ingestion_id,
diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs
index e2568dbe2f0ae..5ffe91b65c184 100644
--- a/src/sql/src/pure.rs
+++ b/src/sql/src/pure.rs
@@ -47,7 +47,7 @@ use mz_sql_parser::ast::{
     MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
     PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
     RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope, Statement,
-    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
+    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
 };
 use mz_storage_types::configuration::StorageConfiguration;
 use mz_storage_types::connections::inline::IntoInlineConnection;
@@ -1386,7 +1386,7 @@ async fn purify_create_table_from_source(
     } = &mut stmt;
 
     // Columns and constraints cannot be specified by the user but will be populated below.
-    if !columns.is_empty() {
+    if matches!(columns, TableFromSourceColumns::Defined(_)) {
         sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
     }
     if !constraints.is_empty() {
@@ -1713,8 +1713,16 @@ async fn purify_create_table_from_source(
                     );
                 }
             }
-            *columns = gen_columns;
-            *constraints = gen_constraints;
+            match columns {
+                TableFromSourceColumns::Defined(_) => unreachable!(),
+                TableFromSourceColumns::NotSpecified => {
+                    *columns = TableFromSourceColumns::Defined(gen_columns);
+                    *constraints = gen_constraints;
+                }
+                TableFromSourceColumns::Named(_) => {
+                    sql_bail!("columns cannot be named for Postgres sources")
+                }
+            }
             with_options.push(TableFromSourceOption {
                 name: TableFromSourceOptionName::Details,
                 value: Some(WithOptionValue::Value(Value::String(hex::encode(
@@ -1756,8 +1764,16 @@ async fn purify_create_table_from_source(
                     );
                 }
             }
-            *columns = gen_columns;
-            *constraints = gen_constraints;
+            match columns {
+                TableFromSourceColumns::Defined(_) => unreachable!(),
+                TableFromSourceColumns::NotSpecified => {
+                    *columns = TableFromSourceColumns::Defined(gen_columns);
+                    *constraints = gen_constraints;
+                }
+                TableFromSourceColumns::Named(_) => {
+                    sql_bail!("columns cannot be named for MySQL sources")
+                }
+            }
             with_options.push(TableFromSourceOption {
                 name: TableFromSourceOptionName::Details,
                 value: Some(WithOptionValue::Value(Value::String(hex::encode(
@@ -1776,8 +1792,16 @@ async fn purify_create_table_from_source(
             // schema.
            if let Some(desc) = desc {
                 let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
-                *columns = gen_columns;
-                *constraints = gen_constraints;
+                match columns {
+                    TableFromSourceColumns::Defined(_) => unreachable!(),
+                    TableFromSourceColumns::NotSpecified => {
+                        *columns = TableFromSourceColumns::Defined(gen_columns);
+                        *constraints = gen_constraints;
+                    }
+                    TableFromSourceColumns::Named(_) => {
+                        sql_bail!("columns cannot be named for multi-output load generator sources")
+                    }
+                }
             }
             let details = SourceExportStatementDetails::LoadGenerator { output };
             with_options.push(TableFromSourceOption {
diff --git a/test/testdrive/source-tables.td b/test/testdrive/source-tables.td
index 8323cef3c63c5..b533167cdee39 100644
--- a/test/testdrive/source-tables.td
+++ b/test/testdrive/source-tables.td
@@ -338,6 +338,10 @@ $ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema
   FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
   ENVELOPE NONE
 
+> CREATE TABLE avro_table_append_cols (a, b) FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
+  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+  ENVELOPE NONE
+
 > SELECT * from avro_table_upsert
 key f1 f2
 ---------------------------
@@ -356,6 +360,17 @@ moose 1
 moose 2
 moose 42
 
+> SELECT * from avro_table_append_cols
+a b
+---------------
+fish 1000
+geese 2
+geese 56
+goose 1
+moose 1
+moose 2
+moose 42
+
 #
 # Key-value load generator source using source-fed tables
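
For reference, a minimal sketch of the syntax this patch enables, modeled on the testdrive case above. The table names t_default and t_renamed are placeholders, "some-topic" stands in for a real reference (the test itself uses "testdrive-avroavro-${testdrive.seed}"), and avro_source and csr_conn are the objects created earlier in that test:

-- Omitting the column list leaves the columns to be derived from the source's
-- encoding during planning (TableFromSourceColumns::NotSpecified).
CREATE TABLE t_default FROM SOURCE avro_source (REFERENCE "some-topic")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE NONE

-- A list of bare identifiers renames the decoded value columns in order
-- (TableFromSourceColumns::Named). Mixing bare names with typed column
-- definitions is rejected by the parser, and named columns are rejected for
-- Postgres, MySQL, and multi-output load generator sources.
CREATE TABLE t_renamed (a, b) FROM SOURCE avro_source (REFERENCE "some-topic")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE NONE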