diff --git a/src/common/components.py b/src/common/components.py
index aabd31f8..4c582195 100644
--- a/src/common/components.py
+++ b/src/common/components.py
@@ -15,6 +15,7 @@ from distutils.util import strtobool
 
 from .metrics import MetricsLogger
+from .perf import PerformanceMetricsCollector, PerfReportPlotter
 
 
 class RunnableScript():
     """
@@ -44,6 +45,8 @@ def __init__(self, task, framework, framework_version, metrics_prefix=None):
             metrics_prefix=metrics_prefix
         )
 
+        self.perf_report_collector = None
+
     @classmethod
     def get_arg_parser(cls, parser=None):
         """Adds component/module arguments to a given argument parser.
@@ -77,6 +80,13 @@ def get_arg_parser(cls, parser=None):
             type=str,
             help="provide custom properties as json dict",
         )
+        group_general.add_argument(
+            "--disable_perf_metrics",
+            required=False,
+            default=False,
+            type=strtobool,
+            help="disable performance metrics (default: False)",
+        )
 
         return parser
 
@@ -101,6 +111,12 @@ def initialize_run(self, args):
         # add properties about environment of this script
         self.metrics_logger.set_platform_properties()
 
+        # enable perf reporting
+        if not args.disable_perf_metrics:
+            self.perf_report_collector = PerformanceMetricsCollector()
+            self.perf_report_collector.start()
+
+
     def run(self, args, logger, metrics_logger, unknown_args):
         """The run function of your script. You are required
         to override this method with your own implementation.
@@ -117,9 +133,16 @@ def finalize_run(self, args):
         """Finalize the run, close what needs to be"""
         self.logger.info("Finalizing script run...")
 
+        if self.perf_report_collector:
+            self.perf_report_collector.finalize()
+            plotter = PerfReportPlotter(self.metrics_logger)
+            plotter.add_perf_reports(self.perf_report_collector.perf_reports, node=0)
+            plotter.report_nodes_perf()
+
         # close mlflow
         self.metrics_logger.close()
 
+
     @classmethod
     def main(cls, cli_args=None):
         """ Component main function, it is not recommended to override this method.
diff --git a/src/common/distributed.py b/src/common/distributed.py
index 5559acef..3533599a 100644
--- a/src/common/distributed.py
+++ b/src/common/distributed.py
@@ -14,6 +14,8 @@ from .components import RunnableScript
 from collections import namedtuple
 
+from .perf import PerformanceMetricsCollector, PerfReportPlotter
+
 
 def detect_mpi_config():
     """ Detects if we're running in MPI.
     Args:
@@ -92,6 +94,12 @@ def initialize_run(self, args):
         # add properties about environment of this script
         self.metrics_logger.set_platform_properties()
 
+        # enable perf reporting
+        if not args.disable_perf_metrics:
+            self.perf_report_collector = PerformanceMetricsCollector()
+            self.perf_report_collector.start()
+
+
     def finalize_run(self, args):
         """Finalize the run, close what needs to be"""
         self.logger.info("Finalizing multi node component script...")
@@ -100,5 +108,11 @@ def finalize_run(self, args):
             self.logger.info("MPI was initialized, calling MPI.finalize()")
             MPI.Finalize()
 
-        # then use the super finalization method
-        super().finalize_run(args)
+        if self.perf_report_collector:
+            self.perf_report_collector.finalize()
+            plotter = PerfReportPlotter(self.metrics_logger)
+            plotter.add_perf_reports(self.perf_report_collector.perf_reports, node=self._mpi_config.world_rank)
+            plotter.report_nodes_perf()
+
+        # close mlflow
+        self.metrics_logger.close()
diff --git a/src/common/metrics.py b/src/common/metrics.py
index fea6a94c..a3ec9ff0 100644
--- a/src/common/metrics.py
+++ b/src/common/metrics.py
@@ -54,7 +54,8 @@ def close(self):
         else:
             self._logger.warning(f"Call to finalize MLFLOW [session='{self._session_name}'] that was never initialized.")
 
-    def _remove_non_allowed_chars(self, name_string):
+    @classmethod
+    def _remove_non_allowed_chars(cls, name_string):
         """ Removes chars not allowed for metric keys in mlflow """
         return re.sub(r'[^a-zA-Z0-9_\-\.\ \/]', '', name_string)
 
@@ -195,6 +196,7 @@ def log_inferencing_latencies(self, time_per_batch, batch_length=1, factor_to_us
         if len(time_per_batch) > 1:
             import numpy as np
             import matplotlib.pyplot as plt
+            plt.switch_backend('agg')
 
             # latency per batch
             batch_run_times = np.array(time_per_batch) * factor_to_usecs
diff --git a/src/common/perf.py b/src/common/perf.py
index 867aa60d..d1e55b85 100644
--- a/src/common/perf.py
+++ b/src/common/perf.py
@@ -184,3 +184,80 @@ def append_perf_metrics(self, perf_metrics):
             ]
             self.perf_reports_freqs *= 2 # we'll start accepting reports only 1 out of 2
             self.logger.warning(f"Perf report store reached max, increasing freq to {self.perf_reports_freqs}")
+
+
+class PerfReportPlotter():
+    """Once collected all perf reports from all nodes"""
+    def __init__(self, metrics_logger):
+        self.all_reports = {}
+        self.metrics_logger = metrics_logger
+
+    def add_perf_reports(self, perf_reports, node):
+        """Add a set of reports from a given node"""
+        self.all_reports[node] = perf_reports
+
+    def report_nodes_perf(self):
+        # Currently reporting one metric per node
+        for node in self.all_reports:
+            # CPU UTILIZATION
+            self.metrics_logger.log_metric(
+                "max_t_(cpu_pct_per_cpu_avg)",
+                max([ report["cpu_pct_per_cpu_avg"] for report in self.all_reports[node] ]),
+                step=node
+            )
+            self.metrics_logger.log_metric(
+                "max_t_(cpu_pct_per_cpu_min)",
+                max([ report["cpu_pct_per_cpu_min"] for report in self.all_reports[node] ]),
+                step=node
+            )
+            self.metrics_logger.log_metric(
+                "max_t_(cpu_pct_per_cpu_max)",
+                max([ report["cpu_pct_per_cpu_max"] for report in self.all_reports[node] ]),
+                step=node
+            )
+
+            # MEM
+            self.metrics_logger.log_metric(
+                "max_t_(mem_percent)",
+                max([ report["mem_percent"] for report in self.all_reports[node] ]),
+                step=node
+            )
+
+            # DISK
+            self.metrics_logger.log_metric(
+                "max_t_(disk_usage_percent)",
+                max([ report["disk_usage_percent"] for report in self.all_reports[node] ]),
+                step=node
+            )
+            self.metrics_logger.log_metric(
+                "max_t_(disk_io_read_mb)",
+                max([ report["disk_io_read_mb"] for report in self.all_reports[node] ]),
+                step=node
+            )
+            self.metrics_logger.log_metric(
+                "max_t_(disk_io_write_mb)",
+                max([ report["disk_io_write_mb"] for report in self.all_reports[node] ]),
+                step=node
+            )
+
+            # NET I/O
+            self.metrics_logger.log_metric(
+                "max_t_(net_io_lo_sent_mb)",
+                max([ report["net_io_lo_sent_mb"] for report in self.all_reports[node] ]),
+                step=node
+            )
+            self.metrics_logger.log_metric(
+                "max_t_(net_io_ext_sent_mb)",
+                max([ report["net_io_ext_sent_mb"] for report in self.all_reports[node] ]),
+                step=node
+            )
+            self.metrics_logger.log_metric(
+                "max_t_(net_io_lo_recv_mb)",
+                max([ report["net_io_lo_recv_mb"] for report in self.all_reports[node] ]),
+                step=node
+            )
+            self.metrics_logger.log_metric(
+                "max_t_(net_io_ext_recv_mb)",
+                max([ report["net_io_ext_recv_mb"] for report in self.all_reports[node] ]),
+                step=node
+            )
diff --git a/src/scripts/training/lightgbm_python/train.py b/src/scripts/training/lightgbm_python/train.py
index d9d3f84c..d6b96fef 100644
--- a/src/scripts/training/lightgbm_python/train.py
+++ b/src/scripts/training/lightgbm_python/train.py
@@ -99,7 +99,7 @@ def load_lgbm_params_from_cli(self, args, mpi_config):
         cli_params = dict(vars(args))
 
         # removing arguments that are purely CLI
-        for key in ['verbose', 'custom_properties', 'export_model', 'test', 'train', 'custom_params', 'construct']:
+        for key in ['verbose', 'custom_properties', 'export_model', 'test', 'train', 'custom_params', 'construct', 'disable_perf_metrics']:
            del cli_params[key]
 
         # doing some fixes and hardcoded values
diff --git a/tests/common/test_component.py b/tests/common/test_component.py
index fad95309..5fad863e 100644
--- a/tests/common/test_component.py
+++ b/tests/common/test_component.py
@@ -6,6 +6,7 @@ import json
 
 from common.components import SingleNodeScript
+from common.metrics import MetricsLogger
 
 
 class FakeSingleNodeScript(SingleNodeScript):
     def __init__(self):
@@ -26,7 +27,7 @@ def run(self, args, logger, metrics_logger, unknown_args):
 @patch('mlflow.start_run')
 def test_single_node_script_metrics(mlflow_start_run_mock, mlflow_set_tags_mock, mlflow_log_metric_mock, mlflow_end_run_mock):
     # just run main
-    FakeSingleNodeScript.main(
+    test_component = FakeSingleNodeScript.main(
         [
             "foo.py",
             "--verbose", "True",
@@ -72,10 +73,31 @@ def test_single_node_script_metrics(mlflow_start_run_mock, mlflow_set_tags_mock, mlflow_log_metric_mock, mlflow_end_run_mock):
 
     # now let's test all metrics
     metrics_calls = mlflow_log_metric_mock.call_args_list
-    assert len(metrics_calls) == 1
+
+    # 1 user metric, 11 performance metrics
+    assert len(metrics_calls) == 12
 
     # user metric (time block)
     assert metrics_calls[0].args[0] == "fake_time_block"
     assert isinstance(metrics_calls[0].args[1], float)
     assert "step" in metrics_calls[0].kwargs
     assert metrics_calls[0].kwargs["step"] == 1
+
+    # perf metrics
+    perf_metrics_call_args = [
+        "max_t_(cpu_pct_per_cpu_avg)",
+        "max_t_(cpu_pct_per_cpu_min)",
+        "max_t_(cpu_pct_per_cpu_max)",
+        "max_t_(mem_percent)",
+        "max_t_(disk_usage_percent)",
+        "max_t_(disk_io_read_mb)",
+        "max_t_(disk_io_write_mb)",
+        "max_t_(net_io_lo_sent_mb)",
+        "max_t_(net_io_ext_sent_mb)",
+        "max_t_(net_io_lo_recv_mb)",
+        "max_t_(net_io_ext_recv_mb)",
+    ]
+    for index, metric_key in enumerate(perf_metrics_call_args):
+        assert metrics_calls[index+1].args[0] == MetricsLogger._remove_non_allowed_chars(metric_key)
+        assert "step" in metrics_calls[index+1].kwargs
+        assert metrics_calls[index+1].kwargs["step"] == 0 # using node id as step
diff --git a/tests/common/test_metrics.py b/tests/common/test_metrics.py
index 92a601b4..fa58ff79 100644
--- a/tests/common/test_metrics.py
+++ b/tests/common/test_metrics.py
@@ -119,8 +119,6 @@ def test_metrics_logger_log_metric_too_long(mlflow_log_metric_mock):
 
 def test_metrics_logger_log_metric_non_allowed_chars():
     """ Tests MetricsLogger().log_metric() """
-    metrics_logger = MetricsLogger()
-
     test_cases = [
         {
             'input': "a!@$b%^&c_-/d",
@@ -137,7 +135,7 @@
     ]
 
     for test_case in test_cases:
-        assert metrics_logger._remove_non_allowed_chars(test_case['input']) == test_case['expected']
+        assert MetricsLogger._remove_non_allowed_chars(test_case['input']) == test_case['expected']
 
 
 @patch('mlflow.set_tags')
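
For readers skimming the patch, a minimal sketch of how the new plotter aggregates the collector's samples. Only the class names, methods, and report keys come from the diff above; the stub logger and hand-built report dicts are illustrative assumptions, not part of the change, and the import path mirrors the convention used in the tests.

# Illustrative only: exercises PerfReportPlotter as added in src/common/perf.py.
from common.perf import PerfReportPlotter

class StubMetricsLogger:
    """Stand-in for common.metrics.MetricsLogger; only log_metric() is needed here."""
    def log_metric(self, key, value, step=None):
        print(f"{key} = {value} (step={step})")

# Two fake perf samples for node 0; keys mirror those read in report_nodes_perf().
fake_reports = [
    {"cpu_pct_per_cpu_avg": 35.0, "cpu_pct_per_cpu_min": 10.0, "cpu_pct_per_cpu_max": 80.0,
     "mem_percent": 42.0, "disk_usage_percent": 55.0, "disk_io_read_mb": 120.0,
     "disk_io_write_mb": 30.0, "net_io_lo_sent_mb": 1.0, "net_io_ext_sent_mb": 5.0,
     "net_io_lo_recv_mb": 1.0, "net_io_ext_recv_mb": 7.0},
    {"cpu_pct_per_cpu_avg": 60.0, "cpu_pct_per_cpu_min": 20.0, "cpu_pct_per_cpu_max": 95.0,
     "mem_percent": 48.0, "disk_usage_percent": 55.0, "disk_io_read_mb": 250.0,
     "disk_io_write_mb": 60.0, "net_io_lo_sent_mb": 2.0, "net_io_ext_sent_mb": 9.0,
     "net_io_lo_recv_mb": 2.0, "net_io_ext_recv_mb": 12.0},
]

plotter = PerfReportPlotter(StubMetricsLogger())
plotter.add_perf_reports(fake_reports, node=0)
plotter.report_nodes_perf()  # logs the max over time of each metric, one point per node (step=node)

This is the same flow the components use: the collector gathers samples between initialize_run() and finalize_run(), and finalize_run() hands them to the plotter, with the node rank (0 for single-node, world_rank under MPI) used as the metric step.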