• SpringBoot实现SSE构建实时数据单向推送


    • SSE 是一种单向通信,只允许服务器向客户端发送数据。客户端无法向服务器发送数据。
    • SSE 建立在 HTTP 协议之上,使用标准 HTTP 请求和响应。
    • SSE 不需要额外的库或协议处理,客户端可以使用浏览器的原生 EventSource API 来接收数据。
    • SSE 支持跨域通信,可以通过 CORS(跨域资源共享)机制进行配置。
    • SSE 在现代浏览器中也得到广泛支持,但与 WebSocket 相比,它的历史要长一些。

    1. import org.springframework.http.MediaType;
    2. import org.springframework.stereotype.Component;
    3. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    4. import java.io.IOException;
    5. import java.util.ArrayList;
    6. import java.util.HashMap;
    7. import java.util.List;
    8. import java.util.Map;
    9. /**
    10. * 数据管理器用于管理Server-Sent Events (SSE) 的订阅和数据推送。
    11. * @author xiaobo
    12. */
    13. @Component
    14. public class DataManager {
    15. private final Map> dataEmitters = new HashMap<>();
    16. /**
    17. * 订阅特定数据类型的SSE连接。
    18. *
    19. * @param dataType 要订阅的数据类型
    20. * @param emitter SSE连接
    21. */
    22. public void subscribe(String dataType, SseEmitter emitter) {
    23. dataEmitters.computeIfAbsent(dataType, k -> new ArrayList<>()).add(emitter);
    24. emitter.onCompletion(() -> removeEmitter(dataType, emitter));
    25. emitter.onTimeout(() -> removeEmitter(dataType, emitter));
    26. }
    27. /**
    28. * 推送特定数据类型的数据给所有已订阅的连接。
    29. *
    30. * @param dataType 要推送的数据类型
    31. * @param data 要推送的数据
    32. */
    33. public void pushData(String dataType, String data) {
    34. List emitters = dataEmitters.getOrDefault(dataType, new ArrayList<>());
    35. emitters.forEach(emitter -> {
    36. try {
    37. emitter.send(SseEmitter.event().data(data, MediaType.TEXT_PLAIN));
    38. } catch (IOException e) {
    39. removeEmitter(dataType, emitter);
    40. }
    41. });
    42. }
    43. private void removeEmitter(String dataType, SseEmitter emitter) {
    44. List emitters = dataEmitters.get(dataType);
    45. if (emitters != null) {
    46. emitters.remove(emitter);
    47. }
    48. }
    49. }

    1. import com.todoitbo.baseSpringbootDasmart.sse.DataManager;
    2. import org.springframework.http.MediaType;
    3. import org.springframework.http.ResponseEntity;
    4. import org.springframework.web.bind.annotation.GetMapping;
    5. import org.springframework.web.bind.annotation.PathVariable;
    6. import org.springframework.web.bind.annotation.RequestMapping;
    7. import org.springframework.web.bind.annotation.RestController;
    8. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    9. import javax.annotation.Resource;
    10. /**
    11. * @author xiaobo
    12. */
    13. @RestController
    14. @RequestMapping("/environment")
    15. public class EnvironmentController {
    16. @Resource private DataManager dataManager;
    17. @GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    18. public SseEmitter subscribe() {
    19. SseEmitter emitter = new SseEmitter();
    20. dataManager.subscribe("environment", emitter);
    21. return emitter;
    22. }
    23. // 示例:推送环境监测数据给前端
    24. @GetMapping("/push/{testText}")
    25. public ResponseEntity pushEnvironmentData(@PathVariable String testText) {
    26. dataManager.pushData("environment", testText);
    27. return ResponseEntity.ok("Data pushed successfully.");
    28. }
    29. }

     如果没有数据产生会出现连接超时问题。

    默认情况下,EventSource对象会自动重连,以保持连接的持久性。

    第一次订阅SSE连接后,即使后端没有数据产生,之后也能接收到数据。这可以通过定期发送心跳数据

    1. @Scheduled(fixedRate = 30000) // 每30秒发送一次心跳数据
    2. public void sendHeartbeat() {
    3. dataManager.pushData("heartbeat", "Heartbeat data");
    4. }

    1. html>
    2. <html>
    3. <head>
    4. <title>SSE Data Receivertitle>
    5. head>
    6. <body>
    7. <h1>Real-time Data Displayh1>
    8. <div id="data-container">div>
    9. <script>
    10. const dataContainer = document.getElementById('data-container');
    11. // 创建一个 EventSource 对象,指定 SSE 服务器端点的 URL
    12. const eventSource = new EventSource('http://127.0.0.1:13024/environment/subscribe'); // 根据你的控制器端点来设置URL
    13. eventSource.onopen = function(event) {
    14. };
    15. // 添加事件处理程序,监听服务器端发送的事件
    16. eventSource.onmessage = (event) => {
    17. const data = event.data;
    18. // 在这里处理从服务器接收到的数据
    19. // 可以将数据显示在页面上或进行其他操作
    20. const newDataElement = document.createElement('p');
    21. newDataElement.textContent = data;
    22. dataContainer.appendChild(newDataElement);
    23. };
    24. eventSource.onerror = (error) => {
    25. // 处理连接错误
    26. console.error('Error occurred:', error);
    27. // 重新建立连接
    28. eventSource.close();
    29. setTimeout(() => {
    30. // 重新建立连接
    31. eventSource = new EventSource('/environment/subscribe');
    32. }, 1000); // 1秒后重试
    33. };
    34. script>
    35. body>
    36. html>

     精简版后端

    1. import org.springframework.boot.SpringApplication;
    2. import org.springframework.boot.autoconfigure.SpringBootApplication;
    3. import org.springframework.web.bind.annotation.GetMapping;
    4. import org.springframework.web.bind.annotation.RestController;
    5. import java.io.IOException;
    6. import java.util.concurrent.CopyOnWriteArrayList;
    7. import java.util.concurrent.atomic.AtomicLong;
    8. @RestController
    9. @SpringBootApplication
    10. public class SseApplication {
    11. private final CopyOnWriteArrayList emitters = new CopyOnWriteArrayList<>();
    12. private final AtomicLong counter = new AtomicLong();
    13. public static void main(String[] args) {
    14. SpringApplication.run(SseApplication.class, args);
    15. }
    16. @GetMapping("/sse")
    17. public SseEmitter handleSse() {
    18. SseEmitter emitter = new SseEmitter();
    19. emitters.add(emitter);
    20. emitter.onCompletion(() -> emitters.remove(emitter));
    21. new Thread(() -> {
    22. try {
    23. for (int i = 0; i < 10; i++) {
    24. emitter.send(SseEmitter.event()
    25. .id(String.valueOf(counter.incrementAndGet()))
    26. .name("message")
    27. .data("This is message " + i));
    28. Thread.sleep(1000);
    29. }
    30. emitter.complete();
    31. } catch (IOException | InterruptedException e) {
    32. emitter.completeWithError(e);
    33. }
    34. }).start();
    35. return emitter;
    36. }
    37. }

  • 相关阅读:
    Vue图片懒加载
    专业数据分析思路和常用分析方法
    MAC修改python3命令为py
    计算机视觉(CV)技术
    流水线作业模拟程序
    Spring Boot 2020 官方基础68课程第24个 gateway
    Golang Array 数组使用注意事项和细节
    ISPE GAMP5 中文版
    ros自定义action记录
    搭建Prometheus+Grafana框架监控Hyperledger Fabric的运行
  • 原文地址:https://blog.csdn.net/guo__hang/article/details/134457142