diff --git a/docs/ch8/main.md b/docs/ch8/main.md
index 3e79948..dcd1052 100644
--- a/docs/ch8/main.md
+++ b/docs/ch8/main.md
@@ -85,35 +85,219 @@ $\qquad$ 总的来讲,$\text{Dueling DQN}$ 算法在某些情况下相对于 $
## Noisy DQN 算法
-Noisy DQN 算法③ 也是通过优化网络结构的方法来提升 DQN 算法的性能,但与 Dueling 算法不同的是,它的目的并不是为了提高 $Q$ 值的估计,而是增强网络的探索能力。
+$\qquad$ $\text{Noisy DQN}$ 算法③ 也是通过优化网络结构的方法来提升 DQN 算法的性能,但与 Dueling 算法不同的是,它的目的并不是为了提高 $Q$ 值的估计,而是增强网络的探索能力。
-> ③ 论文链接:https://arxiv.org/abs/1706.10295
+> ③ Fortunato M , Azar M G , Piot B ,et al.Noisy Networks for Exploration[J]. 2017.DOI:10.48550/arXiv.1706.10295.
-从 Q-learning 算法开始,我们就讲到了探索策略以及探索-利用窘境的问题,常见的 $\varepsilon-greedy$ 策略是从智能体与环境的交互过程改善探索能力,以避免陷入局部最优解。而在深度强化学习中,由于引入了深度学习,深度学习本身也会因为网络模型限制或者梯度下降方法陷入局部最优解问题。也就是说,深度强化学习既要考虑与环境交互过程中的探索能力,也要考虑深度模型本身的探索能力,从而尽量避免陷入局部最优解的困境之中,这也是为什么经常有人会说强化学习比深度学习更难“炼丹”的原因之一。
+$\qquad$ 从 $\text{Q-learning}$ 算法开始,我们就讲到了探索-利用平衡的问题,常见的 $\varepsilon-greedy$ 策略是从智能体与环境的交互过程改善探索能力,以避免陷入局部最优解。而在深度强化学习中,由于引入了深度学习,深度学习本身也会因为网络模型限制或者梯度下降方法陷入局部最优解问题。也就是说,深度强化学习既要考虑与环境交互过程中的探索能力,也要考虑深度模型本身的探索能力,从而尽量避免陷入局部最优解的困境之中,这也是为什么经常有人会说强化学习比深度学习更难“炼丹”的原因之一。
-回归正题,Noisy DQN 算法其实是在 DQN 算法基础上在神经网络中引入了噪声层来提高网络性能的,即将随机性应用到神经网络中的参数或者说权重,增加了 $Q$ 网络对于状态和动作空间的探索能力,从而提高收敛速度和稳定性。在实践上也比较简单,就是通过添加随机性参数到神经网络的线性层,对应的 $Q$ 值则可以表示为 $Q_{\theta+\epsilon}$,注意不要把这里的 $\epsilon$ 跟 $\varepsilon-greedy$ 策略中的 $\varepsilon$ 混淆了。虽然都叫做 epsilon,但这里 $\epsilon$ 是由高斯分布生成的总体分类噪声参数。
+$\qquad$ 回归正题,$\text{Noisy DQN}$ 算法其实是在 $\text{DQN}$ 算法基础上在神经网络中引入了噪声层来提高网络性能的,即将随机性应用到神经网络中的参数或者说权重,增加了 $Q$ 网络对于状态和动作空间的探索能力,从而提高收敛速度和稳定性。在实践上也比较简单,就是通过添加随机性参数到神经网络的线性层,对应的 $Q$ 值则可以表示为 $Q_{\theta+\epsilon}$,注意不要把这里的 $\epsilon$ 跟 $\varepsilon-greedy$ 策略中的 $\varepsilon$ 混淆了。虽然都叫做 $\epsilon$ ,但这里 $\epsilon$ 是由高斯分布生成的总体分类噪声参数。
-在实战中,我们首先可以定义引入了噪声层的线性层,如下:
+$\qquad$ 其实在网络模型中增加噪声层是一种比较泛用的做法,而不只是用在 $\text{DQN}$ 算法中,具体做法读者可以参考后面的实战内容。
+
+
+## PER DQN 算法
+
+$\qquad$ 在 $\text{DQN}$ 算法章节中我们讲到经验回放,从另一个角度来说也是为了优化深度网络中梯度下降的方式,或者说网络参数更新的方式。在本节要讲的 $\text{PER DQN}$ 算法④中,进一步优化了经验回放的设计从而提高模型的收敛能力和鲁棒性。PER 可以翻译为优先经验回放($\text{prioritized experience replay}$),跟数据结构中优先队列与普通队列一样,会在采样过程中赋予经验回放中样本的优先级。
+
+> ④ Schaul T , Quan J , Antonoglou I ,et al.Prioritized Experience Replay[J].Computer Science, 2015.DOI:10.48550/arXiv.1511.05952.
+
+$\qquad$ 具体以什么为依据来为经验回放中的样本赋予不同优先级呢?答案是 $\text{TD}$ 误差。$\text{TD}$ 误差最开始我们是在时序差分方法的提到的,广义的定义是值函数(包括状态价值函数和动作价值函数)的估计值与实际值之差,在 $\text{DQN}$ 算法中就是目标网络计算的 $Q$ 值和策略网络(当前网络)计算的 $Q$ 值之差,也就是 $\text{DQN}$ 网络中损失函数的主要构成部分。
+
+$\qquad$ 我们每次从经验回访中取出一个批量的样本,进而计算的 $\text{TD}$ 误差一般是不同的,对于 $\text{DQN}$ 网络反向传播的作用也是不同的。**TD误差越大,损失函数的值也越大,对于反向传播的作用也就越大。** 这样一来如果 $\text{TD}$ 误差较大的样本更容易被采到的话,那么我们的算法也会更加容易收敛。因此我们只需要设计一个经验回放,根据经验回放中的每个样本计算出的TD误差赋予对应的优先级,然后在采样的时候取出优先级较大的样本。
+
+$\qquad$ 原理听上去比较简单,但具体如何实现呢?在实践中,我们通常用 SumTree 这样的二叉树结构来实现。这里建议没有了解过数据结构或者二叉树的读者先花个十几分钟的时间快速了解一下二叉树的基本概念,比如根节点、叶节点、父节点与子节点等等。
+
+$\qquad$ 如图 $\text{8-3}$ 所示,每个父节点的值等于左右两个子节点值之和。在强化学习中,所有的样本只保存在最下面的叶子节点中,并且除了保存样本数据之外,还会保存对应的优先级,即对应叶子节点中的值(例如图中的 $\text{31,13,14}$ 以及 $8$ 等,也对应样本的 $\text{TD}$ 误差)。并且根据叶子节点的值,我们从 $0$ 开始依次划分采样区间。然后在采样中,例如这里根节点值为 $66$ ,那么我们就可以在 $[0,66)$ 这个区间均匀采样,采样到的值落在哪个区间中,就说明对应的样本就是我们要采样的样本。例如我们采样到了 $25$ 这个值,即对应区间 $[0,31)$,那么我们就采样到了第一个叶子节点对应的样本。注意到,第一个样本对应的区间也是最长的,这意味着第一个样本的优先级最高,也就是 $\text{TD}$ 误差最大,反之第四个样本的区间最短,优先级也最低。这样一来,我们就可以通过采样来实现优先经验回放的功能。
+
+
+
+
+图 $\text{8-3}$ $\text{SumTree}$ 结构
+
+$\qquad$ 每个叶节点的值就是对应样本的 $\text{TD}$ 误差(例如途中的)。我们可以通过根节点的值来计算出每个样本的 $\text{TD}$ 误差占所有样本 $\text{TD}$ 误差的比例,这样就可以根据比例来采样样本。在实际的实现中,我们可以将每个叶节点的值设置为一个元组,其中包含样本的 $\text{TD}$ 误差和样本的索引,这样就可以通过索引来找到对应的样本。具体如何用 $\text{Python}$ 类来实现 $\text{SumTree}$ 结构,读者可以参考后面的实战内容。
+
+
+
+$\qquad$ 尽管 $\text{SumTree}$ 结构可以实现优先经验回放的功能。然而直接使用 $\text{TD}$ 误差作为优先级存在一些问题。首先,考虑到算法效率问题,我们在每次更新时不会把经验回放中的所有样本都计算 $\text{TD}$ 误差并更新对应的优先级,而是只更新当前取到的一定批量的样本。这样一来,每次计算的 $\text{TD}$ 误差是对应之前的网络,而不是当前待更新的网络。换句话说,如果某批量样本的 $\text{TD}$ 误差较低,只能说明它们对于之前的网络来说“信息量”不大,但不能说明对当前的网络“信息量”不大,因此单纯根据 $\text{TD}$ 误差进行优先采样有可能会错过对当前网络“信息量”更大的样本。其次,被选中样本的 $\text{TD}$ 误差会在当前更新后下降,然后优先级会排到后面去,下次这些样本就不会被选中,这样来来回回都是那几个样本,很容易出现“旱的旱死,涝的涝死”的情况,导致过拟合。
+
+$\qquad$ 为了解决上面提到的两个问题,我们首先引入**随机优先级采样**( $\text{stochastic prioritization}$ )的技巧。即在每次更新时,我们不再是直接采样 $\text{TD}$ 误差最大的样本,而是定义一个采样概率,如式 $\text{(8.7)}$ 所示。
+
+$$
+\tag{8.7}
+P(i) = \frac{p_i^\alpha}{\sum_k p_k^\alpha}
+$$
+
+$\qquad$ 其中,$p_i$ 是样本 $i$ 的优先级,$\alpha$ 是一个超参数,用于调节优先采样的程序,通常在 $(0,1)$ 的区间内。当 $\alpha=0$ 时,采样概率为均匀分布;当 $\alpha =1$ 时,采样概率为优先级的线性分布。同时,即使对于最低优先级的样本,我们也不希望它们的采样概率为 $0$ ,因此我们可以在优先级上加上一个常数 $\epsilon$,即式 $\text{(8.8)}$ 。
+
+$$
+\tag{8.8}
+p_i = |\delta_i| + \epsilon
+$$
+
+$\qquad$ 其中,$|\delta_i|$ 是样本 $i$ 的 $\text{TD}$ 误差。当然,我们也可以使用其他的优先级计算方式,如式 $\text{(8.9)}$ 所示。
+
+$$
+\tag{8.9}
+p_i = \frac{1}{rank(i)}
+$$
+
+$\qquad$ 其中 $rank(i)$ 是样本 $i$ 的优先级排名,这种方式也能保证每个样本的采样概率都不为 $0$ ,但在实践中,我们更倾向于直接增加一个常数 $\epsilon$ 的方式。
+
+$\qquad$ 除了随机优先级采样之外,我们还引入了另外一个技巧,在讲解该技巧之前,我们需要简单了解一下**重要性采样**,这个概念在后面的 $\text{PPO}$ 算法也会用到,读者需要重点掌握。重要性采样($\text{importance sampling}$ )是一种用于估计某一分布性质的方法,它的基本思想是,我们可以通过与待估计分布不同的另一个分布中采样,然后通过采样样本的权重来估计待估计分布的性质,数学表达式如式 $\text{(8.10)}$ 所示。
+
+$$
+\tag{8.10}
+\begin{aligned}
+\mathbb{E}_{x \sim p(x)}[f(x)] &= \int f(x) p(x) dx \\
+&= \int f(x) \frac{p(x)}{q(x)} q(x) dx \\
+&= \int f(x) \frac{p(x)}{q(x)} \frac{q(x)}{p(x)} p(x) dx \\
+&= \mathbb{E}_{x \sim q(x)}\left[\frac{p(x)}{q(x)} f(x)\right]
+\end{aligned}
+$$
+
+$\qquad$ 其中 $p(x)$ 是待估计分布,$q(x)$ 是采样分布,$f(x)$ 是待估计分布的性质。在前面我们讲到,每次计算的 $\text{TD}$ 误差是对应之前的网络,而不是当前待更新的网络。也就是说,我们已经从之前的网络中采样了一批样本,也就是 $q(x)$ 已知,然后只要找到之前网络分布与当前网络分布之前的权重 $\frac{p(x)}{q(x)}$,就可以利用重要性采样来估计出当前网络的性质。我们可以定义权重为式 $\text{(8.11)}$ 。
+
+$$
+\tag{8.11}
+w_i = \left(\frac{1}{N} \frac{1}{P(i)}\right)^\beta
+$$
+
+其中,$N$ 是经验回放中的样本数量,$P(i)$ 是样本 $i$ 的采样概率。同时,为了避免出现权重过大或过小的情况,我们可以对权重进行归一化处理,如式 $\text{(8.12)}$ 所示。
+
+$$
+\tag{8.12}
+w_i = \frac{\left(N*P(i)\right)^{-\beta}}{\max_j (w_j)}
+$$
+
+$\qquad$ 注意到,我们引入了一个超参数 $\beta$,用于调节重要性采样的程度,这个技术叫做 **热偏置**。当 $\beta = 0$ 时,重要性采样的权重为 1,即不考虑重要性采样;当 $\beta = 1$ 时,重要性采样的权重为 $w_i$,即完全考虑重要性采样。在实践中,我们希望 $\beta$ 从 0 开始,随着训练步数的增加而逐渐增加,以便更好地利用重要性采样,这就是热偏置(Annealing The Bias)的思想。数学表达式如式 $\text{(8.13)}$ 所示。
+
+$$
+\tag{8.13}
+\beta = \min(1, \beta + \beta_{\text{step}})
+$$
+
+$\qquad$ 其中,$\beta_{\text{step}}$ 是每个训练步数对应的 $\beta$ 的增量。在实践中,我们可以将 $\beta_{\text{step}}$ 设置为一个很小的常数,如 $0.0001$。这样一来,我们就可以在训练刚开始时,使用随机优先级采样,以便更快地收敛;在训练后期,使用重要性采样,以便更好地利用经验回放中的样本。
+
+结合上面的优先级采样和重要性采样,我们可以基于 SumTree 实现一个带有优先级的经验回放,代码如下:
+
+
+
+
+
+## C51 算法
+
+分布式 DQN 算法,即 Distributed DQN,有时也会看到它也叫 Categorical DQN 这个名字,但最常见的名字是 C51 算法⑤ 。该算法跟 PER 算法一样,是从不同的角度改进强化学习算法,而不单单指 DQN 算法,而是能适用于任何基于 Q-learning 的强化学习算法。该算法的核心思想是将传统 DQN 算法中的值函数 $Q(s,a)$ 换成了值分布 $Z(x,a)$,即将值函数的输出从一个数值变成了一个分布,这样就能更好地处理值函数估计不准确以及离散动作空间的问题。
+
+> ⑤ 论文链接:https://arxiv.org/abs/1707.06887
+
+在之前讲到的经典强化学习算法中我们优化的其实是值分布的均值,也就是 $Q$ 函数,但实际上由于状态转移的随机性、函数近似等原因,智能体与环境之间也存在着随机性,这也导致了最终累积的回报也会是一个随机变量,使用一个确定的均值会忽略整个分布所提供的信息。。因此,我们可以将值函数 $Q$ 看成是一个随机变量,它的期望值就是 $Q$ 函数,而它的方差就是 $Q$ 函数的不确定性,公式表示如下:
+
+$$
+Q^\pi(x, a):=\mathbb{E} Z^\pi(x, a)=\mathbb{E}\left[\sum_{t=0}^{\infty} \gamma^t R\left(x_t, a_t\right)\right]
+$$
+
+其中状态分布 $x_t \sim P\left(\cdot \mid x_{t-1}, a_{t-1}\right), a_t \sim \pi\left(\cdot \mid x_t\right), x_0=x, a_0=a \text {. }$
+
+## Rainbow DQN 算法
+
+## 实战:Double DQN 算法
+
+$\qquad$ 由于本章都是基于 $\text{DQN}$ 改进的算法,整体训练方式跟 $\text{DQN}$ 是一样的,也就是说伪代码基本都是一致的,因此不再赘述,只讲算法的改进部分。而 $\text{Double DQN}$ 算法跟 $\text{DQN}$ 算法的区别在于目标值的计算方式,如代码清单 $\text{8-1}$ 所示。
+
+
+ 代码清单 $\text{8-1}$ $\text{Double DQN}$目标值的计算
+
+
+```python
+# 计算当前网络的Q值,即Q(s_t+1|a)
+next_q_value_batch = self.policy_net(next_state_batch)
+# 计算目标网络的Q值,即Q'(s_t+1|a)
+next_target_value_batch = self.target_net(next_state_batch)
+# 计算 Q'(s_t+1|a=argmax Q(s_t+1|a))
+next_target_q_value_batch = next_target_value_batch.gather(1, torch.max(next_q_value_batch, 1)[1].unsqueeze(1))
+```
+
+$\qquad$ 最后与 $\text{DQN}$ 算法相同,可以得到 $\text{Double DQN}$ 算法在 $\text{CartPole}$ 环境下的训练结果,如图 $\text{8-5}$ 所示,完整的代码可以参考本书的代码仓库。
+
+
+
+
+图 $\text{8-5}$ $\text{CartPole}$ 环境 $\text{Double DQN}$ 算法训练曲线
+
+$\qquad$ 与 $\text{DQN}$ 算法的训练曲线对比可以看出,在实践上 $\text{Double DQN}$ 算法的效果并不一定比 $\text{DQN}$ 算法好,比如在这个环境下其收敛速度反而更慢了,因此读者需要多多实践才能摸索并体会到这些算法适合的场景。
+
+## 实战:Dueling DQN 算法
+
+$\qquad$ $\text{Dueling DQN}$ 算法主要是改了网络结构,其他地方跟 $\text{DQN}$ 是一模一样的,如代码清单 $\text{8-2}$ 所示。
+
+
+ 代码清单 $\text{8-2}$ $\text{Dueling DQN}$ 网络结构
+
+
+```python
+class DuelingQNetwork(nn.Module):
+ def __init__(self, state_dim, action_dim,hidden_dim=128):
+ super(DuelingQNetwork, self).__init__()
+ # 隐藏层
+ self.hidden_layer = nn.Sequential(
+ nn.Linear(state_dim, hidden_dim),
+ nn.ReLU()
+ )
+ # 优势层
+ self.advantage_layer = nn.Sequential(
+ nn.Linear(hidden_dim, hidden_dim),
+ nn.ReLU(),
+ nn.Linear(hidden_dim, action_dim)
+ )
+ # 价值层
+ self.value_layer = nn.Sequential(
+ nn.Linear(hidden_dim, hidden_dim),
+ nn.ReLU(),
+ nn.Linear(hidden_dim, 1)
+ )
+
+ def forward(self, state):
+ x = self.hidden_layer(state)
+ advantage = self.advantage_layer(x)
+ value = self.value_layer(x)
+ return value + advantage - advantage.mean() # Q(s,a) = V(s) + A(s,a) - mean(A(s,a))
+```
+
+$\qquad$ 最后我们展示一下它在 $\text{CartPole}$ 环境下的训练结果,如图 $\text{8-6}$ 所示,完整的代码同样可以参考本书的代码仓库。
+
+
+
+
+图 $\text{8-5}$ $\text{CartPole}$ 环境 $\text{Dueling DQN}$ 算法训练曲线
+
+$\qquad$ 由于环境比较简单,暂时还看不出来 $\text{Dueling DQN}$ 算法的优势,但是在复杂的环境下,比如 $\text{Atari}$ 游戏中,$\text{Dueling DQN}$ 算法的效果就会比 $\text{DQN}$ 算法好很多,读者可以在 $\text{JoyRL}$ 仓库中找到更复杂环境下的训练结果便于更好地进行对比。
+
+## 实战:Noisy DQN 算法
+
+$\qquad$ $\text{Noisy DQN}$ 算法的核心思想是将 $\text{DQN}$ 算法中的线性层替换成带有噪声的线性层,如代码清单 $\text{8-3}$ 所示。
+
+
+ 代码清单 $\text{8-3}$ 带有噪声的线性层网络
+
```python
class NoisyLinear(nn.Module):
'''在Noisy DQN中用NoisyLinear层替换普通的nn.Linear层
'''
- def __init__(self, in_dim, out_dim, std_init=0.4):
+ def __init__(self, input_dim, output_dim, std_init=0.4):
super(NoisyLinear, self).__init__()
-
- self.in_dim = in_dim
- self.out_dim = out_dim
+ self.input_dim = input_dim
+ self.output_dim = output_dim
self.std_init = std_init
-
- self.weight_mu = nn.Parameter(torch.empty(out_dim, in_dim))
- self.weight_sigma = nn.Parameter(torch.empty(out_dim, in_dim))
+ self.weight_mu = nn.Parameter(torch.empty(output_dim, input_dim))
+ self.weight_sigma = nn.Parameter(torch.empty(output_dim, input_dim))
# 将一个 tensor 注册成 buffer,使得这个 tensor 不被当做模型参数进行优化。
- self.register_buffer('weight_epsilon', torch.empty(out_dim, in_dim))
+ self.register_buffer('weight_epsilon', torch.empty(output_dim, input_dim))
- self.bias_mu = nn.Parameter(torch.empty(out_dim))
- self.bias_sigma = nn.Parameter(torch.empty(out_dim))
- self.register_buffer('bias_epsilon', torch.empty(out_dim))
+ self.bias_mu = nn.Parameter(torch.empty(output_dim))
+ self.bias_sigma = nn.Parameter(torch.empty(output_dim))
+ self.register_buffer('bias_epsilon', torch.empty(output_dim))
self.reset_parameters() # 初始化参数
self.reset_noise() # 重置噪声
@@ -128,17 +312,17 @@ class NoisyLinear(nn.Module):
return F.linear(x, weight, bias)
def reset_parameters(self):
- mu_range = 1 / self.in_dim ** 0.5
+ mu_range = 1 / self.input_dim ** 0.5
self.weight_mu.data.uniform_(-mu_range, mu_range)
- self.weight_sigma.data.fill_(self.std_init / self.in_dim ** 0.5)
+ self.weight_sigma.data.fill_(self.std_init / self.input_dim ** 0.5)
self.bias_mu.data.uniform_(-mu_range, mu_range)
- self.bias_sigma.data.fill_(self.std_init / self.out_dim ** 0.5)
+ self.bias_sigma.data.fill_(self.std_init / self.output_dim ** 0.5)
def reset_noise(self):
- epsilon_in = self._scale_noise(self.in_dim)
- epsilon_out = self._scale_noise(self.out_dim)
+ epsilon_in = self._scale_noise(self.input_dim)
+ epsilon_out = self._scale_noise(self.output_dim)
self.weight_epsilon.copy_(epsilon_out.ger(epsilon_in))
- self.bias_epsilon.copy_(self._scale_noise(self.out_dim))
+ self.bias_epsilon.copy_(self._scale_noise(self.output_dim))
def _scale_noise(self, size):
x = torch.randn(size)
@@ -146,7 +330,11 @@ class NoisyLinear(nn.Module):
return x
```
-根据写好的 NoisyLinear 层,我们可以在 DQN 算法中将普通的线性层替换为 NoisyLinear 层,如下:
+$\qquad$ 根据写好的 $\text{NoisyLinear}$ 层,我们可以在 $\text{DQN}$ 算法中将普通的线性层替换为 $\text{NoisyLinear}$ 层,如代码清单 $\text{8-4}$ 所示。
+
+
+ 代码清单 $\text{8-4}$ 带噪声层的全连接网络
+
```python
class NoisyQNetwork(nn.Module):
@@ -167,7 +355,11 @@ class NoisyQNetwork(nn.Module):
self.noisy_fc3.reset_noise()
```
-注意在训练过程中,我们需要在每个 mini-batch 中重置噪声,更多细节请参考 JoyRL 源码。另外,我们可以直接利用 torchrl 模块中中封装好的 NoisyLinear 层来构建 Noisy Q 网络,跟我们自己定义的功能是一样的,如下:
+$\qquad$ 注意在训练过程中,我们需要在每次更新后重置噪声,这样有助于提高训练的稳定性,更多细节请参考 $\text{JoyRL}$ 源码。另外,我们也可以直接利用 $\text{torchrl}$ 模块中中封装好的 $\text{NoisyLinear}$ 层来构建 $\text{Noisy Q}$ 网络,跟我们自己定义的功能是一样的,如代码清单 $\text{8-5}$ 所示。
+
+
+ 代码清单 $\text{8-5}$ 使用 $\text{torchrl}$ 模块构造的 $\text{Noisy Q}$ 网络
+
```python
import torchrl
@@ -189,26 +381,32 @@ class NoisyQNetwork(nn.Module):
self.noisy_fc3.reset_noise()
```
-## PER DQN 算法
+$\qquad$ 同样我们展示一下它在 $\text{CartPole}$ 环境下的训练结果,如图 $\text{8-6}$ 所示。
-在 DQN 算法章节中我们讲到经验回放,从另一个角度来说也是为了优化深度网络中梯度下降的方式,或者说网络参数更新的方式。在本节要讲的 PER DQN 算法④中,进一步优化了经验回放的设计从而提高模型的收敛能力和鲁棒性。PER 英文全称为 Prioritized Experience Replay,即优先经验回放,跟数据结构中优先队列与普通队列一样,会在采样过程中赋予经验回放中样本的优先级。
+
+
+
+图 $\text{8-6}$ $\text{CartPole}$ 环境 $\text{Noisy DQN}$ 算法训练曲线
+
+## 实战:PER DQN 算法
-> ④ 论文链接:https://arxiv.org/abs/1511.05952
+### 伪代码
-具体以什么为依据来为经验回放中的样本赋予不同优先级呢?答案是TD误差。TD误差最开始我们是在时序差分方法的提到的,广义的定义是值函数(包括状态价值函数和动作价值函数)的估计值与实际值之差,在 DQN 算法中就是目标网络计算的 $Q$ 值和策略网络(当前网络)计算的 $Q$ 值之差,也就是 DQN 网络中损失函数的主要构成部分。我们每次从经验回访中取出一个批量的样本,进而计算的TD误差一般是不同的,对于 DQN 网络反向传播的作用也是不同的。**TD误差越大,损失函数的值也越大,对于反向传播的作用也就越大。** 这样一来如果TD误差较大的样本更容易被采到的话,那么我们的算法也会更加容易收敛。因此我们只需要设计一个经验回放,根据经验回放中的每个样本计算出的TD误差赋予对应的优先级,然后在采样的时候取出优先级较大的样本。
+$\qquad$ $\text{PER DQN}$ 算法的核心看起来简单,就是把普通的经验回放改进成了优先级经验回放,但是实现起来却比较复杂,因为我们需要实现一个 $\text{SumTree}$ 结构,并且在模型更新的时候也需要一些额外的操作,因此我们先从伪代码开始,如图 $\text{8-7}$ 所示。
-原理听上去比较简单,但具体如何实现呢?在实践中,我们通常用 SumTree 这样的二叉树结构来实现。这里建议没有了解过数据结构或者二叉树的读者先花个十几分钟的时间快速了解一下二叉树的基本概念,比如根节点、叶节点、父节点与子节点等等。
-
+
-图 8.4 SumTree 结构
+图 $\text{8-7}$ $\text{PER DQN}$ 伪代码
-如图 8.4 所示,每个父节点的值等于左右两个子节点值之和。在强化学习中,所有的样本只保存在最下面的叶子节点中,并且除了保存样本数据之外,还会保存对应的优先级,即对应叶子节点中的值(例如图中的31、13、14以及8等,也对应样本的 TD 误差)。并且根据叶子节点的值,我们从 $0$ 开始依次划分采样区间。然后在采样中,例如这里根节点值为 66,那么我们就可以在$[0,66)$这个区间均匀采样,采样到的值落在哪个区间中,就说明对应的样本就是我们要采样的样本。例如我们采样到了 $25$ 这个值,即对应区间 $[0,31)$,那么我们就采样到了第一个叶子节点对应的样本。注意到,第一个样本对应的区间也是最长的,这意味着第一个样本的优先级最高,也就是 TD 误差最大,反之第四个样本的区间最短,优先级也最低。这样一来,我们就可以通过采样来实现优先经验回放的功能。
+### SumTree 结构
-每个叶节点的值就是对应样本的 TD 误差(例如途中的)。我们可以通过根节点的值来计算出每个样本的 TD 误差占所有样本 TD 误差的比例,这样就可以根据比例来采样样本。在实际的实现中,我们可以将每个叶节点的值设置为一个元组,其中包含样本的 TD 误差和样本的索引,这样就可以通过索引来找到对应的样本。
+$\qquad$ 如代码清单 $\text{8-6}$ 所示,我们可以先实现 $\text{SumTree}$ 结构。
-基于以上原理,我们可以新建一个 Python 类来实现 SumTree 结构,代码如下:
+
+ 代码清单 $\text{8-6}$ $\text{SumTree}$ 结构
+
```python
class SumTree:
@@ -270,62 +468,15 @@ class SumTree:
'''
return np.max(self.tree[self.capacity-1:self.capacity+self.write_idx-1])
```
-其中,除了需要存放各个节点的值(`tree`)之外,我们需要定义要给`data`来存放叶子节点的样本。此外,`add` 函数用于添加一个样本到叶子节点,并更新其父节点的优先级;`update` 函数用于更新叶子节点的优先级,并更新其父节点的优先级;`get_leaf` 函数用于根据优先级的值采样对应区间的叶子节点样本;`get_data` 函数用于根据索引获取对应的样本;`total` 函数用于返回所有样本的优先级之和,即根节点的值;`max_prior` 函数用于返回所有样本的最大优先级。
+$\qquad$ 其中,除了需要存放各个节点的值($\text{tree}$)之外,我们需要定义要给 $\text{data}$ 来存放叶子节点的样本。此外,$\text{add}$ 函数用于添加一个样本到叶子节点,并更新其父节点的优先级;$\text{update}$ 函数用于更新叶子节点的优先级,并更新其父节点的优先级;$\text{get_leaf}$ 函数用于根据优先级的值采样对应区间的叶子节点样本;$\text{get_data}$ 函数用于根据索引获取对应的样本;$\text{total}$ 函数用于返回所有样本的优先级之和,即根节点的值;$\text{max_prior}$ 函数用于返回所有样本的最大优先级。
-基于以上的 SumTree 结构,我们可以实现优先经验回放的功能。然而,论文原作者认为,直接使用 TD 误差作为优先级存在一些问题。首先,考虑到算法效率问题,我们在每次更新时不会把经验回放中的所有样本都计算 TD 误差并更新对应的优先级,而是只更新当前取到的一定批量的样本。这样一来,每次计算的 TD 误差是对应之前的网络,而不是当前待更新的网络。换句话说,如果某批量样本的 TD 误差较低,只能说明它们对于之前的网络来说“信息量”不大,但不能说明对当前的网络“信息量”不大,因此单纯根据 TD 误差进行优先采样有可能会错过对当前网络“信息量”更大的样本。其次,被选中样本的 TD 误差会在当前更新后下降,然后优先级会排到后面去,下次这些样本就不会被选中,这样来来回回都是那几个样本,很容易出现“旱的旱死,涝的涝死”的情况,导致过拟合。
+### 优先级经验回放
-**随机优先级采样**。为了解决上面提到的两个问题,我们首先引入随机优先级采样(Stochastic Prioritization)的技巧。即在每次更新时,我们不再是直接采样 TD 误差最大的样本,而是定义一个采样概率,如下:
+$\qquad$ 基于 $\text{SumTree}$ 结构,并结合优先级采样和重要性采样的技巧,如代码清单 $\text{8-7}$ 所示。
-$$
-P(i) = \frac{p_i^\alpha}{\sum_k p_k^\alpha}
-$$
-
-其中,$p_i$ 是样本 $i$ 的优先级,$\alpha$ 是一个超参数,用于调节优先采样的程序,通常在 $(0,1)$ 的区间内。当 $\alpha=0$ 时,采样概率为均匀分布;当 $\alpha =1$ 时,采样概率为优先级的线性分布。同时,即使对于最低优先级的样本,我们也不希望它们的采样概率为 $0$ ,因此我们可以在优先级上加上一个常数 $\epsilon$,即:
-
-$$
-p_i = |\delta_i| + \epsilon
-$$
-
-其中,$|\delta_i|$ 是样本 $i$ 的 TD 误差。当然,我们也可以使用其他的优先级计算方式,如:
-
-$$
-p_i = \frac{1}{rank(i)}
-$$
-
-其中,$rank(i)$ 是样本 $i$ 的优先级排名,这种方式也能保证每个样本的采样概率都不为 0,但在实践中,我们更倾向于直接增加一个常数 $\epsilon$ 的方式。
-
-**重要性采样**。除了随机优先级采样之外,我们还引入了另外一个技巧,在讲解该技巧之前,我们需要简单了解一下重要性采样。重要性采样(Importance Sampling)是一种用于估计某一分布性质的方法,它的基本思想是,我们可以通过与待估计分布不同的另一个分布中采样,然后通过采样样本的权重来估计待估计分布的性质,数学表达式如下:
-
-$$
-\begin{aligned}
-\mathbb{E}_{x \sim p(x)}[f(x)] &= \int f(x) p(x) dx \\
-&= \int f(x) \frac{p(x)}{q(x)} q(x) dx \\
-&= \int f(x) \frac{p(x)}{q(x)} \frac{q(x)}{p(x)} p(x) dx \\
-&= \mathbb{E}_{x \sim q(x)}\left[\frac{p(x)}{q(x)} f(x)\right]
-\end{aligned}
-$$
-
-其中,$p(x)$ 是待估计分布,$q(x)$ 是采样分布,$f(x)$ 是待估计分布的性质。在前面我们讲到,每次计算的 TD 误差是对应之前的网络,而不是当前待更新的网络。也就是说,我们已经从之前的网络中采样了一批样本,也就是 $q(x)$ 已知,然后只要找到之前网络分布与当前网络分布之前的权重 $\frac{p(x)}{q(x)}$,就可以利用重要性采样来估计出当前网络的性质。我们可以定义权重为:
-
-$$
-w_i = \left(\frac{1}{N} \frac{1}{P(i)}\right)^\beta
-$$
-
-其中,$N$ 是经验回放中的样本数量,$P(i)$ 是样本 $i$ 的采样概率。同时,为了避免出现权重过大或过小的情况,我们可以对权重进行归一化处理:
-
-$$
-w_i = \frac{\left(N*P(i)\right)^{-\beta}}{\max_j (w_j)}
-$$
-
-**热偏置**。注意到,我们引入了一个超参数 $\beta$,用于调节重要性采样的程度。当 $\beta = 0$ 时,重要性采样的权重为 1,即不考虑重要性采样;当 $\beta = 1$ 时,重要性采样的权重为 $w_i$,即完全考虑重要性采样。在实践中,我们希望 $\beta$ 从 0 开始,随着训练步数的增加而逐渐增加,以便更好地利用重要性采样,这就是热偏置(Annealing The Bias)的思想。数学表达式如下:
-
-$$
-\beta = \min(1, \beta + \beta_{\text{step}})
-$$
-
-其中,$\beta_{\text{step}}$ 是每个训练步数对应的 $\beta$ 的增量。在实践中,我们可以将 $\beta_{\text{step}}$ 设置为一个很小的常数,如 $0.0001$。这样一来,我们就可以在训练刚开始时,使用随机优先级采样,以便更快地收敛;在训练后期,使用重要性采样,以便更好地利用经验回放中的样本。
-
-结合上面的优先级采样和重要性采样,我们可以基于 SumTree 实现一个带有优先级的经验回放,代码如下:
+
+ 代码清单 $\text{8-7}$ 优先级经验回放结构
+
```python
class PrioritizedReplayBuffer:
@@ -380,7 +531,11 @@ class PrioritizedReplayBuffer:
return self.tree.count
```
-我们可以看到,优先级经验回放的核心是 SumTree,它可以在 $O(\log N)$ 的时间复杂度内完成添加、更新和采样操作。在实践中,我们可以将经验回放的容量设置为 $10^6$,并将 $\alpha$ 设置为 $0.6$,$\epsilon$ 设置为 $0.01$,$\beta$ 设置为 $0.4$,$\beta_{\text{step}}$ 设置为 $0.0001$。 当然我们也可以利用 Python 队列的方式实现优先级经验回放,形式上会更加简洁,并且在采样的时候减少了 for 循环的操作,会更加高效,如下:
+$\qquad$ 我们可以看到,优先级经验回放的核心是 SumTree,它可以在 $O(\log N)$ 的时间复杂度内完成添加、更新和采样操作。在实践中,我们可以将经验回放的容量设置为 $10^6$,并将 $\alpha$ 设置为 $0.6$,$\epsilon$ 设置为 $0.01$,$\beta$ 设置为 $0.4$,$\beta_{\text{step}}$ 设置为 $0.0001$。 当然我们也可以利用 Python 队列的方式实现优先级经验回放,形式上会更加简洁,并且在采样的时候减少了 for 循环的操作,会更加高效,如代码清单 $\text{8-8}$ 所示。
+
+
+ 代码清单 $\text{8-7}$ 基于队列实现优先级经验回放
+
```python
class PrioritizedReplayBufferQue:
@@ -415,100 +570,9 @@ class PrioritizedReplayBufferQue:
def __len__(self):
return self.count
```
-最后,我们可以将优先级经验回放和 DQN 结合起来,实现一个带有优先级的 DQN 算法,伪代码如下:
-
-
-
-
-图 8.4 SumTree 结构
-
-## C51 算法
-
-分布式 DQN 算法,即 Distributed DQN,有时也会看到它也叫 Categorical DQN 这个名字,但最常见的名字是 C51 算法⑤ 。该算法跟 PER 算法一样,是从不同的角度改进强化学习算法,而不单单指 DQN 算法,而是能适用于任何基于 Q-learning 的强化学习算法。该算法的核心思想是将传统 DQN 算法中的值函数 $Q(s,a)$ 换成了值分布 $Z(x,a)$,即将值函数的输出从一个数值变成了一个分布,这样就能更好地处理值函数估计不准确以及离散动作空间的问题。
-
-> ⑤ 论文链接:https://arxiv.org/abs/1707.06887
-
-在之前讲到的经典强化学习算法中我们优化的其实是值分布的均值,也就是 $Q$ 函数,但实际上由于状态转移的随机性、函数近似等原因,智能体与环境之间也存在着随机性,这也导致了最终累积的回报也会是一个随机变量,使用一个确定的均值会忽略整个分布所提供的信息。。因此,我们可以将值函数 $Q$ 看成是一个随机变量,它的期望值就是 $Q$ 函数,而它的方差就是 $Q$ 函数的不确定性,公式表示如下:
-
-$$
-Q^\pi(x, a):=\mathbb{E} Z^\pi(x, a)=\mathbb{E}\left[\sum_{t=0}^{\infty} \gamma^t R\left(x_t, a_t\right)\right]
-$$
-
-其中状态分布 $x_t \sim P\left(\cdot \mid x_{t-1}, a_{t-1}\right), a_t \sim \pi\left(\cdot \mid x_t\right), x_0=x, a_0=a \text {. }$
-
-## Rainbow DQN 算法
-
-## 实战:Double DQN 算法
-
-$\qquad$ 由于本章都是基于 $\text{DQN}$ 改进的算法,整体训练方式跟 $\text{DQN}$ 是一样的,也就是说伪代码基本都是一致的,因此不再赘述,只讲算法的改进部分。而 $\text{Double DQN}$ 算法跟 $\text{DQN}$ 算法的区别在于目标值的计算方式,如代码清单 $\text{8-1}$ 所示。
-
-
- 代码清单 $\text{8-1}$ $\text{Double DQN}$目标值的计算
-
-
-```python
-# 计算当前网络的Q值,即Q(s_t+1|a)
-next_q_value_batch = self.policy_net(next_state_batch)
-# 计算目标网络的Q值,即Q'(s_t+1|a)
-next_target_value_batch = self.target_net(next_state_batch)
-# 计算 Q'(s_t+1|a=argmax Q(s_t+1|a))
-next_target_q_value_batch = next_target_value_batch.gather(1, torch.max(next_q_value_batch, 1)[1].unsqueeze(1))
-```
-
-$\qquad$ 最后与 $\text{DQN}$ 算法相同,可以得到 $\text{Double DQN}$ 算法在 $\text{CartPole}$ 环境下的训练结果,如图 $\text{8-5}$ 所示,完整的代码可以参考本书的代码仓库。
+最后,我们可以将优先级经验回放和 DQN 结合起来,实现一个带有优先级的 DQN 算法,并展示它在 $\text{CartPole}$ 环境下的训练结果,如图 $\text{8-8}$ 所示。
-
+
-图 $\text{8-5}$ $\text{CartPole}$ 环境 $\text{Double DQN}$ 算法训练曲线
-
-$\qquad$ 与 $\text{DQN}$ 算法的训练曲线对比可以看出,在实践上 $\text{Double DQN}$ 算法的效果并不一定比 $\text{DQN}$ 算法好,比如在这个环境下其收敛速度反而更慢了,因此读者需要多多实践才能摸索并体会到这些算法适合的场景。
-
-## 实战:Dueling DQN 算法
-
-$\qquad$ $\text{Dueling DQN}$ 算法主要是改了网络结构,其他地方跟 $\text{DQN}$ 是一模一样的,如代码清单 $\text{8-2}$ 所示。
-
-
- 代码清单 $\text{8-2}$ $\text{Dueling DQN}$ 网络结构
-
-
-```python
-class DuelingQNetwork(nn.Module):
- def __init__(self, state_dim, action_dim,hidden_dim=128):
- super(DuelingQNetwork, self).__init__()
- # 隐藏层
- self.hidden_layer = nn.Sequential(
- nn.Linear(state_dim, hidden_dim),
- nn.ReLU()
- )
- # 优势层
- self.advantage_layer = nn.Sequential(
- nn.Linear(hidden_dim, hidden_dim),
- nn.ReLU(),
- nn.Linear(hidden_dim, action_dim)
- )
- # 价值层
- self.value_layer = nn.Sequential(
- nn.Linear(hidden_dim, hidden_dim),
- nn.ReLU(),
- nn.Linear(hidden_dim, 1)
- )
-
- def forward(self, state):
- x = self.hidden_layer(state)
- advantage = self.advantage_layer(x)
- value = self.value_layer(x)
- return value + advantage - advantage.mean() # Q(s,a) = V(s) + A(s,a) - mean(A(s,a))
-```
-
-$\qquad$ 最后我们展示一下它在 $\text{CartPole}$ 环境下的训练结果,如图 $\text{8-6}$ 所示,完整的代码同样可以参考本书的代码仓库。
-
-
-
-
-图 $\text{8-5}$ $\text{CartPole}$ 环境 $\text{Dueling DQN}$ 算法训练曲线
-
-$\qquad$ 由于环境比较简单,暂时还看不出来 $\text{Dueling DQN}$ 算法的优势,但是在复杂的环境下,比如 $\text{Atari}$ 游戏中,$\text{Dueling DQN}$ 算法的效果就会比 $\text{DQN}$ 算法好很多,读者可以在 $\text{JoyRL}$ 仓库中找到更复杂环境下的训练结果便于更好地进行对比。
-
-
-
+图 $\text{8-8}$ $\text{CartPole}$ 环境 $\text{PER DQN}$ 算法训练曲线
\ No newline at end of file
diff --git a/docs/figs/ch8/NoisyDQN_CartPole-v1_training_curve.png b/docs/figs/ch8/NoisyDQN_CartPole-v1_training_curve.png
new file mode 100644
index 0000000..b0a61e8
Binary files /dev/null and b/docs/figs/ch8/NoisyDQN_CartPole-v1_training_curve.png differ
diff --git a/docs/figs/ch8/PERDQN_CartPole-v1_training_curve.png b/docs/figs/ch8/PERDQN_CartPole-v1_training_curve.png
new file mode 100644
index 0000000..27f596b
Binary files /dev/null and b/docs/figs/ch8/PERDQN_CartPole-v1_training_curve.png differ
diff --git a/docs/figs/ch8/per_dqn_pseu.png b/docs/figs/ch8/per_dqn_pseu.png
index 216daf7..056cf91 100644
Binary files a/docs/figs/ch8/per_dqn_pseu.png and b/docs/figs/ch8/per_dqn_pseu.png differ
diff --git a/notebooks/NoisyDQN.ipynb b/notebooks/NoisyDQN.ipynb
new file mode 100644
index 0000000..69c32f5
--- /dev/null
+++ b/notebooks/NoisyDQN.ipynb
@@ -0,0 +1,582 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 1. 定义算法\n",
+ "\n",
+ "NoisyDQN 是在 DQN 的基础上进行改进,主要就是通过在训练网络的时候加上一些噪声参数,可以用较小的额外计算成本,在强化学习算法上获得更优的结果。\n",
+ "配置和 DQN 基本一致,只是在模型定义的时候,在模型中加入了一些噪声参数。"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### 1.1、 定义模型\n",
+ "这里使用了一个三层的MLP,不同的是其中加入了一些噪声参数,就是每个权值weight和偏置bias中都有额外的参数mu和sigma,这里仅供参考。"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 56,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import math\n",
+ "import torch\n",
+ "import torch.nn as nn\n",
+ "import torch.nn.functional as F\n",
+ "class NoisyLinear(nn.Module):\n",
+ " def __init__(self, input_dim, output_dim, std_init=0.4):\n",
+ " super(NoisyLinear, self).__init__()\n",
+ " \n",
+ " self.input_dim = input_dim\n",
+ " self.output_dim = output_dim\n",
+ " self.std_init = std_init\n",
+ " \n",
+ " self.weight_mu = nn.Parameter(torch.FloatTensor(output_dim, input_dim))\n",
+ " self.weight_sigma = nn.Parameter(torch.FloatTensor(output_dim, input_dim))\n",
+ " self.register_buffer('weight_epsilon', torch.FloatTensor(output_dim, input_dim))\n",
+ " \n",
+ " self.bias_mu = nn.Parameter(torch.FloatTensor(output_dim))\n",
+ " self.bias_sigma = nn.Parameter(torch.FloatTensor(output_dim))\n",
+ " self.register_buffer('bias_epsilon', torch.FloatTensor(output_dim))\n",
+ " \n",
+ " self.reset_parameters()\n",
+ " self.reset_noise()\n",
+ " \n",
+ " def forward(self, x):\n",
+ " if self.training: \n",
+ " weight = self.weight_mu + self.weight_sigma.mul(torch.tensor(self.weight_epsilon))\n",
+ " bias = self.bias_mu + self.bias_sigma.mul(torch.tensor(self.bias_epsilon))\n",
+ " else:\n",
+ " weight = self.weight_mu\n",
+ " bias = self.bias_mu\n",
+ " \n",
+ " return F.linear(x, weight, bias)\n",
+ " \n",
+ " def reset_parameters(self):\n",
+ " mu_range = 1 / math.sqrt(self.weight_mu.size(1))\n",
+ " \n",
+ " self.weight_mu.data.uniform_(-mu_range, mu_range)\n",
+ " self.weight_sigma.data.fill_(self.std_init / math.sqrt(self.weight_sigma.size(1)))\n",
+ " \n",
+ " self.bias_mu.data.uniform_(-mu_range, mu_range)\n",
+ " self.bias_sigma.data.fill_(self.std_init / math.sqrt(self.bias_sigma.size(0)))\n",
+ " \n",
+ " def reset_noise(self):\n",
+ " epsilon_in = self._scale_noise(self.input_dim)\n",
+ " epsilon_out = self._scale_noise(self.output_dim)\n",
+ " \n",
+ " self.weight_epsilon.copy_(epsilon_out.ger(epsilon_in))\n",
+ " self.bias_epsilon.copy_(self._scale_noise(self.output_dim))\n",
+ " \n",
+ " def _scale_noise(self, size):\n",
+ " x = torch.randn(size)\n",
+ " x = x.sign().mul(x.abs().sqrt())\n",
+ " return x\n",
+ "\n",
+ "class NoisyMLP(nn.Module):\n",
+ " def __init__(self, input_dim,output_dim,hidden_dim=128):\n",
+ " super(NoisyMLP, self).__init__()\n",
+ " self.fc1 = nn.Linear(input_dim, hidden_dim)\n",
+ " self.noisy_fc2 = NoisyLinear(hidden_dim, hidden_dim)\n",
+ " self.noisy_fc3 = NoisyLinear(hidden_dim, output_dim)\n",
+ " \n",
+ " def forward(self, x):\n",
+ " x = F.relu(self.fc1(x))\n",
+ " x = F.relu(self.noisy_fc2(x))\n",
+ " x = self.noisy_fc3(x)\n",
+ " return x\n",
+ "\n",
+ " def reset_noise(self):\n",
+ " self.noisy_fc2.reset_noise()\n",
+ " self.noisy_fc3.reset_noise()\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### 1.2、定义经验回放\n",
+ "\n",
+ "这里的经验回放和DQN中保持一致,也是具有一定容量,只有存储到一定的transition网络才会更新。经验回放的时候一般包涵两个功能或方法,一个是push,即将一个transition样本按顺序放到经验回放中,如果满了就把最开始放进去的样本挤掉;另外一个是sample,就是随机采样出一个或者若干个(具体多少就是batch_size了)样本供DQN网络更新。"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 57,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from collections import deque\n",
+ "import random\n",
+ "class ReplayBuffer(object):\n",
+ " def __init__(self, capacity: int) -> None:\n",
+ " self.capacity = capacity\n",
+ " self.buffer = deque(maxlen=self.capacity)\n",
+ " def push(self,transitions):\n",
+ " ''' 存储transition到经验回放中\n",
+ " '''\n",
+ " self.buffer.append(transitions)\n",
+ " def sample(self, batch_size: int, sequential: bool = False):\n",
+ " if batch_size > len(self.buffer): # 如果批量大小大于经验回放的容量,则取经验回放的容量\n",
+ " batch_size = len(self.buffer)\n",
+ " if sequential: # 顺序采样\n",
+ " rand = random.randint(0, len(self.buffer) - batch_size)\n",
+ " batch = [self.buffer[i] for i in range(rand, rand + batch_size)]\n",
+ " return zip(*batch)\n",
+ " else: # 随机采样\n",
+ " batch = random.sample(self.buffer, batch_size)\n",
+ " return zip(*batch)\n",
+ " def clear(self):\n",
+ " ''' 清空经验回放\n",
+ " '''\n",
+ " self.buffer.clear()\n",
+ " def __len__(self):\n",
+ " ''' 返回当前存储的量\n",
+ " '''\n",
+ " return len(self.buffer)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### 1.3、模型算法定义\n",
+ "\n",
+ "这里根据前面的噪声MLP搭建智能体agent,其中的动作采样和模型更新和DQN基本一致,这里不再赘述。"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 58,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import torch\n",
+ "import torch.optim as optim\n",
+ "import math\n",
+ "import numpy as np\n",
+ "\n",
+ "class NoisyDQN:\n",
+ " def __init__(self, model, memory, cfg) -> None:\n",
+ " self.n_actions = cfg.n_actions \n",
+ " self.device = torch.device(cfg.device) \n",
+ " self.gamma = cfg.gamma \n",
+ " ## e-greedy策略相关参数\n",
+ " self.sample_count = 0 # 用于epsilon的衰减计数\n",
+ " self.epsilon = cfg.epsilon_start\n",
+ " self.epsilon_start = cfg.epsilon_start\n",
+ " self.epsilon_end = cfg.epsilon_end\n",
+ " self.epsilon_decay = cfg.epsilon_decay\n",
+ " self.batch_size = cfg.batch_size\n",
+ " self.target_update = cfg.target_update\n",
+ "\n",
+ " self.device = torch.device(cfg.device) \n",
+ "\n",
+ " self.policy_net = model.to(self.device)\n",
+ " self.target_net = model.to(self.device)\n",
+ " ## 复制参数到目标网络\n",
+ " for target_param, param in zip(self.target_net.parameters(),self.policy_net.parameters()): \n",
+ " target_param.data.copy_(param.data)\n",
+ " self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg.lr)\n",
+ " self.memory = memory ## 经验回放\n",
+ " self.update_flag = False\n",
+ " \n",
+ " def sample_action(self, state):\n",
+ " ''' sample action with e-greedy policy \n",
+ " '''\n",
+ " self.sample_count += 1\n",
+ " # epsilon 指数衰减\n",
+ " self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \\\n",
+ " math.exp(-1. * self.sample_count / self.epsilon_decay) \n",
+ " if random.random() > self.epsilon:\n",
+ " with torch.no_grad():\n",
+ " state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)\n",
+ " q_values = self.policy_net(state)\n",
+ " action = q_values.max(1)[1].item() # 根据Q值选择动作\n",
+ " else:\n",
+ " action = random.randrange(self.n_actions)\n",
+ " return action\n",
+ "\n",
+ " @torch.no_grad()\n",
+ " def predict_action(self, state):\n",
+ " state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)\n",
+ " q_value = self.policy_net(state)\n",
+ " action = q_value.max(1)[1].item()\n",
+ " return action\n",
+ " def update(self):\n",
+ " if len(self.memory) < self.batch_size: # 不满足一个批量时,不更新策略\n",
+ " return\n",
+ " else:\n",
+ " if not self.update_flag:\n",
+ " print(\"Begin to update!\")\n",
+ " self.update_flag = True\n",
+ " # beta = min(1.0, self.beta_start + self.sample_count * (1.0 - self.beta_start) / self.beta_frames)\n",
+ " state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(\n",
+ " self.batch_size)\n",
+ " # state_batch, action_batch, reward_batch, next_state_batch, done_batch, weights_batch, indices = self.memory.sample(self.batch_size, beta) \n",
+ " state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float) \n",
+ " action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)\n",
+ " reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float).unsqueeze(1)\n",
+ " next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float) # shape(batchsize,n_states)\n",
+ " done_batch = torch.tensor(done_batch, device=self.device, dtype=torch.float).unsqueeze(1)\n",
+ " # weights_batch = torch.tensor(weights_batch, device=self.device, dtype=torch.float)\n",
+ "\n",
+ " q_value_batch = self.policy_net(state_batch).gather(dim=1, index=action_batch) # shape(batchsize,1),requires_grad=True\n",
+ " next_max_q_value_batch = self.target_net(next_state_batch).max(1)[0].detach().unsqueeze(1) \n",
+ " expected_q_value_batch = reward_batch + self.gamma * next_max_q_value_batch* (1-done_batch)\n",
+ "\n",
+ " loss = nn.MSELoss()(q_value_batch, expected_q_value_batch) # shape same to \n",
+ " # 反向传播\n",
+ " self.optimizer.zero_grad() \n",
+ " loss.backward()\n",
+ " # 梯度截断,防止梯度爆炸\n",
+ " for param in self.policy_net.parameters(): \n",
+ " param.grad.data.clamp_(-1, 1)\n",
+ " self.optimizer.step() \n",
+ "\n",
+ " if self.sample_count % self.target_update == 0: # 更新 target_net\n",
+ " self.target_net.load_state_dict(self.policy_net.state_dict()) \n",
+ "\n",
+ " ## 噪声参数重置\n",
+ " self.policy_net.reset_noise()\n",
+ " self.target_net.reset_noise()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 2、 定义训练"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 59,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def train(cfg, env, agent):\n",
+ " ''' 训练\n",
+ " '''\n",
+ " print(\"开始训练!\")\n",
+ " rewards = [] # 记录所有回合的奖励\n",
+ " steps = []\n",
+ " for i_ep in range(cfg.train_eps):\n",
+ " ep_reward = 0 # 记录一回合内的奖励\n",
+ " ep_step = 0\n",
+ " state = env.reset() # 重置环境,返回初始状态\n",
+ " for _ in range(cfg.max_steps):\n",
+ " ep_step += 1\n",
+ " action = agent.sample_action(state) # 选择动作\n",
+ " next_state, reward, done, _ = env.step(action) # 更新环境,返回transition\n",
+ " agent.memory.push((state, action, reward,next_state, done)) # 保存transition\n",
+ " state = next_state # 更新下一个状态\n",
+ " agent.update() # 更新智能体\n",
+ " ep_reward += reward # 累加奖励\n",
+ " if done:\n",
+ " break\n",
+ " if (i_ep + 1) % cfg.target_update == 0: # 智能体目标网络更新\n",
+ " agent.target_net.load_state_dict(agent.policy_net.state_dict())\n",
+ " steps.append(ep_step)\n",
+ " rewards.append(ep_reward)\n",
+ " if (i_ep + 1) % 10 == 0:\n",
+ " print(f\"回合:{i_ep+1}/{cfg.train_eps},奖励:{ep_reward:.2f},Epislon:{agent.epsilon:.3f}\")\n",
+ " print(\"完成训练!\")\n",
+ " env.close()\n",
+ " return {'rewards':rewards}\n",
+ "\n",
+ "def test(cfg, env, agent):\n",
+ " print(\"开始测试!\")\n",
+ " rewards = [] # 记录所有回合的奖励\n",
+ " steps = []\n",
+ " for i_ep in range(cfg.test_eps):\n",
+ " ep_reward = 0 # 记录一回合内的奖励\n",
+ " ep_step = 0\n",
+ " state = env.reset() # 重置环境,返回初始状态\n",
+ " for _ in range(cfg.max_steps):\n",
+ " ep_step+=1\n",
+ " action = agent.predict_action(state) # 选择动作\n",
+ " next_state, reward, done, _ = env.step(action) # 更新环境,返回transition\n",
+ " state = next_state # 更新下一个状态\n",
+ " ep_reward += reward # 累加奖励\n",
+ " if done:\n",
+ " break\n",
+ " steps.append(ep_step)\n",
+ " rewards.append(ep_reward)\n",
+ " print(f\"回合:{i_ep+1}/{cfg.test_eps},奖励:{ep_reward:.2f}\")\n",
+ " print(\"完成测试\")\n",
+ " env.close()\n",
+ " return {'rewards':rewards}"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 3. 定义环境"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 60,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import gym\n",
+ "import os\n",
+ "def all_seed(env,seed = 1):\n",
+ " ''' 万能的seed函数\n",
+ " '''\n",
+ " env.seed(seed) # env config\n",
+ " np.random.seed(seed)\n",
+ " random.seed(seed)\n",
+ " torch.manual_seed(seed) # config for CPU\n",
+ " torch.cuda.manual_seed(seed) # config for GPU\n",
+ " os.environ['PYTHONHASHSEED'] = str(seed) # config for python scripts\n",
+ " # config for cudnn\n",
+ " torch.backends.cudnn.deterministic = True\n",
+ " torch.backends.cudnn.benchmark = False\n",
+ " torch.backends.cudnn.enabled = False\n",
+ "def env_agent_config(cfg):\n",
+ " env = gym.make(cfg.env_name) # 创建环境\n",
+ " if cfg.seed !=0:\n",
+ " all_seed(env,seed=cfg.seed)\n",
+ " n_states = env.observation_space.shape[0]\n",
+ " n_actions = env.action_space.n\n",
+ " print(f\"状态空间维度:{n_states},动作空间维度:{n_actions}\")\n",
+ "\n",
+ " cfg.n_actions = env.action_space.n ## set the env action space\n",
+ " model = NoisyMLP(n_states, n_actions, hidden_dim = cfg.hidden_dim) # 创建模型\n",
+ " memory = ReplayBuffer(cfg.buffer_size)\n",
+ " agent = NoisyDQN(model,memory,cfg)\n",
+ " return env,agent"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 4、设置参数"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 61,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import argparse\n",
+ "import matplotlib.pyplot as plt\n",
+ "import seaborn as sns\n",
+ "class Config():\n",
+ " def __init__(self) -> None:\n",
+ " self.env_name = \"CartPole-v1\" # 环境名字\n",
+ " self.new_step_api = True # 是否用gym的新api\n",
+ " self.wrapper = None \n",
+ " self.render = False \n",
+ " self.algo_name = \"NoisyDQN\" # 算法名字\n",
+ " self.mode = \"train\" # train or test\n",
+ " self.seed = 0 # 随机种子\n",
+ " self.device = \"cpu\" # device to use\n",
+ " self.train_eps = 100 # 训练的回合数\n",
+ " self.test_eps = 20 # 测试的回合数\n",
+ " self.eval_eps = 10 # 评估的回合数\n",
+ " self.eval_per_episode = 5 # 每个回合的评估次数\n",
+ " self.max_steps = 200 # 每个回合的最大步数\n",
+ " self.load_checkpoint = False\n",
+ " self.load_path = \"tasks\" # 加载模型的路径\n",
+ " self.show_fig = False # 是否展示图片\n",
+ " self.save_fig = True # 是否存储图片\n",
+ "\n",
+ "\n",
+ " # 设置epsilon值\n",
+ " self.epsilon_start = 0.95 # 起始的epsilon值\n",
+ " self.epsilon_end = 0.01 # 终止的epsilon值\n",
+ " self.epsilon_decay = 500 # 衰减率\n",
+ " self.hidden_dim = 256 \n",
+ " self.gamma = 0.95 \n",
+ " self.lr = 0.0001 \n",
+ " self.buffer_size = 100000 # 经验回放的buffer大小\n",
+ " self.batch_size = 64 # batch size\n",
+ " self.target_update = 4 # 目标网络更新频率\n",
+ " self.value_layers = [\n",
+ " {'layer_type': 'linear', 'layer_dim': ['n_states', 256],\n",
+ " 'activation': 'relu'},\n",
+ " {'layer_type': 'linear', 'layer_dim': [256, 256],\n",
+ " 'activation': 'relu'},\n",
+ " {'layer_type': 'linear', 'layer_dim': [256, 'n_actions'],\n",
+ " 'activation': 'none'}]\n",
+ "\n",
+ "def smooth(data, weight=0.9): \n",
+ " '''用于平滑曲线,类似于Tensorboard中的smooth曲线\n",
+ " '''\n",
+ " last = data[0] \n",
+ " smoothed = []\n",
+ " for point in data:\n",
+ " smoothed_val = last * weight + (1 - weight) * point # 计算平滑值\n",
+ " smoothed.append(smoothed_val) \n",
+ " last = smoothed_val \n",
+ " return smoothed\n",
+ "\n",
+ "def plot_rewards(rewards,cfg, tag='train'):\n",
+ " ''' 画图\n",
+ " '''\n",
+ " sns.set()\n",
+ " plt.figure() # 创建一个图形实例,方便同时多画几个图\n",
+ " plt.title(f\"{tag}ing curve on {cfg.device} of {cfg.algo_name} for {cfg.env_name}\")\n",
+ " plt.xlabel('epsiodes')\n",
+ " plt.plot(rewards, label='rewards')\n",
+ " plt.plot(smooth(rewards), label='smoothed')\n",
+ " plt.legend()\n",
+ " plt.show()\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 5、开始训练"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 63,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "状态空间维度:4,动作空间维度:2\n",
+ "开始训练!\n",
+ "Begin to update!\n"
+ ]
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "/home/dingli/anaconda3/envs/joyrl/lib/python3.7/site-packages/ipykernel_launcher.py:26: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).\n",
+ "/home/dingli/anaconda3/envs/joyrl/lib/python3.7/site-packages/ipykernel_launcher.py:27: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "回合:10/100,奖励:11.00,Epislon:0.711\n",
+ "回合:20/100,奖励:18.00,Epislon:0.498\n",
+ "回合:30/100,奖励:20.00,Epislon:0.359\n",
+ "回合:40/100,奖励:20.00,Epislon:0.214\n",
+ "回合:50/100,奖励:94.00,Epislon:0.049\n",
+ "回合:60/100,奖励:200.00,Epislon:0.011\n",
+ "回合:70/100,奖励:200.00,Epislon:0.010\n",
+ "回合:80/100,奖励:200.00,Epislon:0.010\n",
+ "回合:90/100,奖励:200.00,Epislon:0.010\n",
+ "回合:100/100,奖励:200.00,Epislon:0.010\n",
+ "完成训练!\n"
+ ]
+ },
+ {
+ "data": {
+ "image/png": "",
+ "text/plain": [
+ "