Adding perf metrics to each node in both single and multi node scripts #192

Merged · 20 commits · Dec 9, 2021

Note: this repository has been archived by the owner on Apr 8, 2024 and is now read-only.
23 changes: 23 additions & 0 deletions src/common/components.py
@@ -15,6 +15,7 @@
from distutils.util import strtobool

from .metrics import MetricsLogger
from .perf import PerformanceMetricsCollector, PerfReportPlotter

class RunnableScript():
"""
@@ -44,6 +45,8 @@ def __init__(self, task, framework, framework_version, metrics_prefix=None):
metrics_prefix=metrics_prefix
)

self.perf_report_collector = None

@classmethod
def get_arg_parser(cls, parser=None):
"""Adds component/module arguments to a given argument parser.
Expand Down Expand Up @@ -77,6 +80,13 @@ def get_arg_parser(cls, parser=None):
type=str,
help="provide custom properties as json dict",
)
group_general.add_argument(
"--disable_perf_metrics",
required=False,
default=False,
type=strtobool,
help="disable performance metrics (default: False)",
)

return parser

@@ -101,6 +111,12 @@ def initialize_run(self, args):
# add properties about environment of this script
self.metrics_logger.set_platform_properties()

# enable perf reporting
if not args.disable_perf_metrics:
self.perf_report_collector = PerformanceMetricsCollector()
self.perf_report_collector.start()


def run(self, args, logger, metrics_logger, unknown_args):
"""The run function of your script. You are required to override this method
with your own implementation.
@@ -117,9 +133,16 @@ def finalize_run(self, args):
"""Finalize the run, close what needs to be"""
self.logger.info("Finalizing script run...")

if self.perf_report_collector:
self.perf_report_collector.finalize()
plotter = PerfReportPlotter(self.metrics_logger)
plotter.add_perf_reports(self.perf_report_collector.perf_reports, node=0)
plotter.report_nodes_perf()

# close mlflow
self.metrics_logger.close()


@classmethod
def main(cls, cli_args=None):
""" Component main function, it is not recommended to override this method.
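
Taken together, the components.py changes wire a collector lifecycle into every runnable script: initialize_run() starts a PerformanceMetricsCollector unless --disable_perf_metrics is set (strtobool accepts CLI values such as "True"/"False" or "1"/"0"), and finalize_run() stops it and hands the accumulated reports to a PerfReportPlotter with node=0 for single-node runs. Below is a minimal sketch of that start/finalize pattern, assuming a psutil-based background sampling thread; the real PerformanceMetricsCollector lives in src/common/perf.py and differs in detail.

# Hedged sketch only: the psutil-based sampling is an assumption for
# illustration, not the collector shipped in this PR.
import threading
import time

import psutil

class ToyPerfCollector:
    def __init__(self, interval=1.0):
        self.interval = interval
        self.perf_reports = []  # one report dict per sampling tick
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._sample_loop, daemon=True)

    def _sample_loop(self):
        while not self._stop.is_set():
            self.perf_reports.append({
                "cpu_pct_per_cpu_avg": psutil.cpu_percent(),
                "mem_percent": psutil.virtual_memory().percent,
            })
            time.sleep(self.interval)

    def start(self):
        self._thread.start()

    def finalize(self):
        self._stop.set()
        self._thread.join()

Since finalize_run() guards on "if self.perf_report_collector:", passing --disable_perf_metrics True skips both collection and reporting entirely.
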
18 changes: 16 additions & 2 deletions src/common/distributed.py
@@ -14,6 +14,8 @@
from .components import RunnableScript
from collections import namedtuple

from .perf import PerformanceMetricsCollector, PerfReportPlotter

def detect_mpi_config():
""" Detects if we're running in MPI.
Args:
@@ -92,6 +94,12 @@ def initialize_run(self, args):
# add properties about environment of this script
self.metrics_logger.set_platform_properties()

# enable perf reporting
if not args.disable_perf_metrics:
self.perf_report_collector = PerformanceMetricsCollector()
self.perf_report_collector.start()


def finalize_run(self, args):
"""Finalize the run, close what needs to be"""
self.logger.info("Finalizing multi node component script...")
@@ -100,5 +108,11 @@
self.logger.info("MPI was initialized, calling MPI.finalize()")
MPI.Finalize()

# then use the super finalization method
super().finalize_run(args)
if self.perf_report_collector:
self.perf_report_collector.finalize()
plotter = PerfReportPlotter(self.metrics_logger)
plotter.add_perf_reports(self.perf_report_collector.perf_reports, node=self._mpi_config.world_rank)
plotter.report_nodes_perf()

# close mlflow
self.metrics_logger.close()
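
The multi-node override deliberately repeats the base-class reporting instead of calling super().finalize_run(args) (the removed lines above) so that it can pass node=self._mpi_config.world_rank rather than node=0. Logging with the rank as the MLflow step means a single metric key yields one data point per node. A standalone illustration with an assumed four-node job; the key is shown post-sanitization, since MetricsLogger strips characters MLflow rejects, such as parentheses:

# Illustration only: one metric key, one point per node (x axis = node id).
import mlflow

world_size = 4  # assumed rank count for this example
with mlflow.start_run():
    for rank in range(world_size):
        # in the real scripts, each rank logs only its own aggregate
        mlflow.log_metric("max_t_mem_percent", 40.0 + rank, step=rank)
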
4 changes: 3 additions & 1 deletion src/common/metrics.py
@@ -54,7 +54,8 @@ def close(self):
else:
self._logger.warning(f"Call to finalize MLFLOW [session='{self._session_name}'] that was never initialized.")

def _remove_non_allowed_chars(self, name_string):
@classmethod
def _remove_non_allowed_chars(cls, name_string):
""" Removes chars not allowed for metric keys in mlflow """
return re.sub(r'[^a-zA-Z0-9_\-\.\ \/]', '', name_string)

@@ -195,6 +196,7 @@ def log_inferencing_latencies(self, time_per_batch, batch_length=1, factor_to_us
if len(time_per_batch) > 1:
import numpy as np
import matplotlib.pyplot as plt
plt.switch_backend('agg')

# latency per batch
batch_run_times = np.array(time_per_batch) * factor_to_usecs
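
The plt.switch_backend('agg') call added to log_inferencing_latencies matters on headless compute nodes: interactive matplotlib backends need a display server, while Agg renders into memory and writes straight to files. A minimal demonstration (the filename is illustrative):

import matplotlib
matplotlib.use("agg")  # same effect as plt.switch_backend('agg') before any figure exists
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
ax.plot([1, 2, 3], [10, 20, 15])
fig.savefig("latency_plot.png")  # saving to file needs no GUI
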
77 changes: 77 additions & 0 deletions src/common/perf.py
@@ -184,3 +184,80 @@ def append_perf_metrics(self, perf_metrics):
]
self.perf_reports_freqs *= 2 # we'll start accepting reports only 1 out of 2
self.logger.warning(f"Perf report store reached max, increasing freq to {self.perf_reports_freqs}")


class PerfReportPlotter():
"""Aggregates the perf reports collected from every node and logs per-node metrics"""
def __init__(self, metrics_logger):
self.all_reports = {}
self.metrics_logger = metrics_logger

def add_perf_reports(self, perf_reports, node):
"""Add a set of reports from a given node"""
self.all_reports[node] = perf_reports

def report_nodes_perf(self):
# Currently reporting, for each metric, one data point per node (node id as step)
for node in self.all_reports:
# CPU UTILIZATION
self.metrics_logger.log_metric(
"max_t_(cpu_pct_per_cpu_avg)",
max([ report["cpu_pct_per_cpu_avg"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(cpu_pct_per_cpu_min)",
max([ report["cpu_pct_per_cpu_min"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(cpu_pct_per_cpu_max)",
max([ report["cpu_pct_per_cpu_max"] for report in self.all_reports[node] ]),
step=node
)

# MEM
self.metrics_logger.log_metric(
"max_t_(mem_percent)",
max([ report["mem_percent"] for report in self.all_reports[node] ]),
step=node
)

# DISK
self.metrics_logger.log_metric(
"max_t_(disk_usage_percent)",
max([ report["disk_usage_percent"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(disk_io_read_mb)",
max([ report["disk_io_read_mb"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(disk_io_write_mb)",
max([ report["disk_io_write_mb"] for report in self.all_reports[node] ]),
step=node
)

# NET I/O
self.metrics_logger.log_metric(
"max_t_(net_io_lo_sent_mb)",
max([ report["net_io_lo_sent_mb"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(net_io_ext_sent_mb)",
max([ report["net_io_ext_sent_mb"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(net_io_lo_recv_mb)",
max([ report["net_io_lo_recv_mb"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(net_io_ext_recv_mb)",
max([ report["net_io_ext_recv_mb"] for report in self.all_reports[node] ]),
step=node
)
2 changes: 1 addition & 1 deletion src/scripts/training/lightgbm_python/train.py
@@ -99,7 +99,7 @@ def load_lgbm_params_from_cli(self, args, mpi_config):
cli_params = dict(vars(args))

# removing arguments that are purely CLI
for key in ['verbose', 'custom_properties', 'export_model', 'test', 'train', 'custom_params', 'construct']:
for key in ['verbose', 'custom_properties', 'export_model', 'test', 'train', 'custom_params', 'construct', 'disable_perf_metrics']:
del cli_params[key]

# doing some fixes and hardcoded values
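
The train.py change registers the new flag as CLI-only so it is stripped before the remaining arguments are forwarded to LightGBM as training parameters. The same outcome expressed as a dict comprehension (a sketch, not the script's code):

import argparse

CLI_ONLY_KEYS = {
    "verbose", "custom_properties", "export_model", "test", "train",
    "custom_params", "construct", "disable_perf_metrics",
}

def strip_cli_only(args: argparse.Namespace) -> dict:
    # anything left in this dict reaches LightGBM as a training parameter
    return {k: v for k, v in vars(args).items() if k not in CLI_ONLY_KEYS}
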
26 changes: 24 additions & 2 deletions tests/common/test_component.py
@@ -6,6 +6,7 @@
import json

from common.components import SingleNodeScript
from common.metrics import MetricsLogger

class FakeSingleNodeScript(SingleNodeScript):
def __init__(self):
@@ -26,7 +27,7 @@ def run(self, args, logger, metrics_logger, unknown_args):
@patch('mlflow.start_run')
def test_single_node_script_metrics(mlflow_start_run_mock, mlflow_set_tags_mock, mlflow_log_metric_mock, mlflow_end_run_mock):
# just run main
FakeSingleNodeScript.main(
test_component = FakeSingleNodeScript.main(
[
"foo.py",
"--verbose", "True",
@@ -72,10 +73,31 @@ def test_single_node_script_metrics(mlflow_start_run_mock, mlflow_set_tags_mock,

# now let's test all metrics
metrics_calls = mlflow_log_metric_mock.call_args_list
assert len(metrics_calls) == 1

# 1 user metric, 11 performance metrics
assert len(metrics_calls) == 12

# user metric (time block)
assert metrics_calls[0].args[0] == "fake_time_block"
assert isinstance(metrics_calls[0].args[1], float)
assert "step" in metrics_calls[0].kwargs
assert metrics_calls[0].kwargs["step"] == 1

# perf metrics
perf_metrics_call_args = [
"max_t_(cpu_pct_per_cpu_avg)",
"max_t_(cpu_pct_per_cpu_min)",
"max_t_(cpu_pct_per_cpu_max)",
"max_t_(mem_percent)",
"max_t_(disk_usage_percent)",
"max_t_(disk_io_read_mb)",
"max_t_(disk_io_write_mb)",
"max_t_(net_io_lo_sent_mb)",
"max_t_(net_io_ext_sent_mb)",
"max_t_(net_io_lo_recv_mb)",
"max_t_(net_io_ext_recv_mb)",
]
for index, metric_key in enumerate(perf_metrics_call_args):
assert metrics_calls[index+1].args[0] == MetricsLogger._remove_non_allowed_chars(metric_key)
assert "step" in metrics_calls[index+1].kwargs
assert metrics_calls[index+1].kwargs["step"] == 0 # using node id as step
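
The assertions above lean on unittest.mock recording every log_metric call in order: index 0 is the user metric, indices 1 through 11 the perf metrics. A standalone mini-example of that pattern (names are illustrative; the .args/.kwargs attributes on call objects require Python 3.8+):

from unittest.mock import MagicMock

log_metric = MagicMock()
log_metric("max_t_mem_percent", 42.0, step=0)

call = log_metric.call_args_list[0]
assert call.args == ("max_t_mem_percent", 42.0)
assert call.kwargs == {"step": 0}
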
4 changes: 1 addition & 3 deletions tests/common/test_metrics.py
@@ -119,8 +119,6 @@ def test_metrics_logger_log_metric_too_long(mlflow_log_metric_mock):

def test_metrics_logger_log_metric_non_allowed_chars():
""" Tests MetricsLogger().log_metric() """
metrics_logger = MetricsLogger()

test_cases = [
{
'input': "a!@$b%^&c_-/d",
@@ -137,7 +135,7 @@
]

for test_case in test_cases:
assert metrics_logger._remove_non_allowed_chars(test_case['input']) == test_case['expected']
assert MetricsLogger._remove_non_allowed_chars(test_case['input']) == test_case['expected']


@patch('mlflow.set_tags')
Expand Down