• SSE:后端向前端发送消息(springboot SseEmitter)


    背景

    有一个项目,前端vue,后端springboot。现在需要做一个功能:用户在使用系统的时候,管理员发布公告,则使用系统的用户可以看到该公告。
    基于此,一个简单的方案:前端使用JS方法setInterval,重复调用后端公告获取接口。此方法有几点缺陷:

    • 循环调用的时间间隔不好确定:太长了,获取公告的时效有延迟;太短了,给服务器造成压力,很多请求都是无用的(公告发布的时间不定,很可能几天都没有新公告);
    • token的续期问题:项目中,前端请求,需要带上token,token有过期时间,如果用户一直使用(前后端有交互),会无感续期。如果有这种定时循环和后端交互的场景,就会造成token用不过期(循环的调用会触发续期),当然,可以在续期中,排除某个场景的请求,但是这样的设计不好,因为这种场景太多了,就会造成维护上的困难。

    因此就想到了,如果后端主动向前端推送消息,这个问题就可以完美解决。

    方案

    有两种方案可以实现后端向前端推送消息:

    1. 使用websocket;
    2. 使用sse;

    这里介绍SSE的方式(如果系统中对这种消息的准确性和可靠性有严格的要求,则使用websocket,websocket的使用相对复杂的多);
    如果想了解SSE的详细基础知识,可以参考阮一峰老师的这篇文章:Server-Sent Events 教程

    SSE后端代码

    SpringMVC中,已经集成了该功能,所以无需额外引入jar包,直接上代码:

    @RestController
    @RequestMapping("/notice")
    public class NoticeController {
    
        @Autowired
        private NoticeService noticeService;
    
        @GetMapping(path = "createSseEmitter")
        public SseEmitter createSseEmitter(String id) {
            return noticeService.createSseEmitter(id);
        }
    
        @PostMapping(path = "sendMsg")
        public boolean sendMsg(String id, String content) {
            noticeService.sendMsg(id, content);
            return true;
        }
    
    }
    
    @Slf4j
    @Service
    public class NoticeServiceImpl implements NoticeService {
        @Autowired
        @Qualifier("sseEmitterCacheService")
        private CacheService<SseEmitter> sseEmitterCacheService;
    
        @Override
        public SseEmitter createSseEmitter(String clientId) {
            if (StringUtil.isBlank(clientId)) {
                clientId = UUID.randomUUID().toString().replace("-", "");
            }
            SseEmitter sseEmitter = sseEmitterCacheService.getCache(clientId);
            log.info("获取SSE,id={}", clientId);
            final String id = clientId;
            sseEmitter.onCompletion(() -> {
                log.info("SSE已完成,关闭连接 id={}", id);
                sseEmitterCacheService.deleteCache(id);
            });
            return sseEmitter;
        }
        @Override
        public void sendMsg(String clientId, String content) {
            if (sseEmitterCacheService.hasCache(clientId)) {
                SseEmitter sseEmitter = sseEmitterCacheService.getCache(clientId);
                try {
                    sseEmitter.send(content);
                } catch (IOException e) {
                    log.error("发送消息失败:{}", e.getMessage(), e);
                    throw new BusinessRuntimeExcepption(CustomExcetionConstant.IO_ERR, "发送消息失败", e);
                }
            } else {
                log.error("SSE对象不存在");
                throw new BusinessRuntimeExcepption("SSE对象不存在");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    这里,只列出了核心的代码,简而言之,需要做到两点即可:

    1. 前端首先是发起一个请求,创建SseEmitter,即createSseEmitter方法,该方法必须返回一个SseEmitter对象;
    2. 返回的SseEmitter,后端必须要缓存起来(我用的是ehcache,也可以直接定义一个map来缓存);

    为什么要这么做?看下文,后端代码一起来分析就明白了。

    前端代码

    由于,我请求该接口,需要带上token,所以直接使用EventSource不行,另外这个IE也不支持。所以选择了一个工具:event-source-polyfill。

    1. 先安装event-source-polyfill
    npm install event-source-polyfill
    
    • 1
    1. 然后使用:
    import { EventSourcePolyfill } from "event-source-polyfill";
      created() {
        let _this = this;
        this.source = new EventSourcePolyfill(
          "/" +
            process.env.VUE_APP_MANAGER_PRE_API_URL +
            "/notice/createSseEmitter?id=" +
            uuid(),
          {
            headers: {
              [process.env.VUE_APP_OAUTH_AUTHORIZATION]: store.getters.getToken,
            },
            //重连时间间隔,单位:毫秒,默认45000毫秒,这里设置为10分钟
            heartbeatTimeout: 10 * 60 * 1000,
          }
        );
    
        this.source.onopen = () => {
          console.log("NOTICE建立连接");
        };
        this.source.onmessage = (e) => {
          _this.scrollMessage = e.data;
          console.log("NOTICE接收到消息");
        };
        this.source.onerror = (e) => {
          if (e.readyState == EventSource.CLOSED) {
            console.log("NOTICE连接关闭");
          } else if (this.source.readyState == EventSource.CONNECTING) {
            console.log("NOTICE正在重连");
            //重新设置header
            this.source.headers = {
              [process.env.VUE_APP_OAUTH_AUTHORIZATION]: store.getters.getToken,
            };
          } else {
            console.log(e);
          }
        };
      },
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    有几点说明:

    • new EventSourcePolyfill中,可以带入header
    • heartbeatTimeout是一个心跳时间,默认情况下间隔heartbeatTimeout后,会触发重新连接后端接口;
    • this.source.headers,该行的作用是在重连的时候重新设置header,如果不这样,那么重连的时候,用的参数信息,还是和最开始的一样(包括本例中url中的id)。而由于我的项目中,如果token其他操作触发了刷新token,则有效token可能会变,所以,这里取缓存中放置的token,而不应该使用最初的token。
      好了,这样就基本实现了我们所需要的功能了。

    特别注意

    前端配置了代理,所以一直收不到后端发送的消息,尝试加入以下参数:

      devServer: {
        compress:false,
        …………
    }
    
    • 1
    • 2
    • 3
    • 4

    问题

    之前在写后端的时候提到了两个问题:为什么要返回SseEmitter对象?为什么要缓存SseEmitter对象?
    其实看过SSE的原理,都应该明白:这就是一个长连接,前端调用创建SseEmitter对象的接口,虽然接口返回了,但是并未结束(这就是为什么要返回SseEmitter对象,如果返回的是一个其他对象,就和普通的接口没两样了, 该接口就直接结束了),请看下截图:
    在这里插入图片描述
    发起请求之后,一直是待处理,并未结束,10分钟之后,该请求被取消(前端设置的重连),然后重新发起连接,重新发起的连接也是在等待中。只有接收到消息后,这个请求的状态码才是200,但是这个时候才连接已经建立好了。其中的细节,这里不做讲述。
    所以,如果再使用SseEmitter对象发送消息,则前端就可以收到对象的消息了(即实现后端向前端发送消息)。这里使用的SseEmitter对象,就是createSseEmitter接口返回的对象(也就是使用哪个SseEmitter对象,就可以向哪个前端发送消息)。这也就是为什么要缓存SseEmitter对象的原因了。

    效果

    通过调用发送消息接口,前端即可立即展示发送的消息:
    在这里插入图片描述

  • 相关阅读:
    socket 降低cpu 占用率 ,放弃Thread和Whell 循环
    为什么各大厂自研的内存泄漏检测框架都要参考 LeakCanary?因为它是真强啊!
    Camunda 创建 流程图回调 (三)
    C语言——从键盘任意输人一个三位数的自然数,求该数个位、十位、百位上的数字之和
    【firewalld防火墙】
    保证接口数据安全的10种方案
    AIGC独角兽官宣联手,支持千亿大模型的云实例发布,“云计算春晚”比世界杯还热闹...
    【JavaScript数据网格】上海道宁51component为你带来企业JS开发人员首选的数据网格——AG Grid
    南大通用数据库-Gbase-8a-学习-11-Oracle通过Dblink访问Gbase8a
    C++——std::async和std::thread
  • 原文地址:https://blog.csdn.net/fyk844645164/article/details/126680347