目录
1.前言
这次我们还是使用MountainCar来进行实验,因为这次我们不需要重度改变它的reward了。所以只要是没有拿到小旗子reward=-1,拿到小旗子时,我们定义它获得了+10的reward。比起之前DQN中,这个reward定义更加准确。如果使用这种reward定义方式,可以想象Natural DQN会花很长时间学习。因为记忆库中只有很少很少的+10的reward可以学习,正负样本不一样。而使用Prioritized replay,就会重视这种少量,但值得学习的样本。
接下来我们就来看看他是怎么做到的。
2.算法
这一套算法的重点就在我们batch抽样的时候并不是随机抽样的,而是按照Memory中的样本优先级来抽。所以这能更有效地找到我们需要学习的样本。
那么样本的优先级是怎么定呢?原来我们可以用到TD-error
,也就是Q现实-Q估计
来规定优先学习的成都。如果TD-error
越大,就代表我们的预测精度还有很多上升空间,那么这个样本就越需要被学习,也就是优先级p
越高。
有了TD-error
就有了优先级p
,那我们如何有效地根据p
来抽样呢?如果每次抽样都需要针对p
对所有样本排序,这将会是一件非常消耗计算能力的事情,好在我们还有其他方法,这种方法不会对得到的样本进行排序,这就是论文中说到的SumTree
。
SumTree
是一种树形结构,每片输液存储每个样本的优先级p
,每个树枝节点只有两个分叉,节点的值是两个分叉的合,所以SumTree
的顶端就是所有p
的合。正如下面图片,最下面一层树叶存储样本的p
。叶子上一层最左边的13=3+10,按这个规律相加,顶层的roor就是全部p
的合了。
抽样的时,我们会将p
的总和除以batch size,分成batch size那么多区间,(n=sum§/batch_size)。如果将所有node的priority加起来是42的话,我们如果抽6个样本,这时的区间拥有的priority可能是这样的:
[0-7], [7-14], [14-21], [21-28], [28-35], [35-42]
然后在每个区间里随机选取一个数。比如在第4个区间[21-28]
选到了24,就按照这个24从最顶上的42开始往下搜索。首先看到最顶上42
下面有两个child nodes
,拿着手中的24对比左边的chlid29
,如果左边的chlid比自己手中的值大,那我们就走左边这条路,接着再对比29
下面的左边那个点13
,这时,手中的24比13
大,那我们就走右边的路,并且将手中的值根据13
修改一下,变成24-13=11.接着拿11和13
右下角的12
比,结果12
比11大,那我们就选12当做这次选到的priority,并且也选择12对应的数据。
2.1 SumTree有效抽样
class SumTree(object):
# 建立 tree 和 data,
# 因为 SumTree 有特殊的数据结构,
# 所以两者都能用一个一维 np.array 来存储
def __init__(self, capacity):
# 当有新 sample 时, 添加进 tree 和 data
def add(self, p, data):
# 当 sample 被 train, 有了新的 TD-error, 就在 tree 中更新
def update(self, tree_idx, p):
# 根据选取的 v 点抽取样本
def get_leaf(self, v):
# 获取 sum(priorities)
@property
def totoal_p(self):
2.2 Memory类
class Memory(object):
# 建立 SumTree 和各种参数
def __init__(self, capacity):
# 存储数据, 更新 SumTree
def store(self, transition):
# 抽取 sample
def sample(self, n):
# train 完被抽取的 samples 后更新在 tree 中的 sample 的 priority
def batch_update(self, tree_idx, abs_errors):
具体完整的代码我在最后会附上我github的链接,这里说一下这个关于ISweight到底怎么算。需要提到一点是,代码中的计算方法是经过了简化的,将论文中的步骤合并了一些,比如:prob = p / self.tree.total_p; ISWeights = np.power(prob/min_prob, -self.beta)
在paper 中, ISWeight = (N*Pj)^(-beta) / maxi_wi
里面的maxi_wi
是为了 normalize ISWeight, 所以我们先把他放在一边. 所以单纯的importance sampling 就是(N*Pj)^(-beta)
,那 maxi_wi = maxi[(N*Pi)^(-beta)].
如果将这两个式子合并,
ISWeight = (N*Pj)^(-beta) / maxi[ (N*Pi)^(-beta) ]
而且如果将maxi[ (N*Pi)^(-beta)]
中的 (-beta) 提出来, 这就变成了mini[ (N*Pi) ] ^ (-beta)
看出来了吧, 有的东西可以抵消掉的. 最后
ISWeight = (Pj / mini[Pi])^(-beta)
这样我们就有了代码中的样子.
还有代码中的alpha
是一个决定我们要使用多少 ISweight 的影响, 如果alpha = 0
,我们就没使用到任何 Importance Sampling.
2.3 更新方法
我们在_init_
中加一个prioritized
参数来表示DQN是否具备prioritized能力。为了对比的需要,我们的tf.Session()
也单独传入,并移除原本在DQN代码中的这一句:self.sess.run(tf.global_variables_initializer())
搭建神经网络时,我们发现DQN with Prioritized replay只多了一个ISWeights
,这个正是刚刚算法中提到的Importance-Sampling Weights
,用来恢复被Prioritized replay打乱的抽样概率分布。
class DQNPrioritizedReplay:
def _build_net(self)
...
# self.prioritized 时 eval net 的 input 多加了一个 ISWeights
self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s') # input
self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='Q_target') # for calculating loss
if self.prioritized:
self.ISWeights = tf.placeholder(tf.float32, [None, 1], name='IS_weights')
...
# 为了得到 abs 的 TD error 并用于修改这些 sample 的 priority, 我们修改如下
with tf.variable_scope('loss'):
if self.prioritized:
self.abs_errors = tf.reduce_sum(tf.abs(self.q_target - self.q_eval), axis=1) # for updating Sumtree
self.loss = tf.reduce_mean(self.ISWeights * tf.squared_difference(self.q_target, self.q_eval))
else:
self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))
因为和Natural DQN使用的Memory不一样,所以在存储transition的时候方式也略不相同
class DQNPrioritizedReplay:
def store_transition(self, s, a, r, s_):
if self.prioritized: # prioritized replay
transition = np.hstack((s, [a, r], s_))
self.memory.store(transition)
else: # random replay
if not hasattr(self, 'memory_counter'):
self.memory_counter = 0
transition = np.hstack((s, [a, r], s_))
index = self.memory_counter % self.memory_size
self.memory[index, :] = transition
self.memory_counter += 1
我们在learn()部分的改变也在如下展示:
class DQNPrioritizedReplay:
def learn(self):
...
# 相对于 DQN 代码, 改变的部分
if self.prioritized:
tree_idx, batch_memory, ISWeights = self.memory.sample(self.batch_size)
else:
sample_index = np.random.choice(self.memory_size, size=self.batch_size)
batch_memory = self.memory[sample_index, :]
...
if self.prioritized:
_, abs_errors, self.cost = self.sess.run([self._train_op, self.abs_errors, self.loss],
feed_dict={self.s: batch_memory[:, :self.n_features],
self.q_target: q_target,
self.ISWeights: ISWeights})
self.memory.batch_update(tree_idx, abs_errors) # update priority
else:
_, self.cost = self.sess.run([self._train_op, self.loss],
feed_dict={self.s: batch_memory[:, :self.n_features],
self.q_target: q_target})
...
3.对比结果
运行我Github中的这个MountainCar脚本,我们就不难发现,我们都从两种方法最初拿到第一个R+=10
奖励的时候算起,看看经历过一次R+=10
后,他们有没有好好利用这次的奖励,可以看出,有 Prioritized replay的可以高效地利用这些不常拿到的奖励,并好好学习他们。所以Prioritized replay 会更快结束每个 episode, 很快就到达了小旗子。
完整代码:https://github.com/cristianoc20/RL_learning/tree/master/Prioritized_Replay_DQN
参考:https://github.com/MorvanZhou