This repository has been archived by the owner on Apr 8, 2024. It is now read-only.

Commit 65b5619

Adding perf metrics to each node in both single and multi node scripts

jfomhover authored Dec 9, 2021
1 parent c7298fa commit 65b5619
Showing 7 changed files with 145 additions and 9 deletions.
23 changes: 23 additions & 0 deletions src/common/components.py
@@ -15,6 +15,7 @@
from distutils.util import strtobool

from .metrics import MetricsLogger
from .perf import PerformanceMetricsCollector, PerfReportPlotter

class RunnableScript():
"""
@@ -44,6 +45,8 @@ def __init__(self, task, framework, framework_version, metrics_prefix=None):
metrics_prefix=metrics_prefix
)

self.perf_report_collector = None

@classmethod
def get_arg_parser(cls, parser=None):
"""Adds component/module arguments to a given argument parser.
@@ -77,6 +80,13 @@ def get_arg_parser(cls, parser=None):
type=str,
help="provide custom properties as json dict",
)
group_general.add_argument(
"--disable_perf_metrics",
required=False,
default=False,
type=strtobool,
help="disable performance metrics (default: False)",
)

return parser
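
A side note on the `type=strtobool` pattern used above: argparse applies `type` only to values actually passed on the command line, so the flag parses to 1/0 while an omitted flag keeps the raw `False` default. A minimal sketch of that behavior (the asserts are illustrative, not from the repo):

from argparse import ArgumentParser
from distutils.util import strtobool  # maps "y/yes/t/true/on/1" -> 1, "n/no/f/false/off/0" -> 0

parser = ArgumentParser()
parser.add_argument("--disable_perf_metrics", required=False, default=False, type=strtobool)

args = parser.parse_args(["--disable_perf_metrics", "true"])
assert args.disable_perf_metrics == 1      # truthy: perf collection will be skipped
args = parser.parse_args([])
assert args.disable_perf_metrics is False  # default is used verbatim, type= is not applied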

@@ -101,6 +111,12 @@ def initialize_run(self, args):
# add properties about environment of this script
self.metrics_logger.set_platform_properties()

# enable perf reporting
if not args.disable_perf_metrics:
self.perf_report_collector = PerformanceMetricsCollector()
self.perf_report_collector.start()


def run(self, args, logger, metrics_logger, unknown_args):
"""The run function of your script. You are required to override this method
with your own implementation.
@@ -117,9 +133,16 @@ def finalize_run(self, args):
"""Finalize the run, close what needs to be"""
self.logger.info("Finalizing script run...")

if self.perf_report_collector:
self.perf_report_collector.finalize()
plotter = PerfReportPlotter(self.metrics_logger)
plotter.add_perf_reports(self.perf_report_collector.perf_reports, node=0)
plotter.report_nodes_perf()

# close mlflow
self.metrics_logger.close()


@classmethod
def main(cls, cli_args=None):
""" Component main function, it is not recommended to override this method.
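
The `PerformanceMetricsCollector` implementation itself is outside this diff (only the tail of src/common/perf.py is shown below). From the call sites above, its contract is small: `start()` begins sampling, `finalize()` stops it, and `perf_reports` holds one dict per sample with the keys that `PerfReportPlotter` reads. A rough sketch of a collector honoring that contract, assuming a psutil-based background sampler (the interval, threading scheme, and exact field list are guesses, not the repo's code):

import threading
import psutil  # assumed dependency, given the cpu/mem/disk/net fields in the reports

class SketchPerformanceMetricsCollector:
    """Samples host metrics on a background thread until finalize() is called."""
    def __init__(self, interval=1.0):
        self.perf_reports = []  # consumed by PerfReportPlotter.add_perf_reports()
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._sample_loop, daemon=True)

    def start(self):
        self._thread.start()

    def finalize(self):
        self._stop.set()
        self._thread.join()

    def _sample_loop(self):
        while not self._stop.wait(self._interval):
            per_cpu = psutil.cpu_percent(percpu=True)
            self.perf_reports.append({
                "cpu_pct_per_cpu_avg": sum(per_cpu) / len(per_cpu),
                "cpu_pct_per_cpu_min": min(per_cpu),
                "cpu_pct_per_cpu_max": max(per_cpu),
                "mem_percent": psutil.virtual_memory().percent,
                "disk_usage_percent": psutil.disk_usage("/").percent,
                # disk_io_*_mb / net_io_*_mb counters omitted for brevity
            })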
18 changes: 16 additions & 2 deletions src/common/distributed.py
@@ -14,6 +14,8 @@
from .components import RunnableScript
from collections import namedtuple

from .perf import PerformanceMetricsCollector, PerfReportPlotter

def detect_mpi_config():
""" Detects if we're running in MPI.
Args:
@@ -92,6 +94,12 @@ def initialize_run(self, args):
# add properties about environment of this script
self.metrics_logger.set_platform_properties()

# enable perf reporting
if not args.disable_perf_metrics:
self.perf_report_collector = PerformanceMetricsCollector()
self.perf_report_collector.start()


def finalize_run(self, args):
"""Finalize the run, close what needs to be"""
self.logger.info("Finalizing multi node component script...")
@@ -100,5 +108,11 @@
self.logger.info("MPI was initialized, calling MPI.finalize()")
MPI.Finalize()

- # then use the super finalization method
- super().finalize_run(args)
if self.perf_report_collector:
self.perf_report_collector.finalize()
plotter = PerfReportPlotter(self.metrics_logger)
plotter.add_perf_reports(self.perf_report_collector.perf_reports, node=self._mpi_config.world_rank)
plotter.report_nodes_perf()

# close mlflow
self.metrics_logger.close()
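
For orientation: `self._mpi_config` comes from `detect_mpi_config()` (top of this file), and `world_rank` identifies the node, so each node logs its perf metrics with its own `step` and a multi-node run charts one point per node. A hedged sketch of the mpi4py calls such a helper typically wraps (the namedtuple and its fields are assumptions, not the repo's actual config object):

from collections import namedtuple
from mpi4py import MPI  # same module whose MPI.Finalize() is called in finalize_run() above

MpiConfigSketch = namedtuple("MpiConfigSketch", ["world_rank", "world_size"])

def sketch_mpi_config():
    comm = MPI.COMM_WORLD
    return MpiConfigSketch(world_rank=comm.Get_rank(), world_size=comm.Get_size())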
4 changes: 3 additions & 1 deletion src/common/metrics.py
@@ -54,7 +54,8 @@ def close(self):
else:
self._logger.warning(f"Call to finalize MLFLOW [session='{self._session_name}'] that was never initialized.")

- def _remove_non_allowed_chars(self, name_string):
@classmethod
def _remove_non_allowed_chars(cls, name_string):
""" Removes chars not allowed for metric keys in mlflow """
return re.sub(r'[^a-zA-Z0-9_\-\.\ \/]', '', name_string)

@@ -195,6 +196,7 @@ def log_inferencing_latencies(self, time_per_batch, batch_length=1, factor_to_usecs
if len(time_per_batch) > 1:
import numpy as np
import matplotlib.pyplot as plt
plt.switch_backend('agg')

# latency per batch
batch_run_times = np.array(time_per_batch) * factor_to_usecs
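
The `plt.switch_backend('agg')` addition matters because these scripts run on headless compute nodes: with no display server, the default interactive backend can fail at draw time, while Agg renders directly to files. A small sketch of the effect (the histogram and file name are illustrative):

import matplotlib.pyplot as plt

plt.switch_backend('agg')  # non-interactive backend, no DISPLAY required

fig, ax = plt.subplots()
ax.hist([0.8, 1.1, 1.0, 1.4], bins=4)
ax.set_xlabel("latency (usecs)")
fig.savefig("latency_per_batch.png")  # rendering to disk works headless
plt.close(fig)                        # plt.show() would be a no-op under agg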
77 changes: 77 additions & 0 deletions src/common/perf.py
@@ -184,3 +184,80 @@ def append_perf_metrics(self, perf_metrics):
]
self.perf_reports_freqs *= 2 # we'll start accepting reports only 1 out of 2
self.logger.warning(f"Perf report store reached max, increasing freq to {self.perf_reports_freqs}")


class PerfReportPlotter():
"""Once collected all perf reports from all nodes"""
def __init__(self, metrics_logger):
self.all_reports = {}
self.metrics_logger = metrics_logger

def add_perf_reports(self, perf_reports, node):
"""Add a set of reports from a given node"""
self.all_reports[node] = perf_reports

def report_nodes_perf(self):
# Currently reporting, for each metric, one value per node: the max over all samples
for node in self.all_reports:
# CPU UTILIZATION
self.metrics_logger.log_metric(
"max_t_(cpu_pct_per_cpu_avg)",
max([ report["cpu_pct_per_cpu_avg"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(cpu_pct_per_cpu_min)",
max([ report["cpu_pct_per_cpu_min"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(cpu_pct_per_cpu_max)",
max([ report["cpu_pct_per_cpu_max"] for report in self.all_reports[node] ]),
step=node
)

# MEM
self.metrics_logger.log_metric(
"max_t_(mem_percent)",
max([ report["mem_percent"] for report in self.all_reports[node] ]),
step=node
)

# DISK
self.metrics_logger.log_metric(
"max_t_(disk_usage_percent)",
max([ report["disk_usage_percent"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(disk_io_read_mb)",
max([ report["disk_io_read_mb"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(disk_io_write_mb)",
max([ report["disk_io_write_mb"] for report in self.all_reports[node] ]),
step=node
)

# NET I/O
self.metrics_logger.log_metric(
"max_t_(net_io_lo_sent_mb)",
max([ report["net_io_lo_sent_mb"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(net_io_ext_sent_mb)",
max([ report["net_io_ext_sent_mb"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(net_io_lo_recv_mb)",
max([ report["net_io_lo_recv_mb"] for report in self.all_reports[node] ]),
step=node
)
self.metrics_logger.log_metric(
"max_t_(net_io_ext_recv_mb)",
max([ report["net_io_ext_recv_mb"] for report in self.all_reports[node] ]),
step=node
)
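
Putting the pieces together, the intended single-node flow mirrors `RunnableScript.finalize_run()` shown earlier (a usage sketch; `metrics_logger` is assumed to be an initialized `MetricsLogger`):

from common.perf import PerformanceMetricsCollector, PerfReportPlotter

collector = PerformanceMetricsCollector()
collector.start()
# ... run the actual workload ...
collector.finalize()

plotter = PerfReportPlotter(metrics_logger)
plotter.add_perf_reports(collector.perf_reports, node=0)  # single node -> node id 0
plotter.report_nodes_perf()  # logs each max_t_(...) metric with step=node

Logging the max over time with the node id as `step` keeps the output compact: one point per node per metric, rather than a full time series per node.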
2 changes: 1 addition & 1 deletion src/scripts/training/lightgbm_python/train.py
@@ -99,7 +99,7 @@ def load_lgbm_params_from_cli(self, args, mpi_config):
cli_params = dict(vars(args))

# removing arguments that are purely CLI
- for key in ['verbose', 'custom_properties', 'export_model', 'test', 'train', 'custom_params', 'construct']:
for key in ['verbose', 'custom_properties', 'export_model', 'test', 'train', 'custom_params', 'construct', 'disable_perf_metrics']:
del cli_params[key]

# doing some fixes and hardcoded values
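
One caveat on the loop above: `del cli_params[key]` raises KeyError if a key is absent. `disable_perf_metrics` is always registered by the common `RunnableScript` parser, so this works, but a defensive variant (a suggestion, not the repo's code) avoids that coupling:

for key in ['verbose', 'custom_properties', 'export_model', 'test', 'train',
            'custom_params', 'construct', 'disable_perf_metrics']:
    cli_params.pop(key, None)  # pop() with a default never raises, unlike del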
26 changes: 24 additions & 2 deletions tests/common/test_component.py
@@ -6,6 +6,7 @@
import json

from common.components import SingleNodeScript
from common.metrics import MetricsLogger

class FakeSingleNodeScript(SingleNodeScript):
def __init__(self):
@@ -26,7 +27,7 @@ def run(self, args, logger, metrics_logger, unknown_args):
@patch('mlflow.start_run')
def test_single_node_script_metrics(mlflow_start_run_mock, mlflow_set_tags_mock, mlflow_log_metric_mock, mlflow_end_run_mock):
# just run main
- FakeSingleNodeScript.main(
test_component = FakeSingleNodeScript.main(
[
"foo.py",
"--verbose", "True",
@@ -72,10 +73,31 @@ def test_single_node_script_metrics(mlflow_start_run_mock, mlflow_set_tags_mock, mlflow_log_metric_mock, mlflow_end_run_mock):

# now let's test all metrics
metrics_calls = mlflow_log_metric_mock.call_args_list
- assert len(metrics_calls) == 1

# 1 user metric, 11 performance metrics
assert len(metrics_calls) == 12

# user metric (time block)
assert metrics_calls[0].args[0] == "fake_time_block"
assert isinstance(metrics_calls[0].args[1], float)
assert "step" in metrics_calls[0].kwargs
assert metrics_calls[0].kwargs["step"] == 1

# perf metrics
perf_metrics_call_args = [
"max_t_(cpu_pct_per_cpu_avg)",
"max_t_(cpu_pct_per_cpu_min)",
"max_t_(cpu_pct_per_cpu_max)",
"max_t_(mem_percent)",
"max_t_(disk_usage_percent)",
"max_t_(disk_io_read_mb)",
"max_t_(disk_io_write_mb)",
"max_t_(net_io_lo_sent_mb)",
"max_t_(net_io_ext_sent_mb)",
"max_t_(net_io_lo_recv_mb)",
"max_t_(net_io_ext_recv_mb)",
]
for index, metric_key in enumerate(perf_metrics_call_args):
assert metrics_calls[index+1].args[0] == MetricsLogger._remove_non_allowed_chars(metric_key)
assert "step" in metrics_calls[index+1].kwargs
assert metrics_calls[index+1].kwargs["step"] == 0 # using node id as step
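
The `MetricsLogger._remove_non_allowed_chars(metric_key)` wrapper in these assertions is not cosmetic: per the docstring in src/common/metrics.py, mlflow metric keys may only contain characters matching [a-zA-Z0-9_\-\. \/], and every perf metric name above contains parentheses, which the logger strips before handing the key to mlflow. So the mock records sanitized names:

from common.metrics import MetricsLogger

# parentheses fall outside the allowed character class and are removed
assert MetricsLogger._remove_non_allowed_chars("max_t_(cpu_pct_per_cpu_avg)") == "max_t_cpu_pct_per_cpu_avg"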
4 changes: 1 addition & 3 deletions tests/common/test_metrics.py
@@ -119,8 +119,6 @@ def test_metrics_logger_log_metric_too_long(mlflow_log_metric_mock):

def test_metrics_logger_log_metric_non_allowed_chars():
""" Tests MetricsLogger().log_metric() """
metrics_logger = MetricsLogger()

test_cases = [
{
'input': "a!@$b%^&c_-/d",
@@ -137,7 +135,7 @@
]

for test_case in test_cases:
- assert metrics_logger._remove_non_allowed_chars(test_case['input']) == test_case['expected']
assert MetricsLogger._remove_non_allowed_chars(test_case['input']) == test_case['expected']


@patch('mlflow.set_tags')