Use iter_subproc in datalad_next.url_operations.ssh instead of datalad-core runner code #546

Closed

Changes from 13 commits
5 changes: 3 additions & 2 deletions datalad_next/iterable_subprocess/test_iterable_subprocess.py
@@ -128,12 +128,13 @@


def test_exception_from_return_code():
-    with pytest.raises(IterableSubprocessError, match='No such file or directory') as excinfo:
+    with pytest.raises(IterableSubprocessError) as excinfo:
        with iterable_subprocess(['ls', 'does-not-exist'], ()) as output:
            a = b''.join(output)

    assert excinfo.value.returncode > 0
-    assert b'No such file or directory' in excinfo.value.stderr
+    assert (b'No such file or directory' in excinfo.value.stderr) \
+        or (b'Datei oder Verzeichnis nicht gefunden' in excinfo.value.stderr)



def test_exception_from_context_even_though_return_code_with_long_standard_error():
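The relaxed assertion above accommodates localized `ls` error messages (the German variant appears on systems with a German locale). A minimal sketch of a hypothetical alternative, not part of this change, that pins the child-process locale so a single assertion suffices:

```python
# Sketch only: force the C locale so `ls` emits the untranslated
# error message. os.environ is copied so the PATH-based executable
# lookup keeps working.
import os
import subprocess

result = subprocess.run(
    ['ls', 'does-not-exist'],
    capture_output=True,
    env={**os.environ, 'LC_ALL': 'C'},
)
assert result.returncode > 0
assert b'No such file or directory' in result.stderr
```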
2 changes: 2 additions & 0 deletions datalad_next/itertools/__init__.py
@@ -4,6 +4,7 @@
.. autosummary::
   :toctree: generated

   align_pattern
   decode_bytes
   itemize
   load_json
@@ -13,6 +14,7 @@
"""


from .align_pattern import align_pattern
from .decode_bytes import decode_bytes
from .itemize import itemize
from .load_json import (
102 changes: 102 additions & 0 deletions datalad_next/itertools/align_pattern.py
@@ -0,0 +1,102 @@
""" Function to ensure that a pattern is completely contained in single chunks
"""

from __future__ import annotations

from typing import (
    Generator,
    Iterable,
)


def align_pattern(iterable: Iterable[str | bytes | bytearray],
                  pattern: str | bytes | bytearray
                  ) -> Generator[str | bytes | bytearray, None, None]:
""" Yield data chunks that contain a complete pattern, if it is present

``align_pattern`` makes it easy to find a pattern (``str``, ``bytes``,
or ``bytearray``) in data chunks. It joins data-chunks in such a way,
that a simple containment-check (e.g. ``pattern in chunk``) on the chunks
that ``align_pattern`` yields will suffice to determine whether the pattern
is present in the stream yielded by the underlying iterable or not.

To achieve this, ``align_pattern`` will join consecutive chunks to ensures
that the following two assertions hold:

1. Each chunk that is yielded by ``align_pattern`` has at least the length
of the pattern (unless the underlying iterable is exhausted before the
length of the pattern is reached).

2. The pattern is not split between two chunks, i.e. no chunk that is
yielded by ``align_pattern`` ends with a prefix of the pattern (unless
it is the last chunk that the underlying iterable yield).

The pattern might be present multiple times in a yielded data chunk.

Note: the ``pattern`` is compared verbatim to the content in the data
chunks, i.e. no parsing of the ``pattern`` is performed and no regular
expressions or wildcards are supported.

.. code-block:: python

>>> from datalad_next.itertools import align_pattern
>>> tuple(align_pattern([b'abcd', b'e', b'fghi'], pattern=b'def'))
(b'abcdefghi',)
>>> # The pattern can be present multiple times in a yielded chunk
>>> tuple(align_pattern([b'abcd', b'e', b'fdefghi'], pattern=b'def'))
(b'abcdefdefghi',)

Use this function if you want to locate a pattern in an input stream. It
allows to use a simple ``in``-check to determine whether the pattern is
present in the yielded result chunks.
mih marked this conversation as resolved.
Show resolved Hide resolved

The function always yields everything it has fetched from the underlying
iterable. So after a yield it does not cache any data from the underlying
iterable. That means, if the functionality of
``align_pattern`` is no longer required, the underlying iterator can be
used, when ``align_pattern`` has yielded a data chunk.
This allows more efficient processing of the data that remains in the
underlying iterable.

Parameters
----------
iterable: Iterable
An iterable that yields data chunks.
pattern: str | bytes | bytearray
The pattern that should be contained in the chunks. Its type must be
compatible to the type of the elements in ``iterable``.

Yields
-------
str | bytes | bytearray
data chunks that have at least the size of the pattern and do not end
with a prefix of the pattern. Note that a data chunk might contain the
pattern multiple times.
"""

    def ends_with_pattern_prefix(data: str | bytes | bytearray,
                                 pattern: str | bytes | bytearray,
                                 ) -> bool:
        """ Check whether the chunk ends with a prefix of the pattern """
        for index in range(len(pattern) - 1, 0, -1):
            if data[-index:] == pattern[:index]:
                return True
        return False

    # Join data chunks until they are sufficiently long to contain the
    # pattern, i.e. have at least size `len(pattern)`. Continue joining
    # if the chunk ends with a prefix of the pattern.
    current_chunk = None
    for data_chunk in iterable:
        # get the type of current_chunk from the type of this data_chunk
        if current_chunk is None:
            current_chunk = data_chunk
        else:
            current_chunk += data_chunk
        if len(current_chunk) >= len(pattern) \
                and not ends_with_pattern_prefix(current_chunk, pattern):
            yield current_chunk
            current_chunk = None

    if current_chunk is not None:
        yield current_chunk
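The hand-back property described in the docstring (no data is cached after a yield) enables a pattern like the following minimal sketch; the marker bytes and chunking are assumptions for illustration:

```python
# Sketch: once the pattern has been located, the raw iterator can be
# consumed directly, because align_pattern buffers nothing after a yield.
from datalad_next.itertools import align_pattern

chunks = iter([b'--', b'-MARK', b'ER---', b'tail'])
for chunk in align_pattern(chunks, pattern=b'MARKER'):
    if b'MARKER' in chunk:
        break

# everything not yet yielded by align_pattern is still in `chunks`
print(b''.join(chunks))  # b'tail'
```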
24 changes: 24 additions & 0 deletions datalad_next/itertools/tests/test_align_pattern.py
@@ -0,0 +1,24 @@
from __future__ import annotations

import pytest

from ..align_pattern import align_pattern


@pytest.mark.parametrize('data_chunks,pattern,expected', [
    (['a', 'b', 'c', 'd', 'e'], 'abc', ['abc', 'de']),
    (['a', 'b', 'c', 'a', 'b', 'c'], 'abc', ['abc', 'abc']),
    # Ensure that unaligned pattern prefixes do not keep data chunks short.
    (['a', 'b', 'c', 'dddbbb', 'a', 'b', 'x'], 'abc', ['abc', 'dddbbb', 'abx']),
    # Expect a short trailing chunk to be joined with subsequent data and
    # only yielded once the underlying iterable is exhausted, even if it
    # ends with a pattern prefix.
    (['a', 'b', 'c', 'd', 'a'], 'abc', ['abc', 'da']),
    # Expect the last chunk to be yielded although it ends with a pattern
    # prefix, because the underlying iterable is exhausted.
    (['a', 'b', 'c', 'dddbbb', 'a'], 'abc', ['abc', 'dddbbb', 'a']),
    (['a', 'b', 'c', '9', 'a'], 'abc', ['abc', '9a']),
])
def test_pattern_processor(data_chunks, pattern, expected):
    assert expected == list(align_pattern(data_chunks, pattern=pattern))
5 changes: 4 additions & 1 deletion datalad_next/runners/__init__.py
@@ -42,7 +42,10 @@
StdOutErrCapture
"""

-from .iter_subproc import iter_subproc
+from .iter_subproc import (
+    iter_subproc,
+    IterableSubprocessError,
+)

# runners
from datalad.runner import (
8 changes: 6 additions & 2 deletions datalad_next/runners/iter_subproc.py
@@ -4,8 +4,12 @@
    List,
)

-from datalad_next.iterable_subprocess.iterable_subprocess \
-    import iterable_subprocess
+from datalad_next.iterable_subprocess.iterable_subprocess import (
+    iterable_subprocess,
+    # not needed here, but we want to provide all critical pieces from
+    # the same place. This is the key exception type
+    IterableSubprocessError,
+)
from datalad_next.consts import COPY_BUFSIZE

__all__ = ['iter_subproc']
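With ``IterableSubprocessError`` re-exported alongside ``iter_subproc``, consumers can obtain both names from one place. A minimal usage sketch, assuming ``iter_subproc`` is invoked with just a command list:

```python
from datalad_next.runners import (
    IterableSubprocessError,
    iter_subproc,
)

try:
    with iter_subproc(['ls', 'does-not-exist']) as output:
        content = b''.join(output)
except IterableSubprocessError as e:
    # the exception carries the return code and the captured stderr
    print(e.returncode, e.stderr)
```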
35 changes: 35 additions & 0 deletions datalad_next/url_operations/__init__.py
@@ -15,10 +15,14 @@
from __future__ import annotations

import logging
from functools import partial
from more_itertools import side_effect
from pathlib import Path
from typing import (
    Any,
    Dict,
    Generator,
    Iterable,
)

import datalad
@@ -338,6 +342,37 @@ def _progress_report_stop(self, pid: str, log_msg: tuple):
    def _get_hasher(self, hash: list[str] | None) -> NoOpHash | MultiHash:
        return MultiHash(hash) if hash is not None else NoOpHash()

    def _reporting(self,
                   stream: Iterable[Any],
                   *,
                   progress_id: str,
                   label: str,
                   expected_size: int | None,
                   start_log_msg: tuple,
                   end_log_msg: tuple,
                   update_log_msg: tuple
                   ) -> Generator[Any, None, None]:
        yield from side_effect(
            lambda chunk: self._progress_report_update(
                progress_id,
                update_log_msg,
                len(chunk)
            ),
            stream,
            before=partial(
                self._progress_report_start,
                progress_id,
                start_log_msg,
                label,
                expected_size
            ),
            after=partial(
                self._progress_report_stop,
                progress_id,
                end_log_msg
            )
        )


#
# Exceptions to be used by all handlers
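``_reporting`` builds on ``more_itertools.side_effect`` to fire progress callbacks before, during, and after consumption of a stream. A standalone sketch of that mechanism, with a toy ``report`` callback that stands in for the ``_progress_report_*`` methods:

```python
from functools import partial

from more_itertools import side_effect

def report(kind, detail=''):
    print(kind, detail)

stream = [b'abc', b'defgh', b'i']
wrapped = side_effect(
    lambda chunk: report('update', len(chunk)),  # per-chunk callback
    stream,
    before=partial(report, 'start'),  # runs before the first chunk
    after=partial(report, 'stop'),    # runs after exhaustion
)
total = sum(len(chunk) for chunk in wrapped)
# prints: start / update 3 / update 5 / update 1 / stop; total == 9
```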