Skip to content

Commit

Permalink
[minor] Change run signature
Browse files Browse the repository at this point in the history
Forget about `force_local_execution`; if you want to run it locally just don't set the damned executor. No point in a boolean runtime flag for this, it's already a boolean choice to set the executor or not! Simplify.
  • Loading branch information
liamhuber committed Sep 20, 2024
1 parent 2893abf commit aba278f
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 82 deletions.
11 changes: 1 addition & 10 deletions pyiron_workflow/mixin/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ def executor_shutdown(self, wait=True, *, cancel_futures=False):
def run(
self,
check_readiness: bool = True,
force_local_execution: bool = False,
_finished_callback: Optional[callable] = None,
**kwargs,
) -> Any | tuple | Future:
Expand All @@ -115,9 +114,6 @@ def run(
Args:
check_readiness (bool): Whether to raise a `ReadinessError` if not
:attr:`ready`. (Default is True.)
force_local_execution (bool): Whether to run on the main process regardless
of the value of :attr:`executor`. (Default is False, use an
:attr:`executor` if provided.)
_finished_callback (callable): What to do with the output of :meth:`on_run`
after the execution is complete (including waiting for the future to
finish!). This method is responsible for updating the status
Expand All @@ -136,7 +132,6 @@ def run(
if _finished_callback is None
else _finished_callback
),
force_local_execution=force_local_execution,
)
return out
except Exception as e:
Expand Down Expand Up @@ -178,21 +173,17 @@ def _readiness_error_message(self) -> str:
def _run(
self,
finished_callback: callable,
force_local_execution: bool,
) -> Any | tuple | Future:
args, kwargs = self.run_args
if "self" in kwargs.keys():
raise ValueError(
f"{self.label} got 'self' as a run kwarg, but self is already the "
f"first positional argument passed to :meth:`on_run`."
)
if force_local_execution or self.executor is None:
# Run locally
if self.executor is None:
run_output = self.on_run(*args, **kwargs)
return finished_callback(run_output)
else:
# Just blindly try to execute -- as we nail down the executor interaction
# we'll want to fail more cleanly here.
executor = self._parse_executor(self.executor)
if isinstance(self.executor, ThreadPoolExecutor):
self.future = executor.submit(self.thread_pool_run, *args, **kwargs)
Expand Down
17 changes: 2 additions & 15 deletions pyiron_workflow/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@ def run(
run_parent_trees_too: bool = False,
fetch_input: bool = True,
check_readiness: bool = True,
force_local_execution: bool = False,
emit_ran_signal: bool = True,
**kwargs,
):
Expand Down Expand Up @@ -407,8 +406,6 @@ def run(
is True.)
check_readiness (bool): Whether to raise an exception if the node is not
:attr:`ready` to run after fetching new input. (Default is True.)
force_local_execution (bool): Whether to ignore any executor settings and
force the computation to run locally. (Default is False.)
emit_ran_signal (bool): Whether to fire off all the output `ran` signal
afterwards. (Default is True.)
**kwargs: Keyword arguments matching input channel labels; used to update
Expand All @@ -430,7 +427,6 @@ def run(

return super().run(
check_readiness=check_readiness,
force_local_execution=force_local_execution,
run_data_tree=run_data_tree,
run_parent_trees_too=run_parent_trees_too,
fetch_input=fetch_input,
Expand Down Expand Up @@ -471,17 +467,10 @@ def _before_run(

return super()._before_run(check_readiness=check_readiness)

def _run(
self,
finished_callback: callable,
force_local_execution: bool,
) -> Any | tuple | Future:
def _run(self, finished_callback: callable) -> Any | tuple | Future:
if self.parent is not None:
self.parent.register_child_starting(self)
return super()._run(
finished_callback=finished_callback,
force_local_execution=force_local_execution,
)
return super()._run(finished_callback=finished_callback)

def run_data_tree(self, run_parent_trees_too=False) -> None:
"""
Expand Down Expand Up @@ -644,7 +633,6 @@ def execute(self, *args, **kwargs):
run_parent_trees_too=False,
fetch_input=False,
check_readiness=False,
force_local_execution=True,
emit_ran_signal=False,
**kwargs,
)
Expand All @@ -668,7 +656,6 @@ def pull(self, *args, run_parent_trees_too=False, **kwargs):
run_parent_trees_too=run_parent_trees_too,
fetch_input=True,
check_readiness=True,
force_local_execution=False,
emit_ran_signal=False,
**kwargs,
)
Expand Down
2 changes: 0 additions & 2 deletions pyiron_workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,6 @@ def _build_io(
def run(
self,
check_readiness: bool = True,
force_local_execution: bool = False,
**kwargs,
):
# Note: Workflows may have neither parents nor siblings, so we don't need to
Expand All @@ -352,7 +351,6 @@ def run(
run_parent_trees_too=False,
fetch_input=False,
check_readiness=check_readiness,
force_local_execution=force_local_execution,
emit_ran_signal=False,
**kwargs,
)
Expand Down
12 changes: 4 additions & 8 deletions tests/unit/mixin/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,11 @@ def test_failure(self):

def test_runnable_run_local(self):
runnable = ConcreteRunnable()
runnable.executor = CloudpickleProcessPoolExecutor()

result = runnable.run(force_local_execution=True)
result = runnable.run()
self.assertIsNone(
runnable.future,
msg="The local execution flag should override the executor"
msg="Without an executor, we expect no future"
)
self.assertDictEqual(
runnable.expected_run_output,
Expand All @@ -120,7 +119,7 @@ def test_runnable_run_with_executor(self):
runnable = ConcreteRunnable()
runnable.executor = CloudpickleProcessPoolExecutor()

result = runnable.run(force_local_execution=False)
result = runnable.run()
self.assertIsInstance(
result,
Future,
Expand All @@ -146,10 +145,7 @@ def test_runnable_run_with_executor_and_callback(self):
runnable = ConcreteRunnable()
runnable.executor = CloudpickleProcessPoolExecutor()

result = runnable.run(
force_local_execution=False,
_finished_callback=runnable.custom_callback,
)
result = runnable.run(_finished_callback=runnable.custom_callback)
self.assertDictEqual(
runnable.expected_run_output,
result.result(timeout=30),
Expand Down
47 changes: 0 additions & 47 deletions tests/unit/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,53 +142,6 @@ def test_check_readiness(self):

self.n3.use_cache = n3_cache

def test_force_local_execution(self):
self.n1.executor = ProcessPoolExecutor()
out = self.n1.run(force_local_execution=False)
with self.subTest("Test running with an executor fulfills promises"):
self.assertIsInstance(
out,
Future,
msg="With an executor, we expect a futures object back"
)
self.assertTrue(
self.n1.running,
msg="The running flag should be true while it's running, and "
"(de)serialization is time consuming enough that we still expect"
"this to be the case"
)
self.assertFalse(
self.n1.ready,
msg="While running, the node should not be ready."
)
with self.assertRaises(
RuntimeError,
msg="Running nodes should not be allowed to get their input updated",
):
self.n1.inputs.x = 42
self.assertEqual(
1,
out.result(timeout=120),
msg="If we wait for the remote execution to finish, it should give us"
"the right thing"
)
self.assertEqual(
1,
self.n1.outputs.y.value,
msg="The callback on the executor should ensure the output processing "
"happens"
)

self.n2.executor = ProcessPoolExecutor()
self.n2.inputs.x = 0
self.assertEqual(
1,
self.n2.run(fetch_input=False, force_local_execution=True),
msg="Forcing local execution should do just that."
)
self.n1.executor_shutdown()
self.n2.executor_shutdown()

def test_emit_ran_signal(self):
self.n1 >> self.n2 >> self.n3 # Chained connection declaration

Expand Down

0 comments on commit aba278f

Please sign in to comment.