• SpringCloud源码探析(十)-Web消息推送


    1.概述

    消息推送在日常使用中的场景比较多,比如有人点赞了我的博客或者关注了我,这时我就会收到一条推送消息,以此来吸引我点击或者打开应用。消息推送的方式主要分为两种:web消息推送和移动端消息推送。它将所要发送的信息,发送至用户当前访问的网页或者移动设备。本文主要分析在web端进行消息推送的几种方式,实现用户在web端接收推送消息。

    2.消息推送几种方式

    web消息推送的方式主要分为两种:一种是主动向服务端请求(简单理解为客户端pull消息)、一种是服务端推送(简单理解为服务端push消息)。两种方式各有利弊:主动向服务端请求会按照一定周期不断去请求服务器,如果客户端数量庞大会对服务端造成极大压力,并且数据具有一定延时性;服务端推送实时性较好,但服务端需要存储客户端会话信息,如果客户端数量较多,服务端查询对应会话压力较大。

    2.1 Pull消息

    Pull消息主要是客户端发起的操作,定时向服务端进行轮询获取消息,轮询可分为短轮询和长轮询。

    短轮询:指定时间间隔,由应用浏览器发送http请求,服务器实时返回消息至客户端,浏览器进行展示;短轮询在前端一般通过JS定时器定时发送请求来实现;
    长轮询:是对短轮询的一种优化,客户端发起请求,服务器不会立即返回请求结果,而是将请求挂起等待一段时间,如果此时间段内数据变更,立即响应客户端请求,若是一直无变化则等到指定的超时时间响应请求,客户端重新发起长连接;长轮询在nacos、Kafka、RocketMQ队列中使用较多。

    2.2 Push消息

    服务端向客户端推送,在一定程度上能节约一部分资源,常用的方式有WebSocket、SSE等,还有一些通过中间件RabbitMQ来实现等。本文主要介绍利用SSE方式和WebSocket方式来推送消息,具体如下:

    2.2.1 SSE

    SSE(Server-sent events)是一种用于实现服务器向客户端实时推送数据的Web技术。与传统的短轮询和长轮询相比,SSE提供了更高效和实时的数据推送机制。SSE基于HTTP协议,允许服务器将数据以事件流(Event Stream)的形式发送给客户端。客户端通过建立持久的HTTP连接,并监听事件流,可以实时接收服务器推送的数据。SSE的主要特点包括:

    简单易用:SSE使用基于文本的数据格式,如纯文本、JSON等,使得数据的发送和解析都相对简单;
    单向通信:SSE支持服务器向客户端的单向通信,服务器可以主动推送数据给客户端,而客户端只能接收数据;
    实时性:SSE建立长时间的连接,使得服务器可以实时地将数据推送给客户端,而无需客户端频繁地发起请求。

    SSE的整体实现思路如下,它的原理其实类似于在线视频播放,视频流会连续不断的推送到浏览器,图如下:
    在这里插入图片描述
    其实可以简单地理解为它是一种单向实时通信技术,一旦客户端与服务端建立连接,只能接收服务端信息,不能向服务端发送信息,且拥有自动重连机制,客户端与服务端断开会进行自动重连,websocket断开不能自动重连,这是SSE优于websocket的地方。
    Springboot使用SSE功能发送信息代码如下,由于springboot内嵌sse模块,因此不需要引入额外包:

    package com.eckey.lab.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.http.MediaType;
    import org.springframework.web.bind.annotation.*;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * @Author: Marin
     * @CreateTime: 2023-10-08  14:29
     * @Description: TODO
     * @Version: 1.0
     */
    @Slf4j
    @RestController
    @CrossOrigin //此注解是为了解决测试过程中的跨域问题
    @RequestMapping("/sse")
    public class SSEController {
    
        /**
         * 使用Map对象来存放userId和对应的会话
         */
        private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
    
        /**
         * @description: 浏览器端注册,将会话信息存入Map,这种方式会导致一个userId只能与服务器建立一个会话,生产环境慎用这种方式
         * @author: Marin
         * @date: 2023/10/9 16:51
         * @param: [userId]
         * @return: org.springframework.web.servlet.mvc.method.annotation.SseEmitter
         **/
        @GetMapping(path = "/subscribe/{userId}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
        public SseEmitter subscribe(@PathVariable("userId") String userId) throws IOException {
            // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
            SseEmitter sseEmitter = new SseEmitter(30_000L);
            // 设置前端的重试时间为15s,如果不加这个发送一下,前端就不会显示连接成功
            sseEmitter.send("连接成功");
            // 注册回调
            sseEmitter.onCompletion(() -> {
                log.info("连接结束:{}", userId);
            });
    
            sseEmitter.onError((Throwable throwable) -> {
                log.error("连接异常:{}", throwable.getMessage());
            });
    
            sseEmitter.onTimeout(() -> {
                log.warn("连接超时:{}", userId);
            });
            //以userId为key,如果一个用户多个设备连接,会不准确
            sseEmitterMap.put(userId, sseEmitter);
            log.info("创建新的sse连接,当前用户:{}", userId);
            return sseEmitter;
        }
    
        /**
         * @description: 向指定用户发送指定信息
         * @author: Marin
         * @date: 2023/10/9 16:53
         * @param: [userId, message]
         * @return: void
         **/
        @GetMapping(path = "/sendMessage")
        public void sendMessage(String userId, String message) {
            if (sseEmitterMap.containsKey(userId)) {
                try {
                    log.info("向用户:{},发送消息:{}", userId, message);
                    sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);
                } catch (IOException e) {
                    log.error("用户[{}]推送异常:{}", userId, e.getMessage());
                    removeUser(userId);
                }
            }
        }
    
        /**
         * @description: 移除用户
         * @author: Marin
         * @date: 2023/10/9 16:53
         * @param: [userId]
         * @return: void
         **/
        private void removeUser(String userId) {
            sseEmitterMap.remove(userId);
            log.info("移除用户成功:{}", userId);
        }
    
        /**
         * @description: 删除与指定用户会话
         * @author: Marin
         * @date: 2023/10/9 16:54
         * @param: [userId]
         * @return: void
         **/
        @GetMapping("/close/{userId}")
        public void close(@PathVariable("userId") String userId) {
            removeUser(userId);
            log.info("关闭连接成功:{}", userId);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105

    前端测试代码如下:

    <!DOCTYPE html>
    <html lang="en">
    
    <head>
        <meta charset="UTF-8">
        <title>SseEmitter</title>
    </head>
    
    <body>
    <button onclick="closeSse()">关闭连接</button>
    <div id="message"></div>
    </body>
    <script>
        let source = null;
    
        // 用时间戳模拟登录用户
        const userId = new Date().getTime();
    
        if (window.EventSource) {
    
            // 建立连接
            source = new EventSource('http://127.0.0.1:9090/sse/subscribe/' + userId);
            setMessageInnerHTML("连接用户=" + userId);
          
            source.addEventListener('open', function(e) {
                setMessageInnerHTML("建立连接。。。");
            }, false);
    
            source.addEventListener('message', function(e) {
                setMessageInnerHTML(e.data);
            });
    
            source.addEventListener('error', function(e) {
                if (e.readyState === EventSource.CLOSED) {
                    setMessageInnerHTML("连接关闭");
                } else if (e.target.readyState === EventSource.CONNECTING) { 
                    console.log('Connecting...');
                }else {
                    console.log(e);
                }
            }, false);
    
        } else {
            setMessageInnerHTML("你的浏览器不支持SSE");
        }
    
        // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
        window.onbeforeunload = function() {
            closeSse();
        };
    
        // 关闭Sse连接
        function closeSse() {
            source.close();
            const httpRequest = new XMLHttpRequest();
            httpRequest.open('GET', 'http://127.0.0.1:9090/sse/close/'+ userId, true);
            httpRequest.send();
            console.log("close");
        }
    
        // 将消息显示在网页上
        function setMessageInnerHTML(innerHTML) {
            document.getElementById('message').innerHTML += innerHTML + '
    '
    ; } </script> </html>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    打开前端页面,会出现连接信息,如下所示:
    在这里插入图片描述
    调用信息发送接口,跟据用户id发送指定消息,如下:
    在这里插入图片描述
    发送成功后前端接收并显示在页面上,如下:
    在这里插入图片描述

    2.2.2 WebSocket

    WebSocket是一种用于实现实时双向通信的Web技术,它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。它与SSE在某些方面有所不同。下面是SSE和WebSocket之间的比较:

    数据推送方向:SSE是服务器向客户端的单向通信,服务器可以主动推送数据给客户端。而WebSocket是双向通信,允许服务器和客户端之间进行实时的双向数据交换;
    连接建立:SSE使用基于HTTP的长连接,通过普通的HTTP请求和响应来建立连接,从而实现数据的实时推送。WebSocket使用自定义的协议,通过建立WebSocket连接来实现双向通信;
    兼容性:由于SSE基于HTTP协议,它可以在大多数现代浏览器中使用,并且不需要额外的协议升级。WebSocket在绝大多数现代浏览器中也得到了支持,但在某些特殊的网络环境下可能会遇到问题;
    适用场景:SSE适用于服务器向客户端实时推送数据的场景,如股票价格更新、新闻实时推送等。WebSocket适用于需要实时双向通信的场景,如聊天应用、多人协同编辑等。

    WebSocket原理图如下所示:
    在这里插入图片描述
    Springboot整合websocket如下:
    1.引入pom

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
    
    • 1
    • 2
    • 3
    • 4

    2.编写socket配置

    package com.eckey.lab.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.web.servlet.ServletContextInitializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    
    import javax.servlet.ServletContext;
    import javax.servlet.ServletException;
    
    /**
     * @Author: Marin
     * @CreateTime: 2023-10-08  16:26
     * @Description: TODO
     * @Version: 1.0
     */
    @Slf4j
    @Configuration
    public class WebSocketConfig implements ServletContextInitializer {
    
        /**
         * 这个bean的注册,用于扫描带有@ServerEndpoint的注解成为websocket,如果你使用外置的tomcat就不需要该配置文件
         */
        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    
        @Override
        public void onStartup(ServletContext servletContext) throws ServletException {
            String serverInfo = servletContext.getServerInfo();
            log.info("serverInfo:{}", serverInfo);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    3.编写SocketServer代码

    package com.eckey.lab.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import javax.websocket.OnMessage;
    import javax.websocket.OnOpen;
    import javax.websocket.Session;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CopyOnWriteArraySet;
    
    /**
     * @Author: Marin
     * @CreateTime: 2023-10-08  15:49
     * @Description: TODO
     * @Version: 1.0
     */
    @Component
    @Slf4j
    @ServerEndpoint("/websocket/{userId}")
    public class WebSocketServer {
    
        //与某个客户端的连接会话,需要通过它来给客户端发送数据
        private Session session;
    
        private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
    
        // 用来存在线连接数
        private static final Map<String, Session> sessionPool = new HashMap<String, Session>();
    
        /**
         * 链接成功调用的方法
         */
        @OnOpen
        public void onOpen(Session session, @PathParam(value = "userId") String userId) {
            try {
                this.session = session;
                webSockets.add(this);
                sessionPool.put(userId, session);
                log.info("websocket消息: 有新的连接,总数为:" + webSockets.size());
            } catch (Exception e) {
                log.error("");
            }
        }
    
        /**
         * 收到客户端消息后调用的方法
         */
        @OnMessage
        public void onMessage(String message) {
            log.info("websocket消息: 收到客户端消息:" + message);
        }
    
        /**
         * 此为单点消息
         */
        public void sendOneMessage(String userId, String message) {
            Session session = sessionPool.get(userId);
            if (session != null && session.isOpen()) {
                try {
                    log.info("websocket发送单点消息:" + message);
                    session.getAsyncRemote().sendText(message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    4.编写controller代码

    package com.eckey.lab.controller;
    
    import com.alibaba.fastjson.JSON;
    import com.eckey.lab.config.WebSocketServer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    
    import java.io.IOException;
    import java.util.HashMap;
    
    /**
     * @Author: Marin
     * @CreateTime: 2023-10-08  15:24
     * @Description: TODO
     * @Version: 1.0
     */
    @Slf4j
    @RestController
    @CrossOrigin
    @RequestMapping("/socket")
    public class WebSocketController {
    
        @Autowired
        private WebSocketServer webSocketServer;
    
        @GetMapping(path = "/publish/{userId}")
        public String publish(@PathVariable("userId") String userId, String message) throws IOException {
            webSocketServer.sendOneMessage(userId, message);
            log.info("信息发送成功!userId:{},message:{}", userId, message);
            HashMap maps = new HashMap();
            maps.put("code", "0");
            maps.put("msg", "success");
            return JSON.toJSONString(maps);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    5.测试
    客户端发送消息,服务接收到消息,具体如下:
    在这里插入图片描述
    在这里插入图片描述
    调用服务端接口发送消息,客户端接收到消息,具体如下:
    在这里插入图片描述
    在这里插入图片描述

    3.小结

    1.SSE方式是一种基于TCP协议的单向数据传输方式,当连接建立完成,只能由服务端向客户端发送信息;
    2.WebSocket是一种双向通信技术,能够实现客户端和服务端的全双工通信,它在建立连接时使用HTTP协议,其它时候都是直接基于TCP协议进行通信;
    3.在选择SSE或者WebSocket时,需要跟据场景、性能损耗进行综合考虑,合理的技术选型能够有效增强服务的健壮性。

    4.参考文献

    1.https://zhuanlan.zhihu.com/p/634581294
    2.https://juejin.cn/post/7122014462181113887
    3.https://javaguide.cn/system-design/web-real-time-message-push.html

    5.附录

    https://gitee.com/Marinc/nacos.git

  • 相关阅读:
    科普长文--网络安全拟态防御技术概念及应用
    CSS中如何实现一个自适应正方形(宽高相等)的元素?
    发布文章到wordpress
    冯喜运:4.24-4.25黄金原油双双跳水、今日走势分析
    数字档案一体化解决方案
    【计算机网络-自顶向下方法】应用层(HTTP、FTP)
    100个特别的遥感应用和用途
    一些自己收集的秒杀设计的重要知识点
    l8-d9 UDP通信实现
    取消合并单元格并快速填充
  • 原文地址:https://blog.csdn.net/qq_33479841/article/details/133686673