Add asset to create catalogs
BenGalewsky committed Jan 6, 2025
1 parent 9f4c6b1 commit 5b7c337
Showing 4 changed files with 2,390 additions and 5 deletions.
136 changes: 134 additions & 2 deletions downscaled_climate_data/assets/loca2.py
@@ -1,8 +1,11 @@
 from tempfile import TemporaryDirectory
 
+import intake_esm.cat
 import requests
-from dagster import AssetExecutionContext, Config, asset, EnvVar, AssetIn
-from dagster_aws.s3 import S3Resource
 import s3fs
 import xarray as xr
+from dagster import AssetExecutionContext, AssetIn, Config, EnvVar, asset
+from dagster_aws.s3 import S3Resource
 
+import downscaled_climate_data
+
@@ -98,3 +101,132 @@ def loca2_zarr(context,

# Close the dataset to free memory
ds.close()


class ESMCatalogConfig(Config):
data_format: str = "netcdf"
id: str = "loca2_raw_netcdf_monthly_esm_catalog"
description: str = "LOCA2 raw data catalog"

def is_zarr(self):
return self.data_format == "zarr"


def parse_key(relative_path: str, bucket: str, full_key: str) -> dict[str, str]:
# Split the relative path into parts
# Filter out empty strings that occur when there are consecutive slashes
path_parts = [part for part in relative_path.split('/') if part]

model = path_parts[0]
scheme = path_parts[1]

file_parts = path_parts[-1].split('.')
variable = file_parts[0]
experiment_id = file_parts[3]
time_range = file_parts[4]

uri = f"s3://{bucket}/{full_key}"
return {
"variable": variable,
"model": model,
"scheme": scheme,
"experiment_id": experiment_id,
"time_range": time_range,
"path": uri
}


@asset(
name="loca2_esm_catalog",
group_name="loca2",
description="Generate an Intake-ESM Catalog for LOCA2 datasets",
code_version=downscaled_climate_data.__version__)
def loca2_esm_catalog(context: AssetExecutionContext,
config: ESMCatalogConfig,
s3: S3Resource):

bucket = EnvVar("LOCA2_BUCKET").get_value()

if config.is_zarr():
prefix = EnvVar("LOCA2_ZARR_PATH_ROOT").get_value()
else:
prefix = EnvVar("LOCA2_RAW_PATH_ROOT").get_value()

catalog_metadata = intake_esm.cat.ESMCatalogModel(
esmcat_version="0.1.0",
id=config.id,
description=config.description,
catalog_file=f"s3://{bucket}/{config.id}.csv",
attributes=[
intake_esm.cat.Attribute(column_name="variable"),
intake_esm.cat.Attribute(column_name="model"),
intake_esm.cat.Attribute(column_name="scheme"),
intake_esm.cat.Attribute(column_name="experiment_id"),
intake_esm.cat.Attribute(column_name="time_range"),
intake_esm.cat.Attribute(column_name="path")
],
assets=intake_esm.cat.Assets(
column_name='path',
format=intake_esm.cat.DataFormat.zarr
if config.is_zarr() else intake_esm.cat.DataFormat.netcdf,
)
)
context.log.info(catalog_metadata.model_dump_json())

s3_client = s3.get_client()
paginator = s3_client.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

    # Zarr stores are prefixes (directories) containing many objects, so a single
    # dataset shows up as many S3 keys. A set records each unique base path only once.
keys = set()

for page in pages:
if 'Contents' in page:
for obj in page['Contents']:
full_key = obj['Key']

# If we are cataloging Zarr stores, we need to identify the directory
# that holds all the files for a single dataset. These directories end
# with "cent.zarr" and are the base path for the dataset
if config.is_zarr():
if "monthly.cent.zarr" in full_key:
base_path = (full_key.rsplit("monthly.cent.zarr", 1)[0]
+ "monthly.cent.zarr")
elif "cent.monthly.zarr" in full_key:
base_path = (full_key.rsplit("cent.monthly.zarr", 1)[0]
+ "cent.monthly.zarr")
else:
base_path = full_key
keys.add(base_path)

context.log.info(f"Found {len(keys)} unique base paths")

with TemporaryDirectory() as temp_dir:
collection_spec_path = f"{temp_dir}/{config.id}.json"
with open(collection_spec_path, "w") as json_file:
json_file.write(catalog_metadata.model_dump_json(indent=4))

catalog_path = f"{temp_dir}/{config.id}.csv"

with open(catalog_path, 'w') as f:
f.write("variable,model,scheme,experiment_id,time_range,path\n")

# Now that we have the unique base paths, we can write the catalog
for full_key in keys:
relative_path = full_key[len(prefix):] if full_key.startswith(prefix) \
else full_key
try:
parsed = parse_key(relative_path, bucket, full_key)
f.write(f"{parsed['variable']},{parsed['model']},{parsed['scheme']},{parsed['experiment_id']},{parsed['time_range']},{parsed['path']}\n") # NOQA E501
except IndexError as e:
context.log.error(f"Error processing {full_key}: {e}")

s3_client.upload_file(Filename=catalog_path,
Bucket=bucket,
Key=f"{config.id}.csv")

s3_client.upload_file(Filename=collection_spec_path,
Bucket=bucket,
Key=f"{config.id}.json")
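
As a rough sketch of how the generated catalog might be consumed downstream (the bucket name and search values below are hypothetical; intake, intake-esm, and s3fs are assumed to be installed):

import intake

# Open the collection spec that loca2_esm_catalog uploads to S3.
# "my-loca2-bucket" is a placeholder for the LOCA2_BUCKET value;
# the catalog id matches the ESMCatalogConfig default.
cat = intake.open_esm_datastore(
    "s3://my-loca2-bucket/loca2_raw_netcdf_monthly_esm_catalog.json"
)

# Filter on the columns written by parse_key()
subset = cat.search(model="ACCESS-CM2", experiment_id="historical", variable="tasmax")

# Load the matching assets as xarray datasets keyed by catalog entry
datasets = subset.to_dataset_dict()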
6 changes: 3 additions & 3 deletions downscaled_climate_data/definitions.py
@@ -1,8 +1,8 @@
 from dagster import Definitions, EnvVar
 from dagster_aws.s3 import S3Resource
 
-from downscaled_climate_data.assets.loca2 import loca2_zarr
-from downscaled_climate_data.assets.loca2 import loca2_raw_netcdf
+from downscaled_climate_data.assets.loca2 import loca2_zarr, loca2_raw_netcdf
+from downscaled_climate_data.assets.loca2 import loca2_esm_catalog
 from downscaled_climate_data.sensors.loca2_models import Loca2Models
 from downscaled_climate_data.sensors.loca2_sensor import (loca2_sensor_monthly_pr,
                                                           loca2_sensor_monthly_tasmin,
@@ -13,7 +13,7 @@
                                                           loca2_sensor_tasmin)
 
 defs = Definitions(
-    assets=[loca2_raw_netcdf, loca2_zarr],
+    assets=[loca2_raw_netcdf, loca2_zarr, loca2_esm_catalog],
     sensors=[loca2_sensor_tasmax,
              loca2_sensor_tasmin,
              loca2_sensor_pr,
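
For reference, a minimal sketch of materializing the new catalog asset ad hoc (the config values are hypothetical examples of the ESMCatalogConfig fields; the LOCA2_* environment variables and AWS credentials are assumed to be set):

from dagster import materialize
from dagster_aws.s3 import S3Resource

from downscaled_climate_data.assets.loca2 import loca2_esm_catalog

result = materialize(
    [loca2_esm_catalog],
    resources={"s3": S3Resource()},  # credentials resolved from the environment
    run_config={
        "ops": {
            "loca2_esm_catalog": {
                "config": {
                    "data_format": "zarr",  # hypothetical: catalog the Zarr stores
                    "id": "loca2_zarr_monthly_esm_catalog",
                    "description": "LOCA2 Zarr data catalog",
                }
            }
        }
    },
)
assert result.success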