• ROS2进阶第三章 -- 机器人语音交互之ros集成科大讯飞中文语音库,实现语音控制机器人小车


    概述:
    上一篇博客:ROS2进阶第二章 – 使用Gazebo构建机器人仿真平台 – 控制差速轮式机器人移动,我们使用gazebo仿真环境创建一个世界,并将上一节制作的机器人加载到仿真环境中,在通过键盘控制节点来控制小车移动,并通过rviz实时察看 camera,kinect和lidar三种传感器的仿真效果。

    本文我们将在ros上集成科大讯飞的中文语音库,实现语音控制机器人小车前进后退,左转右转等。至于语音识别和语音合成的原理,本文并不深究,读者可以自行搜索相关的文章介绍。这里提醒,本文的测试环境是ubuntu22.04 + ros2 humble。

    参考资料

    1. 讯飞语音听写 Linux SDK 文档
    2. linux系统(ubuntu)调用科大讯飞SDK实现语音识别
    3. ROS2进阶第一章 – 从头开始构建一个可视化的差速轮式机器人模型 – 学习URDF机器人建模与xacro优化
    4. ROS2进阶第二章 – 使用Gazebo构建机器人仿真平台 – 控制差速轮式机器人移动
    5. ROS高效进阶第五章 – 机器人语音交互之ros集成科大讯飞中文语音库,实现语音控制机器人小车

    方法一、使用普通麦克风版

    这个方法整个过程较为简单,容易配置,由于科大讯飞的语音包是C++代码编写,在使用这个包编写接口调用程序是存在一定的困难,因此另辟蹊径,直接调用语音包的Demo,获得语音的识别结果,通过文件的形式将这个结果传递给ros2的键盘控制节点,以实现语音控制功能。下面是详细的说明。

    1. 下载科大讯飞语音库

    1. 首先登陆讯飞开放平台:讯飞开放平台 ,注册后,点击控制台进入。
    2. 然后创建应用并下载linux sdk,更具体的操作可以参考: linux系统(ubuntu)调用科大讯飞SDK实现语音识别

    2. 创建工作空间

    我这边的工作空间命名为dev_ws_A(如果看了我前面两节的则不用新建工作空间了,直接在ros2_ws目录下即可),并在这里创建了两个文件夹srcvoice_ros2,如下图所示
    在这里插入图片描述
    将刚刚下载的压缩包放到voice_ros2文件夹中,如图所示:
    在这里插入图片描述

    src文件夹中的内容如下:
    一个是机器人的模型以及世界,一个是键盘控制节点(这两个文件夹中的内容都在第二章详细的进行了说明)
    在这里插入图片描述

    3. 编写代码

    (1)首先在voice_ros2文件夹中新建一个文件名为voice.py,复制下面的内容

    import subprocess
    import multiprocessing
    import time
    
    def run_iat_online_record_sample(queue):
        process = subprocess.Popen(["./bin/iat_online_record_sample"], 
                                   stdout=subprocess.PIPE, 
                                   stdin=subprocess.PIPE, 
                                   stderr=subprocess.PIPE, 
                                   )
        
        # Communicate with the process
        stdout, _ = process.communicate(input=b"0\n1\n")
        
        # Put the result into the queue
        queue.put(stdout.decode('utf-8'))
    
    def main():
        while True:
            # Create a queue for communication between processes
            queue = multiprocessing.Queue()
            
            # Start the process
            process = multiprocessing.Process(target=run_iat_online_record_sample, args=(queue,))
            process.start()
            
            # Wait for the process to finish and get the result from the queue
            process.join()
            result = queue.get()
            
            # Print the result
            print("Result:", result)
            
            # Save the result to a text file, clearing the file first
            with open("result.txt", "w") as f:
                f.write(result)
            
            # Ask user whether to continue recognition
            continue_recognition = input("是否继续识别? (0: 结束, 1: 继续): ")
            if continue_recognition == "0":
                break
    
    if __name__ == "__main__":
        main()
    
    

    运行时在当前文件夹中以python文件的方式来运行,如图所示
    在这里插入图片描述
    这个文件运行后会在当前目录生成一个result.txt文件,如下图,这个文件的内容每次识别之后都会更新,键盘节点就是通过获取这个文件的数据来通过语音控制机器人移动的
    在这里插入图片描述

    (2)修改teleop_twist_keyboard.py文件

    在键盘控制的代码前添加读取文件数据的代码
    在这里插入图片描述
    这里将刚刚识别到的语音过滤后存储在voice_command[0]中,以供后续使用,下面会通过判断voice_command[0]中的值来进行不同的操作
    在这里插入图片描述
    完整代码如下:

    import sys
    import threading
    import time
    import os
    from std_msgs.msg import String
    import geometry_msgs.msg
    import rclpy
    
    if sys.platform == 'win32':
        import msvcrt
    else:
        import termios
        import tty
    
    msg = """
    This node takes keypresses from the keyboard and publishes them
    as Twist/TwistStamped messages. It works best with a US keyboard layout.
    ---------------------------
    Moving around:
       u    i    o
       j    k    l
       m    ,    .
    
    For Holonomic mode (strafing), hold down the shift key:
    ---------------------------
       U    I    O
       J    K    L
       M    <    >
    
    t : up (+z)
    b : down (-z)
    
    anything else : stop
    
    q/z : increase/decrease max speeds by 10%
    w/x : increase/decrease only linear speed by 10%
    e/c : increase/decrease only angular speed by 10%
    
    CTRL-C to quit
    """
    
    moveBindings = {
        'i': (1, 0, 0, 0),
        'o': (1, 0, 0, -1),
        'j': (0, 0, 0, 1),
        'l': (0, 0, 0, -1),
        'u': (1, 0, 0, 1),
        ',': (-1, 0, 0, 0),
        '.': (-1, 0, 0, 1),
        'm': (-1, 0, 0, -1),
        'O': (1, -1, 0, 0),
        'I': (1, 0, 0, 0),
        'J': (0, 1, 0, 0),
        'L': (0, -1, 0, 0),
        'U': (1, 1, 0, 0),
        '<': (-1, 0, 0, 0),
        '>': (-1, -1, 0, 0),
        'M': (-1, 1, 0, 0),
        't': (0, 0, 1, 0),
        'b': (0, 0, -1, 0),
    }
    
    speedBindings = {
        'q': (1.1, 1.1),
        'z': (.9, .9),
        'w': (1.1, 1),
        'x': (.9, 1),
        'e': (1, 1.1),
        'c': (1, .9),
    }
    
    
    def getKey(settings):
        if sys.platform == 'win32':
            # getwch() returns a string on Windows
            key = msvcrt.getwch()
        else:
            tty.setraw(sys.stdin.fileno())
            # sys.stdin.read() returns a string on Linux
            key = sys.stdin.read(1)
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
        return key
    
    
    def saveTerminalSettings():
        if sys.platform == 'win32':
            return None
        return termios.tcgetattr(sys.stdin)
    
    
    def restoreTerminalSettings(old_settings):
        if sys.platform == 'win32':
            return
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
    
    
    def vels(speed, turn):
        return 'currently:\tspeed %s\tturn %s ' % (speed, turn)
    
    
    def main():
        settings = saveTerminalSettings()
    
        rclpy.init()
    
        node = rclpy.create_node('teleop_twist_keyboard')
    
        # parameters
        stamped = node.declare_parameter('stamped', False).value
        frame_id = node.declare_parameter('frame_id', '').value
        if not stamped and frame_id:
            raise Exception("'frame_id' can only be set when 'stamped' is True")
    
        if stamped:
            TwistMsg = geometry_msgs.msg.TwistStamped
        else:
            TwistMsg = geometry_msgs.msg.Twist
    
        pub = node.create_publisher(TwistMsg, 'cmd_vel', 10)
    
        voice_command = [None]  # Initializing as a list
    
        spinner = threading.Thread(target=rclpy.spin, args=(node,))
        spinner.start()
    
        speed = 0.5
        turn = 1.0
        x = 0.0
        y = 0.0
        z = 0.0
        th = 0.0
        status = 0.0
    
        twist_msg = TwistMsg()
    
        if stamped:
            twist = twist_msg.twist
            twist_msg.header.stamp = node.get_clock().now().to_msg()
            twist_msg.header.frame_id = frame_id
        else:
            twist = twist_msg
    
        try:
            print(msg)
            print(vels(speed, turn))
            while True:
                print("当前工作路径:", os.getcwd())
                with open('./voice_ros2/result.txt', 'r') as f:
                    for line in f:
                        if line.startswith('Result: ['):
                            start = line.find('[')
                            end = line.find(']')
                            if start != -1 and end != -1:
                                voice_command[0] = line[start + 1:end].strip()
                                print("voice_command", voice_command[0])
                                # Clearing the content of result.txt
                                open('./voice_ros2/result.txt', 'w').close()
                                break
    
                key = getKey(settings)
                # print("键盘控制按键输出", key)
                if key in moveBindings.keys():
                    x = moveBindings[key][0]
                    y = moveBindings[key][1]
                    z = moveBindings[key][2]
                    th = moveBindings[key][3]
                elif key in speedBindings.keys():
                    speed = speed * speedBindings[key][0]
                    turn = turn * speedBindings[key][1]
    
                    print(vels(speed, turn))
                    if (status == 14):
                        print(msg)
                    status = (status + 1) % 15
               
                elif voice_command[0] is not None:
                    if voice_command[0] == "小车前进":
                        print("语音控制小车前进", voice_command[0])
                        x = moveBindings['i'][0]
                        y = moveBindings['i'][1]
                        z = moveBindings['i'][2]
                        th = moveBindings['i'][3]
                    elif voice_command[0] == "小车后退":
                        print("语音控制小车后退", voice_command[0])
                        x = moveBindings[','][0]
                        y = moveBindings[','][1]
                        z = moveBindings[','][2]
                        th = moveBindings[','][3]
                    elif voice_command[0] == "小车左转":
                        print("语音控制小车左转", voice_command[0])
                        x = moveBindings['j'][0]
                        y = moveBindings['j'][1]
                        z = moveBindings['j'][2]
                        th = moveBindings['j'][3]
                    elif voice_command[0] == "小车右转":
                        print("语音控制小车右转", voice_command[0])
                        x = moveBindings['l'][0]
                        y = moveBindings['l'][1]
                        z = moveBindings['l'][2]
                        th = moveBindings['l'][3]
                    elif voice_command[0] == "小车停":
                        print("语音控制小车停", voice_command[0])
                        x = moveBindings['k'][0]
                        y = moveBindings['k'][1]
                        z = moveBindings['k'][2]
                        th = moveBindings['k'][3]
                    voice_command[0] = None
                else:
                    x = 0.0
                    y = 0.0
                    z = 0.0
                    th = 0.0
                    if (key == '\x03'):
                        break
    
                if stamped:
                    twist_msg.header.stamp = node.get_clock().now().to_msg()
    
                twist.linear.x = x * speed
                twist.linear.y = y * speed
                twist.linear.z = z * speed
                twist.angular.x = 0.0
                twist.angular.y = 0.0
                twist.angular.z = th * turn
                pub.publish(twist_msg)
    
                # Print timestamp every second
                time.sleep(1)
                print("时间戳:", time.time())
    
        except Exception as e:
            print(e)
    
        finally:
            if stamped:
                twist_msg.header.stamp = node.get_clock().now().to_msg()
    
            twist.linear.x = 0.0
            twist.linear.y = 0.0
            twist.linear.z = 0.0
            twist.angular.x = 0.0
            twist.angular.y = 0.0
            twist.angular.z = 0.0
            pub.publish(twist_msg)
            rclpy.shutdown()
            spinner.join()
    
            restoreTerminalSettings(settings)
    
    
    if __name__ == '__main__':
        main()
    
    

    4. 编译运行

    我们一共需要运行3个终端

    • 第一个终端,编译运行机器人节点
    cd dev_ws_A
    colcon build --packages-select mbot_gazebo
    source install/setup.bash
    ros2 launch mbot_gazebo load_urdf_into_gazebo.launch.py
    

    运行结果如下
    在这里插入图片描述)

    • 第二个终端,运行键盘控制节点
    cd dev_ws_A
    colcon build --packages-select teleop_twist_keyboard
    source install/setup.bash
    ros2 run teleop_twist_keyboard teleop_twist_keyboard
    

    运行结果如下:
    在这里插入图片描述

    • 第三个终端
      运行voice.py文进行语音识别
      在这里插入图片描述

    然后就可以通过语音控制小车移动了
    在右侧终端按1进行语音识别,此时将识别到小车前进的命令并打印,在左侧终端按回车健获取result中的命令,将输出voice_command 小车前进,此时再按键ctrl+c,将输出语音控制小车前进 小车前进并且小车开始移动。
    目前的代码需要按键才能加载进来语音的命令并控制小车移动,但好在实现了功能,后续还会继续优化。
    在这里插入图片描述
    到此方法一结束。
    演示视频

    ros2语音控制机器人

    方法二、

    需购买设备:
    这个设备自带了ros1,ros2,仿真模拟,实物的各种ros功能包,可直接使用,但配置环境较为复杂,且语音识别非常不准,但用来学习代码是很不错的。
    设备:科大讯飞远场麦克风阵列语音板六麦语音交互模块声源定位ROS导航
    在这里插入图片描述
    演示视频:

    机器人语音交互之ros集成科大讯飞中文语音库,实现语音控制机

    使用这个设备只需要简单修改键盘控制节点teleop_twist_keyboard.py即可,其他位置不做修改,修改后的代码如下:

    import sys
    import threading
    from std_msgs.msg import String
    import geometry_msgs.msg
    import rclpy
    
    if sys.platform == 'win32':
        import msvcrt
    else:
        import termios
        import tty
    
    
    msg = """
    This node takes keypresses from the keyboard and publishes them
    as Twist/TwistStamped messages. It works best with a US keyboard layout.
    ---------------------------
    Moving around:
       u    i    o
       j    k    l
       m    ,    .
    
    For Holonomic mode (strafing), hold down the shift key:
    ---------------------------
       U    I    O
       J    K    L
       M    <    >
    
    t : up (+z)
    b : down (-z)
    
    anything else : stop
    
    q/z : increase/decrease max speeds by 10%
    w/x : increase/decrease only linear speed by 10%
    e/c : increase/decrease only angular speed by 10%
    
    CTRL-C to quit
    """
    
    moveBindings = {
        'i': (1, 0, 0, 0),
        'o': (1, 0, 0, -1),
        'j': (0, 0, 0, 1),
        'l': (0, 0, 0, -1),
        'u': (1, 0, 0, 1),
        ',': (-1, 0, 0, 0),
        '.': (-1, 0, 0, 1),
        'm': (-1, 0, 0, -1),
        'O': (1, -1, 0, 0),
        'I': (1, 0, 0, 0),
        'J': (0, 1, 0, 0),
        'L': (0, -1, 0, 0),
        'U': (1, 1, 0, 0),
        '<': (-1, 0, 0, 0),
        '>': (-1, -1, 0, 0),
        'M': (-1, 1, 0, 0),
        't': (0, 0, 1, 0),
        'b': (0, 0, -1, 0),
    }
    
    speedBindings = {
        'q': (1.1, 1.1),
        'z': (.9, .9),
        'w': (1.1, 1),
        'x': (.9, 1),
        'e': (1, 1.1),
        'c': (1, .9),
    }
    
    
    def getKey(settings):
        if sys.platform == 'win32':
            # getwch() returns a string on Windows
            key = msvcrt.getwch()
        else:
            tty.setraw(sys.stdin.fileno())
            # sys.stdin.read() returns a string on Linux
            key = sys.stdin.read(1)
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
        return key
    
    
    def saveTerminalSettings():
        if sys.platform == 'win32':
            return None
        return termios.tcgetattr(sys.stdin)
    
    
    def restoreTerminalSettings(old_settings):
        if sys.platform == 'win32':
            return
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
    
    
    def vels(speed, turn):
        return 'currently:\tspeed %s\tturn %s ' % (speed, turn)
    
    
    def main():
        settings = saveTerminalSettings()
    
        rclpy.init()
    
        node = rclpy.create_node('teleop_twist_keyboard')
    
        # parameters
        stamped = node.declare_parameter('stamped', False).value
        frame_id = node.declare_parameter('frame_id', '').value
        if not stamped and frame_id:
            raise Exception("'frame_id' can only be set when 'stamped' is True")
    
        if stamped:
            TwistMsg = geometry_msgs.msg.TwistStamped
        else:
            TwistMsg = geometry_msgs.msg.Twist
    
        pub = node.create_publisher(TwistMsg, 'cmd_vel', 10)
    
        voice_command = [None]
    
        def voice_words_callback(msg):
            voice_command[0] = msg.data
    
        voice_words_sub = node.create_subscription(
            String, 'voice_words', voice_words_callback, 10
        )
    
        spinner = threading.Thread(target=rclpy.spin, args=(node,))
        spinner.start()
    
        speed = 0.5
        turn = 1.0
        x = 0.0
        y = 0.0
        z = 0.0
        th = 0.0
        status = 0.0
    
        twist_msg = TwistMsg()
    
        if stamped:
            twist = twist_msg.twist
            twist_msg.header.stamp = node.get_clock().now().to_msg()
            twist_msg.header.frame_id = frame_id
        else:
            twist = twist_msg
    
        try:
            print(msg)
            print(vels(speed, turn))
            while True:
                key = getKey(settings)
                # print("键盘控制按键输出", key)
                if key in moveBindings.keys():
                    x = moveBindings[key][0]
                    y = moveBindings[key][1]
                    z = moveBindings[key][2]
                    th = moveBindings[key][3]
                elif key in speedBindings.keys():
                    speed = speed * speedBindings[key][0]
                    turn = turn * speedBindings[key][1]
    
                    print(vels(speed, turn))
                    if (status == 14):
                        print(msg)
                    status = (status + 1) % 15
                elif voice_command[0] is not None:
                    if voice_command[0] == "小车前进":
                        print("语音控制小车前进", voice_command)
                        x = moveBindings['i'][0]
                        y = moveBindings['i'][1]
                        z = moveBindings['i'][2]
                        th = moveBindings['i'][3]
                    elif voice_command[0] == "小车后退":
                        print("语音控制小车后退", key)
                        x = moveBindings[','][0]
                        y = moveBindings[','][1]
                        z = moveBindings[','][2]
                        th = moveBindings[','][3]
                    elif voice_command[0] == "小车左转":
                        print("语音控制小车左转", key)
                        x = moveBindings['j'][0]
                        y = moveBindings['j'][1]
                        z = moveBindings['j'][2]
                        th = moveBindings['j'][3]
                    elif voice_command[0] == "小车右转":
                        print("语音控制小车右转", key)
                        x = moveBindings['l'][0]
                        y = moveBindings['l'][1]
                        z = moveBindings['l'][2]
                        th = moveBindings['l'][3]
                    elif voice_command[0] == "小车停":
                        print("语音控制小车停", key)
                        x = moveBindings['k'][0]
                        y = moveBindings['k'][1]
                        z = moveBindings['k'][2]
                        th = moveBindings['k'][3]
                    voice_command[0] = None
                else:
                    x = 0.0
                    y = 0.0
                    z = 0.0
                    th = 0.0
                    if (key == '\x03'):
                        break
    
                if stamped:
                    twist_msg.header.stamp = node.get_clock().now().to_msg()
    
                twist.linear.x = x * speed
                twist.linear.y = y * speed
                twist.linear.z = z * speed
                twist.angular.x = 0.0
                twist.angular.y = 0.0
                twist.angular.z = th * turn
                pub.publish(twist_msg)
    
        except Exception as e:
            print(e)
    
        finally:
            if stamped:
                twist_msg.header.stamp = node.get_clock().now().to_msg()
    
            twist.linear.x = 0.0
            twist.linear.y = 0.0
            twist.linear.z = 0.0
            twist.angular.x = 0.0
            twist.angular.y = 0.0
            twist.angular.z = 0.0
            pub.publish(twist_msg)
            rclpy.shutdown()
            spinner.join()
    
            restoreTerminalSettings(settings)
    
    
    if __name__ == '__main__':
        main()
    
    

    这里将所有的资料链接附上,如有需要自行下载

    链接:https://pan.baidu.com/s/1T7Z0CNft4IR5lbk01UrlCQ?pwd=xkmu
    提取码:xkmu
    –来自百度网盘超级会员V6的分享

    上述代码编译可能会出现问题:解决方法在这里

  • 相关阅读:
    echarts入门图表可视化(基本介绍+示例代码)
    React中使用SWR处理数据请求
    静态代理和动态代理笔记
    文件共享(通过git实现,提供脚本)
    关于Blob链接的一些开发应用
    基于JAYA优化的BP神经网络(分类应用) - 附代码
    dubbo使用带有密码的redis注册中心完整配置及遇到问题解决、RestTemplate配置【持续更新】
    Git学习
    这3种人适合学习人工智能,看看你在不在其中?
    微信小程序本地开发
  • 原文地址:https://blog.csdn.net/weixin_45897172/article/details/138608079