Skip to content
This repository has been archived by the owner on Apr 8, 2024. It is now read-only.

Commit

Permalink
Create performance reporting thread for single node components (#189)
Browse files Browse the repository at this point in the history
* working draft class
* reduce class, implement unit tests
  • Loading branch information
jfomhover authored Dec 7, 2021
1 parent efda302 commit a1d92a1
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 0 deletions.
123 changes: 123 additions & 0 deletions src/common/perf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""
Helps with reporting performance metrics (cpu/mem utilization).
Needs to be implemented in the rest of the code.
"""
import logging
import threading
import time
import psutil


class PerformanceReportingThread(threading.Thread):
"""Thread to report performance (cpu/mem/net)"""
def __init__(self,
initial_time_increment=1.0):
"""Constructor"""
threading.Thread.__init__(self)
self.killed = False # flag, set to True to kill from the inside

self.logger = logging.getLogger(__name__)

# time between perf reports
self.time_increment = initial_time_increment


##################
### CALL BACKS ###
##################

def call_on_loop(self, perf_report):
"""You need to implement this"""
pass

def call_on_exit(self):
"""You need to implement this"""
pass

#####################
### RUN FUNCTIONS ###
#####################

def run(self):
"""Run function of the thread, while(True)"""
while not(self.killed):
if self.time_increment >= 1.0: # cpu_percent.interval already consumes 1sec
time.sleep(self.time_increment - 1.0) # will double every time report_store_max_length is reached
self._run_loop()

self.call_on_exit()

def _run_loop(self):
"""What to run within the while(not(killed))"""
perf_report = {}

# CPU UTILIZATION
cpu_utilization = psutil.cpu_percent(interval=1.0, percpu=True) # will take 1 sec to return
perf_report["cpu_pct_per_cpu_avg"] = sum(cpu_utilization) / len(cpu_utilization)
perf_report["cpu_pct_per_cpu_min"] = min(cpu_utilization)
perf_report["cpu_pct_per_cpu_max"] = max(cpu_utilization)

# MEM UTILIZATION
perf_report["mem_percent"] = psutil.virtual_memory().percent

# DISK UTILIZAITON
perf_report["disk_usage_percent"] = psutil.disk_usage('/').percent
perf_report["disk_io_read_mb"] = (psutil.disk_io_counters(perdisk=False).read_bytes / (1024 * 1024))
perf_report["disk_io_write_mb"] = (psutil.disk_io_counters(perdisk=False).write_count / (1024 * 1024))

# NET I/O SEND/RECV
net_io_counters = psutil.net_io_counters(pernic=True)
net_io_lo_identifiers = []
net_io_ext_identifiers = []

for key in net_io_counters:
if 'loopback' in key.lower():
net_io_lo_identifiers.append(key)
elif key.lower() == 'lo':
net_io_lo_identifiers.append(key)
else:
net_io_ext_identifiers.append(key)

lo_sent_mb = sum(
[
net_io_counters.get(key).bytes_sent
for key in net_io_lo_identifiers
]
) / (1024 * 1024)

ext_sent_mb = sum(
[
net_io_counters.get(key).bytes_sent
for key in net_io_ext_identifiers
]
) / (1024 * 1024)

lo_recv_mb = sum(
[
net_io_counters.get(key).bytes_recv
for key in net_io_lo_identifiers
]
) / (1024 * 1024)

ext_recv_mb = sum(
[
net_io_counters.get(key).bytes_recv
for key in net_io_ext_identifiers
]
) / (1024 * 1024)

perf_report["net_io_lo_sent_mb"] = lo_sent_mb
perf_report["net_io_ext_sent_mb"] = ext_sent_mb
perf_report["net_io_lo_recv_mb"] = lo_recv_mb
perf_report["net_io_ext_recv_mb"] = ext_recv_mb

# END OF REPORT
self.call_on_loop(perf_report)

def finalize(self):
"""Ask the thread to finalize (clean)"""
self.killed = True
self.join()
58 changes: 58 additions & 0 deletions tests/common/test_perf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Tests src/common/metrics.py"""
import os
import pytest
from unittest.mock import call, Mock, patch
import time

from common.perf import PerformanceReportingThread

def verify_all_perf_report_keys(perf_report):
"""Helper test function, tests all keys in perf report"""
assert isinstance(perf_report, dict)

required_keys = [
"cpu_pct_per_cpu_avg",
"cpu_pct_per_cpu_min",
"cpu_pct_per_cpu_max",
"mem_percent",
"disk_usage_percent",
"disk_io_read_mb",
"disk_io_write_mb",
"net_io_lo_sent_mb",
"net_io_ext_sent_mb",
"net_io_lo_recv_mb",
"net_io_ext_recv_mb"
]

for key in required_keys:
assert key in perf_report, f"key {key} should be in the perf report, but instead we find: {list(perf_report.keys())}"
assert isinstance(perf_report[key], float) # all metrics are float so far\

assert "not_in_perf_report" not in perf_report


def test_perf_report_run_as_thread():
""" Tests PerformanceReportingThread() as a thread """
# creating a mock to provide as callback
call_on_loop_method = Mock()
call_on_exit_method = Mock()

perf_report_thread = PerformanceReportingThread(initial_time_increment=2.0)
perf_report_thread.call_on_loop = call_on_loop_method
perf_report_thread.call_on_exit = call_on_exit_method

perf_report_thread.start() # will engage in first loop and sleep 2.0
time.sleep(0.5) # will wait to be in the middle of that loop
perf_report_thread.finalize()

# on exit not called in this one
call_on_exit_method.assert_called_once()

# get all mock calls
callback_call_args = call_on_loop_method.call_args_list

assert len(callback_call_args) == 1 # just called once
assert len(callback_call_args[0].args) == 1 # only 1 argument

perf_report = callback_call_args[0].args[0]
verify_all_perf_report_keys(perf_report)

0 comments on commit a1d92a1

Please sign in to comment.