Tianshou/tianshou/env/vecenv.py
youkaichao 8c32d99c65
Add multi-agent example: tic-tac-toe (#122)
* make fileds with empty Batch rather than None after reset

* dummy code

* remove dummy

* add reward_length argument for collector

* Improve Batch (#126)

* make sure the key type of Batch is string, and add unit tests

* add is_empty() function and unit tests

* enable cat of mixing dict and Batch, just like stack

* bugfix for reward_length

* add get_final_reward_fn argument to collector to deal with marl

* minor polish

* remove multibuf

* minor polish

* improve and implement Batch.cat_

* bugfix for buffer.sample with field impt_weight

* restore the usage of a.cat_(b)

* fix 2 bugs in batch and add corresponding unittest

* code fix for update

* update is_empty to recognize empty over empty; bugfix for len

* bugfix for update and add testcase

* add testcase of update

* make fileds with empty Batch rather than None after reset

* dummy code

* remove dummy

* add reward_length argument for collector

* bugfix for reward_length

* add get_final_reward_fn argument to collector to deal with marl

* make sure the key type of Batch is string, and add unit tests

* add is_empty() function and unit tests

* enable cat of mixing dict and Batch, just like stack

* dummy code

* remove dummy

* add multi-agent example: tic-tac-toe

* move TicTacToeEnv to a separate file

* remove dummy MANet

* code refactor

* move tic-tac-toe example to test

* update doc with marl-example

* fix docs

* reduce the threshold

* revert

* update player id to start from 1 and change player to agent; keep coding

* add reward_length argument for collector

* Improve Batch (#128)

* minor polish

* improve and implement Batch.cat_

* bugfix for buffer.sample with field impt_weight

* restore the usage of a.cat_(b)

* fix 2 bugs in batch and add corresponding unittest

* code fix for update

* update is_empty to recognize empty over empty; bugfix for len

* bugfix for update and add testcase

* add testcase of update

* fix docs

* fix docs

* fix docs [ci skip]

* fix docs [ci skip]

Co-authored-by: Trinkle23897 <463003665@qq.com>

* refact

* re-implement Batch.stack and add testcases

* add doc for Batch.stack

* reward_metric

* modify flag

* minor fix

* reuse _create_values and refactor stack_ & cat_

* fix pep8

* fix reward stat in collector

* fix stat of collector, simplify test/base/env.py

* fix docs

* minor fix

* raise exception for stacking with partial keys and axis!=0

* minor fix

* minor fix

* minor fix

* marl-examples

* add condense; bugfix for torch.Tensor; code refactor

* marl example can run now

* enable tic tac toe with larger board size and win-size

* add test dependency

* Fix padding of inconsistent keys with Batch.stack and Batch.cat (#130)

* re-implement Batch.stack and add testcases

* add doc for Batch.stack

* reuse _create_values and refactor stack_ & cat_

* fix pep8

* fix docs

* raise exception for stacking with partial keys and axis!=0

* minor fix

* minor fix

Co-authored-by: Trinkle23897 <463003665@qq.com>

* stash

* let agent learn to play as agent 2 which is harder

* code refactor

* Improve collector (#125)

* remove multibuf

* reward_metric

* make fileds with empty Batch rather than None after reset

* many fixes and refactor
Co-authored-by: Trinkle23897 <463003665@qq.com>

* marl for tic-tac-toe and general gomoku

* update default gamma to 0.1 for tic tac toe to win earlier

* fix name typo; change default game config; add rew_norm option

* fix pep8

* test commit

* mv test dir name

* add rew flag

* fix torch.optim import error and madqn rew_norm

* remove useless kwargs

* Vector env enable select worker (#132)

* Enable selecting worker for vector env step method.

* Update collector to match new vecenv selective worker behavior.

* Bug fix.

* Fix rebase

Co-authored-by: Alexis Duburcq <alexis.duburcq@wandercraft.eu>

* show the last move of tictactoe by capital letters

* add multi-agent tutorial

* fix link

* Standardized behavior of Batch.cat and misc code refactor (#137)

* code refactor; remove unused kwargs; add reward_normalization for dqn

* bugfix for __setitem__ with torch.Tensor; add Batch.condense

* minor fix

* support cat with empty Batch

* remove the dependency of is_empty on len; specify the semantic of empty Batch by test cases

* support stack with empty Batch

* remove condense

* refactor code to reflect the shared / partial / reserved categories of keys

* add is_empty(recursive=False)

* doc fix

* docfix and bugfix for _is_batch_set

* add doc for key reservation

* bugfix for algebra operators

* fix cat with lens hint

* code refactor

* bugfix for storing None

* use ValueError instead of exception

* hide lens away from users

* add comment for __cat

* move the computation of the initial value of lens in cat_ itself.

* change the place of doc string

* doc fix for Batch doc string

* change recursive to recurse

* doc string fix

* minor fix for batch doc

* write tutorials to specify the standard of Batch (#142)

* add doc for len exceptions

* doc move; unify is_scalar_value function

* remove some issubclass check

* bugfix for shape of Batch(a=1)

* keep moving doc

* keep writing batch tutorial

* draft version of Batch tutorial done

* improving doc

* keep improving doc

* batch tutorial done

* rename _is_number

* rename _is_scalar

* shape property do not raise exception

* restore some doc string

* grammarly [ci skip]

* grammarly + fix warning of building docs

* polish docs

* trim and re-arrange batch tutorial

* go straight to the point

* minor fix for batch doc

* add shape / len in basic usage

* keep improving tutorial

* unify _to_array_with_correct_type to remove duplicate code

* delegate type convertion to Batch.__init__

* further delegate type convertion to Batch.__init__

* bugfix for setattr

* add a _parse_value function

* remove dummy function call

* polish docs

Co-authored-by: Trinkle23897 <463003665@qq.com>

* bugfix for mapolicy

* pretty code

* remove debug code; remove condense

* doc fix

* check before get_agents in tutorials/tictactoe

* tutorial

* fix

* minor fix for batch doc

* minor polish

* faster test_ttt

* improve tic-tac-toe environment

* change default epoch and step-per-epoch for tic-tac-toe

* fix mapolicy

* minor polish for mapolicy

* 90% to 80% (need to change the tutorial)

* win rate

* show step number at board

* simplify mapolicy

* minor polish for mapolicy

* remove MADQN

* fix pep8

* change legal_actions to mask (need to update docs)

* simplify maenv

* fix typo

* move basevecenv to single file

* separate RandomAgent

* update docs

* grammarly

* fix pep8

* win rate typo

* format in cheatsheet

* use bool mask directly

* update doc for boolean mask

Co-authored-by: Trinkle23897 <463003665@qq.com>
Co-authored-by: Alexis DUBURCQ <alexis.duburcq@gmail.com>
Co-authored-by: Alexis Duburcq <alexis.duburcq@wandercraft.eu>
2020-07-21 14:59:49 +08:00

248 lines
8.1 KiB
Python

import gym
import numpy as np
from multiprocessing import Process, Pipe
from typing import List, Tuple, Union, Optional, Callable, Any
try:
import ray
except ImportError:
pass
from tianshou.env import BaseVectorEnv
from tianshou.env.utils import CloudpickleWrapper
class VectorEnv(BaseVectorEnv):
"""Dummy vectorized environment wrapper, implemented in for-loop.
.. seealso::
Please refer to :class:`~tianshou.env.BaseVectorEnv` for more detailed
explanation.
"""
def __init__(self, env_fns: List[Callable[[], gym.Env]]) -> None:
super().__init__(env_fns)
self.envs = [_() for _ in env_fns]
def __getattr__(self, key):
return [getattr(env, key) if hasattr(env, key) else None
for env in self.envs]
def reset(self, id: Optional[Union[int, List[int]]] = None) -> np.ndarray:
if id is None:
id = range(self.env_num)
elif np.isscalar(id):
id = [id]
obs = np.stack([self.envs[i].reset() for i in id])
return obs
def step(self,
action: np.ndarray,
id: Optional[Union[int, List[int]]] = None
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
if id is None:
id = range(self.env_num)
elif np.isscalar(id):
id = [id]
assert len(action) == len(id)
result = [self.envs[i].step(action[i]) for i in id]
obs, rew, done, info = map(np.stack, zip(*result))
return obs, rew, done, info
def seed(self, seed: Optional[Union[int, List[int]]] = None) -> List[int]:
if np.isscalar(seed):
seed = [seed + _ for _ in range(self.env_num)]
elif seed is None:
seed = [seed] * self.env_num
result = []
for e, s in zip(self.envs, seed):
if hasattr(e, 'seed'):
result.append(e.seed(s))
return result
def render(self, **kwargs) -> List[Any]:
result = []
for e in self.envs:
if hasattr(e, 'render'):
result.append(e.render(**kwargs))
return result
def close(self) -> List[Any]:
return [e.close() for e in self.envs]
def worker(parent, p, env_fn_wrapper):
parent.close()
env = env_fn_wrapper.data()
try:
while True:
cmd, data = p.recv()
if cmd == 'step':
p.send(env.step(data))
elif cmd == 'reset':
p.send(env.reset())
elif cmd == 'close':
p.send(env.close())
p.close()
break
elif cmd == 'render':
p.send(env.render(**data) if hasattr(env, 'render') else None)
elif cmd == 'seed':
p.send(env.seed(data) if hasattr(env, 'seed') else None)
elif cmd == 'getattr':
p.send(getattr(env, data) if hasattr(env, data) else None)
else:
p.close()
raise NotImplementedError
except KeyboardInterrupt:
p.close()
class SubprocVectorEnv(BaseVectorEnv):
"""Vectorized environment wrapper based on subprocess.
.. seealso::
Please refer to :class:`~tianshou.env.BaseVectorEnv` for more detailed
explanation.
"""
def __init__(self, env_fns: List[Callable[[], gym.Env]]) -> None:
super().__init__(env_fns)
self.closed = False
self.parent_remote, self.child_remote = \
zip(*[Pipe() for _ in range(self.env_num)])
self.processes = [
Process(target=worker, args=(
parent, child, CloudpickleWrapper(env_fn)), daemon=True)
for (parent, child, env_fn) in zip(
self.parent_remote, self.child_remote, env_fns)
]
for p in self.processes:
p.start()
for c in self.child_remote:
c.close()
def __getattr__(self, key):
for p in self.parent_remote:
p.send(['getattr', key])
return [p.recv() for p in self.parent_remote]
def step(self,
action: np.ndarray,
id: Optional[Union[int, List[int]]] = None
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
if id is None:
id = range(self.env_num)
elif np.isscalar(id):
id = [id]
assert len(action) == len(id)
for i, j in enumerate(id):
self.parent_remote[j].send(['step', action[i]])
result = [self.parent_remote[i].recv() for i in id]
obs, rew, done, info = map(np.stack, zip(*result))
return obs, rew, done, info
def reset(self, id: Optional[Union[int, List[int]]] = None) -> np.ndarray:
if id is None:
id = range(self.env_num)
elif np.isscalar(id):
id = [id]
for i in id:
self.parent_remote[i].send(['reset', None])
obs = np.stack([self.parent_remote[i].recv() for i in id])
return obs
def seed(self, seed: Optional[Union[int, List[int]]] = None) -> List[int]:
if np.isscalar(seed):
seed = [seed + _ for _ in range(self.env_num)]
elif seed is None:
seed = [seed] * self.env_num
for p, s in zip(self.parent_remote, seed):
p.send(['seed', s])
return [p.recv() for p in self.parent_remote]
def render(self, **kwargs) -> List[Any]:
for p in self.parent_remote:
p.send(['render', kwargs])
return [p.recv() for p in self.parent_remote]
def close(self) -> List[Any]:
if self.closed:
return []
for p in self.parent_remote:
p.send(['close', None])
result = [p.recv() for p in self.parent_remote]
self.closed = True
for p in self.processes:
p.join()
return result
class RayVectorEnv(BaseVectorEnv):
"""Vectorized environment wrapper based on
`ray <https://github.com/ray-project/ray>`_. However, according to our
test, it is about two times slower than
:class:`~tianshou.env.SubprocVectorEnv`.
.. seealso::
Please refer to :class:`~tianshou.env.BaseVectorEnv` for more detailed
explanation.
"""
def __init__(self, env_fns: List[Callable[[], gym.Env]]) -> None:
super().__init__(env_fns)
try:
if not ray.is_initialized():
ray.init()
except NameError:
raise ImportError(
'Please install ray to support RayVectorEnv: pip3 install ray')
self.envs = [
ray.remote(gym.Wrapper).options(num_cpus=0).remote(e())
for e in env_fns]
def __getattr__(self, key):
return ray.get([e.__getattr__.remote(key) for e in self.envs])
def step(self,
action: np.ndarray,
id: Optional[Union[int, List[int]]] = None
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
if id is None:
id = range(self.env_num)
elif np.isscalar(id):
id = [id]
assert len(action) == len(id)
result = ray.get([self.envs[j].step.remote(action[i])
for i, j in enumerate(id)])
obs, rew, done, info = map(np.stack, zip(*result))
return obs, rew, done, info
def reset(self, id: Optional[Union[int, List[int]]] = None) -> np.ndarray:
if id is None:
id = range(self.env_num)
elif np.isscalar(id):
id = [id]
obs = np.stack(ray.get([self.envs[i].reset.remote() for i in id]))
return obs
def seed(self, seed: Optional[Union[int, List[int]]] = None) -> List[int]:
if not hasattr(self.envs[0], 'seed'):
return []
if np.isscalar(seed):
seed = [seed + _ for _ in range(self.env_num)]
elif seed is None:
seed = [seed] * self.env_num
return ray.get([e.seed.remote(s) for e, s in zip(self.envs, seed)])
def render(self, **kwargs) -> List[Any]:
if not hasattr(self.envs[0], 'render'):
return [None for e in self.envs]
return ray.get([e.render.remote(**kwargs) for e in self.envs])
def close(self) -> List[Any]:
return ray.get([e.close.remote() for e in self.envs])