Skip to content

Commit

Permalink
Add toList and toSortedList operators
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Jan 6, 2025
1 parent 494bbb0 commit ddeee05
Show file tree
Hide file tree
Showing 6 changed files with 480 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ class DataflowHelper {
OpParams() { }

OpParams(Map params) {
this.inputs = params.inputs as List<DataflowReadChannel> ?: List.<DataflowReadChannel>of()
this.outputs = params.outputs as List<DataflowWriteChannel> ?: List.<DataflowWriteChannel>of()
this.listeners = params.listeners as List<DataflowEventListener> ?: List.<DataflowEventListener>of()
if( params.inputs )
this.inputs = params.inputs as List<DataflowReadChannel>
if( params.outputs )
this.outputs = params.outputs as List<DataflowWriteChannel>
if( params.listeners )
this.listeners = params.listeners as List<DataflowEventListener>
}

OpParams withInput(DataflowReadChannel channel) {
Expand Down Expand Up @@ -108,6 +111,44 @@ class DataflowHelper {
}
}

static class ReduceParams {
DataflowReadChannel source
DataflowVariable target
Object seed
Closure action
Closure beforeBind

static ReduceParams build() { new ReduceParams() }

ReduceParams withSource(DataflowReadChannel channel) {
assert channel!=null
this.source = channel
return this
}

ReduceParams withTarget(DataflowVariable output) {
assert output!=null
this.target = output
return this
}

ReduceParams withSeed(Object seed) {
this.seed = seed
return this
}

ReduceParams withAction(Closure action) {
this.action = action
return this
}

ReduceParams withBeforeBind(Closure beforeBind) {
this.beforeBind = beforeBind
return this
}

}

private static Session getSession() { Global.getSession() as Session }

/**
Expand Down Expand Up @@ -383,10 +424,14 @@ class DataflowHelper {
* @param closure
* @return
*/
static DataflowProcessor reduceImpl(final DataflowReadChannel channel, final DataflowVariable result, def seed, final Closure closure) {
static DataflowProcessor reduceImpl(ReduceParams opts) {
assert opts
assert opts.source, "Reduce 'source' channel cannot be null"
assert opts.target, "Reduce 'target' channel cannot be null"
assert opts.action, "Reduce 'action' closure cannot be null"

// the *accumulator* value
def accum = seed
def accum = opts.seed

// intercepts operator events
def listener = new DataflowEventAdapter() {
Expand All @@ -395,7 +440,7 @@ class DataflowHelper {
*/
void afterRun(final DataflowProcessor processor, final List<Object> messages) {
final item = Op.unwrap(messages).get(0)
final value = accum == null ? item : closure.call(accum, item)
final value = accum == null ? item : opts.action.call(accum, item)

if( value == Channel.VOID ) {
// do nothing
Expand All @@ -412,7 +457,10 @@ class DataflowHelper {
* when terminates bind the result value
*/
void afterStop(final DataflowProcessor processor) {
Op.bind(result, accum)
final result = opts.beforeBind
? opts.beforeBind.call(accum)
: accum
Op.bind(opts.target, result)
}

boolean onException(final DataflowProcessor processor, final Throwable e) {
Expand All @@ -423,7 +471,7 @@ class DataflowHelper {
}

final params = new OpParams()
.withInput(channel)
.withInput(opts.source)
.withOutput(CH.create())
.withListener(listener)
.withAccumulator(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ class OperatorImpl {
throw new IllegalArgumentException('Operator `reduce` cannot be applied to a value channel')

final target = new DataflowVariable()
reduceImpl( source, target, null, closure )
reduceImpl( ReduceParams
.build()
.withSource(source)
.withTarget(target)
.withAction(closure) )
return target
}

Expand All @@ -211,7 +215,12 @@ class OperatorImpl {
throw new IllegalArgumentException('Operator `reduce` cannot be applied to a value channel')

final target = new DataflowVariable()
reduceImpl( source, target, seed, closure )
reduceImpl( ReduceParams
.build()
.withSource(source)
.withTarget(target)
.withSeed(seed)
.withAction(closure) )
return target
}

Expand Down Expand Up @@ -488,8 +497,7 @@ class OperatorImpl {
* @return A list holding all the items send over the channel
*/
DataflowWriteChannel toList(final DataflowReadChannel source) {
final target = ToListOp.apply(source)
return target
return new ToListOp(source).apply()
}

/**
Expand All @@ -499,8 +507,7 @@ class OperatorImpl {
* @return A list holding all the items send over the channel
*/
DataflowWriteChannel toSortedList(final DataflowReadChannel source, Closure closure = null) {
final target = new ToListOp(source, closure ?: true).apply()
return target as DataflowVariable
return new ToListOp(source, closure ?: true).apply()
}

/**
Expand Down Expand Up @@ -538,9 +545,16 @@ class OperatorImpl {
}
}
else {
reduceImpl(source, target, 0) { current, item ->
final action = { current, item ->
discriminator == null || discriminator.invoke(criteria, item) ? current+1 : current
}
reduceImpl(ReduceParams
.build()
.withSource(source)
.withTarget(target)
.withSeed(0)
.withAction(action)
)
}

return target
Expand All @@ -554,7 +568,11 @@ class OperatorImpl {
*/
DataflowWriteChannel min(final DataflowReadChannel source) {
final target = new DataflowVariable()
reduceImpl(source, target, null) { min, val -> val<min ? val : min }
reduceImpl( ReduceParams
.build()
.withSource(source)
.withTarget(target)
.withAction{ min, val -> val<min ? val : min } )
return target
}

Expand All @@ -570,7 +588,7 @@ class OperatorImpl {
*/
DataflowWriteChannel min(final DataflowReadChannel source, Closure comparator) {

def action
Closure action = null
if( comparator.getMaximumNumberOfParameters() == 1 ) {
action = (Closure){ min, item -> comparator.call(item) < comparator.call(min) ? item : min }
}
Expand All @@ -579,7 +597,11 @@ class OperatorImpl {
}

final target = new DataflowVariable()
reduceImpl(source, target, null, action)
reduceImpl(ReduceParams
.build()
.withSource(source)
.withTarget(target)
.withAction(action))
return target
}

Expand All @@ -592,7 +614,12 @@ class OperatorImpl {
*/
DataflowWriteChannel min(final DataflowReadChannel source, Comparator comparator) {
final target = new DataflowVariable()
reduceImpl(source, target, null) { a, b -> comparator.compare(a,b)<0 ? a : b }
reduceImpl(ReduceParams
.build()
.withSource(source)
.withTarget(target)
.withAction{ a, b -> comparator.compare(a,b)<0 ? a : b }
)
return target
}

Expand All @@ -604,7 +631,12 @@ class OperatorImpl {
*/
DataflowWriteChannel max(final DataflowReadChannel source) {
final target = new DataflowVariable()
reduceImpl(source,target, null) { max, val -> val>max ? val : max }
reduceImpl(ReduceParams
.build()
.withSource(source)
.withTarget(target)
.withAction { max, val -> val>max ? val : max }
)
return target
}

Expand Down Expand Up @@ -632,7 +664,11 @@ class OperatorImpl {
}

final target = new DataflowVariable()
reduceImpl(source, target, null, action)
reduceImpl(ReduceParams
.build()
.withSource(source)
.withTarget(target)
.withAction(action) )
return target
}

Expand All @@ -645,7 +681,11 @@ class OperatorImpl {
*/
DataflowVariable max(final DataflowReadChannel source, Comparator comparator) {
final target = new DataflowVariable()
reduceImpl(source, target, null) { a, b -> comparator.compare(a,b)>0 ? a : b }
reduceImpl(ReduceParams
.build()
.withSource(source)
.withTarget(target)
.withAction { a, b -> comparator.compare(a,b)>0 ? a : b } )
return target
}

Expand Down
27 changes: 17 additions & 10 deletions modules/nextflow/src/main/groovy/nextflow/extension/ToListOp.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,26 @@ class ToListOp {
final result = new ArrayList(1)
Map<String,Closure> events = [:]
events.onNext = { result.add(it) }
events.onComplete = { target.bind(result) }
DataflowHelper.subscribeImpl(source, events )
events.onComplete = { Op.bind(target, result) }
DataflowHelper.subscribeImpl(source, events)
return target
}

DataflowHelper.reduceImpl(source, target, []) { List list, item -> list << item }
if( ordering ) {
final sort = { List list -> ordering instanceof Closure ? list.sort((Closure) ordering) : list.sort() }
return (DataflowVariable)target.then(sort)
}
else {
return target
}
Closure beforeBind = ordering
? { List list -> ordering instanceof Closure ? list.sort((Closure) ordering) : list.sort() }
: null

final reduce = DataflowHelper
.ReduceParams
.build()
.withSource(source)
.withTarget(target)
.withSeed([])
.withBeforeBind(beforeBind)
.withAction{ List list, item -> list << item }

DataflowHelper.reduceImpl(reduce)
return target
}

@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package nextflow.extension

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.operator.DataflowEventListener
import nextflow.Session
import spock.lang.Specification
import spock.lang.Unroll

/**
*
* @author Paolo Di Tommaso <[email protected]>
Expand Down Expand Up @@ -70,4 +72,75 @@ class DataflowHelperTest extends Specification {
[0,1,4] | ['A','B','C','D','E','F'] | ['A','B','E'] | ['C','D','F']
[0] | 'A' | ['A'] | []
}

def 'should validate reduce params' () {
given:
def source = new DataflowQueue()
def target = new DataflowVariable()
def action = {-> 1}
def beforeBind = {-> 2}
def params = new DataflowHelper.ReduceParams()
.withSource(source)
.withTarget(target)
.withSeed('xyz')
.withAction(action)
.withBeforeBind(beforeBind)

expect:
params.source.is(source)
params.target.is(target)
params.seed.is('xyz')
params.action.is(action)
params.beforeBind.is(beforeBind)
}

def 'should validate operator params' () {
when:
def p1 = new DataflowHelper.OpParams().toMap()
then:
p1.inputs == List.of()
p1.outputs == List.of()
p1.listeners == List.of()

when:
def s1 = new DataflowQueue()
def t1 = new DataflowQueue()
def l1 = Mock(DataflowEventListener)
and:
def p2 = new DataflowHelper.OpParams()
.withInput(s1)
.withOutput(t1)
.withListener(l1)
.withAccumulator(true)
then:
p2.inputs == List.of(s1)
p2.outputs == List.of(t1)
p2.listeners == List.of(l1)
p2.accumulator
and:
p2.toMap().inputs == List.of(s1)
p2.toMap().outputs == List.of(t1)
p2.toMap().listeners == List.of(l1)

when:
def s2 = new DataflowQueue()
def t2 = new DataflowQueue()
def l2 = Mock(DataflowEventListener)
and:
def p3 = new DataflowHelper.OpParams()
.withInputs([s1,s2])
.withOutputs([t1,t2])
.withListeners([l1,l2])
.withAccumulator(false)
then:
p3.inputs == List.of(s1,s2)
p3.outputs == List.of(t1,t2)
p3.listeners == List.of(l1,l2)
!p3.accumulator
and:
p3.toMap().inputs == List.of(s1,s2)
p3.toMap().outputs == List.of(t1,t2)
p3.toMap().listeners == List.of(l1,l2)

}
}
Loading

0 comments on commit ddeee05

Please sign in to comment.