Tianshou/test/continuous/test_sac_with_il.py
2022-05-12 20:52:55 +08:00

209 lines
7.1 KiB
Python

import argparse
import os
import numpy as np
import pytest
import torch
from torch.utils.tensorboard import SummaryWriter
from tianshou.data import Collector, VectorReplayBuffer
from tianshou.policy import ImitationPolicy, SACPolicy
from tianshou.trainer import offpolicy_trainer
from tianshou.utils import TensorboardLogger
from tianshou.utils.net.common import Net
from tianshou.utils.net.continuous import Actor, ActorProb, Critic
try:
import envpool
except ImportError:
envpool = None
def get_args():
    """Parse the command-line options used by the SAC + imitation-learning test.

    Every option has a default, so calling this with no matching arguments on
    the command line yields a fully populated namespace.

    Returns:
        argparse.Namespace: the recognised options. Arguments the parser does
        not know are ignored rather than raising an error.
    """
    default_device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # (flag, add_argument keyword arguments), registered in this order.
    # The table is rebuilt on each call so list defaults are never shared.
    option_table = [
        ('--task', dict(type=str, default='Pendulum-v0')),
        ('--reward-threshold', dict(type=float, default=None)),
        ('--seed', dict(type=int, default=1)),
        ('--buffer-size', dict(type=int, default=20000)),
        ('--actor-lr', dict(type=float, default=1e-3)),
        ('--critic-lr', dict(type=float, default=1e-3)),
        ('--il-lr', dict(type=float, default=1e-3)),
        ('--gamma', dict(type=float, default=0.99)),
        ('--tau', dict(type=float, default=0.005)),
        ('--alpha', dict(type=float, default=0.2)),
        ('--auto-alpha', dict(type=int, default=1)),
        ('--alpha-lr', dict(type=float, default=3e-4)),
        ('--epoch', dict(type=int, default=5)),
        ('--step-per-epoch', dict(type=int, default=24000)),
        ('--il-step-per-epoch', dict(type=int, default=500)),
        ('--step-per-collect', dict(type=int, default=10)),
        ('--update-per-step', dict(type=float, default=0.1)),
        ('--batch-size', dict(type=int, default=128)),
        ('--hidden-sizes', dict(type=int, nargs='*', default=[128, 128])),
        (
            '--imitation-hidden-sizes',
            dict(type=int, nargs='*', default=[128, 128]),
        ),
        ('--training-num', dict(type=int, default=10)),
        ('--test-num', dict(type=int, default=100)),
        ('--logdir', dict(type=str, default='log')),
        ('--render', dict(type=float, default=0.)),
        ('--rew-norm', dict(action="store_true", default=False)),
        ('--n-step', dict(type=int, default=3)),
        ('--device', dict(type=str, default=default_device)),
    ]

    cli = argparse.ArgumentParser()
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)

    # parse_known_args returns (namespace, leftover_argv); the leftovers are
    # deliberately discarded so unrelated command-line options do not abort.
    known, _leftover = cli.parse_known_args()
    return known
@pytest.mark.skipif(envpool is None, reason="EnvPool doesn't support this platform")
def test_sac_with_il(args=get_args()):
    """Train a SAC agent on an EnvPool task, then train an imitation policy.

    Phase 1 builds a SACPolicy (one ActorProb, two Critics), trains it with
    ``offpolicy_trainer`` and asserts that the best test reward reaches
    ``args.reward_threshold``.

    Phase 2 builds an ImitationPolicy around a deterministic ``Actor`` and
    trains it with the same training collector (which still wraps the SAC
    policy) and a fresh test collector, then asserts the threshold again. For
    tasks whose name starts with "Pendulum" the threshold is lowered by 50
    before phase 2.

    Args:
        args: option namespace as produced by ``get_args``. It is copied on
            entry, so neither the shared default nor a caller-supplied
            namespace is modified by this function.

    Raises:
        AssertionError: if either phase fails to reach its reward threshold.
    """
    # The default namespace is created once, when the function is defined, and
    # this function writes to several of its fields (state_shape, alpha,
    # reward_threshold, ...). Work on a shallow copy so repeated calls always
    # start from the same settings.
    args = argparse.Namespace(**vars(args))

    # if you want to use python vector env, please refer to other test scripts
    train_envs = env = envpool.make_gym(
        args.task, num_envs=args.training_num, seed=args.seed
    )
    test_envs = envpool.make_gym(args.task, num_envs=args.test_num, seed=args.seed)
    args.state_shape = env.observation_space.shape or env.observation_space.n
    args.action_shape = env.action_space.shape or env.action_space.n
    args.max_action = env.action_space.high[0]
    if args.reward_threshold is None:
        # hard-coded goals for Pendulum; other tasks fall back to the env spec
        default_reward_threshold = {"Pendulum-v0": -250, "Pendulum-v1": -250}
        args.reward_threshold = default_reward_threshold.get(
            args.task, env.spec.reward_threshold
        )
    # you can also use tianshou.env.SubprocVectorEnv
    # seed
    np.random.seed(args.seed)
    torch.manual_seed(args.seed)
    # model
    net = Net(args.state_shape, hidden_sizes=args.hidden_sizes, device=args.device)
    actor = ActorProb(
        net,
        args.action_shape,
        max_action=args.max_action,
        device=args.device,
        unbounded=True
    ).to(args.device)
    actor_optim = torch.optim.Adam(actor.parameters(), lr=args.actor_lr)
    net_c1 = Net(
        args.state_shape,
        args.action_shape,
        hidden_sizes=args.hidden_sizes,
        concat=True,
        device=args.device
    )
    critic1 = Critic(net_c1, device=args.device).to(args.device)
    critic1_optim = torch.optim.Adam(critic1.parameters(), lr=args.critic_lr)
    net_c2 = Net(
        args.state_shape,
        args.action_shape,
        hidden_sizes=args.hidden_sizes,
        concat=True,
        device=args.device
    )
    critic2 = Critic(net_c2, device=args.device).to(args.device)
    critic2_optim = torch.optim.Adam(critic2.parameters(), lr=args.critic_lr)

    if args.auto_alpha:
        # replace the fixed alpha with a (target_entropy, log_alpha, optimizer)
        # tuple, which is what gets passed to SACPolicy below
        target_entropy = -np.prod(env.action_space.shape)
        log_alpha = torch.zeros(1, requires_grad=True, device=args.device)
        alpha_optim = torch.optim.Adam([log_alpha], lr=args.alpha_lr)
        args.alpha = (target_entropy, log_alpha, alpha_optim)

    policy = SACPolicy(
        actor,
        actor_optim,
        critic1,
        critic1_optim,
        critic2,
        critic2_optim,
        tau=args.tau,
        gamma=args.gamma,
        alpha=args.alpha,
        reward_normalization=args.rew_norm,
        estimation_step=args.n_step,
        action_space=env.action_space
    )
    # collector
    train_collector = Collector(
        policy,
        train_envs,
        VectorReplayBuffer(args.buffer_size, len(train_envs)),
        exploration_noise=True
    )
    test_collector = Collector(policy, test_envs)
    # train_collector.collect(n_step=args.buffer_size)
    # log
    log_path = os.path.join(args.logdir, args.task, 'sac')
    writer = SummaryWriter(log_path)
    logger = TensorboardLogger(writer)

    def save_best_fn(policy):
        torch.save(policy.state_dict(), os.path.join(log_path, 'policy.pth'))

    def stop_fn(mean_rewards):
        # reads args.reward_threshold at call time, so the lowered goal set
        # before the imitation phase is picked up automatically
        return mean_rewards >= args.reward_threshold

    # trainer
    result = offpolicy_trainer(
        policy,
        train_collector,
        test_collector,
        args.epoch,
        args.step_per_epoch,
        args.step_per_collect,
        args.test_num,
        args.batch_size,
        update_per_step=args.update_per_step,
        stop_fn=stop_fn,
        save_best_fn=save_best_fn,
        logger=logger
    )
    assert stop_fn(result['best_reward'])

    # here we define an imitation collector with a trivial policy
    policy.eval()
    if args.task.startswith("Pendulum"):
        args.reward_threshold -= 50  # lower the goal
    net = Actor(
        Net(
            args.state_shape,
            hidden_sizes=args.imitation_hidden_sizes,
            device=args.device
        ),
        args.action_shape,
        max_action=args.max_action,
        device=args.device
    ).to(args.device)
    optim = torch.optim.Adam(net.parameters(), lr=args.il_lr)
    il_policy = ImitationPolicy(
        net,
        optim,
        action_space=env.action_space,
        action_scaling=True,
        action_bound_method="clip"
    )
    il_test_collector = Collector(
        il_policy,
        envpool.make_gym(args.task, num_envs=args.test_num, seed=args.seed),
    )
    # the training collector is reused as-is: it still wraps the SAC policy
    train_collector.reset()
    result = offpolicy_trainer(
        il_policy,
        train_collector,
        il_test_collector,
        args.epoch,
        args.il_step_per_epoch,
        args.step_per_collect,
        args.test_num,
        args.batch_size,
        stop_fn=stop_fn,
        save_best_fn=save_best_fn,
        logger=logger
    )
    assert stop_fn(result['best_reward'])
# Allow running the test directly as a script with the default arguments.
if __name__ == "__main__":
    test_sac_with_il()