diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 8013f97fea..0a503ab920 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -25,10 +25,12 @@ import java.nio.file.NoSuchFileException import java.nio.file.Path import java.nio.file.PathMatcher import java.util.concurrent.ExecutorService +import java.util.regex.Pattern import groovy.transform.CompileDynamic import groovy.transform.CompileStatic import groovy.transform.EqualsAndHashCode +import groovy.transform.Memoized import groovy.transform.PackageScope import groovy.transform.ToString import groovy.util.logging.Slf4j @@ -38,6 +40,7 @@ import nextflow.Session import nextflow.extension.FilesEx import nextflow.file.FileHelper import nextflow.file.TagAwareFile +import nextflow.fusion.FusionHelper import nextflow.util.PathTrie /** * Implements the {@code publishDir} directory. It create links or copies the output @@ -51,10 +54,14 @@ import nextflow.util.PathTrie @CompileStatic class PublishDir { + final static private Pattern FUSION_PATH_REGEX = ~/^\/fusion\/([^\/]+)\/(.*)/ + enum Mode { SYMLINK, LINK, COPY, MOVE, COPY_NO_FOLLOW, RELLINK } private Map makeCache = new HashMap<>() + private Session session = Global.session as Session + /** * The target path where create the links or copy the output files */ @@ -117,10 +124,18 @@ class PublishDir { private boolean nullPathWarn - private String taskName + private TaskRun task @Lazy - private ExecutorService threadPool = { def sess = Global.session as Session; sess.publishDirExecutorService() }() + private ExecutorService threadPool = { session.publishDirExecutorService() }() + + protected String getTaskName() { + return task?.getName() + } + + protected Map getTaskInputs() { + return task?.getInputFilesMap() + } void setPath( def value ) { final resolved = value instanceof Closure ? value.call() : value @@ -202,7 +217,6 @@ class PublishDir { return result } - @CompileStatic protected void apply0(Set files) { assert path @@ -266,7 +280,6 @@ class PublishDir { * @param files Set of output files * @param task The task whose output need to be published */ - @CompileStatic void apply( Set files, TaskRun task ) { if( !files || !enabled ) @@ -281,13 +294,10 @@ class PublishDir { this.sourceDir = task.targetDir this.sourceFileSystem = sourceDir.fileSystem this.stageInMode = task.config.stageInMode - this.taskName = task.name apply0(files) } - - @CompileStatic protected void apply1(Path source, boolean inProcess ) { def target = sourceDir ? sourceDir.relativize(source) : source.getFileName() @@ -328,7 +338,6 @@ class PublishDir { } - @CompileStatic protected Path resolveDestination(target) { if( target instanceof Path ) { @@ -347,7 +356,6 @@ class PublishDir { throw new IllegalArgumentException("Not a valid publish target path: `$target` [${target?.class?.name}]") } - @CompileStatic protected void safeProcessFile(Path source, Path target) { try { processFile(source, target) @@ -355,15 +363,20 @@ class PublishDir { catch( Throwable e ) { log.warn "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- See log file for details", e if( NF.strictMode || failOnError){ - final session = Global.session as Session session?.abort(e) } } } - @CompileStatic protected void processFile( Path source, Path destination ) { + // resolve Fusion symlink if applicable + if( FusionHelper.isFusionEnabled(session) ) { + final inputs = getTaskInputs() + if( source.name in inputs ) + source = resolveFusionLink(inputs[source.name]) + } + // create target dirs if required makeDirs(destination.parent) @@ -387,6 +400,29 @@ class PublishDir { notifyFilePublish(destination, source) } + /** + * Resolve a Fusion symlink by following the .fusion.symlinks + * file in the task directory until the original file is reached. + * + * @param file + */ + protected Path resolveFusionLink(Path file) { + while( file.name in getFusionLinks(file.parent) ) + file = file.text.replaceFirst(FUSION_PATH_REGEX) { _, scheme, path -> "${scheme}://${path}" } as Path + return file + } + + @Memoized + protected List getFusionLinks(Path workDir) { + try { + final file = workDir.resolve('.fusion.symlinks') + return file.text.tokenize('\n') + } + catch( NoSuchFileException e ) { + return List.of() + } + } + private String real0(Path p) { try { // resolve symlink if it's file in the default (posix) file system @@ -422,8 +458,9 @@ class PublishDir { final s1 = real0(sourceDir) if( t1.startsWith(s1) ) { def msg = "Refusing to publish file since destination path conflicts with the task work directory!" - if( taskName ) - msg += "\n- offending task : $taskName" + def name0 = getTaskName() + if( name0 ) + msg += "\n- offending task : $name0" msg += "\n- offending file : $target" if( t1 != target.toString() ) msg += "\n- real destination: $t1" @@ -438,7 +475,6 @@ class PublishDir { return !mode || mode == Mode.SYMLINK || mode == Mode.RELLINK } - @CompileStatic protected void processFileImpl( Path source, Path destination ) { log.trace "publishing file: $source -[$mode]-> $destination" @@ -466,13 +502,11 @@ class PublishDir { } } - @CompileStatic - private void createPublishDir() { + protected void createPublishDir() { makeDirs(this.path) } - @CompileStatic - private void makeDirs(Path dir) { + protected void makeDirs(Path dir) { if( !dir || makeCache.containsKey(dir) ) return @@ -491,7 +525,6 @@ class PublishDir { * That valid publish mode has been selected * Note: link and symlinks are not allowed across different file system */ - @CompileStatic @PackageScope void validatePublishMode() { if( log.isTraceEnabled() ) @@ -512,11 +545,7 @@ class PublishDir { } protected void notifyFilePublish(Path destination, Path source=null) { - final sess = Global.session - if (sess instanceof Session) { - sess.notifyFilePublish(destination, source) - } + session.notifyFilePublish(destination, source) } - } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy index 205951016e..23672e4a42 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy @@ -433,8 +433,8 @@ class TaskRun implements Cloneable { */ Map getInputFilesMap() { - def result = [:] - def allFiles = getInputFiles().values() + final allFiles = getInputFiles().values() + final result = new HashMap(allFiles.size()) for( List entry : allFiles ) { if( entry ) for( FileHolder it : entry ) { result[ it.stageName ] = it.storePath diff --git a/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy b/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy index b968b284c7..e904387db3 100644 --- a/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy @@ -42,6 +42,7 @@ class PublishOpTest extends BaseSpec { def BASE = folder def sess = Mock(Session) { getWorkDir() >> BASE + getConfig() >> [:] } Global.session = sess diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy index 3c858344cb..613fead059 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy @@ -20,6 +20,7 @@ import java.nio.file.FileSystems import java.nio.file.Files import java.nio.file.Paths +import nextflow.Global import nextflow.Session import spock.lang.Specification import test.TestHelper @@ -114,8 +115,9 @@ class PublishDirTest extends Specification { } def 'should create symlinks for output files' () { - given: + Global.session = Mock(Session) { getConfig()>>[:] } + and: def folder = Files.createTempDirectory('nxf') folder.resolve('work-dir').mkdir() folder.resolve('work-dir/file1.txt').text = 'aaa' diff --git a/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy b/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy index 460de36581..4e92cb97e7 100644 --- a/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy +++ b/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy @@ -16,14 +16,14 @@ package nextflow.processor +import java.nio.file.FileSystems import java.nio.file.Files +import nextflow.Global +import nextflow.Session import nextflow.cloud.aws.nio.S3Path -import spock.lang.Specification - -import java.nio.file.FileSystems - import nextflow.file.FileHelper +import spock.lang.Specification /** * @@ -73,4 +73,30 @@ class PublishDirS3Test extends Specification { folder?.deleteDir() } + def 'should resolve fusion symlinks' () { + given: + Global.session = Mock(Session) { + config >> [fusion: [enabled: true]] + } + and: + def prev = FileHelper.asPath('s3://bucket/work/0/foo.txt') + def file = FileHelper.asPath('s3://bucket/work/1/foo.txt') + def taskInputs = ['foo.txt': file] + and: + def targetDir = FileHelper.asPath('s3://bucket/results') + def target = targetDir.resolve('foo.txt') + def publisher = Spy(new PublishDir(path: targetDir)) { getTaskInputs()>>taskInputs } + + when: + publisher.processFile(file, target) + then: + 1 * publisher.resolveFusionLink(file) >> prev + _ * publisher.makeDirs(target.parent) >> _ + 1 * publisher.processFileImpl(prev, target) >> _ + 1 * publisher.notifyFilePublish(target, prev) >> _ + + cleanup: + Global.session = null + } + } diff --git a/tests/checks/fusion-symlink.nf/.checks b/tests/checks/fusion-symlink.nf/.checks new file mode 100644 index 0000000000..a28366d245 --- /dev/null +++ b/tests/checks/fusion-symlink.nf/.checks @@ -0,0 +1,25 @@ +#!/bin/bash + +# Skip test if AWS keys are missing +if [ -z "$AWS_ACCESS_KEY_ID" ]; then + echo "Missing AWS credentials -- Skipping test" + exit 0 +fi + +# +# normal run +# +echo initial run +$NXF_RUN -c .config + +$NXF_CMD fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt +cmp data.txt .expected || false + +# +# resume run +# +echo resumed run +$NXF_RUN -c .config -resume + +$NXF_CMD fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt +cmp data.txt .expected || false diff --git a/tests/checks/fusion-symlink.nf/.config b/tests/checks/fusion-symlink.nf/.config new file mode 100644 index 0000000000..630af142fa --- /dev/null +++ b/tests/checks/fusion-symlink.nf/.config @@ -0,0 +1,4 @@ +workDir = 's3://nextflow-ci/work' +fusion.enabled = true +fusion.exportStorageCredentials = true +wave.enabled = true \ No newline at end of file diff --git a/tests/checks/fusion-symlink.nf/.expected b/tests/checks/fusion-symlink.nf/.expected new file mode 100644 index 0000000000..e427984d4a --- /dev/null +++ b/tests/checks/fusion-symlink.nf/.expected @@ -0,0 +1 @@ +HELLO diff --git a/tests/fusion-symlink.nf b/tests/fusion-symlink.nf new file mode 100644 index 0000000000..30d0cee56d --- /dev/null +++ b/tests/fusion-symlink.nf @@ -0,0 +1,61 @@ +#!/usr/bin/env nextflow +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +process CREATE { + + output: + path "data.txt" + + script: + """ + echo HELLO > data.txt + """ +} + +process FORWARD { + + input: + path "data.txt" + + output: + path "data.txt" + + script: + """ + echo AND + """ +} + +process PUBLISH { + publishDir "s3://nextflow-ci/work/ci-test/fusion-symlink" + + input: + path "data.txt" + + output: + path "data.txt" + + script: + """ + echo BYE + """ +} + +workflow { + CREATE | FORWARD | PUBLISH +}