From d1ac42c4f70ee46c17b1e91d9375964397ba932e Mon Sep 17 00:00:00 2001 From: nrainer-materialize <129728240+nrainer-materialize@users.noreply.github.com> Date: Wed, 18 Oct 2023 16:45:25 +0200 Subject: [PATCH] ci: compare benchmarks against common-ancestor (#22355) --- ci/nightly/pipeline.template.yml | 5 +- misc/buildkite/git.bash | 6 +- misc/python/materialize/benchmark_utils.py | 23 +++++++ misc/python/materialize/buildkite.py | 48 ++++++++++++++ .../materialize/cli/ci_coverage_pr_report.py | 16 +---- misc/python/materialize/git.py | 47 +++++++++++-- .../materialize/mzcompose/composition.py | 23 ++++++- .../materialize/scalability/endpoints.py | 30 ++++++++- misc/python/materialize/version_list.py | 4 +- test/feature-benchmark/mzcompose.py | 66 +++++++++++++------ test/scalability/mzcompose.py | 10 ++- 11 files changed, 225 insertions(+), 53 deletions(-) create mode 100644 misc/python/materialize/benchmark_utils.py create mode 100644 misc/python/materialize/buildkite.py diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index 334a8ef57ed49..caf595fcb00cf 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -39,7 +39,7 @@ steps: queue: builder-linux-x86_64 - id: feature-benchmark - label: "Feature benchmark against 'latest'" + label: "Feature benchmark against merge base or 'latest'" timeout_in_minutes: 360 agents: queue: linux-x86_64-large @@ -49,7 +49,8 @@ steps: composition: feature-benchmark args: - --other-tag - - latest + # common-ancestor will default to latest if not in a PR + - common-ancestor - id: kafka-matrix label: Kafka smoke test against previous Kafka versions diff --git a/misc/buildkite/git.bash b/misc/buildkite/git.bash index 1ca6aa059fad6..52ff0fe209b2a 100755 --- a/misc/buildkite/git.bash +++ b/misc/buildkite/git.bash @@ -16,8 +16,8 @@ set -euo pipefail export BUILDKITE_REPO_REF="${BUILDKITE_REPO_REF:-origin}" export BUILDKITE_PULL_REQUEST_BASE_BRANCH="${BUILDKITE_PULL_REQUEST_BASE_BRANCH:-main}" -configure_git_user() { - if [[ "$BUILDKITE" == "true" ]]; then +configure_git_user_if_in_buildkite() { + if [[ "${BUILDKITE:-}" == "true" ]]; then ci_collapsed_heading "Configure git" run git config --global user.email "buildkite@materialize.com" run git config --global user.name "Buildkite" @@ -30,7 +30,7 @@ fetch_pr_target_branch() { } merge_pr_target_branch() { - configure_git_user + configure_git_user_if_in_buildkite ci_collapsed_heading "Merge target branch" run git merge "$BUILDKITE_REPO_REF"/"$BUILDKITE_PULL_REQUEST_BASE_BRANCH" --message "Merge" diff --git a/misc/python/materialize/benchmark_utils.py b/misc/python/materialize/benchmark_utils.py new file mode 100644 index 0000000000000..1ab50520be319 --- /dev/null +++ b/misc/python/materialize/benchmark_utils.py @@ -0,0 +1,23 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +"""Benchmark utilities.""" + +from materialize import buildkite + + +def resolve_tag_of_common_ancestor(tag_when_on_default_branch: str = "latest") -> str: + if buildkite.is_on_default_branch(): + print(f"On default branch, using {tag_when_on_default_branch} as tag") + return tag_when_on_default_branch + else: + commit_hash = buildkite.get_merge_base() + tag = f"unstable-{commit_hash}" + print(f"Resolved common-ancestor to {tag} (commit: {commit_hash})") + return tag diff --git a/misc/python/materialize/buildkite.py b/misc/python/materialize/buildkite.py new file mode 100644 index 0000000000000..53b05ae6ed7c7 --- /dev/null +++ b/misc/python/materialize/buildkite.py @@ -0,0 +1,48 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +"""Buildkite utilities.""" + +import os + +from materialize import git + + +def is_in_buildkite() -> bool: + return os.getenv("BUILDKITE", "false") == "true" + + +def is_in_pull_request() -> bool: + """ + Note that this does not work in (manually triggered) nightly builds because they don't carry this information! + Consider using #is_on_default_branch() instead. + """ + return os.getenv("BUILDKITE_PULL_REQUEST", "false") != "false" + + +def is_on_default_branch() -> bool: + current_branch = os.getenv("BUILDKITE_BRANCH", "unknown") + default_branch = os.getenv("BUILDKITE_PIPELINE_DEFAULT_BRANCH", "main") + return current_branch == default_branch + + +def get_pull_request_base_branch(fallback: str = "main"): + return os.getenv("BUILDKITE_PULL_REQUEST_BASE_BRANCH", fallback) + + +def get_pipeline_default_branch(fallback: str = "main"): + return os.getenv("BUILDKITE_PIPELINE_DEFAULT_BRANCH", fallback) + + +def get_merge_base(remote="origin") -> str: + base_branch = get_pull_request_base_branch() or get_pipeline_default_branch() + merge_base = git.get_common_ancestor_commit( + remote, branch=base_branch, fetch_branch=True + ) + return merge_base diff --git a/misc/python/materialize/cli/ci_coverage_pr_report.py b/misc/python/materialize/cli/ci_coverage_pr_report.py index c7d8354dd14a5..a3f937378854c 100755 --- a/misc/python/materialize/cli/ci_coverage_pr_report.py +++ b/misc/python/materialize/cli/ci_coverage_pr_report.py @@ -16,7 +16,7 @@ import junit_xml -from materialize import MZ_ROOT, ci_util +from materialize import MZ_ROOT, buildkite, ci_util # - None value indicates that this line is interesting, but we don't know yet # if it can actually be covered. @@ -55,18 +55,8 @@ def find_modified_lines() -> Coverage: """ Find each line that has been added or modified in the current pull request. """ - base_branch = os.getenv("BUILDKITE_PULL_REQUEST_BASE_BRANCH", "main") or os.getenv( - "BUILDKITE_PIPELINE_DEFAULT_BRANCH", "main" - ) - # Make sure we have the latest state to correctly identify the merge base - subprocess.run(["git", "fetch", "origin", base_branch], check=True) - result = subprocess.run( - ["git", "merge-base", "HEAD", f"origin/{base_branch}"], - check=True, - capture_output=True, - ) - merge_base = result.stdout.strip() - print(f"Merge base: {merge_base.decode('utf-8')}") + merge_base = buildkite.get_merge_base() + print(f"Merge base: {merge_base}") result = subprocess.run( ["git", "diff", "-U0", merge_base], check=True, capture_output=True ) diff --git a/misc/python/materialize/git.py b/misc/python/materialize/git.py index 0b429902027e3..f44cdc7e256e6 100644 --- a/misc/python/materialize/git.py +++ b/misc/python/materialize/git.py @@ -95,7 +95,7 @@ def get_version_tags(*, fetch: bool = True, prefix: str = "v") -> list[Version]: tag as a version. """ if fetch: - _fetch() + _fetch(all_remotes=True, include_tags=True, force=True) tags = [] for t in spawn.capture(["git", "tag"]).splitlines(): if not t.startswith(prefix): @@ -139,13 +139,52 @@ def describe() -> str: return spawn.capture(["git", "describe"]).strip() -def fetch() -> str: - """Fetch from all configured default fetch remotes""" - return spawn.capture(["git", "fetch", "--all", "--tags", "--force"]).strip() +def fetch( + remote: str | None = None, + all_remotes: bool = False, + include_tags: bool = False, + force: bool = False, + branch: str | None = None, +) -> str: + """Fetch from remotes""" + + if remote and all_remotes: + raise RuntimeError("all_remotes must be false when a remote is specified") + + if branch and not remote: + raise RuntimeError("remote must be specified when a branch is specified") + + command = ["git", "fetch"] + + if remote: + command.append(remote) + + if branch: + command.append(branch) + + if all_remotes: + command.append("--all") + + if include_tags: + command.append("--tags") + + if force: + command.append("--force") + + return spawn.capture(command).strip() _fetch = fetch # renamed because an argument shadows the fetch name in get_tags + +def get_common_ancestor_commit(remote: str, branch: str, fetch_branch: bool) -> str: + if fetch_branch: + fetch(remote=remote, branch=branch) + + command = ["git", "merge-base", "HEAD", f"{remote}/{branch}"] + return spawn.capture(command).strip() + + # Work tree mutation diff --git a/misc/python/materialize/mzcompose/composition.py b/misc/python/materialize/mzcompose/composition.py index 98a5518773558..5205b100fc4e5 100644 --- a/misc/python/materialize/mzcompose/composition.py +++ b/misc/python/materialize/mzcompose/composition.py @@ -625,7 +625,7 @@ def exec( ) def pull_if_variable(self, services: list[str], max_tries: int = 2) -> None: - """Pull fresh service images in case the tag indicates thee underlying image may change over time. + """Pull fresh service images in case the tag indicates the underlying image may change over time. Args: services: List of service names @@ -633,10 +633,27 @@ def pull_if_variable(self, services: list[str], max_tries: int = 2) -> None: for service in services: if "image" in self.compose["services"][service] and any( - self.compose["services"][service]["image"].endswith(tag) + tag in self.compose["services"][service]["image"] for tag in [":latest", ":unstable", ":rolling"] ): - self.invoke("pull", service, max_tries=max_tries) + self.pull_single_image_by_service_name(service, max_tries=max_tries) + + def pull_single_image_by_service_name( + self, service_name: str, max_tries: int + ) -> None: + self.invoke("pull", service_name, max_tries=max_tries) + + def try_pull_service_image(self, service: Service, max_tries: int = 2) -> bool: + """Tries to pull the specified image and returns if this was successful.""" + + with self.override(service): + try: + self.pull_single_image_by_service_name( + service.name, max_tries=max_tries + ) + return True + except UIError: + return False def up( self, diff --git a/misc/python/materialize/scalability/endpoints.py b/misc/python/materialize/scalability/endpoints.py index 825e665b74380..49d192ac85141 100644 --- a/misc/python/materialize/scalability/endpoints.py +++ b/misc/python/materialize/scalability/endpoints.py @@ -94,9 +94,17 @@ def up(self) -> None: class MaterializeContainer(MaterializeNonRemote): - def __init__(self, composition: Composition, image: str | None = None) -> None: + def __init__( + self, + composition: Composition, + image: str | None = None, + alternative_image: str | None = None, + ) -> None: self.composition = composition self.image = image + self.alternative_image = ( + alternative_image if image != alternative_image else None + ) self._port: int | None = None super().__init__() @@ -109,10 +117,26 @@ def priv_port(self) -> int: def up(self) -> None: self.composition.down(destroy_volumes=True) + + if ( + self.image is not None + and self.alternative_image is not None + and not self.composition.try_pull_service_image( + Materialized(image=self.image) + ) + ): + # explicitly specified image cannot be found and alternative exists + print( + f"Unable to find image {self.image}, proceeding with alternative image {self.alternative_image}!" + ) + self.image = self.alternative_image + + self.up_internal() + self.lift_limits() + + def up_internal(self) -> None: with self.composition.override( Materialized(image=self.image, sanity_restart=False) ): self.composition.up("materialized") self._port = self.composition.default_port("materialized") - - self.lift_limits() diff --git a/misc/python/materialize/version_list.py b/misc/python/materialize/version_list.py index fae834950f0b7..4527df3ae1616 100644 --- a/misc/python/materialize/version_list.py +++ b/misc/python/materialize/version_list.py @@ -13,7 +13,7 @@ import frontmatter -from materialize.git import get_version_tags +from materialize import git from materialize.util import MzVersion MZ_ROOT = Path(os.environ["MZ_ROOT"]) @@ -84,7 +84,7 @@ class VersionsFromGit(VersionList): def __init__(self) -> None: self.versions = list( - {MzVersion.from_semver(t) for t in get_version_tags(fetch=True)} + {MzVersion.from_semver(t) for t in git.get_version_tags(fetch=True)} - INVALID_VERSIONS ) self.versions.sort() diff --git a/test/feature-benchmark/mzcompose.py b/test/feature-benchmark/mzcompose.py index ff85bc2feb02f..5d780b4fa5e00 100644 --- a/test/feature-benchmark/mzcompose.py +++ b/test/feature-benchmark/mzcompose.py @@ -26,6 +26,7 @@ from scenarios_skew import * # noqa: F401 F403 from scenarios_subscribe import * # noqa: F401 F403 +from materialize import benchmark_utils from materialize.feature_benchmark.aggregation import Aggregation, MinAggregation from materialize.feature_benchmark.benchmark import Benchmark, Report from materialize.feature_benchmark.comparator import ( @@ -123,6 +124,9 @@ def run_one_scenario( else (args.other_tag, args.other_size, args.other_params) ) + if tag == "common-ancestor": + tag = benchmark_utils.resolve_tag_of_common_ancestor() + c.up("testdrive", persistent=True) additional_system_parameter_defaults = None @@ -132,29 +136,17 @@ def run_one_scenario( param_name, param_value = param.split("=") additional_system_parameter_defaults[param_name] = param_value - mz = Materialized( - image=f"materialize/materialized:{tag}" if tag else None, - default_size=size, - # Avoid clashes with the Kafka sink progress topic across restarts - environment_id=f"local-az1-{uuid.uuid4()}-0", - soft_assertions=False, - additional_system_parameter_defaults=additional_system_parameter_defaults, - external_cockroach=True, - external_minio=True, - sanity_restart=False, - ) + mz_image = f"materialize/materialized:{tag}" if tag else None + mz = create_mz_service(mz_image, size, additional_system_parameter_defaults) - with c.override(mz): - print(f"The version of the '{instance.upper()}' Mz instance is:") - c.run( - "materialized", - "-c", - "environmentd --version | grep environmentd", - entrypoint="bash", - rm=True, + if tag is not None and not c.try_pull_service_image(mz): + print( + f"Unable to find materialize image with tag {tag}, proceeding with latest instead!" ) + mz_image = "materialize/materialized:latest" + mz = create_mz_service(mz_image, size, additional_system_parameter_defaults) - c.up("cockroach", "materialized") + start_overridden_mz_and_cockroach(c, mz, instance) executor = Docker(composition=c, seed=common_seed, materialized=mz) @@ -180,6 +172,40 @@ def run_one_scenario( return comparators +def create_mz_service( + mz_image: str | None, + default_size: int, + additional_system_parameter_defaults: dict[str, str] | None, +) -> Materialized: + return Materialized( + image=mz_image, + default_size=default_size, + # Avoid clashes with the Kafka sink progress topic across restarts + environment_id=f"local-az1-{uuid.uuid4()}-0", + soft_assertions=False, + additional_system_parameter_defaults=additional_system_parameter_defaults, + external_cockroach=True, + external_minio=True, + sanity_restart=False, + ) + + +def start_overridden_mz_and_cockroach( + c: Composition, mz: Materialized, instance: str +) -> None: + with c.override(mz): + print(f"The version of the '{instance.upper()}' Mz instance is:") + c.run( + "materialized", + "-c", + "environmentd --version | grep environmentd", + entrypoint="bash", + rm=True, + ) + + c.up("cockroach", "materialized") + + def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: """Feature benchmark framework.""" diff --git a/test/scalability/mzcompose.py b/test/scalability/mzcompose.py index 5c89a16764ba8..d851e9cc66d43 100644 --- a/test/scalability/mzcompose.py +++ b/test/scalability/mzcompose.py @@ -20,7 +20,7 @@ from jupyter_core.command import main as jupyter_core_command_main from psycopg import Cursor -from materialize import MZ_ROOT +from materialize import MZ_ROOT, benchmark_utils from materialize.mzcompose.composition import Composition, WorkflowArgumentParser from materialize.mzcompose.services.materialized import Materialized from materialize.mzcompose.services.postgres import Postgres @@ -200,7 +200,7 @@ def run_workload( def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: parser.add_argument( "--target", - help="Target for the benchmark: 'HEAD', 'local', 'remote', 'Postgres', or a DockerHub tag", + help="Target for the benchmark: 'HEAD', 'local', 'remote', 'common-ancestor', 'Postgres', or a DockerHub tag", action="append", default=[], ) @@ -292,8 +292,12 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: elif target == "HEAD": endpoint = MaterializeContainer(composition=c) else: + if target == "common-ancestor": + target = benchmark_utils.resolve_tag_of_common_ancestor() endpoint = MaterializeContainer( - composition=c, image=f"materialize/materialized:{target}" + composition=c, + image=f"materialize/materialized:{target}", + alternative_image="materialize/materialized:latest", ) assert endpoint is not None