Skip to content

Commit

Permalink
Typing fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mikenerone committed Jan 17, 2024
1 parent e88c5ec commit 1f10f4b
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 35 deletions.
15 changes: 8 additions & 7 deletions slurry/_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""Contains the main Slurry ``Pipeline`` class."""

import math
from typing import AsyncContextManager, Sequence
from typing import Any, AsyncGenerator

import trio
from async_generator import aclosing, asynccontextmanager

from .sections.weld import weld
from ._tap import Tap
from .sections.abc import PipelineSection

class Pipeline:
"""The main Slurry ``Pipeline`` class.
Expand All @@ -22,7 +23,7 @@ class Pipeline:
* ``nursery``: The :class:`trio.Nursery` that is executing the pipeline.
"""
def __init__(self, *sections: Sequence["PipelineSection"],
def __init__(self, *sections: PipelineSection,
nursery: trio.Nursery,
enabled: trio.Event):
self.sections = sections
Expand All @@ -32,10 +33,10 @@ def __init__(self, *sections: Sequence["PipelineSection"],

@classmethod
@asynccontextmanager
async def create(cls, *sections: Sequence["PipelineSection"]) -> AsyncContextManager["Pipeline"]:
async def create(cls, *sections: PipelineSection) -> AsyncGenerator["Pipeline", None]:
"""Creates a new pipeline context and adds the given section sequence to it.
:param Sequence[PipelineSection] \\*sections: One or more
:param PipelineSection \\*sections: One or more
:mod:`PipelineSection <slurry.sections.weld>` compatible objects.
"""
async with trio.open_nursery() as nursery:
Expand Down Expand Up @@ -69,7 +70,7 @@ def tap(self, *,
max_buffer_size: int = 0,
timeout: float = math.inf,
retrys: int = 0,
start: bool = True) -> trio.MemoryReceiveChannel:
start: bool = True) -> trio.MemoryReceiveChannel[Any]:
# pylint: disable=line-too-long
"""Create a new output channel for this pipeline.
Expand Down Expand Up @@ -103,7 +104,7 @@ def tap(self, *,
self._enabled.set()
return receive_channel

def extend(self, *sections: Sequence["PipelineSection"], start: bool = False) -> "Pipeline":
def extend(self, *sections: PipelineSection, start: bool = False) -> "Pipeline":
"""Extend this pipeline into a new pipeline.
An extension will add a tap to the existing pipeline and use this tap as input to the
Expand All @@ -112,7 +113,7 @@ def extend(self, *sections: Sequence["PipelineSection"], start: bool = False) ->
Extensions can be added dynamically during runtime. The data feed
will start at the current position. Old events won't be replayed.
:param Sequence[PipelineSection] \\*sections: One or more pipeline sections.
:param PipelineSection \\*sections: One or more pipeline sections.
:param bool start: Start processing when adding this extension. (default: ``False``)
"""
pipeline = Pipeline(
Expand Down
2 changes: 1 addition & 1 deletion slurry/_tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Tap:
:meth:`slurry.pipeline.Pipeline.tap`.
:param send_channel: The output to which items are sent.
:type send_channel: trio.MemorySendChannel
:type send_channel: trio.MemorySendChannel[Any]
:param timeout: Seconds to wait for receiver to respond.
:type timeout: float
:param retrys: Number of times to reattempt a send that timed out.
Expand Down
4 changes: 2 additions & 2 deletions slurry/environments/_multiprocessing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Implements a section that runs in an independent python proces."""

from multiprocessing import Process, SimpleQueue
from typing import Any, Iterable, Callable
from typing import Any, AsyncIterable, Awaitable, Callable, Optional

import trio

Expand All @@ -19,7 +19,7 @@ class ProcessSection(SyncSection):
<https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled>`_.
"""

async def pump(self, input: Iterable[Any], output: Callable[[Any], None]):
async def pump(self, input: Optional[AsyncIterable[Any]], output: Callable[[Any], Awaitable[None]]):
"""
The ``ProcessSection`` pump method works similar to the threaded version, however
since communication between processes is not as simple as it is between threads,
Expand Down
6 changes: 3 additions & 3 deletions slurry/environments/_threading.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""The threading module implements a synchronous section that runs in a background thread."""
from typing import Any, Iterable, Callable
from typing import Any, AsyncIterable, Awaitable, Callable, Optional

import trio

Expand All @@ -12,8 +12,8 @@ class ThreadSection(SyncSection):
"""

async def pump(self,
input: Iterable[Any],
output: Callable[[Any], None]):
input: Optional[AsyncIterable[Any]],
output: Callable[[Any], Awaitable[None]]):
"""Runs the refine method in a background thread with synchronous input and output
wrappers, which transparently bridges the input and outputs between the parent
trio event loop and the sync world.
Expand Down
2 changes: 1 addition & 1 deletion slurry/sections/_buffers.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Group(TrioSection):
:type reducer: Optional[Callable[[Sequence[Any]], Any]]
"""
def __init__(self, interval: float, source: Optional[AsyncIterable[Any]] = None, *,
max_size: int = math.inf,
max_size: float = math.inf,
mapper: Optional[Callable[[Any], Any]] = None,
reducer: Optional[Callable[[Sequence[Any]], Any]] = None):
super().__init__()
Expand Down
25 changes: 12 additions & 13 deletions slurry/sections/_combiners.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""Pipeline sections for combining multiple inputs into a single output."""
import builtins
import itertools
from typing import Any, AsyncIterable, Sequence

import trio
from async_generator import aclosing

from ..environments import TrioSection
from .abc import PipelineSection
from .weld import weld

class Chain(TrioSection):
Expand All @@ -21,13 +21,12 @@ class Chain(TrioSection):
By default, the input is added as the first source. If the input is added last instead
of first, it will cause backpressure to be applied upstream.
:param sources: One or more ``PipelineSection`` that will be chained together.
:type sources: Sequence[PipelineSection]
:param PipelineSection \\*sources: One or more ``PipelineSection`` that will be chained together.
:param place_input: Iteration priority of the pipeline input source. Options:
``'first'`` (default) \\| ``'last'``.
:type place_input: string
"""
def __init__(self, *sources: Sequence["PipelineSection"], place_input='first'):
def __init__(self, *sources: PipelineSection, place_input: str = 'first'):
super().__init__()
self.sources = sources
self.place_input = _validate_place_input(place_input)
Expand Down Expand Up @@ -57,10 +56,10 @@ class Merge(TrioSection):
Sources can be pipeline sections, which will be treated as first sections, with
no input. Merge will take care of running the pump task for these sections.
:param sources: One or more async iterables or sections who's contents will be merged.
:type sources: Sequence[PipelineSection]
:param PipelineSection \\*sources: One or more async iterables or sections whose contents
will be merged.
"""
def __init__(self, *sources: Sequence["PipelineSection"]):
def __init__(self, *sources: PipelineSection):
super().__init__()
self.sources = sources

Expand Down Expand Up @@ -90,13 +89,13 @@ class Zip(TrioSection):
If sources are out of sync, the fastest source will have to wait for the slowest, which
will cause backpressure.
:param sources: One or more ``PipelineSection``, who's contents will be zipped.
:type sources: Sequence[PipelineSection]
:param PipelineSection \\*sources: One or more ``PipelineSection``, whose contents will be
zipped.
:param place_input: Position of the pipeline input source in the output tuple. Options:
``'first'`` (default) \\| ``'last'``.
:type place_input: string
"""
def __init__(self, *sources: Sequence["PipelineSection"], place_input='first'):
def __init__(self, *sources: PipelineSection, place_input: str = 'first'):
super().__init__()
self.sources = sources
self.place_input = _validate_place_input(place_input)
Expand Down Expand Up @@ -148,8 +147,8 @@ class ZipLatest(TrioSection):
If any single source is exhausted, all remaining sources will be forcibly closed, and
the pipeline will stop.
:param sources: One or more async iterables that will be zipped together.
:type sources: Sequence[AsyncIterable[Any]]
:param PipelineSection \\*sources: One or more ``PipelineSection`` that will be zipped
together.
:param partial: If ``True`` (default) output will be sent as soon as the first input arrives.
Otherwise, all main sources must send at least one item, before an output is generated.
:type partial: bool
Expand All @@ -165,7 +164,7 @@ class ZipLatest(TrioSection):
Defaults to ``False``
:type monitor_input: bool
"""
def __init__(self, *sources: Sequence["PipelineSection"],
def __init__(self, *sources: PipelineSection,
partial=True,
default=None,
monitor=(),
Expand Down
4 changes: 3 additions & 1 deletion slurry/sections/abc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
""" Abstract Base Classes for building pipeline sections. """
from abc import ABC, abstractmethod
from typing import Any, AsyncIterable, Awaitable, Callable, Iterable, Optional
from typing import Any, AsyncIterable, Awaitable, Callable, Iterable, Optional, Tuple, Union

PipelineSection = Union["Section", Tuple["PipelineSection", ...]]

class Section(ABC):
"""Defines the basic environment api."""
Expand Down
13 changes: 6 additions & 7 deletions slurry/sections/weld.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
"""Contains the `weld` utility function for composing sections."""

from typing import Any, AsyncIterable, Optional, Sequence
from typing import Any, AsyncIterable, Optional, cast

import trio

from .abc import Section
from .abc import PipelineSection, Section

def weld(nursery, *sections: Sequence["PipelineSection"]) -> AsyncIterable[Any]:
def weld(nursery, *sections: PipelineSection) -> AsyncIterable[Any]:
"""
Connects the individual parts of a sequence of pipeline sections together and starts pumps for
individual Sections. It returns an async iterable which yields results of the sequence.
:param nursery: The nursery that runs individual pipeline section pumps.
:type nursery: :class:`trio.Nursery`
:param \\*sections: A sequence of pipeline sections.
:type \\*sections: Sequence[PipelineSection]
:param PipelineSection \\*sections: Pipeline sections.
"""

async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.MemorySendChannel):
async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.MemorySendChannel[Any]):
try:
await section.pump(input, output.send)
except trio.BrokenResourceError:
Expand All @@ -43,4 +42,4 @@ async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.Memory
output = section
section_input = output

return output
return cast(AsyncIterable[Any], output)

0 comments on commit 1f10f4b

Please sign in to comment.