If you need the full source code, please like, follow, and bookmark, then leave a comment below~~~
Building on the DDPG algorithm, the main goal of TD3 is to address the bias and variance problems that function approximation introduces into the actor-critic (AC) framework. On one hand, variance in the value estimates leads to overestimation; to counter this, TD3 applies clipped Double Q-Learning within the AC framework. On the other hand, high variance causes error accumulation; to mitigate this, TD3 uses two further techniques: delayed policy updates and target policy smoothing with added noise.
It is known from policy gradient methods that PG-based reinforcement learning also suffers from overestimation; however, because the target value of DDPG's critic is not taken from the optimal action-value function, there is no explicit maximization operation. In this case, applying the Double DQN idea directly to DDPG's critic constructs the following target:

y = r + γ · min( Q_w1'(s', a'), Q_w2'(s', a') ),   where a' = π_θ'(s') is the target actor's action at the next state s'.
Compared with the original algorithm, the only difference is an auxiliary critic Q_w2 that is updated in step with the original critic Q_w1, with the minimum of the two taken when computing the target value y. This modification may still seem puzzling: Q_w1 and Q_w2 differ only in their initial parameters and are then updated in exactly the same way, so can two such similar critics really eliminate the biased estimates caused by TD error?
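As a minimal, self-contained PyTorch sketch of this clipped target (the toy tensors below stand in for the outputs of the two target critics and are not part of the original listing):

import torch

# Minimal sketch of the clipped double-Q target for a toy batch of three transitions.
q1 = torch.tensor([[2.0], [5.0], [1.5]])   # stand-in for Q_w1'(s', a')
q2 = torch.tensor([[1.8], [5.5], [0.9]])   # stand-in for Q_w2'(s', a')
reward = torch.tensor([[1.0], [0.0], [0.5]])
not_done = torch.tensor([[1.0], [1.0], [0.0]])
discount = 0.99

# Take the element-wise minimum of the two critics when forming the target.
target_q = reward + not_done * discount * torch.min(q1, q2)
print(target_q)   # tensor([[2.7820], [4.9500], [0.5000]])

Taking the element-wise minimum means the more pessimistic of the two estimates always drives the update, which is what counteracts overestimation.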
In the function approximation setting, the overestimation problem of the TD(0) algorithm is further aggravated: every update produces some amount of TD error δ(s, a):

Q_w(s, a) = r + γ E[ Q_w(s', a') ] − δ(s, a)

Unrolling this relation over an episode gives

Q_w(s_t, a_t) = E[ Σ_{i=t}^{T} γ^(i−t) ( r_i − δ_i ) ]
It can be seen that the variance of the estimate is proportional to the variance of the future rewards and future TD errors. When the discount factor γ is large, each update can cause a rapid increase in variance, so TD3 usually uses a relatively small discount factor γ.
TD3 updates its target networks in the same way as DDPG, using soft updates. Although soft updates are more conducive to stability than hard updates, AC algorithms can still fail, and the reason usually lies in the interaction between actor and critic updates: if the critic's value estimates are inaccurate, the actor will push the policy in the wrong direction; a poor policy from the actor, in turn, further aggravates the critic's error accumulation. The two keep feeding each other in a vicious cycle.
To address this, TD3 delays the policy update, reducing the actor's update frequency and waiting, as far as possible, for the critic's training to converge before updating the actor. Delayed updates effectively reduce accumulated error and thus lower variance; they also cut out unnecessary repeated updates, improving efficiency to some extent. In practice, TD3 updates the actor only once for every d critic updates.
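A minimal sketch of this delayed-update schedule (the print statements stand in for the real critic/actor updates, and policy_freq plays the role of d; the full version appears in the TD3.train method later in this post):

# Delayed policy updates: the critic is trained every step,
# while the actor and target networks are updated only every policy_freq-th step.
policy_freq = 2   # d in the text
total_it = 0

for step in range(6):
    total_it += 1
    print(f"iteration {total_it}: update critic")
    if total_it % policy_freq == 0:
        print(f"iteration {total_it}: update actor and soft-update targets")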
The previous part used delayed policy updates to reduce error accumulation; next, consider the error itself. The root of the error is the bias produced by value function approximation. In machine learning, a common way to remove estimation bias is to regularize the parameter update, and the same idea can be applied to reinforcement learning.
A natural idea is that similar actions should have similar values: if the value is sufficiently smooth over a small region of the action space around the target action, the errors produced can be effectively reduced. Concretely, TD3 adds clipped noise to the target action:

a~ = π_θ'(s') + ε,   ε ∼ clip( N(0, σ), −c, c )

where σ is the standard deviation of the smoothing noise and c is the clipping range.
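A minimal PyTorch sketch of this target policy smoothing (sigma, noise_clip and max_action are illustrative values matching the hyperparameters used in the listing below; the toy tensor stands in for the target actor's output):

import torch

# Target policy smoothing: perturb the target action with clipped Gaussian noise,
# then clip the result back into the valid action range [-max_action, max_action].
sigma, noise_clip, max_action = 0.2, 0.5, 1.0
next_action_raw = torch.tensor([[0.7, -0.3], [0.95, 0.1]])   # stand-in for actor_target(next_state)

noise = (torch.randn_like(next_action_raw) * sigma).clamp(-noise_clip, noise_clip)
next_action = (next_action_raw + noise).clamp(-max_action, max_action)
print(next_action)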
Experimental environment: the MuJoCo environments from the OpenAI Gym toolkit. Four continuous-control tasks were used: Ant, HalfCheetah, Walker2d and Hopper.
Each training run lasts 1,000,000 steps, with every 5,000 steps forming one training stage. At the end of each stage, the learned policy is evaluated by interacting with the environment for ten episodes and averaging the returns.
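A small sketch of this evaluation schedule (the constants mirror the hyperparameters used in the listing below):

# 1,000,000 training steps, evaluated every 5,000 steps over 10 episodes each time.
max_timesteps = int(1e6)
eval_freq = 5000
eval_episodes = 10

num_evaluations = max_timesteps // eval_freq
print(f"{num_evaluations} evaluations, each averaging the return over {eval_episodes} episodes")
# -> 200 evaluations, each averaging the return over 10 episodes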
The results are shown in the figure below.
It can be seen that on the Ant and Walker2d tasks, TD3's clipped Double Q-Learning mechanism noticeably mitigates overestimation, reducing the harm that overestimation-induced bad states do to policy updates and to subsequent training. Its action-value approximation is comparatively more accurate, so, relative to DDPG, it is less prone to getting stuck in local optima and the return the agent obtains from interacting with the environment is substantially higher. Overall, compared with DDPG, TD3 fluctuates less across training stages and is more stable as a whole.


Part of the source code is shown below.
import copy
import os

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import gym

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Replay buffer storing transitions as flat numpy arrays.
class ReplayBuffer(object):
    def __init__(self, state_dim, action_dim, max_size=int(1e6)):
        self.max_size = max_size
        self.ptr = 0
        self.size = 0

        self.state = np.zeros((max_size, state_dim))
        self.action = np.zeros((max_size, action_dim))
        self.next_state = np.zeros((max_size, state_dim))
        self.reward = np.zeros((max_size, 1))
        self.not_done = np.zeros((max_size, 1))

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def add(self, state, action, next_state, reward, done):
        self.state[self.ptr] = state
        self.action[self.ptr] = action
        self.next_state[self.ptr] = next_state
        self.reward[self.ptr] = reward
        self.not_done[self.ptr] = 1. - done

        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample(self, batch_size):
        ind = np.random.randint(0, self.size, size=batch_size)

        return (
            torch.FloatTensor(self.state[ind]).to(self.device),
            torch.FloatTensor(self.action[ind]).to(self.device),
            torch.FloatTensor(self.next_state[ind]).to(self.device),
            torch.FloatTensor(self.reward[ind]).to(self.device),
            torch.FloatTensor(self.not_done[ind]).to(self.device)
        )

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()

        self.l1 = nn.Linear(state_dim, 256)
        self.l2 = nn.Linear(256, 256)
        self.l3 = nn.Linear(256, action_dim)

        self.max_action = max_action

    def forward(self, state):
        a = F.relu(self.l1(state))
        a = F.relu(self.l2(a))
        return self.max_action * torch.tanh(self.l3(a))


class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()

        # Q1 architecture
        self.l1 = nn.Linear(state_dim + action_dim, 256)
        self.l2 = nn.Linear(256, 256)
        self.l3 = nn.Linear(256, 1)

        # Q2 architecture
        self.l4 = nn.Linear(state_dim + action_dim, 256)
        self.l5 = nn.Linear(256, 256)
        self.l6 = nn.Linear(256, 1)

    def forward(self, state, action):
        sa = torch.cat([state, action], 1)

        q1 = F.relu(self.l1(sa))
        q1 = F.relu(self.l2(q1))
        q1 = self.l3(q1)

        q2 = F.relu(self.l4(sa))
        q2 = F.relu(self.l5(q2))
        q2 = self.l6(q2)
        return q1, q2

    def Q1(self, state, action):
        sa = torch.cat([state, action], 1)

        q1 = F.relu(self.l1(sa))
        q1 = F.relu(self.l2(q1))
        q1 = self.l3(q1)
        return q1

# Quick sanity check: print the layer structure of an example actor and critic
# (state_dim=17, action_dim=6, as in Walker2d).
actor1 = Actor(17, 6, 1.0)
for ch in actor1.children():
    print(ch)
print("*********************")
critic1 = Critic(17, 6)
for ch in critic1.children():
    print(ch)

class TD3(object):
    def __init__(
        self,
        state_dim,
        action_dim,
        max_action,
        discount=0.99,
        tau=0.005,
        policy_noise=0.2,
        noise_clip=0.5,
        policy_freq=2
    ):

        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target = copy.deepcopy(self.actor)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=3e-4)

        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = copy.deepcopy(self.critic)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=3e-4)

        self.max_action = max_action
        self.discount = discount
        self.tau = tau
        self.policy_noise = policy_noise
        self.noise_clip = noise_clip
        self.policy_freq = policy_freq

        self.total_it = 0

    def select_action(self, state):
        state = torch.FloatTensor(state.reshape(1, -1)).to(device)
        return self.actor(state).cpu().data.numpy().flatten()

    def train(self, replay_buffer, batch_size=100):
        self.total_it += 1

        # Sample replay buffer
        state, action, next_state, reward, not_done = replay_buffer.sample(batch_size)

        with torch.no_grad():
            # Select action according to target policy and add clipped noise (target policy smoothing)
            noise = (
                torch.randn_like(action) * self.policy_noise
            ).clamp(-self.noise_clip, self.noise_clip)

            next_action = (
                self.actor_target(next_state) + noise
            ).clamp(-self.max_action, self.max_action)

            # Compute the target Q value (clipped double Q-learning: take the minimum of the two critics)
            target_Q1, target_Q2 = self.critic_target(next_state, next_action)
            target_Q = torch.min(target_Q1, target_Q2)
            target_Q = reward + not_done * self.discount * target_Q

        # Get current Q estimates
        current_Q1, current_Q2 = self.critic(state, action)

        # Compute critic loss
        critic_loss = F.mse_loss(current_Q1, target_Q) + F.mse_loss(current_Q2, target_Q)

        # Optimize the critic
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Delayed policy updates
        if self.total_it % self.policy_freq == 0:

            # Compute actor loss
            actor_loss = -self.critic.Q1(state, self.actor(state)).mean()

            # Optimize the actor
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            # Update the frozen target models (soft update)
            for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

            for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

    def save(self, filename):
        torch.save(self.critic.state_dict(), filename + "_critic")
        torch.save(self.critic_optimizer.state_dict(), filename + "_critic_optimizer")

        torch.save(self.actor.state_dict(), filename + "_actor")
        torch.save(self.actor_optimizer.state_dict(), filename + "_actor_optimizer")

    def load(self, filename):
        self.critic.load_state_dict(torch.load(filename + "_critic"))
        self.critic_optimizer.load_state_dict(torch.load(filename + "_critic_optimizer"))
        self.critic_target = copy.deepcopy(self.critic)

        self.actor.load_state_dict(torch.load(filename + "_actor"))
        self.actor_optimizer.load_state_dict(torch.load(filename + "_actor_optimizer"))
        self.actor_target = copy.deepcopy(self.actor)

# Runs policy for X episodes and returns average reward
# A fixed seed is used for the eval environment
def eval_policy(policy, env_name, seed, eval_episodes=10):
    eval_env = gym.make(env_name)
    eval_env.seed(seed + 100)

    avg_reward = 0.
    for _ in range(eval_episodes):
        state, done = eval_env.reset(), False
        while not done:
            action = policy.select_action(np.array(state))
            state, reward, done, _ = eval_env.step(action)
            avg_reward += reward

    avg_reward /= eval_episodes

    print("---------------------------------------")
    print(f"Evaluation over {eval_episodes} episodes: {avg_reward:.3f}")
    print("---------------------------------------")
    return avg_reward

policy = "TD3"
env_name = "Walker2d-v4"     # OpenAI gym environment name
seed = 0                     # Sets Gym, PyTorch and Numpy seeds
start_timesteps = 25e3       # Time steps initial random policy is used
eval_freq = 5e3              # How often (time steps) we evaluate
max_timesteps = 1e6          # Max time steps to run environment
expl_noise = 0.1             # Std of Gaussian exploration noise
batch_size = 256             # Batch size for both actor and critic
discount = 0.99              # Discount factor
tau = 0.005                  # Target network update rate
policy_noise = 0.2           # Noise added to target policy during critic update
noise_clip = 0.5             # Range to clip target policy noise
policy_freq = 2              # Frequency of delayed policy updates
save_model = True            # Save model and optimizer parameters
load_model = ""              # Model load file name, "" doesn't load, "default" uses file_name

file_name = f"{policy}_{env_name}_{seed}"
print("---------------------------------------")
print(f"Policy: {policy}, Env: {env_name}, Seed: {seed}")
print("---------------------------------------")

if not os.path.exists("./results"):
    os.makedirs("./results")
if save_model and not os.path.exists("./models"):
    os.makedirs("./models")

env = gym.make(env_name)

# Set seeds
env.seed(seed)
torch.manual_seed(seed)
np.random.seed(seed)

state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])

kwargs = {
    "state_dim": state_dim,
    "action_dim": action_dim,
    "max_action": max_action,
    "discount": discount,
    "tau": tau,
    "policy_noise": policy_noise * max_action,
    "noise_clip": noise_clip * max_action,
    "policy_freq": policy_freq
}
policy = TD3(**kwargs)

if load_model != "":
    policy_file = file_name if load_model == "default" else load_model
    policy.load(f"./models/{policy_file}")

replay_buffer = ReplayBuffer(state_dim, action_dim)

# Evaluate untrained policy
evaluations = [eval_policy(policy, env_name, seed)]

state, done = env.reset(), False
episode_reward = 0
episode_timesteps = 0
episode_num = 0

for t in range(int(max_timesteps)):
    episode_timesteps += 1

    # Select action randomly or according to policy
    if t < start_timesteps:
        action = env.action_space.sample()
    else:
        action = (
            policy.select_action(np.array(state))
            + np.random.normal(0, max_action * expl_noise, size=action_dim)
        ).clip(-max_action, max_action)

    # Perform action
    next_state, reward, done, _ = env.step(action)
    done_bool = float(done) if episode_timesteps < env._max_episode_steps else 0

    # Store data in replay buffer
    replay_buffer.add(state, action, next_state, reward, done_bool)

    state = next_state
    episode_reward += reward

    # Train agent after collecting sufficient data
    if t >= start_timesteps:
        policy.train(replay_buffer, batch_size)

    if done:
        print(f"Total T: {t+1} Episode Num: {episode_num+1} Episode T: {episode_timesteps} Reward: {episode_reward:.3f}")
        # Reset environment
        state, done = env.reset(), False
        episode_reward = 0
        episode_timesteps = 0
        episode_num += 1

    # Evaluate the policy every eval_freq steps
    if (t + 1) % eval_freq == 0:
        evaluations.append(eval_policy(policy, env_name, seed))
        np.save(f"./results/{file_name}", evaluations)
        if save_model:
            policy.save(f"./models/{file_name}")
Creating this content is not easy; if you found it helpful, please like, follow, and bookmark~~~