Closes: #1058 ### API Extensions - Batch received two new methods: `to_dict` and `to_list_of_dicts`. #1063 - `Collector`s can now be closed, and their reset is more granular. #1063 - Trainers can control whether collectors should be reset prior to training. #1063 - Convenience constructor for `CollectStats` called `with_autogenerated_stats`. #1063 ### Internal Improvements - `Collector`s rely less on state; the few stateful things are stored explicitly instead of through a `.data` attribute. #1063 - Introduced a first iteration of a naming convention for vars in `Collector`s. #1063 - Generally improved readability of Collector code and associated tests (still quite some way to go). #1063 - Improved typing for `exploration_noise` and within Collector. #1063 ### Breaking Changes - Removed `.data` attribute from `Collector` and its child classes. #1063 - Collectors no longer reset the environment on initialization. Instead, the user might have to call `reset` explicitly or pass `reset_before_collect=True`. #1063 - VectorEnvs now return an array of info-dicts on reset instead of a list. #1063 - Fixed `iter(Batch(...))` which now behaves the same way as `Batch(...).__iter__()`. Can be considered a bugfix. #1063 --------- Co-authored-by: Michael Panchenko <m.panchenko@appliedai.de>
121 lines
3.8 KiB
Python
121 lines
3.8 KiB
Python
from typing import Any
|
|
|
|
import numpy as np
|
|
import torch
|
|
|
|
from tianshou.env.utils import gym_new_venv_step_type
|
|
from tianshou.env.venvs import GYM_RESERVED_KEYS, BaseVectorEnv
|
|
from tianshou.utils import RunningMeanStd
|
|
|
|
|
|
class VectorEnvWrapper(BaseVectorEnv):
    """Base class for vectorized environments wrapper.

    Each method here simply forwards to the wrapped ``venv``; subclasses
    override only the calls whose behavior they want to alter.
    """

    # Note: No super call because this is a wrapper with overridden __getattribute__
    # It's not a "true" subclass of BaseVectorEnv but it does extend its interface, so
    # it can be used as a drop-in replacement
    # noinspection PyMissingConstructor
    def __init__(self, venv: BaseVectorEnv) -> None:
        """Store the wrapped venv and mirror its ``is_async`` flag.

        :param venv: the vectorized environment to wrap.
        """
        self.venv = venv
        self.is_async = venv.is_async

    def __len__(self) -> int:
        """Return the length reported by the wrapped venv."""
        wrapped = self.venv
        return len(wrapped)

    def __getattribute__(self, key: str) -> Any:
        """Look up ``key``, redirecting reserved gym.Env keys to the wrapped venv.

        :param key: name of the attribute being accessed.
        :return: the wrapped venv's attribute when ``key`` is one of
            ``GYM_RESERVED_KEYS``, otherwise the result of the regular lookup.
        """
        if key not in GYM_RESERVED_KEYS:
            # Anything that is not a reserved key is resolved the normal way.
            return super().__getattribute__(key)
        # reserved keys in gym.Env
        return getattr(self.venv, key)

    def get_env_attr(
        self,
        key: str,
        id: int | list[int] | np.ndarray | None = None,
    ) -> list[Any]:
        """Forward ``get_env_attr`` to the wrapped venv and return its result."""
        wrapped = self.venv
        return wrapped.get_env_attr(key, id)

    def set_env_attr(
        self,
        key: str,
        value: Any,
        id: int | list[int] | np.ndarray | None = None,
    ) -> None:
        """Forward ``set_env_attr`` to the wrapped venv."""
        wrapped = self.venv
        return wrapped.set_env_attr(key, value, id)

    def reset(
        self,
        env_id: int | list[int] | np.ndarray | None = None,
        **kwargs: Any,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Forward ``reset`` (including extra keyword arguments) to the wrapped venv."""
        wrapped = self.venv
        return wrapped.reset(env_id, **kwargs)

    def step(
        self,
        action: np.ndarray | torch.Tensor | None,
        id: int | list[int] | np.ndarray | None = None,
    ) -> gym_new_venv_step_type:
        """Forward ``step`` to the wrapped venv and return its result unchanged."""
        wrapped = self.venv
        return wrapped.step(action, id)

    def seed(self, seed: int | list[int] | None = None) -> list[list[int] | None]:
        """Forward ``seed`` to the wrapped venv and return its result."""
        wrapped = self.venv
        return wrapped.seed(seed)

    def render(self, **kwargs: Any) -> list[Any]:
        """Forward ``render`` (with all keyword arguments) to the wrapped venv."""
        wrapped = self.venv
        return wrapped.render(**kwargs)

    def close(self) -> None:
        """Close the wrapped venv."""
        wrapped = self.venv
        wrapped.close()
|
|
|
|
|
|
class VectorEnvNormObs(VectorEnvWrapper):
    """An observation normalization wrapper for vectorized environments.

    Observations returned by ``reset`` and ``step`` are passed through
    ``obs_rms.norm``; the running statistics are updated with each new
    observation while ``update_obs_rms`` is true.

    :param venv: the vectorized environment to wrap.
    :param update_obs_rms: whether to update obs_rms. Default to True.
    """

    def __init__(self, venv: BaseVectorEnv, update_obs_rms: bool = True) -> None:
        super().__init__(venv)
        # initialize observation running mean/std
        self.update_obs_rms = update_obs_rms
        self.obs_rms = RunningMeanStd()

    def reset(
        self,
        env_id: int | list[int] | np.ndarray | None = None,
        **kwargs: Any,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Reset the wrapped venv and return the normalized observation.

        :param env_id: forwarded unchanged to the wrapped venv's ``reset``.
        :param kwargs: forwarded unchanged to the wrapped venv's ``reset``.
        :return: a tuple of the normalized observation and the info returned
            by the wrapped venv.
        :raises TypeError: if the wrapped venv returns a tuple observation.
        """
        obs, info = self.venv.reset(env_id, **kwargs)

        if isinstance(obs, tuple):  # type: ignore
            # The two literals are joined by implicit concatenation (no comma
            # between them) so the exception carries ONE readable message
            # instead of a two-element args tuple.
            raise TypeError(
                "Tuple observation space is not supported. "
                "Please change it to array or dict space",
            )

        if self.obs_rms and self.update_obs_rms:
            self.obs_rms.update(obs)
        obs = self._norm_obs(obs)
        return obs, info

    def step(
        self,
        action: np.ndarray | torch.Tensor | None,
        id: int | list[int] | np.ndarray | None = None,
    ) -> gym_new_venv_step_type:
        """Step the wrapped venv and normalize the observation it returns.

        The first element of the wrapped venv's step result is treated as the
        observation; all remaining elements are passed through untouched.

        :param action: forwarded unchanged to the wrapped venv's ``step``.
        :param id: forwarded unchanged to the wrapped venv's ``step``.
        :return: the step result with its first element normalized.
        """
        step_results = self.venv.step(action, id)
        if self.obs_rms and self.update_obs_rms:
            self.obs_rms.update(step_results[0])
        return (self._norm_obs(step_results[0]), *step_results[1:])

    def _norm_obs(self, obs: np.ndarray) -> np.ndarray:
        """Return ``obs`` normalized by ``obs_rms``, or unchanged if ``obs_rms`` is falsy."""
        if self.obs_rms:
            return self.obs_rms.norm(obs)  # type: ignore
        return obs

    def set_obs_rms(self, obs_rms: RunningMeanStd) -> None:
        """Set with given observation running mean/std."""
        self.obs_rms = obs_rms

    def get_obs_rms(self) -> RunningMeanStd:
        """Return observation running mean/std."""
        return self.obs_rms
|