Modifications to allow any type of observation to be used with GPI-LS
Buffers have also been modified

Creation of a wrapper for the observations as well as a Generic type to encapsulate them
AdrienBolling committed Dec 7, 2023
1 parent f8fc8cb commit afe4fa9
Showing 8 changed files with 309 additions and 122 deletions.
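In practice, the change means the environment is wrapped so that every observation handed to the agent (and stored in the replay buffer) is an Observation instance. A minimal sketch of the intended usage, based on the code added below; the minecart environment comes from the diff itself, the `import mo_gymnasium as mo_gym` line is assumed to match the repository's examples, and everything else is illustrative:

import mo_gymnasium as mo_gym

from morl_baselines.common.observation import ConversionWrapper, Observation


def make_env():
    env = mo_gym.make("minecart-v0")
    # Wrap the env so that observation() returns Observation(raw_obs) instead of the raw array.
    return ConversionWrapper(env, observation_class=Observation)


env = make_env()
obs, info = env.reset()
print(type(obs))  # <class 'morl_baselines.common.observation.Observation'>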
1 change: 1 addition & 0 deletions examples/gpi_pd_hopper.py
@@ -39,6 +39,7 @@ def make_env(record_episode_statistics: bool = False):
project_name="MORL-Baselines",
experiment_name="GPI-PD",
log=True,
device="mps"
)

agent.train(
2 changes: 1 addition & 1 deletion examples/gpi_pd_minecart.py
@@ -8,7 +8,7 @@
# from gymnasium.wrappers.record_video import RecordVideo


def main(algo: str, gpi_pd: bool, g: int, timesteps_per_iter: int = 10000, seed: int = 0):
def main(algo: str = "gpi-ls", gpi_pd: bool = False, g: int = 20, timesteps_per_iter: int = 10000, seed: int = 0):
def make_env():
env = mo_gym.make("minecart-v0")
env = mo_gym.MORecordEpisodeStatistics(env, gamma=0.98)
86 changes: 77 additions & 9 deletions morl_baselines/common/buffer.py
@@ -1,31 +1,36 @@
"""Replay buffer for multi-objective reinforcement learning."""
import copy
import os

import numpy as np
import torch as th

from morl_baselines.common.observation import Observation


class ReplayBuffer:
"""Multi-objective replay buffer for multi-objective reinforcement learning."""

def __init__(
self,
action_dim,
action_shape,
rew_dim=1,
max_size=100000,
action_dtype=np.float32,
):
"""Initialize the replay buffer.
Args:
action_dim: Dimension of the actions
action_shape: Dimension of the actions
rew_dim: Dimension of the rewards
max_size: Maximum size of the buffer
action_dtype: Data type of the actions
"""
self.max_size = max_size
self.ptr, self.size = 0, 0
self.obs = np.zeros((max_size,), dtype=object)
self.obs = np.zeros((max_size,), dtype=Observation)
self.next_obs = np.zeros((max_size,), dtype=object)
self.actions = np.zeros((max_size, action_dim), dtype=action_dtype)
self.actions = np.zeros((max_size,) + action_shape, dtype=action_dtype)
self.rewards = np.zeros((max_size, rew_dim), dtype=np.float32)
self.dones = np.zeros((max_size, 1), dtype=np.float32)

@@ -39,8 +44,8 @@ def add(self, obs, action, reward, next_obs, done):
next_obs: Next observation
done: Done
"""
self.obs[self.ptr] = np.array(obs).copy()
self.next_obs[self.ptr] = np.array(next_obs).copy()
self.obs[self.ptr] = copy.deepcopy(obs)  # We could first call the observation's own .copy() method here if one is implemented, but that may be unnecessary
self.next_obs[self.ptr] = copy.deepcopy(next_obs)
self.actions[self.ptr] = np.array(action).copy()
self.rewards[self.ptr] = np.array(reward).copy()
self.dones[self.ptr] = np.array(done).copy()
@@ -72,7 +77,13 @@ def sample(self, batch_size, replace=True, use_cer=False, to_tensor=False, devic
self.dones[inds],
)
if to_tensor:
return tuple(map(lambda x: th.tensor(x, device=device), experience_tuples))
return (
np.array([observation.to_tensor(device=device) for observation in experience_tuples[0]]),
th.tensor(experience_tuples[1], device=device),
th.tensor(experience_tuples[2], device=device),
np.array([observation.to_tensor(device=device) for observation in experience_tuples[3]]),
th.tensor(experience_tuples[4], device=device),
)
else:
return experience_tuples

Expand All @@ -90,7 +101,7 @@ def sample_obs(self, batch_size, replace=True, to_tensor=False, device=None):
"""
inds = np.random.choice(self.size, batch_size, replace=replace)
if to_tensor:
return th.tensor(self.obs[inds], device=device)
return np.array([observation.to_tensor(device=device) for observation in self.obs[inds]])
else:
return self.obs[inds]

@@ -118,10 +129,67 @@ def get_all_data(self, max_samples=None, to_tensor=False, device=None):
)

if to_tensor:
return tuple(map(lambda x: th.tensor(x, device=device), samples))
return (
np.array([observation.to_tensor(device=device) for observation in samples[0]]),
th.tensor(samples[1], device=device),
th.tensor(samples[2], device=device),
np.array([observation.to_tensor(device=device) for observation in samples[3]]),
th.tensor(samples[4], device=device),
)
else:
return samples

def save(self, path):
"""Save the buffer to a file.
Args:
path: Path to the file
"""

if not os.path.isdir(path):
os.makedirs(path)

np.savez_compressed(
path + "buffer_without_obs.npz",
actions=self.actions,
rewards=self.rewards,
dones=self.dones,
ptr=self.ptr,
size=self.size,
)
# Save the observations
# We save the observations separately because their type is not known in advance and they can be large (the np.ndarray case could be handled separately)
if not os.path.isdir(path + "obs"):
os.makedirs(path + "obs")
for i, obs in enumerate(self.obs):
obs.save(path + "obs/" + str(i))
if not os.path.isdir(path + "next_obs"):
os.makedirs(path + "next_obs")
for i, obs in enumerate(self.next_obs):
obs.save(path + "next_obs/" + str(i))

def load(self, path):
"""Load the buffer from a file.
Args:
path: Path to the file
"""

data = np.load(path, allow_pickle=True)
self.actions = data["actions"]
self.rewards = data["rewards"]
self.dones = data["dones"]
self.ptr = data["ptr"]
self.size = data["size"]

# Load the observations
self.obs = np.zeros((self.max_size,), dtype=Observation)
self.next_obs = np.zeros((self.max_size,), dtype=Observation)

for i in range(self.size):
self.obs[i] = Observation().load(path + "obs/" + str(i))
self.next_obs[i] = Observation().load(path + "next_obs/" + str(i))

def __len__(self):
"""Get the size of the buffer."""
return self.size
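
With this change the buffer stores whole Observation objects (deep-copied in add()) and, when to_tensor=True is requested, converts each stored Observation individually via Observation.to_tensor(device) before packing the result with np.array. A rough usage sketch, assuming the caller wraps raw observations before calling add(); the action shape and reward dimension below are illustrative, not taken from this commit:

import numpy as np

from morl_baselines.common.buffer import ReplayBuffer
from morl_baselines.common.observation import Observation

buffer = ReplayBuffer(action_shape=(1,), rew_dim=3, max_size=1000)

# Observations are wrapped before being stored; the buffer deep-copies them in add().
obs = Observation(np.zeros(4, dtype=np.float32))
next_obs = Observation(np.ones(4, dtype=np.float32))
buffer.add(obs, action=[0.0], reward=[1.0, 0.0, 0.5], next_obs=next_obs, done=False)

# With to_tensor=True, each stored Observation is converted with .to_tensor(device)
# and packed with np.array; actions, rewards and dones come back as torch tensors.
obs_b, actions_b, rewards_b, next_obs_b, dones_b = buffer.sample(
    batch_size=1, to_tensor=True, device="cpu"
)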
2 changes: 1 addition & 1 deletion morl_baselines/common/morl_algorithm.py
@@ -194,7 +194,7 @@ def extract_env_info(self, env: Optional[gym.Env]) -> None:
self.action_dim = self.env.action_space.n
else:
self.action_shape = self.env.action_space.shape
self.action_dim = self.env.action_space.shape[0]
self.action_dim = self.env.action_space.n
self.reward_dim = self.env.reward_space.shape[0]

@abstractmethod
2 changes: 1 addition & 1 deletion morl_baselines/common/networks.py
@@ -87,7 +87,7 @@ def forward(self, observations: th.Tensor) -> th.Tensor:
return self.linear(self.cnn(observations / 255.0))


def huber(x, min_priority=0.01):
def huber(x, min_priority=0.01) -> th.Tensor:
"""Huber loss function.
Args:
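
The diff above only adds a return type annotation to huber(). For reference, a Huber-style loss with threshold min_priority is quadratic below the threshold and linear above it; the sketch below is a generic formulation under that assumption, not necessarily the repository's exact implementation:

import torch as th


def huber_sketch(x: th.Tensor, min_priority: float = 0.01) -> th.Tensor:
    """Generic Huber-style loss: quadratic for |x| < min_priority, linear beyond it."""
    abs_x = x.abs()
    quadratic = 0.5 * abs_x.pow(2)
    linear = min_priority * (abs_x - 0.5 * min_priority)
    return th.where(abs_x < min_priority, quadratic, linear).mean()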
100 changes: 100 additions & 0 deletions morl_baselines/common/observation.py
@@ -0,0 +1,100 @@
import pickle

import gymnasium
import torch as th
from typing import Type, Optional, Any


class Observation:

"""
This class is an interface used to encapsulate any kind of observation the environment can return.
Subclassing it ensures that all the methods the buffers and algorithms rely on are implemented.
"""

def __init__(self, item: Optional[Any] = None):
"""
Initialize the observation.
Args:
item: The item to encapsulate. If None, the observation is initialized as None.
"""

self.item = item
self.item_dtype = type(item)

def __repr__(self):
return self.item.__repr__()

def __str__(self):
return self.item.__str__()

def __eq__(self, other):
return self.item == other.item

def __ne__(self, other):
return self.item != other.item

def save(self, path):
"""
Save the observation.
Args:
path: The path to save the observation to.
"""
with open(path, "wb") as f:
pickle.dump(self.item, f)

def load(self, path):
"""
Load the observation.
Args:
path: The path to load the observation from.
"""
with open(path, "rb") as f:
self.item = pickle.load(f)
self.item_dtype = type(self.item)

def to_tensor(self, device=None):
"""
Convert the observation to a PyTorch tensor.
Args:
device: The device to use.
Returns:
The observation as a PyTorch tensor.
"""
if device is None:
return th.tensor(self.item, device=th.device("cpu"))
else:
return th.tensor(self.item, device=device)


class ConversionWrapper(gymnasium.ObservationWrapper):

"""
This class is used to wrap the observations returned by the environment.
It ensures that the observations handed to the agent are instances of the Observation type (or a subclass of it).
"""

def __init__(self, env: gymnasium.Env, observation_class: Type[Observation] = Observation, observation_space: gymnasium.Space = None):
"""
Initialize the wrapper.
Args:
env: The environment to wrap.
observation_class: The class to use for the observations. Defaults to Observation; feel free to implement your own subclass of it.
observation_space: The observation space after conversion (if different from the original).
"""
super().__init__(env)
self.observation_class = observation_class
if observation_space is not None:
self.observation_space = observation_space
else:
self.observation_space = env.observation_space

def observation(self, observation):
"""
Wrap the observation.
Args:
observation: The observation to wrap.
Returns:
The wrapped observation.
"""
return self.observation_class(observation)
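
The default to_tensor() simply calls th.tensor(self.item, ...), so array-like observations work out of the box. For observation types that are not array-like (dicts, graphs, ...), the intended pattern appears to be subclassing Observation, overriding to_tensor(), and passing the subclass to ConversionWrapper. A hypothetical sketch; DictObservation and its keys are made up for illustration:

import torch as th

from morl_baselines.common.observation import Observation


class DictObservation(Observation):
    """Hypothetical subclass for dict observations with 'image' and 'state' keys."""

    def to_tensor(self, device=None):
        device = device if device is not None else th.device("cpu")
        # Flatten the dict into a single tensor; a real subclass could return whatever
        # structure the downstream networks expect.
        return th.cat([
            th.as_tensor(self.item["image"], device=device).flatten(),
            th.as_tensor(self.item["state"], device=device).flatten(),
        ])


obs = DictObservation({"image": [[0.0, 1.0]], "state": [0.5]})
print(obs.to_tensor())  # a flat tensor with values [0.0, 1.0, 0.5]
# An env returning such dicts would then be wrapped with
# ConversionWrapper(env, observation_class=DictObservation).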
(Diffs for the remaining 2 changed files are not shown.)
