HC-556: Update retry and purge jobs to delete all possible instances of a job across multiple indices (#35)

* HC-556: Update delete_by_id function to delete by job_status-current

* delete the jobs in a different way

* wait for job-revoked state to be logged before continuing

* use ensure_job_indexed

* update how the container is being built

* add user ops

* revert back to print statements

* revert back to print statements

* query by correct thing

* typo

* make wait shorter

* add logging

* add logging format

* should be retry.log

* update query

* update query

* add comment

* Revert "add comment"

This reverts commit 0babf15.

* update purge to remove all instances of a job

* add timestamps to logging; surround with try/except

* remove waiting for a revoked status

---------

Co-authored-by: Mike Cayanan <[email protected]>
mcayanan and Mike Cayanan authored Oct 31, 2024
1 parent a2fb7e6 commit 8f28268
Showing 3 changed files with 56 additions and 26 deletions.
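Both purge.py and retry.py make the same change: rather than deleting a job document from a single index, they first collect every copy of the document that the queried index or alias resolves to, then delete each hit from its concrete backing index. A minimal sketch of that pattern, assuming an Elasticsearch wrapper exposing the same search_by_id/delete_by_id calls used in the diffs below (the helper function itself is hypothetical, not part of the commit):

def delete_all_instances(es, alias, payload_id):
    # return_all=True asks for every hit, not just the first;
    # ignore=404 tolerates a document that is already gone.
    results = es.search_by_id(index=alias, id=payload_id, return_all=True, ignore=404)
    for result in results:
        # Delete from the concrete index each hit lives in, not the alias.
        es.delete_by_id(index=result['_index'], id=result['_id'])

# e.g.: delete_all_instances(get_mozart_es(), 'job_status-current', payload_id)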
10 changes: 10 additions & 0 deletions docker/Dockerfile
@@ -4,6 +4,16 @@ MAINTAINER malarout "[email protected]"
LABEL description="Lightweight System Jobs"

# provision lightweight-jobs PGE

+# create ops user and group
+RUN useradd -M -u 10005 ops
+
+# softlink /home/ops
+RUN ln -s /root /home/ops
+
+# Make sure $HOME is set when we run this container
+ENV HOME=/home/ops
+
+USER ops
COPY . /home/ops/lightweight-jobs
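A note on the new user setup: useradd -M creates ops without a home directory, so the /home/ops symlink plus ENV HOME are what keep "~" resolvable once USER ops takes effect. A quick, hypothetical check that could be run with the container's Python:

import os
import pwd

# Both lines depend on ENV HOME=/home/ops from the Dockerfile above.
print(os.path.expanduser('~'))             # expected: /home/ops
print(pwd.getpwuid(os.getuid()).pw_name)   # expected: ops (uid 10005)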

14 changes: 9 additions & 5 deletions purge.py
@@ -11,10 +11,12 @@
from utils import revoke, create_info_message_files

LOG_FILE_NAME = 'purge.log'
-logging.basicConfig(filename=LOG_FILE_NAME, filemode='a', level=logging.INFO)
+log_format = "[%(asctime)s: %(levelname)s/%(funcName)s] %(message)s"
+logging.basicConfig(format=log_format, filename=LOG_FILE_NAME, filemode='a', level=logging.INFO)
logger = logging

tosca_es = get_grq_es()
+mozart_es = get_mozart_es()


def read_context():
@@ -145,10 +147,12 @@ def purge_products(query, component, operation, delete_from_obj_store=True):
logger.info('Revoking %s\n', uuid)
revoke(uuid, state)

-# Delete job from ES
-logger.info('Removing document from index %s for %s', index, payload_id)
-es.delete_by_id(index=index, id=payload_id, ignore=404)
-logger.info('Removed %s from index: %s', payload_id, index)
+# Delete job(s) from ES
+results = es.search_by_id(index=index, id=payload_id, return_all=True, ignore=404)
+for result in results:
+    logger.info('Removing document from index %s for %s', result['_index'], result['_id'])
+    es.delete_by_id(index=result['_index'], id=result['_id'])
+    logger.info('Removed %s from index: %s', result['_id'], result['_index'])
logger.info('Finished.')


58 changes: 37 additions & 21 deletions retry.py
@@ -3,6 +3,8 @@
import json
import traceback
import backoff
+import logging
+
from datetime import datetime
from celery import uuid

@@ -15,6 +17,13 @@


STATUS_ALIAS = app.conf["STATUS_ALIAS"]
+JOB_STATUS_CURRENT = "job_status-current"
+
+LOG_FILE_NAME = 'retry.log'
+log_format = "[%(asctime)s: %(levelname)s/%(funcName)s] %(message)s"
+logging.basicConfig(format=log_format, filename=LOG_FILE_NAME, filemode='a', level=logging.INFO)
+logger = logging
+
mozart_es = get_mozart_es()


@@ -35,20 +44,23 @@ def query_es(job_id):
}
}
}
return mozart_es.search(index="job_status-current", body=query_json)
return mozart_es.search(index=JOB_STATUS_CURRENT, body=query_json)


@backoff.on_exception(backoff.expo, Exception, max_tries=10, max_value=64)
def delete_by_id(index, _id):
-mozart_es.delete_by_id(index=index, id=_id)
+results = mozart_es.search_by_id(index=index, id=_id, return_all=True)
+for result in results:
+    logger.info(f"Deleting job {result['_id']} in {result['_index']}")
+    mozart_es.delete_by_id(index=result['_index'], id=result['_id'])


def get_new_job_priority(old_priority, increment_by, new_priority):
if increment_by is not None:
priority = int(old_priority) + int(increment_by)
if priority == 0 or priority == 9:
print("Not applying {} on previous priority of {}")
print("Priority must be between 0 and 8".format(increment_by, old_priority))
logger.info("Not applying {} on previous priority of {}")
logger.info("Priority must be between 0 and 8".format(increment_by, old_priority))
priority = int(old_priority)
else:
priority = int(new_priority)
@@ -77,11 +89,11 @@ def resubmit_jobs(context):
retry_job_ids = [context['retry_job_id']]

for job_id in retry_job_ids:
print(("Validating retry job: {}".format(job_id)))
logger.info("Validating retry job: {}".format(job_id))
try:
doc = query_es(job_id)
if doc['hits']['total']['value'] == 0:
-print('job id %s not found in Elasticsearch. Continuing.' % job_id)
+logger.warning('job id %s not found in Elasticsearch. Continuing.' % job_id)
continue
doc = doc["hits"]["hits"][0]

@@ -91,21 +103,21 @@ def resubmit_jobs(context):
_id = doc["_id"]

if not index.startswith("job"):
print("Cannot retry a worker: %s" % _id)
logger.error("Cannot retry a worker: %s" % _id)
continue

# don't retry a retry
if job_json['type'].startswith('job-lw-mozart-retry'):
print("Cannot retry retry job %s. Skipping" % job_id)
logger.error("Cannot retry retry job %s. Skipping" % job_id)
continue

# check retry_remaining_count
if 'retry_count' in job_json:
if job_json['retry_count'] < retry_count_max:
job_json['retry_count'] = int(job_json['retry_count']) + 1
else:
print("For job {}, retry_count now is {}, retry_count_max limit of {} reached. Cannot retry again."
.format(job_id, job_json['retry_count'], retry_count_max))
logger.error("For job {}, retry_count now is {}, retry_count_max limit of {} reached. Cannot retry again."
.format(job_id, job_json['retry_count'], retry_count_max))
continue
else:
job_json['retry_count'] = 1
@@ -133,32 +145,36 @@ def resubmit_jobs(context):
job_id = job_json['job_id']
try:
revoke(task_id, state)
print("revoked original job: %s (%s)" % (job_id, task_id))
logger.info("revoked original job: %s (%s) state=%s" % (job_id, task_id, state))
except:
print("Got error issuing revoke on job %s (%s): %s" % (job_id, task_id, traceback.format_exc()))
print("Continuing.")
logger.error("Got error issuing revoke on job %s (%s): %s" % (job_id, task_id, traceback.format_exc()))
logger.error("Continuing.")

# generate celery task id
new_task_id = uuid()
job_json['task_id'] = new_task_id

-# delete old job status
-delete_by_id(index, _id)
+# delete old job status; we should pass in the job_status-current alias
+# instead so that we make sure to properly handle the scenario where
+# figaro rules are in place to auto retry jobs that fail due to spot termination.
+# This may potentially cause duplicate records across the job_status
+# and job_failed indices
+delete_by_id(JOB_STATUS_CURRENT, _id)

# check if new queues, soft time limit, and time limit values were set
new_job_queue = context.get("job_queue", "")
if new_job_queue:
print(f"new job queue specified. Sending retry job to {new_job_queue}")
logger.info(f"new job queue specified. Sending retry job to {new_job_queue}")
job_json['job_info']['job_queue'] = new_job_queue

new_soft_time_limit = context.get("soft_time_limit", "")
if new_soft_time_limit:
print(f"new soft time limit specified. Setting new soft time limit to {int(new_soft_time_limit)}")
logger.info(f"new soft time limit specified. Setting new soft time limit to {int(new_soft_time_limit)}")
job_json['job_info']['soft_time_limit'] = int(new_soft_time_limit)

new_time_limit = context.get("time_limit", "")
if new_time_limit:
print(f"new time limit specified. Setting new time limit to {int(new_time_limit)}")
logger.info(f"new time limit specified. Setting new time limit to {int(new_time_limit)}")
job_json['job_info']['time_limit'] = int(new_time_limit)

# Before re-queueing, check to see if the job was under the job_failed index. If so, need to
Expand All @@ -183,9 +199,9 @@ def resubmit_jobs(context):
soft_time_limit=job_json['job_info']['soft_time_limit'],
priority=job_json['priority'],
task_id=new_task_id)
logger.info(f"re-submitted job_id={job_id}, payload_id={job_status_json['payload_id']}, task_id={new_task_id}")
except Exception as ex:
print("[ERROR] Exception occurred {0}:{1} {2}".format(type(ex), ex, traceback.format_exc()),
file=sys.stderr)
logger.error("[ERROR] Exception occurred {0}:{1} {2}".format(type(ex), ex, traceback.format_exc()))


if __name__ == "__main__":
Expand All @@ -194,4 +210,4 @@ def resubmit_jobs(context):
# if input_type == "job":
resubmit_jobs(ctx)
# else:
# print("Cannot retry a task, worker or event.")
# logger.info("Cannot retry a task, worker or event.")
