Skip to content

Commit

Permalink
added collect_benchmark_data to baseexecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
Acribbs committed Jan 1, 2025
1 parent 24fd1e8 commit 87936f0
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 12 deletions.
23 changes: 19 additions & 4 deletions cgatcore/pipeline/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,34 @@ class BaseExecutor:
def __init__(self, **kwargs):
"""Initialize the executor with configuration options."""
# All keyword arguments are kept as-is (no validation) as the configuration.
self.config = kwargs
# Label that the dict-returning collect_benchmark_data reports under "task".
self.task_name = "base_task" # Should be overridden by subclasses
# Value that the dict-returning collect_benchmark_data reports under
# "total_t"; the unit is not stated anywhere in this view.
self.default_total_time = 0 # Should be overridden by subclasses

def run(self, statement, *args, **kwargs):
"""Run the given job statement. This should be implemented by subclasses."""
# Placeholder only: every call raises NotImplementedError.
# NOTE(review): the executors shown in executors.py declare
# run(self, statement_list), which does not accept the extra *args/**kwargs
# allowed here -- confirm the intended common signature.
raise NotImplementedError("Subclasses must implement this method")

def collect_benchmark_data(self, *args, **kwargs):
"""Collect benchmark data if needed."""
# Placeholder only: every call raises NotImplementedError.
# NOTE(review): a second collect_benchmark_data with a concrete body appears
# later in this same hunk. This text is a diff rendering without +/- markers,
# so this stub is presumably the removed side -- confirm against the commit.
raise NotImplementedError("Subclasses must implement this method")

def collect_metric_data(self, *args, **kwargs):
"""Collect metric data if needed."""
# Placeholder only: accepts any arguments and always raises
# NotImplementedError, so a subclass has to supply the real implementation.
raise NotImplementedError("Subclasses must implement this method")

def collect_benchmark_data(self, statements, resource_usage=None):
"""Collect benchmark data for job execution.
Args:
statements (list): List of executed statements
resource_usage (list, optional): Resource usage data
Returns:
dict: Benchmark data including task name and execution time
"""
# Nothing is measured here: the result only combines the instance's
# task_name and default_total_time with the two arguments.
return {
"task": self.task_name,
"total_t": self.default_total_time,
# Passed through as the same object; no copy is made.
"statements": statements,
# None (or any other falsy value) is replaced by a new empty list.
"resource_usage": resource_usage or []
}

def build_job_script(self, statement):
"""Build a simple job script for execution.
Args:
Expand Down
24 changes: 16 additions & 8 deletions cgatcore/pipeline/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ class SGEExecutor(BaseExecutor):
def __init__(self, **kwargs):
# Forward every option to the parent initializer before setting the
# SGE-specific attributes below.
super().__init__(**kwargs)
# Logger named after the module that defines this class.
self.logger = logging.getLogger(__name__)
# Same "sge_task" / 8 values that also appear as literals in this class's
# collect_benchmark_data hunk; the unit of 8 is not stated in this view.
self.task_name = "sge_task"
self.default_total_time = 8

def run(self, statement_list):
benchmark_data = []
Expand Down Expand Up @@ -84,8 +86,8 @@ def collect_benchmark_data(self, statements, resource_usage=None):
dict: Benchmark data including task name and execution time
"""
return {
"task": "sge_task",
"total_t": 8,
"task": self.task_name,
"total_t": self.default_total_time,
"statements": statements,
"resource_usage": resource_usage or []
}
Expand All @@ -97,6 +99,8 @@ class SlurmExecutor(BaseExecutor):
def __init__(self, **kwargs):
# Forward every option to the parent initializer before setting the
# Slurm-specific attributes below.
super().__init__(**kwargs)
# Logger named after the module that defines this class.
self.logger = logging.getLogger(__name__)
# Same "slurm_task" / 10 values that also appear as literals in this class's
# collect_benchmark_data hunk; the unit of 10 is not stated in this view.
self.task_name = "slurm_task"
self.default_total_time = 10

def run(self, statement_list):
benchmark_data = []
Expand Down Expand Up @@ -170,8 +174,8 @@ def collect_benchmark_data(self, statements, resource_usage=None):
dict: Benchmark data including task name and execution time
"""
return {
"task": "slurm_task",
"total_t": 10,
"task": self.task_name,
"total_t": self.default_total_time,
"statements": statements,
"resource_usage": resource_usage or []
}
Expand All @@ -183,6 +187,8 @@ class TorqueExecutor(BaseExecutor):
def __init__(self, **kwargs):
# Forward every option to the parent initializer before setting the
# Torque-specific attributes below.
super().__init__(**kwargs)
# Logger named after the module that defines this class.
self.logger = logging.getLogger(__name__)
# Same "torque_task" / 7 values that also appear as literals in this class's
# collect_benchmark_data hunk; the unit of 7 is not stated in this view.
self.task_name = "torque_task"
self.default_total_time = 7

def run(self, statement_list):
benchmark_data = []
Expand Down Expand Up @@ -260,8 +266,8 @@ def collect_benchmark_data(self, statements, resource_usage=None):
dict: Benchmark data including task name and execution time
"""
return {
"task": "torque_task",
"total_t": 7,
"task": self.task_name,
"total_t": self.default_total_time,
"statements": statements,
"resource_usage": resource_usage or []
}
Expand All @@ -273,6 +279,8 @@ class LocalExecutor(BaseExecutor):
def __init__(self, **kwargs):
# Forward every option to the parent initializer before setting the
# local-execution attributes below.
super().__init__(**kwargs)
# Logger named after the module that defines this class.
self.logger = logging.getLogger(__name__)
# Same "local_task" / 5 values that also appear as literals in this class's
# collect_benchmark_data hunk; the unit of 5 is not stated in this view.
self.task_name = "local_task"
self.default_total_time = 5

def run(self, statement_list):
benchmark_data = []
Expand Down Expand Up @@ -317,8 +325,8 @@ def collect_benchmark_data(self, statements, resource_usage=None):
dict: Benchmark data including task name and execution time
"""
return {
"task": "local_task",
"total_t": 5,
"task": self.task_name,
"total_t": self.default_total_time,
"statements": statements,
"resource_usage": resource_usage or []
}

0 comments on commit 87936f0

Please sign in to comment.