From 1f10f4bc1c2ac58ea5755f9224c31f52edbcb4ed Mon Sep 17 00:00:00 2001 From: Mike Nerone Date: Wed, 17 Jan 2024 01:34:56 -0600 Subject: [PATCH] Typing fixes --- slurry/_pipeline.py | 15 ++++++++------- slurry/_tap.py | 2 +- slurry/environments/_multiprocessing.py | 4 ++-- slurry/environments/_threading.py | 6 +++--- slurry/sections/_buffers.py | 2 +- slurry/sections/_combiners.py | 25 ++++++++++++------------- slurry/sections/abc.py | 4 +++- slurry/sections/weld.py | 13 ++++++------- 8 files changed, 36 insertions(+), 35 deletions(-) diff --git a/slurry/_pipeline.py b/slurry/_pipeline.py index 0bb585f..a30922f 100644 --- a/slurry/_pipeline.py +++ b/slurry/_pipeline.py @@ -1,13 +1,14 @@ """Contains the main Slurry ``Pipeline`` class.""" import math -from typing import AsyncContextManager, Sequence +from typing import Any, AsyncGenerator import trio from async_generator import aclosing, asynccontextmanager from .sections.weld import weld from ._tap import Tap +from .sections.abc import PipelineSection class Pipeline: """The main Slurry ``Pipeline`` class. @@ -22,7 +23,7 @@ class Pipeline: * ``nursery``: The :class:`trio.Nursery` that is executing the pipeline. """ - def __init__(self, *sections: Sequence["PipelineSection"], + def __init__(self, *sections: PipelineSection, nursery: trio.Nursery, enabled: trio.Event): self.sections = sections @@ -32,10 +33,10 @@ def __init__(self, *sections: Sequence["PipelineSection"], @classmethod @asynccontextmanager - async def create(cls, *sections: Sequence["PipelineSection"]) -> AsyncContextManager["Pipeline"]: + async def create(cls, *sections: PipelineSection) -> AsyncGenerator["Pipeline", None]: """Creates a new pipeline context and adds the given section sequence to it. - :param Sequence[PipelineSection] \\*sections: One or more + :param PipelineSection \\*sections: One or more :mod:`PipelineSection ` compatible objects. """ async with trio.open_nursery() as nursery: @@ -69,7 +70,7 @@ def tap(self, *, max_buffer_size: int = 0, timeout: float = math.inf, retrys: int = 0, - start: bool = True) -> trio.MemoryReceiveChannel: + start: bool = True) -> trio.MemoryReceiveChannel[Any]: # pylint: disable=line-too-long """Create a new output channel for this pipeline. @@ -103,7 +104,7 @@ def tap(self, *, self._enabled.set() return receive_channel - def extend(self, *sections: Sequence["PipelineSection"], start: bool = False) -> "Pipeline": + def extend(self, *sections: PipelineSection, start: bool = False) -> "Pipeline": """Extend this pipeline into a new pipeline. An extension will add a tap to the existing pipeline and use this tap as input to the @@ -112,7 +113,7 @@ def extend(self, *sections: Sequence["PipelineSection"], start: bool = False) -> Extensions can be added dynamically during runtime. The data feed will start at the current position. Old events won't be replayed. - :param Sequence[PipelineSection] \\*sections: One or more pipeline sections. + :param PipelineSection \\*sections: One or more pipeline sections. :param bool start: Start processing when adding this extension. (default: ``False``) """ pipeline = Pipeline( diff --git a/slurry/_tap.py b/slurry/_tap.py index eb06644..6995ce5 100644 --- a/slurry/_tap.py +++ b/slurry/_tap.py @@ -12,7 +12,7 @@ class Tap: :meth:`slurry.pipeline.Pipeline.tap`. :param send_channel: The output to which items are sent. - :type send_channel: trio.MemorySendChannel + :type send_channel: trio.MemorySendChannel[Any] :param timeout: Seconds to wait for receiver to respond. :type timeout: float :param retrys: Number of times to reattempt a send that timed out. diff --git a/slurry/environments/_multiprocessing.py b/slurry/environments/_multiprocessing.py index 7fbe9b6..e7b454b 100644 --- a/slurry/environments/_multiprocessing.py +++ b/slurry/environments/_multiprocessing.py @@ -1,7 +1,7 @@ """Implements a section that runs in an independent python proces.""" from multiprocessing import Process, SimpleQueue -from typing import Any, Iterable, Callable +from typing import Any, AsyncIterable, Awaitable, Callable, Optional import trio @@ -19,7 +19,7 @@ class ProcessSection(SyncSection): `_. """ - async def pump(self, input: Iterable[Any], output: Callable[[Any], None]): + async def pump(self, input: Optional[AsyncIterable[Any]], output: Callable[[Any], Awaitable[None]]): """ The ``ProcessSection`` pump method works similar to the threaded version, however since communication between processes is not as simple as it is between threads, diff --git a/slurry/environments/_threading.py b/slurry/environments/_threading.py index c0182e8..f947710 100644 --- a/slurry/environments/_threading.py +++ b/slurry/environments/_threading.py @@ -1,5 +1,5 @@ """The threading module implements a synchronous section that runs in a background thread.""" -from typing import Any, Iterable, Callable +from typing import Any, AsyncIterable, Awaitable, Callable, Optional import trio @@ -12,8 +12,8 @@ class ThreadSection(SyncSection): """ async def pump(self, - input: Iterable[Any], - output: Callable[[Any], None]): + input: Optional[AsyncIterable[Any]], + output: Callable[[Any], Awaitable[None]]): """Runs the refine method in a background thread with synchronous input and output wrappers, which transparently bridges the input and outputs between the parent trio event loop and the sync world. diff --git a/slurry/sections/_buffers.py b/slurry/sections/_buffers.py index 2ff434c..6d51567 100644 --- a/slurry/sections/_buffers.py +++ b/slurry/sections/_buffers.py @@ -90,7 +90,7 @@ class Group(TrioSection): :type reducer: Optional[Callable[[Sequence[Any]], Any]] """ def __init__(self, interval: float, source: Optional[AsyncIterable[Any]] = None, *, - max_size: int = math.inf, + max_size: float = math.inf, mapper: Optional[Callable[[Any], Any]] = None, reducer: Optional[Callable[[Sequence[Any]], Any]] = None): super().__init__() diff --git a/slurry/sections/_combiners.py b/slurry/sections/_combiners.py index 6be1bde..7c79383 100644 --- a/slurry/sections/_combiners.py +++ b/slurry/sections/_combiners.py @@ -1,12 +1,12 @@ """Pipeline sections for combining multiple inputs into a single output.""" import builtins import itertools -from typing import Any, AsyncIterable, Sequence import trio from async_generator import aclosing from ..environments import TrioSection +from .abc import PipelineSection from .weld import weld class Chain(TrioSection): @@ -21,13 +21,12 @@ class Chain(TrioSection): By default, the input is added as the first source. If the input is added last instead of first, it will cause backpressure to be applied upstream. - :param sources: One or more ``PipelineSection`` that will be chained together. - :type sources: Sequence[PipelineSection] + :param PipelineSection \\*sources: One or more ``PipelineSection`` that will be chained together. :param place_input: Iteration priority of the pipeline input source. Options: ``'first'`` (default) \\| ``'last'``. :type place_input: string """ - def __init__(self, *sources: Sequence["PipelineSection"], place_input='first'): + def __init__(self, *sources: PipelineSection, place_input: str = 'first'): super().__init__() self.sources = sources self.place_input = _validate_place_input(place_input) @@ -57,10 +56,10 @@ class Merge(TrioSection): Sources can be pipeline sections, which will be treated as first sections, with no input. Merge will take care of running the pump task for these sections. - :param sources: One or more async iterables or sections who's contents will be merged. - :type sources: Sequence[PipelineSection] + :param PipelineSection \\*sources: One or more async iterables or sections whose contents + will be merged. """ - def __init__(self, *sources: Sequence["PipelineSection"]): + def __init__(self, *sources: PipelineSection): super().__init__() self.sources = sources @@ -90,13 +89,13 @@ class Zip(TrioSection): If sources are out of sync, the fastest source will have to wait for the slowest, which will cause backpressure. - :param sources: One or more ``PipelineSection``, who's contents will be zipped. - :type sources: Sequence[PipelineSection] + :param PipelineSection \\*sources: One or more ``PipelineSection``, whose contents will be + zipped. :param place_input: Position of the pipeline input source in the output tuple. Options: ``'first'`` (default) \\| ``'last'``. :type place_input: string """ - def __init__(self, *sources: Sequence["PipelineSection"], place_input='first'): + def __init__(self, *sources: PipelineSection, place_input: str = 'first'): super().__init__() self.sources = sources self.place_input = _validate_place_input(place_input) @@ -148,8 +147,8 @@ class ZipLatest(TrioSection): If any single source is exhausted, all remaining sources will be forcibly closed, and the pipeline will stop. - :param sources: One or more async iterables that will be zipped together. - :type sources: Sequence[AsyncIterable[Any]] + :param PipelineSection \\*sources: One or more ``PipelineSection`` that will be zipped + together. :param partial: If ``True`` (default) output will be sent as soon as the first input arrives. Otherwise, all main sources must send at least one item, before an output is generated. :type partial: bool @@ -165,7 +164,7 @@ class ZipLatest(TrioSection): Defaults to ``False`` :type monitor_input: bool """ - def __init__(self, *sources: Sequence["PipelineSection"], + def __init__(self, *sources: PipelineSection, partial=True, default=None, monitor=(), diff --git a/slurry/sections/abc.py b/slurry/sections/abc.py index 52ec016..f6bfa69 100644 --- a/slurry/sections/abc.py +++ b/slurry/sections/abc.py @@ -1,6 +1,8 @@ """ Abstract Base Classes for building pipeline sections. """ from abc import ABC, abstractmethod -from typing import Any, AsyncIterable, Awaitable, Callable, Iterable, Optional +from typing import Any, AsyncIterable, Awaitable, Callable, Iterable, Optional, Tuple, Union + +PipelineSection = Union["Section", Tuple["PipelineSection", ...]] class Section(ABC): """Defines the basic environment api.""" diff --git a/slurry/sections/weld.py b/slurry/sections/weld.py index 6814a15..784a6a9 100644 --- a/slurry/sections/weld.py +++ b/slurry/sections/weld.py @@ -1,23 +1,22 @@ """Contains the `weld` utility function for composing sections.""" -from typing import Any, AsyncIterable, Optional, Sequence +from typing import Any, AsyncIterable, Optional, cast import trio -from .abc import Section +from .abc import PipelineSection, Section -def weld(nursery, *sections: Sequence["PipelineSection"]) -> AsyncIterable[Any]: +def weld(nursery, *sections: PipelineSection) -> AsyncIterable[Any]: """ Connects the individual parts of a sequence of pipeline sections together and starts pumps for individual Sections. It returns an async iterable which yields results of the sequence. :param nursery: The nursery that runs individual pipeline section pumps. :type nursery: :class:`trio.Nursery` - :param \\*sections: A sequence of pipeline sections. - :type \\*sections: Sequence[PipelineSection] + :param PipelineSection \\*sections: Pipeline sections. """ - async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.MemorySendChannel): + async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.MemorySendChannel[Any]): try: await section.pump(input, output.send) except trio.BrokenResourceError: @@ -43,4 +42,4 @@ async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.Memory output = section section_input = output - return output + return cast(AsyncIterable[Any], output)