diff --git a/src/storage-types/src/sources.rs b/src/storage-types/src/sources.rs
index a8fa7b97abf79..f14965908f558 100644
--- a/src/storage-types/src/sources.rs
+++ b/src/storage-types/src/sources.rs
@@ -2118,6 +2118,7 @@ mod tests {
     #[mz_ore::test]
     #[cfg_attr(miri, ignore)]
     fn backward_compatible_migrate_from_common() {
+        use mz_repr::ColumnType;
         fn test_case(old: RelationDesc, diffs: Vec, datas: Vec) {
             // TODO(parkmycar): As we iterate on schema migrations more things should become compatible.
             let should_be_compatible = diffs.iter().all(|diff| match diff {
diff --git a/src/storage-types/src/sources/kafka.proto b/src/storage-types/src/sources/kafka.proto
index 3f1b30b9edf69..ff1bb37b405a2 100644
--- a/src/storage-types/src/sources/kafka.proto
+++ b/src/storage-types/src/sources/kafka.proto
@@ -56,4 +56,4 @@ message ProtoKafkaHeader {
 // statement options
 // Be extra careful about changes, ensuring that all changes are backwards
 // compatible
-message ProtoKafkaSourceExportStatementDetails {}
\ No newline at end of file
+message ProtoKafkaSourceExportStatementDetails {}
diff --git a/test/testdrive/source-tables.td b/test/testdrive/source-tables.td
index d3726fd4b096f..ee2fac537b24a 100644
--- a/test/testdrive/source-tables.td
+++ b/test/testdrive/source-tables.td
@@ -31,7 +31,7 @@ $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.mater
 ALTER SYSTEM SET enable_create_table_from_source = true
 
 #
-# Load generator source using source-fed tables
+# Multi-output load generator source using source-fed tables
 #
 
 > CREATE SOURCE auction_house
@@ -53,6 +53,26 @@ ALTER SYSTEM SET enable_create_table_from_source = true
 
 > DROP SOURCE auction_house CASCADE;
 
+#
+# Single-output load generator source using source-fed tables
+#
+
+> CREATE SOURCE counter
+  IN CLUSTER ${arg.single-replica-cluster}
+  FROM LOAD GENERATOR COUNTER (AS OF 4, UP TO 5);
+
+> CREATE TABLE counter_1 FROM SOURCE counter (REFERENCE "counter");
+
+> CREATE TABLE counter_2 FROM SOURCE counter (REFERENCE "counter");
+
+> SELECT count(*) from counter_1;
+5
+
+> SELECT count(*) from counter_2;
+5
+
+> DROP SOURCE counter CASCADE;
+
 #
 # Postgres source using source-fed tables
 #
@@ -252,3 +272,73 @@ var0 5555 6666
 var1 4444 12
 
 > DROP SOURCE mysql_source CASCADE;
+
+#
+# Kafka source using source-fed tables
+#
+
+$ set keyschema={
+    "type": "record",
+    "name": "Key",
+    "fields": [
+        {"name": "key", "type": "string"}
+    ]
+  }
+
+$ set schema={
+    "type" : "record",
+    "name" : "test",
+    "fields" : [
+        {"name":"f1", "type":"string"},
+        {"name":"f2", "type":"long"}
+    ]
+  }
+
+> CREATE CONNECTION kafka_conn
+  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
+
+> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
+    URL '${testdrive.schema-registry-url}'
+  );
+
+$ kafka-create-topic topic=avroavro
+
+$ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema} schema=${schema}
+{"key": "fish"} {"f1": "fish", "f2": 1000}
+{"key": "bird1"} {"f1":"goose", "f2": 1}
+{"key": "birdmore"} {"f1":"geese", "f2": 2}
+{"key": "mammal1"} {"f1": "moose", "f2": 1}
+{"key": "bird1"}
+{"key": "birdmore"} {"f1":"geese", "f2": 56}
+{"key": "mammalmore"} {"f1": "moose", "f2": 42}
+{"key": "mammal1"}
+{"key": "mammalmore"} {"f1":"moose", "f2": 2}
+
+> CREATE SOURCE avro_source
+  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}');
+
+> CREATE TABLE avro_table_upsert FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
+  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+  ENVELOPE UPSERT
+
+> CREATE TABLE avro_table_append FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
+  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+  ENVELOPE NONE
+
+> SELECT * from avro_table_upsert
+key         f1     f2
+---------------------------
+fish        fish   1000
+birdmore    geese  56
+mammalmore  moose  2
+
+> SELECT * from avro_table_append
+f1     f2
+---------------
+fish   1000
+geese  2
+geese  56
+goose  1
+moose  1
+moose  2
+moose  42