- import org.springframework.http.MediaType;
- import org.springframework.stereotype.Component;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.CopyOnWriteArrayList;
-
- /**
- * 数据管理器用于管理Server-Sent Events (SSE) 的订阅和数据推送。
- * @author xiaobo
- */
- @Component
- public class DataManager {
-
- private final Map
> dataEmitters = new HashMap<>(); -
- /**
- * 订阅特定数据类型的SSE连接。
- *
- * @param dataType 要订阅的数据类型
- * @param emitter SSE连接
- */
- public void subscribe(String dataType, SseEmitter emitter) {
- dataEmitters.computeIfAbsent(dataType, k -> new ArrayList<>()).add(emitter);
- emitter.onCompletion(() -> removeEmitter(dataType, emitter));
- emitter.onTimeout(() -> removeEmitter(dataType, emitter));
- }
-
- /**
- * 推送特定数据类型的数据给所有已订阅的连接。
- *
- * @param dataType 要推送的数据类型
- * @param data 要推送的数据
- */
- public void pushData(String dataType, String data) {
- List
emitters = dataEmitters.getOrDefault(dataType, new ArrayList<>()); - emitters.forEach(emitter -> {
- try {
- emitter.send(SseEmitter.event().data(data, MediaType.TEXT_PLAIN));
- } catch (IOException e) {
- removeEmitter(dataType, emitter);
- }
- });
- }
-
- private void removeEmitter(String dataType, SseEmitter emitter) {
- List
emitters = dataEmitters.get(dataType); - if (emitters != null) {
- emitters.remove(emitter);
- }
- }
- }
-
- import com.todoitbo.baseSpringbootDasmart.sse.DataManager;
- import org.springframework.http.MediaType;
- import org.springframework.http.ResponseEntity;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-
- import javax.annotation.Resource;
-
- /**
- * @author xiaobo
- */
- @RestController
- @RequestMapping("/environment")
- public class EnvironmentController {
-
- @Resource private DataManager dataManager;
-
- @GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- public SseEmitter subscribe() {
- SseEmitter emitter = new SseEmitter();
- dataManager.subscribe("environment", emitter);
- return emitter;
- }
-
- // 示例:推送环境监测数据给前端
- @GetMapping("/push/{testText}")
- public ResponseEntity
pushEnvironmentData(@PathVariable String testText) { - dataManager.pushData("environment", testText);
- return ResponseEntity.ok("Data pushed successfully.");
- }
- }
如果没有数据产生会出现连接超时问题。
默认情况下,EventSource对象会自动重连,以保持连接的持久性。
第一次订阅SSE连接后,即使后端暂时没有数据产生,之后也能接收到数据。这可以通过定期发送心跳数据来实现:
- @Scheduled(fixedRate = 30000) // 每30秒发送一次心跳数据
- public void sendHeartbeat() {
- dataManager.pushData("heartbeat", "Heartbeat data");
- }
<!DOCTYPE html>
<html>
<head>
    <title>SSE Data Receiver</title>
</head>
<body>
    <h1>Real-time Data Display</h1>
    <div id="data-container"></div>

    <script>
        const dataContainer = document.getElementById('data-container');

        // URL of the SSE endpoint; adjust it to match your controller mapping.
        const SUBSCRIBE_URL = 'http://127.0.0.1:13024/environment/subscribe';
        // Delay before a manual reconnect attempt, in milliseconds.
        const RETRY_DELAY_MS = 1000;

        // Opens an SSE connection and wires up its handlers. It is also used for reconnects,
        // so every new EventSource gets the same URL and the same handlers.
        function connect() {
            const source = new EventSource(SUBSCRIBE_URL);

            source.onopen = function(event) {
                // Connection established; nothing to do.
            };

            // Handle events sent by the server: show each payload as a new paragraph.
            source.onmessage = (event) => {
                const newDataElement = document.createElement('p');
                newDataElement.textContent = event.data;
                dataContainer.appendChild(newDataElement);
            };

            source.onerror = (error) => {
                console.error('Error occurred:', error);
                // Close this connection and open a fresh one after a short delay.
                source.close();
                setTimeout(connect, RETRY_DELAY_MS);
            };
        }

        connect();
    </script>
</body>
</html>
-
精简版后端
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.io.IOException;
- import java.util.concurrent.CopyOnWriteArrayList;
- import java.util.concurrent.atomic.AtomicLong;
-
- @RestController
- @SpringBootApplication
- public class SseApplication {
-
- private final CopyOnWriteArrayList
emitters = new CopyOnWriteArrayList<>(); - private final AtomicLong counter = new AtomicLong();
-
- public static void main(String[] args) {
- SpringApplication.run(SseApplication.class, args);
- }
-
- @GetMapping("/sse")
- public SseEmitter handleSse() {
- SseEmitter emitter = new SseEmitter();
- emitters.add(emitter);
-
- emitter.onCompletion(() -> emitters.remove(emitter));
-
- new Thread(() -> {
- try {
- for (int i = 0; i < 10; i++) {
- emitter.send(SseEmitter.event()
- .id(String.valueOf(counter.incrementAndGet()))
- .name("message")
- .data("This is message " + i));
- Thread.sleep(1000);
- }
- emitter.complete();
- } catch (IOException | InterruptedException e) {
- emitter.completeWithError(e);
- }
- }).start();
-
- return emitter;
- }
- }