{ "nbformat": 4, "nbformat_minor": 0, "metadata": { "colab": { "provenance": [] }, "kernelspec": { "name": "python3", "display_name": "Python 3" }, "language_info": { "name": "python" } }, "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "cesUkq8hA373" }, "outputs": [], "source": [ "# Remember to install tianshou first\n", "!pip install tianshou==0.4.8\n", "!pip install gym" ] }, { "cell_type": "markdown", "source": [ "# Overview\n", "In reinforcement learning, the agent interacts with environments to improve itself. In this tutorial we will concentrate on the agent part. In Tianshou, both the agent and the core DRL algorithm are implemented in the Policy module. Tianshou provides more than 20 Policy modules, each representing one DRL algorithm. See the supported algorithms [here](https://github.com/thu-ml/tianshou).\n", "\n", "
\n", "\n", "\n", " The agents interacting with the environment \n", "
\n", "\n", "All Policy modules inherit from a BasePolicy class and share the same interface." ], "metadata": { "id": "PNM9wqstBSY_" } }, { "cell_type": "markdown", "source": [ "# Creating your own Policy\n", "We will use a simple REINFORCE algorithm Policy to show the implementation of a Policy Module. The Policy we implement here will be a highly scaled-down version of [PGPolicy](https://github.com/thu-ml/tianshou/blob/master/tianshou/policy/modelfree/pg.py) in Tianshou." ], "metadata": { "id": "ZqdHYdoJJS51" } }, { "cell_type": "markdown", "source": [ "## Initialisation\n", "First, we create the `REINFORCEPolicy` by inheriting from `BasePolicy` in Tianshou." ], "metadata": { "id": "PWFBgZ4TJkfz" } }, { "cell_type": "code", "source": [ "\n", "from typing import Any, Dict, List, Optional, Type, Union\n", "\n", "import numpy as np\n", "import torch\n", "\n", "from tianshou.data import Batch, ReplayBuffer, to_torch, to_torch_as\n", "from tianshou.policy import BasePolicy\n", "\n", "class REINFORCEPolicy(BasePolicy):\n", "    \"\"\"Implementation of REINFORCE algorithm.\"\"\"\n", "    def __init__(self):\n", "        super().__init__()" ], "metadata": { "id": "cDlSjASbJmy-" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "The Policy Module mainly does two things:\n", "\n", "\n", "1. `policy.forward()` receives observations and other information (stored in a Batch) from the environment and returns a new Batch containing the action.\n", "2. `policy.update()` receives training data sampled from the replay buffer, updates the policy, and then returns logging details.\n", "\n", "\n", "
\n", "\n", "\n", " policy.forward() and policy.update() \n", "
\n", "\n", "We also need to take care of the following things:\n", "\n", "\n", "\n", "1. Since Tianshou is a **Deep** RL library, our Policy Module needs a policy network and a Torch optimizer.\n", "2. In Tianshou's BasePolicy, `Policy.update()` first calls `Policy.process_fn()` to preprocess the training data and compute quantities such as episodic returns (gradient free), then calls `Policy.learn()` to perform the back-propagation.\n", "\n", "Then we get the implementation below.\n", "\n", "\n", "\n" ], "metadata": { "id": "qc1RnIBbLCDN" } }, { "cell_type": "code", "source": [ "class REINFORCEPolicy(BasePolicy):\n", "    \"\"\"Implementation of REINFORCE algorithm.\"\"\"\n", "    def __init__(self, model: torch.nn.Module, optim: torch.optim.Optimizer,):\n", "        super().__init__()\n", "        self.actor = model\n", "        self.optim = optim\n", "\n", "    def forward(self, batch: Batch) -> Batch:\n", "        \"\"\"Compute action over the given batch data.\"\"\"\n", "        act = None  # placeholder, filled in below\n", "        return Batch(act=act)\n", "\n", "    def process_fn(self, batch: Batch, buffer: ReplayBuffer, indices: np.ndarray) -> Batch:\n", "        \"\"\"Compute the discounted returns for each transition.\"\"\"\n", "        pass  # placeholder, filled in below\n", "\n", "    def learn(self, batch: Batch, batch_size: int, repeat: int) -> Dict[str, List[float]]:\n", "        \"\"\"Perform the back-propagation.\"\"\"\n", "        return  # placeholder, filled in below" ], "metadata": { "id": "6j32PSKUQ23w" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Policy.forward()\n", "According to the equation of the REINFORCE algorithm in Spinning Up's [documentation](https://spinningup.openai.com/en/latest/algorithms/vpg.html), we need to map the observation to an action distribution over the action space using a neural network (`self.actor`).\n", "\n", "
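As a rough illustration of mapping network outputs to an action distribution, the sketch below handles a discrete action space in plain Python. It is not Tianshou's implementation: the helper names `softmax`, `sample_action` and `log_prob` are hypothetical, and the scores are made-up numbers standing in for the output of `self.actor`.

```python
import math
import random


def softmax(scores):
    # Turn unnormalised scores into probabilities that sum to 1.
    # Subtracting the maximum keeps exp() numerically stable.
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def sample_action(probs, rng):
    # Draw one action index from a categorical distribution.
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]


def log_prob(probs, action):
    # Log-probability of the chosen action, used later by the REINFORCE loss.
    return math.log(probs[action])


rng = random.Random(0)
scores = [2.0, 0.5]  # made-up network output for a 2-action task
probs = softmax(scores)
action = sample_action(probs, rng)
print(probs, action, log_prob(probs, action))
```

In the Policy itself, `torch.distributions.Categorical` takes over the sampling and the log-probability computation for whole batches at once.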
\n", "\n", "\n", "
\n", "\n", "Let us suppose the action space is discrete, and the distribution is a simple categorical distribution.\n", "\n" ], "metadata": { "id": "tjtqjt8WRY5e" } }, { "cell_type": "code", "source": [ "def forward(self, batch: Batch) -> Batch:\n", "    \"\"\"Compute action over the given batch data.\"\"\"\n", "    self.dist_fn = torch.distributions.Categorical\n", "    # the Actor used later in this tutorial returns (output, hidden_state)\n", "    logits, _ = self.actor(batch.obs)\n", "    dist = self.dist_fn(logits)\n", "    act = dist.sample()\n", "    return Batch(act=act, dist=dist)" ], "metadata": { "id": "uE4YDE-_RwgN" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Policy.process_fn()\n", "Now that we have defined our actor, we can set up a loss function and optimize our neural network once training data is available. Before that, however, we must calculate the episodic return for every step in our training data to construct the REINFORCE loss function.\n", "\n", "Calculating the episodic return is not hard, because `ReplayBuffer.next()` lets us walk through the rewards that follow each step of an episode. A more convenient way is to use the built-in method `BasePolicy.compute_episodic_return()` inherited from BasePolicy.\n" ], "metadata": { "id": "CultfOeuTx2V" } }, { "cell_type": "code", "source": [ "def process_fn(self, batch: Batch, buffer: ReplayBuffer, indices: np.ndarray) -> Batch:\n", "    \"\"\"Compute the discounted returns for each transition.\"\"\"\n", "    returns, _ = self.compute_episodic_return(batch, buffer, indices, gamma=0.99, gae_lambda=1.0)\n", "    batch.returns = returns\n", "    return batch" ], "metadata": { "id": "wPAmOD7zV7n2" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "`BasePolicy.compute_episodic_return()` can also be used to compute [GAE](https://arxiv.org/abs/1506.02438). Another similar method is `BasePolicy.compute_nstep_return()`. 
Check the [source code](https://github.com/thu-ml/tianshou/blob/6fc68578127387522424460790cbcb32a2bd43c4/tianshou/policy/base.py#L304) for more details." ], "metadata": { "id": "XA8OF4GnWWr5" } }, { "cell_type": "markdown", "source": [ "## Policy.learn()\n", "The data batch returned by `Policy.process_fn()` flows into `Policy.learn()`. Finally we can construct our loss function and perform the back-propagation." ], "metadata": { "id": "7UsdzNaOXPpC" } }, { "cell_type": "code", "source": [ "def learn(self, batch: Batch, batch_size: int, repeat: int) -> Dict[str, List[float]]:\n", "    \"\"\"Perform the back-propagation.\"\"\"\n", "    logging_losses = []\n", "    for _ in range(repeat):\n", "        for minibatch in batch.split(batch_size, merge_last=True):\n", "            self.optim.zero_grad()\n", "            result = self(minibatch)\n", "            dist = result.dist\n", "            act = to_torch_as(minibatch.act, result.act)\n", "            ret = to_torch(minibatch.returns, torch.float, result.act.device)\n", "            log_prob = dist.log_prob(act).reshape(len(ret), -1).transpose(0, 1)\n", "            loss = -(log_prob * ret).mean()\n", "            loss.backward()\n", "            self.optim.step()\n", "            logging_losses.append(loss.item())\n", "    return {\"loss\": logging_losses}" ], "metadata": { "id": "aCO-dLXWXtz9" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Implementation\n", "Finally we can assemble the implemented methods and form a REINFORCE Policy." 
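, "

The `returns` that `process_fn()` attaches to the batch, and that the loss in `learn()` multiplies with the log-probabilities, are discounted rewards-to-go. A minimal sketch in plain Python, assuming a flat list of rewards and done flags in collection order; `discounted_returns` is a hypothetical helper and not a Tianshou function, and it simply treats the end of the data as the end of an unfinished episode:

```python
def discounted_returns(rewards, dones, gamma):
    # Walk backwards so each step reuses the return of the step after it.
    returns = [0.0] * len(rewards)
    running = 0.0
    for t in reversed(range(len(rewards))):
        if dones[t]:
            # The episode ends here, so nothing after it may leak in.
            running = 0.0
        running = rewards[t] + gamma * running
        returns[t] = running
    return returns


# One 3-step episode that terminates, followed by 2 steps of an unfinished one.
rewards = [1.0, 1.0, 1.0, 1.0, 1.0]
dones = [False, False, True, False, False]
print(discounted_returns(rewards, dones, gamma=0.5))
# prints [1.75, 1.5, 1.0, 1.5, 1.0]
```

The assembled class relies on `compute_episodic_return()` for this step instead of a hand-written loop.
"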
], "metadata": { "id": "1BtuV2W0YJTi" } }, { "cell_type": "code", "source": [ "class REINFORCEPolicy(BasePolicy):\n", "    \"\"\"Implementation of REINFORCE algorithm.\"\"\"\n", "    def __init__(self, model: torch.nn.Module, optim: torch.optim.Optimizer,):\n", "        super().__init__()\n", "        self.actor = model\n", "        self.optim = optim\n", "        # action distribution\n", "        self.dist_fn = torch.distributions.Categorical\n", "\n", "    def forward(self, batch: Batch) -> Batch:\n", "        \"\"\"Compute action over the given batch data.\"\"\"\n", "        logits, _ = self.actor(batch.obs)\n", "        dist = self.dist_fn(logits)\n", "        act = dist.sample()\n", "        return Batch(act=act, dist=dist)\n", "\n", "    def process_fn(self, batch: Batch, buffer: ReplayBuffer, indices: np.ndarray) -> Batch:\n", "        \"\"\"Compute the discounted returns for each transition.\"\"\"\n", "        returns, _ = self.compute_episodic_return(batch, buffer, indices, gamma=0.99, gae_lambda=1.0)\n", "        batch.returns = returns\n", "        return batch\n", "\n", "    def learn(self, batch: Batch, batch_size: int, repeat: int) -> Dict[str, List[float]]:\n", "        \"\"\"Perform the back-propagation.\"\"\"\n", "        logging_losses = []\n", "        for _ in range(repeat):\n", "            for minibatch in batch.split(batch_size, merge_last=True):\n", "                self.optim.zero_grad()\n", "                result = self(minibatch)\n", "                dist = result.dist\n", "                act = to_torch_as(minibatch.act, result.act)\n", "                ret = to_torch(minibatch.returns, torch.float, result.act.device)\n", "                log_prob = dist.log_prob(act).reshape(len(ret), -1).transpose(0, 1)\n", "                loss = -(log_prob * ret).mean()\n", "                loss.backward()\n", "                self.optim.step()\n", "                logging_losses.append(loss.item())\n", "        return {\"loss\": logging_losses}\n" ], "metadata": { "id": "Ab0KNQHTOlGo" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "# Use the policy\n", "Note that `BasePolicy` itself inherits from `torch.nn.Module`. As a result, you can consider all Policy modules as a Torch Module. 
They share similar APIs.\n", "\n", "First, we initialise a new REINFORCE policy." ], "metadata": { "id": "xlPAbh0lKti8" } }, { "cell_type": "code", "source": [ "from tianshou.utils.net.common import Net\n", "from tianshou.utils.net.discrete import Actor\n", "import warnings\n", "warnings.filterwarnings('ignore')\n", "state_shape = 4\n", "action_shape = 2\n", "net = Net(state_shape, hidden_sizes=[16, 16], device=\"cpu\")\n", "actor = Actor(net, action_shape, device=\"cpu\").to(\"cpu\")\n", "optim = torch.optim.Adam(actor.parameters(), lr=0.0003)\n", "\n", "policy = REINFORCEPolicy(actor, optim)" ], "metadata": { "id": "JkLFA9Z1KjuX" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "The REINFORCE policy shares the same APIs as a Torch Module." ], "metadata": { "id": "LAo_0t2fekUD" } }, { "cell_type": "code", "source": [ "print(policy)\n", "print(\"========================================\")\n", "for para in policy.parameters():\n", "    print(para.shape)" ], "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "UiuTc8RhJiEi", "outputId": "9b5bc54c-6303-45f3-ba81-2216a44931e8" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Making decisions\n", "Given a batch of observations, the policy can return a batch of actions and other data." ], "metadata": { "id": "-RCrsttYgAG-" } }, { "cell_type": "code", "source": [ "obs_batch = Batch(obs=np.ones(shape=(256, 4)))\n", "action = policy(obs_batch)  # forward() method is called\n", "print(action)" ], "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "0jkBb6AAgUla", "outputId": "37948844-cdd8-4567-9481-89453c80a157" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Save and Load models\n", "Naturally, a Tianshou Policy can be saved and loaded like a normal Torch network." 
], "metadata": { "id": "swikhnuDfKep" } }, { "cell_type": "code", "source": [ "torch.save(policy.state_dict(), 'policy.pth')\n", "assert policy.load_state_dict(torch.load('policy.pth'))" ], "metadata": { "id": "tYOoWM_OJRnA" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Algorithm Updating\n", "We have to collect some data and save it in the ReplayBuffer before updating our agent (policy). Typically we use a Collector to collect data, but we leave that part until later, once we have learned about the Collector in Tianshou. For now we generate some **fake** data." ], "metadata": { "id": "gp8PzOYsg5z-" } }, { "cell_type": "markdown", "source": [ "### Generating fake data\n", "First, we need to \"pretend\" that we are using the \"Policy\" to collect data. We plan to collect 10 transitions so that we can update our algorithm." ], "metadata": { "id": "XrrPxOUAYShR" } }, { "cell_type": "code", "source": [ "import gym\n", "from tianshou.data import Batch, ReplayBuffer\n", "# a buffer is initialised with its maxsize set to 12.\n", "print(\"========================================\")\n", "buf = ReplayBuffer(size=12)\n", "print(buf)\n", "print(\"maxsize: {}, data length: {}\".format(buf.maxsize, len(buf)))\n", "env = gym.make(\"CartPole-v0\")" ], "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "a14CmzSfYh5C", "outputId": "aaf45a1f-5e21-4bc8-cbe3-8ce798258af0" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "Now we are pretending to collect the first episode. The first episode ends at step 3 (perhaps because we are performing too badly)." 
], "metadata": { "id": "8S94cV7yZITR" } }, { "cell_type": "code", "source": [ "obs = env.reset()\n", "for i in range(3):\n", "    act = policy(Batch(obs=obs[np.newaxis, :])).act.item()\n", "    obs_next, rew, done, info = env.step(act)\n", "    # pretend the episode ends at step 3\n", "    done = i == 2\n", "    info[\"id\"] = i\n", "    buf.add(Batch(obs=obs, act=act, rew=rew, done=done, obs_next=obs_next, info=info))\n", "    obs = obs_next" ], "metadata": { "id": "a_mtvbmBZbfs" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "Now we are pretending to collect the second episode. At step 7 the second episode still hasn't ended, but we are unwilling to wait, so we stop collecting and update the algorithm." ], "metadata": { "id": "pkxq4gu9bGkt" } }, { "cell_type": "code", "source": [ "obs = env.reset()\n", "for i in range(3, 10):\n", "    act = policy(Batch(obs=obs[np.newaxis, :])).act.item()\n", "    obs_next, rew, done, info = env.step(act)\n", "    # pretend this episode never ends\n", "    done = False\n", "    info[\"id\"] = i\n", "    buf.add(Batch(obs=obs, act=act, rew=rew, done=done, obs_next=obs_next, info=info))\n", "    obs = obs_next" ], "metadata": { "id": "pAoKe02ybG68" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "Our replay buffer looks like this now." ], "metadata": { "id": "MKM6aWMucv-M" } }, { "cell_type": "code", "source": [ "print(buf)\n", "print(\"maxsize: {}, data length: {}\".format(buf.maxsize, len(buf)))" ], "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "CSJEEWOqXdTU", "outputId": "2b3bb75c-f219-4e82-ca78-0ea6173a91f9" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "### Updates\n", "We now have a replay buffer with 10 transitions in it. We can call `Policy.update()` to train." 
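, "

As described earlier, `update()` samples from the buffer, passes the data through `process_fn()` and then hands it to `learn()`. The toy sketch below only illustrates that order of calls. `ToyBuffer` and `ToyPolicy` are hypothetical stand-ins written for this example, not Tianshou classes, and the real `BasePolicy.update()` does more bookkeeping than shown here.

```python
import random


class ToyBuffer:
    # Hypothetical stand-in for a replay buffer: a plain list of rewards.
    def __init__(self, rewards):
        self.rewards = list(rewards)

    def sample(self, batch_size):
        # Mirror the convention used in this tutorial: 0 means take everything.
        if batch_size == 0:
            indices = list(range(len(self.rewards)))
        else:
            indices = random.sample(range(len(self.rewards)), batch_size)
        return [self.rewards[i] for i in indices], indices


class ToyPolicy:
    # Hypothetical policy that only records the order of the calls.
    def __init__(self):
        self.calls = []

    def process_fn(self, batch, buffer, indices):
        self.calls.append('process_fn')
        return batch

    def learn(self, batch, **kwargs):
        self.calls.append('learn')
        return {'loss': [0.0] * kwargs.get('repeat', 1)}

    def update(self, sample_size, buffer, **kwargs):
        batch, indices = buffer.sample(sample_size)
        batch = self.process_fn(batch, buffer, indices)
        return self.learn(batch, **kwargs)


policy_sketch = ToyPolicy()
result = policy_sketch.update(0, ToyBuffer([1.0] * 10), batch_size=10, repeat=6)
print(policy_sketch.calls, result)
```

Extra keyword arguments such as `batch_size` and `repeat` are forwarded to `learn()`, which is why they appear in the `update()` call.
"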
], "metadata": { "id": "55VWhWpkdfEb" } }, { "cell_type": "code", "source": [ "# 0 means sample all data from the buffer\n", "# batch_size=10 defines the training batch size\n", "# repeat=6 means the training is repeated 6 times\n", "policy.update(0, buf, batch_size=10, repeat=6)" ], "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "i_O1lJDWdeoc", "outputId": "b154741a-d6dc-46cb-898f-6e84fa14e5a7" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "Not that difficult, right?" ], "metadata": { "id": "enqlFQLSJrQl" } }, { "cell_type": "markdown", "source": [ "# Further Reading\n", "\n", "\n" ], "metadata": { "id": "QJ5krjrcbuiA" } }, { "cell_type": "markdown", "source": [ "## Pre-defined Networks\n", "Tianshou provides numerous pre-defined networks commonly used in DRL, so you don't have to write them yourself. Check this [documentation](https://tianshou.readthedocs.io/en/master/api/tianshou.utils.html#pre-defined-networks) for details." ], "metadata": { "id": "pmWi3HuXWcV8" } }, { "cell_type": "markdown", "source": [ "## How to compute GAE on your own?\n", "(Note that for this reading you need to understand the calculation of the [GAE](https://arxiv.org/abs/1506.02438) advantage first.)\n", "\n", "In terms of code implementation, perhaps the most difficult and annoying part is computing the GAE advantage. So far, we have used the `self.compute_episodic_return()` method inherited from `BasePolicy` to save us from all those troubles. 
However, it is still important that we know the details behind this.\n", "\n", "To compute the GAE advantage, `self.compute_episodic_return()` may be used like this:" ], "metadata": { "id": "UPVl5LBEWJ0t" } }, { "cell_type": "code", "source": [ "batch, indices = buf.sample(0)  # 0 means sampling all the data from the buffer\n", "returns, advantage = BasePolicy.compute_episodic_return(batch, buf, indices, v_s_=np.zeros(10), v_s=np.zeros(10), gamma=1.0, gae_lambda=1.0)\n", "print(returns)\n", "print(advantage)" ], "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "D34GlVvPNz08", "outputId": "43a4e5df-59b5-4e4a-c61c-e69090810215" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "In the code above, we sample all 10 transitions in the buffer and compute the GAE advantage. To do so, we need a value estimate for every observation: the argument `v_s` is the value of `batch.obs` and `v_s_` is the value of `batch.obs_next`. They are usually computed by:\n", "\n", "`v_s = critic(batch.obs)`,\n", "\n", "`v_s_ = critic(batch.obs_next)`,\n", "\n", "where both `v_s` and `v_s_` are 10-dimensional arrays and `critic` is usually a neural network.\n", "\n", "After we've got all those values, GAE can be computed following the equation below." ], "metadata": { "id": "h_5Dt6XwQLXV" } }, { "cell_type": "markdown", "source": [ "\\begin{equation}\n", "\\hat{A}_{t}^{\\mathrm{GAE}(\\gamma, \\lambda)} := \\sum_{l=0}^{\\infty}(\\gamma \\lambda)^{l} \\delta_{t+l}^{V}\n", "\\end{equation}\n", "\n", "where\n", "\n", "\\begin{equation}\n", "\\delta_{t}^{V} := -V\\left(s_{t}\\right)+r_{t}+\\gamma V\\left(s_{t+1}\\right)\n", "\\end{equation}\n" ], "metadata": { "id": "ooHNIICGUO19" } }, { "cell_type": "markdown", "source": [ "However, if you follow this equation from the paper literally, you will probably get slightly lower performance than you expected. 
There are at least 3 \"bugs\" in this equation." ], "metadata": { "id": "eV6XZaouU7EV" } }, { "cell_type": "markdown", "source": [ "**Firstly**, Gym always returns an `obs_next`, even when the step is already the last one of an episode. The value of that final `obs_next` is exactly 0, and you should not let the neural network estimate it." ], "metadata": { "id": "FCxD9gNNVYbd" } }, { "cell_type": "code", "source": [ "# Assume v_s_ is obtained by calling critic(batch.obs_next)\n", "v_s_ = np.ones(10)\n", "# zero out the value of obs_next wherever the episode has ended\n", "v_s_ *= ~batch.done\n", "print(v_s_)" ], "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "rNZNUNgQVvRJ", "outputId": "44354595-c25a-4da8-b4d8-cffa31ac4b7d" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "After the fix above, we will perhaps get a more accurate estimate.\n", "\n", "**Secondly**, you must know when to stop bootstrapping. Usually we stop bootstrapping when we meet a `done` flag. However, in the buffer above, the last (10th) step is not marked with done=True, because the collection has not finished. We must know all those unfinished steps so that we know when to stop bootstrapping.\n", "\n", "Luckily, the buffer can help here, because buffers in Tianshou not only store data but also help you manage data trajectories." ], "metadata": { "id": "2EtMi18QWXTN" } }, { "cell_type": "code", "source": [ "unfinished_indexes = buf.unfinished_index()\n", "print(unfinished_indexes)\n", "done_indexes = np.where(batch.done)[0]\n", "print(done_indexes)\n", "stop_bootstrap_ids = np.concatenate([unfinished_indexes, done_indexes])\n", "print(stop_bootstrap_ids)" ], "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "saluvX4JU6bC", "outputId": "2994d178-2f33-40a0-a6e4-067916b0b5c5" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "**Thirdly**, there are some special indexes which are marked by the done flag. 
However, the value of `obs_next` at these indexes should not be zero. These steps are usually the last step of an episode, but the episode stops not because the agent can no longer get any rewards (value=0), but because the episode is too long and we have to truncate it. In Gym, these kinds of steps are marked with `info['TimeLimit.truncated']=True`." ], "metadata": { "id": "qp6vVE4dYWv1" } }, { "cell_type": "markdown", "source": [ "As a result, we need to rewrite the line above\n", "\n", "`v_s_ *= ~batch.done`" ], "metadata": { "id": "tWkqXRJfZTvV" } }, { "cell_type": "markdown", "source": [ "to\n", "\n", "```\n", "mask = batch.info['TimeLimit.truncated'] | (~batch.done)\n", "v_s_ *= mask\n", "\n", "```\n", "\n", "\n", "\n" ], "metadata": { "id": "kms-QtxKZe-M" } }, { "cell_type": "markdown", "source": [ "### Summary\n", "If you are already bored by now, simply remember that Tianshou can handle all these little details so that you can focus on the algorithm itself. Just call `BasePolicy.compute_episodic_return()`.\n", "\n", "If you are still interested, we recommend that you check Appendix C in this [paper](https://arxiv.org/abs/2107.14171v2) and the implementation of `BasePolicy.value_mask()` and `BasePolicy.compute_episodic_return()` for details." ], "metadata": { "id": "u_aPPoKraBu6" } }, { "cell_type": "markdown", "source": [ "\n", "![timelimit.svg]()\n", "\n", "![22.PNG]()" ], "metadata": { "id": "2cPnUXRBWKD9" } } ] }