Skip to content

Commit

Permalink
Refactored Chimera Framework (#4)
Browse files Browse the repository at this point in the history
* Fully refactored Chimera framework

* removing old files

* updated permissions of executables

* updated permissions of commons py scripts
  • Loading branch information
NamrataM authored Jun 9, 2020
1 parent c295574 commit 1150637
Show file tree
Hide file tree
Showing 73 changed files with 3,037 additions and 8,280 deletions.
12 changes: 12 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[flake8]
format = pylint
max-line-length = 120
ignore =
# E501: line too long
E501,
# W503: line break before binary operator
W503,
# E722: do not use bare 'except'
E722
statistics = 1
tee = 1
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea/
__pycache__/
venv/
.DS_Store
19 changes: 17 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,17 @@
# chimera
Basic skeleton for creating workflows using SciFlo. SciFlo is the workflow infrastructure integrated with the HySDS framework. This is a generic layout that can be used by any project to write up workflows consisting of basic steps: Pre processing, PGE execution, Post Processing.
# Chimera
Full Documentation: https://hysds-core.atlassian.net/wiki/spaces/HYS/pages/545914885/Chimera

This is a workflow concept implemented using SciFlo. It is a basic skeleton for creating workflows using SciFlo, generalized to be easily adapted by any project. SciFlo is the workflow infrastructure integrated with the HySDS framework.

Any PGE can be run by running 3 steps:
1. Input Preprocessor
2. PGE Execution
3. Post Processor


Contributors:
- Michael Cayanan
- Sujen Shah
- Namrata Malarout
- Gerald Manipon
- Frank Greguska
3 changes: 2 additions & 1 deletion __init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from . import commons
from __future__ import absolute_import
from .chimera import commons
2 changes: 2 additions & 0 deletions chimera/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# chimera
Basic skeleton for creating workflows using SciFlo. SciFlo is the workflow infrastructure integrated with the HySDS framework. This is a generic layout that can be used by any project to write up workflows consisting of basic steps: Pre processing, PGE execution, Post Processing.
Empty file added chimera/__init__.py
Empty file.
Empty file added chimera/commons/__init__.py
Empty file.
139 changes: 139 additions & 0 deletions chimera/commons/conf_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from __future__ import absolute_import
from builtins import object
import re
import yaml
import os
import json

from collections import OrderedDict

# Register a constructor so YAML documents loaded via yaml.safe_load can
# embed regular expressions tagged "python/regexp"; tagged scalar values
# are deserialized as compiled re pattern objects.
yaml.SafeLoader.add_constructor(
    u"tag:yaml.org,2002:python/regexp", lambda l, n: re.compile(l.construct_scalar(n))
)


class YamlConfEncoder(json.JSONEncoder):
    """JSON encoder for YamlConf objects that knows about compiled regexes."""

    # The type of a compiled regular expression (re.Pattern on modern Pythons);
    # obtained by compiling an empty pattern so this works on any version.
    _REGEX_TYPE = type(re.compile(r""))

    def default(self, obj):
        """Serialize a compiled regex as its source pattern string."""
        if isinstance(obj, self._REGEX_TYPE):
            return obj.pattern
        return super(YamlConfEncoder, self).default(obj)


class YamlConfError(Exception):
    """Raised by YamlConf when a requested configuration key is missing."""


class YamlConf(object):
    """YAML configuration file wrapper.

    Parses the given YAML file once at construction time and exposes the
    parsed contents via the ``cfg`` property and ``get``.
    """

    def __init__(self, file):
        """Construct YamlConf instance from the YAML file at *file*."""

        self._file = file
        with open(self._file) as f:
            self._cfg = yaml.safe_load(f)

    @property
    def file(self):
        # Path of the YAML file this configuration was loaded from.
        return self._file

    @property
    def cfg(self):
        # Parsed configuration, as returned by yaml.safe_load.
        return self._cfg

    def get(self, key):
        """Return the top-level value for *key*.

        Raises:
            YamlConfError: if *key* is not present in the configuration.
        """
        try:
            return self._cfg[key]
        except KeyError:
            # Include the missing key and the source file so the failure is
            # actionable (previously this raised a bare YamlConfError with
            # no message at all).
            raise YamlConfError(
                "Configuration '{}' doesn't exist in {}.".format(key, self._file)
            )

    def __repr__(self):
        # Pretty-print via the custom encoder so embedded compiled regexes
        # render as their pattern strings.
        return json.dumps(self._cfg, cls=YamlConfEncoder, indent=2)


class JobContext(object):
    """Wrapper around a job's ``_context.json`` file.

    Loads the JSON context at construction time; ``set`` mutates only the
    in-memory copy until ``save`` writes it back to disk.
    """

    def __init__(self, file):
        """Load the JSON context from the file at *file*."""
        self._file = file
        with open(self._file) as fh:
            self._ctx = json.load(fh)

    @property
    def file(self):
        # Path of the backing context JSON file.
        return self._file

    @property
    def ctx(self):
        # The in-memory context contents.
        return self._ctx

    def get(self, key):
        """Return the context value for *key*; raise if it is absent."""
        try:
            return self._ctx[key]
        except KeyError:
            raise Exception(
                "Context '{}' doesn't exist in {}.".format(key, self._file)
            )

    def set(self, key, val):
        """Set *key* to *val* in memory only (call save() to persist)."""
        self._ctx[key] = val

    def save(self):
        """Write the in-memory context back to the backing file."""
        with open(self._file, "w") as fh:
            json.dump(self._ctx, fh, indent=2, sort_keys=True)



class DockerParams(object):
    """Wrapper around a docker parameters JSON file."""

    def __init__(self, file):
        """Load docker parameters from the JSON file at *file*."""
        self._file = file
        with open(self._file) as fh:
            self._params = json.load(fh)

    @property
    def file(self):
        # Path of the backing docker-params JSON file.
        return self._file

    @property
    def params(self):
        # The docker parameters as parsed from JSON.
        return self._params

    def get(self, key):
        """Return the docker parameter for *key*; raise if it is absent."""
        try:
            return self._params[key]
        except KeyError:
            raise Exception(
                "Docker params '{}' doesn't exist in {}.".format(key, self._file)
            )


def load_config(config_filepath):
    """Load a Chimera configuration file by extension.

    Args:
        config_filepath: path to a ``.json`` or ``.yaml``/``.yml`` config file.

    Returns:
        For JSON files, an ``OrderedDict`` preserving key order; for YAML
        files, the structure produced by ``YamlConf``.

    Raises:
        RuntimeError: if the file cannot be parsed or has an unsupported
            extension.
    """
    config_ext = os.path.splitext(config_filepath)[1]
    if config_ext == ".json":
        try:
            # Context manager ensures the handle is closed even on parse
            # errors (the original leaked the handle from a bare open()).
            with open(config_filepath, 'r') as f:
                config = json.load(f, object_pairs_hook=OrderedDict)
        except Exception as e:
            raise RuntimeError("Could not load Config : {}".format(e))
    elif config_ext in (".yaml", ".yml"):
        try:
            config = YamlConf(config_filepath).cfg
        except Exception as e:
            raise RuntimeError("Could not load Config : {}".format(e))
    else:
        raise RuntimeError("Config file must end in .yaml or .json: {}".format(config_filepath))

    return config
75 changes: 75 additions & 0 deletions chimera/commons/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""
Add field names from PGE config files, names of functions,
match patterns or key names that can be referenced throughout code base
Note: To add new keys, please follow an alphabetical order
e.g.
LOCALIZE_KEY = "localize" # name of key found in input preprocessor output
GET_PGE_NAME = "pge_name" # name of key found in PGE config file
GET_ICE_SCLK = "getIceSclk" # name of function
"""


class ChimeraConstants(object):
    """Shared key names referenced throughout the Chimera code base.

    All members are plain class-level string constants; the class is never
    meant to hold instance state (the original no-op ``__init__`` was
    redundant and has been removed — the default initializer is identical).
    """

    # PGE's name
    PGE_NAME = "pge_name"

    # To identify the preconditions to check for
    PRECONDITIONS = "preconditions"

    # To identify the post processing steps to run
    POSTPROCESS = "postprocess"

    # Key identifying the payload in the _context file
    RUNCONFIG = "runconfig"

    # To specify which group elements to localize
    LOCALIZE_GROUPS = "localize_groups"

    # To specify which filepaths to localize in the worker. Used by Mozart
    LOCALIZE = "localize"
    CONFIGURATION = "configuration"
    PRODUCTION_DATETIME = "ProductionDateTime"

    # Key in runconfig for list of inputs
    RC_INPUT = "InputFilePath"

    # To identify file type level conditions
    CONDITIONS = "conditions"

    # Keys for identifying in the post_processor produced context.json
    PRODUCTS_ID = "product_ids"

    # Primary input key in PGE config
    PRIMARY_INPUT = "primary_input"

    # Field to specify optional runconfig fields
    OPTIONAL_FIELDS = "optionalFields"

    # Key used in post processor to identify the metadata of all products
    # generated. This is a list of dictionaries.
    PRODUCTS_METADATA = "product_metadata"

    # Key used to identify output products from the previous PGE run
    PRODUCT_NAMES = "product_names"

    # Key used to identify the path of the products created by the previous PGE
    PRODUCT_PATHS = "product_paths"

    RELEASE_VERSION = "release_version"

    SIMULATE_OUTPUTS = "simulate_outputs"

    PGE_SIM_MODE = "PGE_SIMULATION_MODE"

    OUTPUT_TYPES = "output_types"

    LAST_MOD_TIME = "LastModifiedTime"

    JOB_TYPES = "JOB_TYPES"

    JOB_QUEUES = "JOB_QUEUES"
94 changes: 94 additions & 0 deletions chimera/commons/sciflo_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#!/usr/bin/env python
import os
import json
import re
import shutil

# Matches sciflo step work-directory names, e.g. "00001-some_step"
WORK_RE = re.compile(r'\d{5}-.+')


def copy_sciflo_work(output_dir):
    """Replace symlinked sciflo work dirs under *output_dir* with real copies.

    For every directory entry whose name matches WORK_RE and which is a live
    symlink, the link target is copied alongside it (named after the target's
    basename), the symlink is removed, and a new relative symlink pointing at
    the copy's basename is created in its place.
    """

    for root, dirnames, _filenames in os.walk(output_dir):
        for dirname in dirnames:
            if not WORK_RE.search(dirname):
                continue
            link_path = os.path.join(root, dirname)
            # Only rewrite live symlinks; plain directories and dangling
            # links are left untouched.
            if os.path.islink(link_path) and os.path.exists(link_path):
                target = os.path.realpath(link_path)
                target_base = os.path.basename(target)
                shutil.copytree(target, os.path.join(root, target_base))
                os.unlink(link_path)
                os.symlink(target_base, link_path)


def extract_error(sfl_json):
    """Extract SciFlo error and traceback for mozart.

    Reads the sciflo result JSON at *sfl_json* and, when it carries an
    'exceptionMessage', writes the failing step's error summary to
    '_alt_error.txt' and its traceback to '_alt_traceback.txt' in the
    current working directory.
    """

    with open(sfl_json) as f:
        j = json.load(f)
    exc_message = j.get('exceptionMessage', None)
    if exc_message is not None:
        try:
            # NOTE(review): eval on file contents — the sciflo output is
            # presumably produced locally, but this is unsafe on untrusted
            # input; consider ast.literal_eval. The message is expected to
            # decode to a (process, exception, traceback) 3-item sequence.
            exc_list = eval(exc_message)
        except Exception:
            exc_list = []
        if len(exc_list) == 3:
            proc = exc_list[0]
            exc = exc_list[1]
            tb = exc_list[2]
            try:
                # The exception entry may itself be a repr of an
                # (error, job_json) pair; best-effort decode, falling back
                # to the raw string on failure.
                exc = eval(exc)
            except Exception:
                pass
            if isinstance(exc, tuple) and len(exc) == 2:
                err = exc[0]
                job_json = exc[1]
                if isinstance(job_json, dict):
                    if 'job_id' in job_json:
                        err_str = 'SciFlo step %s with job_id %s (task %s) failed: %s' % \
                            (proc, job_json['job_id'],
                             job_json['uuid'], err)
                        with open('_alt_error.txt', 'w') as f:
                            f.write("%s\n" % err_str)
                        with open('_alt_traceback.txt', 'w') as f:
                            f.write("%s\n" % job_json['traceback'])
            else:
                # Not an (error, job_json) pair: report the raw exception
                # and the traceback captured in the message itself.
                err_str = 'SciFlo step %s failed: %s' % (proc, exc)
                with open('_alt_error.txt', 'w') as f:
                    f.write("%s\n" % err_str)
                with open('_alt_traceback.txt', 'w') as f:
                    f.write("%s\n" % tb)


def run_sciflo(sfl_file, sfl_args, output_dir):
    """Run a SciFlo workflow via sflExec.py and post-process its outputs.

    Args:
        sfl_file: path to the sciflo workflow file to execute.
        sfl_args: list of "key=value" argument strings, joined with commas
            and passed to sflExec.py via --args.
        output_dir: directory where sciflo writes its results.

    Returns:
        0 on success; 1 on failure, after calling extract_error() to write
        the '_alt_error.txt'/'_alt_traceback.txt' files for mozart.
    """

    # build path to the sflExec.py executable in the verdi environment
    sflexec_path = os.path.join(
        os.environ['HOME'], 'verdi', 'bin', 'sflExec.py')

    # execute sciflo
    # NOTE(review): the command is run through the shell via os.system, so
    # sfl_args/sfl_file must come from trusted callers; subprocess.run with
    # an argument list would be safer if inputs can be arbitrary.
    # (An unused unpack of sfl_args[0].split("=") was removed here.)
    cmd = [sflexec_path, "-s", "-f", "-o", output_dir,
           "--args", '"%s"' % ','.join(sfl_args), sfl_file]
    print("Running sflExec.py command:\n%s" % ' '.join(cmd))
    status = os.system(' '.join(cmd))
    print("Exit status is: %d" % status)
    if status != 0:
        # surface the sciflo failure details for mozart
        extract_error('%s/sciflo.json' % output_dir)
        status = 1

    # copy sciflo work dirs so results survive cleanup; best-effort only
    try:
        copy_sciflo_work(output_dir)
    except Exception:
        pass

    return status
File renamed without changes.
13 changes: 13 additions & 0 deletions chimera/configs/chimera_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# module_path is a path to where the given class can be found

# This tells Chimera how to load in the precondition functions class in order to do the input preprocessor step.
# class_name must be a subclass of Chimera's PreConditionFunctions class.
preprocessor:
module_path: chimera.precondition_functions
class_name: PreConditionFunctions

# This tells Chimera how to load in the job submitter class in order to do the PGE job submission step.
# class_name must be a subclass of Chimera's PgeJobSubmitter class.
job_submitter:
module_path: chimera.pge_job_submitter
class_name: PgeJobSubmitter
Loading

0 comments on commit 1150637

Please sign in to comment.