• 强化学习------Policy Gradient算法


    简介

    之前的QLearning DQN Sarsa都是通过计算动作得分来决策的,我们是在确定了价值函数的基础上采用某种策略,即Value-Based,通过先算出价值函数,再去做决策。而Policy Gradient算法是一种直接的方法,我们直接去评估策略的好坏,然后进行选择。即Policy-Base

    智能体通过与环境的交互获得特定时刻的状态信息,并直接给出下一步要采取各种动作的概率,然后根据该状态动作的策略分布采取下一步的行动,所以每种动作都有可能被选中,只是选中的概率性不同。智能体直接学习状态动作的策略分布,在强化学习的训练中,用神经网络来表示状态动作分布,给一个状态,就会输出该状态下的动作分布。强化学习算法直接对策略进行优化,使指定的策略能够获得最大的奖励。

    PG算法原理

    在这里插入图片描述

    强化学习主要目标强是最大化智能体在与环境交互的过程中获得的累积奖励的期望值
    考虑一个随机参数化的策略

    • π(θ) 是一个参数化的策略函数,它接受当前的观测作为输入,输出一个动作的概率分布。
    • E 表示期望值,表示对所有可能的轨迹 τ 进行加权平均。
    • τ 表示一个轨迹,它包含了智能体在环境中与环境进行交互的一系列状态、动作和奖励。
    • R(τ) 表示轨迹 τ 的累积奖励,表示智能体在轨迹中获得的所有奖励的总和。

    通过梯度上升法优化策略即有
    在这里插入图片描述

    • θ_k 表示当前的参数值,即第 k 次迭代时的参数值。
    • θ_{k+1} 表示下一次迭代的参数值,即第 k+1 次迭代时的参数值。
    • α 表示学习率,它决定了每次迭代时参数更新的幅度。
    • ∇_θ J(π_θ) 表示性能指标 J(π_θ) 对参数 θ 的梯度,它表示了在当前参数值下,性能指标 J(π_θ) 变化最快的方向。
    • ∣ θ_k 表示在参数值为 θ_k 的情况下计算梯度。

    在这里插入图片描述

    先实现一个episode然后从后往前计算回报,损失函数是负的回报乘于log的该状态下采取该动作的概率。每个状态动作对对应算一次loss,然后反向传播计算梯度。最后整个episode完之后进行梯度下降。

    本代码核心在于更新参数的部分
    更新参数的步骤如下:

    • Step 1: 计算每一步的状态价值 首先,将每个时间步的奖励值进行折扣,得到折扣后的奖励值。这里使用的折扣因子为GAMMA。
      然后,从最后一个时间步开始,计算每个时间步的状态价值。状态价值等于当前时间步的奖励值加上下一个时间步的折扣后的状态价值。这个过程通过一个循环实现。
      最后,对计算得到的状态价值进行标准化处理,减去均值并除以标准差。
            for t in reversed(range(0, len(self.ep_rs))):
                running_add = running_add * GAMMA + self.ep_rs[t]
                discounted_ep_rs[t] = running_add
    
            # 标准化处理
            discounted_ep_rs -= np.mean(discounted_ep_rs)  # 减均值
            discounted_ep_rs /= np.std(discounted_ep_rs)  # 除以标准差
            discounted_ep_rs = torch.FloatTensor(discounted_ep_rs).to(device)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • Step 2: 前向传播 将状态观测值作为输入,通过神经网络进行前向传播,得到每个动作的概率分布。
      这里使用了softmax函数将输出转化为概率分布。
    # Step 2: 前向传播
    softmax_input = self.network.forward(torch.FloatTensor(self.ep_obs).to(device))
    # all_act_prob = F.softmax(softmax_input, dim=0).detach().numpy()
    neg_log_prob = F.cross_entropy(input=softmax_input, target=torch.LongTensor(self.ep_as).to(device),
                                      reduction='none')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • Step 3: 反向传播 计算负对数似然损失函数,该损失函数用于最大化选择正确动作的概率。
      然后,将负对数似然损失函数与折扣后的状态价值相乘,得到最终的损失函数。 最后,计算损失函数的均值,用于更新神经网络的参数。
    # Step 3: 反向传播
    loss = torch.mean(neg_log_prob * discounted_ep_rs)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    
    • 1
    • 2
    • 3
    • 4
    • 5

    代码如下:

    import os
    
    import gym
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import numpy as np
    import random
    import time
    from collections import deque
    
    # Hyper Parameters for PG Network
    GAMMA = 0.95  # discount factor
    LR = 0.01  # learning rate
    
    # Use GPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    
    # torch.backends.cudnn.enabled = False  # 非确定性算法
    
    
    class PGNetwork(nn.Module):
        def __init__(self, state_dim, action_dim):
            super(PGNetwork, self).__init__()
            self.fc1 = nn.Linear(state_dim, 20)
            self.fc2 = nn.Linear(20, action_dim)
    
        def forward(self, x):
            out = F.relu(self.fc1(x))
            out = self.fc2(out)
            return out
    
    
    class PG(object):
        # dqn Agent
        def __init__(self, env):  # 初始化
            # 状态空间和动作空间的维度
            self.state_dim = env.observation_space.shape[0]
            self.action_dim = env.action_space.n
    
            # init N Monte Carlo transitions in one game
            self.ep_obs, self.ep_as, self.ep_rs = [], [], []
    
            # init network parameters
            self.network = PGNetwork(state_dim=self.state_dim, action_dim=self.action_dim).to(device)
            self.optimizer = torch.optim.Adam(self.network.parameters(), lr=LR)
            # 加载以前保存的模型(如果有的话)
            if os.path.exists("./model/model.pkl"):
                self.network.load_state_dict(torch.load("./model/model.pkl"))
                # 加载以前保存的优化器
                self.optimizer.load_state_dict(torch.load("./model/optimizer.pkl"))
    
    
            # init some parameters
            self.time_step = 0
    
        def choose_action(self, observation):
            observation = torch.FloatTensor(observation).to(device)
            network_output = self.network.forward(observation)
    
            prob_weights = F.softmax(network_output, dim=0).detach().numpy()
            action = np.random.choice(range(prob_weights.shape[0]),
                                      p=prob_weights)  # select action w.r.t the actions prob
            return action
    
        # 将状态,动作,奖励这一个transition保存到三个列表中
        def store_transition(self, s, a, r):
            self.ep_obs.append(s)
            self.ep_as.append(a)
            self.ep_rs.append(r)
    
        def learn(self):
            self.time_step += 1
    
            # Step 1: 计算每一步的状态价值
            discounted_ep_rs = np.zeros_like(self.ep_rs)
            running_add = 0
            # 注意这里是从后往前算的,所以式子还不太一样。算出每一步的状态价值
            # 前面的价值的计算可以利用后面的价值作为中间结果,简化计算;从前往后也可以
            for t in reversed(range(0, len(self.ep_rs))):
                running_add = running_add * GAMMA + self.ep_rs[t]
                discounted_ep_rs[t] = running_add
    
            discounted_ep_rs -= np.mean(discounted_ep_rs)  # 减均值
            discounted_ep_rs /= np.std(discounted_ep_rs)  # 除以标准差
            discounted_ep_rs = torch.FloatTensor(discounted_ep_rs).to(device)
    
            # Step 2: 前向传播
            softmax_input = self.network.forward(torch.FloatTensor(self.ep_obs).to(device))
            # all_act_prob = F.softmax(softmax_input, dim=0).detach().numpy()
            neg_log_prob = F.cross_entropy(input=softmax_input, target=torch.LongTensor(self.ep_as).to(device),
                                           reduction='none')
    
            # Step 3: 反向传播
            loss = torch.mean(neg_log_prob * discounted_ep_rs)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
    
            # 每次学习完后清空数组
            self.ep_obs, self.ep_as, self.ep_rs = [], [], []
    
    
    # ---------------------------------------------------------
    # Hyper Parameters
    ENV_NAME = 'CartPole-v0'
    EPISODE = 3000  # Episode limitation
    STEP = 300  # Step limitation in an episode
    TEST = 10  # The number of experiment test every 100 episode
    
    
    def main():
        # initialize OpenAI Gym env and dqn agent
        env = gym.make(ENV_NAME)
        agent = PG(env)
    
        for episode in range(EPISODE):
            # initialize task
            state = env.reset()
            # Train
            # 只采一盘?N个完整序列
            for step in range(STEP):
                action = agent.choose_action(state)  # softmax概率选择action
                next_state, reward, done, _ = env.step(action)
                agent.store_transition(state, action, reward)  # 新函数 存取这个transition
                state = next_state
                if done:
                    # print("stick for ",step, " steps")
                    agent.learn()  # 更新策略网络
                    break
    
            # Test every 100 episodes
            if episode % 100 == 0:
                # 保存模型
                torch.save(agent.network.state_dict(), "./model/model.pkl")
                torch.save(agent.optimizer.state_dict(), "./model/optimizer.pkl")
                print("save model to /model/model.pkl")
                total_reward = 0
                for i in range(TEST):
                    state = env.reset()
                    for j in range(STEP):
                        env.render()
                        action = agent.choose_action(state)  # direct action for test
                        state, reward, done, _ = env.step(action)
                        total_reward += reward
                        if done:
                            break
                ave_reward = total_reward / TEST
                print('episode: ', episode, 'Evaluation Average Reward:', ave_reward)
    
    
    if __name__ == '__main__':
        time_start = time.time()
        main()
        time_end = time.time()
        print('The total time is ', time_end - time_start)
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159

    效果:

    在这里插入图片描述

    参考

    策略梯度算法
    policy gradient详解

  • 相关阅读:
    05-CSS溢出属性&文本溢出省略号
    代码随想录(番外)图论1
    山东企业ITSS认证条件
    基于springboot、logback的日志脱敏组件
    javascript+canvas身份证背景颜色识别
    MySQL 深度分页性能急剧下降,该如何优化?
    牛客小白月赛55
    深度学习·理论篇(2023版)·第004篇深度学习和概率论基础01:使用大白话举例说明条件概率独立概率+期望值方差+协方差+熵和平均编码长度的关系
    Android 解决CameraView叠加2个以上滤镜拍照黑屏的BUG (二)
    如何在Android应用程序中实现高效的图片加载和缓存机制。
  • 原文地址:https://blog.csdn.net/niulinbiao/article/details/133869359