• springboot+redis+sse+vue实现分布式消息发布/通知


    一、需求说明

    需求是实现web端的小红点通知,因为后端是两台机子做负载,所以需要实现分布式消息订阅发布

    这里没有用消息中间件(rabbitmq…)和websoket,因为相对项目来说,这俩个比较重,所以用了相对较轻的redis和sse,都是项目自带的

    二、架构选择

    1. redis(分布式发布订阅)
    2. sse (SseEmitter)

    三、代码实现

    1. sse集成

    sse服务类代码

    这里会话的key值存储可以不用这么复杂,我当时想着连接成功后可以直接将返回的sseEmitter扔到redis里去实现分布式,但是不行,序列化后取出来是发不了消息的,原因可能是存到redis里就相当于直接把连接扔了,哈哈

    package com.smartvillage.framework.sse.serve;
    
    import cn.hutool.core.collection.CollectionUtil;
    import com.smartvillage.common.core.redis.RedisCache;
    import com.smartvillage.common.utils.spring.SpringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.http.MediaType;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import java.io.IOException;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Consumer;
    import java.util.stream.Collectors;
    
    /**
     * @author wangyj
     * @className SseEmitterServer
     * @description 消息推送服务类
     * @date 22/11/9
     */
    
    public class SseEmitterServer {
    
        private static final Logger log = LoggerFactory.getLogger(SseEmitterServer.class);
    
        private static final String KEY_PREFIX = "SseEmitter_";
        private static final String ONLINE_SESSION_COUNT = "OnlineSessionCount";
    
        /**
         * 当前连接数
         */
        // private static AtomicInteger count = new AtomicInteger(0);
    
        /**
         * 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面
         */
        private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
    
        /**
         * 创建用户连接并返回 SseEmitter
         *
         * @param sessionId 用户ID
         * @return SseEmitter
         */
        public static SseEmitter connect(String sessionId) {
            // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
            SseEmitter sseEmitter = new SseEmitter(0L);
            // 注册回调
            sseEmitter.onCompletion(completionCallBack(sessionId));
            sseEmitter.onError(errorCallBack(sessionId));
            sseEmitter.onTimeout(timeoutCallBack(sessionId));
            // SpringUtils.getBean(RedisCache.class).setCacheObject(getCacheKey(sessionId), sseEmitter);
    
            // 数量+1
            SpringUtils.getBean(RedisCache.class).incr(ONLINE_SESSION_COUNT,1);
            sseEmitterMap.put(getCacheKey(sessionId),sseEmitter);
    
            log.info("创建新的sse连接,当前会话:{}", sessionId);
            return sseEmitter;
        }
    
        /**
         * 给指定用户发送信息  -- 单播
         */
        public static void sendMsg(String userId, String message) {
            sendMessage(getCacheKey(userId),message);
        }
    
        /**
         * 给指定用户发送信息
         */
        public static void sendMessage(String cacheKey, String message) {
            if (sseEmitterMap.containsKey(cacheKey)) {
            // if (SpringUtils.getBean(RedisCache.class).hasKey(cacheKey)) {
                try {
                    // SseEmitter sseEmitter = SpringUtils.getBean(RedisCache.class).getCacheObject(cacheKey);
                    SseEmitter sseEmitter = sseEmitterMap.get(cacheKey);
    
                    sseEmitter.send(message,MediaType.APPLICATION_JSON);
                    log.info("用户[{}]推送成功:{}", cacheKey, message);
    
                } catch (IOException e) {
                    log.error("用户[{}]推送异常:{}", cacheKey, e.getMessage());
                    removeUser(cacheKey);
                }
            }
        }
    
        /**
         * 向多人发布消息   -- 组播
         *
         * @param groupId 开头标识
         * @param message 消息内容
         */
        public static void groupSendMessage(String groupId, String message) {
            // Set keys = SpringUtils.getBean(RedisCache.class).keys(KEY_PREFIX + groupId + "*");
            Set<String> keys = sseEmitterMap.keySet().stream().filter(k -> k.startsWith(KEY_PREFIX + groupId)).collect(Collectors.toSet());
            if(CollectionUtil.isNotEmpty(keys)){
                batchSendMessage(message,keys);
            }
        }
    
        /**
         * 群发所有人   -- 广播
         */
        public static void batchSendMessage(String message) {
            // Set keys = SpringUtils.getBean(RedisCache.class).keys(KEY_PREFIX + "*");
            Set<String> keys = sseEmitterMap.keySet();
            if(CollectionUtil.isNotEmpty(keys)){
                batchSendMessage(message,keys);
            }
        }
    
        /**
         * 群发消息
         */
        public static void batchSendMessage(String message, Set<String> keys) {
            keys.forEach(key -> sendMessage(key, message));
        }
    
        /**
         * 移除用户连接
         */
        public static void removeUser(String cacheKey) {
            // SpringUtils.getBean(RedisCache.class).deleteObject(cacheKey);
            sseEmitterMap.remove(cacheKey);
            // 数量-1
            SpringUtils.getBean(RedisCache.class).decr(ONLINE_SESSION_COUNT,1);
    
            log.info("移除用户:{}", cacheKey);
        }
    
        /**
         * 获取当前连接信息
         */
        public static List<String> getIds() {
            Collection<String> keys = SpringUtils.getBean(RedisCache.class).keys(KEY_PREFIX);
            return keys.stream().map(k -> k.replace(KEY_PREFIX, "")).collect(Collectors.toList());
        }
    
        /**
         * 获取当前连接数量
         */
        public static int getUserCount() {
            return SpringUtils.getBean(RedisCache.class).getCacheObject(ONLINE_SESSION_COUNT);
    
        }
    
        private static Runnable completionCallBack(String userId) {
            return () -> {
                log.info("结束连接:{}", userId);
                removeUser(getCacheKey(userId));
            };
        }
    
        private static Runnable timeoutCallBack(String userId) {
            return () -> {
                log.info("连接超时:{}", userId);
                removeUser(getCacheKey(userId));
            };
        }
    
        private static Consumer<Throwable> errorCallBack(String userId) {
            return throwable -> {
                log.info("连接异常:{}", userId);
                removeUser(getCacheKey(userId));
            };
        }
    
        /**
         * 设置cache key
         *
         * @param configKey 参数键
         * @return 缓存键key
         */
        public static String getCacheKey(String configKey){
            return KEY_PREFIX + configKey;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184

    客户端链接控制器

    
    /**
     * @author wangyj
     * @className AiWarningSseController
     * @description 警告消息订阅
     * @date 22/11/10
     */
    @RestController
    @RequestMapping("/test")
    public class SseController {
    
        @Autowired
        RedisCache redisCache;
    
    	/**
         * 客户端链接
         * @return
         */
        @GetMapping("/connect")
        public SseEmitter connect() {
            return SseEmitterServer.connect("test-key");
        }
    
    	/**
         * 消息推送
         * @return
         */
        @PostMapping("/post")
        public AjaxResult postMessage(String msg) {
            // ... 业务逻辑
            
            // 推送消息
            SseEmitterServer.sendMsg("test-key", msg))
    
            return AjaxResult.success("推送成功");
    
        }
    	
    	/**
         * 链接关闭
         * @return
         */
        @GetMapping("/close")
        public AjaxResult close() {
            SseEmitterServer.removeUser("test-key");
            return AjaxResult.success();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    前端实现(vue)

    这里使用了组件:vue-sse(自行安装哈)

    方法调用

    mounted() {
        // 组件挂载时订阅
        this.subscribeWarnMsg();
      },
      beforeDestroy() {
        // 组件销毁时记得关链接释放资源
        this.closeWarningMessage();
      },
    
    methods: {
        //...
        // 消息订阅
        subscribeWarnMsg() {
          this.$sse
            .create({
              // format: "json", // 注掉就能接受消息
              polyfill: true,
              forcePolyfill: true,
              url: process.env.VUE_APP_BASE_API + "/test/connect",
              withCredentials: true,
              polyfillOptions: {
                // 超时时间,调长点,要不频繁重连
                heartbeatTimeout: 10 * 60 * 1000,
                // 携带认证token
                headers: {
                  Authorization: 'Bearer ' + getToken(),
                },
              },
            })
            .on("message", (msg) => {
    			console.log(msg)
            
            })
            .on("error", (err) =>
              console.error("Failed to parse or lost connection:", err)
            )
            .connect()
            .catch((err) => console.error("Failed make initial connection:", err));
    
        },
        // 关闭订阅
        closeMessage() {
    	  return request({
    	    url: '/test/close',
    	    method: 'get',
    	   }
    	)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    至此sse封装完成!单节点的项目就可以正常用了~

    2. redis实现订阅/发布

    监听类

    /**
     * @author wangyj
     * @className TestListener
     * @description redis listener
     * @date 22/11/17
     */
    @Component
    public class TestListener{
    
        private static final Logger log = LoggerFactory.getLogger(TestListener.class);
        
        public void onMessage(String msg) {
            log.info(msg);
    
            JSONObject parseObject = JSON.parseObject(msg);
            Long deptId = parseObject.getLong("deptId");
            // 组播
            SseEmitterServer.groupSendMessage("deptId:" + deptId, msg);
    
            // 单播
            SseEmitterServer.sendMsg("test-key", msg));
    
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    redisConfig配置

    /**
     * redis配置
     */
    @Configuration
    @EnableCaching
    public class RedisConfig extends CachingConfigurerSupport{
    
        // ... 其他序列化等配置
        
        @Bean
        // 这里要注入我们刚才写的监听者类
        public MessageListenerAdapter TestListenerAdapter(TestListener receiver) {
            // 这个"onMessage"要和监听者类里的方法名对应,因为是反射注入的,默认是"handleMessage"?可以看下源码
            return new MessageListenerAdapter(receiver,"onMessage");
        }
    
    	/*@Bean
        public MessageListenerAdapter listenerAdapter1(TestListener1 receiver) {
            return new MessageListenerAdapter(receiver,"onMessage");
        }
    	@Bean
        public MessageListenerAdapter listenerAdapter2(TestListener2 receiver) {
            return new MessageListenerAdapter(receiver,"onMessage");
        }*/
    
        /**
         * redis消息监听器容器
         * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
         * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
      	 */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
        // 这个玩意可以后面跟多个哈,名字匹配自动注入的,MessageListenerAdapter aiWarningListenerAdapter,MessageListenerAdapter listenerAdapter1,MessageListenerAdapter listenerAdapter2,当然,要有对应名字的bean,看上面注释掉的代码
    	MessageListenerAdapter testListenerAdapter) {
    	
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //订阅了一个叫chat的通道
            // container.addMessageListener(listenerAdapter1, new PatternTopic("chat"));
            container.addMessageListener(aiWarningListenerAdapter, new PatternTopic(RedisChannel.AI_WARNING));
            return container;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    消息发送

    redisCache.convertAndSend(RedisChannel.AI_WARNING, warningLog);
    
    • 1

    结合上文SseController 里面消息推送代码:

    public class SseController {
    
        @Autowired
        RedisCache redisCache;
    
    	/**
         * 客户端链接
         * @return
         */
        @GetMapping("/connect")
        public SseEmitter connect() {
            return SseEmitterServer.connect("test-key");
        }
    
    	/**
         * 消息推送
         * @return
         */
        @PostMapping("/post")
        public AjaxResult postMessage(String msg) {
            // ... 业务逻辑
            
            // 推送消息
            //SseEmitterServer.sendMsg("test-key", msg));
    		// 先推到redis
    		redisCache.convertAndSend(RedisChannel.AI_WARNING, warningLog);
    
            return AjaxResult.success("推送成功");
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    redisCache

    @Component
    public class RedisCache
    {
        @Autowired
        public RedisTemplate redisTemplate;
    
    	// ...其他方法
    	
        /**
         * 消息推送
         * @param channel
         * @param message
         */
        public void convertAndSend(String channel,Object message){
            redisTemplate.convertAndSend(channel,message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    完活~

    还有一种监听者配置方法,参考:

    @Component
    public class TestListener implements MessageListener{
    
        private static final Logger log = LoggerFactory.getLogger(TestListener.class);
        
        @Override
        public void onMessage(Message message, byte[] pattern) {
            // 订阅的频道名称
            String channel = new String(message.getChannel());
            // 消息体
            String msg = new String(message.getBody());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    redisConfig

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, TestListener testListener ) {
    	RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    	container.setConnectionFactory(redisConnectionFactory);
    	//订阅topic - subscribe
    	container.addMessageListener(testListener ,new ChannelTopic("testChannel"));
    	return container;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    注意:

    1. 多个实例在消费时,要注意消费时加锁,避免重复消费的情况
    2. nginx超时时长
    3. nginx iphash
    4. nginx配置
    client_max_body_size 300m;     #设置nginx能处理的最大请求主体大小。
    client_body_buffer_size 128k;  #请求主体的缓冲区大小。 
    proxy_connect_timeout 600;
    proxy_read_timeout 600;
    proxy_send_timeout 600;
    proxy_buffer_size 64k;
    proxy_buffers   4 32k;
    proxy_busy_buffers_size 64k;
    proxy_temp_file_write_size 64k;
    
    location /apis {
    	rewrite ^.+apis/?(.*)$ /$1 break;
    	include uwsgi_params;
    	proxy_pass http://192.168.5.127:8088/;
    	# 关键参数
    	proxy_buffering off;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    注意:

    1. 要配置代理超时时间
    2. 不配置proxy_buffering off的话,会出现请求发出后,接口收到直接返回,无法保持长连接。
      参考网上说明:proxy_buffering这个参数用来控制是否打开后端响应内容的缓冲区,如果这个设置为off,那么proxy_buffers和proxy_busy_buffers_size这两个指令将会失效

    如有问题请不吝指正~

  • 相关阅读:
    LVM逻辑卷
    MacOS - ToDesk 无法远程操控鼠标键盘解决方案
    网络安全攻防:软件逆向之反汇编
    Java集合——Set接口
    JavaScript笔记(本文中将JavaScript简写为JS)
    关于数据权限的设计
    配置和优化您的企业内容管理(ECM)解决方案
    linux 的文件权限案列
    【Proteus仿真】【51单片机】汽车尾灯控制设计
    亚马逊、temu流量暴涨,单量却不动?自养号测评的优势和弊端详解
  • 原文地址:https://blog.csdn.net/Leisurelyc/article/details/127907419