Skip to content

Commit

Permalink
Futures stream output (#3899)
Browse files Browse the repository at this point in the history
* refactory result retrieval

* fetch file for FuturePythonTask

* update file

* change task creation

* fix function call creation

* funcalls manager name

---------

Co-authored-by: Barry Jay Sly-Delgado <[email protected]>
  • Loading branch information
BarrySlyDelgado and Barry Jay Sly-Delgado authored Aug 5, 2024
1 parent ac9f0b5 commit 7c0510b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 102 deletions.
6 changes: 6 additions & 0 deletions taskvine/src/bindings/python3/ndcctools/taskvine/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ def identity_read(x):
elif ftype == cvine.VINE_FILE:
with open(self.source(), "rb") as f:
return unserializer(f)
elif ftype == cvine.VINE_TEMP:
try:
with io.BytesIO(cvine.vine_file_contents_as_bytes(self._file)) as f:
return unserializer(f)
except Exception as e:
raise e("Temp file does not have local contents, The file much be fetched beforehand", self.type())
else:
raise ValueError("File does not have local contents", self.type())

Expand Down
157 changes: 55 additions & 102 deletions taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def submit(self, fn, *args, **kwargs):
return future_task._future

def future_task(self, fn, *args, **kwargs):
return FuturePythonTask(self.manager, False, fn, *args, **kwargs)
return FuturePythonTask(self.manager, fn, *args, **kwargs)

def create_library_from_functions(self, name, *function_list, poncho_env=None, init_command=None, add_env=True, import_modules=None):
return self.manager.create_library_from_functions(name, *function_list, retrieve_output, poncho_env=poncho_env, init_command=init_command, add_env=add_env, import_modules=import_modules)
Expand All @@ -202,7 +202,7 @@ def install_library(self, libtask):
self.manager.install_library(libtask)

def future_funcall(self, library_name, fn, *args, **kwargs):
return FutureFunctionCall(self.manager, False, library_name, fn, *args, **kwargs)
return FutureFunctionCall(self.manager, library_name, fn, *args, **kwargs)

def set(self, name, value):
if self.factory:
Expand Down Expand Up @@ -234,6 +234,7 @@ def __init__(self, task):
self._callback_fns = []
self._result = None
self._is_submitted = False
self._ran_functions = False

def cancel(self):
self._task._module_manager.cancel_by_task_id(self._task.id)
Expand Down Expand Up @@ -269,6 +270,10 @@ def result(self, timeout="wait_forever"):
else:
self._result = result
self._state = FINISHED
if self._callback_fns and not self._ran_functions:
self._ran_functions = True
for fn in self._callback_fns:
fn(self)
return result

def add_done_callback(self, fn):
Expand All @@ -283,72 +288,45 @@ def add_done_callback(self, fn):
# This class is a sublcass of FunctionCall that is specialized for future execution

class FutureFunctionCall(FunctionCall):
def __init__(self, manager, is_retriever, library_name, fn, *args, **kwargs):
def __init__(self, manager, library_name, fn, *args, **kwargs):
super().__init__(library_name, fn, *args, **kwargs)
self.enable_temp_output()
self.manager = manager
self.library_name = library_name
self._envs = []

self._future = VineFuture(self)
self._is_retriever = is_retriever
self._has_retrieved = False
self._retriever = None
self._ran_functions = False

# Set a retriever for this task, it has to disable temp_output b/c
# it aims to bring the remote output back to the manager
def set_retriever(self):
self._retriever = FutureFunctionCall(self.manager, True, self.library_name, 'retrieve_output', self._future)
self._retriever.disable_temp_output()
self.manager.submit(self._retriever)

# Given that every designated task stores its outcome in a temp file,
# this function is invoked through `VineFuture.result()` to trigger its retriever
# we must first fetch the file before retruning the result.
# to bring that output back to the manager.
def output(self, timeout="wait_forever"):

# when output() is called, set a retriever task to bring back the on-worker output
if not self._is_retriever and not self._retriever:
self.set_retriever()

# for retrievee task: wait for retriever to get result
if not self._is_retriever:
if self._saved_output:
return self._saved_output
result = self._retriever.output(timeout=timeout)
if result is RESULT_PENDING:
if not self._has_retrieved:
result = self.manager.wait_for_task_id(self.id, timeout=timeout)
if result:
self._has_retrieved = True
else:
return RESULT_PENDING
self._saved_output = result
if not self._ran_functions:
self._ran_functions = True
for fn in self._future._callback_fns:
fn(self._future)
return self._saved_output

# for retriever task: fetch the result of its retrievee on completion
if self._is_retriever:
if not self._has_retrieved:
result = self._manager.wait_for_task_id(self.id, timeout=timeout)
if result:
self._has_retrieved = True
else:
return RESULT_PENDING
if not self._saved_output and self._has_retrieved:
if self.successful():
try:
output = cloudpickle.loads(self._output_file.contents())
if output['Success']:
self._saved_output = output['Result']
else:
self._saved_output = output['Reason']

except Exception as e:
self._saved_output = e
raise e
else:
self._saved_output = FunctionCallNoResult()
return self._saved_output

if not self._saved_output and self._has_retrieved:
if self.successful():
try:
self.manager.fetch_file(self._output_file)
output = cloudpickle.loads(self._output_file.contents())
if output['Success']:
self._saved_output = output['Result']
else:
self._saved_output = output['Reason']

except Exception as e:
self._saved_output = e
raise e
else:
self._saved_output = FunctionCallNoResult()
self._output_loaded = True
return self._saved_output

# gather results from preceding tasks to use as inputs for this specific task
def submit_finalize(self):
Expand Down Expand Up @@ -390,63 +368,38 @@ class FuturePythonTask(PythonTask):
# @param func
# @param args
# @param kwargs
def __init__(self, manager, rf, func, *args, **kwargs):
def __init__(self, manager, func, *args, **kwargs):
super(FuturePythonTask, self).__init__(func, *args, **kwargs)
self.enable_temp_output()
self._module_manager = manager
self._future = VineFuture(self)
self._envs = []
self._has_retrieved = False
self._ran_functions = False
self._is_retriever = rf
self._retriever = None

def output(self, timeout="wait_forever"):
def retrieve_output(arg):
return arg
if not self._is_retriever and not self._retriever:
self._retriever = FuturePythonTask(self._module_manager, True, retrieve_output, self._future)
self._retriever.set_cores(1)
for env in self._envs:
self._retriever.add_environment(env)
self._retriever.disable_temp_output()
self._module_manager.submit(self._retriever)

if not self._is_retriever:
if not self._output_loaded:
result = self._retriever._future.result(timeout=timeout)
if result == RESULT_PENDING:
return RESULT_PENDING
self._output = result
self._output_loaded = True
if not self._ran_functions:
self._ran_functions = True
for fn in self._future._callback_fns:
fn(self._future)
return self._output

else:
if not self._has_retrieved:
result = self._module_manager.wait_for_task_id(self.id, timeout=timeout)
if result:
self._has_retrieved = True
else:
return RESULT_PENDING
if not self._output_loaded and self._has_retrieved:
if self.successful():
try:
with open(self._output_file.source(), "rb") as f:
if self._serialize_output:
self._output = cloudpickle.load(f)
else:
self._output = f.read()
except Exception as e:
self._output = e
raise e
else:
self._output = PythonTaskNoResult()
self._output_loaded = True
return self._output
# wait for task to complete if it has not been completed
if not self._has_retrieved:
result = self._module_manager.wait_for_task_id(self.id, timeout=timeout)
if result:
self._has_retrieved = True
else:
return RESULT_PENDING

# fetch output file and load output
if not self._output_loaded and self._has_retrieved:
if self.successful():
try:
self._module_manager.fetch_file(self._output_file)
self._output = cloudpickle.loads(self._output_file.contents())
except Exception as e:
self._output = e
raise e
else:
self._output = PythonTaskNoResult()
self._output_loaded = True

return self._output

def submit_finalize(self):
func, args, kwargs = self._fn_def
Expand Down

0 comments on commit 7c0510b

Please sign in to comment.