diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/DataflowHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/DataflowHelper.groovy index 4543a89710..b7423fe8a4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/DataflowHelper.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/DataflowHelper.groovy @@ -53,9 +53,12 @@ class DataflowHelper { OpParams() { } OpParams(Map params) { - this.inputs = params.inputs as List ?: List.of() - this.outputs = params.outputs as List ?: List.of() - this.listeners = params.listeners as List ?: List.of() + if( params.inputs ) + this.inputs = params.inputs as List + if( params.outputs ) + this.outputs = params.outputs as List + if( params.listeners ) + this.listeners = params.listeners as List } OpParams withInput(DataflowReadChannel channel) { @@ -108,6 +111,44 @@ class DataflowHelper { } } + static class ReduceParams { + DataflowReadChannel source + DataflowVariable target + Object seed + Closure action + Closure beforeBind + + static ReduceParams build() { new ReduceParams() } + + ReduceParams withSource(DataflowReadChannel channel) { + assert channel!=null + this.source = channel + return this + } + + ReduceParams withTarget(DataflowVariable output) { + assert output!=null + this.target = output + return this + } + + ReduceParams withSeed(Object seed) { + this.seed = seed + return this + } + + ReduceParams withAction(Closure action) { + this.action = action + return this + } + + ReduceParams withBeforeBind(Closure beforeBind) { + this.beforeBind = beforeBind + return this + } + + } + private static Session getSession() { Global.getSession() as Session } /** @@ -383,10 +424,14 @@ class DataflowHelper { * @param closure * @return */ - static DataflowProcessor reduceImpl(final DataflowReadChannel channel, final DataflowVariable result, def seed, final Closure closure) { + static DataflowProcessor reduceImpl(ReduceParams opts) { + assert opts + assert opts.source, "Reduce 'source' channel cannot be null" + assert opts.target, "Reduce 'target' channel cannot be null" + assert opts.action, "Reduce 'action' closure cannot be null" // the *accumulator* value - def accum = seed + def accum = opts.seed // intercepts operator events def listener = new DataflowEventAdapter() { @@ -395,7 +440,7 @@ class DataflowHelper { */ void afterRun(final DataflowProcessor processor, final List messages) { final item = Op.unwrap(messages).get(0) - final value = accum == null ? item : closure.call(accum, item) + final value = accum == null ? item : opts.action.call(accum, item) if( value == Channel.VOID ) { // do nothing @@ -412,7 +457,10 @@ class DataflowHelper { * when terminates bind the result value */ void afterStop(final DataflowProcessor processor) { - Op.bind(result, accum) + final result = opts.beforeBind + ? opts.beforeBind.call(accum) + : accum + Op.bind(opts.target, result) } boolean onException(final DataflowProcessor processor, final Throwable e) { @@ -423,7 +471,7 @@ class DataflowHelper { } final params = new OpParams() - .withInput(channel) + .withInput(opts.source) .withOutput(CH.create()) .withListener(listener) .withAccumulator(true) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index 28de8e1521..2de12c46a6 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -188,7 +188,11 @@ class OperatorImpl { throw new IllegalArgumentException('Operator `reduce` cannot be applied to a value channel') final target = new DataflowVariable() - reduceImpl( source, target, null, closure ) + reduceImpl( ReduceParams + .build() + .withSource(source) + .withTarget(target) + .withAction(closure) ) return target } @@ -211,7 +215,12 @@ class OperatorImpl { throw new IllegalArgumentException('Operator `reduce` cannot be applied to a value channel') final target = new DataflowVariable() - reduceImpl( source, target, seed, closure ) + reduceImpl( ReduceParams + .build() + .withSource(source) + .withTarget(target) + .withSeed(seed) + .withAction(closure) ) return target } @@ -488,8 +497,7 @@ class OperatorImpl { * @return A list holding all the items send over the channel */ DataflowWriteChannel toList(final DataflowReadChannel source) { - final target = ToListOp.apply(source) - return target + return new ToListOp(source).apply() } /** @@ -499,8 +507,7 @@ class OperatorImpl { * @return A list holding all the items send over the channel */ DataflowWriteChannel toSortedList(final DataflowReadChannel source, Closure closure = null) { - final target = new ToListOp(source, closure ?: true).apply() - return target as DataflowVariable + return new ToListOp(source, closure ?: true).apply() } /** @@ -538,9 +545,16 @@ class OperatorImpl { } } else { - reduceImpl(source, target, 0) { current, item -> + final action = { current, item -> discriminator == null || discriminator.invoke(criteria, item) ? current+1 : current } + reduceImpl(ReduceParams + .build() + .withSource(source) + .withTarget(target) + .withSeed(0) + .withAction(action) + ) } return target @@ -554,7 +568,11 @@ class OperatorImpl { */ DataflowWriteChannel min(final DataflowReadChannel source) { final target = new DataflowVariable() - reduceImpl(source, target, null) { min, val -> val val comparator.call(item) < comparator.call(min) ? item : min } } @@ -579,7 +597,11 @@ class OperatorImpl { } final target = new DataflowVariable() - reduceImpl(source, target, null, action) + reduceImpl(ReduceParams + .build() + .withSource(source) + .withTarget(target) + .withAction(action)) return target } @@ -592,7 +614,12 @@ class OperatorImpl { */ DataflowWriteChannel min(final DataflowReadChannel source, Comparator comparator) { final target = new DataflowVariable() - reduceImpl(source, target, null) { a, b -> comparator.compare(a,b)<0 ? a : b } + reduceImpl(ReduceParams + .build() + .withSource(source) + .withTarget(target) + .withAction{ a, b -> comparator.compare(a,b)<0 ? a : b } + ) return target } @@ -604,7 +631,12 @@ class OperatorImpl { */ DataflowWriteChannel max(final DataflowReadChannel source) { final target = new DataflowVariable() - reduceImpl(source,target, null) { max, val -> val>max ? val : max } + reduceImpl(ReduceParams + .build() + .withSource(source) + .withTarget(target) + .withAction { max, val -> val>max ? val : max } + ) return target } @@ -632,7 +664,11 @@ class OperatorImpl { } final target = new DataflowVariable() - reduceImpl(source, target, null, action) + reduceImpl(ReduceParams + .build() + .withSource(source) + .withTarget(target) + .withAction(action) ) return target } @@ -645,7 +681,11 @@ class OperatorImpl { */ DataflowVariable max(final DataflowReadChannel source, Comparator comparator) { final target = new DataflowVariable() - reduceImpl(source, target, null) { a, b -> comparator.compare(a,b)>0 ? a : b } + reduceImpl(ReduceParams + .build() + .withSource(source) + .withTarget(target) + .withAction { a, b -> comparator.compare(a,b)>0 ? a : b } ) return target } diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/ToListOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/ToListOp.groovy index 163415b3f2..0d2c18893c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/ToListOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/ToListOp.groovy @@ -52,19 +52,26 @@ class ToListOp { final result = new ArrayList(1) Map events = [:] events.onNext = { result.add(it) } - events.onComplete = { target.bind(result) } - DataflowHelper.subscribeImpl(source, events ) + events.onComplete = { Op.bind(target, result) } + DataflowHelper.subscribeImpl(source, events) return target } - DataflowHelper.reduceImpl(source, target, []) { List list, item -> list << item } - if( ordering ) { - final sort = { List list -> ordering instanceof Closure ? list.sort((Closure) ordering) : list.sort() } - return (DataflowVariable)target.then(sort) - } - else { - return target - } + Closure beforeBind = ordering + ? { List list -> ordering instanceof Closure ? list.sort((Closure) ordering) : list.sort() } + : null + + final reduce = DataflowHelper + .ReduceParams + .build() + .withSource(source) + .withTarget(target) + .withSeed([]) + .withBeforeBind(beforeBind) + .withAction{ List list, item -> list << item } + + DataflowHelper.reduceImpl(reduce) + return target } @Deprecated diff --git a/modules/nextflow/src/test/groovy/nextflow/extension/DataflowHelperTest.groovy b/modules/nextflow/src/test/groovy/nextflow/extension/DataflowHelperTest.groovy index 44c934d288..cd95f87fea 100644 --- a/modules/nextflow/src/test/groovy/nextflow/extension/DataflowHelperTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/extension/DataflowHelperTest.groovy @@ -16,10 +16,12 @@ package nextflow.extension +import groovyx.gpars.dataflow.DataflowQueue +import groovyx.gpars.dataflow.DataflowVariable +import groovyx.gpars.dataflow.operator.DataflowEventListener import nextflow.Session import spock.lang.Specification import spock.lang.Unroll - /** * * @author Paolo Di Tommaso @@ -70,4 +72,75 @@ class DataflowHelperTest extends Specification { [0,1,4] | ['A','B','C','D','E','F'] | ['A','B','E'] | ['C','D','F'] [0] | 'A' | ['A'] | [] } + + def 'should validate reduce params' () { + given: + def source = new DataflowQueue() + def target = new DataflowVariable() + def action = {-> 1} + def beforeBind = {-> 2} + def params = new DataflowHelper.ReduceParams() + .withSource(source) + .withTarget(target) + .withSeed('xyz') + .withAction(action) + .withBeforeBind(beforeBind) + + expect: + params.source.is(source) + params.target.is(target) + params.seed.is('xyz') + params.action.is(action) + params.beforeBind.is(beforeBind) + } + + def 'should validate operator params' () { + when: + def p1 = new DataflowHelper.OpParams().toMap() + then: + p1.inputs == List.of() + p1.outputs == List.of() + p1.listeners == List.of() + + when: + def s1 = new DataflowQueue() + def t1 = new DataflowQueue() + def l1 = Mock(DataflowEventListener) + and: + def p2 = new DataflowHelper.OpParams() + .withInput(s1) + .withOutput(t1) + .withListener(l1) + .withAccumulator(true) + then: + p2.inputs == List.of(s1) + p2.outputs == List.of(t1) + p2.listeners == List.of(l1) + p2.accumulator + and: + p2.toMap().inputs == List.of(s1) + p2.toMap().outputs == List.of(t1) + p2.toMap().listeners == List.of(l1) + + when: + def s2 = new DataflowQueue() + def t2 = new DataflowQueue() + def l2 = Mock(DataflowEventListener) + and: + def p3 = new DataflowHelper.OpParams() + .withInputs([s1,s2]) + .withOutputs([t1,t2]) + .withListeners([l1,l2]) + .withAccumulator(false) + then: + p3.inputs == List.of(s1,s2) + p3.outputs == List.of(t1,t2) + p3.listeners == List.of(l1,l2) + !p3.accumulator + and: + p3.toMap().inputs == List.of(s1,s2) + p3.toMap().outputs == List.of(t1,t2) + p3.toMap().listeners == List.of(l1,l2) + + } } diff --git a/modules/nextflow/src/test/groovy/nextflow/extension/OperatorImplTest.groovy b/modules/nextflow/src/test/groovy/nextflow/extension/OperatorImplTest.groovy index 10a96ba95a..c67af804d4 100644 --- a/modules/nextflow/src/test/groovy/nextflow/extension/OperatorImplTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/extension/OperatorImplTest.groovy @@ -19,9 +19,11 @@ package nextflow.extension import java.nio.file.Paths import groovyx.gpars.dataflow.DataflowQueue +import groovyx.gpars.dataflow.DataflowReadChannel import groovyx.gpars.dataflow.DataflowVariable import nextflow.Channel import nextflow.Session +import nextflow.prov.Tracker import spock.lang.Specification import spock.lang.Timeout /** @@ -35,56 +37,65 @@ class OperatorImplTest extends Specification { new Session() } + private mval(Object obj) { + if( obj instanceof DataflowReadChannel ) { + def result = obj.val + return result instanceof Tracker.Msg ? result.value : result + } + else + return obj + } + def testFilter() { when: def c1 = Channel.of(1,2,3,4,5).filter { it > 3 } then: - c1.val == 4 - c1.val == 5 - c1.val == Channel.STOP + mval(c1) == 4 + mval(c1) == 5 + mval(c1) == Channel.STOP when: def c2 = Channel.of('hola','hello','cioa','miao').filter { it =~ /^h.*/ } then: - c2.val == 'hola' - c2.val == 'hello' - c2.val == Channel.STOP + mval(c2) == 'hola' + mval(c2) == 'hello' + mval(c2) == Channel.STOP when: def c3 = Channel.of('hola','hello','cioa','miao').filter { it ==~ /^h.*/ } then: - c3.val == 'hola' - c3.val == 'hello' - c3.val == Channel.STOP + mval(c3) == 'hola' + mval(c3) == 'hello' + mval(c3) == Channel.STOP when: def c4 = Channel.of('hola','hello','cioa','miao').filter( ~/^h.*/ ) then: - c4.val == 'hola' - c4.val == 'hello' - c4.val == Channel.STOP + mval(c4) == 'hola' + mval(c4) == 'hello' + mval(c4) == Channel.STOP when: def c5 = Channel.of('hola',1,'cioa',2,3).filter( Number ) then: - c5.val == 1 - c5.val == 2 - c5.val == 3 - c5.val == Channel.STOP + mval(c5) == 1 + mval(c5) == 2 + mval(c5) == 3 + mval(c5) == Channel.STOP expect: - Channel.of(1,2,4,2,4,5,6,7,4).filter(1) .count().val == 1 - Channel.of(1,2,4,2,4,5,6,7,4).filter(2) .count().val == 2 - Channel.of(1,2,4,2,4,5,6,7,4).filter(4) .count().val == 3 + mval( Channel.of(1,2,4,2,4,5,6,7,4).filter(1) .count() ) == 1 + mval( Channel.of(1,2,4,2,4,5,6,7,4).filter(2) .count() ) == 2 + mval( Channel.of(1,2,4,2,4,5,6,7,4).filter(4) .count() ) == 3 } def testFilterWithValue() { expect: - Channel.value(3).filter { it>1 }.val == 3 - Channel.value(0).filter { it>1 }.val == Channel.STOP - Channel.value(Channel.STOP).filter { it>1 }.val == Channel.STOP + mval(Channel.value(3).filter { it>1 }) == 3 + mval(Channel.value(0).filter { it>1 }) == Channel.STOP + mval(Channel.value(Channel.STOP).filter { it>1 }) == Channel.STOP } def testSubscribe() { @@ -161,10 +172,10 @@ class OperatorImplTest extends Specification { when: def result = Channel.of(1,2,3).map { "Hello $it" } then: - result.val == 'Hello 1' - result.val == 'Hello 2' - result.val == 'Hello 3' - result.val == Channel.STOP + mval(result) == 'Hello 1' + mval(result) == 'Hello 2' + mval(result) == 'Hello 3' + mval(result) == Channel.STOP } def testMapWithVariable() { @@ -173,9 +184,9 @@ class OperatorImplTest extends Specification { when: def result = variable.map { it.reverse() } then: - result.val == 'olleH' - result.val == 'olleH' - result.val == 'olleH' + mval(result) == 'olleH' + mval(result) == 'olleH' + mval(result) == 'olleH' } def testMapParamExpanding () { @@ -183,10 +194,10 @@ class OperatorImplTest extends Specification { when: def result = Channel.of(1,2,3).map { [it, it] }.map { x, y -> x+y } then: - result.val == 2 - result.val == 4 - result.val == 6 - result.val == Channel.STOP + mval(result) == 2 + mval(result) == 4 + mval(result) == 6 + mval(result) == Channel.STOP } def testSkip() { @@ -194,9 +205,9 @@ class OperatorImplTest extends Specification { when: def result = Channel.of(1,2,3).map { it == 2 ? Channel.VOID : "Hello $it" } then: - result.val == 'Hello 1' - result.val == 'Hello 3' - result.val == Channel.STOP + mval(result) == 'Hello 1' + mval(result) == 'Hello 3' + mval(result) == Channel.STOP } @@ -206,13 +217,13 @@ class OperatorImplTest extends Specification { when: def result = Channel.of(1,2,3).flatMap { it -> [it, it*2] } then: - result.val == 1 - result.val == 2 - result.val == 2 - result.val == 4 - result.val == 3 - result.val == 6 - result.val == Channel.STOP + mval(result) == 1 + mval(result) == 2 + mval(result) == 2 + mval(result) == 4 + mval(result) == 3 + mval(result) == 6 + mval(result) == Channel.STOP } def testMapManyWithSingleton() { @@ -220,15 +231,15 @@ class OperatorImplTest extends Specification { when: def result = Channel.value([1,2,3]).flatMap() then: - result.val == 1 - result.val == 2 - result.val == 3 - result.val == Channel.STOP + mval(result) == 1 + mval(result) == 2 + mval(result) == 3 + mval(result) == Channel.STOP when: result = Channel.empty().flatMap() then: - result.val == Channel.STOP + mval(result) == Channel.STOP } @@ -237,11 +248,11 @@ class OperatorImplTest extends Specification { when: def result = Channel.of( [1,2], ['a','b'] ).flatMap { it -> [it, it.reverse()] } then: - result.val == [1,2] - result.val == [2,1] - result.val == ['a','b'] - result.val == ['b','a'] - result.val == Channel.STOP + mval(result) == [1, 2] + mval(result) == [2, 1] + mval(result) == ['a', 'b'] + mval(result) == ['b', 'a'] + mval(result) == Channel.STOP } def testMapManyDefault () { @@ -249,11 +260,11 @@ class OperatorImplTest extends Specification { when: def result = Channel.of( [1,2], ['a',['b','c']] ).flatMap() then: - result.val == 1 - result.val == 2 - result.val == 'a' - result.val == ['b','c'] // <-- nested list are preserved - result.val == Channel.STOP + mval(result) == 1 + mval(result) == 2 + mval(result) == 'a' + mval(result) == ['b', 'c'] // <-- nested list are preserved + mval(result) == Channel.STOP } def testMapManyWithHashArray () { @@ -261,13 +272,13 @@ class OperatorImplTest extends Specification { when: def result = Channel.of(1,2,3).flatMap { it -> [ k: it, v: it*2] } then: - result.val == new MapEntry('k',1) - result.val == new MapEntry('v',2) - result.val == new MapEntry('k',2) - result.val == new MapEntry('v',4) - result.val == new MapEntry('k',3) - result.val == new MapEntry('v',6) - result.val == Channel.STOP + mval(result) == new MapEntry('k',1) + mval(result) == new MapEntry('v',2) + mval(result) == new MapEntry('k',2) + mval(result) == new MapEntry('v',4) + mval(result) == new MapEntry('k',3) + mval(result) == new MapEntry('v',6) + mval(result) == Channel.STOP } @@ -280,33 +291,33 @@ class OperatorImplTest extends Specification { def result = channel.reduce { a, e -> a += e } channel << 1 << 2 << 3 << 4 << 5 << Channel.STOP then: - result.getVal() == 15 + mval(result) == 15 when: channel = Channel.of(1,2,3,4,5) result = channel.reduce { a, e -> a += e } then: - result.getVal() == 15 + mval(result) == 15 when: channel = Channel.create() result = channel.reduce { a, e -> a += e } channel << 99 << Channel.STOP then: - result.getVal() == 99 + mval(result) == 99 when: channel = Channel.create() result = channel.reduce { a, e -> a += e } channel << Channel.STOP then: - result.getVal() == null + mval(result) == null when: result = Channel.of(6,5,4,3,2,1).reduce { a, e -> Channel.STOP } then: - result.val == 6 + mval(result) == 6 } @@ -318,50 +329,50 @@ class OperatorImplTest extends Specification { def result = channel.reduce (1) { a, e -> a += e } channel << 1 << 2 << 3 << 4 << 5 << Channel.STOP then: - result.getVal() == 16 + mval(result) == 16 when: channel = Channel.create() result = channel.reduce (10) { a, e -> a += e } channel << Channel.STOP then: - result.getVal() == 10 + mval(result) == 10 when: result = Channel.of(6,5,4,3,2,1).reduce(0) { a, e -> a < 3 ? a+1 : Channel.STOP } then: - result.val == 3 + mval(result) == 3 } def testFirst() { expect: - Channel.of(3,6,4,5,4,3,4).first().val == 3 + mval(Channel.of(3,6,4,5,4,3,4).first()) == 3 } def testFirstWithCriteria() { expect: - Channel.of(3,6,4,5,4,3,4).first{ it>4 } .val == 6 + mval(Channel.of(3,6,4,5,4,3,4).first{ it>4 }) == 6 } def testFirstWithValue() { expect: - Channel.value(3).first().val == 3 - Channel.value(3).first{ it>1 }.val == 3 - Channel.value(3).first{ it>3 }.val == Channel.STOP - Channel.value(Channel.STOP).first { it>3 }.val == Channel.STOP + mval(Channel.value(3).first()) == 3 + mval(Channel.value(3).first{ it>1 }) == 3 + mval(Channel.value(3).first{ it>3 }) == Channel.STOP + mval(Channel.value(Channel.STOP).first { it>3 }) == Channel.STOP } def testFirstWithCondition() { expect: - Channel.of(3,6,4,5,4,3,4).first { it % 2 == 0 } .val == 6 - Channel.of( 'a', 'b', 'c', 1, 2 ).first( Number ) .val == 1 - Channel.of( 'a', 'b', 1, 2, 'aaa', 'bbb' ).first( ~/aa.*/ ) .val == 'aaa' - Channel.of( 'a', 'b', 1, 2, 'aaa', 'bbb' ).first( 1 ) .val == 1 + mval(Channel.of(3,6,4,5,4,3,4).first { it % 2 == 0 }) == 6 + mval(Channel.of( 'a', 'b', 'c', 1, 2 ).first( Number )) == 1 + mval(Channel.of( 'a', 'b', 1, 2, 'aaa', 'bbb' ).first( ~/aa.*/ )) == 'aaa' + mval(Channel.of( 'a', 'b', 1, 2, 'aaa', 'bbb' ).first( 1 )) == 1 } @@ -371,45 +382,45 @@ class OperatorImplTest extends Specification { when: def result = Channel.of(1,2,3,4,5,6).take(3) then: - result.val == 1 - result.val == 2 - result.val == 3 - result.val == Channel.STOP + mval(result) == 1 + mval(result) == 2 + mval(result) == 3 + mval(result) == Channel.STOP when: result = Channel.of(1).take(3) then: - result.val == 1 - result.val == Channel.STOP + mval(result) == 1 + mval(result) == Channel.STOP when: result = Channel.of(1,2,3).take(0) then: - result.val == Channel.STOP + mval(result) == Channel.STOP when: result = Channel.of(1,2,3).take(-1) then: - result.val == 1 - result.val == 2 - result.val == 3 - result.val == Channel.STOP + mval(result) == 1 + mval(result) == 2 + mval(result) == 3 + mval(result) == Channel.STOP when: result = Channel.of(1,2,3).take(3) then: - result.val == 1 - result.val == 2 - result.val == 3 - result.val == Channel.STOP + mval(result) == 1 + mval(result) == 2 + mval(result) == 3 + mval(result) == Channel.STOP } def testLast() { expect: - Channel.of(3,6,4,5,4,3,9).last().val == 9 - Channel.value('x').last().val == 'x' + mval(Channel.of(3,6,4,5,4,3,9).last()) == 9 + mval(Channel.value('x').last()) == 'x' } @@ -417,12 +428,12 @@ class OperatorImplTest extends Specification { def testCount() { expect: - Channel.of(4,1,7,5).count().val == 4 - Channel.of(4,1,7,1,1).count(1).val == 3 - Channel.of('a','c','c','q','b').count ( ~/c/ ) .val == 2 - Channel.value(5).count().val == 1 - Channel.value(5).count(5).val == 1 - Channel.value(5).count(6).val == 0 + mval(Channel.of(4,1,7,5).count()) == 4 + mval(Channel.of(4,1,7,1,1).count(1)) == 3 + mval(Channel.of('a','c','c','q','b').count ( ~/c/ )) == 2 + mval(Channel.value(5).count()) == 1 + mval(Channel.value(5).count(5)) == 1 + mval(Channel.value(5).count(6)) == 0 } def testToList() { @@ -430,23 +441,23 @@ class OperatorImplTest extends Specification { when: def channel = Channel.of(1,2,3) then: - channel.toList().val == [1,2,3] + mval(channel.toList()) == [1, 2, 3] when: channel = Channel.create() channel << Channel.STOP then: - channel.toList().val == [] + mval(channel.toList()) == [] when: channel = Channel.value(1) then: - channel.toList().val == [1] + mval(channel.toList()) == [1] when: channel = Channel.empty() then: - channel.toList().val == [] + mval(channel.toList()) == [] } def testToSortedList() { @@ -454,45 +465,44 @@ class OperatorImplTest extends Specification { when: def channel = Channel.of(3,1,4,2) then: - channel.toSortedList().val == [1,2,3,4] + mval(channel.toSortedList()) == [1, 2, 3, 4] when: - channel = Channel.create() - channel << Channel.STOP + channel = Channel.empty() then: - channel.toSortedList().val == [] + mval(channel.toSortedList()) == [] when: channel = Channel.of([1,'zeta'], [2,'gamma'], [3,'alpaha'], [4,'delta']) then: - channel.toSortedList { it[1] } .val == [[3,'alpaha'], [4,'delta'], [2,'gamma'], [1,'zeta'] ] + mval(channel.toSortedList { it[1] }) == [[3, 'alpaha'], [4, 'delta'], [2, 'gamma'], [1, 'zeta'] ] when: channel = Channel.value(1) then: - channel.toSortedList().val == [1] + mval(channel.toSortedList()) == [1] when: channel = Channel.empty() then: - channel.toSortedList().val == [] + mval(channel.toSortedList()) == [] } def testUnique() { expect: - Channel.of(1,1,1,5,7,7,7,3,3).unique().toList().val == [1,5,7,3] - Channel.of(1,3,4,5).unique { it%2 } .toList().val == [1,4] + mval(Channel.of(1,1,1,5,7,7,7,3,3).unique().toList()) == [1, 5, 7, 3] + mval(Channel.of(1,3,4,5).unique { it%2 } .toList()) == [1, 4] and: - Channel.of(1).unique().val == 1 - Channel.value(1).unique().val == 1 + mval(Channel.of(1).unique()) == 1 + mval(Channel.value(1).unique()) == 1 } def testDistinct() { expect: - Channel.of(1,1,2,2,2,3,1,1,2,2,3).distinct().toList().val == [1,2,3,1,2,3] - Channel.of(1,1,2,2,2,3,1,1,2,4,6).distinct { it%2 } .toList().val == [1,2,3,2] + mval(Channel.of(1,1,2,2,2,3,1,1,2,2,3).distinct().toList()) == [1, 2, 3, 1, 2, 3] + mval(Channel.of(1,1,2,2,2,3,1,1,2,4,6).distinct { it%2 } .toList()) == [1, 2, 3, 2] } @@ -501,54 +511,54 @@ class OperatorImplTest extends Specification { when: def r1 = Channel.of(1,2,3).flatten() then: - r1.val == 1 - r1.val == 2 - r1.val == 3 - r1.val == Channel.STOP + mval(r1) == 1 + mval(r1) == 2 + mval(r1) == 3 + mval(r1) == Channel.STOP when: def r2 = Channel.of([1,'a'], [2,'b']).flatten() then: - r2.val == 1 - r2.val == 'a' - r2.val == 2 - r2.val == 'b' - r2.val == Channel.STOP + mval(r2) == 1 + mval(r2) == 'a' + mval(r2) == 2 + mval(r2) == 'b' + mval(r2) == Channel.STOP when: def r3 = Channel.of( [1,2] as Integer[], [3,4] as Integer[] ).flatten() then: - r3.val == 1 - r3.val == 2 - r3.val == 3 - r3.val == 4 - r3.val == Channel.STOP + mval(r3) == 1 + mval(r3) == 2 + mval(r3) == 3 + mval(r3) == 4 + mval(r3) == Channel.STOP when: def r4 = Channel.of( [1,[2,3]], 4, [5,[6]] ).flatten() then: - r4.val == 1 - r4.val == 2 - r4.val == 3 - r4.val == 4 - r4.val == 5 - r4.val == 6 - r4.val == Channel.STOP + mval(r4) == 1 + mval(r4) == 2 + mval(r4) == 3 + mval(r4) == 4 + mval(r4) == 5 + mval(r4) == 6 + mval(r4) == Channel.STOP } def testFlattenWithSingleton() { when: def result = Channel.value([3,2,1]).flatten() then: - result.val == 3 - result.val == 2 - result.val == 1 - result.val == Channel.STOP + mval(result) == 3 + mval(result) == 2 + mval(result) == 1 + mval(result) == Channel.STOP when: result = Channel.empty().flatten() then: - result.val == Channel.STOP + mval(result) == Channel.STOP } def testCollate() { @@ -556,18 +566,18 @@ class OperatorImplTest extends Specification { when: def r1 = Channel.of(1,2,3,1,2,3,1).collate( 2, false ) then: - r1.val == [1,2] - r1.val == [3,1] - r1.val == [2,3] - r1.val == Channel.STOP + mval(r1) == [1, 2] + mval(r1) == [3, 1] + mval(r1) == [2, 3] + mval(r1) == Channel.STOP when: def r2 = Channel.of(1,2,3,1,2,3,1).collate( 3 ) then: - r2.val == [1,2,3] - r2.val == [1,2,3] - r2.val == [1] - r2.val == Channel.STOP + mval(r2) == [1, 2, 3] + mval(r2) == [1, 2, 3] + mval(r2) == [1] + mval(r2) == Channel.STOP } @@ -576,44 +586,44 @@ class OperatorImplTest extends Specification { when: def r1 = Channel.of(1,2,3,4).collate( 3, 1, false ) then: - r1.val == [1,2,3] - r1.val == [2,3,4] - r1.val == Channel.STOP + mval(r1) == [1, 2, 3] + mval(r1) == [2, 3, 4] + mval(r1) == Channel.STOP when: def r2 = Channel.of(1,2,3,4).collate( 3, 1, true ) then: - r2.val == [1,2,3] - r2.val == [2,3,4] - r2.val == [3,4] - r2.val == [4] - r2.val == Channel.STOP + mval(r2) == [1, 2, 3] + mval(r2) == [2, 3, 4] + mval(r2) == [3, 4] + mval(r2) == [4] + mval(r2) == Channel.STOP when: def r3 = Channel.of(1,2,3,4).collate( 3, 1 ) then: - r3.val == [1,2,3] - r3.val == [2,3,4] - r3.val == [3,4] - r3.val == [4] - r3.val == Channel.STOP + mval(r3) == [1, 2, 3] + mval(r3) == [2, 3, 4] + mval(r3) == [3, 4] + mval(r3) == [4] + mval(r3) == Channel.STOP when: def r4 = Channel.of(1,2,3,4).collate( 4,4 ) then: - r4.val == [1,2,3,4] - r4.val == Channel.STOP + mval(r4) == [1, 2, 3, 4] + mval(r4) == Channel.STOP when: def r5 = Channel.of(1,2,3,4).collate( 6,6 ) then: - r5.val == [1,2,3,4] - r5.val == Channel.STOP + mval(r5) == [1, 2, 3, 4] + mval(r5) == Channel.STOP when: def r6 = Channel.of(1,2,3,4).collate( 6,6,false ) then: - r6.val == Channel.STOP + mval(r6) == Channel.STOP } @@ -644,25 +654,25 @@ class OperatorImplTest extends Specification { when: def result = Channel.value(1).collate(1) then: - result.val == [1] - result.val == Channel.STOP + mval(result) == [1] + mval(result) == Channel.STOP when: result = Channel.value(1).collate(10) then: - result.val == [1] - result.val == Channel.STOP + mval(result) == [1] + mval(result) == Channel.STOP when: result = Channel.value(1).collate(10, true) then: - result.val == [1] - result.val == Channel.STOP + mval(result) == [1] + mval(result) == Channel.STOP when: result = Channel.value(1).collate(10, false) then: - result.val == Channel.STOP + mval(result) == Channel.STOP } def testMix() { @@ -687,7 +697,7 @@ class OperatorImplTest extends Specification { when: def result = Channel.value(1).mix( Channel.of(2,3) ) then: - result.toList().val.sort() == [1,2,3] + mval(result.toList().sort()) == [1, 2, 3] } @@ -741,12 +751,12 @@ class OperatorImplTest extends Specification { def result = ch1.cross(ch2) then: - result.val == [ [1, 'x'], [1,11] ] - result.val == [ [1, 'x'], [1,13] ] - result.val == [ [2, 'y'], [2,21] ] - result.val == [ [2, 'y'], [2,22] ] - result.val == [ [2, 'y'], [2,23] ] - result.val == Channel.STOP + mval(result) == [[1, 'x'], [1, 11] ] + mval(result) == [[1, 'x'], [1, 13] ] + mval(result) == [[2, 'y'], [2, 21] ] + mval(result) == [[2, 'y'], [2, 22] ] + mval(result) == [[2, 'y'], [2, 23] ] + mval(result) == Channel.STOP } @@ -761,9 +771,9 @@ class OperatorImplTest extends Specification { def result = ch1.cross(ch2) then: - result.val == [ ['PF00006', 'PF00006.sp_lib'], ['PF00006', 'PF00006_mafft.aln'] ] - result.val == [ ['PF00006', 'PF00006.sp_lib'], ['PF00006', 'PF00006_clustalo.aln'] ] - result.val == Channel.STOP + mval(result) == [['PF00006', 'PF00006.sp_lib'], ['PF00006', 'PF00006_mafft.aln'] ] + mval(result) == [['PF00006', 'PF00006.sp_lib'], ['PF00006', 'PF00006_clustalo.aln'] ] + mval(result) == Channel.STOP } @@ -779,9 +789,9 @@ class OperatorImplTest extends Specification { def result = ch1.cross(ch2) then: - result.val == [ ['PF00006', 'PF00006.sp_lib'], ['PF00006', 'PF00006_mafft.aln'] ] - result.val == [ ['PF00006', 'PF00006.sp_lib'], ['PF00006', 'PF00006_clustalo.aln'] ] - result.val == Channel.STOP + mval(result) == [['PF00006', 'PF00006.sp_lib'], ['PF00006', 'PF00006_mafft.aln'] ] + mval(result) == [['PF00006', 'PF00006.sp_lib'], ['PF00006', 'PF00006_clustalo.aln'] ] + mval(result) == Channel.STOP } @@ -793,13 +803,13 @@ class OperatorImplTest extends Specification { def c2 = Channel.of('a','b','c') def all = c1.concat(c2) then: - all.val == 1 - all.val == 2 - all.val == 3 - all.val == 'a' - all.val == 'b' - all.val == 'c' - all.val == Channel.STOP + mval(all) == 1 + mval(all) == 2 + mval(all) == 3 + mval(all) == 'a' + mval(all) == 'b' + mval(all) == 'c' + mval(all) == Channel.STOP when: def d1 = Channel.create() @@ -811,14 +821,14 @@ class OperatorImplTest extends Specification { Thread.start { sleep 100; d1 << 1 << 2 << Channel.STOP } then: - result.val == 1 - result.val == 2 - result.val == 'a' - result.val == 'b' - result.val == 'c' - result.val == 'p' - result.val == 'q' - result.val == Channel.STOP + mval(result) == 1 + mval(result) == 2 + mval(result) == 'a' + mval(result) == 'b' + mval(result) == 'c' + mval(result) == 'p' + mval(result) == 'q' + mval(result) == Channel.STOP } @@ -826,10 +836,10 @@ class OperatorImplTest extends Specification { when: def result = Channel.value(1).concat( Channel.of(2,3) ) then: - result.val == 1 - result.val == 2 - result.val == 3 - result.val == Channel.STOP + mval(result) == 1 + mval(result) == 2 + mval(result) == 3 + mval(result) == Channel.STOP } diff --git a/modules/nextflow/src/test/groovy/nextflow/prov/ProvTest.groovy b/modules/nextflow/src/test/groovy/nextflow/prov/ProvTest.groovy index e4ee22054f..f13bd7a7b2 100644 --- a/modules/nextflow/src/test/groovy/nextflow/prov/ProvTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/prov/ProvTest.groovy @@ -203,4 +203,59 @@ class ProvTest extends Dsl2Spec { t1.name == ['p1 (1)', 'p1 (2)', 'p1 (3)'] } + + def 'should track the provenance of two tasks and toList operator' () { + when: + dsl_eval(globalConfig(), ''' + workflow { + channel.of('a','b','c') | p1 | toList | p2 + } + + process p1 { + input: val(x) + output: val(y) + exec: + y = x + } + + process p2 { + input: val(x) + exec: + println x + } + ''') + + then: + def t1 = upstreamTasksOf('p2') + t1.name == ['p1 (1)', 'p1 (2)', 'p1 (3)'] + + } + + def 'should track the provenance of two tasks and toSortedList operator' () { + when: + dsl_eval(globalConfig(), ''' + workflow { + channel.of('a','b','c') | p1 | toList | p2 + } + + process p1 { + input: val(x) + output: val(y) + exec: + y = x + } + + process p2 { + input: val(x) + exec: + println x + } + ''') + + then: + def t1 = upstreamTasksOf('p2') + t1.name == ['p1 (1)', 'p1 (2)', 'p1 (3)'] + + } + }