• MQTT第一话 -- Docker安装emqx以及Springboot集成emqx


    本文主要记录mqtt消息件emqx的安装及使用

    1.docker安装emqx

    基于liunx centos7,docker-compose,emqx:4.4.4。

    1.1 yaml文件

    version: '3.7'
    services:
      emqx01:
        image: emqx:4.4.4
        container_name: emqx01
        ports:
        - "1893:1883" #tcp连接
        - "18083:18083" #控制台
        - "8083:8083" #控制台工具websocket ws用
        - "8084:8084" #控制台工具websocket wss用
        environment:
        - TZ=Asia/Shanghai
        networks:
        - my-net
    networks:
      #新增的网络 内部服务名调用
      my-net:
        external: true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    1.2 启动日志

    [root@m emqx]# docker-compose ps -a
     Name               Command               State                                                                                 Ports                                                                              
    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    emqx01   /usr/bin/docker-entrypoint ...   Up      11883/tcp, 0.0.0.0:18093->18083/tcp, 0.0.0.0:1893->1883/tcp, 4369/tcp, 4370/tcp, 5369/tcp, 6369/tcp, 6370/tcp, 8081/tcp, 0.0.0.0:8093->8083/tcp,                 
                                                      0.0.0.0:8094->8084/tcp, 8883/tcp      
    
    [root@m emqx]# docker-compose logs -f
    Attaching to emqx01
    emqx01    | Starting emqx on node e15d8dee531f@172.19.0.2
    emqx01    | Start mqtt:tcp:internal listener on 127.0.0.1:11883 successfully.
    emqx01    | Start mqtt:tcp:external listener on 0.0.0.0:1883 successfully.
    emqx01    | Start mqtt:ws:external listener on 0.0.0.0:8083 successfully.
    emqx01    | Start mqtt:ssl:external listener on 0.0.0.0:8883 successfully.
    emqx01    | Start mqtt:wss:external listener on 0.0.0.0:8084 successfully.
    emqx01    | Start http:management listener on 8081 successfully.
    emqx01    | 2022-06-27T18:40:03.425652+08:00 [warning] [Dashboard] Using default password for dashboard 'admin' user. Please use './bin/emqx_ctl admins' command to change it. NOTE: the default password in config file is only used to initialise the database record, changing the config file after database is initialised has no effect.
    emqx01    | Start http:dashboard listener on 18083 successfully.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    1.3 控制台界面

    从日志可以看出,dashboard listener on 18083,admin/public
    在这里插入图片描述

    2.springboot集成emqx

    基于springboot2.5.6

    2.1 mqtt依赖如下

    <dependency>
    	<groupId>org.springframework.integration</groupId>
    	<artifactId>spring-integration-mqtt</artifactId>
    	<version>5.5.13</version>
    </dependency>
    <dependency>
    	<groupId>org.springframework.integration</groupId>
    	<artifactId>spring-integration-stream</artifactId>
    	<version>5.5.13</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    2.2 yaml配置文件

    mqtt:
      test:
        url: tcp://192.168.0.221:1883
        clientId: testmqtt #连接的一个标识
        topic: mqtt/test
        userName: admin
        passWord: public
        timeout: 5000
        keepAlive: 10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.3 启动注入

    @Slf4j
    @ConfigurationProperties("mqtt.test")
    @Component
    @Data
    public class MqttConfig {
    
        String url;
        String clientId;
        String topic;
        String userName;
        String passWord;
        Integer timeout;
        Integer keepAlive;
    
    
        @Bean
        public MqttClient initClient() {
            try {
                MqttClient client = new MqttClient(url, clientId);
                MqttConnectOptions options = new MqttConnectOptions();
                options.setUserName(userName);
                options.setPassword(passWord.toCharArray());
                options.setCleanSession(true);
                options.setConnectionTimeout(timeout);
                options.setKeepAliveInterval(keepAlive);
                client.setCallback(new MqttConsume());
                IMqttToken iMqttToken = client.connectWithResult(options);
                boolean complete = iMqttToken.isComplete();
                log.info("mqtt建立连接:{}", complete);
    
    			//我这里就直接订阅了 消息消费者MqttConsume
                client.subscribe(topic, 0);
                log.info("已订阅topic:{}", topic);
    
                return client;
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException("mqtt 连接异常");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    2.4 消费者

    @Slf4j
    public class MqttConsume implements MqttCallback {
    
    
    	//连接丢失
        @Override
        public void connectionLost(Throwable throwable) {
    
        }
    
        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
            log.info("收到消息:topic:{},Qos:{},msg:{}",
                    topic, mqttMessage.getQos(), new String(mqttMessage.getPayload()));
        }
    
    	//传输完成
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    2.5 生产者

    @Autowired
    MqttClient client;
    @Autowired
    MqttConfig mqttConfig;
    
    
    @GetMapping("/send")
    public String send() {
        try {
            MqttMessage message = new MqttMessage();
            message.setQos(0);
            message.setRetained(false);
            for (int i = 0; i < 10; i++) {
                message.setPayload(("1241234&" + i).getBytes());
                client.publish(mqttConfig.getTopic(), message);
                log.info("发送成功:{}", i);
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "SUCCESS";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    2.6 发送日志

    com.example.mqtt.config.MqttConfig       : mqtt连接:true
    com.example.mqtt.config.MqttConfig       : 已订阅topic:mqtt/test
    
    c.e.mqtt.controller.MqttController       : 发送成功:0
    com.example.mqtt.consume.MqttConsume     : 收到消息:topic:mqtt/test,Qos:0,msg:1241234&0
    c.e.mqtt.controller.MqttController       : 发送成功:1
    com.example.mqtt.consume.MqttConsume     : 收到消息:topic:mqtt/test,Qos:0,msg:1241234&1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3. Qos分析

    QoS0,发送就不管了,最多一次;
    QoS1,发送之后依赖MQTT规范,是否启动重传消息,所以至少一次;
    QoS2,发送之后依赖MQTT消息机制,确保只有一次。

    以上就是本章的全部内容了。

    上一篇:RocketMQ第三话 – RocketMQ高可用集群搭建
    下一篇:MQTT第二话 – emqx高可用集群实现

    人生天地之间,若白驹过隙,忽然而已

  • 相关阅读:
    .NET中如何实现高精度定时器
    近200篇文章汇总而成的机器翻译非自回归生成最新综述,揭示其挑战和未来研究方向...
    android 7.1 mipi 屏 唤醒白屏
    杀毒软件如何识别病毒43.240.74.X
    dp好题集锦
    Unity_建造系统及保存加载
    双目视觉(双目相机)
    C#(四十)之stringBuilder类
    Redis 教程 - 主从复制
    大数据NoSQL数据库HBase集群部署
  • 原文地址:https://blog.csdn.net/qq_35551875/article/details/125428932