Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalise submission and cancellation arguments #641

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@
Additional arguments to pass to `dask-worker`
env_extra : list
Deprecated: use ``job_script_prologue`` instead. This parameter will be removed in a future version.
submit_command_extra : list
Extra arguments to pass to the job scheduler submit command
cancel_command_extra : list
Extra arguments to pass to the job scheduler cancel command
job_script_prologue : list
Other commands to add to script before launching worker.
header_skip : list
Expand Down Expand Up @@ -172,6 +176,8 @@ def __init__(
job_extra=None,
job_extra_directives=None,
env_extra=None,
submit_command_extra=None,
cancel_command_extra=None,
job_script_prologue=None,
header_skip=None,
job_directives_skip=None,
Expand Down Expand Up @@ -270,6 +276,29 @@ def __init__(

if env_extra is None:
env_extra = dask.config.get("jobqueue.%s.env-extra" % self.config_name)

if self.submit_command_extra is None:
self.submit_command_extra = dask.config.get(
"jobqueue.%s.submit-command-extra" % self.config_name, []
)

self.submit_command = (
Job.submit_command
+ " "
+ " ".join(shlex.quote(arg) for arg in self.submit_command_extra)
)

if self.cancel_command_extra is None:
self.cancel_command_extra = dask.config.get(
"jobqueue.%s.cancel-command-extra" % self.config_name, []
)

self.cancel_command = (
Job.cancel_command
+ " "
+ " ".join(shlex.quote(arg) for arg in self.cancel_command_extra)
)

if job_script_prologue is None:
job_script_prologue = dask.config.get(
"jobqueue.%s.job-script-prologue" % self.config_name
Expand Down
20 changes: 7 additions & 13 deletions dask_jobqueue/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ def __init__(
name=None,
disk=None,
config_name=None,
submit_command_extra=None,
cancel_command_extra=None,
**base_class_kwargs
):
super().__init__(
Expand Down Expand Up @@ -95,26 +93,26 @@ def __init__(
if self.job_extra_directives:
self.job_header_dict.update(self.job_extra_directives)

if submit_command_extra is None:
submit_command_extra = dask.config.get(
if self.submit_command_extra is None:
self.submit_command_extra = dask.config.get(
"jobqueue.%s.submit-command-extra" % self.config_name, []
)

self.submit_command = (
HTCondorJob.submit_command
HTCondorJob.submit_commsand
+ " "
+ " ".join(shlex.quote(arg) for arg in submit_command_extra)
+ " ".join(shlex.quote(arg) for arg in self.submit_command_extra)
)

if cancel_command_extra is None:
cancel_command_extra = dask.config.get(
if self.cancel_command_extra is None:
self.cancel_command_extra = dask.config.get(
"jobqueue.%s.cancel-command-extra" % self.config_name, []
)

self.cancel_command = (
HTCondorJob.cancel_command
+ " "
+ " ".join(shlex.quote(arg) for arg in cancel_command_extra)
+ " ".join(shlex.quote(arg) for arg in self.cancel_command_extra)
)

def job_script(self):
Expand Down Expand Up @@ -227,10 +225,6 @@ class HTCondorCluster(JobQueueCluster):
job_extra_directives : dict
Extra submit file attributes for the job as key-value pairs.
They will be inserted as ``key = value``.
submit_command_extra : list of str
Extra arguments to pass to condor_submit
cancel_command_extra : list of str
Extra arguments to pass to condor_rm
{job}
{cluster}

Expand Down
2 changes: 2 additions & 0 deletions dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ jobqueue:
account: null
walltime: '00:30:00'
env-extra: null
submit-command-extra: [] # Extra sbatch arguments
cancel-command-extra: ["--signal=SIGTERM"] # Extra scancel arguments
job-script-prologue: []
job-cpu: null
job-mem: null
Expand Down
23 changes: 23 additions & 0 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import math
import os
import re
import shlex
import subprocess
import toolz

Expand Down Expand Up @@ -54,6 +55,28 @@ def __init__(
use_stdin = dask.config.get("jobqueue.%s.use-stdin" % self.config_name)
self.use_stdin = use_stdin

if self.submit_command_extra is None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is all this code both in the Core class and the class inheriting it?

self.submit_command_extra = dask.config.get(
"jobqueue.%s.submit-command-extra" % self.config_name, []
)

self.submit_command = (
LSFJob.submit_command
+ " "
+ " ".join(shlex.quote(arg) for arg in self.submit_command_extra)
)

if self.cancel_command_extra is None:
self.cancel_command_extra = dask.config.get(
"jobqueue.%s.cancel-command-extra" % self.config_name, []
)

self.cancel_command = (
LSFJob.cancel_command
+ " "
+ " ".join(shlex.quote(arg) for arg in self.cancel_command_extra)
)

header_lines = []
# LSF header build
if self.name is not None:
Expand Down
22 changes: 22 additions & 0 deletions dask_jobqueue/oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ def __init__(
if walltime is None:
walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)

if self.submit_command_extra is None:
self.submit_command_extra = dask.config.get(
"jobqueue.%s.submit-command-extra" % self.config_name, []
)

self.submit_command = (
OARJob.submit_command
+ " "
+ " ".join(shlex.quote(arg) for arg in self.submit_command_extra)
)

if self.cancel_command_extra is None:
self.cancel_command_extra = dask.config.get(
"jobqueue.%s.cancel-command-extra" % self.config_name, []
)

self.cancel_command = (
OARJob.cancel_command
+ " "
+ " ".join(shlex.quote(arg) for arg in self.cancel_command_extra)
)

header_lines = []
if self.job_name is not None:
header_lines.append("#OAR -n %s" % self.job_name)
Expand Down
23 changes: 23 additions & 0 deletions dask_jobqueue/pbs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import math
import os
import shlex
import warnings

import dask
Expand Down Expand Up @@ -83,6 +84,28 @@ def __init__(
if not account:
account = project

if self.submit_command_extra is None:
self.submit_command_extra = dask.config.get(
"jobqueue.%s.submit-command-extra" % self.config_name, []
)

self.submit_command = (
PBSJob.submit_command
+ " "
+ " ".join(shlex.quote(arg) for arg in self.submit_command_extra)
)

if self.cancel_command_extra is None:
self.cancel_command_extra = dask.config.get(
"jobqueue.%s.cancel-command-extra" % self.config_name, []
)

self.cancel_command = (
PBSJob.cancel_command
+ " "
+ " ".join(shlex.quote(arg) for arg in self.cancel_command_extra)
)

header_lines = []
# PBS header build
if self.job_name is not None:
Expand Down
23 changes: 23 additions & 0 deletions dask_jobqueue/sge.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import shlex

import dask

Expand Down Expand Up @@ -38,6 +39,28 @@ def __init__(
if walltime is None:
walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)

if self.submit_command_extra is None:
self.submit_command_extra = dask.config.get(
"jobqueue.%s.submit-command-extra" % self.config_name, []
)

self.submit_command = (
SGEJob.submit_command
+ " "
+ " ".join(shlex.quote(arg) for arg in self.submit_command_extra)
)

if self.cancel_command_extra is None:
self.cancel_command_extra = dask.config.get(
"jobqueue.%s.cancel-command-extra" % self.config_name, []
)

self.cancel_command = (
SGEJob.cancel_command
+ " "
+ " ".join(shlex.quote(arg) for arg in self.cancel_command_extra)
)

header_lines = []
if self.job_name is not None:
header_lines.append("#$ -N %s" % self.job_name)
Expand Down
24 changes: 24 additions & 0 deletions dask_jobqueue/slurm.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import math
import shlex
import warnings

import dask
Expand All @@ -26,6 +27,7 @@ def __init__(
job_cpu=None,
job_mem=None,
config_name=None,
cancel_command_extra=["--signal=SIGTERM"],
**base_class_kwargs
):
super().__init__(
Expand Down Expand Up @@ -57,6 +59,28 @@ def __init__(
if job_mem is None:
job_mem = dask.config.get("jobqueue.%s.job-mem" % self.config_name)

if self.submit_command_extra is None:
self.submit_command_extra = dask.config.get(
"jobqueue.%s.submit-command-extra" % self.config_name, []
)

self.submit_command = (
SLURMJob.submit_command
+ " "
+ " ".join(shlex.quote(arg) for arg in self.submit_command_extra)
)

if cancel_command_extra is None:
cancel_command_extra = dask.config.get(
"jobqueue.%s.cancel-command-extra" % self.config_name, []
)

self.cancel_command = (
SLURMJob.cancel_command
+ " "
+ " ".join(shlex.quote(arg) for arg in cancel_command_extra)
)

header_lines = []
# SLURM header build
if self.job_name is not None:
Expand Down
Loading