From f942c5f6c8f422e4089d99dab4df7013a82a0e7e Mon Sep 17 00:00:00 2001 From: Mike Nerone Date: Wed, 3 Jan 2024 15:34:33 -0600 Subject: [PATCH] Fix typing violations (and referring docs where changed). Plus a couple of typos I happened to notice. --- slurry/__init__.py | 2 +- slurry/_pipeline.py | 18 ++++++++------- slurry/_types.py | 6 ++++- slurry/environments/__init__.py | 6 ++--- slurry/environments/_multiprocessing.py | 4 ++-- slurry/environments/_threading.py | 6 ++--- slurry/sections/__init__.py | 10 ++++----- slurry/sections/_buffers.py | 4 ++-- slurry/sections/_combiners.py | 30 ++++++++++++------------- slurry/sections/_producers.py | 3 ++- slurry/sections/weld.py | 11 +++++---- 11 files changed, 52 insertions(+), 48 deletions(-) diff --git a/slurry/__init__.py b/slurry/__init__.py index afc85ce..3a415b4 100644 --- a/slurry/__init__.py +++ b/slurry/__init__.py @@ -1,4 +1,4 @@ """An async streaming data processing framework.""" __version__ = '1.3.1' -from ._pipeline import Pipeline +from ._pipeline import Pipeline as Pipeline diff --git a/slurry/_pipeline.py b/slurry/_pipeline.py index 0bb585f..a92bb57 100644 --- a/slurry/_pipeline.py +++ b/slurry/_pipeline.py @@ -1,13 +1,15 @@ """Contains the main Slurry ``Pipeline`` class.""" import math -from typing import AsyncContextManager, Sequence +from typing import Any, AsyncGenerator +from contextlib import asynccontextmanager import trio -from async_generator import aclosing, asynccontextmanager +from async_generator import aclosing from .sections.weld import weld from ._tap import Tap +from ._types import PipelineSection class Pipeline: """The main Slurry ``Pipeline`` class. @@ -22,7 +24,7 @@ class Pipeline: * ``nursery``: The :class:`trio.Nursery` that is executing the pipeline. """ - def __init__(self, *sections: Sequence["PipelineSection"], + def __init__(self, *sections: PipelineSection, nursery: trio.Nursery, enabled: trio.Event): self.sections = sections @@ -32,10 +34,10 @@ def __init__(self, *sections: Sequence["PipelineSection"], @classmethod @asynccontextmanager - async def create(cls, *sections: Sequence["PipelineSection"]) -> AsyncContextManager["Pipeline"]: + async def create(cls, *sections: PipelineSection) -> AsyncGenerator["Pipeline", None]: """Creates a new pipeline context and adds the given section sequence to it. - :param Sequence[PipelineSection] \\*sections: One or more + :param PipelineSection \\*sections: One or more :mod:`PipelineSection ` compatible objects. """ async with trio.open_nursery() as nursery: @@ -69,7 +71,7 @@ def tap(self, *, max_buffer_size: int = 0, timeout: float = math.inf, retrys: int = 0, - start: bool = True) -> trio.MemoryReceiveChannel: + start: bool = True) -> trio.MemoryReceiveChannel[Any]: # pylint: disable=line-too-long """Create a new output channel for this pipeline. @@ -103,7 +105,7 @@ def tap(self, *, self._enabled.set() return receive_channel - def extend(self, *sections: Sequence["PipelineSection"], start: bool = False) -> "Pipeline": + def extend(self, *sections: PipelineSection, start: bool = False) -> "Pipeline": """Extend this pipeline into a new pipeline. An extension will add a tap to the existing pipeline and use this tap as input to the @@ -112,7 +114,7 @@ def extend(self, *sections: Sequence["PipelineSection"], start: bool = False) -> Extensions can be added dynamically during runtime. The data feed will start at the current position. Old events won't be replayed. - :param Sequence[PipelineSection] \\*sections: One or more pipeline sections. + :param PipelineSection \\*sections: One or more pipeline sections. :param bool start: Start processing when adding this extension. (default: ``False``) """ pipeline = Pipeline( diff --git a/slurry/_types.py b/slurry/_types.py index 8858a5d..8b9a747 100644 --- a/slurry/_types.py +++ b/slurry/_types.py @@ -1,4 +1,8 @@ -from typing import Awaitable, Protocol, runtime_checkable +from typing import Any, AsyncIterable, Awaitable, Protocol, Tuple, Union, runtime_checkable + +from .sections.abc import Section + +PipelineSection = Union[AsyncIterable[Any], Section, Tuple["PipelineSection", ...]] @runtime_checkable class SupportsAclose(Protocol): diff --git a/slurry/environments/__init__.py b/slurry/environments/__init__.py index 845cf52..e14f7ee 100644 --- a/slurry/environments/__init__.py +++ b/slurry/environments/__init__.py @@ -1,3 +1,3 @@ -from ._trio import TrioSection -from ._threading import ThreadSection -from ._multiprocessing import ProcessSection \ No newline at end of file +from ._trio import TrioSection as TrioSection +from ._threading import ThreadSection as ThreadSection +from ._multiprocessing import ProcessSection as ProcessSection diff --git a/slurry/environments/_multiprocessing.py b/slurry/environments/_multiprocessing.py index bd7ec64..2769444 100644 --- a/slurry/environments/_multiprocessing.py +++ b/slurry/environments/_multiprocessing.py @@ -1,7 +1,7 @@ """Implements a section that runs in an independent python proces.""" from multiprocessing import Process, SimpleQueue -from typing import Any, Iterable, Callable, cast +from typing import AsyncIterable, Any, Awaitable, Callable, Optional, cast import trio @@ -19,7 +19,7 @@ class ProcessSection(SyncSection): `_. """ - async def pump(self, input: Iterable[Any], output: Callable[[Any], None]): + async def pump(self, input: Optional[AsyncIterable[Any]], output: Callable[[Any], Awaitable[None]]): """ The ``ProcessSection`` pump method works similar to the threaded version, however since communication between processes is not as simple as it is between threads, diff --git a/slurry/environments/_threading.py b/slurry/environments/_threading.py index 2699819..0d84f66 100644 --- a/slurry/environments/_threading.py +++ b/slurry/environments/_threading.py @@ -1,5 +1,5 @@ """The threading module implements a synchronous section that runs in a background thread.""" -from typing import Any, Iterable, Callable +from typing import Any, AsyncIterable, Awaitable, Callable, Optional import trio @@ -12,8 +12,8 @@ class ThreadSection(SyncSection): """ async def pump(self, - input: Iterable[Any], - output: Callable[[Any], None]): + input: Optional[AsyncIterable[Any]], + output: Callable[[Any], Awaitable[None]]): """Runs the refine method in a background thread with synchronous input and output wrappers, which transparently bridges the input and outputs between the parent trio event loop and the sync world. diff --git a/slurry/sections/__init__.py b/slurry/sections/__init__.py index 1c08cc0..d769289 100644 --- a/slurry/sections/__init__.py +++ b/slurry/sections/__init__.py @@ -1,6 +1,6 @@ """A collection of common stream operations.""" -from ._buffers import Window, Group, Delay -from ._combiners import Chain, Merge, Zip, ZipLatest -from ._filters import Skip, SkipWhile, Filter, Changes, RateLimit -from ._producers import Repeat, Metronome, InsertValue -from ._refiners import Map +from ._buffers import Window as Window, Group as Group, Delay as Delay +from ._combiners import Chain as Chain, Merge as Merge, Zip as Zip, ZipLatest as ZipLatest +from ._filters import Skip as Skip, SkipWhile as SkipWhile, Filter as Filter, Changes as Changes, RateLimit as RateLimit +from ._producers import Repeat as Repeat, Metronome as Metronome, InsertValue as InsertValue +from ._refiners import Map as Map diff --git a/slurry/sections/_buffers.py b/slurry/sections/_buffers.py index 2ff434c..a243802 100644 --- a/slurry/sections/_buffers.py +++ b/slurry/sections/_buffers.py @@ -90,13 +90,13 @@ class Group(TrioSection): :type reducer: Optional[Callable[[Sequence[Any]], Any]] """ def __init__(self, interval: float, source: Optional[AsyncIterable[Any]] = None, *, - max_size: int = math.inf, + max_size: Optional[int] = None, mapper: Optional[Callable[[Any], Any]] = None, reducer: Optional[Callable[[Sequence[Any]], Any]] = None): super().__init__() self.source = source self.interval = interval - self.max_size = max_size + self.max_size = math.inf if max_size is None else max_size self.mapper = mapper self.reducer = reducer diff --git a/slurry/sections/_combiners.py b/slurry/sections/_combiners.py index a075d1e..66dad9d 100644 --- a/slurry/sections/_combiners.py +++ b/slurry/sections/_combiners.py @@ -1,13 +1,13 @@ """Pipeline sections for combining multiple inputs into a single output.""" import builtins import itertools -from typing import Any, AsyncIterable, Sequence import trio from async_generator import aclosing from ..environments import TrioSection from .weld import weld +from .._types import PipelineSection class Chain(TrioSection): """Chains input from one or more sources. Any valid ``PipelineSection`` is an allowed source. @@ -21,13 +21,11 @@ class Chain(TrioSection): By default, the input is added as the first source. If the input is added last instead of first, it will cause backpressure to be applied upstream. - :param sources: One or more ``PipelineSection`` that will be chained together. - :type sources: Sequence[PipelineSection] - :param place_input: Iteration priority of the pipeline input source. Options: + :param PipelineSection \\*sources: One or more ``PipelineSection`` that will be chained together. + :param string place_input: Iteration priority of the pipeline input source. Options: ``'first'`` (default) \\| ``'last'``. - :type place_input: string """ - def __init__(self, *sources: Sequence["PipelineSection"], place_input='first'): + def __init__(self, *sources: PipelineSection, place_input='first'): super().__init__() self.sources = sources self.place_input = _validate_place_input(place_input) @@ -57,10 +55,10 @@ class Merge(TrioSection): Sources can be pipeline sections, which will be treated as first sections, with no input. Merge will take care of running the pump task for these sections. - :param sources: One or more async iterables or sections who's contents will be merged. - :type sources: Sequence[PipelineSection] + :param PipelineSection \\*sources: One or more async iterables or sections whose contents + will be merged. """ - def __init__(self, *sources: Sequence["PipelineSection"]): + def __init__(self, *sources: PipelineSection): super().__init__() self.sources = sources @@ -90,13 +88,13 @@ class Zip(TrioSection): If sources are out of sync, the fastest source will have to wait for the slowest, which will cause backpressure. - :param sources: One or more ``PipelineSection``, who's contents will be zipped. - :type sources: Sequence[PipelineSection] + :param PipelineSection \\*sources: One or more ``PipelineSection``, whose contents will be + zipped. :param place_input: Position of the pipeline input source in the output tuple. Options: ``'first'`` (default) \\| ``'last'``. :type place_input: string """ - def __init__(self, *sources: Sequence["PipelineSection"], place_input='first'): + def __init__(self, *sources: PipelineSection, place_input='first'): super().__init__() self.sources = sources self.place_input = _validate_place_input(place_input) @@ -145,11 +143,11 @@ class ZipLatest(TrioSection): added as an input. .. Note:: - If any single source is exchausted, all remaining sources will be forcibly closed, and + If any single source is exhausted, all remaining sources will be forcibly closed, and the pipeline will stop. - :param sources: One or more async iterables that will be zipped together. - :type sources: Sequence[AsyncIterable[Any]] + :param PipelineSection \\*sources: One or more ``PipelineSection`` that will be zipped + together. :param partial: If ``True`` (default) output will be sent as soon as the first input arrives. Otherwise, all main sources must send at least one item, before an output is generated. :type partial: bool @@ -165,7 +163,7 @@ class ZipLatest(TrioSection): Defaults to ``False`` :type monitor_input: bool """ - def __init__(self, *sources: Sequence["PipelineSection"], + def __init__(self, *sources: PipelineSection, partial=True, default=None, monitor=(), diff --git a/slurry/sections/_producers.py b/slurry/sections/_producers.py index 23278d1..a0f15f0 100644 --- a/slurry/sections/_producers.py +++ b/slurry/sections/_producers.py @@ -1,6 +1,6 @@ """Pipeline sections that produce data streams.""" from time import time -from typing import Any +from typing import Any, AsyncIterable, cast import trio from async_generator import aclosing @@ -94,6 +94,7 @@ async def refine(self, input, output): # pylint: disable=line-too-long raise RuntimeError('If Repeat is used as first section, default value must be provided.') + input = cast(AsyncIterable[Any], input) item = self.default if self.has_default else None async def pull_task(cancel_scope, *, task_status=trio.TASK_STATUS_IGNORED): diff --git a/slurry/sections/weld.py b/slurry/sections/weld.py index 6f7609a..f357a68 100644 --- a/slurry/sections/weld.py +++ b/slurry/sections/weld.py @@ -1,21 +1,20 @@ """Contains the `weld` utility function for composing sections.""" -from typing import Any, AsyncIterable, Optional, Sequence +from typing import Any, AsyncIterable, Optional, cast import trio from .abc import Section -from .._types import SupportsAclose +from .._types import PipelineSection, SupportsAclose -def weld(nursery, *sections: Sequence["PipelineSection"]) -> AsyncIterable[Any]: +def weld(nursery, *sections: PipelineSection) -> AsyncIterable[Any]: """ Connects the individual parts of a sequence of pipeline sections together and starts pumps for individual Sections. It returns an async iterable which yields results of the sequence. :param nursery: The nursery that runs individual pipeline section pumps. :type nursery: :class:`trio.Nursery` - :param \\*sections: A sequence of pipeline sections. - :type \\*sections: Sequence[PipelineSection] + :param PipelineSection \\*sections: Pipeline sections. """ async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.MemorySendChannel): @@ -44,4 +43,4 @@ async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.Memory output = section section_input = output - return output + return cast(AsyncIterable[Any], output)