Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vine: daskvine scheduling algorithm #3992

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def get(self, dsk, keys, *,
self.wrapper = wrapper
self.wrapper_proc = wrapper_proc
self.prune_files = prune_files
self.category_execution_time = defaultdict(list)
self.category_execution_time = defaultdict(lambda: {"num_tasks": 0, "avg_execution_time": 0})
JinZhou5042 marked this conversation as resolved.
Show resolved Hide resolved
self.max_priority = float('inf')
self.min_priority = float('-inf')

Expand Down Expand Up @@ -281,7 +281,9 @@ def _dask_execute(self, dsk, keys):
print(f"{t.key} ran on {t.hostname}")

if t.successful():
self.category_execution_time[t.category].append(t.execution_time)
cat_total_time = self.category_execution_time[t.category]["avg_execution_time"] * self.category_execution_time[t.category]["num_tasks"]
self.category_execution_time[t.category]["num_tasks"] += 1
self.category_execution_time[t.category]["avg_execution_time"] = (cat_total_time + t.resources_measured.wall_time) / self.category_execution_time[t.category]["num_tasks"]
JinZhou5042 marked this conversation as resolved.
Show resolved Hide resolved
JinZhou5042 marked this conversation as resolved.
Show resolved Hide resolved
result_file = DaskVineFile(t.output_file, t.key, dag, self.task_mode)
rs = dag.set_result(t.key, result_file)
self._enqueue_dask_calls(dag, tag, rs, self.retries, enqueued_calls)
Expand Down Expand Up @@ -355,12 +357,12 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
elif self.scheduling_mode == 'breadth-first':
# prefer to start all branches as soon as possible
priority = -task_depth
elif self.scheduling_mode == 'longest-first':
elif self.scheduling_mode == 'longest-category-first':
# if no task in this category has completed yet, use the maximum priority so the category is sampled early and its average execution time becomes known
priority = sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority
elif self.scheduling_mode == 'shortest-first':
priority = self.category_execution_time[cat]["avg_execution_time"] if self.category_execution_time[cat]["num_tasks"] else self.max_priority
elif self.scheduling_mode == 'shortest-category-first':
# if no task in this category has completed yet, use the maximum priority (not negated) so the category is sampled early and its average execution time becomes known
priority = -sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority
priority = -self.category_execution_time[cat]["avg_execution_time"] if self.category_execution_time[cat]["num_tasks"] else self.max_priority
elif self.scheduling_mode == 'FIFO':
# first in first out, the default behavior
priority = -round(time.time(), 6)
Expand Down
7 changes: 0 additions & 7 deletions taskvine/src/bindings/python3/ndcctools/taskvine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,13 +829,6 @@ def resources_allocated(self):
return None
return cvine.vine_task_get_resources(self._task, "allocated")

##
# Get the execution time of this task in seconds.
#
@property
def execution_time(self):
return cvine.vine_task_get_execution_time(self._task) / 1e6

##
# Adds inputs for nopen library and rules file and sets LD_PRELOAD
#
Expand Down
7 changes: 0 additions & 7 deletions taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,6 @@ regardless of the priority.
*/
void vine_task_set_priority(struct vine_task *t, double priority);

/** Get the actual execution time of the task.
execution_time = t->time_workers_execute_last_end - t->time_workers_execute_last_start.
@param t A task object.
@return The actual execution time of the task in seconds.
*/
double vine_task_get_execution_time(struct vine_task *t);

/** Specify an environment variable to be added to the task.
@param t A task object
@param name Name of the variable.
Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v
t->time_workers_execute_last = observed_execution_time > execution_time ? execution_time : observed_execution_time;
t->time_workers_execute_last_start = start_time;
t->time_workers_execute_last_end = end_time;
t->resources_measured->wall_time = t->time_workers_execute_last_end - t->time_workers_execute_last_start;
t->time_workers_execute_all += t->time_workers_execute_last;
t->output_length = output_length;
t->result = task_status;
Expand Down
Loading