diff --git a/.gitignore b/.gitignore index 1d47b25..c9bf790 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,5 @@ env*/* pyenv* *.loop *.spec +venv/ +.venv/ diff --git a/s_tui/about_menu.py b/s_tui/about_menu.py index 351dc36..df57504 100644 --- a/s_tui/about_menu.py +++ b/s_tui/about_menu.py @@ -43,34 +43,31 @@ \n\ """ -ABOUT_MESSAGE += "s-tui " + __version__ +\ - " Released under GNU GPLv2 " +ABOUT_MESSAGE += "s-tui " + __version__ + " Released under GNU GPLv2 " MESSAGE_LEN = 20 class AboutMenu: - """Displays the About message menu """ + """Displays the About message menu""" + MAX_TITLE_LEN = 50 def __init__(self, return_fn): - self.return_fn = return_fn self.about_message = ABOUT_MESSAGE self.time_out_ctrl = urwid.Text(self.about_message) - cancel_button = urwid.Button('Exit', on_press=self.on_cancel) - cancel_button._label.align = 'center' + cancel_button = urwid.Button("Exit", on_press=self.on_cancel) + cancel_button._label.align = "center" if_buttons = urwid.Columns([cancel_button]) - title = urwid.Text(('bold text', u" About Menu \n"), 'center') + title = urwid.Text(("bold text", " About Menu \n"), "center") - self.titles = [title, - self.time_out_ctrl, - if_buttons] + self.titles = [title, self.time_out_ctrl, if_buttons] self.main_window = urwid.LineBox(ViListBox(self.titles)) diff --git a/s_tui/help_menu.py b/s_tui/help_menu.py index db353e2..522ca33 100644 --- a/s_tui/help_menu.py +++ b/s_tui/help_menu.py @@ -52,34 +52,32 @@ class HelpMenu: - """ HelpMenu is a widget containing instructions on usage of s-tui""" + """HelpMenu is a widget containing instructions on usage of s-tui""" + MAX_TITLE_LEN = 90 def __init__(self, return_fn): - self.return_fn = return_fn self.help_message = HELP_MESSAGE self.time_out_ctrl = urwid.Text(self.help_message) - cancel_button = urwid.Button(('Exit'), on_press=self.on_cancel) - cancel_button._label.align = 'center' + cancel_button = urwid.Button(("Exit"), on_press=self.on_cancel) + cancel_button._label.align = "center" if_buttons = urwid.Columns([cancel_button]) - title = urwid.Text(('bold text', u" Help Menu \n"), 'center') + title = urwid.Text(("bold text", " Help Menu \n"), "center") - self.titles = [title, - self.time_out_ctrl, - if_buttons] + self.titles = [title, self.time_out_ctrl, if_buttons] self.main_window = urwid.LineBox(ViListBox(self.titles)) def get_size(self): - """ returns size of HelpMenu""" + """returns size of HelpMenu""" return MESSAGE_LEN + 3, self.MAX_TITLE_LEN def on_cancel(self, w): - """ Returns to original widget""" + """Returns to original widget""" self.return_fn() diff --git a/s_tui/helper_functions.py b/s_tui/helper_functions.py index 4ded596..d7c19af 100644 --- a/s_tui/helper_functions.py +++ b/s_tui/helper_functions.py @@ -30,7 +30,7 @@ from collections import OrderedDict -__version__ = "1.1.4" +__version__ = "1.1.5" _DEFAULT = object() PY3 = sys.version_info[0] == 3 @@ -46,24 +46,28 @@ def get_processor_name(): - """ Returns the processor name in the system """ + """Returns the processor name in the system""" if platform.system() == "Linux": with open("/proc/cpuinfo", "rb") as cpuinfo: all_info = cpuinfo.readlines() for line in all_info: - if b'model name' in line: - return re.sub(b'.*model name.*:', b'', line, 1) + if b"model name" in line: + return re.sub(b".*model name.*:", b"", line, 1) elif platform.system() == "FreeBSD": cmd = ["sysctl", "-n", "hw.model"] process = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, ) str_value = process.stdout.read() return str_value elif platform.system() == "Darwin": - cmd = ['sysctl', '-n', 'machdep.cpu.brand_string'] + cmd = ["sysctl", "-n", "machdep.cpu.brand_string"] process = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, ) str_value = process.stdout.read() return str_value @@ -72,25 +76,25 @@ def get_processor_name(): def kill_child_processes(parent_proc): - """ Kills a process and all its children """ + """Kills a process and all its children""" logging.debug("Killing stress process") try: for proc in parent_proc.children(recursive=True): - logging.debug('Killing %s', proc) + logging.debug("Killing %s", proc) proc.kill() parent_proc.kill() except AttributeError: - logging.debug('No such process') - logging.debug('Could not kill process') + logging.debug("No such process") + logging.debug("Could not kill process") def output_to_csv(sources, csv_writeable_file): """Print statistics to csv file""" file_exists = os.path.isfile(csv_writeable_file) - with open(csv_writeable_file, 'a') as csvfile: + with open(csv_writeable_file, "a") as csvfile: csv_dict = OrderedDict() - csv_dict.update({'Time': time.strftime("%Y-%m-%d_%H:%M:%S")}) + csv_dict.update({"Time": time.strftime("%Y-%m-%d_%H:%M:%S")}) summaries = [val for key, val in sources.items()] for summarie in summaries: update_dict = dict() @@ -139,11 +143,11 @@ def get_user_config_dir(): """ Return the path to the user s-tui config directory """ - user_home = os.getenv('XDG_CONFIG_HOME') + user_home = os.getenv("XDG_CONFIG_HOME") if user_home is None or not user_home: - config_path = os.path.expanduser(os.path.join('~', '.config', 's-tui')) + config_path = os.path.expanduser(os.path.join("~", ".config", "s-tui")) else: - config_path = os.path.join(user_home, 's-tui') + config_path = os.path.join(user_home, "s-tui") return config_path @@ -152,9 +156,9 @@ def get_config_dir(): """ Return the path to the user home config directory """ - user_home = os.getenv('XDG_CONFIG_HOME') + user_home = os.getenv("XDG_CONFIG_HOME") if user_home is None or not user_home: - config_path = os.path.expanduser(os.path.join('~', '.config')) + config_path = os.path.expanduser(os.path.join("~", ".config")) else: config_path = user_home @@ -165,12 +169,13 @@ def get_user_config_file(): """ Return the path to the user s-tui config directory """ - user_home = os.getenv('XDG_CONFIG_HOME') + user_home = os.getenv("XDG_CONFIG_HOME") if user_home is None or not user_home: - config_path = os.path.expanduser(os.path.join('~', '.config', - 's-tui', 's-tui.conf')) + config_path = os.path.expanduser( + os.path.join("~", ".config", "s-tui", "s-tui.conf") + ) else: - config_path = os.path.join(user_home, 's-tui', 's-tui.conf') + config_path = os.path.join(user_home, "s-tui", "s-tui.conf") return config_path @@ -212,7 +217,7 @@ def make_user_config_dir(): if not user_config_dir_exists(): try: os.mkdir(config_path) - os.mkdir(os.path.join(config_path, 'hooks.d')) + os.mkdir(os.path.join(config_path, "hooks.d")) except OSError: return None @@ -220,24 +225,25 @@ def make_user_config_dir(): def seconds_to_text(secs): - """ Converts seconds to a string of hours:minutes:seconds """ - hours = (secs)//3600 - minutes = (secs - hours*3600)//60 - seconds = secs - hours*3600 - minutes*60 + """Converts seconds to a string of hours:minutes:seconds""" + hours = (secs) // 3600 + minutes = (secs - hours * 3600) // 60 + seconds = secs - hours * 3600 - minutes * 60 return "%02d:%02d:%02d" % (hours, minutes, seconds) def str_to_bool(string): - """ Converts a string to a boolean """ - if string == 'True': + """Converts a string to a boolean""" + if string == "True": return True - if string == 'False': + if string == "False": return False raise ValueError def which(program): - """ Find the path of an executable """ + """Find the path of an executable""" + def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) @@ -264,8 +270,8 @@ def _open_text(fname, **kwargs): On Python 2 this is just an alias for open(name, 'rt'). """ if PY3: - kwargs.setdefault('encoding', ENCODING) - kwargs.setdefault('errors', ENCODING_ERRS) + kwargs.setdefault("encoding", ENCODING) + kwargs.setdefault("errors", ENCODING_ERRS) return open(fname, "rt", **kwargs) diff --git a/s_tui/s_tui.py b/s_tui/s_tui.py index 1c52382..76db817 100755 --- a/s_tui/s_tui.py +++ b/s_tui/s_tui.py @@ -49,6 +49,7 @@ from s_tui.help_menu import HELP_MESSAGE from s_tui.stress_menu import StressMenu from s_tui.sensors_menu import SensorsMenu + # Helpers from s_tui.helper_functions import __version__ from s_tui.helper_functions import get_processor_name @@ -64,6 +65,7 @@ from s_tui.helper_functions import seconds_to_text from s_tui.helper_functions import str_to_bool from s_tui.helper_functions import which + # Ui Elements from s_tui.sturwid.ui_elements import ViListBox from s_tui.sturwid.ui_elements import radio_button @@ -71,6 +73,7 @@ from s_tui.sturwid.ui_elements import DEFAULT_PALETTE from s_tui.sturwid.bar_graph_vector import BarGraphVector from s_tui.sturwid.summary_text_list import SummaryTextList + # Sources from s_tui.sources.util_source import UtilSource from s_tui.sources.freq_source import FreqSource @@ -81,17 +84,19 @@ UPDATE_INTERVAL = 1 HOOK_INTERVAL = 30 * 1000 -DEGREE_SIGN = u'\N{DEGREE SIGN}' +DEGREE_SIGN = "\N{DEGREE SIGN}" ZERO_TIME = seconds_to_text(0) DEFAULT_LOG_FILE = "_s-tui.log" DEFAULT_CSV_FILE = "s-tui_log_" + time.strftime("%Y-%m-%d_%H_%M_%S") + ".csv" -VERSION_MESSAGE = \ - "s-tui " + __version__ +\ - " - (C) 2017-2020 Alex Manuskin, Gil Tsuker\n\ +VERSION_MESSAGE = ( + "s-tui " + + __version__ + + " - (C) 2017-2020 Alex Manuskin, Gil Tsuker\n\ Released under GNU GPLv2" +) ERROR_MESSAGE = "\n\ Oops! s-tui has encountered a fatal error\n\ @@ -101,23 +106,23 @@ class MainLoop(urwid.MainLoop): - """ Inherit urwid Mainloop to catch special character inputs""" + """Inherit urwid Mainloop to catch special character inputs""" + def signal_handler(self, frame): """signal handler for properly exiting Ctrl+C""" graph_controller.stress_conroller.kill_stress_process() raise urwid.ExitMainLoop() def unhandled_input(self, uinput): - logging.debug('Caught %s', uinput) - if uinput == 'q': + logging.debug("Caught %s", uinput) + if uinput == "q": graph_controller.stress_conroller.kill_stress_process() raise urwid.ExitMainLoop() - if uinput == 'f1': - graph_controller.view.on_help_menu_open( - graph_controller.view.main_window_w) + if uinput == "f1": + graph_controller.view.on_help_menu_open(graph_controller.view.main_window_w) - if uinput == 'esc': + if uinput == "esc": graph_controller.view.on_menu_close() signal.signal(signal.SIGINT, signal_handler) @@ -130,39 +135,39 @@ class StressController: """ def __init__(self, stress_installed, firestarter_installed): - self.stress_modes = ['Monitor'] + self.stress_modes = ["Monitor"] if stress_installed: - self.stress_modes.append('Stress') + self.stress_modes.append("Stress") if firestarter_installed: - self.stress_modes.append('FIRESTARTER') + self.stress_modes.append("FIRESTARTER") self.current_mode = self.stress_modes[0] self.stress_process = None def get_modes(self): - """ Returns all possible stress_modes for stress operations """ + """Returns all possible stress_modes for stress operations""" return self.stress_modes def get_current_mode(self): - """ Returns the current stress test mode, monitor/stress/other """ + """Returns the current stress test mode, monitor/stress/other""" return self.current_mode def set_mode(self, mode): - """ Sets a stress test mode monitor/stress/other """ + """Sets a stress test mode monitor/stress/other""" self.current_mode = mode def get_stress_process(self): - """ Returns the current external stress process running """ + """Returns the current external stress process running""" return self.stress_process def set_stress_process(self, proc): - """ Sets the current stress process running """ + """Sets the current stress process running""" self.stress_process = proc def kill_stress_process(self): - """ Kills the current running stress process """ + """Kills the current running stress process""" try: kill_child_processes(self.stress_process) except psutil.NoSuchProcess: @@ -170,11 +175,12 @@ def kill_stress_process(self): self.stress_process = None def start_stress(self, stress_cmd): - """ Starts a new stress process with a given cmd """ - with open(os.devnull, 'w') as dev_null: + """Starts a new stress process with a given cmd""" + with open(os.devnull, "w") as dev_null: try: - stress_proc = subprocess.Popen(stress_cmd, stdout=dev_null, - stderr=dev_null) + stress_proc = subprocess.Popen( + stress_cmd, stdout=dev_null, stderr=dev_null + ) self.set_stress_process(psutil.Process(stress_proc.pid)) except OSError: logging.debug("Unable to start stress") @@ -187,6 +193,7 @@ class GraphView(urwid.WidgetPlaceholder): The GraphView can change the state of the graph, since it provides the UI The change is state should be reflected in the GraphController """ + def __init__(self, controller): # constants self.left_margin = 0 @@ -198,9 +205,10 @@ def __init__(self, controller): # general urwid items self.clock_view = urwid.Text(ZERO_TIME, align="center") - self.refresh_rate_ctrl = urwid.Edit((u'Refresh[s]:'), - self.controller.refresh_rate) - self.hline = urwid.AttrWrap(urwid.SolidFill(u' '), 'line') + self.refresh_rate_ctrl = urwid.Edit( + ("Refresh[s]:"), self.controller.refresh_rate + ) + self.hline = urwid.AttrWrap(urwid.SolidFill(" "), "line") self.mode_buttons = [] @@ -211,25 +219,27 @@ def __init__(self, controller): self.graph_place_holder = urwid.WidgetPlaceholder(urwid.Pile([])) # construct the various menus during init phase - self.stress_menu = StressMenu(self.on_menu_close, - self.controller.stress_exe) + self.stress_menu = StressMenu(self.on_menu_close, self.controller.stress_exe) self.help_menu = HelpMenu(self.on_menu_close) self.about_menu = AboutMenu(self.on_menu_close) - self.graphs_menu = SensorsMenu(self.on_graphs_menu_close, - self.controller.sources, - self.controller.graphs_default_conf) - self.summary_menu = SensorsMenu(self.on_summary_menu_close, - self.controller.sources, - self.controller.summary_default_conf) + self.graphs_menu = SensorsMenu( + self.on_graphs_menu_close, + self.controller.sources, + self.controller.graphs_default_conf, + ) + self.summary_menu = SensorsMenu( + self.on_summary_menu_close, + self.controller.sources, + self.controller.summary_default_conf, + ) # call super urwid.WidgetPlaceholder.__init__(self, self.main_window()) - urwid.connect_signal(self.refresh_rate_ctrl, 'change', - self.update_refresh_rate) + urwid.connect_signal(self.refresh_rate_ctrl, "change", self.update_refresh_rate) def update_refresh_rate(self, _, new_refresh_rate): # Special case of 'q' in refresh rate - if 'q' in new_refresh_rate: + if "q" in new_refresh_rate: self.on_exit_program() try: @@ -238,15 +248,16 @@ def update_refresh_rate(self, _, new_refresh_rate): else: self.controller.refresh_rate = new_refresh_rate except ValueError: - self.controller.refresh_rate = '2.0' + self.controller.refresh_rate = "2.0" def update_displayed_information(self): - """ Update all the graphs that are being displayed """ + """Update all the graphs that are being displayed""" for source in self.controller.sources: source_name = source.get_source_name() - if (any(self.graphs_menu.active_sensors[source_name]) or - any(self.summary_menu.active_sensors[source_name])): + if any(self.graphs_menu.active_sensors[source_name]) or any( + self.summary_menu.active_sensors[source_name] + ): source.update() for graph in self.visible_graphs.values(): @@ -265,9 +276,12 @@ def update_displayed_information(self): pass # Only update clock if not is stress mode - if self.controller.stress_conroller.get_current_mode() != 'Monitor': - self.clock_view.set_text(seconds_to_text( - (timeit.default_timer() - self.controller.stress_start_time))) + if self.controller.stress_conroller.get_current_mode() != "Monitor": + self.clock_view.set_text( + seconds_to_text( + (timeit.default_timer() - self.controller.stress_start_time) + ) + ) def on_reset_button(self, _): """Reset graph data and display empty graph""" @@ -292,8 +306,7 @@ def on_graphs_menu_close(self, update): are active in the view""" logging.info("closing sensor menu, update=%s", update) if update: - for sensor, visible_sensors in \ - self.graphs_menu.active_sensors.items(): + for sensor, visible_sensors in self.graphs_menu.active_sensors.items(): self.graphs[sensor].set_visible_graphs(visible_sensors) # If not sensor is selected, do not display the graph if sensor in self.visible_graphs and not any(visible_sensors): @@ -312,62 +325,69 @@ def on_summary_menu_close(self, update): are active in the view""" logging.info("closing summary_menu menu, update=%s", update) if update: - for sensor, visible_sensors in \ - self.summary_menu.active_sensors.items(): - self.visible_summaries[sensor].update_visibility( - visible_sensors) + for sensor, visible_sensors in self.summary_menu.active_sensors.items(): + self.visible_summaries[sensor].update_visibility(visible_sensors) - self.main_window_w.base_widget[0].body[self.summary_widget_index] =\ - self._generate_summaries() + self.main_window_w.base_widget[0].body[ + self.summary_widget_index + ] = self._generate_summaries() self.original_widget = self.main_window_w def on_stress_menu_open(self, widget): """Open stress options""" - self.original_widget = urwid.Overlay(self.stress_menu.main_window, - self.original_widget, - ('relative', self.left_margin), - self.stress_menu.get_size()[1], - ('relative', self.top_margin), - self.stress_menu.get_size()[0]) + self.original_widget = urwid.Overlay( + self.stress_menu.main_window, + self.original_widget, + ("fixed left", 1), + self.stress_menu.get_size()[1], + "top", + self.stress_menu.get_size()[0], + ) def on_help_menu_open(self, widget): """Open Help menu""" - self.original_widget = urwid.Overlay(self.help_menu.main_window, - self.original_widget, - ('relative', self.left_margin), - self.help_menu.get_size()[1], - ('relative', self.top_margin), - self.help_menu.get_size()[0]) + self.original_widget = urwid.Overlay( + self.help_menu.main_window, + self.original_widget, + ("fixed left", 1), + self.help_menu.get_size()[1], + "top", + self.help_menu.get_size()[0], + ) def on_about_menu_open(self, widget): """Open About menu""" - self.original_widget = urwid.Overlay(self.about_menu.main_window, - self.original_widget, - ('relative', self.left_margin), - self.about_menu.get_size()[1], - ('relative', self.top_margin), - self.about_menu.get_size()[0]) + self.original_widget = urwid.Overlay( + self.about_menu.main_window, + self.original_widget, + ("fixed left", 1), + self.about_menu.get_size()[1], + "top", + self.about_menu.get_size()[0], + ) def on_graphs_menu_open(self, widget): """Open Sensor menu on top of existing frame""" self.original_widget = urwid.Overlay( self.graphs_menu.main_window, self.original_widget, - ('relative', self.left_margin), + ("fixed left", 1), self.graphs_menu.get_size()[1], - ('relative', self.top_margin), - self.graphs_menu.get_size()[0]) + "top", + self.graphs_menu.get_size()[0], + ) def on_summary_menu_open(self, widget): """Open Sensor menu on top of existing frame""" self.original_widget = urwid.Overlay( self.summary_menu.main_window, self.original_widget, - ('relative', self.left_margin), + ("fixed left", 1), self.summary_menu.get_size()[1], - ('relative', self.top_margin), - self.summary_menu.get_size()[0]) + "top", + self.summary_menu.get_size()[0], + ) def on_mode_button(self, my_button, state): """Notify the controller of a new mode setting.""" @@ -383,9 +403,10 @@ def on_unicode_checkbox(self, w=None, state=False): self.controller.smooth_graph_mode = state if state: self.hline = urwid.AttrWrap( - urwid.SolidFill(u'\N{LOWER ONE QUARTER BLOCK}'), 'line') + urwid.SolidFill("\N{LOWER ONE QUARTER BLOCK}"), "line" + ) else: - self.hline = urwid.AttrWrap(urwid.SolidFill(u' '), 'line') + self.hline = urwid.AttrWrap(urwid.SolidFill(" "), "line") for graph in self.graphs.values(): graph.set_smooth_colors(state) @@ -393,104 +414,101 @@ def on_unicode_checkbox(self, w=None, state=False): self.show_graphs() def on_save_settings(self, w=None): - """ Calls controller save settings method """ + """Calls controller save settings method""" self.controller.save_settings() def on_exit_program(self, w=None): - """ Calls controller exit_program method """ + """Calls controller exit_program method""" self.controller.exit_program() def _generate_graph_controls(self): - """ Display sidebar controls. i.e. buttons, and controls""" + """Display sidebar controls. i.e. buttons, and controls""" # setup mode radio buttons stress_modes = self.controller.stress_conroller.get_modes() group = [] for mode in stress_modes: - self.mode_buttons.append(radio_button(group, mode, - self.on_mode_button)) + self.mode_buttons.append(radio_button(group, mode, self.on_mode_button)) # Set default radio button to "Monitor" mode self.mode_buttons[0].set_state(True, do_callback=False) # Create list of buttons control_options = list() - control_options.append(button('Graphs', - self.on_graphs_menu_open)) - control_options.append(button('Summaries', - self.on_summary_menu_open)) + control_options.append(button("Graphs", self.on_graphs_menu_open)) + control_options.append(button("Summaries", self.on_summary_menu_open)) if self.controller.stress_exe: - control_options.append(button('Stress Options', - self.on_stress_menu_open)) + control_options.append(button("Stress Options", self.on_stress_menu_open)) control_options.append(button("Reset", self.on_reset_button)) - control_options.append(button('Help', self.on_help_menu_open)) - control_options.append(button('About', self.on_about_menu_open)) - control_options.append(button("Save Settings", - self.on_save_settings)) + control_options.append(button("Help", self.on_help_menu_open)) + control_options.append(button("About", self.on_about_menu_open)) + control_options.append(button("Save Settings", self.on_save_settings)) control_options.append(button("Quit", self.on_exit_program)) # Create the menu - animate_controls = urwid.GridFlow(control_options, 18, 2, 0, 'center') + animate_controls = urwid.GridFlow(control_options, 18, 2, 0, "center") # Create smooth graph selection button default_smooth = self.controller.smooth_graph_mode if urwid.get_encoding_mode() == "utf8": unicode_checkbox = urwid.CheckBox( - "UTF-8", state=default_smooth, - on_state_change=self.on_unicode_checkbox) + "UTF-8", state=default_smooth, on_state_change=self.on_unicode_checkbox + ) # Init the state of the graph accoding to the selected mode self.on_unicode_checkbox(state=default_smooth) else: - unicode_checkbox = urwid.Text( - "[N/A] UTF-8") + unicode_checkbox = urwid.Text("[N/A] UTF-8") install_stress_message = urwid.Text("") if not self.controller.firestarter and not self.controller.stress_exe: install_stress_message = urwid.Text( - ('button normal', u"(N/A) install stress")) + ("button normal", "(N/A) install stress") + ) clock_widget = [] # if self.controller.stress_exe or self.controller.firestarter: if self.controller.stress_exe or self.controller.firestarter: clock_widget = [ - urwid.Text(('bold text', u"Stress Timer"), align="center"), - self.clock_view - ] + urwid.Text(("bold text", "Stress Timer"), align="center"), + self.clock_view, + ] - controls = [urwid.Text(('bold text', u"Modes"), align="center")] + controls = [urwid.Text(("bold text", "Modes"), align="center")] controls += self.mode_buttons controls += [install_stress_message] controls += clock_widget controls += [ urwid.Divider(), - urwid.Text(('bold text', u"Control Options"), align="center"), + urwid.Text(("bold text", "Control Options"), align="center"), animate_controls, urwid.Divider(), - urwid.Text(('bold text', u"Visual Options"), align="center"), + urwid.Text(("bold text", "Visual Options"), align="center"), unicode_checkbox, self.refresh_rate_ctrl, urwid.Divider(), - urwid.Text(('bold text', u"Summaries"), align="center"), + urwid.Text(("bold text", "Summaries"), align="center"), ] return controls @staticmethod def _generate_cpu_stats(): - """Read and display processor name """ + """Read and display processor name""" cpu_name = urwid.Text("CPU Name N/A", align="center") try: cpu_name = urwid.Text(get_processor_name().strip(), align="center") except OSError: logging.info("CPU name not available") - return [urwid.Text(('bold text', "CPU Detected"), - align="center"), cpu_name, urwid.Divider()] + return [ + urwid.Text(("bold text", "CPU Detected"), align="center"), + cpu_name, + urwid.Divider(), + ] def _generate_summaries(self): - fixed_stats = [] for summary in self.visible_summaries.values(): fixed_stats += summary.get_text_item_list() - fixed_stats += [urwid.Text('')] + fixed_stats += [urwid.Text("")] # return fixed_stats pile widget return urwid.Pile(fixed_stats) @@ -498,8 +516,8 @@ def _generate_summaries(self): def show_graphs(self): """Show a pile of the graph selected for dislpay""" elements = itertools.chain.from_iterable( - ([graph] - for graph in self.visible_graphs.values())) + ([graph] for graph in self.visible_graphs.values()) + ) self.graph_place_holder.original_widget = urwid.Pile(elements) def main_window(self): @@ -512,15 +530,17 @@ def main_window(self): color_pallet = source.get_pallet() alert_pallet = source.get_alert_pallet() self.graphs[source_name] = BarGraphVector( - source, color_pallet, + source, + color_pallet, len(source.get_sensor_list()), self.graphs_menu.active_sensors[source_name], - alert_colors=alert_pallet + alert_colors=alert_pallet, ) if self.controller.script_hooks_enabled: source.add_edge_hook( self.controller.script_loader.load_script( - source.__class__.__name__, HOOK_INTERVAL) + source.__class__.__name__, HOOK_INTERVAL + ) ) # Invoke threshold script every 30s self.summaries[source_name] = SummaryTextList( @@ -532,8 +552,8 @@ def main_window(self): # and summaries. # All available summaries are always visible self.visible_graphs = OrderedDict( - (key, val) for key, val in self.graphs.items() if - val.get_is_available()) + (key, val) for key, val in self.graphs.items() if val.get_is_available() + ) # Do not show the graph if there is no selected sensors for key in self.graphs.keys(): @@ -541,24 +561,29 @@ def main_window(self): del self.visible_graphs[key] self.visible_summaries = OrderedDict( - (key, val) for key, val in self.summaries.items() if - val.get_is_available()) + (key, val) for key, val in self.summaries.items() if val.get_is_available() + ) cpu_stats = self._generate_cpu_stats() graph_controls = self._generate_graph_controls() summaries = self._generate_summaries() - text_col = ViListBox(urwid.SimpleListWalker(cpu_stats + - graph_controls + - [summaries])) - - vline = urwid.AttrWrap(urwid.SolidFill(u'|'), 'line') - widget = urwid.Columns([('fixed', 20, text_col), - ('fixed', 1, vline), - ('weight', 2, self.graph_place_holder)], - dividechars=0, focus_column=0) - - widget = urwid.Padding(widget, ('fixed left', 1), ('fixed right', 1)) + text_col = ViListBox( + urwid.SimpleListWalker(cpu_stats + graph_controls + [summaries]) + ) + + vline = urwid.AttrWrap(urwid.SolidFill("|"), "line") + widget = urwid.Columns( + [ + ("fixed", 20, text_col), + ("fixed", 1, vline), + ("weight", 2, self.graph_place_holder), + ], + dividechars=0, + focus_column=0, + ) + + widget = urwid.Padding(widget, ("fixed left", 1), ("fixed right", 1)) self.main_window_w = widget base = self.main_window_w.base_widget[0].body @@ -598,8 +623,10 @@ def _load_config(self, t_thresh): user_config_dir = get_user_config_dir() if user_config_dir is None: - logging.warning("Failed to find or create scripts directory,\ - proceeding without scripting support") + logging.warning( + "Failed to find or create scripts directory,\ + proceeding without scripting support" + ) self.script_hooks_enabled = False else: self.script_loader = ScriptHookLoader(user_config_dir) @@ -614,85 +641,100 @@ def _load_config(self, t_thresh): # Load refresh refresh rate from config try: - self.refresh_rate = str(self.conf.getfloat( - 'GraphControll', 'refresh')) + self.refresh_rate = str(self.conf.getfloat("GraphControll", "refresh")) logging.debug("User refresh rate: %s", self.refresh_rate) - except (AttributeError, ValueError, configparser.NoOptionError, - configparser.NoSectionError): + except ( + AttributeError, + ValueError, + configparser.NoOptionError, + configparser.NoSectionError, + ): logging.debug("No refresh rate configed") # Change UTF8 setting from config try: - if self.conf.getboolean('GraphControll', 'UTF8'): + if self.conf.getboolean("GraphControll", "UTF8"): self.smooth_graph_mode = True else: - logging.debug("UTF8 selected as %s", - self.conf.get('GraphControll', 'UTF8')) - except (AttributeError, ValueError, configparser.NoOptionError, - configparser.NoSectionError): + logging.debug( + "UTF8 selected as %s", self.conf.get("GraphControll", "UTF8") + ) + except ( + AttributeError, + ValueError, + configparser.NoOptionError, + configparser.NoSectionError, + ): logging.debug("No user config for utf8") # Try to load high temperature threshold if configured if t_thresh is None: try: - self.temp_thresh = self.conf.get('GraphControll', 'TTHRESH') - logging.debug("Temperature threshold set to %s", - self.temp_thresh) - except (AttributeError, ValueError, configparser.NoOptionError, - configparser.NoSectionError): + self.temp_thresh = self.conf.get("GraphControll", "TTHRESH") + logging.debug("Temperature threshold set to %s", self.temp_thresh) + except ( + AttributeError, + ValueError, + configparser.NoOptionError, + configparser.NoSectionError, + ): logging.debug("No user config for temp threshold") else: self.temp_thresh = t_thresh # This should be the only place where sources are configured - possible_sources = [TempSource(self.temp_thresh), - FreqSource(), - UtilSource(), - RaplPowerSource(), - FanSource()] + possible_sources = [ + TempSource(self.temp_thresh), + FreqSource(), + UtilSource(), + RaplPowerSource(), + FanSource(), + ] # Load sensors config if available - sources = [x.get_source_name() for x in possible_sources - if x.get_is_available()] + sources = [ + x.get_source_name() for x in possible_sources if x.get_is_available() + ] for source in sources: try: options = list(self.conf.items(source + ",Graphs")) for option in options: # Returns tuples of values in order - self.graphs_default_conf[source].append( - str_to_bool(option[1])) + self.graphs_default_conf[source].append(str_to_bool(option[1])) options = list(self.conf.items(source + ",Summaries")) for option in options: # Returns tuples of values in order - self.summary_default_conf[source].append( - str_to_bool(option[1])) - except (AttributeError, ValueError, configparser.NoOptionError, - configparser.NoSectionError): + self.summary_default_conf[source].append(str_to_bool(option[1])) + except ( + AttributeError, + ValueError, + configparser.NoOptionError, + configparser.NoSectionError, + ): logging.debug("Error reading sensors config") return possible_sources def _config_stress(self): - """ Configures the possible stress processes and modes """ + """Configures the possible stress processes and modes""" # Configure stress_process self.stress_exe = None stress_installed = False - self.stress_exe = which('stress') + self.stress_exe = which("stress") if self.stress_exe: stress_installed = True else: - self.stress_exe = which('stress-ng') + self.stress_exe = which("stress-ng") if self.stress_exe: stress_installed = True self.firestarter = None firestarter_installed = False - if os.path.isfile('./FIRESTARTER/FIRESTARTER'): - self.firestarter = os.path.join(os.getcwd(), - 'FIRESTARTER', 'FIRESTARTER') + if os.path.isfile("./FIRESTARTER/FIRESTARTER"): + self.firestarter = os.path.join(os.getcwd(), "FIRESTARTER", "FIRESTARTER") firestarter_installed = True else: - firestarter_exe = which('FIRESTARTER') + firestarter_exe = which("FIRESTARTER") if firestarter_exe is not None: self.firestarter = firestarter_exe firestarter_installed = True @@ -747,28 +789,30 @@ def set_mode(self, mode): self.update_stress_mode() def main(self): - """ Starts the main loop and graph animation """ - loop = MainLoop(self.view, DEFAULT_PALETTE, - handle_mouse=self.handle_mouse, - # screen=urwid.curses_display.Screen() - ) + """Starts the main loop and graph animation""" + loop = MainLoop( + self.view, + DEFAULT_PALETTE, + handle_mouse=self.handle_mouse, + # screen=urwid.curses_display.Screen() + ) self.view.show_graphs() self.animate_graph(loop) try: loop.run() - except (ZeroDivisionError) as err: + except ZeroDivisionError as err: # In case of Zero division, we want an error to return, and # get a clue where this happens logging.debug("Some stat caused divide by zero exception. Exiting") logging.error(err, exc_info=True) print(ERROR_MESSAGE) - except (AttributeError) as err: + except AttributeError as err: # In this case we restart the loop, to address bug #50, where # urwid crashes on multiple presses on 'esc' logging.debug("Catch attribute Error in urwid and restart") logging.debug(err, exc_info=True) self.main() - except (psutil.NoSuchProcess) as err: + except psutil.NoSuchProcess as err: # This might happen if the stress process is not found, in this # case, we want to know why logging.error("No such process error") @@ -776,7 +820,7 @@ def main(self): print(ERROR_MESSAGE) def update_stress_mode(self): - """ Updates stress mode according to radio buttons state """ + """Updates stress mode according to radio buttons state""" self.stress_conroller.kill_stress_process() @@ -784,21 +828,22 @@ def update_stress_mode(self): self.view.clock_view.set_text(ZERO_TIME) self.stress_start_time = timeit.default_timer() - if self.stress_conroller.get_current_mode() == 'Stress': + if self.stress_conroller.get_current_mode() == "Stress": stress_cmd = self.view.stress_menu.get_stress_cmd() self.stress_conroller.start_stress(stress_cmd) - elif self.stress_conroller.get_current_mode() == 'FIRESTARTER': + elif self.stress_conroller.get_current_mode() == "FIRESTARTER": stress_cmd = [self.firestarter] self.stress_conroller.start_stress(stress_cmd) def save_settings(self): - """ Save the current configuration to a user config file """ + """Save the current configuration to a user config file""" + def _save_displayed_setting(conf, submenu): items = [] - if (submenu == "Graphs"): + if submenu == "Graphs": items = self.view.graphs_menu.active_sensors.items() - elif (submenu == "Summaries"): + elif submenu == "Summaries": items = self.view.summary_menu.active_sensors.items() for source, visible_sensors in items: @@ -809,13 +854,11 @@ def _save_displayed_setting(conf, submenu): logging.debug("Saving settings for %s", source) logging.debug("Visible sensors %s", visible_sensors) # TODO: consider changing sensors_list to dict - curr_sensor = [x for x in sources if - x.get_source_name() == source][0] + curr_sensor = [x for x in sources if x.get_source_name() == source][0] sensor_list = curr_sensor.get_sensor_list() for sensor_id, sensor in enumerate(sensor_list): try: - conf.set(section, sensor, str( - visible_sensors[sensor_id])) + conf.set(section, sensor, str(visible_sensors[sensor_id])) except IndexError: conf.set(section, sensor, str(True)) @@ -824,25 +867,22 @@ def _save_displayed_setting(conf, submenu): conf = configparser.ConfigParser() config_file = get_user_config_file() - with open(config_file, 'w') as cfgfile: - conf.add_section('GraphControll') + with open(config_file, "w") as cfgfile: + conf.add_section("GraphControll") # Save the configured refresh rete - conf.set('GraphControll', 'refresh', str( - self.refresh_rate)) + conf.set("GraphControll", "refresh", str(self.refresh_rate)) # Save the configured UTF8 setting - conf.set('GraphControll', 'UTF8', str( - self.smooth_graph_mode)) + conf.set("GraphControll", "UTF8", str(self.smooth_graph_mode)) # Save the configured t_thresh if self.temp_thresh: - conf.set('GraphControll', 'TTHRESH', str( - self.temp_thresh)) + conf.set("GraphControll", "TTHRESH", str(self.temp_thresh)) _save_displayed_setting(conf, "Graphs") _save_displayed_setting(conf, "Summaries") conf.write(cfgfile) def exit_program(self): - """ Kill all stress operations upon exit""" + """Kill all stress operations upon exit""" self.stress_conroller.kill_stress_process() raise urwid.ExitMainLoop() @@ -859,7 +899,8 @@ def animate_graph(self, loop, user_data=None): # Set next update self.animate_alarm = loop.set_alarm_in( - float(self.refresh_rate), self.animate_graph) + float(self.refresh_rate), self.animate_graph + ) if self.args.debug_run: # refresh rate is a string in float format @@ -884,7 +925,8 @@ def main(): if args.debug_file is not None: log_file = args.debug_file log_formatter = logging.Formatter( - "%(asctime)s [%(funcName)s()] [%(levelname)-5.5s] %(message)s") + "%(asctime)s [%(funcName)s()] [%(levelname)-5.5s] %(message)s" + ) root_logger = logging.getLogger() file_handler = logging.FileHandler(log_file) file_handler.setFormatter(log_formatter) @@ -893,16 +935,20 @@ def main(): else: level = logging.ERROR log_formatter = logging.Formatter( - "%(asctime)s [%(funcName)s()] [%(levelname)-5.5s] %(message)s") + "%(asctime)s [%(funcName)s()] [%(levelname)-5.5s] %(message)s" + ) root_logger = logging.getLogger() root_logger.setLevel(level) if args.terminal or args.json: logging.info("Printing single line to terminal") - sources = [FreqSource(), TempSource(), - UtilSource(), - RaplPowerSource(), - FanSource()] + sources = [ + FreqSource(), + TempSource(), + UtilSource(), + RaplPowerSource(), + FanSource(), + ] if args.terminal: output_to_terminal(sources) elif args.json: @@ -914,48 +960,78 @@ def main(): def get_args(): - parser = argparse.ArgumentParser( - description=HELP_MESSAGE, - formatter_class=argparse.RawTextHelpFormatter) - - parser.add_argument('-d', '--debug', - default=False, action='store_true', - help="Output debug log to _s-tui.log") - parser.add_argument('--debug-file', - default=None, - help="Use a custom debug file. Default: " + - "_s-tui.log") + description=HELP_MESSAGE, formatter_class=argparse.RawTextHelpFormatter + ) + + parser.add_argument( + "-d", + "--debug", + default=False, + action="store_true", + help="Output debug log to _s-tui.log", + ) + parser.add_argument( + "--debug-file", + default=None, + help="Use a custom debug file. Default: " + "_s-tui.log", + ) # This is mainly to be used for testing purposes - parser.add_argument('-dr', '--debug_run', - default=False, action='store_true', - help="Run for 5 seconds and quit") - parser.add_argument('-c', '--csv', action='store_true', - default=False, help="Save stats to csv file") - parser.add_argument('--csv-file', - default=None, - help="Use a custom CSV file. Default: " + - "s-tui_log_<TIME>.csv") - parser.add_argument('-t', '--terminal', action='store_true', - default=False, - help="Display a single line of stats without tui") - parser.add_argument('-j', '--json', action='store_true', - default=False, - help="Display a single line of stats in JSON format") - parser.add_argument('-nm', '--no-mouse', action='store_true', - default=False, help="Disable Mouse for TTY systems") - parser.add_argument('-v', '--version', - default=False, action='store_true', - help="Display version") - parser.add_argument('-tt', '--t_thresh', - default=None, - help="High Temperature threshold. Default: 80") - parser.add_argument('-r', '--refresh-rate', dest="refresh_rate", - default="2.0", - help="Refresh rate in seconds. Default: 2.0") + parser.add_argument( + "-dr", + "--debug_run", + default=False, + action="store_true", + help="Run for 5 seconds and quit", + ) + parser.add_argument( + "-c", "--csv", action="store_true", default=False, help="Save stats to csv file" + ) + parser.add_argument( + "--csv-file", + default=None, + help="Use a custom CSV file. Default: " + "s-tui_log_<TIME>.csv", + ) + parser.add_argument( + "-t", + "--terminal", + action="store_true", + default=False, + help="Display a single line of stats without tui", + ) + parser.add_argument( + "-j", + "--json", + action="store_true", + default=False, + help="Display a single line of stats in JSON format", + ) + parser.add_argument( + "-nm", + "--no-mouse", + action="store_true", + default=False, + help="Disable Mouse for TTY systems", + ) + parser.add_argument( + "-v", "--version", default=False, action="store_true", help="Display version" + ) + parser.add_argument( + "-tt", + "--t_thresh", + default=None, + help="High Temperature threshold. Default: 80", + ) + parser.add_argument( + "-r", + "--refresh-rate", + dest="refresh_rate", + default="2.0", + help="Refresh rate in seconds. Default: 2.0", + ) args = parser.parse_args() return args -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/s_tui/sensors_menu.py b/s_tui/sensors_menu.py index 9e574a5..1958bb0 100644 --- a/s_tui/sensors_menu.py +++ b/s_tui/sensors_menu.py @@ -35,14 +35,13 @@ def on_mode_button(self, button, state): pass def __init__(self, return_fn, source_list, default_source_conf): - self.return_fn = return_fn # create the cancel and apply buttons, and put the in an urwid column - cancel_button = urwid.Button('Cancel', on_press=self.on_cancel) - cancel_button._label.align = 'center' - apply_button = urwid.Button('Apply', on_press=self.on_apply) - apply_button._label.align = 'center' + cancel_button = urwid.Button("Cancel", on_press=self.on_cancel) + cancel_button._label.align = "center" + apply_button = urwid.Button("Apply", on_press=self.on_apply) + apply_button._label.align = "center" if_buttons = urwid.Columns([apply_button, cancel_button]) @@ -56,30 +55,32 @@ def __init__(self, return_fn, source_list, default_source_conf): # get the saves sensor visibility list if default_source_conf[source_name]: # print(str(default_source_conf[source_name])) - self.sensor_status_dict[source_name] =\ - copy.deepcopy(default_source_conf[source_name]) + self.sensor_status_dict[source_name] = copy.deepcopy( + default_source_conf[source_name] + ) else: - self.sensor_status_dict[source_name] =\ - [True] * len(source.get_sensor_list()) + self.sensor_status_dict[source_name] = [True] * len( + source.get_sensor_list() + ) self.sensor_button_dict[source_name] = [] self.active_sensors[source_name] = [] # add the title at the head of the checkbox column sensor_title_str = source_name - sensor_title = urwid.Text( - ('bold text', sensor_title_str), 'center') + sensor_title = urwid.Text(("bold text", sensor_title_str), "center") # create the checkbox buttons with the saved visibility - for sensor, s_tatus in \ - zip(source.get_sensor_list(), - self.sensor_status_dict[source_name]): + for sensor, s_tatus in zip( + source.get_sensor_list(), self.sensor_status_dict[source_name] + ): cb = urwid.CheckBox(sensor, s_tatus) self.sensor_button_dict[source_name].append(cb) self.active_sensors[source_name].append(s_tatus) - sensor_title_and_buttons = \ - [sensor_title] + self.sensor_button_dict[source_name] + sensor_title_and_buttons = [sensor_title] + self.sensor_button_dict[ + source_name + ] listw = urwid.SimpleFocusListWalker(sensor_title_and_buttons) sensor_column_list.append(urwid.Pile(listw)) @@ -102,7 +103,7 @@ def get_size(self): def set_checkbox_value(self): for sensor, sensor_cb in self.sensor_button_dict.items(): sensor_cb_next_state = self.active_sensors[sensor] - for (checkbox, state) in zip(sensor_cb, sensor_cb_next_state): + for checkbox, state in zip(sensor_cb, sensor_cb_next_state): checkbox.set_state(state) def on_cancel(self, w): @@ -116,8 +117,7 @@ def on_apply(self, w): for sensor_cb in sensor_buttons: cb_sensor_visibility.append(sensor_cb.get_state()) - changed_state = (cb_sensor_visibility != - self.active_sensors[s_name]) + changed_state = cb_sensor_visibility != self.active_sensors[s_name] update_sensor_visibility |= changed_state self.active_sensors[s_name] = cb_sensor_visibility diff --git a/s_tui/sources/fan_source.py b/s_tui/sources/fan_source.py index bfe3cdb..a5e756a 100644 --- a/s_tui/sources/fan_source.py +++ b/s_tui/sources/fan_source.py @@ -26,7 +26,8 @@ class FanSource(Source): - """ Source for fan information """ + """Source for fan information""" + def __init__(self): try: if psutil.sensors_fans(): @@ -38,10 +39,9 @@ def __init__(self): Source.__init__(self) - self.name = 'Fan' - self.measurement_unit = 'RPM' - self.pallet = ('fan light', 'fan dark', - 'fan light smooth', 'fan dark smooth') + self.name = "Fan" + self.measurement_unit = "RPM" + self.pallet = ("fan light", "fan dark", "fan light smooth", "fan dark smooth") sensors_dict = dict() try: @@ -77,7 +77,7 @@ def update(self): for sensor in sample.values(): for minor_sensor in sensor: # Ignore unreasonalbe fan speeds - if (minor_sensor.current > 10000): + if minor_sensor.current > 10000: continue self.last_measurement.append(int(minor_sensor.current)) diff --git a/s_tui/sources/freq_source.py b/s_tui/sources/freq_source.py index 5235f31..cd86bd2 100644 --- a/s_tui/sources/freq_source.py +++ b/s_tui/sources/freq_source.py @@ -25,21 +25,25 @@ class FreqSource(Source): - """ Source class implementing CPU frequency information polling """ + """Source class implementing CPU frequency information polling""" + def __init__(self): self.is_available = True - if (not hasattr(psutil, "cpu_freq") and - psutil.cpu_freq()): + if not hasattr(psutil, "cpu_freq") and psutil.cpu_freq(): self.is_available = False logging.debug("cpu_freq is not available from psutil") return Source.__init__(self) - self.name = 'Frequency' - self.measurement_unit = 'MHz' - self.pallet = ('freq light', 'freq dark', - 'freq light smooth', 'freq dark smooth') + self.name = "Frequency" + self.measurement_unit = "MHz" + self.pallet = ( + "freq light", + "freq dark", + "freq light smooth", + "freq dark smooth", + ) # Check if psutil.cpu_freq is available. # +1 for average frequency @@ -55,7 +59,7 @@ def __init__(self): if max(self.last_measurement) >= 0: self.max_freq = max(self.last_measurement) - self.available_sensors = ['Avg'] + self.available_sensors = ["Avg"] for core_id, _ in enumerate(psutil.cpu_freq(True)): self.available_sensors.append("Core " + str(core_id)) diff --git a/s_tui/sources/hook.py b/s_tui/sources/hook.py index 4d8175a..3c99f1b 100644 --- a/s_tui/sources/hook.py +++ b/s_tui/sources/hook.py @@ -47,8 +47,8 @@ def invoke(self): # Don't sleep a hook if it has never run if self.timeout_milliseconds > 0: - self.ready_time = ( - datetime.now() + - timedelta(milliseconds=self.timeout_milliseconds)) + self.ready_time = datetime.now() + timedelta( + milliseconds=self.timeout_milliseconds + ) self.callback(self.callback_args) diff --git a/s_tui/sources/hook_script.py b/s_tui/sources/hook_script.py index 6d88bdf..2504cc5 100644 --- a/s_tui/sources/hook_script.py +++ b/s_tui/sources/hook_script.py @@ -40,7 +40,7 @@ def _run_script(self, *args): # Run script in a shell subprocess asynchronously so # as to not block main thread (graphs) # if the script is a long-running task - with open(os.devnull, 'w') as dev_null: + with open(os.devnull, "w") as dev_null: subprocess.Popen( ["/bin/sh", args[0][0]], # TODO -- Could redirect this to a separate log diff --git a/s_tui/sources/rapl_power_source.py b/s_tui/sources/rapl_power_source.py index 47068c6..b08a0b2 100644 --- a/s_tui/sources/rapl_power_source.py +++ b/s_tui/sources/rapl_power_source.py @@ -31,16 +31,19 @@ class RaplPowerSource(Source): - MICRO_JOULE_IN_JOULE = 1000000.0 def __init__(self): Source.__init__(self) - self.name = 'Power' - self.measurement_unit = 'W' - self.pallet = ('power light', 'power dark', - 'power light smooth', 'power dark smooth') + self.name = "Power" + self.measurement_unit = "W" + self.pallet = ( + "power light", + "power dark", + "power light smooth", + "power dark smooth", + ) self.reader = get_power_reader() if not self.reader: @@ -68,18 +71,19 @@ def update(self): current_measurement_time = time.time() for m_idx, _ in enumerate(self.last_probe): - joule_used = ((current_measurement_value[m_idx].current - - self.last_probe[m_idx].current) / - float(self.MICRO_JOULE_IN_JOULE)) + joule_used = ( + current_measurement_value[m_idx].current + - self.last_probe[m_idx].current + ) / float(self.MICRO_JOULE_IN_JOULE) self.last_probe[m_idx] = joule_used - seconds_passed = (current_measurement_time - - self.last_probe_time) + seconds_passed = current_measurement_time - self.last_probe_time logging.debug("seconds passed %s", seconds_passed) watts_used = float(joule_used) / float(seconds_passed) logging.debug("watts used %s", watts_used) - logging.info("Joule_Used %d, seconds passed, %d", joule_used, - seconds_passed) + logging.info( + "Joule_Used %d, seconds passed, %d", joule_used, seconds_passed + ) if watts_used > 0: # The information on joules used elapses every once in a while, diff --git a/s_tui/sources/rapl_read.py b/s_tui/sources/rapl_read.py index 9130ec4..30f039c 100644 --- a/s_tui/sources/rapl_read.py +++ b/s_tui/sources/rapl_read.py @@ -29,8 +29,8 @@ from s_tui.helper_functions import cat -INTER_RAPL_DIR = '/sys/class/powercap/intel-rapl/' -AMD_ENERGY_DIR_GLOB = '/sys/devices/platform/amd_energy.0/hwmon/hwmon*/' +INTER_RAPL_DIR = "/sys/class/powercap/intel-rapl/" +AMD_ENERGY_DIR_GLOB = "/sys/devices/platform/amd_energy.0/hwmon/hwmon*/" MICRO_JOULE_IN_JOULE = 1000000.0 @@ -40,35 +40,35 @@ ENERGY_UNIT_MASK = 0x1F00 -RaplStats = namedtuple('rapl', ['label', 'current', 'max']) +RaplStats = namedtuple("rapl", ["label", "current", "max"]) class RaplReader: def __init__(self): - basenames = glob.glob('/sys/class/powercap/intel-rapl:*/') + basenames = glob.glob("/sys/class/powercap/intel-rapl:*/") self.basenames = sorted(set({x for x in basenames})) def read_power(self): - """ Read power stats and return dictionary""" + """Read power stats and return dictionary""" pjoin = os.path.join ret = list() for path in self.basenames: name = None try: - name = cat(pjoin(path, 'name'), fallback=None, binary=False) + name = cat(pjoin(path, "name"), fallback=None, binary=False) except (IOError, OSError, ValueError) as err: - logging.warning("ignoring %r for file %r", - (err, path), RuntimeWarning) + logging.warning("ignoring %r for file %r", (err, path), RuntimeWarning) continue if name: try: - current = cat(pjoin(path, 'energy_uj')) + current = cat(pjoin(path, "energy_uj")) max_reading = 0.0 ret.append(RaplStats(name, float(current), max_reading)) except (IOError, OSError, ValueError) as err: - logging.warning("ignoring %r for file %r", - (err, path), RuntimeWarning) + logging.warning( + "ignoring %r for file %r", (err, path), RuntimeWarning + ) return ret @staticmethod @@ -78,25 +78,30 @@ def available(): class AMDEnergyReader: def __init__(self): - self.inputs = list(zip((cat(filename, binary=False) for filename in - sorted(glob.glob(AMD_ENERGY_DIR_GLOB + - 'energy*_label'))), - sorted(glob.glob(AMD_ENERGY_DIR_GLOB + - 'energy*_input')))) + self.inputs = list( + zip( + ( + cat(filename, binary=False) + for filename in sorted( + glob.glob(AMD_ENERGY_DIR_GLOB + "energy*_label") + ) + ), + sorted(glob.glob(AMD_ENERGY_DIR_GLOB + "energy*_input")), + ) + ) # How many socket does the system have? - socket_number = sum(1 for label, _ in self.inputs if 'socket' in label) - self.inputs.sort( - key=lambda x: self.get_input_position(x[0], socket_number)) + socket_number = sum(1 for label, _ in self.inputs if "socket" in label) + self.inputs.sort(key=lambda x: self.get_input_position(x[0], socket_number)) @staticmethod def match_label(label): - return re.search(r'E(core|socket)([0-9]+)', label) + return re.search(r"E(core|socket)([0-9]+)", label) @staticmethod def get_input_position(label, socket_number): num = int(AMDEnergyReader.match_label(label).group(2)) - if 'socket' in label: + if "socket" in label: return num else: return num + socket_number @@ -118,18 +123,25 @@ def __init__(self): self.core_msr_files = {} self.package_msr_files = {} for i in range(cpu_count()): - curr_core_id = int(cat("/sys/devices/system/cpu/cpu" + str(i) + - "/topology/core_id", binary=False)) + curr_core_id = int( + cat( + "/sys/devices/system/cpu/cpu" + str(i) + "/topology/core_id", + binary=False, + ) + ) if curr_core_id not in self.core_msr_files: - self.core_msr_files[curr_core_id] = "/dev/cpu/" + \ - str(i) + "/msr" - - curr_package_id = int(cat("/sys/devices/system/cpu/cpu" + str(i) + - "/topology/physical_package_id", - binary=False)) + self.core_msr_files[curr_core_id] = "/dev/cpu/" + str(i) + "/msr" + + curr_package_id = int( + cat( + "/sys/devices/system/cpu/cpu" + + str(i) + + "/topology/physical_package_id", + binary=False, + ) + ) if curr_package_id not in self.package_msr_files: - self.package_msr_files[curr_package_id] = "/dev/cpu/" + \ - str(i) + "/msr" + self.package_msr_files[curr_package_id] = "/dev/cpu/" + str(i) + "/msr" @staticmethod def read_msr(filename, register): @@ -145,15 +157,25 @@ def read_power(self): unit_msr = self.read_msr(filename, UNIT_MSR) energy_factor = 0.5 ** ((unit_msr & ENERGY_UNIT_MASK) >> 8) package_msr = self.read_msr(filename, PACKAGE_MSR) - ret.append(RaplStats("Package " + str(i + 1), package_msr * - energy_factor * MICRO_JOULE_IN_JOULE, 0.0)) + ret.append( + RaplStats( + "Package " + str(i + 1), + package_msr * energy_factor * MICRO_JOULE_IN_JOULE, + 0.0, + ) + ) for i, filename in self.core_msr_files.items(): unit_msr = self.read_msr(filename, UNIT_MSR) energy_factor = 0.5 ** ((unit_msr & ENERGY_UNIT_MASK) >> 8) core_msr = self.read_msr(filename, CORE_MSR) - ret.append(RaplStats("Core " + str(i + 1), core_msr * energy_factor - * MICRO_JOULE_IN_JOULE, 0.0)) + ret.append( + RaplStats( + "Core " + str(i + 1), + core_msr * energy_factor * MICRO_JOULE_IN_JOULE, + 0.0, + ) + ) return ret diff --git a/s_tui/sources/script_hook_loader.py b/s_tui/sources/script_hook_loader.py index 5a9309e..4e8088e 100644 --- a/s_tui/sources/script_hook_loader.py +++ b/s_tui/sources/script_hook_loader.py @@ -26,7 +26,7 @@ class ScriptHookLoader: """ def __init__(self, dir_path): - self.scripts_dir_path = os.path.join(dir_path, 'hooks.d') + self.scripts_dir_path = os.path.join(dir_path, "hooks.d") def load_script(self, source_name, timeoutMilliseconds=0): """ @@ -34,12 +34,13 @@ def load_script(self, source_name, timeoutMilliseconds=0): of timeoutMilliseconds """ - script_path = os.path.join(self.scripts_dir_path, - self._source_to_script_name(source_name)) + script_path = os.path.join( + self.scripts_dir_path, self._source_to_script_name(source_name) + ) if os.path.isfile(script_path): return ScriptHook(script_path, timeoutMilliseconds) return None def _source_to_script_name(self, source_name): - return source_name.lower() + '.sh' + return source_name.lower() + ".sh" diff --git a/s_tui/sources/source.py b/s_tui/sources/source.py index 56a20ec..471b678 100644 --- a/s_tui/sources/source.py +++ b/s_tui/sources/source.py @@ -22,40 +22,45 @@ class Source: - """ This is a basic source class for s-tui""" + """This is a basic source class for s-tui""" + def __init__(self): self.edge_hooks = [] - self.measurement_unit = '' + self.measurement_unit = "" self.last_measurement = [] self.is_available = True self.available_sensors = [] - self.name = '' - self.pallet = ('temp light', 'temp dark', - 'temp light smooth', 'temp dark smooth') + self.name = "" + self.pallet = ( + "temp light", + "temp dark", + "temp light smooth", + "temp dark smooth", + ) self.alert_pallet = None def update(self): - """ Updates the last measurement, invokes hooks if present """ + """Updates the last measurement, invokes hooks if present""" self.eval_hooks() def get_maximum(self): - """ Returns the maximum measurement as measured so far """ + """Returns the maximum measurement as measured so far""" raise NotImplementedError("Get maximum is not implemented") def get_top(self): - """ Returns higest theoretical value the sensors can reach """ + """Returns higest theoretical value the sensors can reach""" raise NotImplementedError("get_top is not implemented") def get_is_available(self): - """ Returns is_available """ + """Returns is_available""" return self.is_available def reset(self): - """ Resets source state, e.g. current max """ + """Resets source state, e.g. current max""" raise NotImplementedError("Reset is not implemented") def get_sensors_summary(self): - """ This returns a dict of sensor of the source and their values """ + """This returns a dict of sensor of the source and their values""" sub_title_list = self.get_sensor_list() graph_vector_summary = OrderedDict() @@ -66,39 +71,38 @@ def get_sensors_summary(self): return graph_vector_summary def get_summary(self): - """ Returns a dict of source name and sensors with their values """ + """Returns a dict of source name and sensors with their values""" graph_vector_summary = OrderedDict() - graph_vector_summary[self.get_source_name()] = ( - '[' + self.measurement_unit + ']') + graph_vector_summary[self.get_source_name()] = "[" + self.measurement_unit + "]" graph_vector_summary.update(self.get_sensors_summary()) return graph_vector_summary def get_source_name(self): - """ Returns source name """ + """Returns source name""" return self.name def get_edge_triggered(self): - """ Returns true if a measurement was higher than some thershhold""" + """Returns true if a measurement was higher than some thershhold""" raise NotImplementedError("Get Edge triggered not implemented") def get_measurement_unit(self): - """ Returns measurement unit of source """ + """Returns measurement unit of source""" return self.measurement_unit def get_pallet(self): - """ Returns the pallet of the source for graph plotting """ + """Returns the pallet of the source for graph plotting""" return self.pallet def get_alert_pallet(self): - """ Returns the 'alert' pallet for graph plotting """ + """Returns the 'alert' pallet for graph plotting""" return self.alert_pallet def get_sensor_list(self): - """ Returns list of a available sensors for source """ + """Returns list of a available sensors for source""" return self.available_sensors def get_reading_list(self): - """ Returns a list of the last measurement """ + """Returns a list of the last measurement""" return self.last_measurement def add_edge_hook(self, hook): @@ -124,12 +128,13 @@ def eval_hooks(self): class MockSource(Source): - """ Mock class for testing """ + """Mock class for testing""" + def get_maximum(self): return 20 def get_summary(self): - return {'MockValue': 5, 'Tahat': 34} + return {"MockValue": 5, "Tahat": 34} def get_edge_triggered(self): raise NotImplementedError("Get Edge triggered not implemented") diff --git a/s_tui/sources/temp_source.py b/s_tui/sources/temp_source.py index 8477788..e5e135e 100644 --- a/s_tui/sources/temp_source.py +++ b/s_tui/sources/temp_source.py @@ -28,11 +28,15 @@ class TempSource(Source): - """ This class inherits a source and implements a temprature source """ + """This class inherits a source and implements a temprature source""" + THRESHOLD_TEMP = 80 def __init__(self, temp_thresh=None): - warnings.filterwarnings('ignore', '.*FileNotFound.*',) + warnings.filterwarnings( + "ignore", + ".*FileNotFound.*", + ) try: if psutil.sensors_temperatures(): self.is_available = True @@ -43,19 +47,26 @@ def __init__(self, temp_thresh=None): Source.__init__(self) - self.name = 'Temp' - self.measurement_unit = 'C' + self.name = "Temp" + self.measurement_unit = "C" self.max_last_temp = 0 - self.pallet = ('temp light', 'temp dark', - 'temp light smooth', 'temp dark smooth') - self.alert_pallet = ('high temp light', 'high temp dark', - 'high temp light smooth', 'high temp dark smooth') + self.pallet = ( + "temp light", + "temp dark", + "temp light smooth", + "temp dark smooth", + ) + self.alert_pallet = ( + "high temp light", + "high temp dark", + "high temp light smooth", + "high temp dark smooth", + ) self.max_temp = 10 sensors_dict = None try: - sensors_dict = OrderedDict(sorted( - psutil.sensors_temperatures().items())) + sensors_dict = OrderedDict(sorted(psutil.sensors_temperatures().items())) except IOError: logging.debug("Unable to create sensors dict") self.is_available = False @@ -65,14 +76,14 @@ def __init__(self, temp_thresh=None): sensor_name = "".join(key.title().split(" ")) for sensor_idx, sensor in enumerate(value): sensor_label = sensor.label - if (sensor.current <= 1.0 or sensor.current >= 127.0): + if sensor.current <= 1.0 or sensor.current >= 127.0: continue full_name = "" if not sensor_label: full_name = sensor_name + "," + str(sensor_idx) else: - full_name = ("".join(sensor_label.title().split(" "))) + full_name = "".join(sensor_label.title().split(" ")) sensor_count = multi_sensors.count(full_name) multi_sensors.append(full_name) full_name += "," + str(sensor_count) @@ -87,16 +98,14 @@ def __init__(self, temp_thresh=None): if temp_thresh is not None: if int(temp_thresh) > 0: self.temp_thresh = int(temp_thresh) - logging.debug("Updated custom threshold to %s", - self.temp_thresh) + logging.debug("Updated custom threshold to %s", self.temp_thresh) def update(self): sample = OrderedDict(sorted(psutil.sensors_temperatures().items())) self.last_measurement = [] for sensor in sample: for minor_sensor in sample[sensor]: - if (minor_sensor.current <= 1.0 or - minor_sensor.current >= 127.0): + if minor_sensor.current <= 1.0 or minor_sensor.current >= 127.0: continue self.last_measurement.append(minor_sensor.current) @@ -109,7 +118,7 @@ def get_edge_triggered(self): return self.max_last_temp > self.temp_thresh def get_max_triggered(self): - """ Returns whether the current temperature threshold is exceeded""" + """Returns whether the current temperature threshold is exceeded""" return self.max_temp > self.temp_thresh def reset(self): diff --git a/s_tui/sources/util_source.py b/s_tui/sources/util_source.py index e490111..455ec3f 100644 --- a/s_tui/sources/util_source.py +++ b/s_tui/sources/util_source.py @@ -25,29 +25,31 @@ class UtilSource(Source): - def __init__(self): - if (not hasattr(psutil, "cpu_percent") and psutil.cpu_percent()): + if not hasattr(psutil, "cpu_percent") and psutil.cpu_percent(): self.is_available = False logging.debug("cpu utilization is not available from psutil") return Source.__init__(self) - self.name = 'Util' - self.measurement_unit = '%' - self.pallet = ('util light', 'util dark', - 'util light smooth', 'util dark smooth') + self.name = "Util" + self.measurement_unit = "%" + self.pallet = ( + "util light", + "util dark", + "util light smooth", + "util dark smooth", + ) self.last_measurement = [0] * (psutil.cpu_count() + 1) - self.available_sensors = ['Avg'] + self.available_sensors = ["Avg"] for core_id in range(psutil.cpu_count()): self.available_sensors.append("Core " + str(core_id)) def update(self): - self.last_measurement = [psutil.cpu_percent(interval=0.0, - percpu=False)] + self.last_measurement = [psutil.cpu_percent(interval=0.0, percpu=False)] for util in psutil.cpu_percent(interval=0.0, percpu=True): logging.info("Core id util %s", util) self.last_measurement.append(float(util)) diff --git a/s_tui/stress_menu.py b/s_tui/stress_menu.py index 7e95472..0ad28a4 100644 --- a/s_tui/stress_menu.py +++ b/s_tui/stress_menu.py @@ -31,84 +31,85 @@ class StressMenu: MAX_TITLE_LEN = 50 def __init__(self, return_fn, stress_exe): - self.return_fn = return_fn self.stress_exe = stress_exe - self.time_out = 'none' - self.sqrt_workers = '1' + self.time_out = "none" + self.sqrt_workers = "1" try: self.sqrt_workers = str(psutil.cpu_count()) logging.info("num cpus %s", self.sqrt_workers) except (IOError, OSError) as err: logging.debug(err) - self.sync_workers = '0' - self.memory_workers = '0' - self.malloc_byte = '256M' - self.byte_touch_cnt = '4096' - self.malloc_delay = 'none' + self.sync_workers = "0" + self.memory_workers = "0" + self.malloc_byte = "256M" + self.byte_touch_cnt = "4096" + self.malloc_delay = "none" self.no_malloc = False - self.write_workers = '0' - self.write_bytes = '1G' - - self.time_out_ctrl = urwid.Edit('Time out [sec]: ', self.time_out) - self.sqrt_workers_ctrl = urwid.Edit( - 'Sqrt() worker count: ', self.sqrt_workers) - self.sync_workers_ctrl = urwid.Edit( - 'Sync() worker count: ', self.sync_workers) + self.write_workers = "0" + self.write_bytes = "1G" + + self.time_out_ctrl = urwid.Edit("Time out [sec]: ", self.time_out) + self.sqrt_workers_ctrl = urwid.Edit("Sqrt() worker count: ", self.sqrt_workers) + self.sync_workers_ctrl = urwid.Edit("Sync() worker count: ", self.sync_workers) self.memory_workers_ctrl = urwid.Edit( - 'Malloc() / Free() worker count: ', self.memory_workers) - self.malloc_byte_ctrl = urwid.Edit( - ' Bytes per malloc*: ', self.malloc_byte) + "Malloc() / Free() worker count: ", self.memory_workers + ) + self.malloc_byte_ctrl = urwid.Edit(" Bytes per malloc*: ", self.malloc_byte) self.byte_touch_cnt_ctrl = urwid.Edit( - ' Touch a byte after * bytes: ', self.byte_touch_cnt) + " Touch a byte after * bytes: ", self.byte_touch_cnt + ) self.malloc_delay_ctrl = urwid.Edit( - ' Sleep time between Free() [sec]: ', self.malloc_delay) + " Sleep time between Free() [sec]: ", self.malloc_delay + ) self.no_malloc_ctrl = urwid.CheckBox( - '"dirty" the memory \ninstead of free / alloc', self.no_malloc) + '"dirty" the memory \ninstead of free / alloc', self.no_malloc + ) self.write_workers_ctrl = urwid.Edit( - 'Write() / Unlink() worker count: ', self.write_workers) - self.write_bytes_ctrl = urwid.Edit( - ' Byte per Write(): ', self.write_bytes) - - default_button = urwid.Button('Default', on_press=self.on_default) - default_button._label.align = 'center' - - save_button = urwid.Button('Save', on_press=self.on_save) - save_button._label.align = 'center' - - cancel_button = urwid.Button('Cancel', on_press=self.on_cancel) - cancel_button._label.align = 'center' - - if_buttons = urwid.Columns( - [default_button, save_button, cancel_button]) - - title = urwid.Text(('bold text', u" Stress Options \n"), 'center') - - self.titles = [title, - self.time_out_ctrl, - urwid.Divider(u'-'), - self.sqrt_workers_ctrl, - urwid.Divider(u'-'), - self.sync_workers_ctrl, - urwid.Divider(u'-'), - self.memory_workers_ctrl, - urwid.Divider(), - self.malloc_byte_ctrl, - urwid.Divider(), - self.byte_touch_cnt_ctrl, - urwid.Divider(), - self.malloc_delay_ctrl, - urwid.Divider(), - self.no_malloc_ctrl, - urwid.Divider(u'-'), - self.write_workers_ctrl, - urwid.Divider(), - self.write_bytes_ctrl, - urwid.Divider(u'-'), - if_buttons] + "Write() / Unlink() worker count: ", self.write_workers + ) + self.write_bytes_ctrl = urwid.Edit(" Byte per Write(): ", self.write_bytes) + + default_button = urwid.Button("Default", on_press=self.on_default) + default_button._label.align = "center" + + save_button = urwid.Button("Save", on_press=self.on_save) + save_button._label.align = "center" + + cancel_button = urwid.Button("Cancel", on_press=self.on_cancel) + cancel_button._label.align = "center" + + if_buttons = urwid.Columns([default_button, save_button, cancel_button]) + + title = urwid.Text(("bold text", " Stress Options \n"), "center") + + self.titles = [ + title, + self.time_out_ctrl, + urwid.Divider("-"), + self.sqrt_workers_ctrl, + urwid.Divider("-"), + self.sync_workers_ctrl, + urwid.Divider("-"), + self.memory_workers_ctrl, + urwid.Divider(), + self.malloc_byte_ctrl, + urwid.Divider(), + self.byte_touch_cnt_ctrl, + urwid.Divider(), + self.malloc_delay_ctrl, + urwid.Divider(), + self.no_malloc_ctrl, + urwid.Divider("-"), + self.write_workers_ctrl, + urwid.Divider(), + self.write_bytes_ctrl, + urwid.Divider("-"), + if_buttons, + ] self.main_window = urwid.LineBox(urwid.ListBox(self.titles)) @@ -125,16 +126,16 @@ def set_edit_texts(self): self.write_bytes_ctrl.set_edit_text(self.write_bytes) def on_default(self, _): - self.time_out = 'none' - self.sqrt_workers = '1' - self.sync_workers = '0' - self.memory_workers = '0' - self.malloc_byte = '256M' - self.byte_touch_cnt = '4096' - self.malloc_delay = 'none' + self.time_out = "none" + self.sqrt_workers = "1" + self.sync_workers = "0" + self.memory_workers = "0" + self.malloc_byte = "256M" + self.byte_touch_cnt = "4096" + self.malloc_delay = "none" self.no_malloc = False - self.write_workers = '0' - self.write_bytes = '1G' + self.write_workers = "0" + self.write_bytes = "1G" self.set_edit_texts() self.return_fn() @@ -143,25 +144,32 @@ def get_size(self): return len(self.titles) + 5, self.MAX_TITLE_LEN def on_save(self, _): - self.time_out = self.get_pos_num( - self.time_out_ctrl.get_edit_text(), 'none') + self.time_out = self.get_pos_num(self.time_out_ctrl.get_edit_text(), "none") self.sqrt_workers = self.get_pos_num( - self.sqrt_workers_ctrl.get_edit_text(), '4') + self.sqrt_workers_ctrl.get_edit_text(), "4" + ) self.sync_workers = self.get_pos_num( - self.sync_workers_ctrl.get_edit_text(), '0') + self.sync_workers_ctrl.get_edit_text(), "0" + ) self.memory_workers = self.get_pos_num( - self.memory_workers_ctrl.get_edit_text(), '0') + self.memory_workers_ctrl.get_edit_text(), "0" + ) self.malloc_byte = self.get_valid_byte( - self.malloc_byte_ctrl.get_edit_text(), '256M') + self.malloc_byte_ctrl.get_edit_text(), "256M" + ) self.byte_touch_cnt = self.get_valid_byte( - self.byte_touch_cnt_ctrl.get_edit_text(), '4096') + self.byte_touch_cnt_ctrl.get_edit_text(), "4096" + ) self.malloc_delay = self.get_pos_num( - self.malloc_delay_ctrl.get_edit_text(), 'none') + self.malloc_delay_ctrl.get_edit_text(), "none" + ) self.no_malloc = self.no_malloc_ctrl.get_state() self.write_workers = self.get_pos_num( - self.write_workers_ctrl.get_edit_text(), '0') + self.write_workers_ctrl.get_edit_text(), "0" + ) self.write_bytes = self.get_valid_byte( - self.write_bytes_ctrl.get_edit_text(), '1G') + self.write_bytes_ctrl.get_edit_text(), "1G" + ) self.set_edit_texts() self.return_fn() @@ -173,32 +181,32 @@ def on_cancel(self, _): def get_stress_cmd(self): stress_cmd = [self.stress_exe] if int(self.sqrt_workers) > 0: - stress_cmd.append('-c') + stress_cmd.append("-c") stress_cmd.append(self.sqrt_workers) if int(self.sync_workers) > 0: - stress_cmd.append('-i') + stress_cmd.append("-i") stress_cmd.append(self.sync_workers) if int(self.memory_workers) > 0: - stress_cmd.append('--vm') + stress_cmd.append("--vm") stress_cmd.append(self.memory_workers) - stress_cmd.append('--vm-bytes') + stress_cmd.append("--vm-bytes") stress_cmd.append(self.malloc_byte) - stress_cmd.append('--vm-stride') + stress_cmd.append("--vm-stride") stress_cmd.append(self.byte_touch_cnt) if self.no_malloc: - stress_cmd.append('--vm-keep') + stress_cmd.append("--vm-keep") if int(self.write_workers) > 0: - stress_cmd.append('--hdd') + stress_cmd.append("--hdd") stress_cmd.append(self.write_workers) - stress_cmd.append('--hdd-bytes') + stress_cmd.append("--hdd-bytes") stress_cmd.append(self.write_bytes) - if self.time_out != 'none': - stress_cmd.append('-t') + if self.time_out != "none": + stress_cmd.append("-t") stress_cmd.append(self.time_out) return stress_cmd @@ -206,14 +214,14 @@ def get_stress_cmd(self): @staticmethod def get_pos_num(num, default): num_valid = re.match(r"\A([0-9]+)\Z", num, re.I) - if num_valid or (num == 'none' and default == 'none'): + if num_valid or (num == "none" and default == "none"): return num return default @staticmethod def get_valid_byte(num, default): """check if the format of number is (num)(G|m|B) i.e 500GB, 200mb. 400 - etc.. """ + etc..""" num_valid = re.match(r"\A([0-9]+)(M|G|m|g|)(B|b|\b)\Z", num, re.I) if num_valid: return num diff --git a/s_tui/sturwid/bar_graph_vector.py b/s_tui/sturwid/bar_graph_vector.py index 1681674..0805427 100644 --- a/s_tui/sturwid/bar_graph_vector.py +++ b/s_tui/sturwid/bar_graph_vector.py @@ -22,27 +22,28 @@ from s_tui.sturwid.complex_bar_graph import ScalableBarGraph import logging import math + logger = logging.getLogger(__name__) class BarGraphVector(LabeledBarGraphVector): - @staticmethod def append_latest_value(values, new_val): - values.append(new_val) return values[1:] MAX_SAMPLES = 300 SCALE_DENSITY = 5 - def __init__(self, - source, - regular_colors, - graph_count, - visible_graph_list, - alert_colors=None, - bar_width=1): + def __init__( + self, + source, + regular_colors, + graph_count, + visible_graph_list, + alert_colors=None, + bar_width=1, + ): self.source = source self.graph_count = graph_count self.graph_name = self.source.get_source_name() @@ -73,18 +74,18 @@ def __init__(self, y_label = [] - graph_title = self.graph_name + ' [' + self.measurement_unit + ']' + graph_title = self.graph_name + " [" + self.measurement_unit + "]" sub_title_list = self.source.get_sensor_list() # create several different instances of salable bar graph w = [] for _ in range(graph_count): - graph = ScalableBarGraph(['bg background', - self.color_a, self.color_b]) + graph = ScalableBarGraph(["bg background", self.color_a, self.color_b]) w.append(graph) super(BarGraphVector, self).__init__( - graph_title, sub_title_list, y_label, w, visible_graph_list) + graph_title, sub_title_list, y_label, w, visible_graph_list + ) for graph in self.bar_graph_vector: graph.set_bar_width(bar_width) @@ -101,7 +102,8 @@ def _set_colors(self, colors): for graph in self.bar_graph_vector: graph.set_segment_attributes( - ['bg background', self.color_a, self.color_b], satt=self.satt) + ["bg background", self.color_a, self.color_b], satt=self.satt + ) def get_graph_name(self): return self.graph_name @@ -120,12 +122,15 @@ def get_label_scale(self, min_val, max_val, size): label_cnt = int(size / self.SCALE_DENSITY) try: if max_val >= 100: - label = [int((min_val + i * (max_val - min_val) / label_cnt)) - for i in range(label_cnt + 1)] + label = [ + int((min_val + i * (max_val - min_val) / label_cnt)) + for i in range(label_cnt + 1) + ] else: - label = [round((min_val + i * - (max_val - min_val) / label_cnt), 1) - for i in range(label_cnt + 1)] + label = [ + round((min_val + i * (max_val - min_val) / label_cnt), 1) + for i in range(label_cnt + 1) + ] return label except ZeroDivisionError: logging.debug("Side label creation divided by 0") @@ -139,7 +144,8 @@ def set_smooth_colors(self, smooth): for graph in self.bar_graph_vector: graph.set_segment_attributes( - ['bg background', self.color_a, self.color_b], satt=self.satt) + ["bg background", self.color_a, self.color_b], satt=self.satt + ) def update(self): if not self.get_is_available(): @@ -170,7 +176,8 @@ def update(self): bars = [] if self.visible_graph_list[graph_idx]: self.graph_data[graph_idx] = self.append_latest_value( - self.graph_data[graph_idx], current_reading[graph_idx]) + self.graph_data[graph_idx], current_reading[graph_idx] + ) # Get the graph width (dimension 1) num_displayed_bars = graph.get_size()[1] @@ -185,7 +192,7 @@ def update(self): update_max = True local_max = math.ceil(max(local_top_value)) - if (local_max > self.graph_max): + if local_max > self.graph_max: update_max = True self.graph_max = local_max @@ -193,13 +200,13 @@ def update(self): for graph_idx, graph in enumerate(self.bar_graph_vector): bars = [] if self.visible_graph_list[graph_idx]: - # Get the graph width (dimension 1) num_displayed_bars = graph.get_size()[1] # Iterate over all the information in the graph if self.color_counter_vector[graph_idx] % 2 == 0: - for n in range(self.MAX_SAMPLES - num_displayed_bars, - self.MAX_SAMPLES): + for n in range( + self.MAX_SAMPLES - num_displayed_bars, self.MAX_SAMPLES + ): value = round(self.graph_data[graph_idx][n], 1) # toggle between two bar types if n & 1: @@ -207,8 +214,9 @@ def update(self): else: bars.append([value, 0]) else: - for n in range(self.MAX_SAMPLES - num_displayed_bars, - self.MAX_SAMPLES): + for n in range( + self.MAX_SAMPLES - num_displayed_bars, self.MAX_SAMPLES + ): value = round(self.graph_data[graph_idx][n], 1) if n & 1: bars.append([value, 0]) @@ -219,8 +227,9 @@ def update(self): graph.set_data(bars, float(self.graph_max)) y_label_size_max = max(y_label_size_max, graph.get_size()[0]) - self.set_y_label(self.get_label_scale(0, self.graph_max, - float(y_label_size_max))) + self.set_y_label( + self.get_label_scale(0, self.graph_max, float(y_label_size_max)) + ) # Only create new graphs if the maximum has changed if update_max: self.set_visible_graphs() diff --git a/s_tui/sturwid/complex_bar_graph.py b/s_tui/sturwid/complex_bar_graph.py index d6aa354..09e3053 100644 --- a/s_tui/sturwid/complex_bar_graph.py +++ b/s_tui/sturwid/complex_bar_graph.py @@ -28,6 +28,7 @@ class ScalableBarGraph(urwid.BarGraph): """Scale the graph acording to screen size""" + _size = (0, 0) def render(self, size, focus=False): @@ -50,8 +51,7 @@ def calculate_bar_widths(self, size, bardata): (maxcol, _) = size if self.bar_width is not None: - return [self.bar_width] * min( - len(bardata), int(maxcol / self.bar_width)) + return [self.bar_width] * min(len(bardata), int(maxcol / self.bar_width)) if len(bardata) >= maxcol: return [1] * maxcol @@ -74,21 +74,16 @@ def on_resize(self, new_size): class LabeledBarGraphVector(urwid.WidgetPlaceholder): - """Add option to add labels for X and Y axes """ - - def __init__(self, - title, - sub_title_list, - y_label, - bar_graph_vector, - visible_graph_list): + """Add option to add labels for X and Y axes""" + + def __init__( + self, title, sub_title_list, y_label, bar_graph_vector, visible_graph_list + ): for bar_graph in bar_graph_vector: if not isinstance(bar_graph, ScalableBarGraph): - raise Exception( - 'graph vector items must be ScalableBarGraph') + raise Exception("graph vector items must be ScalableBarGraph") if not self.check_label(y_label): - raise Exception( - 'Y label must be a valid label') + raise Exception("Y label must be a valid label") self.visible_graph_list = visible_graph_list self.bar_graph_vector = [] @@ -117,7 +112,7 @@ def set_y_label(self, y_label): if not y_label: text = urwid.Text("1") pile = urwid.Pile([urwid.ListBox([text])]) - self.y_label = ('fixed', 1, pile) + self.y_label = ("fixed", 1, pile) return str_y_label = [str(i) for i in y_label] @@ -130,27 +125,26 @@ def set_y_label(self, y_label): y_list_walker = urwid.Pile(y_list_walker, focus_item=0) y_scale_len = len(max(str_y_label, key=len)) - self.y_label = ('fixed', y_scale_len, y_list_walker) + self.y_label = ("fixed", y_scale_len, y_list_walker) def set_visible_graphs(self, visible_graph_list=None): """Show a column of the graph selected for display""" if visible_graph_list is None: visible_graph_list = self.visible_graph_list - vline = urwid.AttrWrap(urwid.SolidFill(u'|'), 'line') + vline = urwid.AttrWrap(urwid.SolidFill("|"), "line") graph_vector_column_list = [] - for state, graph, sub_title in zip(visible_graph_list, - self.bar_graph_vector, - self.sub_title_list): + for state, graph, sub_title in zip( + visible_graph_list, self.bar_graph_vector, self.sub_title_list + ): if state: - text_w = urwid.Text(sub_title, align='center') + text_w = urwid.Text(sub_title, align="center") sub_title_widget = urwid.ListBox([text_w]) - graph_a = [('fixed', 1, sub_title_widget), - ('weight', 1, graph)] + graph_a = [("fixed", 1, sub_title_widget), ("weight", 1, graph)] graph_and_title = urwid.Pile(graph_a) - graph_vector_column_list.append(('weight', 1, graph_and_title)) - graph_vector_column_list.append(('fixed', 1, vline)) + graph_vector_column_list.append(("weight", 1, graph_and_title)) + graph_vector_column_list.append(("fixed", 1, vline)) # if all sub graph are disabled if not graph_vector_column_list: @@ -161,14 +155,14 @@ def set_visible_graphs(self, visible_graph_list=None): # remove the last vertical line separator graph_vector_column_list.pop() - y_label_a = ('weight', 1, urwid.Columns(graph_vector_column_list)) - y_label_and_graphs = [self.y_label, - y_label_a] + y_label_a = ("weight", 1, urwid.Columns(graph_vector_column_list)) + y_label_and_graphs = [self.y_label, y_label_a] column_w = urwid.Columns(y_label_and_graphs, dividechars=1) y_label_and_graphs_widget = urwid.WidgetPlaceholder(column_w) - init_widget = urwid.Pile([('fixed', 1, self.title), - ('weight', 1, y_label_and_graphs_widget)]) + init_widget = urwid.Pile( + [("fixed", 1, self.title), ("weight", 1, y_label_and_graphs_widget)] + ) self.visible_graph_list = visible_graph_list self.original_widget = init_widget @@ -178,8 +172,7 @@ def set_graph(self, graph_vector): @staticmethod def check_label(label): - if (len(label) >= 2 and not(None in label) or - not label or label is None): + if len(label) >= 2 and not (None in label) or not label or label is None: return True return False diff --git a/s_tui/sturwid/summary_text_list.py b/s_tui/sturwid/summary_text_list.py index 39783bd..90b5d85 100644 --- a/s_tui/sturwid/summary_text_list.py +++ b/s_tui/sturwid/summary_text_list.py @@ -37,14 +37,13 @@ def __init__(self, source, visible_sensors_list): self.summary_text_items = OrderedDict() def get_text_item_list(self): - summery_text_list = [] for key, val in self.source.get_summary().items(): - label_w = urwid.Text(str(key[0:self.MAX_LABEL_L])) - value_w = urwid.Text(str(val), align='right') + label_w = urwid.Text(str(key[0 : self.MAX_LABEL_L])) + value_w = urwid.Text(str(val), align="right") # This can be accessed by the update method self.summary_text_items[key] = value_w - col_w = urwid.Columns([('weight', 1.5, label_w), value_w]) + col_w = urwid.Columns([("weight", 1.5, label_w), value_w]) try: _ = self.visible_summaries[key] except KeyError: diff --git a/s_tui/sturwid/ui_elements.py b/s_tui/sturwid/ui_elements.py index b6883dc..9ea6725 100644 --- a/s_tui/sturwid/ui_elements.py +++ b/s_tui/sturwid/ui_elements.py @@ -19,87 +19,84 @@ import urwid DEFAULT_PALETTE = [ - ('body', 'default', 'default', 'standout'), - ('header', 'default', 'dark red',), - ('screen edge', 'light blue', 'brown'), - ('main shadow', 'dark gray', 'black'), - ('line', 'default', 'light gray', 'standout'), - ('menu button', 'light gray', 'black'), - ('bg background', 'default', 'default'), - ('overheat dark', 'white', 'light red', 'standout'), - ('bold text', 'default,bold', 'default', 'bold'), - ('under text', 'default,underline', 'default', 'underline'), - - ('util light', 'default', 'light green'), - ('util light smooth', 'light green', 'default'), - ('util dark', 'default', 'dark green'), - ('util dark smooth', 'dark green', 'default'), - - ('high temp dark', 'default', 'dark red'), - ('high temp dark smooth', 'dark red', 'default'), - ('high temp light', 'default', 'light red'), - ('high temp light smooth', 'light red', 'default'), - - ('power dark', 'default', 'light gray', 'standout'), - ('power dark smooth', 'light gray', 'default'), - ('power light', 'default', 'white', 'standout'), - ('power light smooth', 'white', 'default'), - - ('temp dark', 'default', 'dark cyan', 'standout'), - ('temp dark smooth', 'dark cyan', 'default'), - ('temp light', 'default', 'light cyan', 'standout'), - ('temp light smooth', 'light cyan', 'default'), - - ('freq dark', 'default', 'dark magenta', 'standout'), - ('freq dark smooth', 'dark magenta', 'default'), - ('freq light', 'default', 'light magenta', 'standout'), - ('freq light smooth', 'light magenta', 'default'), - - ('fan dark', 'default', 'dark blue', 'standout'), - ('fan dark smooth', 'dark blue', 'default'), - ('fan light', 'default', 'light blue', 'standout'), - ('fan light smooth', 'light blue', 'default'), - - ('button normal', 'dark green', 'default', 'standout'), - ('button select', 'white', 'dark green'), - ('line', 'default', 'default', 'standout'), - ('pg normal', 'white', 'default', 'standout'), - ('pg complete', 'white', 'dark magenta'), - ('high temp txt', 'light red', 'default'), - ('pg smooth', 'dark magenta', 'default') + ("body", "default", "default", "standout"), + ( + "header", + "default", + "dark red", + ), + ("screen edge", "light blue", "brown"), + ("main shadow", "dark gray", "black"), + ("line", "default", "light gray", "standout"), + ("menu button", "light gray", "black"), + ("bg background", "default", "default"), + ("overheat dark", "white", "light red", "standout"), + ("bold text", "default,bold", "default", "bold"), + ("under text", "default,underline", "default", "underline"), + ("util light", "default", "light green"), + ("util light smooth", "light green", "default"), + ("util dark", "default", "dark green"), + ("util dark smooth", "dark green", "default"), + ("high temp dark", "default", "dark red"), + ("high temp dark smooth", "dark red", "default"), + ("high temp light", "default", "light red"), + ("high temp light smooth", "light red", "default"), + ("power dark", "default", "light gray", "standout"), + ("power dark smooth", "light gray", "default"), + ("power light", "default", "white", "standout"), + ("power light smooth", "white", "default"), + ("temp dark", "default", "dark cyan", "standout"), + ("temp dark smooth", "dark cyan", "default"), + ("temp light", "default", "light cyan", "standout"), + ("temp light smooth", "light cyan", "default"), + ("freq dark", "default", "dark magenta", "standout"), + ("freq dark smooth", "dark magenta", "default"), + ("freq light", "default", "light magenta", "standout"), + ("freq light smooth", "light magenta", "default"), + ("fan dark", "default", "dark blue", "standout"), + ("fan dark smooth", "dark blue", "default"), + ("fan light", "default", "light blue", "standout"), + ("fan light smooth", "light blue", "default"), + ("button normal", "dark green", "default", "standout"), + ("button select", "white", "dark green"), + ("line", "default", "default", "standout"), + ("pg normal", "white", "default", "standout"), + ("pg complete", "white", "dark magenta"), + ("high temp txt", "light red", "default"), + ("pg smooth", "dark magenta", "default"), ] class ViListBox(urwid.ListBox): # Catch key presses in box and pass them as arrow keys def keypress(self, size, key): - if key == 'j': - key = 'down' - elif key == 'k': - key = 'up' - elif key == 'h': - key = 'left' - elif key == 'l': - key = 'right' - elif key == 'G': - key = 'page down' - elif key == 'g': - key = 'page up' - elif key == 'x': - key = 'enter' - elif key == 'q': - key = 'q' + if key == "j": + key = "down" + elif key == "k": + key = "up" + elif key == "h": + key = "left" + elif key == "l": + key = "right" + elif key == "G": + key = "page down" + elif key == "g": + key = "page up" + elif key == "x": + key = "enter" + elif key == "q": + key = "q" return super(ViListBox, self).keypress(size, key) def radio_button(group, label, fn): - """ Inheriting radio button of urwid """ + """Inheriting radio button of urwid""" w = urwid.RadioButton(group, label, False, on_state_change=fn) - w = urwid.AttrWrap(w, 'button normal', 'button select') + w = urwid.AttrWrap(w, "button normal", "button select") return w def button(t, fn, data=None): w = urwid.Button(t, fn, data) - w = urwid.AttrWrap(w, 'button normal', 'button select') + w = urwid.AttrWrap(w, "button normal", "button select") return w diff --git a/s_tui/tests/test_util_source.py b/s_tui/tests/test_util_source.py index 37798e7..15dd49e 100644 --- a/s_tui/tests/test_util_source.py +++ b/s_tui/tests/test_util_source.py @@ -9,8 +9,8 @@ def test_util_class(self): def test_util_summary(self): util_source = UtilSource() - self.assertEqual(util_source.get_source_name(), 'Util') + self.assertEqual(util_source.get_source_name(), "Util") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/setup.py b/setup.py index 8e3b518..03838b0 100644 --- a/setup.py +++ b/setup.py @@ -21,29 +21,26 @@ setup( name="s-tui", - packages=['s_tui', 's_tui.sources', 's_tui.sturwid'], + packages=["s_tui", "s_tui.sources", "s_tui.sturwid"], version=AUX.__version__, author="Alex Manuskin", - author_email="amanusk@tuta.io", + author_email="amanusk@pm.me", description="Stress Terminal UI stress test and monitoring tool", - long_description=open('README.md', 'r').read(), - long_description_content_type='text/markdown', + long_description=open("README.md", "r").read(), + long_description_content_type="text/markdown", license="GPLv2", url="https://github.com/amanusk/s-tui", - keywords=['stress', 'monitoring', 'TUI'], # arbitrary keywords - - entry_points={ - 'console_scripts': ['s-tui=s_tui.s_tui:main'] - }, + keywords=["stress", "monitoring", "TUI"], # arbitrary keywords + entry_points={"console_scripts": ["s-tui=s_tui.s_tui:main"]}, classifiers=[ - 'License :: OSI Approved :: GNU General Public License v2 (GPLv2)', - 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 3', - 'Topic :: System :: Monitoring', + "License :: OSI Approved :: GNU General Public License v2 (GPLv2)", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 3", + "Topic :: System :: Monitoring", ], install_requires=[ - 'urwid>=2.0.1', - 'psutil>=5.9.1', + "urwid>=2.0.1", + "psutil>=5.9.1", ], )