Unlike DP, in Q-Learning everything happens inside an unknown MDP; this is what "model-free" means.
Model-free prediction estimates the value function of an unknown MDP; model-free control optimizes the value function of an unknown MDP.
On-policy learning means learning about a policy from experience generated by following that same policy; off-policy learning means learning from experience generated by a different policy, e.g. learning from a coach.
Temporal-difference (TD) learning combines ideas from Monte Carlo (MC) methods and dynamic programming (DP). Like MC methods, TD methods can learn directly from raw experience without a model of the environment's dynamics. Like DP methods, TD methods update estimates based in part on other learned estimates, without waiting for a final outcome as MC does.
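For reference, the standard tabular TD(0) update for the state-value function (not spelled out in the original text, but it makes the bootstrapping idea concrete) is

\[ V(S_t) \leftarrow V(S_t) + \alpha \bigl[ R_{t+1} + \gamma V(S_{t+1}) - V(S_t) \bigr] \]

where the bracketed term is the TD error: the target R_{t+1} + γ V(S_{t+1}) uses the current estimate of the next state's value instead of waiting for the episode's return.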
It is now time to introduce the Q-Learning algorithm, an off-policy TD control algorithm and one of the early breakthroughs in model-free RL.
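Its update rule, which the update() method in the code below implements with alpha = 0.1 and gamma = 0.9, is

\[ Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha \bigl[ R_{t+1} + \gamma \max_{a} Q(S_{t+1}, a) - Q(S_t, A_t) \bigr] \]

The max over next actions is what makes Q-Learning off-policy: the target follows the greedy policy even though the behaviour policy that actually selects actions is ε-greedy.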

Next, we implement the Q-Learning algorithm on the cliff-walking task.

This is a standard undiscounted, episodic task with a start state and a goal state, and the usual actions that move the agent up, down, left, or right. Every transition yields a reward of −1, except transitions into the "cliff" region: stepping into that region yields a reward of −100 and sends the agent immediately back to the start.


State space: |S| = height × length = 4 × 12 = 48 grid positions
Action space: |A| = 4 (up, down, left, right)
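As a preview of how the implementation below stores the Q-table: all 48 states and 4 actions are flattened into a single list, and each (state, action) pair is mapped to a position in that list. The standalone function below is a small sketch of my own that mirrors the _index method of the Q_table class further down.

LENGTH, HEIGHT, ACTIONS = 12, 4, 4

def flat_index(a, x, y, length=LENGTH, height=HEIGHT):
    """Index of Q([x, y], a) in a flat list of ACTIONS * HEIGHT * LENGTH entries."""
    return a * height * length + x * length + y

q_table = [0] * ACTIONS * HEIGHT * LENGTH   # 4 * 4 * 12 = 192 Q-values, all initialized to zero
print(flat_index(a=3, x=2, y=5))            # Q([2, 5], action 3) lives at index 3*48 + 2*12 + 5 = 173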




import time
import random


# Environment class
class Env():
    def __init__(self, length, height):
        # define the height and length of the map
        self.length = length    # 12
        self.height = height    # 4
        # define the agent's start position
        self.x = 0
        self.y = 0

    # render the map dynamically with characters
    def render(self, frames=50):
        # print the map row by row
        for i in range(self.height):
            if i == 0:  # the cliff is in line 0
                # line 0 looks like [S, x, x, x, x, x, x, x, x, x, x, T]
                line = ['S'] + ['x'] * (self.length - 2) + ['T']  # 'S': start, 'T': terminal, 'x': the cliff
            # the other three rows are ordinary ground
            else:
                line = ['.'] * self.length
            # mark the agent's position as 'o'
            if self.x == i:
                line[self.y] = 'o'
            print(''.join(line))
        print('\033[' + str(self.height + 1) + 'A')  # move the cursor back to the top-left corner
        time.sleep(1.0 / frames)

    def step(self, action):
        # 0: right, 1: left, 2: up, 3: down
        change = [[0, 1], [0, -1], [-1, 0], [1, 0]]
        # compute the agent's position after the move, clipped to the grid
        self.x = min(self.height - 1, max(0, self.x + change[action][0]))
        self.y = min(self.length - 1, max(0, self.y + change[action][1]))
        # the state is simply the agent's position on the map
        states = [self.x, self.y]
        reward = -1
        terminal = False
        if self.x == 0:  # if the agent is on the cliff line "Sxxx...xT"
            # in line 0 but not at the start position
            if self.y > 0:
                # reaching the goal and falling off the cliff both end the episode
                terminal = True
                # the agent fell off the cliff
                if self.y != self.length - 1:
                    reward = -100
        return reward, states, terminal

    # restart the game: send the agent back to the start
    def reset(self):
        self.x = 0
        self.y = 0


class Q_table():
    def __init__(self, length, height, actions=4, alpha=0.1, gamma=0.9):
        # the Q-table is stored as a flat (1-D) list
        self.table = [0] * actions * length * height  # initialize all Q(s,a) to zero
        self.actions = actions
        self.length = length
        self.height = height
        self.alpha = alpha  # learning rate
        self.gamma = gamma  # discount factor

    # map a state-action pair to its index in the flat Q-table
    def _index(self, a, x, y):
        """Return the index of Q([x,y], a) in Q_table."""
        return a * self.height * self.length + x * self.length + y

    def _epsilon(self):
        return 0.1
        # version for better convergence:
        # """At the beginning epsilon is 0.2, after 300 episodes it decays to 0.05, and eventually goes to 0."""
        # return 20. / (num_episode + 100)

    def take_action(self, x, y, num_episode):
        """Epsilon-greedy action selection."""
        # with probability epsilon, pick a random action
        if random.random() < self._epsilon():
            return int(random.random() * 4)
        else:
            # collect the Q-values of the four actions in the current state
            actions_value = [self.table[self._index(a, x, y)] for a in range(self.actions)]
            # return the action (0, 1, 2 or 3) with the largest Q-value in this state
            return actions_value.index(max(actions_value))

    def max_q(self, x, y):
        """Collect the Q-values of the four actions in state [x, y] and return the largest one."""
        # actions_value always has length 4: one Q-value per action, looked up via _index
        actions_value = [self.table[self._index(a, x, y)] for a in range(self.actions)]
        return max(actions_value)

    # update the Q-table
    def update(self, a, s0, s1, r, is_terminated):
        # both s0 and s1 have the form [x, y]
        q_predict = self.table[self._index(a, s0[0], s0[1])]
        # print(q_predict)
        if not is_terminated:
            q_target = r + self.gamma * self.max_q(s1[0], s1[1])
        else:
            q_target = r
        self.table[self._index(a, s0[0], s0[1])] += self.alpha * (q_target - q_predict)


def cliff_walk():
    # set up the environment
    env = Env(length=12, height=4)
    # build the Q-table: conceptually a 48 x 4 matrix, stored as a flat list
    table = Q_table(length=12, height=4)
    # train for 3000 episodes
    for num_episode in range(3000):
        # within the whole learning process
        episodic_reward = 0
        is_terminated = False
        # every episode starts at the start position
        s0 = [0, 0]
        while not is_terminated:
            # within one episode; s0 is the current state
            action = table.take_action(s0[0], s0[1], num_episode)
            # take one step: observe the reward, the next state s1, and whether the episode ends
            r, s1, is_terminated = env.step(action)
            table.update(action, s0, s1, r, is_terminated)
            episodic_reward += r
            # env.render(frames=100)  # uncomment to watch the agent move
            # the next state becomes the current state
            s0 = s1
        if num_episode % 20 == 0:
            print("Episode: {}, Score: {}".format(num_episode, episodic_reward))
        # reset the environment after each episode
        env.reset()


if __name__ == "__main__":
    cliff_walk()
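After training it is easy to sanity-check what has been learned by extracting the greedy action in every state and printing it as an arrow. The helper below is a minimal sketch of my own (not part of the original code); it assumes the Q_table instance and the action order 0: right, 1: left, 2: up, 3: down used above.

def print_policy(table, length=12, height=4):
    """Print the greedy action of every grid cell as an arrow (sketch, not in the original code)."""
    arrows = ['>', '<', '^', 'v']  # same action order as the code above: 0 right, 1 left, 2 up, 3 down
    for x in range(height):
        row = ''
        for y in range(length):
            actions_value = [table.table[table._index(a, x, y)] for a in range(table.actions)]
            row += arrows[actions_value.index(max(actions_value))]
        print(row)

Calling print_policy(table) at the end of cliff_walk() should show the route the agent prefers; for Q-Learning it tends to hug the cliff edge.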


SARSA: at the current time step t the agent takes its action with the ε-greedy policy. For the future Q-value in the target, it also follows the ε-greedy policy: it takes one more step under the same behaviour policy, consults the Q-table at that next state, and plugs the Q-value of the action that policy picks (greedy most of the time, random with probability ε) into the update formula. In other words, to know what the next step is worth, the agent actually steps forward with its own current policy; that is on-policy.
Q-Learning: at the current time step t the agent also takes its action with the ε-greedy policy. For the future Q-value, however, it simply consults the current Q-table, lists the Q-values of all possible next actions, compares them, and plugs the maximum into the update formula. No extra step is taken: the agent only asks which next action has the largest Q-value according to the table and uses that value directly in the update; that is off-policy.
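Written as update targets, the difference is (standard forms; note that the SARSA code below re-samples an ε-greedy action at the next state in epsilon_q() instead of reusing the action that will actually be executed, which has the same expectation):

SARSA (on-policy): \( q_{\text{target}} = R_{t+1} + \gamma\, Q(S_{t+1}, A_{t+1}) \), with \( A_{t+1} \) drawn from the same ε-greedy behaviour policy.
Q-Learning (off-policy): \( q_{\text{target}} = R_{t+1} + \gamma \max_{a} Q(S_{t+1}, a) \).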


import time
import random


# Environment class
class Env():
    def __init__(self, length, height):
        # define the height and length of the map
        self.length = length
        self.height = height
        # define the agent's start position
        self.x = 0
        self.y = 0

    def render(self, frames=50):
        for i in range(self.height):
            if i == 0:  # the cliff is in line 0
                line = ['S'] + ['x'] * (self.length - 2) + ['T']  # 'S': start, 'T': terminal, 'x': the cliff
            else:
                line = ['.'] * self.length
            if self.x == i:
                line[self.y] = 'o'  # mark the agent's position as 'o'
            print(''.join(line))
        print('\033[' + str(self.height + 1) + 'A')  # move the cursor back to the top-left corner
        time.sleep(1.0 / frames)

    # take one step
    def step(self, action):
        # update the agent's state (position) on the map according to the chosen action
        change = [[0, 1], [0, -1], [-1, 0], [1, 0]]
        self.x = min(self.height - 1, max(0, self.x + change[action][0]))
        self.y = min(self.length - 1, max(0, self.y + change[action][1]))
        states = [self.x, self.y]
        reward = -1
        terminal = False
        if self.x == 0:  # if the agent is on the cliff line "Sxxx...xT"
            if self.y > 0:  # if the agent is not at the start position
                terminal = True
                if self.y != self.length - 1:  # the agent fell off the cliff
                    reward = -100
        return reward, states, terminal

    # reset the agent's position
    def reset(self):
        self.x = 0
        self.y = 0


class Q_table():
    def __init__(self, length, height, actions=4, alpha=0.1, gamma=0.9):
        # the Q-table is again a flat 1-D list: 4 * 12 * 4 = 192 entries
        self.table = [0] * actions * length * height  # initialize all Q(s,a) to zero
        self.actions = actions
        self.length = length
        self.height = height
        self.alpha = alpha
        self.gamma = gamma

    # map the current state S (position) and action A to the corresponding index in the Q-table
    def _index(self, a, x, y):
        """Return the index of Q([x,y], a) in Q_table."""
        return a * self.height * self.length + x * self.length + y

    # epsilon is still 0.1
    def _epsilon(self):
        return 0.1
        # version for better convergence:
        # """At the beginning epsilon is 0.2, after 300 episodes it decays to 0.05, and eventually goes to 0."""
        # return 20. / (num_episode + 100)

    # take an action with the epsilon-greedy policy
    def take_action(self, x, y, num_episode):
        """epsilon-greedy action selection"""
        if random.random() < self._epsilon():
            return int(random.random() * 4)
        else:
            actions_value = [self.table[self._index(a, x, y)] for a in range(self.actions)]
            return actions_value.index(max(actions_value))

    # the on-policy part: the core of the SARSA algorithm
    def epsilon_q(self, x, y):
        # evaluate the next state with the same epsilon-greedy policy used to act:
        # with probability 1 - epsilon take the greedy value, otherwise a random action's value
        actions_value = [self.table[self._index(a, x, y)] for a in range(self.actions)]
        return max(actions_value) if random.random() > self._epsilon() else actions_value[int(random.random() * 4)]

    def update(self, a, s0, s1, r, is_terminated):
        # both s0 and s1 have the form [x, y]
        q_predict = self.table[self._index(a, s0[0], s0[1])]
        if not is_terminated:
            # this is where SARSA differs from Q-Learning: the target uses epsilon_q instead of max_q
            q_target = r + self.gamma * self.epsilon_q(s1[0], s1[1])
        else:
            q_target = r
        self.table[self._index(a, s0[0], s0[1])] += self.alpha * (q_target - q_predict)


def cliff_walk():
    env = Env(length=12, height=4)
    table = Q_table(length=12, height=4)
    for num_episode in range(3000):
        # within the whole learning process
        episodic_reward = 0
        is_terminated = False
        s0 = [0, 0]
        while not is_terminated:
            # within one episode
            action = table.take_action(s0[0], s0[1], num_episode)
            r, s1, is_terminated = env.step(action)
            table.update(action, s0, s1, r, is_terminated)
            episodic_reward += r
            # env.render(frames=100)
            s0 = s1
        if num_episode % 20 == 0:
            print("Episode: {}, Score: {}".format(num_episode, episodic_reward))
        env.reset()


if __name__ == "__main__":
    cliff_walk()

The main difference between SARSA and Q-Learning lies in the Q-value update formula, namely how the future Q-value is computed.
Judging from the experimental results, SARSA converges to a somewhat lower episodic reward than Q-Learning, settling around −15 to −19, but it is more "cautious". Q-Learning converges to a higher reward, reaching −13 at best, but episodes scoring around −100 also occur more often; it is more "aggressive".
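One way to make this comparison concrete (not part of the original code) is to collect the per-episode scores of both runs and plot smoothed curves. The sketch below assumes two hypothetical lists q_rewards and sarsa_rewards, obtained by having each cliff_walk() append episodic_reward to a list and return it.

import matplotlib.pyplot as plt

def smooth(rewards, window=50):
    """Simple moving average so the reward curves are readable."""
    return [sum(rewards[max(0, i - window):i + 1]) / (i + 1 - max(0, i - window))
            for i in range(len(rewards))]

def plot_rewards(q_rewards, sarsa_rewards):
    """Plot the smoothed per-episode rewards of the two algorithms on one figure."""
    plt.plot(smooth(q_rewards), label='Q-Learning')
    plt.plot(smooth(sarsa_rewards), label='SARSA')
    plt.xlabel('episode')
    plt.ylabel('episodic reward (smoothed)')
    plt.legend()
    plt.show()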