Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent execution of workflow steps #544

Merged
merged 159 commits into from
Dec 11, 2023
Merged

Conversation

calum-chamberlain
Copy link
Member

@calum-chamberlain calum-chamberlain commented Apr 12, 2023

What does this PR do?

This PR implements concurrent processing of the main steps in the .detect workflow. Alongside this there are several other major changes including moving all the matched-filter components out of core.matched_filter.matched_filter to focus on the tribe.detect method. The matched_filter function has been retained, but it simply creates a Tribe internally and runs the tribe.detect method.

Fundamentally this PR breaks the workflow down into the following steps:
0. Downloading data (if running .client_detect, which I recommend)

  1. Processing data (which is now possible in parallel from a Process thanks to Convert preprocessing functions to multithreaded with GIL-released #540 )
  2. Prepare data for correlation
  3. Correlate and detect peaks
  4. Convert peaks to Detections
  5. Convert detections to Party
  6. Collect all parties into a final Party

Each step as listed runs in it's own process, except steps 0 and 1, which run together when using .client_detect, and 0 is not run when using .detect. Communication between steps is via multiprocessing Queues. Steps will wait until data are available in their incoming queue, then work on those data and put them into their output queue which forms the input queue for the following step. Steps loop until they get None from the input queue, used to signify the end of processing.

.detect supports passing a queue as input, or a Stream. This means that if users do not want to use a client to get their data, they can provide a simple queue that might look something like:

import glob

from typing import Iterable
from multiprocessing import Queue, Process

from obspy import read

from eqcorrscan import Tribe


def reader(files: Iterable, output_queue: Queue):
    for file in files:
        output_queue.put(read(file))
    return

def main():
    tribe = Tribe().read("Some_tribe.tgz")
    files = glob.glob("somewhere-with-some-waveforms")
    files.sort()
    stream_queue = Queue(maxsize=1)
    stream_getter = Process(target=reader, kwargs={"files": files, "output_queue": stream_queue})

    stream_getter.start()
    party = tribe.detect(st=stream_getter, ...)

As currently implemented, only step 3, correlate and detect peaks, runs in the MainProcess. This enables this step to make use of multiprocessing if needed, and provides more safety for openmp threading in underlying C-functions. I attempted to move peak finding into its own Process, but found that putting the large arrays of cross-correlation sums into the Queue was prohibitively slow, and there is a strong risk of exceeding queue size. I also tried saving these and reading them back in in another process, but again, saving them to disk was too slow.

Minor changes:

  • _spike_test is now threaded for speed.
  • Detection objects provide a more helpful error when the correlation value exceeds the number of channels. I have only ever seen this happen for fmf correlations.
  • Party additional has been sped up
  • multi_find_peaks now uses multithreading rather than openmp C threading. I found this was faster and safer on my machine (see pytest fixtures in correlate_test alter omp threads #543).
  • Many minor structural changes to the correlation functions to enable pre-preparation of data into the form required for correlation functions.

Why was it initiated? Any relevant Issues?

Concurrent processing should enable us to more heavily load systems, which should please HPC admins... This PR speeds up EQcorrscan for large datasets, and should enable much better GPU utilisation when using the FMF backend.

PR Checklist

  • develop base branch selected?
  • This PR is not directly related to an existing issue (which has no PR yet).
  • All tests still pass.
  • Any new features or fixed regressions are be covered via new tests.
  • Any new or changed features have are fully documented.
  • Significant changes have been added to CHANGES.md.
    - [ ] First time contributors have added your name to CONTRIBUTORS.md.

TODO:

  • Benchmarks - compare to develop and master
  • Tutorial, particularly on making use of clients - highlight obsplus for local client emulation
  • Fix any bugs that crop up in more major testing on large scale, long-duration datasets (currently applying to 11k templates over 10 years).
  • Clean code:
    • Standardise queue naming (input_xx_queue, output_yy_queue)
    • docstrings
      - [ ] type-hints
    • clean logic and make sure comments are appropriate
    • Clean out stream pickle files once done (.streams/xxxx.pkl), but do not remove the .streams directory.
  • Meet coverage requirements.

@calum-chamberlain calum-chamberlain merged commit 7231e1a into develop Dec 11, 2023
16 of 18 checks passed
@calum-chamberlain calum-chamberlain deleted the parallel-steps branch December 11, 2023 03:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant