创建WebSocket工具类
- package org.jmis.riskassess.config;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.CopyOnWriteArraySet;
- @Component
- @ServerEndpoint(value = "/message-service")
- public class WebSocketUtil {
-
- private static final ConcurrentHashMap
sessions = new ConcurrentHashMap<>(); - private static final Logger logger = LoggerFactory.getLogger(WebSocketUtil.class);
-
- public static void pushMessage(String userId, String message) {
- Session session = sessions.get(userId);
- if (session != null && session.isOpen()) {
- try {
- session.getBasicRemote().sendText(message);
- } catch (IOException e) {
- logger.error("Failed to send message to userId: " + userId, e);
- }
- } else {
- // 会话失效,从会话集合中移除
- sessions.remove(userId, session);
- // logger.warn("Session is invalid for userId: " + userId + ", removing from sessions");
- }
- }
-
-
- @OnOpen
- public void onOpen(Session session, EndpointConfig config) {
- Map
> queryParams = session.getRequestParameterMap(); - String userId = queryParams.get("userId").get(0);
- sessions.put(userId, session);
- logger.info("WebSocket opened: " + session.getId() + ", userId: " + userId);
- }
-
- @OnClose
- public void onClose(Session session) {
- String closedSessionId = session.getId();
- sessions.entrySet().removeIf(entry -> entry.getValue().getId().equals(closedSessionId));
- logger.info("WebSocket closed: " + closedSessionId);
- }
-
- @OnMessage
- public void onMessage(String message, Session session) {
- String userId = (String) session.getUserProperties().get("userId");
- if (userId == null) {
- session.getUserProperties().put("userId", message);
- logger.info("User ID saved: " + message);
- }
- }
- public static int getSessionCount() {
- return sessions.size();
- }
-
- public static int getOpenConnectionCount() {
- int openConnectionCount = 0;
- for (Session session : sessions.values()) {
- if (session.isOpen()) {
- openConnectionCount++;
- }
- }
- return openConnectionCount;
- }
-
- public static List
getOpenConnections() { - List
openConnections = new ArrayList<>(); - for (Session session : sessions.values()) {
- if (session.isOpen()) {
- openConnections.add(session);
- }
- }
- return openConnections;
- }
- }
创建WebSocket配置文件
- package org.jmis.riskassess.config;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
-
- @Configuration
- public class WebSocketConfig {
- @Bean
- public ServerEndpointExporter serverEndpointExporter() {
- return new ServerEndpointExporter();
- }
- }
- package org.jmis.riskassess.safeSystemDataTask;
-
- import cn.hutool.core.collection.CollectionUtil;
- import com.alibaba.fastjson.JSONObject;
- import com.baomidou.mybatisplus.core.conditions.Wrapper;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import org.jmis.riskassess.config.WebSocketUtil;
- import org.jmis.riskassess.entity.Message;
- import org.jmis.riskassess.entity.MessageUser;
- import org.jmis.riskassess.pojo.MessageInfo;
- import org.jmis.riskassess.pojo.MineFoundation;
- import org.jmis.riskassess.service.IMessageService;
- import org.jmis.riskassess.service.IMessageUserService;
- import org.jmis.riskassess.vo.AlarmRealtimeVO;
- import org.jmis.riskassess.vo.DzAlarmAcceptVO;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
- import org.springjmis.core.tool.utils.Func;
- import org.springjmis.core.tool.utils.ObjectUtil;
-
- import java.util.List;
- import java.util.Map;
- import java.util.stream.Collectors;
-
- @Component
- public class MessagePushTask {
- @Autowired
- private WebSocketUtil webSocketUtil;
-
- @Autowired
- private IMessageUserService messageUserService;
- @Autowired
- private IMessageService messageService;
-
- @Scheduled(fixedDelay = 10000) // 每10秒执行一次
- public void pushUnreadMessages() {
- // 查询未读消息的用户以及消息内容
- List
-
- // 推送未读消息给相应的用户
- for (Map
user : userIds) { - Long idLong = (Long) user.get("id");
- String id = String.valueOf(idLong);
- String userId = user.get("user_id").toString();
- Object isImportant= user.get("is_important");
- String messageName = (String) user.get("message_name");
- String details = (String) user.get("details");
- String messageIdLong = user.get("message_id").toString();
- String messageId = String.valueOf(messageIdLong);
- String type = (String) user.get("type");
- // 构造推送消息的内容
- JSONObject pushMessage = new JSONObject();
- pushMessage.put("messageId", messageId);
- pushMessage.put("messageName", messageName);
- pushMessage.put("details", details);
- pushMessage.put("userId", userId);
- pushMessage.put("type", type);
- pushMessage.put("isImportant", isImportant);
- pushMessage.put("id", id);
- // 将推送消息转换为JSON字符串
- String jsonMessage = pushMessage.toJSONString();
-
- // 推送消息给用户
- WebSocketUtil.pushMessage(userId, jsonMessage);
- }
- }
- //甲烷报警推送
- @Scheduled(fixedDelay = 10000) // 每10秒执行一次
- public void alarmMessages() {
- // 查询未读消息的用户以及消息内容
- List
alarmRealtimeVOS = messageUserService.alarmMessages(); - // 推送未读消息给相应的用户
- for (AlarmRealtimeVO realtimeVO : alarmRealtimeVOS) {
- String id = realtimeVO.getAlarmid();
- String userId = realtimeVO.getUserId();
- String messageName = "超限报警";
- String details = "报警类型:"+realtimeVO.getAlarmType()+","+"测点类型:"+realtimeVO.getSensorname()+","+"报警地点:"+
- realtimeVO.getNodeplace()+","+"报警开始时间:"+realtimeVO.getStartTime()+","+"报警持续时长:"+realtimeVO.getTimeLong()
- +","+"报警最大值:"+realtimeVO.getMaxdata();
- saveMessage(id,messageName,details,"jkAlarm");
- saveMessageUser(id,userId);
- }
- }
-
- //矿井超员报警推送
- @Scheduled(fixedDelay = 10000) // 每10秒执行一次
- public void mineOverAlarmMessages() {
- // 查询未读消息的用户以及消息内容
- List
messageInfos = messageUserService.mineOverAlarmMessages(); - // 推送未读消息给相应的用户
- for (MessageInfo realtimeVO : messageInfos) {
-
- saveMessage(realtimeVO.getId(),realtimeVO.getMessageName(),realtimeVO.getDetails(),"ryAlarm");
- saveMessageUser(realtimeVO.getId(),realtimeVO.getUserId());
- }
- }
-
- //地震报警推送
- @Scheduled(fixedDelay = 10000) // 每10秒执行一次
- public void dzAlarmMessages() {
- // 查询未读消息的用户以及消息内容
- List
messageInfos = messageUserService.dzAlarmMessages(); - // 推送未读消息给相应的用户
- for (MessageInfo realtimeVO : messageInfos) {
- String id = realtimeVO.getId();
- List
list = Func.toStrList(realtimeVO.getUserId()); - String messageName = "地震报警";
- String details = realtimeVO.getDetails();
- saveMessage(id,messageName,details,"dzAlarm");
- for (String s : list) {
- saveMessageUser(id,s);
- }
-
- }
- }
-
- //天气报警推送
- @Scheduled(fixedDelay = 10000) // 每10秒执行一次
- public void tqAlarmMessages() {
- // 查询未读消息的用户以及消息内容
- List
tqAlarmMessages = messageUserService.tqAlarmMessages(); - // 存入message表和messageUser表
- for (MessageInfo realtimeVO : tqAlarmMessages) {
- String id = realtimeVO.getId();
- List
list = Func.toStrList(realtimeVO.getUserId()); - String messageName = realtimeVO.getMessageName();
- String details = realtimeVO.getDetails();
- saveMessage(id,messageName,details,"tqAlarm");
- for (String s : list) {
- saveMessageUser(id,s);
- }
- }
-
- }
-
-
- public void saveMessage(String id ,String messageName,String details,String type){
- List
list = messageService.messageIdList(); - if (!list.contains(id)){
- Message message=new Message();
- message.setMessageId(id);
- message.setMessageName(messageName);
- message.setType(type);
- message.setDetails(details);
- messageService.save(message);
- }
- }
-
- public void saveMessageUser(String id,String userId){
-
- List
messageUserServiceOne= messageUserService.getMessageUser(id,userId); - if (CollectionUtil.isEmpty(messageUserServiceOne)) {
- MessageUser messageUser = new MessageUser();
- messageUser.setUserId(userId);
- messageUser.setMessageId(id);
- messageUserService.save(messageUser);
- }
- }
-
- }
-
-
-
-