From 9b6f8404ac4507d14adc404b77cfdf002b55e832 Mon Sep 17 00:00:00 2001 From: Rahul Mahajan Date: Tue, 7 May 2024 00:14:36 -0400 Subject: [PATCH] Add task to prepare emissions for GEFS (#2562) This PR: - introduces a task to prepare emissions for a forecast into the GEFS application. - adds configuration, j-job, rocoto job, ex-script and the python class for this job - updates GEFS workflow to be able to generate the XML to call this job. - updates the `fcst` and `efcs` job dependencies in the GEFS application to depend on `prep_emissions` if aerosols are turned ON. - provides a placeholder for @bbakernoaa to work on the details for preparing emissions. Co-authored-by: Walter Kolczynski - NOAA --- env/HERA.env | 4 ++ env/HERCULES.env | 4 ++ env/JET.env | 4 ++ env/ORION.env | 4 ++ env/S4.env | 4 ++ env/WCOSS2.env | 4 ++ jobs/JGLOBAL_PREP_EMISSIONS | 35 +++++++++++ jobs/rocoto/prep_emissions.sh | 23 +++++++ parm/config/gefs/config.prep_emissions | 11 ++++ parm/config/gefs/config.resources | 8 +++ scripts/exglobal_prep_emissions.py | 25 ++++++++ ush/python/pygfs/__init__.py | 16 +++++ ush/python/pygfs/task/aero_emissions.py | 82 +++++++++++++++++++++++++ workflow/applications/gefs.py | 6 ++ workflow/rocoto/gefs_tasks.py | 29 +++++++++ 15 files changed, 259 insertions(+) create mode 100755 jobs/JGLOBAL_PREP_EMISSIONS create mode 100755 jobs/rocoto/prep_emissions.sh create mode 100644 parm/config/gefs/config.prep_emissions create mode 100755 scripts/exglobal_prep_emissions.py create mode 100644 ush/python/pygfs/task/aero_emissions.py diff --git a/env/HERA.env b/env/HERA.env index 6ce99f8e90..68dbd4d396 100755 --- a/env/HERA.env +++ b/env/HERA.env @@ -49,6 +49,10 @@ elif [[ "${step}" = "prepsnowobs" ]]; then export APRUN_CALCFIMS="${launcher} -n 1" +elif [[ "${step}" = "prep_emissions" ]]; then + + export APRUN="${launcher} -n 1" + elif [[ "${step}" = "waveinit" ]] || [[ "${step}" = "waveprep" ]] || [[ "${step}" = "wavepostsbs" ]] || [[ "${step}" = "wavepostbndpnt" ]] || [[ "${step}" = "wavepostbndpntbll" ]] || [[ "${step}" = "wavepostpnt" ]]; then export CFP_MP="YES" diff --git a/env/HERCULES.env b/env/HERCULES.env index da5ad972f2..0b62120536 100755 --- a/env/HERCULES.env +++ b/env/HERCULES.env @@ -45,6 +45,10 @@ case ${step} in export APRUN_CALCFIMS="${launcher} -n 1" ;; + "prep_emissions") + + export APRUN="${launcher} -n 1" + ;; "waveinit" | "waveprep" | "wavepostsbs" | "wavepostbndpnt" | "wavepostpnt" | "wavepostbndpntbll") export CFP_MP="YES" diff --git a/env/JET.env b/env/JET.env index 3b4c2c2c53..976e42a025 100755 --- a/env/JET.env +++ b/env/JET.env @@ -37,6 +37,10 @@ elif [[ "${step}" = "prepsnowobs" ]]; then export APRUN_CALCFIMS="${launcher} -n 1" +elif [[ "${step}" = "prep_emissions" ]]; then + + export APRUN="${launcher} -n 1" + elif [[ "${step}" = "waveinit" ]] || [[ "${step}" = "waveprep" ]] || [[ "${step}" = "wavepostsbs" ]] || [[ "${step}" = "wavepostbndpnt" ]] || [[ "${step}" = "wavepostbndpntbll" ]] || [[ "${step}" = "wavepostpnt" ]]; then export CFP_MP="YES" diff --git a/env/ORION.env b/env/ORION.env index 6aac84a169..795346f0c6 100755 --- a/env/ORION.env +++ b/env/ORION.env @@ -44,6 +44,10 @@ elif [[ "${step}" = "prepsnowobs" ]]; then export APRUN_CALCFIMS="${launcher} -n 1" +elif [[ "${step}" = "prep_emissions" ]]; then + + export APRUN="${launcher} -n 1" + elif [[ "${step}" = "waveinit" ]] || [[ "${step}" = "waveprep" ]] || [[ "${step}" = "wavepostsbs" ]] || \ [[ "${step}" = "wavepostbndpnt" ]] || [[ "${step}" = "wavepostpnt" ]] || [[ "${step}" == "wavepostbndpntbll" ]]; then diff --git a/env/S4.env b/env/S4.env index 9cbf8b7bdb..ce68fddb89 100755 --- a/env/S4.env +++ b/env/S4.env @@ -37,6 +37,10 @@ elif [[ "${step}" = "prepsnowobs" ]]; then export APRUN_CALCFIMS="${launcher} -n 1" +elif [[ "${step}" = "prep_emissions" ]]; then + + export APRUN="${launcher} -n 1" + elif [[ "${step}" = "waveinit" ]] || [[ "${step}" = "waveprep" ]] || [[ "${step}" = "wavepostsbs" ]] || [[ "${step}" = "wavepostbndpnt" ]] || [[ "${step}" = "wavepostbndpntbll" ]] || [[ "${step}" = "wavepostpnt" ]]; then export CFP_MP="YES" diff --git a/env/WCOSS2.env b/env/WCOSS2.env index ba55495655..ff0121e034 100755 --- a/env/WCOSS2.env +++ b/env/WCOSS2.env @@ -31,6 +31,10 @@ elif [[ "${step}" = "prepsnowobs" ]]; then export APRUN_CALCFIMS="${launcher} -n 1" +elif [[ "${step}" = "prep_emissions" ]]; then + + export APRUN="${launcher} -n 1" + elif [[ "${step}" = "waveinit" ]] || [[ "${step}" = "waveprep" ]] || [[ "${step}" = "wavepostsbs" ]] || [[ "${step}" = "wavepostbndpnt" ]] || [[ "${step}" = "wavepostbndpntbll" ]] || [[ "${step}" = "wavepostpnt" ]]; then export USE_CFP="YES" diff --git a/jobs/JGLOBAL_PREP_EMISSIONS b/jobs/JGLOBAL_PREP_EMISSIONS new file mode 100755 index 0000000000..84edac8e50 --- /dev/null +++ b/jobs/JGLOBAL_PREP_EMISSIONS @@ -0,0 +1,35 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" +source "${HOMEgfs}/ush/jjob_header.sh" -e "prep_emissions" -c "base prep_emissions" + +############################################## +# Set variables used in the script +############################################## +# TODO: Set local variables used in this script e.g. GDATE may be needed for previous cycle + +############################################## +# Begin JOB SPECIFIC work +############################################## +# Generate COM variables from templates +# TODO: Add necessary COMIN, COMOUT variables for this job + +############################################################### +# Run relevant script +EXSCRIPT=${PREP_EMISSIONS_PY:-${SCRgfs}/exglobal_prep_emissions.py} +${EXSCRIPT} +status=$? +(( status != 0 )) && ( echo "FATAL ERROR: Error executing ${EXSCRIPT}, ABORT!"; exit "${status}" ) + +############################################## +# End JOB SPECIFIC work +############################################## + +############################################## +# Final processing +############################################## +if [[ -e "${pgmout}" ]] ; then + cat "${pgmout}" +fi + +exit 0 diff --git a/jobs/rocoto/prep_emissions.sh b/jobs/rocoto/prep_emissions.sh new file mode 100755 index 0000000000..0677073947 --- /dev/null +++ b/jobs/rocoto/prep_emissions.sh @@ -0,0 +1,23 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" + +############################################################### +# Source UFSDA workflow modules +source "${HOMEgfs}/ush/load_fv3gfs_modules.sh" +status=$? +(( status != 0 )) && exit "${status}" + +export job="prep_emissions" +export jobid="${job}.$$" + +############################################################### +# setup python path for workflow utilities and tasks +PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${HOMEgfs}/ush/python" +export PYTHONPATH + +############################################################### +# Execute the JJOB +"${HOMEgfs}/jobs/JGLOBAL_PREP_EMISSIONS" +status=$? +exit "${status}" diff --git a/parm/config/gefs/config.prep_emissions b/parm/config/gefs/config.prep_emissions new file mode 100644 index 0000000000..fa411c27ad --- /dev/null +++ b/parm/config/gefs/config.prep_emissions @@ -0,0 +1,11 @@ +#! /usr/bin/env bash + +########## config.prep_emissions ########## +# aerosol emissions preprocessing specific + +echo "BEGIN: config.prep_emissions" + +# Get task specific resources +source "${EXPDIR}/config.resources" prep_emissions + +echo "END: config.prep_emissions" diff --git a/parm/config/gefs/config.resources b/parm/config/gefs/config.resources index 9bf62cf514..d98e437359 100644 --- a/parm/config/gefs/config.resources +++ b/parm/config/gefs/config.resources @@ -69,6 +69,14 @@ case ${step} in export memory_waveinit="2GB" ;; + "prep_emissions") + export wtime_prep_emissions="00:10:00" + export npe_prep_emissions=1 + export nth_prep_emissions=1 + export npe_node_prep_emissions=$(( npe_node_max / nth_prep_emissions )) + export memory_prep_emissions="1GB" + ;; + "fcst" | "efcs") export is_exclusive=True diff --git a/scripts/exglobal_prep_emissions.py b/scripts/exglobal_prep_emissions.py new file mode 100755 index 0000000000..ef0e709142 --- /dev/null +++ b/scripts/exglobal_prep_emissions.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +# exglobal_prep_emissions.py +# This script creates a emissions object +# which perform the pre-processing for aerosol emissions +import os + +from wxflow import Logger, cast_strdict_as_dtypedict +from pygfs import AerosolEmissions + + +# Initialize root logger +logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True) + + +if __name__ == '__main__': + + # Take configuration from environment and cast it as python dictionary + config = cast_strdict_as_dtypedict(os.environ) + + # Instantiate the emissions pre-processing task + emissions = AerosolEmissions(config) + emissions.initialize() + emissions.configure() + emissions.execute(emissions.task_config.DATA, emissions.task_config.APRUN) + emissions.finalize() diff --git a/ush/python/pygfs/__init__.py b/ush/python/pygfs/__init__.py index e69de29bb2..fa6b0b373e 100644 --- a/ush/python/pygfs/__init__.py +++ b/ush/python/pygfs/__init__.py @@ -0,0 +1,16 @@ + +import os + +from .task.analysis import Analysis +from .task.aero_emissions import AerosolEmissions +from .task.aero_analysis import AerosolAnalysis +from .task.atm_analysis import AtmAnalysis +from .task.atmens_analysis import AtmEnsAnalysis +from .task.snow_analysis import SnowAnalysis +from .task.upp import UPP +from .task.oceanice_products import OceanIceProducts +from .task.gfs_forecast import GFSForecast + +__docformat__ = "restructuredtext" +__version__ = "0.1.0" +pygfs_directory = os.path.dirname(__file__) diff --git a/ush/python/pygfs/task/aero_emissions.py b/ush/python/pygfs/task/aero_emissions.py new file mode 100644 index 0000000000..17d2f528e4 --- /dev/null +++ b/ush/python/pygfs/task/aero_emissions.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 + +import os +from logging import getLogger +from typing import Dict, Any, Union +from pprint import pformat + +from wxflow import (AttrDict, + parse_j2yaml, + FileHandler, + Jinja, + logit, + Task, + add_to_datetime, to_timedelta, + WorkflowException, + Executable, which) + +logger = getLogger(__name__.split('.')[-1]) + + +class AerosolEmissions(Task): + """Aerosol Emissions pre-processing Task + """ + + @logit(logger, name="AerosolEmissions") + def __init__(self, config: Dict[str, Any]) -> None: + """Constructor for the Aerosol Emissions task + + Parameters + ---------- + config : Dict[str, Any] + Incoming configuration for the task from the environment + + Returns + ------- + None + """ + super().__init__(config) + + local_variable = "something" + + localdict = AttrDict( + {'variable_used_repeatedly': local_variable} + ) + self.task_config = AttrDict(**self.config, **self.runtime_config, **localdict) + + @staticmethod + @logit(logger) + def initialize() -> None: + """Initialize the work directory + """ + + @staticmethod + @logit(logger) + def configure() -> None: + """Configure the artifacts in the work directory. + Copy run specific data to run directory + """ + + @staticmethod + @logit(logger) + def execute(workdir: Union[str, os.PathLike], aprun_cmd: str) -> None: + """Run the executable (if any) + + Parameters + ---------- + workdir : str | os.PathLike + work directory with the staged data, parm files, namelists, etc. + aprun_cmd : str + launcher command for executable.x + + Returns + ------- + None + """ + + @staticmethod + @logit(logger) + def finalize() -> None: + """Perform closing actions of the task. + Copy data back from the DATA/ directory to COM/ + """ diff --git a/workflow/applications/gefs.py b/workflow/applications/gefs.py index b14c1a9003..c165f9d1ca 100644 --- a/workflow/applications/gefs.py +++ b/workflow/applications/gefs.py @@ -27,6 +27,9 @@ def _get_app_configs(self): if self.do_ocean or self.do_ice: configs += ['oceanice_products'] + if self.do_aero: + configs += ['prep_emissions'] + return configs @staticmethod @@ -45,6 +48,9 @@ def get_task_names(self): if self.do_wave: tasks += ['waveinit'] + if self.do_aero: + tasks += ['prep_emissions'] + tasks += ['fcst'] if self.nens > 0: diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index 6ee079fdfa..86be494549 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -89,6 +89,27 @@ def waveinit(self): return task + def prep_emissions(self): + deps = [] + dep_dict = {'type': 'task', 'name': f'stage_ic'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + + resources = self.get_resource('prep_emissions') + task_name = 'prep_emissions' + task_dict = {'task_name': task_name, + 'resources': resources, + 'envars': self.envars, + 'cycledef': 'gefs', + 'command': f'{self.HOMEgfs}/jobs/rocoto/prep_emissions.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } + task = rocoto.create_task(task_dict) + + return task + def fcst(self): dependencies = [] dep_dict = {'type': 'task', 'name': f'stage_ic'} @@ -98,6 +119,10 @@ def fcst(self): dep_dict = {'type': 'task', 'name': f'wave_init'} dependencies.append(rocoto.add_dependency(dep_dict)) + if self.app_config.do_aero: + dep_dict = {'type': 'task', 'name': f'prep_emissions'} + dependencies.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) resources = self.get_resource('fcst') @@ -125,6 +150,10 @@ def efcs(self): dep_dict = {'type': 'task', 'name': f'wave_init'} dependencies.append(rocoto.add_dependency(dep_dict)) + if self.app_config.do_aero: + dep_dict = {'type': 'task', 'name': f'prep_emissions'} + dependencies.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) efcsenvars = self.envars.copy()