• 图文加多个测试带你彻底搞懂Netty ChannelPipeline的执行顺序(附源码)


    这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

    netty version

    • 4.1.65.Final

    ChannelPipeline 是什么

    Pipeline,管道、流水线,类似于责任链模式。基本上我们使用Netty开发程序需要编写的就是ChannelPipeline中的各个ChannelHandler

    ChannelPipeline就是用来组合所有的ChannelHandler

    我们今天重点讨论的对象是ChannelPipeline添加多个ChannelHandler他的执行顺序是什么

    测试demo

    这里我们直接用测试代码来看看

    NettyServer

    public class NettyServer {
    	public static void main(String[] args) throws InterruptedException {
    		// 启动服务器
    		startServer();
    		
    	}
    
    	private static void startServer() throws InterruptedException {
    		EventLoopGroup bossGroup = new NioEventLoopGroup();
    		EventLoopGroup workerGroup = new NioEventLoopGroup();
    		ServerBootstrap b = new ServerBootstrap();
    
    		b.group(bossGroup, workerGroup)
    				.channel(NioServerSocketChannel.class)
    				.option(ChannelOption.SO_BACKLOG, 1024)
    				.childHandler(new ChannelInitializer<SocketChannel>() {
    					@Override
    					protected void initChannel(SocketChannel ch) {
    						ChannelPipeline pipeline = ch.pipeline();
    						pipeline.addLast(new CustomInboundHandler1());
    						pipeline.addLast(new CustomInboundHandler2());
    						pipeline.addLast(new CustomOutboundHandler1());
    						pipeline.addLast(new CustomOutboundHandler2());
    						pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
    							@Override
    							protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
    								//buf.readableBytes()获取缓冲区可读的字节数
    								byte[] req = new byte[msg.readableBytes()];
    								// 将缓冲区的字节数组复制到新的byte数组中
    								msg.readBytes(req);
    								String body = new String(req, StandardCharsets.UTF_8);
    
    								System.out.println("Server received: " + body);
    								ByteBuf firstMessage;
    								byte[] req1 = "你好客户端".getBytes();
    								firstMessage = Unpooled.buffer(req1.length);
    								firstMessage.writeBytes(req1);
    								System.out.println("开始给客户端发送消息");
    								ctx.writeAndFlush(firstMessage);
    							}
    							@Override
    							public void channelRegistered(ChannelHandlerContext ctx) {
    								System.out.println("连接上来了");
    								ctx.fireChannelRegistered();
    							}
    						});
    					}
    				});
    
    		ChannelFuture future = b.bind(8888).sync();
    		future.channel().closeFuture().sync();
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    NettyClient

    public class NettyClient {
    
    	public static void main(String[] args) throws Exception {
    		// 启动客户端
    		startClient();
    	}
    
    	public static void startClient() throws InterruptedException {
    		Bootstrap clientBootstrap = new Bootstrap();
    		NioEventLoopGroup group = new NioEventLoopGroup();
    
    		clientBootstrap.group(group)
    				.channel(NioSocketChannel.class)
    				.handler(new ChannelInitializer<SocketChannel>() {
    					@Override
    					protected void initChannel(SocketChannel ch) {
    						ChannelPipeline pipeline = ch.pipeline();
    						pipeline.addLast(new CustomOutboundHandler2());
    						pipeline.addLast(new CustomOutboundHandler1());
    						pipeline.addLast(new CustomInboundHandler2());
    						pipeline.addLast(new CustomInboundHandler1());
    						pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
    							@Override
    							public void channelActive(ChannelHandlerContext ctx) {
    								ByteBuf firstMessage;
    								byte[] req = "你好服务器".getBytes();
    								firstMessage = Unpooled.buffer(req.length);
    								firstMessage.writeBytes(req);
    								System.out.println("------------ 开始发送消息 ------------");
    								ctx.writeAndFlush(firstMessage);
    							}
    
    							@Override
    							protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
    								//buf.readableBytes()获取缓冲区可读的字节数
    								byte[] req = new byte[msg.readableBytes()];
    								// 将缓冲区的字节数组复制到新的byte数组中
    								msg.readBytes(req);
    								String body = new String(req, StandardCharsets.UTF_8);
    
    								System.out.println("Client received: " + body);
    							}
    
    						});
    					}
    				});
    
    		ChannelFuture future = clientBootstrap.connect("localhost", 8888).sync();
    		future.channel().closeFuture().sync();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    测试

    我们这里先启动NettyServer,然后启动 NettyClient

    控制台输出信息如下

    • NettyClient

    • NettyServer

    首先我们可以看到我们在 Netty 添加Handler顺序如下

    pipeline.addLast(new CustomOutboundHandler2());
    						pipeline.addLast(new CustomOutboundHandler1());
    						pipeline.addLast(new CustomInboundHandler2());
    						pipeline.addLast(new CustomInboundHandler1());
    
    • 1
    • 2
    • 3
    • 4

    client Handler 执行顺序

    所以对于client执行的顺序是CustomOutboundHandler1CustomOutboundHandler2CustomInboundHandler2CustomInboundHandler1

    通俗的讲就是我们client发送消息的时候就是如下一个顺序

    只会执行OutboundHandler

    client接受到server返回的消息后的handler执行顺序如下

    只会执行InboundHandler

    server Handler 执行顺序

    服务端的顺序也是类似,由于client发送消息到server,所以对于服务端来说是接受消息

    所以服务端首先执行的顺序就是接受消息

    所以打印了

    CustomInboundHandler1 - channelRead

    CustomInboundHandler2 - channelRead

    Server received: 你好服务器

    server在处理完数据后会返回消息给client,这里server就是发送数据

    所以就是这个顺序

    CustomOutboundHandler2 - write

    CustomOutboundHandler1 - write

    图解handler顺序

    基于上面的demo演示,我们就总结了一个handler的执行顺序

    横看就是这个顺序,当然我们也可以通过上面的箭头看。

    如果是发送消息 就是 从下到上

    如果是接受消息就是从上到下

    再谈ctx.channel().writeAndFlushctx.writeAndFlush

    现在我们所有的handerl一般调用的方法都是
    ctx.channel().writeAndFlush

    那么ctx.channel().writeAndFlushctx.writeAndFlush方法有什么区别呢

    我们知道ChannelPipeline是一个双向链表结构。
    比如我们server添加handler的顺序如下

    ch.pipeline().addLast(new InboundHandler1());  
    ch.pipeline().addLast(new InboundHandler2()); 
    ch.pipeline().addLast(new OutboundHandler1());  
    ch.pipeline().addLast(new OutboundHandler2());
    
    • 1
    • 2
    • 3
    • 4

    那么在链表中的顺序就是如下

    head->in1->in2->out1->out2->tail
    
    • 1

    如果我们在InboundHandler2中执行ctx.channel().writeAndFlush

    那么输出的顺序就是

    InboundHandler1 
    InboundHandler2 
    OutboundHandler2
    OutboundHandler1
    
    • 1
    • 2
    • 3
    • 4

    如果在InboundHandler2中执行ctx.writeAndFlush,执行顺序是

    InboundHandler1 
    InboundHandler2 
    
    • 1
    • 2

    可以看到执行ctx.writeAndFlush他不会从tail节点向前找OutboundHandler

    ctx.channel().writeAndFlush则是从tail节点开始向前找OutboundHandler

    如果我们将handler顺序修改下改成如下

    ch.pipeline().addLast(new OutboundHandler1());  
    ch.pipeline().addLast(new OutboundHandler2());  
    ch.pipeline().addLast(new InboundHandler1());  
    ch.pipeline().addLast(new InboundHandler2());
    
    • 1
    • 2
    • 3
    • 4

    然后在InboundHandler2执行ctx.writeAndFlush,由于InboundHandler2在链表tail节点的前一个节点,所以输出的结果和执行
    ctx.channel().writeAndFlush一样
    都是

    InboundHandler1
    InboundHandler2
    OutboundHandler2 
    OutboundHandler1
    
    • 1
    • 2
    • 3
    • 4

    总结

    从上面大量的用例我们可以看出如下规律

    1. ChannelPipeline是双向链表结构,包含ChannelInboundHandlerChannelOutboundHandler两种处理器.也有处理器既是ChannelInboundHandler又是ChannelOutboundHandler,但是也属于这两种
    2. ctx.writeAndFlush只会从当前的handler位置开始,往前找outbound执行
    3. ctx.pipeline().writeAndFlushctx.channel().writeAndFlush会从tail的位置开始,往前找outbound执行
    4. InboundHandler的执行顺序一般是从上到下Head -> Tail
    5. OutboundHandler的执行顺序一般是从下到上,Tail -> Head

    源码

    • 源码: https://github.com/weihubeats/weihubeats_demos/tree/master/java-demos/netty-demo/src/test/java/com/weihubeats/netty/demo/channelPipeline

    参考

  • 相关阅读:
    酒店公共广播背景音乐-基于互联网+的酒店IP网络广播系统设计
    kylin使用心得
    PyTorch深度学习实践1——线性回归和Logistic回归
    hive调优原理详解:案例解析参数配置(第17天)
    网络协议-TCP、UDP区别及TCP三次握手、四次挥手
    基于hydra库实现yaml配置文件的读取(支持命令行参数)
    Tuxera2023最新版本新功能特性
    Flexmonster Pivot Table 2.9.59 Crack
    vuex详解
    【SQL 初级语法 6】 SQL 高级处理
  • 原文地址:https://blog.csdn.net/qq_42651904/article/details/134325940