Improves typing in examples and tests, towards mypy passing there. Introduces the SpaceInfo utility
		
			
				
	
	
		
			141 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			141 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import numpy as np
 | 
						|
import torch
 | 
						|
 | 
						|
from tianshou.exploration import GaussianNoise, OUNoise
 | 
						|
from tianshou.utils import MovAvg, MultipleLRSchedulers, RunningMeanStd
 | 
						|
from tianshou.utils.net.common import MLP, Net
 | 
						|
from tianshou.utils.net.continuous import RecurrentActorProb, RecurrentCritic
 | 
						|
 | 
						|
 | 
						|
def test_noise() -> None:
 | 
						|
    noise = GaussianNoise()
 | 
						|
    size = (3, 4, 5)
 | 
						|
    assert np.allclose(noise(size).shape, size)
 | 
						|
    noise = OUNoise()
 | 
						|
    noise.reset()
 | 
						|
    assert np.allclose(noise(size).shape, size)
 | 
						|
 | 
						|
 | 
						|
def test_moving_average() -> None:
 | 
						|
    stat = MovAvg(10)
 | 
						|
    assert np.allclose(stat.get(), 0)
 | 
						|
    assert np.allclose(stat.mean(), 0)
 | 
						|
    assert np.allclose(stat.std() ** 2, 0)
 | 
						|
    stat.add(torch.tensor([1]))
 | 
						|
    stat.add(np.array([2]))
 | 
						|
    stat.add([3, 4])
 | 
						|
    stat.add(5.0)
 | 
						|
    assert np.allclose(stat.get(), 3)
 | 
						|
    assert np.allclose(stat.mean(), 3)
 | 
						|
    assert np.allclose(stat.std() ** 2, 2)
 | 
						|
 | 
						|
 | 
						|
def test_rms() -> None:
 | 
						|
    rms = RunningMeanStd()
 | 
						|
    assert np.allclose(rms.mean, 0)
 | 
						|
    assert np.allclose(rms.var, 1)
 | 
						|
    rms.update(np.array([[[1, 2], [3, 5]]]))
 | 
						|
    rms.update(np.array([[[1, 2], [3, 4]], [[1, 2], [0, 0]]]))
 | 
						|
    assert np.allclose(rms.mean, np.array([[1, 2], [2, 3]]), atol=1e-3)
 | 
						|
    assert np.allclose(rms.var, np.array([[0, 0], [2, 14 / 3.0]]), atol=1e-3)
 | 
						|
 | 
						|
 | 
						|
def test_net() -> None:
 | 
						|
    # here test the networks that does not appear in the other script
 | 
						|
    bsz = 64
 | 
						|
    # MLP
 | 
						|
    data = torch.rand([bsz, 3])
 | 
						|
    mlp = MLP(3, 6, hidden_sizes=[128])
 | 
						|
    assert list(mlp(data).shape) == [bsz, 6]
 | 
						|
    # output == 0 and len(hidden_sizes) == 0 means identity model
 | 
						|
    mlp = MLP(6, 0)
 | 
						|
    assert data.shape == mlp(data).shape
 | 
						|
    # common net
 | 
						|
    state_shape = (10, 2)
 | 
						|
    action_shape = (5,)
 | 
						|
    data = torch.rand([bsz, *state_shape])
 | 
						|
    expect_output_shape = [bsz, *action_shape]
 | 
						|
    net = Net(
 | 
						|
        state_shape,
 | 
						|
        action_shape,
 | 
						|
        hidden_sizes=[128, 128],
 | 
						|
        norm_layer=torch.nn.LayerNorm,
 | 
						|
        activation=None,
 | 
						|
    )
 | 
						|
    assert list(net(data)[0].shape) == expect_output_shape
 | 
						|
    assert str(net).count("LayerNorm") == 2
 | 
						|
    assert str(net).count("ReLU") == 0
 | 
						|
    Q_param = V_param = {"hidden_sizes": [128, 128]}
 | 
						|
    net = Net(
 | 
						|
        state_shape,
 | 
						|
        action_shape,
 | 
						|
        hidden_sizes=[128, 128],
 | 
						|
        dueling_param=(Q_param, V_param),
 | 
						|
    )
 | 
						|
    assert list(net(data)[0].shape) == expect_output_shape
 | 
						|
    # concat
 | 
						|
    net = Net(state_shape, action_shape, hidden_sizes=[128], concat=True)
 | 
						|
    data = torch.rand([bsz, int(np.prod(state_shape)) + int(np.prod(action_shape))])
 | 
						|
    expect_output_shape = [bsz, 128]
 | 
						|
    assert list(net(data)[0].shape) == expect_output_shape
 | 
						|
    net = Net(
 | 
						|
        state_shape,
 | 
						|
        action_shape,
 | 
						|
        hidden_sizes=[128],
 | 
						|
        concat=True,
 | 
						|
        dueling_param=(Q_param, V_param),
 | 
						|
    )
 | 
						|
    assert list(net(data)[0].shape) == expect_output_shape
 | 
						|
    # recurrent actor/critic
 | 
						|
    data = torch.rand([bsz, *state_shape]).flatten(1)
 | 
						|
    expect_output_shape = [bsz, *action_shape]
 | 
						|
    net = RecurrentActorProb(3, state_shape, action_shape)
 | 
						|
    mu, sigma = net(data)[0]
 | 
						|
    assert mu.shape == sigma.shape
 | 
						|
    assert list(mu.shape) == [bsz, 5]
 | 
						|
    net = RecurrentCritic(3, state_shape, action_shape)
 | 
						|
    data = torch.rand([bsz, 8, int(np.prod(state_shape))])
 | 
						|
    act = torch.rand(expect_output_shape)
 | 
						|
    assert list(net(data, act).shape) == [bsz, 1]
 | 
						|
 | 
						|
 | 
						|
def test_lr_schedulers() -> None:
 | 
						|
    initial_lr_1 = 10.0
 | 
						|
    step_size_1 = 1
 | 
						|
    gamma_1 = 0.5
 | 
						|
    net_1 = torch.nn.Linear(2, 3)
 | 
						|
    optim_1 = torch.optim.Adam(net_1.parameters(), lr=initial_lr_1)
 | 
						|
    sched_1 = torch.optim.lr_scheduler.StepLR(optim_1, step_size=step_size_1, gamma=gamma_1)
 | 
						|
 | 
						|
    initial_lr_2 = 5.0
 | 
						|
    step_size_2 = 2
 | 
						|
    gamma_2 = 0.3
 | 
						|
    net_2 = torch.nn.Linear(3, 2)
 | 
						|
    optim_2 = torch.optim.Adam(net_2.parameters(), lr=initial_lr_2)
 | 
						|
    sched_2 = torch.optim.lr_scheduler.StepLR(optim_2, step_size=step_size_2, gamma=gamma_2)
 | 
						|
    schedulers = MultipleLRSchedulers(sched_1, sched_2)
 | 
						|
    for _ in range(10):
 | 
						|
        loss_1 = (torch.ones((1, 3)) - net_1(torch.ones((1, 2)))).sum()
 | 
						|
        optim_1.zero_grad()
 | 
						|
        loss_1.backward()
 | 
						|
        optim_1.step()
 | 
						|
        loss_2 = (torch.ones((1, 2)) - net_2(torch.ones((1, 3)))).sum()
 | 
						|
        optim_2.zero_grad()
 | 
						|
        loss_2.backward()
 | 
						|
        optim_2.step()
 | 
						|
        schedulers.step()
 | 
						|
    assert optim_1.state_dict()["param_groups"][0]["lr"] == (
 | 
						|
        initial_lr_1 * gamma_1 ** (10 // step_size_1)
 | 
						|
    )
 | 
						|
    assert optim_2.state_dict()["param_groups"][0]["lr"] == (
 | 
						|
        initial_lr_2 * gamma_2 ** (10 // step_size_2)
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    test_noise()
 | 
						|
    test_moving_average()
 | 
						|
    test_rms()
 | 
						|
    test_net()
 | 
						|
    test_lr_schedulers()
 |