Skip to content

Commit

Permalink
Add chain operator
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Jan 8, 2025
1 parent e7c8722 commit 88ae57e
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 32 deletions.
95 changes: 95 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/extension/ChainOp.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.extension

import groovy.transform.CompileStatic
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import groovyx.gpars.dataflow.operator.ChainWithClosure
import groovyx.gpars.dataflow.operator.DataflowEventListener

import static nextflow.extension.DataflowHelper.newOperator
import static nextflow.extension.DataflowHelper.OpParams

/**
* Implements the chain operator
*
* @author Paolo Di Tommaso <[email protected]>
*/
@CompileStatic
class ChainOp {

private DataflowReadChannel source
private DataflowWriteChannel target
private List<DataflowEventListener> listeners = List.of()
private boolean accumulator
private Closure action

static ChainOp create() {
new ChainOp()
}

ChainOp withSource(DataflowReadChannel source) {
assert source
this.source = source
return this
}

ChainOp withTarget(DataflowWriteChannel target) {
assert target
this.target = target
return this
}

ChainOp withListener(DataflowEventListener listener) {
assert listener != null
this.listeners = List.of(listener)
return this
}

ChainOp withListeners(List<DataflowEventListener> listeners) {
assert listeners != null
this.listeners = listeners
return this
}

ChainOp withAccumulator(boolean value) {
this.accumulator = value
return this
}

ChainOp withAction(Closure action) {
this.action = action
return this
}

DataflowWriteChannel apply() {
assert source
assert target
assert action

final OpParams parameters = new OpParams()
.withInput(source)
.withOutput(target)
.withAccumulator(accumulator)
.withListeners(listeners)

newOperator(parameters, new ChainWithClosure(action))
return target
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.DataflowWriteChannel
import groovyx.gpars.dataflow.expression.DataflowExpression
import groovyx.gpars.dataflow.operator.ChainWithClosure
import groovyx.gpars.dataflow.operator.DataflowEventAdapter
import groovyx.gpars.dataflow.operator.DataflowEventListener
import groovyx.gpars.dataflow.operator.DataflowProcessor
Expand Down Expand Up @@ -364,20 +363,6 @@ class DataflowHelper {
}
}

@Deprecated
static DataflowProcessor chainImpl(final DataflowReadChannel source, final DataflowWriteChannel target, final Map params, final Closure closure) {

final OpParams parameters = new OpParams()
.withInput(source)
.withOutput(target)

newOperator(parameters, new ChainWithClosure(closure))
}

static DataflowProcessor chainImpl(OpParams params, final Closure closure) {
newOperator(params, new ChainWithClosure(closure))
}

@PackageScope
@CompileStatic
static KeyPair makeKey(List<Integer> pivot, entry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ class OperatorImpl {
* @return
*/
DataflowWriteChannel chain(final DataflowReadChannel<?> source, final Map<String, Object> params, final Closure closure) {
final target = CH.createBy(source)
chainImpl(source, target, params, closure)
return target;
return ChainOp.create()
.withSource(source)
.withTarget(CH.createBy(source))
.withAction(closure)
.apply()
}

/**
Expand Down Expand Up @@ -344,10 +346,12 @@ class OperatorImpl {
}
} as Closure

// filter removing all duplicates
chainImpl(source, target, [listeners: [events]], filter )

return target
return ChainOp.create()
.withSource(source)
.withTarget(target)
.withListener(events)
.withAction(filter)
.apply()
}

/**
Expand Down Expand Up @@ -383,9 +387,11 @@ class OperatorImpl {
return it
}

chainImpl(source, target, [:], filter)

return target
return ChainOp.create()
.withSource(source)
.withTarget(target)
.withAction(filter)
.apply()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ import groovyx.gpars.dataflow.operator.DataflowProcessor
import nextflow.Channel
import nextflow.Global
import nextflow.Session

import static nextflow.extension.DataflowHelper.chainImpl

/**
* Implements reduce operator logic
*
Expand Down Expand Up @@ -130,12 +127,13 @@ class ReduceOp {
}
}

final params = new DataflowHelper.OpParams()
.withInput(source)
.withOutput(CH.create())
ChainOp.create()
.withSource(source)
.withTarget(CH.create())
.withListener(listener)
.withAccumulator(true)
chainImpl(params, {true})
.withAction({true})
.apply()

return target
}
Expand Down

0 comments on commit 88ae57e

Please sign in to comment.