diff --git a/datalad_next/runners/tests/test_data_processors.py b/datalad_next/runners/tests/test_data_processors.py index 7d987b843..ebdb972fb 100644 --- a/datalad_next/runners/tests/test_data_processors.py +++ b/datalad_next/runners/tests/test_data_processors.py @@ -98,7 +98,7 @@ def perform_test(data_chunks: list[str | bytes], copied_data_chunks = data_chunks[:] for final, result in ((True, expected_final), (False, expected_non_final)): r = pattern_border_processor(pattern)(data_chunks, final=final) - assert tuple(r) == result + assert tuple(r) == result, f'failed with final {final}' # Check that the original list was not modified assert copied_data_chunks == data_chunks @@ -143,3 +143,11 @@ def perform_test(data_chunks: list[str | bytes], expected_non_final=(['abc', 'dddbbb'], ['a']), expected_final=(['abc', 'dddbbb', 'a'], []) ) + + + perform_test( + data_chunks=['a', 'b', 'c', '9', 'a'], + pattern='abc', + expected_non_final=(['abc'], ['9a']), + expected_final=(['abc', '9a'], []) + ) diff --git a/datalad_next/url_operations/ssh.py b/datalad_next/url_operations/ssh.py index 4858d79f3..b4d254cc7 100644 --- a/datalad_next/url_operations/ssh.py +++ b/datalad_next/url_operations/ssh.py @@ -32,6 +32,10 @@ CommandError, ) +from datalad_next.runners.data_processors import ( + pattern_border_processor, + process_from, +) from datalad_next.runners.run import run from datalad_next.utils.consts import COPY_BUFSIZE @@ -101,37 +105,37 @@ def _stat(self, url: str, cmd: str) -> Dict: ssh_cat = _SshCat(url) cmd = ssh_cat.get_cmd(cmd) with run(cmd, protocol_class=StdOutCaptureGeneratorProtocol) as stream: - for chunk in stream: - if need_magic: - expected_magic = need_magic[:min(len(need_magic), - len(chunk))] - incoming_magic = chunk[:len(need_magic)] - # does the incoming data have the remaining magic bytes? - if incoming_magic != expected_magic: - raise RuntimeError( - "Protocol error: report header not received") - # reduce (still missing) magic, if any - need_magic = need_magic[len(expected_magic):] - # strip magic from input - chunk = chunk[len(expected_magic):] - if chunk and expected_size is None: - # we have incoming data left and - # we have not yet consumed the size info - size_data = chunk.split(b'\1', maxsplit=1) - expected_size_str += size_data[0] - if len(size_data) > 1: - # this is not only size info, but we found the start of - # the data - expected_size = int(expected_size_str) - chunk = size_data[1] - if expected_size: - props = { - 'content-length': expected_size, - '_stream': chain([chunk], stream) if chunk else stream, - } - return props - # there should be no data left to process, or something went wrong - assert not chunk + + # The try clause enables us to execute the code after the context + # handler if the iterator stops unexpectedly. That would, for + # example be the case, if the ssh-subprocess terminates prematurely. + try: + filtered_stream = process_from( + stream, + [pattern_border_processor(need_magic)] + ) + + # The first chunk should start with the magic + chunk = next(filtered_stream) + if chunk[:len(need_magic)] != need_magic: + raise RuntimeError("Protocol error: report header not received") + chunk = chunk[len(need_magic):] + + # The length is transferred now and terminated by b'\x01'. + while b'\x01' not in chunk: + chunk += next(filtered_stream) + + marker_index = chunk.index(b'\x01') + expected_size = int(chunk[:marker_index]) + + props = { + 'content-length': expected_size, + '_stream': chain([chunk[marker_index + 1:]], filtered_stream), + } + return props + + except StopIteration: + pass # At this point the subprocess has either exited, was terminated, or # was killed.