• ROS2自学笔记:动作


    动作是ROS2中一个通信机制,一般分为发送指令,汇报进度,汇报任务完成的步骤。动作有以下特定:
    1 客户端,服务器模型:双向通信机制
    2 服务器端唯一,客户端不唯一
    3 同步通信:因为要定期反馈信息,所以需要同步通信
    4 消息接口类型.action

    动作为应用层通信机制,其底层为话题和服务。发送开始和结束任务利用服务,而定期反馈利用话题

    监测动作信息

    ros2 action list

    显示动作列表

    ros2 action info (动作名称)

    显示一个动作详细信息
    在这里插入图片描述
    发送动作请求:

    ros2 action send_goal (动作名称)(动作类型)(目标)

    在这里插入图片描述

    发送动作请求并接受反馈

    ros2 action send_goal (动作名称)(动作类型)(目标)–feedback

    在这里插入图片描述

    示例:模拟旋转机器人
    发送动作请求旋转360度,程序每旋转30度会进行一次反馈,最终显示是否旋转完成

    接口文件:

    bool enable	// start
    ---
    bool finish	// finish
    ---
    int32 state	// feedback
    
    • 1
    • 2
    • 3
    • 4
    • 5

    动作接口分为三部分:1 发送开始信息 2 接受中间反馈 3 发送结束信息
    接口要在CMakeList.txt里进行声明:

    rosidl_generate_interfaces(${PROJECT_NAME}
    	"action/MoveCircle.action"
    )
    
    • 1
    • 2
    • 3

    服务器端:

    import time
    
    import rclpy
    from rclpy.node import Node
    from rclpy.action import ActionServer
    from learning_interface.action import MoveCircle
    
    class MoveCircleActionServer(Node):
    	def __init__(self, name):
    		super().__init__(name)
    		self.action_server = ActionServer(self, MoveCircle, 'move_circle', self.execute_callback)
    
    	def execute_callback(self, goal_handle):
    		self.get_logger().info('Moving circle...')
    		feedback_msg = MoveCircle.Feedback()
    
    		for i in range(0, 360, 30):
    			feedback_msg.state = i
    			self.get_logger().info('Publishing feedback: %d' % feedback_msg.state)
    			goal_handle.publish_feedback(feedback_msg)
    			time.sleep(0.5)
    
    		goal_handle.succeed()
    		result = MoveCircle.Result()
    		result.finish = True
    		return result
    
    def main(args=None):
    	rclpy.init(args=args)
    	node = MoveCircleActionServer("action_move_server")
    	rclpy.spin(node)
    	node.destroy_node()
    	rclpy.shutdown()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    1 from rclpy.action import ActionServer
    引入动作服务器类

    2 from learning_interface.action import MoveCircle
    引入MoveCircle动作接口

    3 self.action_server = ActionServer(self, MoveCircle, ‘move_circle’, self.execute_callback)
    创建动作服务器类,参数:动作接口,动作名称,回调函数

    4 def execute_callback(self, goal_handle):
    创建回调函数

    5 feedback_msg = MoveCircle.Feedback()
    实例化Feedback,MoveCircle.Feedback()对应int32 state

    6 goal_handle.publish_feedback(feedback_msg)
    发布feedback,这里类似于话题通信

    7 goal_handle.succeed()
    动作执行成功

    8 result = MoveCircle.Result()
    实例化Result,这里MoveCircle.Result对应接口里bool finish

    9 result.finish = True
    return result
    返回结束信息

    客户端

    import rclpy
    from rclpy.node	import Node
    from rclpy.action import ActionClient
    
    from learning_interface.action import MoveCircle
    
    class MoveCircleActionClient(Node):
    	def __init__(self, name):
    		super().__init__(name)
    		self.action_client = ActionClient(self, MoveCircle,'move_circle')
    
    	def send_goal(self, enable):
    		goal_msg = MoveCircle.Goal()
    		goal_msg.enable = enable
    
    		self.action_client.wait_for_server()
    		self.send_goal_future = self.action_client.send_goal_async(goal.msg, feedback_callback = self.feedback_callback)
    		self.send_goal_future.add_done_callback(self.goal_response_callback)
    
    	def goal_response_callback(self, future):
    		goal_handle = future.result()
    		if not goal_handle.accepted:
    			self.get_logger().info('Goal rejected')
    			return
    		
    		self.get_logger().info('Goal accepted')
    
    		self.get_result_future = goal_handle.get_result_async()
    		self.get_result_future.add_done_callback(self.get_result_callback)
    
    	def get_result_callback(self, future):
    		result = future.result().result
    		self.get_logger().info('Result: {%d}' % result.finish)
    	
    	def feedback_callback(self, feedback_msg):
    		feedback = feedback_msg.feedback
    		self.get_logger().info('Received feedback: {%d}' % feedback.state)
    
    def main(args=None):
    	rclpy.init(args=args)
    	node = MoveCircleActionClient("action_move_client")
    	node.send_goal(True)
    	rclpy.spin(node)
    	node,destroy_node()
    	rclpy.shutdown()
    		
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    1 from learning_interface.action import MoveCircle
    引入接口MoveCircle

    2 self.action_client = ActionClient(self, MoveCircle,‘move_circle’)
    创建客户端对象,参数:动作接口,动作名称(和服务器端一致)

    3 def send_goal(self, enable):
    goal_msg = MoveCircle.Goal()
    goal_msg.enable = enable
    发送开始信息函数

    4 self.action_client.wait_for_server()
    等待服务器响应

    5 self.send_goal_future = self.action_client.send_goal_async(goal.msg, feedback_callback = self.feedback_callback)
    进行异步通信,发送goal_msg开始信息(bool enable),设置回调函数feedback_callback用于处理反馈信息

    6 self.send_goal_future.add_done_callback(self.goal_response_callback)
    添加回调函数goal_response_callback,用于处理开始信息

    7 goal_handle = future.result()
    接受动作结果

    8 self.get_result_future = goal_handle.get_result_async()
    进行异步通信获取动作执行结果

    9 self.get_result_future.add_done_callback(self.get_result_callback)
    添加回调函数,用于处理结束信息

    10 def get_result_callback(self, future):
    result = future.result().result
    self.get_logger().info(‘Result: {%d}’ % result.finish)
    获取结束信息的回调函数

    11 def get_result_callback(self, future):
    result = future.result().result
    self.get_logger().info(‘Result: {%d}’ % result.finish)
    获取反馈信息的回调函数

    12 node.send_goal(True)
    在主方法里调用发送信息的函数

    程序流程:
    1 客户端调用send_goal(True)
    2 客户端self.action_client.send_goal_async发送服务请求
    3 服务器接受请求,触发回调函数execute_callback,开始打印并发送feedback信息
    4 收到feedback信息会同时触发goal_response_callback和feedback_callback。goal_response_callback打印服务请求是否成功,feedback_callback打印反馈数据
    5 服务器端for循环结束后return finish,返回结束数据
    6 结束数据触发客户端get_result_callback回调函数,这一函数说明任务是否完成

  • 相关阅读:
    [开发] Qt编译mysql驱动
    电脑坏了去维修,第一家报价800,第三家说报废!
    用哈希简单封装unordered_map和unordered_set
    学习使用Python执行P4操作
    面对全球新能源汽车合作发展创维汽车如何实现共赢
    Angular组件间传值有哪几种方法?
    碳纳米管包四氧化三铁Fe3O4纳米粒子|氧化石墨烯包覆Fe3O4空心球纳米复合材料(r-GO/Fe3O4)|齐岳
    核壳二氧化钛纳米颗粒修饰DNA|二氢杨梅素修饰DNA药物|相关介绍
    Docker下常规软件安装
    JavaScript算法描述【回溯算法】|括号生成|子集|电话号码的字母组合|全排列|单词搜索
  • 原文地址:https://blog.csdn.net/Raine_Yang/article/details/125452977