added workflow execution steps to README, removed outdated documentation
julietcohen committed May 16, 2023
1 parent 03dd876 commit 3b2e919
Showing 5 changed files with 94 additions and 93 deletions.
36 changes: 0 additions & 36 deletions IN_PROGRESS_VIZ_WORKFLOW.py
@@ -35,23 +35,6 @@
import PRODUCTION_IWP_CONFIG # reintroduce when processing IWP
IWP_CONFIG = PRODUCTION_IWP_CONFIG.IWP_CONFIG # reintroduce when processing IWP

# set up logging from ConfigManager.py
#log_filepath = IWP_CONFIG["dir_output"] + "log.log"
# handler = logging.FileHandler(
# os.environ.get("LOGFILE", '/tmp/log.log')) # changed function to one that is newer, maybe logger.handlers.WatchedFileHandler is depreciated

# handler = logging.handlers.WatchedFileHandler(
# os.environ.get("LOGFILE", '/tmp/log.log'))
# formatter = logging.Formatter(logging.BASIC_FORMAT)
# handler.setFormatter(formatter)
# root = logging.getLogger()
# root.setLevel(os.environ.get("LOGLEVEL", "INFO"))
# root.addHandler(handler)
# # set up logging from other scripts to print to console
# sh = logging.StreamHandler()
# sh.setFormatter(formatter)
# root.addHandler(sh)

# configure logger
logger = logging.getLogger("logger")
# Remove any existing handlers from the logger
@@ -68,25 +51,6 @@

print(logger.handlers)

#print("Using config: ")
#pprint.pprint(IWP_CONFIG)

# # setup logging
# def setup_logging(log_json_file):
# """
# Setup logging configuration
# """
# with open(log_json_file, 'r') as f:
# logging_dict = json.load(f)
# logging.config.dictConfig(logging_dict)
# return logging_dict

# # define logger:
# logging_config = '/u/julietcohen/viz-workflow/logging.json'
# logging_dict = setup_logging(logging_config)
# # retrieve name of logger to add updates
# logger = logging.getLogger(__name__)

def main():
result = subprocess.run(["hostname", "-i"], capture_output=True, text=True)
head_ip = result.stdout.strip()
25 changes: 15 additions & 10 deletions PRODUCTION_IWP_CONFIG.py
@@ -12,40 +12,45 @@
# define desired location for output files within user dir
# ensures a new subfolder every run as long as new run is not started within same day as last run
# following path is the output subdir for test run, using just one subdir of the Alaska files that is only ~8% of the Alaska dir, 23.5 GB
output_subdir = 'IWP/output/iwp_testRun_2023____'
output_subdir = 'IWP/output/iwp_testRun'
#output_subdir = datetime.now().strftime("%b-%d-%y")
# don't use subprocess to retrieve date for subdir because runs might span over 2 days if they go overnight

##############################
#### END OF Change me 😁 ####
##############################

# input path for all data:
# input path
#INPUT = '/scratch/bbou/julietcohen/IWP/input/2023-01-19/iwp_files/high/' # The output data of MAPLE. Which is the input data for STAGING.
INPUT = '/scratch/bbou/julietcohen/IWP/input/2023-01-19/iwp_files/high/russia/226_227_iwp/'
#INPUT = '/scratch/bbou/julietcohen/IWP/input/2023-01-19/iwp_files/high/russia/226_227_iwp/'
INPUT = '/scratch/bbou/julietcohen/IWP/input/few_adjacent_russia/iwp/'

# following path is the OUTPUT for test run, using just one subdir of the Alaska files that is only 7.78% of the Alaska dir, 45.57 GB
# output path
OUTPUT = f'/scratch/bbou/{user}/{output_subdir}/' # Dir for results. High I/O is good.
# output path for all data, when it is available:
#OUTPUT = f'/scratch/bbou/{user}/IWP/output/{output_subdir}/' # Dir for results. High I/O is good.

# footprints paths for all data:
# footprints paths:
FOOTPRINTS_LOCAL = '/tmp/staged_footprints/'
#FOOTPRINTS_REMOTE = '/scratch/bbou/julietcohen/IWP/footprint_files_with_date_20230119/high/'
FOOTPRINTS_REMOTE = '/scratch/bbou/julietcohen/IWP/footprint_files_with_date_20230119/high/russia/226_227_iwp/'
#FOOTPRINTS_REMOTE = '/scratch/bbou/julietcohen/IWP/footprint_files_with_date_20230119/high/russia/226_227_iwp/'
FOOTPRINTS_REMOTE = '/scratch/bbou/julietcohen/IWP/input/few_adjacent_russia/footprints/'

# staging paths
STAGING_LOCAL = '/tmp/staged/'
STAGING_REMOTE = OUTPUT + 'staged/'
STAGING_REMOTE_MERGED = STAGING_REMOTE + head_node

# geotiff paths
GEOTIFF_LOCAL = '/tmp/geotiff/'
GEOTIFF_REMOTE = OUTPUT + 'geotiff/' # Kastan used pathlib.Path(OUTPUT) / pathlib.Path('merged_geotiff_sep9') for this so if it errors try something similar
# check if need a variable GEOTIFF_REMOTE_MERGED after we finish the raster step successfully

# web tile path
#WEBTILE_LOCAL = '/tmp/web_tiles/' # we do not use /tmp for webtile step, it is unique in that way
WEBTILE_REMOTE = OUTPUT + 'web_tiles/'

#THREE_D_PATH = OUTPUT + '3d_tiles/' # workflow does not accommodate 3d-tiling yet
# 3d tile path
# Note: workflow does not accommodate 3d-tiling yet as of release 0.9.0
#THREE_D_PATH = OUTPUT + '3d_tiles/'

""" FINAL config is exported here, and imported in the IPW Workflow python file. """
IWP_CONFIG = {
@@ -56,7 +61,7 @@
"ext_footprints": ".shp",
"dir_footprints_remote": FOOTPRINTS_REMOTE, # the footprints start on /scratch before we transfer them to /tmp
"dir_footprints_local": FOOTPRINTS_LOCAL, # we rsync footprints from /scratch to /tmp before we use them for deduplication
"dir_geotiff_remote": GEOTIFF_REMOTE, # we store geotiffs in /scratch after they are created so they are safe after the job concludes, and web-tiling can access all geotiffs in the same directory
"dir_geotiff_remote": GEOTIFF_REMOTE, # we store geotiffs in /scratch after they're created so they're safe after the job concludes, and web-tiling can access all geotiffs in the same directory
"dir_geotiff_local": GEOTIFF_LOCAL, # we write highest level geotiffs to /tmp then transfer to /scratch
"dir_web_tiles": WEBTILE_REMOTE, # we do not use /tmp for webtile step, it writes directly to /scratch
"dir_staged_remote": STAGING_REMOTE, # we rsync the staged files to /scratch to merge, then rasterize and 3dtile with that merged dir
93 changes: 75 additions & 18 deletions README.md
@@ -1,27 +1,81 @@
# pdg_ray_workflow
Ray PDG's Viz-staging, Viz-Raster and Viz-3D in parallel using Ray Core and Ray Workflows.
# PDG Ray Workflow

### requirements
Permafrost Discovery Gateway visualization workflow that uses [viz-staging](https://github.com/PermafrostDiscoveryGateway/viz-staging), [viz-raster](https://github.com/PermafrostDiscoveryGateway/viz-raster/tree/main), and [viz-3dtiles](https://github.com/PermafrostDiscoveryGateway/viz-3dtiles) in parallel using Ray Core and Ray workflows.

Run this to install dependencies (only tested on x86_64):
```
conda env create --file environment_cross_platform.yml
```
TODO: Reduce strictness of env.yml requirements.
## Running the Workflow

For release 0.9.0, scripts must be executed in a particular order for staging, rasterization, and web-tiling steps. 3d-tiling has not been tested for this release.

## Running
- ssh into the Delta server

```
# Start head node. It will also act as a worker node by default.
ray start --head --port=6379 --dashboard-port=8265
# Start worker nodes
- Pull updates from the `main` branch for each of the 4 PDG repositories:
- [`PermafrostDiscoveryGateway/viz-workflow`](https://github.com/PermafrostDiscoveryGateway/viz-workflow/tree/main)
- [`PermafrostDiscoveryGateway/viz-staging`](https://github.com/PermafrostDiscoveryGateway/viz-staging)
- [`PermafrostDiscoveryGateway/viz-raster`](https://github.com/PermafrostDiscoveryGateway/viz-raster)
- [`PermafrostDiscoveryGateway/viz-3dtiles`](https://github.com/PermafrostDiscoveryGateway/viz-3dtiles) (Note: as of release 0.9.0, 3D-tiling has not been fully implemented)

```
- Create a virtual environment with `python=3.9` and install __local__ versions of the PDG packages (using `pip install -e {LOCAL PACKAGE}`). Also install the packages:
- `ray`
- `glances`
- `pyfastcopy`
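
A minimal sketch of that setup, assuming the four repositories are cloned side by side under your home directory (the environment name and paths are illustrative, not fixed by the workflow):

```
# create and activate the environment (name is an example)
conda create -n viz-workflow python=3.9 -y
conda activate viz-workflow
# install local, editable copies of the PDG packages
pip install -e ~/viz-staging -e ~/viz-raster -e ~/viz-3dtiles
# install the remaining dependencies
pip install ray glances pyfastcopy
```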

- Prepare one of the two `slurm` scripts to claim some worker nodes on which we will launch a job.
- Open the appropriate script that will soon be run to claim the nodes: either `viz-workflow/slurm/BEST-v2_gpu_ray.slurm` if you're using GPU, or `BEST_cpu_ray_double_srun.slurm` for CPU.
- **Change the line `#SBATCH --nodes={NUMBER}`** which represents the number of nodes that will process the IWP data.
- **Change the line `#SBATCH --time=24:00:00`** which represents the total amount of time a job is allowed to run (and charge credits based on minutes and cores) before it is cancelled. The full 24 hours should be set if doing a full IWP run.
- **Change the line `#SBATCH --account={ACCOUNT NAME}`** and enter the account name for the appropriate allocation. Note that we do __not__ store these private account names on GitHub, so pay attention to this line when you are pushing.
- **Find the `# global settings section` and change `conda activate {ENVIRONMENT}` or `source path/to/{ENVIRONMENT}/bin/activate`** by entering your virtual environment for this workflow.
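
For orientation, the lines to edit look roughly like the following; every value shown here is a placeholder, and the rest of the slurm script is omitted:

```
#SBATCH --nodes=4                  # number of nodes that will process the IWP data
#SBATCH --time=24:00:00            # wall-clock limit; use the full 24 hours for a full IWP run
#SBATCH --account=<ACCOUNT NAME>   # allocation account -- never commit the real name

# global settings
conda activate <ENVIRONMENT>       # or: source path/to/<ENVIRONMENT>/bin/activate
```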

- Open a new terminal, start a `tmux` session, then activate your virtual environment.

- Switch into the slurm dir by running `cd viz-workflow/slurm`. Then run `sbatch BEST-v2_gpu_ray.slurm` or `sbatch BEST_cpu_ray_double_srun.slurm` to launch the job on the number of nodes you specified within that file.

- Sync the most recent footprints from `/scratch` to all relevant nodes' `/tmp` dirs on Delta. Within a `tmux` session, switch into the correct environment, and ssh into the head node (e.g. `ssh gpub059`) and run `python viz-workflow/rsync_footprints_to_nodes.py`.
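
One workable version of that sequence (the session name, node name, and environment name are examples, replace them with your own):

```
tmux new -s iwp
ssh gpub059                      # the head node assigned to your job
conda activate viz-workflow      # activate your environment on the head node
python viz-workflow/rsync_footprints_to_nodes.py
```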

- Adjust `viz-workflow/PRODUCTION_IWP_CONFIG.py` as needed:
- Change the variable `head_node` to the head node.
- Specify the `INPUT` path and `FOOTPRINTS_REMOTE` paths.
- Specify the `OUTPUT` path (which serves as the _start_ of the path for all output files). This includes a variable `output_subdir` which should be changed to something unique, such as any subfolders the user wants the data to be written to. Create this folder manually.
- Within the subdir just created, create a subdirectory called `staged`.
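
For example, with the test-run settings in `PRODUCTION_IWP_CONFIG.py` (`output_subdir = 'IWP/output/iwp_testRun'`), the output folder and its `staged` subdirectory could be created in one step (the username and subdir here are illustrative):

```
mkdir -p /scratch/bbou/<user>/IWP/output/iwp_testRun/staged
```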

- Adjust `viz-workflow/IN_PROGRESS_VIZ_WORKFLOW.py` as needed:
- Within the `try:` part of the first function `main()`, comment out or uncomment steps depending on your stage in the workflow. `step0_staging()` is first, so only uncomment this step.

- Within the `tmux` session with your virtual environment activated, ssh into the head node associated with that job by running `ssh gpub059` or `ssh cn059`, for example. Then run: `python viz-workflow/IN_PROGRESS_VIZ_WORKFLOW.py`.

- Once staging is complete, run `python viz-workflow/rsync_staging_to_scratch.py`.

- Adjust the file `merge_staged_vector_tiles.py` as needed:
- Set the variables `staged_dir_path_list` and `merged_dir_path`:
- Change the last string part of `merged_dir_path` (where merged staged files will live) to the lowest-numbered node of the nodes you're using (the head node).
- Change the hard-coded nodes specified in `staged_dir_paths_list` to the list of nodes you're using for this job, except **do not include the head node in this list** because it was already assigned to `merged_dir_path`.
- Within a `tmux` session, with your virtual environment activated, and ssh'd into the head node, run `python viz-workflow/merge_staged_vector_tiles.py`.

### port forward ray dashboard
- Pre-populate your `/scratch` with a `geotiff` dir.

1. Login to a login node
- Return to the file `viz-workflow/IN_PROGRESS_VIZ_WORKFLOW.py`, comment out `step0_staging()`, and uncomment the next step: `step2_raster_highest(batch_size=100)` (skipping 3d-tiling for release 0.9.0). Run `python viz-workflow/IN_PROGRESS_VIZ_WORKFLOW.py` in a `tmux` session with the virtual environment activated and ssh'd into the head node.

- Run `python viz-workflow/rsync_step2_raster_highest_to_scratch.py`.

- Return to the file `viz-workflow/IN_PROGRESS_VIZ_WORKFLOW.py`, comment out `step2_raster_highest(batch_size=100)`, and uncomment the next step: `step3_raster_lower(batch_size_geotiffs=100)`. Run `python viz-workflow/IN_PROGRESS_VIZ_WORKFLOW.py`.

- Check that the file `rasters_summary.csv` was written to `/scratch`. Download this file locally. If the top few lines look oddly formatted, delete these lines and re-upload the file to the same directory (overwriting the misformatted one there).
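
One way to do the download and re-upload, assuming `rasters_summary.csv` sits at the top of your output dir (the hostnames and paths here are illustrative):

```
# from your local machine
scp <user>@login.delta.ncsa.illinois.edu:/scratch/bbou/<user>/IWP/output/iwp_testRun/rasters_summary.csv .
# fix the header lines locally, then overwrite the copy on /scratch
scp rasters_summary.csv <user>@login.delta.ncsa.illinois.edu:/scratch/bbou/<user>/IWP/output/iwp_testRun/
```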

- Create a new directory called `web_tiles` in your `/scratch` dir.
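
Again assuming the example output dir from above:

```
mkdir -p /scratch/bbou/<user>/IWP/output/iwp_testRun/web_tiles
```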

- Return to `IN_PROGRESS_VIZ_WORKFLOW.py`, comment out `step3_raster_lower(batch_size_geotiffs=100)`, and uncomment the next step: `step4_webtiles(batch_size_web_tiles=250)`. Run `python viz-workflow/IN_PROGRESS_VIZ_WORKFLOW.py`.

- Transfer the `log.log` in each node's `/tmp` dir to that node's subdir within the `staged` dir: run `python viz-workflow/rsync_log_to_scratch.py`

- Cancel the job: `scancel {JOB ID}`. The job ID can be found in the left column of the output from `squeue | grep {USERNAME}`. Once the job is cancelled, no more credits are used. Recall that the job will automatically be cancelled after 24 hours even if this command is not run.
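
For example (both values are placeholders):

```
squeue | grep <USERNAME>   # job ID is in the left-most column
scancel <JOB ID>
```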

- Remember to remove the `{ACCOUNT NAME}` for the allocation in the slurm script before pushing to GitHub.

- Move output data from Delta to the Arctic Data Center as soon as possible.

### Port Forward Ray Dashboard

1. Log in to a login node on the Delta server

```
ssh <👉YOUR_NCSA_USERNAME👈>@login.delta.ncsa.illinois.edu
@@ -49,9 +103,12 @@ ssh -L 8265:localhost:8265 <local_username>@<your_locaL_machine>
# Navigate your web browser to: localhost:8265/
```

### contributing
### Contributing

Please contribute via pull requests.

Documenting an environment can be done as follows:

Documenting an env (best way to create environment.yml):
```
conda env export | grep -v "^prefix: " > environment.yml
```
25 changes: 3 additions & 22 deletions merge_staged_vector_tiles.py
@@ -1,23 +1,3 @@
"""
This file prepares staged, vector tiles for archiving in the DataONE network.
It has methods that do the following:
1. Find all the paths of vector tiles in the staged directory.
2. Open each path in GeoPandas
3. Remove the polygons whose centroids are not contained within the
boundaries of the tile. (This avoids archiving the exact same polygons
in two different tiles, in the case that the polygon intersects a tile's
boundary.)
4. Remove the "centroids_within_tile" and "tile" columns, which are no
longer needed after the above step. The tile is still identified using
by the "centroid_tile" property.
5. Write the GeoPandas object to a file in a given archive directory, still
maintaining the tile path structure (e.g.
{TileMatrix}/{TileRow}/{TileCol})
Juliet's note: I think the above documentation is wrong. I don't see where in this script we execute #3 & #4.
If you read in a tile from the head node after merging, it still has the property "staging_centroid_within_tile"
"""

import filecmp
import os
import pathlib
@@ -61,7 +41,8 @@ def main():
'''
Usage Instructions:
- Set the list of paths to the vector tiles `staged_dir_paths_list`
- Choose one of the paths from the last step, remove it from the `staged_dir_paths_list` and set it as the `merged_dir_path` that will be used as the merged dir.
- Choose one of the paths from the last step, remove the head node
from the `staged_dir_paths_list` and set it as the `merged_dir_path`
'''

#######################
@@ -101,7 +82,7 @@ def main():
print("Final dir: ", merged_dir_path)

#stager = pdgstaging.TileStager(config=IWP_CONFIG, check_footprints=False)
stager = TileStager(config=IWP_CONFIG, check_footprints=False) # changed this from above line when troubleshooting merging 02/01
stager = TileStager(config=IWP_CONFIG, check_footprints=False)
ext = '.gpkg'

print("Starting merge...")
8 changes: 1 addition & 7 deletions rsync_log_to_scratch.py
@@ -21,18 +21,12 @@

# define user on Delta, avoid writing files to other user's dir
user = subprocess.check_output("whoami").strip().decode("ascii")
# define desired location for output files within user dir
# ensures a new subfolder every run as long as new run is not started within same day as last run
#output_subdir = datetime.now().strftime("%b-%d-%y")
# don't use subprocess to retrieve date for subdir because runs might span over 2 days if they go overnight
#output_subdir = "iwp_testRun_20230131"
#output_subdir = '2023-01-20'

''' get hostnames from slurm file '''
with open(f'/u/{user}/viz-workflow/slurm/nodes_array.txt', 'r') as f:
hostnames = f.read().splitlines()

print("Syncing staged files to nodes:\n\t", '\n\t'.join(hostnames))
print("Syncing log files to nodes within /staged dir:\n\t", '\n\t'.join(hostnames))

count = 0
for hostname in hostnames:
