From c77ab6f9f1d47591760feeb58cce9360c9b905a3 Mon Sep 17 00:00:00 2001
From: Benjamin Tovar
Date: Fri, 17 Jan 2025 11:24:47 -0500
Subject: [PATCH] update dask vine executor to new dask graphs (#4015)

* DaskVine to new graph representation
* update simple graph example
* fix bug with depth
* always convert from legacy representation, for now
* check for dask in test
* do not import DaskVineDag if dask not available
* update function calls
* lint
* handle generic container graph nodes
* add warning about dask version
* example_to_revert
* remove print statement
---
 .../python3/ndcctools/taskvine/__init__.py    |   3 +-
 .../python3/ndcctools/taskvine/dask_dag.py    | 237 ++++++++----------
 .../ndcctools/taskvine/dask_executor.py       | 197 +++++++--------
 .../src/examples/vine_example_dask_graph.py   |  13 +-
 .../test/TR_vine_python_unlink_when_done.sh   |   2 +-
 5 files changed, 226 insertions(+), 226 deletions(-)

diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/__init__.py b/taskvine/src/bindings/python3/ndcctools/taskvine/__init__.py
index 79a19a529f..85c850bc56 100644
--- a/taskvine/src/bindings/python3/ndcctools/taskvine/__init__.py
+++ b/taskvine/src/bindings/python3/ndcctools/taskvine/__init__.py
@@ -57,12 +57,11 @@
     LibraryTask,
     FunctionCall,
 )
-from .dask_dag import DaskVineDag
-
 from . import cvine
 
 try:
     from .dask_executor import DaskVine
+    from .dask_dag import DaskVineDag
 except ImportError as e:
     print(f"DaskVine not available. Couldn't find module: {e.name}")
 
diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py
index e9165e441f..4f0c075050 100644
--- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py
+++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py
@@ -2,80 +2,98 @@
 # This software is distributed under the GNU General Public License.
 # See the file COPYING for details.
 
-from uuid import uuid4
 from collections import defaultdict
+import dask._task_spec as dts
 
 
 class DaskVineDag:
     """A directed graph that encodes the steps and state a computation needs.
-    Single computations are encoded as s-expressions, therefore it is 'upside-down',
-    in the sense that the children of a node are the nodes required to compute it.
-    E.g., for
+    Single computations are encoded as dts.Task's, with dependencies expressed as the keys needed by the task.
 
     dsk = {'x': 1,
            'y': 2,
-           'z': (add, 'x', 'y'),
-           'w': (sum, ['x', 'y', 'z']),
-           'v': [(sum, ['w', 'z']), 2]
+           'z': dts.Task('z', add, dts.TaskRef('x'), dts.TaskRef('y')),
+           'w': dts.Task('w', sum, [dts.TaskRef('x'), dts.TaskRef('y'), dts.TaskRef('z')]),
+           'v': dts.Task('v', sum, [dts.TaskRef('w'), dts.TaskRef('z')]),
+           't': dts.Task('t', sum, [dts.TaskRef('v'), 2])
           }
 
-    'z' has as children 'x' and 'y'.
-
-    Each node is referenced by its key. When the value of a key is list of
-    sexprs, like 'v' above, and low_memory_mode is True, then a key is automatically computed recursively
-    for each computation.
+    'z' has 'x' and 'y' as dependencies.
 
     Computation is done lazily. The DaskVineDag is initialized from a task graph,
-    but not computation is decoded. To use the DaskVineDag:
+    but no computation is decoded. To use the DaskVineDag:
     - DaskVineDag.set_targets(keys): Request the computation associated with key to be decoded.
-    - DaskVineDag.get_ready(): A list of [key, sexpr] of expressions that are ready
-      to be executed.
+    - DaskVineDag.get_ready(): A list of dts.Task that are ready to be executed.
     - DaskVineDag.set_result(key, value): Sets the result of key to value.
-    - DaskVineDag.get_result(key): Get result associated with key. Raises DagNoResult
+    - DaskVineDag.get_result(key): Get result associated with key. Raises DaskVineNoResult
     - DaskVineDag.has_result(key): Whether the key has a computed result.
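+
+    A minimal sketch of the intended bootstrap/propagate loop (illustrative
+    only; `add` and the two-key legacy graph are hypothetical):
+
+        dag = DaskVineDag(dts.convert_legacy_graph({'x': 1, 'y': (add, 'x', 2)}))
+        dag.set_targets(['y'])
+        ready = list(dag.get_ready())
+        while ready:
+            task = ready.pop()
+            # a dts.Task is callable with a map of dependency key -> value
+            value = task({k: dag.get_result(k) for k in task.dependencies})
+            ready.extend(dag.set_result(task.key, value))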
     """
 
+    @staticmethod
+    def hashable(s):
+        try:
+            hash(s)
+            return True
+        except TypeError:
+            return False
+
     @staticmethod
     def keyp(s):
-        return DaskVineDag.hashable(s) and not DaskVineDag.taskp(s)
+        return DaskVineDag.hashable(s) and not DaskVineDag.taskref(s) and not DaskVineDag.taskp(s)
 
     @staticmethod
-    def taskp(s):
-        return isinstance(s, tuple) and len(s) > 0 and callable(s[0])
+    def taskref(s):
+        return isinstance(s, (dts.TaskRef, dts.Alias))
 
     @staticmethod
-    def listp(s):
-        return isinstance(s, list)
+    def taskp(s):
+        return isinstance(s, dts.Task)
 
     @staticmethod
-    def symbolp(s):
-        return not (DaskVineDag.taskp(s) or DaskVineDag.listp(s))
+    def containerp(s):
+        return isinstance(s, dts.NestedContainer)
 
     @staticmethod
-    def hashable(s):
-        try:
-            hash(s)
-            return True
-        except TypeError:
-            return False
+    def symbolp(s):
+        return isinstance(s, dts.DataNode)
 
-    def __init__(self, dsk, low_memory_mode=False):
+    def __init__(self, dsk):
         self._dsk = dsk
 
-        # child -> parents. I.e., which parents needs the result of child
-        self._parents_of = defaultdict(lambda: set())
+        # For a key, the set of keys that need it to perform a computation.
+        self._needed_by = defaultdict(lambda: set())
 
-        # parent->children still waiting for result. A key is ready to be computed when children left is []
-        self._missing_of = {}
+        # For a key, the subset of self._needed_by[key] that still need to be completed.
+        # Only useful for gc.
+        self._pending_needed_by = defaultdict(lambda: set())
 
-        # parent->nchildren get the number of children for parent computation
-        self._children_of = {}
+        # For a key, the set of keys that it needs for computation.
+        self._dependencies_of = {}
+
+        # For a key, the set of keys with a pending result for the key to be computed.
+        # When the set is empty, the key is ready to be computed. It is always a subset
+        # of self._dependencies_of[key].
+        self._missing_of = {}
 
         # key->value of its computation
         self._result_of = {}
 
-        # child -> nodes that use the child as an input, and that have not been completed
-        self._pending_parents_of = defaultdict(lambda: set())
-
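+        # Illustration for a hypothetical graph where a task 'y' needs key 'x':
+        #   _dependencies_of['y'] == _missing_of['y'] == {'x'}
+        #   _needed_by['x'] == _pending_needed_by['x'] == {'y'}
+        # set_result('x', value) then empties _missing_of['y'], making 'y' ready.
+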
        # key->depth. The shallowest level the key is found
        self._depth_of = defaultdict(lambda: float('inf'))

@@ -83,50 +101,33 @@ def __init__(self, dsk, low_memory_mode=False):
         self._targets = set()
         self._working_graph = dict(dsk)
-        if low_memory_mode:
-            self._flatten_graph()
 
         self.initialize_graph()
 
     def left_to_compute(self):
         return len(self._working_graph) - len(self._result_of)
 
-    def graph_keyp(self, s):
-        if DaskVineDag.keyp(s):
-            return s in self._working_graph
-        return False
-
     def depth_of(self, key):
         return self._depth_of[key]
 
     def initialize_graph(self):
-        for key, sexpr in self._working_graph.items():
-            self.set_relations(key, sexpr)
-
-    def find_dependencies(self, sexpr, depth=0):
-        dependencies = set()
-        if self.graph_keyp(sexpr):
-            dependencies.add(sexpr)
-            self._depth_of[sexpr] = min(depth, self._depth_of[sexpr])
-        elif not DaskVineDag.symbolp(sexpr):
-            for sub in sexpr:
-                dependencies.update(self.find_dependencies(sub, depth + 1))
-        return dependencies
+        for task in self._working_graph.values():
+            self.set_relations(task)
 
-    def set_relations(self, key, sexpr):
-        sexpr = self._working_graph[key]
+        for task in self._working_graph.values():
+            if isinstance(task, dts.DataNode):
+                self._depth_of[task.key] = 0
+                self.set_result(task.key, task.value)
 
-        self._children_of[key] = self.find_dependencies(sexpr)
-        self._depth_of[key] = max([self._depth_of[c] for c in self._children_of[key]]) + 1 if self._children_of[key] else 0
-
-        self._missing_of[key] = set(self._children_of[key])
-
-        for c in self._children_of[key]:
-            self._parents_of[c].add(key)
-            self._pending_parents_of[c].add(key)
+    def set_relations(self, task):
+        self._dependencies_of[task.key] = task.dependencies
+        self._missing_of[task.key] = set(self._dependencies_of[task.key])
 
+        for c in self._dependencies_of[task.key]:
+            self._needed_by[c].add(task.key)
+            self._pending_needed_by[c].add(task.key)
 
     def get_ready(self):
-        """ List of [(key, sexpr),...] ready for computation.
+        """ List of dts.Task ready for computation.
            This call should be used only for bootstrapping. Further calls should use
            DaskVineDag.set_result to discover the new computations that become ready to be executed. """
@@ -134,81 +135,67 @@ def get_ready(self):
         rs = {}
         for (key, cs) in self._missing_of.items():
             if self.has_result(key) or cs:
                 continue
-            sexpr = self._working_graph[key]
-            if self.graph_keyp(sexpr):
-                rs.update(self.set_result(key, self.get_result(sexpr)))
-            elif self.symbolp(sexpr):
-                rs.update(self.set_result(key, sexpr))
+            node = self._working_graph[key]
+            if self.taskref(node):
+                rs.update(self.set_result(key, self.get_result(node.key)))
+            elif self.symbolp(node):
+                rs.update(self.set_result(key, node))
             else:
-                rs[key] = (key, sexpr)
+                rs[key] = node
+
+        for r in rs:
+            if self._dependencies_of[r]:
+                self._depth_of[r] = min(self._depth_of[d] for d in self._dependencies_of[r]) + 1
+            else:
+                self._depth_of[r] = 0
+
         return rs.values()
 
     def set_result(self, key, value):
-        """ Sets new result and propagates in the DaskVineDag. Returns a list of [(key, sexpr),...]
+        """ Sets new result and propagates in the DaskVineDag. Returns a list of dts.Task
            of computations that become ready to be executed """
         rs = {}
         self._result_of[key] = value
-        for p in self._parents_of[key]:
+        for p in self._pending_needed_by[key]:
             self._missing_of[p].discard(key)
 
             if self._missing_of[p]:
+                # the key p still has dependencies unmet...
                continue
 
-            sexpr = self._working_graph[p]
-            if self.graph_keyp(sexpr):
+            node = self._working_graph[p]
+            if self.taskref(node):
                 rs.update(
-                    self.set_result(p, self.get_result(sexpr))
+                    self.set_result(p, self.get_result(node.key))
                 )  # case e.g, "x": "y", and we just set the value of "y"
-            elif self.symbolp(sexpr):
-                rs.update(self.set_result(p, sexpr))
+            elif self.symbolp(node):
+                rs.update(self.set_result(p, node))
             else:
-                rs[p] = (p, sexpr)
+                rs[p] = node
 
-        for c in self._children_of[key]:
-            self._pending_parents_of[c].discard(key)
+        for r in rs:
+            if self._dependencies_of[r]:
+                self._depth_of[r] = min(self._depth_of[d] for d in self._dependencies_of[r]) + 1
+            else:
+                self._depth_of[r] = 0
 
-        return rs.values()
+        for c in self._dependencies_of[key]:
+            self._pending_needed_by[c].discard(key)
 
-    def _flatten_graph(self):
-        """ Recursively decomposes a sexpr associated with key, so that its arguments, if any
-        are keys. """
-        for key in list(self._working_graph.keys()):
-            self.flatten_rec(key, self._working_graph[key], toplevel=True)
+        return rs.values()
 
     def _add_second_targets(self, key):
         v = self._working_graph[key]
-        if self.graph_keyp(v):
+        if self.taskref(v):
             lst = [v]
-        elif DaskVineDag.listp(v):
+        elif DaskVineDag.containerp(v):
             lst = v
         else:
             return
         for c in lst:
-            if self.graph_keyp(c):
-                self._targets.add(c)
-                self._add_second_targets(c)
-
-    def flatten_rec(self, key, sexpr, toplevel=False):
-        if key in self._working_graph and not toplevel:
-            return
-        if DaskVineDag.symbolp(sexpr):
-            return
-
-        nargs = []
-        next_flat = []
-        cons = type(sexpr)
-
-        for arg in sexpr:
-            if DaskVineDag.symbolp(arg):
-                nargs.append(arg)
-            else:
-                next_key = uuid4()
-                nargs.append(next_key)
-                next_flat.append((next_key, arg))
-
-        self._working_graph[key] = cons(nargs)
-        for (n, a) in next_flat:
-            self.flatten_rec(n, a)
+            if self.taskref(c):
+                self._targets.add(c.key)
+                self._add_second_targets(c.key)
 
     def has_result(self, key):
         return key in self._result_of
@@ -219,17 +206,17 @@ def get_result(self, key):
         except KeyError:
             raise DaskVineNoResult(key)
 
-    def get_children(self, key):
-        return self._children_of[key]
+    def get_dependencies(self, key):
+        return self._dependencies_of[key]
 
-    def get_missing_children(self, key):
+    def get_missing_dependencies(self, key):
         return self._missing_of[key]
 
-    def get_parents(self, key):
-        return self._parents_of[key]
+    def get_needed_by(self, key):
+        return self._needed_by[key]
 
-    def get_pending_parents(self, key):
-        return self._pending_parents_of[key]
+    def get_pending_needed_by(self, key):
+        return self._pending_needed_by[key]
 
     def set_targets(self, keys):
         """ Values of keys that need to be computed. """
diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py
index ebab6a1f95..36a0506cfd 100644
--- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py
+++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py
@@ -14,6 +14,13 @@
 from .dask_dag import DaskVineDag
 from .cvine import VINE_TEMP
 
+try:
+    import dask._task_spec as dts
+except ModuleNotFoundError:
+    print("Dask version 2024.12.0 or greater not found.")
+    raise
+
+
 import os
 import time
 import random
@@ -77,8 +84,9 @@ class DaskVine(Manager):
     # @param env_vars A dictionary of VAR=VALUE environment variables to set per task. A value
     #                 should be either a string, or a function that accepts as arguments the manager
     #                 and task, and that returns a string.
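+    #                 For example (illustrative values only):
+    #                     env_vars={"OMP_NUM_THREADS": "1",
+    #                               "TASK_TAG": lambda m, t: str(t.id)}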
-    # @param low_memory_mode Split graph vertices to reduce memory needed per function call. It
-    #                 removes some of the dask graph optimizations, thus proceed with care.
     # @param checkpoint_fn When using worker_transfers, a predicate with arguments (dag, key)
     #                 called before submitting a task. If True, the result is brought back
     #                 to the manager.
@@ -113,7 +121,6 @@ def get(self, dsk, keys, *,
             extra_files=None,
             worker_transfers=True,
             env_vars=None,
-            low_memory_mode=False,
             checkpoint_fn=None,
             resources=None,
             resources_mode=None,
@@ -138,6 +145,10 @@ def get(self, dsk, keys, *,
-            lazy_transfers=True,  # Deprecated, use worker_tranfers
+            lazy_transfers=True,  # Deprecated, use worker_transfers
             ):
         try:
+
+            # just in case, convert any tuple task to dts.Task
+            dsk = dts.convert_legacy_graph(dsk)
+
             self.set_property("framework", "dask")
             if retries and retries < 1:
                 raise ValueError("retries should be larger than 0")
@@ -154,7 +165,6 @@ def get(self, dsk, keys, *,
             else:
                 self.worker_transfers = True
             self.env_vars = env_vars
-            self.low_memory_mode = low_memory_mode
             self.checkpoint_fn = checkpoint_fn
             self.resources = resources
             self.resources_mode = resources_mode
@@ -212,7 +222,7 @@ def _dask_execute(self, dsk, keys):
         indices = {k: inds for (k, inds) in find_dask_keys(keys)}
         keys_flatten = indices.keys()
 
-        dag = DaskVineDag(dsk, low_memory_mode=self.low_memory_mode)
+        dag = DaskVineDag(dsk)
         tag = f"dag-{id(dag)}"
 
         # create Library if using 'function-calls' task mode.
@@ -300,7 +310,7 @@ def _dask_execute(self, dsk, keys):
                     retries_left = t.decrement_retry()
                     print(f"task id {t.id} key {t.key} failed: {t.result}. {retries_left} attempts left.\n{t.std_output}")
                     if retries_left > 0:
-                        self._enqueue_dask_calls(dag, tag, [(t.key, t.sexpr)], retries_left, enqueued_calls)
+                        self._enqueue_dask_calls(dag, tag, [t.dask_task], retries_left, enqueued_calls)
                     else:
                         raise Exception(f"tasks for key {t.key} failed permanently")
                 t = None  # drop task reference
@@ -332,54 +342,68 @@ def update(*args, **kwargs):
 
         return (progress, update)
 
-    def category_name(self, sexpr):
-        if DaskVineDag.taskp(sexpr):
-            return str(sexpr[0]).replace(" ", "_")
+    def category_name(self, node):
+        if DaskVineDag.taskp(node):
+            return str(node.func).replace(" ", "_")
         else:
             return "other"
 
+    def _task_priority(self, dag, cat, key):
+        task_depth = dag.depth_of(key)
+
+        if self.scheduling_mode == "random":
+            priority = random.randint(self.min_priority, self.max_priority)
+        elif self.scheduling_mode == "depth-first":
+            # prefer deeper tasks, so that started branches finish first
+            priority = task_depth
+        elif self.scheduling_mode == "breadth-first":
+            # prefer to start all branches as soon as possible
+            priority = -task_depth
+        elif self.scheduling_mode == "longest-category-first":
+            # if no tasks have been executed in this category, set a high priority
+            # so that we know more information about each category
+            if self.category_info[cat]["num_tasks"]:
+                priority = (
+                    self.category_info[cat]["total_execution_time"]
+                    / self.category_info[cat]["num_tasks"]
+                )
+            else:
+                priority = self.max_priority
+        elif self.scheduling_mode == "shortest-category-first":
+            # if no tasks have been executed in this category, set a high priority
+            # so that we know more information about each category
+            if self.category_info[cat]["num_tasks"]:
+                priority = (
+                    -self.category_info[cat]["total_execution_time"]
+                    / self.category_info[cat]["num_tasks"]
+                )
+            else:
+                priority = self.max_priority
+        elif self.scheduling_mode == "FIFO":
+            # first in first out, the default behavior
+            priority = -round(time.time(), 6)
+        elif self.scheduling_mode == "LIFO":
"LIFO": + # last in first out, the opposite of FIFO + priority = round(time.time(), 6) + elif self.scheduling_mode == "largest-input-first": + # best for saving disk space (with pruing) + priority = sum([len(dag.get_result(c)._file) for c in dag.get_dependencies(key)]) + else: + raise ValueError(f"Unknown scheduling mode {self.scheduling_mode}") + + return priority + def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): targets = dag.get_targets() - for (k, sexpr) in rs: + for dask_task in rs: + k = dask_task.key lazy = self.worker_transfers and k not in targets if lazy and self.checkpoint_fn: lazy = self.checkpoint_fn(dag, k) # each task has a category name - cat = self.category_name(sexpr) - - task_depth = dag.depth_of(k) - if self.scheduling_mode == 'random': - priority = random.randint(self.min_priority, self.max_priority) - elif self.scheduling_mode == 'depth-first': - # dig more information about different kinds of tasks - priority = task_depth - elif self.scheduling_mode == 'breadth-first': - # prefer to start all branches as soon as possible - priority = -task_depth - elif self.scheduling_mode == 'longest-category-first': - # if no tasks have been executed in this category, set a high priority so that we know more information about each category - if self.category_info[cat]["num_tasks"]: - priority = self.category_info[cat]["total_execution_time"] / self.category_info[cat]["num_tasks"] - else: - priority = self.max_priority - elif self.scheduling_mode == 'shortest-category-first': - # if no tasks have been executed in this category, set a high priority so that we know more information about each category - if self.category_info[cat]["num_tasks"]: - priority = -self.category_info[cat]["total_execution_time"] / self.category_info[cat]["num_tasks"] - else: - priority = self.max_priority - elif self.scheduling_mode == 'FIFO': - # first in first out, the default behavior - priority = -round(time.time(), 6) - elif self.scheduling_mode == 'LIFO': - # last in first out, the opposite of FIFO - priority = round(time.time(), 6) - elif self.scheduling_mode == 'largest-input-first': - # best for saving disk space (with pruing) - priority = sum([len(dag.get_result(c)._file) for c in dag.get_children(k)]) - else: - raise ValueError(f"Unknown scheduling mode {self.scheduling_mode}") + cat = self.category_name(dask_task) + priority = self._task_priority(dag, cat, k) if self.task_mode == 'tasks': if cat not in self._categories_known: @@ -393,7 +414,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): self._categories_known.add(cat) t = PythonTaskDask(self, - dag, k, sexpr, + dag, dask_task, category=cat, environment=self.environment, extra_files=self.extra_files, @@ -405,7 +426,8 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): t.set_priority(priority) if self.env_per_task: t.set_command( - f"mkdir envdir && tar -xf {self._environment_name} -C envdir && envdir/bin/run_in_env {t._command}") + f"mkdir envdir && tar -xf {self._environment_name} -C envdir && envdir/bin/run_in_env {t._command}" + ) t.add_input(self.environment_file, self.environment_name) t.set_tag(tag) # tag that identifies this dag @@ -413,7 +435,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): if self.task_mode == 'function-calls': t = FunctionCallDask(self, - dag, k, sexpr, + dag, dask_task, category=cat, extra_files=self.extra_files, retries=retries, @@ -436,7 +458,7 @@ def _load_results(self, dag, key_indices, keys): def _fill_key_result(self, 
         raw = dag.get_result(key)
-        if DaskVineDag.listp(raw):
+        if DaskVineDag.containerp(raw):
             result = list(raw)
             file_indices = find_result_files(raw)
             for (f, inds) in file_indices:
@@ -448,9 +473,9 @@ def _fill_key_result(self, dag, key):
         return raw
 
     def _prune_file(self, dag, key):
-        children = dag.get_children(key)
+        children = dag.get_dependencies(key)
         for c in children:
-            if len(dag.get_pending_parents(c)) == 0:
+            if len(dag.get_pending_needed_by(c)) == 0:
                 c_result = dag.get_result(c)
                 self.prune_file(c_result._file)
 
@@ -497,7 +522,7 @@ def is_temp(self):
     def ready_for_gc(self):
         # file on disk ready to be gc if the keys that needed as an input for computation are themselves ready.
         if not self._ready_for_gc:
-            self._ready_for_gc = all(f.ready_for_gc() for f in self._dag.get_parents().values())
+            self._ready_for_gc = all(f.ready_for_gc() for f in self._dag.get_needed_by().values())
 
         return self._ready_for_gc
 
@@ -528,8 +553,7 @@ class PythonTaskDask(PythonTask):
    # @param self This task object.
    # @param m TaskVine manager object.
    # @param dag Dask graph object.
-    # @param key Key of task in graph.
-    # @param sexpr Positional arguments to function.
+    # @param dask_task Dask task encoding a computation.
    # @param category TaskVine category name.
    # @param environment TaskVine execution environment.
    # @param extra_files Additional files to provide to the task.
@@ -539,7 +563,7 @@ class PythonTaskDask(PythonTask):
    # @param wrapper
    #
    def __init__(self, m,
-                 dag, key, sexpr, *,
+                 dag, dask_task, *,
                  category=None,
                  environment=None,
                  extra_files=None,
@@ -547,14 +571,13 @@ def __init__(self, m,
                  retries=5,
                  worker_transfers=False,
                  wrapper=None):
-        self._key = key
-        self._sexpr = sexpr
+        self._dask_task = dask_task
         self._retries_left = retries
 
         self._wrapper_output_file = None
         self._wrapper_output = None
 
-        args_raw = {k: dag.get_result(k) for k in dag.get_children(key)}
+        args_raw = {k: dag.get_result(k) for k in dag.get_dependencies(self.key)}
         args = {
             k: f"{uuid4()}.p" for k, v in args_raw.items()
@@ -564,7 +587,7 @@ def __init__(self, m,
         keys_of_files = list(args.keys())
         args = args_raw | args
 
-        super().__init__(execute_graph_vertex, wrapper, key, sexpr, args, keys_of_files)
+        super().__init__(execute_graph_vertex, wrapper, dask_task, args, keys_of_files)
         if wrapper:
             wo = m.declare_buffer()
             self.add_output(wo, "wrapper.output")
@@ -595,11 +618,11 @@ def __init__(self, m,
 
     @property
     def key(self):
-        return self._key
+        return self._dask_task.key
 
     @property
-    def sexpr(self):
-        return self._sexpr
+    def dask_task(self):
+        return self._dask_task
 
     def decrement_retry(self):
         self._retries_left -= 1
@@ -631,7 +654,6 @@ class FunctionCallDask(FunctionCall):
    # @param self This task object.
    # @param m TaskVine manager object.
    # @param dag Dask graph object.
-    # @param key Key of task in graph.
-    # @param sexpr Positional arguments to function.
+    # @param dask_task Dask task encoding a computation.
    # @param category TaskVine category name.
    # @param resources Resources to be set for a FunctionCall.
@@ -641,7 +663,7 @@ class FunctionCallDask(FunctionCall):
    #
    def __init__(self, m,
-                 dag, key, sexpr, *,
+                 dag, dask_task, *,
                  category=None,
                  resources=None,
                  extra_files=None,
@@ -649,12 +671,11 @@ def __init__(self, m,
                  worker_transfers=False,
                  wrapper=None):
 
-        self._key = key
+        self._dask_task = dask_task
         self.resources = resources
-        self._sexpr = sexpr
         self._retries_left = retries
 
-        args_raw = {k: dag.get_result(k) for k in dag.get_children(key)}
+        args_raw = {k: dag.get_result(k) for k in dag.get_dependencies(self.key)}
         args = {k: f"{uuid4()}.p" for k, v in args_raw.items() if isinstance(v, DaskVineFile)}
 
         keys_of_files = list(args.keys())
@@ -663,7 +684,7 @@ def __init__(self, m,
         self._wrapper_output_file = None
         self._wrapper_output = None
 
-        super().__init__(f'Dask-Library-{id(dag)}', 'execute_graph_vertex', wrapper, key, sexpr, args, keys_of_files)
+        super().__init__(f'Dask-Library-{id(dag)}', 'execute_graph_vertex', wrapper, dask_task, args, keys_of_files)
         if wrapper:
             wo = m.declare_buffer()
             self.add_output(wo, "wrapper.output")
@@ -686,11 +707,11 @@ def __init__(self, m,
 
     @property
     def key(self):
-        return self._key
+        return self.dask_task.key
 
     @property
-    def sexpr(self):
-        return self._sexpr
+    def dask_task(self):
+        return self._dask_task
 
     def decrement_retry(self):
         self._retries_left -= 1
@@ -708,50 +729,42 @@ def load_wrapper_output(self, manager):
         return self._wrapper_output
 
 
-def execute_graph_vertex(wrapper, key, sexpr, args, keys_of_files):
+def execute_graph_vertex(wrapper, dask_task, task_args, keys_of_files):
     import traceback
     import cloudpickle
-    from ndcctools.taskvine import DaskVineDag
-
-    def rec_call(sexpr):
-        if DaskVineDag.keyp(sexpr) and sexpr in args:
-            return args[sexpr]
-        elif DaskVineDag.taskp(sexpr):
-            return sexpr[0](*[rec_call(a) for a in sexpr[1:]])
-        elif DaskVineDag.listp(sexpr):
-            return [rec_call(a) for a in sexpr]
-        else:
-            return sexpr
 
-    for k in keys_of_files:
+    for key in keys_of_files:
+        filename = task_args[key]
         try:
-            with open(args[k], "rb") as f:
+            with open(filename, "rb") as f:
                 arg = cloudpickle.load(f)
                 if isinstance(arg, dict) and 'Result' in arg and arg['Result'] is not None:
                     arg = arg['Result']
-                args[k] = arg
+                task_args[key] = arg
         except Exception as e:
-            print(f"Could not read input file {args[k]} for key {k}: {e}")
+            print(f"Could not read input file {filename} for key {key}: {e}")
             raise
 
     try:
         if wrapper:
             try:
                 wrapper_result = None
-                (wrapper_result, call_result) = wrapper(key, rec_call, sexpr)
+                (wrapper_result, call_result) = wrapper(dask_task, task_args)
                 return call_result
             except Exception:
-                print(f"Wrapped call for {key} failed.")
+                print(f"Wrapped call for {dask_task} failed.")
                 raise
             finally:
                 try:
                     with open("wrapper.output", "wb") as f:
                         cloudpickle.dump(wrapper_result, f)
                 except Exception:
-                    print(f"Wrapped call for {key} failed to write output.")
+                    print(f"Wrapped call for {dask_task.key} failed to write output.")
                     raise
         else:
-            return rec_call(sexpr)
+            # a dts.Task is callable: given a map of dependency key -> computed
+            # value, it substitutes the references and runs its function
+            return dask_task(task_args)
     except Exception:
         print(traceback.format_exc())
         raise
diff --git a/taskvine/src/examples/vine_example_dask_graph.py b/taskvine/src/examples/vine_example_dask_graph.py
index 5690f2f436..8e0d3cb502 100644
--- a/taskvine/src/examples/vine_example_dask_graph.py
+++ b/taskvine/src/examples/vine_example_dask_graph.py
@@ -12,12 +12,13 @@
 import argparse
 import getpass
 import sys
-
 import traceback
+import dask
 
 from operator import add  # use add function in the example graph
 
 dsk_graph = {
+    "A": [1, 2, 3, "x"],
     "x": 1,
     "y": 2,
     "z": (add, "x", "y"),
@@ -26,7 +27,7 @@
     "t": (sum, "v")
 }
 
-expected_result = 11
+expected_result = dask.get(dsk_graph, "t")
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(
@@ -65,9 +66,9 @@
     if args.disable_peer_transfers:
         m.disable_peer_transfers()
 
-    # checkpoint at even levels when nodes have at least one children
+    # checkpoint at even levels when nodes have at least one dependency
     def checkpoint(dag, key):
-        if dag.depth_of(key) % 2 == 0 and len(dag.get_children(key)) > 0:
+        if dag.depth_of(key) % 2 == 0 and len(dag.get_dependencies(key)) > 0:
             print(f"checkpoint for {key}")
             return True
         return False
@@ -77,12 +78,12 @@ def checkpoint(dag, key):
     f.max_workers = 1
     f.min_workers = 1
     with f:
-        desired_keys = ["t", "w"]
+        desired_keys = list(dsk_graph.keys())
 
         print(f"dask graph example is:\n{dsk_graph}")
         print(f"desired keys are {desired_keys}")
         try:
-            results = m.get(dsk_graph, desired_keys, lazy_transfers=True, checkpoint_fn=checkpoint, resources={"cores": 1})  # 1 core per step
+            results = m.get(dsk_graph, desired_keys, lazy_transfers=True, checkpoint_fn=checkpoint, resources={"cores": 1}, task_mode="function-calls")  # 1 core per step
             print({k: v for k, v in zip(desired_keys, results)})
         except Exception:
             traceback.print_exc()
diff --git a/taskvine/test/TR_vine_python_unlink_when_done.sh b/taskvine/test/TR_vine_python_unlink_when_done.sh
index 608720230c..92972ecf4e 100755
--- a/taskvine/test/TR_vine_python_unlink_when_done.sh
+++ b/taskvine/test/TR_vine_python_unlink_when_done.sh
@@ -15,7 +15,7 @@ PORT_FILE=vine.port
check_needed()
{
	[ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1
-	"${CCTOOLS_PYTHON_TEST_EXEC}" -c "import cloudpickle" || return 1
+	"${CCTOOLS_PYTHON_TEST_EXEC}" -c "import cloudpickle; import dask" || return 1

	return 0
}