maxhuettenrauch 522f7fbf98
Feature/dataclasses (#996)
This PR adds strict typing to the output of `update` and `learn` in all
policies. This will likely be the last large refactoring PR before the
next release (0.6.0, not 1.0.0), so it requires some attention. Several
difficulties were encountered on the path to that goal:

1. The policy hierarchy is actually "broken" in the sense that the keys of
the dicts previously returned by `learn` did not follow the same extension
(inheritance) pattern as the policies themselves. This is a real problem and
should be addressed in the near future. More generally, several aspects of the
policy design and hierarchy deserve a dedicated discussion.
2. Each policy needs to be generic in the stats return type, because one
might want to extend a policy at some point and then also extend its stats.
Even within the source code base this pattern is necessary in many
places (see the sketch after this list).
3. The interaction between `learn` and `update` is a bit quirky; we
currently handle it by having `update` modify special fields inside
`TrainingStats`, whereas all other fields are handled by `learn`.
4. The ICM module is a policy wrapper and required a
`TrainingStatsWrapper`. The latter relies on a bunch of black magic.

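To make point 2 concrete, here is a minimal sketch of the pattern (the `My*` names are purely illustrative and not part of the code base, and the sketch does not derive from `BasePolicy` to stay self-contained; the concrete instance for REINFORCE is visible as `PGTrainingStats` / `TPGTrainingStats` / `PGPolicy` in the file below):

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

from tianshou.policy.base import TrainingStats


@dataclass(kw_only=True)
class MyTrainingStats(TrainingStats):
    # stats returned by the hypothetical MyPolicy.learn
    loss: float


TMyTrainingStats = TypeVar("TMyTrainingStats", bound=MyTrainingStats)


class MyPolicy(Generic[TMyTrainingStats]):
    # generic in the stats type so that subclasses can narrow it
    def learn(self) -> TMyTrainingStats:
        raise NotImplementedError


@dataclass(kw_only=True)
class MyExtendedTrainingStats(MyTrainingStats):
    # a policy subclass can extend the stats with additional fields ...
    extra_metric: float


class MyExtendedPolicy(MyPolicy[MyExtendedTrainingStats]):
    # ... and bind the type variable to the extended stats type
    def learn(self) -> MyExtendedTrainingStats:
        return MyExtendedTrainingStats(loss=0.0, extra_metric=0.0)
```
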
They were addressed by:
1. Live with the broken hierarchy, which is now made visible by the bounds
on the generics. We use `type: ignore` where appropriate.
2. Make all policies generic, with bounds following the policy
inheritance hierarchy (which is incorrect, see above). We experimented a
bit with nested `TrainingStats` classes, but that seemed to add more
complexity and be harder to understand. Unfortunately, mypy rejects code
like the snippet below (returning a concrete value where a bounded type
variable is expected is not allowed, since a subclass may bind the
variable to a narrower type), so we have to add `type: ignore` to the
return statement of each `learn`:

```python
from typing import TypeVar

T = TypeVar("T", bound=int)


def f() -> T:
    return 3
```

3. See above
4. Write representative tests for the `TrainingStatsWrapper`. Still, the
black magic might cause nasty surprises down the line (I am not proud of
it)...

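Purely for illustration, the kind of attribute delegation this black magic boils down to might look roughly like the following. This is a simplified, hypothetical sketch, not the actual `TrainingStatsWrapper` implementation; the names `SimpleStats` and `StatsWrapper` are made up, and the asserts at the bottom indicate what a representative test checks (reads and writes passing through to the wrapped stats object).

```python
from dataclasses import dataclass


@dataclass(kw_only=True)
class SimpleStats:
    # stand-in for a concrete TrainingStats subclass
    loss: float = 0.0


class StatsWrapper:
    """Illustrative wrapper that forwards attribute access to the wrapped stats object."""

    def __init__(self, wrapped: SimpleStats) -> None:
        # bypass __setattr__ so that "wrapped" lands on the wrapper itself
        object.__setattr__(self, "wrapped", wrapped)

    def __getattr__(self, name: str) -> object:
        # only called when normal lookup fails, so wrapper-own attributes take precedence
        return getattr(self.wrapped, name)

    def __setattr__(self, name: str, value: object) -> None:
        # route writes to existing fields of the wrapped stats, keep everything else on the wrapper
        if hasattr(self.wrapped, name):
            setattr(self.wrapped, name, value)
        else:
            object.__setattr__(self, name, value)


# a representative test asserts that reads and writes pass through to the wrapped object
wrapper = StatsWrapper(SimpleStats(loss=1.0))
assert wrapper.loss == 1.0
wrapper.loss = 2.0
assert wrapper.wrapped.loss == 2.0
```
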
Closes #933

---------

Co-authored-by: Maximilian Huettenrauch <m.huettenrauch@appliedai.de>
Co-authored-by: Michael Panchenko <m.panchenko@appliedai.de>
2023-12-30 11:09:03 +01:00


import warnings
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Generic, Literal, TypeAlias, TypeVar, cast

import gymnasium as gym
import numpy as np
import torch

from tianshou.data import (
    Batch,
    ReplayBuffer,
    SequenceSummaryStats,
    to_torch,
    to_torch_as,
)
from tianshou.data.batch import BatchProtocol
from tianshou.data.types import (
    BatchWithReturnsProtocol,
    DistBatchProtocol,
    ObsBatchProtocol,
    RolloutBatchProtocol,
)
from tianshou.policy import BasePolicy
from tianshou.policy.base import TLearningRateScheduler, TrainingStats
from tianshou.utils import RunningMeanStd

# TODO: Is there a better way to define this type? mypy doesn't like Callable[[torch.Tensor, ...], torch.distributions.Distribution]
TDistributionFunction: TypeAlias = Callable[..., torch.distributions.Distribution]


@dataclass(kw_only=True)
class PGTrainingStats(TrainingStats):
    loss: SequenceSummaryStats


TPGTrainingStats = TypeVar("TPGTrainingStats", bound=PGTrainingStats)


class PGPolicy(BasePolicy[TPGTrainingStats], Generic[TPGTrainingStats]):
    """Implementation of the REINFORCE algorithm.

    :param actor: mapping (s->model_output), should follow the rules in
        :class:`~tianshou.policy.BasePolicy`.
    :param optim: optimizer for actor network.
    :param dist_fn: distribution class for computing the action.
        Maps model_output -> distribution. Typically a Gaussian distribution
        taking `model_output=mean,std` as input for continuous action spaces,
        or a categorical distribution taking `model_output=logits`
        for discrete action spaces. Note that as a user, you are responsible
        for ensuring that the distribution is compatible with the action space.
    :param action_space: env's action space.
    :param discount_factor: in [0, 1].
    :param reward_normalization: if True, will normalize the *returns*
        by subtracting the running mean and dividing by the running standard deviation.
        Can be detrimental to performance! See TODO in process_fn.
    :param deterministic_eval: if True, will use deterministic action (the dist's mode)
        instead of a stochastic one during evaluation. Does not affect training.
    :param observation_space: env's observation space.
    :param action_scaling: if True, scale the action from [-1, 1] to the range
        of action_space. Only used if the action_space is continuous.
    :param action_bound_method: method to bound action to range [-1, 1].
        Only used if the action_space is continuous.
    :param lr_scheduler: if not None, will be called in `policy.update()`.

    .. seealso::

        Please refer to :class:`~tianshou.policy.BasePolicy` for a more detailed
        explanation.
    """

    def __init__(
        self,
        *,
        actor: torch.nn.Module,
        optim: torch.optim.Optimizer,
        dist_fn: TDistributionFunction,
        action_space: gym.Space,
        discount_factor: float = 0.99,
        # TODO: rename to return_normalization?
        reward_normalization: bool = False,
        deterministic_eval: bool = False,
        observation_space: gym.Space | None = None,
        # TODO: why change the default from the base?
        action_scaling: bool = True,
        action_bound_method: Literal["clip", "tanh"] | None = "clip",
        lr_scheduler: TLearningRateScheduler | None = None,
    ) -> None:
        super().__init__(
            action_space=action_space,
            observation_space=observation_space,
            action_scaling=action_scaling,
            action_bound_method=action_bound_method,
            lr_scheduler=lr_scheduler,
        )
        if action_scaling and not np.isclose(actor.max_action, 1.0):
            warnings.warn(
                "action_scaling and action_bound_method are only intended "
                "to deal with unbounded model action space, but found actor model "
                f"bound action space with max_action={actor.max_action}. "
                "Consider using the unbounded=True option of the actor model, "
                "or set action_scaling to False and action_bound_method to None.",
            )
        self.actor = actor
        self.optim = optim
        self.dist_fn = dist_fn
        assert 0.0 <= discount_factor <= 1.0, "discount factor should be in [0, 1]"
        self.gamma = discount_factor
        self.rew_norm = reward_normalization
        self.ret_rms = RunningMeanStd()
        self._eps = 1e-8
        self.deterministic_eval = deterministic_eval

    def process_fn(
        self,
        batch: RolloutBatchProtocol,
        buffer: ReplayBuffer,
        indices: np.ndarray,
    ) -> BatchWithReturnsProtocol:
        r"""Compute the discounted returns (Monte Carlo estimates) for each transition.

        They are added to the batch under the field `returns`.
        Note: this function will modify the input batch!

        .. math::

            G_t = \sum_{i=t}^T \gamma^{i-t} r_i

        where :math:`T` is the terminal time step and :math:`\gamma` is the
        discount factor, :math:`\gamma \in [0, 1]`.

        :param batch: a data batch which contains several episodes of data in
            sequential order. Mind that the end of each finished episode of batch
            should be marked by the done flag; unfinished (or collecting) episodes
            will be recognized by buffer.unfinished_index().
        :param buffer: the corresponding replay buffer.
        :param numpy.ndarray indices: tell batch's location in buffer; batch is equal
            to buffer[indices].
        """
        v_s_ = np.full(indices.shape, self.ret_rms.mean)
        # gae_lambda = 1.0 means we use Monte Carlo estimate
        unnormalized_returns, _ = self.compute_episodic_return(
            batch,
            buffer,
            indices,
            v_s_=v_s_,
            gamma=self.gamma,
            gae_lambda=1.0,
        )
        # TODO: overridden in A2C, where mean is not subtracted. Subtracting mean
        #  can be very detrimental! It also has no theoretical grounding.
        #  This should be addressed soon!
        if self.rew_norm:
            batch.returns = (unnormalized_returns - self.ret_rms.mean) / np.sqrt(
                self.ret_rms.var + self._eps,
            )
            self.ret_rms.update(unnormalized_returns)
        else:
            batch.returns = unnormalized_returns
        batch: BatchWithReturnsProtocol
        return batch

    def _get_deterministic_action(self, logits: torch.Tensor) -> torch.Tensor:
        if self.action_type == "discrete":
            return logits.argmax(-1)
        if self.action_type == "continuous":
            # assume that the mode of the distribution is the first element
            # of the actor's output (the "logits")
            return logits[0]
        raise RuntimeError(
            f"Unknown action type: {self.action_type}. "
            f"This should not happen and might be a bug. "
            f"Supported action types are: 'discrete' and 'continuous'.",
        )

    def forward(
        self,
        batch: ObsBatchProtocol,
        state: dict | BatchProtocol | np.ndarray | None = None,
        **kwargs: Any,
    ) -> DistBatchProtocol:
        """Compute action over the given batch data by applying the actor.

        Will sample from the dist_fn, if appropriate.
        Returns a new object representing the processed batch data
        (contrary to other methods that modify the input batch in place).

        .. seealso::

            Please refer to :meth:`~tianshou.policy.BasePolicy.forward` for a
            more detailed explanation.
        """
        # TODO: rename? It's not really logits and there are particular
        #  assumptions about the order of the output and on the distribution type
        logits, hidden = self.actor(batch.obs, state=state, info=batch.info)
        if isinstance(logits, tuple):
            dist = self.dist_fn(*logits)
        else:
            dist = self.dist_fn(logits)

        # in this case, the dist is unused!
        if self.deterministic_eval and not self.training:
            act = self._get_deterministic_action(logits)
        else:
            act = dist.sample()
        result = Batch(logits=logits, act=act, state=hidden, dist=dist)
        return cast(DistBatchProtocol, result)

    # TODO: why does mypy complain?
    def learn(  # type: ignore
        self,
        batch: BatchWithReturnsProtocol,
        batch_size: int | None,
        repeat: int,
        *args: Any,
        **kwargs: Any,
    ) -> TPGTrainingStats:
        losses = []
        split_batch_size = batch_size or -1
        for _ in range(repeat):
            for minibatch in batch.split(split_batch_size, merge_last=True):
                self.optim.zero_grad()
                result = self(minibatch)
                dist = result.dist
                act = to_torch_as(minibatch.act, result.act)
                ret = to_torch(minibatch.returns, torch.float, result.act.device)
                log_prob = dist.log_prob(act).reshape(len(ret), -1).transpose(0, 1)
                loss = -(log_prob * ret).mean()
                loss.backward()
                self.optim.step()
                losses.append(loss.item())

        loss_summary_stat = SequenceSummaryStats.from_sequence(losses)

        return PGTrainingStats(loss=loss_summary_stat)  # type: ignore[return-value]