diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index fe7605955a5d9..31378ccb03455 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -1696,7 +1696,7 @@ steps: - ./ci/plugins/mzcompose: composition: 0dt agents: - queue: hetzner-aarch64-8cpu-16gb + queue: hetzner-aarch64-16cpu-32gb - id: emulator label: Materialize Emulator diff --git a/test/0dt/mzcompose.py b/test/0dt/mzcompose.py index b66b2258b00d2..3c2d45c372ace 100644 --- a/test/0dt/mzcompose.py +++ b/test/0dt/mzcompose.py @@ -13,7 +13,9 @@ """ import time +from datetime import datetime, timedelta from textwrap import dedent +from threading import Thread from psycopg.errors import OperationalError @@ -1388,3 +1390,113 @@ def get_correction_metrics(): raise AssertionError( f"unexpected correction metrics: {insertions=}, {deletions=}" ) + + +def workflow_upsert_sources(c: Composition) -> None: + c.down(destroy_volumes=True) + c.up("zookeeper", "kafka", "schema-registry", "postgres", "mysql", "mz_old") + c.up("testdrive", persistent=True) + num_threads = 500 + + c.sql( + f""" + DROP CLUSTER IF EXISTS cluster CASCADE; + CREATE CLUSTER cluster SIZE '2-1'; + GRANT ALL ON CLUSTER cluster TO materialize; + ALTER SYSTEM SET cluster = cluster; + ALTER SYSTEM SET max_sources = {num_threads * 2}; + ALTER SYSTEM SET max_materialized_views = {num_threads * 2}; + """, + service="mz_old", + port=6877, + user="mz_system", + ) + + c.testdrive( + dedent( + """ + > SET CLUSTER = cluster; + + > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL = 'PLAINTEXT'; + > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}'; + """ + ) + ) + + end_time = datetime.now() + timedelta(seconds=600) + mz1 = "mz_old" + mz2 = "mz_new" + + def worker(i: int) -> None: + c.testdrive( + dedent( + f""" + $ kafka-create-topic topic=kafka{i} + > CREATE SOURCE kafka_source{i} + IN CLUSTER cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka{i}-${{testdrive.seed}}'); + > CREATE TABLE kafka_source_tbl{i} (key1, key2, value1, value2) + FROM SOURCE kafka_source{i} (REFERENCE "testdrive-kafka{i}-${{testdrive.seed}}") + KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ',' + VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ',' + ENVELOPE UPSERT; + + > CREATE DEFAULT INDEX ON kafka_source_tbl{i} + > CREATE MATERIALIZED VIEW mv{i} AS SELECT * FROM kafka_source_tbl{i} + """ + ) + ) + + while datetime.now() < end_time: + try: + c.testdrive( + dedent( + f""" + $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{i} repeat=10000 + key1A,key1B:value1A,value1B + """ + ) + ) + except: + pass + + threads = [] + for i in range(num_threads): + thread = Thread(name=f"worker_{i}", target=worker, args=(i,)) + threads.append(thread) + + for thread in threads: + thread.start() + + i = 1 + while datetime.now() < end_time: + with c.override( + Materialized( + name=mz2, + sanity_restart=False, + deploy_generation=i, + system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS, + restart="on-failure", + external_metadata_store=True, + ), + Testdrive( + materialize_url=f"postgres://materialize@{mz1}:6875", + materialize_url_internal=f"postgres://materialize@{mz1}:6877", + mz_service=mz1, + materialize_params={"cluster": "cluster"}, + no_consistency_checks=True, + no_reset=True, + seed=1, + default_timeout=DEFAULT_TIMEOUT, + ), + ): + c.up(mz2) + c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, mz2) + c.promote_mz(mz2) + c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, mz2) + + i += 1 + mz1, mz2 = mz2, mz1 + + for thread in threads: + thread.join()