2018-05-20 22:36:04 +08:00

131 lines
5.4 KiB
Python

"""
adapted from keras-rl
"""
from __future__ import division
import numpy as np
__all__ = [
'GaussianWhiteNoiseProcess',
'OrnsteinUhlenbeckProcess',
]
class RandomProcess(object):
"""
Base class for random process for exploration in the environment.
"""
def reset_states(self):
"""
Reset the internal states, if any, of the random process. Does nothing by default.
"""
pass
class AnnealedGaussianProcess(RandomProcess):
"""
Class for annealed Gaussian process, annealing the sigma in the Gaussian-like distribution along sampling.
At each timestep, the class samples from a Gaussian-like distribution.
:param mu: A float. Specifying the mean of the Gaussian-like distribution.
:param sigma: A float. Specifying the std of teh Gaussian-like distribution.
:param sigma_min: A float. Specifying the minimum std until which the annealing stops.
:param n_steps_annealing: An int. It specifies the total number of steps for which the annealing happens.
"""
def __init__(self, mu, sigma, sigma_min, n_steps_annealing):
self.mu = mu
self.sigma = sigma
self.n_steps = 0
if sigma_min is not None:
self.m = -float(sigma - sigma_min) / float(n_steps_annealing)
self.c = sigma
self.sigma_min = sigma_min
else:
self.m = 0.
self.c = sigma
self.sigma_min = sigma
@property
def current_sigma(self):
"""The current sigma after potential annealing."""
sigma = max(self.sigma_min, self.m * float(self.n_steps) + self.c)
return sigma
class GaussianWhiteNoiseProcess(AnnealedGaussianProcess):
"""
Class for Gaussian white noise. At each timestep, the class samples from an exact Gaussian distribution.
It allows annealing in the std of the Gaussian, but the distribution is independent at different timesteps.
:param mu: A float defaulting to 0. Specifying the mean of the Gaussian-like distribution.
:param sigma: A float defaulting to 1. Specifying the std of the Gaussian-like distribution.
:param sigma_min: Optional. A float. Specifying the minimum std until which the annealing stops. It defaults to
``None`` where no annealing takes place.
:param n_steps_annealing: Optional. An int. It specifies the total number of steps for which the annealing happens.
Only effective when ``sigma_mean`` is not ``None``.
:param size: An int or tuple of ints. It corresponds to the shape of the action of the environment.
"""
def __init__(self, mu=0., sigma=1., sigma_min=None, n_steps_annealing=1000, size=1):
super(GaussianWhiteNoiseProcess, self).__init__(mu=mu, sigma=sigma, sigma_min=sigma_min, n_steps_annealing=n_steps_annealing)
self.size = size
def sample(self):
"""
Draws one sample from the random process.
:return: A numpy array. The drawn sample.
"""
sample = np.random.normal(self.mu, self.current_sigma, self.size)
self.n_steps += 1
return sample
class OrnsteinUhlenbeckProcess(AnnealedGaussianProcess):
"""
Class for Ornstein-Uhlenbeck Process, as used for exploration in DDPG. Implemented based on
http://math.stackexchange.com/questions/1287634/implementing-ornstein-uhlenbeck-in-matlab .
It basically is a temporal-correlated Gaussian process where the distribution at the current timestep depends on
the samples from the last timestep. It's not exactly Gaussian but still resembles Gaussian.
:param theta: A float. A special parameter for this process.
:param mu: A float. Another parameter of this process, but it's not exactly the mean of the distribution.
:param sigma: A float. Another parameter of this process. It acts like the std of the Gaussian-like distribution
to some extent.
:param dt: A float. The time interval to simulate this process discretely, as the process is mathematically defined
to be a continuous one.
:param x0: Optional. A float. The initial value of "the samples from the last timestep" so as to draw the first
sample. It defaults to zero.
:param size: An int or tuple of ints. It corresponds to the shape of the action of the environment.
:param sigma_min: Optional. A float. Specifying the minimum std until which the annealing stops. It defaults to
``None`` where no annealing takes place.
:param n_steps_annealing: An int. It specifies the total number of steps for which the annealing happens.
"""
def __init__(self, theta, mu=0., sigma=1., dt=1e-2, x0=None, size=1, sigma_min=None, n_steps_annealing=1000):
super(OrnsteinUhlenbeckProcess, self).__init__(
mu=mu, sigma=sigma, sigma_min=sigma_min, n_steps_annealing=n_steps_annealing)
self.theta = theta
self.mu = mu
self.dt = dt
self.x0 = x0
self.size = size
self.reset_states()
def sample(self):
"""
Draws one sample from the random process.
:return: A numpy array. The drawn sample.
"""
x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \
self.current_sigma * np.sqrt(self.dt) * np.random.normal(size=self.size)
self.x_prev = x
self.n_steps += 1
return x
def reset_states(self):
"""
Reset ``self.x_prev`` to be ``self.x0``.
"""
self.x_prev = self.x0 if self.x0 is not None else np.zeros(self.size)