• Netty简介


    Socket弊端

    前言

    1、可能有大量的线程处于休眠,只是等待输入或者输出数据
    2、每个线程的调用栈都分配内存很庞大
    3、线程间上下文切换开销
    可以理解为你要雇那么多工人还有给工人每个人分一块地盘,而且你还得不停地切换用哪个工人。

    引入NIO

    IO
    请添加图片描述
    NIO
    请添加图片描述
    class java.nio.channels.Selector是Java的非阻塞I/O实现的关键。它使用了事件通知API以确定在一组非阻塞套接字中有哪些已经就绪能够进行I/O相关的操作。

    Netty介绍

    设计 统一的API,支持多种传输类型,阻塞的和非阻塞的
    简单而强大的线程模型
    真正的无连接数据报套接字支持
    链接逻辑组件以支持复用
    易于使用 详实的Javadoc和大量的示例集
    不需要超过JDK 1.6+[7]的依赖。(一些可选的特性可能需要Java 1.7+和/或额外的依赖)
    性能 拥有比Java的核心API更高的吞吐量以及更低的延迟
    得益于池化和复用,拥有更低的资源消耗
    最少的内存复制
    健壮性 不会因为慢速、快速或者超载的连接而导致OutOfMemoryError
    消除在高速网络中NIO应用程序常见的不公平读/写比率
    安全性 完整的SSL/TLS以及StartTLS支持
    可用于受限环境下,如Applet和OSGI
    社区驱动 发布快速而且频繁

    Netty核心

    Channel它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作
    回调;
    Future;
    事件和ChannelHandler

    项目中用到的netty

    netty作为服务端,向客户端推送视差信息

    package com.wg.server;
    
    import com.wg.constant.Constants;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.util.concurrent.Future;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PreDestroy;
    
    /**
     * @author lst
     * @date 2023/6/6 16:58
     * @return null
     */
    @Slf4j
    @Service
    public class DirectionServer {
        @Value("${netty.thread-number:1}")
        private int threadNumber;
        @Value("${direction.port:8096}")
        private int port;
        @Autowired
        private DirectionServerHandler directionServerHandler;
        private Channel channel;
        private final EventLoopGroup GROUP = new NioEventLoopGroup(threadNumber);
    
        public void start() throws Exception {
            // 1、启动器,负责装配netty组件,启动服务器
            ChannelFuture channelFuture = new ServerBootstrap()
                    // 2、创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector
                    .group(GROUP)
                    // 3、选择服务器的 ServerSocketChannel 实现
                    .channel(NioServerSocketChannel.class)
                    // 4、child 负责处理读写,该方法决定了 child 执行哪些操作
                    // ChannelInitializer 处理器(仅执行一次)
                    // 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
    
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(Constants.MEG_LENGTH));
                            ch.pipeline().addLast(directionServerHandler);
                        }
                    }).bind(port).sync();
            if (channelFuture != null && channelFuture.isSuccess()) {
                log.warn("DirectionServer start success, port = {}", port);
                // 获取通道
                channel = channelFuture.channel();
    
                channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        log.warn(future.channel().toString() + " 链路关闭");
                        // 链路关闭时,再释放线程池和连接句柄
                        GROUP.shutdownGracefully();
                    }
                });
            } else {
                log.error("DirectionServer start failed!");
            }
        }
    
        @PreDestroy
        public void destroy() {
            try {
                if (channel != null) {
                    ChannelFuture await = channel.close().await();
                    if (!await.isSuccess()) {
                        log.error("DirectionServer channel close fail, {}", await.cause());
                    }
                }
                Future<?> future = GROUP.shutdownGracefully().await();
                if (!future.isSuccess()) {
                    log.error("DirectionServer group shutdown fail, {}", future.cause());
                }
                if (log.isInfoEnabled()) {
                    log.info("DirectionServer shutdown success");
                }
            } catch (InterruptedException e) {
                log.error("DirectionServer shutdown fail, {}", e);
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    /**
     * WebServerHandler.java
     * Created at 2022-08-30
     * Created by chenyuxiang
     * Copyright (C) 2022 WEGO Group, All rights reserved.
     */
    package com.wg.server;
    
    import cn.hutool.core.util.HexUtil;
    import com.wg.constant.Constants;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * @author lst
     * @date 2023/6/6 16:57
     * @return null
     */
    @Slf4j
    @Service
    @ChannelHandler.Sharable
    public class DirectionServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 接入的主程序入口服务
         */
        public static final Map<String, ChannelHandlerContext> DIRECTION_MAP = new ConcurrentHashMap<>();
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            String uuid = ctx.channel().id().asLongText();
            DIRECTION_MAP.put(uuid, ctx);
            log.info("连接请求进入: {}, 地址: {}", uuid, ctx.channel().remoteAddress());
            super.channelActive(ctx);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            String uuid = ctx.channel().id().asLongText();
            DIRECTION_MAP.remove(uuid);
            ctx.channel().close();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf in = (ByteBuf) msg;
            try {
                byte[] bytes = new byte[Constants.MEG_LENGTH];
                in.readBytes(bytes);
                log.info("收到消息 --> {}", HexUtil.encodeHexStr(bytes));
            } finally {
                // 释放ByteBuf
                ReferenceCountUtil.release(in);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            String uuid = ctx.channel().id().asLongText();
            DIRECTION_MAP.remove(uuid);
            log.error(cause.getMessage());
            ctx.close();
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    package com.wg.job;
    
    import cn.hutool.core.thread.ThreadUtil;
    import cn.hutool.core.util.HexUtil;
    import com.wg.constant.Constants;
    import com.wg.controller.RecordSnapShotController;
    import com.wg.model.RecordStatusRes;
    import com.wg.server.WebServerHandler;
    import com.wg.util.MessageUtil;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    import static cn.hutool.core.convert.Convert.hexToBytes;
    import static com.wg.common.RecordStatus.RECORDING;
    import static com.wg.controller.UserController.*;
    import static com.wg.server.DirectionServerHandler.DIRECTION_MAP;
    
    /**
     * @author lst
     * @date 2023年06月08日 15:07
     */
    @Component
    @Slf4j
    public class DirectionJob {
        private static int msgId = -1;
        @Value("${directionAndRecordStatus.interval:1000}")
        private long interval;
    
        @Scheduled(fixedDelayString = "${working.sendDirectionMessageToClient:30000}")
        public void sendDirectionMessageToClient() throws InterruptedException {
            if (System.getProperty("os.name") != null && System.getProperty("os.name").toLowerCase().startsWith("windows")) {
                return;
            }
            if (!DIRECTION_MAP.isEmpty()) {
                for (Map.Entry<String, ChannelHandlerContext> entry : DIRECTION_MAP.entrySet()) {
                    String msgId0xFF = generateMsgId();
                    String cmdName0xFF = "08";
                    String key = msgId0xFF + cmdName0xFF + fillZero(Integer.toHexString(horizontalVal & 0xFF)) + fillZero(Integer.toHexString(upAndDownVal & 0xFF));
                    byte[] bytes = hexToBytes(key);
                    log.info(key + "," + "此处发送上下左右数值指令:" + HexUtil.encodeHexStr(bytes));
                    entry.getValue().writeAndFlush(Unpooled.copiedBuffer(bytes));
    
                    Thread.sleep(interval);
    
                    msgId0xFF = generateMsgId();
                    cmdName0xFF = "09";
                    RecordStatusRes recordStatusRes = new RecordSnapShotController().getCplusplusRecordStatus();
                    String record0xFF = "00";
                    if (recordStatusRes != null && recordStatusRes.getStatus() != null && recordStatusRes.getStatus().equals(RECORDING)) {
                        //录制中
                        record0xFF = "01";
                    }
                    String recordKey = msgId0xFF + cmdName0xFF + record0xFF;
                    byte[] recordBytes = hexToBytes(recordKey);
                    log.info(recordKey + "," + "此处发送录像状态指令:" + HexUtil.encodeHexStr(recordBytes));
                    entry.getValue().writeAndFlush(Unpooled.copiedBuffer(recordBytes));
                }
            }
    
        }
    
        public static String generateMsgId() {
            msgId++;
            //取低8位就可以
            String ret = Integer.toHexString(msgId & 0xFF);
            ret = ret.length() == 1 ? "0" + ret : ret;
            return ret;
        }
    
        private static String fillZero(String str) {
            str = str.length() == 1 ? "0" + str : str;
            return str;
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
  • 相关阅读:
    二十八、Java 包(package)
    荧光染料Cy7 酰肼,Cy7 hydrazide,Cy7 HZ参数及结构式解析
    Hive怎么调整优化Tez引擎的查询?在Tez上优化Hive查询的指南
    OpenCV笔记整理【模板匹配】
    Vue中this.$set()解决页面不更新问题
    Unity 公用函数整理【一】
    C Primer Plus(6) 中文版 第7章 C控制语句:分支和跳转 7.8 goto语句 7.9 关键概念 7.10 本章小结
    剑指offer 44. 从1到n整数中1出现的次数
    【React入门实战】实现Todo代办
    SpringBoot借助hutool生成图片二维码
  • 原文地址:https://blog.csdn.net/weixin_43914278/article/details/127673992