Skip to content

Commit

Permalink
Task provenance - poc #2
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Jan 5, 2025
1 parent ca9fe88 commit 494bbb0
Show file tree
Hide file tree
Showing 27 changed files with 1,024 additions and 267 deletions.
9 changes: 2 additions & 7 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import nextflow.processor.ErrorStrategy
import nextflow.processor.TaskFault
import nextflow.processor.TaskHandler
import nextflow.processor.TaskProcessor
import nextflow.provenance.ProvTracker
import nextflow.prov.Tracker
import nextflow.script.BaseScript
import nextflow.script.ProcessConfig
import nextflow.script.ProcessFactory
Expand Down Expand Up @@ -224,8 +224,6 @@ class Session implements ISession {

private DAG dag

private ProvTracker provenance

private CacheDB cache

private Barrier processesBarrier = new Barrier()
Expand Down Expand Up @@ -384,9 +382,6 @@ class Session implements ISession {
// -- DAG object
this.dag = new DAG()

// -- create the provenance tracker
this.provenance = new ProvTracker()

// -- init output dir
this.outputDir = FileHelper.toCanonicalPath(config.outputDir ?: 'results')

Expand Down Expand Up @@ -862,7 +857,7 @@ class Session implements ISession {

DAG getDag() { this.dag }

ProvTracker getProvenance() { provenance }
Tracker getProvenance() { provenance }

ExecutorService getExecService() { execService }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class BranchOp {
protected void doNext(it) {
TokenBranchChoice ret = switchDef.closure.call(it)
if( ret ) {
targets[ret.choice].bind(ret.value)
Op.bind(targets[ret.choice], ret.value)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package nextflow.extension

import static nextflow.util.CacheHelper.*
import static nextflow.util.CheckHelper.*

import java.nio.file.Path

import groovy.util.logging.Slf4j
Expand All @@ -28,8 +31,6 @@ import nextflow.file.FileHelper
import nextflow.file.SimpleFileCollector
import nextflow.file.SortFileCollector
import nextflow.util.CacheHelper
import static nextflow.util.CacheHelper.HashMode
import static nextflow.util.CheckHelper.checkParams
/**
* Implements the body of {@link OperatorImpl#collectFile(groovyx.gpars.dataflow.DataflowReadChannel)} operator
*
Expand Down Expand Up @@ -185,10 +186,10 @@ class CollectFileOp {
protected emitItems( obj ) {
// emit collected files to 'result' channel
collector.saveTo(storeDir).each {
result.bind(it)
Op.bind(result,it)
}
// close the channel
result.bind(Channel.STOP)
Op.bind(result,Channel.STOP)
// close the collector
collector.safeClose()
}
Expand Down Expand Up @@ -261,9 +262,8 @@ class CollectFileOp {
return collector
}


DataflowWriteChannel apply() {
DataflowHelper.subscribeImpl( channel, [onNext: this.&processItem, onComplete: this.&emitItems] )
DataflowHelper.subscribeImpl( channel, true, [onNext: this.&processItem, onComplete: this.&emitItems] )
return result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package nextflow.extension

import static nextflow.util.CheckHelper.checkParams
import static nextflow.util.CheckHelper.*

import groovy.transform.CompileStatic
import groovyx.gpars.dataflow.DataflowReadChannel
Expand Down Expand Up @@ -55,7 +55,10 @@ class CollectOp {

Map<String,Closure> events = [:]
events.onNext = { append(result, it) }
events.onComplete = { target << ( result ? new ArrayBag(normalise(result)) : Channel.STOP ) }
events.onComplete = {
final msg = result ? new ArrayBag(normalise(result)) : Channel.STOP
Op.bind(target, msg)
}

DataflowHelper.subscribeImpl(source, events)
return target
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ class ConcatOp {
def next = index < channels.size() ? channels[index] : null

def events = new HashMap<String,Closure>(2)
events.onNext = { result.bind(it) }
events.onNext = { Op.bind(result, it) }
events.onComplete = {
if(next) append(result, channels, index)
else result.bind(Channel.STOP)
else Op.bind(result, Channel.STOP)
}

DataflowHelper.subscribeImpl(current, events)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import nextflow.Channel
import nextflow.Global
import nextflow.Session
import nextflow.dag.NodeMarker
import static java.util.Arrays.asList
/**
* This class provides helper methods to implement nextflow operators
*
Expand All @@ -45,6 +44,70 @@ import static java.util.Arrays.asList
@Slf4j
class DataflowHelper {

/**
 * Holder for the parameters of a dataflow operator: the input channels,
 * the output channels, the event listeners and the accumulator flag.
 * The {@code withXxx} mutators return {@code this} to allow fluent
 * builder-style composition.
 */
static class OpParams {
    List<DataflowReadChannel> inputs
    List<DataflowWriteChannel> outputs
    List<DataflowEventListener> listeners
    boolean accumulator

    OpParams() { }

    /**
     * Builds the holder from a legacy parameters map. Missing or empty
     * entries fall back to immutable empty lists (Groovy-truth via elvis).
     *
     * @param params A map with optional {@code inputs}, {@code outputs}
     *      and {@code listeners} entries
     */
    OpParams(Map params) {
        inputs = (params.inputs as List<DataflowReadChannel>) ?: List.<DataflowReadChannel>of()
        outputs = (params.outputs as List<DataflowWriteChannel>) ?: List.<DataflowWriteChannel>of()
        listeners = (params.listeners as List<DataflowEventListener>) ?: List.<DataflowEventListener>of()
    }

    /** Sets a single input channel. */
    OpParams withInput(DataflowReadChannel channel) {
        assert channel != null
        inputs = List.<DataflowReadChannel>of(channel)
        return this
    }

    /** Sets the list of input channels. */
    OpParams withInputs(List<DataflowReadChannel> channels) {
        assert channels != null
        inputs = channels
        return this
    }

    /** Sets a single output channel. */
    OpParams withOutput(DataflowWriteChannel channel) {
        assert channel != null
        outputs = List.<DataflowWriteChannel>of(channel)
        return this
    }

    /** Sets the list of output channels. */
    OpParams withOutputs(List<DataflowWriteChannel> channels) {
        assert channels != null
        outputs = channels
        return this
    }

    /** Sets a single event listener. */
    OpParams withListener(DataflowEventListener listener) {
        assert listener != null
        listeners = List.<DataflowEventListener>of(listener)
        return this
    }

    /** Sets the list of event listeners. */
    OpParams withListeners(List<DataflowEventListener> listeners) {
        assert listeners != null
        this.listeners = listeners
        return this
    }

    /** Marks the operator as an accumulator (e.g. reduce-like semantics). */
    OpParams withAccumulator(boolean acc) {
        accumulator = acc
        return this
    }

    /**
     * Converts the holder back to the map shape expected by the
     * underlying GPars {@code Dataflow.operator} call. Note the
     * {@code accumulator} flag is deliberately not included — it is
     * consumed by this helper class only, not by GPars.
     */
    Map toMap() {
        final result = new HashMap()
        result.inputs = inputs ?: List.of()
        result.outputs = outputs ?: List.of()
        result.listeners = listeners ?: List.of()
        return result
    }
}

private static Session getSession() { Global.getSession() as Session }

/**
Expand Down Expand Up @@ -141,6 +204,7 @@ class DataflowHelper {
* @param params The map holding inputs, outputs channels and other parameters
* @param code The closure to be executed by the operator
*/
@Deprecated
static DataflowProcessor newOperator( Map params, Closure code ) {

// -- add a default error listener
Expand All @@ -149,13 +213,13 @@ class DataflowHelper {
params.listeners = [ DEF_ERROR_LISTENER ]
}

final op = Dataflow.operator(params, code)
NodeMarker.appendOperator(op)
if( session && session.allOperators != null ) {
session.allOperators.add(op)
}
return newOperator0(new OpParams(params), code)
}

return op
/**
 * Creates a dataflow operator for the given operator parameters.
 * When no event listener is provided, the default error listener is
 * installed so that operator failures are reported to the session.
 *
 * @param params The operator inputs, outputs and listeners
 * @param code The closure to be executed by the operator
 * @return The resulting {@link DataflowProcessor} instance
 */
static DataflowProcessor newOperator( OpParams params, Closure code ) {
    // withListener mutates `params` and returns it, so both branches
    // hand the same (possibly updated) instance to newOperator0
    final effective = params.listeners ? params : params.withListener(DEF_ERROR_LISTENER)
    return newOperator0(effective, code)
}

/**
Expand Down Expand Up @@ -195,16 +259,25 @@ class DataflowHelper {
* @param code The closure to be executed by the operator
*/
static DataflowProcessor newOperator( DataflowReadChannel input, DataflowWriteChannel output, DataflowEventListener listener, Closure code ) {

if( !listener )
listener = DEF_ERROR_LISTENER

def params = [:]
final params = [:]
params.inputs = [input]
params.outputs = [output]
params.listeners = [listener]

final op = Dataflow.operator(params, code)
return newOperator0(new OpParams(params), code)
}

static private DataflowProcessor newOperator0(OpParams params, Closure code) {
assert params
assert params.inputs
assert params.listeners

// create the underlying dataflow operator
final op = Dataflow.operator(params.toMap(), Op.instrument(code, params.accumulator))
// track the operator as dag node
NodeMarker.appendOperator(op)
if( session && session.allOperators != null ) {
session.allOperators << op
Expand Down Expand Up @@ -236,14 +309,11 @@ class DataflowHelper {

}

/**
* Subscribe *onNext*, *onError* and *onComplete*
*
* @param source
* @param closure
* @return
*/
/**
 * Subscribes the *onNext*, *onError* and *onComplete* event handlers
 * to the given source channel, without accumulator semantics.
 *
 * @param source The channel to which the handlers are subscribed
 * @param events The map of event handler closures
 * @return The resulting {@link DataflowProcessor} instance
 */
static final DataflowProcessor subscribeImpl(final DataflowReadChannel source, final Map<String,Closure> events ) {
    return subscribeImpl(source, false, events)
}

static final DataflowProcessor subscribeImpl(final DataflowReadChannel source, final boolean accumulator, final Map<String,Closure> events ) {
checkSubscribeHandlers(events)

def error = false
Expand Down Expand Up @@ -276,13 +346,12 @@ class DataflowHelper {
}
}

final params = new OpParams()
.withInput(source)
.withListener(listener)
.withAccumulator(accumulator)

final Map<String, Object> parameters = new HashMap<String, Object>();
parameters.put("inputs", [source])
parameters.put("outputs", [])
parameters.put('listeners', [listener])

newOperator (parameters) {
newOperator (params) {
if( events.onNext ) {
events.onNext.call(it)
}
Expand All @@ -292,7 +361,7 @@ class DataflowHelper {
}
}


@Deprecated
static DataflowProcessor chainImpl(final DataflowReadChannel source, final DataflowWriteChannel target, final Map params, final Closure closure) {

final Map<String, Object> parameters = new HashMap<String, Object>(params)
Expand All @@ -302,6 +371,10 @@ class DataflowHelper {
newOperator(parameters, new ChainWithClosure(closure))
}

/**
 * Chains the given closure over the channels described by the operator
 * parameters, wrapping it with {@link ChainWithClosure}.
 *
 * @param params The operator inputs, outputs and listeners
 * @param closure The transformation applied to each emitted value
 * @return The resulting {@link DataflowProcessor} instance
 */
static DataflowProcessor chainImpl(OpParams params, final Closure closure) {
    return newOperator(params, new ChainWithClosure(closure))
}

/**
* Implements the {@code #reduce} operator
*
Expand All @@ -321,7 +394,7 @@ class DataflowHelper {
* call the passed closure each time
*/
void afterRun(final DataflowProcessor processor, final List<Object> messages) {
final item = messages.get(0)
final item = Op.unwrap(messages).get(0)
final value = accum == null ? item : closure.call(accum, item)

if( value == Channel.VOID ) {
Expand All @@ -339,7 +412,7 @@ class DataflowHelper {
* when terminates bind the result value
*/
void afterStop(final DataflowProcessor processor) {
result.bind(accum)
Op.bind(result, accum)
}

boolean onException(final DataflowProcessor processor, final Throwable e) {
Expand All @@ -349,7 +422,12 @@ class DataflowHelper {
}
}

chainImpl(channel, CH.create(), [listeners: [listener]], {true})
final params = new OpParams()
.withInput(channel)
.withOutput(CH.create())
.withListener(listener)
.withAccumulator(true)
chainImpl(params, {true})
}

@PackageScope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class GroupTupleOp {
target = CH.create()

/*
* apply the logic the the source channel
* apply the logic to the source channel
*/
DataflowHelper.subscribeImpl(channel, [onNext: this.&collect, onComplete: this.&finalise])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package nextflow.extension


import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import groovyx.gpars.dataflow.expression.DataflowExpression
Expand Down Expand Up @@ -53,16 +52,16 @@ class MapOp {
final stopOnFirst = source instanceof DataflowExpression
DataflowHelper.newOperator(source, target) { it ->

def result = mapper.call(it)
def proc = (DataflowProcessor) getDelegate()
final result = mapper.call(it)
final proc = (DataflowProcessor) getDelegate()

// bind the result value
if (result != Channel.VOID)
proc.bindOutput(result)
Op.bind(target, result)

// when the `map` operator is applied to a dataflow flow variable
// terminate the processor after the first emission -- Issue #44
if( result == Channel.STOP || stopOnFirst )
if( stopOnFirst )
proc.terminate()

}
Expand Down
Loading

0 comments on commit 494bbb0

Please sign in to comment.