• Reinforcement Learning in Practice: Using the Dyna-Q Algorithm to Compare Learning and Policies under Different Amounts of Planning in a Robot Problem (Very Detailed, with Source Code)


    If you need the source code, please like, follow and bookmark, then leave your QQ number in the comment section and send me a private message~~~

    I. A Brief Introduction to Models, Learning, and Planning

    1: Models

    An agent can use a model to predict the environment and react to it. The model referred to here is usually a simulation model: given a state and an action, the model predicts the next state and the reward.

    Models can generally be divided into two types: distribution models and sample models.

    Distribution model: a model that can generate all possible outcomes together with their probability distribution.

    Sample model: a model that produces one definite outcome sampled from all the possibilities.

    Functionally, a model is used to simulate the environment and to generate simulated experience. Compared with a sample model, a distribution model carries more information; the catch is that in real-world tasks it is hard to obtain all the state-transition probabilities.
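
    To make the two model types concrete, here is a minimal sketch (the toy transition table, state names and function names below are purely illustrative assumptions, not part of the robot environment used later): a distribution model returns every possible outcome with its probability, while a sample model draws one concrete outcome from the same distribution.

import random

# Hypothetical toy dynamics: from state "s0" with action "a0" the environment moves
# to "s1" with probability 0.8 (reward 1.0) and to "s2" with probability 0.2 (reward 0.0).
TRANSITIONS = {("s0", "a0"): [(0.8, "s1", 1.0), (0.2, "s2", 0.0)]}

def distribution_model(state, action):
    # distribution model: all possible (probability, next_state, reward) triples
    return TRANSITIONS[(state, action)]

def sample_model(state, action):
    # sample model: one concrete (next_state, reward), sampled with the same probabilities
    outcomes = TRANSITIONS[(state, action)]
    probs = [p for p, _, _ in outcomes]
    _, next_state, reward = random.choices(outcomes, weights=probs, k=1)[0]
    return next_state, reward

print(distribution_model("s0", "a0"))   # the full distribution
print(sample_model("s0", "a0"))         # a single sampled outcome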

    2: Learning

    The learning process learns from real experience generated by the environment. Depending on how that experience is used, learning can be divided into two types: direct reinforcement learning and model learning.

    Direct reinforcement learning: real experience is collected in the real environment and used to update the value function or the policy directly, so it is not affected by model bias.

    Model learning: real experience is collected in the real environment and used to build and improve the simulation model, raising its accuracy so that it comes closer to the real environment.
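
    As a rough sketch of the difference (the dictionary-based Q table and the names below are illustrative assumptions): direct reinforcement learning applies a Q-learning backup to a real transition immediately, whereas model learning only records the transition so that it can be replayed during planning later.

def direct_rl_update(Q, s, a, r, s2, actions, alpha=0.1, gamma=0.9):
    # direct reinforcement learning: one Q-learning backup from a real transition (s, a, r, s2)
    q_sa = Q.get((s, a), 0.0)
    best_next = max(Q.get((s2, a2), 0.0) for a2 in actions)
    Q[(s, a)] = q_sa + alpha * (r + gamma * best_next - q_sa)

def model_learning_update(model, s, a, r, s2):
    # model learning: remember what the environment returned for this (state, action) pair
    model[(s, a)] = (r, s2)

# usage sketch:
# Q, model = {}, {}
# direct_rl_update(Q, s=(0, 4), a=3, r=0.0, s2=(1, 4), actions=[0, 1, 2, 3])
# model_learning_update(model, s=(0, 4), a=3, r=0.0, s2=(1, 4))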

    3: Planning

    The planning process works on a simulated environment or an experience model and updates the value function from simulated experience in order to improve the policy. The core of both learning and planning is evaluating the value function through backup operations; the difference is that during planning the agent does not interact with the real environment.

    Planning is usually divided into state-space planning and plan-space planning. State-space planning searches the state space for an optimal policy, with all value-function computations based on states; this kind of planning is generally viewed as a search method. Its basic ideas are as follows:

    1: All planning algorithms compute a value function as the key intermediate step towards policy improvement.

    2: All planning algorithms can compute that value function from simulated experience generated by a model.

    II. The Dyna-Q Architecture and Algorithm

    The Dyna-Q architecture contains the main functions an online planning agent needs. It ties learning and planning together organically and is a fusion of model-based and model-free methods. Its data come from two sources: real experience sampled from the real environment and simulated experience sampled from the simulation model; the value function or the policy is updated through direct or indirect reinforcement learning.

    The architecture diagram is shown below.
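
    To make the interplay of direct reinforcement learning, model learning and planning in this architecture concrete, below is a minimal tabular Dyna-Q sketch: act in the real environment, update Q from the real transition (direct RL), record the transition in the model (model learning), then perform n extra backups from randomly replayed model transitions (planning). The helper names env_reset, env_step and the hyper-parameters are illustrative assumptions, not the exact interface of the grid-world code later in this article.

import random
from collections import defaultdict

def dyna_q(env_reset, env_step, actions, episodes=300, n_planning=10,
           alpha=0.1, gamma=0.9, epsilon=0.1):
    Q = defaultdict(float)      # tabular Q values, keyed by (state, action)
    model = {}                  # learned model: model[(state, action)] = (reward, next_state, done)

    def greedy(s):
        return max(actions, key=lambda a: Q[(s, a)])

    def backup(s, a, r, s2, terminal):
        # one Q-learning backup; terminal transitions bootstrap from the reward only
        target = r if terminal else r + gamma * Q[(s2, greedy(s2))]
        Q[(s, a)] += alpha * (target - Q[(s, a)])

    for _ in range(episodes):
        s, done = env_reset(), False
        while not done:
            # epsilon-greedy behaviour policy in the real environment
            a = random.choice(actions) if random.random() < epsilon else greedy(s)
            s2, r, done = env_step(s, a)
            backup(s, a, r, s2, done)            # direct reinforcement learning
            model[(s, a)] = (r, s2, done)        # model learning
            for _ in range(n_planning):          # planning from simulated experience
                ps, pa = random.choice(list(model))
                pr, ps2, pdone = model[(ps, pa)]
                backup(ps, pa, pr, ps2, pdone)
            s = s2
    return Q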

    III. How Different Numbers of Planning Steps in Dyna-Q Affect the Number of Learning Steps

    The construction of the robot environment and its background can be found via the link below.

    Robot environment

    Here we compare the effect of different numbers of planning steps on the experimental results. The robot receives a reward of -10 when it leaves the boundary or hits an obstacle, a reward of +1 when it reaches the charging pile, and a reward of 0 in all other cases. Different planning step counts need different numbers of episodes; the visualization of the results is shown below.
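
    As a sketch of how this kind of configuration is expressed through the GridWorldEnv class defined in the listing below (the concrete numbers in the listing differ from the -10/+1 scheme described above, so treat the values here as placeholders):

env = GridWorldEnv(n_width=5, n_height=5, default_reward=0)
env.types = [(2, 2, 1)]        # (x, y, type): type 1 marks an obstacle cell
env.rewards = [(0, 0, 1)]      # (x, y, reward): positive reward at the charging pile
env.ends = [(0, 0)]            # terminal (absorbing) cells
env.start = (0, 4)             # the robot's start cell
env.refresh_setting()          # make the per-cell settings take effect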

     

    The code is as follows:

import gym
from gym import spaces
from gym.utils import seeding
from random import random, choice
import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties


class Grid(object):
    def __init__(self, x: int = None,
                 y: int = None,
                 type: int = 0,
                 reward: float = 0.0):
        self.x = x                # x coordinate
        self.y = y                # y coordinate
        self.type = type          # cell type (0: empty; 1: obstacle or boundary)
        self.reward = reward      # immediate reward of this cell
        self.name = None          # name of this cell
        self._update_name()

    def _update_name(self):
        self.name = "X{0}-Y{1}".format(self.x, self.y)

    def __str__(self):
        return "name:{3}, x:{0}, y:{1}, type:{2}".format(self.x,
                                                         self.y,
                                                         self.type,
                                                         self.name
                                                         )


class GridMatrix(object):
    def __init__(self, n_width: int,             # number of cells horizontally
                 n_height: int,                  # number of cells vertically
                 default_type: int = 0,          # default cell type
                 default_reward: float = 0.0,    # default immediate reward
                 ):
        self.grids = None
        self.n_height = n_height
        self.n_width = n_width
        self.len = n_width * n_height
        self.default_reward = default_reward
        self.default_type = default_type
        self.reset()

    def reset(self):
        self.grids = []
        for x in range(self.n_height):
            for y in range(self.n_width):
                self.grids.append(Grid(x,
                                       y,
                                       self.default_type,
                                       self.default_reward))

    def get_grid(self, x, y=None):
        '''Get the information of a single cell.
        args: coordinates given either as x, y or as a single tuple x
        return: grid object
        '''
        xx, yy = None, None
        if isinstance(x, int):
            xx, yy = x, y
        elif isinstance(x, tuple):
            xx, yy = x[0], x[1]
        assert (xx >= 0 and yy >= 0 and xx < self.n_width and yy < self.n_height), \
            "coordinates must lie within the grid"
        index = yy * self.n_width + xx
        return self.grids[index]

    def set_reward(self, x, y, reward):
        grid = self.get_grid(x, y)
        if grid is not None:
            grid.reward = reward
        else:
            raise LookupError("grid doesn't exist")

    def set_type(self, x, y, type):
        grid = self.get_grid(x, y)
        if grid is not None:
            grid.type = type
        else:
            raise LookupError("grid doesn't exist")

    def get_reward(self, x, y):
        grid = self.get_grid(x, y)
        if grid is None:
            return None
        return grid.reward

    def get_type(self, x, y):
        grid = self.get_grid(x, y)
        if grid is None:
            return None
        return grid.type


# grid-world environment
class GridWorldEnv(gym.Env):
    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 30
    }

    def __init__(self, n_width: int = 5,
                 n_height: int = 5,
                 u_size=40,
                 default_reward: float = 0.0,
                 default_type=0):
        self.u_size = u_size                    # pixel size used to draw one cell
        self.n_width = n_width                  # world width, in cells
        self.n_height = n_height                # world height, in cells
        self.width = u_size * n_width           # screen width
        self.height = u_size * n_height         # screen height
        self.default_reward = default_reward
        self.default_type = default_type
        self.grids = GridMatrix(n_width=self.n_width,
                                n_height=self.n_height,
                                default_reward=self.default_reward,
                                default_type=self.default_type)
        self.reward = 0                         # for rendering
        self.action = None                      # for rendering
        # 0, 1, 2, 3 represent up, down, left, right
        self.action_space = spaces.Discrete(4)
        # the observation space is the set of discrete cell indices
        self.observation_space = spaces.Discrete(self.n_height * self.n_width)
        self.ends = [(0, 0)]                    # terminal cell coordinates; there may be several
        self.start = (0, 4)                     # start cell coordinate; exactly one
        self.types = [(2, 2, 1)]
        self.rewards = []
        self.refresh_setting()
        self.viewer = None                      # rendering viewer object
        self.seed()                             # create a random seed
        self.reset()

    def seed(self, seed=None):
        # create the seed needed for randomisation and return an np_random object
        # that supports all subsequent random operations
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        assert self.action_space.contains(action), \
            "%r (%s) invalid" % (action, type(action))
        self.action = action                    # action for rendering
        old_x, old_y = self._state_to_xy(self.state)
        new_x, new_y = old_x, old_y
        if action == 0: new_y += 1              # up
        elif action == 1: new_y -= 1            # down
        elif action == 2: new_x -= 1            # left
        elif action == 3: new_x += 1            # right
        # boundary effect
        if new_x < 0: new_x = 0
        if new_x >= self.n_width: new_x = self.n_width - 1
        if new_y < 0: new_y = 0
        if new_y >= self.n_height: new_y = self.n_height - 1
        # wall effect: cells of type 1 are obstacles and cannot be entered
        if self.grids.get_type(new_x, new_y) == 1:
            new_x, new_y = old_x, old_y
        self.reward = self.grids.get_reward(new_x, new_y)
        done = self._is_end_state(new_x, new_y)
        self.state = self._xy_to_state(new_x, new_y)
        # expose all grid-world information through info
        info = {"x": new_x, "y": new_y, "grids": self.grids}
        return self.state, self.reward, done, info

    # convert a state index into x, y coordinates
    def _state_to_xy(self, s):
        x = s % self.n_width
        y = int((s - x) / self.n_width)
        return x, y

    def _xy_to_state(self, x, y=None):
        if isinstance(x, int):
            assert (isinstance(y, int)), "incomplete Position info"
            return x + self.n_width * y
        elif isinstance(x, tuple):
            return x[0] + self.n_width * x[1]
        return -1                               # unknown state

    def refresh_setting(self):
        '''After creating the grid world, users may change the type or reward of
        some cells; call this method to make those changes take effect.
        '''
        for x, y, r in self.rewards:
            self.grids.set_reward(x, y, r)
        for x, y, t in self.types:
            self.grids.set_type(x, y, t)

    def reset(self):
        self.state = self._xy_to_state(self.start)
        return self.state

    # check whether a state is terminal
    def _is_end_state(self, x, y=None):
        if y is not None:
            xx, yy = x, y
        elif isinstance(x, int):
            xx, yy = self._state_to_xy(x)
        else:
            assert (isinstance(x, tuple)), "incomplete coordinate data"
            xx, yy = x[0], x[1]
        for end in self.ends:
            if xx == end[0] and yy == end[1]:
                return True
        return False

    # graphical rendering
    def render(self, mode='human', close=False):
        if close:
            if self.viewer is not None:
                self.viewer.close()
                self.viewer = None
            return
        zero = (0, 0)
        u_size = self.u_size
        m = 2                                   # gap between cells
        # if the viewer has not been created yet, build all screen elements
        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(self.width, self.height)
            # draw the cells
            for x in range(self.n_width):
                for y in range(self.n_height):
                    v = [(x * u_size + m, y * u_size + m),
                         ((x + 1) * u_size - m, y * u_size + m),
                         ((x + 1) * u_size - m, (y + 1) * u_size - m),
                         (x * u_size + m, (y + 1) * u_size - m)]
                    rect = rendering.FilledPolygon(v)
                    r = self.grids.get_reward(x, y) / 10
                    if r < 0:
                        rect.set_color(0.9 - r, 0.9 + r, 0.9 + r)
                    elif r > 0:
                        rect.set_color(0.3, 0.5 + r, 0.3)
                    else:
                        rect.set_color(0.9, 0.9, 0.9)
                    self.viewer.add_geom(rect)
                    # draw the cell outline
                    v_outline = [(x * u_size + m, y * u_size + m),
                                 ((x + 1) * u_size - m, y * u_size + m),
                                 ((x + 1) * u_size - m, (y + 1) * u_size - m),
                                 (x * u_size + m, (y + 1) * u_size - m)]
                    outline = rendering.make_polygon(v_outline, False)
                    outline.set_linewidth(3)
                    if self._is_end_state(x, y):
                        # golden outline for terminal cells
                        outline.set_color(0.9, 0.9, 0)
                        self.viewer.add_geom(outline)
                    if self.start[0] == x and self.start[1] == y:
                        outline.set_color(0.5, 0.5, 0.8)
                        self.viewer.add_geom(outline)
                    if self.grids.get_type(x, y) == 1:  # obstacle cells drawn in dark grey
                        rect.set_color(0.3, 0.3, 0.3)
                    else:
                        pass
            # draw the agent
            self.agent = rendering.make_circle(u_size / 4, 30, True)
            self.agent.set_color(1.0, 1.0, 0.0)
            self.viewer.add_geom(self.agent)
            self.agent_trans = rendering.Transform()
            self.agent.add_attr(self.agent_trans)
        # update the agent position
        x, y = self._state_to_xy(self.state)
        self.agent_trans.set_translation((x + 0.5) * u_size, (y + 0.5) * u_size)
        return self.viewer.render(return_rgb_array=mode == 'rgb_array')


# Dyna-Q agent
class Agent():
    def __init__(self, env):
        self.episode = 1
        self.Q = {}
        self.actions = [0, 1, 2, 3]
        self.position = env.start
        self.model = {}

    # build the model: store (reward, next_state) for each (state, action)
    def make_model(self, pos, act, reward, next_state):
        self.model["{0},{1}".format(pos, act)] = "{0},{1}".format(reward, next_state)

    # planning with the learned model
    def q_planning(self, n):
        for i in range(0, n):
            a = [i for i in self.model.keys()]
            done = False
            if a != []:
                str = choice(a)   # pick a previously observed (state, action) pair at random
                pos = str.split(",")[0] + "," + str.split(",")[1]
                act = int(str.split(",")[2])
                reward = float(self.model[str].split(",")[0])
                next_state = self.model[str].split(",")[1] + "," + self.model[str].split(",")[2]
                # states are stored as the string form of a tuple, e.g. "(0, 0)"
                if next_state == "(0, 0)" or next_state == "(4, 3)":
                    done = True
                self.updateQ(pos, act, next_state, reward, done)

    # check whether Q(pos, act) exists; if not, initialise it with a small random value
    def chaxunQ(self, pos, act):
        judge = False
        for i in self.Q:
            if i == "{0},{1}".format(pos, act):
                judge = True
                break
        if judge == True:
            return True
        else:
            self.Q["{0},{1}".format(pos, act)] = float(format(random() / 10000, '.3f'))
            return

    # update the state-action value function Q
    def updateQ(self, pos, action, next_pos, reward, done):
        if done == False:
            self.chaxunQ(pos, action)
            old_q = self.Q["{0},{1}".format(pos, action)]
            action1 = self.performmax(next_pos)
            # self.chaxunQ(next_pos, action1)
            new_q = self.Q["{0},{1}".format(next_pos, action1)]
            old_q = old_q + 0.1 * (reward + 0.9 * new_q - old_q)
            self.Q["{0},{1}".format(pos, action)] = float(format(old_q, '.3f'))
        else:
            self.chaxunQ(pos, action)
            self.Q["{0},{1}".format(pos, action)] = float(format(reward, '.3f'))
            # print(pos, action, reward)

    # action-selection policy: random with probability 1/episode, otherwise greedy
    def perform(self, pos):
        eplison = random()
        self.chaxunQ(pos, choice([0, 1, 2, 3]))
        if eplison > 1 / self.episode:
            maxq = -1000
            act = ""
            for i in self.Q:
                list = i.split(",")
                state = list[0] + "," + list[1]
                if state == str(pos):
                    if self.Q[i] > maxq:
                        maxq = self.Q[i]
                        act = list[2]
            return int(act)
        else:
            return choice([0, 1, 2, 3])

    # argmax Q
    def performmax(self, pos):
        maxq = -1000
        str1 = ""
        self.chaxunQ(pos, choice([0, 1, 2, 3]))
        for i in self.Q:
            list = i.split(",")
            state = list[0] + "," + list[1]
            if state == str(pos):
                if self.Q[i] > maxq:
                    maxq = self.Q[i]
                    str1 = list[2]
        return int(str1)


def run(n):
    agent = Agent(env)
    total_j = 0
    total_r = 0
    a = []
    b = []
    env.refresh_setting()
    for i in range(0, 300):
        done = False
        env.reset()
        r = 0
        j = 0
        while done == False:
            state = env._state_to_xy(env.state)
            action = agent.perform(state)
            next_state, reward, done, info = env.step(action)
            next_state = env._state_to_xy(next_state)
            # update the Q value
            agent.updateQ(state, action, next_state, reward, done)
            # update the model
            agent.make_model(state, action, reward, next_state)
            r += reward
            # planning with the model
            agent.q_planning(n)
            j = j + 1
        total_r += r
        a.append(total_r)   # cumulative reward (used by the commented-out alternative return)
        agent.episode += 1
        # print(agent.Q)
        total_j += j
        if i != 0:
            b.append(j)
            print("episode={0}, steps={1}, reward={2}".format(i, j, '%.3f' % r))
    # return (np.array(a)/np.array(total_j)).tolist()
    # return a
    return b


if __name__ == "__main__":
    n_width = 5
    n_height = 5
    default_reward = 0
    env = GridWorldEnv(n_width, n_height, default_reward=default_reward)
    env.types = [(2, 2, 1)]
    env.rewards = [(0, 0, 1), (4, 3, 1)]        # reward settings
    env.start = (0, 4)
    env.ends = [(0, 0), (4, 3)]
    env.refresh_setting()
    x = range(1, 300)
    ln1, = plt.plot(x, run(0), label=u"n=0")
    ln2, = plt.plot(x, run(5), label=u"n=5")
    ln3, = plt.plot(x, run(10), label=u"n=10")
    ln4, = plt.plot(x, run(30), label=u"n=30")
    font1 = FontProperties(fname=r"c:\windows\fonts\simsun.ttc", size=15)
    plt.legend(handles=[ln1, ln2, ln3, ln4], prop=font1)
    plt.xlabel(u'episode', fontproperties=font1)
    plt.ylabel(u'steps', fontproperties=font1)
    plt.show()

    IV. The Effect of the Dyna-Q Algorithm on the Policy

    Likewise, the policies the agent obtains over the episodes differ for different values of n; with n=50 learning is clearly faster and the resulting policy covers more of the state space.
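
    The policy comparison printed by the listing below is read off the learned Q table by spreading probability uniformly over all actions that tie for the maximal Q value in a state; a compact sketch of that idea (the names are illustrative, and the listing itself works on its own string-keyed Q dictionary):

def greedy_policy(Q, state, actions=(0, 1, 2, 3)):
    # put equal probability on every action whose Q value attains the maximum for this state
    values = [Q.get((state, a), 0.0) for a in actions]
    best = max(values)
    winners = [a for a, v in zip(actions, values) if v == best]
    return {a: (1.0 / len(winners) if a in winners else 0.0) for a in actions}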

    The code is as follows:

from random import random, choice
import gym
from gym import spaces
from gym.utils import seeding

# The Grid, GridMatrix and GridWorldEnv classes are identical to the ones in the
# Section III listing above and are not repeated here; copy them in before running.


class Agent():
    def __init__(self, env):
        self.episode = 1
        self.Q = {}
        self.actions = [0, 1, 2, 3]
        self.position = env.start
        self.model = {}

    # build the model: store (reward, next_state) for each (state, action)
    def make_model(self, pos, act, reward, next_state):
        self.model["{0},{1}".format(pos, act)] = "{0},{1}".format(reward, next_state)

    # planning with the learned model
    def q_planning(self, n):
        for i in range(0, n):
            a = [i for i in self.model.keys()]
            done = False
            if a != []:
                str = choice(a)   # pick a previously observed (state, action) pair at random
                pos = str.split(",")[0] + "," + str.split(",")[1]
                act = int(str.split(",")[2])
                reward = float(self.model[str].split(",")[0])
                next_state = self.model[str].split(",")[1] + "," + self.model[str].split(",")[2]
                # states are stored as the string form of a tuple, e.g. "(8, 5)"
                if next_state == "(8, 5)" or next_state == "(1, 6)":
                    done = True
                self.updateQ(pos, act, next_state, reward, done)

    # check whether Q(pos, act) exists; if not, initialise it with a small random value
    def chaxunQ(self, pos, act):
        judge = False
        for i in self.Q:
            if i == "{0},{1}".format(pos, act):
                judge = True
                break
        if judge == True:
            return True
        else:
            self.Q["{0},{1}".format(pos, act)] = float(format(random() / 10000, '.3f'))
            return

    # update the state-action value function Q
    def updateQ(self, pos, action, next_pos, reward, done):
        if done == False:
            self.chaxunQ(pos, action)
            old_q = self.Q["{0},{1}".format(pos, action)]
            action1 = self.performmax(next_pos)
            new_q = self.Q["{0},{1}".format(next_pos, action1)]
            old_q = old_q + 0.1 * (reward + 0.9 * new_q - old_q)
            self.Q["{0},{1}".format(pos, action)] = float(format(old_q, '.3f'))
        else:
            self.chaxunQ(pos, action)
            self.Q["{0},{1}".format(pos, action)] = float(format(reward, '.3f'))

    # action-selection policy: random with probability 1/episode, otherwise greedy
    def perform(self, pos):
        eplison = random()
        self.chaxunQ(pos, choice([0, 1, 2, 3]))
        if eplison > 1 / self.episode:
            maxq = -1000
            act = ""
            for i in self.Q:
                list = i.split(",")
                state = list[0] + "," + list[1]
                if state == str(pos):
                    if self.Q[i] > maxq:
                        maxq = self.Q[i]
                        act = list[2]
            return int(act)
        else:
            return choice([0, 1, 2, 3])

    # argmax Q
    def performmax(self, pos):
        maxq = -1000
        str1 = ""
        self.chaxunQ(pos, choice([0, 1, 2, 3]))
        for i in self.Q:
            list = i.split(",")
            state = list[0] + "," + list[1]
            if state == str(pos):
                if self.Q[i] > maxq:
                    maxq = self.Q[i]
                    str1 = list[2]
        return int(str1)


def run(n):
    agent = Agent(env)
    total_j = 0
    total_r = 0
    a = []
    b = []
    env.refresh_setting()
    for i in range(0, 300):
        done = False
        env.reset()
        r = 0
        j = 0
        while done == False and j < 50:
            j = j + 1
            state = env._state_to_xy(env.state)
            action = agent.perform(state)
            next_state, reward, done, info = env.step(action)
            next_state = env._state_to_xy(next_state)
            # update the model
            agent.make_model(state, action, reward, next_state)
            # update the Q value
            agent.updateQ(state, action, next_state, reward, done)
            r += reward
            # planning with the model
            agent.q_planning(n)
        if i >= 2:
            total_r += r
            a.append(total_r)
        agent.episode += 1
        action_Q = {}
        for i in sorted(list(agent.Q.keys()), key=lambda x: [x[1], x[4], x[5], x[-1]]):
            action_Q[i] = agent.Q[i]
        print("n={0}: Q values {1}".format(n, action_Q))
        # P_A: the action-probability distribution (policy) for each state
        P_A = {}
        Q_keys = list(action_Q.keys())
        for i in range(0, len(Q_keys) - 4, 4):
            temp_action_list = []
            max_a_value = max(action_Q[Q_keys[j]] for j in range(i, i + 4))
            # temp_num: marks which of the four actions attain the maximum value
            temp_num = [0, 0, 0, 0]
            # PA: probabilities of the four actions
            PA = [0, 0, 0, 0]
            for k in range(i, i + 4):
                if action_Q[Q_keys[k]] == max_a_value:
                    temp_action_list.append(Q_keys[k])
                    temp_num[k - i] = 1
            valid_action_p = round(1 / len(temp_action_list), 2)
            for m in range(4):
                if temp_num[m] == 1:
                    PA[m] = valid_action_p
            P_A[Q_keys[i][0:-2]] = PA
        print("Q_A: ", P_A)
        total_j += j
        b.append(j)
    return a


if __name__ == "__main__":
    n_width = 11
    n_height = 7
    default_reward = -0.1
    env = GridWorldEnv(n_width, n_height, default_reward=default_reward)
    env.types = [(1, 2, 1), (2, 2, 1), (3, 2, 1), (4, 2, 1), (5, 2, 1), (6, 2, 1), (7, 2, 1), (8, 2, 1), (9, 2, 1),
                 (10, 2, 1)]
    env.rewards = [(8, 5, 1), (1, 6, 1)]    # reward settings
    env.start = (4, 0)                      # the robot's start cell
    env.ends = [(8, 5), (1, 6)]             # absorbing (terminal) cells
    env.refresh_setting()
    for i in range(0, 2):
        print("episode=", i + 1)
        run(0)
    for i in range(0, 2):
        print("episode=", i + 1)
        run(50)

    Writing this up was not easy; if you found it helpful, please like, follow, and bookmark~~~

  • Original article: https://blog.csdn.net/jiebaoshayebuhui/article/details/128018045