• WebSocket消息推送


    创建WebSocket工具类

    1. package org.jmis.riskassess.config;
    2. import org.slf4j.Logger;
    3. import org.slf4j.LoggerFactory;
    4. import org.springframework.stereotype.Component;
    5. import javax.websocket.*;
    6. import javax.websocket.server.PathParam;
    7. import javax.websocket.server.ServerEndpoint;
    8. import java.io.IOException;
    9. import java.util.ArrayList;
    10. import java.util.List;
    11. import java.util.Map;
    12. import java.util.concurrent.ConcurrentHashMap;
    13. import java.util.concurrent.CopyOnWriteArraySet;
    14. @Component
    15. @ServerEndpoint(value = "/message-service")
    16. public class WebSocketUtil {
    17. private static final ConcurrentHashMap sessions = new ConcurrentHashMap<>();
    18. private static final Logger logger = LoggerFactory.getLogger(WebSocketUtil.class);
    19. public static void pushMessage(String userId, String message) {
    20. Session session = sessions.get(userId);
    21. if (session != null && session.isOpen()) {
    22. try {
    23. session.getBasicRemote().sendText(message);
    24. } catch (IOException e) {
    25. logger.error("Failed to send message to userId: " + userId, e);
    26. }
    27. } else {
    28. // 会话失效,从会话集合中移除
    29. sessions.remove(userId, session);
    30. // logger.warn("Session is invalid for userId: " + userId + ", removing from sessions");
    31. }
    32. }
    33. @OnOpen
    34. public void onOpen(Session session, EndpointConfig config) {
    35. Map> queryParams = session.getRequestParameterMap();
    36. String userId = queryParams.get("userId").get(0);
    37. sessions.put(userId, session);
    38. logger.info("WebSocket opened: " + session.getId() + ", userId: " + userId);
    39. }
    40. @OnClose
    41. public void onClose(Session session) {
    42. String closedSessionId = session.getId();
    43. sessions.entrySet().removeIf(entry -> entry.getValue().getId().equals(closedSessionId));
    44. logger.info("WebSocket closed: " + closedSessionId);
    45. }
    46. @OnMessage
    47. public void onMessage(String message, Session session) {
    48. String userId = (String) session.getUserProperties().get("userId");
    49. if (userId == null) {
    50. session.getUserProperties().put("userId", message);
    51. logger.info("User ID saved: " + message);
    52. }
    53. }
    54. public static int getSessionCount() {
    55. return sessions.size();
    56. }
    57. public static int getOpenConnectionCount() {
    58. int openConnectionCount = 0;
    59. for (Session session : sessions.values()) {
    60. if (session.isOpen()) {
    61. openConnectionCount++;
    62. }
    63. }
    64. return openConnectionCount;
    65. }
    66. public static List getOpenConnections() {
    67. List openConnections = new ArrayList<>();
    68. for (Session session : sessions.values()) {
    69. if (session.isOpen()) {
    70. openConnections.add(session);
    71. }
    72. }
    73. return openConnections;
    74. }
    75. }

    创建WebSocket配置文件

    1. package org.jmis.riskassess.config;
    2. import org.springframework.beans.factory.annotation.Autowired;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    6. @Configuration
    7. public class WebSocketConfig {
    8. @Bean
    9. public ServerEndpointExporter serverEndpointExporter() {
    10. return new ServerEndpointExporter();
    11. }
    12. }

    业务逻辑

    1. package org.jmis.riskassess.safeSystemDataTask;
    2. import cn.hutool.core.collection.CollectionUtil;
    3. import com.alibaba.fastjson.JSONObject;
    4. import com.baomidou.mybatisplus.core.conditions.Wrapper;
    5. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
    6. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
    7. import com.fasterxml.jackson.core.JsonProcessingException;
    8. import com.fasterxml.jackson.databind.ObjectMapper;
    9. import org.jmis.riskassess.config.WebSocketUtil;
    10. import org.jmis.riskassess.entity.Message;
    11. import org.jmis.riskassess.entity.MessageUser;
    12. import org.jmis.riskassess.pojo.MessageInfo;
    13. import org.jmis.riskassess.pojo.MineFoundation;
    14. import org.jmis.riskassess.service.IMessageService;
    15. import org.jmis.riskassess.service.IMessageUserService;
    16. import org.jmis.riskassess.vo.AlarmRealtimeVO;
    17. import org.jmis.riskassess.vo.DzAlarmAcceptVO;
    18. import org.springframework.beans.factory.annotation.Autowired;
    19. import org.springframework.scheduling.annotation.Scheduled;
    20. import org.springframework.stereotype.Component;
    21. import org.springjmis.core.tool.utils.Func;
    22. import org.springjmis.core.tool.utils.ObjectUtil;
    23. import java.util.List;
    24. import java.util.Map;
    25. import java.util.stream.Collectors;
    26. @Component
    27. public class MessagePushTask {
    28. @Autowired
    29. private WebSocketUtil webSocketUtil;
    30. @Autowired
    31. private IMessageUserService messageUserService;
    32. @Autowired
    33. private IMessageService messageService;
    34. @Scheduled(fixedDelay = 10000) // 每10秒执行一次
    35. public void pushUnreadMessages() {
    36. // 查询未读消息的用户以及消息内容
    37. List> userIds = messageUserService.findUsersWithUnreadMessages();
    38. // 推送未读消息给相应的用户
    39. for (Map user : userIds) {
    40. Long idLong = (Long) user.get("id");
    41. String id = String.valueOf(idLong);
    42. String userId = user.get("user_id").toString();
    43. Object isImportant= user.get("is_important");
    44. String messageName = (String) user.get("message_name");
    45. String details = (String) user.get("details");
    46. String messageIdLong = user.get("message_id").toString();
    47. String messageId = String.valueOf(messageIdLong);
    48. String type = (String) user.get("type");
    49. // 构造推送消息的内容
    50. JSONObject pushMessage = new JSONObject();
    51. pushMessage.put("messageId", messageId);
    52. pushMessage.put("messageName", messageName);
    53. pushMessage.put("details", details);
    54. pushMessage.put("userId", userId);
    55. pushMessage.put("type", type);
    56. pushMessage.put("isImportant", isImportant);
    57. pushMessage.put("id", id);
    58. // 将推送消息转换为JSON字符串
    59. String jsonMessage = pushMessage.toJSONString();
    60. // 推送消息给用户
    61. WebSocketUtil.pushMessage(userId, jsonMessage);
    62. }
    63. }
    64. //甲烷报警推送
    65. @Scheduled(fixedDelay = 10000) // 每10秒执行一次
    66. public void alarmMessages() {
    67. // 查询未读消息的用户以及消息内容
    68. List alarmRealtimeVOS = messageUserService.alarmMessages();
    69. // 推送未读消息给相应的用户
    70. for (AlarmRealtimeVO realtimeVO : alarmRealtimeVOS) {
    71. String id = realtimeVO.getAlarmid();
    72. String userId = realtimeVO.getUserId();
    73. String messageName = "超限报警";
    74. String details = "报警类型:"+realtimeVO.getAlarmType()+","+"测点类型:"+realtimeVO.getSensorname()+","+"报警地点:"+
    75. realtimeVO.getNodeplace()+","+"报警开始时间:"+realtimeVO.getStartTime()+","+"报警持续时长:"+realtimeVO.getTimeLong()
    76. +","+"报警最大值:"+realtimeVO.getMaxdata();
    77. saveMessage(id,messageName,details,"jkAlarm");
    78. saveMessageUser(id,userId);
    79. }
    80. }
    81. //矿井超员报警推送
    82. @Scheduled(fixedDelay = 10000) // 每10秒执行一次
    83. public void mineOverAlarmMessages() {
    84. // 查询未读消息的用户以及消息内容
    85. List messageInfos = messageUserService.mineOverAlarmMessages();
    86. // 推送未读消息给相应的用户
    87. for (MessageInfo realtimeVO : messageInfos) {
    88. saveMessage(realtimeVO.getId(),realtimeVO.getMessageName(),realtimeVO.getDetails(),"ryAlarm");
    89. saveMessageUser(realtimeVO.getId(),realtimeVO.getUserId());
    90. }
    91. }
    92. //地震报警推送
    93. @Scheduled(fixedDelay = 10000) // 每10秒执行一次
    94. public void dzAlarmMessages() {
    95. // 查询未读消息的用户以及消息内容
    96. List messageInfos = messageUserService.dzAlarmMessages();
    97. // 推送未读消息给相应的用户
    98. for (MessageInfo realtimeVO : messageInfos) {
    99. String id = realtimeVO.getId();
    100. List list = Func.toStrList(realtimeVO.getUserId());
    101. String messageName = "地震报警";
    102. String details = realtimeVO.getDetails();
    103. saveMessage(id,messageName,details,"dzAlarm");
    104. for (String s : list) {
    105. saveMessageUser(id,s);
    106. }
    107. }
    108. }
    109. //天气报警推送
    110. @Scheduled(fixedDelay = 10000) // 每10秒执行一次
    111. public void tqAlarmMessages() {
    112. // 查询未读消息的用户以及消息内容
    113. List tqAlarmMessages = messageUserService.tqAlarmMessages();
    114. // 存入message表和messageUser表
    115. for (MessageInfo realtimeVO : tqAlarmMessages) {
    116. String id = realtimeVO.getId();
    117. List list = Func.toStrList(realtimeVO.getUserId());
    118. String messageName = realtimeVO.getMessageName();
    119. String details = realtimeVO.getDetails();
    120. saveMessage(id,messageName,details,"tqAlarm");
    121. for (String s : list) {
    122. saveMessageUser(id,s);
    123. }
    124. }
    125. }
    126. public void saveMessage(String id ,String messageName,String details,String type){
    127. List list = messageService.messageIdList();
    128. if (!list.contains(id)){
    129. Message message=new Message();
    130. message.setMessageId(id);
    131. message.setMessageName(messageName);
    132. message.setType(type);
    133. message.setDetails(details);
    134. messageService.save(message);
    135. }
    136. }
    137. public void saveMessageUser(String id,String userId){
    138. List messageUserServiceOne= messageUserService.getMessageUser(id,userId);
    139. if (CollectionUtil.isEmpty(messageUserServiceOne)) {
    140. MessageUser messageUser = new MessageUser();
    141. messageUser.setUserId(userId);
    142. messageUser.setMessageId(id);
    143. messageUserService.save(messageUser);
    144. }
    145. }
    146. }

  • 相关阅读:
    Element-ui源码解析(二):最简单的组件Button
    JVM堆内存转储
    qt学习之旅--MinGW32编译opencv3.0.0
    网络安全--使用Kali进行ARP欺骗(详细教程)
    Linux-6-基础IO
    python super用法
    vue中加载OCX控件(IE浏览器执行)
    年薪30万,达到人生巅峰,入职字节一个月,我却被无情碾压
    这些并发容器的坑,你要谨记!
    【云原生】Kubernetes----Ingress对外服务
  • 原文地址:https://blog.csdn.net/weixin_67601403/article/details/132731913