• springboot Socket 通信


    一、引入依赖:
      <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-websocketartifactId>
    
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    二、准备工具类:
    /**
     * @author WeiDaPang
     */
    @Configuration
    public class ScheduledConfiguration{
        @Bean
        public TaskScheduler taskScheduler(){
            ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
            taskScheduler.setPoolSize(10);
            taskScheduler.initialize();
            return taskScheduler;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    /**
     * WebSocket 配置
     * @author WeiDaPang
     */
    @EnableWebSocket
    @Configuration
    public class WebSocketConfig implements WebSocketConfigurer {
        @Bean
        public ServerEndpointExporter serverEndpointExporter(){
            return new ServerEndpointExporter();
        }
    
        /**
         *  注入WebSocket的处理器
         */
        @Resource
        private WebSocketServer  webSocketServer;
    
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            // webSocketDemo 为前端请求的地址,前端具体地址组成结构为:ws://ip:接口启动的端口/webSocketDemo
            registry.addHandler(webSocketServer,"webSocketByNotice")
                    .setAllowedOrigins("*");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    /**
     * @author Da.Pang
     * 2022/11/8 18:57
     */
    public class WebSocketEntity {
    
        private String userId;
        private WebSocketSession webSocketSession;
    
        public WebSocketEntity(String userId , WebSocketSession webSocketSession){
            this.userId = userId;
            this.webSocketSession = webSocketSession;
        }
    
        public String getUserId() {
            return userId;
        }
    
        public WebSocketEntity setUserId(String userId) {
            this.userId = userId;
            return this;
        }
    
        public WebSocketSession getWebSocketSession() {
            return webSocketSession;
        }
    
        public WebSocketEntity setWebSocketSession(WebSocketSession webSocketSession) {
            this.webSocketSession = webSocketSession;
            return this;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    /**
     * @author WeiDaPang
     */
    @Component
    public class WebSocketServer extends AbstractWebSocketHandler {
    
        private static final Logger logger = LoggerFactory.getLogger(WsService.class);
    
        @Resource
        private WsService  wsService;
    
        /**
         * 连接成功后触发
         */
        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            logger.info("...[客户端连接成功]...");
            // 客户端建立连接,将客户端的session对象加入到WebSocketSessionManager的sessionGroup中
            WebSocketSessionManager.add(session);
            //将连接结果返回给客户端
            wsService.sendMsg(session,session.getId()+" 连接成功"+ LocalDateTime.now());
        }
    
        /**
         * 关闭socket连接后触发
         */
        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
            logger.info("...【客户端关闭连接成功】...");
            // 关闭连接,从WebSocketSessionManager的sessionGroup中移除连接对象
            WebSocketSessionManager.removeAndClose(session);
        }
    
        /**
         * 接收客户端发送的文本消息
         */
        @Override
        protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
            logger.info("{ 客户端发送:}"+message.getPayload());
            //将连接结果返回给客户端
            wsService.sendMsg(session,message.getPayload());
        }
    
        /**
         * 接收客户端发送的二进制消息
         */
        @Override
        protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
            logger.info("{ 客户端二进制发送: }"+message.getPayload());
        }
    
        /**
         * 异常处理
         */
        @Override
        public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
            System.out.println();
            logger.error("*** socket异常: "+exception.getMessage(),exception);
            // 出现异常则关闭连接,从WebSocketSessionManager的sessionGroup中移除连接对象
            WebSocketSessionManager.removeAndClose(session);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    /**
     * @author WeiDaPang
     */
    public class WebSocketSessionManager {
        private static final Logger logger = LoggerFactory.getLogger(WebSocketSessionManager.class);
        /**
         * 保存连接对象的 session 到集合中
         */
        public  static ConcurrentHashMap<String, WebSocketEntity> sessionGroup = new ConcurrentHashMap<>();
    
    
        /**
         * 添加 session 到集合中
         * @param session 连接对象的session
         */
        public static void add(WebSocketSession session) {
            logger.info(String.valueOf(session.getUri()));
            sessionGroup.put(session.getId(), new WebSocketEntity(getUserIdBySession(session), session));
        }
    
        /**
         * 从集合中删除 session,会返回删除的 session
         * @param key 通过session.getId()得到
         */
        public static WebSocketSession remove(String key) {
            return sessionGroup.remove(key).getWebSocketSession();
        }
    
        /**
         * 从集合中删除 session,会返回删除的 session
         * @param session 连接对象的session
         */
        public static WebSocketSession remove(WebSocketSession session) {
            return sessionGroup.remove(session.getId()).getWebSocketSession();
        }
    
        /**
         * 删除并关闭 连接
         * @param key 通过session.getId()得到
         */
        public static void removeAndClose(String key) {
            WebSocketSession session = remove(key);
            if (session != null) {
                try {
                    // 关闭连接
                    session.close();
                } catch (IOException e) {
                    logger.error("关闭出现异常处理",e);
                    e.printStackTrace();
                }
            }
        }
    
     	/**
         * 删除并关闭 连接
         * @param session 连接对象的session
         */
        public static void removeAndClose(WebSocketSession session) {
            WebSocketSession session2 = remove(session);
            if (session2 != null) {
                try {
                    // 关闭连接
                    session.close();
                } catch (IOException e) {
                    logger.error("关闭出现异常处理",e);
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 从集合中获得 session
         * @param key 通过session.getId()得到
         */
        public static WebSocketSession get(String key) {
            return sessionGroup.get(key).getWebSocketSession();
        }
    
    
        /**
         * [描述] 从ws地址中获取用户id
         * 格式:ws://IP:端口/webSocketByNotice?userId=10000000
         */
        public static String getUserIdBySession(WebSocketSession session){
            String uri = Objects.requireNonNull(session.getUri()).toString();
            uri =  uri.substring(uri.indexOf("?")+1);
            uri =  uri.replace("userId=","").trim();
            return uri;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    /**
     * @author WeiDaPang
     */
    @Service
    public class WsService {
    
    
        /**
         * 发送消息给指定客户端
         * @param session 客户端session
         * @param text 发送消息的内容
         */
        public synchronized  void sendMsg(WebSocketSession session, String text) throws IOException {
            session.sendMessage(new TextMessage(text));
            Notice notice = new Notice();
            notice.setNoticeTitle("消息标题");
            notice.setNoticeContent(text);
            notice.setCreateTime(new Date());
            // 发送一条消息 就广播给所有人
            broadcastMsg(JSONUtil.toJsonStr(notice));
        }
        /**
         * 发送消息给指定客户端
         * @param session 客户端session
         * @param notice 通知
         */
        public synchronized  void sendMsgByNotice(WebSocketSession session, Notice notice) throws IOException {
            session.sendMessage(new TextMessage(JSONUtil.toJsonStr(notice)));
        }
    
    
    
        /**
         * 发送消息给所有客户端,客户端的session必须在WebSocketSessionManager的sessionGroup中
         * @param text 发送消息的内容
         */
        public synchronized void broadcastMsg(String text) throws IOException {
            for (WebSocketEntity entity: WebSocketSessionManager.sessionGroup.values()) {
                entity.getWebSocketSession().sendMessage(new TextMessage(text));
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    三、测试html页面代码:
    DOCTYPE HTML>
    <html lang="UTF-8">
    <head>
        <title>WebSocket 测试页面title>
    head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <body>
    <input id="text" type="text" />
    <button onclick="send()">发送消息button>
    <button onclick="closeWebSocket()">关闭webScoket连接button>
    <div id="message">div>
    body>
    
    <script type="text/javascript">
        // 连接到WebSocket的url地址。格式为  ws://ip:接口启动的端口/webSocketDemo
        // 如果 sping boot 配置文件中中配置了server.servlet.context-path ,则格式为: ws://ip:接口启动的端口/server.servlet.context-path的名称/webSocketDemo
        // 此处demo中 sping boot 配置了server.servlet.context-path 为 webSocketDemoApi
        let connectionUrl = "ws://127.0.0.1:808/webSocketByNotice?userId=10000001"
        let ws = null;
        //判断当前浏览器是否支持WebSocket
        if ('WebSocket' in window) {
            ws = new WebSocket(connectionUrl);
        }else {
            alert('当前浏览器不支持 websocket')
        }
    
        //连接发生错误的回调方法
        ws.onerror = function () {
            setMessageInnerHTML("WebSocket连接发生错误");
        };
    
        //连接成功建立的回调方法
        ws.onopen = function(event) {
            console.log("ws调用连接成功回调方法")
            //ws.send("")
        }
        //接收到消息的回调方法
        ws.onmessage = function(message) {
            console.log("接收消息:" + message.data);
            if (typeof(message.data) == 'string') {
                setMessageInnerHTML(message.data);
            }
        }
        //ws连接断开的回调方法
        ws.onclose = function(e) {
            console.log("ws连接断开")
            //console.log(e)
            setMessageInnerHTML("ws close");
        }
    
        //将消息显示在网页上
        function setMessageInnerHTML(innerHTML) {
            console.log(innerHTML)
            document.getElementById('message').innerHTML += '接收的消息:' + innerHTML + '
    '
    ; } //关闭连接 function closeWebSocket() { ws.close(); } //发送消息 function send(msg) { if(!msg){ msg = document.getElementById('text').value; console.log(msg) document.getElementById('message').innerHTML += "发送的消息:" + msg + '
    '
    ; ws.send(msg); } }
    script>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    四、当socket和定时任务同时被引入自己的项目报错:

    由于我的项目同时引入了
    socket和定时任务,所以启动时候会报错,解决方式就是为socket创建一个【TaskScheduler】的bean。(第一个工具类就是解决这个问题的)

  • 相关阅读:
    Matlab:矩阵分解
    C#中使用Bitmap 传递图到C++
    来看看你是不是真的了解 RSA 加密算法,查漏补缺!
    Azure OpenAI 服务
    云原生大数据平台零信任网络安全实践技术稿
    BI工具-DataEase(1) 安装
    Java实现图片上传功能(前后端:vue+springBoot)
    iOS上架App Store的全攻略
    SpringCloud Alibaba微服务第5章之Gateway
    程序员的数学课06 向量及其导数:计算机如何完成对海量高维度数据计算?
  • 原文地址:https://blog.csdn.net/Timeguys/article/details/127789538