• RabbitMQ开启MQTT协议支持


    1)RabbitMQ启用MQTT插件

    root@mq:/# rabbitmq-plugins enable rabbitmq_mqtt
    Enabling plugins on node rabbit@mq:
    rabbitmq_mqtt
    The following plugins have been configured:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_mqtt
      rabbitmq_web_dispatch
    Applying plugin configuration to rabbit@mq...
    The following plugins have been enabled:
      rabbitmq_mqtt
    
    started 1 plugins.
    root@mq:/# rabbitmq-plugins enable rabbitmq_web_mqtt
    Enabling plugins on node rabbit@mq:
    rabbitmq_web_mqtt
    The following plugins have been configured:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_mqtt
      rabbitmq_web_dispatch
      rabbitmq_web_mqtt
    Applying plugin configuration to rabbit@mq...
    The following plugins have been enabled:
      rabbitmq_web_mqtt
    
    started 1 plugins.
    root@mq:/# 
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    2)RabbitMQ管理控制台查看
    如果插件启动成功,rabbitmq会打开1883和15675端口:
    在这里插入图片描述

    3)用MQTTX工具测试
    在这里插入图片描述
    4)用eclipse paho客户端测试
    添加依赖

     <dependency>
        <groupId>org.eclipse.pahogroupId>
        <artifactId>org.eclipse.paho.client.mqttv3artifactId>
        <version>1.2.5version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    收发消息测试

    @RestController
    public class DemoController {
    
        @GetMapping("/publish")
        public String publish() throws MqttException {
    
            MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化
            MqttClient client = new MqttClient("tcp://192.168.137.138:1883", "abc", persistence);
    
            //连接选项中定义用户名密码和其它配置
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
            options.setAutomaticReconnect(true);//是否自动重连
            options.setConnectionTimeout(30);//连接超时时间  秒
            options.setKeepAliveInterval(10);//连接保持检查周期  秒
            options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
            options.setUserName("xjs1919");
            options.setPassword("123321".toCharArray());
            // client.setManualAcks(true);
            client.connect(options);//连接
            //订阅topic
            client.subscribe("demoTopic", 2);
            // 设置回调,将来收到消息的时候会被回调
            client.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    System.out.println("连接完成");
                }
    
                @Override
                public void connectionLost(Throwable cause) {
                    System.out.println("连接丢失");
                }
    
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    System.out.println("收到消息,topic:"+topic + ", msg:" + new String(message.getPayload()));
                    //client.messageArrivedComplete(message.getId(),message.getQos());
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // Qos0: 当消息发送出去就回调
                    // Qos1: 当发送者收到了puback的时候的回调
                    // Qos2: 当发送者收到了pubcomp的时候的回调
                    System.out.println("消息发送完成");
                }
            });
    
            client.publish("demoTopic", "hello,这是一个测试消息!".getBytes(), 2, false);
    
            return "ok";
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    参考文章:
    https://www.cnblogs.com/motion/p/14974024.html
    https://blog.csdn.net/u013615903/article/details/131395264

  • 相关阅读:
    第3周学习:ResNet+ResNeXt
    Python技术栈性能测试工具Locust入门
    中文乱码问题解决
    SpringBoot--配置Redisson的方法
    Spring-底层架构核心概念
    参加了个算法比赛,真是一言难尽啊
    查看吾托帮88.47的docker里的tomcat日志
    springboot毕设项目超市通管理系统dwsgt(java+VUE+Mybatis+Maven+Mysql)
    【程序员面试金典】01.02. 判定是否互为字符重排
    计算机组成原理-第三章 存储系统【期末复习|考研复习】
  • 原文地址:https://blog.csdn.net/goldenfish1919/article/details/136238247