• 从零开始学习Netty - 学习笔记 -Netty入门-ChannelFuture


    5.2.2.Channel

    Channel 的基本概念

    在 Netty 中,Channel 是表示网络传输的开放连接的抽象。它提供了对不同种类网络传输的统一视图,比如 TCP 和 UDP。

    Channel 的生命周期

    Channel 的生命周期包括创建、激活、连接、读取、写入和关闭等阶段。Netty 中的 Channel 具有状态,根据不同的事件触发状态转换。

    Channel channel = ...; // 获取 Channel 实例
    
    // 检查 Channel 是否打开
    if (channel.isOpen()) {
        // 进行数据读取操作
        channel.read();
    }
    
    // 关闭 Channel
    channel.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    Channel 的异步 I/O

    Netty 中的 Channel 支持异步的 I/O 操作,这意味着可以在不阻塞线程的情况下进行网络通信。下面是一个简单的读取操作示例:

    // 从 Channel 中读取数据
    channel.read(new ChannelHandler() {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 处理读取到的数据
            ByteBuf buf = (ByteBuf) msg;
            System.out.println(buf.toString(Charset.defaultCharset()));
            buf.release(); // 释放资源
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    ChannelHandler 和 ChannelPipeline

    ChannelHandler 用于处理入站和出站的事件,而 ChannelPipeline 是一系列 ChannelHandler 的链,负责处理 Channel 传递的事件。

    // 创建一个 ChannelInitializer 用于初始化 ChannelPipeline
    ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ChannelPipeline pipeline = ch.pipeline();
            
            // 添加自定义的 ChannelHandler 到 ChannelPipeline 中
            pipeline.addLast("handler", new MyChannelHandler());
        }
    };
    
    // 在 ServerBootstrap 中应用 ChannelInitializer
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(eventLoopGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(initializer); 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    channel的主要作用

    • close():主要用来关闭channel
    • **closeFuture():**用来处理channel的关闭
      • sync方法作用是同步等待channel的关闭
      • addListener方法是异步等待channel关闭
    • **pipeline():**方法添加处理器
    • **write():**方法是将数据写入
    • **writeAndFlush():**方法是将数据写入并刷出

    例如刚刚的客户端代码

    		// 1.创建启动器
    		try {
    			new Bootstrap()
    					// 2.指定线程模型 一个用于接收客户端连接,另一个用于处理客户端读写
    					.group(new NioEventLoopGroup())
    					// 3.选择客户端的Channel的实现
    					.channel(NioSocketChannel.class)
    					// 4.添加处理器
    					.handler(new ChannelInitializer<NioSocketChannel>() {
    						// 5.初始化处理器
    						@Override
    						protected void initChannel(NioSocketChannel ch) throws Exception {
    							// 6.添加具体的handler 客户端是需要一个编码器
    							ch.pipeline().addLast(new StringEncoder());
    						}
    					})
    					// 7.连接到服务器
    					.connect(new InetSocketAddress("localhost", 8080))
    					.sync() // 阻塞方法 知道连接建立
    					.channel() // 代表客户端和服务端的连接
    					// 8.向服务器发送数据
    					.writeAndFlush("hello, world");
    		} catch (InterruptedException e) {
    			throw new RuntimeException(e);
    		}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    5.2.2.1.连接问题sync
    			// 1.创建启动器
    			try {
    				ChannelFuture channelFuture = new Bootstrap()
    						.group(new NioEventLoopGroup())
    						.channel(NioSocketChannel.class)
    						.handler(new ChannelInitializer<NioSocketChannel>() {
    							@Override
    							protected void initChannel(NioSocketChannel ch) throws Exception {
    								ch.pipeline().addLast(new StringEncoder());
    							}
    						})
    						// 7.连接到服务器
    						// connect方法是异步的,返回一个ChannelFuture(异步调用 就是不关心结果,直接返回)
    						// main线程发起了调用,真正执行了connect是另外一个线程 nio线程
    						.connect(new InetSocketAddress("localhost", 8080));
    				// 7.1.同步等待连接成功 如果不调用sync()方法,main线程会继续往下执行,不会等待connect()方法的执行结果
    				channelFuture.sync();
    				// 7.2.获取连接对象 如果没有调用sync()方法,这里的channel此时还没有真正建立起连接
    				Channel channel = channelFuture.channel(); // 连接对象
    				logger.error("channel: {}", channel);
    				// 8.向服务器发送数据
    				channel.writeAndFlush("hello, world");
    			} catch (Exception e) {
    				throw new RuntimeException(e);
    			}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    image-20240226071218470

    image-20240226071322706

    5.2.2.2.处理结果

    带有Future Promise 的类型,都是和异步方法配套使用的,用来正确处理结果的

    • 调用channelFuture.sync()处理同步结果,sync()主要是阻塞当前线程,直到nio线程连接建立完毕

    • 使用addListener(new ChannelFutureListener() )

      • 	// 使用addListener(回调对象)方法,可以在ChannelFuture执行完成后,再执行一些操作
          				channelFuture.addListener(new ChannelFutureListener() {
          					// 在NIO线程连接建立好后,会调用operationComplete方法
          					@Override
          					public void operationComplete(ChannelFuture channelFuture) throws Exception {
          						if (channelFuture.isSuccess()) {
          							// 7.2.获取连接对象 如果没有调用sync()方法,这里的channel就会是null
          							Channel channel = channelFuture.channel(); // 连接对象
          							logger.error("channel: {}", channel);
          							// 8.向服务器发送数据
          							channel.writeAndFlush("hello, world");
          						} else {
          							// 7.3.连接失败
          							Throwable cause = channelFuture.cause();
          							logger.error("connect failed: {}", cause);
          						}
          					}
          				});
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
    5.2.2.3.处理关闭

    小需求 : 客户端 不断接收用于输入的信息,然后发送给客户端,当用户端输入q 退出 关闭channel

    /**
     *
     * @author 13723
     * @version 1.0
     * 2024/2/27 21:46
     */
    public class CloseFutureClient {
    	private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    
    
    	public static void main(String[] args) throws InterruptedException {
    
    
    		ChannelFuture channelFuture = new Bootstrap()
    				.group(new NioEventLoopGroup())
    				.channel(NioSocketChannel.class)
    				.handler(new ChannelInitializer<NioSocketChannel>() {
    					@Override
    					protected void initChannel(NioSocketChannel ch) throws Exception {
    						ch.pipeline().addLast(new StringEncoder());
    					}
    				})
    				.connect(new InetSocketAddress("localhost", 8080));
    
    		// 客户端 不断接收用于输入的信息,然后发送给客户端,当用户端输入q 退出
    		// 建立建立
    		Channel channel = channelFuture.sync().channel();
    		logger.error("channel: {} ",channel);
    		// 接收用户输入的需求
    		new Thread(()->{
    			Scanner scanner = new Scanner(System.in);
    			while (true){
    				String s = scanner.nextLine();
    				if ("q".equals(s)){
    					// 退出 关闭channel
                        // 1s 后才真正的关闭
    					channel.close();
    					// 退出循环
    					logger.error("处理关闭之后的操作!");
    					break;
    				}
    				// 向服务器 发送数据
    				channel.writeAndFlush(s);
    			}
    		},"input").start();
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    在这里插入图片描述

    在这里插入图片描述

    • 解决

      • 使用CloseFuture.sync()

        		// 关闭Channel
        		// 获取closeFuture对象 1.同步受理关闭 2.异步处理关闭
        		ChannelFuture closeFuture = channel.closeFuture();
        		logger.error("wait close... ");
        		closeFuture.sync();
        		logger.error("处理关闭之后的操作!");
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6

        在这里插入图片描述

      • 使用addListener(new ChannelFutureListener())

        		closeFuture.addListener(new ChannelFutureListener() {
        			@Override
        			public void operationComplete(ChannelFuture channelFuture) throws Exception {
        				logger.error("处理关闭之后的操作!");
        			}
        		});
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6

        在这里插入图片描述

    `

    此时关闭,会会发现客户端并没有结束,因为线程虽然结束,但是NioEventLoopGroup 里面可能还有线程,这是时关闭,需要调

    **shutdownGracefully()**方法

    // 将NioEventLoopGroup提出来  
    NioEventLoopGroup group = new NioEventLoopGroup();
    ChannelFuture channelFuture = new Bootstrap()
            .group(group)
    .........
        
    // 然后在处理善后中调用 
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        logger.error("处理关闭之后的操作!");
        // 需要保证整个全部关闭
        group.shutdownGracefully();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    在这里插入图片描述

    5.2.2.4.为什么使用异步

    思考下面这样的场景,4个医生给人看病,每个病人花费20分钟,而且医生看病的过程中,是以病人为单位的,一个病人看完了,才能看下一个病人,假设病人源源不断来,可以计算一天4个医生工作8小时,处理病人总数 4 * 8 * 3 = 96

    在这里插入图片描述

    经研究 发现 看病可以分为 四个步骤 经拆分后每个步骤仅需要五分钟

    在这里插入图片描述

    因此 可以做如下优化,只有一开始, 医生 2 3 4 需要分别等待 5 10 15分钟开能开始执行工作,但是只要后续病人源源不断的来,他们就能满负荷工作,并且处理病人的能力提高 到了, 4 * 8 * 12 整个效率 是原先的 4 倍

    (满负载情况下)第一个医生 只挂号,一个号五分钟,那么 一个小时 可以处理 12个,之前一个医生从头到尾只能看一个病人,那么一个小时只能看3个

    在这里插入图片描述

    • 单线程没法异步提高效率,必须配合多线程,多核心cpu才能发挥异步的优势
    • 异步并没有缩短响应时间,反而有所增加(提高的是吞吐量,单位时间内能够处理请求的速度)
    • 合理任务的拆分,也是利用异步的关键
  • 相关阅读:
    区块链架构-fabric单机测试版安装运行(centos7版本)
    【博学谷学习记录】超强总结,用心分享|架构师-设计模式 1
    100天精通Python(数据分析篇)——第51天:numpy函数进阶
    小程序开发必备功能的吐血整理【个人中心界面样式大全】
    Java虚拟机垃圾收集器详细总结
    【Excel导出】(亲测可用)使用实现Hutool工具类将list对象数组导出的简单实现
    8月!优选国产软件 - 国货之光 / Windows 系统必备软件大捆绑!
    (183)Verilog HDL:设计一个移位功能Rotate100
    2022年武汉安全员ABC证评分标准?多少分及格呢?甘建二
    【附源码】计算机毕业设计JAVA旅行指南网站
  • 原文地址:https://blog.csdn.net/q1372302825/article/details/136333984