Tianshou/test/continuous/test_npg.py
Erni bf0d632108
Naming and typing improvements in Actor/Critic/Policy forwards (#1032)
Closes #917 

### Internal Improvements
- Better variable names related to model outputs (logits, dist input, etc.). #1032
- Improved typing for actors and critics, using Tianshou classes such as `Actor`, `ActorProb`, etc., instead of just `nn.Module`. #1032
- Added interfaces for most `Actor` and `Critic` classes to enforce the presence of `forward` methods. #1032
- Simplified `PGPolicy.forward` by unifying the `dist_fn` interface (see the associated breaking change). #1032
- Use the distribution's `.mode` instead of relying on knowledge of the concrete distribution type (see the sketch below). #1032
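
As an illustration of the `.mode` point, here is a minimal sketch (not code from this PR; it assumes a PyTorch version in which `torch.distributions` exposes the `.mode` property) showing how deterministic evaluation can query the distribution itself instead of special-casing Gaussians:

```python
import torch
from torch.distributions import Independent, Normal

loc, scale = torch.zeros(3), torch.ones(3)
dist = Independent(Normal(loc, scale), 1)

# no isinstance(dist, Normal) checks needed: the distribution
# itself reports its most likely value
deterministic_action = dist.mode   # equals loc for a diagonal Gaussian
stochastic_action = dist.sample()  # used for exploration during training
```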

### Breaking Changes

- Changed the interface of `dist_fn` in `PGPolicy` and all subclasses to take a single argument in both the continuous and the discrete case (see the sketch below). #1032
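
For the continuous case, the migration looks roughly as follows. The "before" signature is reconstructed for illustration and is not part of this commit; the "after" form matches the `dist` helper in the test file below:

```python
import torch
from torch.distributions import Distribution, Independent, Normal


# before: dist_fn received the actor's outputs as separate positional arguments
def dist_old(loc: torch.Tensor, scale: torch.Tensor) -> Distribution:
    return Independent(Normal(loc, scale), 1)


# after: dist_fn always receives a single argument; for continuous actors
# this is the (loc, scale) tuple produced by ActorProb
def dist_new(loc_scale: tuple[torch.Tensor, torch.Tensor]) -> Distribution:
    loc, scale = loc_scale
    return Independent(Normal(loc, scale), 1)
```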

---------

Co-authored-by: Arnau Jimenez <arnau.jimenez@zeiss.com>
Co-authored-by: Michael Panchenko <m.panchenko@appliedai.de>
2024-04-01 17:14:17 +02:00


import argparse
import os
import pprint

import gymnasium as gym
import numpy as np
import torch
from torch import nn
from torch.distributions import Distribution, Independent, Normal
from torch.utils.tensorboard import SummaryWriter

from tianshou.data import Collector, VectorReplayBuffer
from tianshou.env import DummyVectorEnv
from tianshou.policy import NPGPolicy
from tianshou.policy.base import BasePolicy
from tianshou.policy.modelfree.npg import NPGTrainingStats
from tianshou.trainer import OnpolicyTrainer
from tianshou.utils import TensorboardLogger
from tianshou.utils.net.common import Net
from tianshou.utils.net.continuous import ActorProb, Critic
from tianshou.utils.space_info import SpaceInfo

def get_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument("--task", type=str, default="Pendulum-v1")
    parser.add_argument("--reward-threshold", type=float, default=None)
    parser.add_argument("--seed", type=int, default=1)
    parser.add_argument("--buffer-size", type=int, default=50000)
    parser.add_argument("--lr", type=float, default=1e-3)
    parser.add_argument("--gamma", type=float, default=0.95)
    parser.add_argument("--epoch", type=int, default=5)
    parser.add_argument("--step-per-epoch", type=int, default=50000)
    parser.add_argument("--step-per-collect", type=int, default=2048)
    parser.add_argument("--repeat-per-collect", type=int, default=2)  # theoretically it should be 1
    parser.add_argument("--batch-size", type=int, default=99999)
    parser.add_argument("--hidden-sizes", type=int, nargs="*", default=[64, 64])
    parser.add_argument("--training-num", type=int, default=16)
    parser.add_argument("--test-num", type=int, default=10)
    parser.add_argument("--logdir", type=str, default="log")
    parser.add_argument("--render", type=float, default=0.0)
    parser.add_argument(
        "--device",
        type=str,
        default="cuda" if torch.cuda.is_available() else "cpu",
    )
    # npg special
    parser.add_argument("--gae-lambda", type=float, default=0.95)
    parser.add_argument("--rew-norm", type=int, default=1)
    parser.add_argument("--norm-adv", type=int, default=1)
    parser.add_argument("--optim-critic-iters", type=int, default=5)
    parser.add_argument("--actor-step-size", type=float, default=0.5)
    return parser.parse_known_args()[0]

def test_npg(args: argparse.Namespace = get_args()) -> None:
    env = gym.make(args.task)
    space_info = SpaceInfo.from_env(env)
    args.state_shape = space_info.observation_info.obs_shape
    args.action_shape = space_info.action_info.action_shape
    args.max_action = space_info.action_info.max_action
    if args.reward_threshold is None:
        default_reward_threshold = {"Pendulum-v0": -250, "Pendulum-v1": -250}
        args.reward_threshold = default_reward_threshold.get(
            args.task,
            env.spec.reward_threshold if env.spec else None,
        )
    # you can also use tianshou.env.SubprocVectorEnv
    # train_envs = gym.make(args.task)
    train_envs = DummyVectorEnv([lambda: gym.make(args.task) for _ in range(args.training_num)])
    # test_envs = gym.make(args.task)
    test_envs = DummyVectorEnv([lambda: gym.make(args.task) for _ in range(args.test_num)])
    # seed
    np.random.seed(args.seed)
    torch.manual_seed(args.seed)
    train_envs.seed(args.seed)
    test_envs.seed(args.seed)
    # model
    net = Net(
        args.state_shape,
        hidden_sizes=args.hidden_sizes,
        activation=nn.Tanh,
        device=args.device,
    )
    actor = ActorProb(net, args.action_shape, unbounded=True, device=args.device).to(args.device)
    critic = Critic(
        Net(
            args.state_shape,
            hidden_sizes=args.hidden_sizes,
            device=args.device,
            activation=nn.Tanh,
        ),
        device=args.device,
    ).to(args.device)
    # orthogonal initialization
    for m in list(actor.modules()) + list(critic.modules()):
        if isinstance(m, torch.nn.Linear):
            torch.nn.init.orthogonal_(m.weight)
            torch.nn.init.zeros_(m.bias)
    optim = torch.optim.Adam(critic.parameters(), lr=args.lr)

    # replace DiagGaussian with Independent(Normal), which is equivalent;
    # the unified dist_fn interface (#1032) receives a single argument,
    # here the (loc, scale) tuple produced by the actor
    def dist(loc_scale: tuple[torch.Tensor, torch.Tensor]) -> Distribution:
        loc, scale = loc_scale
        return Independent(Normal(loc, scale), 1)

    policy: NPGPolicy[NPGTrainingStats] = NPGPolicy(
        actor=actor,
        critic=critic,
        optim=optim,
        dist_fn=dist,
        discount_factor=args.gamma,
        reward_normalization=args.rew_norm,
        advantage_normalization=args.norm_adv,
        gae_lambda=args.gae_lambda,
        action_space=env.action_space,
        optim_critic_iters=args.optim_critic_iters,
        actor_step_size=args.actor_step_size,
        deterministic_eval=True,
    )
    # collector
    train_collector = Collector(
        policy,
        train_envs,
        VectorReplayBuffer(args.buffer_size, len(train_envs)),
    )
    test_collector = Collector(policy, test_envs)
    # log
    log_path = os.path.join(args.logdir, args.task, "npg")
    writer = SummaryWriter(log_path)
    logger = TensorboardLogger(writer)

    def save_best_fn(policy: BasePolicy) -> None:
        torch.save(policy.state_dict(), os.path.join(log_path, "policy.pth"))

    def stop_fn(mean_rewards: float) -> bool:
        return mean_rewards >= args.reward_threshold

    # trainer
    result = OnpolicyTrainer(
        policy=policy,
        train_collector=train_collector,
        test_collector=test_collector,
        max_epoch=args.epoch,
        step_per_epoch=args.step_per_epoch,
        repeat_per_collect=args.repeat_per_collect,
        episode_per_test=args.test_num,
        batch_size=args.batch_size,
        step_per_collect=args.step_per_collect,
        stop_fn=stop_fn,
        save_best_fn=save_best_fn,
        logger=logger,
    ).run()
    assert stop_fn(result.best_reward)

    if __name__ == "__main__":
        pprint.pprint(result)
        # Let's watch its performance!
        env = gym.make(args.task)
        policy.eval()
        collector = Collector(policy, env)
        collector_stats = collector.collect(n_episode=1, render=args.render)
        print(collector_stats)


if __name__ == "__main__":
    test_npg()