TaskVine python bindings: read temp-file outputs directly.

File.contents() gains a VINE_TEMP branch that reads the bytes held by the
manager, and the future task classes stop submitting a second "retriever"
task: output() now waits for the task, calls manager.fetch_file() on the
temp output file, and unpickles it. Done-callbacks move from the task
classes into VineFuture.result().

Review corrections applied to added ('+') lines only:
  * file.py: `raise e(...)` called an exception *instance*; it now raises
    ValueError chained to the original error (`from e`), with the message
    typo fixed.
  * futures.py: comment typo "retruning" -> "returning".
The `index` blob ids below are those of the original patch.

diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/file.py b/taskvine/src/bindings/python3/ndcctools/taskvine/file.py
index b6eb5bc5a6..69d006f74e 100644
--- a/taskvine/src/bindings/python3/ndcctools/taskvine/file.py
+++ b/taskvine/src/bindings/python3/ndcctools/taskvine/file.py
@@ -66,6 +66,12 @@ def identity_read(x):
         elif ftype == cvine.VINE_FILE:
             with open(self.source(), "rb") as f:
                 return unserializer(f)
+        elif ftype == cvine.VINE_TEMP:
+            try:
+                with io.BytesIO(cvine.vine_file_contents_as_bytes(self._file)) as f:
+                    return unserializer(f)
+            except Exception as e:
+                raise ValueError("Temp file does not have local contents. The file must be fetched beforehand", self.type()) from e
         else:
             raise ValueError("File does not have local contents", self.type())
 
diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
index 90db1091f2..7dfa228ae6 100644
--- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
+++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
@@ -193,7 +193,7 @@ def submit(self, fn, *args, **kwargs):
         return future_task._future
 
     def future_task(self, fn, *args, **kwargs):
-        return FuturePythonTask(self.manager, False, fn, *args, **kwargs)
+        return FuturePythonTask(self.manager, fn, *args, **kwargs)
 
     def create_library_from_functions(self, name, *function_list, poncho_env=None, init_command=None, add_env=True, import_modules=None):
         return self.manager.create_library_from_functions(name, *function_list, retrieve_output, poncho_env=poncho_env, init_command=init_command, add_env=add_env, import_modules=import_modules)
@@ -202,7 +202,7 @@ def install_library(self, libtask):
         self.manager.install_library(libtask)
 
     def future_funcall(self, library_name, fn, *args, **kwargs):
-        return FutureFunctionCall(self.manager, False, library_name, fn, *args, **kwargs)
+        return FutureFunctionCall(self.manager, library_name, fn, *args, **kwargs)
 
     def set(self, name, value):
         if self.factory:
@@ -234,6 +234,7 @@ def __init__(self, task):
         self._callback_fns = []
         self._result = None
         self._is_submitted = False
+        self._ran_functions = False
 
     def cancel(self):
         self._task._module_manager.cancel_by_task_id(self._task.id)
@@ -269,6 +270,10 @@ def result(self, timeout="wait_forever"):
         else:
             self._result = result
             self._state = FINISHED
+            if self._callback_fns and not self._ran_functions:
+                self._ran_functions = True
+                for fn in self._callback_fns:
+                    fn(self)
         return result
 
     def add_done_callback(self, fn):
@@ -283,7 +288,7 @@ def add_done_callback(self, fn):
 # This class is a sublcass of FunctionCall that is specialized for future execution
 
 class FutureFunctionCall(FunctionCall):
-    def __init__(self, manager, is_retriever, library_name, fn, *args, **kwargs):
+    def __init__(self, manager, library_name, fn, *args, **kwargs):
         super().__init__(library_name, fn, *args, **kwargs)
         self.enable_temp_output()
         self.manager = manager
@@ -291,64 +296,37 @@ def __init__(self, manager, is_retriever, library_name, fn, *args, **kwargs):
         self._envs = []
 
         self._future = VineFuture(self)
-        self._is_retriever = is_retriever
         self._has_retrieved = False
-        self._retriever = None
-        self._ran_functions = False
-
-    # Set a retriever for this task, it has to disable temp_output b/c
-    # it aims to bring the remote output back to the manager
-    def set_retriever(self):
-        self._retriever = FutureFunctionCall(self.manager, True, self.library_name, 'retrieve_output', self._future)
-        self._retriever.disable_temp_output()
-        self.manager.submit(self._retriever)
 
     # Given that every designated task stores its outcome in a temp file,
-    # this function is invoked through `VineFuture.result()` to trigger its retriever
+    # we must first fetch the file before returning the result.
     # to bring that output back to the manager.
     def output(self, timeout="wait_forever"):
 
-        # when output() is called, set a retriever task to bring back the on-worker output
-        if not self._is_retriever and not self._retriever:
-            self.set_retriever()
-
-        # for retrievee task: wait for retriever to get result
-        if not self._is_retriever:
-            if self._saved_output:
-                return self._saved_output
-            result = self._retriever.output(timeout=timeout)
-            if result is RESULT_PENDING:
+        if not self._has_retrieved:
+            result = self.manager.wait_for_task_id(self.id, timeout=timeout)
+            if result:
+                self._has_retrieved = True
+            else:
                 return RESULT_PENDING
-            self._saved_output = result
-            if not self._ran_functions:
-                self._ran_functions = True
-                for fn in self._future._callback_fns:
-                    fn(self._future)
-            return self._saved_output
-
-        # for retriever task: fetch the result of its retrievee on completion
-        if self._is_retriever:
-            if not self._has_retrieved:
-                result = self._manager.wait_for_task_id(self.id, timeout=timeout)
-                if result:
-                    self._has_retrieved = True
-                else:
-                    return RESULT_PENDING
-            if not self._saved_output and self._has_retrieved:
-                if self.successful():
-                    try:
-                        output = cloudpickle.loads(self._output_file.contents())
-                        if output['Success']:
-                            self._saved_output = output['Result']
-                        else:
-                            self._saved_output = output['Reason']
-
-                    except Exception as e:
-                        self._saved_output = e
-                        raise e
-                else:
-                    self._saved_output = FunctionCallNoResult()
-            return self._saved_output
+
+        if not self._saved_output and self._has_retrieved:
+            if self.successful():
+                try:
+                    self.manager.fetch_file(self._output_file)
+                    output = cloudpickle.loads(self._output_file.contents())
+                    if output['Success']:
+                        self._saved_output = output['Result']
+                    else:
+                        self._saved_output = output['Reason']
+
+                except Exception as e:
+                    self._saved_output = e
+                    raise e
+            else:
+                self._saved_output = FunctionCallNoResult()
+            self._output_loaded = True
+        return self._saved_output
 
     # gather results from preceding tasks to use as inputs for this specific task
     def submit_finalize(self):
@@ -390,63 +368,38 @@ class FuturePythonTask(PythonTask):
     # @param func
     # @param args
     # @param kwargs
-    def __init__(self, manager, rf, func, *args, **kwargs):
+    def __init__(self, manager, func, *args, **kwargs):
         super(FuturePythonTask, self).__init__(func, *args, **kwargs)
         self.enable_temp_output()
         self._module_manager = manager
         self._future = VineFuture(self)
         self._envs = []
         self._has_retrieved = False
-        self._ran_functions = False
-        self._is_retriever = rf
-        self._retriever = None
 
     def output(self, timeout="wait_forever"):
 
-        def retrieve_output(arg):
-            return arg
-        if not self._is_retriever and not self._retriever:
-            self._retriever = FuturePythonTask(self._module_manager, True, retrieve_output, self._future)
-            self._retriever.set_cores(1)
-            for env in self._envs:
-                self._retriever.add_environment(env)
-            self._retriever.disable_temp_output()
-            self._module_manager.submit(self._retriever)
-
-        if not self._is_retriever:
-            if not self._output_loaded:
-                result = self._retriever._future.result(timeout=timeout)
-                if result == RESULT_PENDING:
-                    return RESULT_PENDING
-                self._output = result
-                self._output_loaded = True
-                if not self._ran_functions:
-                    self._ran_functions = True
-                    for fn in self._future._callback_fns:
-                        fn(self._future)
-            return self._output
-        else:
-            if not self._has_retrieved:
-                result = self._module_manager.wait_for_task_id(self.id, timeout=timeout)
-                if result:
-                    self._has_retrieved = True
-                else:
-                    return RESULT_PENDING
-            if not self._output_loaded and self._has_retrieved:
-                if self.successful():
-                    try:
-                        with open(self._output_file.source(), "rb") as f:
-                            if self._serialize_output:
-                                self._output = cloudpickle.load(f)
-                            else:
-                                self._output = f.read()
-                    except Exception as e:
-                        self._output = e
-                        raise e
-                else:
-                    self._output = PythonTaskNoResult()
-                self._output_loaded = True
-            return self._output
+        # wait for task to complete if it has not been completed
+        if not self._has_retrieved:
+            result = self._module_manager.wait_for_task_id(self.id, timeout=timeout)
+            if result:
+                self._has_retrieved = True
+            else:
+                return RESULT_PENDING
+
+        # fetch output file and load output
+        if not self._output_loaded and self._has_retrieved:
+            if self.successful():
+                try:
+                    self._module_manager.fetch_file(self._output_file)
+                    self._output = cloudpickle.loads(self._output_file.contents())
+                except Exception as e:
+                    self._output = e
+                    raise e
+            else:
+                self._output = PythonTaskNoResult()
+            self._output_loaded = True
+
+        return self._output
 
     def submit_finalize(self):
         func, args, kwargs = self._fn_def