From 23fcbfa9ca82794f76a335ea9010625e160cda92 Mon Sep 17 00:00:00 2001
From: Matthew Ames
Date: Tue, 14 Jan 2025 02:53:11 -0800
Subject: [PATCH] Add uses_bulkdata argument to paasta spark run
 instance_config

This changes paasta spark run so that
https://github.yelpcorp.com/sysgit/yelpsoa-configs/pull/52010 works as
expected. It does this by adding the uses_bulkdata key to the instance
config whenever the spark job has the key present and set to true.

I have added this argument to the existing tests so that they pass, and
created a new test that checks the different ways uses_bulkdata can be
set: either as a paasta spark-run argument, or in the instance config.

See #3995 for more information about why we're doing this.
---
 paasta_tools/cli/cmds/spark_run.py | 11 ++++
 tests/cli/test_cmds_spark_run.py   | 86 ++++++++++++++++++++++++++++++
 2 files changed, 97 insertions(+)

diff --git a/paasta_tools/cli/cmds/spark_run.py b/paasta_tools/cli/cmds/spark_run.py
index 3b9300ebbf..9dc29140d4 100644
--- a/paasta_tools/cli/cmds/spark_run.py
+++ b/paasta_tools/cli/cmds/spark_run.py
@@ -350,6 +350,13 @@ def add_subparser(subparsers):
         default=False,
     )
 
+    list_parser.add_argument(
+        "--uses-bulkdata",
+        help="Mount /nail/bulkdata in the container",
+        action="store_true",
+        default=False,
+    )
+
     aws_group = list_parser.add_argument_group(
         title="AWS credentials options",
         description="If --aws-credentials-yaml is specified, it overrides all "
@@ -1170,6 +1177,10 @@ def paasta_spark_run(args: argparse.Namespace) -> int:
             load_deployments=args.build is False and args.image is None,
             soa_dir=args.yelpsoa_config_root,
         )
+        # If the spark job has uses_bulkdata set, propagate it to the
+        # instance_config; if not, whatever the instance_config has is used.
+        if args.uses_bulkdata:
+            instance_config.config_dict["uses_bulkdata"] = args.uses_bulkdata
     except NoConfigurationForServiceError as e:
         print(str(e), file=sys.stderr)
         return 1
diff --git a/tests/cli/test_cmds_spark_run.py b/tests/cli/test_cmds_spark_run.py
index 022b707b8b..39a7ac5b8e 100644
--- a/tests/cli/test_cmds_spark_run.py
+++ b/tests/cli/test_cmds_spark_run.py
@@ -1030,6 +1030,7 @@ def test_paasta_spark_run_bash(
         tronfig=None,
         job_id=None,
         use_web_identity=False,
+        uses_bulkdata=True,
     )
     mock_load_system_paasta_config_utils.return_value.get_kube_clusters.return_value = (
         {}
@@ -1156,6 +1157,7 @@ def test_paasta_spark_run(
         tronfig=None,
         job_id=None,
         use_web_identity=False,
+        uses_bulkdata=True,
     )
     mock_load_system_paasta_config_utils.return_value.get_kube_clusters.return_value = (
         {}
@@ -1282,6 +1284,7 @@ def test_paasta_spark_run_pyspark(
         tronfig=None,
         job_id=None,
         use_web_identity=False,
+        uses_bulkdata=True,
     )
     mock_load_system_paasta_config_utils.return_value.get_kube_clusters.return_value = (
         {}
@@ -1358,6 +1361,89 @@ def test_paasta_spark_run_pyspark(
     )
 
 
+@mock.patch.object(spark_run, "validate_work_dir", autospec=True)
+@mock.patch.object(utils, "load_system_paasta_config", autospec=True)
+@mock.patch.object(spark_run, "load_system_paasta_config", autospec=True)
+@mock.patch.object(spark_run, "get_instance_config", autospec=True)
+@mock.patch.object(spark_run, "get_aws_credentials", autospec=True)
+@mock.patch.object(spark_run, "get_docker_image", autospec=True)
+@mock.patch.object(spark_run, "get_spark_app_name", autospec=True)
+@mock.patch.object(spark_run, "auto_add_timeout_for_spark_job", autospec=True)
+@mock.patch.object(spark_run, "_parse_user_spark_args", autospec=True)
+@mock.patch(
+    "paasta_tools.cli.cmds.spark_run.spark_config.SparkConfBuilder", autospec=True
+)
+@mock.patch.object(spark_run, "configure_and_run_docker_container", autospec=True)
+@mock.patch.object(spark_run, "get_smart_paasta_instance_name", autospec=True)
+@pytest.mark.parametrize(
+    "spark_run_arg_uses_bulkdata,instance_config_uses_bulkdata,expected",
+    [
+        (True, True, True),
+        (True, False, True),
+        (False, True, True),
+        (False, False, False),
+    ],
+)
+def test_paasta_spark_run_uses_bulkdata(
+    mock_get_smart_paasta_instance_name,
+    mock_configure_and_run_docker_container,
+    mock_spark_conf_builder,
+    mock_parse_user_spark_args,
+    mock_auto_add_timeout_for_spark_job,
+    mock_get_spark_app_name,
+    mock_get_docker_image,
+    mock_get_aws_credentials,
+    mock_get_instance_config,
+    mock_load_system_paasta_config_spark_run,
+    mock_load_system_paasta_config_utils,
+    mock_validate_work_dir,
+    spark_run_arg_uses_bulkdata,
+    instance_config_uses_bulkdata,
+    expected,
+):
+    args = argparse.Namespace(
+        work_dir="/tmp/local",
+        cmd="USER=test spark-submit test.py",
+        build=True,
+        image=None,
+        enable_compact_bin_packing=False,
+        disable_compact_bin_packing=False,
+        service="test-service",
+        instance="test-instance",
+        cluster="test-cluster",
+        pool="test-pool",
+        yelpsoa_config_root="/path/to/soa",
+        aws_credentials_yaml="/path/to/creds",
+        aws_profile=None,
+        spark_args="spark.cores.max=100 spark.executor.cores=10",
+        cluster_manager=spark_run.CLUSTER_MANAGER_K8S,
+        timeout_job_runtime="1m",
+        enable_dra=True,
+        aws_region="test-region",
+        force_spark_resource_configs=False,
+        assume_aws_role=None,
+        aws_role_duration=3600,
+        k8s_server_address=None,
+        tronfig=None,
+        job_id=None,
+        use_web_identity=False,
+        uses_bulkdata=spark_run_arg_uses_bulkdata,
+    )
+    mock_load_system_paasta_config_spark_run.return_value.get_pools_for_cluster.return_value = [
+        "test-pool"
+    ]
+
+    mock_get_instance_config.return_value.config_dict = {
+        "uses_bulkdata": instance_config_uses_bulkdata
+    }
+
+    spark_run.paasta_spark_run(args)
+
+    assert (
+        mock_get_instance_config.return_value.config_dict["uses_bulkdata"] == expected
+    )
+
+
 @pytest.mark.parametrize(
     "docker_cmd, is_mrjob, expected",
     (