D3QN Code Implementation

  • Implementing D3QN (Dueling Double DQN) with TensorFlow.

Code and Explanation

0. Runtime Environment

Device/Package          Version
python                  3.7.11
GPU                     GTX 1050
CUDA                    10.2
cudnn                   7.6.5
cudatoolkit             10.0.130
tensorflow-gpu          2.2.0
tensorlayer             2.2.3
tensorflow-probability  0.9.0

1. Imports and Parameter Settings

import argparse
import os
import random

import numpy as np

import gym
import tensorflow as tf
import tensorlayer as tl

from matplotlib import animation
import matplotlib.pyplot as plt

parser = argparse.ArgumentParser()
# whether to train; otherwise the script runs in test mode
parser.add_argument('--train', dest='train', default=False)
# whether to render during training
parser.add_argument('--render', type=bool, default=False)
# whether to save test episodes as gif files
parser.add_argument('--save_gif', type=bool, default=True)

parser.add_argument('--gamma', type=float, default=0.995)    # discount factor
parser.add_argument('--lr', type=float, default=0.005)       # learning rate
parser.add_argument('--batch_size', type=int, default=128)
parser.add_argument('--eps', type=float, default=0.2)        # epsilon for epsilon-greedy exploration

parser.add_argument('--train_episodes', type=int, default=1000)
parser.add_argument('--test_episodes', type=int, default=10)
args = parser.parse_args()

ALG_NAME = 'D3QN'
ENV_ID = 'LunarLander-v2'
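
  • A side note on the boolean flags above (my observation, not from the original post): with argparse, type=bool does not turn the string 'False' into False, because bool() applied to any non-empty string is True. A minimal check:

# Illustrative only: why type=bool command-line flags can surprise.
print(bool('False'))   # True  -- any non-empty string is truthy
print(bool(''))        # False
# A common alternative (hypothetical change, not part of the original script):
# parser.add_argument('--train', action='store_true')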

2. ReplayBuffer Implementation

import random  
import numpy as np

class ReplayBuffer:  
	def __init__(self, capacity=50000):  
		self.capacity = capacity  
		self.buffer = []
		# once the buffer is full, start overwriting from the beginning (circular reuse)
		self.position = 0  
	
	def push(self, state, action, reward, next_state, done):  
		if len(self.buffer) < self.capacity:  
			self.buffer.append(None)  
		self.buffer[self.position] = (state, action, reward, next_state, done)  
		self.position = int((self.position + 1) % self.capacity)  

	def sample(self, batch_size = args.batch_size): 
		# randomly sample batch_size transitions from the buffer
		batch = random.sample(self.buffer, batch_size)
		# unpack the sampled transitions into separate arrays, one per field
		state, action, reward, next_state, done = map(np.stack, zip(*batch))  
		return state, action, reward, next_state, done
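
  • A quick standalone sanity check of the buffer (hypothetical snippet, not from the original post; the dummy 8-dimensional state matches the LunarLander-v2 observation size):

# Hypothetical smoke test for ReplayBuffer with dummy transitions.
buf = ReplayBuffer(capacity=1000)
dummy_state = np.zeros(8, dtype=np.float32)    # LunarLander-v2 observations are 8-dimensional
for i in range(200):
    buf.push(dummy_state, 0, 1.0, dummy_state, False)
states, actions, rewards, next_states, dones = buf.sample(batch_size=32)
print(states.shape, actions.shape, dones.shape)   # (32, 8) (32,) (32,)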

3. D3QN Class Implementation

  • The D3QN class implements 8 main methods:
    • __init__: initialize the agent.
    • target_update: update the target network.
    • choose_action: select an action.
    • replay: update the value function with gradient descent.
    • test_episode: test the trained model.
    • train: run the training loop and collect the transitions needed to train the model.
    • saveModel: save the model.
    • loadModel: load the model.
3.1. __init__
  • Building the D3QN network
# Note: in the full script this function is nested inside __init__, so `self`
# (used below for self.action_dim) is available through the enclosing scope.
def create_model(input_state_shape):
    input_layer = tl.layers.Input(input_state_shape)
    layer_1 = tl.layers.Dense(n_units=256, act=tf.nn.relu)(input_layer)
    layer_2 = tl.layers.Dense(n_units=128, act=tf.nn.relu)(layer_1)

    state_hidden = tl.layers.Dense(n_units=64)(layer_2)
    adv_hidden = tl.layers.Dense(n_units=64)(layer_2)

    # state value
    state_value = tl.layers.Dense(n_units=1)(state_hidden)
    # advantage value
    adv_value = tl.layers.Dense(n_units=self.action_dim)(adv_hidden)

    mean = tl.layers.Lambda(lambda x: tf.reduce_mean(x, axis=1, keepdims=True))(adv_value)
    advantage = tl.layers.ElementwiseLambda(lambda x, y: x-y)([adv_value, mean])
    # output
    output_layer = tl.layers.ElementwiseLambda(lambda x, y: x+y)([state_value, advantage])
    return tl.models.Model(inputs=input_layer, outputs=output_layer)
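
  • The Lambda and ElementwiseLambda layers implement the dueling aggregation Q(s, a) = V(s) + A(s, a) - mean_a' A(s, a'). A tiny numpy illustration of that arithmetic (made-up values, not from the original post):

# Numpy illustration of the dueling aggregation used above.
import numpy as np
V = np.array([[1.0]])                    # state value, shape (batch, 1)
A = np.array([[2.0, 0.0, -1.0, 3.0]])    # advantages, shape (batch, action_dim)
Q = V + (A - A.mean(axis=1, keepdims=True))   # subtract the mean advantage, then add V
print(Q)   # [[ 2.  0. -1.  3.]]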
  • __init__
def __init__(self, env):
    self.env = env
    self.state_dim = self.env.observation_space.shape[0]
    self.action_dim = self.env.action_space.n

    self.model = create_model([None, self.state_dim])
    self.target_model = create_model([None, self.state_dim])
    self.model.train()          # online network in training mode
    self.target_model.eval()    # target network is only used for inference
    self.model_optim = tf.optimizers.Adam(learning_rate=args.lr)

    self.epsilon = args.eps

    self.buffer = ReplayBuffer()
3.2. target_update
def target_update(self):  
	"""Copy q network to target q network"""  
	for weights, target_weights in zip(  
			self.model.trainable_weights, self.target_model.trainable_weights):  
		target_weights.assign(weights)
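
  • A quick way to verify the hard copy (hypothetical check, assuming an `agent` instance such as the one created in the main program below):

# Hypothetical check: after target_update(), both networks hold identical weights.
agent.target_update()
for w, tw in zip(agent.model.trainable_weights, agent.target_model.trainable_weights):
    assert np.allclose(w.numpy(), tw.numpy())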
3.3. choose_action
def choose_action(self, state):
    if np.random.uniform() < self.epsilon:
        return np.random.choice(self.action_dim)
    else:
        q_value = self.model(state[np.newaxis, :])[0]
        return np.argmax(q_value)
  • np.random.uniform(low=0, high=1.0) generates a random number; by default the range is [0, 1).
  • choose_action first draws a random number in [0, 1); if it is smaller than ε the agent explores (random action), otherwise it evaluates the current state with the value network and picks the action with the largest Q value.
  • [np.newaxis, :] inserts a new dimension at the np.newaxis position: here state has shape (state_dim,), and adding dimension 0 turns it into a (1, state_dim) batch (see the quick shape check below).
  • The [0] after model(...) is needed because only a single state is fed in, so only one row of action Q values is returned.
  • np.argmax finds the largest value in the array and returns its index.
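
  • A quick shape check for the np.newaxis trick (illustrative only; 8 is the LunarLander-v2 state dimension):

# Illustrative shape check for state[np.newaxis, :].
import numpy as np
state = np.zeros(8, dtype=np.float32)
batched = state[np.newaxis, :]
print(state.shape, batched.shape)   # (8,) (1, 8)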
3.4. replay
  • The replay function performs the update of the value-network parameters; it is also the main place in this code where CUDA computation happens.
def replay(self):
    for _ in range(10):
        states, actions, rewards, next_states, done = self.buffer.sample()
        target = self.target_model(states).numpy()
        # next_q_values [batch_size, action_dim]
        next_target = self.target_model(next_states).numpy()
        # next_q_value [batch_size]: the online net picks the action, the target net supplies its value
        next_q_value = next_target[
            range(args.batch_size), np.argmax(self.model(next_states), axis=1)
        ]
        target[range(args.batch_size), actions] = rewards + (1 - done) * args.gamma * next_q_value

        # use gradient descent (Adam) to update the online network weights
        with tf.GradientTape() as tape:
            q_pred = self.model(states)
            loss = tf.losses.mean_squared_error(target, q_pred)
        grads = tape.gradient(loss, self.model.trainable_weights)
        self.model_optim.apply_gradients(zip(grads, self.model.trainable_weights))
  • The Double part of D3QN: the online Q network selects the next action and the target network evaluates its value, as illustrated below.
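
  • A minimal numpy sketch of that target computation, with random stand-in Q values (assumed names, not part of the original code):

# Sketch of the Double DQN target: the online net chooses, the target net evaluates.
import numpy as np
rng = np.random.default_rng(0)
batch_size, action_dim, gamma = 4, 3, 0.995
q_online_next = rng.normal(size=(batch_size, action_dim))   # stand-in for self.model(next_states)
q_target_next = rng.normal(size=(batch_size, action_dim))   # stand-in for self.target_model(next_states)
rewards = np.ones(batch_size)
dones = np.zeros(batch_size)
best_actions = np.argmax(q_online_next, axis=1)                  # chosen by the online network
double_q = q_target_next[np.arange(batch_size), best_actions]    # evaluated by the target network
td_target = rewards + (1 - dones) * gamma * double_q
# A vanilla DQN target would instead use q_target_next.max(axis=1).
print(td_target)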
3.5. test_episode
  • In test_episode, the model is evaluated for several episodes and each rollout is saved as a gif file.
def test_episode(self, test_episodes):
    for episode in range(test_episodes):
        state = self.env.reset().astype(np.float32)
        total_reward, done = 0, False
        frames = []
        while not done:
            action = self.model(np.array([state], dtype=np.float32))[0]
            action = np.argmax(action)
            next_state, reward, done, _ = self.env.step(action)
            next_state = next_state.astype(np.float32)

            total_reward += reward
            state = next_state
            frames.append(self.env.render(mode='rgb_array'))
        # 将本场游戏保存为gif
        if args.save_gif:
            dir_path = os.path.join('testVideo', '_'.join([ALG_NAME, ENV_ID]))
            if not os.path.exists(dir_path):
                os.makedirs(dir_path)
            display_frames_as_gif(frames, os.path.join(dir_path, str(episode) + '.gif'))
        print("Test {} | episode rewards is {}".format(episode, total_reward))
  • How do we save a gym rollout as a gif file?
from matplotlib import animation  
import matplotlib.pyplot as plt

# Step 1: define a function that turns a list of frames into a gif
def display_frames_as_gif(frames, path):  
	patch = plt.imshow(frames[0])  
	plt.axis('off')  

	def animate(i):  
		patch.set_data(frames[i])  

	anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=5)  
	anim.save(path, writer='pillow', fps=30)
	
# Step 2: create a frames list to collect the rendered frames during the episode
frames = []  

# Step 3: while the episode is running, collect each rendered frame
frames.append(self.env.render(mode = 'rgb_array'))  

# Step 4: after the episode finishes, save the collected frames as a gif
dir_path = os.path.join('testVideo', '_'.join([ALG_NAME, ENV_ID]))  
if not os.path.exists(dir_path):  
	os.makedirs(dir_path)  
display_frames_as_gif(frames, os.path.join(dir_path, str(episode) + '.gif'))
3.6. train
def train(self, train_episodes=200):
    self.loadModel()
    if args.train:
        all_ep_r = []
        for episode in range(train_episodes):
            total_reward, done = 0, False
            state = self.env.reset().astype(np.float32)
            while not done:
                if args.render:
                    self.env.render()
                action = self.choose_action(state)
                next_state, reward, done, _ = self.env.step(action)
                next_state = next_state.astype(np.float32)
                reward -= 0.1  # small per-step penalty (reward shaping)
                self.buffer.push(state, action, reward, next_state, done)
                total_reward += reward
                state = next_state
                # self.render()
            if len(self.buffer.buffer) > args.batch_size:
                self.replay()
                self.target_update()

            if episode == 0:
                all_ep_r.append(total_reward)
            else:
                all_ep_r.append(all_ep_r[-1] * 0.9 + total_reward * 0.1)
            print(
                'Episode: {}/{}  | Episode Reward: {:.4f}'.format(
                    episode, args.train_episodes, total_reward
                )
            )
            # save the model every 100 episodes
            if episode % 100 == 0:
                self.saveModel()
    else:
        self.test_episode(test_episodes=args.test_episodes)
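
  • The smoothed rewards collected in all_ep_r are never plotted in the code above; a minimal sketch of what could be appended at the end of the if args.train: branch (my assumption, using the already-imported matplotlib):

        # Hypothetical addition: plot and save the smoothed episode-reward curve.
        plt.plot(all_ep_r)
        plt.xlabel('Episode')
        plt.ylabel('Smoothed episode reward')
        plt.savefig('_'.join([ALG_NAME, ENV_ID]) + '.png')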
3.7. saveModel
def saveModel(self):
    path = os.path.join('model', '_'.join([ALG_NAME, ENV_ID]))
    if not os.path.exists(path):
        os.makedirs(path)
    tl.files.save_weights_to_hdf5(os.path.join(path, 'model.hdf5'), self.model)
    tl.files.save_weights_to_hdf5(os.path.join(path, 'target_model.hdf5'), self.target_model)
    print('Saved weights.')
3.8. loadModel
def loadModel(self):
    path = os.path.join('model', '_'.join([ALG_NAME, ENV_ID]))
    if os.path.exists(path):
        print('Loading D3QN network parameters ...')
        tl.files.load_hdf5_to_weights_in_order(os.path.join(path, 'model.hdf5'), self.model)
        tl.files.load_hdf5_to_weights_in_order(os.path.join(path, 'target_model.hdf5'), self.target_model)
        print('Load weights!')
    else:
        print("No model file found, please train the model first...")

4. Main Program

if __name__ == '__main__':  
	env = gym.make(ENV_ID)  
	agent = D3QN(env)  
	agent.train(train_episodes=args.train_episodes)  
	env.close()

Training Results

After 1000 training episodes:

(Training result figures.)

For a more detailed explanation of the code, see: DQN with Target Code Implementation

Reposted from blog.csdn.net/weixin_40735291/article/details/120790788