• Ros2中Action两个节点主动双向通信


    计划实现:

    过程一:机器人给服务器发一段音频

    过程二:服务器处理音频后,再主动给机器人发送播放请求

    过程三:机器人播放完成后,给服务器一个响应

    过程四:然后服务器给机器人返回结束指令

    整个过程有两个action,过程一和过程四是一个action,过程二和过程三是一个action,过程二和过程三是过程一和过程四的一部分。 

    注意的点:

    1. 在robot中有一个主线程,一个定时任务子线程,在server中只有一个主线程(可以查看之前的文章导入一下ros环境,然后直接在ros源代码中打印数据)
    2. 在robot中的子线程中,不能使用rclpy.spin_until_future_complete(self, _send_goal_future),不然会提示ValueError: generator already executing。 在server中的主线程中可以使用这个方法。 我理解的是,该方法会阻塞主线程,直接让ros不再处理任何事情,只等待这个返回,但是这个时候,服务端还会主动给机器人发送信息,所以程序就死了
    3. 在server中,我们使用client发送请求的时候,必须要新创建一个node,因为是在主线程中了,所以可以使用rclpy.spin_until_future_complete进行阻塞

    上面的原理都不是很清楚,是自己猜的

    代码如下:

    文件一:server.py

    1. import threading
    2. import time
    3. import rclpy
    4. from rclpy.action import ActionClient
    5. from rclpy.action import ActionServer
    6. from rclpy.node import Node
    7. from rclpy.action.server import ServerGoalHandle
    8. from bothway_interfaces.action import AskNbReplyRobot
    9. from bothway_interfaces.action import AskRobotReplyNb
    10. # 导入Action接口
    11. # 两个线程,一个Main线程,一个定时任务线程
    12. class Robot(Node):
    13. """Action客户端"""
    14. def __init__(self, name):
    15. super().__init__(name)
    16. current_thread = threading.current_thread()
    17. self.get_logger().info(
    18. f"Main thread: {current_thread.name}, ID: {current_thread.ident}")
    19. self.send_index = 0
    20. self.get_logger().info(f"节点已启动:{name}!")
    21. self.action_server_reply_robot = ActionServer(
    22. self,
    23. AskRobotReplyNb,
    24. "AskRobotReplyNb",
    25. execute_callback=self.callback_execute_callback,# 主线程运行
    26. )
    27. self.timer = threading.Timer(2, self.send_request)
    28. self.timer.start()
    29. def callback_execute_callback(self,goal_handle: ServerGoalHandle):
    30. current_thread = threading.current_thread()
    31. self.get_logger().info(
    32. f"callback_execute_callback thread: {current_thread.name}, ID: {current_thread.ident}")
    33. playwave = goal_handle.request.base64wav
    34. self.get_logger().info(f"成功拿到播放参数:{playwave}")
    35. self.get_logger().info("播放成功")
    36. #给反馈
    37. goal_handle.succeed()
    38. result = AskRobotReplyNb.Result()
    39. result.code = 0
    40. result.msg = '播放成功'
    41. result.t = time.time()
    42. return result
    43. def send_request(self):
    44. #因为启动了定时器,所以是在子线程中运行
    45. current_thread = threading.current_thread()
    46. self.get_logger().info(
    47. f"request thread: {current_thread.name}, ID: {current_thread.ident}")
    48. # 每次创建一个新的client
    49. # test = rclpy.create_node("test")
    50. action_client_ask_nb = ActionClient(
    51. self, AskNbReplyRobot, "ask_nb_reply_robot")
    52. while not action_client_ask_nb.wait_for_server(timeout_sec=1.0):
    53. self.get_logger().info('service not available, waiting again...')
    54. goal_msg = AskNbReplyRobot.Goal()
    55. goal_msg.base64wav = f"base64wav--aaaaa:{self.send_request}"
    56. goal_msg.index = self.send_index
    57. self.send_index = self.send_index + 1
    58. goal_msg.t = time.time()
    59. self.get_logger().info(f"goal_msg封装完成:{goal_msg.t},准备发送请求")
    60. _send_goal_future = action_client_ask_nb.send_goal_async(
    61. goal_msg, feedback_callback=self.feedback_callback)
    62. self.get_logger().info("等待 send_goal_future返回中.....")
    63. # 阻塞当前线程,直到 _send_goal_future 完成。这意味着ROS节点的事件循环将被阻塞,不允许节点继续执行其他任务,直到 _send_goal_future 完成或超时
    64. # rclpy.spin_until_future_complete(self, _send_goal_future)
    65. while not _send_goal_future.done(): # 用于阻塞当前线程,当前线程中其他代码都不能执行,但是ros事件循环能执行
    66. time.sleep(0.2)
    67. self.get_logger().info("等待 send_goal_future返回中.....")
    68. self.get_logger().info("请求成功返回")
    69. goal_handle = _send_goal_future.result()
    70. if goal_handle.accepted:
    71. result_future = goal_handle.get_result_async()
    72. self.get_logger().info('等待result_future返回中......')
    73. # rclpy.spin_until_future_complete(self, result_future)
    74. while not result_future.done():
    75. time.sleep(0.2)
    76. self.get_logger().info("等待 result_future.....")
    77. self.get_logger().info('result_future成功')
    78. if result_future == None or result_future.result() == None or result_future.result().result == None:
    79. self.get_logger().info(
    80. "result_future == None or result_future.result() == None or result_future.result().result == None")
    81. return None
    82. self.get_logger().info(f"result_future:{result_future}")
    83. self.get_logger().info(
    84. f"result_future.result():{result_future.result()}")
    85. # self.timer = threading.Timer(0.1, self.send_request)
    86. # self.timer.start()
    87. def feedback_callback(self, feedback_msg): # 此处是第一个话题,过程反馈话题,客户端订阅
    88. """获取回调反馈"""
    89. feedback = feedback_msg.feedback
    90. # self.get_logger().info(f"Received feedback: {feedback.state}, {feedback.process}")
    91. def main(args=None):
    92. """主函数"""
    93. rclpy.init(args=args)
    94. robot = Robot("robot")
    95. rclpy.spin(robot)
    96. rclpy.shutdown()

    文件二:

    robot.py

    1. import threading
    2. import time
    3. import rclpy
    4. from rclpy.action import ActionClient
    5. from rclpy.action import ActionServer
    6. from rclpy.node import Node
    7. from rclpy.action.server import ServerGoalHandle
    8. from bothway_interfaces.action import AskNbReplyRobot
    9. from bothway_interfaces.action import AskRobotReplyNb
    10. # 导入Action接口
    11. # 两个线程,一个Main线程,一个定时任务线程
    12. class Robot(Node):
    13. """Action客户端"""
    14. def __init__(self, name):
    15. super().__init__(name)
    16. current_thread = threading.current_thread()
    17. self.get_logger().info(
    18. f"Main thread: {current_thread.name}, ID: {current_thread.ident}")
    19. self.send_index = 0
    20. self.get_logger().info(f"节点已启动:{name}!")
    21. self.action_server_reply_robot = ActionServer(
    22. self,
    23. AskRobotReplyNb,
    24. "AskRobotReplyNb",
    25. execute_callback=self.callback_execute_callback,# 主线程运行
    26. )
    27. self.timer = threading.Timer(2, self.send_request)
    28. self.timer.start()
    29. def callback_execute_callback(self,goal_handle: ServerGoalHandle):
    30. current_thread = threading.current_thread()
    31. self.get_logger().info(
    32. f"callback_execute_callback thread: {current_thread.name}, ID: {current_thread.ident}")
    33. playwave = goal_handle.request.base64wav
    34. self.get_logger().info(f"成功拿到播放参数:{playwave}")
    35. self.get_logger().info("播放成功")
    36. #给反馈
    37. goal_handle.succeed()
    38. result = AskRobotReplyNb.Result()
    39. result.code = 0
    40. result.msg = '播放成功'
    41. result.t = time.time()
    42. return result
    43. def send_request(self):
    44. #因为启动了定时器,所以是在子线程中运行
    45. current_thread = threading.current_thread()
    46. self.get_logger().info(
    47. f"request thread: {current_thread.name}, ID: {current_thread.ident}")
    48. # 每次创建一个新的client
    49. # test = rclpy.create_node("test")
    50. action_client_ask_nb = ActionClient(
    51. self, AskNbReplyRobot, "ask_nb_reply_robot")
    52. while not action_client_ask_nb.wait_for_server(timeout_sec=1.0):
    53. self.get_logger().info('service not available, waiting again...')
    54. goal_msg = AskNbReplyRobot.Goal()
    55. goal_msg.base64wav = f"base64wav--aaaaa:{self.send_request}"
    56. goal_msg.index = self.send_index
    57. self.send_index = self.send_index + 1
    58. goal_msg.t = time.time()
    59. self.get_logger().info(f"goal_msg封装完成:{goal_msg.t},准备发送请求")
    60. _send_goal_future = action_client_ask_nb.send_goal_async(
    61. goal_msg, feedback_callback=self.feedback_callback)
    62. self.get_logger().info("等待 send_goal_future返回中.....")
    63. # 阻塞当前线程,直到 _send_goal_future 完成。这意味着ROS节点的事件循环将被阻塞,不允许节点继续执行其他任务,直到 _send_goal_future 完成或超时
    64. # rclpy.spin_until_future_complete(self, _send_goal_future)
    65. while not _send_goal_future.done(): # 用于阻塞当前线程,当前线程中其他代码都不能执行,但是ros事件循环能执行
    66. time.sleep(0.2)
    67. self.get_logger().info("等待 send_goal_future返回中.....")
    68. self.get_logger().info("请求成功返回")
    69. goal_handle = _send_goal_future.result()
    70. if goal_handle.accepted:
    71. result_future = goal_handle.get_result_async()
    72. self.get_logger().info('等待result_future返回中......')
    73. # rclpy.spin_until_future_complete(self, result_future)
    74. while not result_future.done():
    75. time.sleep(0.2)
    76. self.get_logger().info("等待 result_future.....")
    77. self.get_logger().info('result_future成功')
    78. if result_future == None or result_future.result() == None or result_future.result().result == None:
    79. self.get_logger().info(
    80. "result_future == None or result_future.result() == None or result_future.result().result == None")
    81. return None
    82. self.get_logger().info(f"result_future:{result_future}")
    83. self.get_logger().info(
    84. f"result_future.result():{result_future.result()}")
    85. # self.timer = threading.Timer(0.1, self.send_request)
    86. # self.timer.start()
    87. def feedback_callback(self, feedback_msg): # 此处是第一个话题,过程反馈话题,客户端订阅
    88. """获取回调反馈"""
    89. feedback = feedback_msg.feedback
    90. # self.get_logger().info(f"Received feedback: {feedback.state}, {feedback.process}")
    91. def main(args=None):
    92. """主函数"""
    93. rclpy.init(args=args)
    94. robot = Robot("robot")
    95. rclpy.spin(robot)
    96. rclpy.shutdown()

  • 相关阅读:
    Flutter - APP主界面Tabbar保持页面状态
    一、【React拓展】setState 的2种写法
    C#/.NET/.NET Core优秀项目和框架2024年4月简报
    [算法]数组给定长度的所有排列
    MySQL之如何复制一张表的数据
    计算机算法分析与设计(4)---矩阵连乘问题(含C++代码)
    第二章 进程与线程 十五、互斥锁
    Learn Prompt-Prompt 高级技巧:AI-town 虚拟小镇
    C#实现顺序表定义、插入、删除、查找操作
    基于c++的简易web服务器搭建(初尝socket编程)
  • 原文地址:https://blog.csdn.net/sunriseYJP/article/details/134013187