diff --git a/gym/vector/async_vector_env.py b/gym/vector/async_vector_env.py
index 565c7513744..851d40e1ff0 100644
--- a/gym/vector/async_vector_env.py
+++ b/gym/vector/async_vector_env.py
@@ -634,6 +634,7 @@ def _worker(index, env_fn, pipe, parent_pipe, shared_memory, error_queue):
     assert shared_memory is None
     env = env_fn()
     parent_pipe.close()
+
     try:
         while True:
             command, data = pipe.recv()
@@ -698,6 +699,7 @@ def _worker_shared_memory(index, env_fn, pipe, parent_pipe, shared_memory, error_queue):
     env = env_fn()
     observation_space = env.observation_space
     parent_pipe.close()
+
     try:
         while True:
             command, data = pipe.recv()
@@ -716,7 +718,8 @@ def _worker_shared_memory(index, env_fn, pipe, parent_pipe, shared_memory, error_queue):
                 pipe.send((None, True))
             elif command == "step":
                 observation, reward, done, info = env.step(data)
-                if done:
+                # NOTE: VectorEnvs take care of resetting the envs that are done.
+                if not isinstance(env.unwrapped, VectorEnv) and done:
                     info["terminal_observation"] = observation
                     observation = env.reset()
                 write_to_shared_memory(
diff --git a/gym/vector/sync_vector_env.py b/gym/vector/sync_vector_env.py
index dc2e8c345da..e92398d09ff 100644
--- a/gym/vector/sync_vector_env.py
+++ b/gym/vector/sync_vector_env.py
@@ -69,8 +69,11 @@ def __init__(self, env_fns, observation_space=None, action_space=None, copy=True):
         self.observations = create_empty_array(
             self.single_observation_space, n=self.num_envs, fn=np.zeros
         )
-        self._rewards = np.zeros((self.num_envs,), dtype=np.float64)
-        self._dones = np.zeros((self.num_envs,), dtype=np.bool_)
+        shape = (self.num_envs,)
+        if isinstance(self.envs[0].unwrapped, VectorEnv):
+            shape += (self.envs[0].num_envs,)
+        self._rewards = np.zeros(shape, dtype=np.float64)
+        self._dones = np.zeros(shape, dtype=np.bool_)
         self._actions = None

     def seed(self, seed=None):
@@ -134,7 +137,12 @@ def step_wait(self):
         observations, infos = [], []
         for i, (env, action) in enumerate(zip(self.envs, self._actions)):
             observation, self._rewards[i], self._dones[i], info = env.step(action)
-            if self._dones[i]:
+            if isinstance(env, VectorEnv):
+                # Do nothing if the env is a VectorEnv, since it will automatically
+                # reset the envs that are done if needed in the 'step' method and
+                # return the initial observation instead of the final observation.
+                pass
+            elif self._dones[i]:
                 info["terminal_observation"] = observation
                 observation = env.reset()
             observations.append(observation)
diff --git a/tests/envs/spec_list.py b/tests/envs/spec_list.py
index a0f192b7c2a..13ff8e90d6e 100644
--- a/tests/envs/spec_list.py
+++ b/tests/envs/spec_list.py
@@ -1,6 +1,7 @@
 import os

 from gym import envs, logger
+from gym.envs.registration import EnvSpec

 SKIP_MUJOCO_WARNING_MESSAGE = (
     "Cannot run mujoco test (either license key not found or mujoco not"
@@ -16,10 +17,16 @@
     skip_mujoco = True


-def should_skip_env_spec_for_tests(spec):
+def should_skip_env_spec_for_tests(spec: EnvSpec) -> bool:
     # We skip tests for envs that require dependencies or are otherwise
     # troublesome to run frequently
     ep = spec.entry_point
+
+    if not isinstance(ep, str):
+        # Skip entry points that aren't strings.
+        # (Also avoids type checking errors below.)
+        return False
+
     # Skip mujoco tests for pull request CI
     if skip_mujoco and ep.startswith("gym.envs.mujoco"):
         return True
diff --git a/tests/vector/test_vector_env.py b/tests/vector/test_vector_env.py
index 82870d79c29..f08eef62281 100644
--- a/tests/vector/test_vector_env.py
+++ b/tests/vector/test_vector_env.py
@@ -1,10 +1,20 @@
+import warnings
+from functools import partial
+
 import numpy as np
 import pytest
+from numpy.testing import assert_allclose

+import gym
+from gym.envs.registration import registry
 from gym.spaces import Tuple
 from gym.vector.async_vector_env import AsyncVectorEnv
 from gym.vector.sync_vector_env import SyncVectorEnv
+from gym.vector.utils.numpy_utils import concatenate
+from gym.vector.utils.spaces import iterate
 from gym.vector.vector_env import VectorEnv
+from gym.wrappers import AutoResetWrapper
+from tests.envs.spec_list import should_skip_env_spec_for_tests
 from tests.vector.utils import CustomSpace, make_env
@@ -58,3 +68,221 @@ def test_custom_space_vector_env():
     assert isinstance(env.single_action_space, CustomSpace)
     assert isinstance(env.action_space, Tuple)
+
+
+# Only use 'local' envs for testing.
+# NOTE: this won't work if the atari dependencies are installed, as we can't gym.make() them
+# from inside the git repo folder.
+local_env_ids = [
+    spec.id for spec in registry.values() if not should_skip_env_spec_for_tests(spec)
+]
+
+
+def _make_seeded_env(env_id: str, seed: int) -> gym.Env:
+    # Ignore deprecation warnings, since we still want to test deprecated environments.
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore", category=UserWarning)
+        env = gym.make(env_id)
+    # env.seed(seed)
+    env.action_space.seed(seed)
+    env.observation_space.seed(seed)
+    return env
+
+
+@pytest.mark.parametrize("env_id", local_env_ids)
+@pytest.mark.parametrize("async_inner", [False, True])
+@pytest.mark.parametrize("async_outer", [False, True])
+@pytest.mark.parametrize("n_inner_envs", [1, 2, 3])
+@pytest.mark.parametrize("n_outer_envs", [1, 2, 3])
+def test_nesting_vector_envs(
+    env_id: str,
+    async_inner: bool,
+    async_outer: bool,
+    n_inner_envs: int,
+    n_outer_envs: int,
+):
+    """Tests nesting of vector envs: using a VectorEnv of VectorEnvs.
+
+    This can be useful, for example, when running a large number of environments
+    on a machine with few cores, as the worker processes of an AsyncVectorEnv can
+    themselves run multiple environments sequentially using a SyncVectorEnv
+    (a.k.a. chunking).
+
+    This test creates `n_outer_envs` vectorized environments, each of which has
+    `n_inner_envs` inner environments. If `async_outer` is True, the outermost
+    wrapper is an `AsyncVectorEnv`; otherwise it is a `SyncVectorEnv`. The same
+    goes for the "inner" environments via `async_inner`.
+
+    Parameters
+    ----------
+    - env_id : str
+        ID of a gym environment to use as the base environment.
+    - async_inner : bool
+        Whether the inner VectorEnvs are async or not.
+    - async_outer : bool
+        Whether the outer VectorEnv is async or not.
+    - n_inner_envs : int
+        Number of inner environments.
+    - n_outer_envs : int
+        Number of outer environments.
+    """
+
+    # NOTE: When nesting AsyncVectorEnvs, only the "innermost" envs can have
+    # `daemon=True`, otherwise the "daemonic processes are not allowed to have
+    # children" AssertionError is raised in `multiprocessing.process`.
+    inner_vectorenv_type = AsyncVectorEnv if async_inner else SyncVectorEnv
+    outer_vectorenv_type = (
+        partial(AsyncVectorEnv, daemon=False) if async_outer else SyncVectorEnv
+    )
+
+    base_seed = 123
+
+    # Create the env functions for the envs at each index (i, j).
+    seeds = [
+        [base_seed + i * n_inner_envs + j for j in range(n_inner_envs)]
+        for i in range(n_outer_envs)
+    ]
+
+    env_fns_grid = [
+        [
+            partial(_make_seeded_env, env_id, seed=seeds[i][j])
+            for j in range(n_inner_envs)
+        ]
+        for i in range(n_outer_envs)
+    ]
+
+    outer_env_fns = [
+        partial(
+            inner_vectorenv_type,
+            env_fns=env_fns_grid[i],
+        )
+        for i in range(n_outer_envs)
+    ]
+
+    env = outer_vectorenv_type(env_fns=outer_env_fns)
+
+    # Record the initial obs, action, next_obs, reward, done and info of each individual
+    # env, to compare them later with those produced by the nested VectorEnv.
+    base_obs: list[list] = np.zeros([n_outer_envs, n_inner_envs]).tolist()
+    base_act: list[list] = np.zeros([n_outer_envs, n_inner_envs]).tolist()
+    base_next_obs: list[list] = np.zeros([n_outer_envs, n_inner_envs]).tolist()
+    base_reward = np.zeros(shape=(n_outer_envs, n_inner_envs), dtype=float)
+    base_done = np.zeros(shape=(n_outer_envs, n_inner_envs), dtype=bool)
+    base_info: list[list[dict]] = np.zeros([n_outer_envs, n_inner_envs]).tolist()
+
+    # Create an env temporarily to get the observation and action spaces.
+    with env_fns_grid[0][0]() as temp_env:
+        base_observation_space = temp_env.observation_space
+        base_action_space = temp_env.action_space
+
+    # Go through each index (i, j), create the env with the seed at that index, and record
+    # the initial obs, action, next_obs, reward, done, info, etc.
+    # These will then be compared with the values produced by the VectorEnv equivalent.
+    for i in range(n_outer_envs):
+        for j in range(n_inner_envs):
+            # Create a properly seeded environment, then reset and step once.
+            with env_fns_grid[i][j]() as temp_env:
+
+                # Add the AutoResetWrapper to the individual environments to replicate what
+                # will happen in the VectorEnv. (See the note below.)
+                temp_env = AutoResetWrapper(temp_env)
+
+                assert temp_env.observation_space == base_observation_space
+                assert temp_env.action_space == base_action_space
+
+                # NOTE: This will change a bit once the AutoResetWrapper is used in the VectorEnvs.
+                base_obs[i][j], base_info[i][j] = temp_env.reset(
+                    seed=seeds[i][j], return_info=True
+                )
+                base_act[i][j] = base_action_space.sample()
+                (
+                    base_next_obs[i][j],
+                    base_reward[i][j],
+                    base_done[i][j],
+                    base_info[i][j],
+                ) = temp_env.step(base_act[i][j])
+
+    obs = env.reset(seed=seeds)
+
+    # NOTE: initializing these so they aren't possibly unbound below and type checkers can relax.
+    i = -1
+    j = -1
+
+    for i, obs_i in enumerate(iterate(env.observation_space, obs)):
+        for j, obs_ij in enumerate(iterate(env.single_observation_space, obs_i)):
+            assert obs_ij in base_observation_space
+            # Assert that each observation is what we'd expect (following the single env).
+            assert_allclose(obs_ij, base_obs[i][j])
+
+    assert j == n_inner_envs - 1
+    assert i == n_outer_envs - 1
+
+    # NOTE: Sampling an action with env.action_space.sample() would give a different value
+    # than sampling actions from each env individually and batching them.
+    # To check that everything is working correctly, we instead create the action by
+    # concatenating the individual actions and pass it to the vectorenv, checking that this
+    # recreates the same result as for the individual envs.
+    # _ = env.action_space.sample()
+    action = concatenate(
+        env.single_action_space,
+        [
+            concatenate(base_action_space, base_act[i], out=None)
+            for i in range(n_outer_envs)
+        ],
+        out=None,
+    )
+
+    for i, action_i in enumerate(iterate(env.action_space, action)):
+        for j, action_ij in enumerate(iterate(env.single_action_space, action_i)):
+            assert action_ij in base_action_space
+            # Assert that each action is what we'd expect (following the single env).
+            assert_allclose(action_ij, base_act[i][j])
+        assert j == n_inner_envs - 1
+    assert i == n_outer_envs - 1
+
+    # Perform a single step:
+
+    next_obs, reward, done, info = env.step(action)
+
+    for i, next_obs_i in enumerate(iterate(env.observation_space, next_obs)):
+        for j, next_obs_ij in enumerate(
+            iterate(env.single_observation_space, next_obs_i)
+        ):
+            assert next_obs_ij in base_observation_space
+            # Assert that each next observation is what we'd expect (following the single env).
+            assert_allclose(next_obs_ij, base_next_obs[i][j])
+
+    for i, rew_i in enumerate(reward):
+        for j, rew_ij in enumerate(rew_i):
+            # Assert that each reward is what we'd expect (following the single env).
+            assert_allclose(rew_ij, base_reward[i][j])
+        assert j == n_inner_envs - 1
+    assert i == n_outer_envs - 1
+
+    for i, done_i in enumerate(done):
+        for j, done_ij in enumerate(done_i):
+            assert done_ij == base_done[i][j]
+        assert j == n_inner_envs - 1
+    assert i == n_outer_envs - 1
+
+    for i, info_i in enumerate(info):
+        for j, info_ij in enumerate(info_i):
+            # NOTE: Since the VectorEnvs don't apply an AutoResetWrapper to the individual
+            # envs, the autoreset logic lives in the 'worker' code, and that code doesn't
+            # add the 'terminal_info' entry to the 'info' dictionary.
+            # NOTE: This test-case is forward-compatible in case the VectorEnvs do end up
+            # adding the 'terminal_info' entry to the 'info' dictionary.
+            expected_info = base_info[i][j].copy()
+            if (
+                info_ij != base_info[i][j]
+                and ("terminal_info" in expected_info)
+                and ("terminal_info" not in info_ij)
+            ):
+                # Remove the 'terminal_info' key from the expected info dict and compare
+                # as before.
+                expected_info.pop("terminal_info")
+            assert info_ij == expected_info
+        assert j == n_inner_envs - 1
+    assert i == n_outer_envs - 1
+
+    env.close()
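
For context, a minimal usage sketch of the "chunking" pattern described in the docstring of test_nesting_vector_envs: an outer AsyncVectorEnv whose workers each run several environments sequentially through an inner SyncVectorEnv. This sketch is not part of the diff above; it assumes a gym build with these changes and that CartPole-v1 is registered.

    from functools import partial

    import gym
    from gym.vector.async_vector_env import AsyncVectorEnv
    from gym.vector.sync_vector_env import SyncVectorEnv


    def make_env():
        return gym.make("CartPole-v1")


    def make_chunk(n_inner: int):
        # Each outer worker process runs `n_inner` environments sequentially.
        return SyncVectorEnv([make_env for _ in range(n_inner)])


    n_outer, n_inner = 2, 4
    # NOTE: if the inner envs were AsyncVectorEnvs as well, the outer one would need
    # daemon=False (see the note in the test above); with SyncVectorEnv inside, the
    # default is fine.
    env = AsyncVectorEnv([partial(make_chunk, n_inner) for _ in range(n_outer)])

    obs = env.reset()
    actions = env.action_space.sample()
    next_obs, rewards, dones, infos = env.step(actions)
    # With the worker change above, the inner SyncVectorEnvs handle their own autoresets,
    # and `rewards`/`dones` should come back with shape (n_outer, n_inner) == (2, 4).
    env.close()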