• ROS2时间同步(python)


    最近1周一直研究ROS2的时间同步,翻越很多博客,很少有人使用ROS2进行时间同步的代码,无奈不断尝试与源码阅读,终于将其搞定,

    为此,本博客将介绍基于python的ROS2的时间同步方法。

    本博客内容结构为话题发布代码,话题订阅与时间同步代码,代码文件夹结构及结果显示图片。本博客假设2个publisher和一个scribe,同步是在scibe中完成。

    一.话题发布代码

    发布1为第二个发布者,可理解为某传感器

    publisher1代码如下:

    复制代码
    #!/usr/bin/env python3
    import rclpy
    from rclpy.node import Node
    from std_msgs.msg import String,Float32,Int32
    import cv2
    # from std_msgs.msg import Header
    import time
    class NodePublisher(Node):
        def __init__(self,name):
            super().__init__(name)
            self.get_logger().info("大家好,我是%s!" % name)
            self.num=0
            self.command_publisher1 = self.create_publisher(Int32,"command1", 10) 
            self.timer = self.create_timer(0.4, self.timer_callback)  # 
            # self.inputdata1()
        def inputdata1(self):
            msg = Int32() #String()
            period=0.5
            print("publisher1-周期",period)
            self.get_logger().info(f'发布了指令:{msg.data}')    #打印一下发布的数据
            num=0
            while True:
                num=num+1
                msg.data = num #str(num)
    
                self.command_publisher_.publish(msg) 
                # time.sleep(period)
                self.get_logger().info(f'发布了指令:{msg.data}')    #打印一下发布的数据
        def timer_callback(self):
            msg = Int32() #String()
            self.num+=1
            msg.data = self.num #str(num)
            self.command_publisher1.publish(msg) 
            self.get_logger().info(f'发布了指令:{msg.data}')    #打印一下发布的数据
    def main(args=None):
        rclpy.init(args=args) # 初始化rclpy
        node = NodePublisher("topic_publisher1")  # 新建一个节点
        rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
        rclpy.shutdown() # 关闭rclpy
    复制代码

    发布2为第二个发布者,可理解为某传感器

    publisher2代码如下:

    复制代码
    #!/usr/bin/env python3
    import rclpy
    from rclpy.node import Node
    from std_msgs.msg import String,Float32,Int32
    import time
    class NodePublisher(Node):
        def __init__(self,name):
            super().__init__(name)
            self.get_logger().info("大家好,我是%s!" % name)
            self.num=0
            self.command_publisher2= self.create_publisher(Int32,"command2", 10) 
            self.timer = self.create_timer(0.2, self.timer_callback)  #    
        def timer_callback(self):
            msg = Int32() #String()
            self.num+=1
            msg.data = self.num #str(num)
            self.command_publisher2.publish(msg) 
            self.get_logger().info(f'发布了指令:{msg.data}')    #打印一下发布的数据
    def main(args=None):
        rclpy.init(args=args) # 初始化rclpy
        node = NodePublisher("topic_publisher2")  # 新建一个节点
        rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
        rclpy.shutdown() # 关闭rclpy
    复制代码

    二.话题订阅及时间同步代码

    订阅发布者信息,并将其同步,可理解为同步不同传感器

    scriabe代码如下:

    复制代码
    #!/usr/bin/env python3
    import rclpy
    from rclpy.node import Node
    import message_filters
    from std_msgs.msg import String,Float32,Int32
    import message_filters
    from sensor_msgs.msg import Image, CameraInfo
    def callback(image_sub,info_sub):
        res=int(info_sub.data)-int(image_sub.data)
        print("publisher1:\t{}\tpubsher2:\t{}\t{}".format(str(image_sub.data),str(info_sub.data),res))
    def main(args=None):
        rclpy.init(args=args) # 初始化rclpy
        scribe_node=Node('scribe_time')
     
        image_sub = message_filters.Subscriber(scribe_node, Int32,'command1')
        info_sub = message_filters.Subscriber(scribe_node, Int32,'command2')
        
        ts = message_filters.ApproximateTimeSynchronizer([image_sub, info_sub], 10, 0.1, allow_headerless=True) # allow_headerless=True,可以不使用时间戳
        # ts = message_filters.TimeSynchronizer([image_sub, info_sub], 10) # 这个需要时间戳才可调用 
        ts.registerCallback(callback)
        rclpy.spin(scribe_node)
        rospy.spin()   
        
    复制代码

     

    三.参数配置及文件格式

    setup.py设置如下:

    复制代码
    from setuptools import setup
    package_name = 'topic_time'
    setup(
        name=package_name,
        version='0.0.0',
        packages=[package_name],
        data_files=[
            ('share/ament_index/resource_index/packages',
                ['resource/' + package_name]),
            ('share/' + package_name, ['package.xml']),
        ],
        install_requires=['setuptools'],
        zip_safe=True,
        maintainer='root',
        maintainer_email='root@todo.todo',
        description='TODO: Package description',
        license='TODO: License declaration',
        tests_require=['pytest'],
        entry_points={
            'console_scripts': [
            "publisher1_node = topic_time.publisher1:main",
            "publisher2_node = topic_time.publisher2:main",
            "subscribe_node = topic_time.subscribe:main",
            "subscribe2_node = topic_time.subscribe2:main"
    
            ],
        },
    )
    复制代码

     

    文件格式如下:

     

     

     

    通过以上代码将可看到同步的scribe中发布1时间无间隔,发布2时间间隔为4,恰好与设置周期同等,结果显示如下:

     

     

  • 相关阅读:
    Mysql的视图、存储过程与函数
    openCV初级实践项目:银行卡卡号识别
    日撸Java三百行(day19:字符串匹配)
    使用.NET简单实现一个Redis的高性能克隆版(四、五)
    人工智能第2版学习——博弈中的搜索1
    GO语言使用之网络编程(TCP编程)
    JDBC【DBUtils】
    shell_38.Linux读取脚本名
    OpenAI超级对齐负责人:“驾驭”超级智能的四年计划
    YOLOv8 : 数据组织
  • 原文地址:https://www.cnblogs.com/tangjunjun/p/16803149.html