Skip to content

Commit

Permalink
Merge branch 'minimal-batch' into iter-annexworktree-run-and-batch
Browse files Browse the repository at this point in the history
  • Loading branch information
christian-monch committed Nov 6, 2023
2 parents 34db805 + 5aea03b commit 63b21ab
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 53 deletions.
27 changes: 10 additions & 17 deletions datalad_next/constraints/git.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
"""Constraints for Git-related concepts and parameters"""

from datalad_next.runners import (
CommandError,
GitRunner,
StdOutCapture,
)
# We use GitRunner only to provide the adjusted environment
from datalad.runner import GitRunner as _EnvAdjuster
from datalad_next.runners import StdOutCapture
from datalad_next.runners.run import run

from .base import Constraint

Expand Down Expand Up @@ -43,7 +42,6 @@ def __call__(self, value: str) -> str:
# simple, do here
self.raise_for(value, 'refname must not be empty')

runner = GitRunner()
cmd = ['git', 'check-ref-format']
cmd.append('--allow-onelevel'
if self._allow_onelevel
Expand All @@ -55,18 +53,13 @@ def __call__(self, value: str) -> str:

cmd.append(value)

try:
out = runner.run(cmd, protocol=StdOutCapture)
except CommandError as e:
self.raise_for(
value,
'is not a valid refname',
__caused_by__=e,
)
applied_env = _EnvAdjuster()._get_adjusted_env()

if self._normalize:
return out['stdout'].strip()
else:
with run(cmd, protocol_class=StdOutCapture, env=applied_env) as out:
if out['code'] != 0:
self.raise_for(value,'is not a valid refname')
if self._normalize:
return out['stdout'].strip()
return value

def short_description(self):
Expand Down
79 changes: 62 additions & 17 deletions datalad_next/runners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,67 @@
.. currentmodule:: datalad_next.runners
Low-level tooling
-----------------
Two essential process execution/management utilities are provided, for
generic command execution, and for execution command in the context
of a Git repository.
Execution of subprocesses is provided by the context manager
:func:`datalad_next.runners.run.run`. The context manager uses protocols
to process the output of the subprocess. A number of protocols are provided
in this module. New protocols can be implemented by the user of the
``run``-context.
There are two execution modes for subprocesses, a synchronous mode and an
asynchronous mode.
In the synchronous mode the subprocess will be executed and
the result of the ``as``-variable is (usually) a dictionary containing the
exit code, stdout-output, and stderr-output of the subprocess execution (whether
stdout- and stderr-content is actually contained depends on the protocol that
was used).
In asynchronous mode the ``as``-variable will reference a generator that will
yield output from the subprocess as soon as it is available, i.e. as soon as the
protocol "sends" them to the generator. After the process has exited, the
generator will stop the iteration and the exit code of the process will be
available in the ``return_code``-attribute of the generator.
The context manager will only exit the context, if the subprocess has exited.
That means, a subprocess that never exits will prevent the context manager from
exiting the context. The user of the run-context should therefore trigger a
subprocess exit before leaving the context, e.g. by closing stdin of the
subprocess, or by sending it a signal.
To ensure that control flow leaves the run-context, the run-context provides
timeouts that will terminate and, if termination fails, finally kill the
subprocess. Timeouts are supported in synchronous and in asynchronous mode.
In synchronous mode the timeout is measured from the moment when the context
is entered. In asynchronous mode, the timeout is measured when fetching the next
element from the result generator and stopped when the result generator yields
the element. In asynchronous mode the timeout is also measured, when the
control flow leaves the run-context, i.e. when the control flow enters the
exit-handler of the context. (Check the documentation of the context manager
:func:`datalad_next.runners.run.run`. For additional keyword arguments check
also the documentation of
:class:`datalad.runner.nonasyncrunner.ThreadedRunner`.)
.. currentmodule:: datalad_next.runners
.. autosummary::
:toctree: generated
GitRunner
Runner
run
Additional information on the design of the subprocess execution tooling
is available from https://docs.datalad.org/design/threaded_runner.html
A standard exception type is used to communicate any process termination
with a non-zero exit code
with a non-zero exit code (unless the keyword argument ``exception_on_error`` is
set to ``False``.
.. autosummary::
:toctree: generated
CommandError
Command output can be processed via "protocol" implementations that are
inspired by ``asyncio.SubprocessProtocol``.
inspired by ``asyncio.SubprocessProtocol``. The following synchronous protocols
are provided in ``datalad_next.runners``
.. autosummary::
:toctree: generated
Expand All @@ -39,13 +74,23 @@
StdOutCapture
StdErrCapture
StdOutErrCapture
"""
# runners
from datalad.runner import (
GitRunner,
Runner,
)
In addition, ``datalad_next.runners`` provides the following asynchronous
protocols:
.. autosummary::
:toctree: generated
NoCaptureGeneratorProtocol
StdOutCaptureGeneratorProtocol
Low-level tooling
-----------------
The ``run``-context uses the class :class:`ThreadedRunner` to execute
subprocesses. Additional information on the design of :class:`ThreadedRunner`
is available from https://docs.datalad.org/design/threaded_runner.html
"""

# protocols
from datalad.runner import (
Expand Down
70 changes: 55 additions & 15 deletions datalad_next/runners/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
:toctree: generated
StdOutCaptureGeneratorProtocol
StdOutLineCaptureGeneratorProtocol
GeneratorAnnexJsonProtocol
_ResultGenerator
"""
Expand Down Expand Up @@ -74,6 +75,7 @@ def batchcommand(
terminate_time: int | None = None,
kill_time: int | None = None,
protocol_kwargs: dict | None = None,
**kwargs,
) -> Generator[BatchProcess, None, None]:
"""Generic context manager for batch processes
Expand Down Expand Up @@ -110,7 +112,7 @@ def batchcommand(
While this generic context manager can be used directly, it can be
more convenient to use any of the more specialized implementations
that employ a specific protocol (e.g., :func:`stdout_batchcommand`,
:func:`annexjson_batchcommand`).
:func:`stdoutline_batchcommand`, or :func:`annexjson_batchcommand`).
Parameters
----------
Expand All @@ -131,6 +133,10 @@ def batchcommand(
after ``kill_time`` timeouts.
It is a good idea to set ``kill_time`` and ``terminate_time`` in order
to let the process exit gracefully, if it is capable to do so.
kwargs : dict
All other keyword arguments are forwarded to the
``datalad_next.runners.run.run``-context manager, that is used to
execute the batch command.
Yields
-------
Expand All @@ -148,7 +154,8 @@ def batchcommand(
cwd=cwd,
terminate_time=terminate_time,
kill_time=kill_time,
protocol_kwargs=protocol_kwargs
protocol_kwargs=protocol_kwargs,
**kwargs,
) as result_generator:
batch_process = BatchProcess(result_generator)
try:
Expand All @@ -164,6 +171,7 @@ def stdout_batchcommand(
cwd: Path | None = None,
terminate_time: int | None = None,
kill_time: int | None = None,
**kwargs,
) -> Generator[BatchProcess, None, None]:
"""Context manager for commands that produce arbitrary output on ``stdout``
Expand All @@ -177,49 +185,80 @@ def stdout_batchcommand(
cwd=cwd,
terminate_time=terminate_time,
kill_time=kill_time,
**kwargs,
)


def annexjson_batchcommand(
def stdoutline_batchcommand(
cmd: list,
cwd: Path | None = None,
separator: str | None = None,
keep_ends: bool = False,
terminate_time: int | None = None,
kill_time: int | None = None,
protocol_kwargs: dict | None = None,
**kwargs,
) -> Generator[BatchProcess, None, None]:
"""
Context manager for git-annex commands that support ``--batch --json``
"""Context manager for batch commands that respond with a single line.
The given ``cmd``-list must be complete, i.e., include
``git annex ... --batch --json``, and any additional flags that may be
needed.
The context manager returns a ``BatchProcess``-instance in the
``as``-Variable. It uses :class:`StdOutLineCaptureGeneratorProtocol` to
process data from the subprocess. This protocol will yield individual lines,
of decoded data.
The context manager can be used, for example, to interact with
git-annex commands that support ``--batch`` and return a single line in
response to an input.
Internally this calls :func:`batchcommand` with the
:class:`GeneratorAnnexJsonProtocol` protocol implementation. See the
documentation of :func:`batchcommand` for a description of the parameters.
:class:`StdOutLineCaptureGeneratorProtocol` protocol implementation. See the
Parameters
----------
separator : str | None (default ``None``)
If ``None``, lines are separated by the built-in separators of python.
If not ``None``, lines are separated by the given character.
The``separator`` keyword argument will override a ``separator`` keyword
argument provided in the ``protocol_kwargs``.
keep_ends: bool (default: ``False``)
If ``False``, the returned lines will not contain the terminating
character. If ``True``, the returned lines will contain the terminating
character.
The``keep_ends`` keyword argument will override a ``keep_ends`` keyword
argument provided in the ``protocol_kwargs``.
See the documentation of :func:`batchcommand` for a description of the
remaining parameters.
"""
return batchcommand(
cmd,
protocol_class=GeneratorAnnexJsonProtocol,
protocol_class=StdOutLineCaptureGeneratorProtocol,
cwd=cwd,
terminate_time=terminate_time,
kill_time=kill_time,
protocol_kwargs=protocol_kwargs,
protocol_kwargs=dict(
**(protocol_kwargs or {}),
separator=separator,
keep_ends=keep_ends,
),
**kwargs,
)


def annexline_batchcommand(
def annexjson_batchcommand(
cmd: list,
cwd: Path | None = None,
terminate_time: int | None = None,
kill_time: int | None = None,
protocol_kwargs: dict | None = None,
**kwargs,
) -> Generator[BatchProcess, None, None]:
"""
Context manager for git-annex commands that support ``--batch --json``
The given ``cmd``-list must be complete, i.e., include
``git annex ... --batch``, and any additional flags that may be
``git annex ... --batch --json``, and any additional flags that may be
needed.
Internally this calls :func:`batchcommand` with the
Expand All @@ -228,9 +267,10 @@ def annexline_batchcommand(
"""
return batchcommand(
cmd,
protocol_class=StdOutLineCaptureGeneratorProtocol,
protocol_class=GeneratorAnnexJsonProtocol,
cwd=cwd,
terminate_time=terminate_time,
kill_time=kill_time,
protocol_kwargs=protocol_kwargs,
**kwargs,
)
25 changes: 25 additions & 0 deletions datalad_next/runners/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
from __future__ import annotations

import logging
import subprocess
import sys
from collections.abc import Generator
Expand All @@ -24,6 +25,9 @@
)


lgr = logging.getLogger('datalad.ext.next.runners.run')


def _create_kill_wrapper(cls: type[Protocol]) -> type[Protocol]:
""" Extend ``cls`` to supports the "kill-interface"
Expand Down Expand Up @@ -132,6 +136,7 @@ def run(
terminate_time: int | None = None,
kill_time: int | None = None,
protocol_kwargs: dict | None = None,
**kwargs,
) -> Any | Generator:
""" A context manager for subprocesses
Expand Down Expand Up @@ -199,6 +204,11 @@ def run(
protocol_kwargs : dict
A dictionary with Keyword arguments that will be used when
instantiating the protocol class.
kwargs : dict
All other keyword arguments are forwarded the ``subprocess.Popen`` call
that is used to start the subprocess. Note: some keywords are set
internally by the underlying class :class:`ThreadedRunner`. The values
given here will be ignored for those keywords.
Yields
-------
Expand Down Expand Up @@ -237,6 +247,7 @@ def run(
timeout=timeout,
exception_on_error=False,
cwd=cwd,
**kwargs,
)
result = runner.run()
# We distinguish between a non-generator run, i,e. a blocking run and
Expand All @@ -246,6 +257,16 @@ def run(
else:
try:
yield KillingResultGenerator(result)
except:
e = sys.exc_info()[1]
if kill_time is None:
lgr.warning(
f'Possible stall: exception ({e!r}) raised in '
f'run-context ({cmd!r}) without kill-timeout. Waiting for '
f'subprocess exit, If subprocess exit was not triggered yet, '
f'this might wait forever.'
)
raise
finally:
# Arm the protocol, that will enable terminate signaling or kill
# signaling, if terminate_time or kill_time are not None.
Expand All @@ -256,5 +277,9 @@ def run(
# the result code of the terminated process.
# NOTE: if terminate_time and kill_time are both None, this might
# loop forever.
lgr.debug(
f'Waiting for termination of {cmd!r}, terminate_time: '
f'{terminate_time!r}, kill_time: {kill_time!r}'
)
for _ in result:
pass
Loading

0 comments on commit 63b21ab

Please sign in to comment.