diff --git a/test/0dt/mzcompose.py b/test/0dt/mzcompose.py
index 52acaada2ec9b..8547bf0ebb613 100644
--- a/test/0dt/mzcompose.py
+++ b/test/0dt/mzcompose.py
@@ -1388,3 +1388,137 @@ def get_correction_metrics():
         raise AssertionError(
             f"unexpected correction metrics: {insertions=}, {deletions=}"
         )
+
+
+def workflow_upsert_sources(c: Composition) -> None:
+    c.down(destroy_volumes=True)
+    c.up("zookeeper", "kafka", "schema-registry", "postgres", "mysql", "mz_old")
+    c.up("testdrive", persistent=True)
+
+    c.sql(
+        """
+        DROP CLUSTER IF EXISTS cluster CASCADE;
+        CREATE CLUSTER cluster SIZE '2-1';
+        GRANT ALL ON CLUSTER cluster TO materialize;
+        ALTER SYSTEM SET cluster = cluster;
+        ALTER SYSTEM SET max_sources = 100000;
+        """,
+        service="mz_old",
+        port=6877,
+        user="mz_system",
+    )
+
+    c.testdrive(
+        dedent(
+            """
+            > SET CLUSTER = cluster;
+            > CREATE TABLE t (a int, b int);
+
+            > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL = 'PLAINTEXT';
+            > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
+            """
+        )
+    )
+
+    i = 1
+    mz1 = "mz_old"
+    mz2 = "mz_new"
+    while True:
+        with c.override(
+            Materialized(
+                name=mz2,
+                sanity_restart=False,
+                deploy_generation=i,
+                system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
+                restart="on-failure",
+                external_metadata_store=True,
+            ),
+            Testdrive(
+                materialize_url=f"postgres://materialize@{mz1}:6875",
+                materialize_url_internal=f"postgres://materialize@{mz1}:6877",
+                mz_service=mz1,
+                materialize_params={"cluster": "cluster"},
+                no_reset=True,
+                seed=1,
+                default_timeout=DEFAULT_TIMEOUT,
+            ),
+        ):
+            c.up(mz2)
+
+            c.testdrive(
+                dedent(
+                    f"""
+                    $ kafka-create-topic topic=kafka{i}
+                    $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{i}
+                    key1A,key1B:value1A,value1B
+                    > CREATE SOURCE kafka_source{i}
+                      IN CLUSTER cluster
+                      FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka{i}-${{testdrive.seed}}');
+                    > CREATE TABLE kafka_source_tbl{i} (key1, key2, value1, value2)
+                      FROM SOURCE kafka_source{i} (REFERENCE "testdrive-kafka{i}-${{testdrive.seed}}")
+                      KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
+                      VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
+                      ENVELOPE UPSERT;
+
+                    > CREATE MATERIALIZED VIEW mv{i} AS SELECT * FROM kafka_source_tbl{i}
+                    """
+                )
+            )
+
+            for j in range(1, i + 1):
+                c.testdrive(
+                    dedent(
+                        f"""
+                        $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{j}
+                        key{i}A,key{i}B:value2A,value2B
+                        $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{j}
+                        key3A,key3B:value{i}A,value3B
+
+                        > SELECT count(*) > 0 FROM kafka_source_tbl{j}
+                        true
+
+                        > SELECT count(*) > 0 FROM mv{j}
+                        true
+                        """
+                    )
+                )
+
+            c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, mz2)
+
+        with c.override(
+            Testdrive(
+                materialize_url=f"postgres://materialize@{mz2}:6875",
+                materialize_url_internal=f"postgres://materialize@{mz2}:6877",
+                mz_service=mz2,
+                materialize_params={"cluster": "cluster"},
+                no_reset=True,
+                seed=1,
+                default_timeout=DEFAULT_TIMEOUT,
+            ),
+        ):
+            for j in range(1, i + 1):
+                c.testdrive(
+                    dedent(
+                        f"""
+                        $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{j}
+                        key4A,key4B:value4A,value4B
+
+                        $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{j}
+                        key2A,key2B:value5A,value5B
+                        $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{j}
+                        key3A,key3B:value6A,value6B
+
+                        > SELECT count(*) > 0 FROM kafka_source_tbl{j}
+                        true
+
+                        > SELECT count(*) > 0 FROM mv{j}
+                        true
+                        """
+                    )
+                )
+
+        c.promote_mz(mz2)
+        c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, mz2)
+
+        mz1, mz2 = mz2, mz1
+        i += 1
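
Usage sketch, assuming mzcompose's usual convention of registering `workflow_`-prefixed functions as workflows with underscores mapped to dashes, and the repository's standard `bin/mzcompose` helper:

    bin/mzcompose --find 0dt run upsert-sources

Each loop iteration brings up the standby generation with `deploy_generation=i`, creates one new upsert source plus a materialized view on it, re-ingests into every topic created so far, waits for READY_TO_PROMOTE, verifies reads against the standby, promotes it, and swaps roles, so the workflow exercises repeated zero-downtime upgrades over an ever-growing set of upsert sources until the harness stops it.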