• SpringBoot集成WebSocket讲解


    1 WebSocket

    1.1 简介

    WebSocket 协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端,建立客户端和服务器之间的通信渠道。浏览器和服务器仅需一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
    在这里插入图片描述

    1.2 WebSocket作用和调用

    1.2.1 作用

    HTTP 是基于请求响应式的,即通信只能由客户端发起,服务端做出响应,无状态,无连接:

    • 无状态:每次连接只处理一个请求,请求结束后断开连接。
    • 无连接:对于事务处理没有记忆能力,服务器不知道客户端是什么状态。

    通过HTTP实现即时通讯,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源,因为必须不停连接,或者 HTTP 连接始终打开。
    WebSocket的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话。

    WebSocket 特点:

    • 建立在 TCP 协议之上,服务器端的实现比较容易。
    • HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
    • 数据格式比较轻量,性能开销小,通信高效。
    • 可以发送文本,也可以发送二进制数据。
    • 没有同源限制,客户端可以与任意服务器通信。
    • 协议标识符是 ws(如果加密,则为wss),服务器网址就是 URL

    1.2.2 js端调用

    <script>
        var ws = new WebSocket('ws://localhost:8080/webSocket/10086');
        // 获取连接状态
        console.log('ws连接状态:' + ws.readyState);
        //监听是否连接成功
        ws.onopen = function () {
            console.log('ws连接状态:' + ws.readyState);
            //连接成功则发送一个数据
            ws.send('test1');
        }
        // 接听服务器发回的信息并处理展示
        ws.onmessage = function (data) {
            console.log('接收到来自服务器的消息:');
            console.log(data);
            //完成通信后关闭WebSocket连接
            ws.close();
        }
        // 监听连接关闭事件
        ws.onclose = function () {
            // 监听整个过程中websocket的状态
            console.log('ws连接状态:' + ws.readyState);
        }
        // 监听并处理error事件
        ws.onerror = function (error) {
            console.log(error);
        }
        function sendMessage() {
            var content = $("#message").val();
            $.ajax({
                url: '/socket/publish?userId=10086&message=' + content,
                type: 'GET',
                data: { "id": "7777", "content": content },
                success: function (data) {
                    console.log(data)
                }
            })
        }
    </script>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    下面主要介绍三种方式:Javax,WebMVC,WebFlux,在Spring Boot中的服务端和客户端配置

    1.3 Javax

    java的扩展包javax.websocket中就定义了一套WebSocket的接口规范

    <dependency>
         <groupId>org.springframework.bootgroupId>
         <artifactId>spring-boot-starter-websocketartifactId>
     dependency>
    
    • 1
    • 2
    • 3
    • 4

    1.3.1 服务端

    1.3.1.1 服务端接收

    一般使用注解的方式来进行配置

    /**
     * html页面与之关联的接口
     * var reqUrl = "http://localhost:8081/websocket/" + cid;
     * socket = new WebSocket(reqUrl.replace("http", "ws"));
     */
    @Component
    @ServerEndpoint("/websocket/{type}")
    public class JavaxWebSocketServerEndpoint {
    
        @OnOpen
        public void onOpen(Session session, EndpointConfig config,
                           @PathParam(value = "type") String type) {
            //连接建立
        }
    
        @OnClose
        public void onClose(Session session, CloseReason reason) {
            //连接关闭
        }
    
        @OnMessage
        public void onMessage(Session session, String message) {
            //接收文本信息
        }
    
        @OnMessage
        public void onMessage(Session session, PongMessage message) {
            //接收pong信息
        }
    
        @OnMessage
        public void onMessage(Session session, ByteBuffer message) {
            //接收二进制信息,也可以用byte[]接收
        }
    
        @OnError
        public void onError(Session session, Throwable e) {
            //异常处理
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    我们在类上添加 @ServerEndpoint注解来表示这是一个服务端点,同时可以在注解中配置路径,这个路径可以配置成动态的,使用{}包起来就可以了

    • @OnOpen:用来标记对应的方法作为客户端连接上来之后的回调,Session就相当于和客户端的连接了,我们可以把它缓存起来用于发送消息;通过@PathParam注解就可以获得动态路径中对应值了
    • @OnClose:用来标记对应的方法作为客户端断开连接之后的回调,我们可以在这个方法中移除对应Session的缓存,同时可以接受一个CloseReason的参数用于获取关闭原因
    • @OnMessage:用来标记对应的方法作为接收到消息之后的回调,我们可以接受文本消息,二进制消息和pong消息
    • @OnError:用来标记对应的方法作为抛出异常之后的回调,可以获得对应的Session和异常对象
    1.3.1.2 服务端集成
    @Configuration(proxyBeanMethods = false)
    public class JavaxWebSocketConfiguration {
    
        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    依赖SpringWebSocket模块,手动注入ServerEndpointExporter就可以了
    需要注意ServerEndpointExporterSpring中的类,算是Spring为了支持javax.websocket的原生用法所提供的支持类

    javax.websocket 库中定义了PongMessage而没有PingMessage

    通过测试发现基本上所有的WebSocket包括前端js自带的,都实现了自动回复;也就是说当接收到一个ping消息之后,是会自动回应一个pong消息,所以没有必要再自己接受ping消息来处理了,即我们不会接受到ping消息;
    当然我上面讲的ping和pong都是需要使用框架提供的api,如果是我们自己通过Message来自定义心跳数据的话是没有任何的处理的,下面是对应的api

    //发送ping
    session.getAsyncRemote().sendPing(ByteBuffer buffer);
    
    //发送pong
    session.getAsyncRemote().sendPong(ByteBuffer buffer);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1.3.1.3 ping和pong消息

    ping 消息pong 消息都是 WebSocket 协议中的特殊消息类型,用于进行心跳保活和检测 WebSocket 连接的健康状态。

    • ping 消息:由服务器端(或客户端)发送给对端的消息。它用于发起一个心跳检测请求,要求对端回复一个 pong 消息作为响应。ping 消息通常用于检测对端的连接是否仍然处于活动状态,以及测量网络延迟。
    • pong 消息:由对端(即客户端或服务器端)作为对 ping 消息的响应发送回来。它用于确认接收到 ping 消息,并表明连接仍然活跃。

    当一方发送一个 ping 消息时,对端应该立即发送一个 pong 消息作为响应。通过交换 ping 和 pong 消息,可以检测连接是否仍然有效,以及测量网络的延迟时间。

    ping 和 pong 消息通常由 WebSocket 底层协议处理,开发人员可以通过设置相应的参数来启用或禁用这些消息的交换。一般情况下,WebSocket 客户端和服务器都会自动处理 ping 和 pong 消息,无需开发人员显式地处理。ping 和 pong 消息是属于底层协议层

    1.3.2 客户端

    1.3.2.1 客户端接收

    客户端也是使用注解配置

    @ClientEndpoint
    public class JavaxWebSocketClientEndpoint {
    
        @OnOpen
        public void onOpen(Session session) {
            //连接建立
        }
    
        @OnClose
        public void onClose(Session session, CloseReason reason) {
            //连接关闭
        }
    
        @OnMessage
        public void onMessage(Session session, String message) {
            //接收文本消息
        }
    
        @OnMessage
        public void onMessage(Session session, PongMessage message) {
            //接收pong消息
        }
    
        @OnMessage
        public void onMessage(Session session, ByteBuffer message) {
            //接收二进制消息
        }
    
        @OnError
        public void onError(Session session, Throwable e) {
            //异常处理
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    客户端使用@ClientEndpoint来标记,其他的@OnOpen,@OnClose,@OnMessage,@OnError和服务端一模一样

    1.3.2.2 客户端发送
    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
    Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);
    
    • 1
    • 2

    我们可以通过ContainerProvider来获得一个WebSocketContainer,然后调用connectToServer方法将我们的客户端类和连接的uri传入就行了

    通过ContainerProvider#getWebSocketContainer获得WebSocketContainer其实是基于SPI实现的
    Spring的环境中更推荐大家使用ServletContextAware来获得,代码如下

    @Component
    public class JavaxWebSocketContainer implements ServletContextAware {
    
        private volatile WebSocketContainer container;
    
        public WebSocketContainer getContainer() {
            if (container == null) {
                synchronized (this) {
                    if (container == null) {
                        container = ContainerProvider.getWebSocketContainer();
                    }
                }
            }
            return container;
        }
    
        @Override
        public void setServletContext(@NonNull ServletContext servletContext) {
            if (container == null) {
                container = (WebSocketContainer) servletContext
                    .getAttribute("javax.websocket.server.ServerContainer");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    发消息

    Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);
    
    //发送文本消息
    session.getAsyncRemote().sendText(String message);
    
    //发送二进制消息
    session.getAsyncRemote().sendBinary(ByteBuffer message);
    
    //发送对象消息,会尝试使用Encoder编码
    session.getAsyncRemote().sendObject(Object message);
    
    //发送ping
    session.getAsyncRemote().sendPing(ByteBuffer buffer);
    
    //发送pong
    session.getAsyncRemote().sendPong(ByteBuffer buffer);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    1.4 WebMVC

    pom依赖

    <dependency>
         <groupId>org.springframework.bootgroupId>
         <artifactId>spring-boot-starter-websocketartifactId>
     dependency>
    
    • 1
    • 2
    • 3
    • 4

    1.4.1 服务端

    1.1.4.1 服务端接收

    我们实现一个WebSocketHandler来处理WebSocket的连接,关闭,消息和异常

    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.WebSocketMessage;
    import org.springframework.web.socket.WebSocketSession;
    
    public class ServletWebSocketServerHandler implements WebSocketHandler {
    
        @Override
        public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
            //连接建立
        }
    
        @Override
        public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {
            //接收消息
        }
    
        @Override
        public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
            //异常处理
        }
    
        @Override
        public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
            //连接关闭
        }
    
        @Override
        public boolean supportsPartialMessages() {
            //是否支持接收不完整的消息
            return false;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    1.1.4.2 服务端集成

    首先需要添加@EnableWebSocket来启用WebSocket
    然后实现WebSocketConfigurer来注册WebSocket路径以及对应的WebSocketHandler

    @Configuration
    @EnableWebSocket
    public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {
    
        @Override
        public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
            registry
                //添加处理器到对应的路径
                .addHandler(new ServletWebSocketServerHandler(), "/websocket")//注册Handler
                .setAllowedOrigins("*");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    1.1.4.3 服务器握手拦截

    提供了HandshakeInterceptor来拦截握手

    @Configuration
    @EnableWebSocket
    public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {
    
        @Override
        public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
            registry
                //添加处理器到对应的路径
                .addHandler(new ServletWebSocketServerHandler(), "/websocket")
                //添加握手拦截器
                .addInterceptors(new ServletWebSocketHandshakeInterceptor())
                .setAllowedOrigins("*");
        }
        
        public static class ServletWebSocketHandshakeInterceptor implements HandshakeInterceptor {
    
            @Override
            public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
                //握手之前
               if (request instanceof ServletServerHttpRequest) {
            	String path = request.getURI().getPath();
            	if(requestIsValid(path)){
            		String[] params = getParams(path);
            		attributes.put("WEBSOCKET_AUTH", params[0]);
            		attributes.put("WEBSOCKET_PID", params[1]);
            		attributes.put("WEBSOCKET_SN", params[2]);
            		attributes.put("WEBSOCKET_OPENID", params[3]);
            		attributes.put("WEBSOCKET_FIRSTONE","yes");
            	}
            }
            System.out.println("================Before Handshake================");
            return true;
            }
    
            @Override
            public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
                //握手之后
                System.out.println("================After Handshake================");
    	    	if(e!=null) e.printStackTrace();
    	    	System.out.println("================After Handshake================");
            }
         
         	private boolean requestIsValid(String url){
    	        //在这里可以写上具体的鉴权逻辑
    	    	boolean isvalid = false;
    	    	if(StringUtils.isNotEmpty(url)
    	    			&& url.startsWith("/netgate/")
    	    			&& url.split("/").length==6){
    	    		isvalid = true;
    	    	}
        		return isvalid;
        	}
        
    	    private String[] getParams(String url){
    	    	url = url.replace("/netgate/","");
    	    	return url.split("/");
    	    }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    1.1.4.4 服务器地址问题

    当在集成的时候发现这种方式没办法动态匹配路径,它的路径就是固定的,没办法使用如/websocket/**这样的通配符

    在研究了一下之后发现可以在UrlPathHelper上解决

    @Configuration
    @EnableWebSocket
    public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {
    
        @Override
        public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
            if (registry instanceof ServletWebSocketHandlerRegistry) {
                //替换UrlPathHelper
                ((ServletWebSocketHandlerRegistry) registry)
                    .setUrlPathHelper(new PrefixUrlPathHelper("/websocket"));
            }
    
            registry
                //添加处理器到对应的路径
                .addHandler(new ServletWebSocketServerHandler(), "/websocket/**")
                .setAllowedOrigins("*");
        }
        
        public class PrefixUrlPathHelper extends UrlPathHelper {
    
            private String prefix;
    		public PrefixUrlPathHelper(String prefix){this.prefix=prefix;}
            @Override
            public @NonNull String resolveAndCacheLookupPath(@NonNull HttpServletRequest request) {
                //获得原本的Path
                String path = super.resolveAndCacheLookupPath(request);
                //如果是指定前缀就返回对应的通配路径
                if (path.startsWith(prefix)) {
                    return prefix + "/**";
                }
                return path;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    因为它内部实际上就是用一个Map来存的,所以没有办法用通配符

    1.4.2 客户端

    1.4.2.1 客户端接收

    和服务端一样我们需要先实现一个WebSocketHandler来处理WebSocket的连接,关闭,消息和异常

    public class ServletWebSocketClientHandler implements WebSocketHandler {
    
        @Override
        public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
            //连接建立
        }
    
        @Override
        public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {
            //接收消息
        }
    
        @Override
        public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
            //异常处理
        }
    
        @Override
        public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
            //连接关闭
        }
    
        @Override
        public boolean supportsPartialMessages() {
            //是否支持接收不完整的消息
            return false;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    1.4.2.2 客服端发送
    WebSocketClient client = new StandardWebSocketClient();
    WebSocketHandler handler = new ServletWebSocketClientHandler();
    WebSocketConnectionManager manager = new WebSocketConnectionManager(client, handler, uri);
    manager.start();
    
    • 1
    • 2
    • 3
    • 4

    首先我们需要先new一个StandardWebSocketClient,可以传入一个WebSocketContainer参数,获得该对象的方式上面已经介绍过了,这边就先略过

    然后new一个WebSocketConnectionManager传入WebSocketClientWebSocketHandler还有路径uri
    最后调用一下WebSocketConnectionManagerstart方法就可以了

    这里如果大家去看WebSocketClient的实现类就会发现有StandardWebSocketClient还有JettyWebSocketClient等等,所以大家可以根据自身项目所使用的容器来选择不同的WebSocketClient实现类

    这里给大家贴一小段Spring适配不同容器WebSocket的代码

    public abstract class AbstractHandshakeHandler implements HandshakeHandler, Lifecycle {
    
        private static final boolean tomcatWsPresent;
    
        private static final boolean jettyWsPresent;
    
        private static final boolean jetty10WsPresent;
    
        private static final boolean undertowWsPresent;
    
        private static final boolean glassfishWsPresent;
    
        private static final boolean weblogicWsPresent;
    
        private static final boolean websphereWsPresent;
    
        static {
            ClassLoader classLoader = AbstractHandshakeHandler.class.getClassLoader();
            tomcatWsPresent = ClassUtils.isPresent(
                "org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader);
            jetty10WsPresent = ClassUtils.isPresent(
                "org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", classLoader);
            jettyWsPresent = ClassUtils.isPresent(
                "org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader);
            undertowWsPresent = ClassUtils.isPresent(
                "io.undertow.websockets.jsr.ServerWebSocketContainer", classLoader);
            glassfishWsPresent = ClassUtils.isPresent(
                "org.glassfish.tyrus.servlet.TyrusHttpUpgradeHandler", classLoader);
            weblogicWsPresent = ClassUtils.isPresent(
                "weblogic.websocket.tyrus.TyrusServletWriter", classLoader);
            websphereWsPresent = ClassUtils.isPresent(
                "com.ibm.websphere.wsoc.WsWsocServerContainer", classLoader);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    发消息

    import org.springframework.web.socket.*;
    
    WebSocketSession session = ...
    
    //发送文本消息
    session.sendMessage(new TextMessage(CharSequence message);
    
    //发送二进制消息
    session.sendMessage(new BinaryMessage(ByteBuffer message));
    
    //发送ping
    session.sendMessage(new PingMessage(ByteBuffer message));
    
    //发送pong
    session.sendMessage(new PongMessage(ByteBuffer message));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    1.5 WebFlux

    WebFluxWebSocket不需要额外的依赖包

    1.5.1 服务端

    1.5.1.1 服务端发送接收
    import org.springframework.web.reactive.socket.WebSocketHandler;
    import org.springframework.web.reactive.socket.WebSocketSession;
    
    public class ReactiveWebSocketServerHandler implements WebSocketHandler {
    
        @NonNull
        @Override
        public Mono<Void> handle(WebSocketSession session) {
            Mono<Void> send = session.send(Flux.create(sink -> {
                //可以持有sink对象在任意时候调用next发送消息
                sink.next(WebSocketMessage message);
            })).doOnError(it -> {
                //异常处理
            });
    
            Mono<Void> receive = session.receive()
                    .doOnNext(it -> {
                        //接收消息
                    })
                    .doOnError(it -> {
                        //异常处理
                    })
                    .then();
    
            @SuppressWarnings("all")
            Disposable disposable = session.closeStatus()
                    .doOnError(it -> {
                        //异常处理
                    })
                    .subscribe(it -> {
                        //连接关闭
                    });
    
            return Mono.zip(send, receive).then();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    首先需要注意这里的WebSocketHandlerWebSocketSessionreactive包下的:

    • 通过WebSocketSession#send方法来持有一个FluxSink来用于发送消息
    • 通过WebSocketSession#receive来订阅消息
    • 通过WebSocketSession#closeStatus来订阅连接关闭事件
    1.5.1.2 服务端集成

    注入WebSocketHandlerAdapter

    @Configuration(proxyBeanMethods = false)
    public class ReactiveWebSocketConfiguration {
    
        @Bean
        public WebSocketHandlerAdapter webSocketHandlerAdapter() {
            return new WebSocketHandlerAdapter();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    注册一个HandlerMapping同时配置路径和对应的WebSocketHandler

    @Order(Ordered.HIGHEST_PRECEDENCE)
    @Component
    public class ReactiveWebSocketServerHandlerMapping extends SimpleUrlHandlerMapping {
        public ReactiveWebSocketServerHandlerMapping() {
            Map<String, WebSocketHandler> map = new HashMap<>();
            map.put("/websocket/**", new ReactiveWebSocketServerHandler());
            setUrlMap(map);
            setOrder(100);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    注意:我们自定义的HandlerMapping需要设置order,如果不设置,默认为Ordered.LOWEST_PRECEDENCE,会导致这个HandlerMapping被放在最后,当有客户端连接上来时会被其他的HandlerMapping优先匹配上而连接失败

    1.5.2 客户端

    1.5.2.1 客户端发送接收

    客户端WebSocketHandler的写法和服务端的一样

    import org.springframework.web.reactive.socket.WebSocketHandler;
    import org.springframework.web.reactive.socket.WebSocketSession;
    
    public class ReactiveWebSocketClientHandler implements WebSocketHandler {
    
        @NonNull
        @Override
        public Mono<Void> handle(WebSocketSession session) {
            Mono<Void> send = session.send(Flux.create(sink -> {
                //可以持有sink对象在任意时候调用next发送消息
                sink.next(WebSocketMessage message);
            })).doOnError(it -> {
                //处理异常
            });
    
            Mono<Void> receive = session.receive()
                    .doOnNext(it -> {
                        //接收消息
                    })
                    .doOnError(it -> {
                        //异常处理
                    })
                    .then();
    
            @SuppressWarnings("all")
            Disposable disposable = session.closeStatus()
                    .doOnError(it -> {
                        //异常处理
                    })
                    .subscribe(it -> {
                        //连接关闭
                    });
    
            return Mono.zip(send, receive).then();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    1.5.2.2 客户端发送
    import org.springframework.web.reactive.socket.client.WebSocketClient;
    
    WebSocketClient client = ReactorNettyWebSocketClient();
    WebSocketHandler handler = new ReactiveWebSocketClientHandler();
    client.execute(uri, handler).subscribe();
    
    • 1
    • 2
    • 3
    • 4
    • 5

    首先我们需要先new一个ReactorNettyWebSocketClient
    然后调用一下WebSocketClientexecute方法传入路径uriWebSocketHandler并继续调用subscribe方法就可以了

    注意WebFluxWebMVC 中的 WebSocketClient一样,Reactive包中的WebSocketClient也有很多实现类,比如ReactorNettyWebSocketClientJettyWebSocketClientUndertowWebSocketClientTomcatWebSocketClient 等等,也是需要大家基于自身项目的容器使用不同的实现类

    这里也给大家贴一小段Reactive适配不同容器WebSocket的代码

    public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
    
        private static final boolean tomcatPresent;
    
        private static final boolean jettyPresent;
    
        private static final boolean jetty10Present;
    
        private static final boolean undertowPresent;
    
        private static final boolean reactorNettyPresent;
    
        static {
            ClassLoader loader = HandshakeWebSocketService.class.getClassLoader();
            tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", loader);
            jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", loader);
            jetty10Present = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader);
            undertowPresent = ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler", loader);
            reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse", loader);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    发消息
    我们需要使用在WebSocketHandler中获得的FluxSink来发送消息

    import org.springframework.web.reactive.socket.CloseStatus;
    import org.springframework.web.reactive.socket.WebSocketMessage;
    import org.springframework.web.reactive.socket.WebSocketSession;
    
    public class ReactiveWebSocket {
    
        private final WebSocketSession session;
    
        private final FluxSink<WebSocketMessage> sender;
    
        public ReactiveWebSocket(WebSocketSession session, FluxSink<WebSocketMessage> sender) {
            this.session = session;
            this.sender = sender;
        }
    
        public String getId() {
            return session.getId();
        }
    
        public URI getUri() {
            return session.getHandshakeInfo().getUri();
        }
    
        public void send(Object message) {
            if (message instanceof WebSocketMessage) {
                sender.next((WebSocketMessage) message);
            } else if (message instanceof String) {
                //发送文本消息
                sender.next(session.textMessage((String) message));
            } else if (message instanceof DataBuffer) {
                //发送二进制消息
                sender.next(session.binaryMessage(factory -> (DataBuffer) message));
            } else if (message instanceof ByteBuffer) {
                //发送二进制消息
                sender.next(session.binaryMessage(factory -> factory.wrap((ByteBuffer) message)));
            } else if (message instanceof byte[]) {
                 //发送二进制消息
                sender.next(session.binaryMessage(factory -> factory.wrap((byte[]) message)));
            } else {
                throw new IllegalArgumentException("Message type not match");
            }
        }
    
        public void ping() {
            //发送ping
            sender.next(session.pingMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));
        }
    
        public void pong() {
            //发送pong
            sender.next(session.pongMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));
        }
    
        public void close(CloseStatus reason) {
            sender.complete();
            session.close(reason).subscribe();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
  • 相关阅读:
    MySQL知识总结 (六) MySQL调优
    Java项目:jsp在线考试系统
    关于使用RT-Thread系统读取stm32的adc无法连续转换的问题解决
    Django与Ajax
    英语六级范文模板
    【tg】2:视频采集的输入和输出
    XSS高级 svg 复现一个循环问题以及两个循环问题
    C语言ATM自动取款机系统项目的设计与开发
    淘宝/天猫、1688、京东API接口—item_search - 按关键字搜索淘宝商品
    Vue 移动端(H5)项目怎么实现页面缓存(即列表页面进入详情返回后列表页面缓存且还原页面滚动条位置)keep-alive缓存及清除keep-alive缓存
  • 原文地址:https://blog.csdn.net/u012060033/article/details/133703092