Offload publishing to separate jobs #5590

Closed
bentsherman opened this issue Dec 9, 2024 · 6 comments

@bentsherman
Member

The publishing of workflow outputs is performed natively by Nextflow with a thread pool in PublishDir. This logic is used both for the publishDir directive and the workflow output definition.

The publish throughput is limited by the thread pool size, which by default is cpus * 3. This limit can be increased, but even then, Nextflow can be constrained by the network bandwidth and API limits of the head node. For example, an EC2 instance has a limited amount of network bandwidth and is subject to S3 API limits, which cap the number of concurrent API calls it can effectively perform, even if it is only doing an S3-to-S3 copy.
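
As a rough illustration of raising that limit, the pool size can be set in the Nextflow configuration. This is a minimal sketch that assumes the setting is named `threadPool.PublishDir.maxThreads`; the value 32 is an arbitrary example, not a recommendation.

```groovy
// nextflow.config
// Assumption: the PublishDir pool is sized via threadPool.PublishDir.maxThreads
// 32 is an arbitrary illustrative value
threadPool.PublishDir.maxThreads = 32
```

A larger pool only adds concurrency on the head node, so the bandwidth and API limits of that node still apply.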

The goal of this issue is to offload publishing operations to separate jobs, in order to increase the total amount of network bandwidth and API calls available to the pipeline.

  • Try to reuse the existing logic for submitting compute jobs as much as possible

  • As a first iteration, just use Fusion and don't worry about mounting the various CLI tools for object storage providers

  • As a first iteration, focus on AWS / Azure / Google, especially AWS. This feature is intended mainly for the cloud executors.

  • Try to batch publish operations. For now, we can expose a config setting for the batch size and tune it manually

  • Failed publish jobs should be retried in a similar manner to PublishDir

Out of scope (possible future goals):

  • Support without Fusion. In the future we could use Wave to provide the cloud CLI tools as we do for Fargate.

  • Offloading the staging of remote inputs (i.e. FilePorter). The FilePorter typically isn't needed when using Fusion, so this would probably only be useful if we relax the Fusion requirement

cc @pditommaso for comments
@jorgee let me know if you need any further clarifications

@pditommaso
Member

pditommaso commented Dec 10, 2024

I'd suggest focusing on a PoC for S3 only.

The copy should be done by adding, behind the scenes, a regular Nextflow process that takes care of copying files across buckets.

When a file needs to be copied from one S3 path to another S3 path, the file path is sent to the "copy" process and the copyPath call is skipped.

The process could be defined as

process copyTask {
  container 'public.cr.seqera.io/wave/s5cmd:v2.2.2'

  input:
  tuple path(source), path(target)

  script:
  // Double-quoted block so that Nextflow interpolates the input variables
  """
  s5cmd cp $source $target
  """
}

In a second iteration, we can see how to use Fusion in place of s5cmd, and how to batch multiple copies in the same copy task run.
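
The batching idea could look roughly like the sketch below. It is only an illustration under stated assumptions: the process name `copyBatch`, the example bucket URIs, and the batch size of 2 are all hypothetical, and the paths are passed as plain values so that Nextflow does not stage them as files.

```groovy
// Hypothetical batched variant of the copy process (not part of Nextflow)
process copyBatch {
  container 'public.cr.seqera.io/wave/s5cmd:v2.2.2'

  input:
  val pairs   // list of [source, target] URI strings for one batch

  script:
  // Build one s5cmd command per pair; all of them run in the same task
  def commands = pairs.collect { pair -> "s5cmd cp '${pair[0]}' '${pair[1]}'" }.join('\n')
  """
  ${commands}
  """
}

workflow {
  // Example URIs are placeholders
  channel.of(
      ['s3://example-src/a.txt', 's3://example-dst/a.txt'],
      ['s3://example-src/b.txt', 's3://example-dst/b.txt'],
      ['s3://example-src/c.txt', 's3://example-dst/c.txt']
    )
    .collate(2)   // hypothetical batch size: group up to 2 copies per task
    | copyBatch
}
```

Here `collate` groups the copy requests before they reach the process, so the batch size is a single number that can be tuned manually.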

@bentsherman
Member Author

Here is the code in nf-boost for implementing a process on-the-fly: https://github.com/bentsherman/nf-boost/blob/main/plugins/nf-boost/src/main/nextflow/boost/ops/ExecOp.groovy

It might be overkill because it will modify the DAG. But it's a good summary of how a process is constructed end-to-end

Instead I would start by copying the bits that you need from TaskProcessor into a separate thing. I can show you how to construct the args for that method to meet your needs

final protected void invokeTask( Object[] args ) {

We'll also need to handle the task finalization. The task monitor normally calls finalizeTask(), but I think if we refactor invokeTask() as a generic function that returns a Future, it should be easier to support both the regular process definition and the publish jobs

@jorgee
Contributor

jorgee commented Dec 12, 2024

Following the example provided by @bentsherman, I have been able to do an initial PoC with minimal changes. I programmatically create the TaskProcessor and invoke a task when a copy is requested. So far I have tested it locally (forcing files to be copied with mode: 'copy'), running the cp in a task instead of calling FileHelper.copyPath. The new process appears in the process list but not in the DAG. We can discuss this in the next meeting to check whether I missed something important before opening a first PR. As a next step, I am going to modify it to support S3 with s5cmd, as @pditommaso suggested.

@jorgee
Contributor

jorgee commented Dec 16, 2024

Some issues have appeared when running the PoC.

  1. I had to change the inputs to 'tuple val(source), val(target)'. Otherwise they were handled as files and a symlink/copy was created.
  2. I have modified the container to include the AWS client. It is needed to upload the .command.err, .command.out and .exitcode files to the working bucket.
  3. I have manually added write permissions to the InstanceRole of the compute environment to allow copying to the publication bucket directory. I was using a compute environment created from the platform, and the generated instance role only has write permissions for the working directory, not for other locations.
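
With the first point applied, the copy process from the earlier comment would look roughly like this. It is a sketch, not the PoC code: the container line is unchanged from the original suggestion, whereas the second point implies an image that also ships the AWS client.

```groovy
process copyTask {
  // Per point 2, the image actually used would also need the AWS client
  container 'public.cr.seqera.io/wave/s5cmd:v2.2.2'

  input:
  // Plain values: Nextflow does not stage them, so no symlink/copy is created
  tuple val(source), val(target)

  script:
  """
  s5cmd cp '${source}' '${target}'
  """
}
```

Because both inputs are values, the task sees the original URIs and the copy happens directly between the two locations.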

@pditommaso
Member

I'd suggest making it work with local execution as well, to simplify development and testing.

@jorgee linked a pull request on Dec 17, 2024 that will close this issue
@ewels
Member

ewels commented Jan 22, 2025

Closing, we will focus on #4741 for now and see how far we can get with improving native copy capabilities.

@ewels closed this as not planned on Jan 22, 2025