    Reinforcement Learning: How Dueling DQN Improves on DQN, Using the Inverted Pendulum Environment as an Example


    0. Introduction

    Reference: DeepRL Series (10): Dueling DQN (DDQN) Principles and Implementation, https://zhuanlan.zhihu.com/p/114834834

    From the earlier derivation, we obtained the mathematical form of the Dueling Network:

    Q(s,a;w)=V(s;w^{V})+A(s,a;w^{A})-\max_{a'}A(s,a';w^{A})

    In practice, replacing the max with a mean works better and is more stable; the resulting form is:

    Q(s,a;w)=V(s;w^{V})+A(s,a;w^{A})-\frac{1}{|\mathcal{A}|}\sum_{a'}A(s,a';w^{A})
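
    As a quick sanity check on the mean form, here is a minimal sketch (the V and A tensors are made-up illustration values, not network outputs) showing that subtracting the per-state mean advantage leaves the greedy action unchanged and makes the centered advantages average to zero:

    import torch

    # hypothetical V and A outputs for a batch of 2 states and 3 discrete actions
    V = torch.tensor([[1.0], [2.0]])                      # shape (2, 1)
    A = torch.tensor([[0.5, 1.5, 1.0], [2.0, 0.0, 1.0]])  # shape (2, 3)

    Q = V + A - A.mean(1, keepdim=True)          # dueling aggregation, mean form
    print(Q.argmax(1))                           # tensor([1, 0]); same greedy actions as argmax of A
    print((A - A.mean(1, keepdim=True)).sum(1))  # ~0 for each state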

    1. Imports

    import torch
    import numpy as np
    import matplotlib.pyplot as plt
    from tqdm import tqdm
    import gym
    import collections
    import random

    2. Building the Qnet and VAnet Networks

    class VAnet(torch.nn.Module):
        """ A and V streams sharing a single hidden layer """
        def __init__(self, statedim, hiddendim, actiondim):
            super(VAnet, self).__init__()
            self.fc1 = torch.nn.Linear(statedim, hiddendim)
            self.fcA = torch.nn.Linear(hiddendim, actiondim)
            self.fcV = torch.nn.Linear(hiddendim, 1)
        def forward(self, x):
            A = self.fcA(torch.nn.functional.relu(self.fc1(x)))
            V = self.fcV(torch.nn.functional.relu(self.fc1(x)))
            # Q = V + A - A.mean(1).unsqueeze(1)  # unsqueeze inserts a dimension at the given position; equivalent to the line below
            Q = V + A - A.mean(1).view(-1, 1)
            return Q
        def save(self, path):
            torch.save(self.state_dict(), path)
        def load(self, path):
            self.load_state_dict(torch.load(path))

    class Qnet(torch.nn.Module):
        """ Q network with a single hidden layer """
        def __init__(self, statedim, hiddendim, actiondim):
            super(Qnet, self).__init__()
            self.fc1 = torch.nn.Linear(statedim, hiddendim)
            self.fc2 = torch.nn.Linear(hiddendim, actiondim)
        def forward(self, x):
            x = torch.nn.functional.relu(self.fc1(x))
            return self.fc2(x)
        def save(self, path):
            torch.save(self.state_dict(), path)
        def load(self, path):
            self.load_state_dict(torch.load(path))
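
    As a quick shape check (statedim=3 and actiondim=11 match the Pendulum setup used later; the batch size of 4 is arbitrary), both networks map a (batch, statedim) input to a (batch, actiondim) table of Q values:

    vanet = VAnet(statedim=3, hiddendim=128, actiondim=11)
    qnet = Qnet(statedim=3, hiddendim=128, actiondim=11)
    x = torch.randn(4, 3)        # a batch of 4 fake states
    print(vanet(x).shape)        # torch.Size([4, 11])
    print(qnet(x).shape)         # torch.Size([4, 11])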

    3. Replay Buffer Implementation

    class ReplayBuffer:
        """ Experience replay buffer """
        def __init__(self, capacity):
            self.buffer = collections.deque(maxlen=capacity)
        def add(self, state, action, reward, nextstate, done):
            self.buffer.append((state, action, reward, nextstate, done))
        def sample(self, batchsize):
            transitions = random.sample(self.buffer, batchsize)
            state, action, reward, nextstate, done = zip(*transitions)
            return np.array(state), action, reward, np.array(nextstate), done
        def size(self):
            return len(self.buffer)
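
    A minimal usage sketch of the buffer (the transitions are dummy values; Pendulum observations are 3-dimensional):

    buffer = ReplayBuffer(capacity=1000)
    for i in range(10):
        buffer.add(np.zeros(3), i % 11, -1.0, np.ones(3), False)  # fake transitions
    print(buffer.size())                    # 10
    bs, ba, br, bns, bd = buffer.sample(4)
    print(bs.shape, len(ba))                # (4, 3) 4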

    Of course, the network can also be written in the following form. Note that this version gives the A and V heads their own hidden layers instead of sharing fc1, but the dueling aggregation is the same.

    class VAnet(torch.nn.Module):
        def __init__(self, statedim, hiddendim, actiondim):
            super(VAnet, self).__init__()
            self.A = torch.nn.Sequential(
                torch.nn.Linear(statedim, hiddendim),
                torch.nn.ReLU(),
                torch.nn.Linear(hiddendim, actiondim),
                # torch.nn.Softmax(dim=1)
            )
            self.V = torch.nn.Sequential(
                torch.nn.Linear(statedim, hiddendim),
                torch.nn.ReLU(),
                torch.nn.Linear(hiddendim, 1)
            )
        def forward(self, x):
            a_output = self.A(x)
            v_output = self.V(x)
            a_mean = a_output.mean(1).view(-1, 1)
            return a_output + v_output - a_mean
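
    One handy consequence of the mean-form aggregation is that the per-state mean of the Q outputs equals the V head's output, because the centered advantages average to zero. A quick check with this Sequential version (random weights, arbitrary batch):

    model = VAnet(statedim=3, hiddendim=128, actiondim=11)
    x = torch.randn(5, 3)
    q = model(x)
    v = model.V(x)
    print(torch.allclose(q.mean(1, keepdim=True), v, atol=1e-6))  # True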

    4. Converting Discrete Actions Back to Continuous Actions

    def dis_to_con(actionid, env, actiondim):      # map a discrete action index back to a continuous action
        actionlowbound = env.action_space.low[0]   # minimum of the continuous action range
        actionupbound = env.action_space.high[0]   # maximum of the continuous action range
        return actionlowbound + actionid * (actionupbound - actionlowbound) / (actiondim - 1)
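
    For Pendulum-v1 the torque range is [-2.0, 2.0], so with actiondim=11 the indices 0..10 map to an evenly spaced grid with step 0.4. A quick check (assuming that action range):

    env = gym.make('Pendulum-v1')
    print([round(float(dis_to_con(i, env, 11)), 2) for i in range(11)])
    # [-2.0, -1.6, -1.2, -0.8, -0.4, 0.0, 0.4, 0.8, 1.2, 1.6, 2.0]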

    5. DQN Algorithm Implementation

    class DQN:
        """ DQN algorithm, covering Double DQN and Dueling DQN """
        def __init__(self, statedim, hiddendim, actiondim, learningrate, gamma, epsilon, targetupdate, device, dqntype='VanillaDQN'):
            self.actiondim = actiondim
            self.gamma = gamma
            self.epsilon = epsilon
            self.targetupdate = targetupdate
            self.device = device
            self.dqntype = dqntype
            self.count = 0
            if self.dqntype == 'DuelingDQN':  # Dueling DQN uses a different network architecture
                self.qnet = VAnet(statedim=statedim, hiddendim=hiddendim, actiondim=actiondim).to(self.device)
                self.targetqnet = VAnet(statedim=statedim, hiddendim=hiddendim, actiondim=actiondim).to(self.device)
            else:
                self.qnet = Qnet(statedim=statedim, hiddendim=hiddendim, actiondim=actiondim).to(self.device)
                self.targetqnet = Qnet(statedim=statedim, hiddendim=hiddendim, actiondim=actiondim).to(self.device)
            self.optimizer = torch.optim.Adam(self.qnet.parameters(), lr=learningrate)
        def takeaction(self, state):  # epsilon-greedy action selection
            if np.random.random() < self.epsilon:
                action = np.random.randint(self.actiondim)
            else:
                state = torch.tensor([state], dtype=torch.float).to(self.device)
                action = self.qnet(state).argmax().item()
            return action
        def max_qvalue(self, state):
            state = torch.tensor([state], dtype=torch.float).to(self.device)
            return self.qnet(state).max().item()
        def update(self, transition_dict):
            states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
            actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
            rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
            nextstates = torch.tensor(transition_dict['nextstates'], dtype=torch.float).to(self.device)
            dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
            qvalues = self.qnet(states).gather(1, actions)  # gather(1, actions): collect, along dimension 1 (the action dimension), the Q values indexed by actions
            if self.dqntype == 'DoubleDQN':
                maxaction = self.qnet(nextstates).max(1)[1].view(-1, 1)  # max(1) returns (values, indices) over dimension 1; [1] takes the argmax indices
                maxnextqvalues = self.targetqnet(nextstates).gather(1, maxaction)
            else:
                maxnextqvalues = self.targetqnet(nextstates).max(1)[0].view(-1, 1)
            targetqvalues = rewards + self.gamma * maxnextqvalues * (1 - dones)
            dqnloss = torch.mean(torch.nn.functional.mse_loss(qvalues, targetqvalues))
            self.optimizer.zero_grad()
            dqnloss.backward()
            self.optimizer.step()
            if self.count % self.targetupdate == 0:
                self.targetqnet.load_state_dict(self.qnet.state_dict())
            self.count += 1
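
    To make the gather/max bookkeeping in update concrete, here is a small standalone sketch (the Q values are made up) of how the vanilla target takes the target network's own maximum, while Double DQN picks the action with the online network and evaluates it with the target network:

    next_q_online = torch.tensor([[1.0, 3.0, 2.0]])   # online net's Q values for one next state
    next_q_target = torch.tensor([[2.5, 0.5, 4.0]])   # target net's Q values for the same state
    # vanilla DQN: max over the target network's own estimates
    print(next_q_target.max(1)[0].view(-1, 1))        # 4.0
    # Double DQN: argmax from the online network, value from the target network
    maxaction = next_q_online.max(1)[1].view(-1, 1)   # action index 1
    print(next_q_target.gather(1, maxaction))         # 0.5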

    6. DQN Training Function

    def trainDQN(agent, env, episodesnum, pbarnum, printreturnnum, replaybuffer, minimalsize, batchsize):
        returnlist = []
        maxqvaluelist = []
        maxqvalue = 0
        for k in range(pbarnum):
            with tqdm(total=int(episodesnum / pbarnum), desc='Iteration %d' % k) as pbar:
                for episode in range(int(episodesnum / pbarnum)):
                    episodereturn = 0
                    state = env.reset(seed=10)[0]
                    done = False
                    while not done:
                        action = agent.takeaction(state)
                        maxqvalue = agent.max_qvalue(state) * 0.005 + maxqvalue * 0.995  # exponential smoothing
                        maxqvaluelist.append(maxqvalue)  # record the smoothed max Q value
                        action_continuous = dis_to_con(actionid=action, env=env, actiondim=agent.actiondim)
                        nextstate, reward, done, truncated, _ = env.step([action_continuous])
                        done = done or truncated
                        replaybuffer.add(state, action, reward, nextstate, done)
                        state = nextstate
                        episodereturn += reward
                        if replaybuffer.size() > minimalsize:
                            bs, ba, br, bns, bd = replaybuffer.sample(batchsize)
                            transitiondict = {'states': bs, 'actions': ba, 'rewards': br, 'nextstates': bns, 'dones': bd}
                            agent.update(transitiondict)
                    returnlist.append(episodereturn)
                    if (episode + 1) % printreturnnum == 0:
                        pbar.set_postfix({'episode': '%d' % (int(episodesnum / pbarnum) * k + episode + 1), 'return': '%.3f' % np.mean(returnlist[-printreturnnum:])})
                    pbar.update(1)
        return returnlist, maxqvaluelist
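
    A side note on the smoothing above: maxqvalue is an exponential moving average with coefficient 0.005, so it effectively averages over roughly 1/0.005 = 200 recent frames. A standalone illustration of the same update rule (the raw values are made up):

    ema = 0.0
    for q in [10.0] * 400:               # pretend the raw max Q jumps to 10 and stays there
        ema = q * 0.005 + ema * 0.995
    print(round(ema, 2))                 # about 8.65; the EMA approaches 10 over a few hundred frames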

    7. Moving Average Function

    def moving_average(a, window_size):
        cumulative_sum = np.cumsum(np.insert(a, 0, 0))
        middle = (cumulative_sum[window_size:] - cumulative_sum[:-window_size]) / window_size
        r = np.arange(1, window_size - 1, 2)
        begin = np.cumsum(a[:window_size - 1])[::2] / r
        end = (np.cumsum(a[:-window_size:-1])[::2] / r)[::-1]
        return np.concatenate((begin, middle, end))
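
    moving_average is not actually called in the script below, which plots the raw returns. As an optional sketch (assuming returnlist from the training run in section 9, and an odd window_size such as 9 so the head and tail padding line up), the return curve could be smoothed before plotting:

    smoothed_returns = moving_average(np.array(returnlist), window_size=9)
    plt.plot(np.arange(len(smoothed_returns)), smoothed_returns)
    plt.xlabel('Episodes')
    plt.ylabel('Smoothed Return')
    plt.show()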

    8. Hyperparameter Settings

    lr = 1e-2
    gamma = 0.98
    epsilon = 0.01
    target_update = 10
    batchsize = 64
    minimalsize = 500
    episodesnum = 500
    buffersize = 10000
    hiddendim = 128
    actiondim = 11
    pbarnum = 10
    printreturnnum = 10
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    9. Training and Visualization in the Inverted Pendulum Environment

    random.seed(10)
    np.random.seed(10)
    torch.manual_seed(10)
    replaybuffer = ReplayBuffer(buffersize)
    env = gym.make('Pendulum-v1')
    env.reset(seed=10)
    statedim = env.observation_space.shape[0]
    agent = DQN(statedim=statedim, hiddendim=hiddendim, actiondim=actiondim, learningrate=lr, gamma=gamma, epsilon=epsilon, targetupdate=target_update, device=device, dqntype='DuelingDQN')
    returnlist, maxqvaluelist = trainDQN(agent=agent, env=env, episodesnum=episodesnum, pbarnum=pbarnum, printreturnnum=printreturnnum, replaybuffer=replaybuffer, minimalsize=minimalsize, batchsize=batchsize)
    episodelist = np.arange(len(returnlist))  # equivalent to np.linspace(0, len(returnlist)-1, len(returnlist)) or list(range(len(returnlist)))
    plt.plot(episodelist, returnlist)
    plt.xlabel('Episodes')
    plt.ylabel('Return')
    plt.title(f'{agent.dqntype} on {env.spec.name}')
    plt.show()
    framslist = np.arange(len(maxqvaluelist))
    plt.plot(framslist, maxqvaluelist)
    plt.axhline(y=0, color='purple', linestyle='--')
    plt.axhline(y=10, c='red', ls='--')
    plt.xlabel('Frames')
    plt.ylabel('Q value')
    plt.title(f'{agent.dqntype} on {env.spec.name}')
    plt.show()
    env.close()
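
    The save and load helpers defined on the networks are never called in the script above. As a sketch, the trained Q-network could be persisted and restored like this (the file name is just an example):

    agent.qnet.save('dueling_dqn_pendulum.pt')    # write the state_dict to disk
    restored = VAnet(statedim=statedim, hiddendim=hiddendim, actiondim=actiondim).to(device)
    restored.load('dueling_dqn_pendulum.pt')      # reload the weights into a fresh network
    restored.eval()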

    10. Visualization Results and Conclusion

     

    Conclusion: Compared with the vanilla DQN, Dueling DQN learns more stably when there are many candidate actions, and it attains a higher maximum return. From the theory behind Dueling DQN, its advantage over DQN grows as the action space gets larger. In this experiment the number of discrete actions is set to 11; increasing it (for example to 15, 25, or 30) and rerunning the comparison makes the effect even more pronounced.

    Of course, we can also change the network structure and add more hidden layers; a modified network is shown below.

    class VAnet(torch.nn.Module):
        def __init__(self, statedim, hiddendim, actiondim):
            super(VAnet, self).__init__()
            self.A = torch.nn.Sequential(
                torch.nn.Linear(statedim, hiddendim),
                torch.nn.Tanh(),                        # switch the first activation to Tanh
                torch.nn.Linear(hiddendim, hiddendim),  # add an extra hidden layer
                torch.nn.ReLU(),
                torch.nn.Linear(hiddendim, actiondim),
                # torch.nn.Softmax(dim=1)
            )
            self.V = torch.nn.Sequential(
                torch.nn.Linear(statedim, hiddendim),
                torch.nn.Tanh(),                        # switch the first activation to Tanh
                torch.nn.Linear(hiddendim, hiddendim),  # add an extra hidden layer
                torch.nn.ReLU(),
                torch.nn.Linear(hiddendim, 1)
            )
        def forward(self, x):
            A = self.A(x)  # compute the advantage stream once instead of twice
            return A + self.V(x) - A.mean(1).view(-1, 1)
        def save(self, path):
            torch.save(self.state_dict(), path)
        def load(self, path):
            self.load_state_dict(torch.load(path))
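
    To see how much larger this variant is, a quick parameter count (assuming the same statedim=3, hiddendim=128, actiondim=11 as above, and that this deeper VAnet definition is the one in scope) can be done as follows:

    deeper = VAnet(statedim=3, hiddendim=128, actiondim=11)   # the two-hidden-layer variant above
    print(sum(p.numel() for p in deeper.parameters()))        # total trainable parameters, tens of thousands at these sizes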

    The results are shown below:

    Original article: https://blog.csdn.net/m0_56497861/article/details/141001674