diff --git a/pyiron_workflow/mixin/run.py b/pyiron_workflow/mixin/run.py index 826e8b91..6566b78b 100644 --- a/pyiron_workflow/mixin/run.py +++ b/pyiron_workflow/mixin/run.py @@ -97,7 +97,6 @@ def executor_shutdown(self, wait=True, *, cancel_futures=False): def run( self, check_readiness: bool = True, - force_local_execution: bool = False, _finished_callback: Optional[callable] = None, **kwargs, ) -> Any | tuple | Future: @@ -115,9 +114,6 @@ def run( Args: check_readiness (bool): Whether to raise a `ReadinessError` if not :attr:`ready`. (Default is True.) - force_local_execution (bool): Whether to run on the main process regardless - of the value of :attr:`executor`. (Default is False, use an - :attr:`executor` if provided.) _finished_callback (callable): What to do with the output of :meth:`on_run` after the execution is complete (including waiting for the future to finish!). This method is responsible for updating the status @@ -136,7 +132,6 @@ def run( if _finished_callback is None else _finished_callback ), - force_local_execution=force_local_execution, ) return out except Exception as e: @@ -178,7 +173,6 @@ def _readiness_error_message(self) -> str: def _run( self, finished_callback: callable, - force_local_execution: bool, ) -> Any | tuple | Future: args, kwargs = self.run_args if "self" in kwargs.keys(): @@ -186,13 +180,10 @@ def _run( f"{self.label} got 'self' as a run kwarg, but self is already the " f"first positional argument passed to :meth:`on_run`." ) - if force_local_execution or self.executor is None: - # Run locally + if self.executor is None: run_output = self.on_run(*args, **kwargs) return finished_callback(run_output) else: - # Just blindly try to execute -- as we nail down the executor interaction - # we'll want to fail more cleanly here. executor = self._parse_executor(self.executor) if isinstance(self.executor, ThreadPoolExecutor): self.future = executor.submit(self.thread_pool_run, *args, **kwargs) diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py index f6415453..2904e0fc 100644 --- a/pyiron_workflow/node.py +++ b/pyiron_workflow/node.py @@ -375,7 +375,6 @@ def run( run_parent_trees_too: bool = False, fetch_input: bool = True, check_readiness: bool = True, - force_local_execution: bool = False, emit_ran_signal: bool = True, **kwargs, ): @@ -407,8 +406,6 @@ def run( is True.) check_readiness (bool): Whether to raise an exception if the node is not :attr:`ready` to run after fetching new input. (Default is True.) - force_local_execution (bool): Whether to ignore any executor settings and - force the computation to run locally. (Default is False.) emit_ran_signal (bool): Whether to fire off all the output `ran` signal afterwards. (Default is True.) **kwargs: Keyword arguments matching input channel labels; used to update @@ -430,7 +427,6 @@ def run( return super().run( check_readiness=check_readiness, - force_local_execution=force_local_execution, run_data_tree=run_data_tree, run_parent_trees_too=run_parent_trees_too, fetch_input=fetch_input, @@ -471,17 +467,10 @@ def _before_run( return super()._before_run(check_readiness=check_readiness) - def _run( - self, - finished_callback: callable, - force_local_execution: bool, - ) -> Any | tuple | Future: + def _run(self, finished_callback: callable) -> Any | tuple | Future: if self.parent is not None: self.parent.register_child_starting(self) - return super()._run( - finished_callback=finished_callback, - force_local_execution=force_local_execution, - ) + return super()._run(finished_callback=finished_callback) def run_data_tree(self, run_parent_trees_too=False) -> None: """ @@ -644,7 +633,6 @@ def execute(self, *args, **kwargs): run_parent_trees_too=False, fetch_input=False, check_readiness=False, - force_local_execution=True, emit_ran_signal=False, **kwargs, ) @@ -668,7 +656,6 @@ def pull(self, *args, run_parent_trees_too=False, **kwargs): run_parent_trees_too=run_parent_trees_too, fetch_input=True, check_readiness=True, - force_local_execution=False, emit_ran_signal=False, **kwargs, ) diff --git a/pyiron_workflow/workflow.py b/pyiron_workflow/workflow.py index 41cc175b..64391f8e 100644 --- a/pyiron_workflow/workflow.py +++ b/pyiron_workflow/workflow.py @@ -339,7 +339,6 @@ def _build_io( def run( self, check_readiness: bool = True, - force_local_execution: bool = False, **kwargs, ): # Note: Workflows may have neither parents nor siblings, so we don't need to @@ -352,7 +351,6 @@ def run( run_parent_trees_too=False, fetch_input=False, check_readiness=check_readiness, - force_local_execution=force_local_execution, emit_ran_signal=False, **kwargs, ) diff --git a/tests/unit/mixin/test_run.py b/tests/unit/mixin/test_run.py index bcceaa00..9bacd495 100644 --- a/tests/unit/mixin/test_run.py +++ b/tests/unit/mixin/test_run.py @@ -98,12 +98,11 @@ def test_failure(self): def test_runnable_run_local(self): runnable = ConcreteRunnable() - runnable.executor = CloudpickleProcessPoolExecutor() - result = runnable.run(force_local_execution=True) + result = runnable.run() self.assertIsNone( runnable.future, - msg="The local execution flag should override the executor" + msg="Without an executor, we expect no future" ) self.assertDictEqual( runnable.expected_run_output, @@ -120,7 +119,7 @@ def test_runnable_run_with_executor(self): runnable = ConcreteRunnable() runnable.executor = CloudpickleProcessPoolExecutor() - result = runnable.run(force_local_execution=False) + result = runnable.run() self.assertIsInstance( result, Future, @@ -146,10 +145,7 @@ def test_runnable_run_with_executor_and_callback(self): runnable = ConcreteRunnable() runnable.executor = CloudpickleProcessPoolExecutor() - result = runnable.run( - force_local_execution=False, - _finished_callback=runnable.custom_callback, - ) + result = runnable.run(_finished_callback=runnable.custom_callback) self.assertDictEqual( runnable.expected_run_output, result.result(timeout=30), diff --git a/tests/unit/test_node.py b/tests/unit/test_node.py index 55799521..0b343e0e 100644 --- a/tests/unit/test_node.py +++ b/tests/unit/test_node.py @@ -142,53 +142,6 @@ def test_check_readiness(self): self.n3.use_cache = n3_cache - def test_force_local_execution(self): - self.n1.executor = ProcessPoolExecutor() - out = self.n1.run(force_local_execution=False) - with self.subTest("Test running with an executor fulfills promises"): - self.assertIsInstance( - out, - Future, - msg="With an executor, we expect a futures object back" - ) - self.assertTrue( - self.n1.running, - msg="The running flag should be true while it's running, and " - "(de)serialization is time consuming enough that we still expect" - "this to be the case" - ) - self.assertFalse( - self.n1.ready, - msg="While running, the node should not be ready." - ) - with self.assertRaises( - RuntimeError, - msg="Running nodes should not be allowed to get their input updated", - ): - self.n1.inputs.x = 42 - self.assertEqual( - 1, - out.result(timeout=120), - msg="If we wait for the remote execution to finish, it should give us" - "the right thing" - ) - self.assertEqual( - 1, - self.n1.outputs.y.value, - msg="The callback on the executor should ensure the output processing " - "happens" - ) - - self.n2.executor = ProcessPoolExecutor() - self.n2.inputs.x = 0 - self.assertEqual( - 1, - self.n2.run(fetch_input=False, force_local_execution=True), - msg="Forcing local execution should do just that." - ) - self.n1.executor_shutdown() - self.n2.executor_shutdown() - def test_emit_ran_signal(self): self.n1 >> self.n2 >> self.n3 # Chained connection declaration