Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/sources: Implement 'CREATE TABLE .. FROM SOURCE' parsing and planning for Kafka sources #29383

Merged
merged 7 commits into from
Sep 12, 2024
4 changes: 2 additions & 2 deletions src/sql-parser/src/ast/defs/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,11 @@ impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
FormatSpecifier::Bare(format) => {
f.write_str(" FORMAT ");
f.write_str("FORMAT ");
f.write_node(format)
}
FormatSpecifier::KeyValue { key, value } => {
f.write_str(" KEY FORMAT ");
f.write_str("KEY FORMAT ");
f.write_node(key);
f.write_str(" VALUE FORMAT ");
f.write_node(value);
Expand Down
30 changes: 26 additions & 4 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,7 @@ impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
f.write_str(" FROM ");
f.write_node(&self.connection);
if let Some(format) = &self.format {
f.write_str(" ");
f.write_node(format);
}
if !self.include_metadata.is_empty() {
Expand Down Expand Up @@ -1263,6 +1264,7 @@ impl<T: AstInfo> AstDisplay for CreateSinkStatement<T> {
f.write_str(" INTO ");
f.write_node(&self.connection);
if let Some(format) = &self.format {
f.write_str(" ");
f.write_node(format);
}
if let Some(envelope) = &self.envelope {
Expand Down Expand Up @@ -1560,8 +1562,11 @@ pub struct CreateTableFromSourceStatement<T: AstInfo> {
pub constraints: Vec<TableConstraint<T>>,
pub if_not_exists: bool,
pub source: T::ItemName,
pub external_reference: UnresolvedItemName,
pub external_reference: Option<UnresolvedItemName>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this PR implement the thing where if no external reference is provided and it's unambiguous it's accepted? I didn't see any test like that added but this change looks like we accept it now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline - it only implements that for Kafka and single-output load generator sources which only ever expose one output to reference. Will submit a future PR to make this uniform across all source types, such that you can omit a reference on a MySQL or Postgres source that only has one upstream table to ingest

pub with_options: Vec<TableFromSourceOption<T>>,
pub include_metadata: Vec<SourceIncludeMetadata>,
pub format: Option<FormatSpecifier<T>>,
pub envelope: Option<SourceEnvelope>,
}

impl<T: AstInfo> AstDisplay for CreateTableFromSourceStatement<T> {
Expand All @@ -1574,6 +1579,9 @@ impl<T: AstInfo> AstDisplay for CreateTableFromSourceStatement<T> {
external_reference,
if_not_exists,
with_options,
include_metadata,
format,
envelope,
} = self;
f.write_str("CREATE TABLE ");
if *if_not_exists {
Expand All @@ -1592,10 +1600,24 @@ impl<T: AstInfo> AstDisplay for CreateTableFromSourceStatement<T> {
}
f.write_str(" FROM SOURCE ");
f.write_node(source);
f.write_str(" (REFERENCE = ");
f.write_node(external_reference);
f.write_str(")");
if let Some(external_reference) = external_reference {
f.write_str(" (REFERENCE = ");
f.write_node(external_reference);
f.write_str(")");
}

if let Some(format) = &format {
f.write_str(" ");
f.write_node(format);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw that the ast printing of FormatSpecifier adds a leading space itself but I think the approach in the other types is that if a leading space is required then it's the outer's function responsibility. So I think there should be a f.write_str(" ") here and then the formatter of format should immediately begin with FORMAT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense - pushed a commit to address this

}
if !include_metadata.is_empty() {
f.write_str(" INCLUDE ");
f.write_node(&display::comma_separated(include_metadata));
}
if let Some(envelope) = &envelope {
f.write_str(" ENVELOPE ");
f.write_node(envelope);
}
if !with_options.is_empty() {
f.write_str(" WITH (");
f.write_node(&display::comma_separated(with_options));
Expand Down
38 changes: 33 additions & 5 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4398,11 +4398,36 @@ impl<'a> Parser<'a> {
self.expect_keywords(&[FROM, SOURCE])?;

let source = self.parse_raw_name()?;
self.expect_token(&Token::LParen)?;
self.expect_keyword(REFERENCE)?;
let _ = self.consume_token(&Token::Eq);
let external_reference = self.parse_item_name()?;
self.expect_token(&Token::RParen)?;

let external_reference = if self.consume_token(&Token::LParen) {
self.expect_keyword(REFERENCE)?;
let _ = self.consume_token(&Token::Eq);
let external_reference = self.parse_item_name()?;
self.expect_token(&Token::RParen)?;
Some(external_reference)
} else {
None
};

let format = match self.parse_one_of_keywords(&[KEY, FORMAT]) {
Some(KEY) => {
self.expect_keyword(FORMAT)?;
let key = self.parse_format()?;
self.expect_keywords(&[VALUE, FORMAT])?;
let value = self.parse_format()?;
Some(FormatSpecifier::KeyValue { key, value })
}
Some(FORMAT) => Some(FormatSpecifier::Bare(self.parse_format()?)),
Some(_) => unreachable!("parse_one_of_keywords returns None for this"),
None => None,
};
let include_metadata = self.parse_source_include_metadata()?;

let envelope = if self.parse_keyword(ENVELOPE) {
Some(self.parse_source_envelope()?)
} else {
None
};

let with_options = if self.parse_keyword(WITH) {
self.expect_token(&Token::LParen)?;
Expand All @@ -4421,6 +4446,9 @@ impl<'a> Parser<'a> {
if_not_exists,
source,
external_reference,
format,
include_metadata,
envelope,
with_options,
},
))
Expand Down
22 changes: 18 additions & 4 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ CREATE TABLE t (c int4, d int4) FROM SOURCE foo (REFERENCE bar)
----
CREATE TABLE t (c int4, d int4) FROM SOURCE foo (REFERENCE = bar)
=>
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [ColumnDef { name: Ident("c"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }, ColumnDef { name: Ident("d"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: UnresolvedItemName([Ident("bar")]), with_options: [] })
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [ColumnDef { name: Ident("c"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }, ColumnDef { name: Ident("d"), data_type: Other { name: Name(UnresolvedItemName([Ident("int4")])), typ_mod: [] }, collation: None, options: [] }], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("bar")])), with_options: [], include_metadata: [], format: None, envelope: None })

parse-statement
CREATE TABLE t (c, d) FROM SOURCE foo (REFERENCE bar)
Expand All @@ -265,16 +265,30 @@ CREATE TABLE t (c, d) FROM SOURCE foo (REFERENCE bar)
parse-statement
CREATE TABLE t FROM SOURCE foo
----
error: Expected left parenthesis, found EOF
CREATE TABLE t FROM SOURCE foo
^
=>
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: None, with_options: [], include_metadata: [], format: None, envelope: None })

parse-statement
CREATE TABLE t FROM SOURCE foo (OPTION)
----
error: Expected REFERENCE, found identifier "option"
CREATE TABLE t FROM SOURCE foo (OPTION)
^

parse-statement
CREATE TABLE t FROM SOURCE foo (REFERENCE = baz)
----
CREATE TABLE t FROM SOURCE foo (REFERENCE = baz)
=>
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: UnresolvedItemName([Ident("baz")]), with_options: [] })
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [], include_metadata: [], format: None, envelope: None })

parse-statement
CREATE TABLE t FROM SOURCE foo (REFERENCE = baz) FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE UPSERT
----
CREATE TABLE t FROM SOURCE foo (REFERENCE = baz) FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE UPSERT
=>
CreateTableFromSource(CreateTableFromSourceStatement { name: UnresolvedItemName([Ident("t")]), columns: [], constraints: [], if_not_exists: false, source: Name(UnresolvedItemName([Ident("foo")])), external_reference: Some(UnresolvedItemName([Ident("baz")])), with_options: [], include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: Some(CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" }), value: CsrSeedProtobufSchema { schema: "123", message_name: "M" } }) } }))), envelope: Some(Upsert { value_decode_err_policy: [] }) })

parse-statement
CREATE DATABASE IF EXISTS foo
Expand Down
3 changes: 3 additions & 0 deletions src/sql/src/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ pub fn create_statement(
external_reference: _,
source: _,
if_not_exists,
format: _,
include_metadata: _,
envelope: _,
with_options: _,
}) => {
*name = allocate_name(name)?;
Expand Down
Loading
Loading