From 6122eec1df2f87b66a74462c802898b5de4d6f5f Mon Sep 17 00:00:00 2001 From: Benjamin Poreh Date: Fri, 29 Oct 2021 13:20:15 -0700 Subject: [PATCH] NSDS-1730: implement accountability re-design (#15) * Fixed issue when preparing the psuedo_context * Incremented version number * fixed up creation of cls_object * NSDS-913: Implemented accountability in chimera core * NSDS-913: fixed up imports * Added env python * fixed imports? * Added _id to job_type in accountability.py * fixed sciflo issue * added the ability to grab the accountability module path. * Trying to fix up how I grab the accountability class * removed use of logger in sciflo_util.py * Fixed accountability failure implementation * Added an index field that should be set and added 3 more functions that are recommended to be overwritten when implementing a custom Accountability object. * added logging for PostProcess run * NSDS-1730: Added changes in run_sciflo for new __init__ of Accountability object. Added to Accountability's __init__. Updated how we initialize accountability in the PostProcessFunctions __init__. Added to job fields to constants.py * NSDS-1730: updated accountability to be cleaner. * NSDS-1730: removed unused constants * NSDS-1730: added "set_products" function in accountability object * NSDS-1730: removed accountability from unecessary places. * NSDS-1730: fixed reading of context file mistake * NSDS-1730: fixed getcwd issue * NSDS-1730: fixed create_job_entry issue * NSDS-1730: updated how we pass in the work_dir to the accountability object. * NSDS-1730: removed work_dir as a optional param in Accountability * NSDS-1730: removed unecessary pge_config. * NSDS-1730: removed check if a job failed for the old accountability Co-authored-by: Poreh Co-authored-by: Ben Poreh Co-authored-by: bporeh --- chimera/commons/accountability.py | 26 +++++++++++----------- chimera/commons/constants.py | 14 +++++++----- chimera/commons/sciflo_util.py | 1 + chimera/pge_job_submitter.py | 4 ---- chimera/postprocess_evaluator.py | 1 + chimera/postprocess_functions.py | 8 ++++--- chimera/precondition_functions.py | 4 ---- chimera/run_sciflo.py | 36 +++++++++++-------------------- 8 files changed, 43 insertions(+), 51 deletions(-) diff --git a/chimera/commons/accountability.py b/chimera/commons/accountability.py index ffc66f4..e5660e1 100644 --- a/chimera/commons/accountability.py +++ b/chimera/commons/accountability.py @@ -1,22 +1,24 @@ #!/usr/bin/env python +import json from chimera.commons.constants import ChimeraConstants as chimera_const - class Accountability(object): - def __init__(self, context, index=None): - self.input_dataset_type = context.get(chimera_const.INPUT_DATASET_TYPE) + "_id" - self.input_dataset_id = context.get(chimera_const.INPUT_DATASET_ID) - self.step = context.get(chimera_const.STEP) - self.index = index + def __init__(self, context, work_dir): + self.context = context + self.job_json = None + self.job_id = None + self.work_dir = work_dir + if work_dir is not None: + with open("{}/_job.json".format(work_dir), "r") as f: + self.job_json = json.load(f) - def _search(self, query): - pass + self.job_id = self.job_json.get(chimera_const.JOB_INFO).get(chimera_const.JOB_PAYLOAD).get(chimera_const.PAYLOAD_TASK_ID) - def _update_doc(self, id, body): + def get_entries(self): pass - def get_entries(self): + def create_job_entry(self): pass - def set_status(self, status): - pass \ No newline at end of file + def set_products(self, job_results): + pass diff --git a/chimera/commons/constants.py b/chimera/commons/constants.py index 603a22f..31966b9 100755 --- a/chimera/commons/constants.py +++ b/chimera/commons/constants.py @@ -73,12 +73,16 @@ def __init__(self): LAST_MOD_TIME = "LastModifiedTime" - JOB_TYPES = "JOB_TYPES" + JOB_INFO = "job_info" - JOB_QUEUES = "JOB_QUEUES" + JOB_PAYLOAD = "job_payload" + + PAYLOAD_TASK_ID = "payload_task_id" - INPUT_DATASET_TYPE = "dataset_type" + JOB_ID_FIELD = "job_id" - INPUT_DATASET_ID = "input_dataset_id" + JOB_TYPES = "JOB_TYPES" + + JOB_QUEUES = "JOB_QUEUES" - STEP = "purpose" + WORK_DIR = "work_dir" diff --git a/chimera/commons/sciflo_util.py b/chimera/commons/sciflo_util.py index 0a0fa9a..932be0f 100755 --- a/chimera/commons/sciflo_util.py +++ b/chimera/commons/sciflo_util.py @@ -28,6 +28,7 @@ def copy_sciflo_work(output_dir): shutil.copytree(real_path, new_path) return + def extract_error(sfl_json): """Extract SciFlo error and traceback for mozart.""" diff --git a/chimera/pge_job_submitter.py b/chimera/pge_job_submitter.py index a8d5d86..549c2c4 100755 --- a/chimera/pge_job_submitter.py +++ b/chimera/pge_job_submitter.py @@ -6,8 +6,6 @@ import json import os - -from chimera.commons.accountability import Accountability from chimera.commons.constants import ChimeraConstants as chimera_const from chimera.commons.conf_util import load_config, YamlConf from chimera.logger import logger @@ -22,7 +20,6 @@ def __init__(self, context, run_config, pge_config_file, settings_file, wuid=Non self._context = context elif isinstance(context, str): self._context = json.load(open(context, 'r')) - self.accountability = Accountability(self._context) logger.debug("Loaded context file: {}".format(json.dumps(self._context))) # This is intended to represent the top level working directory of the job. It's assumed to be at the same @@ -216,7 +213,6 @@ def submit_job(self): # information, if specified if "job_specification" in self._context: job_json["job_specification"] = self._context["job_specification"] - self.accountability.set_status("job-started") job_json = self.perform_adaptation_tasks(job_json) return job_json diff --git a/chimera/postprocess_evaluator.py b/chimera/postprocess_evaluator.py index 2b7f2b7..9d89e0c 100755 --- a/chimera/postprocess_evaluator.py +++ b/chimera/postprocess_evaluator.py @@ -73,6 +73,7 @@ def __init__( self._job_result = job_result elif isinstance(job_result, str): self._job_result = json.load(open(job_result, "r")) + self._job_result["work_dir"] = os.path.dirname(sf_context) logger.debug("Loaded job result: {}".format(json.dumps(self._job_result))) def prepare_psuedo_context(self, psuedo_context): diff --git a/chimera/postprocess_functions.py b/chimera/postprocess_functions.py index f57a4d0..3269cb3 100755 --- a/chimera/postprocess_functions.py +++ b/chimera/postprocess_functions.py @@ -5,7 +5,7 @@ from hysds.es_util import get_grq_es, get_mozart_es from chimera.commons.accountability import Accountability -from chimera.commons.constants import ChimeraConstants +from chimera.commons.constants import ChimeraConstants as chimera_consts from chimera.logger import logger @@ -19,7 +19,7 @@ def __init__(self, context, pge_config, settings, job_result, mozart_es=None, gr self._pge_config = pge_config self._settings = settings self._job_result = job_result - self.accountability = Accountability(self._context) + self.accountability = Accountability(self._context, self._job_result.get(chimera_consts.WORK_DIR)) if mozart_es: self._mozart_es = mozart_es else: @@ -39,10 +39,12 @@ def run(self, function_list): :return: a dictionary containing information about the results of the post PGE processes. """ output_context = dict() + logger.info( + "function_list: {}".format(function_list) + ) for func in function_list: self._job_result.update(getattr(self, func)()) - self.accountability.set_status("job-completed") return self._job_result def _check_job_status(self): diff --git a/chimera/precondition_functions.py b/chimera/precondition_functions.py index 9725352..e35c1dd 100755 --- a/chimera/precondition_functions.py +++ b/chimera/precondition_functions.py @@ -1,12 +1,9 @@ -from chimera.commons.accountability import Accountability - class PreConditionFunctions(object): def __init__(self, context, pge_config, settings, job_params): self._context = context self._pge_config = pge_config self._settings = settings self._job_params = job_params - self.accountability = Accountability(self._context) def run(self, function_list): """ @@ -16,7 +13,6 @@ def run(self, function_list): :return: a dictionary containing information about the results of the precondition evaluations. """ - self.accountability.set_status("job-pp-started") for func in function_list: self._job_params.update(getattr(self, func)()) diff --git a/chimera/run_sciflo.py b/chimera/run_sciflo.py index 6007a05..36f763a 100755 --- a/chimera/run_sciflo.py +++ b/chimera/run_sciflo.py @@ -5,6 +5,7 @@ import argparse import os +import json import sys from importlib import import_module @@ -20,7 +21,13 @@ # grabs accountability class if implemented and set in the sciflo jobspecs -def get_accountability_class(context): +def get_accountability_class(context_file): + work_dir = None + context = None + if isinstance(context_file, str): + work_dir = os.path.dirname(context_file) + with open(context_file, "r") as f: + context = json.load(f) path = context.get("module_path") if "accountability_module_path" in context: path = context.get("accountability_module_path") @@ -30,33 +37,17 @@ def get_accountability_class(context): LOGGER.error( "No accountability class specified" ) - return Accountability(context) + return Accountability(context, work_dir) cls = getattr(accountability_module, accountability_class_name) if not issubclass(cls, Accountability): LOGGER.error( "accountability class does not extend Accountability" ) - return Accountability(context) - cls_object = cls(context) + return Accountability(context, work_dir) + cls_object = cls(context, work_dir) return cls_object -# sets the accountability status as failed or doesn't do anything at all -def set_status_failed(context_file): - context = {} - with open(context_file, "r") as f: - import json - context = json.load(f) - try: - #from nisar_chimera.commons.accountability import NisarAccountability - accountability = get_accountability_class(context) - accountability.set_status("job-failed") - except Exception as e: - LOGGER.info("could not get accountability object") - LOGGER.info("path: {}".format(sys.path)) - LOGGER.error(e) - - def main(sfl_file, context_file, output_folder): """Main.""" @@ -65,10 +56,9 @@ def main(sfl_file, context_file, output_folder): output_file = os.path.abspath(output_folder) LOGGER.info("sfl_file: %s" % sfl_file) LOGGER.info("context_file: %s" % context_file) + accountability = get_accountability_class(context_file) + accountability.create_job_entry() result = run_sciflo(sfl_file, ["sf_context=%s" % context_file], output_folder) - if result != 0: - # sets status as failed if accountability implemented in chimera, otherwise, does nothing - set_status_failed(context_file) return result