From e8cb1fb1a74bb68f651a13641c288485d1119419 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Tue, 3 Dec 2024 15:58:46 -0500 Subject: [PATCH] [query] Various Benchmark Suite Improvements --- hail/python/benchmark/conftest.py | 41 ++---- .../hail/benchmark_benchmark_analysis.py | 5 +- .../benchmark/hail/benchmark_combiner.py | 24 ++-- .../python/benchmark/hail/benchmark_linalg.py | 29 ++-- .../benchmark/hail/benchmark_matrix_table.py | 105 +++++++------- .../benchmark/hail/benchmark_methods.py | 53 +++---- .../benchmark/hail/benchmark_sentinel.py | 6 +- .../benchmark/hail/benchmark_shuffle.py | 29 +++- hail/python/benchmark/hail/benchmark_table.py | 101 ++++++------- hail/python/benchmark/hail/conftest.py | 90 ++++++++---- hail/python/benchmark/hail/fixtures.py | 53 +++++-- hail/python/benchmark/hail/utils.py | 105 +++++++------- hail/python/benchmark/tools/__init__.py | 24 +--- hail/scripts/benchmark_in_batch.py | 135 +++++++++++++----- 14 files changed, 473 insertions(+), 327 deletions(-) diff --git a/hail/python/benchmark/conftest.py b/hail/python/benchmark/conftest.py index 4bcf86295f10..e0b8f30dfbc7 100644 --- a/hail/python/benchmark/conftest.py +++ b/hail/python/benchmark/conftest.py @@ -12,7 +12,7 @@ def pytest_addoption(parser): parser.addoption("--log", type=str, help='Log file path', default=None) parser.addoption("--output", type=str, help="Output file path.", default=None) - parser.addoption("--data-dir", type=str, help="Data directory.", default=None) + parser.addoption("--data-dir", type=str, help="Data directory.", default=os.getenv('HAIL_BENCHMARK_DIR')) parser.addoption('--iterations', type=int, help='override number of iterations for all benchmarks', default=None) parser.addoption('--cores', type=int, help='Number of cores to use.', default=1) parser.addoption( @@ -23,38 +23,19 @@ def pytest_addoption(parser): const='cpu', default=None, ) + parser.addoption( + '--max-duration', + type=int, + help='Maximum permitted duration for any benchmark trial in seconds, not to be confused with pytest-timeout', + default=200, + ) parser.addoption('--max-failures', type=int, help='Stop benchmarking item after this many failures', default=3) - parser.addoption('--profiler-path', type=str, help='path to aysnc profiler', default=None) - parser.addoption('--profiler-fmt', choices=['html', 'flame', 'jfr'], help='Choose profiler output.', default='html') - - -def run_config_from_pytest_config(pytest_config): - return type( - 'RunConfig', - (object,), - { - **{ - flag: pytest_config.getoption(flag) or default - for flag, default in [ - ('log', None), - ('output', None), - ('cores', 1), - ('data_dir', os.getenv('HAIL_BENCHMARK_DIR')), - ('iterations', None), - ('max_failures', None), - ('profile', None), - ('profiler_path', os.getenv('ASYNC_PROFILER_HOME')), - ('profiler_fmt', None), - ] - }, - 'verbose': pytest_config.getoption('verbose') > 0, - 'quiet': pytest_config.getoption('verbose') < 0, - 'timeout': int(pytest_config.getoption('timeout') or 100), - }, + parser.addoption( + '--profiler-path', type=str, help='path to aysnc profiler', default=os.getenv('ASYNC_PROFILER_HOME') ) + parser.addoption('--profiler-fmt', choices=['html', 'flame', 'jfr'], help='Choose profiler output.', default='html') @pytest.hookimpl def pytest_configure(config): - config.run_config = run_config_from_pytest_config(config) - init_logging(file=config.run_config.log) + init_logging(file=config.getoption('log')) diff --git a/hail/python/benchmark/hail/benchmark_benchmark_analysis.py b/hail/python/benchmark/hail/benchmark_benchmark_analysis.py index a89e6b07680b..2b1c147f9c2f 100644 --- a/hail/python/benchmark/hail/benchmark_benchmark_analysis.py +++ b/hail/python/benchmark/hail/benchmark_benchmark_analysis.py @@ -1,12 +1,13 @@ import tempfile from pathlib import Path -from benchmark.tools import benchmark +import pytest + from benchmark.tools.impex import dump_tsv, import_timings from benchmark.tools.statistics import analyze_benchmarks -@benchmark() +@pytest.mark.benchmark() def benchmark_analyze_benchmarks(local_tmpdir, onethreetwo, onethreethree): inputs = (onethreetwo, onethreethree) inputs = ((v, Path(tempfile.mktemp(dir=local_tmpdir))) for v in inputs) diff --git a/hail/python/benchmark/hail/benchmark_combiner.py b/hail/python/benchmark/hail/benchmark_combiner.py index 89467020a866..1d6848d08e38 100644 --- a/hail/python/benchmark/hail/benchmark_combiner.py +++ b/hail/python/benchmark/hail/benchmark_combiner.py @@ -3,7 +3,8 @@ import pytest import hail as hl -from benchmark.tools import benchmark, chunk +from benchmark.hail.utils import XFail +from benchmark.tools import chunk from hail.vds.combiner import combine_variant_datasets, new_combiner, transform_gvcf COMBINE_GVCF_MAX = 100 @@ -14,7 +15,8 @@ def import_vcf(path): return hl.import_vcf(str(path), reference_genome='GRCh38', force=True) -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) @with_flags(no_ir_logging='1') def benchmark_compile_2k_merge(empty_gvcf, tmp_path): vcf = import_vcf(empty_gvcf) @@ -23,13 +25,14 @@ def benchmark_compile_2k_merge(empty_gvcf, tmp_path): hl.vds.write_variant_datasets(combined, str(tmp_path / 'combiner-multi-write'), overwrite=True) -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_python_only_10k_transform(empty_gvcf): for vcf in [import_vcf(empty_gvcf)] * 10_000: transform_gvcf(vcf, []) -@benchmark() +@pytest.mark.benchmark() def benchmark_python_only_10k_combine(empty_gvcf): vcf = import_vcf(empty_gvcf) mt = transform_gvcf(vcf, []) @@ -37,7 +40,7 @@ def benchmark_python_only_10k_combine(empty_gvcf): combine_variant_datasets(mts) -@benchmark() +@pytest.mark.benchmark() def benchmark_import_and_transform_gvcf(single_gvcf): mt = import_vcf(single_gvcf) vds = transform_gvcf(mt, []) @@ -45,7 +48,7 @@ def benchmark_import_and_transform_gvcf(single_gvcf): vds.variant_data._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_import_gvcf_force_count(single_gvcf): mt = import_vcf(single_gvcf) mt._force_count_rows() @@ -60,14 +63,15 @@ def tmp_and_output_paths(tmp_path): return (tmp, output) -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_vds_combiner_chr22(chr22_gvcfs, tmp_and_output_paths): parts = hl.eval([hl.parse_locus_interval('chr22:start-end', reference_genome='GRCh38')]) - + tmp, output = tmp_and_output_paths combiner = new_combiner( - output_path=str(tmp_and_output_paths[0]), + output_path=str(output), intervals=parts, - temp_path=str(tmp_and_output_paths[1]), + temp_path=str(tmp), gvcf_paths=[str(path) for path in chr22_gvcfs], reference_genome='GRCh38', branch_factor=16, diff --git a/hail/python/benchmark/hail/benchmark_linalg.py b/hail/python/benchmark/hail/benchmark_linalg.py index 0625b1818c36..ea66253fcbee 100644 --- a/hail/python/benchmark/hail/benchmark_linalg.py +++ b/hail/python/benchmark/hail/benchmark_linalg.py @@ -1,8 +1,11 @@ +import pytest + import hail as hl -from benchmark.tools import benchmark +from benchmark.hail.utils import XFail -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_block_matrix_nested_multiply(tmp_path): bm = hl.linalg.BlockMatrix.random(8 * 1024, 8 * 1024) bm = bm.checkpoint(str(tmp_path / 'checkpoint.mt')) @@ -10,39 +13,41 @@ def benchmark_block_matrix_nested_multiply(tmp_path): bm.write(str(tmp_path / 'result.mt'), overwrite=True) -@benchmark() +@pytest.mark.benchmark() def benchmark_make_ndarray(): ht = hl.utils.range_table(200_000) ht = ht.annotate(x=hl.nd.array(hl.range(ht.idx))) ht._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_ndarray_addition(): arr = hl.nd.ones((1024, 1024)) hl.eval(arr + arr) -@benchmark() +@pytest.mark.benchmark() def benchmark_ndarray_matmul_int64(): arr = hl.nd.arange(1024 * 1024).map(hl.int64).reshape((1024, 1024)) hl.eval(arr @ arr) -@benchmark() +@pytest.mark.benchmark() def benchmark_ndarray_matmul_float64(): arr = hl.nd.arange(1024 * 1024).map(hl.float64).reshape((1024, 1024)) hl.eval(arr @ arr) -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_blockmatrix_write_from_entry_expr_range_mt(tmp_path): mt = hl.utils.range_matrix_table(40_000, 40_000, n_partitions=4) path = str(tmp_path / 'result.bm') hl.linalg.BlockMatrix.write_from_entry_expr(mt.row_idx + mt.col_idx, path) -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_blockmatrix_write_from_entry_expr_range_mt_standardize(tmp_path): mt = hl.utils.range_matrix_table(40_000, 40_000, n_partitions=4) path = str(tmp_path / 'result.bm') @@ -51,20 +56,22 @@ def benchmark_blockmatrix_write_from_entry_expr_range_mt_standardize(tmp_path): ) -@benchmark() +@pytest.mark.benchmark() def benchmark_sum_table_of_ndarrays(): ht = hl.utils.range_table(400).annotate(nd=hl.nd.ones((4096, 4096))) ht.aggregate(hl.agg.ndarray_sum(ht.nd)) -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_block_matrix_to_matrix_table_row_major(): mt = hl.utils.range_matrix_table(20_000, 20_000, n_partitions=4) bm = hl.linalg.BlockMatrix.from_entry_expr(mt.row_idx + mt.col_idx) bm.to_matrix_table_row_major()._force_count_rows() -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_king(tmp_path): mt = hl.balding_nichols_model(6, n_variants=10000, n_samples=4096) path = str(tmp_path / 'result.mt') diff --git a/hail/python/benchmark/hail/benchmark_matrix_table.py b/hail/python/benchmark/hail/benchmark_matrix_table.py index bf9a65abe1e5..43d2dac0dffd 100644 --- a/hail/python/benchmark/hail/benchmark_matrix_table.py +++ b/hail/python/benchmark/hail/benchmark_matrix_table.py @@ -1,126 +1,127 @@ import pytest import hail as hl -from benchmark.tools import benchmark +from benchmark.hail.utils import XFail -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_decode_and_count(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_decode_and_count_just_gt(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)).select_entries('GT') mt._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_array_arithmetic(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = mt.filter_rows(mt.alleles.length() == 2) mt.select_entries(dosage=hl.pl_dosage(mt.PL)).select_rows()._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_entries_table(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt.entries()._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_entries_table_no_key(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)).key_rows_by().key_cols_by() mt.entries()._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_rows_force_count(profile25_mt): ht = hl.read_matrix_table(str(profile25_mt)).rows().key_by() ht._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_show(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt.show(100) -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_rows_show(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt.rows().show(100) -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_cols_show(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt.cols().show(100) -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_take_entry(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt.GT.take(100) -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_entries_show(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt.entries().show() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_take_row(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt.info.AF.take(100) -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_take_col(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt.s.take(100) -@benchmark() +@pytest.mark.benchmark() def benchmark_write_range_matrix_table_p100(tmp_path): mt = hl.utils.range_matrix_table(n_rows=1_000_000, n_cols=10, n_partitions=100) mt = mt.annotate_entries(x=mt.col_idx + mt.row_idx) mt.write(str(tmp_path / 'tmp.mt')) -@benchmark() +@pytest.mark.benchmark() def benchmark_write_profile_mt(profile25_mt, tmp_path): hl.read_matrix_table(str(profile25_mt)).write(str(tmp_path / 'tmp.mt')) -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_rows_is_transition(profile25_mt): ht = hl.read_matrix_table(str(profile25_mt)).rows().key_by() ht.select(is_snp=hl.is_snp(ht.alleles[0], ht.alleles[1]))._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_filter_entries(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt.filter_entries((mt.GQ > 8) & (mt.DP > 2))._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_filter_entries_unfilter(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt.filter_entries((mt.GQ > 8) & (mt.DP > 2)).unfilter_entries()._force_count_rows() -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_matrix_table_nested_annotate_rows_annotate_entries(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = mt.annotate_rows(r0=mt.info.AF[0] + 1) mt = mt.annotate_entries(e0=mt.GQ + 5) for i in range(1, 20): - mt = mt.annotate_rows(**{f'r{i}': mt[f'r{i-1}'] + 1}) - mt = mt.annotate_entries(**{f'e{i}': mt[f'e{i-1}'] + 1}) + mt = mt.annotate_rows(**{f'r{i}': mt[f'r{i - 1}'] + 1}) + mt = mt.annotate_entries(**{f'e{i}': mt[f'e{i - 1}'] + 1}) mt._force_count_rows() @@ -163,36 +164,35 @@ def many_aggs(mt): return {f'x{i}': expr for i, expr in enumerate(aggs)} -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_many_aggs_row_wise(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = mt.annotate_rows(**many_aggs(mt)) mt.rows()._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_many_aggs_col_wise(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = mt.annotate_cols(**many_aggs(mt)) mt.cols()._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_aggregate_entries(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt.aggregate_entries(hl.agg.stats(mt.GQ)) -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_call_stats_star_star(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt.annotate_rows(**hl.agg.call_stats(mt.GT, mt.alleles))._force_count_rows() -@benchmark() -@pytest.mark.skip(reason='never finishes') -def benchmark_gnomad_coverage_stats(profile25_mt): - mt = hl.read_matrix_table(str(profile25_mt)) +@pytest.mark.benchmark() +def benchmark_gnomad_coverage_stats(gnomad_dp_sim): + mt = hl.read_matrix_table(str(gnomad_dp_sim)) def get_coverage_expr(mt): cov_arrays = hl.literal({ @@ -217,12 +217,16 @@ def get_coverage_expr(mt): ), ) - mt = mt.annotate_rows(mean=hl.agg.mean(mt.x), median=hl.median(hl.agg.collect(mt.x)), **get_coverage_expr(mt)) + mt = mt.annotate_rows( + mean=hl.agg.mean(mt.x), + median=hl.median(hl.agg.collect(mt.x)), + **get_coverage_expr(mt), + ) mt.rows()._force_count() -@benchmark() -def gnomad_coverage_stats_optimized(gnomad_dp_sim): +@pytest.mark.benchmark() +def benchmark_gnomad_coverage_stats_optimized(gnomad_dp_sim): mt = hl.read_matrix_table(str(gnomad_dp_sim)) mt = mt.annotate_rows( mean=hl.agg.mean(mt.x), @@ -238,24 +242,24 @@ def gnomad_coverage_stats_optimized(gnomad_dp_sim): mt.rows()._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_per_row_stats_star_star(gnomad_dp_sim): mt = hl.read_matrix_table(str(gnomad_dp_sim)) mt.annotate_rows(**hl.agg.stats(mt.x))._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_read_decode_gnomad_coverage(gnomad_dp_sim): hl.read_matrix_table(str(gnomad_dp_sim))._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_import_bgen_force_count_just_gp(sim_ukb_bgen, sim_ukb_sample): mt = hl.import_bgen(str(sim_ukb_bgen), sample_file=str(sim_ukb_sample), entry_fields=['GP'], n_partitions=8) mt._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_import_bgen_force_count_all(sim_ukb_bgen, sim_ukb_sample): mt = hl.import_bgen( str(sim_ukb_bgen), sample_file=str(sim_ukb_sample), entry_fields=['GT', 'GP', 'dosage'], n_partitions=8 @@ -263,47 +267,49 @@ def benchmark_import_bgen_force_count_all(sim_ukb_bgen, sim_ukb_sample): mt._force_count_rows() -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_import_bgen_info_score(sim_ukb_bgen, sim_ukb_sample): mt = hl.import_bgen(str(sim_ukb_bgen), sample_file=str(sim_ukb_sample), entry_fields=['GP'], n_partitions=8) mt = mt.annotate_rows(info_score=hl.agg.info_score(mt.GP)) mt.rows().select('info_score')._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_import_bgen_filter_count(sim_ukb_bgen, sim_ukb_sample): mt = hl.import_bgen(str(sim_ukb_bgen), sample_file=str(sim_ukb_sample), entry_fields=['GT', 'GP'], n_partitions=8) mt = mt.filter_rows(mt.alleles == ['A', 'T']) mt._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_export_range_matrix_table_entry_field_p100(tmp_path): mt = hl.utils.range_matrix_table(n_rows=1_000_000, n_cols=10, n_partitions=100) mt = mt.annotate_entries(x=mt.col_idx + mt.row_idx) mt.x.export(str(tmp_path / 'result.txt')) -@benchmark() +@pytest.mark.benchmark() def benchmark_export_range_matrix_table_row_p100(tmp_path): mt = hl.utils.range_matrix_table(n_rows=1_000_000, n_cols=10, n_partitions=100) mt.row.export(str(tmp_path / 'result.txt')) -@benchmark() +@pytest.mark.benchmark() def benchmark_export_range_matrix_table_col_p100(tmp_path): mt = hl.utils.range_matrix_table(n_rows=1_000_000, n_cols=10, n_partitions=100) mt.col.export(str(tmp_path / 'result.txt')) -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_large_range_matrix_table_sum(): mt = hl.utils.range_matrix_table(n_cols=500000, n_rows=10000, n_partitions=2500) mt = mt.annotate_entries(x=mt.col_idx + mt.row_idx) mt.annotate_cols(foo=hl.agg.sum(mt.x))._force_count_cols() -@benchmark() +@pytest.mark.benchmark() def benchmark_kyle_sex_specific_qc(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = mt.annotate_cols(sex=hl.if_else(hl.rand_bool(0.5), 'Male', 'Female')) @@ -344,35 +350,36 @@ def benchmark_kyle_sex_specific_qc(profile25_mt): mt.rows()._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_scan_count_rows_2(): mt = hl.utils.range_matrix_table(n_rows=200_000_000, n_cols=10, n_partitions=16) mt = mt.annotate_rows(x=hl.scan.count()) mt._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_matrix_table_scan_count_cols_2(): mt = hl.utils.range_matrix_table(n_cols=10_000_000, n_rows=10) mt = mt.annotate_cols(x=hl.scan.count()) mt._force_count_rows() -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_matrix_multi_write_nothing(tmp_path): mt = hl.utils.range_matrix_table(1, 1, n_partitions=1) mts = [mt] * 1000 hl.experimental.write_matrix_tables(mts, str(tmp_path / 'multi-write')) -@benchmark() +@pytest.mark.benchmark() def benchmark_mt_localize_and_collect(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) ht = mt.localize_entries("ent") ht.head(150).collect() -@benchmark() +@pytest.mark.benchmark() def benchmark_mt_group_by_memory_usage(random_doubles_mt): mt = hl.read_matrix_table(str(random_doubles_mt)) mt = mt.group_rows_by(new_idx=mt.row_idx % 3).aggregate(x=hl.agg.mean(mt.x)) diff --git a/hail/python/benchmark/hail/benchmark_methods.py b/hail/python/benchmark/hail/benchmark_methods.py index 787f6ff16cb2..43a79e275188 100644 --- a/hail/python/benchmark/hail/benchmark_methods.py +++ b/hail/python/benchmark/hail/benchmark_methods.py @@ -1,44 +1,46 @@ +import pytest + import hail as hl -from benchmark.tools import benchmark +from benchmark.hail.utils import XFail -@benchmark() +@pytest.mark.benchmark() def benchmark_import_vcf_write(profile25_vcf, tmp_path): mt = hl.import_vcf(str(profile25_vcf)) out = str(tmp_path / 'out.mt') mt.write(out) -@benchmark() +@pytest.mark.benchmark() def benchmark_import_vcf_count_rows(profile25_vcf): mt = hl.import_vcf(str(profile25_vcf)) mt.count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_export_vcf(profile25_mt, tmp_path): mt = hl.read_matrix_table(str(profile25_mt)) out = str(tmp_path / 'out.vcf.bgz') hl.export_vcf(mt, out) -@benchmark() +@pytest.mark.benchmark() def benchmark_sample_qc(profile25_mt): hl.sample_qc(hl.read_matrix_table(str(profile25_mt))).cols()._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_variant_qc(profile25_mt): hl.variant_qc(hl.read_matrix_table(str(profile25_mt))).rows()._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_variant_and_sample_qc(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) hl.sample_qc(hl.variant_qc(mt))._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_variant_and_sample_qc_nested_with_filters_2(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = hl.variant_qc(mt) @@ -52,7 +54,7 @@ def benchmark_variant_and_sample_qc_nested_with_filters_2(profile25_mt): mt.count() -@benchmark() +@pytest.mark.benchmark() def benchmark_variant_and_sample_qc_nested_with_filters_4(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = hl.variant_qc(mt) @@ -74,7 +76,7 @@ def benchmark_variant_and_sample_qc_nested_with_filters_4(profile25_mt): mt.count() -@benchmark() +@pytest.mark.benchmark() def benchmark_variant_and_sample_qc_nested_with_filters_4_counts(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = hl.variant_qc(mt) @@ -99,40 +101,40 @@ def benchmark_variant_and_sample_qc_nested_with_filters_4_counts(profile25_mt): mt.count() -@benchmark() +@pytest.mark.benchmark() def benchmark_hwe_normalized_pca(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = mt.filter_rows(mt.info.AF[0] > 0.01) hl.hwe_normalized_pca(mt.GT) -@benchmark() +@pytest.mark.benchmark() def benchmark_hwe_normalized_pca_blanczos_small_data_0_iterations(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = mt.filter_rows(mt.info.AF[0] > 0.01) hl._hwe_normalized_blanczos(mt.GT, q_iterations=0) -@benchmark() +@pytest.mark.benchmark() def benchmark_hwe_normalized_pca_blanczos_small_data_10_iterations(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = mt.filter_rows(mt.info.AF[0] > 0.01) hl._hwe_normalized_blanczos(mt.GT, q_iterations=10) -@benchmark() +@pytest.mark.benchmark() def benchmark_split_multi_hts(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) hl.split_multi_hts(mt)._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_split_multi(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) hl.split_multi(mt)._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_concordance(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = mt.filter_rows(mt.alleles.length() == 2) @@ -141,7 +143,7 @@ def benchmark_concordance(profile25_mt): c._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_genetics_pipeline(profile25_mt, tmp_path): mt = hl.read_matrix_table(str(profile25_mt)) mt = hl.split_multi_hts(mt) @@ -153,14 +155,15 @@ def benchmark_genetics_pipeline(profile25_mt, tmp_path): mt.write(str(tmp_path / 'genetics_pipeline.mt')) -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_ld_prune_profile_25(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = mt.filter_rows(hl.len(mt.alleles) == 2) hl.ld_prune(mt.GT)._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_pc_relate(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = mt.annotate_cols(scores=hl.range(2).map(lambda x: hl.rand_unif(0, 1))) @@ -168,7 +171,8 @@ def benchmark_pc_relate(profile25_mt): rel._force_count() -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_pc_relate_5k_5k(balding_nichols_5k_5k): mt = hl.read_matrix_table(str(balding_nichols_5k_5k)) mt = mt.annotate_cols(scores=hl.range(2).map(lambda x: hl.rand_unif(0, 1))) @@ -176,7 +180,7 @@ def benchmark_pc_relate_5k_5k(balding_nichols_5k_5k): rel._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_linear_regression_rows(random_doubles_mt): mt = hl.read_matrix_table(str(random_doubles_mt)) num_phenos = 100 @@ -191,7 +195,7 @@ def benchmark_linear_regression_rows(random_doubles_mt): res._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_linear_regression_rows_nd(random_doubles_mt): mt = hl.read_matrix_table(str(random_doubles_mt)) num_phenos = 100 @@ -206,7 +210,7 @@ def benchmark_linear_regression_rows_nd(random_doubles_mt): res._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_logistic_regression_rows_wald(random_doubles_mt): mt = hl.read_matrix_table(str(random_doubles_mt)) mt = mt.head(2000) @@ -222,7 +226,8 @@ def benchmark_logistic_regression_rows_wald(random_doubles_mt): res._force_count() -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_logistic_regression_rows_wald_nd(random_doubles_mt): mt = hl.read_matrix_table(str(random_doubles_mt)) mt = mt.head(2000) diff --git a/hail/python/benchmark/hail/benchmark_sentinel.py b/hail/python/benchmark/hail/benchmark_sentinel.py index 9c881b0f10e7..8f82fc648406 100644 --- a/hail/python/benchmark/hail/benchmark_sentinel.py +++ b/hail/python/benchmark/hail/benchmark_sentinel.py @@ -1,16 +1,16 @@ import gzip -from benchmark.tools import benchmark +import pytest -@benchmark(iterations=15) +@pytest.mark.benchmark() def benchmark_sentinel_read_gunzip(many_ints_tsv): with gzip.open(many_ints_tsv) as f: for _ in f: pass -@benchmark(iterations=15) +@pytest.mark.benchmark() def benchmark_sentinel_cpu_hash_1(): x = 0 for _ in range(10_000): diff --git a/hail/python/benchmark/hail/benchmark_shuffle.py b/hail/python/benchmark/hail/benchmark_shuffle.py index 13ee24ef40a2..d1c963feeff6 100644 --- a/hail/python/benchmark/hail/benchmark_shuffle.py +++ b/hail/python/benchmark/hail/benchmark_shuffle.py @@ -1,8 +1,23 @@ +import pytest + import hail as hl -from benchmark.tools import benchmark -@benchmark() +@pytest.fixture(autouse=True) +def new_query_tmpdir(tmp_path): + # if hl.version() < '0.2.134': + # yield + # else: + backend = hl.current_backend() + old = backend.local_tmpdir + backend.local_tmpdir = str(tmp_path) + try: + yield + finally: + backend.local_tmpdir = old + + +@pytest.mark.benchmark() def benchmark_shuffle_key_rows_by_mt(profile25_mt): mt = hl.read_matrix_table(str(profile25_mt)) mt = mt.annotate_rows(reversed_position_locus=hl.struct(contig=mt.locus.contig, position=-mt.locus.position)) @@ -10,14 +25,14 @@ def benchmark_shuffle_key_rows_by_mt(profile25_mt): mt._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_shuffle_order_by_10m_int(): t = hl.utils.range_table(10_000_000, n_partitions=100) t = t.order_by(-t.idx) t._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_shuffle_key_rows_by_4096_byte_rows(): mt = hl.utils.range_matrix_table(100_000, (1 << 12) // 4) mt = mt.annotate_entries(entry=mt.row_idx * mt.col_idx) @@ -25,7 +40,7 @@ def benchmark_shuffle_key_rows_by_4096_byte_rows(): mt._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_shuffle_key_rows_by_65k_byte_rows(): mt = hl.utils.range_matrix_table(10_000, (1 << 16) // 4) mt = mt.annotate_entries(entry=mt.row_idx * mt.col_idx) @@ -33,13 +48,13 @@ def benchmark_shuffle_key_rows_by_65k_byte_rows(): mt._force_count_rows() -@benchmark() +@pytest.mark.benchmark() def benchmark_shuffle_key_by_aggregate_bad_locality(many_ints_ht): ht = hl.read_table(str(many_ints_ht)) ht.group_by(x=ht.i0 % 1000).aggregate(c=hl.agg.count(), m=hl.agg.mean(ht.i2))._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_shuffle_key_by_aggregate_good_locality(many_ints_ht): ht = hl.read_table(str(many_ints_ht)) divisor = 7_500_000 / 51 # should ensure each partition never overflows default buffer size diff --git a/hail/python/benchmark/hail/benchmark_table.py b/hail/python/benchmark/hail/benchmark_table.py index 37d711a8ed56..295e4701b002 100644 --- a/hail/python/benchmark/hail/benchmark_table.py +++ b/hail/python/benchmark/hail/benchmark_table.py @@ -2,10 +2,10 @@ import hail as hl from benchmark.hail.fixtures import many_partitions_ht -from benchmark.tools import benchmark +from benchmark.hail.utils import XFail -@benchmark() +@pytest.mark.benchmark() def benchmark_table_key_by_shuffle(): n = 1_000_000 ht = hl.utils.range_table(n) @@ -13,7 +13,7 @@ def benchmark_table_key_by_shuffle(): ht._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_group_by_aggregate_sorted(): n = 10_000_000 ht = hl.utils.range_table(n) @@ -21,7 +21,7 @@ def benchmark_table_group_by_aggregate_sorted(): ht._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_group_by_aggregate_unsorted(): n = 10_000_000 ht = hl.utils.range_table(n) @@ -29,26 +29,26 @@ def benchmark_table_group_by_aggregate_unsorted(): ht._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_range_force_count(): hl.utils.range_table(100_000_000)._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_range_join_1b_1k(): ht1 = hl.utils.range_table(1_000_000_000) ht2 = hl.utils.range_table(1_000) ht1.join(ht2, 'inner').count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_range_join_1b_1b(): ht1 = hl.utils.range_table(1_000_000_000) ht2 = hl.utils.range_table(1_000_000_000) ht1.join(ht2, 'inner').count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_python_construction(): n = 100 ht = hl.utils.range_table(100) @@ -56,7 +56,7 @@ def benchmark_table_python_construction(): ht = ht.annotate(**{f'x_{i}': 0}) -@benchmark() +@pytest.mark.benchmark() def benchmark_table_big_aggregate_compilation(): n = 1_000 ht = hl.utils.range_table(1) @@ -64,7 +64,7 @@ def benchmark_table_big_aggregate_compilation(): ht.aggregate(expr) -@benchmark() +@pytest.mark.benchmark() def benchmark_table_big_aggregate_compile_and_execute(): n = 1_000 m = 1_000_000 @@ -73,7 +73,7 @@ def benchmark_table_big_aggregate_compile_and_execute(): ht.aggregate(expr) -@benchmark() +@pytest.mark.benchmark() @pytest.mark.parametrize('m, n', [(1_000_000, 1_000_000), (1_000_000, 1_000)]) def benchmark_table_foreign_key_join(m, n): ht = hl.utils.range_table(m) @@ -81,7 +81,7 @@ def benchmark_table_foreign_key_join(m, n): ht.annotate(x=ht2[(m - 1 - ht.idx) % n])._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_aggregate_array_sum(): n = 10_000_000 m = 100 @@ -89,7 +89,7 @@ def benchmark_table_aggregate_array_sum(): ht.aggregate(hl.agg.array_sum(hl.range(0, m))) -@benchmark() +@pytest.mark.benchmark() def benchmark_table_annotate_many_flat(): n = 1_000_000 m = 100 @@ -98,7 +98,8 @@ def benchmark_table_annotate_many_flat(): ht._force_count() -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.skip(reason='never finishes') def benchmark_table_annotate_many_nested_no_dependence(): n = 1_000_000 m = 100 @@ -108,7 +109,8 @@ def benchmark_table_annotate_many_nested_no_dependence(): ht._force_count() -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.skip(reason='never finishes') def benchmark_table_annotate_many_nested_dependence_constants(): n = 1_000_000 m = 100 @@ -118,7 +120,8 @@ def benchmark_table_annotate_many_nested_dependence_constants(): ht._force_count() -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.skip(reason='never finishes') def benchmark_table_annotate_many_nested_dependence(): n = 1_000_000 m = 100 @@ -129,19 +132,19 @@ def benchmark_table_annotate_many_nested_dependence(): ht._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_read_force_count_ints(many_ints_ht): ht = hl.read_table(str(many_ints_ht)) ht._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_read_force_count_strings(many_strings_ht): ht = hl.read_table(str(many_strings_ht)) ht._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_import_ints(many_ints_tsv): hl.import_table( str(many_ints_tsv), @@ -149,17 +152,17 @@ def benchmark_table_import_ints(many_ints_tsv): )._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_import_ints_impute(many_ints_tsv): hl.import_table(str(many_ints_tsv), impute=True)._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_import_strings(many_strings_tsv): hl.import_table(str(many_strings_tsv))._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_aggregate_int_stats(many_ints_ht): ht = hl.read_table(str(many_ints_ht)) ht.aggregate( @@ -171,20 +174,21 @@ def benchmark_table_aggregate_int_stats(many_ints_ht): ) -@benchmark() +@pytest.mark.benchmark() def benchmark_table_range_means(): ht = hl.utils.range_table(10_000_000, 16) ht = ht.annotate(m=hl.mean(hl.range(0, ht.idx % 1111))) ht._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_range_array_range_force_count(): ht = hl.utils.range_table(30).annotate(big_range=hl.range(100_000_000)) ht._force_count() -@benchmark() +@pytest.mark.benchmark() +@pytest.mark.xfail(raises=TimeoutError, reason=XFail.Timeout) def benchmark_table_aggregate_approx_cdf(random_doubles_mt): mt = hl.read_matrix_table(str(random_doubles_mt)) mt.aggregate_entries(( @@ -194,74 +198,74 @@ def benchmark_table_aggregate_approx_cdf(random_doubles_mt): )) -@benchmark() +@pytest.mark.benchmark() def benchmark_table_aggregate_counter(many_strings_ht): ht = hl.read_table(str(many_strings_ht)) ht.aggregate(hl.tuple([hl.agg.counter(ht[f'f{i}']) for i in range(8)])) -@benchmark() +@pytest.mark.benchmark() def benchmark_table_aggregate_take_by_strings(many_strings_ht): ht = hl.read_table(str(many_strings_ht)) ht.aggregate(hl.tuple([hl.agg.take(ht['f18'], 25, ordering=ht[f'f{i}']) for i in range(18)])) -@benchmark() +@pytest.mark.benchmark() def benchmark_table_aggregate_downsample_dense(many_ints_ht): ht = hl.read_table(str(many_ints_ht)) ht.aggregate(tuple([hl.agg.downsample(ht[f'i{i}'], ht['i3'], label=hl.str(ht['i4'])) for i in range(3)])) -@benchmark() +@pytest.mark.benchmark() def benchmark_table_aggregate_downsample_worst_case(): ht = hl.utils.range_table(250_000_000, 8) ht.aggregate(hl.agg.downsample(ht.idx, -ht.idx)) -@benchmark() +@pytest.mark.benchmark() @pytest.mark.skip(reason='FIXME: this needs fixtures to accurately measure downsample (rather than randomness') def benchmark_table_aggregate_downsample_sparse(): ht = hl.utils.range_table(250_000_000, 8) ht.aggregate(hl.agg.downsample(hl.rand_norm() ** 5, hl.rand_norm() ** 5)) -@benchmark() +@pytest.mark.benchmark() def benchmark_table_aggregate_linreg(many_ints_ht): ht = hl.read_table(str(many_ints_ht)) ht.aggregate(hl.agg.array_agg(lambda i: hl.agg.linreg(ht.i0 + i, [ht.i1, ht.i2, ht.i3, ht.i4]), hl.range(75))) -@benchmark() +@pytest.mark.benchmark() def benchmark_table_take(many_strings_ht): ht = hl.read_table(str(many_strings_ht)) ht.take(100) -@benchmark() +@pytest.mark.benchmark() def benchmark_table_show(many_strings_ht): ht = hl.read_table(str(many_strings_ht)) ht.show(100) -@benchmark() +@pytest.mark.benchmark() def benchmark_table_expr_take(many_strings_ht): ht = hl.read_table(str(many_strings_ht)) hl.tuple([ht.f1, ht.f2]).take(100) -@benchmark() +@pytest.mark.benchmark() def benchmark_read_force_count_partitions(many_partitions_ht): hl.read_table(str(many_partitions_ht))._force_count() -@benchmark() +@pytest.mark.benchmark() @pytest.mark.parametrize('n,n_partitions', [(10_000_000, 1000), (10_000_000, 100), (10_000_000, 10)]) def benchmark_write_range_table(tmp_path, n, n_partitions): ht = hl.utils.range_table(n, n_partitions) ht.write(str(tmp_path / 'tmp.ht')) -@benchmark() +@pytest.mark.benchmark() @pytest.mark.parametrize('many_partitions_ht', [1_000], indirect=True) def benchmark_read_with_index(many_partitions_ht): rows = 10_000_000 @@ -272,44 +276,43 @@ def benchmark_read_with_index(many_partitions_ht): ht._force_count() -many_partitions_ht1 = many_partitions_ht -many_partitions_ht2 = many_partitions_ht +many_partitions_ht1, many_partitions_ht2 = [many_partitions_ht] * 2 -@benchmark() +@pytest.mark.benchmark() def benchmark_union_partitions_table(many_partitions_ht1, many_partitions_ht2): ht1 = hl.read_table(str(many_partitions_ht1)) ht2 = hl.read_table(str(many_partitions_ht2)) ht1.union(ht2)._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_join_partitions_table(many_partitions_ht1, many_partitions_ht2): ht1 = hl.read_table(str(many_partitions_ht1)) ht2 = hl.read_table(str(many_partitions_ht2)) ht1.join(ht2)._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_group_by_collect_per_row(gnomad_dp_sim): ht = hl.read_matrix_table(str(gnomad_dp_sim)).localize_entries('e', 'c') ht.group_by(*ht.key).aggregate(value=hl.agg.collect(ht.row_value))._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_group_by_take_rekey(gnomad_dp_sim): ht = hl.read_matrix_table(str(gnomad_dp_sim)).localize_entries('e', 'c') ht.group_by(k=hl.int(ht.row_idx / 50)).aggregate(value=hl.agg.take(ht.row_value, 1))._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_scan_sum_1k_partitions(): ht = hl.utils.range_table(1000000, n_partitions=1000) ht = ht.annotate(x=hl.scan.sum(ht.idx)) ht._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_table_scan_prev_non_null(): ht = hl.utils.range_table(100000000, n_partitions=10) ht = ht.annotate(x=hl.range(0, ht.idx % 25)) @@ -317,21 +320,21 @@ def benchmark_table_scan_prev_non_null(): ht._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_test_map_filter_region_memory(): high_mem_table = hl.utils.range_table(30).naive_coalesce(1).annotate(big_array=hl.zeros(100_000_000)) high_mem_table = high_mem_table.filter(high_mem_table.idx % 2 == 0) assert high_mem_table._force_count() == 15 -@benchmark() +@pytest.mark.benchmark() def benchmark_test_head_and_tail_region_memory(): high_mem_table = hl.utils.range_table(100).annotate(big_array=hl.zeros(100_000_000)) high_mem_table = high_mem_table.head(30) high_mem_table._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_test_inner_join_region_memory(): high_mem_table = hl.utils.range_table(30).naive_coalesce(1).annotate(big_array=hl.zeros(50_000_000)) high_mem_table2 = hl.utils.range_table(30).naive_coalesce(1).annotate(big_array=hl.zeros(50_000_000)) @@ -339,7 +342,7 @@ def benchmark_test_inner_join_region_memory(): joined._force_count() -@benchmark() +@pytest.mark.benchmark() def benchmark_test_left_join_region_memory(): high_mem_table = hl.utils.range_table(30).naive_coalesce(1).annotate(big_array=hl.zeros(50_000_000)) high_mem_table2 = hl.utils.range_table(30).naive_coalesce(1).annotate(big_array=hl.zeros(50_000_000)) diff --git a/hail/python/benchmark/hail/conftest.py b/hail/python/benchmark/hail/conftest.py index a8a10df208db..53a4015b8879 100644 --- a/hail/python/benchmark/hail/conftest.py +++ b/hail/python/benchmark/hail/conftest.py @@ -8,7 +8,6 @@ from typing import Any, Dict, List, Literal import pytest -from _pytest.runner import pytest_runtest_protocol as runner_runtest_protocol import hail as hl from benchmark.hail.fixtures import ( @@ -66,12 +65,15 @@ chr22_gvcfs, empty_gvcf, gnomad_dp_sim, + local_tmpdir, many_ints_ht, many_ints_tsv, many_partitions_ht, many_strings_ht, many_strings_tsv, onekg_chr22, + onethreethree, + onethreetwo, profile25_mt, profile25_vcf, random_doubles_mt, @@ -85,14 +87,14 @@ get_peak_task_memory, init_hail_for_benchmarks, run_with_timeout, - select, ) +from benchmark.tools import maybe, prune from hail.utils.java import Env @contextmanager -def init_hail(run_config): - init_hail_for_benchmarks(run_config) +def init_hail(config): + init_hail_for_benchmarks(config) try: yield finally: @@ -106,6 +108,7 @@ def init_hail(run_config): end = pytest.StashKey[datetime]() iteration = pytest.StashKey[int]() runs = pytest.StashKey[List[Dict[str, Any]]]() +consecutive_fail_count = pytest.StashKey[int]() # used internally context = pytest.StashKey[Literal['burn_in', 'benchmark']]() @@ -113,15 +116,14 @@ def init_hail(run_config): @pytest.hookimpl(tryfirst=True) def pytest_runtest_protocol(item, nextitem): - run_config = item.session.config.run_config + runner = item.config.pluginmanager.get_plugin('runner') # Initialise hail before running every benchmark for two reasons: # - each benchmark runs in a clean hail session # - our means of getting max task memory is quite crude (regex on logs) # and a fresh session provides a new log - with init_hail(run_config): - if run_config.iterations is not None: + with init_hail(item.config): + if (iterations := item.config.getoption('iterations')) is not None: burn_in_iterations = 0 - iterations = run_config.iterations logging.info( msg=( f'Picked up iterations override. Config: ' @@ -131,13 +133,14 @@ def pytest_runtest_protocol(item, nextitem): ) else: - burn_in_iterations, iterations = select( - ['burn_in_iterations', 'iterations'], **(item.get_closest_marker('benchmark').kwargs) - ) + params = item.get_closest_marker('benchmark').kwargs + burn_in_iterations = params.get('burn_in_iterations', 1) + iterations = params.get('iterations', 5) s = item.stash s[start] = datetime.now(timezone.utc).isoformat() s[runs] = [] + s[consecutive_fail_count] = 0 s[end] = None logging.info( @@ -148,8 +151,13 @@ def pytest_runtest_protocol(item, nextitem): ) ) + max_failures = item.config.getoption('max_failures') + s[context] = 'burn_in' for k in range(burn_in_iterations): + if max_failures and s[consecutive_fail_count] >= max_failures: + break + s[iteration] = k # `nextitem` is used to determine which fixtures need to be torn-down # after the test finishes. For example, if `nextitem` is `None`, then @@ -157,18 +165,29 @@ def pytest_runtest_protocol(item, nextitem): # Since we're invoking this benchmark repeatedly, we want to tear-down # function/method level fixtures only, leaving module and session # fixtures in place; `item.parent` is one such `Item` that represents this. - runner_runtest_protocol(item, nextitem=item.parent) + runner.pytest_runtest_protocol(item, nextitem=item.parent) s[context] = 'benchmark' total_iterations = burn_in_iterations + iterations for k in range(burn_in_iterations, total_iterations): + if max_failures and s[consecutive_fail_count] >= max_failures: + break + s[iteration] = k # on the final iteration, perform the required teardown for the test is_final_iteration = k == total_iterations - 1 - runner_runtest_protocol(item, nextitem=nextitem if is_final_iteration else item.parent) + runner.pytest_runtest_protocol(item, nextitem=nextitem if is_final_iteration else item.parent) s[end] = datetime.now(timezone.utc).isoformat() + if max_failures and s[consecutive_fail_count] >= max_failures: + logging.error( + msg=( + f'Benchmarking "{item.nodeid}" aborted due to too many ' + f'consecutive failures (max={max_failures})' + ) + ) + # prevent other plugins running that might invoke the benchmark again return True @@ -176,7 +195,7 @@ def pytest_runtest_protocol(item, nextitem): @pytest.hookimpl(tryfirst=True) def pytest_pyfunc_call(pyfuncitem): with run_with_timeout( - pyfuncitem.config.run_config, + pyfuncitem.config.getoption('max_duration'), pyfuncitem.obj, **{arg: pyfuncitem.funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}, ) as (time, timed_out, traceback): @@ -184,14 +203,21 @@ def pytest_pyfunc_call(pyfuncitem): is_burn_in = s[context] == 'burn_in' - s[runs].append({ - 'iteration': s[iteration], - 'time': time, - 'is_burn_in': is_burn_in, - 'timed_out': timed_out, - 'failure': traceback, - 'task_memory': get_peak_task_memory(Env.hc()._log), - }) + if timed_out or traceback is not None: + s[consecutive_fail_count] += 1 + else: + s[consecutive_fail_count] = 0 + + s[runs].append( + prune({ + 'iteration': s[iteration], + 'time': time, + 'is_burn_in': is_burn_in, + 'timed_out': timed_out, + 'failure': maybe(json.dumps, traceback), + 'task_memory': get_peak_task_memory(Env.hc()._log), + }) + ) logging.info(f'{"(burn in) " if is_burn_in else ""}iteration {s[iteration]}, time: {time}s') @@ -210,30 +236,29 @@ def open_file_or_stdout(file): @pytest.hookimpl def pytest_sessionfinish(session): - if not session.config.option.collectonly: - run_config = session.config.run_config - + if hasattr(session, 'items') and len(session.items) > 0 and not session.config.option.collectonly: meta = { 'uname': platform.uname()._asdict(), 'version': hl.__version__, - **({'batch_id': batch} if (batch := os.getenv('HAIL_BATCH_ID')) else {}), - **({'job_id': job} if (job := os.getenv('HAIL_JOB_ID')) else {}), - **({'trial': trial} if (trial := os.getenv('BENCHMARK_TRIAL_ID')) else {}), + 'batch_id': maybe(int, os.getenv('HAIL_BATCH_ID')), + 'job_id': maybe(int, os.getenv('HAIL_JOB_ID')), + 'attempt_id': os.getenv('HAIL_ATTEMPT_ID'), + 'trial': maybe(int, os.getenv('BENCHMARK_TRIAL_ID')), } now = datetime.now(timezone.utc).isoformat() - with open_file_or_stdout(run_config.output) as out: + with open_file_or_stdout(session.config.getoption('output')) as out: for item in session.items: path, _, name = item.location json.dump( - { + prune({ 'path': path, 'name': name, **meta, 'start': item.stash[start], 'end': item.stash.get(end, now), 'runs': item.stash[runs], - }, + }), out, ) out.write('\n') @@ -296,12 +321,15 @@ def pytest_sessionfinish(session): 'chr22_gvcfs', 'empty_gvcf', 'gnomad_dp_sim', + 'local_tmpdir', 'many_ints_ht', 'many_ints_tsv', 'many_partitions_ht', 'many_strings_ht', 'many_strings_tsv', 'onekg_chr22', + 'onethreethree', + 'onethreetwo', 'profile25_mt', 'profile25_vcf', 'random_doubles_mt', diff --git a/hail/python/benchmark/hail/fixtures.py b/hail/python/benchmark/hail/fixtures.py index 2387d9bff5c3..d4e3820994bb 100644 --- a/hail/python/benchmark/hail/fixtures.py +++ b/hail/python/benchmark/hail/fixtures.py @@ -1,21 +1,22 @@ import logging import os +import shutil import subprocess from pathlib import Path import pytest import hail as hl +from benchmark.tools import maybe +from hailtop.utils import async_to_blocking, retry_transient_errors @pytest.fixture(scope='session') def resource_dir(request, tmpdir_factory): - run_config = request.config.run_config - if run_config.data_dir is not None: - resource_dir = Path(run_config.data_dir) + if (resource_dir := maybe(Path, request.config.getoption('data_dir'))) is not None: resource_dir.mkdir(parents=True, exist_ok=True) else: - resource_dir = tmpdir_factory.mktemp('hail_benchmark_resources') + resource_dir = Path(tmpdir_factory.mktemp('hail_benchmark_resources')) return resource_dir @@ -23,24 +24,42 @@ def resource_dir(request, tmpdir_factory): gs_curl_root = 'https://storage.googleapis.com/hail-common/benchmark' -def __download(data_dir, filename): +async def __download(data_dir, filename): url = os.path.join(gs_curl_root, filename) logging.info(f'downloading: {filename}') - # Note: the below does not work on batch due to docker/ssl problems - # dest = os.path.join(data_dir, filename) - # urlretrieve(url, dest) - subprocess.check_call(['curl', url, '-Lfs', '--output', f'{data_dir / filename}']) + subprocess.check_call(['curl', url, '-Lfs', '-m', '200', '--output', f'{data_dir / filename}']) logging.info(f'done: {filename}') -def localize(path: Path): +def localize(path: Path) -> Path: if not path.exists(): - path.parent.mkdir(exist_ok=True) - __download(path.parent, path.name) + path.parent.mkdir(parents=True, exist_ok=True) + async_to_blocking( + retry_transient_errors( + __download, + path.parent, + path.name, + ) + ) return path +@pytest.fixture(autouse=True, scope='function') +def local_tmpdir(tmp_path): + # if hl.version() < '0.2.134': + # yield + # else: + backend = hl.current_backend() + old = backend.local_tmpdir + backend.local_tmpdir = str(tmp_path) + try: + yield tmp_path + finally: + backend.local_tmpdir = old + shutil.rmtree(tmp_path) + + @pytest.fixture(scope='session') def empty_gvcf(resource_dir): path = resource_dir / 'empty_gvcf' @@ -332,3 +351,13 @@ def many_partitions_ht(resource_dir, request): hl.utils.range_table(10_000_000, n_partitions).write(str(path)) return path + + +@pytest.fixture(scope='session') +def onethreetwo(resource_dir): + return localize(resource_dir / '0.2.132.jsonl') + + +@pytest.fixture(scope='session') +def onethreethree(resource_dir): + return localize(resource_dir / '0.2.133.jsonl') diff --git a/hail/python/benchmark/hail/utils.py b/hail/python/benchmark/hail/utils.py index 86ea3190df82..37fe4cb0b429 100644 --- a/hail/python/benchmark/hail/utils.py +++ b/hail/python/benchmark/hail/utils.py @@ -1,26 +1,34 @@ -import contextlib +import glob import logging import re import signal import timeit import traceback from contextlib import contextmanager +from enum import Enum +from pathlib import Path +from typing import NoReturn import hail as hl -def init_hail_for_benchmarks(run_config): +class XFail(str, Enum): + Timeout = 'Runtime exceeds maximum permitted duration' + OutOfMemory = 'Out of memory' + + +def init_hail_for_benchmarks(config): init_args = { - 'master': f'local[{run_config.cores}]', - 'quiet': not run_config.verbose, - 'log': run_config.log, + 'master': f'local[{config.getoption("cores")}]', + 'quiet': config.getoption('verbose') < 0, + 'log': config.getoption('log'), } - if run_config.profile is not None: - if run_config.profile_fmt == 'html': + if (profile := config.getoption('profile')) is not None: + if config.getoption('profile_fmt') == 'html': filetype = 'html' fmt_arg = 'tree=total' - elif run_config.profile_fmt == 'flame': + elif config.getoption('profile_fmt') == 'flame': filetype = 'svg' fmt_arg = 'svg=total' else: @@ -28,10 +36,10 @@ def init_hail_for_benchmarks(run_config): fmt_arg = 'jfr' prof_args = ( - f'-agentpath:{run_config.profiler_path}/build/libasyncProfiler.so=start,' - f'event={run_config.profile},' + f'-agentpath:{config.getoption("profiler_path")}/build/libasyncProfiler.so=start,' + f'event={profile},' f'{fmt_arg},' - f'file=bench-profile-{run_config.profile}-%t.{filetype},' + f'file=bench-profile-{profile}-%t.{filetype},' 'interval=1ms,' 'framebuf=15000000' ) @@ -48,50 +56,57 @@ def init_hail_for_benchmarks(run_config): logging.getLogger('py4j.java_gateway').setLevel(logging.CRITICAL) -__timeout_state = False +# Using a custom exception instead of TimeoutError allows explicit handling +class __Timeout(BaseException): + pass # https://stackoverflow.com/questions/492519/timeout-on-a-function-call/494273#494273 -@contextlib.contextmanager -def timeout_signal(run_config): - global __timeout_state - __timeout_state = False - - def handler(signum, frame): - global __timeout_state - __timeout_state = True +@contextmanager +def timeout_signal(duration): + def handler(signum, frame) -> NoReturn: try: - hl.stop() - init_hail_for_benchmarks(run_config) - except Exception: - traceback.print_exc() # we're fucked. - - raise TimeoutError(f'Timed out after {run_config.timeout}s') - - signal.signal(signal.SIGALRM, handler) - signal.alarm(run_config.timeout) + signal.siginterrupt(signal.SIGINT, True) + except KeyboardInterrupt: + pass + finally: + raise __Timeout() + restore = signal.signal(signal.SIGALRM, handler) + signal.alarm(duration) try: yield finally: + signal.signal(signal.SIGALRM, restore) + signal.alarm(0) - def no_op(signum, frame): - pass - signal.signal(signal.SIGALRM, no_op) - signal.alarm(0) +# Spark exposes the configuration parameter 'spark.worker.cleanup.enabled'. +# When enabled, spark periodically cleans up temporary files. It's not clear +# how to trigger a clean-up manually or how to configure which directory gets +# used. +def __hack_cleanup_spark_tmpfiles(): + for tmpdir in glob.glob('/tmp/blockmgr*/**/*'): + Path(tmpdir).unlink() @contextmanager -def run_with_timeout(run_config, fn, *args, **kwargs): - with timeout_signal(run_config): +def run_with_timeout(max_duration, fn, *args, **kwargs): + try: try: - timer = timeit.Timer(lambda: fn(*args, **kwargs)).timeit(1) - yield timer, False, None - except Exception as e: - timed_out = isinstance(e, TimeoutError) - yield (run_config.timeout if timed_out else None, timed_out, traceback.format_exc()) - raise e + timer = timeit.Timer(lambda: fn(*args, **kwargs)) + with timeout_signal(max_duration): + duration = timer.timeit(1) + except __Timeout: + yield (max_duration, True, traceback.format_exc()) + raise TimeoutError(f'Timed out after {max_duration}s') + except Exception: + yield (None, False, traceback.format_exc()) + raise + + yield duration, False, None + finally: + __hack_cleanup_spark_tmpfiles() __peak_mem_pattern = re.compile(r'.*TaskReport:.*peakBytes=(\d+),.*') @@ -100,16 +115,10 @@ def run_with_timeout(run_config, fn, *args, **kwargs): def get_peak_task_memory(log_path) -> int: with open(log_path, 'r') as f: task_peak_bytes = [ - int(match.groups()[0]) - for line in f - if (match := __peak_mem_pattern.match(line)) is not None # + int(match.groups()[0]) for line in f if (match := __peak_mem_pattern.match(line)) is not None ] if len(task_peak_bytes) == 0: return 0 return max(task_peak_bytes) - - -def select(keys, **kwargs): - return (kwargs.get(k, None) for k in keys) diff --git a/hail/python/benchmark/tools/__init__.py b/hail/python/benchmark/tools/__init__.py index 67da79872a9d..3466eb4348b9 100644 --- a/hail/python/benchmark/tools/__init__.py +++ b/hail/python/benchmark/tools/__init__.py @@ -1,28 +1,14 @@ -import functools import logging -from typing import Callable, List, TypeVar, Union +from typing import Any, Callable, Generator, List, Sequence, TypeVar, Union -import pytest - - -def benchmark(burn_in_iterations=1, iterations=5, batch_jobs=5): - def wrap(benchmark_fn): - @pytest.mark.benchmark(burn_in_iterations=burn_in_iterations, iterations=iterations, batch_jobs=batch_jobs) - @functools.wraps(benchmark_fn) - def runner(*args, **kwargs): - return benchmark_fn(*args, **kwargs) - - return runner - - return wrap +A = TypeVar('A') -def chunk(size, seq): +def chunk(size: int, seq: Sequence[A]) -> Generator[Sequence[A], A, Any]: for pos in range(0, len(seq), size): yield seq[pos : pos + size] -A = TypeVar('A') B = TypeVar('B') @@ -34,8 +20,8 @@ def prune(kvs: dict) -> dict: return {k: v for k, v in kvs.items() if v is not None} -def select(keys: List[str], **kwargs): - return (kwargs.get(k, None) for k in keys) +def select(keys: List[str], **kwargs) -> List[Union[Any, None]]: + return [kwargs.get(k) for k in keys] def init_logging(file=None): diff --git a/hail/scripts/benchmark_in_batch.py b/hail/scripts/benchmark_in_batch.py index 78bc0fc3911b..52938ea818de 100755 --- a/hail/scripts/benchmark_in_batch.py +++ b/hail/scripts/benchmark_in_batch.py @@ -13,7 +13,7 @@ import benchmark import pytest -from benchmark.tools import chunk, init_logging +from benchmark.tools import chunk, init_logging, maybe from hailtop import batch as hb from hailtop.batch import Batch, Resource @@ -22,39 +22,56 @@ class CollectBenchmarks: - def __init__(self): + def __init__(self, include: Optional[List[str]], exclude: Optional[List[str]]): + assert not ((include is not None) and (exclude is not None)) + self.includes = maybe(set, include) + self.excludes = maybe(set, exclude) self.items = [] @pytest.hookimpl(trylast=True) def pytest_collection_modifyitems(self, items): self.items = [ - (item.location[0], item.name, marker.kwargs['batch_jobs']) + (item.nodeid, marker.kwargs['instances']) for item in items - if (marker := item.get_closest_marker('benchmark')) + if ( + (marker := item.get_closest_marker('benchmark')) is not None + and (self.includes is None or item.nodeid in self.includes) + and (self.excludes is None or item.nodeid not in self.excludes) + and (item.get_closest_marker('skip') is None) + ) ] # https://github.com/pytest-dev/pytest/discussions/2039 -def list_benchmarks(include: str, exclude: str) -> List[Tuple[Path, str, int]]: - collect = CollectBenchmarks() +def list_benchmarks( + include: Optional[List[str]], + exclude: Optional[List[str]], +) -> List[Tuple[str, int]]: + collect = CollectBenchmarks(include, exclude) directory = Path(benchmark.__file__).parent.parent pytest.main( [ '-qq', '--co', - directory, + str(directory), '-m', 'benchmark', - *(['-k', include] if include is not None else []), - *(['--ignore', exclude] if exclude is not None else []), ], plugins=[collect], ) - return collect.items + + items = collect.items + + if include is not None: + diff = set(include) - set([nodeid for nodeid, *_ in items]) + if len(diff) != 0: + logging.warning(f'The following includes matched no pytest items {diff}') + + return items def run_list_benchmarks(args: Namespace) -> None: - print(sep='\n', *[f'{path}::{name}' for path, name, _ in list_benchmarks(args.include, args.exclude)]) + print(sep='\n', *[nodeid for nodeid, *_ in list_benchmarks(args.include, args.exclude)]) def build_and_push_benchmark_image(hail_dev_image: str, artifact_uri: str, tag: str) -> str: @@ -88,13 +105,13 @@ def make_test_storage_permissions_job(b: Batch, object_prefix: str, labelled_sha def make_benchmark_trial( b: Batch, env: Dict[str, Optional[str]], - path: Path, - benchmark_name: str, + benchmark_id: str, trial: int, iterations: Optional[int], + max_duration: Optional[int], deps: List[Job], # dont reformat -) -> Job: - j = b.new_job(name=f'{path}::{benchmark_name}-{trial}') +) -> Resource: + j = b.new_job(name=f'{benchmark_id}-{trial}') j.depends_on(*deps) j.always_copy_output() @@ -110,7 +127,7 @@ def make_benchmark_trial( j.command('mkdir -p benchmark-resources') j.command( ' '.join([ - f"python3 -m pytest '{path}::{benchmark_name}'", + f"python3 -m pytest '{benchmark_id}'", '-Werror:::hail -Werror:::hailtop -Werror::ResourceWarning', '--log-cli-level=ERROR', '-s', @@ -118,7 +135,6 @@ def make_benchmark_trial( '-vv', '--instafail', '--durations=50', - '--timeout=1800', # pytest keeps 3 test sessions worth of temp files by default. # some benchmarks generate very large output files which can quickly # fill the tmpfs and so we'll make pytest always delete tmp files @@ -128,6 +144,7 @@ def make_benchmark_trial( f'--output={j.ofile}', '--data-dir=benchmark-resources', f'--iterations={iterations}' if iterations is not None else '', + f'--max-duration={max_duration}' if max_duration is not None else '', ]) ) @@ -135,11 +152,19 @@ def make_benchmark_trial( def read_jsonl(p: Path): - logging.info(f'reading json lines from {p}.') - with p.open(encoding='utf-8') as r: - for line in r: - if len(line) > 1: - yield json.loads(line) + try: + f = p.open(encoding='utf-8') + except IOError as e: + logging.error(e) + else: + logging.info(f'reading json lines from {p}.') + with f: + for line in f: + if len(line) > 1: + try: + yield json.loads(line) + except Exception as e: + logging.error(e) def combine(output: Resource, files: List[Resource]) -> None: @@ -192,8 +217,8 @@ def run_submit(args: Namespace) -> None: output_file = P.join(object_prefix, f'{timestamp}-{labelled_sha}.jsonl') all_benchmarks = [ - (path, name, trial) - for path, name, num_jobs in list_benchmarks(args.include, args.exclude) + (nodeid, trial) + for nodeid, num_jobs in list_benchmarks(args.include, args.exclude) for trial in range(args.jobs or num_jobs) ] @@ -235,8 +260,16 @@ def run_submit(args: Namespace) -> None: b, args.branch_factor, [ - make_benchmark_trial(b, envvars, path, name, trial, args.iterations, deps=[test_permissions]) - for path, name, trial in all_benchmarks + make_benchmark_trial( + b, + envvars, + nodeid, + trial, + args.iterations, + args.max_duration, + deps=[test_permissions], + ) + for nodeid, trial in all_benchmarks ], ) @@ -265,8 +298,21 @@ def nonempty(s: str): 'list', description='List known hail benchmarks', ) - list_p.add_argument('--include', type=nonempty, help="see pytest -k", default=None) - list_p.add_argument('--exclude', type=nonempty, help='see pytest --ignore', default=None) + group = list_p.add_mutually_exclusive_group(required=False) + group.add_argument( + '--include', + type=nonempty, + help='fully-qualified benchmark name to run', + action='append', + default=None, + ) + group.add_argument( + '--exclude', + type=nonempty, + help='fully-qualified benchmark name to skip', + action='append', + default=None, + ) list_p.set_defaults(main=run_list_benchmarks) combine_p = subparser.add_parser( @@ -290,16 +336,41 @@ def nonempty(s: str): submit_p.add_argument('--label', type=nonempty, help='batch job label', default=None) submit_p.add_argument('--branch-factor', type=int, help='number of benchmarks to combine at a time', default=32) submit_p.add_argument( - '--iterations', type=int, help='override number of iterations for each benchmark', default=None + '--iterations', + type=int, + help='override number of iterations for each benchmark', + default=None, ) submit_p.add_argument( - '--jobs', type=int, help='override number of batch jobs created for each benchmark', default=None + '--jobs', + type=int, + help='override number of batch jobs created for each benchmark', + default=None, + ) + group = submit_p.add_mutually_exclusive_group(required=False) + group.add_argument( + '--include', + type=nonempty, + help='fully-qualified benchmark name to run. May be specified more than once.', + action='append', + default=None, + ) + group.add_argument( + '--exclude', + type=nonempty, + help='fully-qualified benchmark name to skip. May be specified more than once.', + action='append', + default=None, ) - submit_p.add_argument('--include', type=nonempty, help="see pytest -k", default=None) - submit_p.add_argument('--exclude', type=nonempty, help='see pytest --ignore', default=None) submit_p.add_argument('--lower', action='store_true') submit_p.add_argument('--lower-only', action='store_true') submit_p.add_argument('--wait', action='store_true', help='wait for batch to complete') + submit_p.add_argument( + '--max-duration', + type=int, + help='Maximum duration in seconds for a benchmark trial, after which the trial will be interrupted.', + default=None, + ) submit_p.set_defaults(main=run_submit) args = parser.parse_args()