
Batch tasks using Dask in Argo #120

Merged: 16 commits merged into master from workflow/dask-task on Jan 6, 2023

Conversation

@jpolchlo (Collaborator) commented Jan 3, 2023

Overview

This PR provides some infrastructure for running Dask jobs independently of Jupyter. This enables long-running jobs that would otherwise require staying logged into Jupyter, and it opens up a workflow based on standard Python scripts rather than Jupyter notebooks. I've been using a standard form for my scripts so that the cluster can be configured via the Argo job submission interface:

import logging
import os

import dask_gateway

logger = logging.getLogger("DaskWorkflow")
gw = dask_gateway.Gateway(auth="jupyterhub")

try:
    # Cluster size and resources come from environment variables supplied by
    # the Argo workflow parameters.
    opts = gw.cluster_options()
    opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
    opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
    opts.scheduler_memory = int(os.environ['DASK_OPTS__SCHEDULER_MEMORY'])
    opts.scheduler_cores = int(os.environ['DASK_OPTS__SCHEDULER_CORES'])
    cluster = gw.new_cluster(opts)
    cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
    client = cluster.get_client()

    logger.warning(f"Client dashboard: {client.dashboard_link}")

    # Client code goes here
finally:
    gw.stop_cluster(client.cluster.name)

Closes #112

Checklist

  • Ran nbautoexport export . in /opt/src/notebooks and committed the generated scripts. This is to make reviewing notebooks easier. (Note the export will happen automatically after saving notebooks from the Jupyter web app.)
  • Documentation updated if needed
  • PR has a name that won't get you publicly shamed for vagueness

Notes

This workflow will eventually be added to the cluster configs as a ClusterWorkflowTemplate, but that will be handled by azavea/kubernetes-deployment#34.

Testing Instructions

  • Start a new workflow
  • Copy in the contents of run-dask-job.yaml into the manual editor
  • Adjust the parameter values in the parameters tab to configure the size of the cluster and the source code location (currently the latter must be specified as an HTTP(S) URL)
  • Run the workflow (there will be a 3–6 minute delay for the Dask resources to come online)
  • If you'd like to monitor the progress, grab the client dashboard URL from the pod logs for the task; append the value to https://jupyter.noaa.azavea.com
  • Since the workflow does not specify any garbage collection, delete the workflow when you're done to avoid stacking up old pods

@jpolchlo requested review from rajadain and vlulla on January 4, 2023 14:53
@vlulla (Contributor) commented Jan 4, 2023

This looks great! I have a minor observation to share: in my exploration of distributed Dask (using SSHCluster), I learned that Client forks a process [1]. I have also learned that it is considered best practice to initialize Client() in the __main__ block. I am not familiar enough with how try/finally interacts with Python interpreter initialization to know whether your setup completely sidesteps this issue, but I thought the finding was worth sharing, hence this comment.

Anyway, this looks great! I am going to emulate this in my Argo workflows and seek your advice on any issues that I run into.

Footnotes

  1. https://github.com/dask/distributed/issues/516#issuecomment-306468605

@vlulla (Contributor) left a comment

Looks good!

@jpolchlo (Collaborator, Author) commented Jan 4, 2023

Is the gist of your comment that I ought to modify the template as follows?

import logging
import os

import dask_gateway

logger = logging.getLogger("DaskWorkflow")

def main():
    gw = dask_gateway.Gateway(auth="jupyterhub")

    try:
        opts = gw.cluster_options()
        opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
        opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
        opts.scheduler_memory = int(os.environ['DASK_OPTS__SCHEDULER_MEMORY'])
        opts.scheduler_cores = int(os.environ['DASK_OPTS__SCHEDULER_CORES'])
        cluster = gw.new_cluster(opts)
        cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
        client = cluster.get_client()

        logger.warning(f"Client dashboard: {client.dashboard_link}")

        # Client code goes here
    finally:
        gw.stop_cluster(client.cluster.name)

if __name__ == "__main'":
    main()

It's worth noting that Dask Distributed works differently from Dask Gateway, and the Gateway-based setup shouldn't be relying on threads/processes in the same way. I haven't encountered any difficulty starting a Client from the template as presented (which was not in a main block).

@vlulla (Contributor) commented Jan 4, 2023

Indeed, that is the gist of my comment. Additionally, I think modifying it this way makes the script work correctly when experimenting in a non-Argo environment.

By the way, there's a minor typo: it ought to be "__main__" instead of "__main'".

Thanks for considering my point!

@jpolchlo (Collaborator, Author) commented Jan 4, 2023

Oops! Typo. Thanks. I adjusted the template in the README and the base flow example.

@jpolchlo (Collaborator, Author) commented Jan 4, 2023

As an additional point, it should be noted that without more complex logic, such an example template won't be interchangeable between the cloud environment and a local Dask Distributed environment, since they require different imports and setup, I think.
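
For what it's worth, a minimal sketch of the kind of switching logic that would be needed might look like the following. The DASK_ENV variable here is hypothetical and purely illustrative, not something this PR introduces:

import os


def make_cluster():
    # Hypothetical switch: "local" uses a LocalCluster for development,
    # anything else provisions a cluster through Dask Gateway as in the
    # template above.
    if os.environ.get("DASK_ENV", "gateway") == "local":
        from dask.distributed import LocalCluster
        return LocalCluster(n_workers=int(os.environ.get("DASK_OPTS__N_WORKERS", "4")))
    else:
        import dask_gateway
        gw = dask_gateway.Gateway(auth="jupyterhub")
        opts = gw.cluster_options()
        opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
        opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
        cluster = gw.new_cluster(opts)
        cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
        return cluster

Teardown would also differ between the two (gw.stop_cluster for Gateway versus cluster.close() locally), which is part of the extra logic in question.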

@vlulla (Contributor) commented Jan 4, 2023

Yes, point taken!

@rajadain (Collaborator) commented Jan 4, 2023

This could be made a little more explicit like this:

import logging
import os

import dask_gateway

logger = logging.getLogger("DaskWorkflow")

def run_on_cluster(fn):
    gw = dask_gateway.Gateway(auth="jupyterhub")

    try:
        opts = gw.cluster_options()
        opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
        opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
        opts.scheduler_memory = int(os.environ['DASK_OPTS__SCHEDULER_MEMORY'])
        opts.scheduler_cores = int(os.environ['DASK_OPTS__SCHEDULER_CORES'])
        cluster = gw.new_cluster(opts)
        cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
        client = cluster.get_client()

        logger.warning(f"Client dashboard: {client.dashboard_link}")

        fn()
    finally:
        gw.stop_cluster(client.cluster.name)


def client_code():
    # Client code goes here
    pass


def main():
    run_on_cluster(client_code)


if __name__ == "__main__":
    main()

Going to try to run the example on the cluster now.

@jpolchlo (Collaborator, Author) commented Jan 6, 2023

@rajadain I took your advice (a bit) and modularized the template a bit more. Your version adds two levels of indirection into the client code, which I simplified somewhat; check the modified README. Was there a particular reason you wanted to pass the client code in as a function argument (the higher-order-function approach)?

@rajadain (Collaborator) commented Jan 6, 2023

Was there a particular reason you wanted to pass the client code in as a function argument (the higher-order-function approach)?

Just for clarity, so the client code is free of distraction. Your solution works well!
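
Incidentally, the same separation can also be expressed by using the wrapper as a decorator, so the client function stays free of cluster plumbing. This is a sketch only, along the lines of the snippet above, and not what landed in the README:

import functools
import logging
import os

import dask_gateway

logger = logging.getLogger("DaskWorkflow")


def on_cluster(fn):
    # Decorator variant of run_on_cluster (illustrative sketch only).
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        gw = dask_gateway.Gateway(auth="jupyterhub")
        opts = gw.cluster_options()
        opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
        opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
        opts.scheduler_memory = int(os.environ['DASK_OPTS__SCHEDULER_MEMORY'])
        opts.scheduler_cores = int(os.environ['DASK_OPTS__SCHEDULER_CORES'])
        cluster = gw.new_cluster(opts)
        try:
            cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
            client = cluster.get_client()
            logger.warning(f"Client dashboard: {client.dashboard_link}")
            return fn(*args, **kwargs)
        finally:
            gw.stop_cluster(cluster.name)
    return wrapper


@on_cluster
def client_code():
    pass  # Client code goes here


if __name__ == "__main__":
    client_code()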

@jpolchlo merged commit ca54709 into master on Jan 6, 2023
@jpolchlo deleted the workflow/dask-task branch on January 6, 2023 23:09
Development

Successfully merging this pull request may close these issues: Calculate base flow for NWM stream reaches (#112)

3 participants