Skip to content

Commit

Permalink
wip! workflows as programs
Browse files Browse the repository at this point in the history
  • Loading branch information
tsibley committed Dec 19, 2024
1 parent a1c1305 commit 5873eb4
Show file tree
Hide file tree
Showing 14 changed files with 812 additions and 42 deletions.
8 changes: 8 additions & 0 deletions doc/config/paths.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,11 @@ If necessary, the defaults can be overridden by environment variables.
File for preserving command history across :doc:`/commands/shell` invocations.

Default is :file:`{${NEXTSTRAIN_HOME}}/shell-history`.

.. envvar:: NEXTSTRAIN_WORKFLOWS

Directory for pathogen workflow data managed by :doc:`/commands/setup`,
e.g. local copies of pathogen repos like `nextstrain/measles
<https://github.com/nextstrain/measles>`__.

Default is :file:`{${NEXTSTRAIN_HOME}}/workflows/`.
2 changes: 2 additions & 0 deletions nextstrain/cli/command/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from . import (
run,
build,
view,
deploy,
Expand All @@ -25,6 +26,7 @@
# in various user interfaces, e.g. `nextstrain --help`.
#
all_commands = [
run,
build,
view,
deploy,
Expand Down
3 changes: 3 additions & 0 deletions nextstrain/cli/command/check_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
__doc__ = (__doc__ or "").format(default_runner_name = runner_name(default_runner))


# XXX FIXME: this whole file


def register_parser(subparser):
"""
%(prog)s [--set-default] [<runtime> [<runtime> ...]]
Expand Down
175 changes: 175 additions & 0 deletions nextstrain/cli/command/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# XXX FIXME command doc
# - names of things <https://github.com/nextstrain/measles/pull/55#discussion_r1876754557>
# - pathogen spec syntax
"""
TKTKTK
"""

from shlex import quote as shquote
from .. import runner
from ..argparse import add_extended_help_flags, DirectoryPath
from ..debug import DEBUGGING
from ..errors import UserError
from ..runner import ambient, conda, docker, singularity
from ..util import byte_quantity
from ..volume import NamedVolume
from ..workflows import PathogenWorkflows
from . import build


def register_parser(subparser):
    """
    %(prog)s [options] <pathogen-name> <workflow-name> <analysis-directory> [<target> [<target> [...]]]
    %(prog)s --help
    """
    # NOTE(review): the docstring above reads like an argparse usage template
    # (note the %(prog)s placeholders) and is presumably consumed elsewhere at
    # runtime, so it is deliberately kept verbatim — confirm before rewording.

    parser = subparser.add_parser("run", help="Run pathogen workflow", add_help=False)

    # Our own --help and --help-all flags stand in for argparse's built-in
    # help, which was switched off via add_help=False above.
    add_extended_help_flags(parser)

    # XXX TODO: consider if and how to share argument definitions with `build`
    # XXX TODO: options for --aws-batch, e.g. --detach, --detach-on-interrupt, --attach, --cancel, etc.
    #           *OR* maybe only support detached Batch builds and kick the can to `build` for further monitoring/management?

    # Long help texts are assembled up front so the option definitions below
    # stay easy to scan.
    cpus_help = (
        "Number of CPUs/cores/threads/jobs to utilize at once. "
        "Limits containerized (Docker, AWS Batch) workflow runs to this amount. "
        "Informs Snakemake's resource scheduler when applicable. "
        "Informs the AWS Batch instance size selection. "
        "By default, no constraints are placed on how many CPUs are used by a workflow run; "
        "workflow runs may use all that are available if they're able to."
    )

    memory_help = (
        "Amount of memory to make available to the workflow run. "
        "Units of b, kb, mb, gb, kib, mib, gib are supported. "
        "Limits containerized (Docker, AWS Batch) workflow runs to this amount. "
        "Informs Snakemake's resource scheduler when applicable. "
        "Informs the AWS Batch instance size selection. "
    )

    # Options
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force a rerun of the whole workflow even if everything seems up-to-date.",
    )
    parser.add_argument("--cpus", metavar="<count>", type=int, help=cpus_help)
    parser.add_argument("--memory", metavar="<quantity>", type=byte_quantity, help=memory_help)

    # Positional parameters, registered in the order they must appear on the
    # command line.
    positionals = (
        ("pathogen", dict(
            help="Pathogen name. Required.",  # XXX FIXME: add details
            metavar="<pathogen-name>")),

        ("workflow", dict(
            help="Workflow name. Required.",  # XXX FIXME: add details
            metavar="<workflow-name>")),

        ("analysis_directory", dict(
            help="Analysis directory. Required.",  # XXX FIXME: add details
            type=DirectoryPath,
            metavar="<analysis-directory>")),

        ("targets", dict(
            help="Output target; a file path produced by the workflow or the name of a workflow rule. Optional.",  # XXX FIXME: add details
            metavar="<target>",
            nargs="*")),
    )

    for dest, settings in positionals:
        parser.add_argument(dest, **settings)

    # Runner (runtime) selection flags and arguments.
    # XXX TODO: explain why an Ellipsis (...) is intentionally omitted
    supported_runners = [docker, ambient, conda, singularity]  # XXX FIXME: aws_batch

    runner.register_runners(
        parser,
        runners=supported_runners,
        exec=["snakemake"])  # Remaining default exec args are appended in run()

    return parser


def run(opts):
    """
    Run the requested pathogen workflow with Snakemake in the chosen runtime.

    Resolves ``opts.pathogen`` and ``opts.workflow`` to local directories,
    registers the pathogen and analysis volumes on ``opts``, appends the
    Snakemake arguments to ``opts.default_exec_args``, and hands off to
    ``runner.run()``, whose result is returned unchanged.

    Raises :class:`UserError` when either the pathogen directory or the
    workflow directory is not an existing directory.
    """
    build.assert_overlay_volumes_support(opts)

    # Map the pathogen and workflow names onto local directories.
    pathogen = PathogenWorkflows(opts.pathogen)

    pathogen_directory = pathogen.path
    workflow_directory = pathogen.workflow_path(opts.workflow)

    if not pathogen_directory.is_dir():
        # Only reveal the concrete path when debugging.
        where = f"in {str(pathogen_directory)!r}" if DEBUGGING else "locally"
        raise UserError(f"""
            No pathogen {opts.pathogen!r} found {where}.
            Did you set it up?
            Hint: to set it up, run `nextstrain setup {shquote(opts.pathogen)}`.
            """)

    if not workflow_directory.is_dir():
        where = f"in {str(workflow_directory)!r}" if DEBUGGING else "locally"
        raise UserError(f"""
            No {opts.workflow!r} workflow for pathogen {opts.pathogen!r} found {where}.
            Maybe you need to update to a newer version of the pathogen?
            Hint: to update the pathogen, run `nextstrain update {shquote(pathogen.name)}`.
            """)

    # Three volumes are in play:
    #   - build volume:    the pathogen directory (i.e. repo)
    #   - working volume:  the workflow directory within the pathogen directory
    #   - analysis volume: the user's analysis directory, used as Snakemake's workdir
    build_volume, working_volume = build.pathogen_volumes(workflow_directory)
    analysis_volume = NamedVolume("analysis", opts.analysis_directory)

    # for Docker, Singularity, and AWS Batch
    for volume in (build_volume, analysis_volume):
        opts.volumes.append(volume)

    # --- Pieces of the Snakemake invocation that depend on options ---

    # XXX FIXME: explain
    force_args = ["--forceall"] if opts.force else []

    # Snakemake's workdir is the analysis volume: its mount point for the
    # runners listed below, otherwise its resolved source path.
    # XXX FIXME: aws_batch
    if opts.__runner__ in {docker, singularity}:
        workdir = docker.mount_point(analysis_volume)
    else:
        workdir = analysis_volume.src.resolve(strict=True)

    # Snakemake requires the --cores option as of 5.11, so provide a default
    # to insulate our users from this and make Nextstrain builds
    # fast-by-default.  For more rationale/details, see a similar comment in
    # nextstrain/cli/command/build.py.
    #   -trs, 1 Nov 2024
    cores = opts.cpus or "all"

    # Named MB but is really MiB, so convert our count of bytes to MiB.
    memory_args = []
    if opts.memory:
        memory_args.append("--resources=mem_mb=%d" % (opts.memory // 1024**2))

    # --- Assemble the Snakemake invocation ---
    opts.default_exec_args += [
        # Useful to see what's going on; see also 08ffc925.
        "--printshellcmds",

        # In our experience,¹ it's rarely useful to fail on incomplete outputs
        # (Snakemake's default behaviour) instead of automatically
        # regenerating them.
        #
        # ¹ <https://discussion.nextstrain.org/t/snakemake-throwing-incompletefilesexception-when-using-forceall/1397/4>
        "--rerun-incomplete",

        # XXX FIXME: explain
        "--rerun-triggers", "code", "input", "mtime", "params", "software-env",

        *force_args,

        f"--directory={workdir!s}",
        f"--cores={cores!s}",

        *memory_args,

        # Everything after this separator is a target.
        "--",

        *opts.targets,
    ]

    return runner.run(
        opts,
        working_volume=working_volume,
        cpus=opts.cpus,
        memory=opts.memory)
130 changes: 90 additions & 40 deletions nextstrain/cli/command/setup.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,45 @@
"""
Sets up a Nextstrain runtime for use with `nextstrain build`, `nextstrain
view`, etc.
Sets up a Nextstrain pathogen or runtime for use with `nextstrain run`,
`nextstrain build`, `nextstrain view`, etc.
Only the Conda runtime currently supports automated set up, but this command
may still be used with other runtimes to check an existing (manual) setup and
set the runtime as the default on success.
For pathogens, TKTK
For runtimes, only the Conda runtime currently supports fully-automated set up,
but this command may still be used with other runtimes to check an existing
(manual) setup and set the runtime as the default on success.
Exits with an error code if automated set up fails or if setup checks fail.
"""
# XXX FIXME doc above
from functools import partial
from textwrap import dedent
from textwrap import dedent, indent

from .. import console
from ..argparse import runner_module_argument
from ..util import colored, runner_name, setup_tests_ok, print_setup_tests
from ..errors import UserError
from ..util import colored, runner_module, runner_name, setup_tests_ok, print_setup_tests
from ..types import Options
from ..runner import all_runners_by_name, configured_runner, default_runner # noqa: F401 (it's wrong; we use it in run())
from ..workflows import PathogenWorkflows, pathogen_default_version


heading = partial(colored, "bold")
failure = partial(colored, "red")


def register_parser(subparser):
parser = subparser.add_parser("setup", help = "Set up a runtime")
"""
%(prog)s [--dry-run] [--force] [--set-default] <pathogen|runtime>
%(prog)s --help
"""
parser = subparser.add_parser("setup", help = "Set up a pathogen or runtime")

parser.add_argument(
"runner",
help = "The Nextstrain runtime to set up. "
f"One of {{{', '.join(all_runners_by_name)}}}.",
metavar = "<runtime>",
type = runner_module_argument)
"thing",
help = "The Nextstrain pathogen or runtime to set up. "
f"A runtime is one of {{{', '.join(all_runners_by_name)}}}. "
"A pathogen is either the name of a Nextstrain-maintained pathogen (e.g. measles) or a URL to a ZIP file (e.g. https://github.com/nextstrain/measles/archive/refs/heads/main.zip).",
# XXX FIXME: pathogen@version syntax
metavar = "<pathogen|runtime>")

parser.add_argument(
"--dry-run",
Expand All @@ -41,7 +54,7 @@ def register_parser(subparser):

parser.add_argument(
"--set-default",
help = "Use the runtime as the default if set up is successful.",
help = "Use this pathogen version or runtime as the default if set up is successful.",
action = "store_true")

return parser
Expand All @@ -51,12 +64,42 @@ def register_parser(subparser):
def run(opts: Options) -> int:
global default_runner

heading = partial(colored, "bold")
failure = partial(colored, "red")
try:
kind = "runtime"
thing = runner_module(opts.thing)
nameof = runner_name
default = default_runner

except ValueError as e1:
try:
kind = "pathogen"
thing = PathogenWorkflows(opts.thing, new_setup = True)
nameof = str

# XXX FIXME
if default_version := pathogen_default_version(thing.name, implicit = False):
default = PathogenWorkflows(thing.name, default_version)
else:
default = None

except Exception as e2:
raise UserError(f"""
Unable to set up {opts.thing!r}.
It's not a valid runtime:
{{e1}}
nor pathogen:
{{e2}}
as specified. Double check your spelling and syntax?
""", e1 = indent(str(e1), " "), e2 = indent(str(e2), " "))

# Setup
print(heading(f"Setting up {runner_name(opts.runner)}…"))
setup_ok = opts.runner.setup(dry_run = opts.dry_run, force = opts.force)
print(heading(f"Setting up {nameof(thing)}…"))
setup_ok = thing.setup(dry_run = opts.dry_run, force = opts.force)

if setup_ok is None:
print("Automated set up is not supported, but we'll check for a manual setup.")
Expand All @@ -70,7 +113,7 @@ def run(opts: Options) -> int:
print(heading(f"Checking setup…"))

if not opts.dry_run:
tests = opts.runner.test_setup()
tests = thing.test_setup()

print_setup_tests(tests)

Expand All @@ -83,33 +126,40 @@ def run(opts: Options) -> int:

# Optionally set as default
if opts.set_default:
default_runner = opts.runner
default = thing
print()
print("Setting default runtime to %s." % runner_name(default_runner))
print(f"Setting {kind} default to {nameof(default)}.")

if not opts.dry_run:
default_runner.set_default_config()
default.set_default_config()

# Warn if this isn't the default runner.
if default_runner is not opts.runner:
print()
if not configured_runner:
print(f"Warning: No default runtime is configured so {runner_name(default_runner)} will be used.")
else:
print(f"Note that your default runtime is still {runner_name(default_runner)}.")
print()
print(dedent(f"""\
You can use {runner_name(opts.runner)} on an ad-hoc basis with commands like `nextstrain build`,
`nextstrain view`, etc. by passing them the --{runner_name(opts.runner)} option, e.g.:
# Warn if this isn't the default
if default != thing:
if kind == "runtime":
print()
if not configured_runner:
print(f"Warning: No default runtime is configured so {runner_name(default_runner)} will be used.")
else:
print(f"Note that your default runtime is still {runner_name(default_runner)}.")
print()
print(dedent(f"""\
You can use {runner_name(opts.runner)} on an ad-hoc basis with commands
like `nextstrain run`, `nextstrain build`, `nextstrain view`, etc. by
passing them the --{runner_name(opts.runner)} option, e.g.:
nextstrain build --{runner_name(opts.runner)}
nextstrain build --{runner_name(opts.runner)}
If you want to use {runner_name(opts.runner)} by default instead, re-run
this command with the --set-default option, e.g.:
If you want to use {runner_name(opts.runner)} by default instead, re-run this
command with the --set-default option, e.g.:
nextstrain setup --set-default {runner_name(opts.runner)}\
"""))

nextstrain setup --set-default {runner_name(opts.runner)}\
"""))
elif kind == "pathogen":
# XXX FIXME
...


print()
print("All good! Set up of", runner_name(opts.runner), "complete.")
print("All good! Set up of", nameof(thing), "complete.")
return 0
Loading

0 comments on commit 5873eb4

Please sign in to comment.