Skip to content

Commit

Permalink
use run-context-manager in SshUrlOperations._stat
Browse files Browse the repository at this point in the history
This commit uses the run-context-manager in
SshUrlOperations._stat.

It also adds and uses a new data processor,
called "pattern_border_processor", which ensures
that a given pattern is completely contained
within a single chunk, if the pattern exists in the stream.

This makes it easier to search for patterns
and fixes a potential bug in `_stat` that
was triggered when data chunks were shorter
than the pattern `b'\x01\x02\x03'`.
  • Loading branch information
christian-monch committed Oct 16, 2023
1 parent ba21054 commit ba486bc
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 32 deletions.
10 changes: 9 additions & 1 deletion datalad_next/runners/tests/test_data_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def perform_test(data_chunks: list[str | bytes],
copied_data_chunks = data_chunks[:]
for final, result in ((True, expected_final), (False, expected_non_final)):
r = pattern_border_processor(pattern)(data_chunks, final=final)
assert tuple(r) == result
assert tuple(r) == result, f'failed with final {final}'
# Check that the original list was not modified
assert copied_data_chunks == data_chunks

Expand Down Expand Up @@ -143,3 +143,11 @@ def perform_test(data_chunks: list[str | bytes],
expected_non_final=(['abc', 'dddbbb'], ['a']),
expected_final=(['abc', 'dddbbb', 'a'], [])
)


perform_test(
data_chunks=['a', 'b', 'c', '9', 'a'],
pattern='abc',
expected_non_final=(['abc'], ['9a']),
expected_final=(['abc', '9a'], [])
)
66 changes: 35 additions & 31 deletions datalad_next/url_operations/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
CommandError,
)

from datalad_next.runners.data_processors import (
pattern_border_processor,
process_from,
)
from datalad_next.runners.run import run
from datalad_next.utils.consts import COPY_BUFSIZE

Expand Down Expand Up @@ -101,37 +105,37 @@ def _stat(self, url: str, cmd: str) -> Dict:
ssh_cat = _SshCat(url)
cmd = ssh_cat.get_cmd(cmd)
with run(cmd, protocol_class=StdOutCaptureGeneratorProtocol) as stream:
for chunk in stream:
if need_magic:
expected_magic = need_magic[:min(len(need_magic),
len(chunk))]
incoming_magic = chunk[:len(need_magic)]
# does the incoming data have the remaining magic bytes?
if incoming_magic != expected_magic:
raise RuntimeError(
"Protocol error: report header not received")
# reduce (still missing) magic, if any
need_magic = need_magic[len(expected_magic):]
# strip magic from input
chunk = chunk[len(expected_magic):]
if chunk and expected_size is None:
# we have incoming data left and
# we have not yet consumed the size info
size_data = chunk.split(b'\1', maxsplit=1)
expected_size_str += size_data[0]
if len(size_data) > 1:
# this is not only size info, but we found the start of
# the data
expected_size = int(expected_size_str)
chunk = size_data[1]
if expected_size:
props = {
'content-length': expected_size,
'_stream': chain([chunk], stream) if chunk else stream,
}
return props
# there should be no data left to process, or something went wrong
assert not chunk

# The try clause enables us to execute the code after the context
# handler if the iterator stops unexpectedly. That would, for
# example be the case, if the ssh-subprocess terminates prematurely.
try:
filtered_stream = process_from(
stream,
[pattern_border_processor(need_magic)]
)

# The first chunk should start with the magic
chunk = next(filtered_stream)
if chunk[:len(need_magic)] != need_magic:
raise RuntimeError("Protocol error: report header not received")
chunk = chunk[len(need_magic):]

# The length is transferred now and terminated by b'\x01'.
while b'\x01' not in chunk:
chunk += next(filtered_stream)

marker_index = chunk.index(b'\x01')
expected_size = int(chunk[:marker_index])

props = {
'content-length': expected_size,
'_stream': chain([chunk[marker_index + 1:]], filtered_stream),
}
return props

except StopIteration:
pass

# At this point the subprocess has either exited, was terminated, or
# was killed.
Expand Down

0 comments on commit ba486bc

Please sign in to comment.