Update the scripts and sorc/bufr-query.
1 parent d7bf5d7 · commit 3c1dbf7
Showing 4 changed files with 191 additions and 45 deletions.
Submodule bufr-query updated 5 files
This file was deleted; its contents are not shown.
BUFR2IODA_sfcsno.py (new file)
@@ -0,0 +1,189 @@
#!/usr/bin/env python3
import sys
import os
import argparse
import time
import numpy as np
import bufr
from pyioda.ioda.Engines.Bufr import Encoder as iodaEncoder
from bufr.encoders.netcdf import Encoder as netcdfEncoder
from wxflow import Logger


# Initialize the logger.
# Get the log level from the environment variable; default to 'INFO' if not set.
log_level = os.getenv('LOG_LEVEL', 'INFO')
logger = Logger('BUFR2IODA_sfcsno.py', level=log_level, colored_log=False)


def logging(comm, level, message):
    """
    Log a message to the console or log file, based on the specified logging level.

    This function ensures that logging is only performed by the root process
    (`rank 0`) in a distributed computing environment. It maps the logging level
    to the appropriate logger method and defaults to 'INFO' if an invalid level
    is provided.

    Parameters:
        comm: object
            The communicator object, typically from a distributed computing
            framework (e.g., MPI). It must have a `rank()` method to determine
            the process rank.
        level: str
            The logging level as a string. Supported levels are:
            - 'DEBUG'
            - 'INFO'
            - 'WARNING'
            - 'ERROR'
            - 'CRITICAL'
            If an invalid level is provided, a warning is logged and the level
            defaults to 'INFO'.
        message: str
            The message to be logged.

    Behavior:
        - Logs messages only on the root process (`comm.rank() == 0`).
        - Maps the provided logging level to a method of the logger object.
        - Defaults to 'INFO' and logs a warning if an invalid logging level is given.
        - Supports standard logging levels for granular control over log verbosity.

    Example:
        >>> logging(comm, 'DEBUG', 'This is a debug message.')
        >>> logging(comm, 'ERROR', 'An error occurred!')

    Notes:
        - Ensure that a global `logger` object is configured before using this function.
        - The `comm` object should conform to MPI-like conventions (e.g., `rank()` method).
    """

    if comm.rank() == 0:
        # Define a dictionary to map levels to logger methods
        log_methods = {
            'DEBUG': logger.debug,
            'INFO': logger.info,
            'WARNING': logger.warning,
            'ERROR': logger.error,
            'CRITICAL': logger.critical,
        }

        # Get the appropriate logging method, defaulting to 'INFO'
        log_method = log_methods.get(level.upper(), logger.info)

        if log_method == logger.info and level.upper() not in log_methods:
            # Log a warning if the level is invalid
            logger.warning(f'log level = {level}: not a valid level --> set to INFO')

        # Call the logging method
        log_method(message)


def _mask_container(container, mask):
    """
    Return a new bufr.DataContainer holding only the rows selected by `mask`.
    """

    new_container = bufr.DataContainer()
    for var_name in container.list():
        var = container.get(var_name)
        paths = container.get_paths(var_name)
        new_container.add(var_name, var[mask], paths)

    return new_container


def _make_description(mapping_path, update=False):
    """
    Build the encoder description from the bufr2ioda mapping file.
    """

    description = bufr.encoders.Description(mapping_path)

    return description


def _make_obs(comm, input_path, mapping_path):
    """
    Create the ioda snow depth observations:
    - reads state of ground (sogr) and snow depth (snod)
    - applies sogr conditions to the missing snod values
    - removes the filled/missing snow values and creates the masked container

    Parameters
    ----------
    comm: object
        The communicator object (e.g., MPI)
    input_path: str
        The input bufr file
    mapping_path: str
        The input bufr2ioda mapping file
    """

    # Get container from mapping file first
    logging(comm, 'INFO', 'Get container from bufr')
    container = bufr.Parser(input_path, mapping_path).parse(comm)

    logging(comm, 'DEBUG', f'container list (original): {container.list()}')

    # Add new/derived data into container
    sogr = container.get('variables/groundState')
    snod = container.get('variables/totalSnowDepth')
    snod[(sogr <= 11.0) & snod.mask] = 0.0
    snod[(sogr == 15.0) & snod.mask] = 0.0
    container.replace('variables/totalSnowDepth', snod)
    snod_upd = container.get('variables/totalSnowDepth')

    masked_container = _mask_container(container, (~snod.mask))

    return masked_container


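# Illustration of the fill/mask step in _make_obs, shown as comments only
# (toy values invented for this sketch; they are not part of the commit):
#
#     sogr = np.ma.array([3.0, 12.0, 15.0], mask=[False, False, False])
#     snod = np.ma.array([0.0,  0.0,  0.0], mask=[True,  True,  True])
#     snod[(sogr <= 11.0) & snod.mask] = 0.0   # element 0 -> 0.0 and unmasked
#     snod[(sogr == 15.0) & snod.mask] = 0.0   # element 2 -> 0.0 and unmasked
#     ~snod.mask                               # [True, False, True]
#
# Rows whose snow depth is still masked (element 1 here) are dropped when
# _mask_container(container, ~snod.mask) builds the masked container.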
def create_obs_group(input_path, mapping_path, env):
    """
    Parse the BUFR file, gather the records across all tasks, and return the encoded data.
    """

    comm = bufr.mpi.Comm(env["comm_name"])

    description = _make_description(mapping_path, update=False)
    container = _make_obs(comm, input_path, mapping_path)

    # Gather data from all tasks into all tasks. Each task will have the complete record
    logging(comm, 'INFO', 'Gather data from all tasks into all tasks')
    container.all_gather(comm)

    # Encode the data
    logging(comm, 'INFO', 'Encode data')
    data = next(iter(iodaEncoder(mapping_path).encode(container).values()))

    logging(comm, 'INFO', 'Return the encoded data')

    return data


def create_obs_file(input_path, mapping_path, output_path):
    """
    Parse the BUFR file, gather the records on the root task, and write a NetCDF file.
    """

    comm = bufr.mpi.Comm("world")
    container = _make_obs(comm, input_path, mapping_path)
    container.gather(comm)

    description = _make_description(mapping_path, update=False)

    # Encode the data
    if comm.rank() == 0:
        netcdfEncoder(description).encode(container, output_path)

    logging(comm, 'INFO', 'Wrote the encoded data')


if __name__ == '__main__':

    start_time = time.time()

    bufr.mpi.App(sys.argv)
    comm = bufr.mpi.Comm("world")

    # Required input arguments as positional arguments
    parser = argparse.ArgumentParser(description="Convert BUFR to NetCDF using a mapping file.")
    parser.add_argument('input', type=str, help='Input BUFR file')
    parser.add_argument('mapping', type=str, help='BUFR2IODA Mapping File')
    parser.add_argument('output', type=str, help='Output NetCDF file')

    args = parser.parse_args()
    mapping = args.mapping
    infile = args.input
    output = args.output

    create_obs_file(infile, mapping, output)

    end_time = time.time()
    running_time = end_time - start_time
    logging(comm, 'INFO', f'Total running time: {running_time:.2f} seconds')
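For reference, a minimal standalone usage sketch of the new converter. The module name is assumed from the Logger label above, and the file paths are placeholders, not part of this commit:

import sys
import bufr
from BUFR2IODA_sfcsno import create_obs_file  # module name assumed from the Logger label

# Initialize MPI the same way the __main__ block does, then write the NetCDF file.
bufr.mpi.App(sys.argv)
create_obs_file(
    'sfcsno.t00z.bufr_d',        # placeholder input BUFR file
    'bufr_sfcsno_mapping.yaml',  # placeholder bufr2ioda mapping file
    'sfcsno_obs.nc',             # placeholder output NetCDF file
)

Equivalently, the script's argparse interface takes the same three paths positionally: python BUFR2IODA_sfcsno.py <input bufr> <mapping yaml> <output nc>.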