diff --git a/jobs/rocoto/atmos_ensstat.sh b/jobs/rocoto/atmos_ensstat.sh index 76ed7f0a72..617cbd77f8 100755 --- a/jobs/rocoto/atmos_ensstat.sh +++ b/jobs/rocoto/atmos_ensstat.sh @@ -13,13 +13,20 @@ status=$? if (( status != 0 )); then exit "${status}"; fi export job="atmos_ensstat" -export jobid="${job}.$$" -export FORECAST_HOUR=$(( 10#${FHR3} )) +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -############################################################### -# Execute the JJOB -############################################################### -"${HOMEgfs}/jobs/JGLOBAL_ATMOS_ENSSTAT" +export FORECAST_HOUR jobid +for FORECAST_HOUR in "${fhr_list[@]}"; do + fhr3=$(printf '%03d' "${FORECAST_HOUR}") + jobid="${job}_f${fhr3}.$$" + ############################################################### + # Execute the JJOB + ############################################################### + "${HOMEgfs}/jobs/JGLOBAL_ATMOS_ENSSTAT" + status=$? + [[ ${status} -ne 0 ]] && exit "${status}" +done -exit $? +exit 0 diff --git a/jobs/rocoto/atmos_products.sh b/jobs/rocoto/atmos_products.sh index f6adbcf861..947b06dfc2 100755 --- a/jobs/rocoto/atmos_products.sh +++ b/jobs/rocoto/atmos_products.sh @@ -13,15 +13,20 @@ status=$? 
if (( status != 0 )); then exit "${status}"; fi export job="atmos_products" -export jobid="${job}.$$" -# Negatation needs to be before the base -fhr3_base="10#${FHR3}" -export FORECAST_HOUR=$(( ${fhr3_base/10#-/-10#} )) +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -############################################################### -# Execute the JJOB -############################################################### -"${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS" +export FORECAST_HOUR jobid +for FORECAST_HOUR in "${fhr_list[@]}"; do + fhr3=$(printf '%03d' "${FORECAST_HOUR}") + jobid="${job}_f${fhr3}.$$" + ############################################################### + # Execute the JJOB + ############################################################### + "${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS" + status=$? + [[ ${status} -ne 0 ]] && exit "${status}" +done -exit $? +exit 0 diff --git a/jobs/rocoto/gempak.sh b/jobs/rocoto/gempak.sh index f5aea2379d..dc1d3f2621 100755 --- a/jobs/rocoto/gempak.sh +++ b/jobs/rocoto/gempak.sh @@ -6,11 +6,20 @@ status=$? if (( status != 0 )); then exit "${status}"; fi export job="gempak" -export jobid="${job}.$$" +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -# Execute the JJOB -"${HOMEgfs}/jobs/J${RUN^^}_ATMOS_GEMPAK" +export FHR3 jobid +for fhr in "${fhr_list[@]}"; do + FHR3=$(printf '%03d' "${fhr}") + jobid="${job}_f${FHR3}.$$" + ############################################################### + # Execute the JJOB + ############################################################### + "${HOMEgfs}/jobs/J${RUN^^}_ATMOS_GEMPAK" + status=$? + [[ ${status} -ne 0 ]] && exit "${status}" +done -status=$? -exit "${status}" +exit 0 diff --git a/jobs/rocoto/oceanice_products.sh b/jobs/rocoto/oceanice_products.sh index 2a3b617d05..c3e03cea1a 100755 --- a/jobs/rocoto/oceanice_products.sh +++ b/jobs/rocoto/oceanice_products.sh @@ -13,13 +13,20 @@ status=$? 
if (( status != 0 )); then exit "${status}"; fi export job="oceanice_products" -export jobid="${job}.$$" -export FORECAST_HOUR=$(( 10#${FHR3} )) +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -############################################################### -# Execute the JJOB -############################################################### -"${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS" +export FORECAST_HOUR jobid +for FORECAST_HOUR in "${fhr_list[@]}"; do + fhr3=$(printf '%03d' "${FORECAST_HOUR}") + jobid="${job}_${COMPONENT}_f${fhr3}.$$" + ############################################################### + # Execute the JJOB + ############################################################### + "${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS" + status=$? + [[ ${status} -ne 0 ]] && exit "${status}" +done -exit $? +exit 0 diff --git a/jobs/rocoto/wavepostsbs.sh b/jobs/rocoto/wavepostsbs.sh index f4789210d8..ff81c2a9d3 100755 --- a/jobs/rocoto/wavepostsbs.sh +++ b/jobs/rocoto/wavepostsbs.sh @@ -5,17 +5,24 @@ source "${HOMEgfs}/ush/preamble.sh" ############################################################### # Source FV3GFS workflow modules #. ${HOMEgfs}/ush/load_fv3gfs_modules.sh -. ${HOMEgfs}/ush/load_ufswm_modules.sh +source "${HOMEgfs}/ush/load_ufswm_modules.sh" status=$? -[[ ${status} -ne 0 ]] && exit ${status} +[[ ${status} -ne 0 ]] && exit "${status}" export job="wavepostsbs" -export jobid="${job}.$$" ############################################################### -# Execute the JJOB -${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS -status=$? -[[ ${status} -ne 0 ]] && exit ${status} +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" + +export FHR3 jobid +for FORECAST_HOUR in "${fhr_list[@]}"; do + FHR3=$(printf '%03d' "${FORECAST_HOUR}") + jobid="${job}_f${FHR3}.$$" + # Execute the JJOB + "${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS" + status=$? 
+ [[ ${status} -ne 0 ]] && exit "${status}" +done exit 0 diff --git a/parm/config/gefs/config.atmos_ensstat b/parm/config/gefs/config.atmos_ensstat index d371f75887..b542659523 100644 --- a/parm/config/gefs/config.atmos_ensstat +++ b/parm/config/gefs/config.atmos_ensstat @@ -5,6 +5,9 @@ echo "BEGIN: config.atmos_ensstat" +# Maximum number of rocoto tasks +export MAX_TASKS=25 + # Get task specific resources . "${EXPDIR}/config.resources" atmos_ensstat diff --git a/parm/config/gefs/config.atmos_products b/parm/config/gefs/config.atmos_products index e8aae324e1..d1f36a7bc9 100644 --- a/parm/config/gefs/config.atmos_products +++ b/parm/config/gefs/config.atmos_products @@ -8,8 +8,8 @@ echo "BEGIN: config.atmos_products" # Get task specific resources . "${EXPDIR}/config.resources" atmos_products -# No. of forecast hours to process in a single job -export NFHRS_PER_GROUP=3 +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 # Scripts used by this job export INTERP_ATMOS_MASTERSH="${USHgfs}/interp_atmos_master.sh" diff --git a/parm/config/gefs/config.oceanice_products b/parm/config/gefs/config.oceanice_products index 3b8b064947..6bb604d0ca 100644 --- a/parm/config/gefs/config.oceanice_products +++ b/parm/config/gefs/config.oceanice_products @@ -9,7 +9,7 @@ source "${EXPDIR}/config.resources" oceanice_products export OCEANICEPRODUCTS_CONFIG="${PARMgfs}/post/oceanice_products_gefs.yaml" -# No. 
of forecast hours to process in a single job -export NFHRS_PER_GROUP=3 +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 echo "END: config.oceanice_products" diff --git a/parm/config/gefs/config.resources b/parm/config/gefs/config.resources index 17033858c8..bb33f3eb02 100644 --- a/parm/config/gefs/config.resources +++ b/parm/config/gefs/config.resources @@ -234,6 +234,7 @@ case ${step} in ;; "atmos_products") + # Walltime is per forecast hour; will be multiplied by group size export walltime="00:15:00" export ntasks=24 export threads_per_task=1 @@ -242,7 +243,8 @@ case ${step} in ;; "atmos_ensstat") - export walltime="00:30:00" + # Walltime is per forecast hour; will be multiplied by group size + export walltime="00:15:00" export ntasks=6 export threads_per_task=1 export tasks_per_node="${ntasks}" @@ -250,6 +252,7 @@ case ${step} in ;; "oceanice_products") + # Walltime is per forecast hour; will be multiplied by group size export walltime="00:15:00" export ntasks=1 export tasks_per_node=1 @@ -258,7 +261,8 @@ case ${step} in ;; "wavepostsbs") - export walltime="03:00:00" + # Walltime is per forecast hour; will be multiplied by group size + export walltime="00:15:00" export ntasks=1 export threads_per_task=1 export tasks_per_node=$(( max_tasks_per_node / threads_per_task )) @@ -328,7 +332,7 @@ case ${step} in ;; "cleanup") - export walltime="00:15:00" + export walltime="00:30:00" export ntasks=1 export tasks_per_node=1 export threads_per_task=1 diff --git a/parm/config/gefs/config.wavepostsbs b/parm/config/gefs/config.wavepostsbs index 82cec321da..b43ea33d40 100644 --- a/parm/config/gefs/config.wavepostsbs +++ b/parm/config/gefs/config.wavepostsbs @@ -8,6 +8,9 @@ echo "BEGIN: config.wavepostsbs" # Get task specific resources source "${EXPDIR}/config.resources" wavepostsbs +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 + # Subgrid info for grib2 encoding export WAV_SUBGRBSRC="" export WAV_SUBGRB="" diff --git
a/parm/config/gfs/config.atmos_products b/parm/config/gfs/config.atmos_products index 451f5eff86..5b6e4067b5 100644 --- a/parm/config/gfs/config.atmos_products +++ b/parm/config/gfs/config.atmos_products @@ -8,8 +8,8 @@ echo "BEGIN: config.atmos_products" # Get task specific resources . "${EXPDIR}/config.resources" atmos_products -# No. of forecast hours to process in a single job -export NFHRS_PER_GROUP=3 +## Maximum number of rocoto tasks per member +export MAX_TASKS=25 # Scripts used by this job export INTERP_ATMOS_MASTERSH="${USHgfs}/interp_atmos_master.sh" diff --git a/parm/config/gfs/config.gempak b/parm/config/gfs/config.gempak index 791770ba4a..db5e85af3f 100644 --- a/parm/config/gfs/config.gempak +++ b/parm/config/gfs/config.gempak @@ -5,7 +5,10 @@ echo "BEGIN: config.gempak" +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 + # Get task specific resources -. $EXPDIR/config.resources gempak +source "${EXPDIR}/config.resources" gempak echo "END: config.gempak" diff --git a/parm/config/gfs/config.oceanice_products b/parm/config/gfs/config.oceanice_products index 9e5c5b1c68..a618cbe10c 100644 --- a/parm/config/gfs/config.oceanice_products +++ b/parm/config/gfs/config.oceanice_products @@ -7,6 +7,9 @@ echo "BEGIN: config.oceanice_products" # Get task specific resources source "${EXPDIR}/config.resources" oceanice_products +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 + export OCEANICEPRODUCTS_CONFIG="${PARMgfs}/post/oceanice_products.yaml" # No. 
of forecast hours to process in a single job diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources index 6a85a6de14..0eea92cbde 100644 --- a/parm/config/gfs/config.resources +++ b/parm/config/gfs/config.resources @@ -191,8 +191,9 @@ case ${step} in ;; "wavepostsbs") - walltime_gdas="00:20:00" - walltime_gfs="03:00:00" + # Walltime is per forecast hour; will be multiplied by group size + walltime_gdas="00:15:00" + walltime_gfs="00:15:00" ntasks=8 threads_per_task=1 tasks_per_node=$(( max_tasks_per_node / threads_per_task )) @@ -911,6 +912,7 @@ case ${step} in ;; "oceanice_products") + # Walltime is per forecast hour; will be multiplied by group size walltime="00:15:00" ntasks=1 tasks_per_node=1 @@ -949,6 +951,7 @@ case ${step} in ;; "atmos_products") + # Walltime is per forecast hour; will be multiplied by group size walltime="00:15:00" ntasks=24 threads_per_task=1 @@ -1278,6 +1281,7 @@ case ${step} in ;; "gempak") + # Walltime is per forecast hour; will be multiplied by group size walltime="00:30:00" ntasks_gdas=2 ntasks_gfs=28 diff --git a/parm/config/gfs/config.wavepostsbs b/parm/config/gfs/config.wavepostsbs index 82cec321da..b43ea33d40 100644 --- a/parm/config/gfs/config.wavepostsbs +++ b/parm/config/gfs/config.wavepostsbs @@ -8,6 +8,9 @@ echo "BEGIN: config.wavepostsbs" # Get task specific resources source "${EXPDIR}/config.resources" wavepostsbs +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 + # Subgrid info for grib2 encoding export WAV_SUBGRBSRC="" export WAV_SUBGRB="" diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index ca29bcdf1e..f89d3dbbb0 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -190,39 +190,57 @@ def _atmosoceaniceprod(self, component: str): fhout_ice_gfs = self._configs['base']['FHOUT_ICE_GFS'] products_dict = {'atmos': {'config': 'atmos_products', 'history_path_tmpl': 'COM_ATMOS_MASTER_TMPL', - 'history_file_tmpl':
f'{self.run}.t@Hz.master.grb2f#fhr#'}, + 'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr3_last#'}, 'ocean': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL', - 'history_file_tmpl': f'{self.run}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr_next#.nc'}, + 'history_file_tmpl': f'{self.run}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr3_next#.nc'}, 'ice': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_ICE_HISTORY_TMPL', - 'history_file_tmpl': f'{self.run}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr#.nc'}} + 'history_file_tmpl': f'{self.run}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr3_last#.nc'}} component_dict = products_dict[component] config = component_dict['config'] history_path_tmpl = component_dict['history_path_tmpl'] history_file_tmpl = component_dict['history_file_tmpl'] + max_tasks = self._configs[config]['MAX_TASKS'] resources = self.get_resource(config) + fhrs = self._get_forecast_hours('gefs', self._configs[config], component) + + # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs + is_replay = self._configs[config]['REPLAY_ICS'] + if is_replay and component in ['atmos'] and 0 in fhrs: + fhrs.remove(0) + + # ocean/ice components do not have fhr 0 as they are averaged output + if component in ['ocean', 'ice'] and 0 in fhrs: + fhrs.remove(0) + + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + history_path = self._template_to_rocoto_cycstring(self._base[history_path_tmpl], {'MEMDIR': 'mem#member#'}) deps = [] data = f'{history_path}/{history_file_tmpl}' dep_dict = {'type': 'data', 'data': data, 'age': 120} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'metatask', 'name': 'gefs_fcst_mem#member#'} + dep_dict = {'type': 'task', 'name': 
'gefs_fcst_mem#member#_#seg_dep#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') postenvars = self.envars.copy() postenvar_dict = {'ENSMEM': '#member#', 'MEMDIR': 'mem#member#', - 'FHR3': '#fhr#', + 'FHR_LIST': '#fhr_list#', 'COMPONENT': component} for key, value in postenvar_dict.items(): postenvars.append(rocoto.create_envar(name=key, value=str(value))) - task_name = f'gefs_{component}_prod_mem#member#_f#fhr#' + task_name = f'gefs_{component}_prod_mem#member#_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -233,22 +251,6 @@ def _atmosoceaniceprod(self, component: str): 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', 'maxtries': '&MAXTRIES;'} - fhrs = self._get_forecast_hours('gefs', self._configs[config], component) - - # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs - is_replay = self._configs[config]['REPLAY_ICS'] - if is_replay and component in ['atmos'] and 0 in fhrs: - fhrs.remove(0) - - # ocean/ice components do not have fhr 0 as they are averaged output - if component in ['ocean', 'ice'] and 0 in fhrs: - fhrs.remove(0) - - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - if component in ['ocean']: - fhrs_next = fhrs[1:] + [fhrs[-1] + (fhrs[-1] - fhrs[-2])] - fhr_var_dict['fhr_next'] = ' '.join([f"{fhr:03d}" for fhr in fhrs_next]) - fhr_metatask_dict = {'task_name': f'gefs_{component}_prod_#member#', 'task_dict': task_dict, 'var_dict': fhr_var_dict} @@ -268,18 +270,32 @@ def atmos_ensstat(self): deps = [] for member in range(0, self.nmem + 1): - task = f'gefs_atmos_prod_mem{member:03d}_f#fhr#' + task = f'gefs_atmos_prod_mem{member:03d}_#fhr_label#' dep_dict = {'type': 'task', 'name': task} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + fhrs = self._get_forecast_hours('gefs', 
self._configs['atmos_ensstat']) + + # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs + is_replay = self._configs['atmos_ensstat']['REPLAY_ICS'] + if is_replay and 0 in fhrs: + fhrs.remove(0) + + max_tasks = self._configs['atmos_ensstat']['MAX_TASKS'] + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + postenvars = self.envars.copy() - postenvar_dict = {'FHR3': '#fhr#'} + postenvar_dict = {'FHR_LIST': '#fhr_list#'} for key, value in postenvar_dict.items(): postenvars.append(rocoto.create_envar(name=key, value=str(value))) - task_name = f'gefs_atmos_ensstat_f#fhr#' + task_name = f'gefs_atmos_ensstat_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -290,15 +306,6 @@ def atmos_ensstat(self): 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', 'maxtries': '&MAXTRIES;'} - fhrs = self._get_forecast_hours('gefs', self._configs['atmos_ensstat']) - - # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs - is_replay = self._configs['atmos_ensstat']['REPLAY_ICS'] - if is_replay and 0 in fhrs: - fhrs.remove(0) - - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - fhr_metatask_dict = {'task_name': f'gefs_atmos_ensstat', 'task_dict': task_dict, 'var_dict': fhr_var_dict} @@ -308,22 +315,43 @@ def atmos_ensstat(self): return task def wavepostsbs(self): + + wave_grid = self._configs['base']['waveGRD'] + history_path = self._template_to_rocoto_cycstring(self._base['COM_WAVE_HISTORY_TMPL'], {'MEMDIR': 'mem#member#'}) + history_file = f'/{self.run}wave.out_grd.{wave_grid}.@Y@m@d.@H@M@S' + deps = [] - dep_dict = {'type': 'metatask', 'name': f'gefs_fcst_mem#member#'} + dep_dict = {'type': 'data', 'data': 
[history_path, history_file], 'offset': [None, '#fhr3_next#:00:00']} deps.append(rocoto.add_dependency(dep_dict)) - dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.run}_fcst_mem#member#_#seg_dep#'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') + + fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave') + + # When using replay, output does not start until hour 3 + is_replay = self._configs['wavepostsbs']['REPLAY_ICS'] + if is_replay: + fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]] + + max_tasks = self._configs['wavepostsbs']['MAX_TASKS'] + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) wave_post_envars = self.envars.copy() postenvar_dict = {'ENSMEM': '#member#', 'MEMDIR': 'mem#member#', - 'FHR3': '#fhr#', + 'FHR_LIST': '#fhr_list#', } for key, value in postenvar_dict.items(): wave_post_envars.append(rocoto.create_envar(name=key, value=str(value))) resources = self.get_resource('wavepostsbs') - task_name = f'gefs_wave_post_grid_mem#member#_f#fhr#' + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + + task_name = f'gefs_wave_post_grid_mem#member#_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -335,13 +363,6 @@ def wavepostsbs(self): 'maxtries': '&MAXTRIES;' } - fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave') - is_replay = self._configs['wavepostsbs']['REPLAY_ICS'] - if is_replay: - fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]] - - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - fhr_metatask_dict = {'task_name': f'gefs_wave_post_grid_#member#', 'task_dict': task_dict, 'var_dict': fhr_var_dict} diff --git 
a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index 6cbf6bdb1f..d709393f95 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -1029,7 +1029,7 @@ def atmanlupp(self): def atmanlprod(self): postenvars = self.envars.copy() - postenvar_dict = {'FHR3': '-001'} + postenvar_dict = {'FHR_LIST': '-1'} for key, value in postenvar_dict.items(): postenvars.append(rocoto.create_envar(name=key, value=str(value))) @@ -1126,21 +1126,36 @@ def _atmosoceaniceprod(self, component: str): products_dict = {'atmos': {'config': 'atmos_products', 'history_path_tmpl': 'COM_ATMOS_MASTER_TMPL', - 'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr#'}, + 'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr3_last#'}, 'ocean': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL', - 'history_file_tmpl': f'{self.run}.ocean.t@Hz.6hr_avg.f#fhr_next#.nc'}, + 'history_file_tmpl': f'{self.run}.ocean.t@Hz.6hr_avg.f#fhr3_next#.nc'}, 'ice': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_ICE_HISTORY_TMPL', - 'history_file_tmpl': f'{self.run}.ice.t@Hz.6hr_avg.f#fhr#.nc'}} + 'history_file_tmpl': f'{self.run}.ice.t@Hz.6hr_avg.f#fhr3_last#.nc'}} component_dict = products_dict[component] config = component_dict['config'] history_path_tmpl = component_dict['history_path_tmpl'] history_file_tmpl = component_dict['history_file_tmpl'] + max_tasks = self._configs[config]['MAX_TASKS'] + resources = self.get_resource(component_dict['config']) + + fhrs = self._get_forecast_hours(self.run, self._configs[config], component) + + # ocean/ice components do not have fhr 0 as they are averaged output + if component in ['ocean', 'ice'] and 0 in fhrs: + fhrs.remove(0) + + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = 
Tasks.multiply_HMS(resources['walltime'], largest_group) + postenvars = self.envars.copy() - postenvar_dict = {'FHR3': '#fhr#', 'COMPONENT': component} + postenvar_dict = {'FHR_LIST': '#fhr_list#', 'COMPONENT': component} for key, value in postenvar_dict.items(): postenvars.append(rocoto.create_envar(name=key, value=str(value))) @@ -1154,9 +1169,8 @@ def _atmosoceaniceprod(self, component: str): dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') cycledef = 'gdas_half,gdas' if self.run in ['gdas'] else self.run - resources = self.get_resource(component_dict['config']) - task_name = f'{self.run}_{component}_prod_f#fhr#' + task_name = f'{self.run}_{component}_prod_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -1168,17 +1182,6 @@ def _atmosoceaniceprod(self, component: str): 'maxtries': '&MAXTRIES;' } - fhrs = self._get_forecast_hours(self.run, self._configs[config], component) - - # ocean/ice components do not have fhr 0 as they are averaged output - if component in ['ocean', 'ice'] and 0 in fhrs: - fhrs.remove(0) - - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - - if component in ['ocean']: - fhrs_next = fhrs[1:] + [fhrs[-1] + (fhrs[-1] - fhrs[-2])] - fhr_var_dict['fhr_next'] = ' '.join([f"{fhr:03d}" for fhr in fhrs_next]) metatask_dict = {'task_name': f'{self.run}_{component}_prod', 'task_dict': task_dict, 'var_dict': fhr_var_dict} @@ -1189,18 +1192,32 @@ def _atmosoceaniceprod(self, component: str): def wavepostsbs(self): + wave_grid = self._configs['base']['waveGRD'] + history_path = self._template_to_rocoto_cycstring(self._base['COM_WAVE_HISTORY_TMPL']) + history_file = f'/{self.run}wave.out_grd.{wave_grid}.@Y@m@d.@H@M@S' + deps = [] - dep_dict = {'type': 'metatask', 'name': f'{self.run}_fcst'} + dep_dict = {'type': 'data', 'data': [history_path, history_file], 'offset': [None, '#fhr3_next#:00:00']} deps.append(rocoto.add_dependency(dep_dict)) - dependencies = 
rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.run}_fcst_#seg_dep#'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') + + fhrs = self._get_forecast_hours('gfs', self._configs['wavepostsbs'], 'wave') + max_tasks = self._configs['wavepostsbs']['MAX_TASKS'] + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) wave_post_envars = self.envars.copy() - postenvar_dict = {'FHR3': '#fhr#'} + postenvar_dict = {'FHR_LIST': '#fhr_list#'} for key, value in postenvar_dict.items(): wave_post_envars.append(rocoto.create_envar(name=key, value=str(value))) resources = self.get_resource('wavepostsbs') - task_name = f'{self.run}_wavepostsbs_f#fhr#' + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + + task_name = f'{self.run}_wavepostsbs_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -1212,12 +1229,9 @@ def wavepostsbs(self): 'maxtries': '&MAXTRIES;' } - fhrs = self._get_forecast_hours('gfs', self._configs['wavepostsbs'], 'wave') - - fhr_metatask_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} metatask_dict = {'task_name': f'{self.run}_wavepostsbs', 'task_dict': task_dict, - 'var_dict': fhr_metatask_dict} + 'var_dict': fhr_var_dict} task = rocoto.create_task(metatask_dict) @@ -1512,17 +1526,26 @@ def awips_20km_1p0deg(self): def gempak(self): deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run}_atmos_prod_f#fhr#'} + dep_dict = {'type': 'task', 'name': f'{self.run}_atmos_prod_#fhr_label#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) + fhrs = self._get_forecast_hours(self.run, self._configs['gempak']) + max_tasks = self._configs['gempak']['MAX_TASKS'] + fhr_var_dict = 
self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + + resources = self.get_resource('gempak') + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + gempak_vars = self.envars.copy() - gempak_dict = {'FHR3': '#fhr#'} + gempak_dict = {'FHR_LIST': '#fhr_list#'} for key, value in gempak_dict.items(): gempak_vars.append(rocoto.create_envar(name=key, value=str(value))) resources = self.get_resource('gempak') - task_name = f'{self.run}_gempak_f#fhr#' + task_name = f'{self.run}_gempak_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -1534,9 +1557,6 @@ def gempak(self): 'maxtries': '&MAXTRIES;' } - fhrs = self._get_forecast_hours(self.run, self._configs['gempak']) - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - fhr_metatask_dict = {'task_name': f'{self.run}_gempak', 'task_dict': task_dict, 'var_dict': fhr_var_dict} diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index d9c769ffbe..3c215414b5 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -3,8 +3,9 @@ import numpy as np from applications.applications import AppConfig import rocoto.rocoto as rocoto -from wxflow import Template, TemplateConstants, to_timedelta -from typing import List +from wxflow import Template, TemplateConstants, to_timedelta, timedelta_to_HMS +from typing import List, Union +from bisect import bisect_right __all__ = ['Tasks'] @@ -176,6 +177,127 @@ def _get_forecast_hours(run, config, component='atmos') -> List[str]: return fhrs + @staticmethod + def get_job_groups(fhrs: List[int], ngroups: int, breakpoints: List[int] = None) -> List[dict]: + ''' + Split forecast hours into a number of groups, obeying a list of pre-set breakpoints. 
+ + Takes a list of forecast hours and splits it into a number of groups while obeying + a list of pre-set breakpoints and recording which segment each belongs to. + + Parameters + ---------- + fhrs: List[int] + List of forecast hours to break into groups + ngroups: int + Number of groups to split the forecast hours into + breakpoints: List[int] + List of preset forecast hour break points to use (default: []) + + Returns + ------- + List[dict]: List of dicts, where each dict contains two keys: + 'fhrs': the forecast hours for that group + 'seg': the forecast segment (from the original breakpoint list) + the group belong to + ''' + if breakpoints is None: + breakpoints = [] + + num_segs = len(breakpoints) + 1 + if num_segs > ngroups: + raise ValueError(f"Number of segments ({num_segs}) is greater than the number of groups ({ngroups}") + + if ngroups > len(fhrs): + ngroups = len(fhrs) + + # First, split at segment boundaries + fhrs_segs = [grp.tolist() for grp in np.array_split(fhrs, [bisect_right(fhrs, bpnt) for bpnt in breakpoints if bpnt < max(fhrs)])] + seg_lens = [len(seg) for seg in fhrs_segs] + + # Initialize each segment to be split into one job group + ngroups_segs = [1 for _ in range(0, len(fhrs_segs))] + + # For remaining job groups, iteratively assign to the segment with the most + # hours per group + for _ in range(0, ngroups - len(fhrs_segs)): + current_lens = [size / weight for size, weight in zip(seg_lens, ngroups_segs)] + index_max = max(range(len(current_lens)), key=current_lens.__getitem__) + ngroups_segs[index_max] += 1 + + # Now that we know how many groups each forecast segment should be split into, + # Split them and flatten to a single list. 
+ groups = [] + for seg_num, (fhrs_seg, ngroups_seg) in enumerate(zip(fhrs_segs, ngroups_segs)): + [groups.append({'fhrs': grp.tolist(), 'seg': seg_num}) for grp in np.array_split(fhrs_seg, ngroups_seg)] + + return groups + + def get_grouped_fhr_dict(self, fhrs: List[int], ngroups: int) -> dict: + ''' + Prepare a metatask dictionary for forecast hour groups. + + Takes a list of forecast hours and splits it into a number of groups while not + crossing forecast segment boundaries. Then use that to prepare a dict with key + variable lists for use in a rocoto metatask. + + Parameters + ---------- + fhrs: List[int] + List of forecast hours to break into groups + ngroups: int + Number of groups to split the forecast hours into + + Returns + ------- + dict: Several variable lists for use in rocoto metatasks: + fhr_list: list of comma-separated lists of fhr groups + fhr_label: list of labels corresponding to the fhr range + fhr3_last: list of the last fhr in each group, formatted to three digits + fhr3_next: list of the fhr that would follow each group, formatted to + three digits + seg_dep: list of segments each group belongs to + ''' + fhr_breakpoints = self.options['fcst_segments'][1:-1] + group_dicts = Tasks.get_job_groups(fhrs=fhrs, ngroups=ngroups, breakpoints=fhr_breakpoints) + + fhrs_group = [dct['fhrs'] for dct in group_dicts] + fhrs_first = [grp[0] for grp in fhrs_group] + fhrs_last = [grp[-1] for grp in fhrs_group] + fhrs_next = fhrs_first[1:] + [fhrs_last[-1] + (fhrs[-1] - fhrs[-2])] + grp_str = [f'f{grp[0]:03d}-f{grp[-1]:03d}' if len(grp) > 1 else f'f{grp[0]:03d}' for grp in fhrs_group] + seg_deps = [f'seg{dct["seg"]}' for dct in group_dicts] + + fhr_var_dict = {'fhr_list': ' '.join(([','.join(str(fhr) for fhr in grp) for grp in fhrs_group])), + 'fhr_label': ' '.join(grp_str), + 'seg_dep': ' '.join(seg_deps), + 'fhr3_last': ' '.join([f'{fhr:03d}' for fhr in fhrs_last]), + 'fhr3_next': ' '.join([f'{fhr:03d}' for fhr in fhrs_next]) + } + + return fhr_var_dict + 
+ @staticmethod + def multiply_HMS(hms_timedelta: str, multiplier: Union[int, float]) -> str: + ''' + Multiplies an HMS timedelta string + + Parameters + ---------- + hms_timedelta: str + String representing a time delta in HH:MM:SS format + multiplier: int | float + Value to multiply the time delta by + + Returns + ------- + str: String representing a time delta in HH:MM:SS format + + ''' + input_timedelta = to_timedelta(hms_timedelta) + output_timedelta = input_timedelta * multiplier + return timedelta_to_HMS(output_timedelta) + def get_resource(self, task_name): """ Given a task name (task_name) and its configuration (task_names), diff --git a/workflow/tests/__init__.py b/workflow/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/workflow/test_configuration.py b/workflow/tests/test_configuration.py similarity index 100% rename from workflow/test_configuration.py rename to workflow/tests/test_configuration.py diff --git a/workflow/test_hosts.py b/workflow/tests/test_hosts.py similarity index 100% rename from workflow/test_hosts.py rename to workflow/tests/test_hosts.py diff --git a/workflow/tests/test_tasks.py b/workflow/tests/test_tasks.py new file mode 100644 index 0000000000..7e7e7eb5e7 --- /dev/null +++ b/workflow/tests/test_tasks.py @@ -0,0 +1,84 @@ +from rocoto.tasks import Tasks + + +class TestTasks: + + ''' + Tasks class tests + + Note: this is currently only testing a small fraction of the class. 
+    '''
+
+    def test_job_groups(self):
+        '''
+        Compare Tasks.get_job_groups against hand-computed groupings for a
+        range of breakpoint layouts.
+        '''
+
+        def grp(first, last, seg):
+            # Expected group holding the inclusive hour range first..last
+            return {'fhrs': list(range(first, last + 1)), 'seg': seg}
+
+        hours_24 = list(range(0, 24))
+        hours_6 = list(range(0, 6))
+
+        # Each case pairs the keyword arguments with the expected groups
+        cases = [
+            # Simple splitting with no breakpoints
+            ({'fhrs': hours_24, 'ngroups': 4},
+             [grp(0, 5, 0), grp(6, 11, 0), grp(12, 17, 0), grp(18, 23, 0)]),
+            # A break point that aligns with a normal split point
+            ({'fhrs': hours_24, 'ngroups': 4, 'breakpoints': [11]},
+             [grp(0, 5, 0), grp(6, 11, 0), grp(12, 17, 1), grp(18, 23, 1)]),
+            # A break point not at a normal split point
+            ({'fhrs': hours_24, 'ngroups': 4, 'breakpoints': [14]},
+             [grp(0, 7, 0), grp(8, 14, 0), grp(15, 19, 1), grp(20, 23, 1)]),
+            # Highly skewed break point
+            ({'fhrs': hours_24, 'ngroups': 4, 'breakpoints': [22]},
+             [grp(0, 7, 0), grp(8, 15, 0), grp(16, 22, 0), grp(23, 23, 1)]),
+            # Two break points that align
+            ({'fhrs': hours_24, 'ngroups': 4, 'breakpoints': [11, 17]},
+             [grp(0, 5, 0), grp(6, 11, 0), grp(12, 17, 1), grp(18, 23, 2)]),
+            # Two skewed break points
+            ({'fhrs': hours_24, 'ngroups': 6, 'breakpoints': [1, 22]},
+             [grp(0, 1, 0), grp(2, 7, 1), grp(8, 12, 1), grp(13, 17, 1), grp(18, 22, 1), grp(23, 23, 2)]),
+            # Slightly irregular break points
+            ({'fhrs': hours_24, 'ngroups': 6, 'breakpoints': [6, 18]},
+             [grp(0, 3, 0), grp(4, 6, 0), grp(7, 10, 1), grp(11, 14, 1), grp(15, 18, 1), grp(19, 23, 2)]),
+            # More groups than fhrs available
+            ({'fhrs': hours_6, 'ngroups': 15},
+             [grp(hour, hour, 0) for hour in hours_6]),
+        ]
+
+        for kwargs, expected_groups in cases:
+            assert Tasks.get_job_groups(**kwargs) == expected_groups
+
+    def test_multiply_HMS(self):
+        '''
+        Check Tasks.multiply_HMS with integer and fractional multipliers.
+        '''
+        # (input HMS string, multiplier, expected HMS string)
+        cases = [
+            ('00:10:00', 2, '00:20:00'),
+            ('00:30:00', 10, '05:00:00'),
+            ('01:15:00', 4, '05:00:00'),
+            ('00:05:00', 1.5, '00:07:30'),
+            ('00:40:00', 2.5, '01:40:00'),
+            ('00:10:00', 1, '00:10:00'),
+        ]
+
+        for hms_timedelta, multiplier, expected_hms in cases:
+            assert Tasks.multiply_HMS(hms_timedelta, multiplier) == expected_hms