From 815709cf7178c8f5ca28cbaaa7df35548559f60e Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Tue, 26 Sep 2023 14:58:20 -0500 Subject: [PATCH 01/17] Resolve Fusion symlinks when publishing files Signed-off-by: Ben Sherman --- .../src/main/groovy/nextflow/processor/PublishDir.groovy | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 8013f97fea..021e1d271f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -38,6 +38,7 @@ import nextflow.Session import nextflow.extension.FilesEx import nextflow.file.FileHelper import nextflow.file.TagAwareFile +import nextflow.fusion.FusionHelper import nextflow.util.PathTrie /** * Implements the {@code publishDir} directory. It create links or copies the output @@ -283,6 +284,12 @@ class PublishDir { this.stageInMode = task.config.stageInMode this.taskName = task.name + // resolve Fusion symlinks + if( FusionHelper.isFusionEnabled(Global.session as Session) ) { + final inputFiles = task.getInputFilesMap() + files = files.collect { inputFiles.getOrDefault(it.name, it) } as Set + } + apply0(files) } From 945c01fb7a756a661d9ed9d3dee5ed5f17b30643 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 29 Sep 2023 14:51:26 -0500 Subject: [PATCH 02/17] Use fusion symlinks file to resolve links Signed-off-by: Ben Sherman --- .../nextflow/processor/PublishDir.groovy | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 021e1d271f..b7bb5f0f1e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -29,6 +29,7 @@ import java.util.concurrent.ExecutorService import groovy.transform.CompileDynamic import groovy.transform.CompileStatic import groovy.transform.EqualsAndHashCode +import groovy.transform.Memoized import groovy.transform.PackageScope import groovy.transform.ToString import groovy.util.logging.Slf4j @@ -287,12 +288,40 @@ class PublishDir { // resolve Fusion symlinks if( FusionHelper.isFusionEnabled(Global.session as Session) ) { final inputFiles = task.getInputFilesMap() - files = files.collect { inputFiles.getOrDefault(it.name, it) } as Set + files = files.collect { file -> + file.name in inputFiles + ? resolveFusionLink(inputFiles[file.name]) + : file + } as Set } apply0(files) } + /** + * Resolve a Fusion symlink by following the .fusion.symlinks + * file in the task directory until the original file is reached. + * + * @param file + */ + @CompileStatic + protected Path resolveFusionLink(Path file) { + while( file.name in getFusionLinks(file.parent) ) + file = file.text.replace('/fusion/s3/', 's3://') as Path + return file + } + + @CompileStatic + @Memoized + protected List getFusionLinks(Path workDir) { + try { + final file = workDir.resolve('.fusion.symlinks') + return file.text.tokenize('\n') + } + catch( NoSuchFileException ) { + return [] + } + } @CompileStatic protected void apply1(Path source, boolean inProcess ) { From e905c61bcc7cf84c6edfd8fd55aa1b9154dad862 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 29 Sep 2023 15:03:08 -0500 Subject: [PATCH 03/17] Fix publish target resolution Signed-off-by: Ben Sherman --- .../nextflow/processor/PublishDir.groovy | 67 +++++++++---------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index b7bb5f0f1e..05f34a8773 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -121,6 +121,8 @@ class PublishDir { private String taskName + private Map taskInputs + @Lazy private ExecutorService threadPool = { def sess = Global.session as Session; sess.publishDirExecutorService() }() @@ -284,44 +286,11 @@ class PublishDir { this.sourceFileSystem = sourceDir.fileSystem this.stageInMode = task.config.stageInMode this.taskName = task.name - - // resolve Fusion symlinks - if( FusionHelper.isFusionEnabled(Global.session as Session) ) { - final inputFiles = task.getInputFilesMap() - files = files.collect { file -> - file.name in inputFiles - ? resolveFusionLink(inputFiles[file.name]) - : file - } as Set - } + this.taskInputs = task.getInputFilesMap() apply0(files) } - /** - * Resolve a Fusion symlink by following the .fusion.symlinks - * file in the task directory until the original file is reached. - * - * @param file - */ - @CompileStatic - protected Path resolveFusionLink(Path file) { - while( file.name in getFusionLinks(file.parent) ) - file = file.text.replace('/fusion/s3/', 's3://') as Path - return file - } - - @CompileStatic - @Memoized - protected List getFusionLinks(Path workDir) { - try { - final file = workDir.resolve('.fusion.symlinks') - return file.text.tokenize('\n') - } - catch( NoSuchFileException ) { - return [] - } - } @CompileStatic protected void apply1(Path source, boolean inProcess ) { @@ -400,6 +369,11 @@ class PublishDir { @CompileStatic protected void processFile( Path source, Path destination ) { + // resolve Fusion symlink if applicable + if( FusionHelper.isFusionEnabled(Global.session as Session) ) + if( source.name in taskInputs ) + source = resolveFusionLink(taskInputs[source.name]) + // create target dirs if required makeDirs(destination.parent) @@ -423,6 +397,31 @@ class PublishDir { notifyFilePublish(destination, source) } + /** + * Resolve a Fusion symlink by following the .fusion.symlinks + * file in the task directory until the original file is reached. + * + * @param file + */ + @CompileStatic + protected Path resolveFusionLink(Path file) { + while( file.name in getFusionLinks(file.parent) ) + file = file.text.replace('/fusion/s3/', 's3://') as Path + return file + } + + @CompileStatic + @Memoized + protected List getFusionLinks(Path workDir) { + try { + final file = workDir.resolve('.fusion.symlinks') + return file.text.tokenize('\n') + } + catch( NoSuchFileException ) { + return [] + } + } + private String real0(Path p) { try { // resolve symlink if it's file in the default (posix) file system From 877064cd672a1a09be6e206fefbf1a363064af58 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 12 Oct 2023 12:55:33 -0500 Subject: [PATCH 04/17] Update modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy Co-authored-by: Robert Syme Signed-off-by: Ben Sherman --- .../src/main/groovy/nextflow/processor/PublishDir.groovy | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 05f34a8773..3fab6f7c1f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -405,8 +405,10 @@ class PublishDir { */ @CompileStatic protected Path resolveFusionLink(Path file) { - while( file.name in getFusionLinks(file.parent) ) - file = file.text.replace('/fusion/s3/', 's3://') as Path + while( file.name in getFusionLinks(file.parent) ) { + def pattern = ~/^\/fusion\/([^\/]+)\/(.*)/ + file = file.text.replaceFirst(pattern) { _, scheme, path -> "${scheme}://${path}" } as Path + } return file } From 302e57b971d4271710d6d03369cfd0c9a77df5fc Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 12 Oct 2023 13:37:55 -0500 Subject: [PATCH 05/17] minor edits Signed-off-by: Ben Sherman --- .../src/main/groovy/nextflow/processor/PublishDir.groovy | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 3fab6f7c1f..5756f4e8f9 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -405,10 +405,9 @@ class PublishDir { */ @CompileStatic protected Path resolveFusionLink(Path file) { - while( file.name in getFusionLinks(file.parent) ) { - def pattern = ~/^\/fusion\/([^\/]+)\/(.*)/ + final pattern = ~/^\/fusion\/([^\/]+)\/(.*)/ + while( file.name in getFusionLinks(file.parent) ) file = file.text.replaceFirst(pattern) { _, scheme, path -> "${scheme}://${path}" } as Path - } return file } From f2e2b7567b58e447d0bf023027b4fce876f5bdae Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Sat, 28 Oct 2023 14:58:22 -0500 Subject: [PATCH 06/17] Fix failing test Signed-off-by: Ben Sherman --- .../src/test/groovy/nextflow/extension/PublishOpTest.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy b/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy index b968b284c7..e904387db3 100644 --- a/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy @@ -42,6 +42,7 @@ class PublishOpTest extends BaseSpec { def BASE = folder def sess = Mock(Session) { getWorkDir() >> BASE + getConfig() >> [:] } Global.session = sess From dd154a0e65ba55797cae2ec1eca4d591cd69a6e3 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 30 Oct 2023 21:48:04 +0100 Subject: [PATCH 07/17] Minor change [ci fast] Signed-off-by: Paolo Di Tommaso --- .../src/main/groovy/nextflow/processor/PublishDir.groovy | 4 ++-- .../src/main/groovy/nextflow/processor/TaskRun.groovy | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 5756f4e8f9..c7aa29a518 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -418,8 +418,8 @@ class PublishDir { final file = workDir.resolve('.fusion.symlinks') return file.text.tokenize('\n') } - catch( NoSuchFileException ) { - return [] + catch( NoSuchFileException e ) { + return List.of() } } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy index 205951016e..23672e4a42 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy @@ -433,8 +433,8 @@ class TaskRun implements Cloneable { */ Map getInputFilesMap() { - def result = [:] - def allFiles = getInputFiles().values() + final allFiles = getInputFiles().values() + final result = new HashMap(allFiles.size()) for( List entry : allFiles ) { if( entry ) for( FileHolder it : entry ) { result[ it.stageName ] = it.storePath From 250ea5032187673bd0ed00a49bbb976a7b583788 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Wed, 1 Nov 2023 14:02:12 -0500 Subject: [PATCH 08/17] Add unit test Signed-off-by: Ben Sherman --- .../nextflow/processor/PublishDir.groovy | 4 +-- .../processor/PublishDirS3Test.groovy | 33 ++++++++++++++++--- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index c7aa29a518..8678de94be 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -503,12 +503,12 @@ class PublishDir { } @CompileStatic - private void createPublishDir() { + protected void createPublishDir() { makeDirs(this.path) } @CompileStatic - private void makeDirs(Path dir) { + protected void makeDirs(Path dir) { if( !dir || makeCache.containsKey(dir) ) return diff --git a/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy b/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy index 460de36581..2faf40b718 100644 --- a/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy +++ b/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy @@ -16,14 +16,14 @@ package nextflow.processor +import java.nio.file.FileSystems import java.nio.file.Files +import nextflow.Global +import nextflow.Session import nextflow.cloud.aws.nio.S3Path -import spock.lang.Specification - -import java.nio.file.FileSystems - import nextflow.file.FileHelper +import spock.lang.Specification /** * @@ -73,4 +73,29 @@ class PublishDirS3Test extends Specification { folder?.deleteDir() } + def 'should resolve fusion symlinks' () { + given: + Global.session = Mock(Session) { + config >> [fusion: [enabled: true]] + } + def prev = FileHelper.asPath('s3://bucket/work/0/foo.txt') + def file = FileHelper.asPath('s3://bucket/work/1/foo.txt') + def taskInputs = ['foo.txt': file] + and: + def targetDir = FileHelper.asPath('s3://bucket/results') + def target = targetDir.resolve('foo.txt') + def publisher = Spy(new PublishDir(path: targetDir, taskInputs: taskInputs)) + + when: + publisher.processFile(file, target) + then: + 1 * publisher.resolveFusionLink(file) >> prev + _ * publisher.makeDirs(target.parent) >> _ + 1 * publisher.processFileImpl(prev, target) >> _ + 1 * publisher.notifyFilePublish(target, prev) >> _ + + cleanup: + Global.session = null + } + } From 355f86cde52b5ea84d13d6cb844bfdeec9f09e18 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 6 Nov 2023 23:38:42 +0100 Subject: [PATCH 09/17] Make taskInputs lazy Signed-off-by: Paolo Di Tommaso --- .../nextflow/processor/PublishDir.groovy | 37 +++++++++++-------- .../nextflow/processor/PublishDirTest.groovy | 4 +- .../processor/PublishDirS3Test.groovy | 3 +- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 8678de94be..44d8c325a4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -57,6 +57,8 @@ class PublishDir { private Map makeCache = new HashMap<>() + private Session session = Global.session as Session + /** * The target path where create the links or copy the output files */ @@ -119,12 +121,18 @@ class PublishDir { private boolean nullPathWarn - private String taskName - - private Map taskInputs + private TaskRun task @Lazy - private ExecutorService threadPool = { def sess = Global.session as Session; sess.publishDirExecutorService() }() + private ExecutorService threadPool = { session.publishDirExecutorService() }() + + protected String getTaskName() { + return task?.getName() + } + + protected Map getTaskInputs() { + return task?.getInputFilesMap() + } void setPath( def value ) { final resolved = value instanceof Closure ? value.call() : value @@ -285,8 +293,6 @@ class PublishDir { this.sourceDir = task.targetDir this.sourceFileSystem = sourceDir.fileSystem this.stageInMode = task.config.stageInMode - this.taskName = task.name - this.taskInputs = task.getInputFilesMap() apply0(files) } @@ -360,7 +366,6 @@ class PublishDir { catch( Throwable e ) { log.warn "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- See log file for details", e if( NF.strictMode || failOnError){ - final session = Global.session as Session session?.abort(e) } } @@ -370,9 +375,11 @@ class PublishDir { protected void processFile( Path source, Path destination ) { // resolve Fusion symlink if applicable - if( FusionHelper.isFusionEnabled(Global.session as Session) ) - if( source.name in taskInputs ) - source = resolveFusionLink(taskInputs[source.name]) + if( FusionHelper.isFusionEnabled(session) ) { + final inputs = getTaskInputs() + if( source.name in inputs ) + source = resolveFusionLink(inputs[source.name]) + } // create target dirs if required makeDirs(destination.parent) @@ -458,8 +465,9 @@ class PublishDir { final s1 = real0(sourceDir) if( t1.startsWith(s1) ) { def msg = "Refusing to publish file since destination path conflicts with the task work directory!" - if( taskName ) - msg += "\n- offending task : $taskName" + def name0 = getTaskName() + if( name0 ) + msg += "\n- offending task : $name0" msg += "\n- offending file : $target" if( t1 != target.toString() ) msg += "\n- real destination: $t1" @@ -548,10 +556,7 @@ class PublishDir { } protected void notifyFilePublish(Path destination, Path source=null) { - final sess = Global.session - if (sess instanceof Session) { - sess.notifyFilePublish(destination, source) - } + session.notifyFilePublish(destination, source) } diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy index 3c858344cb..613fead059 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy @@ -20,6 +20,7 @@ import java.nio.file.FileSystems import java.nio.file.Files import java.nio.file.Paths +import nextflow.Global import nextflow.Session import spock.lang.Specification import test.TestHelper @@ -114,8 +115,9 @@ class PublishDirTest extends Specification { } def 'should create symlinks for output files' () { - given: + Global.session = Mock(Session) { getConfig()>>[:] } + and: def folder = Files.createTempDirectory('nxf') folder.resolve('work-dir').mkdir() folder.resolve('work-dir/file1.txt').text = 'aaa' diff --git a/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy b/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy index 2faf40b718..4e92cb97e7 100644 --- a/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy +++ b/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy @@ -78,13 +78,14 @@ class PublishDirS3Test extends Specification { Global.session = Mock(Session) { config >> [fusion: [enabled: true]] } + and: def prev = FileHelper.asPath('s3://bucket/work/0/foo.txt') def file = FileHelper.asPath('s3://bucket/work/1/foo.txt') def taskInputs = ['foo.txt': file] and: def targetDir = FileHelper.asPath('s3://bucket/results') def target = targetDir.resolve('foo.txt') - def publisher = Spy(new PublishDir(path: targetDir, taskInputs: taskInputs)) + def publisher = Spy(new PublishDir(path: targetDir)) { getTaskInputs()>>taskInputs } when: publisher.processFile(file, target) From b57667642afac31b761863654a060bee53ea77e8 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 6 Nov 2023 23:42:24 +0100 Subject: [PATCH 10/17] Remove unneeded @CompileStatic [ci fast] Signed-off-by: Paolo Di Tommaso --- .../groovy/nextflow/processor/PublishDir.groovy | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 44d8c325a4..84dd84ed81 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -214,7 +214,6 @@ class PublishDir { return result } - @CompileStatic protected void apply0(Set files) { assert path @@ -278,7 +277,6 @@ class PublishDir { * @param files Set of output files * @param task The task whose output need to be published */ - @CompileStatic void apply( Set files, TaskRun task ) { if( !files || !enabled ) @@ -297,8 +295,6 @@ class PublishDir { apply0(files) } - - @CompileStatic protected void apply1(Path source, boolean inProcess ) { def target = sourceDir ? sourceDir.relativize(source) : source.getFileName() @@ -339,7 +335,6 @@ class PublishDir { } - @CompileStatic protected Path resolveDestination(target) { if( target instanceof Path ) { @@ -358,7 +353,6 @@ class PublishDir { throw new IllegalArgumentException("Not a valid publish target path: `$target` [${target?.class?.name}]") } - @CompileStatic protected void safeProcessFile(Path source, Path target) { try { processFile(source, target) @@ -371,7 +365,6 @@ class PublishDir { } } - @CompileStatic protected void processFile( Path source, Path destination ) { // resolve Fusion symlink if applicable @@ -410,7 +403,6 @@ class PublishDir { * * @param file */ - @CompileStatic protected Path resolveFusionLink(Path file) { final pattern = ~/^\/fusion\/([^\/]+)\/(.*)/ while( file.name in getFusionLinks(file.parent) ) @@ -418,7 +410,6 @@ class PublishDir { return file } - @CompileStatic @Memoized protected List getFusionLinks(Path workDir) { try { @@ -482,7 +473,6 @@ class PublishDir { return !mode || mode == Mode.SYMLINK || mode == Mode.RELLINK } - @CompileStatic protected void processFileImpl( Path source, Path destination ) { log.trace "publishing file: $source -[$mode]-> $destination" @@ -510,12 +500,10 @@ class PublishDir { } } - @CompileStatic protected void createPublishDir() { makeDirs(this.path) } - @CompileStatic protected void makeDirs(Path dir) { if( !dir || makeCache.containsKey(dir) ) return @@ -535,7 +523,6 @@ class PublishDir { * That valid publish mode has been selected * Note: link and symlinks are not allowed across different file system */ - @CompileStatic @PackageScope void validatePublishMode() { if( log.isTraceEnabled() ) From 0d853ea5cda3b9c5538cccbb6cf2f687082b25a8 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 6 Nov 2023 23:44:35 +0100 Subject: [PATCH 11/17] Compile Fusion regex ahead [ci fast] Signed-off-by: Paolo Di Tommaso --- .../src/main/groovy/nextflow/processor/PublishDir.groovy | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 84dd84ed81..0a503ab920 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -25,6 +25,7 @@ import java.nio.file.NoSuchFileException import java.nio.file.Path import java.nio.file.PathMatcher import java.util.concurrent.ExecutorService +import java.util.regex.Pattern import groovy.transform.CompileDynamic import groovy.transform.CompileStatic @@ -53,6 +54,8 @@ import nextflow.util.PathTrie @CompileStatic class PublishDir { + final static private Pattern FUSION_PATH_REGEX = ~/^\/fusion\/([^\/]+)\/(.*)/ + enum Mode { SYMLINK, LINK, COPY, MOVE, COPY_NO_FOLLOW, RELLINK } private Map makeCache = new HashMap<>() @@ -404,9 +407,8 @@ class PublishDir { * @param file */ protected Path resolveFusionLink(Path file) { - final pattern = ~/^\/fusion\/([^\/]+)\/(.*)/ while( file.name in getFusionLinks(file.parent) ) - file = file.text.replaceFirst(pattern) { _, scheme, path -> "${scheme}://${path}" } as Path + file = file.text.replaceFirst(FUSION_PATH_REGEX) { _, scheme, path -> "${scheme}://${path}" } as Path return file } @@ -546,5 +548,4 @@ class PublishDir { session.notifyFilePublish(destination, source) } - } From 09072069f6a76936076fc0fe117abb67480fe8ea Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Wed, 8 Nov 2023 10:28:14 -0600 Subject: [PATCH 12/17] Add integration test Signed-off-by: Ben Sherman --- tests/checks/fusion-symlink.nf/.checks | 19 ++++++++ tests/checks/fusion-symlink.nf/.expected | 1 + tests/fusion-symlink.nf | 61 ++++++++++++++++++++++++ 3 files changed, 81 insertions(+) create mode 100644 tests/checks/fusion-symlink.nf/.checks create mode 100644 tests/checks/fusion-symlink.nf/.expected create mode 100644 tests/fusion-symlink.nf diff --git a/tests/checks/fusion-symlink.nf/.checks b/tests/checks/fusion-symlink.nf/.checks new file mode 100644 index 0000000000..5646d885a5 --- /dev/null +++ b/tests/checks/fusion-symlink.nf/.checks @@ -0,0 +1,19 @@ +#!/bin/bash + +# +# normal run +# +echo initial run +$NXF_RUN -with-fusion + +nextflow fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt +cmp data.txt .expected || false + +# +# resume run +# +echo resumed run +$NXF_RUN -with-fusion -resume + +nextflow fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt +cmp data.txt .expected || false diff --git a/tests/checks/fusion-symlink.nf/.expected b/tests/checks/fusion-symlink.nf/.expected new file mode 100644 index 0000000000..e427984d4a --- /dev/null +++ b/tests/checks/fusion-symlink.nf/.expected @@ -0,0 +1 @@ +HELLO diff --git a/tests/fusion-symlink.nf b/tests/fusion-symlink.nf new file mode 100644 index 0000000000..30d0cee56d --- /dev/null +++ b/tests/fusion-symlink.nf @@ -0,0 +1,61 @@ +#!/usr/bin/env nextflow +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +process CREATE { + + output: + path "data.txt" + + script: + """ + echo HELLO > data.txt + """ +} + +process FORWARD { + + input: + path "data.txt" + + output: + path "data.txt" + + script: + """ + echo AND + """ +} + +process PUBLISH { + publishDir "s3://nextflow-ci/work/ci-test/fusion-symlink" + + input: + path "data.txt" + + output: + path "data.txt" + + script: + """ + echo BYE + """ +} + +workflow { + CREATE | FORWARD | PUBLISH +} From 89a5618ff271d3ff24365dca5d6d87664cd78713 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Fri, 10 Nov 2023 18:36:26 +0100 Subject: [PATCH 13/17] Fix failing test Signed-off-by: Paolo Di Tommaso --- tests/checks/fusion-symlink.nf/.checks | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/checks/fusion-symlink.nf/.checks b/tests/checks/fusion-symlink.nf/.checks index 5646d885a5..c37b73c536 100644 --- a/tests/checks/fusion-symlink.nf/.checks +++ b/tests/checks/fusion-symlink.nf/.checks @@ -4,7 +4,7 @@ # normal run # echo initial run -$NXF_RUN -with-fusion +$NXF_RUN -with-fusion -with-wave nextflow fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt cmp data.txt .expected || false @@ -13,7 +13,7 @@ cmp data.txt .expected || false # resume run # echo resumed run -$NXF_RUN -with-fusion -resume +$NXF_RUN -with-fusion -with-wave -resume nextflow fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt cmp data.txt .expected || false From 816216614dd90b83a0261192729fccf85fc55e76 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Sat, 11 Nov 2023 08:18:17 -0600 Subject: [PATCH 14/17] Fix failing test Signed-off-by: Ben Sherman --- tests/checks/fusion-symlink.nf/.checks | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/checks/fusion-symlink.nf/.checks b/tests/checks/fusion-symlink.nf/.checks index c37b73c536..9e820bdbca 100644 --- a/tests/checks/fusion-symlink.nf/.checks +++ b/tests/checks/fusion-symlink.nf/.checks @@ -4,7 +4,7 @@ # normal run # echo initial run -$NXF_RUN -with-fusion -with-wave +$NXF_RUN -with-fusion -with-wave -w s3://nextflow-ci/work nextflow fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt cmp data.txt .expected || false @@ -13,7 +13,7 @@ cmp data.txt .expected || false # resume run # echo resumed run -$NXF_RUN -with-fusion -with-wave -resume +$NXF_RUN -with-fusion -with-wave -w s3://nextflow-ci/work -resume nextflow fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt cmp data.txt .expected || false From 89dde036e101e349ec30892ff0487c6ccb7c440c Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Mon, 13 Nov 2023 10:48:53 -0600 Subject: [PATCH 15/17] Fix failing test Signed-off-by: Ben Sherman --- tests/checks/fusion-symlink.nf/.checks | 4 ++-- tests/checks/fusion-symlink.nf/nextflow.config | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) create mode 100644 tests/checks/fusion-symlink.nf/nextflow.config diff --git a/tests/checks/fusion-symlink.nf/.checks b/tests/checks/fusion-symlink.nf/.checks index 9e820bdbca..8ae01221eb 100644 --- a/tests/checks/fusion-symlink.nf/.checks +++ b/tests/checks/fusion-symlink.nf/.checks @@ -4,7 +4,7 @@ # normal run # echo initial run -$NXF_RUN -with-fusion -with-wave -w s3://nextflow-ci/work +$NXF_RUN nextflow fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt cmp data.txt .expected || false @@ -13,7 +13,7 @@ cmp data.txt .expected || false # resume run # echo resumed run -$NXF_RUN -with-fusion -with-wave -w s3://nextflow-ci/work -resume +$NXF_RUN -resume nextflow fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt cmp data.txt .expected || false diff --git a/tests/checks/fusion-symlink.nf/nextflow.config b/tests/checks/fusion-symlink.nf/nextflow.config new file mode 100644 index 0000000000..630af142fa --- /dev/null +++ b/tests/checks/fusion-symlink.nf/nextflow.config @@ -0,0 +1,4 @@ +workDir = 's3://nextflow-ci/work' +fusion.enabled = true +fusion.exportStorageCredentials = true +wave.enabled = true \ No newline at end of file From ebf4ab6f38d96077837f588e776913d74f2147bd Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 13 Nov 2023 21:02:07 +0100 Subject: [PATCH 16/17] fix failing tests Signed-off-by: Paolo Di Tommaso --- tests/checks/fusion-symlink.nf/.checks | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/checks/fusion-symlink.nf/.checks b/tests/checks/fusion-symlink.nf/.checks index 8ae01221eb..c22d2c00a2 100644 --- a/tests/checks/fusion-symlink.nf/.checks +++ b/tests/checks/fusion-symlink.nf/.checks @@ -6,7 +6,7 @@ echo initial run $NXF_RUN -nextflow fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt +$NXF_RUN fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt cmp data.txt .expected || false # @@ -15,5 +15,5 @@ cmp data.txt .expected || false echo resumed run $NXF_RUN -resume -nextflow fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt +$NXF_RUN fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt cmp data.txt .expected || false From eb24f72c81b2acac8297e76f14015c1735803eef Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Mon, 13 Nov 2023 21:07:42 +0100 Subject: [PATCH 17/17] Fix test Signed-off-by: Paolo Di Tommaso --- tests/checks/fusion-symlink.nf/.checks | 14 ++++++++++---- .../fusion-symlink.nf/{nextflow.config => .config} | 0 2 files changed, 10 insertions(+), 4 deletions(-) rename tests/checks/fusion-symlink.nf/{nextflow.config => .config} (100%) diff --git a/tests/checks/fusion-symlink.nf/.checks b/tests/checks/fusion-symlink.nf/.checks index c22d2c00a2..a28366d245 100644 --- a/tests/checks/fusion-symlink.nf/.checks +++ b/tests/checks/fusion-symlink.nf/.checks @@ -1,19 +1,25 @@ #!/bin/bash +# Skip test if AWS keys are missing +if [ -z "$AWS_ACCESS_KEY_ID" ]; then + echo "Missing AWS credentials -- Skipping test" + exit 0 +fi + # # normal run # echo initial run -$NXF_RUN +$NXF_RUN -c .config -$NXF_RUN fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt +$NXF_CMD fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt cmp data.txt .expected || false # # resume run # echo resumed run -$NXF_RUN -resume +$NXF_RUN -c .config -resume -$NXF_RUN fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt +$NXF_CMD fs cp s3://nextflow-ci/work/ci-test/fusion-symlink/data.txt data.txt cmp data.txt .expected || false diff --git a/tests/checks/fusion-symlink.nf/nextflow.config b/tests/checks/fusion-symlink.nf/.config similarity index 100% rename from tests/checks/fusion-symlink.nf/nextflow.config rename to tests/checks/fusion-symlink.nf/.config