• WebSocket vs SSE: 实时数据推送到前端的选择与实现(详细)


    在现代 Web 应用程序中,实时数据推送给前端变得越来越重要。无论是实时聊天、实时通知还是仪表板上的实时更新,都需要一种有效的方式来将数据推送给前端。本文将介绍两种常用的实现方法:WebSocket 和 Server-Sent Events(SSE),并提供详细的实现步骤。

    二者对比

    WebSocket 和 Server-Sent Events (SSE) 都是用于实现实时数据推送的技术,但它们在设计、用途和实现上有一些重要的区别。让我们详细比较这两种技术。

    WebSocket:

    1. 双向通信

      • WebSocket 允许双向通信,客户端和服务器都可以在任何时候向对方发送数据。
      • 这使得 WebSocket 非常适用于需要双向交互的应用,如在线聊天、多人协作工具等。
    2. 持久连接

      • WebSocket 建立持久连接,客户端和服务器之间的连接保持打开状态。
      • 这减少了与建立和关闭连接相关的开销,适用于频繁的数据交换。
    3. 低延迟

      • 由于持久连接,WebSocket 可以实现低延迟的实时数据传输,适用于需要快速响应的应用。
    4. 复杂性

      • 实现 WebSocket 可能相对复杂,需要更多的服务器资源和额外的协议处理。
    5. 跨域通信

      • WebSocket 通常需要配置服务器以允许跨域通信,因为它们使用自定义协议。
    6. 浏览器支持

      • WebSocket 在现代浏览器中得到广泛支持。

    Server-Sent Events (SSE):

    1. 单向通信

      • SSE 是一种单向通信,只允许服务器向客户端发送数据。客户端无法向服务器发送数据。
    2. HTTP 协议

      • SSE 建立在 HTTP 协议之上,使用标准 HTTP 请求和响应。
      • 这使得 SSE 更容易部署,因为它与现有的 HTTP 基础设施兼容。
    3. 简单性

      • SSE 的实现相对简单,服务器和客户端都不需要太多复杂的逻辑。
    4. 无需专用库

      • SSE 不需要额外的库或协议处理,客户端可以使用浏览器的原生 EventSource API 来接收数据。
    5. 跨域通信

      • SSE 支持跨域通信,可以通过 CORS(跨域资源共享)机制进行配置。
    6. 浏览器支持

      • SSE 在现代浏览器中也得到广泛支持,但与 WebSocket 相比,它的历史要长一些。

    选择 WebSocket 还是 SSE:

    • WebSocket 适用于需要双向通信和低延迟的场景,例如在线游戏、实时聊天应用等。

    • SSE 适用于单向服务器到客户端的实时数据推送,例如新闻更新、实时股票报价、天气预报等,特别是当你希望使用现有的 HTTP 基础设施时。

    • 在某些情况下,你甚至可以同时使用 WebSocket 和 SSE,根据不同的需求选择合适的技术。

    无论选择哪种技术,都需要考虑你的应用程序的具体需求和复杂性。WebSocket 提供了更多的灵活性和功能,而 SSE 更加简单和易于部署。最终的选择取决于你的项目目标和资源。

    Websocket 实现

    使用原生 WebSocket API:

    1. 简单性

      • Spring Boot 提供了对原生 WebSocket API 的支持,使得创建 WebSocket 应用相对简单。
      • 开发人员可以直接使用 Java 标准库中的 WebSocket 相关类来处理 WebSocket 通信。
    2. 依赖

      • 原生 WebSocket 不需要额外的依赖,因为 WebSocket API 已经包含在 Java 标准库中。
    3. 性能

      • 原生 WebSocket API 在性能方面表现良好,适用于大多数中小型应用。
    4. 生态系统

      • 使用原生 WebSocket 可以更容易地集成到现有的 Spring Boot 生态系统中,例如 Spring Security 等。
    5. 简单应用

      • 当你需要创建相对简单的 WebSocket 应用时,原生 WebSocket 是一个不错的选择。

    使用 Netty 创建 WebSocket:

    1. 灵活性

      • Netty 是一个高度可定制的异步事件驱动框架,它可以用于创建各种网络应用,包括 WebSocket。
      • Netty 提供了更多的灵活性和自定义选项,适用于复杂的 WebSocket 应用。
    2. 性能

      • Netty 以其高性能和低延迟而闻名,适用于需要处理大量并发连接的应用。
    3. 协议支持

      • Netty 支持多种协议,不仅限于 WebSocket。这意味着你可以在同一个应用程序中处理多种网络通信需求。
    4. 集成

      • 尽管 Netty 可以集成到 Spring Boot 中,但其集成可能需要更多的配置和代码。
    5. 复杂应用

      • 当你需要处理复杂的 WebSocket 场景,如高并发、自定义协议、复杂的消息处理等时,使用 Netty 是更好的选择。

    总结和选择:

    选择原生 WebSocket 还是使用 Netty 创建 WebSocket 应取决于你的项目需求和复杂性:

    • 如果你的应用相对简单,对性能要求不是很高,可以考虑使用原生 WebSocket API,它更容易上手并且不需要额外的依赖。

    • 如果你的应用需要处理高并发、复杂的协议、自定义消息处理或需要最大程度的性能和灵活性,那么使用 Netty 创建 WebSocket 可能更合适。Netty 为你提供了更多的控制权和自定义选项。

    无论你选择哪种方法,Spring Boot 都提供了良好的支持,使得在应用中集成 WebSocket 变得相对容易。因此,你可以根据具体的项目需求来选择适合你的方法。

    Netty 实现 Websocket

    • 添加 maven 坐标

        
          
            io.netty  
            netty-common  
            4.1.79.Final  
        
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    • 创建 NettyWebsocketServer

      package com.todoitbo.baseSpringbootDasmart.netty.server;  
      
        import com.todoitbo.baseSpringbootDasmart.netty.handler.HeartbeatHandler;  
        import com.todoitbo.baseSpringbootDasmart.netty.handler.WebSocketHandler;  
        import io.netty.bootstrap.ServerBootstrap;  
        import io.netty.channel.*;  
        import io.netty.channel.nio.NioEventLoopGroup;  
        import io.netty.channel.socket.SocketChannel;  
        import io.netty.channel.socket.nio.NioServerSocketChannel;  
        import io.netty.handler.codec.http.HttpObjectAggregator;  
        import io.netty.handler.codec.http.HttpServerCodec;  
        import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;  
        import io.netty.handler.stream.ChunkedWriteHandler;  
        import io.netty.handler.timeout.IdleStateHandler;  
        import io.netty.handler.traffic.ChannelTrafficShapingHandler;  
        
        /**  
         * @author xiaobo  
         * @date 2023/9/5  
         */
        public class NettyWebsocketServer {  
           private final int port;  
        
           public NettyWebsocketServer(int port) {  
             this.port = port;  
           }  
        
           public void run() throws Exception {  
             EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 创建用于接受客户端连接的 boss 线程池  
             EventLoopGroup workerGroup = new NioEventLoopGroup(); // 创建用于处理客户端请求的 worker 线程池  
        
             try {  
               ServerBootstrap b = new ServerBootstrap();  
               b.group(bossGroup, workerGroup)  
                 .channel(NioServerSocketChannel.class)  
                 .childHandler(new ChannelInitializer<SocketChannel>() {  
                   @Override  
                   public void initChannel(SocketChannel ch) throws Exception {  
                     ChannelTrafficShapingHandler trafficShapingHandler = new ChannelTrafficShapingHandler(  
                       1, // 读取速率限制(字节/秒)  
                       1, // 写入速率限制(字节/秒)  
                       1, // 流量检查时间间隔(毫秒)  
                       1 // 最大允许的时间窗口(毫秒)  
                     );  
                     ChannelPipeline pipeline = ch.pipeline();  
                     // 添加心跳检测处理器,3秒内没有读写事件将触发 IdleStateEvent,下面的顺序错了也会出现问题的  
                     pipeline.addLast(new IdleStateHandler(30, 0, 0));  
                     pipeline.addLast(new HeartbeatHandler());  
                     pipeline.addLast(new HttpServerCodec()); // 处理 HTTP 请求  
                     pipeline.addLast(new ChunkedWriteHandler()); // 写大数据流的处理器  
                     pipeline.addLast(new HttpObjectAggregator(8192)); // 将 HTTP 消息聚合为 FullHttpRequest 或 FullHttpResponse  
                     // pipeline.addLast(new WebSocketFrameAggregator(8192)); // 将 HTTP 消息聚合为 FullHttpRequest 或 FullHttpResponse  
                     // pipeline.addLast(new WebSocketServerCompressionHandler()); // 消息压缩  
                     pipeline.addLast(new WebSocketHandler()); // 自定义 WebSocket 处理器  
                     pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10)); // 处理 WebSocket 升级握手和数据帧处理  
                   }  
                 })  
                 .option(ChannelOption.SO_BACKLOG, 128)          // 设置服务器接受队列大小  
                 .childOption(ChannelOption.SO_KEEPALIVE, true); // 开启 TCP 连接的 Keep-Alive 功能  
        
               // Bind and start to accept incoming connections.  
               System.out.println("TCP server started successfully");  
               ChannelFuture f = b.bind(port).sync(); // 绑定端口并等待绑定完成  
        
               // Wait until the server socket is closed.  
               // In this example, this does not happen, but you can do that to gracefully            // shut down your server.            f.channel().closeFuture().sync(); // 阻塞直到服务器关闭  
        
             } finally {  
               // 优雅地关闭线程池  
               workerGroup.shutdownGracefully();  
               bossGroup.shutdownGracefully();  
             }  
           }  
       }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74

    这里需要注意一下,pipeline.addLast的顺序不一致可能会导致程序报错,运行时

    • 创建心跳 handle

      package com.todoitbo.baseSpringbootDasmart.netty.handler;  
       
      import io.netty.channel.ChannelHandlerContext;  
      import io.netty.channel.ChannelInboundHandlerAdapter;  
      import io.netty.handler.timeout.IdleState;  
      import io.netty.handler.timeout.IdleStateEvent;  
      
      public class HeartbeatHandler extends ChannelInboundHandlerAdapter {  
      
         int readTimeOut = 0;  
      
         @Override  
         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
           IdleStateEvent event = (IdleStateEvent) evt;  
      
           if(event.state() == IdleState.READER_IDLE){  
             readTimeOut++;  
           }  
      
           if(readTimeOut >= 3){  
             System.out.println("超时超过3次,断开连接");  
             ctx.close();  
           }  
         }  
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
    • 创建WebSocketHandler

      package com.todoitbo.baseSpringbootDasmart.netty.handler;  
       
       import cn.hutool.core.collection.CollectionUtil;  
       import com.todoitbo.baseSpringbootDasmart.netty.NamedChannelGroup;  
       import io.netty.buffer.ByteBuf;  
       import io.netty.buffer.Unpooled;  
       import io.netty.channel.*;  
       import io.netty.handler.codec.http.*;  
       import io.netty.handler.codec.http.websocketx.*;  
       import io.netty.util.AttributeKey;  
       import io.netty.util.CharsetUtil;  
       import lombok.extern.slf4j.Slf4j;  
       
       import java.util.HashMap;  
       import java.util.List;  
       import java.util.Map;  
       
       /**  
        * @author xiaobo  
        */
       @Slf4j  
       public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {  
       
         private WebSocketServerHandshaker handshaker;  
       
         public static final AttributeKey<String> USER_ID_KEY = AttributeKey.valueOf("userId");  
         public static final AttributeKey<String> GROUP_ID_KEY = AttributeKey.valueOf("groupId");  
       
         private static final Map<Channel, String> WORK_CHANNEL_MAP = new HashMap<Channel,String>();  
       
         @Override  
         public void channelActive(ChannelHandlerContext ctx) throws Exception {  
           log.info("与客户端建立连接,通道开启!");  
           // 添加到channelGroup通道组(广播)  
           // 之后可以根据ip来进行分组  
           NamedChannelGroup.getChannelGroup("default").add(ctx.channel());  
         }  
         @Override  
         public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
           log.info("与客户端断开连接,通道关闭!");  
           // 从channelGroup通道组(广播)中删除  
           // 之后可以根据ip来进行分组  
           Channel channel = ctx.channel();  
           NamedChannelGroup.getChannelGroup("default").remove(channel);  
           WORK_CHANNEL_MAP.remove(channel);  
         }  
       
         public boolean userAuthentication(ChannelHandlerContext ctx,FullHttpRequest req) {  
           // 提取URI参数  
           QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());  
           Map<String, List<String>> parameters = queryStringDecoder.parameters();  
           // 根据参数进行处理  
           List<String> userId = parameters.get("userId");  
           List<String> groupId = parameters.get("groupId");  
           if (CollectionUtil.isNotEmpty(userId) && CollectionUtil.isNotEmpty(groupId)) {  
             ctx.channel().attr(USER_ID_KEY).set(userId.get(0));  
             ctx.channel().attr(GROUP_ID_KEY).set(groupId.get(0));  
             return true;  
           }else {  
             return false;  
           }  
       
         }  
       
       
         private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {  
           // 检查是否升级到WebSocket  
           if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {  
             // 如果不是WebSocket协议的握手请求,返回400 Bad Request响应  
             sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));  
             return;  
           }  
       
           // 构建握手响应  
           WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(  
             getWebSocketLocation(req), null, true);  
           handshaker = wsFactory.newHandshaker(req);  
       
           if (handshaker == null) {  
             // 如果不支持WebSocket版本,返回HTTP 426 Upgrade Required响应  
             WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());  
           } else {  
             handshaker.handshake(ctx.channel(), req);  
             // 进行WebSocket握手  
             // 在认证成功后,设置用户ID到Channel属性中  
             boolean authentication = userAuthentication(ctx,req);// 这里需要实现用户认证逻辑  
             if (!authentication) {  
               // 用户认证失败,可能需要关闭连接或发送认证失败消息  
               // 1. 关闭连接:  
               ctx.close();  
               // 2. 发送认证失败消息给客户端:  
               String failureMessage = "认证失败,请提供有效的身份验证信息。";  
               ctx.writeAndFlush(failureMessage);  
               return;  
             }  
             // 其他逻辑...  
             WORK_CHANNEL_MAP.put(ctx.channel(), ctx.channel().attr(GROUP_ID_KEY).get());  
           }  
         }  
       
         private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {  
           // 发送HTTP响应  
           if (res.status().code() != 200) {  
             ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);  
             res.content().writeBytes(buf);  
             buf.release();  
             HttpUtil.setContentLength(res, res.content().readableBytes());  
           }  
       
           ChannelFuture future = ctx.channel().writeAndFlush(res);  
           if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {  
             future.addListener(ChannelFutureListener.CLOSE);  
           }  
         }  
       
         private String getWebSocketLocation(FullHttpRequest req) {  
           return "ws://" + req.headers().get(HttpHeaderNames.HOST) + req.uri();  
         }  
       
         private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {  
           // 处理WebSocket消息,可以根据实际需求进行处理  
           if (frame instanceof TextWebSocketFrame) {  
             // 处理文本消息  
             String text = ((TextWebSocketFrame) frame).text();  
             System.out.println("Received message: " + text);  
       
             // 可以在这里处理WebSocket消息并发送响应  
             // ...  
           } else if (frame instanceof BinaryWebSocketFrame) {  
             // 处理二进制WebSocket消息  
             // ...  
             System.out.println("123");  
           } else if (frame instanceof CloseWebSocketFrame) {  
             // 处理WebSocket关闭请求  
             handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());  
           } else if (frame instanceof PingWebSocketFrame) {  
             // 处理WebSocket Ping消息  
             System.out.println("cs");  
             ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));  
           }  
         }  
       
         @Override  
         protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {  
           if (msg instanceof FullHttpRequest) {  
             // 处理HTTP握手请求  
             handleHttpRequest(ctx, (FullHttpRequest) msg);  
           } else if (msg instanceof WebSocketFrame) {  
             // 处理WebSocket消息  
             handleWebSocketFrame(ctx, (WebSocketFrame) msg);  
           }  
         }  
       
         @Override  
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
           // 发生异常时的处理  
           log.error(cause.getMessage());  
           ctx.close();  
         }  
       
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
      • 145
      • 146
      • 147
      • 148
      • 149
      • 150
      • 151
      • 152
      • 153
      • 154
      • 155
      • 156
      • 157
      • 158
      • 159
      • 160
      • 161
    • 创建NamedChannelGroup

      package com.todoitbo.baseSpringbootDasmart.netty;
      
      import io.netty.channel.group.ChannelGroup;
      import io.netty.channel.group.DefaultChannelGroup;
      import io.netty.util.concurrent.GlobalEventExecutor;
      
      import java.util.Map;
      import java.util.concurrent.ConcurrentHashMap;
      
      public class NamedChannelGroup{
          private String groupName;
          public static Map<String,ChannelGroup> channelGroupMap = new ConcurrentHashMap<>();
      
          static {
              channelGroupMap.put("default", new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
          }
      
          public static void setGroupName(String groupName){
              channelGroupMap.put(groupName, new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
          }
      
          public static ChannelGroup getChannelGroup(String groupName){
              return channelGroupMap.get(groupName);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25

    Server-Sent Events (SSE)实现

    创建DataManager

    package com.todoitbo.baseSpringbootDasmart.sse;
    
    import org.springframework.http.MediaType;
    import org.springframework.stereotype.Component;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * 数据管理器用于管理Server-Sent Events (SSE) 的订阅和数据推送。
     * @author xiaobo
     */
    @Component
    public class DataManager {
    
        private final Map<String, List<SseEmitter>> dataEmitters = new HashMap<>();
    
        /**
         * 订阅特定数据类型的SSE连接。
         *
         * @param dataType 要订阅的数据类型
         * @param emitter  SSE连接
         */
        public void subscribe(String dataType, SseEmitter emitter) {
            dataEmitters.computeIfAbsent(dataType, k -> new ArrayList<>()).add(emitter);
            emitter.onCompletion(() -> removeEmitter(dataType, emitter));
            emitter.onTimeout(() -> removeEmitter(dataType, emitter));
        }
    
        /**
         * 推送特定数据类型的数据给所有已订阅的连接。
         *
         * @param dataType 要推送的数据类型
         * @param data     要推送的数据
         */
        public void pushData(String dataType, String data) {
            List<SseEmitter> emitters = dataEmitters.getOrDefault(dataType, new ArrayList<>());
            emitters.forEach(emitter -> {
                try {
                    emitter.send(SseEmitter.event().data(data, MediaType.TEXT_PLAIN));
                } catch (IOException e) {
                    removeEmitter(dataType, emitter);
                }
            });
        }
    
        private void removeEmitter(String dataType, SseEmitter emitter) {
            List<SseEmitter> emitters = dataEmitters.get(dataType);
            if (emitters != null) {
                emitters.remove(emitter);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    接口实现

    package com.todoitbo.baseSpringbootDasmart.controller;  
      
    import com.todoitbo.baseSpringbootDasmart.sse.DataManager;  
    import org.springframework.http.MediaType;  
    import org.springframework.http.ResponseEntity;  
    import org.springframework.web.bind.annotation.GetMapping;  
    import org.springframework.web.bind.annotation.PathVariable;  
    import org.springframework.web.bind.annotation.RequestMapping;  
    import org.springframework.web.bind.annotation.RestController;  
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;  
      
    import javax.annotation.Resource;  
      
    /**  
     * @author xiaobo  
     */
    @RestController  
    @RequestMapping("/environment")  
    public class EnvironmentController {  
      
        @Resource    private DataManager dataManager;  
      
        @GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)  
        public SseEmitter subscribe() {  
            SseEmitter emitter = new SseEmitter();  
            dataManager.subscribe("environment", emitter);  
            return emitter;  
        }  
      
        // 示例:推送环境监测数据给前端  
        @GetMapping("/push/{testText}")  
        public ResponseEntity<String> pushEnvironmentData(@PathVariable String testText) {  
            dataManager.pushData("environment", testText);  
            return ResponseEntity.ok("Data pushed successfully.");  
        }  
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    实现说明

    每个不同类型的数据推送都需要一个对应的SSE订阅端点(subscribe)。每个数据类型都有一个对应的订阅端点,用于前端建立SSE连接,并在后端接收和处理特定类型的数据推送。

    在你的后端应用中,对于每种数据类型,你需要创建一个对应的Controller或处理器来处理该数据类型的SSE订阅。每个Controller会有自己的SSE订阅端点,前端可以连接到不同的端点以接收相应类型的数据。

    这种方式允许你将不同类型的数据推送逻辑分离,使代码更具可维护性和可扩展性。当有新的数据可用时,只需调用相应类型的数据推送方法,而不必修改通用的SSE管理逻辑。

    前端实现

    DOCTYPE html>
    <html>
    <head>
        <title>SSE Data Receivertitle>
    head>
    <body>
        <h1>Real-time Data Displayh1>
        <div id="data-container">div>
    
        <script>
            const dataContainer = document.getElementById('data-container');
    
            // 创建一个 EventSource 对象,指定 SSE 服务器端点的 URL
            const eventSource = new EventSource('http://127.0.0.1:13024/environment/subscribe'); // 根据你的控制器端点来设置URL
    
            // 添加事件处理程序,监听服务器端发送的事件
            eventSource.onmessage = (event) => {
                const data = event.data;
                // 在这里处理从服务器接收到的数据
                // 可以将数据显示在页面上或进行其他操作
                const newDataElement = document.createElement('p');
                newDataElement.textContent = data;
                dataContainer.appendChild(newDataElement);
            };
    
            eventSource.onerror = (error) => {
                // 处理连接错误
                console.error('Error occurred:', error);
            };
        script>
    body>
    html>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    弊端以及解决方案

    如果你没什么处理的话,在它首次调用subscribe时候可能会出现连接超时的问题,因为这个是一个长连接,出现这种问题是因为,此时并没有数据产生,至此,除非你刷新页面,否则即使有数据产生前端也不会受到了

    你希望前端在第一次订阅SSE连接后,即使后端没有数据产生,之后也能接收到数据。这可以通过以下方式来实现:

    1. 保持持久连接: 确保前端建立的SSE连接是持久性连接,不会在第一次连接成功后关闭。这可以让连接一直保持打开状态,即使后端没有即时数据产生。你可以在前端代码中使用以下方式来确保连接持久:

      const eventSource = new EventSource('/environment/subscribe');
      
      • 1

      默认情况下,EventSource对象会自动重连,以保持连接的持久性。

    2. 定期发送心跳数据: 在后端定期发送一些心跳数据,以确保连接保持活跃。这可以防止连接超时关闭。你可以在后端定期发送一个包含无用信息的SSE事件,例如:

      @Scheduled(fixedRate = 30000) // 每30秒发送一次心跳数据
      public void sendHeartbeat() {
          dataManager.pushData("heartbeat", "Heartbeat data");
      }
      
      • 1
      • 2
      • 3
      • 4

      前端可以忽略这些心跳事件,但它们会保持连接处于活跃状态。

    3. 前端自动重连: 在前端代码中添加自动重连逻辑,以处理连接断开的情况。这样,如果连接由于某种原因断开,前端会自动尝试重新建立连接。示例:

      const eventSource = new EventSource('/environment/subscribe');
      
      eventSource.onerror = (error) => {
          // 处理连接错误
          console.error('Error occurred:', error);
          // 重新建立连接
          eventSource.close();
          setTimeout(() => {
              // 重新建立连接
              eventSource = new EventSource('/environment/subscribe');
          }, 1000); // 1秒后重试
      };
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12

    通过结合上述方法,你可以确保前端能够建立并保持持久SSE连接,即使后端没有即时数据产生。这样,一旦后端有数据产生,前端也可以接收到数据而无需重新订阅。

  • 相关阅读:
    Golang仿ps获取Linux进程信息
    BI零售数据分析,当代零售企业的核心竞争力
    Flutter 中使用 extension 使项目更具可读性和效率 01
    翻阅必备----Java窗口组件,容器,布局,监听,事件 API大全
    腾讯轻联:带你创造属于自己的AI小助手
    linux用户空间向内核空间通信之使用proc接口和使用WARN_ON和BUG_ON调试内核代码
    JavaScript —— 算法思想之递归和映射
    【MySql】数据库的CRUD(增删查改)
    CSS-列表属性篇
    VT 入门番外篇——最小 VT 实现(上)
  • 原文地址:https://blog.csdn.net/Mrxiao_bo/article/details/132779099