From 7d42715759a63bc88e71612490b8786d5ba610f7 Mon Sep 17 00:00:00 2001
From: ritvik
Date: Mon, 30 Dec 2024 12:17:07 -0500
Subject: [PATCH] updates

---
 geoprepare/extract/extract_EO.py | 445 +++++++++++++++++++++----------
 1 file changed, 310 insertions(+), 135 deletions(-)

diff --git a/geoprepare/extract/extract_EO.py b/geoprepare/extract/extract_EO.py
index 02eddb3..9b8eeb4 100644
--- a/geoprepare/extract/extract_EO.py
+++ b/geoprepare/extract/extract_EO.py
@@ -95,13 +95,6 @@ def get_var_fname(params, var, year, doy):
     return None
 
 
-def build_output_path(dir_output, region, region_id, year, var, crop):
-    """
-    Generates the output path for a CSV file.
-    """
-    return dir_output / f"{region_id}_{region}_{year}_{var}_{crop}.csv"
-
-
 def get_month_and_day_from_day_of_year(year, day_of_year):
     # Create an arrow object for the first day of the given year
     start_of_year = ar.get(f"{year}-01-01")
@@ -119,14 +112,268 @@ def get_month_and_day_from_day_of_year(year, day_of_year):
 # ========================
 # CORE PROCESSING
 # ========================
-def process(val):
+# ---------------------------------------------------------------------
+# 1. Utility & helper functions
+# ---------------------------------------------------------------------
+def validate_scale(scale: str) -> None:
+    """
+    Ensure scale is either 'admin_1' or 'admin_2'.
+    """
+    if scale not in ["admin_1", "admin_2"]:
+        raise ValueError(f"Scale {scale} is not valid. Must be 'admin_1' or 'admin_2'.")
+
+
+def skip_chirps_gefs(var: str, year: int) -> bool:
+    """
+    Determine whether 'chirps_gefs' should be skipped: the forecast product is
+    only processed when the requested year is the current year.
+    """
+    current_year = ar.utcnow().year
+
+    return (var == "chirps_gefs") and (year != current_year)
+
+
+def get_admin_fields(scale: str) -> tuple[str, str]:
+    """
+    Return the admin name and ID fields based on the scale.
+    """
+    if scale == "admin_1":
+        return "ADM1_NAME", "ADM_ID"
+    else:
+        return "ADM2_NAME", "ADM_ID"
+
+
+def prepare_output_directory(params, country: str, scale: str, crop: str, var: str) -> Path:
+    """
+    Create (if needed) and return the output directory path for final CSV outputs.
+    """
+    threshold = params.parser.getboolean(country, "threshold")
+    limit = utils.crop_mask_limit(params, country, threshold)
+
+    dir_crop_inputs = Path(f"crop_t{limit}") if threshold else Path(f"crop_p{limit}")
+    # e.g. /some/path/Output/crop_t20/angola/admin_1/cr/ndvi
+    dir_output = params.dir_output / dir_crop_inputs / country / scale / crop / var
+    os.makedirs(dir_output, exist_ok=True)
+
+    return dir_output
+
+
+def build_output_path(dir_output: Path, region: str, region_id: str, year: int, var: str, crop: str) -> Path:
+    """
+    Construct the filename for the CSV output.
+    Adjust this logic to match the desired output file naming convention.
+    """
+    filename = f"{region_id}_{region}_{year}_{var}_{crop}.csv"
+
+    return dir_output / filename
+
+
+def build_existing_rows(path_output: Path) -> list[list[str]]:
+    """
+    If a CSV file already exists, read its rows (skipping the header) for partial data usage.
+    """
+    if not path_output.is_file():
+        return []
+    with path_output.open() as f:
+        reader = csv.reader(f)
+        rows = list(reader)
+    return rows[1:] if len(rows) > 1 else []
+
+
+def should_use_partial_file(params, year: int, current_year: int) -> bool:
+    """
+    Decide whether to reuse data from a partially written file when re-processing.
+    """
+    # Reuse if year == current_year, or if year < current_year and we are not redoing everything.
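+    # Illustrative truth table (values assumed): current_year=2025, year=2025
+    # -> True (the in-season file is still being filled); year=2024 with
+    # params.redo=False -> True (reuse completed rows); year=2024 with
+    # params.redo=True -> False (force a full re-extraction).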
+    return (current_year == year) or (current_year > year and not params.redo)
+
+
+def get_end_doy(year: int) -> int:
+    """
+    Return the exclusive upper bound for day-of-year loops:
+    367 if leap year, else 366.
+    """
+    return 367 if calendar.isleap(year) else 366
+
+
+def get_default_empty_str(country: str, region: str, region_id: str, date_part: str) -> str:
+    """
+    Return a CSV line with NaN placeholders if no data was found.
+    """
+    # Six numeric columns: the variable mean plus the five pixel-count columns.
+    empty_values = ",".join([str(np.nan)] * 6)
+    return f"{country},{region},{region_id},{date_part},{empty_values}"
+
+
+def read_or_skip_existing_day(doy: int, existing_rows: list[list[str]]) -> str | None:
+    """
+    If the partial file has data for this day-of-year (index = doy - 1), return it. Otherwise None.
+    """
+    try:
+        # The 6th position (index = 5) is the first numeric column in the CSV.
+        if existing_rows[doy - 1][5] != "":
+            return ",".join(existing_rows[doy - 1])
+    except IndexError:
+        pass
+    return None
+
+
+def extract_stats(geometry, var: str, fl_var: Path, afi_file, limit: int) -> str:
+    """
+    Call geom_extract(...) and return a comma-separated string of the extracted results.
+    If extraction fails or returns no data, return "".
+    """
+    from .stats import geom_extract
+
+    val_extracted = geom_extract(
+        geometry,
+        var,
+        fl_var,
+        stats_out=["mean", "counts"],
+        afi=afi_file,
+        afi_thresh=limit * 100,
+        thresh_type="Fixed"
+    )
+
+    if not val_extracted:
+        return ""
+
+    # Combine "stats" and "counts"
+    values = list(val_extracted["stats"].values()) + list(val_extracted["counts"].values())
+
+    return ",".join(map(str, values))
+
+
+def write_daily_stats_to_csv(daily_stats: list[str], path_output: Path, var: str) -> None:
+    """
+    If daily_stats is non-empty, insert a header and write to CSV.
+    """
+    if not daily_stats:
+        return
+
+    # CSV header: identifying columns, then the variable mean and five pixel-count columns.
+    header = (
+        f"country,region,region_id,year,doy,{var},num_pixels,"
+        "average_crop_percentage,median_crop_percentage,"
+        "min_crop_percentage,max_crop_percentage"
+    )
+    daily_stats.insert(0, header)
+
+    # Round-trip through pandas so the accumulated lines are parsed and written uniformly.
+    df_result = pd.read_csv(io.StringIO("\n".join(daily_stats)))
+    df_result.to_csv(path_output, index=False)
+
+
+# ---------------------------------------------------------------------
+# 2. Special-case function for chirps_gefs
+# ---------------------------------------------------------------------
+def process_chirps_gefs(
+    daily_stats: list[str],
+    row,
+    afi_file,
+    limit: int,
+    country: str,
+    region: str,
+    region_id: str,
+    var: str,
+    year: int,
+    params
+) -> None:
+    """
+    Handle the special chirps_gefs logic: from start_date to end_date (day-of-year
+    bounds of a ~15-day forecast window anchored on today), either extract stats
+    or fill with NaN if the file doesn't exist.
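+
+    Note: both bounds are day-of-year integers derived from the current date,
+    so a window that crosses December 31 yields an empty range and no rows
+    are appended for this region.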
+ """ + start_date = ar.utcnow().date().timetuple().tm_yday + end_date = ar.utcnow().shift(days=+15).date().timetuple().tm_yday + + for jd in range(start_date, end_date): + date_part = f"{ar.utcnow().year},{jd}" + empty_str = get_default_empty_str(country, region, region_id, date_part) + + # Suppose you have a function that knows how to build the filename: + # get_var_fname(params, var, year, jd) -> str + fname = get_var_fname(params, var, year, jd) + fl_var = params.dir_interim / var / fname + + if not os.path.isfile(fl_var): + daily_stats.append(empty_str) + continue + + values_str = extract_stats(row["geometry"], var, fl_var, afi_file, limit) + if values_str: + daily_stats.append(f"{country},{region},{region_id},{date_part},{values_str}") + else: + daily_stats.append(empty_str) + + +# --------------------------------------------------------------------- +# 3. General daily processor for variables other than chirps_gefs +# --------------------------------------------------------------------- +def process_regular_var( + daily_stats: list[str], + row, + afi_file, + limit: int, + var: str, + params, + year: int, + country: str, + region: str, + region_id: str, + end_doy: int, + existing_rows: list[list[str]], + use_partial_file: bool +) -> None: """ - Processes a single combination of (params, country, crop, scale, var, year, afi_file, df_country). - Extracts stats from raster files, handles partial data, etc. + Process a 'regular' variable for each day of the year (1..end_doy). + If partial data exists for a given day, skip re-processing. """ - # Here we assume there's a local import in your real environment: - from .stats import geom_extract # type: ignore + for doy in range(1, end_doy): + date_part = f"{year},{doy}" + empty_str = get_default_empty_str(country, region, region_id, date_part) + + # Build the filename (adjust as needed to match your naming scheme). + fname = get_var_fname(params, var, year, doy) + + # Inline logic for nsidc_surface and nsidc_rootzone + if var == "nsidc_surface": + fl_var = params.dir_interim / "nsidc" / "daily" / "surface" / fname + elif var == "nsidc_rootzone": + fl_var = params.dir_interim / "nsidc" / "daily" / "rootzone" / fname + else: + fl_var = params.dir_interim / var / fname + # If the file doesn't exist, store an empty row + if not os.path.isfile(fl_var): + daily_stats.append(empty_str) + continue + + # If partial data exists for this day, skip re-calculation + if use_partial_file and existing_rows: + existing_line = read_or_skip_existing_day(doy, existing_rows) + if existing_line: + daily_stats.append(existing_line) + continue + + # Extract stats + values_str = extract_stats(row["geometry"], var, fl_var, afi_file, limit) + if values_str: + daily_stats.append(f"{country},{region},{region_id},{date_part},{values_str}") + else: + daily_stats.append(empty_str) + + +# --------------------------------------------------------------------- +# 4. Main entry point: process(...) +# --------------------------------------------------------------------- +def process(val): + """ + Processes a single combination of: + (params, country, crop, scale, var, year, afi_file, df_country). + + - Validates scale + - Optionally skips chirps_gefs if not current year + - Prepares output folder + - Loops over each region in df_country, extracting stats + - Writes out a CSV + """ ( params, country, @@ -138,151 +385,79 @@ def process(val): df_country, ) = val - # Validate scale - if scale not in ["admin_1", "admin_2"]: - raise ValueError(f"Scale {scale} is not valid. 
-
-    admin_name = "ADM1_NAME" if scale == "admin_1" else "ADM2_NAME"
-    admin_id = "ADM_ID"
+    # 1. Validate scale
+    validate_scale(scale)
 
-    # Skip chirps_gefs if year != current year
-    if var == "chirps_gefs" and year != ar.utcnow().year:
+    # 2. Skip 'chirps_gefs' if year != current year
+    if skip_chirps_gefs(var, year):
         return
 
-    # ---------------------------
-    # 1. Prepare output directory
-    # ---------------------------
-    threshold = params.parser.getboolean(country, "threshold")
-    limit = utils.crop_mask_limit(params, country, threshold)
+    # 3. Prepare the output directory
+    dir_output = prepare_output_directory(params, country, scale, crop, var)
 
-    dir_crop_inputs = Path(f"crop_t{limit}") if threshold else Path(f"crop_p{limit}")
-    # e.g. D:/Users/ritvik/projects/GEOGLAM/Output/FEWSNET/crop_t20/angola/admin_1/cr/ndvi
-    dir_output = params.dir_output / dir_crop_inputs / country / scale / crop / var
-    os.makedirs(dir_output, exist_ok=True)
+    # 4. Identify admin fields
+    admin_name, admin_id = get_admin_fields(scale)
 
-    # ---------------------------
-    # 2. Loop over each region
-    # ---------------------------
+    # 5. Common values used by both processing branches below
     current_year = ar.utcnow().year
-    end_doy = 367 if calendar.isleap(year) else 366
+    end_doy = get_end_doy(year)
+    threshold = params.parser.getboolean(country, "threshold")
+    limit = utils.crop_mask_limit(params, country, threshold)
+    use_partial = should_use_partial_file(params, year, current_year)
 
+    # 6. Iterate over each row (region) in df_country
     for _, row in df_country.iterrows():
+
+        # Skip rows missing an admin name
        if not row[admin_name]:
             continue
 
         region = row[admin_name].lower().replace(" ", "_")
         region_id = row[admin_id]
 
+        # Build the path for this region-year CSV
         path_output = build_output_path(dir_output, region, region_id, year, var, crop)
 
-        # Determine if partial file usage applies
-        use_partial_file = (
-            (current_year == year) or (current_year > year and not params.redo)
-        )
+        # If we plan to use partial data, load existing rows
+        existing_rows = build_existing_rows(path_output) if use_partial else []
 
-        # If partial data exists, read it
-        existing_rows = []
-        if use_partial_file and path_output.is_file():
-            with path_output.open() as hndl_outf:
-                reader = csv.reader(hndl_outf)
-                existing_rows = list(reader)[1:]  # skip header
+        # We'll collect daily stats in this list
+        daily_stats = []
 
+        # 7. If var == "chirps_gefs", handle it with special logic
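+        # (chirps_gefs is a short-range forecast keyed to today's date, so it
+        # yields a ~15-day window of rows rather than a full year of daily rows)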
If var == "chirps_gefs", handle that with special logic if var == "chirps_gefs": - start_date = ar.utcnow().date().timetuple().tm_yday - end_date = ar.utcnow().shift(days=+15).date().timetuple().tm_yday - - for jd in range(start_date, end_date): - # Build default empty string for missing data - empty_values = ",".join([str(np.NaN)] * 6) - empty_str = f"{country},{region},{region_id},{date_part},{empty_values}" - - fl_var = params.dir_interim / var / Path(get_var_fname(var, year, jd)) - if not os.path.isfile(fl_var): - daily_stats.append(empty_str) - else: - val_extracted = geom_extract( - row["geometry"], var, fl_var, - stats_out=["mean", "counts"], - afi=afi_file, - afi_thresh=limit * 100, - thresh_type="Fixed" - ) - if val_extracted: - # Combine the stats and counts - values = list(val_extracted["stats"].values()) + list(val_extracted["counts"].values()) - values_str = ",".join(map(str, values)) - daily_stats.append(f"{country},{region},{region_id},{date_part},{values_str}") - else: - daily_stats.append(empty_str) + process_chirps_gefs( + daily_stats, + row, + afi_file, + limit, + country, + region, + region_id, + var, + year, + params + ) else: - daily_stats = [] - for doy in range(1, end_doy): - # Handle special chirps_gefs case - if var == "chirps_gefs": - # Only do day=1 - if doy > 1: - break - forecast_date = ar.utcnow().shift(days=+15).date() - doy = forecast_date.timetuple().tm_yday - date_part = f"{forecast_date.year},{doy}" - else: - date_part = f"{year},{doy}" - - # Build default empty string for missing data - empty_values = ",".join([str(np.NaN)] * 6) - empty_str = f"{country},{region},{region_id},{date_part},{empty_values}" - - # Attempt to find raster file - fname = get_var_fname(params, var, year, doy) - - if var == "nsidc_surface": - fl_var = params.dir_interim / "nsidc" / "daily" / "surface" / fname - elif var == "nsidc_rootzone": - fl_var = params.dir_interim / "nsidc" / "daily" / "rootzone" / fname - else: - fl_var = params.dir_interim / var / fname - - if not os.path.isfile(fl_var): - daily_stats.append(empty_str) - continue - - # If partial file has data for this day (index = doy-1), skip - if use_partial_file and existing_rows: - # 6th position (index=5) is the var data (or "") - try: - if existing_rows[doy - 1][5] != "": - daily_stats.append(",".join(existing_rows[doy - 1])) - continue - except: - breakpoint() - - val_extracted = geom_extract( - row["geometry"], var, fl_var, - stats_out=["mean", "counts"], - afi=afi_file, - afi_thresh=limit * 100, - thresh_type="Fixed" - ) - if val_extracted: - # Combine the stats and counts - values = list(val_extracted["stats"].values()) + list(val_extracted["counts"].values()) - values_str = ",".join(map(str, values)) - daily_stats.append(f"{country},{region},{region_id},{date_part},{values_str}") - else: - daily_stats.append(empty_str) - - if len(daily_stats): - # Insert a header at the start - header = ( - f"country,region,region_id,year,doy,{var},num_pixels," - "average_crop_percentage,median_crop_percentage," - "min_crop_percentage,max_crop_percentage" + # Otherwise, process day-by-day + process_regular_var( + daily_stats, + row, + afi_file, + limit, + var, + params, + year, + country, + region, + region_id, + end_doy, + existing_rows, + use_partial ) - daily_stats.insert(0, header) - # Write results to CSV - df_result = pd.read_csv(io.StringIO("\n".join(daily_stats))) - df_result.to_csv(path_output, index=False) + # 8. 
+        write_daily_stats_to_csv(daily_stats, path_output, var)
 
 
 # ========================