import gym
import torch
import pprint
import argparse
import numpy as np
from torch.utils.tensorboard import SummaryWriter

from tianshou.policy import TD3Policy
from tianshou.utils.net.common import Net
from tianshou.env import SubprocVectorEnv
from tianshou.exploration import GaussianNoise
from tianshou.trainer import offpolicy_trainer
from tianshou.data import Collector, ReplayBuffer
from tianshou.utils.net.continuous import Actor, Critic

from mujoco.register import reg  # project-local module that registers PointMaze-v1


def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--task', type=str, default='PointMaze-v1')
    parser.add_argument('--seed', type=int, default=1626)
    parser.add_argument('--buffer-size', type=int, default=20000)
    parser.add_argument('--actor-lr', type=float, default=3e-5)
    parser.add_argument('--critic-lr', type=float, default=1e-4)
    parser.add_argument('--gamma', type=float, default=0.99)
    parser.add_argument('--tau', type=float, default=0.005)
    parser.add_argument('--exploration-noise', type=float, default=0.1)
    parser.add_argument('--policy-noise', type=float, default=0.2)
    parser.add_argument('--noise-clip', type=float, default=0.5)
    parser.add_argument('--update-actor-freq', type=int, default=2)
    parser.add_argument('--epoch', type=int, default=100)
    parser.add_argument('--step-per-epoch', type=int, default=2400)
    parser.add_argument('--collect-per-step', type=int, default=10)
    parser.add_argument('--batch-size', type=int, default=128)
    parser.add_argument('--hidden-sizes', type=int, nargs='*',
                        default=[128, 128])
    parser.add_argument('--training-num', type=int, default=8)
    parser.add_argument('--test-num', type=int, default=100)
    parser.add_argument('--logdir', type=str, default='log')
    parser.add_argument('--render', type=float, default=0.)
    parser.add_argument(
        '--device', type=str,
        default='cuda' if torch.cuda.is_available() else 'cpu')
    return parser.parse_args()


def test_td3(args=get_args()):
    reg()  # register the custom environment so gym.make can find it
    env = gym.make(args.task)
    args.state_shape = env.observation_space.shape or env.observation_space.n
    args.action_shape = env.action_space.shape or env.action_space.n
    args.max_action = env.action_space.high[0]
    # train_envs = gym.make(args.task)
    train_envs = SubprocVectorEnv(
        [lambda: gym.make(args.task) for _ in range(args.training_num)])
    # test_envs = gym.make(args.task)
    test_envs = SubprocVectorEnv(
        [lambda: gym.make(args.task) for _ in range(args.test_num)])
    # seed
    np.random.seed(args.seed)
    torch.manual_seed(args.seed)
    train_envs.seed(args.seed)
    test_envs.seed(args.seed)
    # model: one actor and two independent critics, each with its own optimizer
    net = Net(args.state_shape, hidden_sizes=args.hidden_sizes,
              device=args.device)
    actor = Actor(net, args.action_shape, max_action=args.max_action,
                  device=args.device).to(args.device)
    actor_optim = torch.optim.Adam(actor.parameters(), lr=args.actor_lr)
    net = Net(args.state_shape, args.action_shape,
              hidden_sizes=args.hidden_sizes, concat=True, device=args.device)
    critic1 = Critic(net, device=args.device).to(args.device)
    critic1_optim = torch.optim.Adam(critic1.parameters(), lr=args.critic_lr)
    net = Net(args.state_shape, args.action_shape,
              hidden_sizes=args.hidden_sizes, concat=True, device=args.device)
    critic2 = Critic(net, device=args.device).to(args.device)
    critic2_optim = torch.optim.Adam(critic2.parameters(), lr=args.critic_lr)
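    # TD3 extends DDPG with three stabilizers: clipped double-Q learning
    # (targets use the smaller of the two critic estimates), delayed actor
    # updates (one actor step per `update_actor_freq` critic steps), and
    # target-policy smoothing (Gaussian noise with std `policy_noise`,
    # clipped to +/- `noise_clip`, added to the target action).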
    policy = TD3Policy(
        actor, actor_optim, critic1, critic1_optim, critic2, critic2_optim,
        action_range=[env.action_space.low[0], env.action_space.high[0]],
        tau=args.tau, gamma=args.gamma,
        exploration_noise=GaussianNoise(sigma=args.exploration_noise),
        policy_noise=args.policy_noise,
        update_actor_freq=args.update_actor_freq,
        noise_clip=args.noise_clip,
        reward_normalization=True, ignore_done=True)
    # collector
    train_collector = Collector(
        policy, train_envs, ReplayBuffer(args.buffer_size))
    test_collector = Collector(policy, test_envs)
    # train_collector.collect(n_step=args.buffer_size)
    # log
    writer = SummaryWriter(args.logdir + '/' + 'td3')

    def stop_fn(mean_rewards):
        if env.spec.reward_threshold:
            return mean_rewards >= env.spec.reward_threshold
        else:
            return False

    # trainer
    result = offpolicy_trainer(
        policy, train_collector, test_collector, args.epoch,
        args.step_per_epoch, args.collect_per_step, args.test_num,
        args.batch_size, stop_fn=stop_fn, writer=writer)
    assert stop_fn(result['best_reward'])

    if __name__ == '__main__':
        pprint.pprint(result)
        # Let's watch its performance!
        policy.eval()
        test_envs.seed(args.seed)
        test_collector.reset()
        result = test_collector.collect(n_episode=[1] * args.test_num,
                                        render=args.render)
        print(f'Final reward: {result["rew"]}, length: {result["len"]}')


if __name__ == '__main__':
    test_td3()
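# A minimal sketch of how this script is typically run (the file name below is
# hypothetical; the flags map onto the argparse options defined in get_args):
#
#   python td3_pointmaze.py --task PointMaze-v1 --epoch 100 --seed 1626
#   tensorboard --logdir log/td3   # inspect the training curves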