From e27f5df5bbe22323ebc583c1407259d8879a9b8e Mon Sep 17 00:00:00 2001
From: Walter Kolczynski - NOAA
Date: Tue, 14 Jan 2025 19:08:13 -0500
Subject: [PATCH] Reinstate product groups (#3208)

Testing with full-sized GEFS found that the sheer number of tasks
overloads rocoto, resulting in `rocotorun` taking over 10 min to
complete or hanging entirely. To reduce the number of tasks, product
groups are reimplemented so that multiple forecast hours are processed
in a single task. However, the implementation is a little different
from the previous one.

The jobs where groups are enabled (atmos_products, oceanice_products,
wavepostsbs, atmos_ensstat, and gempak) have a new variable,
`MAX_TASKS`, that controls how many groups to use. This setting is
currently *per member*. The forecast hours to be processed are then
divided into this many groups as evenly as possible without crossing
forecast segment boundaries. The walltime for those jobs is then
multiplied by the number of forecast hours in the largest group.

For the gridded wave post job, the dependencies were also updated to
trigger off of either the data being available or the appropriate
segment completing (the dependencies had not been updated when the job
was initially broken into fhrs).

A number of helper methods are added to Tasks to determine these groups
and make a standard metatask variable dict in a centralized location.
There is also a function to multiply the walltime, but this may be
better off relocated to wxflow with the other time functions.

As part of switching from a single value to a list, hours are no longer
passed by rocoto as zero-padded values. The lists are comma-delimited
(without spaces) and split apart in the job stub (`jobs/rocoto/*`), so
each j-job call still processes a single forecast hour.

The offline post (upp) job is not broken into groups, since it really
isn't used outside the analysis anymore.

Resolves #2999
Resolves #3210
---
 jobs/rocoto/atmos_ensstat.sh               |  21 ++--
 jobs/rocoto/atmos_products.sh              |  23 ++--
 jobs/rocoto/gempak.sh                      |  19 +++-
 jobs/rocoto/oceanice_products.sh           |  21 ++--
 jobs/rocoto/wavepostsbs.sh                 |  21 ++--
 parm/config/gefs/config.atmos_ensstat      |   3 +
 parm/config/gefs/config.atmos_products     |   4 +-
 parm/config/gefs/config.oceanice_products  |   4 +-
 parm/config/gefs/config.resources          |  10 +-
 parm/config/gefs/config.wavepostsbs        |   3 +
 parm/config/gfs/config.atmos_products      |   4 +-
 parm/config/gfs/config.gempak              |   5 +-
 parm/config/gfs/config.oceanice_products   |   3 +
 parm/config/gfs/config.resources           |   8 +-
 parm/config/gfs/config.wavepostsbs         |   3 +
 workflow/rocoto/gefs_tasks.py              | 111 ++++++++++--------
 workflow/rocoto/gfs_tasks.py               |  84 ++++++++------
 workflow/rocoto/tasks.py                   | 126 ++++++++++++++++++++-
 workflow/tests/__init__.py                 |   0
 workflow/{ => tests}/test_configuration.py |   0
 workflow/{ => tests}/test_hosts.py         |   0
 workflow/tests/test_tasks.py               |  84 ++++++++++++++
 22 files changed, 431 insertions(+), 126 deletions(-)
 create mode 100644 workflow/tests/__init__.py
 rename workflow/{ => tests}/test_configuration.py (100%)
 rename workflow/{ => tests}/test_hosts.py (100%)
 create mode 100644 workflow/tests/test_tasks.py

diff --git a/jobs/rocoto/atmos_ensstat.sh b/jobs/rocoto/atmos_ensstat.sh
index 76ed7f0a72..617cbd77f8 100755
--- a/jobs/rocoto/atmos_ensstat.sh
+++ b/jobs/rocoto/atmos_ensstat.sh
@@ -13,13 +13,20 @@ status=$?
 if (( status != 0 )); then exit "${status}"; fi
 
 export job="atmos_ensstat"
-export jobid="${job}.$$"
-export FORECAST_HOUR=$(( 10#${FHR3} ))
+# shellcheck disable=SC2153
+IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}"
 
-###############################################################
-# Execute the JJOB
-###############################################################
-"${HOMEgfs}/jobs/JGLOBAL_ATMOS_ENSSTAT"
+export FORECAST_HOUR jobid
+for FORECAST_HOUR in "${fhr_list[@]}"; do
+    fhr3=$(printf '%03d' "${FORECAST_HOUR}")
+    jobid="${job}_f${fhr3}.$$"
+    ###############################################################
+    # Execute the JJOB
+    ###############################################################
+    "${HOMEgfs}/jobs/JGLOBAL_ATMOS_ENSSTAT"
+    status=$?
+    [[ ${status} -ne 0 ]] && exit "${status}"
+done
 
-exit $?
+exit 0
diff --git a/jobs/rocoto/atmos_products.sh b/jobs/rocoto/atmos_products.sh
index f6adbcf861..947b06dfc2 100755
--- a/jobs/rocoto/atmos_products.sh
+++ b/jobs/rocoto/atmos_products.sh
@@ -13,15 +13,20 @@ status=$?
 if (( status != 0 )); then exit "${status}"; fi
 
 export job="atmos_products"
-export jobid="${job}.$$"
-# Negatation needs to be before the base
-fhr3_base="10#${FHR3}"
-export FORECAST_HOUR=$(( ${fhr3_base/10#-/-10#} ))
+# shellcheck disable=SC2153
+IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}"
 
-###############################################################
-# Execute the JJOB
-###############################################################
-"${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS"
+export FORECAST_HOUR jobid
+for FORECAST_HOUR in "${fhr_list[@]}"; do
+    fhr3=$(printf '%03d' "${FORECAST_HOUR}")
+    jobid="${job}_f${fhr3}.$$"
+    ###############################################################
+    # Execute the JJOB
+    ###############################################################
+    "${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS"
+    status=$?
+    [[ ${status} -ne 0 ]] && exit "${status}"
+done
 
-exit $?
+exit 0
diff --git a/jobs/rocoto/gempak.sh b/jobs/rocoto/gempak.sh
index f5aea2379d..dc1d3f2621 100755
--- a/jobs/rocoto/gempak.sh
+++ b/jobs/rocoto/gempak.sh
@@ -6,11 +6,20 @@ status=$?
 if (( status != 0 )); then exit "${status}"; fi
 
 export job="gempak"
-export jobid="${job}.$$"
+# shellcheck disable=SC2153
+IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}"
 
-# Execute the JJOB
-"${HOMEgfs}/jobs/J${RUN^^}_ATMOS_GEMPAK"
+export FHR3 jobid
+for fhr in "${fhr_list[@]}"; do
+    FHR3=$(printf '%03d' "${fhr}")
+    jobid="${job}_f${FHR3}.$$"
+    ###############################################################
+    # Execute the JJOB
+    ###############################################################
+    "${HOMEgfs}/jobs/J${RUN^^}_ATMOS_GEMPAK"
+    status=$?
+    [[ ${status} -ne 0 ]] && exit "${status}"
+done
 
-status=$?
-exit "${status}"
+exit 0
diff --git a/jobs/rocoto/oceanice_products.sh b/jobs/rocoto/oceanice_products.sh
index 2a3b617d05..c3e03cea1a 100755
--- a/jobs/rocoto/oceanice_products.sh
+++ b/jobs/rocoto/oceanice_products.sh
@@ -13,13 +13,20 @@ status=$?
 if (( status != 0 )); then exit "${status}"; fi
 
 export job="oceanice_products"
-export jobid="${job}.$$"
-export FORECAST_HOUR=$(( 10#${FHR3} ))
+# shellcheck disable=SC2153
+IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}"
 
-###############################################################
-# Execute the JJOB
-###############################################################
-"${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS"
+export FORECAST_HOUR jobid
+for FORECAST_HOUR in "${fhr_list[@]}"; do
+    fhr3=$(printf '%03d' "${FORECAST_HOUR}")
+    jobid="${job}_${COMPONENT}_f${fhr3}.$$"
+    ###############################################################
+    # Execute the JJOB
+    ###############################################################
+    "${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS"
+    status=$?
+    [[ ${status} -ne 0 ]] && exit "${status}"
+done
 
-exit $?
+exit 0
diff --git a/jobs/rocoto/wavepostsbs.sh b/jobs/rocoto/wavepostsbs.sh
index f4789210d8..ff81c2a9d3 100755
--- a/jobs/rocoto/wavepostsbs.sh
+++ b/jobs/rocoto/wavepostsbs.sh
@@ -5,17 +5,24 @@ source "${HOMEgfs}/ush/preamble.sh"
 ###############################################################
 # Source FV3GFS workflow modules
 #. ${HOMEgfs}/ush/load_fv3gfs_modules.sh
-. ${HOMEgfs}/ush/load_ufswm_modules.sh
+source "${HOMEgfs}/ush/load_ufswm_modules.sh"
 status=$?
-[[ ${status} -ne 0 ]] && exit ${status}
+[[ ${status} -ne 0 ]] && exit "${status}"
 
 export job="wavepostsbs"
-export jobid="${job}.$$"
 
 ###############################################################
-# Execute the JJOB
-${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS
-status=$?
-[[ ${status} -ne 0 ]] && exit ${status}
+# shellcheck disable=SC2153
+IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}"
+
+export FHR3 jobid
+for FORECAST_HOUR in "${fhr_list[@]}"; do
+    FHR3=$(printf '%03d' "${FORECAST_HOUR}")
+    jobid="${job}_f${FHR3}.$$"
+    # Execute the JJOB
+    "${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS"
+    status=$?
+    [[ ${status} -ne 0 ]] && exit "${status}"
+done
 
 exit 0
diff --git a/parm/config/gefs/config.atmos_ensstat b/parm/config/gefs/config.atmos_ensstat
index d371f75887..b542659523 100644
--- a/parm/config/gefs/config.atmos_ensstat
+++ b/parm/config/gefs/config.atmos_ensstat
@@ -5,6 +5,9 @@ echo "BEGIN: config.atmos_ensstat"
+# Maximum number of rocoto tasks
+export MAX_TASKS=25
+
 # Get task specific resources
 . "${EXPDIR}/config.resources" atmos_ensstat
diff --git a/parm/config/gefs/config.atmos_products b/parm/config/gefs/config.atmos_products
index e8aae324e1..d1f36a7bc9 100644
--- a/parm/config/gefs/config.atmos_products
+++ b/parm/config/gefs/config.atmos_products
@@ -8,8 +8,8 @@ echo "BEGIN: config.atmos_products"
 # Get task specific resources
 . "${EXPDIR}/config.resources" atmos_products
 
-# No. of forecast hours to process in a single job
-export NFHRS_PER_GROUP=3
+# Maximum number of rocoto tasks per member
+export MAX_TASKS=25
 
 # Scripts used by this job
 export INTERP_ATMOS_MASTERSH="${USHgfs}/interp_atmos_master.sh"
diff --git a/parm/config/gefs/config.oceanice_products b/parm/config/gefs/config.oceanice_products
index 3b8b064947..6bb604d0ca 100644
--- a/parm/config/gefs/config.oceanice_products
+++ b/parm/config/gefs/config.oceanice_products
@@ -9,7 +9,7 @@ source "${EXPDIR}/config.resources" oceanice_products
 
 export OCEANICEPRODUCTS_CONFIG="${PARMgfs}/post/oceanice_products_gefs.yaml"
 
-# No. of forecast hours to process in a single job
-export NFHRS_PER_GROUP=3
+# Maximum number of rocoto tasks per member
+export MAX_TASKS=25
 
 echo "END: config.oceanice_products"
diff --git a/parm/config/gefs/config.resources b/parm/config/gefs/config.resources
index 17033858c8..bb33f3eb02 100644
--- a/parm/config/gefs/config.resources
+++ b/parm/config/gefs/config.resources
@@ -234,6 +234,7 @@ case ${step} in
     ;;
 
   "atmos_products")
+    # Walltime is per forecast hour; will be multiplied by group size
     export walltime="00:15:00"
     export ntasks=24
     export threads_per_task=1
@@ -242,7 +243,8 @@ case ${step} in
     ;;
 
   "atmos_ensstat")
-    export walltime="00:30:00"
+    # Walltime is per forecast hour; will be multiplied by group size
+    export walltime="00:15:00"
     export ntasks=6
     export threads_per_task=1
     export tasks_per_node="${ntasks}"
@@ -250,6 +252,7 @@ case ${step} in
     ;;
 
   "oceanice_products")
+    # Walltime is per forecast hour; will be multiplied by group size
    export walltime="00:15:00"
     export ntasks=1
     export tasks_per_node=1
@@ -258,7 +261,8 @@ case ${step} in
     ;;
 
   "wavepostsbs")
-    export walltime="03:00:00"
+    # Walltime is per forecast hour; will be multiplied by group size
+    export walltime="00:15:00"
     export ntasks=1
     export threads_per_task=1
     export tasks_per_node=$(( max_tasks_per_node / threads_per_task ))
@@ -328,7 +332,7 @@ case ${step} in
     ;;
 
   "cleanup")
-    export walltime="00:15:00"
+    export walltime="00:30:00"
     export ntasks=1
     export tasks_per_node=1
     export threads_per_task=1
diff --git a/parm/config/gefs/config.wavepostsbs b/parm/config/gefs/config.wavepostsbs
index 82cec321da..b43ea33d40 100644
--- a/parm/config/gefs/config.wavepostsbs
+++ b/parm/config/gefs/config.wavepostsbs
@@ -8,6 +8,9 @@ echo "BEGIN: config.wavepostsbs"
 # Get task specific resources
 source "${EXPDIR}/config.resources" wavepostsbs
 
+# Maximum number of rocoto tasks per member
+export MAX_TASKS=25
+
 # Subgrid info for grib2 encoding
 export WAV_SUBGRBSRC=""
 export WAV_SUBGRB=""
diff --git a/parm/config/gfs/config.atmos_products b/parm/config/gfs/config.atmos_products
index 451f5eff86..5b6e4067b5 100644
--- a/parm/config/gfs/config.atmos_products
+++ b/parm/config/gfs/config.atmos_products
@@ -8,8 +8,8 @@ echo "BEGIN: config.atmos_products"
 # Get task specific resources
 . "${EXPDIR}/config.resources" atmos_products
 
-# No. of forecast hours to process in a single job
-export NFHRS_PER_GROUP=3
+# Maximum number of rocoto tasks per member
+export MAX_TASKS=25
 
 # Scripts used by this job
 export INTERP_ATMOS_MASTERSH="${USHgfs}/interp_atmos_master.sh"
diff --git a/parm/config/gfs/config.gempak b/parm/config/gfs/config.gempak
index 791770ba4a..db5e85af3f 100644
--- a/parm/config/gfs/config.gempak
+++ b/parm/config/gfs/config.gempak
@@ -5,7 +5,10 @@ echo "BEGIN: config.gempak"
+# Maximum number of rocoto tasks per member
+export MAX_TASKS=25
+
 # Get task specific resources
-. $EXPDIR/config.resources gempak
+source "${EXPDIR}/config.resources" gempak
 
 echo "END: config.gempak"
diff --git a/parm/config/gfs/config.oceanice_products b/parm/config/gfs/config.oceanice_products
index 9e5c5b1c68..a618cbe10c 100644
--- a/parm/config/gfs/config.oceanice_products
+++ b/parm/config/gfs/config.oceanice_products
@@ -7,6 +7,9 @@ echo "BEGIN: config.oceanice_products"
 # Get task specific resources
 source "${EXPDIR}/config.resources" oceanice_products
 
+# Maximum number of rocoto tasks per member
+export MAX_TASKS=25
+
 export OCEANICEPRODUCTS_CONFIG="${PARMgfs}/post/oceanice_products.yaml"
 
 # No. of forecast hours to process in a single job
diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources
index 6a85a6de14..0eea92cbde 100644
--- a/parm/config/gfs/config.resources
+++ b/parm/config/gfs/config.resources
@@ -191,8 +191,9 @@ case ${step} in
     ;;
 
   "wavepostsbs")
-    walltime_gdas="00:20:00"
-    walltime_gfs="03:00:00"
+    # Walltime is per forecast hour; will be multiplied by group size
+    walltime_gdas="00:15:00"
+    walltime_gfs="00:15:00"
     ntasks=8
     threads_per_task=1
     tasks_per_node=$(( max_tasks_per_node / threads_per_task ))
@@ -911,6 +912,7 @@ case ${step} in
     ;;
 
   "oceanice_products")
+    # Walltime is per forecast hour; will be multiplied by group size
     walltime="00:15:00"
     ntasks=1
     tasks_per_node=1
@@ -949,6 +951,7 @@ case ${step} in
     ;;
 
   "atmos_products")
+    # Walltime is per forecast hour; will be multiplied by group size
     walltime="00:15:00"
     ntasks=24
     threads_per_task=1
@@ -1278,6 +1281,7 @@ case ${step} in
     ;;
 
   "gempak")
+    # Walltime is per forecast hour; will be multiplied by group size
     walltime="00:30:00"
     ntasks_gdas=2
     ntasks_gfs=28
diff --git a/parm/config/gfs/config.wavepostsbs b/parm/config/gfs/config.wavepostsbs
index 82cec321da..b43ea33d40 100644
--- a/parm/config/gfs/config.wavepostsbs
+++ b/parm/config/gfs/config.wavepostsbs
@@ -8,6 +8,9 @@ echo "BEGIN: config.wavepostsbs"
 # Get task specific resources
 source "${EXPDIR}/config.resources" wavepostsbs
 
+# Maximum number of rocoto tasks per member
+export MAX_TASKS=25
+
 # Subgrid info for grib2 encoding
 export WAV_SUBGRBSRC=""
 export WAV_SUBGRB=""
diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py
index ca29bcdf1e..f89d3dbbb0 100644
--- a/workflow/rocoto/gefs_tasks.py
+++ b/workflow/rocoto/gefs_tasks.py
@@ -190,39 +190,57 @@ def _atmosoceaniceprod(self, component: str):
         fhout_ice_gfs = self._configs['base']['FHOUT_ICE_GFS']
         products_dict = {'atmos': {'config': 'atmos_products',
                                    'history_path_tmpl': 'COM_ATMOS_MASTER_TMPL',
-                                   'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr#'},
+                                   'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr3_last#'},
                          'ocean': {'config': 'oceanice_products',
                                    'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL',
-                                   'history_file_tmpl': f'{self.run}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr_next#.nc'},
+                                   'history_file_tmpl': f'{self.run}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr3_next#.nc'},
                          'ice': {'config': 'oceanice_products',
                                  'history_path_tmpl': 'COM_ICE_HISTORY_TMPL',
-                                 'history_file_tmpl': f'{self.run}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr#.nc'}}
+                                 'history_file_tmpl': f'{self.run}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr3_last#.nc'}}
 
         component_dict = products_dict[component]
         config = component_dict['config']
         history_path_tmpl = component_dict['history_path_tmpl']
         history_file_tmpl = component_dict['history_file_tmpl']
+        max_tasks = self._configs[config]['MAX_TASKS']
 
         resources = self.get_resource(config)
 
+        fhrs = self._get_forecast_hours('gefs', self._configs[config], component)
+
+        # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs
+        is_replay = self._configs[config]['REPLAY_ICS']
+        if is_replay and component in ['atmos'] and 0 in fhrs:
+            fhrs.remove(0)
+
+        # ocean/ice components do not have fhr 0 as they are averaged output
+        if component in ['ocean', 'ice'] and 0 in fhrs:
+            fhrs.remove(0)
+
+        fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks)
+
+        # Adjust walltime based on the largest group
+        largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')])
+        resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group)
+
         history_path = self._template_to_rocoto_cycstring(self._base[history_path_tmpl], {'MEMDIR': 'mem#member#'})
 
         deps = []
         data = f'{history_path}/{history_file_tmpl}'
         dep_dict = {'type': 'data', 'data': data, 'age': 120}
         deps.append(rocoto.add_dependency(dep_dict))
-        dep_dict = {'type': 'metatask', 'name': 'gefs_fcst_mem#member#'}
+        dep_dict = {'type': 'task', 'name': 'gefs_fcst_mem#member#_#seg_dep#'}
         deps.append(rocoto.add_dependency(dep_dict))
         dependencies = rocoto.create_dependency(dep=deps, dep_condition='or')
 
         postenvars = self.envars.copy()
         postenvar_dict = {'ENSMEM': '#member#',
                           'MEMDIR': 'mem#member#',
-                          'FHR3': '#fhr#',
+                          'FHR_LIST': '#fhr_list#',
                           'COMPONENT': component}
         for key, value in postenvar_dict.items():
             postenvars.append(rocoto.create_envar(name=key, value=str(value)))
 
-        task_name = f'gefs_{component}_prod_mem#member#_f#fhr#'
+        task_name = f'gefs_{component}_prod_mem#member#_#fhr_label#'
         task_dict = {'task_name': task_name,
                      'resources': resources,
                      'dependency': dependencies,
@@ -233,22 +251,6 @@ def _atmosoceaniceprod(self, component: str):
                      'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
                      'maxtries': '&MAXTRIES;'}
 
-        fhrs = self._get_forecast_hours('gefs', self._configs[config], component)
-
-        # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs
-        is_replay = self._configs[config]['REPLAY_ICS']
-        if is_replay and component in ['atmos'] and 0 in fhrs:
-            fhrs.remove(0)
-
-        # ocean/ice components do not have fhr 0 as they are averaged output
-        if component in ['ocean', 'ice'] and 0 in fhrs:
-            fhrs.remove(0)
-
-        fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}
-        if component in ['ocean']:
-            fhrs_next = fhrs[1:] + [fhrs[-1] + (fhrs[-1] - fhrs[-2])]
-            fhr_var_dict['fhr_next'] = ' '.join([f"{fhr:03d}" for fhr in fhrs_next])
-
         fhr_metatask_dict = {'task_name': f'gefs_{component}_prod_#member#',
                              'task_dict': task_dict,
                              'var_dict': fhr_var_dict}
@@ -268,18 +270,32 @@ def atmos_ensstat(self):
 
         deps = []
         for member in range(0, self.nmem + 1):
-            task = f'gefs_atmos_prod_mem{member:03d}_f#fhr#'
+            task = f'gefs_atmos_prod_mem{member:03d}_#fhr_label#'
             dep_dict = {'type': 'task', 'name': task}
             deps.append(rocoto.add_dependency(dep_dict))
         dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
 
+        fhrs = self._get_forecast_hours('gefs', self._configs['atmos_ensstat'])
+
+        # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs
+        is_replay = self._configs['atmos_ensstat']['REPLAY_ICS']
+        if is_replay and 0 in fhrs:
+            fhrs.remove(0)
+
+        max_tasks = self._configs['atmos_ensstat']['MAX_TASKS']
+        fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks)
+
+        # Adjust walltime based on the largest group
+        largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')])
+        resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group)
+
         postenvars = self.envars.copy()
-        postenvar_dict = {'FHR3': '#fhr#'}
+        postenvar_dict = {'FHR_LIST': '#fhr_list#'}
         for key, value in postenvar_dict.items():
             postenvars.append(rocoto.create_envar(name=key, value=str(value)))
 
-        task_name = f'gefs_atmos_ensstat_f#fhr#'
+        task_name = f'gefs_atmos_ensstat_#fhr_label#'
         task_dict = {'task_name': task_name,
                      'resources': resources,
                      'dependency': dependencies,
@@ -290,15 +306,6 @@ def atmos_ensstat(self):
                      'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
                      'maxtries': '&MAXTRIES;'}
 
-        fhrs = self._get_forecast_hours('gefs', self._configs['atmos_ensstat'])
-
-        # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs
-        is_replay = self._configs['atmos_ensstat']['REPLAY_ICS']
-        if is_replay and 0 in fhrs:
-            fhrs.remove(0)
-
-        fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}
-
         fhr_metatask_dict = {'task_name': f'gefs_atmos_ensstat',
                              'task_dict': task_dict,
                              'var_dict': fhr_var_dict}
@@ -308,22 +315,43 @@ def atmos_ensstat(self):
         return task
 
     def wavepostsbs(self):
+
+        wave_grid = self._configs['base']['waveGRD']
+        history_path = self._template_to_rocoto_cycstring(self._base['COM_WAVE_HISTORY_TMPL'], {'MEMDIR': 'mem#member#'})
+        history_file = f'/{self.run}wave.out_grd.{wave_grid}.@Y@m@d.@H@M@S'
+
         deps = []
-        dep_dict = {'type': 'metatask', 'name': f'gefs_fcst_mem#member#'}
+        dep_dict = {'type': 'data', 'data': [history_path, history_file], 'offset': [None, '#fhr3_next#:00:00']}
         deps.append(rocoto.add_dependency(dep_dict))
-        dependencies = rocoto.create_dependency(dep=deps)
+        dep_dict = {'type': 'task', 'name': f'{self.run}_fcst_mem#member#_#seg_dep#'}
+        deps.append(rocoto.add_dependency(dep_dict))
+        dependencies = rocoto.create_dependency(dep=deps, dep_condition='or')
+
+        fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave')
+
+        # When using replay, output does not start until hour 3
+        is_replay = self._configs['wavepostsbs']['REPLAY_ICS']
+        if is_replay:
+            fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]]
+
+        max_tasks = self._configs['wavepostsbs']['MAX_TASKS']
+        fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks)
 
         wave_post_envars = self.envars.copy()
         postenvar_dict = {'ENSMEM': '#member#',
                           'MEMDIR': 'mem#member#',
-                          'FHR3': '#fhr#',
+                          'FHR_LIST': '#fhr_list#',
                           }
         for key, value in postenvar_dict.items():
             wave_post_envars.append(rocoto.create_envar(name=key, value=str(value)))
 
         resources = self.get_resource('wavepostsbs')
 
-        task_name = f'gefs_wave_post_grid_mem#member#_f#fhr#'
+        # Adjust walltime based on the largest group
+        largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')])
+        resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group)
+
+        task_name = f'gefs_wave_post_grid_mem#member#_#fhr_label#'
         task_dict = {'task_name': task_name,
                      'resources': resources,
                      'dependency': dependencies,
@@ -335,13 +363,6 @@ def wavepostsbs(self):
                      'maxtries': '&MAXTRIES;'
                      }
 
-        fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave')
-        is_replay = self._configs['wavepostsbs']['REPLAY_ICS']
-        if is_replay:
-            fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]]
-
-        fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}
-
         fhr_metatask_dict = {'task_name': f'gefs_wave_post_grid_#member#',
                              'task_dict': task_dict,
                              'var_dict': fhr_var_dict}
diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py
index 6cbf6bdb1f..d709393f95 100644
--- a/workflow/rocoto/gfs_tasks.py
+++ b/workflow/rocoto/gfs_tasks.py
@@ -1029,7 +1029,7 @@ def atmanlupp(self):
 
     def atmanlprod(self):
         postenvars = self.envars.copy()
-        postenvar_dict = {'FHR3': '-001'}
+        postenvar_dict = {'FHR_LIST': '-1'}
         for key, value in postenvar_dict.items():
             postenvars.append(rocoto.create_envar(name=key, value=str(value)))
 
@@ -1126,21 +1126,36 @@ def _atmosoceaniceprod(self, component: str):
 
         products_dict = {'atmos': {'config': 'atmos_products',
                                    'history_path_tmpl': 'COM_ATMOS_MASTER_TMPL',
-                                   'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr#'},
+                                   'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr3_last#'},
                          'ocean': {'config': 'oceanice_products',
                                    'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL',
-                                   'history_file_tmpl': f'{self.run}.ocean.t@Hz.6hr_avg.f#fhr_next#.nc'},
+                                   'history_file_tmpl': f'{self.run}.ocean.t@Hz.6hr_avg.f#fhr3_next#.nc'},
                          'ice': {'config': 'oceanice_products',
                                  'history_path_tmpl': 'COM_ICE_HISTORY_TMPL',
-                                 'history_file_tmpl': f'{self.run}.ice.t@Hz.6hr_avg.f#fhr#.nc'}}
+                                 'history_file_tmpl': f'{self.run}.ice.t@Hz.6hr_avg.f#fhr3_last#.nc'}}
 
         component_dict = products_dict[component]
         config = component_dict['config']
         history_path_tmpl = component_dict['history_path_tmpl']
         history_file_tmpl = component_dict['history_file_tmpl']
+        max_tasks = self._configs[config]['MAX_TASKS']
+        resources = self.get_resource(component_dict['config'])
+
+        fhrs = self._get_forecast_hours(self.run, self._configs[config], component)
+
+        # ocean/ice components do not have fhr 0 as they are averaged output
+        if component in ['ocean', 'ice'] and 0 in fhrs:
+            fhrs.remove(0)
+
+        fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks)
+
+        # Adjust walltime based on the largest group
+        largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')])
+        resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group)
 
         postenvars = self.envars.copy()
-        postenvar_dict = {'FHR3': '#fhr#', 'COMPONENT': component}
+        postenvar_dict = {'FHR_LIST': '#fhr_list#', 'COMPONENT': component}
         for key, value in postenvar_dict.items():
             postenvars.append(rocoto.create_envar(name=key, value=str(value)))
 
@@ -1154,9 +1169,8 @@ def _atmosoceaniceprod(self, component: str):
         dependencies = rocoto.create_dependency(dep=deps, dep_condition='or')
 
         cycledef = 'gdas_half,gdas' if self.run in ['gdas'] else self.run
-        resources = self.get_resource(component_dict['config'])
 
-        task_name = f'{self.run}_{component}_prod_f#fhr#'
+        task_name = f'{self.run}_{component}_prod_#fhr_label#'
         task_dict = {'task_name': task_name,
                      'resources': resources,
                      'dependency': dependencies,
@@ -1168,17 +1182,6 @@ def _atmosoceaniceprod(self, component: str):
                      'maxtries': '&MAXTRIES;'
                      }
 
-        fhrs = self._get_forecast_hours(self.run, self._configs[config], component)
-
-        # ocean/ice components do not have fhr 0 as they are averaged output
-        if component in ['ocean', 'ice'] and 0 in fhrs:
-            fhrs.remove(0)
-
-        fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}
-
-        if component in ['ocean']:
-            fhrs_next = fhrs[1:] + [fhrs[-1] + (fhrs[-1] - fhrs[-2])]
-            fhr_var_dict['fhr_next'] = ' '.join([f"{fhr:03d}" for fhr in fhrs_next])
-
         metatask_dict = {'task_name': f'{self.run}_{component}_prod',
                          'task_dict': task_dict,
                          'var_dict': fhr_var_dict}
@@ -1189,18 +1192,32 @@ def _atmosoceaniceprod(self, component: str):
 
     def wavepostsbs(self):
+        wave_grid = self._configs['base']['waveGRD']
+        history_path = self._template_to_rocoto_cycstring(self._base['COM_WAVE_HISTORY_TMPL'])
+        history_file = f'/{self.run}wave.out_grd.{wave_grid}.@Y@m@d.@H@M@S'
+
         deps = []
-        dep_dict = {'type': 'metatask', 'name': f'{self.run}_fcst'}
+        dep_dict = {'type': 'data', 'data': [history_path, history_file], 'offset': [None, '#fhr3_next#:00:00']}
         deps.append(rocoto.add_dependency(dep_dict))
-        dependencies = rocoto.create_dependency(dep=deps)
+        dep_dict = {'type': 'task', 'name': f'{self.run}_fcst_#seg_dep#'}
+        deps.append(rocoto.add_dependency(dep_dict))
+        dependencies = rocoto.create_dependency(dep=deps, dep_condition='or')
+
+        fhrs = self._get_forecast_hours('gfs', self._configs['wavepostsbs'], 'wave')
+        max_tasks = self._configs['wavepostsbs']['MAX_TASKS']
+        fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks)
 
         wave_post_envars = self.envars.copy()
-        postenvar_dict = {'FHR3': '#fhr#'}
+        postenvar_dict = {'FHR_LIST': '#fhr_list#'}
         for key, value in postenvar_dict.items():
             wave_post_envars.append(rocoto.create_envar(name=key, value=str(value)))
 
         resources = self.get_resource('wavepostsbs')
 
-        task_name = f'{self.run}_wavepostsbs_f#fhr#'
+        # Adjust walltime based on the largest group
+        largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')])
+        resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group)
+
+        task_name = f'{self.run}_wavepostsbs_#fhr_label#'
         task_dict = {'task_name': task_name,
                      'resources': resources,
                      'dependency': dependencies,
@@ -1212,12 +1229,9 @@ def wavepostsbs(self):
                      'maxtries': '&MAXTRIES;'
                      }
 
-        fhrs = self._get_forecast_hours('gfs', self._configs['wavepostsbs'], 'wave')
-
-        fhr_metatask_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}
-
         metatask_dict = {'task_name': f'{self.run}_wavepostsbs',
                          'task_dict': task_dict,
-                         'var_dict': fhr_metatask_dict}
+                         'var_dict': fhr_var_dict}
 
         task = rocoto.create_task(metatask_dict)
 
@@ -1512,17 +1526,26 @@ def awips_20km_1p0deg(self):
 
     def gempak(self):
         deps = []
-        dep_dict = {'type': 'task', 'name': f'{self.run}_atmos_prod_f#fhr#'}
+        dep_dict = {'type': 'task', 'name': f'{self.run}_atmos_prod_#fhr_label#'}
         deps.append(rocoto.add_dependency(dep_dict))
         dependencies = rocoto.create_dependency(dep=deps)
 
+        fhrs = self._get_forecast_hours(self.run, self._configs['gempak'])
+        max_tasks = self._configs['gempak']['MAX_TASKS']
+        fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks)
+
+        resources = self.get_resource('gempak')
+        # Adjust walltime based on the largest group
+        largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')])
+        resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group)
+
         gempak_vars = self.envars.copy()
-        gempak_dict = {'FHR3': '#fhr#'}
+        gempak_dict = {'FHR_LIST': '#fhr_list#'}
         for key, value in gempak_dict.items():
             gempak_vars.append(rocoto.create_envar(name=key, value=str(value)))
 
-        resources = self.get_resource('gempak')
-        task_name = f'{self.run}_gempak_f#fhr#'
+        task_name = f'{self.run}_gempak_#fhr_label#'
         task_dict = {'task_name': task_name,
                      'resources': resources,
                      'dependency': dependencies,
@@ -1534,9 +1557,6 @@ def gempak(self):
                      'maxtries': '&MAXTRIES;'
                      }
 
-        fhrs = self._get_forecast_hours(self.run, self._configs['gempak'])
-        fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}
-
         fhr_metatask_dict = {'task_name': f'{self.run}_gempak',
                              'task_dict': task_dict,
                              'var_dict': fhr_var_dict}
diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py
index d9c769ffbe..3c215414b5 100644
--- a/workflow/rocoto/tasks.py
+++ b/workflow/rocoto/tasks.py
@@ -3,8 +3,9 @@
 import numpy as np
 from applications.applications import AppConfig
 import rocoto.rocoto as rocoto
-from wxflow import Template, TemplateConstants, to_timedelta
-from typing import List
+from wxflow import Template, TemplateConstants, to_timedelta, timedelta_to_HMS
+from typing import List, Union
+from bisect import bisect_right
 
 __all__ = ['Tasks']
 
@@ -176,6 +177,127 @@ def _get_forecast_hours(run, config, component='atmos') -> List[str]:
 
         return fhrs
 
+    @staticmethod
+    def get_job_groups(fhrs: List[int], ngroups: int, breakpoints: List[int] = None) -> List[dict]:
+        '''
+        Split forecast hours into a number of groups, obeying a list of pre-set breakpoints.
+
+        Takes a list of forecast hours and splits it into a number of groups while obeying
+        a list of pre-set breakpoints and recording which segment each belongs to.
+
+        Parameters
+        ----------
+        fhrs: List[int]
+            List of forecast hours to break into groups
+        ngroups: int
+            Number of groups to split the forecast hours into
+        breakpoints: List[int]
+            List of preset forecast hour break points to use (default: [])
+
+        Returns
+        -------
+        List[dict]: List of dicts, where each dict contains two keys:
+            'fhrs': the forecast hours for that group
+            'seg': the forecast segment (from the original breakpoint list)
+                   the group belongs to
+        '''
+        if breakpoints is None:
+            breakpoints = []
+
+        num_segs = len(breakpoints) + 1
+        if num_segs > ngroups:
+            raise ValueError(f"Number of segments ({num_segs}) is greater than the number of groups ({ngroups})")
+
+        if ngroups > len(fhrs):
+            ngroups = len(fhrs)
+
+        # First, split at segment boundaries
+        fhrs_segs = [grp.tolist() for grp in np.array_split(fhrs, [bisect_right(fhrs, bpnt) for bpnt in breakpoints if bpnt < max(fhrs)])]
+        seg_lens = [len(seg) for seg in fhrs_segs]
+
+        # Initialize each segment to be split into one job group
+        ngroups_segs = [1 for _ in range(0, len(fhrs_segs))]
+
+        # For remaining job groups, iteratively assign to the segment with the most
+        # hours per group
+        for _ in range(0, ngroups - len(fhrs_segs)):
+            current_lens = [size / weight for size, weight in zip(seg_lens, ngroups_segs)]
+            index_max = max(range(len(current_lens)), key=current_lens.__getitem__)
+            ngroups_segs[index_max] += 1
+
+        # Now that we know how many groups each forecast segment should be split into,
+        # split them and flatten to a single list.
+        groups = []
+        for seg_num, (fhrs_seg, ngroups_seg) in enumerate(zip(fhrs_segs, ngroups_segs)):
+            for grp in np.array_split(fhrs_seg, ngroups_seg):
+                groups.append({'fhrs': grp.tolist(), 'seg': seg_num})
+
+        return groups
+
+    def get_grouped_fhr_dict(self, fhrs: List[int], ngroups: int) -> dict:
+        '''
+        Prepare a metatask dictionary for forecast hour groups.
+
+        Takes a list of forecast hours and splits it into a number of groups while not
+        crossing forecast segment boundaries. Then use that to prepare a dict with key
+        variable lists for use in a rocoto metatask.
+
+        Parameters
+        ----------
+        fhrs: List[int]
+            List of forecast hours to break into groups
+        ngroups: int
+            Number of groups to split the forecast hours into
+
+        Returns
+        -------
+        dict: Several variable lists for use in rocoto metatasks:
+            fhr_list: list of comma-separated lists of fhr groups
+            fhr_label: list of labels corresponding to the fhr range
+            fhr3_last: list of the last fhr in each group, formatted to three digits
+            fhr3_next: list of the fhr that would follow each group, formatted to
+                       three digits
+            seg_dep: list of segments each group belongs to
+        '''
+        fhr_breakpoints = self.options['fcst_segments'][1:-1]
+        group_dicts = Tasks.get_job_groups(fhrs=fhrs, ngroups=ngroups, breakpoints=fhr_breakpoints)
+
+        fhrs_group = [dct['fhrs'] for dct in group_dicts]
+        fhrs_first = [grp[0] for grp in fhrs_group]
+        fhrs_last = [grp[-1] for grp in fhrs_group]
+        fhrs_next = fhrs_first[1:] + [fhrs_last[-1] + (fhrs[-1] - fhrs[-2])]
+        grp_str = [f'f{grp[0]:03d}-f{grp[-1]:03d}' if len(grp) > 1 else f'f{grp[0]:03d}' for grp in fhrs_group]
+        seg_deps = [f'seg{dct["seg"]}' for dct in group_dicts]
+
+        fhr_var_dict = {'fhr_list': ' '.join([','.join(str(fhr) for fhr in grp) for grp in fhrs_group]),
+                        'fhr_label': ' '.join(grp_str),
+                        'seg_dep': ' '.join(seg_deps),
+                        'fhr3_last': ' '.join([f'{fhr:03d}' for fhr in fhrs_last]),
+                        'fhr3_next': ' '.join([f'{fhr:03d}' for fhr in fhrs_next])
+                        }
+
+        return fhr_var_dict
+
+    @staticmethod
+    def multiply_HMS(hms_timedelta: str, multiplier: Union[int, float]) -> str:
+        '''
+        Multiplies an HMS timedelta string
+
+        Parameters
+        ----------
+        hms_timedelta: str
+            String representing a time delta in HH:MM:SS format
+        multiplier: int | float
+            Value to multiply the time delta by
+
+        Returns
+        -------
+        str: String representing a time delta in HH:MM:SS format
+
+        '''
+        input_timedelta = to_timedelta(hms_timedelta)
+        output_timedelta = input_timedelta * multiplier
+        return timedelta_to_HMS(output_timedelta)
+
     def get_resource(self, task_name):
         """
         Given a task name (task_name) and its configuration (task_names),
diff --git a/workflow/tests/__init__.py b/workflow/tests/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/workflow/test_configuration.py b/workflow/tests/test_configuration.py
similarity index 100%
rename from workflow/test_configuration.py
rename to workflow/tests/test_configuration.py
diff --git a/workflow/test_hosts.py b/workflow/tests/test_hosts.py
similarity index 100%
rename from workflow/test_hosts.py
rename to workflow/tests/test_hosts.py
diff --git a/workflow/tests/test_tasks.py b/workflow/tests/test_tasks.py
new file mode 100644
index 0000000000..7e7e7eb5e7
--- /dev/null
+++ b/workflow/tests/test_tasks.py
@@ -0,0 +1,84 @@
+from rocoto.tasks import Tasks
+
+
+class TestTasks:
+
+    '''
+    Tasks class tests
+
+    Note: this is currently only testing a small fraction of the class.
+    '''
+
+    def test_job_groups(self):
+        test_array = list(range(0, 24))
+
+        # Test simple splitting with no breakpoints
+        test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0},
+                       {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0},
+                       {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 0},
+                       {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 0}]
+        assert Tasks.get_job_groups(fhrs=test_array, ngroups=4) == test_groups
+
+        # Test with a break point that aligns with normal split point
+        test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0},
+                       {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0},
+                       {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1},
+                       {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 1}]
+        assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11]) == test_groups
+
+        # Test with a break point not at a normal split point
+        test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0},
+                       {'fhrs': [8, 9, 10, 11, 12, 13, 14], 'seg': 0},
+                       {'fhrs': [15, 16, 17, 18, 19], 'seg': 1},
+                       {'fhrs': [20, 21, 22, 23], 'seg': 1}]
+        assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[14]) == test_groups
+
+        # Test highly skewed break point
+        test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0},
+                       {'fhrs': [8, 9, 10, 11, 12, 13, 14, 15], 'seg': 0},
+                       {'fhrs': [16, 17, 18, 19, 20, 21, 22], 'seg': 0},
+                       {'fhrs': [23], 'seg': 1}]
+        assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[22]) == test_groups
+
+        # Test with two break points that align
+        test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0},
+                       {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0},
+                       {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1},
+                       {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 2}]
+        assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11, 17]) == test_groups
+
+        # Test with two skewed break points
+        test_groups = [{'fhrs': [0, 1], 'seg': 0},
+                       {'fhrs': [2, 3, 4, 5, 6, 7], 'seg': 1},
+                       {'fhrs': [8, 9, 10, 11, 12], 'seg': 1},
+                       {'fhrs': [13, 14, 15, 16, 17], 'seg': 1},
+                       {'fhrs': [18, 19, 20, 21, 22], 'seg': 1},
+                       {'fhrs': [23], 'seg': 2}]
+        assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[1, 22]) == test_groups
+
+        # Test slightly irregular break points
+        test_groups = [{'fhrs': [0, 1, 2, 3], 'seg': 0},
+                       {'fhrs': [4, 5, 6], 'seg': 0},
+                       {'fhrs': [7, 8, 9, 10], 'seg': 1},
+                       {'fhrs': [11, 12, 13, 14], 'seg': 1},
+                       {'fhrs': [15, 16, 17, 18], 'seg': 1},
+                       {'fhrs': [19, 20, 21, 22, 23], 'seg': 2}]
+        assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[6, 18]) == test_groups
+
+        # Test more groups than fhrs available
+        test_array = list(range(0, 6))
+        test_groups = [{'fhrs': [0], 'seg': 0},
+                       {'fhrs': [1], 'seg': 0},
+                       {'fhrs': [2], 'seg': 0},
+                       {'fhrs': [3], 'seg': 0},
+                       {'fhrs': [4], 'seg': 0},
+                       {'fhrs': [5], 'seg': 0}]
+        assert Tasks.get_job_groups(fhrs=test_array, ngroups=15) == test_groups
+
+    def test_multiply_HMS(self):
+        assert Tasks.multiply_HMS('00:10:00', 2) == '00:20:00'
+        assert Tasks.multiply_HMS('00:30:00', 10) == '05:00:00'
+        assert Tasks.multiply_HMS('01:15:00', 4) == '05:00:00'
+        assert Tasks.multiply_HMS('00:05:00', 1.5) == '00:07:30'
+        assert Tasks.multiply_HMS('00:40:00', 2.5) == '01:40:00'
+        assert Tasks.multiply_HMS('00:10:00', 1) == '00:10:00'
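
Worked example of the grouping/walltime scheme described above. This is a
minimal standalone sketch, not part of the patch: the forecast-hour cadence
(hourly to f120, then 3-hourly to f384) is an assumed example configuration,
and the ceiling division stands in for the near-even split that
get_job_groups performs when there is a single forecast segment.

    # Sketch: how MAX_TASKS translates into group size and the walltime
    # requested per rocoto task, for one member with a single segment.
    from datetime import timedelta

    fhrs = list(range(0, 121)) + list(range(123, 385, 3))  # 209 hours (assumed cadence)
    max_tasks = 25                                         # groups per member
    largest_group = -(-len(fhrs) // max_tasks)             # ceiling division -> 9 hours

    per_fhr_walltime = timedelta(minutes=15)  # the 00:15:00 base set in config.resources
    print(per_fhr_walltime * largest_group)   # 2:15:00 requested per rocoto task

With segment breakpoints the same multiplication applies, but the largest
group is taken after splitting at the segment boundaries, so skewed
breakpoints can yield a larger multiplier than the even-split case.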