• Netty源码剖析之NIOEventLoopGroup创建流程


    准备

    1、NettyServer

    public class NettyServer {
    
        public static void main(String[] args) throws InterruptedException {
    
            // 1、创建bossGroup线程组:处理网络连接事件。默认线程数:2*处理器线程数
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            // 2、创建workGroup线程组:处理网络read/write事件。 默认线程数:2*处理器线程数
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            // 3、创建服务端启动助手
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 4、服务端启动助手,设置线程组
            serverBootstrap.group(bossGroup,workerGroup)
                    // 5、设置服务端Channel实现类
                    .channel(NioServerSocketChannel.class)
                    // 6、设置bossGroup线程队列中等待连接个数
                    .option(ChannelOption.SO_BACKLOG,128)
                    // 7、设置workerGroup中线程活跃状态
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    // 使用channelInitializer 可以配置多个handler
                    .childHandler(new ChannelInitializer<SocketChannel>() {// 8、设置一个通道初始化对象
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 9、向pipeline中添加自定义的channelHandler, 处理socketChannel传送的数据
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
    
            // 10、服务端启动并绑定端口
            ChannelFuture future = serverBootstrap.bind(9999).sync();
            // 给服务器启动绑定结果,对结果进行监听,触发回调
            future.addListener((ChannelFuture channelFuture)-> {
                if(channelFuture.isSuccess()){
                    System.out.println("服务器启动成功");
                }else {
                    System.out.println("服务器启动失败");
                }
            });
    
    
            // 11、关闭监听通道和连接池,将异步改同步
            future.channel().closeFuture().sync();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    2、NettyServerHandler

    /**
     * 自定义的channelHandler处理器
     *
     * 事件触发,触发相应函数
     */
    public class NettyServerHandler implements ChannelInboundHandler {
    
        /**
         * 通道读取事件
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuffer = (ByteBuf)msg;
            System.out.println("客户端:"+byteBuffer.toString(CharsetUtil.UTF_8));
        }
    
        /**
         * 通道数据读取完毕事件
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            TimeUnit.SECONDS.sleep(2);
            ctx.writeAndFlush(Unpooled.copiedBuffer("叫我靓仔!!!".getBytes()));
        }
    
        /**
         * 发生异常捕获事件
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
        /**
         * 通道就绪事件
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
        }
    
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    
        }
    
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    3、NettyClient

    /**
     * nettyClient
     */
    public class NettyClient {
    
        public static void main(String[] args) throws InterruptedException {
            // 1、创建线程组
            NioEventLoopGroup group = new NioEventLoopGroup();
            // 2、创建客户端启动助手bootstrap
            Bootstrap bootstrap = new Bootstrap();
            // 3、配置线程组
            bootstrap.group(group)
                    // 4、定义socketChannel的实现类
                    .channel(NioSocketChannel.class)
                    // 5、定义channelHandler, 处理socketChannel的数据
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //6、向pipeline中添加自定义业务处理handler
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
    
            // 7、启动客户端, 等待连接服务端, 同时将异步改为同步
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(9999)).sync();
            // 8、关闭通道和关闭连接池
            future.channel().closeFuture().sync();
            group.shutdownGracefully();
    
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    4、NettyClientHandler

    /**
     * 自定义的channelHandler处理器
     * 

    * 事件触发,触发相应函数 */ public class NettyClientHandler implements ChannelInboundHandler { /** * 通道读取事件 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println("服务端:" + byteBuf.toString(CharsetUtil.UTF_8)); } /** * 通道数据读取完毕事件 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("不行,不行啊!!!".getBytes())); } /** * 发生异常捕获事件 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 通道就绪事件 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("你好哇 小客客!!!".getBytes())); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    NioEventLoopGroup创建流程

    1、定义线程数
    在这里插入图片描述
    在这里插入图片描述
    如果创建线程组的时候没有指定线程数,那么默认线程数将通过指定系统参数或者CPU逻辑处理核数*2来定义。Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    2、获取执行器

    在这里插入图片描述
    在这里插入图片描述
    ThreadPerTaskExecutor本质上就要线程工厂创建新的线程执行任务,这里包装了一层。
    在这里插入图片描述
    Thread 使用的是FastThreadLocalThread,优化ThreadLocal在哪?

    3、创建n个线程的NioEventLoop
    在这里插入图片描述

    在这里插入图片描述

    3.1、创建任务队列TaskQueue
    在这里插入图片描述
    3.2、获取SelectorProvider提供器

    在这里插入图片描述
    selectNow 以非阻塞的方式获取感兴趣的事件,感兴趣事件指:SocketChannel注册到Selector上要求监听的事件

    3.3、绑定SelectStrategy
    在这里插入图片描述
    SelectStrategy 实现类为 DefaultSelectStrategy,执行逻辑,判断任务队列中是否有任务,最终返回一个int值,返回SELECT = -1 为阻塞当前线程

    4、为每个NioEventLoop绑定一个中断监听器
    在这里插入图片描述

    总结

    NioEventLoopGroup内部结构
    在这里插入图片描述
    在这里插入图片描述

    执行图

    在这里插入图片描述

  • 相关阅读:
    nacos微服务云开发,远程联调部署,内网穿透,frp部署
    mac for m1(arm):安装mysql的三种方式(本机安装、虚拟机安装、docker安装)
    快速上手:剧本杀dm预约平台小程序的制作流程
    k8s、docker 卸载
    R语言矩阵操作:根据值找到行号和列号
    RedisTemplate序列化后数据字段增加,代码无常,大肠包小肠
    聊聊在不确定环境下的个人成长
    CT图像伪影MATLAB仿真
    计算机操作系统:二级页表原理
    代码随想录算法训练营19期第49天
  • 原文地址:https://blog.csdn.net/qq_44787816/article/details/126835132