• springboot -sse -flux 服务器推送消息


    先说BUG处理,遇到提示异步问题 Async support must be enabled on a servlet and for all filters involved in async request processing. This is done in Java code using the Servlet API or by adding "true" to servlet and filter declarations in web.xml.

    springboot在@WebFilter注解处,加入urlPatterns = { "/*" },asyncSupported = true

    springmvc在web.xml处理

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
    3.          xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    4.          version="3.0">
    5. <filter-mapping>
    6.   <filter-name>shiroFilter</filter-name>
    7.   <url-pattern>/*</url-pattern>
    8.   <dispatcher>REQUEST</dispatcher>
    9.   <dispatcher>ASYNC</dispatcher>
    10. </filter-mapping>
    • demo1,服务器间隔一定时间推送内容
    1.     接口方法
    1. @GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
    2. public Flux> sse(@PathVariable String userId) {
    3. // 每两秒推送一次
    4. return Flux.interval(Duration.ofSeconds(2)).map(seq->
    5. Tuples.of(seq, LocalDateTime.now())).log()//序号和时间
    6. .map(data-> ServerSentEvent.builder().id(userId).data(data.getT1().toString()).build());//推送内容
    7. }

    2.前端代码

    1. <!DOCTYPE html>
    2. <html xmlns:th="http://www.thymeleaf.org">
    3. <head>
    4. <meta charset="UTF-8"/>
    5. <title>服务器推送事件</title>
    6. </head>
    7. <body>
    8. <div>
    9. <div id="data"></div>
    10. <div id="result"></div><br/>
    11. </div>
    12. <script th:inline="javascript" >
    13. //服务器推送事件
    14. if (typeof (EventSource) !== "undefined") {
    15. var source1 = new EventSource("http://localhost:9000/api/admin/test/sse/1");
    16. //当抓取到消息时
    17. source1.onmessage = function (evt) {
    18. document.getElementById("data").innerHTML = document.getElementById("data").innerHTML+"股票行情:" + evt.data;
    19. };
    20. } else {
    21. //注意:ie浏览器不支持
    22. document.getElementById("result").innerHTML = "抱歉,你的浏览器不支持 server-sent 事件...";
    23. var xhr;
    24. var xhr2;
    25. if (window.XMLHttpRequest){
    26. //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
    27. xhr=new XMLHttpRequest();
    28. xhr2=new XMLHttpRequest();
    29. }else{
    30. //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
    31. xhr=new ActiveXObject("Microsoft.XMLHTTP");
    32. xhr2=new ActiveXObject("Microsoft.XMLHTTP");
    33. }
    34. console.log(xhr);
    35. console.log(xhr2);
    36. xhr.open('GET', '/sse/countDown');
    37. xhr.send(null);//发送请求
    38. xhr.onreadystatechange = function() {
    39. console.log("s响应状态:" + xhr.readyState);
    40. //2是空响应,3是响应一部分,4是响应完成
    41. if (xhr.readyState > 2) {
    42. //这儿可以使用response(对应json)与responseText(对应text)
    43. var newData = xhr.response.substr(xhr.seenBytes);
    44. newData = newData.replace(/\n/g, "#");
    45. newData = newData.substring(0, newData.length - 1);
    46. var data = newData.split("#");
    47. console.log("获取到的数据:" + data);
    48. document.getElementById("result").innerHTML = data;
    49. //长度重新赋值,下次截取时需要使用
    50. xhr.seenBytes = xhr.response.length;
    51. }
    52. }
    53. xhr2.open('GET', '/sse/retrieve');
    54. xhr2.send(null);//发送请求
    55. xhr2.onreadystatechange = function() {
    56. console.log("s响应状态:" + xhr2.readyState);
    57. //0: 请求未初始化,2 请求已接收,3 请求处理中,4 请求已完成,且响应已就绪
    58. if (xhr2.readyState > 2) {
    59. //这儿可以使用response(对应json)与responseText(对应text)
    60. var newData1 = xhr2.response.substr(xhr2.seenBytes);
    61. newData1 = newData1.replace(/\n/g, "#");
    62. newData1 = newData1.substring(0, newData1.length - 1);
    63. var data1 = newData1.split("#");
    64. console.log("获取到的数据:" + data1);
    65. document.getElementById("data").innerHTML = data1;
    66. //长度重新赋值,下次截取时需要使用
    67. xhr2.seenBytes = xhr2.response.length;
    68. }
    69. }
    70. }
    71. </script>
    72. </body>
    73. </html>
    • demo2 订阅服务器消息,服务器send推送消息完成后,关闭sse.close

    1.接口方法以及工具类

    1. @GetMapping(path = "/sse/sub",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
    2. public SseEmitter subscribe(@RequestParam String questionId,HttpServletResponse response) {
    3. // 简单异步发消息 ====
    4. //questionId 订阅id,id对应了sse对象
    5. new Thread(() -> {
    6. try {
    7. Thread.sleep(1000);
    8. for (int i = 0; i < 10; i++) {
    9. Thread.sleep(500);
    10. SSEUtils.pubMsg(questionId, questionId + " - kingtao come " + i);
    11. }
    12. } catch (Exception e) {
    13. e.printStackTrace();
    14. } finally {
    15. // 消息发送完关闭订阅
    16. SSEUtils.closeSub(questionId);
    17. }
    18. }).start();
    19. // =================
    20. return SSEUtils.addSub(questionId);
    21. }

    工具类

    1. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    2. import java.util.Map;
    3. import java.util.concurrent.ConcurrentHashMap;
    4. public class SSEUtils {
    5. // timeout
    6. private static Long DEFAULT_TIME_OUT = 2*60*1000L;
    7. // 订阅表
    8. private static Map<String, SseEmitter> subscribeMap = new ConcurrentHashMap<>();
    9. /** 添加订阅 */
    10. public static SseEmitter addSub(String questionId) {
    11. if (null == questionId || "".equals(questionId)) {
    12. return null;
    13. }
    14. SseEmitter emitter = subscribeMap.get(questionId);
    15. if (null == emitter) {
    16. emitter = new SseEmitter(DEFAULT_TIME_OUT);
    17. subscribeMap.put(questionId, emitter);
    18. }
    19. return emitter;
    20. }
    21. /** 发消息 */
    22. public static void pubMsg(String questionId, String msg) {
    23. SseEmitter emitter = subscribeMap.get(questionId);
    24. if (null != emitter) {
    25. try {
    26. // 更规范的消息结构看源码
    27. emitter.send(SseEmitter.event().data(msg));
    28. } catch (Exception e) {
    29. // e.printStackTrace();
    30. }
    31. }
    32. }
    33. /**
    34. * 关闭订阅
    35. * @param questionId
    36. */
    37. public static void closeSub(String questionId) {
    38. SseEmitter emitter = subscribeMap.get(questionId);
    39. if (null != emitter) {
    40. try {
    41. emitter.complete();
    42. subscribeMap.remove(questionId);
    43. } catch (Exception e) {
    44. e.printStackTrace();
    45. }
    46. }
    47. }
    48. }

    2.前端代码

    1. <!DOCTYPE html>
    2. <html lang="en">
    3. <head>
    4. <meta charset="UTF-8">
    5. <title>sse</title>
    6. </head>
    7. <body>
    8. <div>
    9. <label>问题id</label>
    10. <input type="text" id="questionId">
    11. <button onclick="subscribe()">订阅</button>
    12. <hr>
    13. <label>F12-console控制台查看消息</label>
    14. </div>
    15. <script>
    16. function subscribe() {
    17. let questionId = document.getElementById('questionId').value;
    18. let url = 'http://localhost:9000/api/admin/test/sse/sub?questionId=' + questionId;
    19. let eventSource = new EventSource(url);
    20. eventSource.onmessage = function (e) {
    21. console.log(e.data);
    22. };
    23. eventSource.onopen = function (e) {
    24. console.log(e,1);
    25. // todo
    26. };
    27. eventSource.onerror = function (e) {
    28. // todo
    29. console.log(e,2);
    30. eventSource.close()
    31. };
    32. }
    33. </script>
    34. </body>
    35. </html>

  • 相关阅读:
    智能运维和数字孪生赋能智慧城市管理服务平台
    Nginx+Tomcat负载均衡、动静分离群集
    Windows Server 系统各版本及授权说明(附下载地址
    Linux安装MySQL【Ubuntu20.04】
    4.1保护模式
    【wpa_supplicant】driver如何告诉supplicant自己做的一些事情以及结果
    11.22Spring 学习day02
    联想笔记本电脑开机后一直黑屏的解决办法
    零基础可以学设计专业吗,怎么学?
    Redis-高性能原理剖析
  • 原文地址:https://blog.csdn.net/qq_32784303/article/details/134511496