• NIO- Handler业务处理器


    Handler业务处理器

    Reactor反应器经典模型中,反应器查询到IO事件后,分发到Handler业务处理器,由Handler完成IO操作和业务处理。整个的IO处理操作环节包括:从通道读数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端,如图6-8所示。

    在这里插入图片描述前后两个环节,从通道读数据包和由通道发送到对端,由Netty的底层负责完成,不需要用户程序负责。用户程序主要在Handler业务处理器中,Handler涉及的环节为:数据包解码、业务处理、目标数据编码、把数据包写到通道中。

    前面已经介绍过,从应用程序开发人员的角度来看,有入站和出站两种类型操作。

    · 入站处理,触发的方向为:自底向上,Netty的内部(如通道)到ChannelInboundHandler入站处理器。
    · 出站处理,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。

    按照这种方向来分,前面数据包解码、业务处理两个环节——属于入站处理器的工作;后面目标数据编码、把数据包写到通道中两个环节——属于出站处理器的工作。

    1 ChannelInboundHandler通道入站处理器

    当数据或者信息入站到Netty通道时,Netty将触发入站处理器ChannelInboundHandler所对应的入站API,进行入站操作处理。ChannelInboundHandler的主要操作,如图6-9所示,具体的介绍如下:

    在这里插入图片描述
    1.channelRegistered
    当通道注册完成后,Netty会调用fireChannelRegistered,触发通道注册事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRegistered方法,会被调用到。

    2.channelActive
    当通道激活完成后,Netty会调用fireChannelActive,触发通道激活事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelActive方法,会被调用到。

    3.channelRead
    当通道缓冲区可读,Netty会调用fireChannelRead,触发通道可读事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRead方法,会被调用到。

    4.channelReadComplete

    当通道缓冲区读完,Netty会调用fireChannelReadComplete,触发通道读完事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelReadComplete方法,会被调用到。

    5.channelInactive

    当连接被断开或者不可用,Netty会调用fireChannelInactive,触发连接不可用事件。通道会启动对应的流水线处理,在通道注册过的入站处理器Handler的channelInactive方法,会被调用到。

    6.exceptionCaught

    当通道处理过程发生异常时,Netty会调用fireExceptionCaught,触发异常捕获事件。通道会启动异常捕获的流水线处理,在通道注册过的处理器Handler的exceptionCaught方法,会被调用到。注意,这个方法是在通道处理器中ChannelHandler定义的方法,入站处理器、出站处理器接口都继承到了该方法。

    上面介绍的并不是ChanneInboundHandler的全部方法,仅仅介绍了其中几种比较重要的方法。在Netty中,它的默认实现为ChannelInboundHandlerAdapter,在实际开发中,只需要继承这个ChannelInboundHandlerAdapter默认实现,重写自己需要的方法即可。

    2 ChannelOutboundHandler通道出站处理器

    当业务处理完成后,需要操作Java NIO底层通道时,通过一系列的ChannelOutboundHandler通道出站处理器,完成Netty通道到底层通道的操作。比方说建立底层连接、断开底层连接、写入底层Java NIO通道等。ChannelOutboundHandler接口定义了大部分的出站操作,如图6-10所示,具体的介绍如下:
    在这里插入图片描述再强调一下,出站处理的方向:是通过上层Netty通道,去操作底层Java IO通道。主要出站(Outbound)的操作如下:

    1.bind

    监听地址(IP+端口)绑定:完成底层Java IO通道的IP地址绑定。如果使用TCP传输协议,这个方法用于服务器端。

    2.connect

    连接服务端:完成底层Java IO通道的服务器端的连接操作。如果使用TCP传输协议,这个方法用于客户端。

    3.write

    写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。此方法仅仅是触发一下操作而已,并不是完成实际的数据写入操作。

    4.flush

    腾空缓冲区中的数据,把这些数据写到对端:将底层缓存区的数据腾空,立即写出到对端。

    5.read

    从底层读数据:完成Netty通道从Java IO通道的数据读取。

    6.disConnect

    断开服务器连接:断开底层Java IO通道的服务器端连接。如果使用TCP传输协议,此方法主要用于客户端。

    7.close

    主动关闭通道:关闭底层的通道,例如服务器端的新连接监听通道。

    上面介绍的并不是ChannelOutboundHandler的全部方法,仅仅介绍了其中几个比较重要的方法。在Netty中,它的默认实现为ChannelOutboundHandlerAdapter,在实际开发中,只需要继承这个ChannelOutboundHandlerAdapter默认实现,重写自己需要的方法即可。

    3 ChannelInitializer通道初始化处理器

    在前面已经讲到,通道和Handler业务处理器的关系是:一条Netty的通道拥有一条Handler业务处理器流水线,负责装配自己的Handler业务处理器。装配Handler的工作,发生在通道开始工作之前。现在的问题是:如果向流水线中装配业务处理器呢?这就得借助通道的初始化类——ChannelInitializer。

    首先回顾一下NettyDiscardServer丢弃服务端的代码,在给接收到的新连接装配Handler业务处理器时,使用childHandler()方法设置了一个ChannelInitializer实例:

            //5 装配子通道流水线
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                //有连接到达时会创建一个通道
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 流水线管理子通道中的Handler业务处理器
                    // 向子通道流水线添加一个Handler业务处理器
                    ch.pipeline().addLast(new NettyDiscardHandler());
                }
            });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    上面的ChannelInitializer也是通道初始化器,属于入站处理器的类型。在示例代码中,使用了ChannelInitializer的initChannel() 方法。它是何方神圣呢?

    initChannel()方法是ChannelInitializer定义的一个抽象方法,这个抽象方法需要开发人员自己实现。在父通道调用initChannel()方法时,会将新接收的通道作为参数,传递给initChannel()方法。initChannel()方法内部大致的业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。

    4 ChannelInboundHandler的生命周期的实践案例

    为了弄清Handler业务处理器的各个方法的执行顺序和生命周期,这里定义一个简单的入站Handler处理器——InHandlerDemo。这个类继承于ChannelInboundHandlerAdapter适配器,它实现了基类的大部分入站处理方法,并在每一个方法的实现代码中都加上必要的输出信息,以便于观察方法是否被执行到。

    InHandlerDemo的代码如下:

            package com.crazymakercircle.netty.handler;
            //...
            public class InHandlerDemo extends ChannelInboundHandlerAdapter {
                @Override
                public void handlerAdded(ChannelHandlerContextctx) throws Exception {
                  Logger.info("被调用:handlerAdded()");
                  super.handlerAdded(ctx);
                }
                @Override
                public void channelRegistered(ChannelHandlerContextctx) throws Exception
              {
                  Logger.info("被调用:channelRegistered()");
                  super.channelRegistered(ctx);
                }
                @Override
                public void channelActive(ChannelHandlerContextctx) throws Exception {
                  Logger.info("被调用:channelActive()");
                  super.channelActive(ctx);
                }
                @Override
                public void channelRead(ChannelHandlerContextctx, Object msg) throws
        Exception {
                  Logger.info("被调用:channelRead()");
                  super.channelRead(ctx, msg);
                }
                @Override
                public void channelReadComplete(ChannelHandlerContextctx) throws
        Exception {
                  Logger.info("被调用:channelReadComplete()");
                  super.channelReadComplete(ctx);
                }
                @Override
                public void channelInactive(ChannelHandlerContextctx) throws Exception {
                  Logger.info("被调用:channelInactive()");
                  super.channelInactive(ctx);
                }
                @Override
                public void channelUnregistered(ChannelHandlerContextctx) throws
        Exception {
                    Logger.info("被调用: channelUnregistered()");
                    super.channelUnregistered(ctx);
                }
                @Override
                public void handlerRemoved(ChannelHandlerContextctx) throws Exception {
                    Logger.info("被调用:handlerRemoved()");
                    super.handlerRemoved(ctx);
                }
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    为了演示这个入站处理器,需要编写一个单元测试代码:将上面的Inhandler入站处理器加入到一个EmbeddedChannel嵌入式通道的流水线中。接着,通过writeInbound方法写入ByteBuf数据包。InHandlerDemo作为一个入站处理器,会处理从通道到流水线的入站报文——ByteBuf数据包。单元测试的代码如下:

            package com.crazymakercircle.netty.handler;
            //...
            public class InHandlerDemoTester {
                @Test
                public void testInHandlerLifeCircle() {
                  final InHandlerDemo inHandler = new InHandlerDemo();
                  //初始化处理器
                  ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                      protected void initChannel(EmbeddedChannelch) {
                          ch.pipeline().addLast(inHandler);
                      }
                  };
                  //创建嵌入式通道
                  EmbeddedChannel channel = new EmbeddedChannel(i);
                  ByteBuf buf = Unpooled.buffer();
                  buf.writeInt(1);
                  //模拟入站,写一个入站数据包
                  channel.writeInbound(buf);
                  channel.flush();
                  //模拟入站,再写一个入站数据包
                  channel.writeInbound(buf);
                  channel.flush();
                  //通道关闭
                  channel.close();
                  try {
                      Thread.sleep(Integer.MAX_VALUE);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                }
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    运行上面的测试用例,输出的结果具体如下:

            [main|InHandlerDemo:handlerAdded]:被调用:handlerAdded()
            [main|InHandlerDemo:channelRegistered]:被调用:channelRegistered()
            [main|InHandlerDemo:channelActive]:被调用:channelActive()
            [main|InHandlerDemo:channelRead]:被调用:channelRead()
            [main|InHandlerDemo:channelReadComplete]:被调用:channelReadComplete()
            [main|InHandlerDemo:channelRead]:被调用:channelRead()
            [main|InHandlerDemo:channelReadComplete]:被调用:channelReadComplete()
            [main|InHandlerDemo:channelInactive]:被调用:channelInactive()
            [main|InHandlerDemo:channelUnregistered]:被调用: channelUnregistered()
            [main|InHandlerDemo:handlerRemoved]:被调用:handlerRemoved()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在讲解上面的方法之前,首先对方法进行分类:
    (1)生命周期方法,
    (2)入站回调方法。

    上面的几个方法中,channelRead、channelReadComplete是入站处理方法;而其他的6个方法是入站处理器的周期方法。

    从输出的结果可以看到,ChannelHandler中的回调方法的执行顺序为:handlerAdded() →channelRegistered() → channelActive() → 入站方法回调→ channelInactive() → channelUnregistered()→ handlerRemoved()。其中,读数据的入站回调为:channelRead() → channelReadComplete();入站方法会多次调用,每一次有ByteBuf数据包入站都会调用到。

    除了两个入站回调方法外,其余的6个方法都和ChannelHandler的生命周期有关,具体的介绍如下:

    (1)handlerAdded() :当业务处理器被加入到流水线后,此方法被回调。也就是在完成ch.pipeline().addLast(handler)语句之后,会回调handlerAdded()。
    (2)channelRegistered():当通道成功绑定一个NioEventLoop线程后,会通过流水线回调所有业务处理器的channelRegistered()方法。
    (3)channelActive():当通道激活成功后,会通过流水线回调所有业务处理器的channelActive()方法。通道激活成功指的是,所有的业务处理器添加、注册的异步任务完成,并且NioEventLoop线程绑定的异步任务完成。
    (4)channelInactive():当通道的底层连接已经不是ESTABLISH状态,或者底层连接已经关闭时,会首先回调所有业务处理器的channelInactive()方法。
    (5)channelUnregistered():通道和NioEventLoop线程解除绑定,移除掉对这条通道的事件处理之后,回调所有业务处理器的channelUnregistered ()方法。
    (6)handlerRemoved():最后,Netty会移除掉通道上所有的业务处理器,并且回调所有的业务处理器的handlerRemoved()方法。

    在上面的6个生命周期方法中,前面3个在通道创建的时候被先后回调,后面3个在通道关闭的时候会先后被回调。

    除了生命周期的回调,就是入站和出站处理的回调。对于Inhandler入站处理器,有两个很重要的回调方法为:
    (1)channelRead():有数据包入站,通道可读。流水线会启动入站处理流程,从前向后,入站处理器的channelRead()方法会被依次回调到。
    (2)channelReadComplete():流水线完成入站处理后,会从前向后,依次回调每个入站处理器的channelReadComplete()方法,表示数据读取完毕。

    至此,大家对ChannelInboundHandler的生命周期和入站业务处理,有一个非常清楚的了解。

    上面的入站处理器实践案例InHandlerDemo,演示的是入站处理器的工作流程。对于出站处理器ChannelOutboundHandler的生命周期以及回调的顺序,与入站处理器是大致相同的。不同的是,出站处理器的业务处理方法。

    在实践案例的Maven源代码工程中,有一个关于出站处理器的实践案例——OutHandlerDemo。它的代码、包名和上面的类似,大家可以自己去运行和学习,这里就不再赘述。

  • 相关阅读:
    LVS负载均衡集群和NAT模式群集部署
    Oracle expdp/impdp 及 exp/imp 命令详解
    Navicat 16.1 的新功能 - 第 1 部分
    Docker容器相关命令
    苹果智能戒指再曝光,或将配合头显来应用,支持无线充电
    Java--MyBatis传入参数parameterType
    用pytorch给深度学习加速:正交与谱归一化技术
    基于Citespace、vosviewer、R语言的文献计量学可视化分析技术及全流程文献可视化SCI论文高效写作
    java计算机毕业设计红色主题旅游网站MyBatis+系统+LW文档+源码+调试部署
    【动态库和静态库】
  • 原文地址:https://blog.csdn.net/yitian881112/article/details/127644410