Skip to content

Commit

Permalink
reorder data processor code
Browse files Browse the repository at this point in the history
This commit addresses comment:
#484 (comment)
It removes unrendered doc strings and
reorders data processor code so that
the data processor is on top of the
source files. In addtion it adds
`__all__` variables to limit imports
to the objects that constitute the
user-interface.

The commit also adds doc-string
rendering for the module
`datalad_next.runners.data_processors`
This includes markup fixes
  • Loading branch information
christian-monch committed Oct 26, 2023
1 parent 4e0ce6a commit 721775f
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 112 deletions.
16 changes: 15 additions & 1 deletion datalad_next/runners/data_processors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
""" This module contains data processors for the data pipeline processor
Available data processors:
.. currentmodule:: datalad_next.runner.data_processors
.. autosummary::
:toctree: generated
decode
jsonline
pattern
splitlines
"""

from .decode import decode_processor
from .jsonline import jsonline_processor
from .pattern import pattern_processor
from .splitlines import splitlines_processor
from .splitlines import splitlines_processor
59 changes: 29 additions & 30 deletions datalad_next/runners/data_processors/decode.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
""" This module contains data processors for the data pipeline processor
""" Data processor that decodes bytes into strings """

The data processors contained here are:
- decode_utf8_processor
"""
from __future__ import annotations

from typing import Callable
Expand All @@ -15,6 +10,32 @@
)


__all__ = ['decode_processor']


def decode_processor(encoding: str = 'utf-8') -> Callable:
""" create a data processor that decodes a byte-stream
The created data processor will decode byte-streams, even if the encoding
is split at chunk borders.
If an encoding error occurs on the final data chunk, the un-decodable bytes
will be replaced with their escaped hex-values, i.e. ``\\xHH``,
for hex-value HH.
Parameters
----------
encoding: str
The name of encoding that should be decoded.
Returns
-------
Callable
A data processor that can be used in a processing pipeline to decode
chunks of bytes. The result are chunks of strings.
"""
return _DecodeProcessor(encoding)


class _DecodeProcessor:
""" Decode a byte-stream, even if the encoding is split at chunk borders
Expand All @@ -35,7 +56,7 @@ def __call__(self, data_chunks: BytesList,
) -> tuple[StrList, BytesList]:
""" The data processor interface
This allows instances of :class:`DecodeProcessor` to be used as
This allows instances of :class:``DecodeProcessor`` to be used as
data processor in pipeline definitions.
Parameters
Expand All @@ -45,7 +66,7 @@ def __call__(self, data_chunks: BytesList,
final : bool
the data chunks are the final data chunks of the source. If an
encoding error happens, the offending bytes will be replaced with
their escaped hex-values, i.e. `\\xHH`, for hex-value HH.
their escaped hex-values, i.e. ``\\xHH``, for hex-value HH.
Returns
-------
Expand All @@ -62,25 +83,3 @@ def __call__(self, data_chunks: BytesList,
else:
return [], data_chunks
return [text], []


def decode_processor(encoding: str = 'utf-8') -> Callable:
""" create a data processor that decodes a byte-stream
The created data processor will decode byte-streams, even if the encoding
is split at chunk borders.
If an encoding error occurs, the un-decodable bytes will be replaced with
their escaped hex-values, i.e. `\\xHH`, for hex-value HH.
Parameters
----------
encoding: str
The name of encoding that should be decoded.
Returns
-------
Callable
A data processor that can be used in a processing pipeline to decode
chunks of bytes. The result are chunks of strings.
"""
return _DecodeProcessor(encoding)
42 changes: 23 additions & 19 deletions datalad_next/runners/data_processors/jsonline.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
""" This module contains data processors for the data pipeline processor
""" Data processor that generates JSON objects from lines of bytes or strings """

The data processors contained here are:
- jsonline_processor
"""
from __future__ import annotations

import json
Expand All @@ -16,21 +11,30 @@
def jsonline_processor(lines: StrOrBytesList,
_: bool = False
) -> tuple[list[tuple[bool, Any]], StrOrBytesList]:
"""
A processor that converts lines into JSON objects, if possible.
""" A data processor that converts lines into JSON objects, if possible.
Parameters
----------
lines: StrOrBytesList
A list containing strings or byte-strings that that hold JSON-serialized
data.
Returns: tuple[list[Tuple[bool, StrOrBytes]], StrOrByteList]
The result, i.e. the first element of the result tuple, is a list that
contains one tuple for each element of `lines`. The first element of the
tuple is a bool that indicates whether the line could be converted. If it
was successfully converted the value is `True`. The second element is the
Python structure that resulted from the conversion if the first element
was `True`. If the first element is `False`, the second element contains
the input that could not be converted.
A list containing strings or byte-strings that that hold JSON-serialized
data.
_: bool
The ``final`` parameter is ignored because lines are assumed to be
complete and the conversion takes place for every line. Consequently,
no remaining input data exists, and there is no need for "flushing" in
a final round.
Returns
-------
tuple[list[Tuple[bool, StrOrBytes]], StrOrByteList]
The result, i.e. the first element of the result tuple, is a list that
contains one tuple for each element of ``lines``. The first element of the
tuple is a bool that indicates whether the line could be converted. If it
was successfully converted the value is ``True``. The second element is the
Python structure that resulted from the conversion if the first element
was ``True``. If the first element is ``False``, the second element contains
the input that could not be converted.
"""
result = []
for line in lines:
Expand Down
52 changes: 25 additions & 27 deletions datalad_next/runners/data_processors/pattern.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
""" This module contains data processors for the data pipeline processor
""" Data processor that ensure that a pattern odes not cross data chunk borders """

The data processors contained here are:
- pattern_processor
"""
from __future__ import annotations

from functools import partial
Expand All @@ -16,18 +11,40 @@
)


__all__ = ['pattern_processor']


def pattern_processor(pattern: StrOrBytes) -> Callable:
""" Create a pattern processor for the given ``pattern``.
A pattern processor re-assembles data chunks in such a way, that a single
data chunk could contain the complete pattern and will contain the complete
pattern, if the complete pattern start in the data chunk. It guarantees:
1. All chunks have at minimum the size of the pattern
2. If a complete pattern exists, it will be contained completely within a
single chunk, i.e. it will NOT be the case that a prefix of the pattern
is at the end of a chunk, and the rest of the pattern in the beginning
of the next chunk
The pattern might be present multiple times in a data chunk.
"""
assert len(pattern) > 0
return partial(_pattern_processor, pattern)


def _pattern_processor(pattern: StrOrBytes,
data_chunks: StrOrBytesList,
final: bool = False,
) -> tuple[StrOrBytesList, StrOrBytesList]:
""" Ensure that pattern is completely within a chunk,
""" Ensure that ``pattern`` appears only completely contained within a chunk
This processor ensures that a given data pattern (if it exists in the data
chunks) is either completely contained in a chunk or not in the chunk. That
means the processor ensures that all data chunks have at least the length of
the data pattern and that they do not end with a prefix of the data pattern.
As a result, a simple `pattern in data_chunk` test is sufficient to
As a result, a simple ``pattern in data_chunk`` test is sufficient to
determine whether a pattern appears in the data stream.
To use this function as a data processor, use partial to "fix" the first
Expand Down Expand Up @@ -81,22 +98,3 @@ def ends_with_pattern_prefix(data: StrOrBytes, pattern: StrOrBytes) -> bool:
if ends_with_pattern_prefix(data_chunks[-1], pattern):
return data_chunks[:-1], data_chunks[-1:]
return data_chunks, []


def pattern_processor(pattern: StrOrBytes) -> Callable:
""" Give out data chunks that contain a complete pattern, if it is present
This processor re-assembles data chunks in such a way, that a single
data chunk could contain the complete pattern and will contain the complete
pattern, if the complete pattern start in the data chunk. It guarantees:
1. All chunks have at minimum the size of the pattern
2. If a complete pattern exists, it will be contained completely within a
single chunk, i.e. it will NOT be the case that a prefix of the pattern
is at the end of a chunk, and the rest of the pattern in the beginning
of the next chunk
The pattern might be present multiple times in a data chunk.
"""
assert len(pattern) > 0
return partial(_pattern_processor, pattern)
66 changes: 31 additions & 35 deletions datalad_next/runners/data_processors/splitlines.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
""" This module contains data processors for the data pipeline processor
The data processors contained here are:
- splitlines_processor
"""
from __future__ import annotations

from functools import partial
Expand All @@ -16,6 +9,37 @@
)


__all__ = ['splitlines_processor']


def splitlines_processor(
separator: StrOrBytes | None = None,
keep_ends: bool = True
) -> Callable[[StrOrBytesList, bool], tuple[StrOrBytesList, StrOrBytesList]]:
""" Generate a data processor the splits character- or byte-strings into lines
This function returns a data processor, that splits lines either on a given
separator, if 'separator' is not ``None``, or on one of the known line endings,
if 'separator' is ``None``. If ``separator`` is ``None``, the line endings are
determined by python.
Parameters
----------
separator: Optional[str]
If not None, the provided separator will be used to split lines.
keep_ends: bool
If True, the separator will be contained in the returned lines.
Returns
-------
Callable
A data processor that takes a list of strings or bytes, and returns
a list of strings or bytes, where every element is a single line (as
defined by the ``separator`` and ``keep_ends`` argument).
"""
return partial(_splitlines_processor, separator, keep_ends)


# We don't use LineSplitter here because it has two "problems". Firstly, it does
# not support `bytes`. Secondly, it can not be properly re-used because it does
# not delete its internal storage when calling `LineSplitter.finish_processing`.
Expand Down Expand Up @@ -78,31 +102,3 @@ def _splitlines_processor(separator: StrOrBytes | None,
if final:
result = result[0].extend(result[1]), []
return result


def splitlines_processor(separator: StrOrBytes | None = None,
keep_ends: bool = True
) -> Callable:
""" A data processor the splits character-strings or byte-strings into lines
Split lines either on a given separator, if 'separator' is not `None`,
or on one of the known line endings, if 'separator' is `None`. The line
endings are determined by python
Parameters
----------
separator: Optional[str]
If not None, the provided separator will be used to split lines.
keep_ends: bool
If True, the separator will be contained in the returned lines.
Returns
-------
list[str | bytes]
if the input data chunks contained bytes the result will be a list of
byte-strings that end with byte-size line-delimiters. If the input data
chunks contained strings, the result will be a list strings that end with
string delimiters (see Python-documentation for a definition of string
line delimiters).
"""
return partial(_splitlines_processor, separator, keep_ends)

0 comments on commit 721775f

Please sign in to comment.