Tianshou/tianshou/env/vecenv.py

245 lines
7.3 KiB
Python
Raw Normal View History

2020-03-11 09:09:56 +08:00
import numpy as np
2020-03-15 17:41:00 +08:00
from abc import ABC, abstractmethod
2020-03-11 09:09:56 +08:00
from multiprocessing import Process, Pipe
2020-03-11 09:38:14 +08:00
try:
import ray
except ImportError:
pass
2020-03-11 09:09:56 +08:00
2020-03-20 19:52:29 +08:00
from tianshou.env import EnvWrapper, CloudpickleWrapper
2020-03-11 09:09:56 +08:00
2020-03-12 22:20:33 +08:00
class BaseVectorEnv(ABC):
    """Abstract interface for a collection of environments driven together.

    The constructor only records the environment factories and their count;
    how the environments are created and executed is left to subclasses,
    which must implement every abstract method declared here.
    """

    def __init__(self, env_fns):
        """Store the environment factories.

        :param env_fns: a sized collection of environment factories; only
            its length is inspected here.
        """
        self._env_fns = env_fns
        self.env_num = len(env_fns)

    def __len__(self):
        """Return the number of environments."""
        return self.env_num

    @abstractmethod
    def reset(self):
        """Reset the environments (implemented by subclasses)."""

    @abstractmethod
    def step(self, action):
        """Advance the environments with ``action`` (implemented by
        subclasses).
        """

    @abstractmethod
    def seed(self, seed=None):
        """Seed the environments (implemented by subclasses)."""

    @abstractmethod
    def render(self, **kwargs):
        """Render the environments (implemented by subclasses)."""

    @abstractmethod
    def close(self):
        """Close the environments (implemented by subclasses)."""
2020-03-12 22:20:33 +08:00
class VectorEnv(BaseVectorEnv):
    """Vectorized environment that runs every environment in the calling
    process, one after another.
    """

    def __init__(self, env_fns):
        """Create one environment per factory in ``env_fns``."""
        super().__init__(env_fns)
        self.envs = [make_env() for make_env in env_fns]

    def reset(self, id=None):
        """Reset all environments, or only those selected by ``id``.

        :param id: ``None`` to reset everything, a scalar index, or an
            iterable of indices.
        :return: the stacked observations of all environments; when ``id``
            is given, only the selected rows of the cached array are
            overwritten.
        """
        if id is None:
            self._obs = np.stack([env.reset() for env in self.envs])
            return self._obs
        indices = [id] if np.isscalar(id) else id
        for idx in indices:
            self._obs[idx] = self.envs[idx].reset()
        return self._obs

    def step(self, action):
        """Step every environment with its own entry of ``action``.

        :param action: one action per environment.
        :return: ``(obs, rew, done, info)``, each stacked with ``np.stack``.
        """
        assert len(action) == self.env_num
        transitions = [env.step(act) for env, act in zip(self.envs, action)]
        obs, rew, done, info = zip(*transitions)
        self._obs = np.stack(obs)
        self._rew = np.stack(rew)
        self._done = np.stack(done)
        self._info = np.stack(info)
        return self._obs, self._rew, self._done, self._info

    def seed(self, seed=None):
        """Seed the environments.

        A scalar ``seed`` is expanded to ``seed + i`` for the i-th
        environment, ``None`` is passed to every environment, and anything
        else is used as a per-environment sequence of seeds.

        :return: the values returned by each environment's ``seed``;
            environments without a ``seed`` attribute are skipped.
        """
        if np.isscalar(seed):
            seeds = [seed + offset for offset in range(self.env_num)]
        elif seed is None:
            seeds = [None] * self.env_num
        else:
            seeds = seed
        return [env.seed(s) for env, s in zip(self.envs, seeds)
                if hasattr(env, 'seed')]

    def render(self, **kwargs):
        """Render every environment that has a ``render`` attribute.

        :return: the list of values returned by those ``render`` calls.
        """
        return [env.render(**kwargs) for env in self.envs
                if hasattr(env, 'render')]

    def close(self):
        """Close every environment."""
        for env in self.envs:
            env.close()
2020-03-25 14:08:28 +08:00
def worker(parent, p, env_fn_wrapper):
    """Serve commands for a single environment inside a subprocess.

    The loop receives ``(cmd, data)`` pairs from ``p`` and answers
    ``'step'``, ``'reset'``, ``'render'`` and ``'seed'`` by sending the
    result back through ``p``. ``'close'`` ends the loop without a reply.

    :param parent: the parent's end of the pipe; closed immediately because
        this function only communicates through ``p``.
    :param p: this worker's end of the pipe.
    :param env_fn_wrapper: wrapper whose ``data`` attribute is a callable
        that builds the environment.
    :raises NotImplementedError: if an unknown command is received.
    """
    parent.close()
    env = env_fn_wrapper.data()
    try:
        while True:
            cmd, data = p.recv()
            if cmd == 'step':
                p.send(env.step(data))
            elif cmd == 'reset':
                p.send(env.reset())
            elif cmd == 'close':
                break
            elif cmd == 'render':
                # ``data`` carries the keyword arguments for ``render``.
                p.send(env.render(**data) if hasattr(env, 'render') else None)
            elif cmd == 'seed':
                p.send(env.seed(data) if hasattr(env, 'seed') else None)
            else:
                raise NotImplementedError
    except KeyboardInterrupt:
        # Interrupts end the worker quietly; cleanup happens below.
        pass
    finally:
        # Release the pipe and the environment on every exit path, so the
        # environment is not leaked when the worker stops.
        p.close()
        if hasattr(env, 'close'):
            env.close()
2020-03-11 16:14:53 +08:00
2020-03-12 22:20:33 +08:00
class SubprocVectorEnv(BaseVectorEnv):
    """Vectorized environment that runs each environment in its own
    subprocess and talks to it through a pipe.
    """

    def __init__(self, env_fns):
        """Start one daemon worker process per factory in ``env_fns``."""
        super().__init__(env_fns)
        self.closed = False
        self.parent_remote, self.child_remote = \
            zip(*[Pipe() for _ in range(self.env_num)])
        self.processes = [
            Process(target=worker, args=(
                parent, child, CloudpickleWrapper(env_fn)), daemon=True)
            for (parent, child, env_fn) in zip(
                self.parent_remote, self.child_remote, env_fns)
        ]
        for p in self.processes:
            p.start()
        # The child ends now belong to the workers; drop this process's
        # handles on them.
        for c in self.child_remote:
            c.close()

    def step(self, action):
        """Step every environment with its own entry of ``action``.

        All commands are sent before any result is collected, so the
        workers run concurrently.

        :param action: one action per environment.
        :return: ``(obs, rew, done, info)``, each stacked with ``np.stack``.
        """
        assert len(action) == self.env_num
        for p, a in zip(self.parent_remote, action):
            p.send(['step', a])
        result = [p.recv() for p in self.parent_remote]
        self._obs, self._rew, self._done, self._info = zip(*result)
        self._obs = np.stack(self._obs)
        self._rew = np.stack(self._rew)
        self._done = np.stack(self._done)
        self._info = np.stack(self._info)
        return self._obs, self._rew, self._done, self._info

    def reset(self, id=None):
        """Reset all environments, or only those selected by ``id``.

        :param id: ``None`` to reset everything, a scalar index, or an
            iterable of indices.
        :return: the stacked observations of all environments; when ``id``
            is given, only the selected rows of the cached array are
            overwritten.
        """
        if id is None:
            for p in self.parent_remote:
                p.send(['reset', None])
            self._obs = np.stack([p.recv() for p in self.parent_remote])
            return self._obs
        if np.isscalar(id):
            id = [id]
        for i in id:
            self.parent_remote[i].send(['reset', None])
        for i in id:
            self._obs[i] = self.parent_remote[i].recv()
        return self._obs

    def seed(self, seed=None):
        """Seed the environments.

        A scalar ``seed`` is expanded to ``seed + i`` for the i-th
        environment, ``None`` is sent to every environment, and anything
        else is used as a per-environment sequence of seeds.

        :return: the list of replies received from the workers.
        """
        if np.isscalar(seed):
            seed = [seed + _ for _ in range(self.env_num)]
        elif seed is None:
            seed = [seed] * self.env_num
        for p, s in zip(self.parent_remote, seed):
            p.send(['seed', s])
        return [p.recv() for p in self.parent_remote]

    def render(self, **kwargs):
        """Ask every worker to render with ``kwargs``.

        :return: the list of replies received from the workers.
        """
        for p in self.parent_remote:
            p.send(['render', kwargs])
        return [p.recv() for p in self.parent_remote]

    def close(self):
        """Shut down all workers; calling it again is a no-op.

        A worker that has already exited must not prevent the others from
        being told to stop and joined, so a failed send on its pipe is
        ignored.
        """
        if self.closed:
            return
        self.closed = True
        for p in self.parent_remote:
            try:
                p.send(['close', None])
            except OSError:
                # Includes BrokenPipeError: the worker behind this pipe is
                # already gone, so there is nothing left to tell it.
                pass
        for proc in self.processes:
            proc.join()
        # Release this process's ends of the pipes once the workers exited.
        for p in self.parent_remote:
            p.close()
2020-03-12 22:20:33 +08:00
class RayVectorEnv(BaseVectorEnv):
    """Vectorized environment whose environments live in ray actors."""

    def __init__(self, env_fns):
        """Initialize ray if needed and wrap each environment in an actor.

        :raises ImportError: if the name ``ray`` is not available.
        """
        super().__init__(env_fns)
        try:
            if not ray.is_initialized():
                ray.init()
        except NameError:
            raise ImportError(
                'Please install ray to support RayVectorEnv: pip3 install ray')
        actors = []
        for make_env in env_fns:
            actor_cls = ray.remote(EnvWrapper).options(num_cpus=0)
            actors.append(actor_cls.remote(make_env()))
        self.envs = actors

    def step(self, action):
        """Step every environment with its own entry of ``action``.

        All remote calls are issued before any result is fetched.

        :param action: one action per environment.
        :return: ``(obs, rew, done, info)``, each stacked with ``np.stack``.
        """
        assert len(action) == self.env_num
        pending = [env.step.remote(act)
                   for env, act in zip(self.envs, action)]
        transitions = [ray.get(ref) for ref in pending]
        obs, rew, done, info = zip(*transitions)
        self._obs = np.stack(obs)
        self._rew = np.stack(rew)
        self._done = np.stack(done)
        self._info = np.stack(info)
        return self._obs, self._rew, self._done, self._info

    def reset(self, id=None):
        """Reset all environments, or only those selected by ``id``.

        :param id: ``None`` to reset everything, a scalar index, or an
            iterable of indices.
        :return: the stacked observations of all environments; when ``id``
            is given, only the selected rows of the cached array are
            overwritten.
        """
        if id is None:
            pending = [env.reset.remote() for env in self.envs]
            self._obs = np.stack([ray.get(ref) for ref in pending])
            return self._obs
        if np.isscalar(id):
            id = [id]
        # Launch every requested reset first, then collect the results.
        pending = [self.envs[idx].reset.remote() for idx in id]
        for idx, ref in zip(id, pending):
            self._obs[idx] = ray.get(ref)
        return self._obs

    def seed(self, seed=None):
        """Seed the environments.

        A scalar ``seed`` is expanded to ``seed + i`` for the i-th
        environment, ``None`` is passed to every environment, and anything
        else is used as a per-environment sequence of seeds.

        :return: the list of results, or ``None`` when the first actor has
            no ``seed`` attribute.
        """
        if not hasattr(self.envs[0], 'seed'):
            return
        if np.isscalar(seed):
            seeds = [seed + offset for offset in range(self.env_num)]
        elif seed is None:
            seeds = [None] * self.env_num
        else:
            seeds = seed
        pending = [env.seed.remote(s) for env, s in zip(self.envs, seeds)]
        return [ray.get(ref) for ref in pending]

    def render(self, **kwargs):
        """Render every environment with ``kwargs``.

        :return: the list of results, or ``None`` when the first actor has
            no ``render`` attribute.
        """
        if not hasattr(self.envs[0], 'render'):
            return
        pending = [env.render.remote(**kwargs) for env in self.envs]
        return [ray.get(ref) for ref in pending]

    def close(self):
        """Close every environment and wait for each call to finish."""
        pending = [env.close.remote() for env in self.envs]
        for ref in pending:
            ray.get(ref)