Skip to content

Commit

Permalink
DNM: Try to reproduce the 0dt upsert source panic deterministically
Browse files Browse the repository at this point in the history
  • Loading branch information
def- committed Dec 13, 2024
1 parent 058c3c1 commit 3ce979a
Showing 1 changed file with 134 additions and 0 deletions.
134 changes: 134 additions & 0 deletions test/0dt/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -1388,3 +1388,137 @@ def get_correction_metrics():
raise AssertionError(
f"unexpected correction metrics: {insertions=}, {deletions=}"
)


def workflow_upsert_sources(c: Composition) -> None:
    """Stress 0dt (zero-downtime) deployments with Kafka upsert sources.

    Each iteration of the endless loop:
      1. brings up a new Materialize instance with the next deploy generation,
      2. creates one more Kafka upsert source + table + materialized view,
      3. ingests into and queries every topic/table/view created so far,
      4. waits for the new generation to be ready, promotes it, and swaps
         the leader/follower roles.

    NOTE(review): runs forever by design (DNM panic reproducer) — it only
    stops when a check fails, the panic reproduces, or the run is killed.
    """
    c.down(destroy_volumes=True)
    c.up("zookeeper", "kafka", "schema-registry", "postgres", "mysql", "mz_old")
    c.up("testdrive", persistent=True)

    # One-time setup: a dedicated cluster, and a high source limit so the
    # loop can keep adding one source per generation indefinitely.
    c.sql(
        """
        DROP CLUSTER IF EXISTS cluster CASCADE;
        CREATE CLUSTER cluster SIZE '2-1';
        GRANT ALL ON CLUSTER cluster TO materialize;
        ALTER SYSTEM SET cluster = cluster;
        ALTER SYSTEM SET max_sources = 100000;
        """,
        service="mz_old",
        port=6877,
        user="mz_system",
    )

    c.testdrive(
        dedent(
            """
            > SET CLUSTER = cluster;
            > CREATE TABLE t (a int, b int);
            > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL = 'PLAINTEXT';
            > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
            """
        )
    )

    i = 1  # deploy generation; also numbers the objects created per iteration
    mz1 = "mz_old"  # the currently leading Materialize
    mz2 = "mz_new"  # the Materialize being deployed this iteration
    while True:
        with c.override(
            Materialized(
                name=mz2,
                sanity_restart=False,
                deploy_generation=i,
                system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
                restart="on-failure",
                external_metadata_store=True,
            ),
            # Keep testdrive pointed at the still-leading generation (mz1)
            # while the new one catches up.
            Testdrive(
                materialize_url=f"postgres://materialize@{mz1}:6875",
                materialize_url_internal=f"postgres://materialize@{mz1}:6877",
                mz_service=mz1,
                materialize_params={"cluster": "cluster"},
                no_reset=True,
                seed=1,
                default_timeout=DEFAULT_TIMEOUT,
            ),
        ):
            c.up(mz2)

            # Add one new upsert source/table/materialized view for this
            # generation.
            c.testdrive(
                dedent(
                    f"""
                    $ kafka-create-topic topic=kafka{i}
                    $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{i}
                    key1A,key1B:value1A,value1B
                    > CREATE SOURCE kafka_source{i}
                    IN CLUSTER cluster
                    FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka{i}-${{testdrive.seed}}');
                    > CREATE TABLE kafka_source_tbl{i} (key1, key2, value1, value2)
                    FROM SOURCE kafka_source{i} (REFERENCE "testdrive-kafka{i}-${{testdrive.seed}}")
                    KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
                    VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
                    ENVELOPE UPSERT;
                    > CREATE MATERIALIZED VIEW mv{i} AS SELECT * FROM kafka_source_tbl{i}
                    """
                )
            )

            # Ingest into and verify EVERY generation created so far, not
            # just the newest one (mv{j}, matching kafka_source_tbl{j} and
            # the identical post-promotion loop below).
            for j in range(1, i + 1):
                c.testdrive(
                    dedent(
                        f"""
                        $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{j}
                        key{i}A,key{i}B:value2A,value2B
                        $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{j}
                        key3A,key3B:value{i}A,value3B
                        > SELECT count(*) > 0 FROM kafka_source_tbl{j}
                        true
                        > SELECT count(*) > 0 FROM mv{j}
                        true
                        """
                    )
                )

            c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, mz2)

            # Re-point testdrive at the new generation (mz2) for the
            # pre-promotion checks and the promotion itself.
            with c.override(
                Testdrive(
                    materialize_url=f"postgres://materialize@{mz2}:6875",
                    materialize_url_internal=f"postgres://materialize@{mz2}:6877",
                    mz_service=mz2,
                    materialize_params={"cluster": "cluster"},
                    no_reset=True,
                    seed=1,
                    default_timeout=DEFAULT_TIMEOUT,
                ),
            ):
                for j in range(1, i + 1):
                    c.testdrive(
                        dedent(
                            f"""
                            $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{j}
                            key4A,key4B:value4A,value4B
                            $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{j}
                            key2A,key2B:value5A,value5B
                            $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{j}
                            key3A,key3B:value6A,value6B
                            > SELECT count(*) > 0 FROM kafka_source_tbl{j}
                            true
                            > SELECT count(*) > 0 FROM mv{j}
                            true
                            """
                        )
                    )

                c.promote_mz(mz2)
                c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, mz2)

        # The new generation is now the leader: swap roles and continue.
        mz1, mz2 = mz2, mz1
        i += 1

0 comments on commit 3ce979a

Please sign in to comment.