Skip to content

Commit

Permalink
Merge pull request #22440 from nrainer-materialize/scalability/determ…
Browse files Browse the repository at this point in the history
…ine-regression
  • Loading branch information
nrainer-materialize authored Oct 18, 2023
2 parents 3482be3 + 53da2bd commit 61cf877
Show file tree
Hide file tree
Showing 10 changed files with 365 additions and 24 deletions.
10 changes: 10 additions & 0 deletions misc/python/materialize/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ def get_common_ancestor_commit(remote: str, branch: str, fetch_branch: bool) ->
return spawn.capture(command).strip()


def get_commit_message(commit_sha: str) -> str | None:
try:
command = ["git", "log", "-1", "--pretty=format:%s", commit_sha]
return spawn.capture(command).strip()
except subprocess.CalledProcessError:
# Sometimes mz_version() will report a Git SHA that is not available
# in the current repository
return None


# Work tree mutation


Expand Down
9 changes: 8 additions & 1 deletion misc/python/materialize/scalability/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@


class Endpoint:

_name: str | None = None

def sql_connection(self) -> psycopg.connection.Connection[tuple[Any, ...]]:
conn = psycopg.connect(self.url())
conn.autocommit = True
Expand Down Expand Up @@ -44,8 +47,12 @@ def sql(self, sql: str) -> None:
cursor.execute(sql.encode("utf8"))

def name(self) -> str:
if self._name is not None:
return self._name

cursor = self.sql_connection().cursor()
cursor.execute(b"SELECT mz_version()")
row = cursor.fetchone()
assert row is not None
return str(row[0])
self._name = str(row[0])
return self._name
28 changes: 27 additions & 1 deletion misc/python/materialize/scalability/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@

import pg8000

from materialize import git
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.postgres import Postgres
from materialize.scalability.endpoint import Endpoint

POSTGRES_ENDPOINT_NAME = "postgres"


class MaterializeRemote(Endpoint):
"""Connect to a remote Materialize instance using a psql URL"""
Expand All @@ -27,6 +30,9 @@ def url(self) -> str:
def up(self) -> None:
pass

def __str__(self) -> str:
return f"MaterializeRemote ({self.materialize_url})"


class PostgresContainer(Endpoint):
def __init__(self, composition: Composition) -> None:
Expand Down Expand Up @@ -54,7 +60,10 @@ def up(self) -> None:
self._port = self.composition.default_port("postgres")

def name(self) -> str:
return "postgres"
return POSTGRES_ENDPOINT_NAME

def __str__(self) -> str:
return "PostgresContainer"


class MaterializeNonRemote(Endpoint):
Expand Down Expand Up @@ -92,6 +101,9 @@ def priv_port(self) -> int:
def up(self) -> None:
self.lift_limits()

def __str__(self) -> str:
return f"MaterializeLocal ({self.host()})"


class MaterializeContainer(MaterializeNonRemote):
def __init__(
Expand Down Expand Up @@ -140,3 +152,17 @@ def up_internal(self) -> None:
):
self.composition.up("materialized")
self._port = self.composition.default_port("materialized")

def __str__(self) -> str:
return f"MaterializeContainer ({self.image})"


def endpoint_name_to_description(endpoint_name: str) -> str:
if endpoint_name == POSTGRES_ENDPOINT_NAME:
return endpoint_name

commit_sha = endpoint_name.split(" ")[1].strip("()")

# empty when mz_version() reports a Git SHA that is not available in the current repository
commit_message = git.get_commit_message(commit_sha)
return f"{endpoint_name} - {commit_message}"
55 changes: 55 additions & 0 deletions misc/python/materialize/scalability/regression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from materialize.scalability.endpoint import Endpoint


class Regression:
def __init__(
self,
workload_name: str,
concurrency: int,
count: int,
tps: float,
tps_baseline: float,
tps_diff: float,
tps_diff_percent: float,
endpoint: Endpoint,
):
self.workload_name = workload_name
self.concurrency = concurrency
self.count = count
self.tps = tps
self.tps_baseline = tps_baseline
assert tps_diff < 0, "Not a regression!"
self.tps_diff = tps_diff
self.tps_diff_percent = tps_diff_percent
self.endpoint = endpoint

def __str__(self) -> str:
return (
f"Regression in workload '{self.workload_name}' at concurrency {self.concurrency} with {self.endpoint}:"
f" {round(self.tps, 2)} tps vs. {round(self.tps_baseline, 2)} tps"
f" ({round(self.tps_diff, 2)} tps; {round(100 * self.tps_diff_percent, 2)}%)"
)


class RegressionOutcome:
def __init__(
self,
):
self.regressions: list[Regression] = []

def has_regressions(self) -> bool:
return len(self.regressions) > 0

def __str__(self) -> str:
if not self.has_regressions():
return "No regressions"

return "\n".join(f"* {x}" for x in self.regressions)
42 changes: 42 additions & 0 deletions misc/python/materialize/scalability/result_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.


from __future__ import annotations

from materialize.scalability.endpoint import Endpoint
from materialize.scalability.regression import RegressionOutcome
from materialize.scalability.workload_result import WorkloadResult


class ResultAnalyzer:
def determine_regression(
self,
baseline_endpoint: Endpoint,
results_by_workload_name: dict[str, dict[Endpoint, WorkloadResult]],
) -> RegressionOutcome:
regression_outcome = RegressionOutcome()
for workload_name in results_by_workload_name.keys():
self.determine_regressions_in_workload(
regression_outcome,
baseline_endpoint,
workload_name,
results_by_workload_name[workload_name],
)

return regression_outcome

def determine_regressions_in_workload(
self,
regression_outcome: RegressionOutcome,
baseline_endpoint: Endpoint,
workload_name: str,
results_by_endpoint: dict[Endpoint, WorkloadResult],
) -> bool:
raise NotImplementedError
120 changes: 120 additions & 0 deletions misc/python/materialize/scalability/result_analyzers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.


import pandas as pd

from materialize.scalability.endpoint import Endpoint
from materialize.scalability.regression import Regression, RegressionOutcome
from materialize.scalability.result_analyzer import (
ResultAnalyzer,
)
from materialize.scalability.workload_result import WorkloadResult

COL_CONCURRENCY = "concurrency"
COL_COUNT = "count"
COL_TPS = "tps"
COL_TPS_DIFF = "tps_diff"
COL_TPS_DIFF_PERC = "tps_diff_perc"
COL_TPS_BASELINE = "tps_x"
COL_TPS_OTHER = "tps_y"


class DefaultResultAnalyzer(ResultAnalyzer):
def __init__(self, max_deviation_in_percent: float):
self.max_deviation_in_percent = max_deviation_in_percent

def determine_regressions_in_workload(
self,
regression_outcome: RegressionOutcome,
baseline_endpoint: Endpoint,
workload_name: str,
results_by_endpoint: dict[Endpoint, WorkloadResult],
) -> None:
count_endpoints = len(results_by_endpoint)

if count_endpoints <= 1:
raise RuntimeError("Cannot compute regressions with a single target")

if baseline_endpoint not in results_by_endpoint.keys():
raise RuntimeError("Regression baseline endpoint not in results!")

other_endpoints = list(results_by_endpoint.keys() - {baseline_endpoint})

for other_endpoint in other_endpoints:
self.determine_regression_in_workload(
regression_outcome,
workload_name,
baseline_endpoint,
other_endpoint,
results_by_endpoint[baseline_endpoint],
results_by_endpoint[other_endpoint],
)

def determine_regression_in_workload(
self,
regression_outcome: RegressionOutcome,
workload_name: str,
baseline_endpoint: Endpoint,
other_endpoint: Endpoint,
regression_baseline_result: WorkloadResult,
other_result: WorkloadResult,
) -> None:
# tps = transactions per seconds (higher is better)

columns_to_keep = [COL_COUNT, COL_CONCURRENCY, COL_TPS]
tps_per_endpoint = regression_baseline_result.df_totals[columns_to_keep].merge(
other_result.df_totals[columns_to_keep], on=[COL_COUNT, COL_CONCURRENCY]
)

tps_per_endpoint[COL_TPS_DIFF] = (
tps_per_endpoint[COL_TPS_OTHER] - tps_per_endpoint[COL_TPS_BASELINE]
)
tps_per_endpoint[COL_TPS_DIFF_PERC] = (
tps_per_endpoint[COL_TPS_DIFF] / tps_per_endpoint[COL_TPS_BASELINE]
)

entries_exceeding_threshold = tps_per_endpoint.loc[
# keep entries x% worse than the baseline
tps_per_endpoint[COL_TPS_DIFF_PERC] * (-1)
> self.max_deviation_in_percent
]

self.collect_regressions(
regression_outcome,
workload_name,
baseline_endpoint,
other_endpoint,
entries_exceeding_threshold,
)

def collect_regressions(
self,
regression_outcome: RegressionOutcome,
workload_name: str,
baseline_endpoint: Endpoint,
other_endpoint: Endpoint,
entries_exceeding_threshold: pd.DataFrame,
) -> None:
for index, row in entries_exceeding_threshold.iterrows():
regression = Regression(
workload_name,
concurrency=int(row[COL_CONCURRENCY]),
count=int(row[COL_COUNT]),
tps=row[COL_TPS_OTHER],
tps_baseline=row[COL_TPS_BASELINE],
tps_diff=row[COL_TPS_DIFF],
tps_diff_percent=row[COL_TPS_DIFF_PERC],
endpoint=other_endpoint,
)
regression_outcome.regressions.append(regression)


def row_count(data_frame: pd.DataFrame) -> int:
return len(data_frame.index)
21 changes: 21 additions & 0 deletions misc/python/materialize/scalability/workload_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import pandas as pd

from materialize.scalability.workload import Workload


class WorkloadResult:
def __init__(
self, workload: Workload, df_totals: pd.DataFrame, df_details: pd.DataFrame
):
self.workload = workload
self.df_totals = df_totals
self.df_details = df_details
7 changes: 7 additions & 0 deletions test/scalability/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ In both of those cases, Materialize, CRDB and Python will run within the same ma
./mzcompose run default --target=remote \--materialize-url="postgres://user:password@host:6875/materialize?sslmode=require" --cluster-name= ...
```

## Detecting regressions
A regression is defined as a deterioration in performance (transactions per seconds) of more than 10% for a given
workload and count factor compared to the baseline.

To detect a regression, add the parameter `--regression-against` and specify a target. The specified target will be
added to the `--target`s if it is not already present.

## Specifying the concurrencies to be benchmarked

The framework uses an exponential function to determine what concurrencies to test. By default, exponent base of 2 is used, with a default
Expand Down
Loading

0 comments on commit 61cf877

Please sign in to comment.