• Netty编解码器和handler的调用机制


    基本说明

    1. netty的组件设计:Netty的主要组件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等
    2. ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的
    3. ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理,反之则称为入站的
      在这里插入图片描述

    编码解码

    1. 当Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站消息,它会被编码成字节。
    2. Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。

    解码器-ByteToMessageDecoder

    1. 关系继承图
      在这里插入图片描述

    2. 由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理.

    3. 一个关于ByteToMessageDecoder实例分析

    public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
    {
    if (in.readableBytes() >= 4) {
    out.add(in.readInt());
    }
    }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    说明:

    1. 这个例子,每次入站从ByteBuf中读取4字节,将其解码为一个int,然后将它添加到下一个List中。当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是 否具有足够的数据

    案例

    客户端

    package com.jhj.netty.inboundhandlerandoutboundhandler;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class MyClient {
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
    
            try{
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class)
                        .handler(new MyClientInitializer());//自定义一个初始化类
    
                ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
                channelFuture.channel().closeFuture().sync();
            }finally {
                group.shutdownGracefully();
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    package com.jhj.netty.inboundhandlerandoutboundhandler;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("MyClientHandler 发送数据");
    
            ctx.writeAndFlush(123456L);
        }
    
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
            System.out.println(msg);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    package com.jhj.netty.inboundhandlerandoutboundhandler;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    
    public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            //加入出站的handler 对数据进行一个编码
            pipeline.addLast(new MyLongToByteEncoder());
            pipeline.addLast(new MyByteToLongDecoder());
            //加入一个自定义的handler,处理业务
            pipeline.addLast(new MyClientHandler());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    服务端

    package com.jhj.netty.inboundhandlerandoutboundhandler;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class Myserver {
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            
            try{
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                        .childHandler(new MyServerInitializer());//自定义一个初始化类
    
                ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
                channelFuture.channel().closeFuture().sync();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    package com.jhj.netty.inboundhandlerandoutboundhandler;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
            System.out.println("读取到从客户端"+msg);
    
            //给客户端发送一个long
            ctx.writeAndFlush(98765l);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    package com.jhj.netty.inboundhandlerandoutboundhandler;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    
    public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            //入站的Handler进行解码 MyByteToLongDecoder
            //MessageToByteEncoder 和 ByteToMessageDecoder不冲突
            pipeline.addLast(new MyByteToLongDecoder());
            pipeline.addLast(new MyLongToByteEncoder());
            //自定义的handler 处理业务罗技
            pipeline.addLast(new MyServerHandler());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    编解码器

    package com.jhj.netty.inboundhandlerandoutboundhandler;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    import java.util.List;
    
    public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
            System.out.println("encode 被调用");
            System.out.println("msg="+msg);
            out.writeLong(msg);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    package com.jhj.netty.inboundhandlerandoutboundhandler;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    import java.util.List;
    
    public class MyByteToLongDecoder extends ByteToMessageDecoder {
        /**
         *decode 会根据接收的数据,被调用多次,知道确定没有新的元素被添加到list,或者是ByteBuf 没有更多的可读字节为止
         * 如果list out不为空 就会将list的内容传递给下一个channelinboundhandler处理,该处理器的方法也会调用多次
         *
         * @param ctx   上下文        the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
         * @param in    入站的ByteBuf        the {@link ByteBuf} from which to read data
         * @param out   List集合,将解码后的数据传给下一个handler        the {@link List} to which decoded messages should be added
         * @throws Exception
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    
            System.out.println("decoder被调用");
            //因为long8个字节,需要判断有8个字节才能读取一个long
            if (in.readableBytes()>=8){
                out.add(in.readLong());
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    总结

    不论解码器handler 还是 编码器handler 即接收的消息类型必须与待处理的消息类型一致,否则该handler不会被执行
    在解码器进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够 ,否则接收到的结果和期望结果可能不一致

    其他解码器

    解码器-ReplayingDecoder

    1. public abstract class ReplayingDecoder extends ByteToMessageDecoder
    2. ReplayingDecoder扩展了ByteToMessageDecoder类,使用这个类,我们不必调用readableBytes()方法。参数S指定了用户状态管理的类型,其中Void代表不需要状态管理
    3. 应用实例:使用ReplayingDecoder 编写解码器,对前面的案例进行简化 [案例演示]
    package com.jhj.netty.inboundhandlerandoutboundhandler;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import io.netty.handler.codec.ReplayingDecoder;
    
    import java.util.List;
    
    public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
        /**
         *decode 会根据接收的数据,被调用多次,知道确定没有新的元素被添加到list,或者是ByteBuf 没有更多的可读字节为止
         * 如果list out不为空 就会将list的内容传递给下一个channelinboundhandler处理,该处理器的方法也会调用多次
         *
         * @param ctx   上下文        the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
         * @param in    入站的ByteBuf        the {@link ByteBuf} from which to read data
         * @param out   List集合,将解码后的数据传给下一个handler        the {@link List} to which decoded messages should be added
         * @throws Exception
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    
            System.out.println("decoder被调用");
    
            //不需要判断数据是否足够读取内部会进行哦按段
            out.add(in.readLong());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    1. ReplayingDecoder使用方便,但它也有一些局限性:
      • 并不是所有的 ByteBuf 操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException。
      • ReplayingDecoder 在某些情况下可能稍慢于 ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢

    其它解码器

    1. LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
    2. DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
    3. HttpObjectDecoder:一个HTTP数据的解码器
    4. LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。

    其它编码器

    在这里插入图片描述

    作者声明

    如有问题,欢迎指正!
    
    • 1
  • 相关阅读:
    软考高级之系统架构师之计算机硬件基础与嵌入式系统
    org.postgresql.util.PSQLException: Bad value for type long
    Goland常用快捷键设置
    对IP协议概念以及IP地址的概念进行简单整理
    中国信息通信研究院发布《中国金融科技生态白皮书》(2023)
    记一次PDU接室外监控溶解事故
    windows殺死端口
    Dataloader有哪些使用方法
    git版本管理的使用
    仓颉语言HelloWorld内测【仅需三步】
  • 原文地址:https://blog.csdn.net/weixin_45247019/article/details/126796002