Skip to content

Commit

Permalink
Fix typing violations (and referring docs where changed).
Browse files Browse the repository at this point in the history
Plus a couple of typos I happened to notice.
  • Loading branch information
mikenerone committed Jan 4, 2024
1 parent 5a5d7f9 commit f942c5f
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 48 deletions.
2 changes: 1 addition & 1 deletion slurry/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""An async streaming data processing framework."""
__version__ = '1.3.1'

from ._pipeline import Pipeline
from ._pipeline import Pipeline as Pipeline
18 changes: 10 additions & 8 deletions slurry/_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
"""Contains the main Slurry ``Pipeline`` class."""

import math
from typing import AsyncContextManager, Sequence
from typing import Any, AsyncGenerator
from contextlib import asynccontextmanager

import trio
from async_generator import aclosing, asynccontextmanager
from async_generator import aclosing

from .sections.weld import weld
from ._tap import Tap
from ._types import PipelineSection

class Pipeline:
"""The main Slurry ``Pipeline`` class.
Expand All @@ -22,7 +24,7 @@ class Pipeline:
* ``nursery``: The :class:`trio.Nursery` that is executing the pipeline.
"""
def __init__(self, *sections: Sequence["PipelineSection"],
def __init__(self, *sections: PipelineSection,
nursery: trio.Nursery,
enabled: trio.Event):
self.sections = sections
Expand All @@ -32,10 +34,10 @@ def __init__(self, *sections: Sequence["PipelineSection"],

@classmethod
@asynccontextmanager
async def create(cls, *sections: Sequence["PipelineSection"]) -> AsyncContextManager["Pipeline"]:
async def create(cls, *sections: PipelineSection) -> AsyncGenerator["Pipeline", None]:
"""Creates a new pipeline context and adds the given section sequence to it.
:param Sequence[PipelineSection] \\*sections: One or more
:param PipelineSection \\*sections: One or more
:mod:`PipelineSection <slurry.sections.weld>` compatible objects.
"""
async with trio.open_nursery() as nursery:
Expand Down Expand Up @@ -69,7 +71,7 @@ def tap(self, *,
max_buffer_size: int = 0,
timeout: float = math.inf,
retrys: int = 0,
start: bool = True) -> trio.MemoryReceiveChannel:
start: bool = True) -> trio.MemoryReceiveChannel[Any]:
# pylint: disable=line-too-long
"""Create a new output channel for this pipeline.
Expand Down Expand Up @@ -103,7 +105,7 @@ def tap(self, *,
self._enabled.set()
return receive_channel

def extend(self, *sections: Sequence["PipelineSection"], start: bool = False) -> "Pipeline":
def extend(self, *sections: PipelineSection, start: bool = False) -> "Pipeline":
"""Extend this pipeline into a new pipeline.
An extension will add a tap to the existing pipeline and use this tap as input to the
Expand All @@ -112,7 +114,7 @@ def extend(self, *sections: Sequence["PipelineSection"], start: bool = False) ->
Extensions can be added dynamically during runtime. The data feed
will start at the current position. Old events won't be replayed.
:param Sequence[PipelineSection] \\*sections: One or more pipeline sections.
:param PipelineSection \\*sections: One or more pipeline sections.
:param bool start: Start processing when adding this extension. (default: ``False``)
"""
pipeline = Pipeline(
Expand Down
6 changes: 5 additions & 1 deletion slurry/_types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from typing import Awaitable, Protocol, runtime_checkable
from typing import Any, AsyncIterable, Awaitable, Protocol, Tuple, Union, runtime_checkable

from .sections.abc import Section

PipelineSection = Union[AsyncIterable[Any], Section, Tuple["PipelineSection", ...]]

@runtime_checkable
class SupportsAclose(Protocol):
Expand Down
6 changes: 3 additions & 3 deletions slurry/environments/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from ._trio import TrioSection
from ._threading import ThreadSection
from ._multiprocessing import ProcessSection
from ._trio import TrioSection as TrioSection
from ._threading import ThreadSection as ThreadSection
from ._multiprocessing import ProcessSection as ProcessSection
4 changes: 2 additions & 2 deletions slurry/environments/_multiprocessing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Implements a section that runs in an independent python proces."""

from multiprocessing import Process, SimpleQueue
from typing import Any, Iterable, Callable, cast
from typing import AsyncIterable, Any, Awaitable, Callable, Optional, cast

import trio

Expand All @@ -19,7 +19,7 @@ class ProcessSection(SyncSection):
<https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled>`_.
"""

async def pump(self, input: Iterable[Any], output: Callable[[Any], None]):
async def pump(self, input: Optional[AsyncIterable[Any]], output: Callable[[Any], Awaitable[None]]):
"""
The ``ProcessSection`` pump method works similar to the threaded version, however
since communication between processes is not as simple as it is between threads,
Expand Down
6 changes: 3 additions & 3 deletions slurry/environments/_threading.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""The threading module implements a synchronous section that runs in a background thread."""
from typing import Any, Iterable, Callable
from typing import Any, AsyncIterable, Awaitable, Callable, Optional

import trio

Expand All @@ -12,8 +12,8 @@ class ThreadSection(SyncSection):
"""

async def pump(self,
input: Iterable[Any],
output: Callable[[Any], None]):
input: Optional[AsyncIterable[Any]],
output: Callable[[Any], Awaitable[None]]):
"""Runs the refine method in a background thread with synchronous input and output
wrappers, which transparently bridges the input and outputs between the parent
trio event loop and the sync world.
Expand Down
10 changes: 5 additions & 5 deletions slurry/sections/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""A collection of common stream operations."""
from ._buffers import Window, Group, Delay
from ._combiners import Chain, Merge, Zip, ZipLatest
from ._filters import Skip, SkipWhile, Filter, Changes, RateLimit
from ._producers import Repeat, Metronome, InsertValue
from ._refiners import Map
from ._buffers import Window as Window, Group as Group, Delay as Delay
from ._combiners import Chain as Chain, Merge as Merge, Zip as Zip, ZipLatest as ZipLatest
from ._filters import Skip as Skip, SkipWhile as SkipWhile, Filter as Filter, Changes as Changes, RateLimit as RateLimit
from ._producers import Repeat as Repeat, Metronome as Metronome, InsertValue as InsertValue
from ._refiners import Map as Map
4 changes: 2 additions & 2 deletions slurry/sections/_buffers.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ class Group(TrioSection):
:type reducer: Optional[Callable[[Sequence[Any]], Any]]
"""
def __init__(self, interval: float, source: Optional[AsyncIterable[Any]] = None, *,
max_size: int = math.inf,
max_size: Optional[int] = None,
mapper: Optional[Callable[[Any], Any]] = None,
reducer: Optional[Callable[[Sequence[Any]], Any]] = None):
super().__init__()
self.source = source
self.interval = interval
self.max_size = max_size
self.max_size = math.inf if max_size is None else max_size
self.mapper = mapper
self.reducer = reducer

Expand Down
30 changes: 14 additions & 16 deletions slurry/sections/_combiners.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
"""Pipeline sections for combining multiple inputs into a single output."""
import builtins
import itertools
from typing import Any, AsyncIterable, Sequence

import trio
from async_generator import aclosing

from ..environments import TrioSection
from .weld import weld
from .._types import PipelineSection

class Chain(TrioSection):
"""Chains input from one or more sources. Any valid ``PipelineSection`` is an allowed source.
Expand All @@ -21,13 +21,11 @@ class Chain(TrioSection):
By default, the input is added as the first source. If the input is added last instead
of first, it will cause backpressure to be applied upstream.
:param sources: One or more ``PipelineSection`` that will be chained together.
:type sources: Sequence[PipelineSection]
:param place_input: Iteration priority of the pipeline input source. Options:
:param PipelineSection \\*sources: One or more ``PipelineSection`` that will be chained together.
:param string place_input: Iteration priority of the pipeline input source. Options:
``'first'`` (default) \\| ``'last'``.
:type place_input: string
"""
def __init__(self, *sources: Sequence["PipelineSection"], place_input='first'):
def __init__(self, *sources: PipelineSection, place_input='first'):
super().__init__()
self.sources = sources
self.place_input = _validate_place_input(place_input)
Expand Down Expand Up @@ -57,10 +55,10 @@ class Merge(TrioSection):
Sources can be pipeline sections, which will be treated as first sections, with
no input. Merge will take care of running the pump task for these sections.
:param sources: One or more async iterables or sections who's contents will be merged.
:type sources: Sequence[PipelineSection]
:param PipelineSection \\*sources: One or more async iterables or sections whose contents
will be merged.
"""
def __init__(self, *sources: Sequence["PipelineSection"]):
def __init__(self, *sources: PipelineSection):
super().__init__()
self.sources = sources

Expand Down Expand Up @@ -90,13 +88,13 @@ class Zip(TrioSection):
If sources are out of sync, the fastest source will have to wait for the slowest, which
will cause backpressure.
:param sources: One or more ``PipelineSection``, who's contents will be zipped.
:type sources: Sequence[PipelineSection]
:param PipelineSection \\*sources: One or more ``PipelineSection``, whose contents will be
zipped.
:param place_input: Position of the pipeline input source in the output tuple. Options:
``'first'`` (default) \\| ``'last'``.
:type place_input: string
"""
def __init__(self, *sources: Sequence["PipelineSection"], place_input='first'):
def __init__(self, *sources: PipelineSection, place_input='first'):
super().__init__()
self.sources = sources
self.place_input = _validate_place_input(place_input)
Expand Down Expand Up @@ -145,11 +143,11 @@ class ZipLatest(TrioSection):
added as an input.
.. Note::
If any single source is exchausted, all remaining sources will be forcibly closed, and
If any single source is exhausted, all remaining sources will be forcibly closed, and
the pipeline will stop.
:param sources: One or more async iterables that will be zipped together.
:type sources: Sequence[AsyncIterable[Any]]
:param PipelineSection \\*sources: One or more ``PipelineSection`` that will be zipped
together.
:param partial: If ``True`` (default) output will be sent as soon as the first input arrives.
Otherwise, all main sources must send at least one item, before an output is generated.
:type partial: bool
Expand All @@ -165,7 +163,7 @@ class ZipLatest(TrioSection):
Defaults to ``False``
:type monitor_input: bool
"""
def __init__(self, *sources: Sequence["PipelineSection"],
def __init__(self, *sources: PipelineSection,
partial=True,
default=None,
monitor=(),
Expand Down
3 changes: 2 additions & 1 deletion slurry/sections/_producers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Pipeline sections that produce data streams."""
from time import time
from typing import Any
from typing import Any, AsyncIterable, cast

import trio
from async_generator import aclosing
Expand Down Expand Up @@ -94,6 +94,7 @@ async def refine(self, input, output):
# pylint: disable=line-too-long
raise RuntimeError('If Repeat is used as first section, default value must be provided.')

input = cast(AsyncIterable[Any], input)
item = self.default if self.has_default else None

async def pull_task(cancel_scope, *, task_status=trio.TASK_STATUS_IGNORED):
Expand Down
11 changes: 5 additions & 6 deletions slurry/sections/weld.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
"""Contains the `weld` utility function for composing sections."""

from typing import Any, AsyncIterable, Optional, Sequence
from typing import Any, AsyncIterable, Optional, cast

import trio

from .abc import Section
from .._types import SupportsAclose
from .._types import PipelineSection, SupportsAclose

def weld(nursery, *sections: Sequence["PipelineSection"]) -> AsyncIterable[Any]:
def weld(nursery, *sections: PipelineSection) -> AsyncIterable[Any]:
"""
Connects the individual parts of a sequence of pipeline sections together and starts pumps for
individual Sections. It returns an async iterable which yields results of the sequence.
:param nursery: The nursery that runs individual pipeline section pumps.
:type nursery: :class:`trio.Nursery`
:param \\*sections: A sequence of pipeline sections.
:type \\*sections: Sequence[PipelineSection]
:param PipelineSection \\*sections: Pipeline sections.
"""

async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.MemorySendChannel):
Expand Down Expand Up @@ -44,4 +43,4 @@ async def pump(section, input: Optional[AsyncIterable[Any]], output: trio.Memory
output = section
section_input = output

return output
return cast(AsyncIterable[Any], output)

0 comments on commit f942c5f

Please sign in to comment.