• Implementing the DDPG reinforcement learning algorithm with TensorFlow


    DDPG implementation based on tf2.x

    1. Dependencies

    • tensorflow==2.7.0
    • gym==0.24.0
    • python==3.8.0
    • Tested on: Windows / Mac (a quick version check is sketched after this list)

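    Before running the code, it may help to confirm that the installed package versions match the list above. This is only a minimal sketch, assuming the packages are already installed:

    import sys
    import tensorflow as tf
    import gym

    # Print the versions available in the current environment
    print("python     :", sys.version.split()[0])
    print("tensorflow :", tf.__version__)
    print("gym        :", gym.__version__)
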
    2. DDPG implementation

    import tensorflow as tf
    from tensorflow import keras
    import numpy as np
    
    
    class DDPG(keras.Model):
        def __init__(self, a_dim, s_dim, a_bound, batch_size=32, tau=0.002, gamma=0.95,
                     a_lr=0.0001, c_lr=0.001, memory_capacity=9000):
            super().__init__()
            self.batch_size = batch_size  # mini-batch size
            self.tau = tau   # soft-update (Polyak averaging) coefficient
            self.gamma = gamma   # reward discount factor
            self.a_lr = a_lr  # actor learning rate
            self.c_lr = c_lr  # critic learning rate
            self.memory_capacity = memory_capacity  # replay memory capacity
            self.memory = np.zeros((memory_capacity, s_dim * 2 + a_dim + 1), dtype=np.float32)  # each row stores [s, a, r, s_]
            self.pointer = 0  # number of transitions stored so far
            self.memory_full = False  # whether the replay memory is full

            self.a_dim, self.s_dim, self.a_bound = a_dim, s_dim, a_bound[1]  # action dimension, state dimension, action upper bound
    
            self.actor = self._build_actor(trainable=True, name="a/eval")  # actor (policy) network
            self.actor_ = self._build_actor(trainable=False, name="a/target")  # target actor network
            self.actor_.set_weights(self.actor.get_weights())
            self.critic = self._build_critic(trainable=True, name="d/eval")  # critic (Q) network
            self.critic_ = self._build_critic(trainable=False, name="d/target")  # target critic network
            self.critic_.set_weights(self.critic.get_weights())
            self.a_opt = keras.optimizers.Adam(self.a_lr)  # actor optimizer
            self.c_opt = keras.optimizers.Adam(self.c_lr)  # critic optimizer
            self.mse = keras.losses.MeanSquaredError()  # mean squared error loss
    
        def _build_actor(self, trainable, name):  # build the actor (policy) network
            data = keras.Input(shape=(self.s_dim,))
            x = keras.layers.Dense(30, activation="relu", trainable=trainable)(data)
            x = keras.layers.Dense(30, activation="relu", trainable=trainable)(x)
            x = keras.layers.Dense(self.a_dim, trainable=trainable)(x)
            a = self.a_bound * tf.math.tanh(x)  # squash with tanh and scale to the action bound
            model = keras.Model(data, a, name=name)
            return model
    
        def _build_critic(self, trainable, name):  # build the critic (Q) network
            data = keras.Input(shape=(self.a_dim + self.s_dim,))  # input: concatenated state and action
            x = keras.layers.Dense(30, activation="relu", trainable=trainable)(data)
            x = keras.layers.Dense(30, activation="relu", trainable=trainable)(x)
            q = keras.layers.Dense(1, trainable=trainable)(x)
            model = keras.Model(data, q, name=name)
            return model
    
        def param_replace(self):  # soft update of the target network parameters
            actor_weights = self.actor.get_weights()
            critic_weights = self.critic.get_weights()
            actor_target_weights = self.actor_.get_weights()
            critic_target_weights = self.critic_.get_weights()
            for i in range(len(actor_target_weights)):
                actor_target_weights[i] = actor_target_weights[i] * (1 - self.tau) + self.tau * actor_weights[i]
            for i in range(len(critic_target_weights)):
                critic_target_weights[i] = critic_target_weights[i] * (1 - self.tau) + self.tau * critic_weights[i]
            self.actor_.set_weights(actor_target_weights)
            self.critic_.set_weights(critic_target_weights)
    
        def act(self, s):  # choose an action for the current state s
            a = self.actor.predict(np.reshape(s, (-1, self.s_dim)), verbose=0)[0]  # forward pass through the actor network
            return a
    
        def sample_memory(self):  # sample a mini-batch from the replay memory
            indices = np.random.choice(self.memory_capacity, size=self.batch_size)  # assumes the memory is already full
            bt = self.memory[indices, :]  # batch of transitions
            bs = bt[:, :self.s_dim]
            ba = bt[:, self.s_dim: self.s_dim + self.a_dim]
            br = bt[:, -self.s_dim - 1: -self.s_dim]
            bs_ = bt[:, -self.s_dim:]
            return bs, ba, br, bs_
    
        def learn(self):  # train the actor and critic networks
            bs, ba, br, bs_ = self.sample_memory()
            with tf.GradientTape() as tape:  # update the actor network
                a = self.actor(bs)  # actions chosen by the current actor
                q = self.critic(tf.concat([bs, a], 1))
                actor_loss = tf.reduce_mean(-q)  # maximizing Q is equivalent to minimizing -Q
            grads = tape.gradient(actor_loss, self.actor.trainable_variables)  # gradients w.r.t. the actor parameters only
            self.a_opt.apply_gradients(zip(grads, self.actor.trainable_variables))

            with tf.GradientTape() as tape:  # update the critic network
                a_ = self.actor_(bs_)  # the target actor picks the next action for the next state
                q_ = br + self.gamma * self.critic_(tf.concat([bs_, a_], 1))  # TD target
                q = self.critic(tf.concat([bs, ba], 1))
                critic_loss = self.mse(q_, q)  # mean squared error between the TD target and the Q estimate
            grads = tape.gradient(critic_loss, self.critic.trainable_variables)  # gradients w.r.t. the critic parameters only
            self.c_opt.apply_gradients(zip(grads, self.critic.trainable_variables))
            return actor_loss.numpy(), critic_loss.numpy()
    
        def store_transition(self, s, a, r, s_):  # store a transition in the replay memory
            transition = np.hstack((s, a, [r], s_))
            index = self.pointer % self.memory_capacity  # overwrite the oldest memory once the buffer is full
            self.memory[index, :] = transition
            self.pointer += 1
            if self.pointer >= self.memory_capacity:  # mark the replay memory as full
                self.memory_full = True
    
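    Each row of self.memory packs one transition as [s, a, r, s_], which is exactly the layout that sample_memory relies on when it slices a batch into bs, ba, br and bs_. A minimal sketch of that layout (the values below are only illustrative):

    import numpy as np

    s_dim, a_dim = 3, 1  # dimensions used in the Pendulum-v1 test below
    s, a, r, s_ = np.ones(s_dim), np.ones(a_dim), 0.5, np.zeros(s_dim)

    # One replay-memory row: | s (s_dim) | a (a_dim) | r (1) | s_ (s_dim) |
    row = np.hstack((s, a, [r], s_))
    assert row.shape == (s_dim * 2 + a_dim + 1,)

    # The slices used in sample_memory recover each field:
    assert np.allclose(row[:s_dim], s)                  # bs
    assert np.allclose(row[s_dim: s_dim + a_dim], a)    # ba
    assert np.allclose(row[-s_dim - 1: -s_dim], [r])    # br
    assert np.allclose(row[-s_dim:], s_)                # bs_
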

    3. Testing in a gym environment

    import gym
    import numpy as np
    model = DDPG(a_dim=1, s_dim=3, a_bound=[-2, 2], batch_size=128, tau=0.01, gamma=0.9, memory_capacity=10000)
    
    env = gym.make('Pendulum-v1')
    RENDER = False  # whether to render the environment
    env = env.unwrapped  # remove the TimeLimit wrapper
    max_ep_step = 200  # maximum number of steps per episode

    var = 3  # scale (standard deviation) of the exploration noise
    for ep in range(200):
        s = env.reset(seed=1)  # reset the environment with a fixed random seed
        ep_reward = 0
        for step in range(max_ep_step):
            if RENDER:
                env.render()
            # add exploration noise to the chosen action
            a = model.act(s)
            a = np.clip(np.random.normal(a, var), -2, 2)    # Gaussian noise for exploration; clip the action back into [-2, 2]
            s_, r, done, info = env.step(a)

            # store (state, action, reward, next state) in the replay memory; the reward is scaled down by a factor of 10
            model.store_transition(s, a, r/10, s_)

            # start learning once the replay memory is full
            if model.pointer > 10000:  # i.e. once the replay memory (capacity 10000) is full
                var *= .9995  # decay the exploration noise
                model.learn()  # update the actor and critic networks
                model.param_replace()  # soft-update the target networks
    
            s = s_
            ep_reward += r
            if step == max_ep_step - 1:
                print('Episode:', ep, ' Reward: %i' % int(ep_reward), 'Explore: %.2f' % var)
                if ep_reward > -300:  # once the episode reward exceeds -300, start rendering the environment
                    RENDER = True
                break
    
    env.close()  # close the rendering window
    
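    After training, the learned policy can be checked by acting greedily, i.e. without the Gaussian exploration noise. The snippet below is only a minimal evaluation sketch; it assumes the trained model from the script above is still in scope:

    import gym
    import numpy as np

    # Greedy evaluation: run one episode using the actor's output directly.
    eval_env = gym.make('Pendulum-v1')
    s = eval_env.reset(seed=1)
    eval_reward = 0
    for _ in range(200):
        a = model.act(s)               # deterministic action from the actor
        a = np.clip(a, -2, 2)          # keep the action inside the valid range
        s, r, done, info = eval_env.step(a)
        eval_reward += r
    eval_env.close()
    print('Evaluation reward: %i' % int(eval_reward))
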

    DDPG implementation based on tf1.x

    1. Dependencies

    • tensorflow==1.7.0
    • gym==0.25.0
    • python==3.6.0
    • Tested on: Windows

    2. DDPG implementation

    import tensorflow as tf
    import numpy as np
    
    LR_A = 0.001  # actor learning rate
    LR_C = 0.001  # critic learning rate
    GAMMA = 0.9  # reward discount factor
    TAU = 0.01  # soft-update (Polyak averaging) coefficient
    MEMORY_CAPACITY = 10000  # replay memory capacity
    BATCH_SIZE = 32  # mini-batch size
    
    class DDPG(object):
        def __init__(self, a_dim, s_dim, a_bound):
            self.memory = np.zeros((MEMORY_CAPACITY, s_dim * 2 + a_dim + 1), dtype=np.float32)  # replay memory: each row stores [s, a, r, s_]
            self.pointer = 0    # number of transitions stored so far
            self.sess = tf.Session()    # TensorFlow session

            self.a_dim, self.s_dim, self.a_bound = a_dim, s_dim, a_bound  # action dimension, state dimension, action bound
            self.S = tf.placeholder(tf.float32, [None, s_dim], 's')  # placeholder for the current state
            self.S_ = tf.placeholder(tf.float32, [None, s_dim], 's_')  # placeholder for the next state
            self.R = tf.placeholder(tf.float32, [None, 1], 'r')  # placeholder for the reward
    
            with tf.variable_scope('Actor'):
                self.a = self._build_a(self.S, scope='eval', trainable=True)  # actor eval network: action for the current state
                a_ = self._build_a(self.S_, scope='target', trainable=False)   # actor target network: action for the next state
            with tf.variable_scope('Critic'):
                # when computing q for the TD error, self.a is fed with actions from the replay memory;
                # when updating the actor, self.a comes from the actor eval network
                q = self._build_c(self.S, self.a, scope='eval', trainable=True)  # q value for the current state and the eval actor's action
                q_ = self._build_c(self.S_, a_, scope='target', trainable=False)  # q_ value for the next state and the target actor's action
    
            # network parameters of the four sub-networks
            self.ae_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Actor/eval')
            self.at_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Actor/target')
            self.ce_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Critic/eval')
            self.ct_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Critic/target')
    
            # soft replacement of the target network parameters
            self.soft_replace = [[tf.assign(ta, (1 - TAU) * ta + TAU * ea), tf.assign(tc, (1 - TAU) * tc + TAU * ec)]
                                 for ta, ea, tc, ec in zip(self.at_params, self.ae_params, self.ct_params, self.ce_params)]
    
            q_target = self.R + GAMMA * q_  # TD target
            # in the feed_dict for td_error, self.a is fed with the actions stored in the replay memory
            td_error = tf.losses.mean_squared_error(labels=q_target, predictions=q)
            self.ctrain = tf.train.AdamOptimizer(LR_C).minimize(td_error, var_list=self.ce_params)  # update the critic by minimizing the TD error

            # deterministic policy gradient, following the pseudocode: dq/da * da/dparams,
            # i.e. back-propagate q through the action into the actor parameters
            self.policy_grads = tf.gradients(ys=self.a, xs=self.ae_params, grad_ys=tf.gradients(q, self.a)[0])
            self.atrain = tf.train.AdamOptimizer(-LR_A).apply_gradients(zip(self.policy_grads, self.ae_params))  # the negative learning rate turns descent into ascent on q
    
            self.sess.run(tf.global_variables_initializer())
    
        def choose_action(self, s):  # choose an action for the current state s
            return self.sess.run(self.a, {self.S: s[np.newaxis, :]})[0]
    
        def learn(self):
            # soft-update the target network parameters
            self.sess.run(self.soft_replace)

            # randomly sample BATCH_SIZE transitions from the replay memory
            indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE)
            bt = self.memory[indices, :]
            bs = bt[:, :self.s_dim]
            ba = bt[:, self.s_dim: self.s_dim + self.a_dim]
            br = bt[:, -self.s_dim - 1: -self.s_dim]
            bs_ = bt[:, -self.s_dim:]
    
            # update the actor network
            self.sess.run(self.atrain, {self.S: bs})
            # update the critic network
            self.sess.run(self.ctrain, {self.S: bs, self.a: ba, self.R: br, self.S_: bs_})
    
        def store_transition(self, s, a, r, s_):  # store a transition in the replay memory
            transition = np.hstack((s, a, [r], s_))
            index = self.pointer % MEMORY_CAPACITY  # overwrite the oldest memory once the buffer is full
            self.memory[index, :] = transition
            self.pointer += 1
    
        def _build_a(self, s, scope, trainable):  # build the actor network
            with tf.variable_scope(scope):
                net = tf.layers.dense(s, 30, activation=tf.nn.relu, name='l1', trainable=trainable)
                a = tf.layers.dense(net, self.a_dim, activation=tf.nn.tanh, name='a', trainable=trainable)
                return tf.multiply(a, self.a_bound, name='scaled_a')
    
        def _build_c(self, s, a, scope, trainable):  # build the critic network
            with tf.variable_scope(scope):
                n_l1 = 30
                w1_s = tf.get_variable('w1_s', [self.s_dim, n_l1], trainable=trainable)
                w1_a = tf.get_variable('w1_a', [self.a_dim, n_l1], trainable=trainable)
                b1 = tf.get_variable('b1', [1, n_l1], trainable=trainable)
                net = tf.nn.relu(tf.matmul(s, w1_s) + tf.matmul(a, w1_a) + b1)
                return tf.layers.dense(net, 1, trainable=trainable)  # Q(s,a)
    
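    A side note on the actor update: the code above builds the policy gradient by hand with tf.gradients and then uses a negative learning rate to turn gradient descent into ascent on q. An equivalent formulation, shown here only as a sketch of an alternative (not part of the original code), is to minimize -q restricted to the actor's eval parameters and let the optimizer handle the chain rule:

    # Inside __init__, this would replace the self.policy_grads / self.atrain lines:
    a_loss = -tf.reduce_mean(q)  # maximizing q is equivalent to minimizing -q
    self.atrain = tf.train.AdamOptimizer(LR_A).minimize(a_loss, var_list=self.ae_params)

    Both variants follow the same deterministic policy gradient direction (the loss-based form averages over the batch instead of summing); the loss-based form is usually easier to read and less error-prone.
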

    3. Testing in a gym environment

    import numpy as np
    import gym
    from RL_brain import DDPG
    
    MAX_EPISODES = 200  # maximum number of episodes
    MAX_EP_STEPS = 200  # maximum number of steps per episode
    MEMORY_CAPACITY = 10000 # replay memory capacity

    RENDER = False  # whether to render the environment
    ENV_NAME = 'Pendulum-v1'    # environment name

    env = gym.make(ENV_NAME)    # create the environment
    env = env.unwrapped # remove the TimeLimit wrapper
    env.seed(1) # fix the random seed

    s_dim = env.observation_space.shape[0]  # state dimension
    a_dim = env.action_space.shape[0]   # action dimension
    a_bound = env.action_space.high # action upper bound

    ddpg = DDPG(a_dim, s_dim, a_bound)  # create the DDPG agent

    var = 3  # scale (standard deviation) of the exploration noise
    for i in range(MAX_EPISODES):
        s = env.reset()
        ep_reward = 0
        for j in range(MAX_EP_STEPS):
            if RENDER:
                env.render()
    
            # add exploration noise to the chosen action
            a = ddpg.choose_action(s)
            a = np.clip(np.random.normal(a, var), -2, 2)    # Gaussian noise for exploration; clip the action back into [-2, 2]
            s_, r, done, truncated, info = env.step(a)  # new-style gym step API (5 return values); with the older 4-value API use: s_, r, done, info = env.step(a)

            # store (state, action, reward, next state) in the replay memory; the reward is scaled down by a factor of 10
            ddpg.store_transition(s, a, r / 10, s_)

            # start learning once the replay memory is full
            if ddpg.pointer > MEMORY_CAPACITY:
                var *= .9995    # decay the exploration noise
                ddpg.learn()    # update the actor and critic networks (includes the target soft update)
    
            s = s_
            ep_reward += r
            if j == MAX_EP_STEPS - 1:
                print('Episode:', i, ' Reward: %i' % int(ep_reward), 'Explore: %.2f' % var)
                if ep_reward > -300:  # once the episode reward exceeds -300, start rendering the environment
                    RENDER = True
                break
    

    References

    • tf1.x implementation: from the book Reinforcement Learning (《强化学习》) by 邹伟, 鬲玲, 刘昱杓, Tsinghua University Press;
    • tf2.x implementation: Morvan's reinforcement-learning robot arm project, Gitee link: https://gitee.com/TIPE/robotArm
  • Original article: https://blog.csdn.net/qq_37388085/article/details/126135541