import gym
import torch
import pprint
import argparse
import numpy as np
from torch import nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter

from tianshou.policy import PPOPolicy
from tianshou.env import SubprocVectorEnv
from tianshou.trainer import onpolicy_trainer
from tianshou.data import Collector, ReplayBuffer


class Net(nn.Module):
    """MLP trunk shared by the actor and the critic.

    The network is ``Linear(prod(state_shape), 128) -> ReLU`` followed by
    ``layer_num`` repetitions of ``Linear(128, 128) -> ReLU``.

    Args:
        layer_num: number of additional 128-unit hidden layers.
        state_shape: observation shape (or size); its product is the
            input width of the first linear layer.
        device: device that inputs are moved to in :meth:`forward`.
    """

    def __init__(self, layer_num, state_shape, device='cpu'):
        super().__init__()
        self.device = device
        # np.prod returns a numpy scalar; nn.Linear is given a plain int.
        layers = [
            nn.Linear(int(np.prod(state_shape)), 128),
            nn.ReLU(inplace=True),
        ]
        for _ in range(layer_num):
            layers += [nn.Linear(128, 128), nn.ReLU(inplace=True)]
        self.model = nn.Sequential(*layers)

    def forward(self, s):
        """Flatten ``s`` to ``(batch, -1)`` and return the 128-d features.

        ``s`` may be array-like or a tensor; either way it ends up as a
        float tensor on ``self.device``.
        """
        if isinstance(s, torch.Tensor):
            # torch.tensor(tensor) would make a detached copy and emit a
            # UserWarning; moving the tensor keeps its autograd history.
            s = s.to(device=self.device, dtype=torch.float)
        else:
            s = torch.tensor(s, device=self.device, dtype=torch.float)
        batch = s.shape[0]
        s = s.view(batch, -1)
        return self.model(s)


class Actor(nn.Module):
    """Policy head: ``preprocess_net`` followed by a linear layer.

    Args:
        preprocess_net: module producing 128-d features (e.g. ``Net``).
        action_shape: action shape (or count); its product is the
            number of outputs.
    """

    def __init__(self, preprocess_net, action_shape):
        super().__init__()
        self.model = nn.Sequential(
            preprocess_net,
            nn.Linear(128, int(np.prod(action_shape))),
        )

    def forward(self, s, **kwargs):
        """Return ``(probs, None)``.

        ``probs`` is the softmax over the last dimension of the head's
        output. Extra keyword arguments are accepted and ignored.
        """
        probs = F.softmax(self.model(s), dim=-1)
        return probs, None


class Critic(nn.Module):
    """Value head: ``preprocess_net`` followed by ``Linear(128, 1)``.

    Args:
        preprocess_net: module producing 128-d features (e.g. ``Net``).
    """

    def __init__(self, preprocess_net):
        super().__init__()
        self.model = nn.Sequential(
            preprocess_net,
            nn.Linear(128, 1),
        )

    def forward(self, s):
        """Return the single-output value estimate for ``s``."""
        return self.model(s)


def get_args():
    """Parse the command-line options of this script.

    ``parse_known_args`` is used, so unrecognised arguments are ignored
    rather than causing an error.

    Returns:
        The parsed ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--task', type=str, default='CartPole-v0')
    parser.add_argument('--seed', type=int, default=1626)
    parser.add_argument('--buffer-size', type=int, default=20000)
    parser.add_argument('--lr', type=float, default=3e-3)
    parser.add_argument('--gamma', type=float, default=0.99)
    parser.add_argument('--epoch', type=int, default=100)
    parser.add_argument('--step-per-epoch', type=int, default=1000)
    parser.add_argument('--collect-per-step', type=int, default=10)
    parser.add_argument('--repeat-per-collect', type=int, default=2)
    parser.add_argument('--batch-size', type=int, default=64)
    parser.add_argument('--layer-num', type=int, default=1)
    parser.add_argument('--training-num', type=int, default=32)
    parser.add_argument('--test-num', type=int, default=100)
    parser.add_argument('--logdir', type=str, default='log')
    parser.add_argument(
        '--device', type=str,
        default='cuda' if torch.cuda.is_available() else 'cpu')
    # ppo special
    parser.add_argument('--vf-coef', type=float, default=0.5)
    parser.add_argument('--ent-coef', type=float, default=0.0)
    parser.add_argument('--eps-clip', type=float, default=0.2)
    parser.add_argument('--max-grad-norm', type=float, default=0.5)
    args = parser.parse_known_args()[0]
    return args


def test_ppo(args=None):
    """Train PPO on ``args.task`` and assert the reward threshold is met.

    Args:
        args: namespace as returned by :func:`get_args`. When ``None``
            (the default) the arguments are parsed at call time, so
            importing this module has no argparse side effect and the
            ``state_shape`` / ``action_shape`` attributes set below are
            not written to a namespace shared between calls.

    Raises:
        AssertionError: if the best reward reported by the trainer is
            below ``env.spec.reward_threshold``.
    """
    if args is None:
        args = get_args()
    # Probe environment: used for the space shapes and reward threshold.
    env = gym.make(args.task)
    args.state_shape = env.observation_space.shape or env.observation_space.n
    args.action_shape = env.action_space.shape or env.action_space.n
    # train_envs = gym.make(args.task)
    train_envs = SubprocVectorEnv(
        [lambda: gym.make(args.task) for _ in range(args.training_num)],
        reset_after_done=True)
    # test_envs = gym.make(args.task)
    test_envs = SubprocVectorEnv(
        [lambda: gym.make(args.task) for _ in range(args.test_num)],
        reset_after_done=False)
    # seed
    np.random.seed(args.seed)
    torch.manual_seed(args.seed)
    train_envs.seed(args.seed)
    test_envs.seed(args.seed)
    # model: actor and critic share the same preprocessing network
    net = Net(args.layer_num, args.state_shape, args.device)
    actor = Actor(net, args.action_shape).to(args.device)
    critic = Critic(net).to(args.device)
    optim = torch.optim.Adam(list(
        actor.parameters()) + list(critic.parameters()), lr=args.lr)
    dist = torch.distributions.Categorical
    policy = PPOPolicy(
        actor, critic, optim, dist, args.gamma,
        max_grad_norm=args.max_grad_norm,
        eps_clip=args.eps_clip,
        vf_coef=args.vf_coef,
        ent_coef=args.ent_coef,
        action_range=None)
    # collector
    train_collector = Collector(
        policy, train_envs, ReplayBuffer(args.buffer_size))
    test_collector = Collector(policy, test_envs, stat_size=args.test_num)
    # log
    writer = SummaryWriter(args.logdir)

    def stop_fn(x):
        return x >= env.spec.reward_threshold

    try:
        # trainer
        result = onpolicy_trainer(
            policy, train_collector, test_collector, args.epoch,
            args.step_per_epoch, args.collect_per_step,
            args.repeat_per_collect, args.test_num, args.batch_size,
            stop_fn=stop_fn, writer=writer)
        assert stop_fn(result['best_reward'])
    finally:
        # Release resources even when training raises or the assertion
        # fails, so nothing opened above is left behind.
        train_collector.close()
        test_collector.close()
        writer.close()
        env.close()
    if __name__ == '__main__':
        pprint.pprint(result)
        # Let's watch its performance!
        watch_env = gym.make(args.task)
        collector = Collector(policy, watch_env)
        try:
            result = collector.collect(n_episode=1, render=1 / 35)
            print(f'Final reward: {result["rew"]}, length: {result["len"]}')
        finally:
            collector.close()


if __name__ == '__main__':
    test_ppo()