Skip to content

Commit

Permalink
Update deletion logic to call the Tron API for valid jobs, and discar…
Browse files Browse the repository at this point in the history
…d the rest
  • Loading branch information
KaspariK committed Jan 9, 2025
1 parent ddd75a8 commit 5a60f18
Showing 1 changed file with 62 additions and 12 deletions.
74 changes: 62 additions & 12 deletions tools/pickles_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Optional

import boto3
import requests
from boto3.resources.base import ServiceResource

from tron.core.job import Job
Expand Down Expand Up @@ -43,6 +44,24 @@ def get_all_jobs(source_table: ServiceResource) -> List[str]:
return list(unique_keys)


def get_job_names(base_url: str) -> List[str]:
"""
Get the list of job names from the Tron API.
:param base_url: The base URL of the Tron API.
:return: A list of job names.
"""
try:
full_url = f"http://{base_url}.yelpcorp.com:8089/api/jobs?include_job_runs=0"
response = requests.get(full_url)
response.raise_for_status()
data = response.json()
job_names = [job["name"] for job in data.get("jobs", [])]
return job_names
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}")
return []


def combine_pickle_partitions(source_table: ServiceResource, key: str) -> bytes:
"""
Load and combine all partitions of a pickled item from DynamoDB.
Expand Down Expand Up @@ -119,6 +138,7 @@ def dump_json_keys(source_table: ServiceResource, keys: List[str]) -> None:
dump_json_key(source_table, key)


# TODO: clean up old run history for valid jobs? something something look at job_state, then whitelist those runs instead of whitelisting entire jobs
def delete_keys(source_table: ServiceResource, keys: List[str]) -> None:
"""
Delete items with the given list of keys from the DynamoDB table.
Expand Down Expand Up @@ -237,6 +257,7 @@ def convert_pickles_to_json_and_update_table(
keys: List[str],
dry_run: bool = True,
keys_file: Optional[str] = None,
job_names: List[str] = [],
) -> None:
"""
Convert pickled items in the DynamoDB table to JSON and update the entries.
Expand All @@ -249,7 +270,26 @@ def convert_pickles_to_json_and_update_table(
converted_keys = 0
skipped_keys = 0
failed_keys = []
delete_keys = []

for key in keys:
# Extract the job name from the key
parts = key.split()
if len(parts) < 2:
continue

state_type, job_info = parts[0], parts[1]

# Ignore run_num for job_run_state keys
if state_type == "job_run_state":
job_name = ".".join(job_info.split(".")[:-1])
else:
job_name = job_info

if job_name not in job_names:
delete_keys.append(key)
continue

try:
result = convert_pickle_to_json_and_update_table(source_table, key, dry_run)
if result:
Expand All @@ -259,16 +299,19 @@ def convert_pickles_to_json_and_update_table(
except Exception as e:
print(f"Key: {key} - Failed to convert pickle to JSON: {e}")
failed_keys.append(key)

print(f"Total keys processed: {total_keys}")
print(f"Conversions attempted: {total_keys - skipped_keys}")
print(f"Conversions succeeded: {converted_keys}")
print(f"Conversions skipped: {skipped_keys}")
print(f"Conversions failed: {len(failed_keys)}")
if dry_run and keys_file and failed_keys:
print(f"Keys to be deleted: {len(delete_keys)}")

if keys_file:
with open(keys_file, "w") as f:
for key in failed_keys:
for key in failed_keys + delete_keys: # TODO: failed keys to separate file?
f.write(f"{key}\n")
print(f"Failed keys have been written to {keys_file}")
print(f"Failed and delete keys have been written to {keys_file}")
if dry_run:
print("Dry run complete. No changes were made to the DynamoDB table.")

Expand Down Expand Up @@ -300,17 +343,19 @@ def main():
Examples:
Validate pickles (dry run, write failed keys to keys.txt):
pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run --keys-file keys.txt
pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run --keys-file keys.txt --tron-api-url tron-infrastage
Convert all pickles to JSON (dry run):
pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run
pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --all --dry-run --tron-api-url tron-infrastage
Convert specific pickles to JSON using keys from a file:
pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys-file keys.txt
pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys-file keys.txt --tron-api-url tron-infrastage
Convert specific pickles to JSON:
pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys "key1" "key2"
pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action convert --keys "key1" "key2" --tron-api-url tron-infrastage
Load and print specific JSON keys using keys from a file:
pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action dump-json --keys-file keys.txt
Delete specific keys:
pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action delete-keys --keys "key1" "key2"
Delete keys from a file:
pickles_to_json.py --table-name infrastage-tron-state --table-region us-west-1 --action delete-keys --keys-file keys.txt
""",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
Expand Down Expand Up @@ -348,7 +393,11 @@ def main():
action="store_true",
help="Apply the action to all keys in the table.",
)

parser.add_argument(
"--tron-api-url",
required=True,
help="URL of the Tron API to fetch job names from.",
)
args = parser.parse_args()
source_table = get_dynamodb_table(args.aws_profile, args.table_name, args.table_region)
if not args.keys and not args.keys_file and not args.all:
Expand All @@ -370,12 +419,13 @@ def main():
if not keys:
parser.error("No keys provided. Please provide keys via --keys or --keys-file.")
keys = list(set(keys))

# Get job names from the Tron API using the provided URL
job_names = get_job_names(args.tron_api_url)

if args.action == "convert":
convert_pickles_to_json_and_update_table(
source_table,
keys=keys,
dry_run=args.dry_run,
keys_file=args.keys_file,
source_table, keys=keys, dry_run=args.dry_run, keys_file=args.keys_file, job_names=job_names
)
elif args.action == "dump-pickle":
dump_pickle_keys(source_table, keys)
Expand Down

0 comments on commit 5a60f18

Please sign in to comment.