• SpringBoot使用WebSocket收发实时离线消息


    引入maven依赖

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-websocket</artifactId>
    4. </dependency>

     WebScoket配置处理器

    1. import org.springframework.boot.web.servlet.ServletContextInitializer;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    5. import javax.servlet.ServletContext;
    6. /**
    7. * WebScoket配置处理器
    8. */
    9. @Configuration
    10. public class WebSocketConfig implements ServletContextInitializer {
    11. /**
    12. * ServerEndpointExporter 作用
    13. *
    14. * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
    15. *
    16. * @return
    17. */
    18. @Bean
    19. public ServerEndpointExporter serverEndpointExporter() {
    20. return new ServerEndpointExporter();
    21. }
    22. //设置websocket发送内容长度
    23. @Override
    24. public void onStartup(ServletContext servletContext) {
    25. servletContext.setInitParameter("org.apache.tomcat.websocket.textBufferSize","22428800");
    26. }
    27. }

    webScoket消息对象

    1. import com.alibaba.fastjson.annotation.JSONField;
    2. import lombok.Data;
    3. import java.util.Date;
    4. /**
    5. * @author: ws
    6. * @date20223/10/26 15:59
    7. * @Description: WebSocketMessage
    8. */
    9. @Data
    10. public class WebSocketMessage {
    11. /**
    12. * 用户ID
    13. */
    14. private String fromId;
    15. /**
    16. * 对方ID
    17. */
    18. private String toOtherId;
    19. //消息内容
    20. private String message;
    21. //发送时间
    22. @JSONField(format="yyyy-MM-dd HH:mm:ss")
    23. public Date date;
    24. }

    WebSocket操作类

    1. import cn.hutool.core.collection.ListUtil;
    2. import com.alibaba.fastjson.JSON;
    3. import com.ws.wxyinghang.entity.WebSocketMessage;
    4. import lombok.extern.slf4j.Slf4j;
    5. import org.springframework.stereotype.Component;
    6. import javax.websocket.*;
    7. import javax.websocket.server.PathParam;
    8. import javax.websocket.server.ServerEndpoint;
    9. import java.io.IOException;
    10. import java.util.Iterator;
    11. import java.util.List;
    12. import java.util.concurrent.ConcurrentHashMap;
    13. import java.util.concurrent.CopyOnWriteArraySet;
    14. /**
    15. * @author: ws
    16. * @date: 20223/10/26 15:59
    17. * @Description: WebSocket操作类
    18. */
    19. @ServerEndpoint("/websocket/{userId}")
    20. @Component
    21. @Slf4j
    22. public class WebSocketSever {
    23. // 与某个客户端的连接会话,需要通过它来给客户端发送数据
    24. private Session session;
    25. private String userId;
    26. // session集合,存放对应的session
    27. private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
    28. // concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
    29. private static CopyOnWriteArraySet<WebSocketSever> webSocketSet = new CopyOnWriteArraySet<>();
    30. // 用于存放离线消息
    31. private static ConcurrentHashMap<String, List<WebSocketMessage>> offlineMessageMap = new ConcurrentHashMap();
    32. /**
    33. * 建立WebSocket连接
    34. *
    35. * @param session
    36. * @param userId 用户ID
    37. */
    38. @OnOpen
    39. public void onOpen(Session session, @PathParam(value = "userId") String userId) {
    40. log.info("WebSocket建立连接中,连接用户ID:{}", userId);
    41. try {
    42. Session historySession = sessionPool.get(userId);
    43. // historySession不为空,说明已经有人登陆账号,应该删除登陆的WebSocket对象
    44. if (historySession != null) {
    45. webSocketSet.remove(historySession);
    46. historySession.close();
    47. }
    48. } catch (IOException e) {
    49. log.error("重复登录异常,错误信息:" + e.getMessage(), e);
    50. }
    51. // 建立连接
    52. this.session = session;
    53. this.userId = userId;
    54. webSocketSet.add(this);
    55. sessionPool.put(userId, session);
    56. //从离线消息队列里面获取消息
    57. if (offlineMessageMap.containsKey(userId)) {
    58. List<WebSocketMessage> list = offlineMessageMap.get(userId);
    59. Iterator it = list.iterator();
    60. while (it.hasNext()) {
    61. Object x = it.next();
    62. //离线消息接收成功后删除消息
    63. Boolean bb = sendOfflineMessageByUser(JSON.toJSONString(x));
    64. if (bb) {
    65. System.out.println("从队列中删除离线消息" + x);
    66. it.remove();
    67. }
    68. }
    69. offlineMessageMap.remove(userId);
    70. }
    71. log.info("建立连接完成,当前在线人数为:{}", webSocketSet.size());
    72. }
    73. /**
    74. * 发生错误
    75. *
    76. * @param throwable e
    77. */
    78. @OnError
    79. public void onError(Throwable throwable) {
    80. throwable.printStackTrace();
    81. }
    82. /**
    83. * 连接关闭
    84. */
    85. @OnClose
    86. public void onClose() {
    87. webSocketSet.remove(this);
    88. sessionPool.remove(this.userId);
    89. log.info("连接断开,当前在线人数为:{}", webSocketSet.size());
    90. }
    91. /**
    92. * 接收客户端消息
    93. *
    94. * @param message 接收的消息
    95. */
    96. @OnMessage
    97. public void onMessage(String message) {
    98. log.info("收到客户端发来的消息:{}", message);
    99. sendMessageByUser(message);
    100. }
    101. /**
    102. * 推送消息到指定用户
    103. *
    104. * @param message 发送的消息
    105. */
    106. public static Boolean sendMessageByUser(String message) {
    107. WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);
    108. log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);
    109. Session session = sessionPool.get(msg.getToOtherId());
    110. //判断session是否正常
    111. if (session == null || !session.isOpen()) {
    112. log.info("用户ID:" + msg.getToOtherId() + ",离线,放入离线消息队列中");
    113. if (offlineMessageMap.containsKey(msg.getToOtherId())) {
    114. List<WebSocketMessage> list = offlineMessageMap.get(msg.getToOtherId());
    115. list.add(msg);
    116. offlineMessageMap.put(msg.getToOtherId(), list);
    117. } else {
    118. offlineMessageMap.put(msg.getToOtherId(), ListUtil.toList(msg));
    119. }
    120. }//发送消息
    121. else {
    122. try {
    123. session.getBasicRemote().sendText(message);
    124. } catch (IOException e) {
    125. log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);
    126. return false;
    127. }
    128. }
    129. return true;
    130. }
    131. //发送离线消息
    132. public static Boolean sendOfflineMessageByUser(String message) {
    133. WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);
    134. log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);
    135. Session session = sessionPool.get(msg.getToOtherId());
    136. try {
    137. session.getBasicRemote().sendText(message);
    138. } catch (IOException e) {
    139. log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);
    140. return false;
    141. }
    142. return true;
    143. }
    144. /**
    145. * 群发消息
    146. *
    147. * @param message 发送的消息
    148. */
    149. public static void sendAllMessage(String message) {
    150. log.info("发送消息:{}", message);
    151. for (WebSocketSever webSocket : webSocketSet) {
    152. try {
    153. webSocket.session.getBasicRemote().sendText(message);
    154. } catch (IOException e) {
    155. log.error("群发消息发生错误:" + e.getMessage(), e);
    156. }
    157. }
    158. }
    159. }

    启动项目,使用apiFox测试,新建webScoket接口

    新建websocket1,连接后发送消息 

     

    新建webScoket2 ,可以看到连接后接收到了消息 

     

    如果webScoket2断开连接后, webScoket1继续发送消息,等webScoket2连接后就会收到离线的消息。

  • 相关阅读:
    CTF--Web安全--SQL注入之‘绕过方法’
    将自己本地项目上传到git,增加IDEA操作
    MyBatis(4)---多表查询
    JAVA 中集合取交集
    Cisdem Video Player for mac(高清视频播放器) v5.6.0中文版
    物理学专业英语(词汇整理)--------05
    链表相关的一些问题
    C++ Reference: Standard C++ Library reference: C Library: cwchar: wcrtomb
    java常用类
    【HarmonyOS4学习笔记】《HarmonyOS4+NEXT星河版入门到企业级实战教程》课程学习笔记(三)
  • 原文地址:https://blog.csdn.net/letterss/article/details/134059686