Merge pull request andersea#15 from mikenerone/mikenerone/typing-fixes
General typing fixes
andersea authored Jan 17, 2024
2 parents 3333520 + 1f10f4b commit bdb467a
Showing 14 changed files with 55 additions and 54 deletions.
14 changes: 7 additions & 7 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -25,7 +25,7 @@ classifiers = [
[tool.poetry.dependencies]
async-generator = "^1.10"
python = "^3.8"
trio = "^0.23.0"
trio = ">=0.23.0"

[tool.poetry.dev-dependencies]
pytest = "^7.2.0"
2 changes: 1 addition & 1 deletion slurry/__init__.py
@@ -1,4 +1,4 @@
"""An async streaming data processing framework."""
__version__ = '1.3.1'

from ._pipeline import Pipeline
from ._pipeline import Pipeline as Pipeline
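The ``import Pipeline as Pipeline`` spelling is the conventional marker for an explicit re-export: under mypy's ``--no-implicit-reexport`` option (enabled by ``--strict``), a name that is merely imported into a package ``__init__`` is treated as private to that module, and strict type checkers flag downstream imports of it. A minimal sketch of the idiom, using a made-up package name:

# mypkg/__init__.py -- hypothetical package, shown only to illustrate the idiom
from ._impl import Engine as Engine   # "as Engine" marks an explicit re-export for type checkers
# Declaring the name in __all__ is an equivalent alternative:
# __all__ = ["Engine"]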
15 changes: 8 additions & 7 deletions slurry/_pipeline.py
@@ -1,13 +1,14 @@
"""Contains the main Slurry ``Pipeline`` class."""

import math
from typing import AsyncContextManager, Sequence
from typing import Any, AsyncGenerator

import trio
from async_generator import aclosing, asynccontextmanager

from .sections.weld import weld
from ._tap import Tap
from .sections.abc import PipelineSection

class Pipeline:
"""The main Slurry ``Pipeline`` class.
@@ -22,7 +23,7 @@ class Pipeline:
* ``nursery``: The :class:`trio.Nursery` that is executing the pipeline.
"""
def __init__(self, *sections: Sequence["PipelineSection"],
def __init__(self, *sections: PipelineSection,
nursery: trio.Nursery,
enabled: trio.Event):
self.sections = sections
@@ -32,10 +33,10 @@ def __init__(self, *sections: Sequence["PipelineSection"],

@classmethod
@asynccontextmanager
async def create(cls, *sections: Sequence["PipelineSection"]) -> AsyncContextManager["Pipeline"]:
async def create(cls, *sections: PipelineSection) -> AsyncGenerator["Pipeline", None]:
"""Creates a new pipeline context and adds the given section sequence to it.
:param Sequence[PipelineSection] \\*sections: One or more
:param PipelineSection \\*sections: One or more
:mod:`PipelineSection <slurry.sections.weld>` compatible objects.
"""
async with trio.open_nursery() as nursery:
@@ -69,7 +70,7 @@ def tap(self, *,
max_buffer_size: int = 0,
timeout: float = math.inf,
retrys: int = 0,
start: bool = True) -> trio.MemoryReceiveChannel:
start: bool = True) -> trio.MemoryReceiveChannel[Any]:
# pylint: disable=line-too-long
"""Create a new output channel for this pipeline.
@@ -103,7 +104,7 @@ self._enabled.set()
self._enabled.set()
return receive_channel

def extend(self, *sections: Sequence["PipelineSection"], start: bool = False) -> "Pipeline":
def extend(self, *sections: PipelineSection, start: bool = False) -> "Pipeline":
"""Extend this pipeline into a new pipeline.
An extension will add a tap to the existing pipeline and use this tap as input to the
@@ -112,7 +113,7 @@ def extend(self, *sections: Sequence["PipelineSection"], start: bool = False) ->
Extensions can be added dynamically during runtime. The data feed
will start at the current position. Old events won't be replayed.
:param Sequence[PipelineSection] \\*sections: One or more pipeline sections.
:param PipelineSection \\*sections: One or more pipeline sections.
:param bool start: Start processing when adding this extension. (default: ``False``)
"""
pipeline = Pipeline(
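Two typing conventions drive the ``_pipeline.py`` changes. With a ``*sections`` parameter, the annotation describes each individual positional argument (the parameter itself is already a tuple), so the old ``Sequence["PipelineSection"]`` claimed every argument was itself a sequence. And a function decorated with ``@asynccontextmanager`` is annotated as the async generator it really is; the decorator is what produces the context manager, so ``AsyncGenerator["Pipeline", None]`` is the matching return type. A small self-contained sketch of both idioms (illustrative names, and the stdlib ``contextlib`` decorator rather than the ``async_generator`` one the library imports):

from contextlib import asynccontextmanager
from typing import AsyncGenerator

import trio

class Widget:
    """Stand-in for the real Pipeline object."""

@asynccontextmanager
async def create(*parts: str) -> AsyncGenerator[Widget, None]:
    # Each positional argument is checked against ``str``; the undecorated
    # function is an async generator yielding one Widget, hence the
    # AsyncGenerator[Widget, None] annotation.
    async with trio.open_nursery():
        yield Widget()

async def main() -> None:
    async with create("a", "b") as widget:
        print(widget)

trio.run(main)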
2 changes: 1 addition & 1 deletion slurry/_tap.py
@@ -12,7 +12,7 @@ class Tap:
:meth:`slurry.pipeline.Pipeline.tap`.
:param send_channel: The output to which items are sent.
:type send_channel: trio.MemorySendChannel
:type send_channel: trio.MemorySendChannel[Any]
:param timeout: Seconds to wait for receiver to respond.
:type timeout: float
:param retrys: Number of times to reattempt a send that timed out.
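The updated ``Tap`` docstring reflects that trio's memory channel classes are generic in the item type in type-annotated trio releases (which the loosened ``trio >= 0.23`` pin above is assumed to provide). A minimal sketch of annotating a channel pair:

from typing import Any

import trio

async def main() -> None:
    # Local variable annotations give the checker the channels' item type;
    # they are not evaluated at runtime, so older trio versions tolerate them.
    send: trio.MemorySendChannel[Any]
    receive: trio.MemoryReceiveChannel[Any]
    send, receive = trio.open_memory_channel(1)

    async with send, receive:
        await send.send("item")
        print(await receive.receive())

trio.run(main)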
6 changes: 3 additions & 3 deletions slurry/environments/__init__.py
@@ -1,3 +1,3 @@
from ._trio import TrioSection
from ._threading import ThreadSection
from ._multiprocessing import ProcessSection
from ._trio import TrioSection as TrioSection
from ._threading import ThreadSection as ThreadSection
from ._multiprocessing import ProcessSection as ProcessSection
4 changes: 2 additions & 2 deletions slurry/environments/_multiprocessing.py
@@ -1,7 +1,7 @@
"""Implements a section that runs in an independent python proces."""

from multiprocessing import Process, SimpleQueue
from typing import Any, Iterable, Callable
from typing import Any, AsyncIterable, Awaitable, Callable, Optional

import trio

@@ -19,7 +19,7 @@ class ProcessSection(SyncSection):
<https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled>`_.
"""

async def pump(self, input: Iterable[Any], output: Callable[[Any], None]):
async def pump(self, input: Optional[AsyncIterable[Any]], output: Callable[[Any], Awaitable[None]]):
"""
The ``ProcessSection`` pump method works similar to the threaded version, however
since communication between processes is not as simple as it is between threads,
6 changes: 3 additions & 3 deletions slurry/environments/_threading.py
@@ -1,5 +1,5 @@
"""The threading module implements a synchronous section that runs in a background thread."""
from typing import Any, Iterable, Callable
from typing import Any, AsyncIterable, Awaitable, Callable, Optional

import trio

@@ -12,8 +12,8 @@ class ThreadSection(SyncSection):
"""

async def pump(self,
input: Iterable[Any],
output: Callable[[Any], None]):
input: Optional[AsyncIterable[Any]],
output: Callable[[Any], Awaitable[None]]):
"""Runs the refine method in a background thread with synchronous input and output
wrappers, which transparently bridges the input and outputs between the parent
trio event loop and the sync world.
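The corrected ``pump`` signatures in the threading and multiprocessing environments match what the pipeline machinery actually supplies: ``input`` is an upstream async iterable (or ``None`` when the section is first and has no input), and ``output`` is an async callable such as a memory channel's ``send`` method, i.e. ``Callable[[Any], Awaitable[None]]``. A hedged, self-contained sketch of a coroutine with that shape (not the library's implementation):

from typing import Any, AsyncIterable, Awaitable, Callable, Optional

import trio

async def forward(input: Optional[AsyncIterable[Any]],
                  output: Callable[[Any], Awaitable[None]]) -> None:
    # Pump-shaped coroutine: awaiting the output callable applies backpressure.
    if input is not None:
        async for item in input:
            await output(item)

async def numbers() -> AsyncIterable[int]:
    for i in range(3):
        yield i

async def main() -> None:
    send, receive = trio.open_memory_channel(0)

    async def run_pump() -> None:
        async with send:
            # ``send.send`` is an async method, so it satisfies
            # Callable[[Any], Awaitable[None]].
            await forward(numbers(), send.send)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(run_pump)
        async with receive:
            async for item in receive:
                print(item)

trio.run(main)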
10 changes: 5 additions & 5 deletions slurry/sections/__init__.py
@@ -1,6 +1,6 @@
"""A collection of common stream operations."""
from ._buffers import Window, Group, Delay
from ._combiners import Chain, Merge, Zip, ZipLatest
from ._filters import Skip, SkipWhile, Filter, Changes, RateLimit
from ._producers import Repeat, Metronome, InsertValue
from ._refiners import Map
from ._buffers import Window as Window, Group as Group, Delay as Delay
from ._combiners import Chain as Chain, Merge as Merge, Zip as Zip, ZipLatest as ZipLatest
from ._filters import Skip as Skip, SkipWhile as SkipWhile, Filter as Filter, Changes as Changes, RateLimit as RateLimit
from ._producers import Repeat as Repeat, Metronome as Metronome, InsertValue as InsertValue
from ._refiners import Map as Map
2 changes: 1 addition & 1 deletion slurry/sections/_buffers.py
@@ -90,7 +90,7 @@ class Group(TrioSection):
:type reducer: Optional[Callable[[Sequence[Any]], Any]]
"""
def __init__(self, interval: float, source: Optional[AsyncIterable[Any]] = None, *,
max_size: int = math.inf,
max_size: float = math.inf,
mapper: Optional[Callable[[Any], Any]] = None,
reducer: Optional[Callable[[Sequence[Any]], Any]] = None):
super().__init__()
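The ``max_size`` annotation fix reflects that ``math.inf`` is a ``float``, so the default value never matched the old ``int`` annotation; under PEP 484's numeric-tower rule an ``int`` argument is still accepted where a ``float`` is expected, so existing callers keep working. A tiny illustration with a hypothetical function:

import math

def take_batch(max_size: float = math.inf) -> float:
    # The default is the float sentinel math.inf; callers may still pass an
    # int such as 3, which type checkers accept for a float parameter.
    return min(max_size, 100)

print(take_batch())    # 100 -- unbounded default
print(take_batch(3))   # 3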
27 changes: 13 additions & 14 deletions slurry/sections/_combiners.py
@@ -1,12 +1,12 @@
"""Pipeline sections for combining multiple inputs into a single output."""
import builtins
import itertools
from typing import Any, AsyncIterable, Sequence

import trio
from async_generator import aclosing

from ..environments import TrioSection
from .abc import PipelineSection
from .weld import weld

class Chain(TrioSection):
@@ -21,13 +21,12 @@ class Chain(TrioSection):
By default, the input is added as the first source. If the input is added last instead
of first, it will cause backpressure to be applied upstream.
:param sources: One or more ``PipelineSection`` that will be chained together.
:type sources: Sequence[PipelineSection]
:param PipelineSection \\*sources: One or more ``PipelineSection`` that will be chained together.
:param place_input: Iteration priority of the pipeline input source. Options:
``'first'`` (default) \\| ``'last'``.
:type place_input: string
"""
def __init__(self, *sources: Sequence["PipelineSection"], place_input='first'):
def __init__(self, *sources: PipelineSection, place_input: str = 'first'):
super().__init__()
self.sources = sources
self.place_input = _validate_place_input(place_input)
@@ -57,10 +56,10 @@ class Merge(TrioSection):
Sources can be pipeline sections, which will be treated as first sections, with
no input. Merge will take care of running the pump task for these sections.
:param sources: One or more async iterables or sections who's contents will be merged.
:type sources: Sequence[PipelineSection]
:param PipelineSection \\*sources: One or more async iterables or sections whose contents
will be merged.
"""
def __init__(self, *sources: Sequence["PipelineSection"]):
def __init__(self, *sources: PipelineSection):
super().__init__()
self.sources = sources

@@ -90,13 +89,13 @@ class Zip(TrioSection):
If sources are out of sync, the fastest source will have to wait for the slowest, which
will cause backpressure.
:param sources: One or more ``PipelineSection``, who's contents will be zipped.
:type sources: Sequence[PipelineSection]
:param PipelineSection \\*sources: One or more ``PipelineSection``, whose contents will be
zipped.
:param place_input: Position of the pipeline input source in the output tuple. Options:
``'first'`` (default) \\| ``'last'``.
:type place_input: string
"""
def __init__(self, *sources: Sequence["PipelineSection"], place_input='first'):
def __init__(self, *sources: PipelineSection, place_input: str = 'first'):
super().__init__()
self.sources = sources
self.place_input = _validate_place_input(place_input)
@@ -145,11 +144,11 @@ class ZipLatest(TrioSection):
added as an input.
.. Note::
If any single source is exchausted, all remaining sources will be forcibly closed, and
If any single source is exhausted, all remaining sources will be forcibly closed, and
the pipeline will stop.
:param sources: One or more async iterables that will be zipped together.
:type sources: Sequence[AsyncIterable[Any]]
:param PipelineSection \\*sources: One or more ``PipelineSection`` that will be zipped
together.
:param partial: If ``True`` (default) output will be sent as soon as the first input arrives.
Otherwise, all main sources must send at least one item, before an output is generated.
:type partial: bool
Expand All @@ -165,7 +164,7 @@ class ZipLatest(TrioSection):
Defaults to ``False``
:type monitor_input: bool
"""
def __init__(self, *sources: Sequence["PipelineSection"],
def __init__(self, *sources: PipelineSection,
partial=True,
default=None,
monitor=(),
2 changes: 1 addition & 1 deletion slurry/sections/_producers.py
@@ -66,7 +66,7 @@ class Metronome(TrioSection):
If used as a middle section, the input can be used to set the value that is sent. When
an input is received, it is stored and send at the next tick of the clock. If multiple
inputs are received during a tick, only the latest is sent. The preceeding inputs are
inputs are received during a tick, only the latest is sent. The preceding inputs are
dropped.
When an input is used, closure of the input stream will cause the metronome to close as well.
4 changes: 3 additions & 1 deletion slurry/sections/abc.py
@@ -1,6 +1,8 @@
""" Abstract Base Classes for building pipeline sections. """
from abc import ABC, abstractmethod
from typing import Any, AsyncIterable, Awaitable, Callable, Iterable, Optional
from typing import Any, AsyncIterable, Awaitable, Callable, Iterable, Optional, Tuple, Union

PipelineSection = Union["Section", Tuple["PipelineSection", ...]]

class Section(ABC):
"""Defines the basic environment api."""
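The new ``PipelineSection`` alias is recursive: a pipeline section is either a ``Section`` or a tuple of pipeline sections, which is what lets ``*sections: PipelineSection`` accept nested tuples for composition. A sketch of how the alias reads, with a simplified stand-in class:

from typing import Tuple, Union

class Section:
    """Stand-in for slurry.sections.abc.Section."""

# A section, or an arbitrarily nested tuple of sections.
PipelineSection = Union[Section, Tuple["PipelineSection", ...]]

def describe(section: PipelineSection, depth: int = 0) -> None:
    # Walk the recursive structure of nested tuples of sections.
    if isinstance(section, tuple):
        for inner in section:
            describe(inner, depth + 1)
    else:
        print("  " * depth + type(section).__name__)

describe((Section(), (Section(), Section())))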
13 changes: 6 additions & 7 deletions slurry/sections/weld.py
@@ -1,23 +1,22 @@
"""Contains the `weld` utility function for composing sections."""

from typing import Any, AsyncIterable, Optional, Sequence
from typing import Any, AsyncIterable, Optional, cast

import trio

from .abc import Section
from .abc import PipelineSection, Section

def weld(nursery, *sections: Sequence["PipelineSection"]) -> AsyncIterable[Any]:
def weld(nursery, *sections: PipelineSection) -> AsyncIterable[Any]:
"""
Connects the individual parts of a sequence of pipeline sections together and starts pumps for
individual Sections. It returns an async iterable which yields results of the sequence.
:param nursery: The nursery that runs individual pipeline section pumps.
:type nursery: :class:`trio.Nursery`
:param \\*sections: A sequence of pipeline sections.
:type \\*sections: Sequence[PipelineSection]
:param PipelineSection \\*sections: Pipeline sections.
"""

async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.MemorySendChannel):
async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.MemorySendChannel[Any]):
try:
await section.pump(input, output.send)
except trio.BrokenResourceError:
@@ -43,4 +42,4 @@ async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.Memory
output = section
section_input = output

return output
return cast(AsyncIterable[Any], output)
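The added ``typing.cast`` narrows the result for the type checker: inside ``weld`` the rolling ``output`` variable cannot be proven to end up as an async iterable, while the function's signature promises ``AsyncIterable[Any]``. ``cast`` is a purely static assertion with no runtime effect. A short sketch of the idiom with illustrative names:

from typing import Any, AsyncIterable, Union, cast

async def numbers() -> AsyncIterable[int]:
    yield 1

def pick_output(candidate: Union[str, AsyncIterable[Any]]) -> AsyncIterable[Any]:
    # Suppose surrounding logic guarantees ``candidate`` is an async iterable
    # by this point; cast() records that fact for the checker and does
    # nothing at runtime.
    return cast(AsyncIterable[Any], candidate)

stream = pick_output(numbers())
print(stream)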
