Commit
[benchmark] notebook for interactively computing benchmark config
ehigham committed Dec 5, 2024
1 parent ada7cfa commit 479c1cc
Showing 12 changed files with 1,010 additions and 231 deletions.
2 changes: 2 additions & 0 deletions hail/notebooks/benchmark/.gitignore
@@ -0,0 +1,2 @@
in/
out/
459 changes: 459 additions & 0 deletions hail/notebooks/benchmark/minimal-detectable-slowdown.ipynb

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion hail/python/benchmark/conftest.py
@@ -23,6 +23,7 @@ def pytest_addoption(parser):
        const='cpu',
        default=None,
    )
    parser.addoption('--max-failures', type=int, help='Stop benchmarking item after this many failures', default=3)
    parser.addoption('--profiler-path', type=str, help='path to async profiler', default=None)
    parser.addoption('--profiler-fmt', choices=['html', 'flame', 'jfr'], help='Choose profiler output.', default='html')

@@ -40,14 +41,15 @@ def run_config_from_pytest_config(pytest_config):
('cores', 1),
('data_dir', os.getenv('HAIL_BENCHMARK_DIR')),
('iterations', None),
('max_failures', None),
('profile', None),
('profiler_path', os.getenv('ASYNC_PROFILER_HOME')),
('profiler_fmt', None),
]
},
'verbose': pytest_config.getoption('verbose') > 0,
'quiet': pytest_config.getoption('verbose') < 0,
'timeout': int(pytest_config.getoption('timeout') or 1800),
'timeout': int(pytest_config.getoption('timeout') or 100),
},
)
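
The new --max-failures option is threaded through run_config as max_failures. As a rough sketch of how a benchmark runner or fixture might consume it (the fixture below is illustrative, not part of this commit), pytest's standard option API suffices:

import pytest


@pytest.fixture
def failure_budget(request):
    # Hypothetical fixture: stop re-running a benchmark item once it has
    # failed --max-failures times (default 3, per the option added above).
    max_failures = request.config.getoption('--max-failures')
    failures = 0

    def record_failure():
        nonlocal failures
        failures += 1
        if failures >= max_failures:
            pytest.skip(f'aborting benchmark after {failures} failures')

    return record_failure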

26 changes: 26 additions & 0 deletions hail/python/benchmark/hail/benchmark_benchmark_analysis.py
@@ -0,0 +1,26 @@
import tempfile
from pathlib import Path

from benchmark.tools import benchmark
from benchmark.tools.impex import dump_tsv, import_timings
from benchmark.tools.statistics import analyze_benchmarks


@benchmark()
def benchmark_analyze_benchmarks(local_tmpdir, onethreetwo, onethreethree):
    inputs = (onethreetwo, onethreethree)
    inputs = ((v, Path(tempfile.mktemp(dir=local_tmpdir))) for v in inputs)
    inputs = ((dump_tsv(v, t), t)[-1] for v, t in inputs)

    tables = (import_timings(v) for v in inputs)
    tables = (t.select(instances=t.instances.trials.time) for t in tables)
    tables = (t._key_by_assert_sorted(*t.key.drop('version')) for t in tables)
    tables = (t.checkpoint(tempfile.mktemp(suffix='.mt', dir=local_tmpdir)) for t in tables)

    results = analyze_benchmarks(
        *tables,
        n_bootstrap_iterations=1000,
        confidence=0.95,
    )

    results._force_count()
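
analyze_benchmarks comes from benchmark.tools.statistics, which is not shown in this diff; here it is exercised with 1,000 bootstrap iterations at 95% confidence. Purely as an illustration of the idea (not the module's actual implementation), a bootstrap confidence interval for the ratio of mean times between two versions could be computed like this:

import numpy as np


def bootstrap_ratio_ci(times_old, times_new, n_bootstrap_iterations=1000, confidence=0.95):
    # Resample each timing sample with replacement and compare mean run times.
    rng = np.random.default_rng(0)
    old, new = np.asarray(times_old), np.asarray(times_new)
    ratios = [
        rng.choice(new, size=new.size).mean() / rng.choice(old, size=old.size).mean()
        for _ in range(n_bootstrap_iterations)
    ]
    lo, hi = np.quantile(ratios, [(1 - confidence) / 2, (1 + confidence) / 2])
    # An interval lying entirely above 1.0 suggests a detectable slowdown.
    return lo, hi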
17 changes: 17 additions & 0 deletions hail/python/benchmark/tools/__init__.py
@@ -1,5 +1,6 @@
import functools
import logging
from typing import Callable, List, TypeVar, Union

import pytest

@@ -21,5 +22,21 @@ def chunk(size, seq):
        yield seq[pos : pos + size]


A = TypeVar('A')
B = TypeVar('B')


def maybe(f: Callable[[A], B], ma: Union[A, None], default: Union[B, None] = None) -> Union[B, None]:
    return f(ma) if ma is not None else default


def prune(kvs: dict) -> dict:
    return {k: v for k, v in kvs.items() if v is not None}


def select(keys: List[str], **kwargs):
    return (kwargs.get(k, None) for k in keys)


def init_logging(file=None):
    logging.basicConfig(format="%(asctime)-15s: %(levelname)s: %(message)s", level=logging.INFO, filename=file)
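
These helpers are small conveniences used throughout the benchmark tooling (impex.py below imports maybe and prune). Roughly, with illustrative values:

maybe(str, 42)                        # '42'
maybe(str, None, default='NA')        # 'NA'

prune({'cores': 1, 'profile': None})  # {'cores': 1}

cores, iterations = select(['cores', 'iterations'], cores=8, data_dir='/tmp')
# cores == 8, iterations is None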
146 changes: 0 additions & 146 deletions hail/python/benchmark/tools/compare.py

This file was deleted.

99 changes: 99 additions & 0 deletions hail/python/benchmark/tools/impex.py
@@ -0,0 +1,99 @@
import json
from pathlib import Path
from typing import Any, Generator, List, Sequence

import hail as hl
from benchmark.tools import maybe, prune


class __types:
    trun = hl.tstruct(
        iteration=hl.tint,  # 0-based
        is_burn_in=hl.tbool,  # ignore for a/b testing
        time=hl.tfloat,  # seconds
        failure=hl.tstr,  # exception message dumped to a string, optional
        timed_out=hl.tbool,  # whether or not the failure was caused by a timeout
        task_memory=hl.tfloat,  # don't think this works yet sadly.
    )

    ttrial = hl.tstruct(
        path=hl.tstr,
        name=hl.tstr,
        version=hl.tstr,
        uname=hl.tdict(hl.tstr, hl.tstr),
        batch_id=hl.tint,
        job_id=hl.tint,
        trial=hl.tint,
        attempt_id=hl.tstr,
        start=hl.tstr,
        end=hl.tstr,
        **trun,
    )


def __write_tsv_row(os, row: Sequence[str]) -> None:
    if len(row) > 0:
        os.write('\t'.join(row))
        os.write('\n')


def dump_tsv(jsonl: Path, tsv: Path) -> None:
    def explode(trial: dict) -> Generator[List[Any], Any, None]:
        trial['uname'] = json.dumps(trial['uname'])
        for run in trial['runs']:
            flattened = prune({**trial, **run, 'runs': None})
            yield [maybe(str, flattened.get(f), 'NA') for f in __types.ttrial]

    with (
        jsonl.open(encoding='utf-8') as in_,
        tsv.open('w', encoding='utf-8') as out,
    ):
        __write_tsv_row(out, [n for n in __types.ttrial])
        for line in in_:
            trial = json.loads(line)
            for row in explode(trial):
                __write_tsv_row(out, row)


def import_timings(timings_tsv: Path) -> hl.Table:
    ht = hl.import_table(str(timings_tsv), types=__types.ttrial)
    trial_key = [t for t in __types.ttrial.fields if t not in set(('uname', *__types.trun.fields))]
    ht = ht.group_by(*trial_key).aggregate(
        runs=hl.sorted(
            hl.agg.collect(ht.row_value.select(*__types.trun)),
            lambda t: t.iteration,
        ),
    )

    # Rename terms to be consistent with those of Laaber et al.:
    # - "trial" (i.e. batch job) -> "instance"
    # - "run" (benchmark invocation) -> "trial"
    #
    # Note we don't run benchmarks multiple times per trial as these are
    # "macro"-benchmarks. This is one area where we differ from Laaber et al.
    ht = ht.select(
        instance=hl.struct(
            instance=ht.trial,
            batch_id=ht.batch_id,
            job_id=ht.job_id,
            attempt_id=ht.attempt_id,
            start=ht.start,
            end=ht.end,
            trials=hl.filter(
                lambda t: (
                    hl.is_missing(t.failure)
                    | (hl.is_defined(t.failure) & (hl.len(t.failure) == 0))
                    | ~t.timed_out
                    | ~t.is_burn_in
                ),
                ht.runs,
            ),
        ),
    )

    return ht.group_by(ht.path, ht.name, ht.version).aggregate(
        instances=hl.sorted(
            hl.agg.collect(ht.instance),
            key=lambda i: (i.instance, i.attempt_id),
        )
    )
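
A sketch of how these two functions fit together, going from a JSONL benchmark log to a keyed Hail table (the file paths are placeholders; the in/ and out/ directories mirror the new .gitignore entries):

from pathlib import Path

import hail as hl
from benchmark.tools.impex import dump_tsv, import_timings

hl.init()

# Flatten per-run JSON lines into a TSV that hl.import_table understands,
# then regroup into one row per (path, name, version) with sorted instances.
dump_tsv(Path('in/benchmarks.jsonl'), Path('out/benchmarks.tsv'))
timings = import_timings(Path('out/benchmarks.tsv'))
timings.describe()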
35 changes: 35 additions & 0 deletions hail/python/benchmark/tools/plotting.py
@@ -0,0 +1,35 @@
from collections.abc import Generator
from typing import Any, List, Optional

import hail as hl
from hail.ggplot import GGPlot, aes, geom_line, geom_point, ggplot, ggtitle


def __agg_names(ht: hl.Table) -> List[str]:
    return ht.aggregate(hl.array(hl.agg.collect_as_set(ht.name)))


def plot_trial_against_time(
    ht: hl.Table,
    names: Optional[List[str]] = None,
) -> Generator[GGPlot, Any, Any]:
    for name in names or __agg_names(ht):
        k = ht.filter(ht.name == name)
        k = k.explode(k.instances, name='__instance')
        k = k.select(**k.__instance)
        k = k.explode(k.trials, name='trial')
        yield (
            ggplot(k, aes(x=k.trial.iteration, y=k.trial.time, color=hl.str(k.instance))) + geom_line() + ggtitle(name)
        )


def plot_mean_time_per_instance(
    ht: hl.Table,
    names: Optional[List[str]] = None,
) -> Generator[GGPlot, Any, Any]:
    for name in names or __agg_names(ht):
        k = ht.filter(ht.name == name)
        k = k.explode(k.instances, name='__instance')
        k = k.select(**k.__instance)
        k = k.annotate(s=k.trials.aggregate(lambda t: hl.agg.stats(t.time)))
        yield (ggplot(k, aes(x=k.instance, y=k.s.mean)) + geom_point() + ggtitle(name))
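
Both helpers are generators yielding one GGPlot per benchmark name, so a notebook cell might render them as follows (assuming timings is a table produced by import_timings above):

from benchmark.tools.plotting import plot_mean_time_per_instance, plot_trial_against_time

# One figure per benchmark: per-trial times, coloured by instance ...
for p in plot_trial_against_time(timings):
    p.show()

# ... and the mean time of each instance as a scatter plot.
for p in plot_mean_time_per_instance(timings):
    p.show()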