Skip to content

Commit

Permalink
[minor] Make ProcessPoolExecutor the default executor (#389)
Browse files Browse the repository at this point in the history
* Make ProcessPoolExecutor the default executor -- but use executorlib in notebooks

* [minor] Don't have a default `Executor` (#390)

Make the user choose one specifically.
  • Loading branch information
liamhuber authored Jul 24, 2024
1 parent 49f816e commit 9183e48
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 20 deletions.
8 changes: 4 additions & 4 deletions notebooks/deepdive.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -3883,7 +3883,7 @@
"\n",
"You can currently run nodes in a different process by setting that node's `executor` to an instance of a compliant executor object -- i.e. one that takes the standard `submit` method of `concurrent.futures.Executor` and returns a `concurrent.futures.Future` (or sub-class). The built-in `Node` instances (workflows, macros, function nodes, etc.) are `pickle`-compliant, and thus will work with a standard `ProcessPoolExecutor` or `ThreadPoolExecutor` from `concurrent.futures`.\n",
"\n",
"For the case of `ProcessPoolExecutor`, the other process needs to be able to find an import the nodes, so they can't have been created in `__main__` (e.g. here in notebook) but need to be in some importable `.py` file. You might also want to have a node holding un-pickleable data. For these cases, we make a couple more flexible executors available on the creator. There is a toy `CloudpickleProcessPoolExecutor` which is a minimal example of handling dynamically defined/un-picklable data and useful for learning, but we also link to `executorlib.Executor`, which is both flexible and powerful. This is the default `Workflow.create.Executor`.\n",
"For the case of `ProcessPoolExecutor`, the other process needs to be able to find and import the nodes, so they can't have been created in `__main__` (e.g. here in the notebook) but need to be in some importable `.py` file. You might also want to have a node holding un-pickleable data. For these cases, we make a couple more flexible executors available on the creator. There is a toy `CloudpickleProcessPoolExecutor` which is a minimal example of handling dynamically defined/un-pickleable data and useful for learning, but we also link to `executorlib.Executor`, which is both flexible and powerful. For compatibility with GitHub CI on macos, this currently also requires an extra keyword.\n",
"\n",
"Here's a simple example of executor usage:"
]
Expand All @@ -3909,7 +3909,7 @@
"wf.a2 = wf.create.standard.Add(2, 3)\n",
"wf.b = wf.a1 + wf.a2\n",
"\n",
"wf.a2.executor = wf.create.Executor()\n",
"wf.a2.executor = wf.create.ExecutorlibExecutor(hostname_localhost=True)\n",
"wf()\n",
"\n",
"print(wf.a1.future, wf.a1.outputs.add.value)\n",
Expand Down Expand Up @@ -3996,7 +3996,7 @@
}
],
"source": [
"with Workflow.create.Executor() as executor:\n",
"with Workflow.create.ExecutorlibExecutor(hostname_localhost=True) as executor:\n",
" wf = Workflow(\"with_executor\")\n",
" wf.a1 = wf.create.standard.Add(0, 1)\n",
" wf.a2 = wf.create.standard.Add(2, 3)\n",
Expand Down Expand Up @@ -4093,7 +4093,7 @@
"wf.starting_nodes = [wf.a, wf.b, wf.c]\n",
"\n",
"\n",
"with wf.create.Executor(max_workers=3) as executor:\n",
"with wf.create.ExecutorlibExecutor(hostname_localhost=True, max_workers=3) as executor:\n",
" wf.a.executor = executor\n",
" wf.b.executor = executor\n",
" wf.c.executor = executor\n",
Expand Down
2 changes: 1 addition & 1 deletion notebooks/quickstart.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -1775,7 +1775,7 @@
"To learn more, take a look at the `deepdive.ipynb` notebook, and/or start looking through the class docstrings. Here's a brief map of what you're still missing:\n",
"\n",
"- Distributing node execution onto remote processes\n",
" - Single core parallel python processes is available by setting the `.executor` attribute to a compatible executor instance, e.g. `Workflow.create.Executor()`\n",
" - Parallel computation is available by setting the `.executor` attribute to a compatible executor instance, e.g. `Workflow.create.ProcessPoolExecutor()`\n",
"- Acyclic graphs\n",
" - Execution for graphs whose data flow topology is a DAG happens automatically, but you're always free to specify this manually with `Signals`, and indeed _must_ specify the execution flow manually for cyclic graphs -- but cyclic graphs _are_ possible!\n",
"- Complex flow nodes\n",
Expand Down
4 changes: 0 additions & 4 deletions pyiron_workflow/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
if TYPE_CHECKING:
from pyiron_workflow.node_package import NodePackage

# Specify the standard executor
Executor = ExecutorlibExecutor


class Creator(metaclass=Singleton):
"""
Expand All @@ -47,7 +44,6 @@ def __init__(self):
self._package_access = DotDict()
self._package_registry = bidict()

self.Executor = Executor
# Standard lib
self.ProcessPoolExecutor = ProcessPoolExecutor
self.ThreadPoolExecutor = ThreadPoolExecutor
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def test_executor_and_creator_interaction(self):
wf.register("static.demo_nodes", "demo")

wf.before_pickling = wf.create.demo.OptionallyAdd(1)
wf.before_pickling.executor = wf.create.Executor()
wf.before_pickling.executor = wf.create.ProcessPoolExecutor()
wf()
wf.before_pickling.future.result(timeout=120) # Wait for it to finish
wf.executor_shutdown()
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/nodes/test_macro.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def test_with_executor(self):
# at the downstream output, and none of this is happening in a workflow

original_one = macro.one
macro.executor = macro.create.Executor()
macro.executor = macro.create.ProcessPoolExecutor()

self.assertIs(
NOT_DATA,
Expand Down
8 changes: 3 additions & 5 deletions tests/unit/test_node.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from concurrent.futures import Future
from concurrent.futures import Future, ProcessPoolExecutor
import os
import sys
import unittest

from pyiron_snippets.files import DirectoryObject

from pyiron_workflow.channels import InputData, NOT_DATA

from pyiron_workflow.create import Executor
from pyiron_workflow.mixin.injection import OutputDataWithInjection, OutputsWithInjection
from pyiron_workflow.io import Inputs
from pyiron_workflow.node import Node
Expand Down Expand Up @@ -143,7 +141,7 @@ def test_check_readiness(self):
)

def test_force_local_execution(self):
self.n1.executor = Executor()
self.n1.executor = ProcessPoolExecutor()
out = self.n1.run(force_local_execution=False)
with self.subTest("Test running with an executor fulfills promises"):
self.assertIsInstance(
Expand Down Expand Up @@ -179,7 +177,7 @@ def test_force_local_execution(self):
"happens"
)

self.n2.executor = Executor()
self.n2.executor = ProcessPoolExecutor()
self.n2.inputs.x = 0
self.assertEqual(
1,
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def test_with_executor(self):
wf.b = wf.create.function_node(plus_one, x=wf.a)

original_a = wf.a
wf.executor = wf.create.Executor()
wf.executor = wf.create.ProcessPoolExecutor()

self.assertIs(
NOT_DATA,
Expand Down Expand Up @@ -243,7 +243,7 @@ def test_parallel_execution(self):
wf.fast = five()
wf.sum = sum(a=wf.fast, b=wf.slow)

wf.slow.executor = wf.create.Executor()
wf.slow.executor = wf.create.ProcessPoolExecutor()

wf.slow.run()
wf.fast.run()
Expand Down Expand Up @@ -419,7 +419,7 @@ def add_three_macro(self, one__x):
msg="Sanity check, pulling here should work perfectly fine"
)

wf.m.one.executor = wf.create.Executor()
wf.m.one.executor = wf.create.ProcessPoolExecutor()
with self.assertRaises(
ValueError,
msg="Should not be able to pull with executor in local scope"
Expand All @@ -428,7 +428,7 @@ def add_three_macro(self, one__x):
wf.m.one.executor_shutdown() # Shouldn't get this far, but if so, shutdown
wf.m.one.executor = None

wf.n1.executor = wf.create.Executor()
wf.n1.executor = wf.create.ProcessPoolExecutor()
with self.assertRaises(
ValueError,
msg="Should not be able to pull with executor in parent scope"
Expand Down

0 comments on commit 9183e48

Please sign in to comment.