Fix: Clarify Ray Documentation and Fix Minor Issues #1717

Open · wants to merge 18 commits into base: main
312 changes: 167 additions & 145 deletions docs/source/features/ray.rst


4 changes: 4 additions & 0 deletions source/standalone/workflows/ray/cluster_configs/Dockerfile
@@ -1,5 +1,9 @@
FROM isaac-lab-base:latest

# wget is needed so that GCS and other cloud providers can mark the container as ready;
# otherwise the Ray liveness checks fail.
RUN apt-get update && apt-get install -y wget

# Set NVIDIA paths
ENV PATH="/usr/local/nvidia/bin:$PATH"
ENV LD_LIBRARY_PATH="/usr/local/nvidia/lib64"
@@ -19,7 +19,6 @@ spec:
block: "true"
dashboard-host: 0.0.0.0
dashboard-port: "8265"
node-ip-address: "0.0.0.0"
port: "6379"
include-dashboard: "true"
ray-debugger-external: "true"
@@ -30,7 +29,7 @@
apiVersion: v1
kind: Service
metadata:
name: head
name: {{ name }}-head
spec:
type: LoadBalancer
template:
@@ -130,7 +129,7 @@ spec:
volumeMounts:
- mountPath: /tmp/ray
name: ray-logs
command: ["/bin/bash", "-c", "ray start --address=head.{{ namespace }}.svc.cluster.local:6379 && tail -f /dev/null"]
command: ["/bin/bash", "-c", "ray start --address={{name}}-head.{{ namespace }}.svc.cluster.local:6379 && tail -f /dev/null"]
- image: fluent/fluent-bit:1.9.6
name: fluentbit
resources:
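With the Service renamed to {{ name }}-head, each cluster gets its own in-cluster DNS entry of the form <name>-head.<namespace>.svc.cluster.local, which the worker start command above points at. A minimal sketch for checking that the name resolves from inside the cluster (the cluster name and namespace below are illustrative):

.. code-block:: python

    # Minimal sketch: verify the per-cluster head Service resolves in-cluster.
    # The cluster name and namespace below are illustrative.
    import socket

    name = "isaacray-1"
    namespace = "default"
    head_host = f"{name}-head.{namespace}.svc.cluster.local"

    try:
        print(f"{head_host} resolves to {socket.gethostbyname(head_host)}")
    except socket.gaierror:
        print(f"Could not resolve {head_host}; this only works from inside the cluster.")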
16 changes: 8 additions & 8 deletions source/standalone/workflows/ray/grok_cluster_with_kubectl.py
@@ -21,7 +21,7 @@

.. code-block:: bash

./isaaclab.sh -p source/standalone/workflows/ray/grok_cluster_with_kubectl.py
python3 source/standalone/workflows/ray/grok_cluster_with_kubectl.py
# For options, supply -h arg
"""

@@ -67,9 +67,10 @@ def get_clusters(pods: list, cluster_name_prefix: str) -> set:

match = re.match(r"(" + re.escape(cluster_name_prefix) + r"[-\w]+)", pod_name)
if match:
# Get base name without head/worker suffix
base_name = match.group(1).split("-head")[0].split("-worker")[0]
clusters.add(base_name)
# Get base name without head/worker suffix (skip workers)
if "head" in pod_name:
base_name = match.group(1).split("-head")[0]
clusters.add(base_name)
return sorted(clusters)


@@ -90,9 +91,7 @@ def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") ->
clusters = get_clusters(pods=pods, cluster_name_prefix=cluster_prefix)
if len(clusters) > 1:
raise ValueError("More than one cluster matches prefix, could not automatically determine mlflow info.")

base_name = cluster_prefix.split("-head")[0].split("-worker")[0]
mlflow_name = f"{base_name}-mlflow"
mlflow_name = f"{cluster_prefix}-mlflow"

cmd = ["kubectl", "get", "svc", mlflow_name, "-n", namespace, "--no-headers"]
try:
@@ -102,7 +101,8 @@ def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") ->
# Get cluster IP
cluster_ip = fields[2]
port = "5000" # Default MLflow port

# The URI must use plain HTTP; HTTPS cannot be resolved here.
# This is acceptable because the traffic stays on the cluster-internal subnet.
return f"http://{cluster_ip}:{port}"
except subprocess.CalledProcessError as e:
raise ValueError(f"Could not grok MLflow: {e}") # Fixed f-string
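The matching above keys cluster names off the head pods only, so worker pods never produce spurious cluster entries. An illustrative re-creation of that logic on made-up pod names (the real script reads pod names from kubectl, and its pod-list structure may differ):

.. code-block:: python

    # Illustrative sketch of the head-pod based cluster matching; pod names are made up.
    import re

    def get_clusters(pods: list, cluster_name_prefix: str) -> list:
        clusters = set()
        for pod_name in pods:
            match = re.match(r"(" + re.escape(cluster_name_prefix) + r"[-\w]+)", pod_name)
            if match and "head" in pod_name:
                clusters.add(match.group(1).split("-head")[0])
        return sorted(clusters)

    pods = ["isaacray-1-head-abc12", "isaacray-1-worker-0-xyz34", "isaacray-2-head-def56"]
    print(get_clusters(pods, "isaacray"))  # ['isaacray-1', 'isaacray-2']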
12 changes: 6 additions & 6 deletions source/standalone/workflows/ray/launch.py
@@ -8,29 +8,28 @@
import subprocess
import yaml

import util
from jinja2 import Environment, FileSystemLoader
from kubernetes import config

import source.standalone.workflows.ray.util as util

"""This script helps create one or more KubeRay clusters.

Usage:

.. code-block:: bash
# If the head node is stuck on container creating, make sure to create a secret
./isaaclab.sh -p source/standalone/workflows/ray/launch.py -h
python3 source/standalone/workflows/ray/launch.py -h

# Examples

# The following creates 8 GPUx1 nvidia l4 workers
./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
python3 source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
--namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
--num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1

# The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers,
# and 2 GPUx4 nvidia-tesla-t4 GPU workers
./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
python3 source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
--namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
--num_workers 1 2 --num_clusters 1 \
--worker_accelerator nvidia-l4 nvidia-tesla-t4 --gpu_per_worker 1 2 4
@@ -53,7 +52,7 @@ def apply_manifest(args: argparse.Namespace) -> None:
# Set up Jinja2 environment for loading templates
templates_dir = RAY_DIR / "cluster_configs" / args.cluster_host
file_loader = FileSystemLoader(str(templates_dir))
jinja_env = Environment(loader=file_loader, keep_trailing_newline=True)
jinja_env = Environment(loader=file_loader, keep_trailing_newline=True, autoescape=True)

# Define template filename
template_file = "kuberay.yaml.jinja"
@@ -79,6 +78,7 @@ def apply_manifest(args: argparse.Namespace) -> None:

# Apply the Kubernetes manifest using kubectl
try:
print(cleaned_yaml_string)
subprocess.run(["kubectl", "apply", "-f", "-"], input=cleaned_yaml_string, text=True, check=True)
except subprocess.CalledProcessError as e:
exit(f"An error occurred while running `kubectl`: {e}")
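The apply step above renders the Jinja template and pipes the resulting YAML straight into kubectl apply -f - without writing a temporary file. A stripped-down sketch of that flow; the template directory and render values are illustrative, and only a subset of the variables the real template expects is shown:

.. code-block:: python

    # Hedged sketch of the render-and-apply flow; paths and values are illustrative.
    import subprocess
    from jinja2 import Environment, FileSystemLoader

    jinja_env = Environment(
        loader=FileSystemLoader("source/standalone/workflows/ray/cluster_configs/google_cloud"),
        keep_trailing_newline=True,
        autoescape=True,
    )
    manifest = jinja_env.get_template("kuberay.yaml.jinja").render(
        name="isaacray-1", namespace="default", image="<YOUR_ISAAC_RAY_IMAGE>"
    )
    # Pipe the rendered manifest to kubectl on stdin.
    subprocess.run(["kubectl", "apply", "-f", "-"], input=manifest, text=True, check=True)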
8 changes: 4 additions & 4 deletions source/standalone/workflows/ray/submit_job.py
@@ -40,16 +40,16 @@
.. code-block:: bash

# Example; submitting a tuning job
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
python3 source/standalone/workflows/ray/submit_job.py \
--aggregate_jobs /workspace/isaaclab/source/standalone/workflows/ray/tuner.py \
--cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
--cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <ML_FLOW_URI>
--cfg_class CartpoleTheiaJobCfg --mlflow_uri <ML_FLOW_URI>

# Example: Submitting resource wrapped job
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py --aggregate_jobs wrap_resources.py --sub_jobs ./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-v0 --headless+./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-RGB-Camera-Direct-v0 --headless --enable_cameras agent.params.config.max_epochs=150
python3 source/standalone/workflows/ray/submit_job.py --aggregate_jobs wrap_resources.py --test

# For all command line arguments
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py -h
python3 source/standalone/workflows/ray/submit_job.py -h
"""
script_directory = os.path.dirname(os.path.abspath(__file__))
CONFIG = {"working_dir": script_directory, "executable": "/workspace/isaaclab/isaaclab.sh -p"}
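Under the hood, submission goes through Ray's job-submission API against the cluster's dashboard port (8265), with this directory shipped as the job's working directory (see CONFIG above). A hedged sketch of an equivalent manual submission; the dashboard address and entrypoint are placeholders, and the script's actual argument handling differs:

.. code-block:: python

    # Hedged sketch of a manual Ray job submission; address and entrypoint are placeholders.
    from ray.job_submission import JobSubmissionClient

    client = JobSubmissionClient("http://<RAY_DASHBOARD_ADDRESS>:8265")
    job_id = client.submit_job(
        entrypoint=(
            "/workspace/isaaclab/isaaclab.sh -p tuner.py "
            "--cfg_file hyperparameter_tuning/vision_cartpole_cfg.py "
            "--cfg_class CartpoleTheiaJobCfg --mlflow_uri <ML_FLOW_URI>"
        ),
        runtime_env={"working_dir": "source/standalone/workflows/ray"},
    )
    print(f"Submitted job: {job_id}")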
34 changes: 24 additions & 10 deletions source/standalone/workflows/ray/tuner.py
@@ -17,8 +17,9 @@
"""
This script breaks down an aggregate tuning job, as defined by a hyperparameter sweep configuration,
into individual jobs (shell commands) to run on the GPU-enabled nodes of the cluster.
By default, (unless combined as a sub-job in a resource-wrapped aggregate job), one worker is created
for each GPU-enabled node in the cluster for each individual job.
By default, one worker is created for each GPU-enabled node in the cluster for each individual job.
To use more than one worker per node (likely the case for multi-GPU machines), supply the
num_workers_per_node argument.

Each hyperparameter sweep configuration should include the workflow,
runner arguments, and hydra arguments to vary.
Expand All @@ -39,16 +40,15 @@
./isaaclab.sh -p source/standalone/workflows/ray/tuner.py -h

# Examples
# Local (not within a docker container, when within a local docker container, do not supply run_mode argument)
# Local
./isaaclab.sh -p source/standalone/workflows/ray/tuner.py --run_mode local \
--cfg_file source/standalone/workflows/ray/hyperparameter_tuning/vision_cartpole_cfg.py \
--cfg_class CartpoleRGBNoTuneJobCfg
# Local docker: start the ray server and run above command in the same running container without run_mode arg
--cfg_class CartpoleTheiaJobCfg
# Remote (run grok cluster or create config file mentioned in :file:`submit_job.py`)
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
--aggregate_jobs tuner.py \
--cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
--cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <MLFLOW_URI_FROM_GROK_OR_MANUAL>
--cfg_class CartpoleTheiaJobCfg --mlflow_uri <MLFLOW_URI_FROM_GROK_OR_MANUAL>

"""

@@ -74,7 +74,7 @@ def setup(self, config: dict) -> None:
print(f"[INFO]: Recovered invocation with {self.invoke_cmd}")
self.experiment = None

def reset_config(self, new_config):
def reset_config(self, new_config: dict):
"""Allow environments to be re-used by fetching a new invocation command"""
self.setup(new_config)
return True
@@ -95,7 +95,7 @@ def step(self) -> dict:
self.proc = experiment["proc"]
self.experiment_name = experiment["experiment_name"]
self.isaac_logdir = experiment["logdir"]
self.tensorboard_logdir = self.isaac_logdir + f"/{self.experiment_name}/summaries"
self.tensorboard_logdir = self.isaac_logdir + "/" + self.experiment_name
self.done = False

if self.proc is None:
@@ -220,10 +220,24 @@ class JobCfg:
"""To be compatible with :meth: invoke_tuning_run and :class:IsaacLabTuneTrainable,
at a minimum, the tune job should inherit from this class."""

def __init__(self, cfg):
def __init__(self, cfg: dict):
"""
Runner args include command line arguments passed to the task.
For example:
cfg["runner_args"]["headless_singleton"] = "--headless"
cfg["runner_args"]["enable_cameras_singleton"] = "--enable_cameras"
"""
assert "runner_args" in cfg, "No runner arguments specified."
"""
Task is the desired task to train on. For example:
cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
"""
assert "--task" in cfg["runner_args"], "No task specified."
assert "hydra_args" in cfg, "No hypeparameters specified."
"""
Hydra args define the hyperparameters varied within the sweep. For example:
cfg["hydra_args"]["agent.params.network.cnn.activation"] = tune.choice(["relu", "elu"])
"""
assert "hydra_args" in cfg, "No hyperparameters specified."
self.cfg = cfg


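For reference, a configuration class built on top of JobCfg might populate these fields as follows; the class name, task, and hyperparameters are illustrative rather than the ones shipped with this change, and the import assumes JobCfg is reachable from tuner.py in the ray workflow directory:

.. code-block:: python

    # Illustrative JobCfg subclass; task and hyperparameter names are examples only.
    from ray import tune

    from tuner import JobCfg  # assumes execution from the ray workflow directory

    class ExampleCartpoleJobCfg(JobCfg):
        def __init__(self):
            cfg = {
                "runner_args": {
                    "headless_singleton": "--headless",
                    "enable_cameras_singleton": "--enable_cameras",
                    "--task": tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"]),
                },
                "hydra_args": {
                    "agent.params.network.cnn.activation": tune.choice(["relu", "elu"]),
                    "agent.params.config.learning_rate": tune.loguniform(1e-4, 1e-2),
                },
            }
            super().__init__(cfg)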
31 changes: 18 additions & 13 deletions source/standalone/workflows/ray/util.py
@@ -14,27 +14,32 @@


def load_tensorboard_logs(directory: str) -> dict:
"""From a tensorboard directory, get the latest scalar values.
"""From a tensorboard directory, get the latest scalar values. If the logs can't be
found, check the summaries sublevel.

Args:
directory: The directory of the tensorboard logging.

Returns:
The latest available scalar values.
"""

# Initialize the event accumulator with a size guidance for only the latest entry
size_guidance = {"scalars": 1} # Load only the latest entry for scalars
event_acc = EventAccumulator(directory, size_guidance=size_guidance)
event_acc.Reload() # Load all data from the directory

# Extract the latest scalars logged
latest_scalars = {}
for tag in event_acc.Tags()["scalars"]:
events = event_acc.Scalars(tag)
if events: # Check if there is at least one entry
latest_event = events[-1] # Get the latest entry
latest_scalars[tag] = latest_event.value
return latest_scalars
def get_latest_scalars(path: str) -> dict:
event_acc = EventAccumulator(path, size_guidance={"scalars": 1})
try:
event_acc.Reload()
if event_acc.Tags()["scalars"]:
return {
tag: event_acc.Scalars(tag)[-1].value
for tag in event_acc.Tags()["scalars"]
if event_acc.Scalars(tag)
}
except (KeyError, OSError, RuntimeError):
return {}

scalars = get_latest_scalars(directory)
return scalars or get_latest_scalars(os.path.join(directory, "summaries"))


def get_invocation_command_from_cfg(
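A quick usage sketch of the reworked helper: if no scalars are found at the top level of the directory, the summaries subdirectory is tried. The log directory path is illustrative, and the import assumes the snippet runs from the ray workflow directory:

.. code-block:: python

    # Usage sketch; the log directory path is illustrative.
    from util import load_tensorboard_logs

    scalars = load_tensorboard_logs("logs/rl_games/cartpole_direct/2025-01-01_00-00-00")
    for tag, value in scalars.items():
        print(f"{tag}: {value}")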
5 changes: 2 additions & 3 deletions source/standalone/workflows/ray/wrap_resources.py
@@ -6,12 +6,11 @@
import argparse

import ray
import util
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

import source.standalone.workflows.ray.util as util

"""
This script dispatches sub-job(s) (either individual jobs or tuning aggregate jobs)
This script dispatches sub-job(s) (individual jobs, use :file:`tuner.py` for tuning jobs)
to worker(s) on GPU-enabled node(s) of a specific cluster as part of an resource-wrapped aggregate
job. If no desired compute resources for each sub-job are specified,
this script creates one worker per available node for each node with GPU(s) in the cluster.
2 changes: 2 additions & 0 deletions source/standalone/workflows/rsl_rl/train.py
@@ -93,6 +93,8 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
print(f"[INFO] Logging experiment in directory: {log_root_path}")
# specify directory for logging runs: {time-stamp}_{run_name}
log_dir = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# This way, the Ray Tune workflow can extract experiment name.
print(f"Exact experiment name requested from command line: {log_dir}")
if agent_cfg.run_name:
log_dir += f"_{agent_cfg.run_name}"
log_dir = os.path.join(log_root_path, log_dir)
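This marker line (also printed by the sb3 and skrl trainers below) lets the Ray Tune workflow recover the experiment's log directory name from the trainer's stdout. A hedged sketch of such parsing; the actual extraction logic lives in the ray workflow's util.py and may differ:

.. code-block:: python

    # Hedged sketch of pulling the experiment name out of trainer stdout;
    # the real extraction in util.py may differ.
    import re

    def extract_experiment_name(stdout_text: str) -> str | None:
        match = re.search(r"Exact experiment name requested from command line:?\s*(\S+)", stdout_text)
        return match.group(1) if match else None

    example = "Exact experiment name requested from command line: 2025-01-10_12-00-00"
    print(extract_experiment_name(example))  # 2025-01-10_12-00-00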
6 changes: 5 additions & 1 deletion source/standalone/workflows/sb3/train.py
@@ -89,7 +89,11 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
env_cfg.sim.device = args_cli.device if args_cli.device is not None else env_cfg.sim.device

# directory for logging into
log_dir = os.path.join("logs", "sb3", args_cli.task, datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
run_info = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_root_path = os.path.abspath(os.path.join("logs", "sb3", args_cli.task))
print(f"[INFO] Logging experiment in directory: {log_root_path}")
print(f"Exact experiment name requested from command line: {run_info}")
log_dir = os.path.join(log_root_path, run_info)
# dump the configuration into log-directory
dump_yaml(os.path.join(log_dir, "params", "env.yaml"), env_cfg)
dump_yaml(os.path.join(log_dir, "params", "agent.yaml"), agent_cfg)
1 change: 1 addition & 0 deletions source/standalone/workflows/skrl/train.py
@@ -135,6 +135,7 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
print(f"[INFO] Logging experiment in directory: {log_root_path}")
# specify directory for logging runs: {time-stamp}_{run_name}
log_dir = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + f"_{algorithm}_{args_cli.ml_framework}"
print(f"Exact experiment name requested from command line {log_dir}")
if agent_cfg["agent"]["experiment"]["experiment_name"]:
log_dir += f'_{agent_cfg["agent"]["experiment"]["experiment_name"]}'
# set directory into agent config