Skip to content

Commit

Permalink
vine: daskvine scheduling algorithm (#3992)
Browse files Browse the repository at this point in the history
* vine: daskvine scheduling algorithm

* lint

* use wall time

* use 'inf'

* remove unneeded func

* use total time instead of avg
  • Loading branch information
JinZhou5042 authored Dec 4, 2024
1 parent 469f1da commit c43120d
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 2 deletions.
3 changes: 3 additions & 0 deletions taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ def find_dependencies(self, sexpr, depth=0):

def set_relations(self, key, sexpr):
sexpr = self._working_graph[key]

self._children_of[key] = self.find_dependencies(sexpr)
self._depth_of[key] = max([self._depth_of[c] for c in self._children_of[key]]) + 1 if self._children_of[key] else 0

self._missing_of[key] = set(self._children_of[key])

for c in self._children_of[key]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
from .dask_dag import DaskVineDag
from .cvine import VINE_TEMP

import os
import time
import random
import contextlib
import cloudpickle
import os
from uuid import uuid4
from collections import defaultdict

try:
import rich
Expand Down Expand Up @@ -123,6 +126,7 @@ def get(self, dsk, keys, *,
lib_command=None,
lib_modules=None,
task_mode='tasks',
scheduling_mode='FIFO',
env_per_task=False,
progress_disable=False,
progress_label="[green]tasks",
Expand Down Expand Up @@ -164,12 +168,16 @@ def get(self, dsk, keys, *,
else:
self.lib_modules = hoisting_modules if hoisting_modules else import_modules # Deprecated
self.task_mode = task_mode
self.scheduling_mode = scheduling_mode
self.env_per_task = env_per_task
self.progress_disable = progress_disable
self.progress_label = progress_label
self.wrapper = wrapper
self.wrapper_proc = wrapper_proc
self.prune_files = prune_files
self.category_info = defaultdict(lambda: {"num_tasks": 0, "total_execution_time": 0})
self.max_priority = float('inf')
self.min_priority = float('-inf')

if submit_per_cycle is not None and submit_per_cycle < 1:
submit_per_cycle = None
Expand Down Expand Up @@ -274,6 +282,8 @@ def _dask_execute(self, dsk, keys):
print(f"{t.key} ran on {t.hostname}")

if t.successful():
self.category_info[t.category]["num_tasks"] += 1
self.category_info[t.category]["total_execution_time"] += t.resources_measured.wall_time
result_file = DaskVineFile(t.output_file, t.key, dag, self.task_mode)
rs = dag.set_result(t.key, result_file)
self._enqueue_dask_calls(dag, tag, rs, self.retries, enqueued_calls)
Expand Down Expand Up @@ -335,7 +345,42 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
if lazy and self.checkpoint_fn:
lazy = self.checkpoint_fn(dag, k)

# each task has a category name
cat = self.category_name(sexpr)

task_depth = dag.depth_of(k)
if self.scheduling_mode == 'random':
priority = random.randint(self.min_priority, self.max_priority)
elif self.scheduling_mode == 'depth-first':
# dig more information about different kinds of tasks
priority = task_depth
elif self.scheduling_mode == 'breadth-first':
# prefer to start all branches as soon as possible
priority = -task_depth
elif self.scheduling_mode == 'longest-category-first':
# if no tasks have been executed in this category, set a high priority so that we know more information about each category
if self.category_info[cat]["num_tasks"]:
priority = self.category_info[cat]["total_execution_time"] / self.category_info[cat]["num_tasks"]
else:
priority = self.max_priority
elif self.scheduling_mode == 'shortest-category-first':
# if no tasks have been executed in this category, set a high priority so that we know more information about each category
if self.category_info[cat]["num_tasks"]:
priority = -self.category_info[cat]["total_execution_time"] / self.category_info[cat]["num_tasks"]
else:
priority = self.max_priority
elif self.scheduling_mode == 'FIFO':
# first in first out, the default behavior
priority = -round(time.time(), 6)
elif self.scheduling_mode == 'LIFO':
# last in first out, the opposite of FIFO
priority = round(time.time(), 6)
elif self.scheduling_mode == 'largest-input-first':
# best for saving disk space (with pruing)
priority = sum([len(dag.get_result(c)._file) for c in dag.get_children(k)])
else:
raise ValueError(f"Unknown scheduling mode {self.scheduling_mode}")

if self.task_mode == 'tasks':
if cat not in self._categories_known:
if self.resources:
Expand All @@ -357,6 +402,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
worker_transfers=lazy,
wrapper=self.wrapper)

t.set_priority(priority)
if self.env_per_task:
t.set_command(
f"mkdir envdir && tar -xf {self._environment_name} -C envdir && envdir/bin/run_in_env {t._command}")
Expand All @@ -374,6 +420,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
worker_transfers=lazy,
wrapper=self.wrapper)

t.set_priority(priority)
t.set_tag(tag) # tag that identifies this dag

enqueued_calls.append(t)
Expand Down Expand Up @@ -632,6 +679,7 @@ def __init__(self, m,
self.set_category(category)
if worker_transfers:
self.enable_temp_output()

if extra_files:
for f, name in extra_files.items():
self.add_input(f, name)
Expand Down
1 change: 0 additions & 1 deletion taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,6 @@ regardless of the priority.
@param t A task object.
@param priority The priority of the task.
*/

void vine_task_set_priority(struct vine_task *t, double priority);

/** Specify an environment variable to be added to the task.
Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v
t->time_workers_execute_last = observed_execution_time > execution_time ? execution_time : observed_execution_time;
t->time_workers_execute_last_start = start_time;
t->time_workers_execute_last_end = end_time;
t->resources_measured->wall_time = t->time_workers_execute_last_end - t->time_workers_execute_last_start;
t->time_workers_execute_all += t->time_workers_execute_last;
t->output_length = output_length;
t->result = task_status;
Expand Down

0 comments on commit c43120d

Please sign in to comment.