diff --git a/.github/workflows/mypy.yml b/.github/workflows/mypy.yml index 8b3d9be0..48465fda 100644 --- a/.github/workflows/mypy.yml +++ b/.github/workflows/mypy.yml @@ -13,7 +13,7 @@ jobs: - name: Setup Python uses: actions/setup-python@v4 with: - python-version: 3.8 + python-version: 3.11 architecture: x64 - name: Checkout uses: actions/checkout@v3 diff --git a/LICENSE b/LICENSE index c5c42c38..062ec2e2 100644 --- a/LICENSE +++ b/LICENSE @@ -26,3 +26,11 @@ documentation is covered by the MIT license. THE SOFTWARE. See CONTRIBUTORS file for a full list of contributors. + +# 3rd-party code + +A copy of https://github.com/uktrade/iterable-subprocess is included at +`datalad_next/iterable_subprocess`. It was written by +Michal Charemza for UK Department of Business and Trade, and was made +available under the terms of the MIT license. +See `datalad_next/iterable_subprocess/LICENSE`. diff --git a/datalad_next/consts/__init__.py b/datalad_next/consts/__init__.py new file mode 100644 index 00000000..37a60f6b --- /dev/null +++ b/datalad_next/consts/__init__.py @@ -0,0 +1,17 @@ +"""Common constants +""" + +# import from "utils", but these really are constants +from datalad.utils import ( + on_linux, + on_windows, +) + +try: + from shutil import COPY_BUFSIZE +except ImportError: # pragma: no cover + # too old + # from PY3.10 + COPY_BUFSIZE = 1024 * 1024 if on_windows else 64 * 1024 + +from datalad.consts import PRE_INIT_COMMIT_SHA diff --git a/datalad_next/iter_collections/gitworktree.py b/datalad_next/iter_collections/gitworktree.py index a2dee1b5..237fe97b 100644 --- a/datalad_next/iter_collections/gitworktree.py +++ b/datalad_next/iter_collections/gitworktree.py @@ -20,11 +20,10 @@ Tuple, ) -from datalad_next.runners import ( - DEVNULL, - LineSplitter, - ThreadedRunner, - StdOutCaptureGeneratorProtocol, +from datalad_next.runners import iter_subproc +from datalad_next.itertools import ( + decode_bytes, + itemize, ) from .utils import ( @@ -250,23 +249,20 @@ def _lsfiles_line2props( def _git_ls_files(path, *args): - # we use a plain runner to avoid the overhead of a GitRepo instance - runner = ThreadedRunner( - cmd=[ - 'git', 'ls-files', - # we rely on zero-byte splitting below - '-z', - # otherwise take whatever is coming in - *args, - ], - protocol_class=StdOutCaptureGeneratorProtocol, - stdin=DEVNULL, - # run in the directory we want info on - cwd=path, - ) - line_splitter = LineSplitter('\0', keep_ends=False) - # for each command output chunk received by the runner - for content in runner.run(): - # for each zerobyte-delimited "line" in the output - for line in line_splitter.process(content.decode('utf-8')): - yield line + with iter_subproc( + [ + 'git', '-C', str(path), + 'ls-files', + # we rely on zero-byte splitting below + '-z', + # otherwise take whatever is coming in + *args, + ], + ) as r: + yield from decode_bytes( + itemize( + r, + separator=b'\0', + keep_ends=False, + ) + ) diff --git a/datalad_next/iterable_subprocess/.coveragerc b/datalad_next/iterable_subprocess/.coveragerc new file mode 100644 index 00000000..398ff08a --- /dev/null +++ b/datalad_next/iterable_subprocess/.coveragerc @@ -0,0 +1,2 @@ +[run] +branch = True diff --git a/datalad_next/iterable_subprocess/.github/workflows/deploy-package-to-pypi.yml b/datalad_next/iterable_subprocess/.github/workflows/deploy-package-to-pypi.yml new file mode 100644 index 00000000..f500c37d --- /dev/null +++ b/datalad_next/iterable_subprocess/.github/workflows/deploy-package-to-pypi.yml @@ -0,0 +1,44 @@ +name: Deploy package to PyPI + 
+on: + release: + types: [published] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v3 + with: + python-version: 3.11 + + - name: Update version in pyproject.toml from current git tag + run: >- + sed -i "s/0\\.0\\.0\\.dev0/${GITHUB_REF/refs\/tags\/v/}/g" pyproject.toml + + - run: | + pip install build + python -m build + + - uses: actions/upload-artifact@v3 + with: + path: ./dist + + deploy: + needs: ['build'] + environment: 'pypi' + + name: upload release to PyPI + runs-on: ubuntu-latest + permissions: + # IMPORTANT: this permission is mandatory for trusted publishing + id-token: write + steps: + - uses: actions/download-artifact@v3 + + - name: Publish package distributions to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + packages_dir: artifact/ diff --git a/datalad_next/iterable_subprocess/.github/workflows/test.yml b/datalad_next/iterable_subprocess/.github/workflows/test.yml new file mode 100644 index 00000000..f154785a --- /dev/null +++ b/datalad_next/iterable_subprocess/.github/workflows/test.yml @@ -0,0 +1,37 @@ +name: Tests +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + test: + name: Test + runs-on: ubuntu-20.04 + strategy: + matrix: + python-version: + - "3.6.7" + - "3.7.1" + - "3.8.0" + - "3.9.0" + - "3.10.0" + - "3.11.0" + steps: + - name: "Checkout" + uses: "actions/checkout@v3" + - uses: "actions/setup-python@v4" + with: + python-version: '${{ matrix.python-version }}' + - name: "Install funzip" + run: | + sudo apt-get update + sudo apt-get install unzip + - name: "Install package and python dependencies" + run: | + pip install .[dev] + - name: "Test" + run: | + pytest --cov + - uses: codecov/codecov-action@v3 diff --git a/datalad_next/iterable_subprocess/.gitignore b/datalad_next/iterable_subprocess/.gitignore new file mode 100644 index 00000000..b6e47617 --- /dev/null +++ b/datalad_next/iterable_subprocess/.gitignore @@ -0,0 +1,129 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ diff --git a/datalad_next/iterable_subprocess/LICENSE b/datalad_next/iterable_subprocess/LICENSE new file mode 100644 index 00000000..5a6779fd --- /dev/null +++ b/datalad_next/iterable_subprocess/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Department for International Trade + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/datalad_next/iterable_subprocess/README.md b/datalad_next/iterable_subprocess/README.md new file mode 100644 index 00000000..045ee2d7 --- /dev/null +++ b/datalad_next/iterable_subprocess/README.md @@ -0,0 +1,81 @@ +# iterable-subprocess + +[![PyPI package](https://img.shields.io/pypi/v/iterable-subprocess?label=PyPI%20package&color=%234c1)](https://pypi.org/project/iterable-subprocess/) [![Test suite](https://img.shields.io/github/actions/workflow/status/uktrade/iterable-subprocess/test.yml?label=Test%20suite)](https://github.com/uktrade/iterable-subprocess/actions/workflows/test.yml) [![Code coverage](https://img.shields.io/codecov/c/github/uktrade/iterable-subprocess?label=Code%20coverage)](https://app.codecov.io/gh/uktrade/iterable-subprocess) + +Python context manager to communicate with a subprocess using iterables. This offers a higher level interface to subprocesses than Python's built-in subprocess module, and is particularly helpful when data won't fit in memory and has to be streamed. + +This also allows an external subprocess to be naturally placed in a chain of iterables as part of a data processing pipeline. + + +## Installation + +```bash +pip install iterable-subprocess +``` + + +## Usage + +A single context manager `iterable_subprocess` is exposed. The first parameter is the `args` argument passed to the [Popen Constructor](https://docs.python.org/3/library/subprocess.html#popen-constructor), and the second is an iterable whose items must be `bytes` instances and are sent to the subprocess's standard input. + +Returned from the function is an iterable whose items are `bytes` instances of the process's standard output. 
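+
+An optional third `chunk_size` parameter (default `65536`) controls how many bytes are read at a time from the process's standard output, and how much trailing standard error is retained for error reporting.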
+
+```python
+from iterable_subprocess import iterable_subprocess
+
+# In a real case this could be a generator function that reads from the filesystem or the network
+iterable_of_bytes = (
+    b'first\n',
+    b'second\n',
+    b'third\n',
+)
+
+with iterable_subprocess(['cat'], iterable_of_bytes) as output:
+    for chunk in output:
+        print(chunk)
+```
+
+
+## Exceptions
+
+Python's `subprocess.Popen` is used to start the process, and any exceptions it raises are propagated without transformation. For example, if the subprocess can't be found, then a `FileNotFoundError` is raised.
+
+If the process starts, but exits with a non-zero return code, then an `iterable_subprocess.IterableSubprocessError` exception will be raised with two members:
+
+- `returncode` - the return code of the process
+- `stderr` - the final 65536 bytes of the standard error of the process
+
+However, if the process starts, but an exception is raised from inside the context or from the source iterable, then this exception is propagated, even if the process subsequently exits with a non-zero return code.
+
+
+## Example: unzip the first file of a ZIP archive while downloading
+
+It's possible to download the bytes of a ZIP file in Python, and unzip it by passing the bytes to `funzip`, as in the following example.
+
+```python
+import httpx
+from iterable_subprocess import iterable_subprocess
+
+with \
+        httpx.stream('GET', 'https://www.example.com/my.zip') as r, \
+        iterable_subprocess(['funzip'], r.iter_bytes()) as unzipped_chunks:
+
+    for chunk in unzipped_chunks:
+        print(chunk)
+```
+
+Note that it's also possible to unzip files in a streaming fashion without resorting to another process using [stream-unzip](https://github.com/uktrade/stream-unzip).
+
+
+## Example: download file using curl and process in Python
+
+You would usually download directly from Python, but as an example, you can download using the curl executable and process its output in Python.
+
+```python
+from iterable_subprocess import iterable_subprocess
+
+url = 'https://data.api.trade.gov.uk/v1/datasets/uk-tariff-2021-01-01/versions/v3.0.212/tables/measures-on-declarable-commodities/data?format=csv'
+with iterable_subprocess(['curl', '--no-progress-meter', '--fail-with-body', url], ()) as output:
+    for chunk in output:
+        print(chunk)
+```
diff --git a/datalad_next/iterable_subprocess/__init__.py b/datalad_next/iterable_subprocess/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/datalad_next/iterable_subprocess/codecov.yml b/datalad_next/iterable_subprocess/codecov.yml
new file mode 100644
index 00000000..69cb7601
--- /dev/null
+++ b/datalad_next/iterable_subprocess/codecov.yml
@@ -0,0 +1 @@
+comment: false
diff --git a/datalad_next/iterable_subprocess/iterable_subprocess.py b/datalad_next/iterable_subprocess/iterable_subprocess.py
new file mode 100644
index 00000000..b9f3dbaa
--- /dev/null
+++ b/datalad_next/iterable_subprocess/iterable_subprocess.py
@@ -0,0 +1,159 @@
+from collections import deque
+from contextlib import contextmanager
+from subprocess import PIPE, SubprocessError, Popen
+from threading import Thread
+
+
+@contextmanager
+def iterable_subprocess(program, input_chunks, chunk_size=65536):
+    # This context starts a thread that populates the subprocess's standard input. It
+    # also starts a thread that reads the process's standard error. Otherwise we risk
+    # a deadlock - there is no output because the process is waiting for more input.
+    #
+    # This itself introduces its own complications and risks, but these are hopefully
+    # mitigated by having a well-defined start and stop mechanism that also avoids
+    # sending data to the process if it's not running.
+    #
+    # To start, i.e. on entry to the context from client code
+    # - The process is started
+    # - The thread to read from standard error is started
+    # - The thread to populate input is started
+    #
+    # When running:
+    # - The standard input thread iterates over the input, passing chunks to the process
+    # - While the standard error thread fetches the error output
+    # - And while this thread iterates over the process's output from client code
+    #   in the context
+    #
+    # To stop, i.e. on exit of the context from client code
+    # - This thread closes the process's standard output
+    # - Wait for the standard input thread to exit
+    # - Wait for the standard error thread to exit
+    # - Wait for the process to exit
+    #
+    # By using context managers internally, this also gives quite strong guarantees that
+    # the above order is enforced to make sure the thread doesn't send data to the process
+    # whose standard input is closed and so we don't get BrokenPipe errors
+
+    # Writing to the process can result in a BrokenPipeError. If this then results in
+    # a non-zero code from the process, the process's standard error probably has useful
+    # information on the cause of this. However, the non-zero error code happens after
+    # BrokenPipeError, so propagating "what happens first" isn't helpful in this case.
+    # So, we re-raise BrokenPipeError as _BrokenPipeError so we can catch it after the
+    # process ends to then allow us to branch on its error code:
+    # - if it's non-zero raise an IterableSubprocessError containing its standard error
+    # - if it's zero, re-raise the original BrokenPipeError
+    class _BrokenPipeError(Exception):
+        pass
+
+    @contextmanager
+    def thread(target, *args):
+        exception = None
+        def wrapper():
+            nonlocal exception
+            try:
+                target(*args)
+            except BaseException as e:
+                exception = e
+
+        t = Thread(target=wrapper)
+
+        def start():
+            t.start()
+
+        def join():
+            if t.ident:
+                t.join()
+            return exception
+
+        yield start, join
+
+    def input_to(stdin):
+        try:
+            for chunk in input_chunks:
+                try:
+                    stdin.write(chunk)
+                except BrokenPipeError:
+                    raise _BrokenPipeError()
+                except OSError as e:
+                    if e.errno == 22:
+                        # Errno 22 indicates an IO failure with a
+                        # file descriptor (maybe the process is dead already)
+                        raise _BrokenPipeError()
+                    else:
+                        # no idea what this could be, let it bubble up
+                        raise
+        finally:
+            try:
+                stdin.close()
+            except BrokenPipeError:
+                raise _BrokenPipeError()
+            except OSError as e:
+                # silently ignore Errno 22, which happens on
+                # windows when trying to interact with file descriptors
+                # associated with a process that exited already
+                if e.errno != 22:
+                    raise
+
+    def output_from(stdout):
+        while True:
+            chunk = stdout.read(chunk_size)
+            if not chunk:
+                break
+            yield chunk
+
+    def keep_only_most_recent(stderr, stderr_deque):
+        total_length = 0
+        while True:
+            chunk = stderr.read(chunk_size)
+            total_length += len(chunk)
+            if not chunk:
+                break
+            stderr_deque.append(chunk)
+            if total_length - len(stderr_deque[0]) >= chunk_size:
+                total_length -= len(stderr_deque[0])
+                stderr_deque.popleft()
+
+    def raise_if_not_none(exception):
+        if exception is not None:
+            raise exception from None
+
+    proc = None
+    stderr_deque = deque()
+    exception_stdin = None
+    exception_stderr = None
+
+    try:
+
+        with \
+                Popen(program, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc, \
+                thread(keep_only_most_recent, proc.stderr, stderr_deque) as (start_t_stderr, join_t_stderr), \
+                thread(input_to, proc.stdin) as (start_t_stdin, join_t_stdin):
+
+            try:
+                start_t_stderr()
+                start_t_stdin()
+                yield output_from(proc.stdout)
+            except BaseException:
+                proc.terminate()
+                raise
+            finally:
+                proc.stdout.close()
+                exception_stdin = join_t_stdin()
+                exception_stderr = join_t_stderr()
+
+            raise_if_not_none(exception_stdin)
+            raise_if_not_none(exception_stderr)
+
+    except _BrokenPipeError as e:
+        if proc.returncode == 0:
+            raise e.__context__ from None
+
+    if proc.returncode:
+        raise IterableSubprocessError(proc.returncode, b''.join(stderr_deque)[-chunk_size:])
+
+
+class IterableSubprocessError(SubprocessError):
+    def __init__(self, returncode, stderr):
+        self.returncode = returncode
+        self.stderr = stderr
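To make the error-handling contract above concrete, here is a minimal sketch of catching a non-zero exit (the failing `ls` invocation is taken from the accompanying tests):

```python
from datalad_next.iterable_subprocess.iterable_subprocess import (
    IterableSubprocessError,
    iterable_subprocess,
)

try:
    with iterable_subprocess(['ls', 'does-not-exist'], ()) as output:
        content = b''.join(output)
except IterableSubprocessError as e:
    # raised only after the context is left: carries the return code and
    # the trailing (at most chunk_size) bytes of standard error
    print(e.returncode, e.stderr.decode(errors='replace'))
```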
diff --git a/datalad_next/iterable_subprocess/pyproject.toml b/datalad_next/iterable_subprocess/pyproject.toml
new file mode 100644
index 00000000..c6e311e0
--- /dev/null
+++ b/datalad_next/iterable_subprocess/pyproject.toml
@@ -0,0 +1,31 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "iterable-subprocess"
+version = "0.0.0.dev0"
+authors = [
+  { name="Department for International Trade", email="sre@digital.trade.gov.uk" },
+]
+description = "Python context manager to communicate with a subprocess using iterables of bytes rather than Python's built-in subprocess module"
+readme = "README.md"
+requires-python = ">=3.6.7"
+classifiers = [
+    'Programming Language :: Python :: 3',
+    'License :: OSI Approved :: MIT License',
+]
+
+[project.optional-dependencies]
+dev = [
+    "psutil",
+    "pytest-cov",
+]
+
+[project.urls]
+"Source" = "https://github.com/uktrade/iterable-subprocess"
+
+[tool.hatch.build]
+include = [
+  "iterable_subprocess.py"
+]
diff --git a/datalad_next/iterable_subprocess/test_iterable_subprocess.py b/datalad_next/iterable_subprocess/test_iterable_subprocess.py
new file mode 100644
index 00000000..a68444ea
--- /dev/null
+++ b/datalad_next/iterable_subprocess/test_iterable_subprocess.py
@@ -0,0 +1,313 @@
+import io
+import sys
+import threading
+import time
+import zipfile
+
+import psutil
+import pytest
+from threading import Thread
+
+from .iterable_subprocess import IterableSubprocessError, iterable_subprocess
+
+
+def test_cat_not_necessarily_streamed():
+    def yield_small_input():
+        yield b'first'
+        yield b'second'
+        yield b'third'
+
+    with iterable_subprocess(['cat'], yield_small_input()) as output:
+        assert b''.join(output) == b'firstsecondthird'
+
+
+def test_cat_streamed():
+    latest_input = None
+
+    def yield_input():
+        nonlocal latest_input
+
+        for i in range(0, 10000000):
+            yield b'*' * 10
+            latest_input = i
+
+    with iterable_subprocess(['cat'], yield_input()) as output:
+        latest_input_during_output = [latest_input for _ in output]
+
+    # Make sure the input is progressing during the output. In test, there
+    # are about 915 steps, so checking that it's greater than 50 shouldn't
+    # make this test too flaky
+    num_steps = 0
+    prev_i = 0
+    for i in latest_input_during_output:
+        if i != prev_i:
+            num_steps += 1
+        prev_i = i
+
+    assert num_steps > 50
+
+
+def test_process_closed_after():
+    # in datalad-next the test process does not necessarily start out with
+    # zero child-processes, so determine the expected count incrementally
+    #assert len(psutil.Process().children(recursive=True)) == 0
+    n_children = len(psutil.Process().children(recursive=True))
+    with iterable_subprocess(['cat'], ()) as output:
+        assert len(psutil.Process().children(recursive=True)) == (n_children + 1)
+    assert len(psutil.Process().children(recursive=True)) == n_children
+
+
+def test_exception_from_input_before_yield_propagated():
+    def yield_input():
+        raise Exception('Something went wrong')
+
+    with pytest.raises(Exception, match='Something went wrong'):
+        with iterable_subprocess(['cat'], yield_input()) as output:
+            pass
+
+
+def test_exception_from_input_after_yield_propagated():
+    def yield_input():
+        yield b'*'
+        raise Exception('Something went wrong')
+
+    with pytest.raises(Exception, match='Something went wrong'):
+        with iterable_subprocess(['cat'], yield_input()) as output:
+            pass
+
+
+def test_exception_from_input_incorrect_type_propagated():
+    def yield_input():
+        yield 'this-should-be-bytes'
+
+
+    with pytest.raises(TypeError):
+        with iterable_subprocess(['cat'], yield_input()) as output:
+            pass
+
+
+@pytest.mark.parametrize("size", [
+    1, 100, 10000, 1000000,
+])
+def test_exception_from_output_during_input_iterating_propagates_and_does_not_hang(size):
+    event = threading.Event()
+
+    def yield_input():
+        while True:
+            event.set()
+            yield b'*' * size
+
+    with pytest.raises(Exception, match='My error'):
+        with iterable_subprocess(['cat'], yield_input()) as output:
+            event.wait()
+            raise Exception('My error')
+
+
+@pytest.mark.parametrize("chunk_size", [
+    1, 100, 10000, 1000000,
+])
+@pytest.mark.parametrize("at_iteration", [
+    0, 1, 100,
+])
+def test_exception_from_output_iterating_propagates_and_does_not_hang(at_iteration, chunk_size):
+    def yield_input():
+        while True:
+            yield b'*' * chunk_size
+
+    with pytest.raises(Exception, match='My error'):
+        with iterable_subprocess(['cat'], yield_input(), chunk_size=chunk_size) as output:
+            for i, chunk in enumerate(output):
+                if i == at_iteration:
+                    raise Exception('My error')
+
+
+def test_exception_from_not_found_process_propagated():
+    with pytest.raises(FileNotFoundError):
+        with iterable_subprocess(['does-not-exist'], ()) as output:
+            b''.join(output)
+
+
+def test_exception_from_return_code():
+    with pytest.raises(IterableSubprocessError, match='No such file or directory') as excinfo:
+        with iterable_subprocess(['ls', 'does-not-exist'], ()) as output:
+            a = b''.join(output)
+
+    assert excinfo.value.returncode > 0
+    assert b'No such file or directory' in excinfo.value.stderr
+
+
+def test_exception_from_context_even_though_return_code_with_long_standard_error():
+    with pytest.raises(Exception, match="Another exception"):
+        with iterable_subprocess([sys.executable, '-c', 'import sys; print("Out"); print("Error message" * 100000, file=sys.stderr); sys.exit(1)'], ()) as output:
+            for _ in output:
+                pass
+            raise Exception('Another exception')
+
+
+def test_exception_from_return_code_with_long_standard_error():
+    with pytest.raises(IterableSubprocessError) as excinfo:
+        with iterable_subprocess([sys.executable, '-c', 'import sys; print("Out"); print("Error message" * 100000, file=sys.stderr); sys.exit(2)'], ()) as output:
+            for _ in output:
+                pass
+
+    assert excinfo.value.returncode == 2
+    assert len(excinfo.value.stderr) == 65536
+
+
+def test_if_process_exits_with_non_zero_error_code_and_inner_exception_it_propagates():
+    def yield_input():
+        while True:
+            yield b'*' * 10
+
+    with pytest.raises(Exception, match='Another exception'):
+        with iterable_subprocess([
+            sys.executable, '-c', 'import sys; print("The error", file=sys.stderr); print("After output"); sys.exit(1)',
+        ], yield_input()) as output:
+            all_output = b''.join(output)
+            raise Exception('Another exception')
+
+    # rstrip to account for different platform line endings here
+    assert all_output.rstrip() == b'After output'
+
+
+
+def test_if_process_closes_standard_input_but_exits_with_zero_error_code_then_broken_pipe_error():
+    def yield_input():
+        while True:
+            yield b'*' * 10
+
+    with pytest.raises(BrokenPipeError):
+        with iterable_subprocess([
+            sys.executable, '-c', 'import sys; sys.stdin.close(); print("The error", file=sys.stderr); print("After output"); sys.exit(0)',
+        ], yield_input()) as output:
+            all_output = b''.join(output)
+
+    # rstrip to account for different platform line endings here
+    assert all_output.rstrip() == b'After output'
+
+
+def test_if_process_closes_standard_input_but_exits_with_non_zero_error_code_then_iterable_subprocess_error():
+    def yield_input():
+        while True:
+            yield b'*' * 10
+
+    with pytest.raises(IterableSubprocessError) as excinfo:
+        with iterable_subprocess([
+            sys.executable, '-c', 'import sys; sys.stdin.close(); print("The error", file=sys.stderr); print("After output"); sys.exit(3)',
+        ], yield_input()) as output:
+            all_output = b''.join(output)
+
+    # rstrip to account for different platform line endings here
+    assert all_output.rstrip() == b'After output'
+    assert excinfo.value.returncode == 3
+    assert excinfo.value.stderr.rstrip() == b'The error'
+
+
+def test_program_that_outputs_for_a_long_time_is_interrupted_on_context_exit():
+    start = time.monotonic()
+
+    with pytest.raises(IterableSubprocessError) as excinfo:
+        with iterable_subprocess([sys.executable, '-c', 'import time; start = time.monotonic()\nwhile (time.monotonic() - start) < 60:\n print("Output" * 1000)'], ()) as output:
+            pass
+
+    end = time.monotonic()
+
+    assert excinfo.value.returncode != 0
+    # alternative condition reflects error communication on windows (errno22)
+    assert b'BrokenPipeError' in excinfo.value.stderr or b'Errno 22' in excinfo.value.stderr
+    assert end - start < 10
+
+
+def test_program_that_sleeps_exits_quickly_if_exception():
+    start = time.monotonic()
+
+    with pytest.raises(Exception, match='From context'):
+        with iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(60)'], ()) as output:
+            raise Exception('From context')
+
+    end = time.monotonic()
+
+    assert end - start < 10
+
+
+def test_program_that_sleeps_exits_quickly_if_keyboard_interrupt():
+    start = time.monotonic()
+
+    with pytest.raises(KeyboardInterrupt, match='From context'):
+        with iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(60)'], ()) as output:
+            raise KeyboardInterrupt('From context')
+
+    end = time.monotonic()
+
+    assert end - start < 10
+
+
+def test_program_that_sleeps_exits_quickly_if_keyboard_interrupt_just_before_thread_starts(monkeypatch):
+    start = time.monotonic()
+
+    def start_that_raises_keyboard_interrupt(self):
+        raise KeyboardInterrupt('Just before starting thread')
+    monkeypatch.setattr(Thread, 'start', start_that_raises_keyboard_interrupt)
+
+    with
pytest.raises(KeyboardInterrupt, match='Just before starting thread'): + iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(60)'], ()).__enter__() + + end = time.monotonic() + + assert end - start < 10 + + +def test_program_that_sleeps_exits_quickly_if_keyboard_interrupt_just_after_thread_starts(monkeypatch): + start = time.monotonic() + + original_start = Thread.start + def start_that_raises_keyboard_interrupt(self): + original_start(self) + raise KeyboardInterrupt('Just after starting thread') + monkeypatch.setattr(Thread, 'start', start_that_raises_keyboard_interrupt) + + with pytest.raises(KeyboardInterrupt, match='Just after starting thread'): + iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(60)'], ()).__enter__() + + end = time.monotonic() + + assert end - start < 10 + + +def test_program_that_sleeps_not_quickly_if_no_exception(): + start = time.monotonic() + + with iterable_subprocess([sys.executable, '-c', 'import time; time.sleep(2)'], ()) as output: + pass + + end = time.monotonic() + + assert end - start > 2 + + +def test_funzip_no_compression(): + contents = b'*' * 100000 + + def yield_input(): + file = io.BytesIO() + with zipfile.ZipFile(file, 'w', zipfile.ZIP_STORED) as zf: + zf.writestr('any.txt', contents) + + yield file.getvalue() + + with iterable_subprocess(['funzip'], yield_input()) as output: + assert b''.join(output) == contents + + +def test_funzip_deflate(): + contents = b'*' * 100000 + + def yield_input(): + file = io.BytesIO() + with zipfile.ZipFile(file, 'w', zipfile.ZIP_DEFLATED) as zf: + zf.writestr('any.txt', contents) + + yield file.getvalue() + + with iterable_subprocess(['funzip'], yield_input()) as output: + assert b''.join(output) == contents diff --git a/datalad_next/itertools/__init__.py b/datalad_next/itertools/__init__.py new file mode 100644 index 00000000..87696217 --- /dev/null +++ b/datalad_next/itertools/__init__.py @@ -0,0 +1,13 @@ +"""Various iterators, e.g., for subprocess pipelining and output processing + +.. currentmodule:: datalad_next.itertools +.. autosummary:: + :toctree: generated + + decode_bytes + itemize +""" + + +from .decode_bytes import decode_bytes +from .itemize import itemize diff --git a/datalad_next/itertools/decode_bytes.py b/datalad_next/itertools/decode_bytes.py new file mode 100644 index 00000000..d05933b1 --- /dev/null +++ b/datalad_next/itertools/decode_bytes.py @@ -0,0 +1,76 @@ +"""Iterator that decodes bytes into strings""" + +from __future__ import annotations + +from typing import ( + Generator, + Iterable, +) + + +__all__ = ['decode_bytes'] + + +def decode_bytes( + iterable: Iterable[bytes], + encoding: str = 'utf-8', + backslash_replace: bool = True, +) -> Generator[str, None, None]: + """Decode bytes in an ``iterable`` into strings + + Parameters + ---------- + iterable: Iterable[bytes] + Iterable that yields bytes that should be decoded. + encoding: str (default: ``'utf-8'``) + Encoding to be used for decoding. + backslash_replace: bool (default: ``True``) + If ``True``, backslash-escapes are used for undecodable bytes. If + ``False``, a ``UnicodeDecodeError`` is raised if a byte sequence cannot + be decoded. 
+
+    Yields
+    ------
+    str
+        Decoded strings that are generated by decoding the data yielded by
+        ``iterable`` with the specified ``encoding``
+
+    Raises
+    ------
+    UnicodeDecodeError
+        If ``backslash_replace`` is ``False`` and the data yielded by
+        ``iterable`` cannot be decoded with the specified ``encoding``
+    """
+    joined_data = b''
+    position = 0
+    for chunk in iterable:
+        joined_data += chunk
+        while position < len(joined_data):
+            try:
+                yield joined_data[position:].decode(encoding)
+                joined_data = b''
+                position = 0
+            except UnicodeDecodeError as e:
+                # If an encoding error occurs, we first check whether it was
+                # in the middle of `joined_data` or whether it extends until the
+                # end of `joined_data`.
+                # If it occurred in the middle of
+                # `joined_data`, we replace it with backslash encoding or
+                # re-raise the decoding error.
+                # If it occurred at the end of `joined_data`, we wait for the
+                # next chunk, which might fix the problem.
+                if position + e.end == len(joined_data):
+                    # Wait for the next chunk, which might fix the problem
+                    break
+                else:
+                    if not backslash_replace:
+                        # Signal the error to the caller
+                        raise
+                    else:
+                        yield (
+                            joined_data[position:position + e.start].decode(encoding)
+                            + joined_data[position + e.start:position + e.end].decode(
+                                encoding,
+                                errors='backslashreplace'
+                            )
+                        )
+                        position += e.end
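To make the chunk-boundary behavior above concrete, here is a small usage sketch (the byte values are illustrative; the behavior matches the accompanying tests):

```python
from datalad_next.itertools import decode_bytes

# a two-byte UTF-8 character split across two chunks is
# re-assembled before decoding
chunks = [b'abc\xc3', b'\xb6def']
assert ''.join(decode_bytes(chunks)) == 'abcödef'

# an undecodable byte in the middle of a chunk is backslash-escaped
# instead of raising, because backslash_replace defaults to True
chunks = [b'abc\xc3def']
assert ''.join(decode_bytes(chunks)) == 'abc\\xc3def'
```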
diff --git a/datalad_next/itertools/itemize.py b/datalad_next/itertools/itemize.py
new file mode 100644
index 00000000..ad972396
--- /dev/null
+++ b/datalad_next/itertools/itemize.py
@@ -0,0 +1,126 @@
+"""Generator that emits only complete items"""
+
+from __future__ import annotations
+
+from typing import (
+    Generator,
+    Iterable,
+)
+
+
+__all__ = ['itemize']
+
+
+def itemize(
+        iterable: Iterable[bytes | str],
+        separator: str | bytes | None = None,
+        keep_ends: bool = False,
+) -> Generator[bytes | str, None, None]:
+    """Generator that emits only complete items from chunks of an iterable
+
+    This generator consumes chunks from an iterable and yields items defined by
+    a separator. An item might span multiple input chunks.
+
+    Items are defined by a ``separator``. If ``separator`` is ``None``, the
+    line-separators built into ``str.splitlines`` are used.
+
+    The generator works on string or byte chunks, depending on the type of the
+    first element in ``iterable``. During its runtime, the type of the elements
+    in ``iterable`` must not change. If ``separator`` is not ``None``, its type
+    must match the type of the elements in ``iterable``.
+
+    The complexity of itemization without a defined separator is higher than
+    the complexity of itemization with a defined separator (this is due to
+    the externally unavailable set of line-separators that are built into
+    ``splitlines``).
+
+    Runtime with ``keep_ends=False`` is faster than with ``keep_ends=True``,
+    when a separator is defined.
+
+    EOF ends all items, but will never be present in the result, even if
+    ``keep_ends`` is ``True``.
+
+    Parameters
+    ----------
+    iterable: Iterable[bytes | str]
+        The iterable that yields the input data
+    separator: str | bytes | None
+        The separator that defines items. If ``None``, the items are
+        determined by the line-separators that are built into ``splitlines``.
+    keep_ends: bool
+        If ``True``, the item-separator will be present at the end of a
+        yielded item. If ``False``, items will not contain the separator.
+        Preserving separators implies an additional runtime cost.
+
+    Yields
+    ------
+    bytes | str
+        The items determined from the input iterable. The type of the yielded
+        items depends on the type of the first element in ``iterable``.
+    """
+    if separator is None:
+        yield from _split_lines(iterable, keep_ends=keep_ends)
+    else:
+        yield from _split_lines_with_separator(
+            iterable,
+            separator=separator,
+            keep_ends=keep_ends,
+        )
+
+
+def _split_lines_with_separator(iterable: Iterable[bytes | str],
+                                separator: str | bytes,
+                                keep_ends: bool = False,
+                                ) -> Generator[bytes | str, None, None]:
+    assembled = None
+    for chunk in iterable:
+        if not assembled:
+            assembled = chunk
+        else:
+            assembled += chunk
+        lines = assembled.split(sep=separator)
+        if len(lines) == 1:
+            continue
+
+        if assembled.endswith(separator):
+            assembled = None
+        else:
+            assembled = lines[-1]
+        lines.pop(-1)
+        if keep_ends:
+            for line in lines:
+                yield line + separator
+        else:
+            yield from lines
+
+    if assembled:
+        yield assembled
+
+
+def _split_lines(iterable: Iterable[bytes | str],
+                 keep_ends: bool = False,
+                 ) -> Generator[bytes | str, None, None]:
+    assembled = None
+    for chunk in iterable:
+        if not assembled:
+            assembled = chunk
+        else:
+            assembled += chunk
+        # We don't know all elements on which python splits lines, therefore we
+        # split once with ends and once without ends. Lines that differ have no
+        # ending
+        lines_with_end = assembled.splitlines(keepends=True)
+        lines_without_end = assembled.splitlines(keepends=False)
+        if lines_with_end[-1] == lines_without_end[-1]:
+            assembled = lines_with_end[-1]
+            lines_with_end.pop(-1)
+            lines_without_end.pop(-1)
+        else:
+            assembled = None
+        if keep_ends:
+            yield from lines_with_end
+        else:
+            yield from lines_without_end
+
+    if assembled:
+        yield assembled
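A brief usage sketch of `itemize` (the record values are illustrative):

```python
from datalad_next.itertools import itemize

# items are only emitted once their separator has been seen,
# so an item may span chunk boundaries
chunks = [b'item1\0ite', b'm2\0item3']
assert list(itemize(chunks, separator=b'\0')) == [b'item1', b'item2', b'item3']

# with keep_ends=True the separators are retained; the trailing,
# separator-less item is still emitted at EOF
assert list(itemize(chunks, separator=b'\0', keep_ends=True)) == \
    [b'item1\0', b'item2\0', b'item3']
```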
diff --git a/datalad_next/itertools/tests/__init__.py b/datalad_next/itertools/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/datalad_next/itertools/tests/test_decode_bytes.py b/datalad_next/itertools/tests/test_decode_bytes.py
new file mode 100644
index 00000000..ff38e0ee
--- /dev/null
+++ b/datalad_next/itertools/tests/test_decode_bytes.py
@@ -0,0 +1,35 @@
+from __future__ import annotations
+
+import sys
+import timeit
+
+from ..decode_bytes import decode_bytes
+
+
+def test_split_decoding():
+    encoded = 'ö'.encode('utf-8')
+    part_1, part_2 = encoded[:1], encoded[1:]
+
+    # check that incomplete encodings are caught
+    r = tuple(decode_bytes([b'abc' + part_1, part_2 + b'def']))
+    assert ''.join(r) == 'abcödef'
+
+
+def test_unfixable_error_decoding():
+    encoded = 'ö'.encode('utf-8')
+    part_1, part_2 = encoded[:1], encoded[1:]
+
+    # check that unfixable encoding errors are backslash-replaced
+    r = tuple(decode_bytes([b'abc' + part_1 + b'def' + part_1, part_2 + b'ghi']))
+    assert ''.join(r) == 'abc\\xc3deföghi'
+
+
+def test_performance():
+    encoded = 'ö'.encode('utf-8')
+    part_1, part_2 = encoded[:1], encoded[1:]
+
+    # time the decoding of chunks that contain incomplete encodings
+    iterable = [b'abc' + part_1 + b'def' + part_1, part_2 + b'ghi']
+
+    d1 = timeit.timeit(lambda: tuple(decode_bytes(iterable)), number=1000000)
+    print(d1, file=sys.stderr)
diff --git a/datalad_next/itertools/tests/test_itemize.py b/datalad_next/itertools/tests/test_itemize.py
new file mode 100644
index 00000000..7d62b187
--- /dev/null
+++ b/datalad_next/itertools/tests/test_itemize.py
@@ -0,0 +1,45 @@
+from __future__ import annotations
+
+import pytest
+
+from ..itemize import itemize
+
+
+text_chunks = [
+    'abc',
+    'def\n012',
+    '\n',
+    '\n'
+]
+byte_chunks = [chunk.encode() for chunk in text_chunks]
+text_chunks_other = [chunk.replace('\n', '\r\n') for chunk in text_chunks]
+byte_chunks_other = [chunk.encode() for chunk in text_chunks_other]
+
+
+@pytest.mark.parametrize(
+    'input_chunks,separator',
+    [
+        (text_chunks, '\n'),
+        (byte_chunks, b'\n'),
+        (text_chunks_other, '\r\n'),
+        (byte_chunks_other, b'\r\n')
+    ]
+)
+def test_assembling_and_splitting(input_chunks, separator):
+    empty = input_chunks[0][:0]
+
+    r = tuple(itemize(input_chunks, keep_ends=True))
+    assert len(r) == 3
+    assert empty.join(r) == empty.join(input_chunks)
+
+    r = tuple(itemize(input_chunks, separator=separator, keep_ends=True))
+    assert len(r) == 3
+    assert empty.join(r) == empty.join(input_chunks)
+
+    r = tuple(itemize(input_chunks, separator=separator))
+    assert len(r) == 3
+    assert empty.join(r) == empty.join(input_chunks).replace(separator, empty)
+
+    r = tuple(itemize(input_chunks + input_chunks[:1], separator=separator, keep_ends=True))
+    assert len(r) == 4
+    assert r[3] == input_chunks[0]
diff --git a/datalad_next/runners/__init__.py b/datalad_next/runners/__init__.py
index cca244f9..7a96f952 100644
--- a/datalad_next/runners/__init__.py
+++ b/datalad_next/runners/__init__.py
@@ -7,7 +7,7 @@
 Low-level tooling
 -----------------
 
-Two essential process execution/management utilities are provided, for
+A few process execution/management utilities are provided, for
 generic command execution, and for execution command in the context of a
 Git repository.
 
@@ -16,6 +16,7 @@
 
    GitRunner
    Runner
+   iter_subproc
 
 Additional information on the design of the subprocess execution tooling
 is available from https://docs.datalad.org/design/threaded_runner.html
@@ -41,6 +42,8 @@
    StdOutErrCapture
 """
 
+from .iter_subproc import iter_subproc
+
 # runners
 from datalad.runner import (
     GitRunner,
diff --git a/datalad_next/runners/iter_subproc.py b/datalad_next/runners/iter_subproc.py
new file mode 100644
index 00000000..d1193a17
--- /dev/null
+++ b/datalad_next/runners/iter_subproc.py
@@ -0,0 +1,57 @@
+from __future__ import annotations
+from typing import List
+
+from datalad_next.iterable_subprocess.iterable_subprocess \
+    import iterable_subprocess
+from datalad_next.consts import COPY_BUFSIZE
+
+__all__ = ['iter_subproc']
+
+
+def iter_subproc(
+        args: List[str],
+        *,
+        input: List[bytes] | None = None,
+        chunk_size: int = COPY_BUFSIZE,
+):
+    """Context manager to communicate with a subprocess using iterables
+
+    This offers a higher level interface to subprocesses than Python's
+    built-in ``subprocess`` module. It allows a subprocess to be naturally
+    placed in a chain of iterables as part of a data processing pipeline.
+    It is also helpful when data won't fit in memory and has to be streamed.
+
+    This is a convenience wrapper around ``datalad_next.iterable_subprocess``,
+    which itself is a slightly modified (for use on Windows) fork of
+    https://github.com/uktrade/iterable-subprocess, written by
+    Michal Charemza.
+
+    Parameters
+    ----------
+    args: list
+      Sequence of program arguments to be passed to ``subprocess.Popen``.
+    input: iterable, optional
+      If given, chunks of ``bytes`` to be written, iteratively, to the
+      subprocess's ``stdin``.
+    chunk_size: int, optional
+      Size of chunks to read from the subprocess's stdout/stderr in bytes.
+
+    Returns
+    -------
+    contextmanager
+      On entering the context, the subprocess is started, the thread to read
+      from standard error is started, and the thread to populate subprocess
+      input is started.
+      When running, the standard input thread iterates over the input,
+      passing chunks to the process, while the standard error thread
+      fetches the error output, and while the main thread iterates over
+      the process's output from client code in the context.
+      On context exit, the main thread closes the process's standard output,
+      waits for the standard input thread to exit, waits for the standard
+      error thread to exit, and waits for the process to exit.
+    """
+    return iterable_subprocess(
+        args,
+        tuple() if input is None else input,
+        chunk_size=chunk_size,
+    )
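The `_git_ls_files` rewrite earlier in this patch is the model for composing these pieces. As a standalone sketch (it assumes execution inside a Git repository):

```python
from datalad_next.runners import iter_subproc
from datalad_next.itertools import decode_bytes, itemize

with iter_subproc(['git', 'ls-files', '-z']) as proc_output:
    # itemize re-assembles zero-byte-delimited records across chunk
    # boundaries; decode_bytes then turns them into str
    for fname in decode_bytes(itemize(proc_output, separator=b'\0')):
        print(fname)
```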
diff --git a/datalad_next/utils/consts.py b/datalad_next/utils/consts.py
index ff1fa9b0..2695d919 100644
--- a/datalad_next/utils/consts.py
+++ b/datalad_next/utils/consts.py
@@ -1,9 +1,5 @@
-try:
-    from shutil import COPY_BUFSIZE
-except ImportError:  # pragma: no cover
-    # too old
-    from datalad_next.utils import on_windows
-    # from PY3.10
-    COPY_BUFSIZE = 1024 * 1024 if on_windows else 64 * 1024
-
-from datalad.consts import PRE_INIT_COMMIT_SHA
+# ATTN! These are legacy imports. DO NOT ADD ANYTHING!
+from datalad_next.consts import (
+    COPY_BUFSIZE,
+    PRE_INIT_COMMIT_SHA,
+)
diff --git a/docs/source/pyutils.rst b/docs/source/pyutils.rst
index b97a248d..410945fd 100644
--- a/docs/source/pyutils.rst
+++ b/docs/source/pyutils.rst
@@ -19,9 +19,11 @@ packages.
    commands
    config
    constraints
+   consts
    credman
    datasets
    exceptions
+   itertools
    iter_collections
    runners
    tests.fixtures
diff --git a/setup.cfg b/setup.cfg
index 3f6897ae..50e0922e 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -29,6 +29,8 @@ devel =
     pytest
     pytest-cov
     coverage
+    # for iterable_subprocess
+    psutil
     # for webdav testing
     cheroot
     wsgidav
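The relocation of the constants keeps the old import path working; a quick check (assuming an installed datalad-next with both modules present):

```python
from datalad_next.consts import COPY_BUFSIZE
# legacy location, re-exported from datalad_next.consts
from datalad_next.utils.consts import COPY_BUFSIZE as LEGACY_COPY_BUFSIZE

assert COPY_BUFSIZE == LEGACY_COPY_BUFSIZE
```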