Alternative paradigm for concurrent batch processes #537

Closed
mih opened this issue Nov 15, 2023 · 6 comments · Fixed by #538
Comments

@mih
Member

mih commented Nov 15, 2023

I searched for alternatives to our current ThreadedRunner-based implementation for batch process handling (with context managers).

Related:

and some more prior attempts by myself. All solutions so far appear to be rather heavy, and do not deliver the necessary performance.

Below is a demo of a leaner solution:

It runs the equivalent of the following pipeline:

git ls-files | git annex find --anything --format="\${key}\n" --batch | grep -v '^[[:space:]]*$' | git annex examinekey --format="\${objectpath}\n" --batch

As shown below, it connects three subprocesses via a "pipe". A custom iterator takes the conceptual place of our "protocols" and performs some output processing (pipe-inline).

from iterable_subprocess import iterable_subprocess

# Small helper to ensure that only complete records are passed on.
# Here a record is a full line, terminated by a newline.
# Blank lines are dropped, mirroring the `grep -v` step of the shell pipeline.
def glue(chunks):
    trailer = b''
    for chunk in chunks:
        # prepend the incomplete line left over from the previous chunk
        lines = (trailer + chunk).splitlines(keepends=True)
        trailer = b''
        # keepends=True preserves line endings, so an unterminated last
        # line can be detected and held back until the next chunk arrives
        if lines and not lines[-1].endswith(b'\n'):
            trailer = lines.pop()
        for line in lines:
            if line.strip():
                yield line
    # whatever is left at the end is a final line without a newline
    if trailer.strip():
        yield trailer


result_recs = []
with \
        iterable_subprocess(
            ['git', 'ls-files'], tuple()) as glsf, \
        iterable_subprocess(
            ['git', 'annex', 'find', '--anything', '--format=\${key}\n', '--batch'], glsf) as gaf, \
        iterable_subprocess(
            ['git', 'annex', 'examinekey', '--format=\${objectpath}\n', '--batch'], glue(gaf)) as gek:
    for r in glue(gek):
        print(r)

Benchmark:

❯ multitime -n5 python ~/hacking/datalad/next/iter_subproc.py > /dev/null
===> multitime results
1: python /home/mih/hacking/datalad/next/iter_subproc.py
            Mean        Std.Dev.    Min         Median      Max
real        3.208       0.073       3.099       3.239       3.302       
user        2.806       0.037       2.754       2.805       2.864       
sys         2.113       0.036       2.084       2.093       2.181       

❯ multitime -n 5 git ls-files | git annex find --anything --format="\${key}\n" --batch | grep -v '^[[:space:]]*$' | git annex examinekey --format="\${objectpath}\n" --batch > /dev/null
===> multitime results
1: git ls-files
            Mean        Std.Dev.    Min         Median      Max
real        3.276       0.040       3.234       3.267       3.327       
user        0.009       0.003       0.005       0.008       0.013       
sys         0.004       0.003       0.000       0.004       0.009       

So on my machine and test dataset with 36k files, I achieve the same performance as with the shell command.

I like this solution, because it has minimal (code) overhead. The underlying implementation of iterable_subprocess is ~100 lines of code on top of the stdlib.

While this looks good and solves an urgently needed use case, it is unclear to me what this code cannot do that ThreadedRunner does for our existing implementations.

@christian-monch please have a look!

@mih
Member Author

mih commented Nov 16, 2023

#538 has a port to Windows, and is ready to merge.

Next steps:

  • Implement post-processors for command output. This is similar to what was done in and for "Add batch command-context manager on top of run-context manager" (#532), but the implementations must be (simple) iterators. They would be injected into the processing pipelines (similar to the toy example glue() above), pretty much like main-thread pipeline elements.
    • join and split incoming chunks to yield items that fit a pattern (think: lines, or zero-byte-separated)
    • yield JSON-lines
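
The two post-processors listed above could look roughly like the following. This is a minimal sketch, not the implementation that went into #538; the names `itemize` and `load_json_lines` are hypothetical and chosen for illustration only.

```python
import json
from typing import Iterable, Iterator


def itemize(chunks: Iterable[bytes], sep: bytes) -> Iterator[bytes]:
    """Yield `sep`-delimited items from arbitrarily sized byte chunks."""
    buffer = b''
    for chunk in chunks:
        buffer += chunk
        # everything before the last separator is complete, the rest
        # stays in the buffer until more data arrives
        *complete, buffer = buffer.split(sep)
        yield from complete
    if buffer:
        # trailing item that was not terminated by a separator
        yield buffer


def load_json_lines(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Decode one JSON document per non-empty line."""
    for line in itemize(chunks, b'\n'):
        if line.strip():
            yield json.loads(line)
```

Both are plain generators, so they can wrap the output of one pipeline step and serve as the input of the next, for example `itemize(chunks, b'\0')` for zero-byte-separated output.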

@christian-monch
Contributor

christian-monch commented Nov 16, 2023

Very interesting, nice short implementation, nice interface. A very good idea to use an additional thread to fill stdin. That makes it quite fast.
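
The idea of filling stdin from an additional thread can be sketched with the standard library alone. This is an illustration of the technique under simplifying assumptions, not the code of iterable_subprocess; `run_with_feeder` and `DEMO_CMD` are hypothetical names, and error handling is reduced to ignoring a broken pipe.

```python
import subprocess
import sys
from threading import Thread
from typing import Iterable, Iterator, List

# Hypothetical stand-in child process: upper-cases everything it reads.
DEMO_CMD = [
    sys.executable, '-c',
    'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())',
]


def run_with_feeder(cmd: List[str], input_chunks: Iterable[bytes],
                    chunk_size: int = 65536) -> Iterator[bytes]:
    """Yield stdout chunks of `cmd` while a thread writes its stdin."""
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def feed() -> None:
        try:
            for chunk in input_chunks:
                proc.stdin.write(chunk)
            # closing stdin signals end-of-input to the child
            proc.stdin.close()
        except BrokenPipeError:
            # the child stopped reading, nothing more to feed
            pass

    feeder = Thread(target=feed, daemon=True)
    feeder.start()
    try:
        while True:
            # read1() returns as soon as some data is available
            chunk = proc.stdout.read1(chunk_size)
            if not chunk:
                break
            yield chunk
    finally:
        proc.stdout.close()
        feeder.join()
        proc.wait()
```

Because writing happens in its own thread, the main thread can keep draining stdout, so neither pipe buffer fills up and blocks the other side.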

Paradigm

The pipeline approach is nice, but AFAICS there are two open questions:

  • It seems that the pipeline approach requires routing data around individual subprocesses if this data is to be retained until the end of the pipeline. For example, filenames and annex status get lost in the example pipeline. This routing and retaining most likely has to be performed by specific generators that process the iterable input to each pipeline step.

  • How challenging is it to keep a generator approach if we have to run multiple pipelines to create a result (if that is the case)?

I think it would be interesting to see how, for example, iter_annexworktree is implemented with this paradigm.
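
One way to picture the first question is a generator that remembers each input before handing it to a step, and re-attaches it to the step's output afterwards. The sketch below is single-threaded and assumes the step yields exactly one result per input, in order; `route_around` is a hypothetical helper, and the step is an ordinary function standing in for a subprocess.

```python
from collections import deque
from typing import Callable, Deque, Iterable, Iterator, Tuple, TypeVar

In = TypeVar('In')
Out = TypeVar('Out')


def route_around(
    items: Iterable[In],
    step: Callable[[Iterable[In]], Iterable[Out]],
) -> Iterator[Tuple[In, Out]]:
    """Pair each input with the result `step` produced for it."""
    pending: Deque[In] = deque()

    def remember(source: Iterable[In]) -> Iterator[In]:
        for item in source:
            # keep a copy of the input before the step consumes it
            pending.append(item)
            yield item

    for result in step(remember(items)):
        # oldest remembered input belongs to the current result
        yield pending.popleft(), result
```

With a real subprocess the inputs are consumed in a different thread than the one reading results, so a blocking FIFO such as `queue.Queue` would likely be needed in place of the `deque`.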

Implementation

In order to evaluate how much code duplication, code change, and code reuse we would suffer or enjoy, I added the thread-based approach for stdin-writing to the ThreadedRunner in datalad-core, and an additional iterable-parameter to the batchcommand of datalad_next. The runtime of the equivalent (33k annexed files) was:

> multitime -n5 python /.../scratch_216.py >/tmp/216
===> multitime results
1: python /.../scratch_216.py
            Mean        Std.Dev.    Min         Median      Max
real        4.719       0.224       4.543       4.636       5.162       
user        5.566       0.100       5.465       5.524       5.711       
sys         1.855       0.097       1.700       1.885       1.985       

The equivalent execution with iterable_subprocess is slightly faster:

> multitime -n5 python /.../scratch_206.py >/tmp/206
===> multitime results
1: python /.../scratch_206.py
            Mean        Std.Dev.    Min         Median      Max
real        3.058       0.032       3.025       3.038       3.113       
user        3.419       0.016       3.402       3.414       3.450       
sys         1.310       0.034       1.284       1.290       1.373       

The datalad-runner based implementation is about 1.5 times slower than the iterable-subprocess-implementation.

Other criteria

Pros of iterable_subprocess

  • Fast
  • Small (the implementation is quite small)

Cons of iterable_subprocess

  • No timeout
  • No guaranteed subprocess killing (should be easy to add)
  • No stderr capture (relevance of this point is debatable)
  • No protocol support (relevance debatable)
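
The first two points (timeout, guaranteed killing) can be addressed with a small wrapper around `subprocess.Popen`. The following is a sketch under assumptions, not a proposal for the actual interface; `managed_process`, its `grace` parameter, and `DEMO_SLEEPER` are hypothetical names.

```python
import subprocess
import sys
from contextlib import contextmanager
from typing import Iterator, List

# Hypothetical stand-in for a child process that never exits on its own.
DEMO_SLEEPER = [sys.executable, '-c', 'import time; time.sleep(60)']


@contextmanager
def managed_process(cmd: List[str],
                    grace: float = 2.0) -> Iterator[subprocess.Popen]:
    """Run `cmd` and make sure it is gone when the context is left."""
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    try:
        yield proc
    finally:
        for stream in (proc.stdin, proc.stdout):
            try:
                stream.close()
            except OSError:
                # e.g. a broken pipe while flushing buffered input
                pass
        try:
            # give the child a chance to exit after stdin was closed
            proc.wait(timeout=grace)
        except subprocess.TimeoutExpired:
            proc.terminate()
            try:
                proc.wait(timeout=grace)
            except subprocess.TimeoutExpired:
                # last resort if termination is ignored
                proc.kill()
                proc.wait()
```

Leaving the `with managed_process(DEMO_SLEEPER, grace=0.5) as proc:` block escalates from closing the pipes to terminating and finally killing the child, each step bounded by `grace` seconds.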

@yarikoptic
Member

I am curious how it would compare to an async implementation that does the same. Not that we would be able to use it within datalad, but as a data point to be aware of. @jwodder would it be hard to come up with a "native" async version for such batching? I guess we might already have it to some degree within dandisets tooling?

@jwodder
Member

jwodder commented Nov 16, 2023

@yarikoptic I'm not entirely clear what you're asking. backups2datalad uses asynchronous execution to write to the stdin of a git-annex addurl process in one task while reading from stdout in another task. If you wanted to pipe multiple processes together, I'd just use the stdout filehandle of each process as the stdin for the next one and only explicitly manipulate the streams at the ends. If by "native" you mean "using no third-party libraries", my original draft implementation of the code did that, but I'd now much prefer to either use anyio or Python 3.11's TaskGroups.
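
The pattern described here, one task writing to stdin while another reads from stdout, can be sketched with asyncio from the standard library. This is not the backups2datalad code; `run_batch` and `DEMO_CMD` are hypothetical names, and `asyncio.gather` is used instead of anyio or a TaskGroup only to keep the example runnable on Python versions before 3.11.

```python
import asyncio
import sys
from typing import Iterable, List

# Hypothetical stand-in for a batch command: upper-cases each input line.
DEMO_CMD = [
    sys.executable, '-c',
    'import sys\n'
    'for l in sys.stdin.buffer:\n'
    '    sys.stdout.buffer.write(l.upper())',
]


async def run_batch(cmd: List[str], lines: Iterable[bytes]) -> List[bytes]:
    """Feed `lines` to `cmd` and collect its output lines concurrently."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )

    async def feed() -> None:
        for line in lines:
            proc.stdin.write(line)
            # wait until the pipe has room again (backpressure)
            await proc.stdin.drain()
        proc.stdin.close()
        await proc.stdin.wait_closed()

    async def collect() -> List[bytes]:
        # a StreamReader yields one line per iteration until EOF
        return [line async for line in proc.stdout]

    _, results = await asyncio.gather(feed(), collect())
    await proc.wait()
    return results
```

It would be driven with `asyncio.run(run_batch(DEMO_CMD, [b'a\n', b'b\n']))`. Chaining several processes would, as described above, pass file handles between them rather than routing data through Python.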

@mih
Member Author

mih commented Nov 16, 2023

Here is a complete implementation that sits on #538

It goes all-in with the itertools!!

A bit slower than the sketch above, but still faster than anything else.

❯ multitime -n5 python ~/hacking/datalad/next/iter_subproc.py > /dev/null
===> multitime results
1: python /home/mih/hacking/datalad/next/iter_subproc.py
            Mean        Std.Dev.    Min         Median      Max
real        3.472       0.037       3.410       3.472       3.522       
user        3.552       0.044       3.466       3.563       3.591       
sys         2.201       0.046       2.120       2.221       2.248       

Note that this is using iterable_subprocess from datalad-next, but this is just a slightly fixed-up version that also works on Windows. Other than that, this is using no datalad code and is self-contained.

from datalad_next.iterable_subprocess.iterable_subprocess import iterable_subprocess
from re import finditer
from more_itertools import (
    intersperse,
    side_effect,
)
from queue import Queue


# takes an iterable of bytes, yields items defined by delimiter
# tries to make as few copies as possible
def proc_byte_chunks(iter, delim):
    remainder = b''
    for chunk in iter:
        for item, incomplete in proc_byte_chunk(chunk, delim):
            if incomplete:
                remainder += item
                continue
            elif remainder:
                item = remainder + item
                remainder = b''
            yield item
    if remainder:
        yield remainder


# takes bytes, yields items defined by delimiter,
# plus bool to indicate incomplete items (did not end with delimiter)
def proc_byte_chunk(chunk, delim):
    pos = 0
    for delim in finditer(delim, chunk):
        yield (chunk[pos:delim.start()], False)
        pos = delim.end()
    remainder = chunk[pos:]
    if remainder:
        yield (remainder, True)


# replaces empty items in an iterable with a marker
def none2marker(iter, marker):
    for i in iter:
        yield i if i else marker


# replaces items that end with a marker with `None`
def marker2none(iter, marker):
    for i in iter:
        yield None if i.endswith(marker) else i


# FIFO to buffer filenames
# we buffer them and not the key examination outcomes to minimize memory demands
filenames = Queue()


# helper to put an end marker into the filename queue
# this is used to signal stopiteration to zip() at the very end
def mark_filename_queue_end():
    filenames.put(None)


# fake URL annex key we feed through examinekey instead of not calling
# it at all -- lazy but fast
magic_marker = b'URL--MAGIC'


with \
        iterable_subprocess(
        # feeder of the pipeline, zero-byte delimited items
            ['git', 'ls-files', '-z'], tuple(),
        ) as glsf, \
        iterable_subprocess(
        # we get the annex key for any filename (or empty if not annexed)
            ['git', 'annex', 'find', '--anything', '--format=\${key}\n', '--batch'],
            # intersperse items with newlines to trigger a batch run
            # this avoids string operations to append newlines to items
            intersperse(
                b'\n',
                # the side effect is that every filename item coming from ls-files
                # is put into a FIFO, to be later merged with the key properties
                side_effect(
                    filenames.put,
                    # we parse the raw byte chunks from the subprocess into
                    # zero-byte delimited items
                    proc_byte_chunks(glsf, rb'\0'),
                    # after the last filename, we put an end marker into the queue
                    after=mark_filename_queue_end,
                )
            ),
        ) as gaf, \
        iterable_subprocess(
        # get the key object path (example property, could also be JSON record)
            ['git', 'annex', 'examinekey', '--format=\${objectpath}\n', '--batch'],
            # again intersperse to kick off batches
            intersperse(
                b'\n',
                # feed a fake key that does no harm to the command, when there is none
                none2marker(
                    proc_byte_chunks(gaf, rb'\n'),
                    magic_marker,
                )
            ),
        ) as gek:
    # consume (the enumerate is not needed, just for decoration)
    for r in enumerate(
            # zip filenames from ls-files to key examination outcomes
            # the `None` argument to iter() stops iteration when it
            # gets the end marker from the FIFO
            zip(iter(filenames.get, None),
                # we undo the marker and turn them into `None`
                marker2none(
                    proc_byte_chunks(gek, rb'\n'),
                    magic_marker)
            )
    ):
        print(r)
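
The buffering trick in the script above (a FIFO that is filled as a side effect, an end marker, and `iter(callable, sentinel)` on the consuming side) can be isolated in a few lines. The sketch below uses only the standard library; `tap` is a hand-written stand-in for `more_itertools.side_effect`, and `fake_batch_command` is a hypothetical thread that plays the role of the subprocess.

```python
from queue import Queue
from threading import Thread
from typing import Iterable, Iterator, List, Tuple


def tap(items: Iterable[bytes], fifo: Queue) -> Iterator[bytes]:
    """Pass items through, copying each into `fifo` on the way."""
    for item in items:
        fifo.put(item)
        yield item
    # end marker, lets iter(fifo.get, None) stop
    fifo.put(None)


def fake_batch_command(items: Iterable[bytes], out: Queue) -> None:
    """Stand-in for a subprocess: one result per input, in order."""
    for item in items:
        out.put(item.upper())
    out.put(None)


def pair_up(inputs: Iterable[bytes]) -> List[Tuple[bytes, bytes]]:
    names: Queue = Queue()
    results: Queue = Queue()
    worker = Thread(
        target=fake_batch_command, args=(tap(inputs, names), results))
    worker.start()
    # iter(callable, sentinel) calls `get` until it returns the sentinel
    pairs = list(zip(iter(names.get, None), iter(results.get, None)))
    worker.join()
    return pairs
```

The approach relies on `None` never being a legitimate item, which holds here because all items are byte strings.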

@mih
Member Author

mih commented Nov 17, 2023

And here is one more variant of the script. No printing of records, but proper loading of complete JSON records from git annex examinekey. The final consumer is a dict(), to simulate how this would be used in some other code.

End-to-end benchmark:

❯ multitime -n5 python ~/hacking/datalad/next/iter_subproc.py
N records=36408
N records=36408
N records=36408
N records=36408
N records=36408
===> multitime results
1: python /home/mih/hacking/datalad/next/iter_subproc.py
            Mean        Std.Dev.    Min         Median      Max
real        3.904       0.052       3.821       3.916       3.978       
user        4.970       0.131       4.754       4.983       5.167       
sys         2.384       0.061       2.273       2.412       2.441

from datalad_next.iterable_subprocess.iterable_subprocess import iterable_subprocess
from re import finditer
from more_itertools import (
    intersperse,
    side_effect,
)
from queue import Queue
import json


# takes an iterable of bytes, yields items defined by delimiter
# tries to make as few copies as possible
def proc_byte_chunks(iter, delim):
    remainder = b''
    for chunk in iter:
        for item, incomplete in proc_byte_chunk(chunk, delim):
            if incomplete:
                remainder += item
                continue
            elif remainder:
                item = remainder + item
                remainder = b''
            yield item
    if remainder:
        yield remainder


# takes bytes, yields items defined by delimiter,
# plus bool to indicate incomplete items (did not end with delimiter)
def proc_byte_chunk(chunk, delim):
    pos = 0
    for delim in finditer(delim, chunk):
        yield (chunk[pos:delim.start()], False)
        pos = delim.end()
    remainder = chunk[pos:]
    if remainder:
        yield (remainder, True)


# replaces empty items in an iterable with a marker
def none2marker(iter, marker):
    for i in iter:
        yield i if i else marker


# replaces items that end with a marker with `None`
def marker2none(iter, marker):
    for i in iter:
        yield None if i.endswith(marker) else i


# load json-encoded byte strings
def jsonloads(iter):
    for i in iter:
        yield None if i is None else json.loads(i)


# FIFO to buffer filenames
# we buffer them and not the key examination outcomes to minimize memory demands
filenames = Queue()


# helper to put an end marker into the filename queue
# this is used to signal stopiteration to zip() at the very end
def mark_filename_queue_end():
    filenames.put(None)


# fake URL annex key we feed through examinekey instead of not calling
# it at all -- lazy but fast
magic_marker = b'URL--MAGIC'


with \
        iterable_subprocess(
        # feeder of the pipeline, zero-byte delimited items
            ['git', 'ls-files', '-z'], tuple(),
        ) as glsf, \
        iterable_subprocess(
        # we get the annex key for any filename (or empty if not annexed)
            ['git', 'annex', 'find', '--anything', '--format=\${key}\n', '--batch'],
            # intersperse items with newlines to trigger a batch run
            # this avoids string operations to append newlines to items
            intersperse(
                b'\n',
                # the side effect is that every filename item coming from ls-files
                # is put into a FIFO, to be later merged with the key properties
                side_effect(
                    filenames.put,
                    # we parse the raw byte chunks from the subprocess into
                    # zero-byte delimited items
                    proc_byte_chunks(glsf, rb'\0'),
                    # after the last filename, we put an end marker into the queue
                    after=mark_filename_queue_end,
                )
            ),
        ) as gaf, \
        iterable_subprocess(
        # get the key properties JSON-lines style
            ['git', 'annex', 'examinekey', '--json', '--batch'],
            # again intersperse to kick off batches
            intersperse(
                b'\n',
                # feed a fake key that does no harm to the command,
                # when there is none
                none2marker(
                    proc_byte_chunks(gaf, rb'\n'),
                    magic_marker,
                )
            ),
        ) as gek:
    # zip filenames from ls-files to key examination outcomes
    # the `None` argument to iter() stops iteration when it
    # gets the end marker from the FIFO
    lookup = dict(
        zip(iter(filenames.get, None),
            jsonloads(
                # we undo the marker and turn them into `None`
                marker2none(
                    proc_byte_chunks(gek, rb'\n'),
                    magic_marker)))
    )
    print(f'N records={len(lookup)}')
