• WebSocket学习笔记


    一篇文章理解WebSocket原理
    一文搞懂四种 WebSocket 使用方式
    Spring Boot 中的 WebSocketSession 是什么,原理,如何使用

    1.HTTP协议(半双工通信):

    HTTP是客户端向服务器发起请求,服务器返回响应给客户端的一种模式。

    特点:

    1.只能是客户端向服务器发起请求,是单向的。

    2.服务器不能主动发送数据给客户端。

    半双工通信的局限性也从中体现出来,同一时刻数据的传输只能是单向的,想在某一段时间内监听服务器是否有新数据的更新就要不停的从客户端这边发起请求,如果服务器有数据更新那么就会返回响应。那么这种做法是特别消耗性能的,想到一种更优的办法就是监听服务器如果有数据改变就立刻返回响应,不需要客户端一直不停的请求。

    举个例子,HTTP协议就是,小明要去超市买薯片,老板说没有,过了一会小明又跑来超市买薯片,老板还是说没有,这样反反复复过了很多次,超市进货的薯片终于到了,小明也拿到薯片了。这样感觉是不是特别麻烦呢?如果使用WebSocket协议就是,小明把他的电话和地址给了超市老板,当超市进货的薯片到了后,老板第一时间给小明打电话告诉他薯片到了,小明可以自己来拿,也可以超市老板送货上门。这样是不是就更省时更省事呢?

    2.WebSocket协议(全双工通信):

    WbeSocket 是 Html5 开始提供的一种浏览器与服务器之间进行全双工通信的协议(websocket协议本质上是一个基于tcp的协议),它实现了浏览器与服务器全双工通信,能更好的节省服务器资源和带宽并达到实时通讯的目的,属于应用层,基于TCP协议,并且复用HTTP握手通道,是一个持久化的协议

    简单来说,建立一个Websocket连接,客户端浏览器首先要向服务器发起一个HTTP请求,这个请求头中包含了特殊的"Upgrade: WebSocket"信息表明这是一个从HTTP升级到WebSocket的请求,服务器解析之后返回响应给客户端并建立了WebSocket连接。

    3.WebSocket 与 HTTP 的关系:

    在这里插入图片描述

    在这里插入图片描述

    相同点:

    都是基于TCP协议的,都是可靠性传输协议。

    都是应用层协议

    不同点:

    WebSocket是全双工通信协议,模拟Socket协议,可以双向发送或接收信息。

    HTTP是单向通信的。

    WebSocket是需要浏览器和服务器握手建立连接的。

    HTTP是浏览器发请求向服务器的连接,而服务器则不会提前知道这个连接。

    3.http和WebSocket的联系:

    WebSocket在建立握手是,数据是通过HTTP传输的,但是建立了连接后,传输则不需要HTTP协议。

    总体过程:

    客户端发起HTTP请求,请过三次握手后与服务器建立TCP连接,HTTP请求中包含了WebSocket的版本号信息:Upgrade、Connection、WebSocket-Version等。

    服务器接收到客户端的握手请求后,使用HTTP协议返回响应给客户端。

    最后,客户端收到连接成功消息后,可以借助TCP传输协议和服务器进行全双工通信。

    4.WebSocket特点:

    1. WebSocket约定了一个通信的规范,通过一个握手机制,将客户端与服务器端进行一个类似TCP的连接,实现了通信。

    2. 在使用WebSocket之前,客户端与服务器端的交互是基于HTTP协议短连接长连接

    3. WebSocket的协议名是"ws",是一种全新的协议,不属于HTTP无状态协议

    WebSocket和socket的区分:从本质上来说,socket并不是一个新的协议,它只是为了便于程序员进行网络编程而对tcp/ip协议族通信机制的一种封装。

    5.实现WebSocket用例:

    事件

    说明

    open

    连接建立时触发

    message

    客户端接收到服务器消息时触发

    error

    通信出现错误时触发

    close

    连接关闭时触发

    send

    客户端给服务器发送数据

    5.1 java api实现

    import org.java_websocket.WebSocket;
    import org.java_websocket.handshake.ClientHandshake;
    import org.java_websocket.server.WebSocketServer;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.InetSocketAddress;
    
    public class SocketServer extends WebSocketServer {
    
        public static void main(String[] args) throws InterruptedException, IOException {
            int port = 8887; // 843 flash policy port
    
            SocketServer s = new SocketServer(port);
            s.start();
            System.out.println("ChatServer started on port: " + s.getPort());
    
            BufferedReader sysIn = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String in = sysIn.readLine();
                s.broadcast(in);
                if (in.equals("exit")) {
                    s.stop(1000);
                    break;
                }
            }
        }
    
        public SocketServer(int port) {
            super(new InetSocketAddress(port));
        }
    
        @Override
        public void onOpen(WebSocket conn, ClientHandshake handshake) {
            conn.send("Welcome to the server!"); // This method sends a message to the new client
            broadcast("new connection: " + handshake
                    .getResourceDescriptor()); // This method sends a message to all clients connected
            System.out.println(
                    conn.getRemoteSocketAddress().getAddress().getHostAddress() + " entered the room!");
    
        }
    
        @Override
        public void onClose(WebSocket conn, int code, String reason, boolean remote) {
            broadcast(conn + " has left the room!");
            System.out.println(conn + " has left the room!");
    
        }
    
        @Override
        public void onMessage(WebSocket conn, String message) {
    
            broadcast(message);
            System.out.println(conn + ": " + message);
        }
    
        @Override
        public void onError(WebSocket conn, Exception ex) {
            ex.printStackTrace();
            if (conn != null) {
                // some errors like port binding failed may not be assignable to a specific
                // websocket
            }
    
        }
    
        @Override
        public void onStart() {
            System.out.println("Server started!");
            setConnectionLostTimeout(0);
            setConnectionLostTimeout(100);
        }
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    启动服务
    http://www.websocket-test.com/
    进入此网站,连接本地websokcet服务ws://127.0.0.1:8887
    在这里插入图片描述
    在这里插入图片描述
    可以互相发送消息

    5.2 springboot结合redis发布订阅实现发给其他人

    5.2.1 WebSocketSession 概念

    WebSocketSession 是一个 WebSocket 连接的会话对象。每当客户端与服务器建立一个 WebSocket 连接时,服务器都会创建一个新的 WebSocketSession 对象。WebSocketSession 对象代表了服务器和客户端之间的一个持久连接,可以用来发送和接收消息。

    WebSocketSession 接口定义了一组用于与客户端进行通信的方法。这些方法包括:

    void sendMessage(TextMessage message):发送文本消息。
    void sendMessage(BinaryMessage message):发送二进制消息。
    void sendMessage(PongMessage message):发送 Pong 消息。
    void close():关闭 WebSocket 连接。
    boolean isOpen():检查 WebSocket 连接是否打开。

    WebSocketSession 还提供了一些其他的方法,例如获取会话 ID、获取远程地址等。

    5.2.2 WebSocketSession 原理

    在使用 WebSocketSession 之前,我们需要了解一些 WebSocket 的原理。

    WebSocket 协议是一个基于 HTTP 的协议。在客户端和服务器建立 WebSocket 连接之前,客户端和服务器之间首先要建立一个普通的 HTTP 连接。当客户端发送一个包含 WebSocket 握手信息的 HTTP 请求时,服务器会将其升级为 WebSocket 连接。在升级完成后,客户端和服务器之间的通信就变成了基于 WebSocket 协议的双向通信。

    在 Spring Boot 中,使用 WebSocketSession 进行通信的过程与上述原理类似。当客户端和服务器建立 WebSocket 连接时,服务器会创建一个新的 WebSocketSession 对象。客户端和服务器之间的通信就是通过这个 WebSocketSession 对象进行的。

    5.2.3 WebSocketSession 使用

    在 Spring Boot 中使用 WebSocketSession 需要进行以下步骤:

    添加依赖
    首先,我们需要在项目中添加 Spring Boot 的 WebSocket 依赖。在 Maven 中,可以通过以下方式添加依赖:

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-websocketartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    5.2.4 配置 WebSocket

    接下来,我们需要配置 WebSocket。在 Spring Boot 中,可以通过实现 WebSocketConfigurer 接口来配置 WebSocket。WebSocketConfigurer 接口定义了一个 configureWebSocket 方法,我们可以在这个方法中注册 WebSocket 处理器和拦截器。

    下面是一个示例 WebSocketConfigurer 的实现:

    @Configuration
    public class WebSocketConfig implements WebSocketConfigurer {
    
        @Autowired
        private WebSocketHandler webSocketMessageHandler;
    
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            registry.addHandler(webSocketMessageHandler, "/websocket")
                    .addInterceptors(new WebSocketInterceptor())
                    .setAllowedOrigins("*");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在上面的示例中,我们实现了 WebSocketConfigurer 接口,并注册了一个 WebSocket 处理器。在 registerWebSocketHandlers 方法中,我们调用了 addHandler 方法来注册 WebSocket 处理器,并指定了 WebSocket 的路径。在这个示例中,WebSocket 的路径是 “/websocket”。setAllowedOrigins 方法用于设置允许的来源,这里设置为 “*” 表示允许所有来源。

    5.2.5 实现 WebSocket 处理器

    接下来,我们需要实现 WebSocket 处理器。WebSocket 处理器负责处理客户端发送的消息,并向客户端发送响应消息。在 Spring Boot 中,可以通过实现 WebSocketHandler 接口来实现 WebSocket 处理器。

    public interface WebSocketHandler {
    
    	/**
    	 * Invoked after WebSocket negotiation has succeeded and the WebSocket connection is
    	 * opened and ready for use.
    	 * @throws Exception this method can handle or propagate exceptions; see class-level
    	 * Javadoc for details.
    	 */
    	void afterConnectionEstablished(WebSocketSession session) throws Exception;
    
    	/**
    	 * Invoked when a new WebSocket message arrives.
    	 * @throws Exception this method can handle or propagate exceptions; see class-level
    	 * Javadoc for details.
    	 */
    	void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;
    
    	/**
    	 * Handle an error from the underlying WebSocket message transport.
    	 * @throws Exception this method can handle or propagate exceptions; see class-level
    	 * Javadoc for details.
    	 */
    	void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;
    
    	/**
    	 * Invoked after the WebSocket connection has been closed by either side, or after a
    	 * transport error has occurred. Although the session may technically still be open,
    	 * depending on the underlying implementation, sending messages at this point is
    	 * discouraged and most likely will not succeed.
    	 * @throws Exception this method can handle or propagate exceptions; see class-level
    	 * Javadoc for details.
    	 */
    	void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;
    
    	/**
    	 * Whether the WebSocketHandler handles partial messages. If this flag is set to
    	 * {@code true} and the underlying WebSocket server supports partial messages,
    	 * then a large WebSocket message, or one of an unknown size may be split and
    	 * maybe received over multiple calls to
    	 * {@link #handleMessage(WebSocketSession, WebSocketMessage)}. The flag
    	 * {@link org.springframework.web.socket.WebSocketMessage#isLast()} indicates if
    	 * the message is partial and whether it is the last part.
    	 */
    	boolean supportsPartialMessages();
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    afterConnectionEstablished:连接成功后调用。
    handleMessage:处理发送来的消息。
    handleTransportError: WS 连接出错时调用。
    afterConnectionClosed:连接关闭后调用。
    supportsPartialMessages:是否支持分片消息。

    以上这几个方法重点可以来看一下 handleMessage 方法,handleMessage 方法中有一个 WebSocketMessage 参数,这也是一个接口,我们一般不直接使用这个接口而是使用它的实现类,它有以下几个实现类:
    BinaryMessage:二进制消息体
    TextMessage:文本消息体
    PingMessage: Ping ****消息体
    PongMessage: Pong ****消息体
    但是由于 handleMessage 这个方法参数是WebSocketMessage,所以我们实际使用中可能需要判断一下当前来的消息具体是它的哪个子类,比如这样:

    	@Override
    	public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
    		if (message instanceof TextMessage) {
    			handleTextMessage(session, (TextMessage) message);
    		}
    		else if (message instanceof BinaryMessage) {
    			handleBinaryMessage(session, (BinaryMessage) message);
    		}
    		else if (message instanceof PongMessage) {
    			handlePongMessage(session, (PongMessage) message);
    		}
    		else {
    			throw new IllegalStateException("Unexpected WebSocket message type: " + message);
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    下面是一个示例 WebSocketHandler 的实现:

    @Component
    @Slf4j
    public class WebSocketMessageHandler extends TextWebSocketHandler {
    
        /**
         * redis 订阅通道名
         */
        public static final String CHANNEL_NAME = "msgRedisTopic";
        /**
         * userId字段名
         */
        public static final String USER_ID = "userId";
        /**
         * 当前节点在线session
         */
        protected static final Map<String, WebSocketSession> CLIENTS = new ConcurrentHashMap<>();
        @Resource
        private RedisTemplate<String, WebSocketMessageDto> redisTemplate;
    
        /**
         * 连接成功后调用
         */
        @Override
        public void afterConnectionEstablished(WebSocketSession session) {
            String uid = String.valueOf(session.getAttributes().get(USER_ID));
            CLIENTS.put(uid, session);
            log.info("uri :" + session.getUri());
            log.info("连接建立:uid{} ", uid);
            log.info("当前连接服务器客户端数: {}", CLIENTS.size());
            log.info("===================================");
        }
    
        /**
         * 连接关闭后调用
         */
        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
            String uid = String.valueOf(session.getAttributes().get(USER_ID));
            CLIENTS.remove(uid);
            log.info("断开连接: uid{}", uid);
            log.info("当前连接服务器客户端数: {}", CLIENTS.size());
        }
    
        /**
         * 处理发送来的消息
         */
        @Override
        protected void handleTextMessage(WebSocketSession session, TextMessage message) throws IOException {
            String payload = message.getPayload();
            log.info("服务端收到消息:{}", payload);
            boolean isValid = JSON.isValidObject(payload);
            if (!isValid) {
                log.info("服务端收到消息的数据格式不符合要求,要求是json格式");
                return;
            }
            WebSocketMessageDto webSocketMessageDto = JSONUtil.toBean(payload, WebSocketMessageDto.class);
            String toUid = webSocketMessageDto.getToUid();
    
            if (CLIENTS.containsKey(toUid)) {
                try {
                    log.info("当前ws服务器内包含客户端uid {},直接发送消息", toUid);
                    CLIENTS.get(toUid).sendMessage(new TextMessage("收到" + webSocketMessageDto.getSendUid() + "的信息:" + payload));
                } catch (Exception e) {
                    log.error("发送消息给uid:{}失败", toUid, e);
                }
            } else {
                log.warn("当前ws服务器内未找到客户端uid {},推送到redis", toUid);
                // 向指定频道发布消息
                redisTemplate.convertAndSend(CHANNEL_NAME, webSocketMessageDto);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    在上面的示例中,我们将所有连接到服务器的 WebSocketSession 对象保存到一个列表中。
    我们实现了 WebSocketHandler 接口,并重写了其中的几个方法。afterConnectionEstablished 方法在建立 WebSocket 连接后被调用,可以在这个方法中进行一些初始化操作。handleMessage 方法用于处理客户端发送的消息,并向客户端发送响应消息。handleTransportError 方法在 WebSocket 传输发生错误时被调用。afterConnectionClosed 方法在 WebSocket 连接关闭后被调用,可以在这个方法中进行一些清理操作。supportsPartialMessages 方法用于设置是否支持部分消息传输。

    总结
    WebSocketSession 是 Spring Boot 中用于与客户端进行 WebSocket 通信的核心概念。在使用 WebSocketSession 时,我们需要先添加 Spring Boot 的 WebSocket 依赖,然后配置 WebSocket,并实现一个 WebSocket 处理器,最后在处理器中使用 WebSocketSession 进行通信。客户端也可以使用 WebSocketSession 进行通信,非常简单。

    总的来说,WebSocketSession 是实现 WebSocket 通信的关键。它提供了一组用于与客户端进行通信的方法,可以用来发送和接收消息。在 Spring Boot 中,使用 WebSocketSession 进行通信非常方便,只需要实现一个 WebSocket 处理器,并使用 WebSocketSession 进行通信即可。

    5.2.6 需求

    服务做了集群,不同客户端可能连接的服务端是不同的机器,如果按照原先写的代码发送消息就会失败,因为不在同一个服务内找不到另一个客户端,这时候发送消息就会失败,所以用到了redis发布订阅模式,订阅同一个频道的服务,都会收到消息,然后判断此客户在不在当前服务内,再就转发给他

    5.2.6.1 声明一个用来发送给指定用户的消息类

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class WebSocketMessageDto implements Serializable {
    
        public static void main(String[] args) {
            WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();
            webSocketMessageDto.setToUid("3");
            webSocketMessageDto.setSendUid("1");
            webSocketMessageDto.setMsg("牛逼");
            System.out.println(JSON.toJSONString(webSocketMessageDto));
        }
    
        private static final long serialVersionUID = -4291728346293647762L;
    
        private String toUid;
        private String sendUid;
        private String msg;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    5.2.6.2 实现一个文本消息处理器(还有其他类型也可以实现),负责处理客户端发送的消息,并向客户端发送响应消息

    @Component
    @Slf4j
    public class WebSocketMessageHandler extends TextWebSocketHandler {
    
        /**
         * redis 订阅通道名
         */
        public static final String CHANNEL_NAME = "msgRedisTopic";
        /**
         * userId字段名
         */
        public static final String USER_ID = "userId";
        /**
         * 当前节点在线session
         */
        protected static final Map<String, WebSocketSession> CLIENTS = new ConcurrentHashMap<>();
        @Resource
        private RedisTemplate<String, WebSocketMessageDto> redisTemplate;
    
        /**
         * 连接成功后调用
         */
        @Override
        public void afterConnectionEstablished(WebSocketSession session) {
            String uid = String.valueOf(session.getAttributes().get(USER_ID));
            CLIENTS.put(uid, session);
            log.info("uri :" + session.getUri());
            log.info("连接建立:uid{} ", uid);
            log.info("当前连接服务器客户端数: {}", CLIENTS.size());
            log.info("===================================");
        }
    
        /**
         * 连接关闭后调用
         */
        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
            String uid = String.valueOf(session.getAttributes().get(USER_ID));
            CLIENTS.remove(uid);
            log.info("断开连接: uid{}", uid);
            log.info("当前连接服务器客户端数: {}", CLIENTS.size());
        }
    
        /**
         * 处理发送来的消息
         */
        @Override
        protected void handleTextMessage(WebSocketSession session, TextMessage message) throws IOException {
            String payload = message.getPayload();
            log.info("服务端收到消息:{}", payload);
            boolean isValid = JSON.isValidObject(payload);
            if (!isValid) {
                log.info("服务端收到消息的数据格式不符合要求,要求是json格式");
                return;
            }
            WebSocketMessageDto webSocketMessageDto = JSONUtil.toBean(payload, WebSocketMessageDto.class);
            String toUid = webSocketMessageDto.getToUid();
    
            if (CLIENTS.containsKey(toUid)) {
                try {
                    log.info("当前ws服务器内包含客户端uid {},直接发送消息", toUid);
                    CLIENTS.get(toUid).sendMessage(new TextMessage("收到" + webSocketMessageDto.getSendUid() + "的信息:" + payload));
                } catch (Exception e) {
                    log.error("发送消息给uid:{}失败", toUid, e);
                }
            } else {
                log.warn("当前ws服务器内未找到客户端uid {},推送到redis", toUid);
                // 向指定频道发布消息
                redisTemplate.convertAndSend(CHANNEL_NAME, webSocketMessageDto);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    5.2.6.3 **通过实现WebSocketConfigurer接口,可以注册相应的WebSocket处理器、路径、允许域、SockJs支持。

    把我们实现的websocket消息处理器注册进来,并配置连接路径/websocket**

    @Configuration
    @EnableWebSocket
    public class WebSocketConfig implements WebSocketConfigurer {
    
        @Autowired
        private WebSocketHandler webSocketMessageHandler;
    
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            registry.addHandler(webSocketMessageHandler, "/websocket")
                    .addInterceptors(new WebSocketInterceptor())
                    .setAllowedOrigins("*");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    5.2.6.4 实现redis监听消息类,订阅指定频道监听消息,当前服务端有此用户id则发送给此用户消息

    @Component
    @Slf4j
    public class WebSocketRedisMessageListener implements MessageListener {
    
        @Resource
        private RedisTemplate<String, Object> redisTemplate;
    
        @Override
        public void onMessage(Message message, byte[] bytes) {
            try {
                // 获取消息
                byte[] messageBody = message.getBody();
                TypeReference<WebSocketMessageDto> reference = new TypeReference<WebSocketMessageDto>() {
                };
                WebSocketMessageDto webSocketMessageDto = Convert.convert(reference, redisTemplate.getValueSerializer().deserialize(messageBody));
    
                Map<String, WebSocketSession> onlineSessionMap = WebSocketMessageHandler.CLIENTS;
                String toUid = webSocketMessageDto.getToUid();
                if (onlineSessionMap.containsKey(toUid)) {
    
                    String sendUid = webSocketMessageDto.getSendUid();
                    String msg = webSocketMessageDto.getMsg();
                    log.info("redis监听消息,{} 收到 {} 的消息:{}", sendUid, toUid, msg);
                    onlineSessionMap.get(toUid).sendMessage(new TextMessage("收到" + sendUid + "的消息:" + msg));
                }
            } catch (IOException e) {
                log.error("Redis监听消息失败", e);
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    5.2.6.5 配置Redis,发布订阅,注册redis消息监听器监听指定频道

    @Configuration
    @EnableCaching
    public class RedisConfig extends CachingConfigurerSupport {
    
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            template.setConnectionFactory(connectionFactory);
    
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
            // 使用Jackson2JsonRedisSerialize 替换默认序列化(默认采用的是JDK序列化)
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
            jackson2JsonRedisSerializer.setObjectMapper(om);
    
            template.setValueSerializer(jackson2JsonRedisSerializer);
            // key 采用 String的序列化
            template.setKeySerializer(stringRedisSerializer);
            // hash 的key采用String的序列化
            template.setHashKeySerializer(stringRedisSerializer);
            // hash 的 value 采用 String 的序列化
            template.setHashValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }
    
        @Bean
        public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory, WebSocketRedisMessageListener webSocketRedisMessageListener) {
            RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
            redisMessageListenerContainer.setConnectionFactory(factory);
            // 订阅(多个)频道:将消息侦听器添加到(可能正在运行的)容器中。如果容器正在运行,侦听器会尽快开始接收(匹配)消息。
            //参数1:消息监听器,参数2:消息频道
            redisMessageListenerContainer.addMessageListener(webSocketRedisMessageListener, new ChannelTopic(WebSocketMessageHandler.CHANNEL_NAME));
            //redisMessageListenerContainer.addMessageListener(webSocketRedisMessageListener2, new ChannelTopic(WebSocketMessageHandler.CHANNEL_NAME2));
            return redisMessageListenerContainer;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    5.2.6.6 **实现WebSocket 握手请求的拦截器。可用于检查握手请求和响应,以及将属性传递给目标 WebSocketHandler

    业务逻辑是判断请求参数是否完整,获取userId,塞入attributes,在WebSocketMessageHandler的WebSocketSession.getAttributes().get(USER_ID)获取出来**

    @Slf4j
    @Component
    public class WebSocketInterceptor implements HandshakeInterceptor {
        /**
         * 握手前
         * @param attributes 如果该方法通过,可以在WebSocketHandler拿到这里设置的数据,org.springframework.web.socket.WebSocketSession#getAttributes()这个可以拿到这里设置的值
    
         */
        @Override
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
            log.info("uid 握手开始");
            // 获得请求参数
            Map<String, String> paramMap = HttpUtil.decodeParamMap(request.getURI().getQuery(), Charset.defaultCharset());
            String uid = paramMap.get(WebSocketMessageHandler.USER_ID);
            if (CharSequenceUtil.isNotBlank(uid)) {
                // 放入属性域
                attributes.put(WebSocketMessageHandler.USER_ID, uid);
                log.info("用户{}握手成功!", uid);
                return true;
            }
            return false;
        }
    
        /**
         * 握手后
         */
        @Override
        public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
            log.info("握手完成");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    到此代码结束,下面是启动验证方式

    在这里插入图片描述

    5.2.6.7 修改端口,启动3个实例,8080,8081,8082

    在这里插入图片描述

    5.2.6.8 进入网站测试

    http://www.websocket-test.com/
    依旧进入此网站连接本地启动的websocket服务
    其中1和11两个客户端连同一台服务

    ws://127.0.0.1:8080/websocket?userId=1
    ws://127.0.0.1:8080/websocket?userId=11
    ws://127.0.0.1:8081/websocket?userId=2
    ws://127.0.0.1:8082/websocket?userId=3
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述
    在这里插入图片描述
    在1客户端发送消息给3
    在这里插入图片描述

    1和3连接的服务端不在同一个,通过redis发布,3所在服务端的redis监听消息,输出信息,然后发给3客户端

    1服务端
    在这里插入图片描述
    3服务端输出redis监听的消息
    在这里插入图片描述
    3服务端收到发给3客户端
    在这里插入图片描述

  • 相关阅读:
    工作中的综合能力
    确定谁在往mysql的某张表中写入数据
    华中某科技大学校园网疑似dns劫持的解决方法
    【Android】Lombok for Android Studio 离线插件
    含文档+PPT+源码等]精品基于PHP实现的计算机信息管理学院网站[包运行成功]计算机PHP毕业设计项目源码
    重构:banner 中 logo 聚合分散动画
    mysql 忘记密码后重置
    21天学习挑战赛——Python爬虫解析器BeautifulSoup4
    【基础篇】Redis深入理解与实践指南(一)之Redis的前世今生
    二、GRE(Generic Routing Encapsulation,通用路由封装协议)
  • 原文地址:https://blog.csdn.net/Fire_Sky_Ho/article/details/133899945