Merge pull request #1966 from AdeelH/runner-unit-tests
Improve unit test coverage of CLI and Runners
AdeelH authored Oct 19, 2023
2 parents e3abaf4 + ca70b22 commit 7106652
Showing 10 changed files with 322 additions and 126 deletions.
2 changes: 1 addition & 1 deletion docs/framework/quickstart.rst
@@ -67,7 +67,7 @@ We can now run the pipeline by invoking the following command inside the container
Seeing Results
---------------

If you go to ``${RV_QUICKSTART_OUT_DIR}`` you should see a directory structure like this.
If you go to ``${RV_QUICKSTART_OUT_DIR}/tiny_spacenet`` you should see a directory structure like this.

.. note:: This uses the ``tree`` command which you may need to install first.

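For readers without ``tree`` installed, a minimal Python stand-in can print a similar indented listing. This is an illustrative sketch, not part of the quickstart; it assumes ``RV_QUICKSTART_OUT_DIR`` is set in the environment.

.. code-block:: python

    import os

    # Walk the quickstart output directory and print an indented listing,
    # similar to what `tree` would show.
    root = os.path.join(os.environ['RV_QUICKSTART_OUT_DIR'], 'tiny_spacenet')
    for dirpath, _dirnames, filenames in os.walk(root):
        rel = os.path.relpath(dirpath, root)
        depth = 0 if rel == '.' else rel.count(os.sep) + 1
        indent = '    ' * depth
        print(f'{indent}{os.path.basename(dirpath)}/')
        for filename in filenames:
            print(f'{indent}    {filename}')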
8 changes: 0 additions & 8 deletions docs/release.rst
@@ -29,14 +29,6 @@ Minor or Major Version Release
#. Update the docs if needed. See the `docs README <{{ repo }}/docs/README.md>`__ for instructions.
#. Update `tiny_spacenet.py <{{ repo_examples }}/tiny_spacenet.py>`__ if needed and ensure the line numbers in every ``literalinclude`` of that file are correct. Tip: you can find all instances by searching the repo using the regex: ``\.\. literalinclude:: .+tiny_spacenet\.py$``.
#. Test :ref:`setup` and :ref:`quickstart` instructions and make sure they work.
#. Test examples from :ref:`pipelines plugins`.

.. code-block:: console
rastervision run inprocess rastervision.pipeline_example_plugin1.config1 -a root_uri /opt/data/pipeline-example/1/ --splits 2
rastervision run inprocess rastervision.pipeline_example_plugin1.config2 -a root_uri /opt/data/pipeline-example/2/ --splits 2
rastervision run inprocess rastervision.pipeline_example_plugin2.config3 -a root_uri /opt/data/pipeline-example/3/ --splits 2
#. Test examples from :ref:`bootstrap`.

.. code-block:: console
145 changes: 80 additions & 65 deletions rastervision_aws_batch/rastervision/aws_batch/aws_batch_runner.py
@@ -1,12 +1,17 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import logging
import os
import uuid
from typing import List, Optional
from pprint import pformat

from rastervision.pipeline import rv_config_ as rv_config
from rastervision.pipeline.runner import Runner

if TYPE_CHECKING:
from rastervision.pipeline.pipeline import Pipeline

log = logging.getLogger(__name__)

AWS_BATCH = 'batch'


@@ -23,20 +28,23 @@ def submit_job(cmd: List[str],
"""Submit a job to run on AWS Batch.
Args:
cmd: a command to run in the Docker container for the remote job
debug: if True, run the command using a ptvsd wrapper which sets up a remote
VS Code Python debugger server
profile: if True, run the command using kernprof, a line profiler
attempts: the number of times to try running the command which is useful
in case of failure.
parent_job_ids: optional list of parent Batch job ids. The job created by this
will only run after the parent jobs complete successfully.
num_array_jobs: if set, make this a Batch array job with size equal to
num_array_jobs
use_gpu: if True, run the job in a GPU-enabled queue
job_queue: if set, use this job queue
job_def: if set, use this job definition
cmd: Command to run in the Docker container for the remote job as list
of strings.
debug: If True, run the command using a ptvsd wrapper which sets up a
remote VS Code Python debugger server.
profile: If True, run the command using kernprof, a line profiler.
attempts: The number of times to try running the command which is
useful in case of failure.
parent_job_ids: Optional list of parent Batch job ids. The job created
by this will only run after the parent jobs complete successfully.
num_array_jobs: If set, make this a Batch array job with size equal to
num_array_jobs.
use_gpu: If True, run the job in a GPU-enabled queue.
job_queue: If set, use this job queue.
job_def: If set, use this job definition.
"""
import boto3

batch_config = rv_config.get_namespace_config(AWS_BATCH)

if job_queue is None:
@@ -51,25 +59,21 @@
else:
job_def = batch_config('cpu_job_def')

import boto3
client = boto3.client('batch')

cmd_list = cmd.split(' ')
if debug:
cmd_list = [
cmd = [
'python', '-m', 'ptvsd', '--host', '0.0.0.0', '--port', '6006',
'--wait', '-m'
] + cmd_list
] + cmd

if profile:
cmd_list = ['kernprof', '-v', '-l'] + cmd_list
cmd = ['kernprof', '-v', '-l'] + cmd

kwargs = {
'jobName': job_name,
'jobQueue': job_queue,
'jobDefinition': job_def,
'containerOverrides': {
'command': cmd_list
'command': cmd
},
'retryStrategy': {
'attempts': attempts
@@ -80,12 +84,8 @@
if num_array_jobs:
kwargs['arrayProperties'] = {'size': num_array_jobs}

client = boto3.client('batch')
job_id = client.submit_job(**kwargs)['jobId']
msg = 'submitted job with jobName={} and jobId={} w/ parent(s)={}'.format(
job_name, job_id, parent_job_ids)
log.info(msg)
log.info(cmd_list)

return job_id
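A hedged usage sketch of ``submit_job()`` as refactored above: ``cmd`` is now passed as a list of strings rather than a space-joined string, and the queue/definition fall back to the ``[AWS_BATCH]`` Everett config when left as ``None``. The queue and definition names below are hypothetical, and actually running this requires AWS credentials plus that config.

.. code-block:: python

    from rastervision.aws_batch.aws_batch_runner import submit_job

    # Submit a single GPU job; parent_job_ids could chain it after other jobs.
    job_id = submit_job(
        cmd=['python', '-m', 'rastervision.pipeline.cli', 'run_command',
             's3://my-bucket/pipeline-config.json', 'train',
             '--runner', 'batch'],
        job_name='raster-vision-train-example',  # hypothetical name
        attempts=1,
        use_gpu=True,
        job_queue='MyGpuJobQueue',       # hypothetical; else from config
        job_def='MyGpuJobDefinition')    # hypothetical; else from config
    print(job_id)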


@@ -94,74 +94,89 @@ class AWSBatchRunner(Runner):
Requires Everett configuration of form:
```
[AWS_BATCH]
cpu_job_queue=
cpu_job_def=
gpu_job_queue=
gpu_job_def=
attempts=
```
.. code-block:: ini
[AWS_BATCH]
cpu_job_queue=
cpu_job_def=
gpu_job_queue=
gpu_job_def=
attempts=
"""

def run(self,
cfg_json_uri: str,
pipeline: 'Pipeline',
commands: List[str],
num_splits: int = 1,
pipeline_run_name: str = 'raster-vision'):
cmd, args = self.build_cmd(
cfg_json_uri,
pipeline,
commands,
num_splits=1,
pipeline_run_name: str = 'raster-vision'):
num_splits,
pipeline_run_name=pipeline_run_name)
job_id = submit_job(cmd=cmd, **args)

job_info = dict(
name=args['job_name'],
id=job_id,
parents=args['parent_job_ids'],
cmd=cmd,
)
job_info_str = pformat(job_info, sort_dicts=False)
msg = (f'Job submitted:\n{job_info_str}')
log.info(msg)

def build_cmd(self,
cfg_json_uri: str,
pipeline: 'Pipeline',
commands: List[str],
num_splits: int = 1,
pipeline_run_name: str = 'raster-vision'
) -> Tuple[List[str], Dict[str, Any]]:
parent_job_ids = []

# pipeline-specific job queue
if hasattr(pipeline, 'job_queue'):
pipeline_job_queue = pipeline.job_queue
else:
pipeline_job_queue = None

# pipeline-specific job definition
if hasattr(pipeline, 'job_def'):
pipeline_job_def = pipeline.job_def
else:
pipeline_job_def = None
# pipeline-specific job queue and job definition
pipeline_job_queue = getattr(pipeline, 'job_queue', None)
pipeline_job_def = getattr(pipeline, 'job_def', None)

for command in commands:

# command-specific job queue, job definition
job_def = pipeline_job_def
job_queue = pipeline_job_queue
if hasattr(pipeline, command):
fn = getattr(pipeline, command)
if hasattr(fn, 'job_def'):
job_def = fn.job_def
if hasattr(fn, 'job_queue'):
job_queue = fn.job_queue
cmd_obj = getattr(pipeline, command, None)
job_def = getattr(cmd_obj, 'job_def', pipeline_job_def)
job_queue = getattr(cmd_obj, 'job_queue', pipeline_job_queue)

num_array_jobs = None
use_gpu = command in pipeline.gpu_commands

job_name = f'{pipeline_run_name}-{command}-{uuid.uuid4()}'

cmd = ['python', '-m', 'rastervision.pipeline.cli']

if rv_config.get_verbosity() > 1:
cmd.append('-' + 'v' * (rv_config.get_verbosity() - 1))
cmd.extend(
['run_command', cfg_json_uri, command, '--runner', AWS_BATCH])
num_vs = rv_config.get_verbosity() - 1
# produces a string like "-vvv..."
verbosity_opt_str = f'-{"v" * num_vs}'
cmd += [verbosity_opt_str]

cmd += [
'run_command', cfg_json_uri, command, '--runner', AWS_BATCH
]

if command in pipeline.split_commands and num_splits > 1:
num_array_jobs = num_splits
cmd += ['--num-splits', str(num_splits)]
job_id = submit_job(
cmd=' '.join(cmd),

args = dict(
job_name=job_name,
parent_job_ids=parent_job_ids,
num_array_jobs=num_array_jobs,
use_gpu=use_gpu,
job_queue=job_queue,
job_def=job_def)
parent_job_ids = [job_id]

job_queue = None
job_def = None
return cmd, args

def get_split_ind(self):
return int(os.environ.get('AWS_BATCH_JOB_ARRAY_INDEX', 0))
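The refactor above replaces ``hasattr`` checks with ``getattr`` defaults and builds the verbosity flag as a single ``-vvv``-style string. A self-contained illustration of both patterns (a standalone sketch, not project code):

.. code-block:: python

    # Verbosity flag: one '-' followed by (verbosity - 1) 'v' characters.
    verbosity = 3  # stands in for rv_config.get_verbosity()
    num_vs = verbosity - 1
    verbosity_opt_str = f'-{"v" * num_vs}'
    assert verbosity_opt_str == '-vv'

    # Per-command attributes fall back to pipeline-level values, then to None.
    class Pipeline:
        job_queue = 'pipeline-queue'   # pipeline-level default

    class TrainCommand:
        job_queue = 'train-queue'      # command-level override

    pipeline = Pipeline()
    pipeline.train = TrainCommand()

    pipeline_job_queue = getattr(pipeline, 'job_queue', None)
    cmd_obj = getattr(pipeline, 'train', None)
    job_queue = getattr(cmd_obj, 'job_queue', pipeline_job_queue)
    assert job_queue == 'train-queue'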
17 changes: 7 additions & 10 deletions rastervision_pipeline/rastervision/pipeline/cli.py
@@ -69,18 +69,16 @@ def get_configs(cfg_module_path: str, runner: str,
if _get_config is None:
_get_configs = getattr(cfg_module, 'get_configs', None)
if _get_configs is None:
raise Exception(
'There must be a get_config or get_configs function in {}.'.format(
cfg_module_path))
raise Exception('There must be a get_config or get_configs function '
f'in {cfg_module_path}.')
cfgs = _get_configs(runner, **args)
if not isinstance(cfgs, list):
cfgs = [cfgs]

for cfg in cfgs:
if not issubclass(type(cfg), PipelineConfig):
raise Exception(
('All objects returned by get_configs in {} must be '
'PipelineConfigs.').format(cfg_module_path))
raise Exception('All objects returned by get_configs in '
f'{cfg_module_path} must be PipelineConfigs.')
return cfgs
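A hedged sketch of the contract ``get_configs()`` enforces above: a config module passed to ``rastervision run`` must define ``get_config()`` (or ``get_configs()``) returning one or more ``PipelineConfig`` instances. The import path and the assumption that a bare ``root_uri`` suffices for construction are mine, not the PR's.

.. code-block:: python

    from rastervision.pipeline.pipeline_config import PipelineConfig

    def get_config(runner: str,
                   root_uri: str = '/opt/data/output/') -> PipelineConfig:
        # Extra '-a key value' CLI arguments arrive here as keyword arguments,
        # e.g. root_uri in the release-checklist commands above.
        return PipelineConfig(root_uri=root_uri)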


@@ -208,12 +206,11 @@ def _run_command(cfg_json_uri: str,
command_fn = getattr(pipeline, command)

if num_splits is not None and num_splits > 1:
msg = 'Running {} command split {}/{}...'.format(
command, split_ind + 1, num_splits)
msg = f'Running {command} command split {split_ind + 1}/{num_splits}...'
click.secho(msg, fg='green', bold=True)
command_fn(split_ind=split_ind, num_splits=num_splits)
else:
msg = 'Running {} command...'.format(command)
msg = f'Running {command} command...'
click.secho(msg, fg='green', bold=True)
command_fn()
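A hedged sketch of a split-aware pipeline command as dispatched above: when invoked with ``--num-splits`` greater than 1, the command function receives ``split_ind`` and ``num_splits`` and processes only its shard of the work. The sharding scheme below is illustrative, not the library's.

.. code-block:: python

    def chip(split_ind: int = 0, num_splits: int = 1) -> None:
        # Hypothetical work items; a real command would derive these from the
        # pipeline config.
        scene_ids = ['scene-0', 'scene-1', 'scene-2', 'scene-3', 'scene-4']
        my_scenes = scene_ids[split_ind::num_splits]
        print(f'split {split_ind + 1}/{num_splits} handles {my_scenes}')

    for split_ind in range(2):
        chip(split_ind=split_ind, num_splits=2)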

@@ -241,7 +238,7 @@ def run_command(cfg_json_uri: str, command: str, split_ind: Optional[int],
runner=runner)


def _main():
def _main(): # pragma: no cover
for pc in registry.get_plugin_commands():
main.add_command(pc)
main()
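Since this PR is about unit-testing the CLI, here is a minimal example of exercising the ``main`` group with ``click``'s test runner. It is a standalone sketch, not one of the PR's tests; the ``run_command`` name is assumed from how ``build_cmd`` invokes the CLI above.

.. code-block:: python

    from click.testing import CliRunner

    from rastervision.pipeline.cli import main

    runner = CliRunner()
    result = runner.invoke(main, ['--help'])
    assert result.exit_code == 0
    assert 'run_command' in result.output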