Skip to content

Commit

Permalink
Iterators without aclose() (rare) are notoriously problematic...
Browse files Browse the repository at this point in the history
- So now aclose() is explicitly required.
- Since we effectively had an in-project `aclosing()` implementation, also
  removed async_generator dep.
- The new, better-annotated `aclosing()` triggered Pyright to alert on a few
  places where it was being applied to an iterable instead of an iterator.
  Fixed those, too.
  • Loading branch information
mikenerone committed Jan 6, 2024
1 parent 4a2e9f4 commit 98a65f3
Show file tree
Hide file tree
Showing 18 changed files with 102 additions and 109 deletions.
6 changes: 3 additions & 3 deletions docs/user.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ pipeline out of individual sections. A ``PipelineSection`` is any object that is
function. This currently includes the following types:

AsyncIterables
Async iterables are valid only as the very first ``PipelineSection``. Subsequent
sections will use this async iterable as input source. Placing an ``AsyncIterable`` into the middle of
a sequence of pipeline sections, will cause a ``ValueError``.
Async iterables are valid only as the very first ``PipelineSection``, and must support the ``aclose()``
method (nearly all do). Subsequent sections will use this async iterable as input source. Placing an
``AsyncIterable`` into the middle of a sequence of pipeline sections, will cause a ``ValueError``.
Sections
Any :class:`Section <slurry.sections.abc.Section>` abc subclass is a valid ``PipelineSection``, at any
position in the pipeline.
Expand Down
13 changes: 1 addition & 12 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ classifiers = [


[tool.poetry.dependencies]
async-generator = "^1.10"
python = "^3.8"
trio = "^0.23.0"

Expand Down
4 changes: 2 additions & 2 deletions slurry/_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
from contextlib import asynccontextmanager

import trio
from async_generator import aclosing

from .sections.weld import weld
from ._tap import Tap
from ._types import PipelineSection
from ._utils import aclosing

class Pipeline:
"""The main Slurry ``Pipeline`` class.
Expand Down Expand Up @@ -54,7 +54,7 @@ async def _pump(self):
output = weld(nursery, *self.sections)

# Output to taps
async with aclosing(output) as aiter:
async with aclosing(output.__aiter__()) as aiter:
async for item in aiter:
self._taps = set(filter(lambda tap: not tap.closed, self._taps))
if not self._taps:
Expand Down
20 changes: 15 additions & 5 deletions slurry/_types.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
from typing import Any, AsyncIterable, Awaitable, Protocol, Tuple, Union, runtime_checkable
from typing import Any, AsyncIterable, Awaitable, Protocol, Tuple, TypeVar, Union, runtime_checkable

from .sections.abc import Section
from .sections import abc

PipelineSection = Union[AsyncIterable[Any], Section, Tuple["PipelineSection", ...]]
PipelineSection = Union["AsyncIterableWithAcloseableIterator[Any]", "abc.Section", Tuple["PipelineSection", ...]]

_T_co = TypeVar("_T_co", covariant=True)

@runtime_checkable
class SupportsAclose(Protocol):
def aclose(self) -> Awaitable[object]:
...
def aclose(self) -> Awaitable[object]: ...

@runtime_checkable
class AcloseableAsyncIterator(SupportsAclose, Protocol[_T_co]):
def __anext__(self) -> Awaitable[_T_co]: ...
def __aiter__(self) -> "AcloseableAsyncIterator[_T_co]": ...

@runtime_checkable
class AsyncIterableWithAcloseableIterator(Protocol[_T_co]):
def __aiter__(self) -> AcloseableAsyncIterator[_T_co]: ...
9 changes: 4 additions & 5 deletions slurry/_utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from typing import AsyncGenerator, AsyncIterator, TypeVar
from typing import AsyncGenerator, TypeVar

from ._types import SupportsAclose
from ._types import AcloseableAsyncIterator

from contextlib import asynccontextmanager

_T = TypeVar("_T")

@asynccontextmanager
async def safe_aclosing(obj: AsyncIterator[_T]) -> AsyncGenerator[AsyncIterator[_T], None]:
async def aclosing(obj: AcloseableAsyncIterator[_T]) -> AsyncGenerator[AcloseableAsyncIterator[_T], None]:
try:
yield obj
finally:
if isinstance(obj, SupportsAclose):
await obj.aclose()
await obj.aclose()
7 changes: 5 additions & 2 deletions slurry/environments/_multiprocessing.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""Implements a section that runs in an independent python proces."""

from multiprocessing import Process, SimpleQueue
from typing import AsyncIterable, Any, Awaitable, Callable, Optional, cast
from typing import Any, Awaitable, Callable, Optional, cast

import trio

from ..sections.abc import SyncSection
from .._types import AsyncIterableWithAcloseableIterator

class ProcessSection(SyncSection):
"""ProcessSection defines a section interface with a synchronous
Expand All @@ -19,7 +20,9 @@ class ProcessSection(SyncSection):
<https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled>`_.
"""

async def pump(self, input: Optional[AsyncIterable[Any]], output: Callable[[Any], Awaitable[None]]):
async def pump(
self, input: Optional[AsyncIterableWithAcloseableIterator[Any]], output: Callable[[Any], Awaitable[None]]
):
"""
The ``ProcessSection`` pump method works similar to the threaded version, however
since communication between processes is not as simple as it is between threads,
Expand Down
5 changes: 3 additions & 2 deletions slurry/environments/_threading.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""The threading module implements a synchronous section that runs in a background thread."""
from typing import Any, AsyncIterable, Awaitable, Callable, Optional
from typing import Any, Awaitable, Callable, Optional

import trio

from ..sections.abc import SyncSection
from .._types import AsyncIterableWithAcloseableIterator


class ThreadSection(SyncSection):
Expand All @@ -12,7 +13,7 @@ class ThreadSection(SyncSection):
"""

async def pump(self,
input: Optional[AsyncIterable[Any]],
input: Optional[AsyncIterableWithAcloseableIterator[Any]],
output: Callable[[Any], Awaitable[None]]):
"""Runs the refine method in a background thread with synchronous input and output
wrappers, which transparently bridges the input and outputs between the parent
Expand Down
7 changes: 5 additions & 2 deletions slurry/environments/_trio.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""The Trio environment implements ``TrioSection``, which is a Trio-native
:class:`AsyncSection <slurry.sections.abc.AsyncSection>`."""
from typing import Any, AsyncIterable, Awaitable, Callable, Optional
from typing import Any, Awaitable, Callable, Optional

from ..sections.abc import AsyncSection
from .._types import AsyncIterableWithAcloseableIterator

class TrioSection(AsyncSection):
"""Since Trio is the native Slurry event loop, this environment is simple to implement.
The pump method does not need to do anything special to bridge the input and output. It
simply delegates directly to the refine method, as the api is identical."""
async def pump(self, input: Optional[AsyncIterable[Any]], output: Callable[[Any], Awaitable[None]]):
async def pump(
self, input: Optional[AsyncIterableWithAcloseableIterator[Any]], output: Callable[[Any], Awaitable[None]]
):
"""Calls refine."""
await self.refine(input, output)
23 changes: 12 additions & 11 deletions slurry/sections/_buffers.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""Pipeline sections with age- and volume-based buffers."""
from collections import deque
import math
from typing import Any, AsyncIterable, Callable, Optional, Sequence
from typing import Any, Callable, Optional, Sequence

import trio
from async_generator import aclosing

from ..environments import TrioSection
from .._types import AsyncIterableWithAcloseableIterator
from .._utils import aclosing

class Window(TrioSection):
"""Window buffer with size and age limits.
Expand All @@ -26,13 +27,13 @@ class Window(TrioSection):
:param max_size: The maximum buffer size.
:type max_size: int
:param source: Input when used as first section.
:type source: Optional[AsyncIterable[Any]]
:type source: Optional[AsyncIterableWithAcloseableIterator[Any]]
:param max_age: Maximum item age in seconds. (default: unlimited)
:type max_age: float
:param min_size: Minimum amount of items in the buffer to trigger an output.
:type min_size: int
"""
def __init__(self, max_size: int, source: Optional[AsyncIterable[Any]] = None, *,
def __init__(self, max_size: int, source: Optional[AsyncIterableWithAcloseableIterator[Any]] = None, *,
max_age: float = math.inf,
min_size: int = 1):
super().__init__()
Expand All @@ -51,7 +52,7 @@ async def refine(self, input, output):

buf = deque()

async with aclosing(source) as aiter:
async with aclosing(source.__aiter__()) as aiter:
async for item in aiter:
now = trio.current_time()
buf.append((item, now))
Expand Down Expand Up @@ -80,7 +81,7 @@ class Group(TrioSection):
:param interval: Time in seconds from when an item arrives until the buffer is sent.
:type interval: float
:param source: Input when used as first section.
:type source: Optional[AsyncIterable[Any]]
:type source: Optional[AsyncIterableWithAcloseableIterator[Any]]
:param max_size: Maximum number of items in buffer, which when reached, will cause the buffer
to be sent.
:type max_size: int
Expand All @@ -89,7 +90,7 @@ class Group(TrioSection):
:param reducer: Optional reducer function used to transform the buffer to a single value.
:type reducer: Optional[Callable[[Sequence[Any]], Any]]
"""
def __init__(self, interval: float, source: Optional[AsyncIterable[Any]] = None, *,
def __init__(self, interval: float, source: Optional[AsyncIterableWithAcloseableIterator[Any]] = None, *,
max_size: Optional[int] = None,
mapper: Optional[Callable[[Any], Any]] = None,
reducer: Optional[Callable[[Sequence[Any]], Any]] = None):
Expand All @@ -111,7 +112,7 @@ async def refine(self, input, output):

send_channel, receive_channel = trio.open_memory_channel(0)
async def pull_task():
async with send_channel, aclosing(source) as aiter:
async with send_channel, aclosing(source.__aiter__()) as aiter:
async for item in aiter:
await send_channel.send(item)
nursery.start_soon(pull_task)
Expand Down Expand Up @@ -152,9 +153,9 @@ class Delay(TrioSection):
:param interval: Number of seconds that each item is delayed.
:type interval: float
:param source: Input when used as first section.
:type source: Optional[AsyncIterable[Any]]
:type source: Optional[AsyncIterableWithAcloseableIterator[Any]]
"""
def __init__(self, interval: float, source: Optional[AsyncIterable[Any]] = None):
def __init__(self, interval: float, source: Optional[AsyncIterableWithAcloseableIterator[Any]] = None):
super().__init__()
self.source = source
self.interval = interval
Expand All @@ -169,7 +170,7 @@ async def refine(self, input, output):
buffer_input_channel, buffer_output_channel = trio.open_memory_channel(math.inf)

async def pull_task():
async with buffer_input_channel, aclosing(source) as aiter:
async with buffer_input_channel, aclosing(source.__aiter__()) as aiter:
async for item in aiter:
await buffer_input_channel.send((item, trio.current_time() + self.interval))

Expand Down
11 changes: 6 additions & 5 deletions slurry/sections/_combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import itertools

import trio
from async_generator import aclosing

from ..environments import TrioSection
from .weld import weld
from .._types import PipelineSection
from .._utils import aclosing

class Chain(TrioSection):
"""Chains input from one or more sources. Any valid ``PipelineSection`` is an allowed source.
Expand Down Expand Up @@ -40,7 +40,7 @@ async def refine(self, input, output):
sources = self.sources
async with trio.open_nursery() as nursery:
for source in sources:
async with aclosing(weld(nursery, source)) as agen:
async with aclosing(weld(nursery, source).__aiter__()) as agen:
async for item in agen:
await output(item)

Expand All @@ -66,7 +66,7 @@ async def refine(self, input, output):
async with trio.open_nursery() as nursery:

async def pull_task(source):
async with aclosing(weld(nursery, source)) as aiter:
async with aclosing(weld(nursery, source).__aiter__()) as aiter:
async for item in aiter:
await output(item)

Expand Down Expand Up @@ -155,7 +155,8 @@ class ZipLatest(TrioSection):
default value to output, until an input has arrived on a source. Defaults to ``None``.
:type default: Any
:param monitor: Additional asynchronous sequences to monitor.
:type monitor: Optional[Union[AsyncIterable[Any], Sequence[AsyncIterable[Any]]]]
:type monitor: Optional[Union[AsyncIterableWithAcloseableIterator[Any],
Sequence[AsyncIterableWithAcloseableIterator[Any]]]]
:param place_input: Position of the pipeline input source in the output tuple. Options:
``'first'`` (default)|``'last'``
:type place_input: string
Expand Down Expand Up @@ -203,7 +204,7 @@ async def refine(self, input, output):
async with trio.open_nursery() as nursery:

async def pull_task(index, source, monitor=False):
async with aclosing(weld(nursery, source)) as aiter:
async with aclosing(weld(nursery, source).__aiter__()) as aiter:
async for item in aiter:
results[index] = item
ready[index] = True
Expand Down
Loading

0 comments on commit 98a65f3

Please sign in to comment.