Tianshou/tianshou/env/venv_wrappers.py
bordeauxred 4f65b131aa
Feat/refactor collector (#1063)
Closes: #1058 

### Api Extensions
- Batch received two new methods: `to_dict` and `to_list_of_dicts`.
#1063
- `Collector`s can now be closed, and their reset is more granular.
#1063
- Trainers can control whether collectors should be reset prior to
training. #1063
- Convenience constructor for `CollectStats` called
`with_autogenerated_stats`. #1063

### Internal Improvements
- `Collector`s rely less on state, the few stateful things are stored
explicitly instead of through a `.data` attribute. #1063
- Introduced a first iteration of a naming convention for vars in
`Collector`s. #1063
- Generally improved readability of Collector code and associated tests
(still quite some way to go). #1063
- Improved typing for `exploration_noise` and within Collector. #1063

### Breaking Changes

- Removed `.data` attribute from `Collector` and its child classes.
#1063
- Collectors no longer reset the environment on initialization. Instead,
the user might have to call `reset`
expicitly or pass `reset_before_collect=True` . #1063
- VectorEnvs now return an array of info-dicts on reset instead of a
list. #1063
- Fixed `iter(Batch(...)` which now behaves the same way as
`Batch(...).__iter__()`. Can be considered a bugfix. #1063

---------

Co-authored-by: Michael Panchenko <m.panchenko@appliedai.de>
2024-03-28 18:02:31 +01:00

121 lines
3.8 KiB
Python

from typing import Any
import numpy as np
import torch
from tianshou.env.utils import gym_new_venv_step_type
from tianshou.env.venvs import GYM_RESERVED_KEYS, BaseVectorEnv
from tianshou.utils import RunningMeanStd
class VectorEnvWrapper(BaseVectorEnv):
"""Base class for vectorized environments wrapper."""
# Note: No super call because this is a wrapper with overridden __getattribute__
# It's not a "true" subclass of BaseVectorEnv but it does extend its interface, so
# it can be used as a drop-in replacement
# noinspection PyMissingConstructor
def __init__(self, venv: BaseVectorEnv) -> None:
self.venv = venv
self.is_async = venv.is_async
def __len__(self) -> int:
return len(self.venv)
def __getattribute__(self, key: str) -> Any:
if key in GYM_RESERVED_KEYS: # reserved keys in gym.Env
return getattr(self.venv, key)
return super().__getattribute__(key)
def get_env_attr(
self,
key: str,
id: int | list[int] | np.ndarray | None = None,
) -> list[Any]:
return self.venv.get_env_attr(key, id)
def set_env_attr(
self,
key: str,
value: Any,
id: int | list[int] | np.ndarray | None = None,
) -> None:
return self.venv.set_env_attr(key, value, id)
def reset(
self,
env_id: int | list[int] | np.ndarray | None = None,
**kwargs: Any,
) -> tuple[np.ndarray, np.ndarray]:
return self.venv.reset(env_id, **kwargs)
def step(
self,
action: np.ndarray | torch.Tensor | None,
id: int | list[int] | np.ndarray | None = None,
) -> gym_new_venv_step_type:
return self.venv.step(action, id)
def seed(self, seed: int | list[int] | None = None) -> list[list[int] | None]:
return self.venv.seed(seed)
def render(self, **kwargs: Any) -> list[Any]:
return self.venv.render(**kwargs)
def close(self) -> None:
self.venv.close()
class VectorEnvNormObs(VectorEnvWrapper):
"""An observation normalization wrapper for vectorized environments.
:param update_obs_rms: whether to update obs_rms. Default to True.
"""
def __init__(self, venv: BaseVectorEnv, update_obs_rms: bool = True) -> None:
super().__init__(venv)
# initialize observation running mean/std
self.update_obs_rms = update_obs_rms
self.obs_rms = RunningMeanStd()
def reset(
self,
env_id: int | list[int] | np.ndarray | None = None,
**kwargs: Any,
) -> tuple[np.ndarray, np.ndarray]:
obs, info = self.venv.reset(env_id, **kwargs)
if isinstance(obs, tuple): # type: ignore
raise TypeError(
"Tuple observation space is not supported. ",
"Please change it to array or dict space",
)
if self.obs_rms and self.update_obs_rms:
self.obs_rms.update(obs)
obs = self._norm_obs(obs)
return obs, info
def step(
self,
action: np.ndarray | torch.Tensor | None,
id: int | list[int] | np.ndarray | None = None,
) -> gym_new_venv_step_type:
step_results = self.venv.step(action, id)
if self.obs_rms and self.update_obs_rms:
self.obs_rms.update(step_results[0])
return (self._norm_obs(step_results[0]), *step_results[1:])
def _norm_obs(self, obs: np.ndarray) -> np.ndarray:
if self.obs_rms:
return self.obs_rms.norm(obs) # type: ignore
return obs
def set_obs_rms(self, obs_rms: RunningMeanStd) -> None:
"""Set with given observation running mean/std."""
self.obs_rms = obs_rms
def get_obs_rms(self) -> RunningMeanStd:
"""Return observation running mean/std."""
return self.obs_rms