diff --git a/parm/snow/obs/config/bufr_sfcsno_mapping.yaml b/parm/snow/obs/config/bufr_sfcsno_mapping.yaml index 2fe12f39c..31d042f27 100644 --- a/parm/snow/obs/config/bufr_sfcsno_mapping.yaml +++ b/parm/snow/obs/config/bufr_sfcsno_mapping.yaml @@ -19,6 +19,7 @@ bufr: query: "*/RPID" stationElevation: query: "[*/SELV, */HSMSL]" + type: float # ObsValue totalSnowDepth: diff --git a/sorc/bufr-query b/sorc/bufr-query index 8bc28860d..9e595b5a3 160000 --- a/sorc/bufr-query +++ b/sorc/bufr-query @@ -1 +1 @@ -Subproject commit 8bc28860d562f9a53ffa86e7c5c5ae05c2c4e09f +Subproject commit 9e595b5a3bb91791f7f5ada456298e2b966e74f6 diff --git a/ush/ioda/bufr2ioda/bufr2ioda_sfcsno_bufr_encoder.py b/ush/ioda/bufr2ioda/bufr2ioda_sfcsno_bufr_encoder.py deleted file mode 100644 index 6c75df0ae..000000000 --- a/ush/ioda/bufr2ioda/bufr2ioda_sfcsno_bufr_encoder.py +++ /dev/null @@ -1,44 +0,0 @@ -import numpy as np -import bufr -from pyioda.ioda.Engines.Bufr import Encoder - - -def mask_container(container, mask): - new_container = bufr.DataContainer() - for var_name in container.list(): - var = container.get(var_name) - paths = container.get_paths(var_name) - new_container.add(var_name, var[mask], paths) - - return new_container - - -def create_obs_group(input_path, mapping_path): - """Create the ioda snow observations - This method: - - reads state of ground (sogr) and snow depth (snod) - - applys sogr conditions to the missing snod values - - removes the filled/missing snow values and creates the masked container - - encoders the new container. - - Parameters - ---------- - input_path: The input bufr file - mapping_path: The input bufr2ioda mapping file - - """ - - YAML_PATH = mapping_path - container = bufr.Parser(input_path, YAML_PATH).parse() - - sogr = container.get('variables/groundState') - snod = container.get('variables/totalSnowDepth') - snod[(sogr <= 11.0) & snod.mask] = 0.0 - snod[(sogr == 15.0) & snod.mask] = 0.0 - container.replace('variables/totalSnowDepth', snod) - - masked_container = mask_container(container, (~snod.mask)) - - data = next(iter(Encoder(YAML_PATH).encode(masked_container).values())) - - return data diff --git a/ush/ioda/bufr2ioda/bufr_sfcsno.py b/ush/ioda/bufr2ioda/bufr_sfcsno.py new file mode 100755 index 000000000..03a9102b4 --- /dev/null +++ b/ush/ioda/bufr2ioda/bufr_sfcsno.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +import sys +import os +import argparse +import time +import numpy as np +import bufr +from pyioda.ioda.Engines.Bufr import Encoder as iodaEncoder +from bufr.encoders.netcdf import Encoder as netcdfEncoder +from wxflow import Logger + +# Initialize Logger +# Get log level from the environment variable, default to 'INFO it not set +log_level = os.getenv('LOG_LEVEL', 'INFO') +logger = Logger('BUFR2IODA_sfcsno.py', level=log_level, colored_log=False) + + +def logging(comm, level, message): + """ + Logs a message to the console or log file, based on the specified logging level. + + This function ensures that logging is only performed by the root process (`rank 0`) + in a distributed computing environment. The function maps the logging level to + appropriate logger methods and defaults to the 'INFO' level if an invalid level is provided. + + Parameters: + comm: object + The communicator object, typically from a distributed computing framework + (e.g., MPI). It must have a `rank()` method to determine the process rank. + level: str + The logging level as a string. Supported levels are: + - 'DEBUG' + - 'INFO' + - 'WARNING' + - 'ERROR' + - 'CRITICAL' + If an invalid level is provided, a warning will be logged, and the level + will default to 'INFO'. + message: str + The message to be logged. + + Behavior: + - Logs messages only on the root process (`comm.rank() == 0`). + - Maps the provided logging level to a method of the logger object. + - Defaults to 'INFO' and logs a warning if an invalid logging level is given. + - Supports standard logging levels for granular control over log verbosity. + + Example: + >>> logging(comm, 'DEBUG', 'This is a debug message.') + >>> logging(comm, 'ERROR', 'An error occurred!') + + Notes: + - Ensure that a global `logger` object is configured before using this function. + - The `comm` object should conform to MPI-like conventions (e.g., `rank()` method). + """ + + if comm.rank() == 0: + # Define a dictionary to map levels to logger methods + log_methods = { + 'DEBUG': logger.debug, + 'INFO': logger.info, + 'WARNING': logger.warning, + 'ERROR': logger.error, + 'CRITICAL': logger.critical, + } + + # Get the appropriate logging method, default to 'INFO' + log_method = log_methods.get(level.upper(), logger.info) + + if log_method == logger.info and level.upper() not in log_methods: + # Log a warning if the level is invalid + logger.warning(f'log level = {level}: not a valid level --> set to INFO') + + # Call the logging method + log_method(message) + + +def _mask_container(container, mask): + + new_container = bufr.DataContainer() + for var_name in container.list(): + var = container.get(var_name) + paths = container.get_paths(var_name) + new_container.add(var_name, var[mask], paths) + + return new_container + + +def _make_description(mapping_path, update=False): + + description = bufr.encoders.Description(mapping_path) + + return description + + +def _make_obs(comm, input_path, mapping_path): + """ + Create the ioda snow depth observations: + - reads state of ground (sogr) and snow depth (snod) + - applys sogr conditions to the missing snod values + - removes the filled/missing snow values and creates the masked container + + Parameters + ---------- + comm: object + The communicator object (e.g., MPI) + input_path: str + The input bufr file + mapping_path: str + The input bufr2ioda mapping file + """ + + # Get container from mapping file first + logging(comm, 'INFO', 'Get container from bufr') + container = bufr.Parser(input_path, mapping_path).parse(comm) + + logging(comm, 'DEBUG', f'container list (original): {container.list()}') + + # Add new/derived data into container + sogr = container.get('variables/groundState') + snod = container.get('variables/totalSnowDepth') + snod[(sogr <= 11.0) & snod.mask] = 0.0 + snod[(sogr == 15.0) & snod.mask] = 0.0 + container.replace('variables/totalSnowDepth', snod) + snod_upd = container.get('variables/totalSnowDepth') + + masked_container = _mask_container(container, (~snod.mask)) + + return masked_container + + +def create_obs_group(input_path, mapping_path, env): + + comm = bufr.mpi.Comm(env["comm_name"]) + + description = _make_description(mapping_path, update=False) + container = _make_obs(comm, input_path, mapping_path) + + # Gather data from all tasks into all tasks. Each task will have the complete record + logging(comm, 'INFO', f'Gather data from all tasks into all tasks') + container.all_gather(comm) + + # Encode the data + logging(comm, 'INFO', f'Encode data') + data = next(iter(iodaEncoder(mapping_path).encode(container).values())) + + logging(comm, 'INFO', f'Return the encoded data') + + return data + + +def create_obs_file(input_path, mapping_path, output_path): + + comm = bufr.mpi.Comm("world") + container = _make_obs(comm, input_path, mapping_path) + container.gather(comm) + + description = _make_description(mapping_path, update=False) + + # Encode the data + if comm.rank() == 0: + netcdfEncoder(description).encode(container, output_path) + + logging(comm, 'INFO', f'Return the encoded data') + + +if __name__ == '__main__': + + start_time = time.time() + + bufr.mpi.App(sys.argv) + comm = bufr.mpi.Comm("world") + + # Required input arguments as positional arguments + parser = argparse.ArgumentParser(description="Convert BUFR to NetCDF using a mapping file.") + parser.add_argument('input', type=str, help='Input BUFR file') + parser.add_argument('mapping', type=str, help='BUFR2IODA Mapping File') + parser.add_argument('output', type=str, help='Output NetCDF file') + + args = parser.parse_args() + mapping = args.mapping + infile = args.input + output = args.output + + create_obs_file(infile, mapping, output) + + end_time = time.time() + running_time = end_time - start_time + logging(comm, 'INFO', f'Total running time: {running_time}')