• Policy gradient plays CartPole: reinforcement learning replaces a PID controller for balancing the pole


     

    In CartPole, a cart balances a freely swinging pole on top of it: whenever the pole starts tipping to one side, the cart moves so that the pole stays dynamically upright. The policy function is a simple two-layer neural network. The input state has 4 components (cart position, cart velocity, pole angle, pole angular velocity) and the output action is move left or move right. Experimenting with the number of state inputs: at least 3 are needed before the pole stays up for a while, with only 2 it never learns, and with all 4 it learns a very stable policy (a sketch of how to restrict the inputs follows).
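
    For that 2/3/4-input experiment, here is a minimal sketch of restricting the policy to a subset of the observation. The class name, index list, and hidden size are illustrative assumptions, not the exact code used below:

    import torch
    import torch.nn as nn

    # Hypothetical variant of the policy network that only sees part of the state.
    # STATE_IDX picks from [cart position, cart velocity, pole angle, pole angular velocity];
    # with fewer than 3 indices CartPole is hard to learn, with all 4 it learns a stable policy.
    STATE_IDX = [0, 2, 3]  # e.g. drop cart velocity

    class SubsetPolicy(nn.Module):
        def __init__(self, state_idx=STATE_IDX, hidden=10):
            super().__init__()
            self.state_idx = state_idx
            self.fc = nn.Sequential(
                nn.Linear(len(state_idx), hidden),
                nn.Tanh(),
                nn.Linear(hidden, 2),   # action probabilities: push left / push right
                nn.Softmax(dim=-1),
            )

        def forward(self, x):
            return self.fc(x[:, self.state_idx])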

     

     

    Policy-gradient implementation, using torch to build a simple neural network:
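
    The train() function below implements the REINFORCE update: for every step t of an episode it accumulates

        loss = -sum_t log pi(a_t | s_t) * (G_t - b)

    where G_t is the discounted return from step t onward (gamma = 0.99) and b is a baseline, here the mean return over the last 10 episodes, which lowers the variance of the gradient estimate.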

    import gym
    import torch
    import torch.nn as nn
    import torch.optim as optim
    import pygame
    import sys
    from collections import deque
    import numpy as np
     
    # Policy network definition
    class PolicyNetwork(nn.Module):
        def __init__(self):
            super(PolicyNetwork, self).__init__()
            self.fc = nn.Sequential(
                nn.Linear(4, 10),  # 4 state inputs, 10 hidden units
                nn.Tanh(),
                nn.Linear(10, 2),  # probabilities for the 2 actions (left / right)
                nn.Softmax(dim=-1)
            )
     
        def forward(self, x):
            # state layout: cart position, cart velocity, pole angle, pole angular velocity
            selected_values = x[:, [0, 1, 2, 3]]  # use all 4 state components
            return self.fc(selected_values)
     
    # Training function: one policy-gradient update from the per-step records of an episode
    def train(policy_net, optimizer, trajectories):
        policy_net.zero_grad()
        loss = 0
        print(trajectories[0])
        for trajectory in trajectories:
             
            # if trajectory["returns"] > 90:
            # returns = torch.tensor(trajectory["returns"]).float()
            # else:
            # advantage: discounted return minus the baseline (mean return of recent episodes)
            returns = torch.tensor(trajectory["returns"]).float() - torch.tensor(trajectory["step_mean_reward"]).float()
            log_probs = trajectory["log_prob"]
            loss += -(log_probs * returns).sum()  # REINFORCE loss term for this step
        loss.backward()
        optimizer.step()
        return loss.item()
     
    # Main training loop
    def main():
        env = gym.make('CartPole-v1')
        policy_net = PolicyNetwork()
        optimizer = optim.Adam(policy_net.parameters(), lr=0.01)
     
        print(env.action_space)
        print(env.observation_space)
        pygame.init()
        screen = pygame.display.set_mode((600, 400))
        clock = pygame.time.Clock()
     
        rewards_one_episode= []
        for episode in range(10000):
             
            state = env.reset()
            done = False
            trajectories = []
            state = state[0]
            step = 0
            torch.save(policy_net, 'policy_net_full.pth')  # save the current policy so play() can load it
            while not done:
                state_tensor = torch.tensor(state).float().unsqueeze(0)
                probs = policy_net(state_tensor)
                action = torch.distributions.Categorical(probs).sample().item()
                log_prob = torch.log(probs.squeeze(0)[action])
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated  # also end the episode at the time limit
     
                # print(episode)
                trajectories.append({"state": state, "action": action, "reward": reward, "log_prob": log_prob})
                state = next_state
     
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        pygame.quit()
                        sys.exit()
                step +=1
                 
                # render with pygame only once the previous episode's return exceeded 99
                if rewards_one_episode and rewards_one_episode[-1] >99:
                    screen.fill((255, 255, 255))
                    cart_x = int(state[0] * 100 + 300)
                    pygame.draw.rect(screen, (0, 0, 255), (cart_x, 300, 50, 30))
                    # print(state)
                    pygame.draw.line(screen, (255, 0, 0), (cart_x + 25, 300), (cart_x + 25 - int(50 * torch.sin(torch.tensor(state[2]))), 300 - int(50 * torch.cos(torch.tensor(state[2])))), 2)
                    pygame.display.flip()
                    clock.tick(200)
                     
     
            print(f"第{episode}回合",f"运行{step}步后挂了")
            # compute discounted returns for the policy gradient (gamma = 0.99)
            returns = 0
             
             
            for traj in reversed(trajectories):
                returns = traj["reward"] + 0.99 * returns
                traj["returns"] = returns
                if rewards_one_episode:
                    # baseline: mean return of the last 10 episodes, subtracted from the return in train()
                    traj["step_mean_reward"] = np.mean(rewards_one_episode[-10:])
                else:
                    traj["step_mean_reward"] = 0
            rewards_one_episode.append(returns)
            # print(rewards_one_episode[:10])
            train(policy_net, optimizer, trajectories)
     
    def play():
     
        env = gym.make('CartPole-v1')
        policy_net = PolicyNetwork()
        pygame.init()
        screen = pygame.display.set_mode((600, 400))
        clock = pygame.time.Clock()
     
        state = env.reset()
        done = False
        trajectories = deque()
        state = state[0]
        step = 0
        policy_net = torch.load('policy_net_full.pth')
        while not done:
            state_tensor = torch.tensor(state).float().unsqueeze(0)
            probs = policy_net(state_tensor)
            action = torch.distributions.Categorical(probs).sample().item()
            log_prob = torch.log(probs.squeeze(0)[action])
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
     
            # print(episode)
            trajectories.append({"state": state, "action": action, "reward": reward, "log_prob": log_prob})
            state = next_state
     
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    sys.exit()
     
             
            # render the environment
            screen.fill((255, 255, 255))
            cart_x = int(state[0] * 100 + 300)
            pygame.draw.rect(screen, (0, 0, 255), (cart_x, 300, 50, 30))
            # print(state)
            pygame.draw.line(screen, (255, 0, 0), (cart_x + 25, 300), (cart_x + 25 - int(50 * torch.sin(torch.tensor(state[2]))), 300 - int(50 * torch.cos(torch.tensor(state[2])))), 2)
            pygame.display.flip()
            clock.tick(60)
            step +=1
     
        print(f"运行{step}步后挂了")
     
     
     
    if __name__ == '__main__':
        main()  # train
        # play()  # inference: watch the saved policy
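
    To try it: run main() first; it trains with REINFORCE and saves the policy to policy_net_full.pth every episode. Then swap in the call to play(), which loads that file and renders the trained policy with pygame at 60 FPS.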

    Results: training is not very stable. Sometimes the policy still has not learned after many episodes, while other times it takes only a few dozen episodes to get it right.
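
    One standard REINFORCE tweak that often helps with this kind of instability (not in the code above, just a sketch of a common variance-reduction trick) is to normalize the discounted returns within each episode before computing the loss:

    import torch

    def normalized_returns(rewards, gamma=0.99, eps=1e-8):
        # discounted returns G_t, then normalized to zero mean and unit variance within the episode
        returns, g = [], 0.0
        for r in reversed(rewards):
            g = r + gamma * g
            returns.append(g)
        returns = torch.tensor(list(reversed(returns)), dtype=torch.float32)
        return (returns - returns.mean()) / (returns.std() + eps)

    Subtracting a baseline (as the code already does with the mean of recent episode returns) and normalizing returns both target the same issue: the high variance of a gradient estimate built from a single episode.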

     

  • Original article: https://www.cnblogs.com/LiuXinyu12378/p/18187947