diff --git a/parm/evs_config/mesoscale/config.evs.prod.stats.mesoscale.atmos.grid2obs.nam b/parm/evs_config/mesoscale/config.evs.prod.stats.mesoscale.atmos.grid2obs.nam index 585c1ea32..fafe82fc9 100644 --- a/parm/evs_config/mesoscale/config.evs.prod.stats.mesoscale.atmos.grid2obs.nam +++ b/parm/evs_config/mesoscale/config.evs.prod.stats.mesoscale.atmos.grid2obs.nam @@ -15,6 +15,10 @@ echo "BEGIN: $(basename ${BASH_SOURCE[0]})" export MET_PLUS_OUT=${DATA}/${VERIF_CASE}/METplus_output export MET_CONFIG_OVERRIDES="" +# Restart Settings +export RESTART_DIR="${COMOUTsmall}/restart/c${vhr}" +export COMPLETED_JOBS_FILE="completed_jobs.txt" + # Time Settings export FHR_START=0 export FHR_INCR=6 diff --git a/parm/evs_config/mesoscale/config.evs.prod.stats.mesoscale.atmos.grid2obs.rap b/parm/evs_config/mesoscale/config.evs.prod.stats.mesoscale.atmos.grid2obs.rap index 70cdfadf0..3947d8dc6 100644 --- a/parm/evs_config/mesoscale/config.evs.prod.stats.mesoscale.atmos.grid2obs.rap +++ b/parm/evs_config/mesoscale/config.evs.prod.stats.mesoscale.atmos.grid2obs.rap @@ -15,6 +15,10 @@ echo "BEGIN: $(basename ${BASH_SOURCE[0]})" export MET_PLUS_OUT=${DATA}/${VERIF_CASE}/METplus_output export MET_CONFIG_OVERRIDES="" +# Restart Settings +export RESTART_DIR="${COMOUTsmall}/restart/c${vhr}" +export COMPLETED_JOBS_FILE="completed_jobs.txt" + # Time Settings export FHR_START=0 export FHR_INCR=1 diff --git a/scripts/stats/mesoscale/exevs_mesoscale_nam_grid2obs_stats.sh b/scripts/stats/mesoscale/exevs_mesoscale_nam_grid2obs_stats.sh index a968bea86..38c11bd7e 100755 --- a/scripts/stats/mesoscale/exevs_mesoscale_nam_grid2obs_stats.sh +++ b/scripts/stats/mesoscale/exevs_mesoscale_nam_grid2obs_stats.sh @@ -42,6 +42,7 @@ echo "*****************************" # Reformat MET Data export job_type="reformat" export njob=1 + export run_restart=true for NEST in $NEST_LIST; do export NEST=$NEST for VERIF_TYPE in $VERIF_TYPES; do @@ -55,8 +56,12 @@ echo "*****************************" # Check for 
restart files reformat echo " Check for restart files reformat begin" if [ $evs_run_mode = production ]; then - ${USHevs}/mesoscale/mesoscale_stats_g2o_production_restart.sh - export err=$?; err_chk + # Check For Restart Files + if [ "$run_restart" = true ]; then + python ${USHevs}/mesoscale/mesoscale_production_restart.py + export err=$?; err_chk + export run_restart=false + fi fi echo " Check for restart files reformat done" diff --git a/scripts/stats/mesoscale/exevs_mesoscale_rap_grid2obs_stats.sh b/scripts/stats/mesoscale/exevs_mesoscale_rap_grid2obs_stats.sh index 535bb7253..61deeae2c 100755 --- a/scripts/stats/mesoscale/exevs_mesoscale_rap_grid2obs_stats.sh +++ b/scripts/stats/mesoscale/exevs_mesoscale_rap_grid2obs_stats.sh @@ -43,6 +43,7 @@ echo "*****************************" # Reformat MET Data export job_type="reformat" export njob=1 + export run_restart=true for NEST in $NEST_LIST; do export NEST=$NEST for VERIF_TYPE in $VERIF_TYPES; do @@ -58,8 +59,12 @@ echo "*****************************" # Check for restart files reformat echo " Check for restart files reformat begin" if [ $evs_run_mode = production ]; then - ${USHevs}/mesoscale/mesoscale_stats_g2o_production_restart.sh - export err=$?; err_chk + # Check For Restart Files + if [ "$run_restart" = true ]; then + python ${USHevs}/mesoscale/mesoscale_production_restart.py + export err=$?; err_chk + export run_restart=false + fi fi echo " Check for restart files reformat done" diff --git a/ush/mesoscale/mesoscale_check_settings.py b/ush/mesoscale/mesoscale_check_settings.py index 11e3c14c4..80473f4d8 100644 --- a/ush/mesoscale/mesoscale_check_settings.py +++ b/ush/mesoscale/mesoscale_check_settings.py @@ -58,7 +58,7 @@ 'MET_PLUS_CONF','MET_PLUS_OUT', 'LOG_MET_OUTPUT_TO_METPLUS','NEST','TEMP_DIR','GRID_DIR','URL_HEAD', ] -evs_mesoscale_settings_dict['RUN_GRID2OBS_STATS'] = ['bufr_ROOT'] +evs_mesoscale_settings_dict['RUN_GRID2OBS_STATS'] = ['RESTART_DIR', 'bufr_ROOT'] 
evs_mesoscale_settings_dict['RUN_GRID2OBS_PLOTS'] = [ 'MET_VERSION','IMG_HEADER','PRUNE_DIR','SAVE_DIR','LOG_TEMPLATE', 'LOG_LEVEL','STAT_OUTPUT_BASE_DIR','STAT_OUTPUT_BASE_TEMPLATE', diff --git a/ush/mesoscale/mesoscale_create_output_dirs.py b/ush/mesoscale/mesoscale_create_output_dirs.py index 393470bfe..2e72bf7d5 100644 --- a/ush/mesoscale/mesoscale_create_output_dirs.py +++ b/ush/mesoscale/mesoscale_create_output_dirs.py @@ -85,6 +85,7 @@ COMOUTplots = os.environ['COMOUTplots'] if STEP == 'stats': job_type = os.environ['job_type'] + RESTART_DIR = os.environ['RESTART_DIR'] if STEP == 'plots': RESTART_DIR = os.environ['RESTART_DIR'] @@ -230,23 +231,39 @@ working_output_base_dir = os.path.join( DATA, VERIF_CASE, 'METplus_output', VERIF_TYPE ) + COMOUT_restart_base_dir = os.path.join( + RESTART_DIR, 'METplus_output', VERIF_TYPE + ) if job_type == 'generate': working_output_base_dir = os.path.join( DATA, VERIF_CASE, 'METplus_output', VERIF_TYPE ) + COMOUT_restart_base_dir = os.path.join( + RESTART_DIR, 'METplus_output', VERIF_TYPE + ) if job_type == 'gather': working_output_base_dir = os.path.join( DATA, VERIF_CASE, 'METplus_output', 'gather_small' ) + COMOUT_restart_base_dir = os.path.join( + RESTART_DIR, 'METplus_output', 'gather_small' + ) if job_type == 'gather2': working_output_base_dir = os.path.join( DATA, VERIF_CASE, 'METplus_output' ) + COMOUT_restart_base_dir = os.path.join( + RESTART_DIR, 'METplus_output' + ) if job_type == 'gather3': working_output_base_dir = os.path.join( DATA, VERIF_CASE, 'METplus_output' ) + COMOUT_restart_base_dir = os.path.join( + RESTART_DIR, 'METplus_output' + ) working_dir_list.append(working_output_base_dir) + COMOUT_dir_list.append(COMOUT_restart_base_dir) if job_type == 'reformat': working_dir_list.append(os.path.join( working_output_base_dir, NEST, 'pb2nc', 'confs' @@ -257,6 +274,9 @@ working_dir_list.append(os.path.join( working_output_base_dir, NEST, 'pb2nc', 'tmp' )) + COMOUT_dir_list.append(os.path.join( + 
COMOUT_restart_base_dir, NEST, 'pb2nc' + )) if NEST in ['spc_otlk', 'firewx']: working_dir_list.append(os.path.join( working_output_base_dir, 'genvxmask', 'confs' @@ -271,6 +291,10 @@ working_output_base_dir, 'genvxmask', NEST+'.'+vdate_dt.strftime('%Y%m%d') )) + COMOUT_dir_list.append(os.path.join( + COMOUT_restart_base_dir, 'genvxmask', + NEST+'.'+vdate_dt.strftime('%Y%m%d') + )) if job_type == 'generate': working_dir_list.append(os.path.join( working_output_base_dir, 'regrid_data_plane', 'confs' @@ -285,6 +309,10 @@ working_output_base_dir, 'regrid_data_plane', MODELNAME+'.'+vdate_dt.strftime('%Y%m%d') )) + COMOUT_dir_list.append(os.path.join( + COMOUT_restart_base_dir, 'regrid_data_plane', + MODELNAME+'.'+vdate_dt.strftime('%Y%m%d') + )) working_dir_list.append(os.path.join( working_output_base_dir, 'point_stat', 'confs' )) @@ -298,6 +326,10 @@ working_output_base_dir, 'point_stat', MODELNAME+'.'+vdate_dt.strftime('%Y%m%d') )) + COMOUT_dir_list.append(os.path.join( + COMOUT_restart_base_dir, 'point_stat', + MODELNAME+'.'+vdate_dt.strftime('%Y%m%d') + )) if job_type in ['gather', 'gather2', 'gather3']: working_dir_list.append(os.path.join( working_output_base_dir, 'stat_analysis', 'confs' @@ -312,6 +344,10 @@ working_output_base_dir, 'stat_analysis', MODELNAME+'.'+vdate_dt.strftime('%Y%m%d') )) + COMOUT_dir_list.append(os.path.join( + COMOUT_restart_base_dir, 'stat_analysis', + MODELNAME+'.'+vdate_dt.strftime('%Y%m%d') + )) COMOUT_dir_list.append(os.path.join( COMOUTsmall, 'gather_small' diff --git a/ush/mesoscale/mesoscale_production_restart.py b/ush/mesoscale/mesoscale_production_restart.py index b19466cb3..3ed7a90da 100644 --- a/ush/mesoscale/mesoscale_production_restart.py +++ b/ush/mesoscale/mesoscale_production_restart.py @@ -1,9 +1,12 @@ #!/usr/bin/env python3 -''' -Name: mesoscale_production_restart.py -Contact(s): Marcel Caron -Abstract: -''' +# ============================================================================= +# +# NAME: 
mesoscale_production_restart.py +# CONTRIBUTOR(S): Marcel Caron, marcel.caron@noaa.gov, NOAA/NWS/NCEP/EMC-VPPPGB +# PURPOSE: Check the appropriate restart directory for restart files and copy +# the available files to the working directory +# +# ============================================================================= import os import glob @@ -20,20 +23,33 @@ NET = os.environ['NET'] RUN = os.environ['RUN'] COMPONENT = os.environ['COMPONENT'] -VERIF_CASE = os.environ['VERIF_CASE'] STEP = os.environ['STEP'] +VERIF_CASE = os.environ['VERIF_CASE'] # Copy files for restart -if STEP == 'plots': +if STEP == 'stats': + VERIF_CASE = os.environ['VERIF_CASE'] + RESTART_DIR = os.environ['RESTART_DIR'] + working_dir = os.path.join(DATA, VERIF_CASE) + completed_jobs_file = os.path.join(RESTART_DIR, 'completed_jobs.txt') + if os.path.exists(RESTART_DIR): + if (os.path.exists(completed_jobs_file) + and os.stat(completed_jobs_file).st_size != 0): + print(f"Copying restart directory {RESTART_DIR} " + +f"into working directory {working_dir}") + cutil.run_shell_command( + ['cp', '-rpv', RESTART_DIR, working_dir] + ) +elif STEP == 'plots': COMOUTplots = os.environ['COMOUTplots'] RESTART_DIR = os.environ['RESTART_DIR'] - COMPLETED_JOBS_FILE = os.environ['COMPLETED_JOBS_FILE'] - working_dir = os.path.join(DATA, VERIF_CASE, 'out') - completed_jobs_file = os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE) + SAVE_DIR = os.environ['SAVE_DIR'] + completed_jobs_file = os.path.join(RESTART_DIR, f'completed_jobs.txt') if os.path.exists(completed_jobs_file): if os.stat(completed_jobs_file).st_size != 0: cutil.run_shell_command( - ['cp', '-rpv', os.path.join(RESTART_DIR,'*'), working_dir] + ['cp', '-rpv', os.path.join(RESTART_DIR,'*'), SAVE_DIR] ) + print("END: "+os.path.basename(__file__)) diff --git a/ush/mesoscale/mesoscale_stats_grid2obs_create_job_script.py b/ush/mesoscale/mesoscale_stats_grid2obs_create_job_script.py index 0c0147fcd..c136505e0 100644 --- 
a/ush/mesoscale/mesoscale_stats_grid2obs_create_job_script.py +++ b/ush/mesoscale/mesoscale_stats_grid2obs_create_job_script.py @@ -35,6 +35,7 @@ MET_PATH = os.environ['MET_PATH'] MET_CONFIG = os.environ['MET_CONFIG'] DATA = os.environ['DATA'] +RESTART_DIR = os.environ['RESTART_DIR'] VDATE = os.environ['VDATE'] MET_PLUS_CONF = os.environ['MET_PLUS_CONF'] @@ -43,6 +44,8 @@ machine_conf = os.path.join( os.environ['PARMevs'], 'metplus_config', 'machine.conf' ) +COMPLETED_JOBS_FILE = os.environ['COMPLETED_JOBS_FILE'] + if job_type == 'reformat': VERIF_TYPE = os.environ['VERIF_TYPE'] NEST = os.environ['NEST'] @@ -96,7 +99,7 @@ VERIF_TYPE = os.environ['VERIF_TYPE'] njob = os.environ['njob'] MET_PLUS_OUT = os.path.join( - os.environ['MET_PLUS_OUT'], 'workdirs', job_type, f'job{njob}' + os.environ['MET_PLUS_OUT'], 'workdirs', job_type, f'job{njob}' ) elif job_type in ['gather2','gather3']: VERIF_TYPE = os.environ['VERIF_TYPE'] @@ -366,62 +369,443 @@ pass elif STEP == 'stats': if job_type == 'reformat': + if f'{job_type}_job{njob}' in cutil.get_completed_jobs(os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE)): + job_cmd_list_iterative.append( + f'#jobs were restarted, and the following has already run successfully' + ) + job_cmd_list_iterative.append( + f'#{metplus_launcher} -c {machine_conf} ' + + f'-c {MET_PLUS_CONF}/' + + f'PB2NC_obs{VERIF_TYPE.upper()}.conf' + ) + job_cmd_list_iterative.append( + f'#python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'vx_mask=\\\"${NEST}\\\", ' + + 'met_tool=\\\"pb2nc\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'vhour=\\\"${VHOUR}\\\"' + + ')\"' + ) + else: job_cmd_list_iterative.append( f'{metplus_launcher} -c {machine_conf} ' + f'-c {MET_PLUS_CONF}/' + f'PB2NC_obs{VERIF_TYPE.upper()}.conf' ) + job_cmd_list_iterative.append( + f'python -c ' + + 
'\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'vx_mask=\\\"${NEST}\\\", ' + + 'met_tool=\\\"pb2nc\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'vhour=\\\"${VHOUR}\\\"' + + ')\"' + ) + job_cmd_list_iterative.append( + "python -c " + + f"'import mesoscale_util; mesoscale_util.mark_job_completed(" + + f"\"{os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE)}\", " + + f"\"job{njob}\", job_type=\"{job_type}\")'" + ) if job_type == 'generate': if FCST_VAR2_NAME: + if f'{job_type}_job{njob}' in cutil.get_completed_jobs(os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE)): + job_cmd_list_iterative.append( + f'#jobs were restarted, and the following has already run successfully' + ) + job_cmd_list_iterative.append( + f'#{metplus_launcher} -c {machine_conf} ' + + f'-c {MET_PLUS_CONF}/' + + f'PointStat_fcst{COMPONENT.upper()}_' + + f'obs{VERIF_TYPE.upper()}_{str(NEST).upper()}_VAR2.conf' + ) + job_cmd_list_iterative.append( + f'#python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'vx_mask=\\\"${NEST}\\\", ' + + 'met_tool=\\\"point_stat\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'vhour=\\\"${VHOUR}\\\", ' + + 'fhr_start=\\\"${FHR_START}\\\", ' + + 'fhr_end=\\\"${FHR_END}\\\", ' + + 'fhr_incr=\\\"${FHR_INCR}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + 'var_name=\\\"${VAR_NAME}\\\"' + + ')\"' + ) + else: job_cmd_list_iterative.append( f'{metplus_launcher} -c {machine_conf} ' + f'-c {MET_PLUS_CONF}/' + f'PointStat_fcst{COMPONENT.upper()}_obs{VERIF_TYPE.upper()}_VAR2.conf' ) + job_cmd_list_iterative.append( + f'python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", 
\\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'vx_mask=\\\"${NEST}\\\", ' + + 'met_tool=\\\"point_stat\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'vhour=\\\"${VHOUR}\\\", ' + + 'fhr_start=\\\"${FHR_START}\\\", ' + + 'fhr_end=\\\"${FHR_END}\\\", ' + + 'fhr_incr=\\\"${FHR_INCR}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + 'var_name=\\\"${VAR_NAME}\\\"' + + ')\"' + ) + job_cmd_list_iterative.append( + "python -c " + + f"'import mesoscale_util; mesoscale_util.mark_job_completed(" + + f"\"{os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE)}\", " + + f"\"job{njob}\", job_type=\"{job_type}\")'" + ) else: if NEST == 'conusp': if VAR_NAME == 'PTYPE': + if f'{job_type}_job{njob}' in cutil.get_completed_jobs(os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE)): job_cmd_list_iterative.append( - f'{metplus_launcher} -c {machine_conf} ' + f'#jobs were restarted, and the following has already run successfully' + ) + job_cmd_list_iterative.append( + f'#{metplus_launcher} -c {machine_conf} ' + f'-c {MET_PLUS_CONF}/' + f'RegridDataPlane_fcst{COMPONENT.upper()}_PTYPE.conf' ) job_cmd_list_iterative.append( - f'python ' + f'#python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'vx_mask=\\\"${NEST}\\\", ' + + 'met_tool=\\\"regrid_data_plane\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'vhour=\\\"${VHOUR}\\\", ' + + 'fhr_start=\\\"${FHR_START}\\\", ' + + 'fhr_end=\\\"${FHR_END}\\\", ' + + 'fhr_incr=\\\"${FHR_INCR}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + f'njob=\\\"{njob}\\\"' + + ')\"' + ) + job_cmd_list_iterative.append( + f'#python ' + f'{USHevs}/{COMPONENT}/' + f'{COMPONENT}_{STEP}_{VERIF_CASE}_create_merged_ptype.py' ) - job_cmd_list_iterative.append( - f'{metplus_launcher} -c {machine_conf} ' + f'#python -c ' 
+ + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'vx_mask=\\\"${NEST}\\\", ' + + 'met_tool=\\\"merged_ptype\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'vhour=\\\"${VHOUR}\\\", ' + + 'fhr_start=\\\"${FHR_START}\\\", ' + + 'fhr_end=\\\"${FHR_END}\\\", ' + + 'fhr_incr=\\\"${FHR_INCR}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + f'njob=\\\"{njob}\\\"' + + ')\"' + ) + job_cmd_list_iterative.append( + f'#{metplus_launcher} -c {machine_conf} ' + f'-c {MET_PLUS_CONF}/' + f'PointStat_fcst{COMPONENT.upper()}_' + f'obs{VERIF_TYPE.upper()}_{VAR_NAME}.conf' - ) + ) + job_cmd_list_iterative.append( + f'#python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'vx_mask=\\\"${NEST}\\\", ' + + 'met_tool=\\\"point_stat\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'vhour=\\\"${VHOUR}\\\", ' + + 'fhr_start=\\\"${FHR_START}\\\", ' + + 'fhr_end=\\\"${FHR_END}\\\", ' + + 'fhr_incr=\\\"${FHR_INCR}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + 'var_name=\\\"${VAR_NAME}\\\"' + + ')\"' + ) + else: + job_cmd_list_iterative.append( + f'{metplus_launcher} -c {machine_conf} ' + + f'-c {MET_PLUS_CONF}/' + + f'RegridDataPlane_fcst{COMPONENT.upper()}_PTYPE.conf' + ) + job_cmd_list_iterative.append( + f'python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'vx_mask=\\\"${NEST}\\\", ' + + 'met_tool=\\\"regrid_data_plane\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'vhour=\\\"${VHOUR}\\\", ' + + 'fhr_start=\\\"${FHR_START}\\\", ' + + 
'fhr_end=\\\"${FHR_END}\\\", ' + + 'fhr_incr=\\\"${FHR_INCR}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + f'njob=\\\"{njob}\\\"' + + ')\"' + ) + job_cmd_list_iterative.append( + f'python ' + + f'{USHevs}/{COMPONENT}/' + + f'{COMPONENT}_{STEP}_{VERIF_CASE}_create_merged_ptype.py' + ) + job_cmd_list_iterative.append( + f'python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'vx_mask=\\\"${NEST}\\\", ' + + 'met_tool=\\\"merged_ptype\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'vhour=\\\"${VHOUR}\\\", ' + + 'fhr_start=\\\"${FHR_START}\\\", ' + + 'fhr_end=\\\"${FHR_END}\\\", ' + + 'fhr_incr=\\\"${FHR_INCR}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + f'njob=\\\"{njob}\\\"' + + ')\"' + ) + job_cmd_list_iterative.append( + f'{metplus_launcher} -c {machine_conf} ' + + f'-c {MET_PLUS_CONF}/' + + f'PointStat_fcst{COMPONENT.upper()}_' + + f'obs{VERIF_TYPE.upper()}_{VAR_NAME}.conf' + ) + job_cmd_list_iterative.append( + f'python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'vx_mask=\\\"${NEST}\\\", ' + + 'met_tool=\\\"point_stat\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'vhour=\\\"${VHOUR}\\\", ' + + 'fhr_start=\\\"${FHR_START}\\\", ' + + 'fhr_end=\\\"${FHR_END}\\\", ' + + 'fhr_incr=\\\"${FHR_INCR}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + 'var_name=\\\"${VAR_NAME}\\\"' + + ')\"' + ) + job_cmd_list_iterative.append( + "python -c " + + f"'import mesoscale_util; mesoscale_util.mark_job_completed(" + + f"\"{os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE)}\", " + + f"\"job{njob}\", job_type=\"{job_type}\")'" + ) + else: pstat_file_exist = cutil.check_pstat_files(job_env_vars_dict) if pstat_file_exist: 
print(f"skip this run, pstat already exist") else: + if f'{job_type}_job{njob}' in cutil.get_completed_jobs(os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE)): + job_cmd_list_iterative.append( + f'#jobs were restarted, and the following has already run successfully' + ) job_cmd_list_iterative.append( - f'{metplus_launcher} -c {machine_conf} ' + f'#{metplus_launcher} -c {machine_conf} ' + f'-c {MET_PLUS_CONF}/' + f'PointStat_fcst{COMPONENT.upper()}_obs{VERIF_TYPE.upper()}.conf' ) + job_cmd_list_iterative.append( + f'#python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'vx_mask=\\\"${NEST}\\\", ' + + 'met_tool=\\\"point_stat\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'vhour=\\\"${VHOUR}\\\", ' + + 'fhr_start=\\\"${FHR_START}\\\", ' + + 'fhr_end=\\\"${FHR_END}\\\", ' + + 'fhr_incr=\\\"${FHR_INCR}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + 'var_name=\\\"${VAR_NAME}\\\"' + + ')\"' + ) + else: + job_cmd_list_iterative.append( + f'{metplus_launcher} -c {machine_conf} ' + + f'-c {MET_PLUS_CONF}/' + + f'PointStat_fcst{COMPONENT.upper()}_' + + f'obs{VERIF_TYPE.upper()}.conf' + ) + job_cmd_list_iterative.append( + f'python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'vx_mask=\\\"${NEST}\\\", ' + + 'met_tool=\\\"point_stat\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'vhour=\\\"${VHOUR}\\\", ' + + 'fhr_start=\\\"${FHR_START}\\\", ' + + 'fhr_end=\\\"${FHR_END}\\\", ' + + 'fhr_incr=\\\"${FHR_INCR}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + 'var_name=\\\"${VAR_NAME}\\\"' + + ')\"' + ) + job_cmd_list_iterative.append( + "python -c " + + f"'import mesoscale_util; mesoscale_util.mark_job_completed(" + + 
f"\"{os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE)}\", " + + f"\"job{njob}\", job_type=\"{job_type}\")'" + ) elif job_type == 'gather': + if f'{job_type}_job{njob}' in cutil.get_completed_jobs(os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE)): + job_cmd_list.append( + f'#jobs were restarted, and the following has already run successfully' + ) + job_cmd_list.append( + f'#{metplus_launcher} -c {machine_conf} ' + + f'-c {MET_PLUS_CONF}/' + + f'StatAnalysis_fcst{COMPONENT.upper()}_obs{VERIF_TYPE.upper()}' + + f'_GatherByDay.conf' + ) + job_cmd_list.append( + f'#python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'met_tool=\\\"stat_analysis\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'net=\\\"${NET}\\\", ' + + 'step=\\\"${STEP}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + 'run=\\\"${RUN}\\\", ' + + f'job_type=\\\"{job_type}\\\"' + + ')\"' + ) + else: job_cmd_list.append( f'{metplus_launcher} -c {machine_conf} ' + f'-c {MET_PLUS_CONF}/' + f'StatAnalysis_fcst{COMPONENT.upper()}_obs{VERIF_TYPE.upper()}' + f'_GatherByDay.conf' ) + job_cmd_list.append( + f'python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'verif_type=\\\"${VERIF_TYPE}\\\", ' + + 'met_tool=\\\"stat_analysis\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'net=\\\"${NET}\\\", ' + + 'step=\\\"${STEP}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + 'run=\\\"${RUN}\\\", ' + + f'job_type=\\\"{job_type}\\\"' + + ')\"' + ) + job_cmd_list.append( + "python -c " + + f"'import mesoscale_util; mesoscale_util.mark_job_completed(" + + f"\"{os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE)}\", " + + f"\"job{njob}\", job_type=\"{job_type}\")'" + ) elif job_type == 'gather2': + if 
f'{job_type}_job{njob}' in cutil.get_completed_jobs(os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE)): + job_cmd_list.append( + f'#jobs were restarted, and the following has already run successfully' + ) + job_cmd_list.append( + f'#{metplus_launcher} -c {machine_conf} ' + + f'-c {MET_PLUS_CONF}/' + + f'StatAnalysis_fcst{COMPONENT.upper()}' + + f'_GatherByCycle.conf' + ) + job_cmd_list.append( + f'#python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'met_tool=\\\"stat_analysis\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'net=\\\"${NET}\\\", ' + + 'step=\\\"${STEP}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + 'run=\\\"${RUN}\\\", ' + + 'vhr=\\\"${vhr}\\\", ' + + f'job_type=\\\"{job_type}\\\"' + + ')\"' + ) + else: job_cmd_list.append( f'{metplus_launcher} -c {machine_conf} ' + f'-c {MET_PLUS_CONF}/' + f'StatAnalysis_fcst{COMPONENT.upper()}' + f'_GatherByCycle.conf' ) + job_cmd_list.append( + f'python -c ' + + '\"import mesoscale_util as cutil; cutil.copy_data_to_restart(' + + '\\\"${DATA}\\\", \\\"${RESTART_DIR}\\\", ' + + f'njob=\\\"{njob}\\\", ' + + 'verif_case=\\\"${VERIF_CASE}\\\", ' + + 'met_tool=\\\"stat_analysis\\\", ' + + 'vdate=\\\"${VDATE}\\\", ' + + 'net=\\\"${NET}\\\", ' + + 'step=\\\"${STEP}\\\", ' + + 'model=\\\"${MODELNAME}\\\", ' + + 'run=\\\"${RUN}\\\", ' + + 'vhr=\\\"${vhr}\\\", ' + + f'job_type=\\\"{job_type}\\\"' + + ')\"' + ) + job_cmd_list.append( + "python -c " + + f"'import mesoscale_util; mesoscale_util.mark_job_completed(" + + f"\"{os.path.join(RESTART_DIR, COMPLETED_JOBS_FILE)}\", " + + f"\"job{njob}\", job_type=\"{job_type}\")'" + ) elif job_type == 'gather3': job_cmd_list.append( f'{metplus_launcher} -c {machine_conf} ' diff --git a/ush/mesoscale/mesoscale_util.py b/ush/mesoscale/mesoscale_util.py index 570eee8e4..a3a306df4 100644 --- a/ush/mesoscale/mesoscale_util.py +++ 
b/ush/mesoscale/mesoscale_util.py @@ -372,6 +372,360 @@ def format_filler(unfilled_file_format, valid_time_dt, init_time_dt, filled_file_format_chunk) return filled_file_format +def get_completed_jobs(completed_jobs_file): + completed_jobs = set() + if os.path.exists(completed_jobs_file): + with open(completed_jobs_file, 'r') as f: + completed_jobs = set(f.read().splitlines()) + return completed_jobs + +def mark_job_completed(completed_jobs_file, job_name, job_type=""): + with open(completed_jobs_file, 'a') as f: + if job_type: + f.write(job_type + "_" + job_name + "\n") + else: + f.write(job_name + "\n") + +def copy_data_to_restart(data_dir, restart_dir, met_tool=None, net=None, + run=None, step=None, model=None, vdate=None, vhr=None, + verif_case=None, verif_type=None, vx_mask=None, + job_type=None, var_name=None, vhour=None, + fhr_start=None, fhr_end=None, fhr_incr=None, + njob=None, acc=None, nbrhd=None): + sub_dirs_in = [] + sub_dirs_out = [] + copy_files = [] + if met_tool == "ascii2nc": + check_if_none = [ + data_dir, restart_dir, verif_case, verif_type, vx_mask, met_tool, + vdate, vhour + ] + if any([var is None for var in check_if_none]): + e = (f"FATAL ERROR: None encountered as an argument while copying" + + f" {met_tool} METplus output to COMOUT directory.") + raise TypeError(e) + sub_dirs_in.append(os.path.join( + 'METplus_output', + 'workdirs', + 'reformat', + f'job{njob}', + verif_type, + vx_mask, + met_tool, + )) + sub_dirs_out.append(os.path.join( + 'METplus_output', + verif_type, + vx_mask, + met_tool, + )) + copy_files.append(f'{verif_type}.{vdate}{vhour}.nc') + elif met_tool == 'genvxmask': + check_if_none = [ + data_dir, restart_dir, verif_case, verif_type, vx_mask, met_tool, + vdate, vhour, fhr_start, fhr_end, fhr_incr + ] + if any([var is None for var in check_if_none]): + e = (f"FATAL ERROR: None encountered as an argument while copying" + + f" {met_tool} METplus output to COMOUT directory.") + raise TypeError(e) + 
sub_dirs_in.append(os.path.join( + 'METplus_output', + 'workdirs', + 'reformat', + f'job{njob}', + verif_type, + met_tool, + f'{vx_mask}.{vdate}', + )) + sub_dirs_out.append(os.path.join( + 'METplus_output', + verif_type, + met_tool, + f'{vx_mask}.{vdate}', + )) + for fhr in np.arange(int(fhr_start), int(fhr_end) + int(fhr_incr), int(fhr_incr)): + copy_files.append(f'{vx_mask}_t{vhour}z_f{str(fhr).zfill(2)}.nc') + elif met_tool == 'grid_stat': + if verif_case == "snowfall": + check_if_none = [ + data_dir, restart_dir, verif_case, verif_type, met_tool, + vdate, vhour, fhr_start, fhr_end, fhr_incr, model, var_name, + acc, nbrhd + ] + if any([var is None for var in check_if_none]): + e = (f"FATAL ERROR: None encountered as an argument while copying" + + f" {met_tool} METplus output to COMOUT directory.") + raise TypeError(e) + sub_dirs.append(os.path.join( + 'METplus_output', + verif_type, + met_tool, + f'{model}.{vdate}' + )) + for fhr in np.arange(int(fhr_start), int(fhr_end) + int(fhr_incr), int(fhr_incr)): + copy_files.append( + f'{met_tool}_{model}_{var_name}*{acc}H_{str(verif_type).upper()}_NBRHD{nbrhd}*_' + + f'{str(fhr).zfill(2)}0000L_{vdate}_{vhour}0000V.stat' + ) + else: + check_if_none = [ + data_dir, restart_dir, verif_case, verif_type, met_tool, + vdate, vhour, fhr_start, fhr_end, fhr_incr, model, acc, nbrhd + ] + if any([var is None for var in check_if_none]): + e = (f"FATAL ERROR: None encountered as an argument while copying" + + f" {met_tool} METplus output to COMOUT directory.") + raise TypeError(e) + sub_dirs.append(os.path.join( + 'METplus_output', + verif_type, + met_tool, + f'{model}.{vdate}' + )) + for fhr in np.arange(int(fhr_start), int(fhr_end) + int(fhr_incr), int(fhr_incr)): + copy_files.append( + f'{met_tool}_{model}_*_{acc}H_{str(verif_type).upper()}_NBRHD{nbrhd}*_' + + f'{str(fhr).zfill(2)}0000L_{vdate}_{vhour}0000V.stat' + ) + elif met_tool == 'merged_ptype': + check_if_none = [ + data_dir, restart_dir, verif_case, verif_type, vx_mask, 
met_tool, + vdate, vhour, fhr_start, fhr_end, fhr_incr, model, njob + ] + if any([var is None for var in check_if_none]): + e = (f"FATAL ERROR: None encountered as an argument while copying" + + f" {met_tool} output to COMOUT directory.") + raise TypeError(e) + sub_dirs_in.append(os.path.join( + 'data', + model, + met_tool, + )) + sub_dirs_out.append(os.path.join( + 'data', + model, + met_tool, + )) + for fhr in np.arange(int(fhr_start), int(fhr_end) + int(fhr_incr), int(fhr_incr)): + vdt = datetime.strptime(f'{vdate}{vhour}', '%Y%m%d%H') + idt = vdt - td(hours=int(fhr)) + idate = idt.strftime('%Y%m%d') + ihour = idt.strftime('%H') + copy_files.append( + f'{met_tool}_{verif_type}_{vx_mask}_job{njob}_' + + f'init{idate}{ihour}_fhr{str(fhr).zfill(2)}.nc' + ) + elif met_tool == 'pb2nc': + check_if_none = [ + data_dir, restart_dir, verif_case, verif_type, vx_mask, met_tool, + vdate, vhour + ] + if any([var is None for var in check_if_none]): + e = (f"FATAL ERROR: None encountered as an argument while copying" + + f" {met_tool} METplus output to COMOUT directory.") + raise TypeError(e) + sub_dirs_in.append(os.path.join( + 'METplus_output', + 'workdirs', + 'reformat', + f'job{njob}', + verif_type, + vx_mask, + met_tool, + )) + sub_dirs_out.append(os.path.join( + 'METplus_output', + verif_type, + vx_mask, + met_tool, + )) + copy_files.append(f'prepbufr.*.{vdate}{vhour}.nc') + elif met_tool == 'pcp_combine': + if verif_case == "snowfall": + check_if_none = [ + data_dir, restart_dir, verif_case, verif_type, vx_mask, met_tool, + vdate, vhour, fhr_start, fhr_end, fhr_incr, model, var_name, acc + ] + if any([var is None for var in check_if_none]): + e = (f"FATAL ERROR: None encountered as an argument while copying" + + f" {met_tool} METplus output to COMOUT directory.") + raise TypeError(e) + for fhr in np.arange(int(fhr_start), int(fhr_end) + int(fhr_incr), int(fhr_incr)): + vdt = datetime.strptime(f'{vdate}{vhour}', '%Y%m%d%H') + idt = vdt - td(hours=int(fhr)) + idate = 
idt.strftime('%Y%m%d') + ihour = idt.strftime('%H') + sub_dirs.append(os.path.join( + 'METplus_output', + verif_type, + met_tool, + )) + copy_files.append( + f'{model}.{var_name}.init{idate}.t{ihour}z.f{str(fhr).zfill(3)}.a{acc}h.{vx_mask}.nc' + ) + else: + check_if_none = [ + data_dir, restart_dir, verif_case, verif_type, vx_mask, met_tool, + vdate, vhour, fhr_start, fhr_end, fhr_incr, model, acc + ] + if any([var is None for var in check_if_none]): + e = (f"FATAL ERROR: None encountered as an argument while copying" + + f" {met_tool} METplus output to COMOUT directory.") + raise TypeError(e) + for fhr in np.arange(int(fhr_start), int(fhr_end) + int(fhr_incr), int(fhr_incr)): + vdt = datetime.strptime(f'{vdate}{vhour}', '%Y%m%d%H') + idt = vdt - td(hours=int(fhr)) + idate = idt.strftime('%Y%m%d') + ihour = idt.strftime('%H') + sub_dirs_in.append(os.path.join( + 'METplus_output', + 'workdirs', + 'reformat', + f'job{njob}', + verif_type, + met_tool, + )) + sub_dirs_out.append(os.path.join( + 'METplus_output', + verif_type, + met_tool, + )) + copy_files.append( + f'{model}.init{idate}.t{ihour}z.f{str(fhr).zfill(3)}.a{acc}h.{vx_mask}.nc' + ) + elif met_tool == 'point_stat': + check_if_none = [ + data_dir, restart_dir, verif_case, verif_type, vx_mask, met_tool, + vdate, vhour, fhr_start, fhr_end, fhr_incr, model, var_name + ] + if any([var is None for var in check_if_none]): + e = (f"FATAL ERROR: None encountered as an argument while copying" + + f" {met_tool} METplus output to COMOUT directory.") + raise TypeError(e) + sub_dirs_in.append(os.path.join( + 'METplus_output', + 'workdirs', + 'generate', + f'job{njob}', + verif_type, + met_tool, + f'{model}.{vdate}' + )) + sub_dirs_out.append(os.path.join( + 'METplus_output', + verif_type, + met_tool, + f'{model}.{vdate}' + )) + for fhr in np.arange(int(fhr_start), int(fhr_end) + int(fhr_incr), int(fhr_incr)): + copy_files.append( + f'{met_tool}_{model}_{vx_mask}_{var_name}_OBS*_{str(fhr).zfill(2)}0000L_{vdate}_' + + 
f'{vhour}0000V.stat' + ) + elif met_tool == 'regrid_data_plane': + check_if_none = [ + data_dir, restart_dir, verif_case, verif_type, vx_mask, met_tool, + vdate, vhour, fhr_start, fhr_end, fhr_incr, model, njob + ] + if any([var is None for var in check_if_none]): + e = (f"FATAL ERROR: None encountered as an argument while copying" + + f" {met_tool} METplus output to COMOUT directory.") + raise TypeError(e) + sub_dirs_in.append(os.path.join( + 'METplus_output', + 'workdirs', + 'generate', + f'job{njob}', + verif_type, + met_tool, + f'{model}.{vdate}' + )) + sub_dirs_out.append(os.path.join( + 'METplus_output', + verif_type, + met_tool, + f'{model}.{vdate}' + )) + for fhr in np.arange(int(fhr_start), int(fhr_end) + int(fhr_incr), int(fhr_incr)): + copy_files.append( + f'{met_tool}_{model}_t{vhour}z_{verif_type}_{vx_mask}_job{njob}_' + + f'fhr{str(fhr).zfill(2)}.nc' + ) + elif met_tool == 'stat_analysis': + if job_type == 'gather': + check_if_none = [ + data_dir, restart_dir, verif_case, verif_type, met_tool, vdate, + net, step, model, run + ] + if any([var is None for var in check_if_none]): + e = (f"FATAL ERROR: None encountered as an argument while copying" + + f" {met_tool} METplus output to COMOUT directory.") + raise TypeError(e) + sub_dirs_in.append(os.path.join( + 'METplus_output', + 'workdirs', + 'gather', + f'job{njob}', + 'gather_small', + met_tool, + f'{model}.{vdate}' + )) + sub_dirs_out.append(os.path.join( + 'METplus_output', + 'gather_small', + met_tool, + f'{model}.{vdate}' + )) + copy_files.append( + f'{net}.{step}.{model}.{run}.{verif_case}.{verif_type}' + + f'.v{vdate}.stat' + ) + elif job_type == 'gather2': + check_if_none = [ + data_dir, restart_dir, verif_case, met_tool, vdate, net, step, + model, run, vhr + ] + if any([var is None for var in check_if_none]): + e = (f"FATAL ERROR: None encountered as an argument while copying" + + f" {met_tool} METplus output to COMOUT directory.") + raise TypeError(e) + sub_dirs_in.append(os.path.join( + 
'METplus_output', + 'workdirs', + 'gather2', + f'job{njob}', + met_tool, + f'{model}.{vdate}' + )) + sub_dirs_out.append(os.path.join( + 'METplus_output', + met_tool, + f'{model}.{vdate}' + )) + copy_files.append( + f'{net}.{step}.{model}.{run}.{verif_case}.v{vdate}.c{vhr}z.stat' + ) + for s, sub_dir_out in enumerate(sub_dirs_out): + for copy_file in copy_files: + origin_path = os.path.join( + data_dir, verif_case, sub_dirs_in[s], copy_file + ) + dest_path = os.path.join(restart_dir, sub_dir_out) + if not glob.glob(origin_path): + continue + if not os.path.exists(dest_path): + print(f"FATAL ERROR: Could not copy METplus output to COMOUT directory" + + f" {dest_path} because the path does not already exist.") + continue + if len(glob.glob(origin_path)) == len(glob.glob(os.path.join(dest_path, copy_file))): + print(f"Not copying restart files to restart_directory" + + f" {dest_path} because they already exist.") + else: + run_shell_command( + ['cp', '-rpv', origin_path, os.path.join(dest_path,'.')] + ) + def initalize_job_env_dict(): """! 
This initializes a dictionary of environment variables and their values to be set for the job pulling from environment variables @@ -386,8 +740,8 @@ def initalize_job_env_dict(): 'machine', 'evs_ver', 'HOMEevs', 'FIXevs', 'USHevs', 'DATA', 'NET', 'RUN', 'VERIF_CASE', 'STEP', 'COMPONENT', 'evs_run_mode', 'COMROOT', 'COMIN', 'COMOUT', 'COMOUTsmall', 'COMOUTfinal', 'EVSIN', - 'METPLUS_PATH','LOG_MET_OUTPUT_TO_METPLUS', - 'MET_ROOT', + 'METPLUS_PATH','LOG_MET_OUTPUT_TO_METPLUS', + 'MET_ROOT', 'MET_TMP_DIR', 'MODELNAME', 'JOB_GROUP' ] job_env_dict = {} @@ -827,19 +1181,6 @@ def snowfall_check_model_input_output_files(job_dict): return (all_input_file_exist, input_files_list, \ all_COMOUT_file_exist, COMOUT_files_list, DATA_files_list) - - -def get_completed_jobs(completed_jobs_file): - completed_jobs = set() - if os.path.exists(completed_jobs_file): - with open(completed_jobs_file, 'r') as f: - completed_jobs = set(f.read().splitlines()) - return completed_jobs - -def mark_job_completed(completed_jobs_file, job_name): - with open(completed_jobs_file, 'a') as f: - f.write(job_name + "\n") - # Construct a file name given a template def fname_constructor(template_str, IDATE="YYYYmmdd", IHOUR="HH", VDATE="YYYYmmdd", VHOUR="HH", VDATEHOUR="YYYYmmddHH", @@ -947,3 +1288,383 @@ def preprocess_prepbufr(indir, fname, workdir, outdir, subsets): ) os.chdir(wd) +# Create a list of ccpa file paths +def get_ccpa_qpe_templates(indir, vdates, obs_acc, target_acc, nest, paths=[]): + ''' + indir..... - (str) Input directory for prepbufr file data + vdates.... - (datetime object) List of datetimes used to fill templates + obs_acc... - (str) precip accumulation interval of ccpa files in hours + target_acc - (str) target precip accumulation interval of combined + ccpa files in hours + nest...... - (str) domain used to find ccpa files + paths..... - (list of str) list of paths to append the prepbufr paths to + Default is empty. 
+ ''' + ccpa_paths = [] + for v, vdate in enumerate(vdates): + vh = vdate.strftime('%H') + vd = vdate.strftime('%Y%m%d') + if int(target_acc) == 1: + if int(obs_acc) == 1: + offsets = [0] + else: + raise ValueError(f"obs_acc is not valid: \"{obs_acc}\"") + elif int(target_acc) == 3: + if int(obs_acc) == 1: + offsets = [0, 1, 2] + elif int(obs_acc) == 3: + offsets = [0] + else: + raise ValueError(f"obs_acc is not valid: \"{obs_acc}\"") + elif int(target_acc) == 24: + if int(obs_acc) == 1: + offsets = [ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 11, 12, 13, 14, 15, 16, 17, 18 , 19, 20, + 21, 22, 23 + ] + elif int(obs_acc) == 3: + offsets = [0, 3, 6, 9, 12, 15, 18, 21] + elif int(obs_acc) == 24: + offsets = [0] + else: + raise ValueError(f"obs_acc is not valid: \"{obs_acc}\"") + else: + raise ValueError(f"target_acc is not valid: \"{target_acc}\"") + for offset in offsets: + use_vdate = vdate - td(hours=int(offset)) + use_vd = use_vdate.strftime('%Y%m%d') + use_vh = use_vdate.strftime('%H') + template = os.path.join( + indir, + 'ccpa.{VDATE}', + 'ccpa.t{VHOUR}z.' + f'{obs_acc}h.hrap.{nest}.gb2' + ) + ccpa_paths.append(fname_constructor( + template, VDATE=use_vd, VHOUR=use_vh + )) + return np.concatenate((paths, np.unique(ccpa_paths))) + +# Create a list of mrms file paths +def get_mrms_qpe_templates(indir, vdates, obs_acc, target_acc, nest, paths=[]): + ''' + indir..... - (str) Input directory for prepbufr file data + vdates.... - (datetime object) List of datetimes used to fill templates + obs_acc... - (str) precip accumulation interval of mrms files in hours + target_acc - (str) target precip accumulation interval of combined + mrms files in hours + nest...... - (str) domain used to find mrms files + paths..... - (list of str) list of paths to append the prepbufr paths to + Default is empty. 
+ ''' + mrms_paths = [] + for v, vdate in enumerate(vdates): + vh = vdate.strftime('%H') + vd = vdate.strftime('%Y%m%d') + if int(target_acc) == 1: + if int(obs_acc) == 1: + offsets = [0] + else: + raise ValueError(f"obs_acc is not valid: \"{obs_acc}\"") + elif int(target_acc) == 3: + if int(obs_acc) == 1: + offsets = [0, 1, 2] + elif int(obs_acc) == 3: + offsets = [0] + else: + raise ValueError(f"obs_acc is not valid: \"{obs_acc}\"") + elif int(target_acc) == 24: + if int(obs_acc) == 1: + offsets = [ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 11, 12, 13, 14, 15, 16, 17, 18 , 19, 20, + 21, 22, 23 + ] + elif int(obs_acc) == 3: + offsets = [0, 3, 6, 9, 12, 15, 18, 21] + elif int(obs_acc) == 24: + offsets = [0] + else: + raise ValueError(f"obs_acc is not valid: \"{obs_acc}\"") + else: + raise ValueError(f"target_acc is not valid: \"{target_acc}\"") + for offset in offsets: + use_vdate = vdate - td(hours=int(offset)) + use_vd = use_vdate.strftime('%Y%m%d') + use_vh = use_vdate.strftime('%H') + template = os.path.join( + indir, + 'mrms.{VDATE}', + 'mrms.t{VHOUR}z.' + f'{obs_acc}h.{nest}.gb2' + ) + mrms_paths.append(fname_constructor( + template, VDATE=use_vd, VHOUR=use_vh + )) + return np.concatenate((paths, np.unique(mrms_paths))) + +# Create a list of nohrsc file paths +def get_nohrsc_qpe_templates(indir, vdates, obs_acc, target_acc, nest, paths=[]): + ''' + indir..... - (str) Input directory for nohrsc file data + vdates.... - (datetime object) List of datetimes used to fill templates + obs_acc... - (str) snow accumulation interval of nohrsc files in hours + target_acc - (str) target snow accumulation interval of combined + nohrsc files in hours + nest...... - (str) domain used to find nohrsc files + paths..... - (list of str) list of paths to append the nohrsc paths to + Default is empty. 
+ ''' + nohrsc_paths = [] + for v, vdate in enumerate(vdates): + vh = vdate.strftime('%H') + vd = vdate.strftime('%Y%m%d') + if int(target_acc) == 6: + if int(obs_acc) == 6: + offsets = [0] + else: + raise ValueError(f"obs_acc is not valid: \"{obs_acc}\"") + elif int(target_acc) == 24: + if int(obs_acc) == 6: + offsets = [0, 6, 12, 18] + elif int(obs_acc) == 24: + offsets = [0] + else: + raise ValueError(f"obs_acc is not valid: \"{obs_acc}\"") + else: + raise ValueError(f"target_acc is not valid: \"{target_acc}\"") + for offset in offsets: + use_vdate = vdate - td(hours=int(offset)) + use_vd = use_vdate.strftime('%Y%m%d') + use_vh = use_vdate.strftime('%H') + template = os.path.join( + indir, + '{VDATE}', 'wgrbbul', 'nohrsc_snowfall', + f'sfav2_CONUS_{int(obs_acc)}h_' + '{VDATE}{VHOUR}_grid184.grb2' + ) + nohrsc_paths.append(fname_constructor( + template, VDATE=use_vd, VHOUR=use_vh + )) + return np.concatenate((paths, np.unique(nohrsc_paths))) + +# Return a list of missing ccpa files needed to create "target_acc" +def check_ccpa_files(indir, vdate, obs_acc, target_acc, nest): + ''' + indir..... - (str) Input directory for prepbufr file data + vdate..... - (datetime object) datetime used to fill templates + obs_acc... - (str) precip accumulation interval of ccpa files in hours + target_acc - (str) target precip accumulation interval of combined + ccpa files in hours + nest...... - (str) domain used to find ccpa files + ''' + paths = get_ccpa_qpe_templates(indir, [vdate], obs_acc, target_acc, nest) + return [path for path in paths if not os.path.exists(path)] + +# Return a list of missing mrms files needed to create "target_acc" +def check_mrms_files(indir, vdate, obs_acc, target_acc, nest): + ''' + indir..... - (str) Input directory for prepbufr file data + vdate..... - (datetime object) datetime used to fill templates + obs_acc... 
- (str) precip accumulation interval of mrms files in hours + target_acc - (str) target precip accumulation interval of combined + mrms files in hours + nest...... - (str) domain used to find mrms files + ''' + paths = get_mrms_qpe_templates(indir, [vdate], obs_acc, target_acc, nest) + return [path for path in paths if not os.path.exists(path)] + +# Return a list of missing nohrsc files needed to create "target_acc" +def check_nohrsc_files(indir, vdate, obs_acc, target_acc, nest): + ''' + indir..... - (str) Input directory for nohrsc file data + vdate..... - (datetime object) datetime used to fill templates + obs_acc... - (str) snow accumulation interval of nohrsc files in hours + target_acc - (str) target snow accumulation interval of combined + nohrsc files in hours + nest...... - (str) domain used to find nohrsc files + ''' + paths = get_nohrsc_qpe_templates(indir, [vdate], obs_acc, target_acc, nest) + return [path for path in paths if not os.path.exists(path)] + +# Return the obs accumulation interval needed to create target_acc, based on +# available ccpa files +def get_ccpa_accums(indir, vdate, target_acc, nest): + ''' + indir..... - (str) Input directory for ccpa file data + vdate..... - (datetime object) datetime used to fill templates + target_acc - (str) target precip accumulation interval of combined + ccpa files in hours + nest...... 
- (str) domain used to find ccpa files + ''' + if int(target_acc) == 1: + # check 1-h obs + obs_acc = "01" + missing_ccpa = check_ccpa_files(indir, vdate, obs_acc, target_acc, nest) + if missing_ccpa: + return None + else: + return obs_acc + elif int(target_acc) == 3: + # check 3-h obs + obs_acc = "03" + missing_ccpa = check_ccpa_files(indir, vdate, obs_acc, target_acc, nest) + if missing_ccpa: + # check 1-h obs + obs_acc = "01" + missing_ccpa = check_ccpa_files(indir, vdate, obs_acc, target_acc, nest) + if missing_ccpa: + return None + else: + return obs_acc + else: + return obs_acc + elif int(target_acc) == 24: + + # check 24-h obs + obs_acc = "24" + missing_ccpa = check_ccpa_files(indir, vdate, obs_acc, target_acc, nest) + if missing_ccpa: + # check 3-h obs + obs_acc = "03" + missing_ccpa = check_ccpa_files(indir, vdate, obs_acc, target_acc, nest) + if missing_ccpa: + # check 1-h obs + obs_acc = "01" + missing_ccpa = check_ccpa_files(indir, vdate, obs_acc, target_acc, nest) + if missing_ccpa: + return None + else: + return obs_acc + else: + return obs_acc + else: + return obs_acc + else: + raise ValueError(f"Invalid target_acc: \"{target_acc}\"") + +# Return the obs accumulation interval needed to create target_acc, based on +# available mrms files +def get_mrms_accums(indir, vdate, target_acc, nest): + ''' + indir..... - (str) Input directory for prepbufr file data + vdate..... - (datetime object) datetime used to fill templates + target_acc - (str) target precip accumulation interval of combined + mrms files in hours + nest...... 
- (str) domain used to find mrms files + ''' + if int(target_acc) == 1: + # check 1-h obs + obs_acc = "01" + missing_mrms = check_mrms_files(indir, vdate, obs_acc, target_acc, nest) + if missing_mrms: + return None + else: + return obs_acc + elif int(target_acc) == 3: + # check 3-h obs + obs_acc = "03" + missing_mrms = check_mrms_files(indir, vdate, obs_acc, target_acc, nest) + if missing_mrms: + # check 1-h obs + obs_acc = "01" + missing_mrms = check_mrms_files(indir, vdate, obs_acc, target_acc, nest) + if missing_mrms: + return None + else: + return obs_acc + else: + return obs_acc + elif int(target_acc) == 24: + + # check 24-h obs + obs_acc = "24" + missing_mrms = check_mrms_files(indir, vdate, obs_acc, target_acc, nest) + if missing_mrms: + # check 3-h obs + obs_acc = "03" + missing_mrms = check_mrms_files(indir, vdate, obs_acc, target_acc, nest) + if missing_mrms: + # check 1-h obs + obs_acc = "01" + missing_mrms = check_mrms_files(indir, vdate, obs_acc, target_acc, nest) + if missing_mrms: + return None + else: + return obs_acc + else: + return obs_acc + else: + return obs_acc + else: + raise ValueError(f"Invalid target_acc: \"{target_acc}\"") + +# Return the obs accumulation interval needed to create target_acc, based on +# available nohrsc files +def get_nohrsc_accums(indir, vdate, target_acc, nest): + ''' + indir..... - (str) Input directory for prepbufr file data + vdate..... - (datetime object) datetime used to fill templates + target_acc - (str) target precip accumulation interval of combined + nohrsc files in hours + nest...... 
- (str) domain used to find nohrsc files + ''' + if int(target_acc) == 6: + # check 6-h obs + obs_acc = "06" + missing_nohrsc = check_nohrsc_files(indir, vdate, obs_acc, target_acc, nest) + if missing_nohrsc: + return None + else: + return obs_acc + elif int(target_acc) == 24: + # check 24-h obs + obs_acc = "24" + missing_nohrsc = check_nohrsc_files(indir, vdate, obs_acc, target_acc, nest) + if missing_nohrsc: + # check 6-h obs + obs_acc = "06" + missing_nohrsc = check_nohrsc_files(indir, vdate, obs_acc, target_acc, nest) + if missing_nohrsc: + return None + else: + return obs_acc + else: + return obs_acc + else: + raise ValueError(f"Invalid target_acc: \"{target_acc}\"") + +# Return the obs accumulation interval needed to create target_acc, based on +# available input files +def get_obs_accums(indir, vdate, target_acc, nest, obsname, job_type='reformat'): + ''' + indir..... - (str) Input directory for obs file data + vdate..... - (datetime object) datetime used to fill templates + target_acc - (str) target precip accumulation interval of combined + obs files in hours + nest...... - (str) domain used to find obs files + obsname... - (str) name of input file dataset + ''' + if obsname == "mrms": + return get_mrms_accums(indir, vdate, target_acc, nest) + elif obsname == "ccpa": + return get_ccpa_accums(indir, vdate, target_acc, nest) + elif obsname == "nohrsc": + return get_nohrsc_accums(indir, vdate, target_acc, nest) + else: + raise ValueError(f"Invalid obsname: \"{obsname}\"") + +# Return availability of obs needed for job +def get_obs_avail(indir, vdate, nest, obsname): + ''' + indir..... - (str) Input directory for obs file data + vdate..... - (datetime object) datetime used to fill templates + nest...... - (str) domain used to find obs files + obsname... 
- (str) name of input file dataset + ''' + if obsname in ["raob", "metar"]: + paths = get_prepbufr_templates(indir, [vdate], obsname=obsname, already_preprocessed=True) + if paths.size > 0: + return all([os.path.exists(fname) for fname in paths]) + else: + return False + else: + raise ValueError(f"Invalid obsname: \"{obsname}\"")