def get_job_names(base_url: str, timeout: float = 30.0) -> List[str]:
    """
    Get the list of job names from the Tron API.

    :param base_url: Host prefix of the Tron master (e.g. "tron-infrastage"). It is expanded to
        http://<base_url>.yelpcorp.com:8089/api/jobs?include_job_runs=0.
    :param timeout: Seconds to wait for the API before giving up.
    :return: A list of job names. An empty list is returned when the request fails or the
        response cannot be parsed, so an empty result does not necessarily mean "no jobs".
    """
    full_url = f"http://{base_url}.yelpcorp.com:8089/api/jobs?include_job_runs=0"
    try:
        # requests applies no timeout by default; without one a hung API blocks the tool forever.
        response = requests.get(full_url, timeout=timeout)
        response.raise_for_status()
        data = response.json()
        return [job["name"] for job in data.get("jobs", [])]
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
    except (ValueError, KeyError, TypeError, AttributeError) as e:
        # Body was not valid JSON (ValueError on older requests versions) or did not have the
        # expected {"jobs": [{"name": ...}, ...]} shape; keep the "empty list on failure" contract.
        print(f"An error occurred: unexpected response from {full_url}: {e!r}")
    return []
@@ -237,6 +257,7 @@ def convert_pickles_to_json_and_update_table( keys: List[str], dry_run: bool = True, keys_file: Optional[str] = None, + job_names: List[str] = [], ) -> None: """ Convert pickled items in the DynamoDB table to JSON and update the entries. @@ -249,7 +270,26 @@ def convert_pickles_to_json_and_update_table( converted_keys = 0 skipped_keys = 0 failed_keys = [] + delete_keys = [] + for key in keys: + # Extract the job name from the key + parts = key.split() + if len(parts) < 2: + continue + + state_type, job_info = parts[0], parts[1] + + # Ignore run_num for job_run_state keys + if state_type == "job_run_state": + job_name = ".".join(job_info.split(".")[:-1]) + else: + job_name = job_info + + if job_name not in job_names: + delete_keys.append(key) + continue + try: result = convert_pickle_to_json_and_update_table(source_table, key, dry_run) if result: @@ -259,16 +299,19 @@ def convert_pickles_to_json_and_update_table( except Exception as e: print(f"Key: {key} - Failed to convert pickle to JSON: {e}") failed_keys.append(key) + print(f"Total keys processed: {total_keys}") print(f"Conversions attempted: {total_keys - skipped_keys}") print(f"Conversions succeeded: {converted_keys}") print(f"Conversions skipped: {skipped_keys}") print(f"Conversions failed: {len(failed_keys)}") - if dry_run and keys_file and failed_keys: + print(f"Keys to be deleted: {len(delete_keys)}") + + if keys_file: with open(keys_file, "w") as f: - for key in failed_keys: + for key in failed_keys + delete_keys: # TODO: failed keys to separate file? f.write(f"{key}\n") - print(f"Failed keys have been written to {keys_file}") + print(f"Failed and delete keys have been written to {keys_file}") if dry_run: print("Dry run complete. 
No changes were made to the DynamoDB table.") @@ -300,17 +343,19 @@ def main(): Examples: Validate pickles (dry run, write failed keys to keys.txt): - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run --keys-file keys.txt + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run --keys-file keys.txt --tron-api-url tron-infrastage Convert all pickles to JSON (dry run): - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run --tron-api-url tron-infrastage Convert specific pickles to JSON using keys from a file: - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys-file keys.txt + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys-file keys.txt --tron-api-url tron-infrastage Convert specific pickles to JSON: - pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys "key1" "key2" + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys "key1" "key2" --tron-api-url tron-infrastage Load and print specific JSON keys using keys from a file: pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action dump-json --keys-file keys.txt Delete specific keys: pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action delete-keys --keys "key1" "key2" + Delete keys from a file: + pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action delete-keys --keys-file keys.txt """, formatter_class=argparse.RawDescriptionHelpFormatter, ) @@ -348,7 +393,11 @@ def main(): action="store_true", help="Apply the action to all keys in 
the table.", ) - + parser.add_argument( + "--tron-api-url", + required=True, + help="URL of the Tron API to fetch job names from.", + ) args = parser.parse_args() source_table = get_dynamodb_table(args.aws_profile, args.table_name, args.table_region) if not args.keys and not args.keys_file and not args.all: @@ -370,12 +419,13 @@ def main(): if not keys: parser.error("No keys provided. Please provide keys via --keys or --keys-file.") keys = list(set(keys)) + + # Get job names from the Tron API using the provided URL + job_names = get_job_names(args.tron_api_url) + if args.action == "convert": convert_pickles_to_json_and_update_table( - source_table, - keys=keys, - dry_run=args.dry_run, - keys_file=args.keys_file, + source_table, keys=keys, dry_run=args.dry_run, keys_file=args.keys_file, job_names=job_names ) elif args.action == "dump-pickle": dump_pickle_keys(source_table, keys)