diff --git a/datalad_next/runners/data_processors/__init__.py b/datalad_next/runners/data_processors/__init__.py
index 5e2b945b6..59ca72025 100644
--- a/datalad_next/runners/data_processors/__init__.py
+++ b/datalad_next/runners/data_processors/__init__.py
@@ -1,5 +1,19 @@
+""" This module contains data processors for the data pipeline processor
+
+Available data processors:
+
+.. currentmodule:: datalad_next.runners.data_processors
+.. autosummary::
+   :toctree: generated
+
+   decode
+   jsonline
+   pattern
+   splitlines
+
+"""
 from .decode import decode_processor
 from .jsonline import jsonline_processor
 from .pattern import pattern_processor
-from .splitlines import splitlines_processor
\ No newline at end of file
+from .splitlines import splitlines_processor
diff --git a/datalad_next/runners/data_processors/decode.py b/datalad_next/runners/data_processors/decode.py
index b0ee113c7..4bb8ed064 100644
--- a/datalad_next/runners/data_processors/decode.py
+++ b/datalad_next/runners/data_processors/decode.py
@@ -1,10 +1,5 @@
-""" This module contains data processors for the data pipeline processor
+""" Data processor that decodes bytes into strings
 """
-The data processors contained here are:
-
- - decode_utf8_processor
-
-"""
 from __future__ import annotations
 
 from typing import Callable
@@ -15,6 +10,32 @@
 )
 
 
+__all__ = ['decode_processor']
+
+
+def decode_processor(encoding: str = 'utf-8') -> Callable:
+    """ Create a data processor that decodes a byte-stream
+
+    The created data processor will decode byte-streams, even if the encoding
+    is split at chunk borders.
+    If an encoding error occurs on the final data chunk, the un-decodable bytes
+    will be replaced with their escaped hex-values, i.e. ``\\xHH``,
+    for hex-value HH.
+
+    Parameters
+    ----------
+    encoding: str
+        The name of the encoding that should be used for decoding.
+
+    Returns
+    -------
+    Callable
+        A data processor that can be used in a processing pipeline to decode
+        chunks of bytes. The results are chunks of strings.
+    """
+    return _DecodeProcessor(encoding)
+
+
 class _DecodeProcessor:
     """ Decode a byte-stream, even if the encoding is split at chunk borders
 
@@ -35,7 +56,7 @@ def __call__(self, data_chunks: BytesList,
         ) -> tuple[StrList, BytesList]:
         """ The data processor interface
 
-        This allows instances of :class:`DecodeProcessor` to be used as
+        This allows instances of :class:`_DecodeProcessor` to be used as
         data processor in pipeline definitions.
 
         Parameters
@@ -45,7 +66,7 @@ def __call__(self, data_chunks: BytesList,
         final : bool
             the data chunks are the final data chunks of the source. If an
             encoding error happens, the offending bytes will be replaced with
-            their escaped hex-values, i.e. `\\xHH`, for hex-value HH.
+            their escaped hex-values, i.e. ``\\xHH``, for hex-value HH.
 
         Returns
         -------
@@ -62,25 +83,3 @@ def __call__(self, data_chunks: BytesList,
         else:
             return [], data_chunks
         return [text], []
-
-
-def decode_processor(encoding: str = 'utf-8') -> Callable:
-    """ create a data processor that decodes a byte-stream
-
-    The created data processor will decode byte-streams, even if the encoding
-    is split at chunk borders.
-    If an encoding error occurs, the un-decodable bytes will be replaced with
-    their escaped hex-values, i.e. `\\xHH`, for hex-value HH.
-
-    Parameters
-    ----------
-    encoding: str
-        The name of encoding that should be decoded.
-
-    Returns
-    -------
-    Callable
-        A data processor that can be used in a processing pipeline to decode
-        chunks of bytes. The result are chunks of strings.
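[Editorial note: a minimal usage sketch of the relocated ``decode_processor``, illustrative only and not part of the patch. The chunk values are invented, and the exact intermediate results depend on pipeline behavior not visible in this diff.]

    from datalad_next.runners.data_processors import decode_processor

    decode = decode_processor('utf-8')

    # b'caf\xc3' + b'\xa9' decodes to 'café'; the two-byte encoding of 'é'
    # straddles the chunk border and must not be decoded prematurely
    decoded, remaining = decode([b'caf\xc3'])
    # the pipeline feeds held-back bytes in again with the next chunk;
    # final=True marks the last round, where un-decodable leftovers are escaped
    decoded, remaining = decode(remaining + [b'\xa9'], final=True)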
- """ - return _DecodeProcessor(encoding) diff --git a/datalad_next/runners/data_processors/jsonline.py b/datalad_next/runners/data_processors/jsonline.py index 8b1e218da..43aadbba6 100644 --- a/datalad_next/runners/data_processors/jsonline.py +++ b/datalad_next/runners/data_processors/jsonline.py @@ -1,10 +1,5 @@ -""" This module contains data processors for the data pipeline processor +""" Data processor that generates JSON objects from lines of bytes or strings """ -The data processors contained here are: - - - jsonline_processor - -""" from __future__ import annotations import json @@ -16,21 +11,30 @@ def jsonline_processor(lines: StrOrBytesList, _: bool = False ) -> tuple[list[tuple[bool, Any]], StrOrBytesList]: - """ - A processor that converts lines into JSON objects, if possible. + """ A data processor that converts lines into JSON objects, if possible. + Parameters + ---------- lines: StrOrBytesList - A list containing strings or byte-strings that that hold JSON-serialized - data. - - Returns: tuple[list[Tuple[bool, StrOrBytes]], StrOrByteList] - The result, i.e. the first element of the result tuple, is a list that - contains one tuple for each element of `lines`. The first element of the - tuple is a bool that indicates whether the line could be converted. If it - was successfully converted the value is `True`. The second element is the - Python structure that resulted from the conversion if the first element - was `True`. If the first element is `False`, the second element contains - the input that could not be converted. + A list containing strings or byte-strings that that hold JSON-serialized + data. + + _: bool + The ``final`` parameter is ignored because lines are assumed to be + complete and the conversion takes place for every line. Consequently, + no remaining input data exists, and there is no need for "flushing" in + a final round. + + Returns + ------- + tuple[list[Tuple[bool, StrOrBytes]], StrOrByteList] + The result, i.e. the first element of the result tuple, is a list that + contains one tuple for each element of ``lines``. The first element of the + tuple is a bool that indicates whether the line could be converted. If it + was successfully converted the value is ``True``. The second element is the + Python structure that resulted from the conversion if the first element + was ``True``. If the first element is ``False``, the second element contains + the input that could not be converted. """ result = [] for line in lines: diff --git a/datalad_next/runners/data_processors/pattern.py b/datalad_next/runners/data_processors/pattern.py index 72d95878d..df9aba34c 100644 --- a/datalad_next/runners/data_processors/pattern.py +++ b/datalad_next/runners/data_processors/pattern.py @@ -1,10 +1,5 @@ -""" This module contains data processors for the data pipeline processor +""" Data processor that ensure that a pattern odes not cross data chunk borders """ -The data processors contained here are: - - - pattern_processor - -""" from __future__ import annotations from functools import partial @@ -16,18 +11,40 @@ ) +__all__ = ['pattern_processor'] + + +def pattern_processor(pattern: StrOrBytes) -> Callable: + """ Create a pattern processor for the given ``pattern``. + + A pattern processor re-assembles data chunks in such a way, that a single + data chunk could contain the complete pattern and will contain the complete + pattern, if the complete pattern start in the data chunk. It guarantees: + + 1. All chunks have at minimum the size of the pattern + 2. 
diff --git a/datalad_next/runners/data_processors/pattern.py b/datalad_next/runners/data_processors/pattern.py
index 72d95878d..df9aba34c 100644
--- a/datalad_next/runners/data_processors/pattern.py
+++ b/datalad_next/runners/data_processors/pattern.py
@@ -1,10 +1,5 @@
-""" This module contains data processors for the data pipeline processor
+""" Data processor that ensures that a pattern does not cross data chunk borders
 """
-The data processors contained here are:
-
- - pattern_processor
-
-"""
 from __future__ import annotations
 
 from functools import partial
@@ -16,18 +11,40 @@
 )
 
 
+__all__ = ['pattern_processor']
+
+
+def pattern_processor(pattern: StrOrBytes) -> Callable:
+    """ Create a pattern processor for the given ``pattern``.
+
+    A pattern processor re-assembles data chunks in such a way that a single
+    data chunk can contain the complete pattern, and will contain the complete
+    pattern if the complete pattern starts in the data chunk. It guarantees:
+
+    1. All chunks have at minimum the size of the pattern
+    2. If a complete pattern exists, it will be contained completely within a
+       single chunk, i.e. it will NOT be the case that a prefix of the pattern
+       is at the end of a chunk and the rest of the pattern at the beginning
+       of the next chunk
+
+    The pattern might be present multiple times in a data chunk.
+    """
+    assert len(pattern) > 0
+    return partial(_pattern_processor, pattern)
+
+
 def _pattern_processor(pattern: StrOrBytes,
                        data_chunks: StrOrBytesList,
                        final: bool = False,
                        ) -> tuple[StrOrBytesList, StrOrBytesList]:
-    """ Ensure that pattern is completely within a chunk,
+    """ Ensure that ``pattern`` only ever appears fully contained within a chunk
 
     This processor ensures that a given data pattern (if it exists in the data
     chunks) is either completely contained in a chunk or not in the chunk.
     That means the processor ensures that all data chunks have at least the
     length of the data pattern and that they do not end with a prefix of the
     data pattern.
 
-    As a result, a simple `pattern in data_chunk` test is sufficient to
+    As a result, a simple ``pattern in data_chunk`` test is sufficient to
     determine whether a pattern appears in the data stream.
 
     To use this function as a data processor, use partial to "fix" the first
@@ -81,22 +98,3 @@ def ends_with_pattern_prefix(data: StrOrBytes, pattern: StrOrBytes) -> bool:
         if ends_with_pattern_prefix(data_chunks[-1], pattern):
             return data_chunks[:-1], data_chunks[-1:]
     return data_chunks, []
-
-
-def pattern_processor(pattern: StrOrBytes) -> Callable:
-    """ Give out data chunks that contain a complete pattern, if it is present
-
-    This processor re-assembles data chunks in such a way, that a single
-    data chunk could contain the complete pattern and will contain the complete
-    pattern, if the complete pattern start in the data chunk. It guarantees:
-
-    1. All chunks have at minimum the size of the pattern
-    2. If a complete pattern exists, it will be contained completely within a
-       single chunk, i.e. it will NOT be the case that a prefix of the pattern
-       is at the end of a chunk, and the rest of the pattern in the beginning
-       of the next chunk
-
-    The pattern might be present multiple times in a data chunk.
-    """
-    assert len(pattern) > 0
-    return partial(_pattern_processor, pattern)
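[Editorial note: a sketch of the guarantee described in the docstring above, illustrative only; the chunk values are invented and the exact re-chunking is an implementation detail not visible in this diff.]

    from datalad_next.runners.data_processors import pattern_processor

    find_marker = pattern_processor(b'MARK')

    # the pattern straddles the chunk border; the processor re-assembles the
    # chunks so that no emitted chunk ends with a proper prefix of b'MARK'
    processed, remaining = find_marker([b'dataMA', b'RKmore'])
    for chunk in processed:
        # this is what makes a plain `b'MARK' in chunk` test reliable
        assert not any(chunk.endswith(b'MARK'[:i]) for i in range(1, 4))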
diff --git a/datalad_next/runners/data_processors/splitlines.py b/datalad_next/runners/data_processors/splitlines.py
index 7aaf713c2..c84b89a8a 100644
--- a/datalad_next/runners/data_processors/splitlines.py
+++ b/datalad_next/runners/data_processors/splitlines.py
@@ -1,10 +1,3 @@
-""" This module contains data processors for the data pipeline processor
-
-The data processors contained here are:
-
- - splitlines_processor
-
-"""
 from __future__ import annotations
 
 from functools import partial
@@ -16,6 +9,37 @@
 )
 
 
+__all__ = ['splitlines_processor']
+
+
+def splitlines_processor(
+    separator: StrOrBytes | None = None,
+    keep_ends: bool = True
+) -> Callable[[StrOrBytesList, bool], tuple[StrOrBytesList, StrOrBytesList]]:
+    """ Generate a data processor that splits character- or byte-strings into lines
+
+    This function returns a data processor that splits its input into lines,
+    either on a given separator, if ``separator`` is not ``None``, or on any
+    of the line endings recognized by Python (see the Python documentation
+    on string line delimiters), if ``separator`` is ``None``.
+
+    Parameters
+    ----------
+    separator: StrOrBytes | None
+        If not None, the provided separator will be used to split lines.
+    keep_ends: bool
+        If True, the separator will be contained in the returned lines.
+
+    Returns
+    -------
+    Callable
+        A data processor that takes a list of strings or bytes and returns
+        a tuple of two lists: the lines that were split off, as defined by
+        the ``separator`` and ``keep_ends`` arguments, and any remaining data.
+    """
+    return partial(_splitlines_processor, separator, keep_ends)
+
+
 # We don't use LineSplitter here because it has two "problems". Firstly, it does
 # not support `bytes`. Secondly, it can not be properly re-used because it does
 # not delete its internal storage when calling `LineSplitter.finish_processing`.
@@ -78,31 +102,3 @@ def _splitlines_processor(separator: StrOrBytes | None,
     if final:
         result = result[0].extend(result[1]), []
     return result
-
-
-def splitlines_processor(separator: StrOrBytes | None = None,
-                         keep_ends: bool = True
-                         ) -> Callable:
-    """ A data processor the splits character-strings or byte-strings into lines
-
-    Split lines either on a given separator, if 'separator' is not `None`,
-    or on one of the known line endings, if 'separator' is `None`. The line
-    endings are determined by python
-
-    Parameters
-    ----------
-    separator: Optional[str]
-        If not None, the provided separator will be used to split lines.
-    keep_ends: bool
-        If True, the separator will be contained in the returned lines.
-
-    Returns
-    -------
-    list[str | bytes]
-        if the input data chunks contained bytes the result will be a list of
-        byte-strings that end with byte-size line-delimiters. If the input data
-        chunks contained strings, the result will be a list strings that end with
-        string delimiters (see Python-documentation for a definition of string
-        line delimiters).
-    """
-    return partial(_splitlines_processor, separator, keep_ends)
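[Editorial note: a usage sketch for ``splitlines_processor``, not part of the patch; the expected values follow the docstring rather than an actual run.]

    from datalad_next.runners.data_processors import splitlines_processor

    split_lines = splitlines_processor(separator=None, keep_ends=False)

    # the last line is incomplete and is held back as remaining data until
    # more input arrives, or until the final processing round flushes it
    lines, remaining = split_lines([b'one\ntwo\nthr'], False)
    # expected: lines == [b'one', b'two'] and remaining == [b'thr']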