Skip to content

Commit

Permalink
Add tests for new source table types
Browse files Browse the repository at this point in the history
  • Loading branch information
rjobanp committed Sep 9, 2024
1 parent f635591 commit a0ac3b6
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/storage-types/src/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2118,6 +2118,7 @@ mod tests {
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn backward_compatible_migrate_from_common() {
use mz_repr::ColumnType;
fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
// TODO(parkmycar): As we iterate on schema migrations more things should become compatible.
let should_be_compatible = diffs.iter().all(|diff| match diff {
Expand Down
2 changes: 1 addition & 1 deletion src/storage-types/src/sources/kafka.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ message ProtoKafkaHeader {
// statement options
// Be extra careful about changes, ensuring that all changes are backwards
// compatible
message ProtoKafkaSourceExportStatementDetails {}
message ProtoKafkaSourceExportStatementDetails {}
92 changes: 91 additions & 1 deletion test/testdrive/source-tables.td
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.mater
ALTER SYSTEM SET enable_create_table_from_source = true

#
# Load generator source using source-fed tables
# Multi-output load generator source using source-fed tables
#

> CREATE SOURCE auction_house
Expand All @@ -53,6 +53,26 @@ ALTER SYSTEM SET enable_create_table_from_source = true

> DROP SOURCE auction_house CASCADE;

#
# Single-output load generator source using source-fed tables
#

> CREATE SOURCE counter
IN CLUSTER ${arg.single-replica-cluster}
FROM LOAD GENERATOR COUNTER (AS OF 4, UP TO 5);

> CREATE TABLE counter_1 FROM SOURCE counter (REFERENCE "counter");

> CREATE TABLE counter_2 FROM SOURCE counter (REFERENCE "counter");

> SELECT count(*) from counter_1;
5

> SELECT count(*) from counter_2;
5

> DROP SOURCE counter CASCADE;


#
# Postgres source using source-fed tables
Expand Down Expand Up @@ -252,3 +272,73 @@ var0 5555 6666
var1 4444 12

> DROP SOURCE mysql_source CASCADE;

#
# Kafka source using source-fed tables
#

$ set keyschema={
"type": "record",
"name": "Key",
"fields": [
{"name": "key", "type": "string"}
]
}

$ set schema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"string"},
{"name":"f2", "type":"long"}
]
}

> CREATE CONNECTION kafka_conn
TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
URL '${testdrive.schema-registry-url}'
);

$ kafka-create-topic topic=avroavro

$ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema} schema=${schema}
{"key": "fish"} {"f1": "fish", "f2": 1000}
{"key": "bird1"} {"f1":"goose", "f2": 1}
{"key": "birdmore"} {"f1":"geese", "f2": 2}
{"key": "mammal1"} {"f1": "moose", "f2": 1}
{"key": "bird1"}
{"key": "birdmore"} {"f1":"geese", "f2": 56}
{"key": "mammalmore"} {"f1": "moose", "f2": 42}
{"key": "mammal1"}
{"key": "mammalmore"} {"f1":"moose", "f2": 2}

> CREATE SOURCE avro_source
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}');

> CREATE TABLE avro_table_upsert FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT

> CREATE TABLE avro_table_append FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE NONE

> SELECT * from avro_table_upsert
key f1 f2
---------------------------
fish fish 1000
birdmore geese 56
mammalmore moose 2

> SELECT * from avro_table_append
f1 f2
---------------
fish 1000
geese 2
geese 56
goose 1
moose 1
moose 2
moose 42

0 comments on commit a0ac3b6

Please sign in to comment.