Add uses_bulkdata argument to paasta spark run instance_config
This changes paasta spark-run so that https://github.yelpcorp.com/sysgit/yelpsoa-configs/pull/52010 works as expected.

This works by adding the uses_bulkdata key to the instance config if the spark job has the key present and set to true; otherwise, whatever the instance config already contains is used.

I have added this argument to the existing tests so that they pass, and also created a new test that checks the different ways uses_bulkdata can be set: as an argument to paasta spark-run, or in the instance config (as sketched below).

See #3995 for more information about why we're doing this.
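For illustration only (not part of the commit): a minimal sketch of the resulting precedence, using a made-up helper name, effective_uses_bulkdata. The CLI flag can only turn the setting on; when it is not passed, whatever the instance config already contains is used, which matches the parametrized cases in the new test.

# Sketch only: mirrors the propagation logic added to paasta_spark_run.
def effective_uses_bulkdata(cli_flag: bool, config_dict: dict) -> bool:
    # The flag only ever sets the key to True; it never clears it.
    if cli_flag:
        config_dict["uses_bulkdata"] = True
    # Otherwise fall back to whatever the instance config already has.
    return config_dict.get("uses_bulkdata", False)

assert effective_uses_bulkdata(True, {"uses_bulkdata": False}) is True
assert effective_uses_bulkdata(False, {"uses_bulkdata": True}) is True
assert effective_uses_bulkdata(False, {"uses_bulkdata": False}) is False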
SuperMatt committed Jan 20, 2025
1 parent d5727d7 commit 23fcbfa
Showing 2 changed files with 97 additions and 0 deletions.
11 changes: 11 additions & 0 deletions paasta_tools/cli/cmds/spark_run.py
@@ -350,6 +350,13 @@ def add_subparser(subparsers):
        default=False,
    )

    list_parser.add_argument(
        "--uses-bulkdata",
        help="Mount /nail/bulkdata in the container",
        action="store_true",
        default=False,
    )

    aws_group = list_parser.add_argument_group(
        title="AWS credentials options",
        description="If --aws-credentials-yaml is specified, it overrides all "
@@ -1170,6 +1177,10 @@ def paasta_spark_run(args: argparse.Namespace) -> int:
            load_deployments=args.build is False and args.image is None,
            soa_dir=args.yelpsoa_config_root,
        )
        # If the spark job has uses_bulkdata set then propagate it to the instance_config
        # If not, then whatever the instance_config has will be used
        if args.uses_bulkdata:
            instance_config.config_dict["uses_bulkdata"] = args.uses_bulkdata
    except NoConfigurationForServiceError as e:
        print(str(e), file=sys.stderr)
        return 1
86 changes: 86 additions & 0 deletions tests/cli/test_cmds_spark_run.py
@@ -1030,6 +1030,7 @@ def test_paasta_spark_run_bash(
        tronfig=None,
        job_id=None,
        use_web_identity=False,
        uses_bulkdata=True,
    )
    mock_load_system_paasta_config_utils.return_value.get_kube_clusters.return_value = (
        {}
@@ -1156,6 +1157,7 @@ def test_paasta_spark_run(
        tronfig=None,
        job_id=None,
        use_web_identity=False,
        uses_bulkdata=True,
    )
    mock_load_system_paasta_config_utils.return_value.get_kube_clusters.return_value = (
        {}
@@ -1282,6 +1284,7 @@ def test_paasta_spark_run_pyspark(
        tronfig=None,
        job_id=None,
        use_web_identity=False,
        uses_bulkdata=True,
    )
    mock_load_system_paasta_config_utils.return_value.get_kube_clusters.return_value = (
        {}
@@ -1358,6 +1361,89 @@ def test_paasta_spark_run_pyspark(
    )


@mock.patch.object(spark_run, "validate_work_dir", autospec=True)
@mock.patch.object(utils, "load_system_paasta_config", autospec=True)
@mock.patch.object(spark_run, "load_system_paasta_config", autospec=True)
@mock.patch.object(spark_run, "get_instance_config", autospec=True)
@mock.patch.object(spark_run, "get_aws_credentials", autospec=True)
@mock.patch.object(spark_run, "get_docker_image", autospec=True)
@mock.patch.object(spark_run, "get_spark_app_name", autospec=True)
@mock.patch.object(spark_run, "auto_add_timeout_for_spark_job", autospec=True)
@mock.patch.object(spark_run, "_parse_user_spark_args", autospec=True)
@mock.patch(
    "paasta_tools.cli.cmds.spark_run.spark_config.SparkConfBuilder", autospec=True
)
@mock.patch.object(spark_run, "configure_and_run_docker_container", autospec=True)
@mock.patch.object(spark_run, "get_smart_paasta_instance_name", autospec=True)
@pytest.mark.parametrize(
    "spark_run_arg_uses_bulkdata,instance_config_uses_bulkdata,expected",
    [
        (True, True, True),
        (True, False, True),
        (False, True, True),
        (False, False, False),
    ],
)
def test_paasta_spark_run_uses_bulkdata(
    mock_get_smart_paasta_instance_name,
    mock_configure_and_run_docker_container,
    mock_spark_conf_builder,
    mock_parse_user_spark_args,
    mock_auto_add_timeout_for_spark_job,
    mock_get_spark_app_name,
    mock_get_docker_image,
    mock_get_aws_credentials,
    mock_get_instance_config,
    mock_load_system_paasta_config_spark_run,
    mock_load_system_paasta_config_utils,
    mock_validate_work_dir,
    spark_run_arg_uses_bulkdata,
    instance_config_uses_bulkdata,
    expected,
):
    args = argparse.Namespace(
        work_dir="/tmp/local",
        cmd="USER=test spark-submit test.py",
        build=True,
        image=None,
        enable_compact_bin_packing=False,
        disable_compact_bin_packing=False,
        service="test-service",
        instance="test-instance",
        cluster="test-cluster",
        pool="test-pool",
        yelpsoa_config_root="/path/to/soa",
        aws_credentials_yaml="/path/to/creds",
        aws_profile=None,
        spark_args="spark.cores.max=100 spark.executor.cores=10",
        cluster_manager=spark_run.CLUSTER_MANAGER_K8S,
        timeout_job_runtime="1m",
        enable_dra=True,
        aws_region="test-region",
        force_spark_resource_configs=False,
        assume_aws_role=None,
        aws_role_duration=3600,
        k8s_server_address=None,
        tronfig=None,
        job_id=None,
        use_web_identity=False,
        uses_bulkdata=spark_run_arg_uses_bulkdata,
    )
    mock_load_system_paasta_config_spark_run.return_value.get_pools_for_cluster.return_value = [
        "test-pool"
    ]

    mock_get_instance_config.return_value.config_dict = {
        "uses_bulkdata": instance_config_uses_bulkdata
    }

    spark_run.paasta_spark_run(args)

    assert (
        mock_get_instance_config.return_value.config_dict["uses_bulkdata"] == expected
    )


@pytest.mark.parametrize(
"docker_cmd, is_mrjob, expected",
(
