Skip to content

Commit

Permalink
vine: daskvine scheduling algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
JinZhou5042 committed Nov 25, 2024
1 parent 8530039 commit 3180b58
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 5 deletions.
7 changes: 5 additions & 2 deletions taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(self, dsk, low_memory_mode=False):
self._pending_parents_of = defaultdict(lambda: set())

# key->depth. The shallowest level the key is found
self._depth_of = defaultdict(lambda: float('inf'))
self._depth_of = defaultdict(lambda: float("inf"))

# target keys that the dag should compute
self._targets = set()
Expand Down Expand Up @@ -115,7 +115,10 @@ def find_dependencies(self, sexpr, depth=0):

def set_relations(self, key, sexpr):
sexpr = self._working_graph[key]

self._children_of[key] = self.find_dependencies(sexpr)
self._depth_of[key] = max([self._depth_of[c] for c in self._children_of[key]]) + 1 if self._children_of[key] else 0

self._missing_of[key] = set(self._children_of[key])

for c in self._children_of[key]:
Expand Down Expand Up @@ -242,4 +245,4 @@ def get_targets(self):
class DaskVineNoResult(Exception):
"""Exception raised when asking for a result from a computation that has not been performed."""
pass
# vim: set sts=4 sw=4 ts=4 expandtab ft=python:
# vim: set sts=4 sw=4 ts=4 expandtab ft=python:
45 changes: 43 additions & 2 deletions taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
from .dask_dag import DaskVineDag
from .cvine import VINE_TEMP

import os
import time
import random
import contextlib
import cloudpickle
import os
from uuid import uuid4
from collections import defaultdict

try:
import rich
Expand Down Expand Up @@ -123,6 +126,7 @@ def get(self, dsk, keys, *,
lib_command=None,
lib_modules=None,
task_mode='tasks',
scheduling_mode='FIFO',
env_per_task=False,
progress_disable=False,
progress_label="[green]tasks",
Expand Down Expand Up @@ -164,12 +168,16 @@ def get(self, dsk, keys, *,
else:
self.lib_modules = hoisting_modules if hoisting_modules else import_modules # Deprecated
self.task_mode = task_mode
self.scheduling_mode = scheduling_mode
self.env_per_task = env_per_task
self.progress_disable = progress_disable
self.progress_label = progress_label
self.wrapper = wrapper
self.wrapper_proc = wrapper_proc
self.prune_files = prune_files
self.category_execution_time = defaultdict(list)
self.max_priority = float('inf')
self.min_priority = float('-inf')

if submit_per_cycle is not None and submit_per_cycle < 1:
submit_per_cycle = None
Expand Down Expand Up @@ -273,6 +281,7 @@ def _dask_execute(self, dsk, keys):
print(f"{t.key} ran on {t.hostname}")

if t.successful():
self.category_execution_time[t.category].append(t.execution_time)
result_file = DaskVineFile(t.output_file, t.key, dag, self.task_mode)
rs = dag.set_result(t.key, result_file)
self._enqueue_dask_calls(dag, tag, rs, self.retries, enqueued_calls)
Expand Down Expand Up @@ -334,7 +343,36 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
if lazy and self.checkpoint_fn:
lazy = self.checkpoint_fn(dag, k)

# each task has a category name
cat = self.category_name(sexpr)

task_depth = dag.depth_of(k)
if self.scheduling_mode == 'random':
priority = random.randint(self.min_priority, self.max_priority)
elif self.scheduling_mode == 'depth-first':
# dig more information about different kinds of tasks
priority = task_depth
elif self.scheduling_mode == 'breadth-first':
# prefer to start all branches as soon as possible
priority = -task_depth
elif self.scheduling_mode == 'longest-first':
# if no tasks have been executed in this category, set a high priority so that we know more information about each category
priority = sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority
elif self.scheduling_mode == 'shortest-first':
# if no tasks have been executed in this category, set a high priority so that we know more information about each category
priority = -sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority
elif self.scheduling_mode == 'FIFO':
# first in first out, the default behavior
priority = -round(time.time(), 6)
elif self.scheduling_mode == 'LIFO':
# last in first out, the opposite of FIFO
priority = round(time.time(), 6)
elif self.scheduling_mode == 'largest-input-first':
# best for saving disk space (with pruing)
priority = sum([len(dag.get_result(c)._file) for c in dag.get_children(k)])
else:
raise ValueError(f"Unknown scheduling mode {self.scheduling_mode}")

if self.task_mode == 'tasks':
if cat not in self._categories_known:
if self.resources:
Expand All @@ -356,6 +394,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
worker_transfers=lazy,
wrapper=self.wrapper)

t.set_priority(priority)
if self.env_per_task:
t.set_command(
f"mkdir envdir && tar -xf {self._environment_name} -C envdir && envdir/bin/run_in_env {t._command}")
Expand All @@ -373,6 +412,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
worker_transfers=lazy,
wrapper=self.wrapper)

t.set_priority(priority)
t.set_tag(tag) # tag that identifies this dag

enqueued_calls.append(t)
Expand Down Expand Up @@ -631,6 +671,7 @@ def __init__(self, m,
self.set_category(category)
if worker_transfers:
self.enable_temp_output()

if extra_files:
for f, name in extra_files.items():
self.add_input(f, name)
Expand Down Expand Up @@ -741,4 +782,4 @@ def find_dask_keys(lists):

def find_result_files(lists):
return find_in_lists(lists, lambda f: isinstance(f, DaskVineFile))
# vim: set sts=4 sw=4 ts=4 expandtab ft=python:
# vim: set sts=4 sw=4 ts=4 expandtab ft=python:
7 changes: 7 additions & 0 deletions taskvine/src/bindings/python3/ndcctools/taskvine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,13 @@ def resources_allocated(self):
return None
return cvine.vine_task_get_resources(self._task, "allocated")

##
# Get the execution time of this task in seconds.
#
@property
def execution_time(self):
return cvine.vine_task_get_execution_time(self._task) / 1e6

##
# Adds inputs for nopen library and rules file and sets LD_PRELOAD
#
Expand Down
8 changes: 7 additions & 1 deletion taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,15 @@ regardless of the priority.
@param t A task object.
@param priority The priority of the task.
*/

void vine_task_set_priority(struct vine_task *t, double priority);

/** Get the actual execution time of the task.
execution_time = t->time_workers_execute_last_end - t->time_workers_execute_last_start.
@param t A task object.
@return The actual execution time of the task in seconds.
*/
double vine_task_get_execution_time(struct vine_task *t);

/** Specify an environment variable to be added to the task.
@param t A task object
@param name Name of the variable.
Expand Down
5 changes: 5 additions & 0 deletions taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,11 @@ void vine_task_set_priority(struct vine_task *t, double priority)
t->priority = priority;
}

double vine_task_get_execution_time(struct vine_task *t)
{
return t->time_workers_execute_last_end - t->time_workers_execute_last_start;
}

int vine_task_set_monitor_output(struct vine_task *t, const char *monitor_output_directory)
{

Expand Down

0 comments on commit 3180b58

Please sign in to comment.