• SpringBoot整合Websocket,实现作为客户端接收消息的同时作为服务端向下游客户发送消息


    SpringBoot整合Websocket

    1. SpringBoot作为服务端

    作为服务端时,需要先导入websocket的依赖

    <dependency>
    	<groupId>org.springframework.bootgroupId>
    	<artifactId>spring-boot-starter-websocketartifactId>
    dependency>
    

    创建WebSocketServer工具类

    package com.newlinker.jiangyin.utils;
    
    /**
     * @author cyl
     * @time 2023/7/21
     */
    import org.springframework.stereotype.Component;
    
    import javax.websocket.*;
    import javax.websocket.server.ServerEndpoint;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    @Component
    @ServerEndpoint(value = "/websocket")
    public class WebSocketServer {
    
        // 客户端会话列表
        private static final Map clientSessions = new ConcurrentHashMap<>();
    
        @OnOpen
        public void onServerOpen(Session session) {
            // 客户端连接到本地 WebSocket 服务
            System.out.println("Client connected: " + session.getId());
            clientSessions.put(session.getId(), session);
        }
    
        @OnMessage
        public void onMessage(Session session, String message) {
            // 处理客户端发送的消息
            System.out.println("Received message from client " + session.getId() + ": " + message);
    
            // 示例:将收到的消息广播给所有客户端
            //broadcast(message);
        }
    
        @OnClose
        public void onServerClose(Session session, CloseReason reason) {
            // 客户端断开连接
            System.out.println("Client " + session.getId() + " disconnected: " + reason);
            clientSessions.remove(session.getId());
        }
    
        @OnError
        public void onError(Session session, Throwable throwable) {
            // 客户端连接发生错误
            System.out.println("WebSocket client error: " + throwable.getMessage());
            clientSessions.remove(session.getId());
        }
    
        // 发送消息给指定客户端
        public void sendToClient(String clientId, String message) {
            Session session = clientSessions.get(clientId);
            if (session != null && session.isOpen()) {
                session.getAsyncRemote().sendText(message);
            }
        }
    
        // 广播消息给所有客户端
        public void broadcast(String message) {
            for (Session session : clientSessions.values()) {
                if (session.isOpen()) {
                    session.getAsyncRemote().sendText(message);
                }
            }
        }
    
        // 关闭客户端连接
        public void closeClientConnection(String clientId) {
            Session session = clientSessions.get(clientId);
            if (session != null && session.isOpen()) {
                try {
                    session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Closing connection"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    

    添加Spring Bean配置

    package com.newlinker.jiangyin.config;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    
    /**
     * @author cyl
     * @time 2022/4/11
     */
    
    @Configuration
    public class WebSocketConfig {
        @Bean
        public ServerEndpointExporter serverEndpointExporter(){
            return new ServerEndpointExporter();
        }
    }
    
    

    至此,SpringBoot已可作为服务端进行websocket连接测试,测试时的路径为:

    ws://localhost:port/websocket

    其中若SpringBoot配置了ssl证书可提供https访问,则应将websocket连接协议更改为wss

    websocket路径中的"/websocket"由@ServerEndpoint注解决定,推荐使用在线测试,简单方便

    2. SpringBoot作为客户端

    作为客户端,推荐使用okhttp的依赖以及google的gson转换包(可与上方的依赖共存,不用担心)

    <dependency>
    	<groupId>com.squareup.okhttp3groupId>
    	<artifactId>okhttpartifactId>
    	<version>4.9.1version>
    dependency>
    
    <dependency>
    	<groupId>com.google.code.gsongroupId>
    	<artifactId>gsonartifactId>
    	<version>2.8.9version>
    dependency>
    

    创建WebSocketClient工具类

    package com.newlinker.jiangyin.utils;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.google.gson.Gson;
    import com.google.gson.JsonArray;
    import com.google.gson.JsonObject;
    import com.newlinker.jiangyin.config.XingHuoConfig;
    import com.newlinker.jiangyin.entity.ro.Payload;
    import com.newlinker.jiangyin.entity.ro.ResponseData;
    import okhttp3.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Component;
    
    import javax.websocket.OnError;
    import javax.websocket.Session;
    import javax.websocket.server.ServerEndpoint;
    import java.net.URISyntaxException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    
    /**
     * @author cyl
     * @time 2022/4/11
     */
    @Component
    public class WebSocketClient extends WebSocketListener {
        //注入你的WebSocketServer工具类
        @Autowired
        private WebSocketServer webSocketServer;
    
        private WebSocket webSocket;
    
        // 客户端连接其他服务器
        public void connectToServer(String serverUrl) {
            OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
            Request request = new Request.Builder().url(serverUrl).build();
            webSocket = okHttpClient.newWebSocket(request, this);
        }
    
    
        @Override
        public void onOpen(WebSocket webSocket, Response response) {
            super.onOpen(webSocket, response);
    
        }
    	
        //收到消息时触发,核心逻辑
        @Override
        public void onMessage(WebSocket webSocket, String text) {
            ResponseData responseData = GSON.fromJson(text, ResponseData.class);
            //此处服务器返回的status值为0时代表连接正常,由接口具体情况而定,与协议无关
            if (0 == responseData.getHeader().get("code").getAsInt()) {
                    Payload pl =GSON.fromJson(responseData.getPayload(), Payload.class);
                    JsonArray temp = (JsonArray) pl.getChoices().get("text");
                    JsonObject jo = (JsonObject) temp.get(0); 
                //解析结果后将内容转发给下游客户端,也可以使用sendMessage方法定向发送
                    webSocketServer.broadcast(jo.get("content").getAsString());
                //如果不想每次发送消息时都主动连接,需要建立websocket心跳,这里每次收发消息都主动断开
                    webSocket.close(3, "客户端主动断开链接");
                }
            } else {
                System.out.println("返回结果错误:\n" + responseData.getHeader().get("code") + " " + responseData.getHeader().get("message"));
            }
        }
    
        @Override
        public void onFailure(WebSocket webSocket, Throwable t, Response response) {
            System.out.println("WebSocket连接失败:");
            super.onFailure(webSocket, t, response);
            System.out.println(response);
        }
    
        @OnError
        public void onError(Session session, Throwable throwable) {
            System.out.println("WebSocket发生错误:" + throwable.getMessage());
        }
    	
    	//可以在Controller中调用该方法进行websocket的手动发送以及参数调整
        public void sendMessage(String word) {
            connectToServer();
            JsonObject frame = new JsonObject();
            //根据自己的需求填充你的请求参数
            //...
            webSocket.send(frame.toString());
            System.out.println(frame.toString());
        }
    }
    
    
  • 相关阅读:
    【数据结构】C++实现红黑树
    Django(二)精美博客搭建(13)实现留言页面及留言功能
    【附源码】计算机毕业设计SSM摊位管理系统
    微服务技术栈-认识微服务和第一个微服务Demo
    2. 加载与存储指令
    独立站新手卖家如何玩转低成本营销
    IDEA崩溃:A fatal error has been detected by the Java Runtime Environment解决方案
    临近取样(KNN)算法基本原理&sklearn实现
    修改 Ubuntu .cache 和 pip cache 默认路径
    网络代理技术:保障隐私与增强安全
  • 原文地址:https://www.cnblogs.com/onecyl/p/17571577.html