131 lines
5.4 KiB
Python
131 lines
5.4 KiB
Python
"""
|
|
adapted from keras-rl
|
|
"""
|
|
|
|
from __future__ import division
|
|
import numpy as np
|
|
|
|
__all__ = [
|
|
'GaussianWhiteNoiseProcess',
|
|
'OrnsteinUhlenbeckProcess',
|
|
]
|
|
|
|
|
|
class RandomProcess(object):
|
|
"""
|
|
Base class for random process for exploration in the environment.
|
|
"""
|
|
def reset_states(self):
|
|
"""
|
|
Reset the internal states, if any, of the random process. Does nothing by default.
|
|
"""
|
|
pass
|
|
|
|
|
|
class AnnealedGaussianProcess(RandomProcess):
|
|
"""
|
|
Class for annealed Gaussian process, annealing the sigma in the Gaussian-like distribution along sampling.
|
|
At each timestep, the class samples from a Gaussian-like distribution.
|
|
|
|
:param mu: A float. Specifying the mean of the Gaussian-like distribution.
|
|
:param sigma: A float. Specifying the std of teh Gaussian-like distribution.
|
|
:param sigma_min: A float. Specifying the minimum std until which the annealing stops.
|
|
:param n_steps_annealing: An int. It specifies the total number of steps for which the annealing happens.
|
|
"""
|
|
def __init__(self, mu, sigma, sigma_min, n_steps_annealing):
|
|
self.mu = mu
|
|
self.sigma = sigma
|
|
self.n_steps = 0
|
|
|
|
if sigma_min is not None:
|
|
self.m = -float(sigma - sigma_min) / float(n_steps_annealing)
|
|
self.c = sigma
|
|
self.sigma_min = sigma_min
|
|
else:
|
|
self.m = 0.
|
|
self.c = sigma
|
|
self.sigma_min = sigma
|
|
|
|
@property
|
|
def current_sigma(self):
|
|
"""The current sigma after potential annealing."""
|
|
sigma = max(self.sigma_min, self.m * float(self.n_steps) + self.c)
|
|
return sigma
|
|
|
|
|
|
class GaussianWhiteNoiseProcess(AnnealedGaussianProcess):
|
|
"""
|
|
Class for Gaussian white noise. At each timestep, the class samples from an exact Gaussian distribution.
|
|
It allows annealing in the std of the Gaussian, but the distribution is independent at different timesteps.
|
|
|
|
:param mu: A float defaulting to 0. Specifying the mean of the Gaussian-like distribution.
|
|
:param sigma: A float defaulting to 1. Specifying the std of the Gaussian-like distribution.
|
|
:param sigma_min: Optional. A float. Specifying the minimum std until which the annealing stops. It defaults to
|
|
``None`` where no annealing takes place.
|
|
:param n_steps_annealing: Optional. An int. It specifies the total number of steps for which the annealing happens.
|
|
Only effective when ``sigma_mean`` is not ``None``.
|
|
:param size: An int or tuple of ints. It corresponds to the shape of the action of the environment.
|
|
"""
|
|
def __init__(self, mu=0., sigma=1., sigma_min=None, n_steps_annealing=1000, size=1):
|
|
super(GaussianWhiteNoiseProcess, self).__init__(mu=mu, sigma=sigma, sigma_min=sigma_min, n_steps_annealing=n_steps_annealing)
|
|
self.size = size
|
|
|
|
def sample(self):
|
|
"""
|
|
Draws one sample from the random process.
|
|
|
|
:return: A numpy array. The drawn sample.
|
|
"""
|
|
sample = np.random.normal(self.mu, self.current_sigma, self.size)
|
|
self.n_steps += 1
|
|
return sample
|
|
|
|
|
|
class OrnsteinUhlenbeckProcess(AnnealedGaussianProcess):
|
|
"""
|
|
Class for Ornstein-Uhlenbeck Process, as used for exploration in DDPG. Implemented based on
|
|
http://math.stackexchange.com/questions/1287634/implementing-ornstein-uhlenbeck-in-matlab .
|
|
It basically is a temporal-correlated Gaussian process where the distribution at the current timestep depends on
|
|
the samples from the last timestep. It's not exactly Gaussian but still resembles Gaussian.
|
|
|
|
:param theta: A float. A special parameter for this process.
|
|
:param mu: A float. Another parameter of this process, but it's not exactly the mean of the distribution.
|
|
:param sigma: A float. Another parameter of this process. It acts like the std of the Gaussian-like distribution
|
|
to some extent.
|
|
:param dt: A float. The time interval to simulate this process discretely, as the process is mathematically defined
|
|
to be a continuous one.
|
|
:param x0: Optional. A float. The initial value of "the samples from the last timestep" so as to draw the first
|
|
sample. It defaults to zero.
|
|
:param size: An int or tuple of ints. It corresponds to the shape of the action of the environment.
|
|
:param sigma_min: Optional. A float. Specifying the minimum std until which the annealing stops. It defaults to
|
|
``None`` where no annealing takes place.
|
|
:param n_steps_annealing: An int. It specifies the total number of steps for which the annealing happens.
|
|
"""
|
|
def __init__(self, theta, mu=0., sigma=1., dt=1e-2, x0=None, size=1, sigma_min=None, n_steps_annealing=1000):
|
|
super(OrnsteinUhlenbeckProcess, self).__init__(
|
|
mu=mu, sigma=sigma, sigma_min=sigma_min, n_steps_annealing=n_steps_annealing)
|
|
self.theta = theta
|
|
self.mu = mu
|
|
self.dt = dt
|
|
self.x0 = x0
|
|
self.size = size
|
|
self.reset_states()
|
|
|
|
def sample(self):
|
|
"""
|
|
Draws one sample from the random process.
|
|
|
|
:return: A numpy array. The drawn sample.
|
|
"""
|
|
x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \
|
|
self.current_sigma * np.sqrt(self.dt) * np.random.normal(size=self.size)
|
|
self.x_prev = x
|
|
self.n_steps += 1
|
|
return x
|
|
|
|
def reset_states(self):
|
|
"""
|
|
Reset ``self.x_prev`` to be ``self.x0``.
|
|
"""
|
|
self.x_prev = self.x0 if self.x0 is not None else np.zeros(self.size)
|