• Reinforcement Learning: Pseudocode Summary and DQN Code for Solving MountainCar-v0


    Semi-gradient Sarsa pseudocode

    [Figure: semi-gradient Sarsa pseudocode]

    Q-Learning pseudocode

    [Figure: Q-Learning pseudocode]
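    The core of the tabular algorithm in the figure is the update $Q(s,a)\leftarrow Q(s,a)+\alpha\left[r+\gamma\max_{a'}Q(s',a')-Q(s,a)\right]$. As a minimal illustrative sketch (not part of the MountainCar code later in this post, and the array/argument names are hypothetical):

    import numpy as np

    def q_learning_update(Q, s, a, r, s_next, done, alpha=0.1, gamma=0.9):
        # Q is a (num_states, num_actions) table of action values
        target = r if done else r + gamma * np.max(Q[s_next])
        Q[s, a] += alpha * (target - Q[s, a])
        return Q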

    DQN pseudocode

    DQN uses experience replay and a target network to break the correlation between successive samples, which helps the neural network converge and stay stable during training.

    [Figure: DQN pseudocode]
    $$[14]:\quad \theta_{t+1}\leftarrow \theta_{t} + \alpha\left[r_t+\gamma\max_{a'}Q(s_{t+1},a';\theta^-)-Q(s_{t},a_t;\theta)\right]\nabla Q(s_t,a_t;\theta)$$

    $$[15]:\quad \theta = \theta + \Delta\theta$$
    $$[16]:\quad \theta^{-} = \theta$$
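    In PyTorch, line [16] of the pseudocode (copying the online parameters into the target network every few steps) is essentially a one-liner. A minimal sketch with a tiny stand-in network (any nn.Module works the same way):

    import torch.nn as nn

    # Illustrative sketch: a tiny linear Q-network stands in for the real one
    online_net = nn.Linear(2, 3)
    target_net = nn.Linear(2, 3)

    # theta^- <- theta : copy the online weights into the target network (eq. [16])
    target_net.load_state_dict(online_net.state_dict())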

    Double DQN pseudocode

    Double DQN mitigates the overestimation problem that affects both DQN and Q-Learning.

    [Figure: Double DQN pseudocode]
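    The key change is in the TD target: the online network selects the greedy action, and the target network evaluates it. A hedged sketch with made-up tensor names (`q_online_next`, `q_target_next` of shape (batch, num_actions)):

    import torch

    # Hypothetical Q-values for the next states, shape (batch, num_actions)
    q_online_next = torch.randn(4, 3)   # Q(s', . ; theta), from the online network
    q_target_next = torch.randn(4, 3)   # Q(s', . ; theta^-), from the target network
    reward = torch.zeros(4)
    gamma = 0.9

    # DQN target:        r + gamma * max_a' Q(s', a'; theta^-)
    dqn_target = reward + gamma * q_target_next.max(dim=1).values

    # Double DQN target: r + gamma * Q(s', argmax_a' Q(s', a'; theta); theta^-)
    best_action = q_online_next.argmax(dim=1, keepdim=True)
    double_dqn_target = reward + gamma * q_target_next.gather(1, best_action).squeeze(1)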

    Dueling DQN network

    Dueling DQN improves DQN at the level of the network architecture, decomposing the action-value function as:
    $$Q^{\pi}(s,a)=V^{\pi}(s)+A^{\pi}(s,a)$$
    [Figure: Dueling DQN network architecture]
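    A minimal sketch of such a dueling head in PyTorch (layer sizes are illustrative, and the advantage is mean-centred so that the V/A split is identifiable; this class is not used in the MountainCar code below):

    import torch
    import torch.nn as nn

    class DuelingHead(nn.Module):
        def __init__(self, num_states, num_actions, num_hidden=64):
            super().__init__()
            self.feature = nn.Sequential(nn.Linear(num_states, num_hidden), nn.ReLU())
            self.value = nn.Linear(num_hidden, 1)                 # V(s)
            self.advantage = nn.Linear(num_hidden, num_actions)   # A(s, a)

        def forward(self, x):
            h = self.feature(x)
            v = self.value(h)
            a = self.advantage(h)
            # Q(s,a) = V(s) + (A(s,a) - mean_a A(s,a))
            return v + a - a.mean(dim=-1, keepdim=True)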

    Code

    Imports:

    import sys
    import random
    from collections import deque

    import numpy as np
    import matplotlib.pyplot as plt
    # Machine-specific path; remove it if torch/gym are already on your sys.path
    sys.path.append(r'D:\Anaconda3\envs\pythonProjectNewStart\Lib\site-packages')
    import torch
    # torch submodules
    import torch.nn as nn
    from torch.nn import functional as F
    from torch.distributions import Categorical
    import gym
    # Environment (unwrapped removes the built-in 200-step time limit)
    env = gym.make("MountainCar-v0")
    env = env.unwrapped
    # In gym >= 0.26, reset() returns (observation, info)
    print("Initial state: {}".format(np.array(env.reset()[0])))
    
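    MountainCar-v0 has a 2-dimensional continuous state (position, velocity) and 3 discrete actions, which is where the num_states=2, num_actions=3 used further down come from. A quick check:

    print(env.observation_space)       # 2-dimensional Box: position and velocity
    print(env.action_space)            # Discrete(3): push left, no push, push right
    print(env.observation_space.shape[0], env.action_space.n)   # 2 3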

    The experience replay buffer class:

    # Experience replay buffer
    class ReplayBuffer(object):
        def __init__(self, capacity):
            self.Buffer = deque(maxlen=capacity)
        # Push a new transition into the buffer
        def push(self, state, action, reward, next_state, done):
            # Add a leading batch dimension so each stored state has shape (1, num_states)
            state = np.expand_dims(state, 0)
            next_state = np.expand_dims(next_state, 0)
            self.Buffer.append((state, action, reward, next_state, done))
        # Draw a random batch of size batchsize from the buffer
        def sample(self, batchsize):
            state_array = []
            action_array = []
            reward_array = []
            next_state_array = []
            done_array = []
            # Uniform random sampling without replacement
            batch = random.sample(self.Buffer, batchsize)
            for state, action, reward, next_state, done in batch:
                state_array.append(state)
                action_array.append(action)
                reward_array.append(reward)
                next_state_array.append(next_state)
                done_array.append(done)
            return state_array, action_array, reward_array, next_state_array, done_array
        # Current number of stored transitions
        def __len__(self):
            return len(self.Buffer)
    
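    A quick, hypothetical smoke test of the buffer (not part of the training script): push a few random MountainCar transitions and sample a mini-batch.

    buf = ReplayBuffer(capacity=100)
    s, _ = env.reset()
    for _ in range(10):
        a = env.action_space.sample()
        s_next, r, terminated, truncated, _ = env.step(a)
        buf.push(s, a, r, s_next, terminated or truncated)
        s = s_next
    states, actions, rewards, next_states, dones = buf.sample(5)
    print(len(buf), np.array(states).shape)   # 10 (5, 1, 2)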

    Building the DQN network:

    # The DQN network
    class DQN(nn.Module):
        def __init__(self, num_states, num_actions, num_hidden=64, learning_rate=0.01):
            super(DQN, self).__init__()
            self.num_states = num_states
            self.num_actions = num_actions
            # Network architecture: two hidden layers with ReLU
            self.layers = nn.Sequential(
                nn.Linear(num_states, num_hidden),
                nn.ReLU(),
                nn.Linear(num_hidden, num_hidden),
                nn.ReLU(),
                nn.Linear(num_hidden, num_actions)
            )
            self.optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate)
        def forward(self, x):
            # x is a torch tensor
            return self.layers(x)
        # Epsilon-greedy action selection
        def select_action(self, state, epsilon=0.1):
            # state is a numpy array; the return value is an int
            if np.random.random() <= epsilon:
                return np.random.randint(0, int(self.num_actions))
            else:
                q_value = self.forward(torch.tensor(state, dtype=torch.float))
                action = torch.argmax(q_value).item()
                return action
        # One gradient update of the Q-network
        def update_policy(self, replay_buffer, batchsize, gamma=0.9):
            # Sample a mini-batch of size batchsize from the replay buffer
            state_array, action_array, reward_array, next_state_array, done_array = replay_buffer.sample(batchsize)
            # Convert the sampled transitions to torch tensors
            state_array = torch.tensor(np.array(state_array), dtype=torch.float)
            action_array = torch.tensor(action_array, dtype=torch.long)
            reward_array = torch.tensor(reward_array, dtype=torch.float)
            next_state_array = torch.tensor(np.array(next_state_array), dtype=torch.float)
            done_array = torch.tensor(done_array)
            # Compute the TD loss sample by sample
            loss = []
            for t in range(len(state_array)):
                s = state_array[t]            # shape (1, num_states)
                a = action_array[t]
                r = reward_array[t]
                s_ = next_state_array[t]
                done = done_array[t]
                q = self.forward(s)[0][a]     # Q(s, a)
                if done:
                    q_target = r
                else:
                    # No gradient should flow through the bootstrapped target
                    q_target = (r + gamma * torch.max(self.forward(s_)[0])).detach()
                loss.append((q - q_target).pow(2))
            # torch.stack keeps the computation graph intact (torch.tensor would break it)
            loss = torch.stack(loss).mean()
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            return loss.item()
    
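    The per-sample Python loop above works but is slow. A hedged sketch of an equivalent vectorised update (it assumes the stored states keep the (1, num_states) shape added in ReplayBuffer.push, hence the squeeze; the function name is illustrative and it is not used in the run below):

    # Illustrative vectorised version of the same TD loss
    def vectorised_loss(net, replay_buffer, batchsize, gamma=0.9):
        states, actions, rewards, next_states, dones = replay_buffer.sample(batchsize)
        states = torch.tensor(np.array(states), dtype=torch.float).squeeze(1)        # (B, 2)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float).squeeze(1)
        actions = torch.tensor(actions, dtype=torch.long).unsqueeze(1)               # (B, 1)
        rewards = torch.tensor(rewards, dtype=torch.float)
        dones = torch.tensor(dones, dtype=torch.float)

        q = net(states).gather(1, actions).squeeze(1)              # Q(s, a)
        with torch.no_grad():
            q_next = net(next_states).max(dim=1).values            # max_a' Q(s', a')
        target = rewards + gamma * q_next * (1.0 - dones)          # no bootstrap if done
        return F.mse_loss(q, target)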

    The ε-decay schedule:

    # Gradually reduce the exploration rate epsilon over the course of training
    def epsilon_set(episode, epsilon_start=1.0, epsilon_final=0.01, epsilon_decay=100):
        # episode: index of the current episode
        return epsilon_final + (epsilon_start - epsilon_final) * np.exp(-episode / epsilon_decay)
    
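    With the defaults above, exploration starts fully random and decays exponentially towards 1%:

    for ep in (0, 50, 100, 200, 400):
        print(ep, round(epsilon_set(ep), 3))
    # approximately: 0 1.0, 50 0.61, 100 0.374, 200 0.144, 400 0.028

    Note that the run below only lasts 50 episodes, so ε never falls much below about 0.6 and the agent is still mostly exploring.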

    Instantiate the environment, the DQN network and the replay buffer:

    # Create the environment env, the DQN network and the replay buffer
    env = gym.make("MountainCar-v0")
    env = env.unwrapped
    DQN_network = DQN(num_states=2, num_actions=3)
    replay_buffer = ReplayBuffer(capacity=10000)
    

    The main training loop:

    # Main training loop
    def main(episodes=50, batchsize=100, global_step=30, gamma=0.9):
        # global_step: perform one gradient update every global_step environment steps
        reward_array = []
        mean_reward_array = []
        losses = []
        global_count = 0
        for episode in range(episodes):
            state, _ = env.reset()
            episode_reward = 0.0
            while True:
                # Current exploration rate
                epsilon = epsilon_set(episode)
                # Sample an action a
                action = DQN_network.select_action(state, epsilon=epsilon)
                # Step the environment to get (r, s', done)
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                # Store (s, a, r, s', done) in the replay buffer
                replay_buffer.push(state, action, reward, next_state, done)
                # s <- s'
                state = next_state
                episode_reward += reward
                global_count += 1
                # Sample from the replay buffer and update the network parameters
                if (len(replay_buffer) >= batchsize) and (global_count % global_step == 0):
                    loss = DQN_network.update_policy(replay_buffer, batchsize, gamma=gamma)
                    losses.append(loss)
                # Stop when the episode has ended
                if done:
                    break
            reward_array.append(episode_reward)
            mean_reward_array.append(np.mean(reward_array))
            print("episode:{},rewards:{},mean_rewards:{}".format(episode, reward_array[-1], int(mean_reward_array[-1])))
        return reward_array, mean_reward_array, losses
    

    Running it:

    # Run without a target network
    reward_array, mean_reward_array, losses = main(episodes=50, batchsize=50)
    plt.plot(reward_array)
    plt.plot(mean_reward_array)
    plt.plot(losses)
    plt.show()
    

    Results:

    episode:0,rewards:-44051.0,mean_rewards:-44051
    episode:1,rewards:-19320.0,mean_rewards:-31685
    episode:2,rewards:-9235.0,mean_rewards:-24202
    episode:3,rewards:-52916.0,mean_rewards:-31380
    episode:4,rewards:-142999.0,mean_rewards:-53704
    episode:5,rewards:-102141.0,mean_rewards:-61777
    episode:6,rewards:-107398.0,mean_rewards:-68294
    episode:7,rewards:-2794.0,mean_rewards:-60106
    episode:8,rewards:-161268.0,mean_rewards:-71346
    episode:9,rewards:-1394702.0,mean_rewards:-203682
    episode:10,rewards:-432946.0,mean_rewards:-224524
    episode:11,rewards:-16071.0,mean_rewards:-207153
    episode:12,rewards:-1041327.0,mean_rewards:-271320
    episode:13,rewards:-583157.0,mean_rewards:-293594
    episode:14,rewards:-406127.0,mean_rewards:-301096
    episode:15,rewards:-1726005.0,mean_rewards:-390153
    ...
    

    In short, the returns oscillate so badly that it is hard to keep watching…
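    This run uses a single network for both the online estimate and the TD target (the target is only detached). A hedged sketch of the natural next step, adding the target network of equation [16] and syncing it every few updates (the helper name and hyper-parameters here are illustrative, not from the run above):

    # Illustrative sketch: training with a separate target network
    target_network = DQN(num_states=2, num_actions=3)
    target_network.load_state_dict(DQN_network.state_dict())   # theta^- <- theta

    def update_with_target(online, target, replay_buffer, batchsize, gamma=0.9):
        states, actions, rewards, next_states, dones = replay_buffer.sample(batchsize)
        states = torch.tensor(np.array(states), dtype=torch.float).squeeze(1)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float).squeeze(1)
        actions = torch.tensor(actions, dtype=torch.long).unsqueeze(1)
        rewards = torch.tensor(rewards, dtype=torch.float)
        dones = torch.tensor(dones, dtype=torch.float)

        q = online(states).gather(1, actions).squeeze(1)
        with torch.no_grad():
            # Bootstrap from the frozen target network: max_a' Q(s', a'; theta^-)
            q_next = target(next_states).max(dim=1).values
        target_value = rewards + gamma * q_next * (1.0 - dones)
        loss = F.mse_loss(q, target_value)
        online.optimizer.zero_grad()
        loss.backward()
        online.optimizer.step()
        return loss.item()

    # Inside the training loop, sync every few updates, e.g.:
    # if update_count % target_sync == 0:
    #     target_network.load_state_dict(DQN_network.state_dict())

    With the target frozen between syncs, the bootstrapping target no longer chases the online network from step to step, which is exactly the stabilisation mechanism described in the DQN pseudocode above.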

  • Original post: https://blog.csdn.net/shengzimao/article/details/126323311