Skip to content

Commit

Permalink
storage/sources: Allow user-specified column names in CREATE TABLE FR…
Browse files Browse the repository at this point in the history
…OM SOURCE statements
  • Loading branch information
rjobanp committed Oct 15, 2024
1 parent 1d66921 commit 1e4ee25
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 40 deletions.
34 changes: 30 additions & 4 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1614,12 +1614,30 @@ pub struct TableFromSourceOption<T: AstInfo> {
}
impl_display_for_with_option!(TableFromSourceOption);

/// `CREATE TABLE .. FROM SOURCE` columns specification
/// can have 3 states:
/// Before purification they can be `NotSpecified` or `Named`
/// by the user to specify the column names to use.
/// After purification they can be in any of the 3 states.
/// For some source types we define the columns during purification
/// and for others the columns are defined during planning based
/// on the encoding option of the source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableFromSourceColumns<T: AstInfo> {
/// The user did not specify which columns to use.
NotSpecified,
/// The user requested the named columns. Only compatible
/// with source types that allow user-specified column names.
Named(Vec<Ident>),
/// Columns defined during purification for some source types.
Defined(Vec<ColumnDef<T>>),
}

/// `CREATE TABLE .. FROM SOURCE`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateTableFromSourceStatement<T: AstInfo> {
/// Table name
pub name: UnresolvedItemName,
pub columns: Vec<ColumnDef<T>>,
pub columns: TableFromSourceColumns<T>,
pub constraints: Vec<TableConstraint<T>>,
pub if_not_exists: bool,
pub source: T::ItemName,
Expand Down Expand Up @@ -1649,10 +1667,18 @@ impl<T: AstInfo> AstDisplay for CreateTableFromSourceStatement<T> {
f.write_str("IF NOT EXISTS ");
}
f.write_node(name);
if !columns.is_empty() || !constraints.is_empty() {
if !matches!(columns, TableFromSourceColumns::NotSpecified) || !constraints.is_empty() {
f.write_str(" (");

f.write_node(&display::comma_separated(columns));
match columns {
TableFromSourceColumns::NotSpecified => unreachable!(),
TableFromSourceColumns::Named(columns) => {
f.write_node(&display::comma_separated(columns))
}
TableFromSourceColumns::Defined(columns) => {
f.write_node(&display::comma_separated(columns))
}
};
if !constraints.is_empty() {
f.write_str(", ");
f.write_node(&display::comma_separated(constraints));
Expand Down
89 changes: 86 additions & 3 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4476,9 +4476,7 @@ impl<'a> Parser<'a> {
self.expect_keyword(TABLE)?;
let if_not_exists = self.parse_if_not_exists()?;
let table_name = self.parse_item_name()?;
// Columns are not specified by users, but are populated in this
// statement after purification, so Optional is used.
let (columns, constraints) = self.parse_columns(Optional)?;
let (columns, constraints) = self.parse_table_from_source_columns()?;

self.expect_keywords(&[FROM, SOURCE])?;

Expand Down Expand Up @@ -4575,6 +4573,91 @@ impl<'a> Parser<'a> {
Ok(option)
}

fn parse_table_from_source_columns(
&mut self,
) -> Result<(TableFromSourceColumns<Raw>, Vec<TableConstraint<Raw>>), ParserError> {
let mut constraints = vec![];

if !self.consume_token(&Token::LParen) {
return Ok((TableFromSourceColumns::NotSpecified, constraints));
}
if self.consume_token(&Token::RParen) {
// Tables with zero columns are a PostgreSQL extension.
return Ok((TableFromSourceColumns::NotSpecified, constraints));
}

let mut column_names = vec![];
let mut column_defs = vec![];
loop {
if let Some(constraint) = self.parse_optional_table_constraint()? {
constraints.push(constraint);
} else if let Some(column_name) = self.consume_identifier()? {
let next_token = self.peek_token();
match next_token {
Some(Token::Comma) | Some(Token::RParen) => {
column_names.push(column_name);
}
_ => {
let data_type = self.parse_data_type()?;
let collation = if self.parse_keyword(COLLATE) {
Some(self.parse_item_name()?)
} else {
None
};
let mut options = vec![];
loop {
match self.peek_token() {
None | Some(Token::Comma) | Some(Token::RParen) => break,
_ => options.push(self.parse_column_option_def()?),
}
}

column_defs.push(ColumnDef {
name: column_name,
data_type,
collation,
options,
});
}
}
} else {
return self.expected(
self.peek_pos(),
"column name or constraint definition",
self.peek_token(),
);
}
if self.consume_token(&Token::Comma) {
// Continue.
} else if self.consume_token(&Token::RParen) {
break;
} else {
return self.expected(
self.peek_pos(),
"',' or ')' after column definition",
self.peek_token(),
);
}
}
if !column_defs.is_empty() && !column_names.is_empty() {
return parser_err!(
self,
self.peek_prev_pos(),
"cannot mix column definitions and column names"
);
}

let columns = match column_defs.is_empty() {
true => match column_names.is_empty() {
true => TableFromSourceColumns::NotSpecified,
false => TableFromSourceColumns::Named(column_names),
},
false => TableFromSourceColumns::Defined(column_defs),
};

Ok((columns, constraints))
}

fn parse_columns(
&mut self,
optional: IsOptional,
Expand Down
24 changes: 16 additions & 8 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -253,21 +253,29 @@ CREATE TABLE t (c int4, d int4) FROM SOURCE foo (REFERENCE bar)
----
CREATE TABLE t (c int4, d int4) FROM SOURCE foo (REFERENCE = bar)
=>
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [ColumnDef { name: Ident("c"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }, ColumnDef { name: Ident("d"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("bar")])), with_options: [], include_metadata: [], format: None, envelope: None })
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: Defined([ColumnDef { name: Ident("c"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }, ColumnDef { name: Ident("d"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }]), constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("bar")])), with_options: [], include_metadata: [], format: None, envelope: None })

parse-statement
CREATE TABLE t (c, d) FROM SOURCE foo (REFERENCE bar)
----
error: Expected a data type name, found comma
CREATE TABLE t (c, d) FROM SOURCE foo (REFERENCE bar)
^
CREATE TABLE t (c, d) FROM SOURCE foo (REFERENCE = bar)
=>
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: Named([Ident("c"), Ident("d")]), constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("bar")])), with_options: [], include_metadata: [], format: None, envelope: None })


parse-statement
CREATE TABLE t (c, d int4) FROM SOURCE foo (REFERENCE bar)
----
error: cannot mix column definitions and column names
CREATE TABLE t (c, d int4) FROM SOURCE foo (REFERENCE bar)
^

parse-statement
CREATE TABLE t FROM SOURCE foo
----
CREATE TABLE t FROM SOURCE foo
=>
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: None, with_options: [], include_metadata: [], format: None, envelope: None })
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: NotSpecified, constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: None, with_options: [], include_metadata: [], format: None, envelope: None })

parse-statement
CREATE TABLE t FROM SOURCE foo (OPTION)
Expand All @@ -281,21 +289,21 @@ CREATE TABLE t FROM SOURCE foo (REFERENCE = baz)
----
CREATE TABLE t FROM SOURCE foo (REFERENCE = baz)
=>
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [], include_metadata: [], format: None, envelope: None })
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: NotSpecified, constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [], include_metadata: [], format: None, envelope: None })

parse-statement
CREATE TABLE t FROM SOURCE foo (REFERENCE = baz) WITH (TEXT COLUMNS (bam))
----
CREATE TABLE t FROM SOURCE foo (REFERENCE = baz) WITH (TEXT COLUMNS = (bam))
=>
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [TableFromSourceOption { name: TextColumns, value: Some(Sequence([Ident(Ident("bam"))])) }], include_metadata: [], format: None, envelope: None })
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: NotSpecified, constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [TableFromSourceOption { name: TextColumns, value: Some(Sequence([Ident(Ident("bam"))])) }], include_metadata: [], format: None, envelope: None })

parse-statement
CREATE TABLE t FROM SOURCE foo (REFERENCE = baz) FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE UPSERT
----
CREATE TABLE t FROM SOURCE foo (REFERENCE = baz) FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE UPSERT
=>
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [], include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: Some(CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" }), value: CsrSeedProtobufSchema { schema: "123", message_name: "M" } }) } }))), envelope: Some(Upsert { value_decode_err_policy: [] }) })
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: NotSpecified, constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [], include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: Some(CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" }), value: CsrSeedProtobufSchema { schema: "123", message_name: "M" } }) } }))), envelope: Some(Upsert { value_decode_err_policy: [] }) })

parse-statement
CREATE DATABASE IF EXISTS foo
Expand Down
10 changes: 6 additions & 4 deletions src/sql/src/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use mz_sql_parser::ast::{
CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
MutRecBlock, Op, Query, Statement, TableFactor, UnresolvedItemName, UnresolvedSchemaName,
Value, ViewDefinition,
MutRecBlock, Op, Query, Statement, TableFactor, TableFromSourceColumns, UnresolvedItemName,
UnresolvedSchemaName, Value, ViewDefinition,
};

use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
Expand Down Expand Up @@ -312,8 +312,10 @@ pub fn create_statement(
}) => {
*name = allocate_name(name)?;
let mut normalizer = QueryNormalizer::new();
for c in columns {
normalizer.visit_column_def_mut(c);
if let TableFromSourceColumns::Defined(columns) = columns {
for c in columns {
normalizer.visit_column_def_mut(c);
}
}
if let Some(err) = normalizer.err {
return Err(err);
Expand Down
35 changes: 22 additions & 13 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ use mz_sql_parser::ast::{
PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
Statement, TableConstraint, TableFromSourceOption, TableFromSourceOptionName, TableOption,
TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
Statement, TableConstraint, TableFromSourceColumns, TableFromSourceOption,
TableFromSourceOptionName, TableOption, TableOptionName, UnresolvedDatabaseName,
UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition,
WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
Expand Down Expand Up @@ -1689,17 +1690,22 @@ pub fn plan_create_table_from_source(
let source_connection = &source_item.source_desc()?.expect("is source").connection;

// Some source-types (e.g. postgres, mysql, multi-output load-gen sources) define a value_schema
// during purification and populate the `columns` and `constraints` fields for the statement,
// during purification and define the `columns` and `constraints` fields for the statement,
// whereas other source-types (e.g. kafka, single-output load-gen sources) do not, so instead
// we use the source connection's default schema.
let (key_desc, value_desc) = if !columns.is_empty() || !constraints.is_empty() {
let desc = plan_source_export_desc(scx, name, columns, constraints)?;
(None, desc)
} else {
let key_desc = source_connection.default_key_desc();
let value_desc = source_connection.default_value_desc();
(Some(key_desc), value_desc)
};
let (key_desc, value_desc) =
if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
let columns = match columns {
TableFromSourceColumns::Defined(columns) => columns,
_ => unreachable!(),
};
let desc = plan_source_export_desc(scx, name, columns, constraints)?;
(None, desc)
} else {
let key_desc = source_connection.default_key_desc();
let value_desc = source_connection.default_value_desc();
(Some(key_desc), value_desc)
};

let metadata_columns_desc = match &details {
SourceExportDetails::Kafka(KafkaSourceExportDetails {
Expand All @@ -1708,7 +1714,7 @@ pub fn plan_create_table_from_source(
_ => vec![],
};

let (desc, envelope, encoding) = apply_source_envelope_encoding(
let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
scx,
&envelope,
format,
Expand All @@ -1719,6 +1725,9 @@ pub fn plan_create_table_from_source(
metadata_columns_desc,
source_connection,
)?;
if let TableFromSourceColumns::Named(col_names) = columns {
plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
}

let data_source = DataSourceDesc::IngestionExport {
ingestion_id,
Expand Down
Loading

0 comments on commit 1e4ee25

Please sign in to comment.