

DQN uses experience replay and a separate target network to break the correlations between successive samples, which stabilizes training and allows the neural network to converge:

[14]: \theta_{t+1} \leftarrow \theta_{t} + \alpha\left[r_t + \gamma\max_{a'} Q(s_{t+1}, a'; \theta^-) - Q(s_t, a_t; \theta)\right]\nabla Q(s_t, a_t; \theta)
[15]: \theta = \theta + \Delta\theta
[16]: \theta^{-} = \theta
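Equation [16] is the periodic copy of the online parameters θ into the target parameters θ⁻. In PyTorch this is typically a state-dict copy; a minimal sketch, assuming two networks policy_net and target_net of identical architecture (names chosen here for illustration):
import copy
import torch.nn as nn

# Two networks with identical architecture (a toy linear layer for illustration)
policy_net = nn.Linear(2, 3)
target_net = copy.deepcopy(policy_net)

# Equation [16]: theta^- <- theta, executed every fixed number of gradient steps
target_net.load_state_dict(policy_net.state_dict())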
Double DQN overcomes the overestimation problem that affects both DQN and Q-Learning.
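The key change is in the TD target: the online parameters θ select the greedy action while the target parameters θ⁻ evaluate it, which decouples action selection from value estimation. The standard Double DQN target is:
y_t = r_t + \gamma\, Q\left(s_{t+1}, \arg\max_{a'} Q(s_{t+1}, a'; \theta); \theta^{-}\right)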

Dueling DQN improves on DQN at the level of the network architecture, decomposing the action-value function as:
Q^{\pi}(s,a) = V^{\pi}(s) + A^{\pi}(s,a)
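In practice the two streams are recombined with the advantage centred by its mean, which keeps V and A identifiable. A minimal sketch of such a dueling head (an illustrative module, not part of the original code):
import torch
import torch.nn as nn

class DuelingDQN(nn.Module):
    def __init__(self, num_states, num_actions, num_hidden=64):
        super(DuelingDQN, self).__init__()
        # Shared feature trunk
        self.feature = nn.Sequential(
            nn.Linear(num_states, num_hidden),
            nn.ReLU(),
        )
        # State-value head V(s) and advantage head A(s, a)
        self.value = nn.Linear(num_hidden, 1)
        self.advantage = nn.Linear(num_hidden, num_actions)

    def forward(self, x):
        h = self.feature(x)
        v = self.value(h)          # shape (..., 1)
        a = self.advantage(h)      # shape (..., num_actions)
        # Q(s,a) = V(s) + (A(s,a) - mean_a A(s,a)) for identifiability
        return v + a - a.mean(dim=-1, keepdim=True)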

Libraries to import:
import sys
import random
from collections import deque
import numpy as np
sys.path.append(r'D:\Anaconda3\envs\pythonProjectNewStart\Lib\site-packages')
import torch
# Import the torch submodules
import torch.nn as nn
from torch.nn import functional as F
from torch.distributions import Categorical
import gym
import matplotlib.pyplot as plt
# The environment
env = gym.make("MountainCar-v0")
env = env.unwrapped
# gym >= 0.26: reset() returns (observation, info)
print("Initial state: {}".format(np.array(env.reset()[0])))
The experience replay buffer class:
# Experience replay buffer class
class ReplayBuffer(object):
    def __init__(self, capacity):
        self.Buffer = deque(maxlen=capacity)

    # Append a new transition to the buffer
    def push(self, state, action, reward, next_state, done):
        state = np.expand_dims(state, 0)
        next_state = np.expand_dims(next_state, 0)
        self.Buffer.append((state, action, reward, next_state, done))

    # Draw a batch of batchsize transitions from the buffer
    def sample(self, batchsize):
        # Collect each component into its own list
        state_array = []
        action_array = []
        reward_array = []
        next_state_array = []
        done_array = []
        # Sample uniformly at random without replacement
        batchsize_buffer = random.sample(self.Buffer, batchsize)
        for state, action, reward, next_state, done in batchsize_buffer:
            state_array.append(state)
            action_array.append(action)
            reward_array.append(reward)
            next_state_array.append(next_state)
            done_array.append(done)
        return state_array, action_array, reward_array, next_state_array, done_array

    # Current number of stored transitions
    def __len__(self):
        return len(self.Buffer)
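A quick sanity check of the buffer with dummy transitions (the values below are arbitrary placeholders):
buffer = ReplayBuffer(capacity=100)
for _ in range(10):
    # Dummy 2-dimensional states, action 0, reward -1.0, non-terminal
    buffer.push(np.zeros(2), 0, -1.0, np.ones(2), False)
states, actions, rewards, next_states, dones = buffer.sample(4)
print(len(buffer), len(states))  # 10 4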
Building the DQN network:
# The DQN network
class DQN(nn.Module):
    def __init__(self, num_states, num_actions, num_hidden=64, learning_rate=0.01):
        super(DQN, self).__init__()
        self.num_states = num_states
        self.num_actions = num_actions
        # Network architecture: two hidden layers with ReLU activations
        self.layers = nn.Sequential(
            nn.Linear(num_states, num_hidden),
            nn.ReLU(),
            nn.Linear(num_hidden, num_hidden),
            nn.ReLU(),
            nn.Linear(num_hidden, num_actions)
        )
        self.optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, x):
        # x is a torch tensor
        return self.layers(x)

    # Epsilon-greedy action selection from the current Q estimates
    def select_action(self, state, epsilon=0.1):
        # state is a numpy array; the return value is an int
        if np.random.random() <= epsilon:
            return np.random.randint(0, int(self.num_actions))
        else:
            q_value = self.forward(torch.tensor(state, dtype=torch.float))
            action = torch.argmax(q_value).item()
            return action

    # Parameter update from a sampled batch
    def update_policy(self, replay_buffer, batchsize, gamma=0.9):
        # Draw batchsize transitions from the replay buffer
        state_array, action_array, reward_array, next_state_array, done_array = replay_buffer.sample(batchsize)
        # Convert the sampled tuples to torch tensors
        state_array = torch.tensor(np.array(state_array), dtype=torch.float)
        action_array = torch.tensor(action_array, dtype=torch.long)
        reward_array = torch.tensor(reward_array, dtype=torch.float)
        next_state_array = torch.tensor(np.array(next_state_array), dtype=torch.float)
        done_array = torch.tensor(done_array)
        # Compute the loss sample by sample
        loss = []
        for t in range(len(state_array)):
            s = state_array[t]
            a = action_array[t]
            r = reward_array[t]
            s_ = next_state_array[t]
            q = self.forward(s)[0][a]
            done = done_array[t]
            if done:
                q_target = r
            else:
                # Detach the bootstrap target: per equation [14], the gradient
                # only flows through Q(s_t, a_t; theta)
                q_target = r + gamma * torch.max(self.forward(s_)[0]).detach()
            loss.append((q - q_target).pow(2))
        # torch.stack keeps the computation graph (torch.tensor would detach it)
        loss = torch.stack(loss).mean()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()
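The per-sample Python loop above is easy to follow but slow for larger batches. A common alternative is a fully batched update, using gather over the chosen actions and max over the next-state values; a minimal sketch under the same data layout (a hypothetical helper, not the author's code):
import numpy as np
import torch
from torch.nn import functional as F

def update_policy_batched(network, replay_buffer, batchsize, gamma=0.9):
    # Hypothetical batched variant of DQN.update_policy
    states, actions, rewards, next_states, dones = replay_buffer.sample(batchsize)
    states = torch.tensor(np.concatenate(states), dtype=torch.float)        # (B, num_states)
    next_states = torch.tensor(np.concatenate(next_states), dtype=torch.float)
    actions = torch.tensor(actions, dtype=torch.long)                       # (B,)
    rewards = torch.tensor(rewards, dtype=torch.float)
    dones = torch.tensor(dones, dtype=torch.float)
    # Q(s, a) for the actions actually taken
    q = network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
    # Bootstrapped target, zeroed at terminal states
    with torch.no_grad():
        q_next = network(next_states).max(dim=1)[0]
    q_target = rewards + gamma * q_next * (1.0 - dones)
    loss = F.mse_loss(q, q_target)
    network.optimizer.zero_grad()
    loss.backward()
    network.optimizer.step()
    return loss.item()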
The adjustment function for the exploration rate ε:
# Gradually decay the exploration rate epsilon over the course of training
def epsilon_set(epsiode, epsilon_start=1.0, epsilon_final=0.01, epsilon_decay=100):
    # epsiode: current episode index
    return epsilon_final + (epsilon_start - epsilon_final) * np.exp(-epsiode / epsilon_decay)
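A quick check of the schedule: with these defaults, epsilon starts at 1.0 and decays exponentially toward 0.01 with a time constant of 100 episodes:
for ep in [0, 50, 100, 200, 500]:
    print(ep, round(epsilon_set(ep), 3))
# prints roughly: 0 1.0, 50 0.61, 100 0.374, 200 0.144, 500 0.017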
Defining the environment and instantiating the classes:
# Define the environment env, the DQN network, and the replay buffer
env = gym.make("MountainCar-v0")
env = env.unwrapped
DQN_network = DQN(num_states=2, num_actions=3)
replay_buffer = ReplayBuffer(capacity=10000)
Defining the main function:
# Main training loop
def main(epsiodes=50, batchsize=100, global_step=30, gamma=0.9):
    # global_step: perform one parameter update every global_step environment steps
    reward_array = []
    mean_reward_array = []
    losses = []
    global_count = 0
    for epsiode in range(epsiodes):
        # gym >= 0.26: reset() returns (observation, info)
        state, _ = env.reset()
        epsiode_reward = 0.0
        while True:
            # Compute the current epsilon
            epsilon = epsilon_set(epsiode)
            # Sample an action a
            action = DQN_network.select_action(state, epsilon=epsilon)
            # Step the environment to obtain (r, s_, done)
            next_state, reward, done, _, _ = env.step(action)
            # Store (s, a, r, s_, done) in the replay buffer
            replay_buffer.push(state, action, reward, next_state, done)
            # Advance the state: s <- s_
            state = next_state
            epsiode_reward += reward
            global_count += 1
            # Sample from the replay buffer and update the parameters
            if (len(replay_buffer) >= batchsize) and (global_count % global_step == 0):
                loss = DQN_network.update_policy(replay_buffer, batchsize, gamma=gamma)
                losses.append(loss)
            # Exit once the terminal state is reached
            if done:
                break
        reward_array.append(epsiode_reward)
        mean_reward_array.append(np.mean(reward_array))
        print("epsiode:{},rewards:{},mean_rewards:{}".format(epsiode, reward_array[-1], int(mean_reward_array[-1])))
    return reward_array, mean_reward_array, losses
Running it:
# Run without a target network
reward_array, mean_reward_array, losses = main(epsiodes=50, batchsize=50)
plt.plot(reward_array)
plt.plot(mean_reward_array)
plt.plot(losses)
Results:
epsiode:0,rewards:-44051.0,mean_rewards:-44051
epsiode:1,rewards:-19320.0,mean_rewards:-31685
epsiode:2,rewards:-9235.0,mean_rewards:-24202
epsiode:3,rewards:-52916.0,mean_rewards:-31380
epsiode:4,rewards:-142999.0,mean_rewards:-53704
epsiode:5,rewards:-102141.0,mean_rewards:-61777
epsiode:6,rewards:-107398.0,mean_rewards:-68294
epsiode:7,rewards:-2794.0,mean_rewards:-60106
epsiode:8,rewards:-161268.0,mean_rewards:-71346
epsiode:9,rewards:-1394702.0,mean_rewards:-203682
epsiode:10,rewards:-432946.0,mean_rewards:-224524
epsiode:11,rewards:-16071.0,mean_rewards:-207153
epsiode:12,rewards:-1041327.0,mean_rewards:-271320
epsiode:13,rewards:-583157.0,mean_rewards:-293594
epsiode:14,rewards:-406127.0,mean_rewards:-301096
epsiode:15,rewards:-1726005.0,mean_rewards:-390153
...
Put simply, the returns oscillate so badly that it is painful to watch…
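The run above bootstraps from the same network it is training, which is exactly what the target network in equations [14]-[16] is meant to avoid. One possible way to wire a target network into the loop, as a sketch only (target_network and sync_every are hypothetical names, not the author's code):
import copy

target_network = copy.deepcopy(DQN_network)
sync_every = 200  # hypothetical sync interval, counted in gradient updates

def td_target(reward, next_state, done, gamma=0.9):
    # Bootstrap from the frozen parameters theta^- as in equation [14]
    if done:
        return reward
    with torch.no_grad():
        return reward + gamma * torch.max(target_network(next_state)[0])

# Inside update_policy, q_target would come from td_target(...); after every
# sync_every updates the copy of equation [16] is applied:
# target_network.load_state_dict(DQN_network.state_dict())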