• (二十七)admin-boot项目之集成websocket实时推送消息


    (二十七)集成websocket实时推送消息

    基础项目地址:

    https://gitee.com/springzb/admin-boot

    此次整合推送消息有两种方案:

    1. 采用 rabbitmq 详细插件 rabbitmq_web_stomp

    2.采用 websocket的方案(mica-mqtt-core底层采用 t-io 框架, mica-mqtt-core提供了 mqtt 和 websocket 两种通信协议)

    方案一 适用于连接少,消息多的情况

    方案二 适用于连接多,消息多的情况

    rabbitmq方案具体实施查看《(二十五)集成rabbitmq消息推送》

    引入依赖:

    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    
    <dependency>
        <groupId>net.dreamlugroupId>
        <artifactId>mica-mqtt-coreartifactId>
        <version>1.2.10version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    统一配置:

    package cn.mesmile.admin.modules.message;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    
    /**
     * @author zb
     * @Description
     */
    @Data
    @ConfigurationProperties(prefix = "admin.message")
    public class MessageProperties {
    
        /**
         * 是否启用消息发送
         */
        private Boolean enabled = Boolean.FALSE;
    
        /**
         * 发送消息底层调用逻辑
         */
        private MessageServiceTypeEnum serviceType = MessageServiceTypeEnum.RABBIT_MQ;
    
        /**
         * mqtt配置 服务端ip
         */
        private String mqttServerIp = "0.0.0.0";
    
        /**
         * mqtt服务端 端口号
         */
        private Integer mqttServerPort = 1883;
    
        /**
         * webSocket端口号
         */
        private Integer mqttServerWebsocketPort = 5883;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    package cn.mesmile.admin.modules.message;
    
    /**
     * @author zb
     * @Description 发送消息调用的底层服务
     */
    public enum  MessageServiceTypeEnum {
    
        /**
         * rabbitMq
         */
        RABBIT_MQ,
        /**
         * rocketMq
         */
        ROCKET_MQ,
        /**
         * mqtt
         */
        MQTT
        
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    一、rabbitmq推送方案

    package cn.mesmile.admin.modules.message.operational;
    
    import cn.mesmile.admin.common.rabbit.constant.RabbitConstant;
    import cn.mesmile.admin.modules.message.vo.MqMessageVO;
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.util.List;
    import java.util.Set;
    
    /**
     * rabbitmq方式实现接口
     * 生产者-直连交换机模式
     * 此模式下路由键匹配,同一账号多处在线都可接收到消息,未登录账号无法接收推送,且再登录账号不会有推送,路由消息丢失,适用于web端
     *
     * @author zb
     * @Description
     */
    @Slf4j
    @Component
    @ConditionalOnProperty(
      name = "admin.message.service-type",
      havingValue = "rabbit_mq"
    )
    public class RabbitMqSendServiceImpl implements ISendService{
    
      /**
         rabbitmq-plugins enable rabbitmq_web_stomp rabbitmq_web_stomp_examples
         直连交换机
      String MESSAGE_EXCHANGE_DIRECT = "message.direct.exchange";
       普通通知消息
       String MESSAGE_QUEUE_SINGLE_ROUTE_KEY = "message.single_";
       群发通知消息
      String MESSAGE_QUEUE_GROUP_ROUTE_KEY = "message.group_";
       */
    
        @Resource
        public RabbitTemplate rabbitTemplate;
    
      @Override
      public void sendSingleMessage(String receiveUserId, MqMessageVO mqMessageVO) {
        try {
          rabbitTemplate.convertAndSend(RabbitConstant.MESSAGE_EXCHANGE_DIRECT,
              RabbitConstant.MESSAGE_QUEUE_SINGLE_ROUTE_KEY + receiveUserId, JSONObject.toJSONString(mqMessageVO));
        }catch (Exception e){
          log.error("发送消息失败:",e);
        }
      }
    
      @Override
      public void sendGroupMessage(Set<String> userSet, MqMessageVO mqMessageVO) {
        try {
          userSet.forEach(
              userId-> rabbitTemplate.convertAndSend(RabbitConstant.MESSAGE_EXCHANGE_DIRECT,
              RabbitConstant.MESSAGE_QUEUE_SINGLE_ROUTE_KEY +userId, JSONObject.toJSONString(mqMessageVO))
          );
        }catch (Exception e){
          log.error("发送消息失败:",e);
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    采用rabbitmq 前端需要安装插件 stomp 插件,对接rabbitmq 消息

    安装使用stomp插件请参考:https://blog.csdn.net/suprezheng/article/details/126613531 【插件】

    二、websocket方案(mica-mqtt-core)

    mica-mqtt-core

    https://gitee.com/596392912/mica-mqtt

    package cn.mesmile.admin.modules.message;
    
    import lombok.extern.slf4j.Slf4j;
    import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
    import net.dreamlu.iot.mqtt.core.server.MqttServer;
    import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.tio.core.ChannelContext;
    
    /**
     * @author zb
     * @Description
     */
    @Slf4j
    @Configuration(
            proxyBeanMethods = false
    )
    @EnableConfigurationProperties({MessageProperties.class})
    @ConditionalOnProperty(
            value = {"admin.message.enabled"},
            havingValue = "true"
    )
    public class MessageConfiguration {
    
        /**
         *  客户端工具下载: https://mqttx.app/
         *  mqtt://127.0.0.1:1883
         *  ws://127.0.0.1:5883
         *
         *  TODO 切换为 springboot 方式
         * https://gitee.com/596392912/mica-mqtt/blob/master/starter/mica-mqtt-server-spring-boot-starter/README.md
         */
        @ConditionalOnProperty(
                value = {"admin.message.service-type"},
                havingValue = "mqtt"
        )
        @Bean
        public MqttServer get(MessageProperties messageProperties) {
            // 注意:为了能接受更多链接(降低内存),请添加 jvm 参数 -Xss129k
            MqttServer mqttServer = MqttServer.create()
                    // 默认:0.0.0.0
                    .ip(messageProperties.getMqttServerIp())
                    // 默认:1883
                    .port(messageProperties.getMqttServerPort())
                    .webPort(messageProperties.getMqttServerWebsocketPort())
                    // 默认为: 8092(mqtt 默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大 t-io 会尝试解析多次
                    .readBufferSize(512)
                    // 最大包体长度,如果包体过大需要设置此参数,默认为: 8092
                    .maxBytesInMessage(1024 * 10)
                    // 自定义认证
                    .authHandler((context, uniqueId, clientId, userName, password) -> true)
                    // 消息监听
                    .messageListener((context, clientId, message) -> {
                        log.info("clientId:{} message:{} payload:{}", clientId, message, ByteBufferUtil.toString(message.getPayload()));
                    })
                    .connectStatusListener(new IMqttConnectStatusListener() {
                        @Override
                        public void online(ChannelContext channelContext, String s) {
                            log.info("--------客户端【上线】----------{}, 客户端id:{}",channelContext, s);
                        }
    
                        @Override
                        public void offline(ChannelContext channelContext, String s) {
                            log.info("--------客户端【下线】----------{}, 客户端id:{}",channelContext, s);
                        }
                    })
                    .httpEnable(true)
                    .debug() // 开启 debug 信息日志
                    .start();
            return mqttServer;
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    package cn.mesmile.admin.modules.message.operational;
    
    import cn.mesmile.admin.modules.message.vo.MqMessageVO;
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import net.dreamlu.iot.mqtt.core.server.MqttServer;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Set;
    
    /**
     * @author zb
     * @Description
     */
    @Slf4j
    @ConditionalOnProperty(
        value = {"admin.message.service-type"},
        havingValue = "mqtt"
    )
    @Component
    public class MqttSendServiceImpl implements  ISendService{
    
      @Resource
      private MqttServer mqttServer;
    
      @Override
      public void sendSingleMessage(String receiveUserId, MqMessageVO mqMessageVO) {
        boolean b = mqttServer.publishAll("/userId_" + receiveUserId,
            ByteBuffer.wrap(JSONObject.toJSONString(mqMessageVO).getBytes(StandardCharsets.UTF_8)));
      }
    
      @Override
      public void sendGroupMessage(Set<String> userSet, MqMessageVO mqMessageVO) {
        userSet.forEach(
            userId-> this.sendSingleMessage(userId, mqMessageVO)
        );
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    三、mica-mqtt方案测试

      @Resource
        private ISendService sendService;
    
      /**
       *  客户端工具:https://mqttx.app/
       */
      @GetMapping("/message")
      public R message(){
          MqMessageVO mqMessageVO = new MqMessageVO();
          mqMessageVO.setReceiveUser("123");
          mqMessageVO.setContent("设置内容11111111111");
          sendService.sendSingleMessage("123",mqMessageVO);
          return R.data("test");
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    客户端工具下载: https://mqttx.app/

    mqtt://127.0.0.1:1883
    ws://127.0.0.1:5883

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QymU7gGV-1663506368083)(image/image_FMKWQ55TCS.png)]

  • 相关阅读:
    网络安全好学吗?要学哪些技术?
    激光雷达+4D环绕成像雷达,新一轮「感知」军备竞赛开打
    RabbitMQ linux安装
    农村电力供应,这件事必须重视!
    方舟开服务器Vmware虚拟机安装不上?
    【数据结构】手撕单链表
    cx3588 Rockchip_基于 DRM 框架的 HDMI 开发指南
    C/C++常见的易模糊知识点(结构体、指针、STL)
    【贪吃蛇小游戏】宝塔面板快速搭建贪吃蛇小游戏Web网站 - 无需云服务器
    xsslabs第七关
  • 原文地址:https://blog.csdn.net/suprezheng/article/details/126923155