From 590712a5264492518e5b601ad901d03dfe572601 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Tue, 7 Jan 2025 12:58:53 -0600 Subject: [PATCH 1/2] Support `disk` directive for local executor Signed-off-by: Ben Sherman --- docs/executor.md | 1 + docs/reference/config.md | 5 ++ .../processor/LocalPollingMonitor.groovy | 73 ++++++++++++++++--- 3 files changed, 69 insertions(+), 10 deletions(-) diff --git a/docs/executor.md b/docs/executor.md index ec77c89254..ef5caa87b7 100644 --- a/docs/executor.md +++ b/docs/executor.md @@ -255,6 +255,7 @@ The `local` executor is useful for developing and testing a pipeline script on y Resource requests and other job characteristics can be controlled via the following process directives: - {ref}`process-cpus` +- {ref}`process-disk` - {ref}`process-memory` - {ref}`process-time` - {ref}`process-container` diff --git a/docs/reference/config.md b/docs/reference/config.md index 0efb1ce6ae..a68a31848b 100644 --- a/docs/reference/config.md +++ b/docs/reference/config.md @@ -630,6 +630,11 @@ The following settings are available: `executor.cpus` : The maximum number of CPUs made available by the underlying system. Used only by the `local` executor. +`executor.disk` +: :::{versionadded} 24.12.0-edge + ::: +: The maximum amount of disk space made available by the underlying system (default: the usable space of the work directory). Used only by the `local` executor, and ignored when the work directory is on a remote file system. + `executor.dumpInterval` : Determines how often to log the executor status (default: `5min`). 
diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy index d9dbda638a..60e9e01df3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy @@ -15,7 +15,9 @@ */ package nextflow.processor + import java.lang.management.ManagementFactory +import java.nio.file.FileSystems import com.sun.management.OperatingSystemMXBean import groovy.transform.CompileStatic @@ -58,6 +60,16 @@ class LocalPollingMonitor extends TaskPollingMonitor { */ private final long maxMemory + /** + * Amount of `free` disk (bytes) available to execute pending tasks + */ + private long availDisk + + /** + * Total amount of disk (bytes) available in the system; zero means task disk requirements are not enforced + */ + private final long maxDisk + /** * Create the task polling monitor with the provided named parameters object. *

@@ -74,8 +86,9 @@ class LocalPollingMonitor extends TaskPollingMonitor { super(params) this.availCpus = maxCpus = params.cpus as int this.availMemory = maxMemory = params.memory as long - assert availCpus>0, "Local avail `cpus` attribute cannot be zero" - assert availMemory>0, "Local avail `memory` attribute cannot zero" + this.availDisk = maxDisk = params.disk as long + assert availCpus>0, "Local available `cpus` attribute cannot be zero" + assert availMemory>0, "Local available `memory` attribute cannot be zero" } /** @@ -98,14 +111,16 @@ class LocalPollingMonitor extends TaskPollingMonitor { final int cpus = configCpus(session,name) final long memory = configMem(session,name) + final long disk = configDisk(session,name) final int size = session.getQueueSize(name, OS.getAvailableProcessors()) - log.debug "Creating local task monitor for executor '$name' > cpus=$cpus; memory=${new MemoryUnit(memory)}; capacity=$size; pollInterval=$pollInterval; dumpInterval=$dumpInterval" + log.debug "Creating local task monitor for executor '$name' > cpus=$cpus; memory=${MemoryUnit.of(memory)}; disk=${MemoryUnit.of(disk)}; capacity=$size; pollInterval=$pollInterval; dumpInterval=$dumpInterval" new LocalPollingMonitor( name: name, cpus: cpus, memory: memory, + disk: disk, session: session, capacity: size, pollInterval: pollInterval, @@ -128,6 +143,15 @@ class LocalPollingMonitor extends TaskPollingMonitor { (session.getExecConfigProp(name, 'memory', OS.getTotalPhysicalMemorySize()) as MemoryUnit).toBytes() } + @PackageScope + static long configDisk(Session session, String name) { + if( session.workDir.fileSystem != FileSystems.default ) { + log.debug "Local executor is using a remote work directory -- task disk requirements will be ignored" + return 0 + } + (session.getExecConfigProp(name, 'disk', session.workDir.toFile().getUsableSpace()) as MemoryUnit).toBytes() + } + /** * @param handler * A {@link TaskHandler} instance @@ -149,6 +173,17 @@ class LocalPollingMonitor extends 
TaskPollingMonitor { handler.task.getConfig()?.getMemory()?.toBytes() ?: 1L } + /** + * + * @param handler + * A {@link TaskHandler} instance + * @return + * The amount of disk (bytes) requested to execute the specified task + */ + private static long disk(TaskHandler handler) { + handler.task.getConfig()?.getDisk()?.toBytes() ?: 1L + } + /** * Determines if a task can be submitted for execution checking if the resources required * (cpus and memory) match the amount of avail resource @@ -167,20 +202,34 @@ class LocalPollingMonitor extends TaskPollingMonitor { protected boolean canSubmit(TaskHandler handler) { final taskCpus = cpus(handler) - if( taskCpus>maxCpus ) - throw new ProcessUnrecoverableException("Process requirement exceeds available CPUs -- req: $taskCpus; avail: $maxCpus") + if( taskCpus > maxCpus ) + throw new ProcessUnrecoverableException("Task requirement exceeds available CPUs -- req: $taskCpus; avail: $maxCpus") final taskMemory = mem(handler) - if( taskMemory>maxMemory) - throw new ProcessUnrecoverableException("Process requirement exceeds available memory -- req: ${new MemoryUnit(taskMemory)}; avail: ${new MemoryUnit(maxMemory)}") + if( taskMemory > maxMemory ) + throw new ProcessUnrecoverableException("Task requirement exceeds available memory -- req: ${MemoryUnit.of(taskMemory)}; avail: ${MemoryUnit.of(maxMemory)}") + + final taskDisk = disk(handler) + if( isDiskEnabled() && taskDisk > maxDisk ) + throw new ProcessUnrecoverableException("Task requirement exceeds available disk -- req: ${MemoryUnit.of(taskDisk)}; avail: ${MemoryUnit.of(maxDisk)}") - final result = super.canSubmit(handler) && taskCpus <= availCpus && taskMemory <= availMemory - if( !result && log.isTraceEnabled( ) ) { - log.trace "Task `${handler.task.name}` cannot be scheduled -- taskCpus: $taskCpus <= availCpus: $availCpus && taskMemory: ${new MemoryUnit(taskMemory)} <= availMemory: ${new MemoryUnit(availMemory)}" + final result = super.canSubmit(handler) && taskCpus <= 
availCpus && taskMemory <= availMemory && (!isDiskEnabled() || taskDisk <= availDisk) + if( !result && log.isTraceEnabled() ) { + def message = "Task `${handler.task.name}` cannot be scheduled -- taskCpus: $taskCpus <= availCpus: $availCpus && taskMemory: ${MemoryUnit.of(taskMemory)} <= availMemory: ${MemoryUnit.of(availMemory)}" + if( isDiskEnabled() ) + message += " && taskDisk: ${MemoryUnit.of(taskDisk)} <= availDisk: ${MemoryUnit.of(availDisk)}" + log.trace message } return result } + /** + * Determine whether task disk requirements should be enforced, i.e. whether a positive disk capacity was configured. + */ + protected boolean isDiskEnabled() { + return maxDisk > 0 + } + /** * Submits a task for execution allocating the resources (cpus and memory) * requested by the task @@ -192,6 +241,8 @@ class LocalPollingMonitor extends TaskPollingMonitor { super.submit(handler) availCpus -= cpus(handler) availMemory -= mem(handler) + if( isDiskEnabled() ) + availDisk -= disk(handler) } /** @@ -209,6 +260,8 @@ class LocalPollingMonitor extends TaskPollingMonitor { if( result ) { availCpus += cpus(handler) availMemory += mem(handler) + if( isDiskEnabled() ) + availDisk += disk(handler) } return result } From 4fad9ee0f9eb8cf538288307aaafe0fb2695b32e Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Wed, 8 Jan 2025 14:44:55 -0600 Subject: [PATCH 2/2] Update test Signed-off-by: Ben Sherman --- .../processor/LocalPollingMonitorTest.groovy | 67 +++++++++++++++++-- 1 file changed, 62 insertions(+), 5 deletions(-) diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/LocalPollingMonitorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/LocalPollingMonitorTest.groovy index e6f6150ce5..ce6cd8698b 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/LocalPollingMonitorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/LocalPollingMonitorTest.groovy @@ -38,13 +38,14 @@ class LocalPollingMonitorTest extends Specification { cpus: 10, 
capacity: 20, memory: _20_GB, + disk: _20_GB, 
session: session, name: 'local', pollInterval: 100 ) def task = new TaskRun() - task.config = new TaskConfig(cpus: 3, memory: MemoryUnit.of('2GB')) + task.config = new TaskConfig(cpus: 3, memory: MemoryUnit.of('2GB'), disk: MemoryUnit.of('10GB')) def handler = Mock(TaskHandler) handler.getTask() >> { task } @@ -52,8 +53,10 @@ class LocalPollingMonitorTest extends Specification { monitor.availCpus == 10 monitor.capacity == 20 monitor.availMemory == _20_GB + monitor.availDisk == _20_GB monitor.maxCpus == 10 monitor.maxMemory == _20_GB + monitor.maxDisk == _20_GB when: monitor.submit(handler) @@ -62,8 +65,10 @@ class LocalPollingMonitorTest extends Specification { monitor.getRunningQueue().size()==1 monitor.availCpus == 7 monitor.availMemory == MemoryUnit.of('18GB').toBytes() + monitor.availDisk == MemoryUnit.of('10GB').toBytes() monitor.maxCpus == 10 monitor.maxMemory == _20_GB + monitor.maxDisk == _20_GB when: monitor.remove(handler) @@ -71,8 +76,10 @@ class LocalPollingMonitorTest extends Specification { monitor.getRunningQueue().size()==0 monitor.availCpus == 10 monitor.availMemory == _20_GB + monitor.availDisk == _20_GB monitor.maxCpus == 10 monitor.maxMemory == _20_GB + monitor.maxDisk == _20_GB } @@ -86,18 +93,18 @@ class LocalPollingMonitorTest extends Specification { cpus: 10, capacity: 10, memory: _20_GB, + disk: _20_GB, session: session, name: 'local', pollInterval: 100 ) def task = new TaskRun() - task.config = new TaskConfig(cpus: 4, memory: MemoryUnit.of('8GB')) + task.config = new TaskConfig(cpus: 4, memory: MemoryUnit.of('8GB'), disk: MemoryUnit.of('10GB')) def handler = Mock(TaskHandler) handler.getTask() >> { task } handler.canForkProcess() >> true handler.isReady() >> true - expect: monitor.canSubmit(handler) == true @@ -110,6 +117,7 @@ class LocalPollingMonitorTest extends Specification { monitor.canSubmit(handler) == true monitor.availCpus == 6 monitor.availMemory == MemoryUnit.of('12GB').toBytes() + monitor.availDisk == 
MemoryUnit.of('10GB').toBytes() when: monitor.submit(handler) @@ -120,6 +128,7 @@ class LocalPollingMonitorTest extends Specification { monitor.canSubmit(handler) == false monitor.availCpus == 2 monitor.availMemory == MemoryUnit.of('4GB').toBytes() + monitor.availDisk == 0 } @@ -132,6 +141,7 @@ class LocalPollingMonitorTest extends Specification { cpus: 1, capacity: 1, memory: _20_GB, + disk: 0, session: session, name: 'local', pollInterval: 100 @@ -167,6 +177,7 @@ class LocalPollingMonitorTest extends Specification { cpus: 10, capacity: 20, memory: _20_GB, + disk: 0, session: session, name: 'local', pollInterval: 100 @@ -181,7 +192,7 @@ class LocalPollingMonitorTest extends Specification { monitor.canSubmit(handler) then: def e1 = thrown(ProcessUnrecoverableException) - e1.message == 'Process requirement exceeds available CPUs -- req: 12; avail: 10' + e1.message == 'Task requirement exceeds available CPUs -- req: 12; avail: 10' } @@ -195,6 +206,7 @@ class LocalPollingMonitorTest extends Specification { cpus: 10, capacity: 20, memory: _20_GB, + disk: 0, session: session, name: 'local', pollInterval: 100 @@ -209,7 +221,35 @@ class LocalPollingMonitorTest extends Specification { monitor.canSubmit(handler) then: def e2 = thrown(ProcessUnrecoverableException) - e2.message == 'Process requirement exceeds available memory -- req: 22 GB; avail: 20 GB' + e2.message == 'Task requirement exceeds available memory -- req: 22 GB; avail: 20 GB' + + } + + def 'should throw an exception for missing disk' () { + + given: + def _20_GB = MemoryUnit.of('20GB').toBytes() + def session = new Session() + def monitor = new LocalPollingMonitor( + cpus: 10, + capacity: 20, + memory: _20_GB, + disk: _20_GB, + session: session, + name: 'local', + pollInterval: 100 + ) + + def task = new TaskRun() + task.config = new TaskConfig(disk: MemoryUnit.of('22GB')) + def handler = Mock(TaskHandler) + handler.getTask() >> { task } + + when: + monitor.canSubmit(handler) + then: + def e2 = 
thrown(ProcessUnrecoverableException) + e2.message == 'Task requirement exceeds available disk -- req: 22 GB; avail: 20 GB' } @@ -267,4 +307,21 @@ class LocalPollingMonitorTest extends Specification { LocalPollingMonitor.configMem(session4,'local') == OS.getTotalPhysicalMemorySize() } + + def 'should get the amount of disk' () { + + given: + def _10_GB = MemoryUnit.of('10 GB').toBytes() + + when: + def session2 = new Session([executor: [disk: '10 GB']]) + then: + LocalPollingMonitor.configDisk(session2,'local') == _10_GB + + when: + def session3 = new Session([executor: ['$local': [disk: _10_GB]]]) + then: + LocalPollingMonitor.configDisk(session3,'local') == _10_GB + + } }