Skip to content

Commit

Permalink
Adjusted scripts to process Ingmar's lake change data (testing palett…
Browse files Browse the repository at this point in the history
…es and max z-level using just 1 UTM zone, 1 input file) and created separate script for config
  • Loading branch information
julietcohen committed Apr 7, 2023
1 parent 8c1997a commit 3bd2fb2
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 26 deletions.
51 changes: 29 additions & 22 deletions IN_PROGRESS_VIZ_WORKFLOW.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,38 @@
# help flag provides flag help
# store_true actions stores argument as True

import PRODUCTION_IWP_CONFIG
IWP_CONFIG = PRODUCTION_IWP_CONFIG.IWP_CONFIG
#import PRODUCTION_IWP_CONFIG # reintroduce when processing IWP
#IWP_CONFIG = PRODUCTION_IWP_CONFIG.IWP_CONFIG # reintroduce when processing IWP

import lake_change_config
IWP_CONFIG = lake_change_config.IWP_CONFIG


#print("Using config: ")
#pprint.pprint(IWP_CONFIG)

# setup logging
def setup_logging(log_json_file):
"""
Setup logging configuration
"""
with open(log_json_file, 'r') as f:
logging_dict = json.load(f)
logging.config.dictConfig(logging_dict)
return logging_dict
# # setup logging
# def setup_logging(log_json_file):
# """
# Setup logging configuration
# """
# with open(log_json_file, 'r') as f:
# logging_dict = json.load(f)
# logging.config.dictConfig(logging_dict)
# return logging_dict

# define logger:
logging_config = '/u/julietcohen/viz-workflow/logging.json'
logging_dict = setup_logging(logging_config)
# retrieve name of logger to add updates
logger = logging.getLogger(__name__)
# # define logger:
# logging_config = '/u/julietcohen/viz-workflow/logging.json'
# logging_dict = setup_logging(logging_config)
# # retrieve name of logger to add updates
# logger = logging.getLogger(__name__)


def main():
result = subprocess.run(["hostname", "-i"], capture_output=True, text=True)
head_ip = result.stdout.strip()
#print("connecting to ray.")
#ray.init(address='auto', dashboard_port=8265)
print(f"Connecting to Ray... at address ray://{head_ip}:10001")
ray.init(address=f'ray://{head_ip}:10001', dashboard_port=8265) # most reliable way to start Ray
# use port-forwarding to see dashboard: `ssh -L 8265:localhost:8265 [email protected]`
Expand Down Expand Up @@ -112,16 +119,16 @@ def step0_staging():
# update the config for the current context: write staged files to local /tmp dir
iwp_config = deepcopy(IWP_CONFIG)
iwp_config['dir_staged'] = iwp_config['dir_staged_local']
iwp_config['dir_footprints'] = iwp_config['dir_footprints_local']
#iwp_config['dir_footprints'] = iwp_config['dir_footprints_local'] # reintroduce when processing IWP
# make directory /tmp/staged on each node
# not really necessary cause Robyn's functions are set up to do this
# and /tmp allows dirs to be created to write files
os.makedirs(iwp_config['dir_staged'], exist_ok = True)

# OLD METHOD "glob" all files.
stager = pdgstaging.TileStager(iwp_config, check_footprints=False)
missing_footprints = stager.check_footprints()
print(f"⚠️ Num missing footprints: {len(missing_footprints)}")
#missing_footprints = stager.check_footprints() # reintroduce when processing IWP
#print(f"⚠️ Num missing footprints: {len(missing_footprints)}") # reintroduce when processing IWP

# Get
staging_input_files_list = stager.tiles.get_filenames_from_dir('input')
Expand Down Expand Up @@ -484,7 +491,7 @@ def step3_raster_lower(batch_size_geotiffs=20):

iwp_config['dir_geotiff'] = iwp_config['dir_geotiff_remote']
# next line is likely not necessary but can't hurt
iwp_config['dir_footprints'] = iwp_config['dir_footprints_local']
# iwp_config['dir_footprints'] = iwp_config['dir_footprints_local'] # reintroduce when processing IWP data
# update the config for the current context: pull stager that represents staged files in /scratch
# next line is likely not necessary but can't hurt
iwp_config['dir_staged'] = iwp_config['dir_staged_remote_merged']
Expand Down Expand Up @@ -535,7 +542,7 @@ def step3_raster_lower(batch_size_geotiffs=20):
# even though the geotiff base dir has been created and the filenames have been batched,
# still cannot switch the dir_geotiff to _local !!! because the lower z-level rasters need to be
# written to scratch rather than /tmp so all lower z-levels can access all files in higher z-levels
iwp_config['dir_footprints'] = iwp_config['dir_footprints_local'] # we deduplicate at rasterization
# iwp_config['dir_footprints'] = iwp_config['dir_footprints_local'] # we deduplicate at rasterization, reintroduce when processing IWP data
#iwp_config['dir_geotiff'] = iwp_config['dir_geotiff_local'] # run immeditely before defining rasterizer
# I dont think theres a need to set rasterizer with new config after chaning this property cause
# that is done within the function create_composite_geotiffs() but troubleshooting
Expand Down Expand Up @@ -733,7 +740,7 @@ def make_batch(items, batch_size):
return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

@ray.remote
def stage_remote(filepath, config, logging_dict = logging_dict):
def stage_remote(filepath, config, logging_dict = None):
"""
Step 1.
Parallelism at the per-shape-file level.
Expand Down
98 changes: 98 additions & 0 deletions lake_change_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from datetime import datetime
import subprocess
#######################
#### Change me 😁 ####
#######################
# ALWAYS include the tailing slash "/"

# define user on Delta, avoid writing files to other user's dir
user = subprocess.check_output("whoami").strip().decode("ascii")
#head_node = 'cn___/'
head_node = 'gpub073'
# define desired location for output files within user dir
# ensures a new subfolder every run as long as new run is not started within same day as last run
# following path is the output subdir for test run, using just on subdir of the alaska files that is only ~8% of the Alaska dir, 23.5 GB
output_subdir = 'lake_change/output/utm_32640_20230407'
#output_subdir = datetime.now().strftime("%b-%d-%y")
# don't use subprocess to retrieve date for subdir because runs might span over 2 days if they go overnight

##############################
#### END OF Change me 😁 ####
##############################

# input path for all data:
INPUT = '/scratch/bbou/julietcohen/lake_change/input/sample/'

# following path is the OUTPUT for test run, using just on subdir of the alaska files that is only 7.78% of the Alaska dir, 45.57 GB
OUTPUT = f'/scratch/bbou/{user}/{output_subdir}/' # Dir for results. High I/O is good.

STAGING_LOCAL = '/tmp/staged/'
STAGING_REMOTE = OUTPUT + 'staged/'
STAGING_REMOTE_MERGED = STAGING_REMOTE + head_node

GEOTIFF_LOCAL = '/tmp/geotiff/'
GEOTIFF_REMOTE = OUTPUT + 'geotiff/' # Kastan used pathlib.Path(OUTPUT) / pathlib.Path('merged_geotiff_sep9') for this so if it errors try something similar
# check if need a variable GEOTIFF_REMOTE_MERGED after we finish the raster step successfully

#WEBTILE_LOCAL = '/tmp/web_tiles/' # we do not use /tmp for webtile step, it is unique in that way
WEBTILE_REMOTE = OUTPUT + 'web_tiles/'

#THREE_D_PATH = OUTPUT + '3d_tiles/' # workflow does not accomodate 3d-tiling yet

""" FINAL config is exported here, and imported in the IPW Workflow python file. """
IWP_CONFIG = {
"deduplicate_clip_to_footprint": False,
"dir_output": OUTPUT, # base dir of all output, needs to change every run with definition of output_subdir
"dir_input": INPUT, # base dir of all files to be staged
"ext_input": ".gpkg",
"dir_geotiff_remote": GEOTIFF_REMOTE, # we store geotiffs in /scratch after they are created so they are safe after the job concludes, and web-tiling can access all geotiffs in the same directory
"dir_geotiff_local": GEOTIFF_LOCAL, # we write highest level geotiffs to /tmp then transfer to /scratch
"dir_web_tiles": WEBTILE_REMOTE, # we do not use /tmp for webtile step, it writes directly to /scratch
"dir_staged_remote": STAGING_REMOTE, # we rsync the staged files to /scratch to merge, then rasterize and 3dtile with that merged dir
"dir_staged_remote_merged": STAGING_REMOTE_MERGED, # input for raster highest after staged files have been merged
"dir_staged_local": STAGING_LOCAL, # initially write staged files to /tmp so they write faster
"filename_staging_summary": STAGING_REMOTE + "staging_summary.csv",
"filename_rasterization_events": GEOTIFF_REMOTE + "raster_events.csv",
"filename_rasters_summary": GEOTIFF_REMOTE + "raster_summary.csv",
#"filename_config": OUTPUT + "config",
"version": datetime.now().strftime("%B%d,%Y"),
"simplify_tolerance": 0.1,
"tms_id": "WGS1984Quad",
"z_range": [
0,
11
],
"geometricError": 57,
"z_coord": 0,
"statistics": [
{
"name": "coverage",
"weight_by": "area",
"property": "area_per_pixel_area",
"aggregation_method": "sum",
"resampling_method": "average",
"val_range": [
0,
1
],
"palette": [
"#ff0000", # red
"#0000ff" # blue
],
"nodata_val": 0,
"nodata_color": "#ffffff00"
},
],
"deduplicate_at": "raster",
"deduplicate_keep_rules": [
[
"Date",
"larger"
]
],
"deduplicate_method": "neighbor",
"deduplicate_keep_rules": [["staging_filename", "larger"]],
"deduplicate_overlap_tolerance": 0.1,
"deduplicate_overlap_both": False,
"deduplicate_centroid_tolerance": None
}
8 changes: 6 additions & 2 deletions rsync_staging_to_scratch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
import subprocess
from subprocess import PIPE, Popen
import pprint
import PRODUCTION_IWP_CONFIG
IWP_CONFIG = PRODUCTION_IWP_CONFIG.IWP_CONFIG
#import PRODUCTION_IWP_CONFIG
#IWP_CONFIG = PRODUCTION_IWP_CONFIG.IWP_CONFIG

import lake_change_config
IWP_CONFIG = lake_change_config.IWP_CONFIG

# set config properties for current context
IWP_CONFIG['dir_staged'] = IWP_CONFIG['dir_staged_local']
SOURCE = IWP_CONFIG['dir_staged']
Expand Down
8 changes: 6 additions & 2 deletions rsync_step2_raster_highest_to_scratch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
import subprocess
import time
from subprocess import PIPE, Popen
import PRODUCTION_IWP_CONFIG
IWP_CONFIG = PRODUCTION_IWP_CONFIG.IWP_CONFIG
#import PRODUCTION_IWP_CONFIG # reintroduce when processing IWP
#IWP_CONFIG = PRODUCTION_IWP_CONFIG.IWP_CONFIG # reintroduce when processing IWP

import lake_change_config
IWP_CONFIG = lake_change_config.IWP_CONFIG

# set config properties for current context
IWP_CONFIG['dir_geotiff'] = IWP_CONFIG['dir_geotiff_local']
SOURCE = IWP_CONFIG['dir_geotiff']
Expand Down

0 comments on commit 3bd2fb2

Please sign in to comment.