WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
在spring中,使用最基础的spring-websocket的方式有两种。一种是利用注解的方式,一种是利用直接通过实现WebSocketConfigurer配置来实现。
本次先实现第一种实现WebSocketConfigurer配置来实现。
先引入依赖:
org.springframework.boot spring-boot-starter-websocket
首先,需要实现WebSocketHandler接口,WebSocketHandler定义了处理连接、处理数据、关闭连接、以及连接异常锁调用的方法,在实现类中我们可以定义需要的操作。
- public class WebSocketEndPointHandler implements WebSocketHandler {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketEndPointHandler.class);
-
- private static WebSessionSendService webSessionSendService = new WebSessionSendService();
-
- /**
- * 连接处理
- */
- @Override
- public void afterConnectionEstablished(WebSocketSession session) throws Exception {
- LOGGER.info("WebSocketEndPointHandler:afterConnectionEstablished");
- List
appIds = session.getHandshakeHeaders().get("Sec-WebSocket-Protocol"); - webSessionSendService.addAppSession(appIds.get(0), session);
- }
-
- /**
- * 处理客户端发送过来的数据
- * @param message 消息
- */
- @Override
- public void handleMessage(WebSocketSession session, WebSocketMessage> message) throws Exception {
- LOGGER.info("收到消息:{}", message.getPayload());
- }
-
- /**
- * 处理传输过程中发生的错误
- */
- @Override
- public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
- LOGGER.info("------WebSocketEndPointHandler:handleTransportError");
- }
-
-
- /**
- * 处理连接关闭之后触发操作
- * @param closeStatus 关闭的状态信息
- */
- @Override
- public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
- LOGGER.info("------WebSocketEndPointHandler:afterConnectionClosed");
- List
appIds = session.getHandshakeHeaders().get("Sec-WebSocket-Protocol"); - webSessionSendService.deleteAppSession(appIds.get(0));
- }
-
- @Override
- public boolean supportsPartialMessages() {
- return false;
- }
- }
为了实现从后端将数据发送到前端。所以我们需要记录下每一个前端websocket连接的WebSocketSession。在需要发送数据的时候取出来判断并发送消息。
- @Service
- public class WebSessionSendService {
-
- private static ConcurrentHashMap
sessionPool = new ConcurrentHashMap<>(); -
-
- public void addAppSession(String appId, WebSocketSession session){
- sessionPool.put(appId, session);
- }
-
- public void deleteAppSession(String appId){
- sessionPool.remove(appId);
- }
-
- public void sendMessage(String appId, String message){
- WebSocketSession session = sessionPool.get(appId);
- if(session == null || !session.isOpen()){
- return;
- }
- TextMessage textMessage = new TextMessage(message);
- try {
- session.sendMessage(textMessage);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
其次,实现一个拦截器,用户拦截websocket过来的握手请求,这个可以根据自身需要决定要不要实现。拦截可以实现 HttpSessionHandshakeInterceptor或HandshakeInterceptor都可以。
- public class MessageHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandshakeInterceptor.class);
-
- @Override
- public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map
attributes) throws Exception { - LOGGER.info("---------MessageHandshakeInterceptor:beforeHandshake");
- super.beforeHandshake(request, response, wsHandler, attributes);
- //这里可以拦截请求头, 做用户认证。
- List
appIds = request.getHeaders().get("Sec-WebSocket-Protocol"); - if(CollectionUtils.isEmpty(appIds)){
- LOGGER.info("websocket缺少用户认证信息");
- return false;
- }
- response.getHeaders().set("Sec-WebSocket-Protocol", appIds.get(0));
- return true;
- }
-
- @Override
- public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
- LOGGER.info("--------MessageHandshakeInterceptor:afterHandshake");
- super.afterHandshake(request, response, wsHandler, exception);
- }
- }
最后是配置项。通过实现WebSocketConfigurer配置相关信息。
- @Configuration
- @EnableWebSocket
- public class PathWebSocketConfig implements WebSocketConfigurer {
-
- @Override
- public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
- //websocket地址以及处理器,可以多个
- registry.addHandler(new WebSocketEndPointHandler(), "/websocket/log/getLog")
- .setAllowedOrigins("*")
- //自定义的 HandshakeHandler实现类会报错,springboot默认需要用 AbstractHandshakeHandler
- //.setHandshakeHandler(new WebSocketHandshakeHandler())
- .addInterceptors(new MessageHandshakeInterceptor()); //设置拦截器
- }
-
- }
我的前端代码是在自己的搭建的一个以vue为基础的项目中测试的。
- <template>
- <div class="f_c float_l">
- <div class="m_10">
- <el-input class='input_common' v-model="websocketPath" placeholder="请输入后端websocket地址" >el-input>
- <el-button v-if="!connected" @click=" initWebSocket" >连接el-button>
- <el-button v-else @click="disConnectWebsocket" >断开el-button>
- div>
- <div class="f_c" v-if="connected">
- <el-button @click="sendMessage">发送el-button>
- <el-input class='input_common mt_10' v-model="message" type="textarea" :rows="2" placeholder="请输入需要发送的消息">el-input>
- div>
- <el-divider />
- <div class="m_10 f_c" v-if="connected">
- <span class="float_l">收到消息span>
- <span style="border: aqua; width: 500px; height: 100px">{{receiveMessage}}span>
- div>
-
- div>
- template>
-
- <script>
- export default {
- name: "index",
- data(){
- return {
- websocketPath:'localhost:7000/websocket-demo/websocket/log/getLog',
- receiveMessage:'',
- message:'',
- connected: false,
- ws:null,
- number:0,
- heartbeatIntervalId: null, //客户端心跳定时发送
- timeoutId: null //定时检测服务端发送过来的心跳
- }
- },
- methods: {
- initWebSocket() {
- this.ws = new WebSocket('ws://' + this.websocketPath, ['123456789']);
- this.ws.onmessage = this.websocketOnmessage;
- this.ws.onopen = this.websocketOpen;
- this.ws.onerror = this.websocketError;
- this.ws.onclose = this.websocketOnclose;
- },
- websocketOpen() {
- console.log('websocket onopen', this.ws.readyState);
- this.connected = true;
- // 开始心跳检测
- this.startHeartbeat();
- },
- websocketError(event) {
- console.log('websocket onerror', this.ws.readyState);
- this.connected =false
- clearInterval(this.heartbeatIntervalId);
- clearTimeout(this.timeoutId);
- },
- websocketOnclose(event){
- // WebSocket关闭时的处理
- console.log('WebSocket disconnected');
- // 清除心跳检测定时器
- clearInterval(this.heartbeatIntervalId);
- clearTimeout(this.timeoutId);
- this.connected = false;
- },
- websocketOnmessage(event) {
- console.log(`收到消息:${event.data}`);
- let nowMessage = '第' + this.number + '次收到订阅的消息:' + event.data + '\r\n';
- this.receiveMessage += nowMessage;
- this.number++
- this.resetHeartbeat();
- },
- sendMessage() {
- if (this.ws.readyState === WebSocket.OPEN) {
- let param = {
- message: this.message
- }
- this.ws.send(JSON.stringify(param));
- }
- },
- startHeartbeat() {
- let _that = this
- //开始心跳检测
- this.heartbeatIntervalId = setInterval( function () {
- //根据实际情况发送正确的心跳消息
- _that.ws.send('HEARTBEAT');
- }, 8 * 1000)
- this.timeoutId = setTimeout(() => {
- console.error('Heartbeat timeout, reconnecting...');
- console.log(this.ws)
- this.ws.close();
- }, 15 * 1000)
- },
- resetHeartbeat() {
- clearTimeout(this.timeoutId);
- let _that = this
- this.timeoutId = window.setTimeout(function() {
- console.error('Heartbeat timeout, reconnecting...');
- _that.ws.close();
- }, 15 * 1000);
- },
- disConnectWebsocket(){
- this.ws.close();
- clearInterval(this.heartbeatIntervalId);
- clearTimeout(this.timeoutId);
- }
- }
- }
- script>
-
- <style scoped>
-
- style>
启动之后就可以填写我们的websocket地址。就可以连接了。