Spring Boot 实现 WebSocket 有 4 种方式:servlet、spring、netty、stomp。
使用下来,spring 方式是最简单的。
springboot
版本:3.1.2
jdk:17
当前依赖版本
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>3.1.2</version>
</dependency>
demo
借鉴了ruoyi-plus
spring 方式实现 WebSocket,处理核心就是 AbstractWebSocketHandler + HandshakeInterceptor:
- HandshakeInterceptor:实现握手前后的处理,比如握手前鉴权。
- AbstractWebSocketHandler:实现消息的接收,并监听连接的建立与关闭,执行顺序在 HandshakeInterceptor 之后。
拦截器
package org.xxx.xxx.websocket.interceptor;
import cn.dev33.satoken.session.SaSession;
import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.lang.Assert;
import org.xxx.common.core.domain.model.LoginUser;
import org.xxx.common.core.exception.ServiceException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.util.CollectionUtils;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.List;
import java.util.Map;
import static org.qps.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
/**
* WebSocket握手请求的拦截器
*/
@Slf4j
public class PlusWebSocketInterceptor implements HandshakeInterceptor {
/**
* 握手前
*
* @param request request
* @param response response
* @param wsHandler wsHandler
* @param attributes attributes
* @return 是否握手成功
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
attributes.put(LOGIN_USER_KEY, getLoginUser(request));
List<String> tokenList = request.getHeaders().get("Sec-Websocket-Protocol");
//再塞回协议头里
response.getHeaders().put("Sec-Websocket-Protocol", tokenList);
return true;
}
private LoginUser getLoginUser(ServerHttpRequest request){
List<String> tokenList = request.getHeaders().get("Sec-Websocket-Protocol");
if(CollectionUtils.isEmpty(tokenList)){
throw new ServiceException("用户未登录");
}
//请求头的子协议中获取token
String token = tokenList.get(0);
return loginUser;
}
/**
* 握手后
*
* @param request request
* @param response response
* @param wsHandler wsHandler
* @param exception 异常
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}
消息接收+连接建立+关闭处理
package org.xxx.common.websocket.handler;
import org.xxx.common.core.domain.model.LoginUser;
import org.xxx.common.websocket.dto.WebSocketMessageDto;
import org.xxx.common.websocket.holder.WebSocketSessionHolder;
import org.xxx.common.websocket.utils.WebSocketUtils;
import lombok.extern.slf4j.Slf4j;
import org.xxx.common.websocket.constant.WebSocketConstants;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import java.util.List;
import java.util.Objects;
/**
* WebSocketHandler 实现类
*/
@Slf4j
public class PlusWebSocketHandler extends AbstractWebSocketHandler {
/**
* 连接成功后
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstants.LOGIN_USER_KEY);
WebSocketSessionHolder.addSession(loginUser.getUserId(), session);
log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType());
}
/**
* 处理发送来的文本消息
*
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String msgPayload = message.getPayload();
//todo 业务操作
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
super.handleBinaryMessage(session, message);
}
/**
* 心跳监测的回复
* 针对ping帧的恢复,非text形式的消息接收
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
WebSocketUtils.sendPongMessage(session);
}
/**
* 连接出错时
*
* @param session
* @param exception
* @throws Exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage());
}
/**
* 连接关闭后
*
* @param session
* @param status
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
//连接关闭
}
/**
* 是否支持分片消息
*
* @return
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
}
配置类
package org.xxx.common.websocket.config;
import cn.hutool.core.util.StrUtil;
import org.xxx.common.websocket.config.properties.WebSocketProperties;
import org.xxx.common.websocket.handler.PlusWebSocketHandler;
import org.xxx.common.websocket.interceptor.PlusWebSocketInterceptor;
import org.xxx.common.websocket.listener.WebSocketTopicListener;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
/**
 * WebSocket auto-configuration.
 *
 * <p>Active only when the custom configuration property {@code websocket.enabled} is
 * {@code true}.
 */
@AutoConfiguration
@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true")
@EnableConfigurationProperties(WebSocketProperties.class)
@EnableWebSocket
public class WebSocketConfig {

    /** Idle timeout in milliseconds: a client with no traffic for 25s is disconnected. */
    private static final long MAX_SESSION_IDLE_TIMEOUT_MS = 25 * 1000L;

    /** Buffer size applied to both text and binary messages. */
    private static final int MAX_MESSAGE_BUFFER_SIZE = 10 * 1024;

    /**
     * Registers the handler on the configured path together with the handshake interceptor
     * and the allowed origins.
     *
     * @param handshakeInterceptor interceptor applied to the handshake
     * @param webSocketHandler     handler that receives the WebSocket traffic
     * @param webSocketProperties  configuration properties; blank values are replaced by defaults
     * @return the configurer performing the registration
     */
    @Bean
    public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor,
                                                   WebSocketHandler webSocketHandler,
                                                   WebSocketProperties webSocketProperties) {
        applyDefaults(webSocketProperties);
        return registry -> {
            registry.addHandler(webSocketHandler, webSocketProperties.getPath())
                .addInterceptors(handshakeInterceptor)
                .setAllowedOrigins(webSocketProperties.getAllowedOrigins());
        };
    }

    /** Fills in the default path and allowed origins when they are blank. */
    private static void applyDefaults(WebSocketProperties properties) {
        boolean pathMissing = StrUtil.isBlank(properties.getPath());
        if (pathMissing) {
            properties.setPath("/websocket");
        }
        boolean originsMissing = StrUtil.isBlank(properties.getAllowedOrigins());
        if (originsMissing) {
            properties.setAllowedOrigins("*");
        }
    }

    /**
     * @return the handshake interceptor bean
     */
    @Bean
    public HandshakeInterceptor handshakeInterceptor() {
        return new PlusWebSocketInterceptor();
    }

    /**
     * @return the WebSocket handler bean
     */
    @Bean
    public WebSocketHandler webSocketHandler() {
        return new PlusWebSocketHandler();
    }

    /**
     * Customizes the server container settings.
     *
     * @return the container factory bean carrying the idle timeout and buffer sizes
     */
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean factoryBean = new ServletServerContainerFactoryBean();
        factoryBean.setMaxSessionIdleTimeout(MAX_SESSION_IDLE_TIMEOUT_MS);
        factoryBean.setMaxTextMessageBufferSize(MAX_MESSAGE_BUFFER_SIZE);
        factoryBean.setMaxBinaryMessageBufferSize(MAX_MESSAGE_BUFFER_SIZE);
        return factoryBean;
    }
}