diff --git a/bin/ci-builder b/bin/ci-builder
index a6ea49349539d..ae69adf063a1b 100755
--- a/bin/ci-builder
+++ b/bin/ci-builder
@@ -243,6 +243,7 @@ case "$cmd" in
       # For cloud canary
       --env REDPANDA_CLOUD_CLIENT_ID
       --env REDPANDA_CLOUD_CLIENT_SECRET
+      --env QA_BENCHMARKING_APP_PASSWORD
     )

     if [[ $detach_container == "true" ]]; then
diff --git a/ci/plugins/mzcompose/hooks/post-command b/ci/plugins/mzcompose/hooks/post-command
index 62fd29929969b..0057cb88d0e84 100755
--- a/ci/plugins/mzcompose/hooks/post-command
+++ b/ci/plugins/mzcompose/hooks/post-command
@@ -85,7 +85,7 @@ timeout 300 buildkite-agent artifact upload "$artifacts_str" || true
 bin/ci-builder run stable bin/ci-annotate-errors --test-cmd="$(cat test_cmd)" --test-desc="$(cat test_desc)" "${artifacts[@]}" > ci-annotate-errors.log || CI_ANNOTATE_ERRORS_RESULT=$?
 buildkite-agent artifact upload "ci-annotate-errors.log"

-if [ ! -s services.log ] && [ "$BUILDKITE_LABEL" != "Maelstrom coverage of persist" ] && [ "$BUILDKITE_LABEL" != "Long single-node Maelstrom coverage of persist" ] && [ "$BUILDKITE_LABEL" != "Maelstrom coverage of txn-wal" ] && [ "$BUILDKITE_LABEL" != "Mz E2E Test" ] && [ "$BUILDKITE_LABEL" != "Output consistency (version for DFR)" ] && [ "$BUILDKITE_LABEL" != "Output consistency (version for CTF)" ] && [ "$BUILDKITE_LABEL" != "QA Canary Environment Base Load" ]; then
+if [ ! -s services.log ] && [ "$BUILDKITE_LABEL" != "Maelstrom coverage of persist" ] && [ "$BUILDKITE_LABEL" != "Long single-node Maelstrom coverage of persist" ] && [ "$BUILDKITE_LABEL" != "Maelstrom coverage of txn-wal" ] && [ "$BUILDKITE_LABEL" != "Mz E2E Test" ] && [ "$BUILDKITE_LABEL" != "Output consistency (version for DFR)" ] && [ "$BUILDKITE_LABEL" != "Output consistency (version for CTF)" ] && [ "$BUILDKITE_LABEL" != "QA Canary Environment Base Load" ] && [ "$BUILDKITE_LABEL" != "Parallel Benchmark against QA Benchmarking Staging Environment" ]; then
   echo "+++ services.log is empty, failing"
   exit 1
 fi
diff --git a/ci/qa-canary/pipeline.template.yml b/ci/qa-canary/pipeline.template.yml
index 18ffb33e5210f..87e13ca044e12 100644
--- a/ci/qa-canary/pipeline.template.yml
+++ b/ci/qa-canary/pipeline.template.yml
@@ -38,3 +38,18 @@ steps:
           args: ["--runtime=82800"] # 23 hours
     agents:
       queue: linux-aarch64-small
+
+  - id: remote-parallel-benchmark
+    label: "Parallel Benchmark against QA Benchmarking Staging Environment"
+    depends_on: build-aarch64
+    timeout_in_minutes: 1440 # 24 hours
+    concurrency: 1
+    concurrency_group: 'parallel-benchmark-canary'
+    agents:
+      queue: linux-aarch64-small
+    plugins:
+      - ./ci/plugins/mzcompose:
+          composition: parallel-benchmark
+          args:
+            - --benchmarking-env
+            - --scenario=StagingBench
diff --git a/misc/python/materialize/parallel_benchmark/framework.py b/misc/python/materialize/parallel_benchmark/framework.py
index 770d8e81f7a58..416ca81009dfe 100644
--- a/misc/python/materialize/parallel_benchmark/framework.py
+++ b/misc/python/materialize/parallel_benchmark/framework.py
@@ -334,6 +334,10 @@ class Scenario:
     def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
         raise NotImplementedError

+    @staticmethod
+    def enabled_by_default() -> bool:
+        return True
+
     @classmethod
     def name(cls) -> str:
         return cls.__name__
@@ -364,7 +368,9 @@ def setup(self, c: Composition, conn_infos: dict[str, PgConnInfo]) -> None:
             thread.start()
         # Start threads and have them wait for work from a queue
         for i in range(self.conn_pool_size):
-            self.conns.put(conn_info.connect())
+            conn = conn_info.connect()
+            conn.autocommit = True
+            self.conns.put(conn)

     def run(
         self,
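Note on the framework.py change: the new Scenario.enabled_by_default() hook lets long-running, opt-in scenarios (such as StagingBench below) be skipped unless named explicitly. A minimal sketch of how a runner might honor the hook; select_scenarios is a hypothetical helper, not part of this diff, and the real selection logic lives in test/parallel-benchmark/mzcompose.py:

    def select_scenarios(requested: list[str], all_scenarios: list[type]) -> list[type]:
        # Explicitly requested scenarios always run, even opt-in ones.
        if requested:
            return [s for s in all_scenarios if s.name() in requested]
        # By default, skip scenarios that opted out via enabled_by_default().
        return [s for s in all_scenarios if s.enabled_by_default()]
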
diff --git a/misc/python/materialize/parallel_benchmark/scenarios.py b/misc/python/materialize/parallel_benchmark/scenarios.py
index b93f269cc2f27..68167e3a35ceb 100644
--- a/misc/python/materialize/parallel_benchmark/scenarios.py
+++ b/misc/python/materialize/parallel_benchmark/scenarios.py
@@ -7,6 +7,7 @@
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0.

+from copy import deepcopy

 from materialize.mzcompose.composition import Composition
 from materialize.mzcompose.services.mysql import MySql
@@ -1086,3 +1087,75 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
                 ),
             ]
         )
+
+
+class StagingBench(Scenario):
+    # TODO: Reenable queries other than SELECT 1
+    # TODO: Kafka source + sink
+    # TODO: Webhook source
+    @staticmethod
+    def enabled_by_default() -> bool:
+        return False
+
+    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
+        conn_infos = deepcopy(conn_infos)
+        conn_infos["materialized"].cluster = "quickstart"
+        self.init(
+            [
+                LoadPhase(
+                    duration=82800,
+                    actions=[
+                        OpenLoop(
+                            action=PooledQuery(
+                                "SELECT 1", conn_info=conn_infos["materialized"]
+                            ),
+                            dist=Periodic(per_second=500),
+                        ),
+                        # TODO: Reenable when database-issues#5511 is fixed
+                        # ClosedLoop(
+                        #     action=ReuseConnQuery(
+                        #         "SELECT COUNT(DISTINCT l_returnflag) FROM qa_canary_environment.public_tpch.tpch_q01 WHERE sum_charge > 0",
+                        #         conn_info=conn_infos["materialized"],
+                        #     ),
+                        # ),
+                        # ClosedLoop(
+                        #     action=ReuseConnQuery(
+                        #         "SELECT COUNT(DISTINCT c_name) FROM qa_canary_environment.public_tpch.tpch_q18 WHERE o_orderdate <= '2023-01-01'",
+                        #         conn_info=conn_infos["materialized"],
+                        #     ),
+                        # ),
+                        # ClosedLoop(
+                        #     action=ReuseConnQuery(
+                        #         "SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_pg_cdc.wmr WHERE degree > 1",
+                        #         conn_info=conn_infos["materialized"],
+                        #     ),
+                        # ),
+                        # ClosedLoop(
+                        #     action=ReuseConnQuery(
+                        #         "SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_mysql_cdc.mysql_wmr WHERE degree > 1",
+                        #         conn_info=conn_infos["materialized"],
+                        #     ),
+                        # ),
+                        # ClosedLoop(
+                        #     action=ReuseConnQuery(
+                        #         "SELECT COUNT(DISTINCT count_star) FROM qa_canary_environment.public_loadgen.sales_product_product_category WHERE count_distinct_product_id > 0",
+                        #         conn_info=conn_infos["materialized"],
+                        #     ),
+                        # ),
+                        # ClosedLoop(
+                        #     action=ReuseConnQuery(
+                        #         "SELECT * FROM qa_canary_environment.public_table.table_mv",
+                        #         conn_info=conn_infos["materialized"],
+                        #     ),
+                        # ),
+                        # ClosedLoop(
+                        #     action=ReuseConnQuery(
+                        #         "SELECT min(c), max(c), count(*) FROM qa_canary_environment.public_table.table",
+                        #         conn_info=conn_infos["materialized"],
+                        #     ),
+                        # ),
+                    ],
+                ),
+            ],
+            conn_pool_size=100,
+        )
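Note on StagingBench: the load is driven through an OpenLoop with Periodic(per_second=500), i.e. open-loop load generation; queries are issued on a fixed schedule regardless of how long earlier queries take, so a slow server shows up as growing latency and backlog rather than silently reduced throughput. A self-contained sketch of the pacing idea (illustrative only; the real framework hands actions to pooled worker threads instead of running them inline):

    import time

    def open_loop(action, per_second: float, duration_s: float) -> None:
        interval = 1.0 / per_second
        next_fire = time.monotonic()
        deadline = next_fire + duration_s
        while next_fire < deadline:
            action()  # the real framework enqueues this for a worker thread
            next_fire += interval  # the schedule advances at a fixed rate
            time.sleep(max(0.0, next_fire - time.monotonic()))
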
diff --git a/misc/python/materialize/util.py b/misc/python/materialize/util.py
index 09cbbadc7f9b6..2bc3f286f77ee 100644
--- a/misc/python/materialize/util.py
+++ b/misc/python/materialize/util.py
@@ -24,7 +24,7 @@
 from pathlib import Path
 from threading import Thread
 from typing import Protocol, TypeVar
-from urllib.parse import parse_qs, unquote, urlparse
+from urllib.parse import parse_qs, quote, unquote, urlparse

 import psycopg
 import xxhash
@@ -157,9 +157,11 @@ class PgConnInfo:
     database: str
     password: str | None = None
     ssl: bool = False
+    cluster: str | None = None
+    autocommit: bool = False

     def connect(self) -> psycopg.Connection:
-        return psycopg.connect(
+        conn = psycopg.connect(
             host=self.host,
             port=self.port,
             user=self.user,
@@ -167,6 +169,19 @@ def connect(self) -> psycopg.Connection:
             dbname=self.database,
             sslmode="require" if self.ssl else None,
         )
+        if self.autocommit:
+            conn.autocommit = True
+        if self.cluster:
+            with conn.cursor() as cur:
+                cur.execute(f"SET cluster = {self.cluster}".encode())
+        return conn
+
+    def to_conn_string(self) -> str:
+        return (
+            f"postgres://{quote(self.user)}:{quote(self.password)}@{self.host}:{self.port}/{quote(self.database)}"
+            if self.password
+            else f"postgres://{quote(self.user)}@{self.host}:{self.port}/{quote(self.database)}"
+        )


 def parse_pg_conn_string(conn_string: str) -> PgConnInfo:
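Note on the PgConnInfo changes: connect() can now pin a cluster and enable autocommit, and to_conn_string() renders the connection info back into a URL. A usage sketch; the host and password below are placeholders, not real endpoints or credentials:

    conn_info = PgConnInfo(
        host="example.materialize.cloud",  # placeholder
        port=6875,
        user="qabenchmarking",
        database="materialize",
        password="app-password-here",  # placeholder
        ssl=True,
        cluster="quickstart",  # connect() issues SET cluster = quickstart
        autocommit=True,  # connect() turns on autocommit
    )
    conn = conn_info.connect()
    url = conn_info.to_conn_string()
    # -> postgres://qabenchmarking:app-password-here@example.materialize.cloud:6875/materialize
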
diff --git a/test/canary-load/mzcompose.py b/test/canary-load/mzcompose.py
index ec46bdc918268..8471b50773d1f 100644
--- a/test/canary-load/mzcompose.py
+++ b/test/canary-load/mzcompose.py
@@ -261,7 +261,7 @@ def validate_updated_data(c: Composition, i: int) -> None:
     c.testdrive(
         dedent(
             f"""
-            # TODO: Reenable when materialize#18645 is fixed
+            # TODO: Reenable when database-issues#5511 is fixed
             # > SELECT COUNT(DISTINCT l_returnflag) FROM qa_canary_environment.public_tpch.tpch_q01 WHERE sum_charge < 0
             # 0

@@ -333,7 +333,7 @@ def validate_data_through_http_connection(
     result = http_sql_query(host, "SELECT 1", token)
     assert result == [["1"]]

-    # TODO: Reenable when materialize#18645 is fixed
+    # TODO: Reenable when database-issues#5511 is fixed
     # result = http_sql_query(
     #     host,
     #     "SELECT COUNT(DISTINCT l_returnflag) FROM qa_canary_environment.public_tpch.tpch_q01 WHERE sum_charge < 0",
diff --git a/test/parallel-benchmark/mzcompose.py b/test/parallel-benchmark/mzcompose.py
index 82cef5430571a..4711946ebb252 100644
--- a/test/parallel-benchmark/mzcompose.py
+++ b/test/parallel-benchmark/mzcompose.py
@@ -23,6 +23,7 @@
 from matplotlib.markers import MarkerStyle

 from materialize import MZ_ROOT, buildkite
+from materialize.mz_env_util import get_cloud_hostname
 from materialize.mzcompose import ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
 from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
 from materialize.mzcompose.services.balancerd import Balancerd
@@ -183,7 +184,7 @@ def report(
         times: list[float] = [x.timestamp - start_time for x in m]
         durations: list[float] = [x.duration * 1000 for x in m]
         stats[key] = Statistics(times, durations)
-        plt.scatter(times, durations, label=key, marker=MarkerStyle("+"))
+        plt.scatter(times, durations, label=key[:60], marker=MarkerStyle("+"))
         print(f"Statistics for {key}:\n{stats[key]}")
         if key in scenario.guarantees and guarantees:
             for stat, guarantee in scenario.guarantees[key].items():
@@ -253,7 +254,45 @@ def run_once(

     overrides = []

-    if args.mz_url:
+    if args.benchmarking_env:
+        assert not args.mz_url
+        assert not args.canary_env
+        region = "aws/us-east-1"
+        environment = os.getenv("ENVIRONMENT", "staging")
+        app_password = os.environ["QA_BENCHMARKING_APP_PASSWORD"]
+
+        target = PgConnInfo(
+            user="qabenchmarking",
+            password=app_password,
+            database="materialize",
+            # Service accounts can't use mz
+            host="4pe2w4etmpsnwx1iizersezg7.lb.us-east-1.aws.staging.materialize.cloud",
+            # host=get_cloud_hostname(
+            #     c, region=region, environment=environment, app_password=app_password
+            # ),
+            port=6875,
+            ssl=True,
+        )
+    elif args.canary_env:
+        assert not args.mz_url
+        assert not args.benchmarking_env
+        region = "aws/us-east-1"
+        environment = os.getenv("ENVIRONMENT", "production")
+        app_password = os.environ["CANARY_LOADTEST_APP_PASSWORD"]
+
+        target = PgConnInfo(
+            user=os.getenv(
+                "CANARY_LOADTEST_USERNAME", "infra+qacanaryload@materialize.io"
+            ),
+            password=app_password,
+            database="materialize",
+            host=get_cloud_hostname(
+                c, region=region, environment=environment, app_password=app_password
+            ),
+            port=6875,
+            ssl=True,
+        )
+    elif args.mz_url:
         overrides = [
             Testdrive(
                 no_reset=True,
@@ -261,11 +300,11 @@
                 no_consistency_checks=True,
             )
         ]
+        target = parse_pg_conn_string(args.mz_url)
     else:
-        mz_image = f"materialize/materialized:{tag}" if tag else None
         overrides = [
             Materialized(
-                image=mz_image,
+                image=f"materialize/materialized:{tag}" if tag else None,
                 default_size=args.size,
                 soft_assertions=False,
                 external_cockroach=True,
@@ -275,16 +314,13 @@
                 | {"max_connections": "100000"},
             )
         ]
+        target = None

     c.silent = True

     with c.override(*overrides):
         for scenario_class in scenarios:
-            scenario_name = scenario_class.name()
-            print(f"--- Running scenario {scenario_name}")
-
-            if args.mz_url:
-                target = parse_pg_conn_string(args.mz_url)
+            if target:
                 c.up("testdrive", persistent=True)
                 conn_infos = {"materialized": target}
                 conn = target.connect()
@@ -294,6 +330,7 @@
                 conn.close()
                 mz_string = f"{mz_version} ({target.host})"
             else:
+                print("~~~ Starting up services")
                 c.up(*service_names)
                 c.up("testdrive", persistent=True)

@@ -321,6 +358,8 @@
                 ),
             }

+            scenario_name = scenario_class.name()
+            print(f"--- Running scenario {scenario_name}")
             state = State(
                 measurements=defaultdict(list),
                 load_phase_duration=args.load_phase_duration,
@@ -331,8 +370,9 @@
             start_time = time.time()
             Path(MZ_ROOT / "plots").mkdir(parents=True, exist_ok=True)
             try:
-                # Don't let the garbage collector interfere with our measurements
-                gc.disable()
+                if not args.benchmarking_env:
+                    # Don't let the garbage collector interfere with our measurements
+                    gc.disable()
                 scenario.run(c, state)
                 scenario.teardown()
                 gc.collect()
@@ -349,7 +389,10 @@
             failures.extend(new_failures)
             stats[scenario] = new_stats

-            if not args.mz_url:
+            if not target:
+                print(
+                    "~~~ Resetting materialized to prevent interference between scenarios"
+                )
                 c.kill("cockroach", "materialized", "testdrive")
                 c.rm("cockroach", "materialized", "testdrive")
                 c.rm_volumes("mzdata")
@@ -571,6 +614,18 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:

     parser.add_argument("--mz-url", type=str, help="Remote Mz instance to run against")

+    parser.add_argument(
+        "--canary-env",
+        action="store_true",
+        help="Run against QA Canary production environment",
+    )
+
+    parser.add_argument(
+        "--benchmarking-env",
+        action="store_true",
+        help="Run against QA Benchmarking staging environment",
+    )
+
     args = parser.parse_args()

     if args.scenario:
@@ -624,8 +679,5 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         raise FailedTestExecutionError(errors=failures)


-# TODO: 24 hour runs (also against real staging with sources, similar to QA canary)
-# Set up remote sources, share with QA canary pipeline, use concurrency group, start first 24 hour runs
-# Maybe also set up the real rr-bench there as a separate step
 # TODO: Choose an existing cluster name (for remote mz)
 # TODO: Measure Memory?
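Note on the new flags: run_once() enforces that --mz-url, --canary-env, and --benchmarking-env are mutually exclusive via asserts. If the flag set keeps growing, argparse could enforce this at parse time instead; a sketch using the same flag names (the wiring is illustrative, not part of this diff):

    import argparse

    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--mz-url", type=str, help="Remote Mz instance to run against")
    group.add_argument("--canary-env", action="store_true", help="Run against QA Canary production environment")
    group.add_argument("--benchmarking-env", action="store_true", help="Run against QA Benchmarking staging environment")
    args = parser.parse_args()  # argparse errors out if more than one target flag is given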