• springboot整合SSE


    SSE简介

    SSE(Server Sent Event),是一种可以主动从服务端推送消息的技术。SSE的本质其实就是一个HTTP的长连接,只不过它给客户端发送的不是一次性的数据包,而是一个stream流,格式为text/event-stream。所以客户端不会关闭连接,会一直等着服务器发过来的新的数据流。

    SSE服务端代码

    springboot中封装了sse代码,不需要额外的依赖

            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starterartifactId>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    public class SseEmitterServer {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(SseEmitterServer.class);
    
        /**
         * 当前连接数
         */
        private static AtomicInteger count = new AtomicInteger(0);
    
        private static Map<Integer, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
    
        public static SseEmitter connect(Integer userId){
            // 设置超时日期,0表示不过期
            SseEmitter sseEmitter = new SseEmitter(0L);
    
            // 注册回调
            sseEmitter.onCompletion(completionCallBack(userId));
            sseEmitter.onError(errorCallBack(userId));
            sseEmitter.onTimeout(timeoutCallBack(userId));
            sseEmitterMap.put(userId,sseEmitter);
            count.getAndIncrement();
            LOGGER.info("创建新SSE连接,连接用户编号:{}",userId);
            LOGGER.info("现有连接用户:"+sseEmitterMap.keySet());
            return sseEmitter;
        }
    
        /**
         * 给指定用户发信息
         */
        public static void sendMessage(Integer userId,String message){
            if (!sseEmitterMap.containsKey(userId)) {
                connect(userId);
            }
            try {
                sseEmitterMap.get(userId).send(message);
                LOGGER.info("给" + userId + "号发送消息:" + message);
            } catch (IOException e) {
                LOGGER.error("userId:{},发送信息出错:{}", userId, e.getMessage());
                e.printStackTrace();
            }
        }
    
        /**
         * 群发消息
         */
        public static void batchSendMessage(String message){
            if (sseEmitterMap != null&&!sseEmitterMap.isEmpty()) {
                sseEmitterMap.forEach((k,v)->{
                    try {
                        v.send(message, MediaType.APPLICATION_JSON);
                    } catch (IOException e) {
                        LOGGER.error("userId:{},发送信息出错:{}",k,e.getMessage());
                        e.printStackTrace();
                    }
                });
            }
        }
    
        public static void batchSendMessage(Set<Integer> userIds,String message){
            userIds.forEach(userId->sendMessage(userId,message));
        }
    
        /**
         * 移出用户
         */
        public static void removeUser(Integer userId){
            sseEmitterMap.remove(userId);
            count.getAndDecrement();
            LOGGER.info("remove user id:{}",userId);
            LOGGER.info("remain user id:"+sseEmitterMap.keySet());
        }
    
        public static List<Integer> getIds(){
            return new ArrayList<>(sseEmitterMap.keySet());
        }
    
        public static int getUserCount(){
            return count.intValue();
        }
    
        private static Runnable completionCallBack(Integer userId){
            return ()->{
              LOGGER.info("结束连接,{}",userId);
              removeUser(userId);
            };
        }
    
        private static Runnable timeoutCallBack(Integer userId){
            return ()->{
                LOGGER.info("连接超时,{}",userId);
                removeUser(userId);
            };
        }
    
        private static Consumer<Throwable> errorCallBack(Integer userId){
            return throwable -> {
                LOGGER.error("连接异常,{}",userId);
                removeUser(userId);
            };
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    @RestController
    @CrossOrigin(maxAge = 3600)
    public class SseController {
    
        @RequestMapping(value = "/sse/connect/{id}",method = RequestMethod.GET)
        public SseEmitter connect(@PathVariable Integer id){
            SseEmitter sseEmitter = SseEmitterServer.connect(id);
            return sseEmitter;
        }
    
        /**
         * 向指定用户发送消息
         */
        @RequestMapping(value = "/sse/send/{id}", method = RequestMethod.GET)
        public EiInfo sendMsg(@PathVariable Integer id,@RequestParam("message") String message) {
            EiInfo eiInfo = new EiInfo();
            SseEmitterServer.sendMessage(id,message);
            eiInfo.sysSetMsg("向"+id+"号用户发送信息,"+message+",消息发送成功");
            return eiInfo;
        }
    
        /**
         * 向所有用户发送消息
         */
        @RequestMapping(value = "/sse/send/all", method = RequestMethod.GET)
        public EiInfo sendMsg2AllUser(@RequestParam("message") String message) {
            EiInfo eiInfo = new EiInfo();
            SseEmitterServer.batchSendMessage(message);
            eiInfo.sysSetMsg("向所有用户发送信息,"+message+",消息发送成功");
            return eiInfo;
        }
    
        /**
         * 关闭用户连接
         */
        @RequestMapping(value = "/sse/close/{id}", method = RequestMethod.GET)
        public EiInfo closeSse(@PathVariable Integer id) {
            EiInfo eiInfo = new EiInfo();
            SseEmitterServer.removeUser(id);
            eiInfo.sysSetMsg("关闭"+id+"号连接。当前连接用户有:"+SseEmitterServer.getIds());
            return eiInfo;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    SSE测试

    连接服务端

    创建三个用户,分别连接服务端
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    给指定用户发送消息

    使用接口测试工具,只给1号用户发送消息
    在这里插入图片描述

    浏览器中,1号用户接收到消息,2号3号未接收到消息
    在这里插入图片描述
    在这里插入图片描述

    群发消息

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    关闭连接

    在这里插入图片描述

    SSE连接超时

    当SSE客户端连接后,如果长时间不断开,它会保持连接状态。SSE(Server-Sent Events)是一种基于HTTP的推送技术,允许服务器实时向客户端发送数据。当客户端连接到服务器的SSE端点时,它会创建一个EventSource对象,该对象将与服务器进行长期连接,并接收服务器发送的事件。只有在客户端手动关闭连接或连接发生错误时,才会断开SSE连接。

    请注意,由于SSE是基于HTTP的技术,因此它可能受到浏览器或服务器的超时设置的影响。如果在长时间没有收到服务器的数据时,连接可能会断开。如果您希望在长时间不断开连接的情况下保持SSE连接,请确保服务器发送数据以保持连接活跃,或者调整相关超时设置。

    在使用SSE的客户端连接中,如果长时间不断开连接,可能会出现以下情况:

    1. 连接超时。如果服务器端没有发送新的事件数据,而客户端也没有重新建立连接,可能会超过服务器的连接超时时间。这可能导致服务器关闭连接,客户端需要重新建立连接才能接收新的事件数据。
    2. 网络异常。长时间不断开连接可能会导致网络异常,例如连接中断、丢包等问题。在这种情况下,客户端可能需要重新建立连接,以恢复和服务器的通信。
      为了避免长时间不断开连接的问题,建议在合适的时机关闭连接。

    在初始化SseEmitter对象时,需要指定超时时间。
    SseEmitter sseEmitter = new SseEmitter(10006060L);
    0L表示不超时,1000L表示1秒超时。假设设置为30s超时连接,在客户端连接服务端后开始计时,如果在这30s内,服务端有向该客户端发送数据,那么在30s时间到了之后,服务端会先断开客户端的连接然后重新连接客户端,并开始新一轮计时;如果在这30s内,服务端没有向客户端发送数据,那么30s后服务端会断开客户端的连接,不再重连。

  • 相关阅读:
    nginx配置dav上传+展示页面
    Android ImageView 四个角自定义角度,以及角度的变换
    “can not run elasticsearch as root“如何解决
    [CAD二次开发]获取CAD内3D块参照的欧拉旋转交,Matrix3d矩阵转欧拉角。
    【STC32G12K128开发板】——STC32G12K128开发板介绍
    el-form 表单设置某个参数非必填验证
    对象内存布局
    selenium报错解决
    MacOS开发环境搭建
    如何利用Web Components提高前端开发效率?
  • 原文地址:https://blog.csdn.net/qq_41841482/article/details/132875733