diff --git a/docs/framework/quickstart.rst b/docs/framework/quickstart.rst index 1905a8cf6..c840e3d12 100644 --- a/docs/framework/quickstart.rst +++ b/docs/framework/quickstart.rst @@ -67,7 +67,7 @@ We can now run the pipeline by invoking the following command inside the contain Seeing Results --------------- -If you go to ``${RV_QUICKSTART_OUT_DIR}`` you should see a directory structure like this. +If you go to ``${RV_QUICKSTART_OUT_DIR}/tiny_spacenet`` you should see a directory structure like this. .. note:: This uses the ``tree`` command which you may need to install first. diff --git a/docs/release.rst b/docs/release.rst index 07e023df4..40931a485 100644 --- a/docs/release.rst +++ b/docs/release.rst @@ -29,14 +29,6 @@ Minor or Major Version Release #. Update the docs if needed. See the `docs README <{{ repo }}/docs/README.md>`__ for instructions. #. Update `tiny_spacenet.py <{{ repo_examples }}/tiny_spacenet.py>`__ if needed and ensure the line numbers in every ``literalinclude`` of that file are correct. Tip: you can find all instances by searching the repo using the regex: ``\.\. literalinclude:: .+tiny_spacenet\.py$``. #. Test :ref:`setup` and :ref:`quickstart` instructions and make sure they work. - #. Test examples from :ref:`pipelines plugins`. - - .. code-block:: console - - rastervision run inprocess rastervision.pipeline_example_plugin1.config1 -a root_uri /opt/data/pipeline-example/1/ --splits 2 - rastervision run inprocess rastervision.pipeline_example_plugin1.config2 -a root_uri /opt/data/pipeline-example/2/ --splits 2 - rastervision run inprocess rastervision.pipeline_example_plugin2.config3 -a root_uri /opt/data/pipeline-example/3/ --splits 2 - #. Test examples from :ref:`bootstrap`. .. code-block:: console diff --git a/rastervision_aws_batch/rastervision/aws_batch/aws_batch_runner.py b/rastervision_aws_batch/rastervision/aws_batch/aws_batch_runner.py index 88224a38f..565b1e2fb 100644 --- a/rastervision_aws_batch/rastervision/aws_batch/aws_batch_runner.py +++ b/rastervision_aws_batch/rastervision/aws_batch/aws_batch_runner.py @@ -1,12 +1,17 @@ +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple import logging import os import uuid -from typing import List, Optional +from pprint import pformat from rastervision.pipeline import rv_config_ as rv_config from rastervision.pipeline.runner import Runner +if TYPE_CHECKING: + from rastervision.pipeline.pipeline import Pipeline + log = logging.getLogger(__name__) + AWS_BATCH = 'batch' @@ -23,20 +28,23 @@ def submit_job(cmd: List[str], """Submit a job to run on AWS Batch. Args: - cmd: a command to run in the Docker container for the remote job - debug: if True, run the command using a ptvsd wrapper which sets up a remote - VS Code Python debugger server - profile: if True, run the command using kernprof, a line profiler - attempts: the number of times to try running the command which is useful - in case of failure. - parent_job_ids: optional list of parent Batch job ids. The job created by this - will only run after the parent jobs complete successfully. - num_array_jobs: if set, make this a Batch array job with size equal to - num_array_jobs - use_gpu: if True, run the job in a GPU-enabled queue - job_queue: if set, use this job queue - job_def: if set, use this job definition + cmd: Command to run in the Docker container for the remote job as list + of strings. + debug: If True, run the command using a ptvsd wrapper which sets up a + remote VS Code Python debugger server. + profile: If True, run the command using kernprof, a line profiler. + attempts: The number of times to try running the command which is + useful in case of failure. + parent_job_ids: Optional list of parent Batch job ids. The job created + by this will only run after the parent jobs complete successfully. + num_array_jobs: If set, make this a Batch array job with size equal to + num_array_jobs. + use_gpu: If True, run the job in a GPU-enabled queue. + job_queue: If set, use this job queue. + job_def: If set, use this job definition. """ + import boto3 + batch_config = rv_config.get_namespace_config(AWS_BATCH) if job_queue is None: @@ -51,25 +59,21 @@ def submit_job(cmd: List[str], else: job_def = batch_config('cpu_job_def') - import boto3 - client = boto3.client('batch') - - cmd_list = cmd.split(' ') if debug: - cmd_list = [ + cmd = [ 'python', '-m', 'ptvsd', '--host', '0.0.0.0', '--port', '6006', '--wait', '-m' - ] + cmd_list + ] + cmd if profile: - cmd_list = ['kernprof', '-v', '-l'] + cmd_list + cmd = ['kernprof', '-v', '-l'] + cmd kwargs = { 'jobName': job_name, 'jobQueue': job_queue, 'jobDefinition': job_def, 'containerOverrides': { - 'command': cmd_list + 'command': cmd }, 'retryStrategy': { 'attempts': attempts @@ -80,12 +84,8 @@ def submit_job(cmd: List[str], if num_array_jobs: kwargs['arrayProperties'] = {'size': num_array_jobs} + client = boto3.client('batch') job_id = client.submit_job(**kwargs)['jobId'] - msg = 'submitted job with jobName={} and jobId={} w/ parent(s)={}'.format( - job_name, job_id, parent_job_ids) - log.info(msg) - log.info(cmd_list) - return job_id @@ -94,47 +94,58 @@ class AWSBatchRunner(Runner): Requires Everett configuration of form: - ``` - [AWS_BATCH] - cpu_job_queue= - cpu_job_def= - gpu_job_queue= - gpu_job_def= - attempts= - ``` + .. code-block:: ini + + [AWS_BATCH] + cpu_job_queue= + cpu_job_def= + gpu_job_queue= + gpu_job_def= + attempts= """ def run(self, + cfg_json_uri: str, + pipeline: 'Pipeline', + commands: List[str], + num_splits: int = 1, + pipeline_run_name: str = 'raster-vision'): + cmd, args = self.build_cmd( cfg_json_uri, pipeline, commands, - num_splits=1, - pipeline_run_name: str = 'raster-vision'): + num_splits, + pipeline_run_name=pipeline_run_name) + job_id = submit_job(cmd=cmd, **args) + + job_info = dict( + name=args['job_name'], + id=job_id, + parents=args['parent_job_ids'], + cmd=cmd, + ) + job_info_str = pformat(job_info, sort_dicts=False) + msg = (f'Job submitted:\n{job_info_str}') + log.info(msg) + + def build_cmd(self, + cfg_json_uri: str, + pipeline: 'Pipeline', + commands: List[str], + num_splits: int = 1, + pipeline_run_name: str = 'raster-vision' + ) -> Tuple[List[str], Dict[str, Any]]: parent_job_ids = [] - # pipeline-specific job queue - if hasattr(pipeline, 'job_queue'): - pipeline_job_queue = pipeline.job_queue - else: - pipeline_job_queue = None - - # pipeline-specific job definition - if hasattr(pipeline, 'job_def'): - pipeline_job_def = pipeline.job_def - else: - pipeline_job_def = None + # pipeline-specific job queue and job definition + pipeline_job_queue = getattr(pipeline, 'job_queue', None) + pipeline_job_def = getattr(pipeline, 'job_def', None) for command in commands: - # command-specific job queue, job definition - job_def = pipeline_job_def - job_queue = pipeline_job_queue - if hasattr(pipeline, command): - fn = getattr(pipeline, command) - if hasattr(fn, 'job_def'): - job_def = fn.job_def - if hasattr(fn, 'job_queue'): - job_queue = fn.job_queue + cmd_obj = getattr(pipeline, command, None) + job_def = getattr(cmd_obj, 'job_def', pipeline_job_def) + job_queue = getattr(cmd_obj, 'job_queue', pipeline_job_queue) num_array_jobs = None use_gpu = command in pipeline.gpu_commands @@ -142,26 +153,30 @@ def run(self, job_name = f'{pipeline_run_name}-{command}-{uuid.uuid4()}' cmd = ['python', '-m', 'rastervision.pipeline.cli'] + if rv_config.get_verbosity() > 1: - cmd.append('-' + 'v' * (rv_config.get_verbosity() - 1)) - cmd.extend( - ['run_command', cfg_json_uri, command, '--runner', AWS_BATCH]) + num_vs = rv_config.get_verbosity() - 1 + # produces a string like "-vvv..." + verbosity_opt_str = f'-{"v" * num_vs}' + cmd += [verbosity_opt_str] + + cmd += [ + 'run_command', cfg_json_uri, command, '--runner', AWS_BATCH + ] if command in pipeline.split_commands and num_splits > 1: num_array_jobs = num_splits cmd += ['--num-splits', str(num_splits)] - job_id = submit_job( - cmd=' '.join(cmd), + + args = dict( job_name=job_name, parent_job_ids=parent_job_ids, num_array_jobs=num_array_jobs, use_gpu=use_gpu, job_queue=job_queue, job_def=job_def) - parent_job_ids = [job_id] - job_queue = None - job_def = None + return cmd, args def get_split_ind(self): return int(os.environ.get('AWS_BATCH_JOB_ARRAY_INDEX', 0)) diff --git a/rastervision_pipeline/rastervision/pipeline/cli.py b/rastervision_pipeline/rastervision/pipeline/cli.py index 1973eb1a7..55f7e23e4 100644 --- a/rastervision_pipeline/rastervision/pipeline/cli.py +++ b/rastervision_pipeline/rastervision/pipeline/cli.py @@ -69,18 +69,16 @@ def get_configs(cfg_module_path: str, runner: str, if _get_config is None: _get_configs = getattr(cfg_module, 'get_configs', None) if _get_configs is None: - raise Exception( - 'There must be a get_config or get_configs function in {}.'.format( - cfg_module_path)) + raise Exception('There must be a get_config or get_configs function ' + f'in {cfg_module_path}.') cfgs = _get_configs(runner, **args) if not isinstance(cfgs, list): cfgs = [cfgs] for cfg in cfgs: if not issubclass(type(cfg), PipelineConfig): - raise Exception( - ('All objects returned by get_configs in {} must be ' - 'PipelineConfigs.').format(cfg_module_path)) + raise Exception('All objects returned by get_configs in ' + f'{cfg_module_path} must be PipelineConfigs.') return cfgs @@ -208,12 +206,11 @@ def _run_command(cfg_json_uri: str, command_fn = getattr(pipeline, command) if num_splits is not None and num_splits > 1: - msg = 'Running {} command split {}/{}...'.format( - command, split_ind + 1, num_splits) + msg = f'Running {command} command split {split_ind + 1}/{num_splits}...' click.secho(msg, fg='green', bold=True) command_fn(split_ind=split_ind, num_splits=num_splits) else: - msg = 'Running {} command...'.format(command) + msg = f'Running {command} command...' click.secho(msg, fg='green', bold=True) command_fn() @@ -241,7 +238,7 @@ def run_command(cfg_json_uri: str, command: str, split_ind: Optional[int], runner=runner) -def _main(): +def _main(): # pragma: no cover for pc in registry.get_plugin_commands(): main.add_command(pc) main() diff --git a/rastervision_pipeline/rastervision/pipeline/runner/local_runner.py b/rastervision_pipeline/rastervision/pipeline/runner/local_runner.py index 0ce0b92c4..981323f46 100644 --- a/rastervision_pipeline/rastervision/pipeline/runner/local_runner.py +++ b/rastervision_pipeline/rastervision/pipeline/runner/local_runner.py @@ -1,3 +1,4 @@ +from typing import TYPE_CHECKING, List, Optional import sys from os.path import dirname, join from subprocess import Popen @@ -6,21 +7,67 @@ from rastervision.pipeline.runner.runner import Runner from rastervision.pipeline.utils import terminate_at_exit +if TYPE_CHECKING: + from rastervision.pipeline.pipeline import Pipeline + LOCAL = 'local' +def make_run_cmd_invocation(cfg_json_uri: str, + command: str, + opts: Optional[dict] = None) -> str: + opts_str = '' + if opts is not None: + opts_str = ' ' + ' '.join(f'{k} {v}' for k, v in opts.items()) + return ('python -m rastervision.pipeline.cli run_command ' + f'{cfg_json_uri} {command}{opts_str}') + + +def make_makefile_entry_for_cmd(curr_command_ind: int, + prev_command_inds: List[int], + cfg_json_uri: str, + command: str, + opts: Optional[dict] = None) -> str: + out = f'{curr_command_ind}: ' + out += ' '.join([str(ci) for ci in prev_command_inds]) + out += '\n' + invocation = make_run_cmd_invocation(cfg_json_uri, command, opts=opts) + out += f'\t{invocation}\n\n' + return out + + class LocalRunner(Runner): - """Runs each command locally using different processes for each command/split. + """ + Runs each command locally using different processes for each command/split. - This is implemented by generating a Makefile and then running it using make. + This is implemented by generating a Makefile and then running it using + make. """ def run(self, - cfg_json_uri, - pipeline, - commands, - num_splits=1, + cfg_json_uri: str, + pipeline: 'Pipeline', + commands: List[str], + num_splits: int = 1, pipeline_run_name: str = 'raster-vision'): + makefile = self.build_makefile_string(cfg_json_uri, pipeline, commands, + num_splits) + makefile_path = join(dirname(cfg_json_uri), 'Makefile') + str_to_file(makefile, makefile_path) + makefile_path_local = download_if_needed(makefile_path) + process = Popen(['make', '-j', '-f', makefile_path_local]) + terminate_at_exit(process) + exitcode = process.wait() + if exitcode != 0: + sys.exit(exitcode) + else: + return 0 + + def build_makefile_string(self, + cfg_json_uri: str, + pipeline: 'Pipeline', + commands: List[str], + num_splits: int = 1) -> str: num_commands = 0 for command in commands: if command in pipeline.split_commands and num_splits > 1: @@ -28,51 +75,32 @@ def run(self, else: num_commands += 1 - makefile = '.PHONY: ' - makefile += ' '.join([str(ci) for ci in range(num_commands)]) - makefile += '\n\n' - - makefile += 'all: ' - makefile += ' '.join([str(ci) for ci in range(num_commands)]) - makefile += '\n\n' + all_command_inds_str = ' '.join([str(i) for i in range(num_commands)]) + makefile = f'.PHONY: {all_command_inds_str}\n\n' + makefile += f'all: {all_command_inds_str}\n\n' prev_command_inds = [] curr_command_ind = 0 for command in commands: - curr_command_inds = [] if command in pipeline.split_commands and num_splits > 1: + opts = {'--num-splits': num_splits} for split_ind in range(num_splits): - makefile += '{}: '.format(curr_command_ind) - makefile += ' '.join([str(ci) for ci in prev_command_inds]) - makefile += '\n' - invocation = ( - 'python -m rastervision.pipeline.cli run_command ' - '{} {} --split-ind {} --num-splits {}').format( - cfg_json_uri, command, split_ind, num_splits) - makefile += '\t{}\n\n'.format(invocation) + opts['--split-ind'] = split_ind + makefile_entry = make_makefile_entry_for_cmd( + curr_command_ind, + prev_command_inds, + cfg_json_uri, + command, + opts=opts) + makefile += makefile_entry curr_command_inds.append(curr_command_ind) curr_command_ind += 1 else: - makefile += '{}: '.format(curr_command_ind) - makefile += ' '.join([str(ci) for ci in prev_command_inds]) - makefile += '\n' - invocation = ( - 'python -m rastervision.pipeline.cli run_command ' - '{} {}'.format(cfg_json_uri, command)) - makefile += '\t{}\n\n'.format(invocation) + makefile_entry = make_makefile_entry_for_cmd( + curr_command_ind, prev_command_inds, cfg_json_uri, command) + makefile += makefile_entry curr_command_inds.append(curr_command_ind) curr_command_ind += 1 - prev_command_inds = curr_command_inds - - makefile_path = join(dirname(cfg_json_uri), 'Makefile') - str_to_file(makefile, makefile_path) - makefile_path_local = download_if_needed(makefile_path) - process = Popen(['make', '-j', '-f', makefile_path_local]) - terminate_at_exit(process) - exitcode = process.wait() - if exitcode != 0: - sys.exit(exitcode) - else: - return 0 + return makefile diff --git a/rastervision_pytorch_backend/rastervision/pytorch_backend/examples/tiny_spacenet.py b/rastervision_pytorch_backend/rastervision/pytorch_backend/examples/tiny_spacenet.py index 74f6ec5b9..e5ecb579a 100644 --- a/rastervision_pytorch_backend/rastervision/pytorch_backend/examples/tiny_spacenet.py +++ b/rastervision_pytorch_backend/rastervision/pytorch_backend/examples/tiny_spacenet.py @@ -9,7 +9,7 @@ def get_config(runner) -> SemanticSegmentationConfig: - output_root_uri = '/opt/data/output/' + output_root_uri = '/opt/data/output/tiny_spacenet' class_config = ClassConfig( names=['building', 'background'], colors=['red', 'black']) diff --git a/tests/aws_batch/__init__.py b/tests/aws_batch/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/aws_batch/test_aws_batch_runner.py b/tests/aws_batch/test_aws_batch_runner.py new file mode 100644 index 000000000..f7a3971ca --- /dev/null +++ b/tests/aws_batch/test_aws_batch_runner.py @@ -0,0 +1,49 @@ +import os +import unittest + +from rastervision.pipeline import rv_config_ as rv_config +from rastervision.aws_batch.aws_batch_runner import AWSBatchRunner + + +class MockPipeline: + commands = ['test_cpu', 'test_gpu'] + split_commands = ['test_cpu'] + gpu_commands = ['test_gpu'] + + +class TestAWSBatchRunner(unittest.TestCase): + def test_build_cmd(self): + pipeline = MockPipeline() + runner = AWSBatchRunner() + rv_config.set_verbosity(4) + cmd, args = runner.build_cmd( + 'config.json', + pipeline, ['predict'], + num_splits=2, + pipeline_run_name='test') + cmd_expected = [ + 'python', '-m', 'rastervision.pipeline.cli', '-vvv', 'run_command', + 'config.json', 'predict', '--runner', 'batch' + ] + args_expected = { + 'parent_job_ids': [], + 'num_array_jobs': None, + 'use_gpu': False, + 'job_queue': None, + 'job_def': None + } + self.assertListEqual(cmd, cmd_expected) + self.assertTrue(args['job_name'].startswith('test')) + del args['job_name'] + self.assertDictEqual(args, args_expected) + + def test_get_split_ind(self): + runner = AWSBatchRunner() + os.environ['AWS_BATCH_JOB_ARRAY_INDEX'] = '1' + self.assertEqual(runner.get_split_ind(), 1) + del os.environ['AWS_BATCH_JOB_ARRAY_INDEX'] + self.assertEqual(runner.get_split_ind(), 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/pipeline/test_cli.py b/tests/pipeline/test_cli.py new file mode 100644 index 000000000..2b1f41d4e --- /dev/null +++ b/tests/pipeline/test_cli.py @@ -0,0 +1,80 @@ +import unittest +import shutil + +from rastervision.pipeline.cli import main, print_error, convert_bool_args + +from click.testing import CliRunner + + +class TestCli(unittest.TestCase): + def test_rastervision_help(self): + runner = CliRunner() + result = runner.invoke(main, ['--help']) + self.assertEqual(result.exit_code, 0) + + def test_rastervision_run_help(self): + runner = CliRunner() + result = runner.invoke(main, ['run', '--help']) + self.assertEqual(result.exit_code, 0) + + def test_rastervision_run_local(self): + runner = CliRunner() + shutil.rmtree('/opt/data/pipeline-example/1/', ignore_errors=True) + result = runner.invoke(main, [ + 'run', 'local', 'rastervision.pipeline_example_plugin1.config1', + '-a', 'root_uri', '/opt/data/pipeline-example/1/', '--splits', '2' + ]) + self.assertEqual(result.exit_code, 0) + + # from config path + cfg_path = ('rastervision_pipeline/rastervision/' + 'pipeline_example_plugin1/config1.py') + result = runner.invoke(main, [ + 'run', 'local', cfg_path, '-a', 'root_uri', + '/opt/data/pipeline-example/1/', '--splits', '2' + ]) + self.assertEqual(result.exit_code, 0) + + def test_rastervision_run_inprocess1(self): + runner = CliRunner() + shutil.rmtree('/opt/data/pipeline-example/1/', ignore_errors=True) + result = runner.invoke(main, [ + 'run', 'inprocess', + 'rastervision.pipeline_example_plugin1.config1', '-a', 'root_uri', + '/opt/data/pipeline-example/1/', '--splits', '2' + ]) + self.assertEqual(result.exit_code, 0) + + def test_rastervision_run_inprocess2(self): + runner = CliRunner() + shutil.rmtree('/opt/data/pipeline-example/2/', ignore_errors=True) + result = runner.invoke(main, [ + 'run', 'inprocess', + 'rastervision.pipeline_example_plugin1.config2', '-a', 'root_uri', + '/opt/data/pipeline-example/2/', '--splits', '2' + ]) + self.assertEqual(result.exit_code, 0) + + def test_rastervision_run_inprocess3(self): + runner = CliRunner() + shutil.rmtree('/opt/data/pipeline-example/3/', ignore_errors=True) + result = runner.invoke(main, [ + 'run', 'inprocess', + 'rastervision.pipeline_example_plugin2.config3', '-a', 'root_uri', + '/opt/data/pipeline-example/3/', '--splits', '2' + ]) + self.assertEqual(result.exit_code, 0) + + +class TestUtils(unittest.TestCase): + def test_print_error(self): + print_error('error') + + def test_convert_bool_args(self): + args_in = dict(a='true', b='false') + args_out = convert_bool_args(args_in) + self.assertDictEqual(args_out, dict(a=True, b=False)) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/pytorch_backend/examples/test_tiny_spacenet.py b/tests/pytorch_backend/examples/test_tiny_spacenet.py new file mode 100644 index 000000000..5345bfe03 --- /dev/null +++ b/tests/pytorch_backend/examples/test_tiny_spacenet.py @@ -0,0 +1,35 @@ +import unittest +import shutil + +from click.testing import CliRunner + +from rastervision.pipeline.cli import main +from rastervision.pipeline.file_system.utils import get_tmp_dir +from rastervision.core.cli import predict + +from tests import data_file_path + + +class TestTinySpacenet(unittest.TestCase): + def test_rastervision_run_tiny_spacenet(self): + runner = CliRunner() + shutil.rmtree('/opt/data/output/tiny_spacenet', ignore_errors=True) + result = runner.invoke(main, [ + 'run', 'inprocess', + 'rastervision.pytorch_backend.examples.tiny_spacenet' + ]) + self.assertEqual(result.exit_code, 0) + + # test predict command + bundle_path = '/opt/data/output/tiny_spacenet/bundle/model-bundle.zip' + img_path = data_file_path('small-rgb-tile.tif') + with get_tmp_dir() as tmp_dir: + result = runner.invoke(predict, [ + bundle_path, img_path, tmp_dir, '--channel-order', '0', '1', + '2' + ]) + self.assertEqual(result.exit_code, 0) + + +if __name__ == '__main__': + unittest.main()