Merge pull request MaterializeInc#29524 from def-/pr-pb-long
parallel-benchmark: Long-running against QA Benchmarking Staging env
def- authored Sep 27, 2024
2 parents 5275c46 + eb8e8be commit ce2551f
Showing 8 changed files with 183 additions and 21 deletions.
1 change: 1 addition & 0 deletions bin/ci-builder
@@ -243,6 +243,7 @@ case "$cmd" in
# For cloud canary
--env REDPANDA_CLOUD_CLIENT_ID
--env REDPANDA_CLOUD_CLIENT_SECRET
--env QA_BENCHMARKING_APP_PASSWORD
)

if [[ $detach_container == "true" ]]; then
2 changes: 1 addition & 1 deletion ci/plugins/mzcompose/hooks/post-command
@@ -85,7 +85,7 @@ timeout 300 buildkite-agent artifact upload "$artifacts_str" || true
bin/ci-builder run stable bin/ci-annotate-errors --test-cmd="$(cat test_cmd)" --test-desc="$(cat test_desc)" "${artifacts[@]}" > ci-annotate-errors.log || CI_ANNOTATE_ERRORS_RESULT=$?
buildkite-agent artifact upload "ci-annotate-errors.log"

if [ ! -s services.log ] && [ "$BUILDKITE_LABEL" != "Maelstrom coverage of persist" ] && [ "$BUILDKITE_LABEL" != "Long single-node Maelstrom coverage of persist" ] && [ "$BUILDKITE_LABEL" != "Maelstrom coverage of txn-wal" ] && [ "$BUILDKITE_LABEL" != "Mz E2E Test" ] && [ "$BUILDKITE_LABEL" != "Output consistency (version for DFR)" ] && [ "$BUILDKITE_LABEL" != "Output consistency (version for CTF)" ] && [ "$BUILDKITE_LABEL" != "QA Canary Environment Base Load" ]; then
if [ ! -s services.log ] && [ "$BUILDKITE_LABEL" != "Maelstrom coverage of persist" ] && [ "$BUILDKITE_LABEL" != "Long single-node Maelstrom coverage of persist" ] && [ "$BUILDKITE_LABEL" != "Maelstrom coverage of txn-wal" ] && [ "$BUILDKITE_LABEL" != "Mz E2E Test" ] && [ "$BUILDKITE_LABEL" != "Output consistency (version for DFR)" ] && [ "$BUILDKITE_LABEL" != "Output consistency (version for CTF)" ] && [ "$BUILDKITE_LABEL" != "QA Canary Environment Base Load" ] && [ "$BUILDKITE_LABEL" != "Parallel Benchmark against QA Canary Environment" ]; then
echo "+++ services.log is empty, failing"
exit 1
fi
15 changes: 15 additions & 0 deletions ci/qa-canary/pipeline.template.yml
@@ -38,3 +38,18 @@ steps:
args: ["--runtime=82800"] # 23 hours
agents:
queue: linux-aarch64-small

- id: remote-parallel-benchmark
label: "Parallel Benchmark against QA Benchmarking Staging Environment"
depends_on: build-aarch64
timeout_in_minutes: 1440 # 24 hours
concurrency: 1
concurrency_group: 'parallel-benchmark-canary'
agents:
queue: linux-aarch64-small
plugins:
- ./ci/plugins/mzcompose:
composition: parallel-benchmark
args:
- --benchmarking-env
- --scenario=StagingBench
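This step reuses the existing parallel-benchmark composition, only adding the --benchmarking-env and --scenario=StagingBench arguments; a roughly equivalent local run (assuming the repository's usual mzcompose wrapper and an exported QA_BENCHMARKING_APP_PASSWORD) would be "bin/mzcompose --find parallel-benchmark run default --benchmarking-env --scenario=StagingBench". The concurrency setting of 1 on the shared concurrency group keeps at most one of these 24-hour runs in flight at a time.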
8 changes: 7 additions & 1 deletion misc/python/materialize/parallel_benchmark/framework.py
@@ -334,6 +334,10 @@ class Scenario:
def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
raise NotImplementedError

@staticmethod
def enabled_by_default() -> bool:
return True

@classmethod
def name(cls) -> str:
return cls.__name__
@@ -364,7 +368,9 @@ def setup(self, c: Composition, conn_infos: dict[str, PgConnInfo]) -> None:
thread.start()
# Start threads and have them wait for work from a queue
for i in range(self.conn_pool_size):
self.conns.put(conn_info.connect())
conn = conn_info.connect()
conn.autocommit = True
self.conns.put(conn)

def run(
self,
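Two framework changes support the new remote scenario: Scenario gains an enabled_by_default() hook so a scenario can opt out of the default run set, and each pooled connection is switched to autocommit before it is handed to the queue, so every benchmark query commits on its own instead of leaving a transaction open on the pooled connection. A minimal sketch of how the hook could be consumed when no explicit --scenario is given (the actual selection lives in the composition's workflow code, not in this hunk; all_scenarios is an illustrative name):

# Hypothetical filter over the discovered Scenario subclasses: scenarios that
# return False from enabled_by_default() only run when named explicitly.
selected = [s for s in all_scenarios if s.enabled_by_default()]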
73 changes: 73 additions & 0 deletions misc/python/materialize/parallel_benchmark/scenarios.py
@@ -7,6 +7,7 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from copy import deepcopy

from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.mysql import MySql
@@ -1086,3 +1087,75 @@ def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
),
]
)


class StagingBench(Scenario):
# TODO: Reenable queries other than SELECT 1
# TODO: Kafka source + sink
# TODO: Webhook source
@staticmethod
def enabled_by_default() -> bool:
return False

def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
conn_infos = deepcopy(conn_infos)
conn_infos["materialized"].cluster = "quickstart"
self.init(
[
LoadPhase(
duration=82800,
actions=[
OpenLoop(
action=PooledQuery(
"SELECT 1", conn_info=conn_infos["materialized"]
),
dist=Periodic(per_second=500),
),
# TODO: Reenable when database-issues#5511 is fixed
# ClosedLoop(
# action=ReuseConnQuery(
# "SELECT COUNT(DISTINCT l_returnflag) FROM qa_canary_environment.public_tpch.tpch_q01 WHERE sum_charge > 0",
# conn_info=conn_infos["materialized"],
# ),
# ),
# ClosedLoop(
# action=ReuseConnQuery(
# "SELECT COUNT(DISTINCT c_name) FROM qa_canary_environment.public_tpch.tpch_q18 WHERE o_orderdate <= '2023-01-01'",
# conn_info=conn_infos["materialized"],
# ),
# ),
# ClosedLoop(
# action=ReuseConnQuery(
# "SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_pg_cdc.wmr WHERE degree > 1",
# conn_info=conn_infos["materialized"],
# ),
# ),
# ClosedLoop(
# action=ReuseConnQuery(
# "SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_mysql_cdc.mysql_wmr WHERE degree > 1",
# conn_info=conn_infos["materialized"],
# ),
# ),
# ClosedLoop(
# action=ReuseConnQuery(
# "SELECT COUNT(DISTINCT count_star) FROM qa_canary_environment.public_loadgen.sales_product_product_category WHERE count_distinct_product_id > 0",
# conn_info=conn_infos["materialized"],
# ),
# ),
# ClosedLoop(
# action=ReuseConnQuery(
# "SELECT * FROM qa_canary_environment.public_table.table_mv",
# conn_info=conn_infos["materialized"],
# ),
# ),
# ClosedLoop(
# action=ReuseConnQuery(
# "SELECT min(c), max(c), count(*) FROM qa_canary_environment.public_table.table",
# conn_info=conn_infos["materialized"],
# ),
# ),
],
),
],
conn_pool_size=100,
)
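As configured, StagingBench drives an open loop of SELECT 1 at 500 queries per second for 82,800 seconds (23 hours), which works out to roughly 500 × 82,800 ≈ 41.4 million queries per run, spread across the pool of 100 connections (about 5 queries per second per connection if the load is evenly distributed). The heavier canary-style queries stay commented out until database-issues#5511 is fixed, as the TODOs above note.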
19 changes: 17 additions & 2 deletions misc/python/materialize/util.py
@@ -24,7 +24,7 @@
from pathlib import Path
from threading import Thread
from typing import Protocol, TypeVar
from urllib.parse import parse_qs, unquote, urlparse
from urllib.parse import parse_qs, quote, unquote, urlparse

import psycopg
import xxhash
@@ -157,16 +157,31 @@ class PgConnInfo:
database: str
password: str | None = None
ssl: bool = False
cluster: str | None = None
autocommit: bool = False

def connect(self) -> psycopg.Connection:
return psycopg.connect(
conn = psycopg.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
dbname=self.database,
sslmode="require" if self.ssl else None,
)
if self.autocommit:
conn.autocommit = True
if self.cluster:
with conn.cursor() as cur:
cur.execute(f"SET cluster = {self.cluster}".encode())
return conn

def to_conn_string(self) -> str:
return (
f"postgres://{quote(self.user)}:{quote(self.password)}@{self.host}:{self.port}/{quote(self.database)}"
if self.password
else f"postgres://{quote(self.user)}@{self.host}:{self.port}/{quote(self.database)}"
)


def parse_pg_conn_string(conn_string: str) -> PgConnInfo:
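PgConnInfo picks up optional cluster and autocommit fields, connect() applies both after the session is established (issuing SET cluster = <name> when a cluster is given), and the new to_conn_string() renders a postgres:// URL with the user, password, and database percent-encoded. A brief usage sketch, assuming the dataclass constructor; the host and password below are placeholders, not real values:

from materialize.util import PgConnInfo

conn_info = PgConnInfo(
    host="example.materialize.cloud",   # placeholder hostname
    port=6875,
    user="qabenchmarking",
    database="materialize",
    password="app-password-goes-here",  # placeholder secret
    ssl=True,
    cluster="quickstart",               # connect() runs SET cluster = quickstart
    autocommit=True,                    # connect() enables autocommit on the session
)
conn = conn_info.connect()
url = conn_info.to_conn_string()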
4 changes: 2 additions & 2 deletions test/canary-load/mzcompose.py
@@ -261,7 +261,7 @@ def validate_updated_data(c: Composition, i: int) -> None:
c.testdrive(
dedent(
f"""
# TODO: Reenable when materialize#18645 is fixed
# TODO: Reenable when database-issues#5511 is fixed
# > SELECT COUNT(DISTINCT l_returnflag) FROM qa_canary_environment.public_tpch.tpch_q01 WHERE sum_charge < 0
# 0
@@ -333,7 +333,7 @@ def validate_data_through_http_connection(
result = http_sql_query(host, "SELECT 1", token)
assert result == [["1"]]

# TODO: Reenable when materialize#18645 is fixed
# TODO: Reenable when database-issues#5511 is fixed
# result = http_sql_query(
# host,
# "SELECT COUNT(DISTINCT l_returnflag) FROM qa_canary_environment.public_tpch.tpch_q01 WHERE sum_charge < 0",
82 changes: 67 additions & 15 deletions test/parallel-benchmark/mzcompose.py
@@ -23,6 +23,7 @@
from matplotlib.markers import MarkerStyle

from materialize import MZ_ROOT, buildkite
from materialize.mz_env_util import get_cloud_hostname
from materialize.mzcompose import ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.balancerd import Balancerd
@@ -183,7 +184,7 @@ def report(
times: list[float] = [x.timestamp - start_time for x in m]
durations: list[float] = [x.duration * 1000 for x in m]
stats[key] = Statistics(times, durations)
plt.scatter(times, durations, label=key, marker=MarkerStyle("+"))
plt.scatter(times, durations, label=key[:60], marker=MarkerStyle("+"))
print(f"Statistics for {key}:\n{stats[key]}")
if key in scenario.guarantees and guarantees:
for stat, guarantee in scenario.guarantees[key].items():
@@ -253,19 +254,57 @@ def run_once(

overrides = []

if args.mz_url:
if args.benchmarking_env:
assert not args.mz_url
assert not args.canary_env
region = "aws/us-east-1"
environment = os.getenv("ENVIRONMENT", "staging")
app_password = os.environ["QA_BENCHMARKING_APP_PASSWORD"]

target = PgConnInfo(
user="qabenchmarking",
password=app_password,
database="materialize",
# Service accounts can't use mz
host="4pe2w4etmpsnwx1iizersezg7.lb.us-east-1.aws.staging.materialize.cloud",
# host=get_cloud_hostname(
# c, region=region, environment=environment, app_password=app_password
# ),
port=6875,
ssl=True,
)
elif args.canary_env:
assert not args.mz_url
assert not args.benchmarking_env
region = "aws/us-east-1"
environment = os.getenv("ENVIRONMENT", "production")
app_password = os.environ["CANARY_LOADTEST_APP_PASSWORD"]

target = PgConnInfo(
user=os.getenv(
"CANARY_LOADTEST_USERNAME", "[email protected]"
),
password=app_password,
database="materialize",
host=get_cloud_hostname(
c, region=region, environment=environment, app_password=app_password
),
port=6875,
ssl=True,
)
elif args.mz_url:
overrides = [
Testdrive(
no_reset=True,
materialize_url=args.mz_url,
no_consistency_checks=True,
)
]
target = parse_pg_conn_string(args.mz_url)
else:
mz_image = f"materialize/materialized:{tag}" if tag else None
overrides = [
Materialized(
image=mz_image,
image=f"materialize/materialized:{tag}" if tag else None,
default_size=args.size,
soft_assertions=False,
external_cockroach=True,
@@ -275,16 +314,13 @@ def run_once(
| {"max_connections": "100000"},
)
]
target = None

c.silent = True

with c.override(*overrides):
for scenario_class in scenarios:
scenario_name = scenario_class.name()
print(f"--- Running scenario {scenario_name}")

if args.mz_url:
target = parse_pg_conn_string(args.mz_url)
if target:
c.up("testdrive", persistent=True)
conn_infos = {"materialized": target}
conn = target.connect()
@@ -294,6 +330,7 @@ def run_once(
conn.close()
mz_string = f"{mz_version} ({target.host})"
else:
print("~~~ Starting up services")
c.up(*service_names)
c.up("testdrive", persistent=True)

@@ -321,6 +358,8 @@ def run_once(
),
}

scenario_name = scenario_class.name()
print(f"--- Running scenario {scenario_name}")
state = State(
measurements=defaultdict(list),
load_phase_duration=args.load_phase_duration,
@@ -331,8 +370,9 @@ def run_once(
start_time = time.time()
Path(MZ_ROOT / "plots").mkdir(parents=True, exist_ok=True)
try:
# Don't let the garbage collector interfere with our measurements
gc.disable()
if not args.benchmarking_env:
# Don't let the garbage collector interfere with our measurements
gc.disable()
scenario.run(c, state)
scenario.teardown()
gc.collect()
@@ -349,7 +389,10 @@ def run_once(
failures.extend(new_failures)
stats[scenario] = new_stats

if not args.mz_url:
if not target:
print(
"~~~ Resetting materialized to prevent interference between scenarios"
)
c.kill("cockroach", "materialized", "testdrive")
c.rm("cockroach", "materialized", "testdrive")
c.rm_volumes("mzdata")
@@ -571,6 +614,18 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:

parser.add_argument("--mz-url", type=str, help="Remote Mz instance to run against")

parser.add_argument(
"--canary-env",
action="store_true",
help="Run against QA Canary production environment",
)

parser.add_argument(
"--benchmarking-env",
action="store_true",
help="Run against QA Benchmarking staging environment",
)

args = parser.parse_args()

if args.scenario:
@@ -624,8 +679,5 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
raise FailedTestExecutionError(errors=failures)


# TODO: 24 hour runs (also against real staging with sources, similar to QA canary)
# Set up remote sources, share with QA canary pipeline, use concurrency group, start first 24 hour runs
# Maybe also set up the real rr-bench there as a separate step
# TODO: Choose an existing cluster name (for remote mz)
# TODO: Measure Memory?
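In run_once, target selection now checks --benchmarking-env first, then --canary-env, then --mz-url, and otherwise brings up a local Materialized; the asserts keep the two environment flags from being combined with each other or with --mz-url. Garbage collection is also left enabled for --benchmarking-env runs, presumably because disabling it for a 23-hour run would let the harness accumulate garbage for far too long, and the scatter-plot legend labels are truncated to 60 characters so long query texts do not overwhelm the plot.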
