• 11.netty入站与出站(处理器链调用机制)


    【README】

    • 1.本文源代码总结自 B站《netty-尚硅谷》;
    • 2.本文部分内容总结自 https://www.baeldung.com/netty
    • 3.本文主要介绍了通道管道中多个入栈处理器与多个出站处理器如何执行?并用代码演示执行顺序;

    补充:文末附带了 log4j整合到netty的配置;


    【1】事件与处理器

    1)概述:

    • netty使用了一种事件驱动的应用范式,因此数据处理的管道是一个经过处理器的事件链。事件和处理器可以关联到 inbound入站 与 outbound出站 数据流。

    2)入站事件如下(inbound):

    • channel通道激活与失活(activation and deactivation)
    • 读操作事件
    • 异常事件;
    • 用户事件;

    3)出站事件很简单,如下(outbound):

    • 打开与关闭连接;
    • 写出或刷新缓冲区数据

    4)netty应用包含许多网络和应用逻辑事件及它们的处理器。

    • 通道事件处理器的基本接口是 ChannelHandler,其子类有 ChannelOutboundHandler 和 ChannelInboundHandler ;

    【2】处理器链

    1)入站和出栈事件都会经过预设的处理器链(多个处理器);

    • 即入站事件经过 入站处理器;出站事件经过出站处理器;多个处理器形成一个链或管道;

    2)处理器举例:

    • 网络传输全是字节形式,而业务逻辑处理是对象形式,所以需要编码器把对象转字节,需要解码器把字节转对象
      • ByteToMessageDecoder 字节转消息(对象)解码器;
      • MessageToByteEncoder 消息(对象)转字节编码器;
    • 业务逻辑处理器(如加工,统计,入库,消息转发等);

    3)以客户端服务器模式介绍入站与出站处理器的事件处理过程

    【图解】

     客户端的处理器有:

    • 解码处理器;
    • 编码处理器;
    • 客户端业务处理器;

    服务端的处理器有:

    • 解码处理器;
    • 编码处理器;
    • 服务器业务处理器;

    补充:多个处理器封装到通道管道 ChannelPipeline;


    【3】处理器链调用机制代码实现

    1)需求描述:

    • 自定义编码器和解码器实现客户端与服务器间的数据传输;

    2)通道管道ChannelPipeline 可以封装多个处理器;其处理器执行顺序特别重要(前后关系特别重要,如入栈解码处理器要第1个执行,又如出站编码器要最后一个执行),否则客户端与服务器将无法通信(因为事件或数据要经过所有的处理器);类似于如下:

    1. for (event event : events) {
    2. handler1(event);
    3. handler2(event);
    4. handler3(event);
    5. }

    3)入站与出站处理器执行顺序:

    3.1)服务器初始化器,添加处理器到管道;

     3.2)客户端初始化器:


      【3.1】服务器

    1)服务器:用于监听网络端口,处理请求;

    1. /**
    2. * @Description 测试handler链的服务器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月10日
    6. */
    7. public class NettyServerForHandlerChain80 {
    8. public static void main(String[] args) throws InterruptedException {
    9. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    10. EventLoopGroup workerGroup = new NioEventLoopGroup();
    11. try {
    12. ServerBootstrap serverBootstrap = new ServerBootstrap();
    13. serverBootstrap.group(bossGroup, workerGroup)
    14. .channel(NioServerSocketChannel.class)
    15. .childHandler(new NettyServerInitializer()); // 自定义一个初始化类
    16. // 自动服务器
    17. ChannelFuture channelFuture = serverBootstrap.bind(8089).sync();
    18. System.out.println("服务器启动成功");
    19. // 监听关闭
    20. channelFuture.channel().closeFuture().sync();
    21. } finally {
    22. bossGroup.shutdownGracefully();
    23. workerGroup.shutdownGracefully();
    24. }
    25. }
    26. }

    2)通道初始化器:把多个处理器添加到通道管道;

    1. /**
    2. * @Description 初始化器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月10日
    6. */
    7. public class NettyServerInitializer extends ChannelInitializer {
    8. @Override
    9. protected void initChannel(SocketChannel ch) throws Exception {
    10. ChannelPipeline pipeline = ch.pipeline();
    11. // 入站handler,把字节型数据解码为long型
    12. // pipeline.addLast(new MyByte2LongDecoder()); // MyByte2LongDecoder 与 MyByte2LongDecoder2 等价
    13. pipeline.addLast(new MyByte2LongDecoder2());
    14. // 出站handler, 把long型数据编码为字节(编码器)
    15. pipeline.addLast(new MyLong2ByteEncoder());
    16. // 添加业务逻辑handler
    17. pipeline.addLast(new NettyServerHandler());
    18. System.out.println("NettyServerInitializer.initChannel 执行成功.");
    19. }
    20. }

    3)服务端业务处理器:业务逻辑处理; 

    1. /**
    2. * @Description nety服务器handler
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月10日
    6. */
    7. public class NettyServerHandler extends SimpleChannelInboundHandler {
    8. // 被调用多次
    9. @Override
    10. protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
    11. System.out.println("从客户端" + ctx.channel().remoteAddress() + "读取到long" + msg);
    12. // 给客户端回送消息
    13. ctx.writeAndFlush(98765L);
    14. }
    15. @Override
    16. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    17. cause.printStackTrace();
    18. ctx.close();
    19. }
    20. }

     【3.2】客户端

    1)客户端:建立与服务器的连接;

    1. /**
    2. * @Description netty客户端
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月10日
    6. */
    7. public class NettyClientForHandlerChain81 {
    8. public static void main(String[] args) throws InterruptedException {
    9. EventLoopGroup group = new NioEventLoopGroup();
    10. try {
    11. Bootstrap bootstrap = new Bootstrap();
    12. bootstrap.group(group)
    13. .channel(NioSocketChannel.class)
    14. .handler(new NettyClientInitializer()); // 自定义一个初始化类
    15. // 连接服务器
    16. ChannelFuture channelFuture = bootstrap.connect("localhost", 8089).sync();
    17. channelFuture.channel().closeFuture().sync();
    18. } finally {
    19. group.shutdownGracefully();
    20. }
    21. }
    22. }

    2)通道初始化器:添加多个处理器到通道管道

    1. /**
    2. * @Description netty客户端初始化器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月10日
    6. */
    7. public class NettyClientInitializer extends ChannelInitializer {
    8. @Override
    9. protected void initChannel(SocketChannel ch) throws Exception {
    10. ChannelPipeline pipeline = ch.pipeline();
    11. // 出站handler,把long型数据解码为字节型
    12. pipeline.addLast(new MyLong2ByteEncoder());
    13. // 入站handler,把字节型数据解码为long型
    14. pipeline.addLast(new MyByte2LongDecoder());
    15. // 添加一个自定义handler(入站),处理业务逻辑
    16. pipeline.addLast(new NettyClientHandler());
    17. }
    18. }

    3)客户端业务处理器:业务逻辑处理 ;

    1. /**
    2. * @Description 客户端处理器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月10日
    6. */
    7. public class NettyClientHandler extends SimpleChannelInboundHandler {
    8. @Override
    9. protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
    10. System.out.println("得到服务器ip = " + ctx.channel().remoteAddress());
    11. System.out.println("收到服务器消息 = " + msg);
    12. }
    13. @Override
    14. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    15. System.out.println("NettyClientHandler 发送数据");
    16. // 1 分析
    17. // 1.1 abcdefgabcdefgab 是16个字节, long型占8个字节,所以服务器需要解码2次,每次解码8个字节
    18. // 1.2 该处理器的前一个handler是 MyLong2ByteEncoder,
    19. // 1.3 MyLong2ByteEncoder 的父类是 MessageToByteEncoder
    20. // 1.4 MessageToByteEncoder.write()方法通过acceptOutboundMessage判断当前msg是否为要处理的数据类型;
    21. // 若不是,则跳过encode方法, 否则执行对应的encode 方法(处理方法)
    22. // 客户端发送一个ByteBuf,不走Long型编码器
    23. ctx.writeAndFlush(Unpooled.copiedBuffer("abcdefgabcdefgab", StandardCharsets.UTF_8));
    24. // 客户端发送一个Long,走Long型编码器
    25. // ctx.writeAndFlush(123456L); // 发送一个long
    26. }
    27. }

    【3.3】编码器与解码器处理器

    1)字节转 Long型的解码器处理器

    1. /**
    2. * @Description 字节转long的解码器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月10日
    6. */
    7. public class MyByte2LongDecoder extends ByteToMessageDecoder {
    8. /**
    9. * @description decode 会根据接收的数据,被调用多次,直到确定没有新元素被添加到list为止, 或者 ByteBuf没有更多的可读字节为止;
    10. * 如果list out 不为空,就会将list的内容传递给下一个 ChannelInboundHandler,
    11. * 且下一个 ChannelInboundHandler的处理方法也会被调用多次
    12. * @param ctx 处 理器上下文
    13. * @param in 字节输入缓冲
    14. * @param out 集合,把处理后的数据传给下一个 ChannelInboundHandler
    15. *
    16. * @return
    17. * @author xiao tang
    18. * @date 2022/9/10
    19. */
    20. @Override
    21. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
    22. System.out.println("decode解码");
    23. // long 8个字节,需要判断有8个字节,才能读取一个long
    24. if (in.readableBytes() >= 8) {
    25. out.add(in.readLong());
    26. }
    27. }
    28. }
    29.  一个 解码器 变体;

      1. /**
      2. * @Description 字节转long的解码器
      3. * @author xiao tang
      4. * @version 1.0.0
      5. * @createTime 2022年09月10日
      6. */
      7. public class MyByte2LongDecoder2 extends ReplayingDecoder {
      8. /**
      9. * @description decode 会根据接收的数据,被调用多次,直到确定没有新元素被添加到list为止, 或者 ByteBuf没有更多的可读字节为止;
      10. * 如果list out 不为空,就会将list的内容传递给下一个 ChannelInboundHandler,
      11. * 且下一个 ChannelInboundHandler的处理方法也会被调用多次
      12. * @param ctx 处 理器上下文
      13. * @param in 字节输入缓冲
      14. * @param out 集合,把处理后的数据传给下一个 ChannelInboundHandler
      15. *
      16. * @return
      17. * @author xiao tang
      18. * @date 2022/9/10
      19. */
      20. @Override
      21. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
      22. System.out.println("MyByte2LongDecoder2被调用, decode解码");
      23. // ReplayingDecoder,无需判断字节流是否足够读取,内部会进行处理判断
      24. // if (in.readableBytes() >= 8) { // 无需判断
      25. out.add(in.readLong());
      26. // }
      27. }
      28. }
      29. 2)Long型转字节的编码器处理器

        1. /**
        2. * @Description long转字节的编码器
        3. * @author xiao tang
        4. * @version 1.0.0
        5. * @createTime 2022年09月10日
        6. */
        7. public class MyLong2ByteEncoder extends MessageToByteEncoder {
        8. @Override
        9. protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
        10. System.out.println("MyLong2ByteEncoder.encode 被调用");
        11. System.out.println("MyLong2ByteEncoder msg = " + msg);
        12. out.writeLong(msg);
        13. }
        14. }

        【3.4】运行效果

        1)客户端:

        1. decode解码
        2. 得到服务器ip = localhost/127.0.0.1:8089
        3. 收到服务器消息 = 98765
        4. decode解码
        5. 得到服务器ip = localhost/127.0.0.1:8089
        6. 收到服务器消息 = 98765

        2)服务器:

        1. MyByte2LongDecoder2被调用, decode解码
        2. 从客户端/127.0.0.1:56272读取到long7017280452245743457
        3. MyLong2ByteEncoder.encode 被调用
        4. MyLong2ByteEncoder msg = 98765
        5. MyByte2LongDecoder2被调用, decode解码
        6. 从客户端/127.0.0.1:56272读取到long7089620625083818338
        7. MyLong2ByteEncoder.encode 被调用
        8. MyLong2ByteEncoder msg = 98765

        【4】 log4j整合到 netty

        1)引入log4j maven依赖;

        1. <dependency>
        2. <groupId>log4jgroupId>
        3. <artifactId>log4jartifactId>
        4. <version>1.2.17version>
        5. dependency>
        6. <dependency>
        7. <groupId>org.slf4jgroupId>
        8. <artifactId>slf4j-apiartifactId>
        9. <version>1.7.25version>
        10. dependency>
        11. <dependency>
        12. <groupId>org.slf4jgroupId>
        13. <artifactId>slf4j-log4j12artifactId><version>1.7.25version>
        14. <scope>testscope>
        15. dependency>
        16. <dependency>
        17. <groupId>org.slf4jgroupId>
        18. <artifactId>slf4j-simpleartifactId><version>1.7.25version>
        19. <scope>testscope>
        20. dependency>

        2)配置log4j;

        1. log4j.rootLogger=DEBUG, stdout
        2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        4. log4j.appender.stdout.layout.ConversionPattern=[%p] %C{1} - %m%n

        3)效果:

      30. 相关阅读:
        【Codeforces】Codeforces Round 905 (Div. 3)
        让golang程序生成coredump文件并进行调试
        电子元器件包装类型
        国际法试题及答案
        既不是研发顶尖高手,也不是销售大牛,为何偏偏获得 2 万 RMB 的首个涛思文化奖?
        DASCTF X CBCTF 2023|无畏者先行
        计算机网络复习总结4
        Redis应用问题解决
        Vue date与el的两种写法
        Arthas快速入门
      31. 原文地址:https://blog.csdn.net/PacosonSWJTU/article/details/126796461