引入maven依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
WebSocket配置处理器
-
- import org.springframework.boot.web.servlet.ServletContextInitializer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
- import javax.servlet.ServletContext;
-
-
- /**
- * WebScoket配置处理器
- */
- @Configuration
- public class WebSocketConfig implements ServletContextInitializer {
- /**
- * ServerEndpointExporter 作用
- *
- * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
- *
- * @return
- */
- @Bean
- public ServerEndpointExporter serverEndpointExporter() {
- return new ServerEndpointExporter();
- }
-
- //设置websocket发送内容长度
- @Override
- public void onStartup(ServletContext servletContext) {
- servletContext.setInitParameter("org.apache.tomcat.websocket.textBufferSize","22428800");
- }
- }
WebSocket消息对象
- import com.alibaba.fastjson.annotation.JSONField;
- import lombok.Data;
- import java.util.Date;
-
- /**
- * @author: ws
- * @date: 20223/10/26 15:59
- * @Description: WebSocketMessage
- */
- @Data
- public class WebSocketMessage {
-
- /**
- * 用户ID
- */
- private String fromId;
-
- /**
- * 对方ID
- */
- private String toOtherId;
- //消息内容
- private String message;
-
- //发送时间
- @JSONField(format="yyyy-MM-dd HH:mm:ss")
- public Date date;
-
- }
WebSocket操作类
- import cn.hutool.core.collection.ListUtil;
- import com.alibaba.fastjson.JSON;
- import com.ws.wxyinghang.entity.WebSocketMessage;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.util.Iterator;
- import java.util.List;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.CopyOnWriteArraySet;
-
- /**
- * @author: ws
- * @date: 20223/10/26 15:59
- * @Description: WebSocket操作类
- */
- @ServerEndpoint("/websocket/{userId}")
- @Component
- @Slf4j
- public class WebSocketSever {
-
- // 与某个客户端的连接会话,需要通过它来给客户端发送数据
- private Session session;
- private String userId;
-
-
- // session集合,存放对应的session
- private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
-
- // concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
- private static CopyOnWriteArraySet<WebSocketSever> webSocketSet = new CopyOnWriteArraySet<>();
- // 用于存放离线消息
- private static ConcurrentHashMap<String, List<WebSocketMessage>> offlineMessageMap = new ConcurrentHashMap();
- /**
- * 建立WebSocket连接
- *
- * @param session
- * @param userId 用户ID
- */
- @OnOpen
- public void onOpen(Session session, @PathParam(value = "userId") String userId) {
- log.info("WebSocket建立连接中,连接用户ID:{}", userId);
- try {
- Session historySession = sessionPool.get(userId);
- // historySession不为空,说明已经有人登陆账号,应该删除登陆的WebSocket对象
- if (historySession != null) {
- webSocketSet.remove(historySession);
- historySession.close();
- }
- } catch (IOException e) {
- log.error("重复登录异常,错误信息:" + e.getMessage(), e);
- }
- // 建立连接
- this.session = session;
- this.userId = userId;
- webSocketSet.add(this);
- sessionPool.put(userId, session);
- //从离线消息队列里面获取消息
- if (offlineMessageMap.containsKey(userId)) {
- List<WebSocketMessage> list = offlineMessageMap.get(userId);
- Iterator it = list.iterator();
- while (it.hasNext()) {
- Object x = it.next();
- //离线消息接收成功后删除消息
- Boolean bb = sendOfflineMessageByUser(JSON.toJSONString(x));
- if (bb) {
- System.out.println("从队列中删除离线消息" + x);
- it.remove();
- }
- }
- offlineMessageMap.remove(userId);
- }
- log.info("建立连接完成,当前在线人数为:{}", webSocketSet.size());
- }
-
- /**
- * 发生错误
- *
- * @param throwable e
- */
- @OnError
- public void onError(Throwable throwable) {
- throwable.printStackTrace();
- }
-
- /**
- * 连接关闭
- */
- @OnClose
- public void onClose() {
- webSocketSet.remove(this);
- sessionPool.remove(this.userId);
- log.info("连接断开,当前在线人数为:{}", webSocketSet.size());
- }
-
- /**
- * 接收客户端消息
- *
- * @param message 接收的消息
- */
- @OnMessage
- public void onMessage(String message) {
- log.info("收到客户端发来的消息:{}", message);
- sendMessageByUser(message);
- }
-
- /**
- * 推送消息到指定用户
- *
- * @param message 发送的消息
- */
- public static Boolean sendMessageByUser(String message) {
- WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);
- log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);
- Session session = sessionPool.get(msg.getToOtherId());
- //判断session是否正常
- if (session == null || !session.isOpen()) {
- log.info("用户ID:" + msg.getToOtherId() + ",离线,放入离线消息队列中");
- if (offlineMessageMap.containsKey(msg.getToOtherId())) {
- List<WebSocketMessage> list = offlineMessageMap.get(msg.getToOtherId());
- list.add(msg);
- offlineMessageMap.put(msg.getToOtherId(), list);
- } else {
- offlineMessageMap.put(msg.getToOtherId(), ListUtil.toList(msg));
- }
- }//发送消息
- else {
- try {
- session.getBasicRemote().sendText(message);
- } catch (IOException e) {
- log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);
- return false;
- }
- }
- return true;
- }
-
- //发送离线消息
- public static Boolean sendOfflineMessageByUser(String message) {
- WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);
- log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);
- Session session = sessionPool.get(msg.getToOtherId());
- try {
- session.getBasicRemote().sendText(message);
- } catch (IOException e) {
- log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);
- return false;
- }
- return true;
- }
-
- /**
- * 群发消息
- *
- * @param message 发送的消息
- */
- public static void sendAllMessage(String message) {
- log.info("发送消息:{}", message);
- for (WebSocketSever webSocket : webSocketSet) {
- try {
- webSocket.session.getBasicRemote().sendText(message);
- } catch (IOException e) {
- log.error("群发消息发生错误:" + e.getMessage(), e);
- }
- }
- }
-
- }
启动项目,使用 Apifox 测试,新建 WebSocket 接口。
新建 websocket1,连接后发送消息。
新建 websocket2,可以看到连接后接收到了消息。
如果 websocket2 断开连接后,websocket1 继续发送消息,等 websocket2 重新连接后就会收到离线消息。