Skip to content

Commit

Permalink
Support for Fusion unprivileged execution (#4387)
Browse files Browse the repository at this point in the history
This commit allows disabling the privileged container execution
when using the fusion file system. This can be controlled by using the following 
configuration setting: 

```
 fusion.privileged = true|false // default true
``` 

The effective use of this setting depends on the target execution. 
Currently, it's only supported by the Kubernetes executor which
requires the use the k8s-fuse-plugin to be installed in the target cluster 

More details here https://github.com/nextflow-io/k8s-fuse-plugin



Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso authored Oct 10, 2023
1 parent 73fda58 commit 035e6e7
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 6 deletions.
8 changes: 8 additions & 0 deletions docs/fusion.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ The following configuration options are available:
`fusion.logOutput`
: Where the logging output is written.

`fusion.privileged`
: :::{versionadded} 23.10.0
:::
: This allows disabling the privileged container execution when using the Fusion file system.
The effective use of this setting depends on the target execution. Currently, it's only supported by the Kubernetes
executor which requires the use the [k8s-fuse-plugin](https://github.com/nextflow-io/k8s-fuse-plugin) to be installed
in the target cluster (default: `true`).

`fusion.tagsEnabled`
: Enable/disable the tagging of files created in the underlying object storage via the Fusion client (default: `true`).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ trait FusionAwareTask {
return fusionEnabled
}

FusionConfig fusionConfig() {
return FusionConfig.getConfig()
}

FusionScriptLauncher fusionLauncher() {
if( fusionLauncher==null ) {
fusionLauncher = fusionEnabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package nextflow.fusion

import groovy.transform.CompileStatic
import groovy.transform.Memoized
import nextflow.Global
import nextflow.SysEnv
/**
* Model Fusion config options
*
Expand All @@ -40,6 +43,7 @@ class FusionConfig {
final private String logLevel
final private boolean tagsEnabled
final private String tagsPattern
final private boolean privileged

boolean enabled() { enabled }

Expand All @@ -63,6 +67,10 @@ class FusionConfig {
this.containerConfigUrl ? new URL(this.containerConfigUrl) : null
}

boolean privileged() {
return privileged
}

FusionConfig(Map opts, Map<String,String> env=System.getenv()) {
this.enabled = opts.enabled
this.exportAwsAccessKeys = opts.exportAwsAccessKeys
Expand All @@ -72,6 +80,7 @@ class FusionConfig {
this.logOutput = opts.logOutput
this.tagsEnabled = opts.tags==null || opts.tags.toString()!='false'
this.tagsPattern = (opts.tags==null || (opts.tags instanceof Boolean && opts.tags)) ? DEFAULT_TAGS : ( opts.tags !instanceof Boolean ? opts.tags as String : null )
this.privileged = opts.privileged==null || opts.privileged.toString()=='true'
if( containerConfigUrl && !validProtocol(containerConfigUrl))
throw new IllegalArgumentException("Fusion container config URL should start with 'http:' or 'https:' protocol prefix - offending value: $containerConfigUrl")
}
Expand All @@ -80,4 +89,12 @@ class FusionConfig {
url.startsWith('http://') || url.startsWith('https://') || url.startsWith('file:/')
}

static FusionConfig getConfig() {
return createConfig0(Global.config?.fusion as Map ?: Collections.emptyMap(), SysEnv.get())
}

@Memoized
static private FusionConfig createConfig0(Map config, Map env) {
new FusionConfig(config, env)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package nextflow.fusion


import nextflow.Global
import nextflow.SysEnv
import nextflow.plugin.Plugins
/**
* Provider strategy for {@link FusionEnv}
Expand All @@ -29,7 +27,7 @@ import nextflow.plugin.Plugins
class FusionEnvProvider {

Map<String,String> getEnvironment(String scheme) {
final config = new FusionConfig(Global.config?.fusion as Map ?: Collections.emptyMap(), SysEnv.get())
final config = FusionConfig.getConfig()
final list = Plugins.getExtensions(FusionEnv)
final result = new HashMap<String,String>()
for( FusionEnv it : list ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,11 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
}

if ( fusionEnabled() ) {
builder.withPrivileged(true)
if( fusionConfig().privileged() )
builder.withPrivileged(true)
else {
builder.withResourcesLimits(["nextflow.io/fuse": 1])
}

final env = fusionLauncher().fusionEnv()
for( Map.Entry<String,String> it : env )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class PodSpecBuilder {

List<String> devices

Map<String,?> resourcesLimits

/**
* @return A sequential volume unique identifier
*/
Expand Down Expand Up @@ -316,6 +318,11 @@ class PodSpecBuilder {
return this
}

PodSpecBuilder withResourcesLimits(Map<String,?> limits) {
this.resourcesLimits = limits
return this
}

PodSpecBuilder withPodOptions(PodOptions opts) {
// -- pull policy
if( opts.imagePullPolicy )
Expand Down Expand Up @@ -495,6 +502,10 @@ class PodSpecBuilder {
container.resources = addDiskResources(this.disk, container.resources as Map)
}

if( this.resourcesLimits ) {
container.resources = addResourcesLimits(this.resourcesLimits, container.resources as Map)
}

// add storage definitions ie. volumes and mounts
final List<Map> mounts = []
final List<Map> volumes = []
Expand Down Expand Up @@ -578,6 +589,18 @@ class PodSpecBuilder {
]
}

@PackageScope
Map addResourcesLimits(Map limits, Map result) {
if( result == null )
result = new LinkedHashMap(10)

final limits0 = result.limits as Map ?: new LinkedHashMap(10)
limits0.putAll( limits )
result.limits = limits0

return result
}

@PackageScope
Map addCpuResources(Integer cpus, Map res) {
if( res == null )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,17 @@ class FusionConfigTest extends Specification {
[tags:'[*.txt](x=1)'] | true | '[*.txt](x=1)'

}

def 'should check privileged flag' () {
given:
def opts = new FusionConfig(OPTS)
expect:
opts.privileged() == EXPECTED

where:
OPTS | EXPECTED
[:] | true
[privileged:true] | true
[privileged:false] | false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import java.nio.file.Paths
import nextflow.Session
import nextflow.SysEnv
import nextflow.exception.NodeTerminationException
import nextflow.fusion.FusionScriptLauncher
import nextflow.file.http.XPath
import nextflow.fusion.FusionConfig
import nextflow.fusion.FusionScriptLauncher
import nextflow.k8s.client.ClientConfig
import nextflow.k8s.client.K8sClient
import nextflow.k8s.client.K8sResponseException
Expand Down Expand Up @@ -882,7 +883,7 @@ class K8sTaskHandlerTest extends Specification {
handler.completeTimeMillis == 20
}

def 'should create a fusion pod' () {
def 'should create a fusion privileged pod' () {
given:
def WORK_DIR = XPath.get('http://some/work/dir')
def config = Mock(TaskConfig)
Expand Down Expand Up @@ -925,6 +926,51 @@ class K8sTaskHandlerTest extends Specification {
result.spec.containers[0].env == [[name:'FUSION_BUCKETS', value:'this,that']]
}

def 'should create a fusion unprivileged pod' () {
given:
def WORK_DIR = XPath.get('http://some/work/dir')
def config = Mock(TaskConfig)
def task = Mock(TaskRun)
def client = Mock(K8sClient)
def builder = Mock(K8sWrapperBuilder)
def launcher = Mock(FusionScriptLauncher)
def handler = Spy(new K8sTaskHandler(builder:builder, client: client))
Map result

when:
result = handler.newSubmitRequest(task)
then:
launcher.fusionEnv() >> [FUSION_BUCKETS: 'this,that']
launcher.toContainerMount(WORK_DIR.resolve('.command.run')) >> Path.of('/fusion/http/work/dir/.command.run')
launcher.fusionSubmitCli(task) >> ['/usr/bin/fusion', 'bash', '/fusion/http/work/dir/.command.run']
and:
handler.getTask() >> task
handler.fusionEnabled() >> true
handler.fusionLauncher() >> launcher
handler.fusionConfig() >> new FusionConfig(privileged: false)
and:
task.getContainer() >> 'debian:latest'
task.getWorkDir() >> WORK_DIR
task.getConfig() >> config
and:
1 * handler.fixOwnership() >> false
1 * handler.entrypointOverride() >> false
1 * handler.getPodOptions() >> new PodOptions()
1 * handler.getSyntheticPodName(task) >> 'nf-123'
1 * handler.getLabels(task) >> [:]
1 * handler.getAnnotations() >> [:]
1 * handler.getContainerMounts() >> []
and:
1 * config.getCpus() >> 0
1 * config.getMemory() >> null
1 * client.getConfig() >> new ClientConfig()
and:
result.spec.containers[0].args == ['/usr/bin/fusion', 'bash', '/fusion/http/work/dir/.command.run']
result.spec.containers[0].env == [[name:'FUSION_BUCKETS', value:'this,that']]
result.spec.containers[0].resources == [limits:['nextflow.io/fuse':1]]
!result.spec.containers[0].securityContext
}

def 'get fusion submit command' () {
given:
def handler = Spy(K8sTaskHandler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package nextflow.k8s.model

import nextflow.executor.res.AcceleratorResource
import nextflow.util.MemoryUnit
import spock.lang.Specification
import spock.lang.Unroll
/**
Expand Down Expand Up @@ -118,6 +119,35 @@ class PodSpecBuilderTest extends Specification {

}

def 'should create pod with resources limits' () {
when:
def pod1 = new PodSpecBuilder()
.withPodName('foo')
.withImageName('busybox')
.withCommand('echo foo')
.withResourcesLimits('nextflow.io/fuse': 1)
.build()

then:
pod1.spec.containers[0].resources == [limits:['nextflow.io/fuse':1]]


when:
def pod2 = new PodSpecBuilder()
.withPodName('foo')
.withImageName('busybox')
.withCommand('echo foo')
.withCpus(8)
.withMemory(MemoryUnit.of('10GB'))
.withResourcesLimits('nextflow.io/fuse': 1)
.build()

then:
pod2.spec.containers[0].resources == [
requests: ['cpu':8, 'memory':'10240Mi'],
limits: [memory:'10240Mi', 'nextflow.io/fuse':1] ]
}

def 'should set namespace, labels and annotations' () {

when:
Expand Down Expand Up @@ -627,6 +657,22 @@ class PodSpecBuilderTest extends Specification {
res.limits == ['example.com/fpga': 10]
}

def 'should add resources limits' () {
given:
def builder = new PodSpecBuilder()
Map resources

when:
resources = builder.addResourcesLimits(['foo':1], null)
then:
resources == [limits:[foo:1]]

when:
resources = builder.addResourcesLimits(['foo':1], [requests: ['x':1], limits: ['y': 2]])
then:
resources == [requests:[x:1], limits:[y:2, foo:1]]
}


@Unroll
def 'should sanitize k8s label value: #label' () {
Expand Down

0 comments on commit 035e6e7

Please sign in to comment.