diff --git a/.gitignore b/.gitignore index f7d251f6..23f8e9e2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,8 @@ +.vagrant +macos-sonoma.conf +macos-sonoma/ +Vagrantfile +Session.vim .idea .venv */__pycache__/* @@ -13,3 +18,5 @@ tmp/ _trial_temp/ pycharm-interpreter.sh python + +scratch.py diff --git a/examples/linux-powerjoular-profiling/README.md b/examples/linux-powerjoular-profiling/README.md index 974dd0f2..5c85d61d 100644 --- a/examples/linux-powerjoular-profiling/README.md +++ b/examples/linux-powerjoular-profiling/README.md @@ -20,9 +20,10 @@ pip install -r requirements.txt ## Running From the root directory of the repo, run the following command: +NOTE: This program must be run as root, as powerjoular requires this for its use of Intel RAPL. ```bash -python experiment-runner/ examples/linux-powerjoular-profiling/RunnerConfig.py +sudo python experiment-runner/ examples/linux-powerjoular-profiling/RunnerConfig.py ``` ## Results diff --git a/examples/linux-powerjoular-profiling/RunnerConfig.py b/examples/linux-powerjoular-profiling/RunnerConfig.py index 244b343d..aeb613dd 100644 --- a/examples/linux-powerjoular-profiling/RunnerConfig.py +++ b/examples/linux-powerjoular-profiling/RunnerConfig.py @@ -6,16 +6,15 @@ from ConfigValidator.Config.Models.OperationType import OperationType from ProgressManager.Output.OutputProcedure import OutputProcedure as output +from Plugins.Profilers.PowerJoular import PowerJoular + from typing import Dict, List, Any, Optional from pathlib import Path from os.path import dirname, realpath -import os -import signal -import pandas as pd import time import subprocess -import shlex +import numpy as np class RunnerConfig: ROOT_DIR = Path(dirname(realpath(__file__))) @@ -88,16 +87,18 @@ def start_run(self, context: RunnerContext) -> None: ) # Configure the environment based on the current variation - subprocess.check_call(shlex.split(f'cpulimit -b -p {self.target.pid} --limit {cpu_limit}')) + subprocess.check_call(f'cpulimit -p {self.target.pid} --limit {cpu_limit} &', shell=True) + time.sleep(1) # allow the process to run a little before measuring def start_measurement(self, context: RunnerContext) -> None: """Perform any activity required for starting measurements.""" - - profiler_cmd = f'powerjoular -l -p {self.target.pid} -f {context.run_dir / "powerjoular.csv"}' - - time.sleep(1) # allow the process to run a little before measuring - self.profiler = subprocess.Popen(shlex.split(profiler_cmd)) + + # Set up the powerjoular object, provide an (optional) target and output file name + self.meter = PowerJoular(target_pid=self.target.pid, + out_file=context.run_dir / "powerjoular.csv") + # Start measuring with powerjoular + self.meter.start() def interact(self, context: RunnerContext) -> None: """Perform any interaction with the running target system here, or block here until the target finishes.""" @@ -109,14 +110,14 @@ def interact(self, context: RunnerContext) -> None: def stop_measurement(self, context: RunnerContext) -> None: """Perform any activity here required for stopping measurements.""" - - os.kill(self.profiler.pid, signal.SIGINT) # graceful shutdown of powerjoular - self.profiler.wait() + + # Stop the measurements + stdout = self.meter.stop() def stop_run(self, context: RunnerContext) -> None: """Perform any activity here required for stopping the run. 
Activities after stopping the run should also be performed here.""" - + self.target.kill() self.target.wait() @@ -124,15 +125,17 @@ def populate_run_data(self, context: RunnerContext) -> Optional[Dict[str, Any]]: """Parse and process any measurement data here. You can also store the raw measurement data under `context.run_dir` Returns a dictionary with keys `self.run_table_model.data_columns` and their values populated""" - - # powerjoular.csv - Power consumption of the whole system - # powerjoular.csv-PID.csv - Power consumption of that specific process - df = pd.read_csv(context.run_dir / f"powerjoular.csv-{self.target.pid}.csv") - run_data = { - 'avg_cpu': round(df['CPU Utilization'].sum(), 3), - 'total_energy': round(df['CPU Power'].sum(), 3), + + out_file = context.run_dir / "powerjoular.csv" + + results_global = self.meter.parse_log(out_file) + # If you specified a target_pid or used the -p paramter + # a second csv for that target will be generated + # results_process = self.meter.parse_log(self.meter.target_logfile) + return { + 'avg_cpu': round(np.mean(list(results_global['CPU Utilization'].values())), 3), + 'total_energy': round(sum(list(results_global['CPU Power'].values())), 3), } - return run_data def after_experiment(self) -> None: """Perform any activity required after stopping the experiment here diff --git a/examples/linux-ps-profiling/RunnerConfig.py b/examples/linux-ps-profiling/RunnerConfig.py index 83060cec..b686308d 100644 --- a/examples/linux-ps-profiling/RunnerConfig.py +++ b/examples/linux-ps-profiling/RunnerConfig.py @@ -5,12 +5,13 @@ from ConfigValidator.Config.Models.RunnerContext import RunnerContext from ConfigValidator.Config.Models.OperationType import OperationType from ProgressManager.Output.OutputProcedure import OutputProcedure as output +from Plugins.Profilers.Ps import Ps from typing import Dict, List, Any, Optional from pathlib import Path from os.path import dirname, realpath -import pandas as pd +import numpy as np import time import subprocess import shlex @@ -64,14 +65,16 @@ def create_run_table_model(self) -> RunTableModel: exclude_variations = [ {cpu_limit_factor: [70], pin_core_factor: [False]} # all runs having the combination <'70', 'False'> will be excluded ], - data_columns=['avg_cpu'] + data_columns=["avg_cpu", "avg_mem"] ) return self.run_table_model def before_experiment(self) -> None: """Perform any activity required before starting the experiment here Invoked only once during the lifetime of the program.""" - subprocess.check_call(['make'], cwd=self.ROOT_DIR) # compile + + # compile the target program + subprocess.check_call(['make'], cwd=self.ROOT_DIR) def before_run(self) -> None: """Perform any activity required before starting a run. @@ -93,27 +96,21 @@ def start_run(self, context: RunnerContext) -> None: # Configure the environment based on the current variation if pin_core: - subprocess.check_call(shlex.split(f'taskset -cp 0 {self.target.pid}')) - subprocess.check_call(shlex.split(f'cpulimit -b -p {self.target.pid} --limit {cpu_limit}')) + subprocess.check_call(shlex.split(f'taskset -cp 0 {self.target.pid}')) + + # Limit the targets cputime + subprocess.check_call(f'cpulimit --limit={cpu_limit} -p {self.target.pid} &', shell=True) + time.sleep(1) # allow the process to run a little before measuring def start_measurement(self, context: RunnerContext) -> None: """Perform any activity required for starting measurements.""" - - # man 1 ps - # %cpu: - # cpu utilization of the process in "##.#" format. 
Currently, it is the CPU time used - divided by the time the process has been running (cputime/realtime ratio), expressed - as a percentage. It will not add up to 100% unless you are lucky. (alias pcpu). - profiler_cmd = f'ps -p {self.target.pid} --noheader -o %cpu' - wrapper_script = f''' - while true; do {profiler_cmd}; sleep 1; done - ''' - - time.sleep(1) # allow the process to run a little before measuring - self.profiler = subprocess.Popen(['sh', '-c', wrapper_script], - stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) + + # Set up the ps object, provide an (optional) target and output file name + self.meter = Ps(out_file=context.run_dir / "ps.csv", + target_pid=[self.target.pid]) + # Start measuring with ps + self.meter.start() def interact(self, context: RunnerContext) -> None: """Perform any interaction with the running target system here, or block here until the target finishes.""" @@ -126,8 +123,8 @@ def interact(self, context: RunnerContext) -> None: def stop_measurement(self, context: RunnerContext) -> None: """Perform any activity here required for stopping measurements.""" - self.profiler.kill() - self.profiler.wait() + # Stop the measurements + stdout = self.meter.stop() def stop_run(self, context: RunnerContext) -> None: """Perform any activity here required for stopping the run. @@ -141,17 +138,13 @@ def populate_run_data(self, context: RunnerContext) -> Optional[Dict[str, Any]]: You can also store the raw measurement data under `context.run_dir` Returns a dictionary with keys `self.run_table_model.data_columns` and their values populated""" - df = pd.DataFrame(columns=['cpu_usage']) - for i, l in enumerate(self.profiler.stdout.readlines()): - cpu_usage=float(l.decode('ascii').strip()) - df.loc[i] = [cpu_usage] - - df.to_csv(context.run_dir / 'raw_data.csv', index=False) + results = self.meter.parse_log(context.run_dir / "ps.csv", + column_names=["cpu_usage", "memory_usage"]) - run_data = { - 'avg_cpu': round(df['cpu_usage'].mean(), 3) + return { + "avg_cpu": round(np.mean(list(results['cpu_usage'].values())), 3), + "avg_mem": round(np.mean(list(results['memory_usage'].values())), 3) } - return run_data def after_experiment(self) -> None: """Perform any activity required after stopping the experiment here diff --git a/examples/powermetrics-profiling/README.md b/examples/powermetrics-profiling/README.md new file mode 100644 index 00000000..c63c0ab5 --- /dev/null +++ b/examples/powermetrics-profiling/README.md @@ -0,0 +1,25 @@ + +# `powermetrics` profiler + +A simple example using the OS X [powermetrics](https://developer.apple.com/library/archive/documentation/Performance/Conceptual/power_efficiency_guidelines_osx/PrioritizeWorkAtTheTaskLevel.html#//apple_ref/doc/uid/TP40013929-CH35-SW10) CLI tool to measure the ambient power consumption of the system. + +## Requirements + +Install the requirements to run: + +```bash +pip install -r requirements.txt +``` + +## Running + +From the root directory of the repo, run the following command: +NOTE: This program must be run as root, as powermetrics requires this. + +```bash +sudo python experiment-runner/ examples/powermetrics-profiling/RunnerConfig.py +``` + +## Results + +The results are generated in the `examples/powermetrics-profiling/experiments` folder.
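For orientation before the new RunnerConfig below, this is a condensed sketch of how the PowerMetrics plugin is meant to be driven on its own. It assumes macOS, root access, and the repository root on `sys.path` (the same arrangement the unit tests use); the output path, the 1000 ms sample interval, and the 5-second window are illustrative only.

```python
import sys
import time
from pathlib import Path

sys.path.append("experiment-runner")  # same trick the unit tests use so the plugin package resolves
from Plugins.Profilers.PowerMetrics import PowerMetrics

# Illustrative values: /tmp output file, 1000 ms sample interval, 5-second measurement window
meter = PowerMetrics(out_file=Path("/tmp/pm_out.plist"), sample_frequency=1000)
meter.start()   # spawns the powermetrics process via the CLISource base class (needs root)
time.sleep(5)   # let a few samples accumulate
meter.stop()    # terminates powermetrics and returns its captured stdout

samples = PowerMetrics.parse_log(Path("/tmp/pm_out.plist"))      # one dict per plist sample
print(len(samples), samples[0]["processor"]["package_joules"])   # the field the RunnerConfig below sums
```

The RunnerConfig added in the next file wires these same start/stop/parse calls into the experiment lifecycle hooks.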
diff --git a/examples/powermetrics-profiling/RunnerConfig.py b/examples/powermetrics-profiling/RunnerConfig.py new file mode 100644 index 00000000..417aeccb --- /dev/null +++ b/examples/powermetrics-profiling/RunnerConfig.py @@ -0,0 +1,128 @@ +from EventManager.Models.RunnerEvents import RunnerEvents +from EventManager.EventSubscriptionController import EventSubscriptionController +from ConfigValidator.Config.Models.RunTableModel import RunTableModel +from ConfigValidator.Config.Models.FactorModel import FactorModel +from ConfigValidator.Config.Models.RunnerContext import RunnerContext +from ConfigValidator.Config.Models.OperationType import OperationType +from ProgressManager.Output.OutputProcedure import OutputProcedure as output +from Plugins.Profilers.PowerMetrics import PowerMetrics + +from typing import Dict, List, Any, Optional +from pathlib import Path +from os.path import dirname, realpath +import time +import numpy as np + +class RunnerConfig: + ROOT_DIR = Path(dirname(realpath(__file__))) + + # ================================ USER SPECIFIC CONFIG ================================ + """The name of the experiment.""" + name: str = "new_runner_experiment" + + """The path in which Experiment Runner will create a folder with the name `self.name`, in order to store the + results from this experiment. (Path does not need to exist - it will be created if necessary.) + Output path defaults to the config file's path, inside the folder 'experiments'""" + results_output_path: Path = ROOT_DIR / 'experiments' + + """Experiment operation type. Unless you manually want to initiate each run, use `OperationType.AUTO`.""" + operation_type: OperationType = OperationType.AUTO + + """The time Experiment Runner will wait after a run completes. + This can be essential to accommodate for cooldown periods on some systems.""" + time_between_runs_in_ms: int = 1000 + + # Dynamic configurations can be one-time satisfied here before the program takes the config as-is + # e.g. Setting some variable based on some criteria + def __init__(self): + """Executes immediately after program start, on config load""" + + EventSubscriptionController.subscribe_to_multiple_events([ + (RunnerEvents.BEFORE_EXPERIMENT, self.before_experiment), + (RunnerEvents.BEFORE_RUN , self.before_run ), + (RunnerEvents.START_RUN , self.start_run ), + (RunnerEvents.START_MEASUREMENT, self.start_measurement), + (RunnerEvents.INTERACT , self.interact ), + (RunnerEvents.STOP_MEASUREMENT , self.stop_measurement ), + (RunnerEvents.STOP_RUN , self.stop_run ), + (RunnerEvents.POPULATE_RUN_DATA, self.populate_run_data), + (RunnerEvents.AFTER_EXPERIMENT , self.after_experiment ) + ]) + self.run_table_model = None # Initialized later + output.console_log("Custom config loaded") + + def create_run_table_model(self) -> RunTableModel: + """Create and return the run_table model here. A run_table is a List (rows) of tuples (columns), + representing each run performed""" + + # Create the experiment run table with factors, and desired data columns + factor1 = FactorModel("test_factor", [1, 2]) + self.run_table_model = RunTableModel( + factors = [factor1], + data_columns=["joules", "avg_cpu", "avg_gpu"]) + + return self.run_table_model + + def before_experiment(self) -> None: + """Perform any activity required before starting the experiment here + Invoked only once during the lifetime of the program.""" + pass + + def before_run(self) -> None: + """Perform any activity required before starting a run. 
+ No context is available here as the run is not yet active (BEFORE RUN)""" + pass + + def start_run(self, context: RunnerContext) -> None: + """Perform any activity required for starting the run here. + For example, starting the target system to measure. + Activities after starting the run should also be performed here.""" + pass + + def start_measurement(self, context: RunnerContext) -> None: + """Perform any activity required for starting measurements.""" + + # Create the powermetrics object we will use to collect data + self.meter = PowerMetrics(out_file=context.run_dir / "powermetrics.plist") + # Start measuring useing powermetrics + self.meter.start() + + def interact(self, context: RunnerContext) -> None: + """Perform any interaction with the running target system here, or block here until the target finishes.""" + + # Wait (block) for a bit to collect some data + time.sleep(20) + + def stop_measurement(self, context: RunnerContext) -> None: + """Perform any activity here required for stopping measurements.""" + + # Stop measuring at the end of a run + stdout = self.meter.stop() + + def stop_run(self, context: RunnerContext) -> None: + """Perform any activity here required for stopping the run. + Activities after stopping the run should also be performed here.""" + pass + + def populate_run_data(self, context: RunnerContext) -> Optional[Dict[str, Any]]: + """Parse and process any measurement data here. + You can also store the raw measurement data under `context.run_dir` + Returns a dictionary with keys `self.run_table_model.data_columns` and their values populated""" + + # Retrieve data from run + run_results = self.meter.parse_log(context.run_dir / "powermetrics.plist") + + # Parse it as required for your experiment and add it to the run table + return { + "joules": sum(map(lambda x: x["processor"]["package_joules"], run_results)), + "avg_cpu": np.mean(list(map(lambda x: x["processor"]["packages"][0]["cores_active_ratio"], run_results))), + "avg_gpu": np.mean(list(map(lambda x: x["processor"]["packages"][0]["gpu_active_ratio"], run_results))), + } + + def after_experiment(self) -> None: + """Perform any activity required after stopping the experiment here + Invoked only once during the lifetime of the program.""" + pass + + # ================================ DO NOT ALTER BELOW THIS LINE ================================ + experiment_path: Path = None diff --git a/experiment-runner/Plugins/Profilers/DataSource.py b/experiment-runner/Plugins/Profilers/DataSource.py new file mode 100644 index 00000000..e4c5891c --- /dev/null +++ b/experiment-runner/Plugins/Profilers/DataSource.py @@ -0,0 +1,234 @@ +from abc import ABC, abstractmethod +from collections import UserDict +from collections.abc import Iterable # This import is only valid >= python 3.10 I think +from pathlib import Path +from typing import get_origin, get_args +import platform +import shlex +from enum import StrEnum +import shutil +import subprocess + +class ParameterDict(UserDict): + def valid_key(self, key): + return isinstance(key, str) \ + or isinstance(key, tuple) \ + or isinstance(key, list[str]) + + def str_to_tuple(self, key): + return tuple([key]) + + def __setitem__(self, key, item): + if not self.valid_key(key): + raise RuntimeError("Unexpected key value") + + if isinstance(key, str): + key = self.str_to_tuple(key) + + for params in self.data.keys(): + if set(key).issubset(params): + raise RuntimeError("Keys cannot have duplicate elements") + + super().__setitem__(tuple(key), item) + + def __getitem__(self, key): 
+ if not self.valid_key(key): + raise RuntimeError("Unexpected key type, expected `str` or `list[str]`") + + if isinstance(key, str): + key = self.str_to_tuple(key) + + for params, val in self.data.items(): + if set(key).issubset(params): + return val + + # Pass to default handler if we cant find it + super().__getitem__(tuple(key)) + + # Must pass entire valid key to delete element + def __delitem__(self, key): + if isinstance(key, str): + key = self.str_to_tuple(key) + + super().__delitem__(tuple(key)) + + def __contains__(self, key): + if isinstance(key, str): + key = self.str_to_tuple(key) + + for params, val in self.data.items(): + if set(key).issubset(params): + return True + + return False + +class DataSource(ABC): + def __init__(self): + self._validate_platform() + self.logfile = None + + def _validate_platform(self): + if platform.system() in self.supported_platforms: + return + + raise RuntimeError(f"One of: {self.supported_platforms} is required for this plugin") + + @property + @abstractmethod + def supported_platforms(self) -> list[str]: + pass + + @property + @abstractmethod + def source_name(self) -> str: + pass + + @abstractmethod + def __del__(self): + pass + + @staticmethod + @abstractmethod + def parse_log(): + pass + + +class CLISource(DataSource): + def __init__(self): + super().__init__() + + self.process = None + self.args = None + + def __del__(self): + if self.process: + self.process.kill() + + @property + @abstractmethod + def parameters(self) -> ParameterDict: + pass + + def _validate_platform(self): + super()._validate_platform() + + if shutil.which(self.source_name) is None: + raise RuntimeError(f"The {self.source_name} cli tool is required for this plugin") + + def _validate_start(self): + if self.process.poll() != None: + raise RuntimeError(f"{self.source_name} did not start correctly") + + def _validate_stop(self, stdout, stderr): + if stderr: + raise RuntimeWarning(f"{self.source_name} did not stop correctly, or encountered an error: {stderr}") + + # Should work well with single level type generics e.g. 
list[int] + # TODO: Expand this to be more robust with other types + def _validate_type(self, param, p_type): + if p_type != str and not isinstance(param, StrEnum) and isinstance(param, Iterable): + if type(param) != get_origin(p_type): + return False + + if type(param[0]) != get_args(p_type)[0]: + return False + + return True + + return isinstance(param, p_type) + + def _validate_parameters(self, parameters: dict): + for p, v in parameters.items(): + if p not in self.parameters: + raise RuntimeError(f"Unexpected parameter: {p}") + + if self.parameters[p] == None: + continue + + if not self._validate_type(v, self.parameters[p]): + raise RuntimeError(f"Unexpected type: {type(v)} for parameter {p}, expected {self.parameters[p]}") + + def _format_cmd(self): + self._validate_parameters(self.args) + cmd = self.source_name + + # Transform the parameter dict into string format to be parsed by shlex + for p, v in self.args.items(): + if v == None: + cmd += f" {p}" + elif isinstance(v, Iterable) and not (isinstance(v, StrEnum) or isinstance(v, str)): + cmd += f" {p} {",".join(map(str, v))}" + else: + cmd += f" {p} {v}" + + return cmd + + def update_parameters(self, add: dict={}, remove: list[str]=[]): + # Check if the new sets of parameters are sane + self._validate_parameters(add) + + for p, v in add.items(): + self.args[p] = v + + for p in remove: + if p in self.args.keys(): + del self.args[p] + + # Double check that our typeing is still valid + self._validate_parameters(self.args) + + def start(self): + try: + self.process = subprocess.Popen(shlex.split(self._format_cmd()), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + except Exception as e: + self.process.kill() + raise RuntimeError(f"{self.source_name} process could not start: {e}") + + self._validate_start() + + def stop(self): + if not self.process: + return + + try: + self.process.terminate() + stdout, stderr = self.process.communicate(timeout=5) + + except Exception as e: + self.process.kill() + raise RuntimeError(f"{self.source_name} process could not stop {e}") + + self._validate_stop(stdout.decode("utf-8"), stderr.decode("utf-8")) + return stdout.decode("utf-8") + +class DeviceSource(DataSource): + def __init__(self): + super().__init__() + self.device_handle = None + + def __del__(self): + if self.device_handle: + self.close_device() + + @abstractmethod + def list_devices(self): + pass + + @abstractmethod + def open_device(self): + pass + + @abstractmethod + def close_device(self): + pass + + @abstractmethod + def set_mode(self): + pass + + @abstractmethod + def log(self, timeout: int = 60, logfile: Path = None): + pass + diff --git a/experiment-runner/Plugins/Profilers/PowerJoular.py b/experiment-runner/Plugins/Profilers/PowerJoular.py new file mode 100644 index 00000000..a3957fd5 --- /dev/null +++ b/experiment-runner/Plugins/Profilers/PowerJoular.py @@ -0,0 +1,52 @@ +from __future__ import annotations +from pathlib import Path +import pandas as pd +from Plugins.Profilers.DataSource import CLISource, ParameterDict + +# Supported Paramters for the PowerJoular metrics plugin +POWERJOULAR_PARAMETERS = { + ("-p",): int, + ("-a",): str, + ("-f",): Path, + ("-o",): Path, + ("-t",): None, + ("-l",): None, + ("-m",): str, + ("-s",): str +} + +class PowerJoular(CLISource): + parameters = ParameterDict(POWERJOULAR_PARAMETERS) + source_name = "powerjoular" + supported_platforms = ["Linux"] + + """An integration of PowerJoular into experiment-runner as a data source plugin""" + def __init__(self, + sample_frequency: int = 5000, + 
out_file: Path = "powerjoular.csv", + additional_args: dict = {}, + target_pid: int = None): + + super().__init__() + self.logfile = out_file + self.args = { + "-l": None, + "-f": Path(self.logfile), + } + + if target_pid: + self.update_parameters(add={"-p": target_pid}) + + self.update_parameters(add=additional_args) + + @property + def target_logfile(self): + if "-p" in self.args.keys(): + return f"{self.logfile}-{self.args["-p"]}.csv" + + return None + + @staticmethod + def parse_log(logfile: Path): + # Things are already in csv format here, no checks needed + return pd.read_csv(logfile).to_dict() diff --git a/experiment-runner/Plugins/Profilers/PowerMetrics.py b/experiment-runner/Plugins/Profilers/PowerMetrics.py new file mode 100644 index 00000000..e94a448a --- /dev/null +++ b/experiment-runner/Plugins/Profilers/PowerMetrics.py @@ -0,0 +1,165 @@ +from __future__ import annotations +from enum import StrEnum +from pathlib import Path +import plistlib + +from Plugins.Profilers.DataSource import ParameterDict, CLISource + +# How to format the output +class PMFormatTypes(StrEnum): + PM_FMT_TEXT = "text" + PM_FMT_PLIST = "plist" + +# How to order results +class PMOrderTypes(StrEnum): + PM_ORDER_PID = "pid" + PM_ORDER_WAKEUP = "wakeups" + PM_ORDER_CPU = "cputime" + PM_ORDER_HYBRID = "composite" + +# Which sources to sample from +class PMSampleTypes(StrEnum): + PM_SAMPLE_TASKS = "tasks" # per task cpu usage and wakeup stats + PM_SAMPLE_BAT = "battery" # battery and backlight info + PM_SAMPLE_NET = "network" # network usage info + PM_SAMPLE_DISK = "disk" # disk usage info + PM_SAMPLE_INT = "interrupts" # interrupt distribution + PM_SAMPLE_CPU_POWER = "cpu_power" # c-state residency, power and frequency info + PM_SAMPLE_TEMP = "thermal" # thermal pressure notifications + PM_SAMPLE_SFI = "sfi" # selective forced idle information + PM_SAMPLE_GPU_POWER = "gpu_power" # gpu c-state residency, p-state residency and frequency info + PM_SAMPLE_AGPM = "gpu_agpm_stats" # Statistics reported by AGPM + PM_SAMPLE_SMC = "smc" # SMC sensors + PM_SAMPLE_DCC = "gpu_dcc_stats" # gpu duty cycle info + PM_SAMPLE_NVME = "nvme_ssd" # NVMe power state information + PM_SAMPLE_THROTTLE = "io_throttle_ssd" # IO Throttling information + +# Supported Paramters for the power metrics plugin +POWERMETRICS_PARAMETERS = { + ("--poweravg", "-a"): int, + ("--buffer-size", "-b"): int, + ("--format", "-f"): PMFormatTypes, + ("--sample-rate", "-i"): int, + ("--sample-count", "-n"): int, + ("--output-file", "-o"): Path, + ("--order", "-r"): PMOrderTypes, + ("--samplers", "-s"): list[PMSampleTypes], + ("--wakeup-cost", "-t"): int, + ("--unhide-info",): list[PMSampleTypes], + ("--show-all", "-A"): None, + ("--show-initial-usage",): None, + ("--show-usage-summary",): None, + ("--show-extra-power-info",): None, + ("--show-pstates",): None, + ("--show-plimits",): None, + ("--show-cpu-qos",): None, + ("--show-cpu-scalability",): None, + ("--show-hwp-capability",): None, + ("--show-process-coalition",): None, + ("--show-responsible-pid",): None, + ("--show-process-wait-times",): None, + ("--show-process-qos-tiers",): None, + ("--show-process-io",): None, + ("--show-process-gpu",): None, + ("--show-process-netstats",): None, + ("--show-process-qos"): None, + ("--show-process-energy",): None, + ("--show-process-samp-norm",): None, + ("--handle-invalid-values",): None, + ("--hide-cpu-duty-cycle",): None, +} + +class PowerMetrics(CLISource): + parameters = ParameterDict(POWERMETRICS_PARAMETERS) + source_name = "powermetrics" + 
supported_platforms = ["Darwin"] + + """An integration of OSX powermetrics into experiment-runner as a data source plugin""" + def __init__(self, + sample_frequency: int = 5000, + out_file: Path = "pm_out.plist", + additional_args: dict = {}, + additional_samplers: list[PMSampleTypes] = [], + hide_cpu_duty_cycle: bool = True, + order: PMOrderTypes = PMOrderTypes.PM_ORDER_CPU): + super().__init__() + self.logfile = out_file + # Grab all available power stats by default + self.args = { + "--output-file": Path(self.logfile), + "--sample-rate": sample_frequency, + "--format": PMFormatTypes.PM_FMT_PLIST, + "--samplers": [PMSampleTypes.PM_SAMPLE_CPU_POWER, + PMSampleTypes.PM_SAMPLE_GPU_POWER, + PMSampleTypes.PM_SAMPLE_AGPM] + additional_samplers, + "--hide-cpu-duty-cycle": hide_cpu_duty_cycle, + "--order": order + } + + self.update_parameters(add=additional_args) + + @staticmethod + def parse_plist_power(pm_plists: list[dict]): + """ + Extracts from a list of plists, the relavent power statistics if present. If no + power stats are present, this returns an empty list. This is mainly a helper method, + to make the plists easier to work with. + + Parameters: + pm_plists (list[dict]): The list of plists created by parse_pm_plist + + Returns: + A list of dicts, each containing a subset of the available stats related to power. + """ + power_plists = [] + + for plist in pm_plists: + stats = {} + if "GPU" in plist.keys(): + stats["GPU"] = plist["GPU"].copy() + for gpu in range(len(stats["GPU"])): + del stats["GPU"][gpu]["misc_counters"] + del stats["GPU"][gpu]["p_states"] + + if "processor" in plist.keys(): + stats["processor"] = plist["processor"] + del stats["processor"]["packages"] + + if "agpm_stats" in plist.keys(): + stats["agpm_stats"] = plist["agpm_stats"] + + if "timestamp" in plist.keys(): + stats["timestamp"] = plist["timestamp"] + + power_plists.append(stats) + + return power_plists + + @staticmethod + def parse_log(logfile: Path): + """ + Parses a provided logfile from powermetrics in plist format. Powermetrics outputs a plist + for every sample taken, it included a newline after the closing <\plist>, we account for that here + to make things easier to parse. + + Parameters: + logfile (Path): The path to the plist logfile created by powermetrics + + Returns: + A list of dicts, each representing the plist for a given sample + """ + plists = [] + cur_plist = bytearray() + with open(logfile, "rb") as fp: + for l in fp.readlines(): + # Powermetrics outputs plists with null bytes inbetween. 
We account for this + if l[0] == 0: + cur_plist.extend(l[1:]) + else: + cur_plist.extend(l) + + if b"\n" in l: + plists.append(plistlib.loads(cur_plist)) + cur_plist = bytearray() + + return plists diff --git a/experiment-runner/Plugins/Profilers/Ps.py b/experiment-runner/Plugins/Profilers/Ps.py new file mode 100644 index 00000000..d5bcf461 --- /dev/null +++ b/experiment-runner/Plugins/Profilers/Ps.py @@ -0,0 +1,121 @@ +from __future__ import annotations +from pathlib import Path +import pandas as pd +from Plugins.Profilers.DataSource import CLISource, ParameterDict + +PS_PARAMTERS = { + ("-A", "-e"): None, + ("-a"): None, + ("a"): None, + ("-d"): None, + ("-N", "--deselect"): None, + ("r"): None, + ("T"): None, + ("x"): None, + + ("-C"): list[str], + ("-G", "--Group"): list[int], + ("-g", "--group"): list[str], + ("-p", "p", "--pid"): list[int], + ("--ppid"): list[int], + ("-q", "q", "--quick-pid"): list[int], + ("-s","--sid"): list[int], + ("-t", "t", "--tty"): list[int], + ("-u", "U", "--user"): list[int], + ("-U", "--User"): list[int], + + ("-D"): str, + ("-F"): None, + ("-f"): None, + ("f", "--forest"): None, + ("-H"): None, + ("-j"): None, + ("j"): None, + ("-l"): None, + ("l"): None, + ("-M", "Z"): None, + ("-O"): str, + ("O"): str, + ("-o", "o", "--format"): list[str], + ("-P"): None, + ("s"): None, + ("u"): None, + ("v"): None, + ("X"): None, + ("--context"): None, + ("--headers"): None, + ("--no-headers", "--noheader"): None, + ("--cols", "--columns", "--width"): int, + ("--rows", "--lines"): int, + ("--signames"): None, + + ("H"): None, + ("-L"): None, + ("-m", "m"): None, + ("-T"): None, + + ("-c"): None, + ("c"): None, + ("e"): None, + ("k", "--sort"): str, # There is a format type here, maybe regex this eventually + ("L"): None, + ("n"): None, + ("S", "--cumulative"): None, + ("-y"): None, + ("-w", "w"): None, +} + +class Ps(CLISource): + parameters = ParameterDict(PS_PARAMTERS) + source_name = "ps" + supported_platforms = ["Linux"] + + """An integration of the Linux ps utility into experiment-runner as a data source plugin""" + def __init__(self, + sleep_interval: int = 1, + out_file: Path = "ps.csv", + additional_args: dict = {}, + target_pid: list[int] = None, + out_format: list[str] = ["%cpu", "%mem"]): + + super().__init__() + # man 1 ps + # %cpu: + # cpu utilization of the process in "##.#" format. Currently, it is the CPU time used + # divided by the time the process has been running (cputime/realtime ratio), expressed + # as a percentage. It will not add up to 100% unless you are lucky. (alias pcpu). + # %mem: + # How much memory the process is currently using + self.logfile = out_file + self.sleep_interval = sleep_interval + self.args = { + "--noheader": None, + "-o": out_format + } + + if target_pid: + self.update_parameters(add={"-p": target_pid}) + + self.update_parameters(add=additional_args) + + def _format_cmd(self): + cmd = super()._format_cmd() + + output_cmd = "" + if self.logfile is not None: + output_cmd = f" >> {self.logfile}" + + # This wraps the ps utility so that it runs continously and also outputs into a csv like format + return f'''sh -c "while true; do {cmd} | awk '{{$1=$1}};1' | tr ' ' ','{output_cmd}; sleep {self.sleep_interval}; done"''' + + # The csv saved by default has no header, this must be provided by the user + @staticmethod + def parse_log(logfile: Path, column_names: list[str]): + # Ps has many options, we dont check them all. 
converting to csv might fail in some cases + try: + df = pd.read_csv(logfile, names=column_names) + except Exception as e: + print(f"Could not parse ps ouput csv: {e}") + + return df.to_dict() + diff --git a/experiment-runner/Plugins/Profilers/WattsUpPro.py b/experiment-runner/Plugins/Profilers/WattsUpPro.py index 43dd6ee7..6cac77f7 100644 --- a/experiment-runner/Plugins/Profilers/WattsUpPro.py +++ b/experiment-runner/Plugins/Profilers/WattsUpPro.py @@ -27,7 +27,7 @@ def __init__(self, port: str = None, interval=1.0): print( 'Default port is /dev/ttyUSB0 for Linux') raise RuntimeError("Invalid port") - self.s = serial.Serial(port, 115200 ) + self.s = serial.Serial(port, 115200) self.logfile = None self.interval = interval # initialize lists for keeping data diff --git a/experiment-runner/ProgressManager/Output/CSVOutputManager.py b/experiment-runner/ProgressManager/Output/CSVOutputManager.py index c42c9fde..fd7d1b50 100644 --- a/experiment-runner/ProgressManager/Output/CSVOutputManager.py +++ b/experiment-runner/ProgressManager/Output/CSVOutputManager.py @@ -6,6 +6,8 @@ from tempfile import NamedTemporaryFile import shutil import csv +import os +import pwd from typing import Dict, List @@ -62,6 +64,11 @@ def update_row_data(self, updated_row: dict): writer.writerow(row) shutil.move(tempfile.name, self._experiment_path / 'run_table.csv') + + # Change permissions so the files can be accessed if run as root (needed for some plugins) + user = pwd.getpwnam(os.getlogin()) + os.chown(self._experiment_path / "run_table.csv", user.pw_uid, user.pw_gid) + output.console_log_WARNING(f"CSVManager: Updated row {updated_row['__run_id']}") # with open(self.experiment_path + '/run_table.csv', 'w', newline='') as myfile: @@ -72,4 +79,4 @@ def update_row_data(self, updated_row: dict): # if name == row['name']: # row['name'] = input("enter new name for {}".format(name)) # # write the row either way - # writer.writerow({'name': row['name'], 'number': row['number'], 'address': row['address']}) \ No newline at end of file + # writer.writerow({'name': row['name'], 'number': row['number'], 'address': row['address']}) diff --git a/test/Plugins/Profilers/test_PowerJoular.py b/test/Plugins/Profilers/test_PowerJoular.py new file mode 100644 index 00000000..fafa627d --- /dev/null +++ b/test/Plugins/Profilers/test_PowerJoular.py @@ -0,0 +1,78 @@ +import os +import unittest +import time +import psutil +import sys + +sys.path.append("experiment-runner") +from Plugins.Profilers.PowerJoular import PowerJoular + +class TestPowerJoular(unittest.TestCase): + def tearDown(self): + if self.plugin is None: + return + + if self.plugin.target_logfile \ + and os.path.exists(self.plugin.target_logfile): + + os.remove(self.plugin.target_logfile) + + if os.path.exists(self.plugin.logfile): + os.remove(self.plugin.logfile) + + self.plugin = None + + def test_update(self): + self.plugin = PowerJoular() + original_args = self.plugin.args.copy() + + self.plugin.update_parameters(add={"-t": None}) + self.assertIn(("-t", None), self.plugin.args.items()) + + self.plugin.update_parameters(remove=["-t"]) + self.assertDictEqual(original_args, self.plugin.args) + + self.plugin.update_parameters(add={"-a": "program"}) + self.assertIn(("-a", "program"), self.plugin.args.items()) + + def test_invalid_update(self): + self.plugin = PowerJoular() + + with self.assertRaises(RuntimeError): + self.plugin.update_parameters(add={"--not-a-valid-parameter": None}) + + original_args = self.plugin.args.copy() + + # This should be a null op + 
self.plugin.update_parameters(remove=["--not-a-valid-parameter"]) + self.assertDictEqual(original_args, self.plugin.args) + + with self.assertRaises(RuntimeError): + self.plugin.update_parameters(add={"-p": "not the correct type"}) + + def test_run(self): + valid_pid = psutil.pids()[0] # Could possibly fail if the process finishes before we test + test_outfile = "/tmp/pj_test_out.csv" + self.plugin = PowerJoular(out_file=test_outfile, target_pid=valid_pid) + + sleep_len = 2 + + self.plugin.start() + time.sleep(sleep_len) + self.plugin.stop() + + self.assertTrue(os.path.exists(test_outfile)) + self.assertTrue(os.path.exists(self.plugin.target_logfile)) + + # We should see sleep_len - 1 entries in the log + log = self.plugin.parse_log(test_outfile) + target_log = self.plugin.parse_log(self.plugin.target_logfile) + + for k, v in log.items(): + self.assertEqual(len(v), sleep_len - 1) + + for k, v in target_log.items(): + self.assertEqual(len(v), sleep_len - 1) + +if __name__ == '__main__': + unittest.main() diff --git a/test/Plugins/Profilers/test_PowerMetrics.py b/test/Plugins/Profilers/test_PowerMetrics.py new file mode 100644 index 00000000..4ca8241d --- /dev/null +++ b/test/Plugins/Profilers/test_PowerMetrics.py @@ -0,0 +1,89 @@ +import os +import unittest +import time +import sys +from pprint import pprint + +sys.path.append("experiment-runner") +from Plugins.Profilers.PowerMetrics import PowerMetrics +from Plugins.Profilers.PowerMetrics import PMSampleTypes + +class TestPowerMetrics(unittest.TestCase): + # def tearDown(self): + # if self.plugin is None: + # return + + # if os.path.exists(self.plugin.logfile): + # os.remove(self.plugin.logfile) + + # self.plugin = None + + def test_update(self): + self.plugin = PowerMetrics() + original_args = self.plugin.args.copy() + + self.plugin.update_parameters(add={"--show-process-qos": None}) + self.assertIn(("--show-process-qos", None), self.plugin.args.items()) + + self.plugin.update_parameters(remove=["--show-process-qos"]) + self.assertDictEqual(original_args, self.plugin.args) + + self.plugin.update_parameters(add={"--unhide-info": + [PMSampleTypes.PM_SAMPLE_TASKS, PMSampleTypes.PM_SAMPLE_SFI]}) + self.assertIn(("--unhide-info", + [PMSampleTypes.PM_SAMPLE_TASKS, PMSampleTypes.PM_SAMPLE_SFI]), self.plugin.args.items()) + + def test_invalid_update(self): + self.plugin = PowerMetrics() + + with self.assertRaises(RuntimeError): + self.plugin.update_parameters(add={"--not-a-valid-parameter": None}) + + original_args = self.plugin.args.copy() + + # This should be a null op + self.plugin.update_parameters(remove=["--not-a-valid-parameter"]) + self.assertDictEqual(original_args, self.plugin.args) + + with self.assertRaises(RuntimeError): + self.plugin.update_parameters(add={"--unhide-info": ["not", "correct", "type"]}) + + def test_run(self): + test_outfile = "/tmp/pm_test_out.csv" + self.plugin = PowerMetrics(out_file=test_outfile, sample_frequency=1000) + + sleep_len = 2 + + self.plugin.start() + time.sleep(sleep_len) + self.plugin.stop() + + self.assertTrue(os.path.exists(test_outfile)) + + log = self.plugin.parse_log(test_outfile) + power_data = self.plugin.parse_plist_power(log) + + # powermetrics returns a seperate plist for each measurement + self.assertEqual(len(log), (sleep_len/(self.plugin.args["--sample-rate"]/1000)) - 1) + + # Make sure we have results from each sampler + for sampler in map(lambda x: x.value, self.plugin.args["--samplers"]): + # TODO: This doesnt properly check all results, only for the parameters used in this test + # As 
names of samplers can differ from names of the data headers, we approximate this a bit. + for l in log: + if "cpu_power" in sampler: + self.assertIn("package_joules", l["processor"].keys()) + self.assertIn("package_watts", l["processor"].keys()) + else: + self.assertTrue(len(list(filter(lambda x: sampler.lower() in x.lower() or x.lower() in sampler.lower(), l.keys()))) > 0) + + # Check that our power data filter is also working + for l in power_data: + self.assertIn("GPU", l.keys()) + self.assertIn("agpm_stats", l.keys()) + self.assertIn("processor", l.keys()) + self.assertIn("timestamp", l.keys()) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/Plugins/Profilers/test_Ps.py b/test/Plugins/Profilers/test_Ps.py new file mode 100755 index 00000000..e03b309c --- /dev/null +++ b/test/Plugins/Profilers/test_Ps.py @@ -0,0 +1,76 @@ +import os +import time +import unittest +import sys +import psutil + +sys.path.append("experiment-runner") +from Plugins.Profilers.Ps import Ps + +class TestPs(unittest.TestCase): + def tearDown(self): + if self.plugin is None: + return + + if os.path.exists(self.plugin.logfile): + os.remove(self.plugin.logfile) + + self.plugin = None + + def test_update(self): + self.plugin = Ps() + original_args = self.plugin.args.copy() + + self.plugin.update_parameters(add={"-e": None}) + self.assertIn(("-e", None), self.plugin.args.items()) + + self.plugin.update_parameters(remove=["-e"]) + self.assertDictEqual(original_args, self.plugin.args) + + self.plugin.update_parameters(add={"--cols": 2}) + self.assertIn(("--cols", 2), self.plugin.args.items()) + + self.plugin.update_parameters(add={"-p": [1,2,3]}) + self.assertIn(("-p", [1,2,3]), self.plugin.args.items()) + + def test_invalid_update(self): + self.plugin = Ps() + + with self.assertRaises(RuntimeError): + self.plugin.update_parameters(add={"--not-a-valid-parameter": None}) + + original_args = self.plugin.args.copy() + + # This should be a null op + self.plugin.update_parameters(remove=["--not-a-valid-parameter"]) + self.assertDictEqual(original_args, self.plugin.args) + + with self.assertRaises(RuntimeError): + self.plugin.update_parameters(add={"--cols": "not the correct type"}) + + with self.assertRaises(RuntimeError): + self.plugin.update_parameters(add={"-p": ["not", "correct", "type"]}) + + + def test_run(self): + valid_pid = psutil.pids()[1] # Could possibly fail if the process finishes before we test + test_outfile = "/tmp/ps_test_out.csv" + self.plugin = Ps(out_file=test_outfile, target_pid=[valid_pid]) + + sleep_len = 2 + headers = self.plugin.args["-o"] + + self.plugin.start() + time.sleep(sleep_len) + self.plugin.stop() + + self.assertTrue(os.path.exists(test_outfile)) + + # We should see 2 entries in the log + log = self.plugin.parse_log(test_outfile, headers) + + for hdr in headers: + self.assertEqual(len(log[hdr]), sleep_len) + +if __name__ == '__main__': + unittest.main()
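The parameter-update tests above (for PowerJoular, PowerMetrics, and Ps alike) all hinge on ParameterDict's alias handling: a key is a tuple of equivalent flags, any one flag resolves to the same entry, and a new key that overlaps an existing tuple is rejected. A minimal sketch of that behaviour, using an entry taken from the ps parameter table (printed values shown in comments):

```python
import sys
sys.path.append("experiment-runner")
from Plugins.Profilers.DataSource import ParameterDict

# A key is a tuple of flag aliases; any alias resolves to the same entry
params = ParameterDict({("-p", "p", "--pid"): list[int]})

print("--pid" in params)   # True: membership checks against every alias in the tuple
print(params["-p"])        # list[int]: lookup by a different alias hits the same value

try:
    params["--pid"] = int  # overlaps the existing ("-p", "p", "--pid") key
except RuntimeError as err:
    print(err)             # "Keys cannot have duplicate elements"
```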