• Spring Boot 集成 WebSocket 实例 | 前端持续打印远程日志文件更新内容(模拟 tail 命令)


    这个是我在 CSDN 的第一百篇原则博文,留念😎

    #1 需求说明

    先说下项目结构,后端基于 Spring Boot 3,前端为 node.js 开发的控制台程序。现在希望能够在前端模拟 tail 命令,持续输出后端的日志文件。

    #2 技术方案

    #2.1 基于轮询(PASS)

    这个方案实施较为简单,通过前端不断(定时)发起请求,并携带已读的内容坐标(position),询问后端日志文件是否有更新,判断依据为当前文件大小大于 position。若有变动,则读取更新的内容,回显在前端控制台。

    此方案会产生非常多的请求,如果定时间隔设置不好,会有明显的延迟,故不采用。

    #2.2 WebSocket 长连接

    1. 前端开启一个 WebSocket
    2. 后端监听到长连接后,启动文件变动检测线程
    3. 若文件发生变动,则读取更新内容,发送到前端

    #3 实施

    #3.1 后端改造

    关于 Spring Boot 与 WebSocket 的集成,请转到:springboot集成websocket持久连接(权限过滤+拦截)

    首先,我们定义一个监听文件变动并读取最新内容的工具类(借助于 common-io 包):

    class FileTail(val path:Path, val handler: Consumer<String>, delay:Long=1000): FileAlterationListenerAdaptor() {
        private val watcher = FileSystems.getDefault().newWatchService()
    
        private val MODE = "r"
        private var reader = RandomAccessFile(path.toFile(), MODE)
        private var position= reader.length()
    
        // 使用 JDK 自带的 WatchService ,发现不能正常读取文件追加的内容
        private var monitor: FileAlterationMonitor = FileAlterationMonitor(delay)
    
        init {
            // 初始化监视器,只检测同名的文件
            FileAlterationObserver(path.parent.toFile()) { f: File -> f.name == path.name }.also { observer->
                observer.addListener(this)
                monitor.addObserver(observer)
    
                monitor.start()
            }
        }
    
        override fun onFileChange(file: File) {
            reader.seek(position)
    
            val bytes = mutableListOf<Byte>()
            val tmp = ByteArray(1024)
            var readSize: Int
    
            while ((reader.read(tmp).also { readSize = it }) != -1) {
                for (i in 0..< readSize){
                    bytes.add(tmp[i])
                }
            }
    
            position += bytes.size
            handler.accept(String(bytes.toByteArray()))
        }
        
        fun stop() {
            reader.close()
            monitor.stop()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    再定义长连接的通信处理类:

    @Component
    class FileTailWsHandler : TextWebSocketHandler() {
        private val logger = LoggerFactory.getLogger(javaClass)
    
        companion object {
            val monitors = mutableMapOf<String, FileTail>()
        }
    
        override fun afterConnectionEstablished(session: WebSocketSession) {
            try{
                 val textFile = Paths.get("logs/spring.log")
    
                // 加入队列
                monitors[session.id] = FileTail(
                    textFile,
                    { text -> session.sendMessage(TextMessage(text)) }
                )
            }catch (e:Exception){
                logger.error("处理客户端消息失败", e)
                session.sendMessage(TextMessage("服务器出错:${ExceptionUtils.getMessage(e)}"))
                session.close(CloseStatus.SERVER_ERROR)
            }
        }
    
        override fun afterConnectionClosed(session: WebSocketSession, status: CloseStatus) {
            logger.info("客户端(${session.id}${session.remoteAddress} 断开连接...")
    
            monitors.remove(session.id)?.stop()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    编写配置类,启用上述的组件:

    @Component
    class WsInterceptor : HandshakeInterceptor {
        private val logger = LoggerFactory.getLogger(javaClass)
    
        override fun beforeHandshake(
            request: ServerHttpRequest,
            response: ServerHttpResponse,
            wsHandler: WebSocketHandler,
            attributes: MutableMap<String, Any>
        ): Boolean {
            if(logger.isDebugEnabled){
                logger.debug("WS 握手开始:${request.uri} 客户端=${request.remoteAddress}")
                request.headers.forEach { name, v -> logger.debug("[HEADER] $name = $v") }
            }
    		//此处可以进行鉴权
    	
    		//写入属性值,方便在 handler 中获取
            attributes[F.PARAMS]    = request.headers.getFirst(F.PARAMS)?: EMPTY
            // 返回 true 才能建立连接
            return true
        }
    
        override fun afterHandshake(
            request: ServerHttpRequest,
            response: ServerHttpResponse,
            wsHandler: WebSocketHandler,
            exception: Exception?
        ) {
        }
    }
    
    @Configuration
    @EnableWebSocket
    class SocketConfig : WebSocketConfigurer {
        private val logger = LoggerFactory.getLogger(javaClass)
    
        @Resource
        lateinit var interceptor: WsInterceptor
        @Resource
        lateinit var fileTailHandler:FileTailWsHandler
    
        override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
            registry.addHandler(fileTailHandler, "/ws/file-tail").addInterceptors(interceptor)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    #3.2 前端(node.js)

    请先安装依赖:npm i -D ws

    /**
     * 跟踪远程日志文件
     * @param {*} ps
     */
    const _tailRemoteFile = async ps=>{
        let url = remoteUrl("/ws/file-tail")
        let index = url.indexOf("://")
    
        let headers = {}
        headers.params = JSON.stringify(ps)
    
        const client = new WebSocket(`ws${url.substring(index)}`, { headers })
        client.on('open', ()=> console.debug(chalk.magenta(`与服务器连接成功 🤝`)))
        // client.on('close',()=> console.debug(chalk.magenta(`\n与服务器连接关闭 👋`)))
        client.on('error', e=> {
            console.debug(chalk.red(e))
        })
        client.on('message', /** @param {Buffer} buf */buf=>{
            let line = buf.toString()
            if(line.endsWith("\n") || line.endsWith("\r\n"))
                line = line.substring(0, line.length-2)
            console.debug(line)
        })
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    #3.3 看看效果

  • 相关阅读:
    springboot获取不到客户端ip问题排查
    经典网络架构-ResNet
    Fluent Support – 适用于 WordPress 的客户支持和帮助台插件
    JAVA爱音乐网站计算机毕业设计Mybatis+系统+数据库+调试部署
    Redis(七) - 封装Redis工具类
    Linux上配置NAT
    c语言二维数组解引用复制一篇文章学习用
    什么商业模式是适合你,元宇宙电商NFG了解一下
    修改禅道启动因mysql端口被占用问题
    经典机器学习备忘录
  • 原文地址:https://blog.csdn.net/ssrc0604hx/article/details/136718655