diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py index 71314b626e..e9165e441f 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py @@ -115,7 +115,10 @@ def find_dependencies(self, sexpr, depth=0): def set_relations(self, key, sexpr): sexpr = self._working_graph[key] + self._children_of[key] = self.find_dependencies(sexpr) + self._depth_of[key] = max([self._depth_of[c] for c in self._children_of[key]]) + 1 if self._children_of[key] else 0 + self._missing_of[key] = set(self._children_of[key]) for c in self._children_of[key]: diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index 32de3a1d4e..ebab6a1f95 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -14,10 +14,13 @@ from .dask_dag import DaskVineDag from .cvine import VINE_TEMP +import os +import time +import random import contextlib import cloudpickle -import os from uuid import uuid4 +from collections import defaultdict try: import rich @@ -123,6 +126,7 @@ def get(self, dsk, keys, *, lib_command=None, lib_modules=None, task_mode='tasks', + scheduling_mode='FIFO', env_per_task=False, progress_disable=False, progress_label="[green]tasks", @@ -164,12 +168,16 @@ def get(self, dsk, keys, *, else: self.lib_modules = hoisting_modules if hoisting_modules else import_modules # Deprecated self.task_mode = task_mode + self.scheduling_mode = scheduling_mode self.env_per_task = env_per_task self.progress_disable = progress_disable self.progress_label = progress_label self.wrapper = wrapper self.wrapper_proc = wrapper_proc self.prune_files = prune_files + self.category_info = defaultdict(lambda: {"num_tasks": 0, "total_execution_time": 0}) + self.max_priority = float('inf') + self.min_priority = float('-inf') if submit_per_cycle is not None and submit_per_cycle < 1: submit_per_cycle = None @@ -274,6 +282,8 @@ def _dask_execute(self, dsk, keys): print(f"{t.key} ran on {t.hostname}") if t.successful(): + self.category_info[t.category]["num_tasks"] += 1 + self.category_info[t.category]["total_execution_time"] += t.resources_measured.wall_time result_file = DaskVineFile(t.output_file, t.key, dag, self.task_mode) rs = dag.set_result(t.key, result_file) self._enqueue_dask_calls(dag, tag, rs, self.retries, enqueued_calls) @@ -335,7 +345,42 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): if lazy and self.checkpoint_fn: lazy = self.checkpoint_fn(dag, k) + # each task has a category name cat = self.category_name(sexpr) + + task_depth = dag.depth_of(k) + if self.scheduling_mode == 'random': + priority = random.randint(self.min_priority, self.max_priority) + elif self.scheduling_mode == 'depth-first': + # dig more information about different kinds of tasks + priority = task_depth + elif self.scheduling_mode == 'breadth-first': + # prefer to start all branches as soon as possible + priority = -task_depth + elif self.scheduling_mode == 'longest-category-first': + # if no tasks have been executed in this category, set a high priority so that we know more information about each category + if self.category_info[cat]["num_tasks"]: + priority = self.category_info[cat]["total_execution_time"] / self.category_info[cat]["num_tasks"] + else: + priority = self.max_priority + elif self.scheduling_mode == 'shortest-category-first': + # if no tasks have been executed in this category, set a high priority so that we know more information about each category + if self.category_info[cat]["num_tasks"]: + priority = -self.category_info[cat]["total_execution_time"] / self.category_info[cat]["num_tasks"] + else: + priority = self.max_priority + elif self.scheduling_mode == 'FIFO': + # first in first out, the default behavior + priority = -round(time.time(), 6) + elif self.scheduling_mode == 'LIFO': + # last in first out, the opposite of FIFO + priority = round(time.time(), 6) + elif self.scheduling_mode == 'largest-input-first': + # best for saving disk space (with pruing) + priority = sum([len(dag.get_result(c)._file) for c in dag.get_children(k)]) + else: + raise ValueError(f"Unknown scheduling mode {self.scheduling_mode}") + if self.task_mode == 'tasks': if cat not in self._categories_known: if self.resources: @@ -357,6 +402,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): worker_transfers=lazy, wrapper=self.wrapper) + t.set_priority(priority) if self.env_per_task: t.set_command( f"mkdir envdir && tar -xf {self._environment_name} -C envdir && envdir/bin/run_in_env {t._command}") @@ -374,6 +420,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): worker_transfers=lazy, wrapper=self.wrapper) + t.set_priority(priority) t.set_tag(tag) # tag that identifies this dag enqueued_calls.append(t) @@ -632,6 +679,7 @@ def __init__(self, m, self.set_category(category) if worker_transfers: self.enable_temp_output() + if extra_files: for f, name in extra_files.items(): self.add_input(f, name) diff --git a/taskvine/src/manager/taskvine.h b/taskvine/src/manager/taskvine.h index ed99441127..2e531a00fb 100644 --- a/taskvine/src/manager/taskvine.h +++ b/taskvine/src/manager/taskvine.h @@ -451,7 +451,6 @@ regardless of the priority. @param t A task object. @param priority The priority of the task. */ - void vine_task_set_priority(struct vine_task *t, double priority); /** Specify an environment variable to be added to the task. diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index b0fc92078a..3c08bc5cde 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -564,6 +564,7 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v t->time_workers_execute_last = observed_execution_time > execution_time ? execution_time : observed_execution_time; t->time_workers_execute_last_start = start_time; t->time_workers_execute_last_end = end_time; + t->resources_measured->wall_time = t->time_workers_execute_last_end - t->time_workers_execute_last_start; t->time_workers_execute_all += t->time_workers_execute_last; t->output_length = output_length; t->result = task_status;