diff --git a/hail/notebooks/benchmark/.gitignore b/hail/notebooks/benchmark/.gitignore new file mode 100644 index 000000000000..89ca722e4f1d --- /dev/null +++ b/hail/notebooks/benchmark/.gitignore @@ -0,0 +1,2 @@ +in/ +out/ diff --git a/hail/notebooks/benchmark/minimal-detectable-slowdown.ipynb b/hail/notebooks/benchmark/minimal-detectable-slowdown.ipynb new file mode 100644 index 000000000000..d9f89e33c3da --- /dev/null +++ b/hail/notebooks/benchmark/minimal-detectable-slowdown.ipynb @@ -0,0 +1,459 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This notebook explores variability in hail's python (macro)-benchmarks when\n", + "said benchmarks are executed on the hail batch service. The analyses within\n", + "are based on the methods proposed in [1], albeit slightly modified for long\n", + "running benchmarks. The goals of these analyses are\n", + "\n", + "- to determine if we can detect slowdowns of 5% or less reliably when running\n", + " benchmarks on hail batch.\n", + "- to identify configurations (number of batch jobs x iterations) that allow us\n", + " to detect slowdowns efficiently (ie without excessive time and money).\n", + "\n", + "[1] Laaber et al., Software Microbenchmarking in the Cloud. How Bad is it Really?\n", + " https://dl.acm.org/doi/10.1007/s10664-019-09681-1" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ + "from pathlib import Path\n", + "from typing import Dict, List\n", + "\n", + "from benchmark.tools import maybe\n", + "from benchmark.tools.impex import dump_tsv, import_timings\n", + "from benchmark.tools.plotting import plot_mean_time_per_instance, plot_trial_against_time\n", + "from benchmark.tools.statistics import (\n", + " bootstrap_mean_confidence_interval,\n", + " laaber_mds,\n", + " schultz_mds,\n", + " variability,\n", + ")\n", + "from IPython.display import clear_output\n", + "from plotly.io import renderers\n", + "\n", + "import hail as hl\n", + "\n", + "renderers.default = 'notebook_connected'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ + "hl.init(backend='spark', idempotent=True, local_tmpdir='/tmp/mds')\n", + "hl._set_flags(use_new_shuffle='1', lower='1')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ + "# Import benchmark data\n", + "# ---------------------\n", + "#\n", + "# benchmarks under `hail/python/benchmarks` are executed with a custom pytest\n", + "# plugin and their results are output as json lines (.jsonl).\n", + "# Unscrupulously, we use hail to analyse itself.\n", + "\n", + "with hl.TemporaryFilename(dir='/tmp') as tsvfile:\n", + " timings = Path(tsvfile)\n", + " dump_tsv(Path('in/benchmarks.jsonl'), timings)\n", + " ht = import_timings(timings)\n", + " ht = ht.checkpoint('out/imported.ht', overwrite=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ + "ht = hl.read_table('out/imported.ht')\n", + "\n", + "benchmarks = ht.aggregate(hl.agg.collect_as_set(ht.name))\n", + "print(*benchmarks, sep='\n')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ + "# In this next section, we'll estimate the number of iterations required for\n", + "# a benchmark to reach a steady-state, or the number of so-called \"burn-in\"\n", + "# iterations.\n", + "\n", + "\n", + "def 
filter_burn_in_iterations(ht: hl.Table, first_stable_index: Dict[str, int]) -> hl.Table:\n", + " ht = ht.annotate_globals(first_stable_index=first_stable_index)\n", + " return ht.select(\n", + " instances=ht.instances.map(\n", + " lambda instance: instance.annotate(\n", + " trials=hl.filter(\n", + " lambda t: t.iteration >= ht.first_stable_index.get(ht.name, 0),\n", + " instance.trials,\n", + " ),\n", + " )\n", + " ),\n", + " )\n", + "\n", + "\n", + "first_stable_index = {\n", + " 'benchmark_matrix_table_scan_count_cols_2': 20,\n", + " 'benchmark_kyle_sex_specific_qc': 7,\n", + " 'benchmark_matrix_table_entries_table_no_key': 10,\n", + " 'benchmark_test_left_join_region_memory': 10,\n", + " 'benchmark_matrix_table_call_stats_star_star': 8,\n", + " 'benchmark_ndarray_matmul_int64': 10,\n", + " 'benchmark_table_scan_prev_non_null': 20,\n", + " 'benchmark_export_range_matrix_table_row_p100': 8,\n", + " 'benchmark_join_partitions_table[1000-10]': 12,\n", + " 'benchmark_split_multi': 4,\n", + " 'benchmark_matrix_table_filter_entries': 6,\n", + " 'benchmark_mt_localize_and_collect': 5,\n", + " 'benchmark_table_range_array_range_force_count': 6,\n", + " 'benchmark_variant_qc': 8,\n", + " 'benchmark_matrix_table_take_entry': 10,\n", + " 'benchmark_union_partitions_table[1000-10]': 12,\n", + " 'benchmark_import_bgen_info_score': 12,\n", + " 'benchmark_export_range_matrix_table_col_p100': 15,\n", + " 'benchmark_join_partitions_table[100-100]': 10,\n", + " 'benchmark_logistic_regression_rows_wald': 5,\n", + " 'benchmark_matrix_table_decode_and_count_just_gt': 5,\n", + " 'benchmark_table_range_force_count': 10,\n", + " 'benchmark_matrix_table_array_arithmetic': 20,\n", + " 'benchmark_sentinel_read_gunzip': 10,\n", + " 'benchmark_join_partitions_table[100-10]': 10,\n", + " 'benchmark_matrix_table_take_col': 10,\n", + " 'benchmark_join_partitions_table[1000-1000]': 10,\n", + " 'benchmark_read_force_count_partitions[10]': 10,\n", + " 'benchmark_import_and_transform_gvcf': 10,\n", + " 'benchmark_table_take': 25,\n", + " 'benchmark_write_range_table[10000000-100]': 3,\n", + " 'benchmark_table_read_force_count_strings': 10,\n", + " 'benchmark_test_inner_join_region_memory': 10,\n", + " 'benchmark_write_range_table[10000000-1000]': 6,\n", + " 'benchmark_join_partitions_table[100-1000]': 10,\n", + " 'benchmark_matrix_table_cols_show': 16,\n", + " 'benchmark_python_only_10k_transform': 10,\n", + " 'benchmark_union_partitions_table[100-100]': 5,\n", + " 'benchmark_read_force_count_partitions[1000]': 10,\n", + " 'benchmark_sample_qc': 10,\n", + " 'benchmark_matrix_table_entries_show': 15,\n", + " 'benchmark_sentinel_cpu_hash_1': 5,\n", + " 'benchmark_analyze_benchmarks': 10,\n", + " 'benchmark_import_gvcf_force_count': 8,\n", + " 'benchmark_table_range_join_1b_1b': 10,\n", + " 'benchmark_import_bgen_filter_count': 18,\n", + " 'benchmark_table_read_force_count_ints': 5,\n", + " 'benchmark_split_multi_hts': 10,\n", + " 'benchmark_union_partitions_table[100-10]': 20,\n", + " 'benchmark_table_range_join_1b_1k': 20,\n", + " 'benchmark_matrix_table_aggregate_entries': 8,\n", + " 'benchmark_export_vcf': 15,\n", + " 'benchmark_matrix_table_many_aggs_col_wise': 10,\n", + " 'benchmark_test_map_filter_region_memory': 15,\n", + " 'benchmark_import_vcf_count_rows': 1,\n", + " 'benchmark_ndarray_matmul_float64': 6,\n", + " 'benchmark_table_python_construction': 10,\n", + " 'benchmark_per_row_stats_star_star': 10,\n", + " 'benchmark_matrix_table_take_row': 10,\n", + " 'benchmark_import_bgen_force_count_just_gp': 20,\n", + " 
'benchmark_matrix_table_entries_table': 10,\n", + " 'benchmark_union_partitions_table[10-100]': 5,\n", + " 'benchmark_union_partitions_table[10-10]': 5,\n", + " 'benchmark_table_aggregate_array_sum': 5,\n", + " 'benchmark_mt_group_by_memory_usage': 5,\n", + " 'benchmark_shuffle_key_by_aggregate_good_locality': 5,\n", + " 'benchmark_hwe_normalized_pca_blanczos_small_data_0_iterations': 5,\n", + " 'benchmark_read_force_count_partitions[100]': 8,\n", + " 'benchmark_table_show': 20,\n", + " 'benchmark_concordance': 3,\n", + " 'benchmark_matrix_table_filter_entries_unfilter': 3,\n", + " 'benchmark_union_partitions_table[10-1000]': 8,\n", + " 'benchmark_matrix_table_many_aggs_row_wise': 4,\n", + " 'benchmark_sum_table_of_ndarrays': 10,\n", + " 'benchmark_python_only_10k_combine': 20,\n", + " 'benchmark_union_partitions_table[100-1000]': 15,\n", + " 'benchmark_join_partitions_table[1000-100]': 10,\n", + " 'benchmark_union_partitions_table[1000-100]': 6,\n", + " 'benchmark_shuffle_key_by_aggregate_bad_locality': 8,\n", + " 'benchmark_vds_combiner_chr22': 10,\n", + " 'benchmark_write_range_table[10000000-10]': 5,\n", + " 'benchmark_read_with_index[1000]': 8,\n", + " 'benchmark_genetics_pipeline': 10,\n", + " 'benchmark_import_vcf_write': 5,\n", + " 'benchmark_import_bgen_force_count_all': 20,\n", + " 'benchmark_linear_regression_rows': 10,\n", + " 'benchmark_export_range_matrix_table_entry_field_p100': 3,\n", + " 'benchmark_blockmatrix_write_from_entry_expr_range_mt': 10,\n", + " 'benchmark_matrix_table_rows_show': 15,\n", + " 'benchmark_matrix_table_show': 15,\n", + " 'benchmark_write_range_matrix_table_p100': 8,\n", + " 'benchmark_test_head_and_tail_region_memory': 10,\n", + " 'benchmark_matrix_table_rows_is_transition': 10,\n", + " 'benchmark_matrix_table_decode_and_count': 8,\n", + " 'benchmark_matrix_table_rows_force_count': 30,\n", + " 'benchmark_ndarray_addition': 10,\n", + " 'benchmark_table_range_means': 10,\n", + " 'benchmark_write_profile_mt': 15,\n", + " 'benchmark_make_ndarray': 5,\n", + " 'benchmark_matrix_table_scan_count_rows_2': 5,\n", + " 'benchmark_read_decode_gnomad_coverage': 10,\n", + " 'benchmark_table_aggregate_approx_cdf': 16,\n", + " 'benchmark_table_scan_sum_1k_partitions': 6,\n", + " 'benchmark_union_partitions_table[1000-1000]': 10,\n", + " 'benchmark_variant_and_sample_qc_nested_with_filters_4_counts': 20,\n", + " 'benchmark_group_by_take_rekey': 10,\n", + " 'benchmark_table_annotate_many_flat': 18,\n", + " 'benchmark_table_import_strings': 3,\n", + " 'benchmark_table_aggregate_int_stats': 18,\n", + " 'benchmark_variant_and_sample_qc': 18,\n", + " 'benchmark_table_foreign_key_join[1000000-1000]': 8,\n", + " 'benchmark_table_group_by_aggregate_sorted': 10,\n", + " 'benchmark_shuffle_key_rows_by_4096_byte_rows': 4,\n", + " 'benchmark_hwe_normalized_pca': 4,\n", + " 'benchmark_table_aggregate_downsample_dense': 4,\n", + " 'benchmark_join_partitions_table[10-1000]': 14,\n", + " 'benchmark_join_partitions_table[10-10]': 6,\n", + " 'benchmark_table_group_by_aggregate_unsorted': 7,\n", + " 'benchmark_shuffle_order_by_10m_int': 22,\n", + " 'benchmark_table_aggregate_take_by_strings': 10,\n", + " 'benchmark_join_partitions_table[10-100]': 4,\n", + " 'benchmark_variant_and_sample_qc_nested_with_filters_4': 20,\n", + " 'benchmark_table_aggregate_counter': 20,\n", + " 'benchmark_table_key_by_shuffle': 6,\n", + " 'benchmark_table_expr_take': 20,\n", + " 'benchmark_shuffle_key_rows_by_65k_byte_rows': 4,\n", + " 'benchmark_table_foreign_key_join[1000000-1000000]': 20,\n", + " 
'benchmark_table_import_ints_impute': 10,\n", + " 'benchmark_group_by_collect_per_row': 8,\n", + " 'benchmark_table_aggregate_downsample_worst_case': 10,\n", + " 'benchmark_table_big_aggregate_compilation': 5,\n", + " 'benchmark_shuffle_key_rows_by_mt': 10,\n", + " 'benchmark_table_big_aggregate_compile_and_execute': 5,\n", + " 'benchmark_table_import_ints': 9,\n", + " 'benchmark_variant_and_sample_qc_nested_with_filters_2': 13,\n", + " 'benchmark_table_aggregate_linreg': 8,\n", + "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ + "# Short of an accurate algorithm for computing this, you, noble reader, are\n", + "# charged with the mind-numbing task of looking at graphs and picking numbers.\n", + "# This is an iterative process and you'll likely lose the will to live mid-way.\n", + "#\n", + "# Persevere, friend. Your sacrifice will not go unrewarded.\n", + "#\n", + "# In what follows, you'll be shown two plots. On the top will be the unfiltered\n", + "# benchmark times vs iteration for all batch jobs. The plot below will show the\n", + "# same benchmark filtered to the number of burn-in iterations you selected\n", + "# previously.\n", + "#\n", + "# You'll be prompted to enter a new first stable index for each benchmark until\n", + "# you arrive at a fixed point. Note that some benchmarks never really reach\n", + "# stability. In this case, try to pick a value that compromises between cost and\n", + "# accuracy (ie if a benchmark is really slow, we don't want to be doing tons\n", + "# of burn-in iterations, whereas for a fast benchmark we could justify more).\n", + "#\n", + "# Good luck.\n", + "\n", + "names: List[str] = benchmarks # type: ignore\n", + "\n", + "while len(names) != 0:\n", + " ft = filter_burn_in_iterations(ht, first_stable_index)\n", + " __new_names, names = names, []\n", + " for before, after in zip(plot_trial_against_time(ht, __new_names), plot_trial_against_time(ft, __new_names)):\n", + " clear_output(wait=True)\n", + "\n", + " name: str = before.labels.title # type: ignore\n", + " cur_index = first_stable_index.get(name)\n", + "\n", + " after.labels.title = f'{name} (filtered iteration={cur_index})'\n", + " before.show()\n", + " after.show()\n", + "\n", + " new_index = maybe(int, input('Enter the first stable index (or leave blank to keep the current value)') or None)\n", + "\n", + " if new_index is not None and new_index != cur_index:\n", + " first_stable_index[name] = new_index\n", + " names.append(name)\n", + "\n", + "first_stable_index" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ + "# As a final step of cleaning, we'll filter out trials whose times differ\n", + "# from the per-instance median by more than some factor\n", + "\n", + "\n", + "def filter_outliers(ht: hl.Table, factor: hl.Float64Expression) -> hl.Table:\n", + " return ht.select(\n", + " instances=ht.instances.map(\n", + " lambda instance: instance.annotate(\n", + " trials=hl.bind(\n", + " lambda median: instance.trials.filter(\n", + " lambda t: hl.max([t.time, median]) / hl.min([t.time, median]) < factor\n", + " ),\n", + " hl.median(instance.trials.map(lambda t: t.time)),\n", + " )\n", + " ),\n", + " ),\n", + " )\n", + "\n", + "\n", + "def filter_names(ht: hl.Table, names: hl.SetExpression) -> hl.Table:\n", + " return ht.filter(names.contains(ht.name))\n", + "\n", + "\n", + "def filter_failed_trials(ht: hl.Table) -> hl.Table:\n", + " return ht.annotate(\n", + " instances=ht.instances.map(\n", + " lambda i: 
i.annotate(\n", + " trials=hl.filter(\n", + " lambda t: (~t.timed_out) | hl.is_missing(t.failure),\n", + " i.trials,\n", + " ),\n", + " )\n", + " ),\n", + " )\n", + "\n", + "\n", + "def filter_non_viable_instances(ht: hl.Table, ninstances: hl.Int32Expression, ntrials: hl.Int32Expression) -> hl.Table:\n", + " ht = ht.select(\n", + " instances=hl.filter(\n", + " lambda instance: hl.len(instance.trials) >= ntrials,\n", + " ht.instances,\n", + " ),\n", + " )\n", + "\n", + " return ht.filter(hl.len(ht.instances) >= ninstances)\n", + "\n", + "\n", + "ht = hl.read_table('out/imported.ht')\n", + "all: List[str] = ht.aggregate(hl.agg.collect_as_set(ht.name)) # type: ignore\n", + "\n", + "\n", + "ht = filter_names(ht, hl.set([n for n in all if n in first_stable_index]))\n", + "ht = filter_burn_in_iterations(ht, first_stable_index)\n", + "ht = filter_failed_trials(ht)\n", + "ht = filter_outliers(ht, hl.float64(10))\n", + "ht = filter_non_viable_instances(ht, 50, 50)\n", + "ht = ht.checkpoint('out/filtered.ht', overwrite=True)\n", + "\n", + "benchmarks = ht.aggregate(hl.agg.collect_as_set(ht.name))\n", + "\n", + "print('Filtered:', *(n for n in all if n not in set(benchmarks)), sep='\\n')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# These plots show the mean time per instance. This provides a visual way of\n", + "# identifying differences in instance type if there are multiple distinct layers\n", + "\n", + "for f in plot_mean_time_per_instance(ht):\n", + " f.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# laaber et al. section 4\n", + "\n", + "ht = ht.select(instances=ht.instances.trials.time)\n", + "variability(ht).show(len(benchmarks))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# laaber et al. 
section 5 - boostrapping confidence intervals of the mean\n", + "\n", + "bootstrap_mean_confidence_interval(ht, 1000, 0.95).show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Laaber et al - Minimal-Detectable Slowdown\n", + "\n", + "ht = hl.read_table('out/filtered.ht')\n", + "laaber_mds(ht).write('out/laaber-mds.ht', overwrite=True)\n", + "schultz_mds(ht).write('out/schultz-mds.ht', overwrite=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "slideshow": { + "slide_type": "fragment" + } + }, + "outputs": [], + "source": [ + "mds, schultz = [hl.read_table(f'out/{m}-mds.ht') for m in ('laaber', 'schultz')]\n", + "mds = mds.annotate_globals(first_stable_index=first_stable_index)\n", + "mds = mds.select(nburn_in=mds.first_stable_index[mds.name] - 1, laaber=mds.row_value, schultz=schultz[mds.key])\n", + "mds.show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.18" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/hail/python/benchmark/conftest.py b/hail/python/benchmark/conftest.py index c25d0306e102..4bcf86295f10 100644 --- a/hail/python/benchmark/conftest.py +++ b/hail/python/benchmark/conftest.py @@ -23,6 +23,7 @@ def pytest_addoption(parser): const='cpu', default=None, ) + parser.addoption('--max-failures', type=int, help='Stop benchmarking item after this many failures', default=3) parser.addoption('--profiler-path', type=str, help='path to aysnc profiler', default=None) parser.addoption('--profiler-fmt', choices=['html', 'flame', 'jfr'], help='Choose profiler output.', default='html') @@ -40,6 +41,7 @@ def run_config_from_pytest_config(pytest_config): ('cores', 1), ('data_dir', os.getenv('HAIL_BENCHMARK_DIR')), ('iterations', None), + ('max_failures', None), ('profile', None), ('profiler_path', os.getenv('ASYNC_PROFILER_HOME')), ('profiler_fmt', None), @@ -47,7 +49,7 @@ def run_config_from_pytest_config(pytest_config): }, 'verbose': pytest_config.getoption('verbose') > 0, 'quiet': pytest_config.getoption('verbose') < 0, - 'timeout': int(pytest_config.getoption('timeout') or 1800), + 'timeout': int(pytest_config.getoption('timeout') or 100), }, ) diff --git a/hail/python/benchmark/hail/benchmark_benchmark_analysis.py b/hail/python/benchmark/hail/benchmark_benchmark_analysis.py new file mode 100644 index 000000000000..a89e6b07680b --- /dev/null +++ b/hail/python/benchmark/hail/benchmark_benchmark_analysis.py @@ -0,0 +1,26 @@ +import tempfile +from pathlib import Path + +from benchmark.tools import benchmark +from benchmark.tools.impex import dump_tsv, import_timings +from benchmark.tools.statistics import analyze_benchmarks + + +@benchmark() +def benchmark_analyze_benchmarks(local_tmpdir, onethreetwo, onethreethree): + inputs = (onethreetwo, onethreethree) + inputs = ((v, Path(tempfile.mktemp(dir=local_tmpdir))) for v in inputs) + inputs = ((dump_tsv(v, t), t)[-1] for v, t in inputs) + + tables = (import_timings(v) for v in inputs) + tables = (t.select(instances=t.instances.trials.time) for t in tables) + tables = (t._key_by_assert_sorted(*t.key.drop('version')) for t in tables) + tables = (t.checkpoint(tempfile.mktemp(suffix='.mt', 
dir=local_tmpdir)) for t in tables) + + results = analyze_benchmarks( + *tables, + n_bootstrap_iterations=1000, + confidence=0.95, + ) + + results._force_count() diff --git a/hail/python/benchmark/tools/__init__.py b/hail/python/benchmark/tools/__init__.py index 8662551f86c7..67da79872a9d 100644 --- a/hail/python/benchmark/tools/__init__.py +++ b/hail/python/benchmark/tools/__init__.py @@ -1,5 +1,6 @@ import functools import logging +from typing import Callable, List, TypeVar, Union import pytest @@ -21,5 +22,21 @@ def chunk(size, seq): yield seq[pos : pos + size] +A = TypeVar('A') +B = TypeVar('B') + + +def maybe(f: Callable[[A], B], ma: Union[A, None], default: Union[B, None] = None) -> Union[B, None]: + return f(ma) if ma is not None else default + + +def prune(kvs: dict) -> dict: + return {k: v for k, v in kvs.items() if v is not None} + + +def select(keys: List[str], **kwargs): + return (kwargs.get(k, None) for k in keys) + + def init_logging(file=None): logging.basicConfig(format="%(asctime)-15s: %(levelname)s: %(message)s", level=logging.INFO, filename=file) diff --git a/hail/python/benchmark/tools/compare.py b/hail/python/benchmark/tools/compare.py deleted file mode 100644 index ff4efd79b51b..000000000000 --- a/hail/python/benchmark/tools/compare.py +++ /dev/null @@ -1,146 +0,0 @@ -import json -import math -import os -import sys -from argparse import ArgumentParser - -import numpy as np -from scipy.stats.mstats import gmean, hmean - - -def load_file(path): - if path.endswith('.json'): - with open(path, 'r') as f: - js_data = json.load(f) - elif path.endswith('.tsv'): - import pandas as pd - - js_data = pd.read_table(path).to_json(orient='records') - else: - raise ValueError(f'unknown format: {os.path.basename(path)}') - - return {x['name']: x for x in js_data['benchmarks']} - - -def fmt_mem_ratio(m1, m2): - if m1 is None or m2 is None or m1 == 0 or m2 == 0: - return 'NA' - return fmt_diff(m1 / m2) - - -def fmt_diff(ratio): - return f'{ratio * 100:.1f}%' - - -def fmt_time(x, size): - return f'{x:.3f}'.rjust(size) - - -def fmt_mem(b): - if b is None: - return 'NA' - return f'{int(math.ceil(b / (1024 * 1024)))}' - - -def compare(args): - run1 = args.run1 - run2 = args.run2 - - min_time_for_inclusion = args.min_time - - data1 = load_file(run1) - data2 = load_file(run2) - - names1 = set(data1.keys()) - names2 = set(data2.keys()) - all_names = names1.union(names2) - overlap = names1.intersection(names2) - diff = all_names - overlap - - if diff: - sys.stderr.write("Found non-overlapping benchmarks:" + ''.join(f'\n {t}' for t in diff) + '\n') - - if args.metric == 'best': - metric_f = min - elif args.metric == 'median': - metric_f = np.median - - def get_time_metric(data): - return metric_f(data['times']) - - def get_memory_metric(data): - if 'peak_task_memory' in data: - return max(data['peak_task_memory']) - return None - - failed_1 = [] - failed_2 = [] - comparison = [] - for name in overlap: - d1 = data1[name] - d2 = data2[name] - d1_failed = d1.get('failed') or d1.get('times') == [] # rescue bugs in previous versions - d2_failed = d2.get('failed') or d2.get('times') == [] # rescue bugs in previous versions - if d1_failed: - failed_1.append(name) - if d2_failed: - failed_2.append(name) - if d1_failed or d2_failed: - continue - try: - run1_time_metric = get_time_metric(d1) - run2_time_metric = get_time_metric(d2) - run1_memory_metric = get_memory_metric(d1) - run2_memory_metric = get_memory_metric(d2) - except Exception as e: - raise ValueError(f"error while computing metric for 
{name}:\n d1={d1}\n d2={d2}") from e - if run1_time_metric < min_time_for_inclusion and run2_time_metric < min_time_for_inclusion: - continue - - comparison.append((name, run1_time_metric, run2_time_metric, run1_memory_metric, run2_memory_metric)) - - if failed_1: - sys.stderr.write("Failed benchmarks in run 1:" + ''.join(f'\n {t}' for t in failed_1) + '\n') - if failed_2: - sys.stderr.write("Failed benchmarks in run 2:" + ''.join(f'\n {t}' for t in failed_2) + '\n') - comparison = sorted(comparison, key=lambda x: x[2] / x[1], reverse=True) - - longest_name = max(len('Benchmark Name'), *[len(t[0]) for t in comparison]) - - comps = [] - - def format(name, ratio, t1, t2, memory_ratio, mem1, mem2): - return f'{name:>{longest_name}} {ratio:>8} {t1:>8} {t2:>8} {memory_ratio:>9} {mem1:>10} {mem2:>10}' - - print(format('Benchmark Name', 'Ratio', 'Time 1', 'Time 2', 'Mem Ratio', 'Mem 1 (MB)', 'Mem 2 (MB)')) - print(format('--------------', '-----', '------', '------', '---------', '----------', '----------')) - for name, r1, r2, m1, m2 in comparison: - print( - format( - name, - fmt_diff(r2 / r1), - fmt_time(r1, 8), - fmt_time(r2, 8), - fmt_mem_ratio(m2, m1), - fmt_mem(m1), - fmt_mem(m2), - ) - ) - if name.startswith('sentinel'): - continue - comps.append(r2 / r1) - - print('----------------------') - print(f'Harmonic mean: {fmt_diff(hmean(comps))}') - print(f'Geometric mean: {fmt_diff(gmean(comps))}') - print(f'Arithmetic mean: {fmt_diff(np.mean(comps))}') - print(f'Median: {fmt_diff(np.median(comps))}') - - -if __name__ == '__main__': - parser = ArgumentParser('compare', description='Run Hail benchmarks.') - parser.add_argument('run1', type=str, help='First benchmarking run.') - parser.add_argument('run2', type=str, help='Second benchmarking run.') - parser.add_argument('--min-time', type=float, default=1.0, help='Minimum runtime in either run for inclusion.') - parser.add_argument('--metric', type=str, default='median', choices=['best', 'median'], help='Comparison metric.') - compare(parser.parse_args()) diff --git a/hail/python/benchmark/tools/impex.py b/hail/python/benchmark/tools/impex.py new file mode 100644 index 000000000000..cd0507f0baf0 --- /dev/null +++ b/hail/python/benchmark/tools/impex.py @@ -0,0 +1,99 @@ +import json +from pathlib import Path +from typing import Any, Generator, List, Sequence + +import hail as hl +from benchmark.tools import maybe, prune + + +class __types: + trun = hl.tstruct( + iteration=hl.tint, # 0-based + is_burn_in=hl.tbool, # ignore for a/b testing + time=hl.tfloat, # seconds + failure=hl.tstr, # exception message dumped to a string, optional + timed_out=hl.tbool, # whether or not the failure was caused by a timeout + task_memory=hl.tfloat, # don't think this works yet sadly. 
+ ) + + ttrial = hl.tstruct( + path=hl.tstr, + name=hl.tstr, + version=hl.tstr, + uname=hl.tdict(hl.tstr, hl.tstr), + batch_id=hl.tint, + job_id=hl.tint, + trial=hl.tint, + attempt_id=hl.tstr, + start=hl.tstr, + end=hl.tstr, + **trun, + ) + + +def __write_tsv_row(os, row: Sequence[str]) -> None: + if len(row) > 0: + os.write('\t'.join(row)) + os.write('\n') + + +def dump_tsv(jsonl: Path, tsv: Path) -> None: + def explode(trial: dict) -> Generator[List[Any], Any, None]: + trial['uname'] = json.dumps(trial['uname']) + for run in trial['runs']: + flattened = prune({**trial, **run, 'runs': None}) + yield [maybe(str, flattened.get(f), 'NA') for f in __types.ttrial] + + with ( + jsonl.open(encoding='utf-8') as in_, + tsv.open('w', encoding='utf-8') as out, + ): + __write_tsv_row(out, [n for n in __types.ttrial]) + for line in in_: + trial = json.loads(line) + for row in explode(trial): + __write_tsv_row(out, row) + + +def import_timings(timings_tsv: Path) -> hl.Table: + ht = hl.import_table(str(timings_tsv), types=__types.ttrial) + trial_key = [t for t in __types.ttrial.fields if t not in set(('uname', *__types.trun.fields))] + ht = ht.group_by(*trial_key).aggregate( + runs=hl.sorted( + hl.agg.collect(ht.row_value.select(*__types.trun)), + lambda t: t.iteration, + ), + ) + + # Rename terms to be consistent with that of Laaber et al.: + # - "trial" (ie batch job) -> "instance" + # - "run" (benchmark invocation) -> "trial" + # + # Note we don't run benchmarks multiple times per trial as these are + # "macro"-benchmarks. This is one area where we differ from Laaber at al. + ht = ht.select( + instance=hl.struct( + instance=ht.trial, + batch_id=ht.batch_id, + job_id=ht.job_id, + attempt_id=ht.attempt_id, + start=ht.start, + end=ht.end, + trials=hl.filter( + lambda t: ( + hl.is_missing(t.failure) + | (hl.is_defined(t.failure) & (hl.len(t.failure) == 0)) + | ~t.timed_out + | ~t.is_burn_in + ), + ht.runs, + ), + ), + ) + + return ht.group_by(ht.path, ht.name, ht.version).aggregate( + instances=hl.sorted( + hl.agg.collect(ht.instance), + key=lambda i: (i.instance, i.attempt_id), + ) + ) diff --git a/hail/python/benchmark/tools/plotting.py b/hail/python/benchmark/tools/plotting.py new file mode 100644 index 000000000000..73b743829d3e --- /dev/null +++ b/hail/python/benchmark/tools/plotting.py @@ -0,0 +1,35 @@ +from collections.abc import Generator +from typing import Any, List, Optional + +import hail as hl +from hail.ggplot import GGPlot, aes, geom_line, geom_point, ggplot, ggtitle + + +def __agg_names(ht: hl.Table) -> List[str]: + return ht.aggregate(hl.array(hl.agg.collect_as_set(ht.name))) + + +def plot_trial_against_time( + ht: hl.Table, + names: Optional[List[str]] = None, +) -> Generator[GGPlot, Any, Any]: + for name in names or __agg_names(ht): + k = ht.filter(ht.name == name) + k = k.explode(k.instances, name='__instance') + k = k.select(**k.__instance) + k = k.explode(k.trials, name='trial') + yield ( + ggplot(k, aes(x=k.trial.iteration, y=k.trial.time, color=hl.str(k.instance))) + geom_line() + ggtitle(name) + ) + + +def plot_mean_time_per_instance( + ht: hl.Table, + names: Optional[List[str]] = None, +) -> Generator[GGPlot, Any, Any]: + for name in names or __agg_names(ht): + k = ht.filter(ht.name == name) + k = k.explode(k.instances, name='__instance') + k = k.select(**k.__instance) + k = k.annotate(s=k.trials.aggregate(lambda t: hl.agg.stats(t.time))) + yield (ggplot(k, aes(x=k.instance, y=k.s.mean)) + geom_point() + ggtitle(name)) diff --git a/hail/python/benchmark/tools/statistics.py 
b/hail/python/benchmark/tools/statistics.py new file mode 100644 index 000000000000..f2614c9e6df4 --- /dev/null +++ b/hail/python/benchmark/tools/statistics.py @@ -0,0 +1,367 @@ +from os import cpu_count +from typing import Callable + +import hail as hl + + +def cv(trial: hl.StructExpression) -> hl.Float64Expression: + """coefficient of variation""" + return hl.bind(lambda s: s.stdev / s.mean, hl.agg.stats(trial)) + + +def variability(ht: hl.Table) -> hl.Table: + """Compute benchmark total and per-trial variability""" + return ht.select( + total=ht.instances.aggregate( + lambda trials: hl.agg.explode(cv, trials), + ), + trials=( + ht.instances.map(lambda trials: trials.aggregate(cv)).aggregate( + lambda cvs: hl.agg.stats(cvs).select('mean', 'stdev') + ) + ), + ) + + +def boostrap_confidence_interval( + statistic: Callable[[hl.Table], hl.NumericExpression], + ht: hl.Table, + n_bootstrap_iterations: int, + confidence: float, +) -> hl.Table: + if confidence <= 0 or confidence >= 1: + raise ValueError(f'Confidence must fall within interval (0, 1), got {confidence}.') + + endpoints = (lower := (1 - confidence) / 2, 1 - lower) + + ht = ht.annotate(__bootstrap=hl.range(n_bootstrap_iterations)) + ht = ht.explode('__bootstrap') + ht = ht.select(__bootstrap=statistic(ht)) + ht = ht.group_by(*ht.key).aggregate(__results=hl.agg.collect(ht.__bootstrap)) + + # not obvious why, but this checkpoint adds a 4x performance improvement + ht = ht.checkpoint(hl.utils.new_local_temp_file()) + + return ht.select( + **hl.bind( + lambda results, len: hl.bind( + lambda lo, hi: hl.struct( + ci=hl.interval(lo, hi, includes_end=True), + radius=hl.rbind((hi + lo) / 2, lambda mid: (hi - mid) / mid), + ), + *[results[hl.int(p * len)] for p in endpoints], + ), + hl.sorted(ht.__results), + hl.int(hl.len(ht.__results)), + ), + ) + + +def __randomize_with_replacement(xs: hl.ArrayExpression) -> hl.ArrayExpression: + return hl.bind( + lambda xs: hl.bind( + lambda len: hl.map( + lambda idx: xs[idx], + hl.repeat(lambda: hl.rand_int32(len), len), + ), + hl.len(xs), + ), + xs, + ) + + +def __agg_randomized_mean(instances: hl.ArrayStructExpression) -> hl.NumericExpression: + return ( + __randomize_with_replacement(instances) + .map(__randomize_with_replacement) + .aggregate(lambda trials: hl.agg.explode(hl.agg.mean, trials)) + ) + + +def bootstrap_mean_confidence_interval( + ht: hl.Table, + n_bootstrap_iterations: int, + confidence: float, +) -> hl.Table: + """ + Approximate confidence interval of mean execution time across all trials + of a benchmark via bootstrap simulations as proposed by Laaber et al. + """ + return boostrap_confidence_interval( + statistic=lambda ht: __agg_randomized_mean(ht.instances), + ht=ht, + n_bootstrap_iterations=n_bootstrap_iterations, + confidence=confidence, + ) + + +def bootstrap_ib_difference_confidence_interval( + ht: hl.Table, + n_bootstrap_iterations: int, + confidence: float, +) -> hl.Table: + """ + Approximate confidence interval of difference in mean execution time of + control and test groups of benchmark trials on different instances via + bootstrap simulations. 
+ """ + return boostrap_confidence_interval( + statistic=lambda ht: __agg_randomized_mean(ht.control) / __agg_randomized_mean(ht.test), + ht=ht, + n_bootstrap_iterations=n_bootstrap_iterations, + confidence=confidence, + ) + + +def bootstrap_tb_difference_confidence_interval( + ht: hl.Table, + n_bootstrap_iterations: int, + confidence: float, +) -> hl.Table: + """ + Approximate confidence interval of difference in mean execution time of + control and test groups of benchmark trials on same instances via + bootstrap simulations. + """ + + def agg_mean(trials: hl.ArrayNumericExpression) -> hl.NumericExpression: + return hl.agg.explode(hl.agg.mean, __randomize_with_replacement(trials)) + + def ratio_of_means(ht: hl.Table) -> hl.NumericExpression: + return __randomize_with_replacement(ht.instances).aggregate( + lambda instance: agg_mean(instance.control) / agg_mean(instance.test) + ) + + return boostrap_confidence_interval( + statistic=ratio_of_means, + ht=ht, + n_bootstrap_iterations=n_bootstrap_iterations, + confidence=confidence, + ) + + +def overlapping_confidence_interval_test( + control: hl.Table, + test: hl.Table, + n_bootstrap_iterations: int, + confidence: float, +) -> hl.Table: + """ + Test for performance changes by comparing overlapping confidence intervals + of mean execution time between a control and test set of benchmark timings + """ + control = bootstrap_mean_confidence_interval(control, n_bootstrap_iterations, confidence) + test = bootstrap_mean_confidence_interval(test, n_bootstrap_iterations, confidence) + return control.select(overlaps=test[control.key].ci.overlaps(control.ci)) + + +def analyze_benchmarks( + control: hl.Table, + test: hl.Table, + n_bootstrap_iterations: int, + confidence: float, +) -> hl.Table: + """ + Example: + + import hail as hl + + from benchmark.tools.impex import import_timings + from benchmark.tools.statistics import analyse_benchmarks + from pathlib import Path + + tables = [ + import_timings(Path('data') / f'{version}.jsonl') + for version in ['0.2.132', '0.2.133'] + ] + + control, test = [ + table + .select(instances=table.instances.trials.time) + ._key_by_assert_sorted(*table.key.drop('version')) + for table in tables + ] + + results = analyze_benchmarks( + control, + test, + n_bootstrap_iterations=10_000, + confidence=.95, + ) + + results.show() + """ + + results = overlapping_confidence_interval_test( + control, + test, + n_bootstrap_iterations, + confidence, + ) + + diffs = bootstrap_ib_difference_confidence_interval( + control.select( + control=control.instances, + test=test[control.key].instances, + ), + n_bootstrap_iterations, + confidence, + ) + + return hl.Table.parallelize( + hl.sorted( + results.select(changed=~results.overlaps, relative_change=diffs[results.key].ci).collect(_localize=False), + key=lambda r: r.relative_change.end, + ) + ) + + +def __select_disjoint(n: hl.Int32Expression, xs: hl.ArrayExpression) -> hl.TupleExpression: + return hl.bind( + lambda n, xs: hl.bind( + lambda len: hl.case() + .when(n <= 2 * len, hl.bind(lambda ys: hl.tuple([ys[:n], ys[n : 2 * n]]), hl.shuffle(xs))) + .or_error("split position '" + hl.str(n) + "' exceeds twice array length '" + hl.str(len) + "'."), + hl.len(xs), + ), + n, + xs, + ) + + +def __ibs( + ht: hl.Table, + ninstances: hl.Int32Expression, + ntrials: hl.Int32Expression, +) -> hl.Table: + return ht.select( + **dict( + zip( + ('control', 'test'), + __select_disjoint(ninstances, ht.instances.map(lambda trials: hl.shuffle(trials)[:ntrials])), + ) + ), + ) + + +def __tbs( + ht: hl.Table, + 
ninstances: hl.Int32Expression, + ntrials: hl.Int32Expression, +) -> hl.Table: + return ht.select( + **hl.shuffle(ht.instances)[:ninstances].aggregate( + lambda i: hl.struct(**{ + group: hl.agg.collect(trials) + for group, trials in zip( + ('control', 'test'), + __select_disjoint(ntrials, i), + ) + }) + ) + ) + + +def __sel(strategy: Callable, ht: hl.Table, n_experiments: int) -> hl.Table: + ht = ht.annotate(experiment=hl.range(n_experiments)).explode('experiment') + ht = ht._key_by_assert_sorted(*ht.key, 'experiment') + return strategy(ht, ht.ninstances, ht.ntrials) + + +def __scale( + instances: hl.ArrayExpression, + factor: hl.Float64Expression, +) -> hl.ArrayExpression: + return instances.map(lambda instance: instance.map(lambda trial: trial * factor)) + + +def __extend_key(ht: hl.Table, **kwargs) -> hl.Table: + return ht.annotate(**kwargs)._key_by_assert_sorted(*ht.key, *kwargs) + + +def laaber_mds( + ht: hl.Table, + n_bootstrap_iterations: int = 1_000, + n_experiments: int = 100, + confidence: float = 0.95, +) -> hl.Table: + """Minimal detectable slowdown as described in Laaber et al""" + + s = ( + ht.annotate( + slowdown=[1 + x / 10 for x in range(0, 6)], + ninstances=[5, 10, 15, 20, 25], + ntrials=[5, 10, 15, 20, 25], + ) + .explode('slowdown') + .explode('ninstances') + .explode('ntrials') + ._key_by_assert_sorted(*ht.key, 'slowdown', 'ninstances', 'ntrials') + .repartition(cpu_count() * 7) + ) + + ib = __sel(__ibs, __extend_key(s, strategy='ibs'), n_experiments) + tb = __sel(__tbs, __extend_key(s, strategy='tbs'), n_experiments) + s = ib.union(tb) + s = s.annotate(test=__scale(s.test, s.slowdown)) + s = s.checkpoint(hl.utils.new_local_temp_file()) + + mds = overlapping_confidence_interval_test( + s.select(instances=s.control), + s.select(instances=s.test), + n_bootstrap_iterations, + confidence, + ) + + return mds.group_by(*mds.key.drop('strategy', 'experiment')).aggregate( + **hl.bind( + lambda groups: hl.struct(**{k: groups[k] for k in ['ibs', 'tbs']}), + hl.agg.group_by(mds.strategy, hl.agg.fraction(~mds.overlaps)), + ) + ) + + +def schultz_mds( + ht: hl.Table, + n_bootstrap_iterations: int = 1_000, + n_experiments: int = 100, + confidence: float = 0.95, +) -> hl.Table: + s = ( + ht.annotate( + slowdown=[1 + x / 10 for x in range(0, 6)], + ninstances=[5, 10, 15, 20, 25], + ntrials=[5, 10, 15, 20, 25], + ) + .explode('slowdown') + .explode('ninstances') + .explode('ntrials') + ._key_by_assert_sorted(*ht.key, 'slowdown', 'ninstances', 'ntrials') + .repartition(cpu_count() * 7) + ) + + ib = __sel(__ibs, s, n_experiments) + ib = bootstrap_ib_difference_confidence_interval( + ib.annotate(test=__scale(ib.test, ib.slowdown)), + n_bootstrap_iterations, + confidence, + ) + + ib = ib.group_by(*ib.key.drop('experiment')).aggregate(rate=hl.agg.fraction(~ib.ci.contains(1.0))) + + tb = __sel(__tbs, s, n_experiments) + tb = bootstrap_tb_difference_confidence_interval( + tb.select( + instances=hl.map( + lambda control, test: hl.struct(control=control, test=test), + tb.control, + __scale(tb.test, tb.slowdown), + ), + ), + n_bootstrap_iterations, + confidence, + ) + + tb = tb.group_by(*tb.key.drop('experiment')).aggregate(rate=hl.agg.fraction(~tb.ci.contains(1.0))) + + return ib.select(ibs=ib.rate, tbs=tb[ib.key].rate) diff --git a/hail/python/benchmark/tools/summarize.py b/hail/python/benchmark/tools/summarize.py deleted file mode 100644 index a1cd7fe86217..000000000000 --- a/hail/python/benchmark/tools/summarize.py +++ /dev/null @@ -1,37 +0,0 @@ -import json -import logging -from argparse 
import ArgumentParser - -from . import init_logging - - -def summarize(files): - init_logging() - n_files = len(files) - if n_files < 1: - raise ValueError("'summarize' requires at least 1 file to summarize") - logging.info(f'{len(files)} files to summarize') - - for file in files: - logging.info(f'Summary for {file}:') - - with open(file, 'r') as f: - data = json.load(f) - logging.info(f"config: {data['config']}") - for bm in data['benchmarks']: - if bm['failed']: - logging.info(f"benchmark failed: {bm['name']}") - elif not bm.get('times'): - logging.info(f"benchmark has no times but is not marked failed: {bm['name']}") - if bm.get('timed_out'): - logging.info(f"benchmark timed out: {bm['name']}") - - -if __name__ == '__main__': - parser = ArgumentParser( - 'summarize', - description='Summarize a benchmark json results file', - ) - parser.add_argument("files", type=str, nargs='*', help="JSON files to summarize.") - argv = parser.parse_args() - summarize(argv.files) diff --git a/hail/python/benchmark/tools/visualize.py b/hail/python/benchmark/tools/visualize.py deleted file mode 100644 index 7a4d75052122..000000000000 --- a/hail/python/benchmark/tools/visualize.py +++ /dev/null @@ -1,47 +0,0 @@ -from argparse import ArgumentParser -from collections import defaultdict -from typing import List, Optional - -import matplotlib.pyplot as plt -import pandas as pd - -from .compare import load_file - - -def collect_results(files: 'List[str]', metric: 'str') -> 'pd.DataFrame': - results = defaultdict(lambda: [None] * len(files)) - for k, file in enumerate(files): - for name, time in load_file(file).items(): - results[name][k] = time[metric] if not time['failed'] else None - - return pd.DataFrame(data=results, index=files) - - -def plot(results: 'pd.DataFrame', abs_differences: 'bool', head: 'Optional[int]') -> None: - r_ = results.iloc[1:] - results.iloc[0] - results = r_ if abs_differences else r_ / results.iloc[0] - - if head is not None: - results = results[results.abs().max().sort_values(ascending=False).head(head).keys()] - - results.T.sort_index().plot.bar() - plt.show() - - -def main(args) -> 'None': - files = [args.baseline, *args.runs] - results = collect_results(files, args.metric) - plot(results, args.abs, args.head) - - -if __name__ == '__main__': - parser = ArgumentParser( - 'visualize', - description='Visualize benchmark results', - ) - parser.add_argument('baseline', help='baseline benchmark results') - parser.add_argument('runs', nargs='+', help='benchmarks to compare against baseline') - parser.add_argument('--metric', choices=['mean', 'median', 'stdev', 'max_memory'], default='mean') - parser.add_argument('--head', type=int, help="number of most significant results to take") - parser.add_argument('--abs', action='store_true', help="plot absolute differences") - main(parser.parse_args()) diff --git a/hail/python/hail/expr/__init__.py b/hail/python/hail/expr/__init__.py index bff07932fd93..ce225a7a1adc 100644 --- a/hail/python/hail/expr/__init__.py +++ b/hail/python/hail/expr/__init__.py @@ -3,6 +3,7 @@ Aggregation, ArrayExpression, ArrayNumericExpression, + ArrayStructExpression, BooleanExpression, CallExpression, CollectionExpression, @@ -514,6 +515,7 @@ 'ExpressionException', 'ArrayExpression', 'ArrayNumericExpression', + 'ArrayStructExpression', 'BooleanExpression', 'CallExpression', 'CollectionExpression',