Bugfix: Allow Nesting of Sync/Async VectorEnvs #2104

Closed
wants to merge 19 commits into from
Changes from 6 commits
25 changes: 24 additions & 1 deletion gym/vector/async_vector_env.py
@@ -635,6 +635,16 @@ def _worker(index, env_fn, pipe, parent_pipe, shared_memory, error_queue):
     assert shared_memory is None
     env = env_fn()
     parent_pipe.close()
+
+    def step_fn(actions):
Contributor

Why are we defining this function? It doesn't seem to be called anywhere. If it's actually used, it'd need a type hint and some comments/docstring

Author

Fixed in fcebd76

Author

Oops. Actually fixed in 15208e3

+        observation, reward, done, info = env.step(actions)
+        # Do nothing if the env is a VectorEnv, since it will automatically
+        # reset the envs that are done if needed in the 'step' method and return
+        # the initial observation instead of the final observation.
+        if not isinstance(env.unwrapped, VectorEnv) and done:
+            observation = env.reset()
+        return observation, reward, done, info
+
     try:
         while True:
             command, data = pipe.recv()
@@ -699,6 +709,16 @@ def _worker_shared_memory(index, env_fn, pipe, parent_pipe, shared_memory, error
     env = env_fn()
     observation_space = env.observation_space
     parent_pipe.close()
+
+    def step_fn(actions):
+        observation, reward, done, info = env.step(actions)
+        # Do nothing if the env is a VectorEnv, since it will automatically
+        # reset the envs that are done if needed in the 'step' method and return
+        # the initial observation instead of the final observation.
+        if not isinstance(env.unwrapped, VectorEnv) and done:
+            observation = env.reset()
+        return observation, reward, done, info
+
     try:
         while True:
             command, data = pipe.recv()
@@ -717,7 +737,10 @@ def _worker_shared_memory(index, env_fn, pipe, parent_pipe, shared_memory, error
                 pipe.send((None, True))
             elif command == "step":
                 observation, reward, done, info = env.step(data)
-                if done:
+                if isinstance(env, VectorEnv):
+                    # VectorEnvs take care of resetting the envs that are done.
+                    pass
+                elif done:
Contributor

Would it be better to use a single condition, either `if done and isinstance(env, Env)` or `if done and not isinstance(env, VectorEnv)`? I'm not sure which would be clearer, but the `pass` gave me pause: it is rarely present in released code, and it felt like something unfinished.
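
To make the suggestion concrete, here is a minimal sketch comparing the two forms. `Env` and `VectorEnv` below are empty stand-in classes, not the real gym types, and both function names are hypothetical. The sketch assumes that `VectorEnv` subclasses `Env`; under that assumption the `isinstance(env, Env)` variant would also be true for vector envs, so only the `not isinstance(env, VectorEnv)` form is shown. The type check comes first so that `done` is never evaluated as a boolean for a vector env.

```python
class Env:
    """Stand-in for a single (non-vectorized) environment."""


class VectorEnv(Env):
    """Stand-in for a vectorized env that resets its own sub-envs."""


def needs_reset_with_pass(env, done):
    # Shape of the code under review: an explicit no-op branch for VectorEnvs.
    if isinstance(env, VectorEnv):
        pass
    elif done:
        return True
    return False


def needs_reset_single_condition(env, done):
    # Suggested form: one condition and no `pass`.
    return (not isinstance(env, VectorEnv)) and bool(done)
```

Both helpers agree on every combination of env type and `done` flag, so the change would be purely stylistic.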

Author

Fixed in 055087b

                     info["terminal_observation"] = observation
Contributor

Do we actually need this with the AutoReset wrapper introduced recently? And wouldn't it cause a redundant double reset in some cases?

Author

Yes, this is still necessary, since the `if done` check doesn't work with VectorEnvs. The check is there so that we don't reset a VectorEnv when an episode is done (a single env is still reset here, as before).
I'm not sure if or how the AutoReset wrapper relates to this.
Let me know if that wasn't clear.
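
A small sketch of why a bare `if done` breaks for a nested VectorEnv: there, `done` is an array with one entry per inner env, and NumPy refuses to reduce a multi-element array to a single truth value. The helper name `scalar_done_check` is hypothetical and only mimics the branch in the worker.

```python
import numpy as np


def scalar_done_check(done):
    """Mimic `if done:` and report the branch taken, or the error raised."""
    try:
        return "reset" if done else "continue"
    except ValueError as err:
        return f"error: {type(err).__name__}"


# Single env: `done` is a plain bool, so the check is well defined.
single = scalar_done_check(True)

# Inner VectorEnv with two sub-envs: `done` is a boolean array.
nested = scalar_done_check(np.array([True, False]))
```

With a plain bool the reset branch is taken; with the two-element array the truth test raises `ValueError`, which is why the type check has to come before `done` is inspected.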

Author

Fixed in 055087b

                     observation = env.reset()
                 write_to_shared_memory(
16 changes: 12 additions & 4 deletions gym/vector/sync_vector_env.py
@@ -71,8 +71,11 @@ def __init__(self, env_fns, observation_space=None, action_space=None, copy=True
         self.observations = create_empty_array(
             self.single_observation_space, n=self.num_envs, fn=np.zeros
         )
-        self._rewards = np.zeros((self.num_envs,), dtype=np.float64)
-        self._dones = np.zeros((self.num_envs,), dtype=np.bool_)
+        shape = (self.num_envs,)
+        if isinstance(self.envs[0].unwrapped, VectorEnv):
Contributor

Is it ever possible that this condition would be different for self.envs[0] and self.envs[1]? Also, do we actually need to unwrap it? (I'm not sure off the top of my head how wrappers interact with VectorEnvs)

Author

@lebrice, May 2, 2022

I don't see how this would only be true for some envs. It would mean that someone is passing a mix of Envs and VectorEnvs to a VectorEnv, which I can't imagine being useful.

Yes, unwrapping it is necessary, since most wrappers can work the same way for both Envs and VectorEnvs.
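
The point about unwrapping can be illustrated with a minimal sketch. The classes below are stand-ins loosely modeled on how a gym-style `Wrapper` forwards `unwrapped` to the env it wraps; they are assumptions for illustration, not the library's actual classes.

```python
class Env:
    @property
    def unwrapped(self):
        # A bare env is its own innermost env.
        return self


class VectorEnv(Env):
    def __init__(self, num_envs):
        self.num_envs = num_envs


class Wrapper(Env):
    def __init__(self, env):
        self.env = env

    @property
    def unwrapped(self):
        # Delegate down the wrapper chain until the innermost env is reached.
        return self.env.unwrapped


wrapped = Wrapper(Wrapper(VectorEnv(num_envs=4)))

# The wrapper itself is not a VectorEnv, so a direct check misses it.
direct_check = isinstance(wrapped, VectorEnv)

# Checking the innermost env finds the VectorEnv underneath.
unwrapped_check = isinstance(wrapped.unwrapped, VectorEnv)
```

Under these assumptions, a wrapped VectorEnv is only detected through `.unwrapped`, which matches the check used when sizing the reward and done buffers.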

+            shape += (self.envs[0].num_envs,)
+        self._rewards = np.zeros(shape, dtype=np.float64)
+        self._dones = np.zeros(shape, dtype=np.bool_)
         self._actions = None

     def seed(self, seed=None):
@@ -133,10 +136,15 @@ def step_async(self, actions):
         self._actions = iterate(self.action_space, actions)

     def step_wait(self):
-        observations, infos = [], []
+        observations, rewards, dones, infos = [], [], [], []
         for i, (env, action) in enumerate(zip(self.envs, self._actions)):
             observation, self._rewards[i], self._dones[i], info = env.step(action)
-            if self._dones[i]:
+            if isinstance(env, VectorEnv):
+                # Do nothing if the env is a VectorEnv, since it will automatically
+                # reset the envs that are done if needed in the 'step' method and
+                # return the initial observation instead of the final observation.
+                pass
+            elif self._dones[i]:
                 info["terminal_observation"] = observation
                 observation = env.reset()
             observations.append(observation)
117 changes: 116 additions & 1 deletion tests/vector/test_vector_env.py
@@ -1,7 +1,11 @@
+from functools import partial
+from typing import Callable, Type
+
 import numpy as np
 import pytest

-from gym.spaces import Tuple
+from gym import Space, spaces
Contributor

@pseudo-rnd-thoughts, Apr 26, 2022

Could you remove `spaces`? I don't think you need to import the module; just have the following line.

Author

@lebrice, Apr 26, 2022

+from gym.spaces import Box, Tuple
 from gym.vector.async_vector_env import AsyncVectorEnv
 from gym.vector.sync_vector_env import SyncVectorEnv
 from gym.vector.vector_env import VectorEnv
@@ -58,3 +62,114 @@ def test_custom_space_vector_env():

     assert isinstance(env.single_action_space, CustomSpace)
     assert isinstance(env.action_space, Tuple)
+
+
+@pytest.mark.parametrize("base_env", ["Pendulum-v1", "CartPole-v1"])
Contributor

Why parametrize only over the two envs instead of all of them?

Author

You mean over all gym envs?
Sure, I'm all for it, but the current VectorEnv test (i.e. the only test above) only uses CartPole-v1.

Author

@lebrice, May 2, 2022

Parametrized the test with all classic_control + toy_text envs in 9c0e308

Author

Hey @RedTachyon, is this good (testing with all envs for which should_skip_env_spec_for_test returns False)?
As-is, the parametrization of this test generates 579 tests, which take about 1:20 to run on my end (with pytest xdist and 4 parallel workers). I think this will probably take something like 5 minutes to run on GitHub, depending on the machine's hardware.

(here's a link for that function btw: https://github.com/lebrice/gym/blob/e913bc81b83b43ae8ca9b3a02c981b74d31017ea/tests/envs/spec_list.py#L20)
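
As a rough guide to how the test count grows: stacked `pytest.mark.parametrize` decorators generate the Cartesian product of their argument lists. The sketch below counts the cases for the two-env version of the test shown in this diff; the dictionary simply restates those decorator arguments. The 579 figure quoted above depends on the larger env list from 9c0e308, which is not reproduced here.

```python
from itertools import product

# Argument lists copied from the parametrize decorators in this diff.
params = {
    "base_env": ["Pendulum-v1", "CartPole-v1"],
    "async_inner": [False, True],
    "async_outer": [False, True],
    "n_inner_envs": [1, 4, 7],
    "n_outer_envs": [1, 4, 7],
}

# One test case per element of the Cartesian product.
combinations = list(product(*params.values()))
num_cases = len(combinations)  # 2 * 2 * 2 * 3 * 3 = 72
```

Every additional base env adds another 36 cases, which is why parametrizing over many envs lengthens the run so quickly.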

+@pytest.mark.parametrize("async_inner", [False, True])
+@pytest.mark.parametrize("async_outer", [False, True])
+@pytest.mark.parametrize("n_inner_envs", [1, 4, 7])
+@pytest.mark.parametrize("n_outer_envs", [1, 4, 7])
+def test_nesting_vector_envs(
+    base_env: str,
+    async_inner: bool,
+    async_outer: bool,
+    n_inner_envs: int,
+    n_outer_envs: int,
+):
+    """Tests nesting of vector envs: Using a VectorEnv of VectorEnvs.
+
+    This can be useful for example when running a large number of environments
+    on a machine with few cores, as the worker processes of an AsyncVectorEnv can
+    themselves run multiple environments sequentially using a SyncVectorEnv
+    (a.k.a. chunking).
+
+    This test creates `n_outer_envs` vectorized environments, each of which has
+    `n_inner_envs` inner environments. If `async_outer` is True, then the outermost
+    wrapper is an `AsyncVectorEnv`; it is a `SyncVectorEnv` when `async_outer` is
+    False. The same goes for the "inner" environments.
+
+    Parameters
+    ----------
+    - base_env : str
+        The base environment id.
+    - async_inner : bool
+        Whether the inner VectorEnv will be async or not.
+    - async_outer : bool
+        Whether the outer VectorEnv will be async or not.
+    - n_inner_envs : int
+        Number of inner environments.
+    - n_outer_envs : int
+        Number of outer environments.
+    """
+
+    inner_vectorenv_type: Type[VectorEnv] = (
+        AsyncVectorEnv if async_inner else SyncVectorEnv
+    )
+    outer_vectorenv_type: Type[VectorEnv] = (
+        partial(AsyncVectorEnv, daemon=False) if async_outer else SyncVectorEnv
+    )
+    # NOTE: When nesting AsyncVectorEnvs, only the "innermost" envs can have
+    # `daemon=True`, otherwise the "daemonic processes are not allowed to have
+    # children" AssertionError is raised in `multiprocessing.process`.
+
+    # Create the VectorEnv of VectorEnvs
+    env = outer_vectorenv_type(
+        [
+            partial(
+                inner_vectorenv_type,
+                env_fns=[
+                    make_env(base_env, seed=n_inner_envs * i + j)
+                    for j in range(n_inner_envs)
+                ],
+            )
+            for i in range(n_outer_envs)
+        ]
+    )
+
+    # Create a single test environment.
+    with make_env(base_env, 0)() as temp_single_env:
+        single_observation_space = temp_single_env.observation_space
+        single_action_space = temp_single_env.action_space
+
+    assert isinstance(single_observation_space, Box)
+    assert isinstance(env.observation_space, Box)
+    assert env.observation_space.shape == (
+        n_outer_envs,
+        n_inner_envs,
+        *single_observation_space.shape,
+    )
+    assert env.observation_space.dtype == single_observation_space.dtype
+
+    from gym.vector.utils.spaces import iterate
+
+    def batch_size(space: Space) -> int:
+        return len(list(iterate(space, space.sample())))
+
+    assert batch_size(env.action_space) == n_outer_envs
+
+    with env:
Contributor

Why are we using environment as a context manager here?

Author

@lebrice, Apr 26, 2022

So it gets closed and the resources are freed.
This isn't absolutely necessary, since the env going out of scope should have the same effect.
I like it since it's more explicit, and also made it easier for me to spot if there were some errors when closing the nested AsyncVectorEnvs.
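
The explicitness argument can be sketched with a toy class. `ToyEnv` is a hypothetical stand-in, and the sketch assumes the env's `__exit__` calls `close()`, which is what the `with env:` line in the test relies on.

```python
class ToyEnv:
    """Hypothetical stand-in exposing close() and the context-manager protocol."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # do not swallow exceptions raised inside the block


env = ToyEnv()
try:
    with env:
        # A failing assertion in the middle of a test body.
        raise AssertionError("simulated test failure")
except AssertionError:
    pass

# The env was closed even though the block exited through an exception.
closed_after_failure = env.closed
```

An `env.close()` placed at the end of the test body would be skipped in the same situation, since the failing assertion exits the function first.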

Contributor

Ok, that makes sense. My issue is that it is a code style that we do not follow anywhere else in the tests, so could you just have an env.close() at the end of the script to make it simpler for anyone looking at the tests?

Author

@lebrice, Apr 26, 2022

Sure, no problem.
Fixed in f47596f.

However, just for the record, I disagree:
I think having to remember to close the env at the end of the test is ugly, error-prone, and also unnecessary, since in all cases the env is closed when it gets out of scope. In my opinion, the only tests where an env should be closed explicitly are 1) the tests related to closing the envs, and 2) tests that require creating a temporary environment just to check spaces (as in the case here).

For example, consider this:
https://github.com/lebrice/gym/blob/49ee20904ac3a4a1dba3020d1ebd11076848f376/tests/vector/test_vector_env.py#L52-L54

+        observations = env.reset()
+        assert observations in env.observation_space
+
+        actions = env.action_space.sample()
+        assert actions in env.action_space
+
+        observations, rewards, dones, _ = env.step(actions)
+        assert observations in env.observation_space
+
+        assert isinstance(env.observation_space, Box)
+        assert isinstance(observations, np.ndarray)
+        assert observations.dtype == env.observation_space.dtype
+        assert (
+            observations.shape
+            == (n_outer_envs, n_inner_envs) + single_observation_space.shape
+        )
+
+        assert isinstance(rewards, np.ndarray)
+        assert isinstance(rewards[0], np.ndarray)
+        assert rewards.ndim == 2
+        assert rewards.shape == (n_outer_envs, n_inner_envs)
+
+        assert isinstance(dones, np.ndarray)
+        assert dones.dtype == np.bool_
+        assert dones.ndim == 2
+        assert dones.shape == (n_outer_envs, n_inner_envs)
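
The shape assertions above follow from how the outer env sizes its buffers. Here is a minimal NumPy-only sketch of that allocation, mirroring the `shape += (num_envs,)` logic in the `sync_vector_env.py` change. The flag `inner_is_vector_env` and the fake per-step rewards are assumptions made for illustration; no gym envs are involved.

```python
import numpy as np

n_outer_envs, n_inner_envs = 4, 7

# Start from one slot per outer env, then add an axis for the inner envs.
shape = (n_outer_envs,)
inner_is_vector_env = True  # assumption: every outer slot holds a VectorEnv
if inner_is_vector_env:
    shape += (n_inner_envs,)

rewards = np.zeros(shape, dtype=np.float64)
dones = np.zeros(shape, dtype=np.bool_)

# Each inner step yields one reward per inner env; assigning it to row i
# fills the buffer the same way `self._rewards[i] = ...` does.
for i in range(n_outer_envs):
    rewards[i] = np.arange(n_inner_envs, dtype=np.float64)
```

Each row of `rewards` is itself an array with one entry per inner env, which is what the `isinstance(rewards[0], np.ndarray)` assertion checks.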