Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Xue committed Dec 18, 2024
1 parent 658c212 commit 378aa52
Showing 1 changed file with 32 additions and 29 deletions.
61 changes: 32 additions & 29 deletions taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from . import cvine
import hashlib
from collections import deque
from collections import deque, namedtuple
from concurrent.futures import Executor
from concurrent.futures import Future
from concurrent.futures import FIRST_COMPLETED
Expand All @@ -10,7 +10,6 @@
from concurrent.futures._base import CANCELLED
from concurrent.futures._base import FINISHED
from concurrent.futures import TimeoutError
from collections import namedtuple, deque
from .task import (
PythonTask,
FunctionCall,
Expand Down Expand Up @@ -142,9 +141,11 @@ def _iterator():

return _iterator()


def run_iterable(fn, iterable, dimensions=1):

if not ((hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str)):
return fn(element)
return fn(iterable)
if dimensions < 1:
return None
result = []
Expand All @@ -156,14 +157,15 @@ def run_iterable(fn, iterable, dimensions=1):
result.append(fn(element))
else:
for inner_iterable in iterable:
result.append(run_iterable(fn, inner_iterable, dimensions-1))
result.append(run_iterable(fn, inner_iterable, dimensions - 1))
return result


def reduction_tree(fn, *args):

minimum_parameters = len(inspect.signature(fn).parameters)
curr_size = len(args)
entries = deque([f.result() if isinstance(f, VineFuture) else f for f in args])
return_val = entries
while curr_size >= minimum_parameters:
parameters = []
for _ in range(minimum_parameters):
Expand All @@ -175,7 +177,6 @@ def reduction_tree(fn, *args):
else:
entries.appendleft(new_result)
curr_size = len(entries)
return_val = entries if len(entries) > 1 else entries[0]
return entries[0]
##
# \class FuturesExecutor
Expand All @@ -184,7 +185,9 @@ def reduction_tree(fn, *args):
#
# This class acts as an interface for the creation of Futures


class FuturesExecutor(Executor):

def __init__(self, port=9123, batch_type="local", manager=None, manager_host_port=None, manager_name=None, factory_binary=None, worker_binary=None, log_file=os.devnull, factory=True, opts={}):
self.manager = Manager(port=port)
self.port = self.manager.port
Expand All @@ -207,28 +210,29 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por
self.set(opt, opts[opt])
self.factory.start()
else:
self.factory = None
self.factory = None

def map(self, fn, iterable, library_name="Some_Library", method=None, chunk_size=1):
def wait_for_map_resolution(*futures_batch):
result = []
for computed_result in futures_batch:
result.extend(computed_result)
result.extend(computed_result)
return result
if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str):
if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str):
tasks = []
if method == "FutureFunctionCall": # this currently does not work (error described in PR)
if method == "FutureFunctionCall": # this currently does not work (error described in PR)
partial_obj = partial(run_iterable, fn)

def partial_func(*args, **kwargs):
return partial_obj(*args, **kwargs)
libtask = self.create_library_from_functions(library_name, partial_func)
self.install_library(libtask)
for i in range(0, len(iterable), chunk_size):
future_batch_task = self.submit(self.future_funcall(library_name, partial_func, iterable[i:i+chunk_size]))
future_batch_task = self.submit(self.future_funcall(library_name, partial_func, iterable[i:i + chunk_size]))
tasks.append(future_batch_task)
else:
for i in range(0, len(iterable), chunk_size):
future_batch_task = self.submit(run_iterable, fn, iterable[i:i+chunk_size])
future_batch_task = self.submit(run_iterable, fn, iterable[i:i + chunk_size])
tasks.append(future_batch_task)
future = self.submit(wait_for_map_resolution, *tasks)
else:
Expand All @@ -246,17 +250,17 @@ def partial_func(*args, **kwargs):

def reduce(self, fn, iterable, library_name=None, method=None, chunk_size=1):
# This line is just the identity - since when a future is pickled, it actually becomes some file, which means it is evaluated immediately/sent to queue
if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str):
if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str):
sub_futures = [iterable]
num_parameters = len(inspect.signature(fn).parameters)
reduction_size = chunk_size*(num_parameters-1)
reduction_size = chunk_size * (num_parameters - 1)
while len(sub_futures[-1]) > 1 or len(sub_futures) == 1:
layer = []
for i in range(0, len(sub_futures[-1]), reduction_size):
if method == "FutureFunctionCall":
future_batch_task = self.submit(self.future_funcall(library_name, reduction_tree, fn, *[self.submit(fetch_future_result, f) if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i+reduction_size]]))
else: # Method is FuturePythonTask
future_batch_task = self.submit(reduction_tree, fn, *[f if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i+reduction_size]])
future_batch_task = self.submit(self.future_funcall(library_name, reduction_tree, fn, *[f if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i + reduction_size]]))
else: # Method is FuturePythonTask
future_batch_task = self.submit(reduction_tree, fn, *[f if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i + reduction_size]])
layer.append(future_batch_task)
sub_futures.append(layer)
future = sub_futures[-1][0]
Expand All @@ -272,29 +276,29 @@ def allpairs(self, fn, iterable_a, iterable_b, library_name=None, method=None, c
def wait_for_allpairs_resolution(row_size, *futures_batch):
result = []
for computed_result in futures_batch:
result.extend(computed_result)
result.extend(computed_result)
processed_result = []
for i in range(len(result)//row_size):
row = result[i*row_size:i*row_size+row_size]
for i in range(len(result) // row_size):
row = result[i * row_size:i * row_size + row_size]
processed_result.append(row)
return processed_result
iterable = [(a, b) for b in iterable_b for a in iterable_a]
if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str):
if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str):
tasks = []
for i in range(0, len(iterable), chunk_size):
if method == "FutureFunctionCall":
future_batch_task = self.submit(self.future_funcall(library_name, run_iterable, fn, iterable[i:i+chunk_size]))
else: # Method is FuturePythonTask
future_batch_task = self.submit(run_iterable, fn, iterable[i:i+chunk_size])
future_batch_task = self.submit(self.future_funcall(library_name, run_iterable, fn, iterable[i:i + chunk_size]))
else: # Method is FuturePythonTask
future_batch_task = self.submit(run_iterable, fn, iterable[i:i + chunk_size])
tasks.append(future_batch_task)
future = self.submit(wait_for_allpairs_resolution, len(iterable_b), *tasks)
else:
if method == "FutureFunctionCall":
future = self.submit(self.future_funcall(library_name, run_iterable, fn, iterable))
else: # Method is FuturePythonTask
else: # Method is FuturePythonTask
future = self.submit(run_iterable, fn, iterable)
return future

def submit(self, fn, *args, **kwargs):
if isinstance(fn, FuturePythonTask):
self.manager.submit(fn)
Expand Down Expand Up @@ -436,7 +440,7 @@ def __init__(self, manager, library_name, fn, *args, **kwargs):
# we must first fetch the file before retruning the result.
# to bring that output back to the manager.
def output(self, timeout="wait_forever"):

if not self._has_retrieved:
result = self.manager.wait_for_task_id(self.id, timeout=timeout)
if result:
Expand Down Expand Up @@ -599,5 +603,4 @@ def vineLoadArg(arg):
manager._function_buffers[base] = manager.declare_file(name, cache=True)
return manager._function_buffers[base]

# vim: set sts=4 sw=4 ts=4 expandtab ft=python:

# vim: set sts=4 sw=4 ts=4 expandtab ft=python:

0 comments on commit 378aa52

Please sign in to comment.