Skip to content

Commit

Permalink
NSDS-1730: implement accountability re-design (#15)
Browse files Browse the repository at this point in the history
* Fixed issue when preparing the psuedo_context

* Incremented version number

* fixed up creation of cls_object

* NSDS-913: Implemented accountability in chimera core

* NSDS-913: fixed up imports

* Added env python

* fixed imports?

* Added _id to job_type in accountability.py

* fixed sciflo issue

* added the ability to grab the accountability module path.

* Trying to fix up how I grab the accountability class

* removed use of logger in sciflo_util.py

* Fixed accountability failure implementation

* Added an index field that should be set and added 3 more functions that are recommended to be overwritten when implementing a custom Accountability object.

* added logging for PostProcess run

* NSDS-1730: Added changes in run_sciflo for new __init__ of Accountability object. Added to Accountability's __init__. Updated how we initialize accountability in the PostProcessFunctions __init__. Added to job fields to constants.py

* NSDS-1730: updated accountability to be cleaner.

* NSDS-1730: removed unused constants

* NSDS-1730: added "set_products" function in accountability object

* NSDS-1730: removed accountability from unecessary places.

* NSDS-1730: fixed reading of context file mistake

* NSDS-1730: fixed getcwd issue

* NSDS-1730: fixed create_job_entry issue

* NSDS-1730: updated how we pass in the work_dir to the accountability object.

* NSDS-1730: removed work_dir as a optional param in Accountability

* NSDS-1730: removed unecessary pge_config.

* NSDS-1730: removed check if a job failed for the old accountability

Co-authored-by: Poreh <[email protected]>
Co-authored-by: Ben Poreh <[email protected]>
Co-authored-by: bporeh <[email protected]>
  • Loading branch information
4 people authored Oct 29, 2021
1 parent bff5cc8 commit 6122eec
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 51 deletions.
26 changes: 14 additions & 12 deletions chimera/commons/accountability.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
#!/usr/bin/env python
import json
from chimera.commons.constants import ChimeraConstants as chimera_const


class Accountability(object):
def __init__(self, context, index=None):
self.input_dataset_type = context.get(chimera_const.INPUT_DATASET_TYPE) + "_id"
self.input_dataset_id = context.get(chimera_const.INPUT_DATASET_ID)
self.step = context.get(chimera_const.STEP)
self.index = index
def __init__(self, context, work_dir):
self.context = context
self.job_json = None
self.job_id = None
self.work_dir = work_dir
if work_dir is not None:
with open("{}/_job.json".format(work_dir), "r") as f:
self.job_json = json.load(f)

def _search(self, query):
pass
self.job_id = self.job_json.get(chimera_const.JOB_INFO).get(chimera_const.JOB_PAYLOAD).get(chimera_const.PAYLOAD_TASK_ID)

def _update_doc(self, id, body):
def get_entries(self):
pass

def get_entries(self):
def create_job_entry(self):
pass

def set_status(self, status):
pass
def set_products(self, job_results):
pass
14 changes: 9 additions & 5 deletions chimera/commons/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ def __init__(self):

LAST_MOD_TIME = "LastModifiedTime"

JOB_TYPES = "JOB_TYPES"
JOB_INFO = "job_info"

JOB_QUEUES = "JOB_QUEUES"
JOB_PAYLOAD = "job_payload"

PAYLOAD_TASK_ID = "payload_task_id"

INPUT_DATASET_TYPE = "dataset_type"
JOB_ID_FIELD = "job_id"

INPUT_DATASET_ID = "input_dataset_id"
JOB_TYPES = "JOB_TYPES"

JOB_QUEUES = "JOB_QUEUES"

STEP = "purpose"
WORK_DIR = "work_dir"
1 change: 1 addition & 0 deletions chimera/commons/sciflo_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def copy_sciflo_work(output_dir):
shutil.copytree(real_path, new_path)
return


def extract_error(sfl_json):
"""Extract SciFlo error and traceback for mozart."""

Expand Down
4 changes: 0 additions & 4 deletions chimera/pge_job_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

import json
import os

from chimera.commons.accountability import Accountability
from chimera.commons.constants import ChimeraConstants as chimera_const
from chimera.commons.conf_util import load_config, YamlConf
from chimera.logger import logger
Expand All @@ -22,7 +20,6 @@ def __init__(self, context, run_config, pge_config_file, settings_file, wuid=Non
self._context = context
elif isinstance(context, str):
self._context = json.load(open(context, 'r'))
self.accountability = Accountability(self._context)
logger.debug("Loaded context file: {}".format(json.dumps(self._context)))

# This is intended to represent the top level working directory of the job. It's assumed to be at the same
Expand Down Expand Up @@ -216,7 +213,6 @@ def submit_job(self):
# information, if specified
if "job_specification" in self._context:
job_json["job_specification"] = self._context["job_specification"]
self.accountability.set_status("job-started")
job_json = self.perform_adaptation_tasks(job_json)

return job_json
1 change: 1 addition & 0 deletions chimera/postprocess_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def __init__(
self._job_result = job_result
elif isinstance(job_result, str):
self._job_result = json.load(open(job_result, "r"))
self._job_result["work_dir"] = os.path.dirname(sf_context)
logger.debug("Loaded job result: {}".format(json.dumps(self._job_result)))

def prepare_psuedo_context(self, psuedo_context):
Expand Down
8 changes: 5 additions & 3 deletions chimera/postprocess_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from hysds.es_util import get_grq_es, get_mozart_es

from chimera.commons.accountability import Accountability
from chimera.commons.constants import ChimeraConstants
from chimera.commons.constants import ChimeraConstants as chimera_consts

from chimera.logger import logger

Expand All @@ -19,7 +19,7 @@ def __init__(self, context, pge_config, settings, job_result, mozart_es=None, gr
self._pge_config = pge_config
self._settings = settings
self._job_result = job_result
self.accountability = Accountability(self._context)
self.accountability = Accountability(self._context, self._job_result.get(chimera_consts.WORK_DIR))
if mozart_es:
self._mozart_es = mozart_es
else:
Expand All @@ -39,10 +39,12 @@ def run(self, function_list):
:return: a dictionary containing information about the results of the post PGE processes.
"""
output_context = dict()
logger.info(
"function_list: {}".format(function_list)
)
for func in function_list:
self._job_result.update(getattr(self, func)())

self.accountability.set_status("job-completed")
return self._job_result

def _check_job_status(self):
Expand Down
4 changes: 0 additions & 4 deletions chimera/precondition_functions.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from chimera.commons.accountability import Accountability

class PreConditionFunctions(object):
def __init__(self, context, pge_config, settings, job_params):
self._context = context
self._pge_config = pge_config
self._settings = settings
self._job_params = job_params
self.accountability = Accountability(self._context)

def run(self, function_list):
"""
Expand All @@ -16,7 +13,6 @@ def run(self, function_list):
:return: a dictionary containing information about the results of the precondition evaluations.
"""
self.accountability.set_status("job-pp-started")
for func in function_list:
self._job_params.update(getattr(self, func)())

Expand Down
36 changes: 13 additions & 23 deletions chimera/run_sciflo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import argparse
import os
import json
import sys
from importlib import import_module

Expand All @@ -20,7 +21,13 @@


# grabs accountability class if implemented and set in the sciflo jobspecs
def get_accountability_class(context):
def get_accountability_class(context_file):
work_dir = None
context = None
if isinstance(context_file, str):
work_dir = os.path.dirname(context_file)
with open(context_file, "r") as f:
context = json.load(f)
path = context.get("module_path")
if "accountability_module_path" in context:
path = context.get("accountability_module_path")
Expand All @@ -30,33 +37,17 @@ def get_accountability_class(context):
LOGGER.error(
"No accountability class specified"
)
return Accountability(context)
return Accountability(context, work_dir)
cls = getattr(accountability_module, accountability_class_name)
if not issubclass(cls, Accountability):
LOGGER.error(
"accountability class does not extend Accountability"
)
return Accountability(context)
cls_object = cls(context)
return Accountability(context, work_dir)
cls_object = cls(context, work_dir)
return cls_object


# sets the accountability status as failed or doesn't do anything at all
def set_status_failed(context_file):
context = {}
with open(context_file, "r") as f:
import json
context = json.load(f)
try:
#from nisar_chimera.commons.accountability import NisarAccountability
accountability = get_accountability_class(context)
accountability.set_status("job-failed")
except Exception as e:
LOGGER.info("could not get accountability object")
LOGGER.info("path: {}".format(sys.path))
LOGGER.error(e)


def main(sfl_file, context_file, output_folder):
"""Main."""

Expand All @@ -65,10 +56,9 @@ def main(sfl_file, context_file, output_folder):
output_file = os.path.abspath(output_folder)
LOGGER.info("sfl_file: %s" % sfl_file)
LOGGER.info("context_file: %s" % context_file)
accountability = get_accountability_class(context_file)
accountability.create_job_entry()
result = run_sciflo(sfl_file, ["sf_context=%s" % context_file], output_folder)
if result != 0:
# sets status as failed if accountability implemented in chimera, otherwise, does nothing
set_status_failed(context_file)
return result


Expand Down

0 comments on commit 6122eec

Please sign in to comment.