• SpringBoot整合Websocket,实现作为客户端接收消息的同时作为服务端向下游客户发送消息


    SpringBoot整合Websocket

    1. SpringBoot作为服务端

    作为服务端时,需要先导入websocket的依赖

    <dependency>
    	<groupId>org.springframework.bootgroupId>
    	<artifactId>spring-boot-starter-websocketartifactId>
    dependency>
    

    创建WebSocketServer工具类

    package com.newlinker.jiangyin.utils;
    
    /**
     * @author cyl
     * @time 2023/7/21
     */
    import org.springframework.stereotype.Component;
    
    import javax.websocket.*;
    import javax.websocket.server.ServerEndpoint;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    @Component
    @ServerEndpoint(value = "/websocket")
    public class WebSocketServer {
    
        // 客户端会话列表
        private static final Map clientSessions = new ConcurrentHashMap<>();
    
        @OnOpen
        public void onServerOpen(Session session) {
            // 客户端连接到本地 WebSocket 服务
            System.out.println("Client connected: " + session.getId());
            clientSessions.put(session.getId(), session);
        }
    
        @OnMessage
        public void onMessage(Session session, String message) {
            // 处理客户端发送的消息
            System.out.println("Received message from client " + session.getId() + ": " + message);
    
            // 示例:将收到的消息广播给所有客户端
            //broadcast(message);
        }
    
        @OnClose
        public void onServerClose(Session session, CloseReason reason) {
            // 客户端断开连接
            System.out.println("Client " + session.getId() + " disconnected: " + reason);
            clientSessions.remove(session.getId());
        }
    
        @OnError
        public void onError(Session session, Throwable throwable) {
            // 客户端连接发生错误
            System.out.println("WebSocket client error: " + throwable.getMessage());
            clientSessions.remove(session.getId());
        }
    
        // 发送消息给指定客户端
        public void sendToClient(String clientId, String message) {
            Session session = clientSessions.get(clientId);
            if (session != null && session.isOpen()) {
                session.getAsyncRemote().sendText(message);
            }
        }
    
        // 广播消息给所有客户端
        public void broadcast(String message) {
            for (Session session : clientSessions.values()) {
                if (session.isOpen()) {
                    session.getAsyncRemote().sendText(message);
                }
            }
        }
    
        // 关闭客户端连接
        public void closeClientConnection(String clientId) {
            Session session = clientSessions.get(clientId);
            if (session != null && session.isOpen()) {
                try {
                    session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Closing connection"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    

    添加Spring Bean配置

    package com.newlinker.jiangyin.config;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    
    /**
     * @author cyl
     * @time 2022/4/11
     */
    
    @Configuration
    public class WebSocketConfig {
        @Bean
        public ServerEndpointExporter serverEndpointExporter(){
            return new ServerEndpointExporter();
        }
    }
    
    

    至此,SpringBoot已可作为服务端进行websocket连接测试,测试时的路径为:

    ws://localhost:port/websocket

    其中若SpringBoot配置了ssl证书可提供https访问,则应将websocket连接协议更改为wss

    websocket路径中的"/websocket"由@ServerEndpoint注解决定,推荐使用在线测试,简单方便

    2. SpringBoot作为客户端

    作为客户端,推荐使用okhttp的依赖以及google的gson转换包(可与上方的依赖共存,不用担心)

    <dependency>
    	<groupId>com.squareup.okhttp3groupId>
    	<artifactId>okhttpartifactId>
    	<version>4.9.1version>
    dependency>
    
    <dependency>
    	<groupId>com.google.code.gsongroupId>
    	<artifactId>gsonartifactId>
    	<version>2.8.9version>
    dependency>
    

    创建WebSocketClient工具类

    package com.newlinker.jiangyin.utils;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.google.gson.Gson;
    import com.google.gson.JsonArray;
    import com.google.gson.JsonObject;
    import com.newlinker.jiangyin.config.XingHuoConfig;
    import com.newlinker.jiangyin.entity.ro.Payload;
    import com.newlinker.jiangyin.entity.ro.ResponseData;
    import okhttp3.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Component;
    
    import javax.websocket.OnError;
    import javax.websocket.Session;
    import javax.websocket.server.ServerEndpoint;
    import java.net.URISyntaxException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    
    /**
     * @author cyl
     * @time 2022/4/11
     */
    @Component
    public class WebSocketClient extends WebSocketListener {
        //注入你的WebSocketServer工具类
        @Autowired
        private WebSocketServer webSocketServer;
    
        private WebSocket webSocket;
    
        // 客户端连接其他服务器
        public void connectToServer(String serverUrl) {
            OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
            Request request = new Request.Builder().url(serverUrl).build();
            webSocket = okHttpClient.newWebSocket(request, this);
        }
    
    
        @Override
        public void onOpen(WebSocket webSocket, Response response) {
            super.onOpen(webSocket, response);
    
        }
    	
        //收到消息时触发,核心逻辑
        @Override
        public void onMessage(WebSocket webSocket, String text) {
            ResponseData responseData = GSON.fromJson(text, ResponseData.class);
            //此处服务器返回的status值为0时代表连接正常,由接口具体情况而定,与协议无关
            if (0 == responseData.getHeader().get("code").getAsInt()) {
                    Payload pl =GSON.fromJson(responseData.getPayload(), Payload.class);
                    JsonArray temp = (JsonArray) pl.getChoices().get("text");
                    JsonObject jo = (JsonObject) temp.get(0); 
                //解析结果后将内容转发给下游客户端,也可以使用sendMessage方法定向发送
                    webSocketServer.broadcast(jo.get("content").getAsString());
                //如果不想每次发送消息时都主动连接,需要建立websocket心跳,这里每次收发消息都主动断开
                    webSocket.close(3, "客户端主动断开链接");
                }
            } else {
                System.out.println("返回结果错误:\n" + responseData.getHeader().get("code") + " " + responseData.getHeader().get("message"));
            }
        }
    
        @Override
        public void onFailure(WebSocket webSocket, Throwable t, Response response) {
            System.out.println("WebSocket连接失败:");
            super.onFailure(webSocket, t, response);
            System.out.println(response);
        }
    
        @OnError
        public void onError(Session session, Throwable throwable) {
            System.out.println("WebSocket发生错误:" + throwable.getMessage());
        }
    	
    	//可以在Controller中调用该方法进行websocket的手动发送以及参数调整
        public void sendMessage(String word) {
            connectToServer();
            JsonObject frame = new JsonObject();
            //根据自己的需求填充你的请求参数
            //...
            webSocket.send(frame.toString());
            System.out.println(frame.toString());
        }
    }
    
    
  • 相关阅读:
    C++ Reference: Standard C++ Library reference: C Library: cstdio: getchar
    学习笔记:CANOE模拟LIN主节点和实际从节点进行通信测试
    【LeetCode】Day169-子数组的最小值之和
    subprocess Python执行系统命令最优选模块
    Python优化算法08——粘液霉菌算法
    DPDK之eventdev_pipeline源码解析
    服务service
    Android11编译导入PRODUCT_BOOT_JARS
    Android:多进程的开启方式、注意点以及如何解决。
    元素跟随鼠标移动
  • 原文地址:https://www.cnblogs.com/onecyl/p/17571577.html