• Netty 编解码与自定义编解码


    1、Netty为什么要提供编解码框架

    作为一个高性能NIO框架,编解码框架是Netty的重要组成部分。在Netty中,从网络读取的Inbound消息,需要通过解码,将二进制报文转换成应用协议或者业务消息,才能被上层应用逻辑识别和处理;同理,用户发送到网络的OutBound业务消息需要经过编码转换成二进制字节数组(对于Netty就是ByteBuf),才能发送到网络对端。

    2、Netty 编解码原理

    Netty提供了一系列实用的编码解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由已知解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。

    ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler(ChannelOutboundHandler调用是从tail到head方向逐个调用每个handler的逻辑),并被这些Handler处理,反之则称为入站的,入站只调用pipeline里的ChannelInboundHandler逻辑(ChannelInboundHandler调用是从head到tail方向逐个调用每个handler的逻辑)。
    在这里插入图片描述
    当通过Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站消息:它会被编码成字节;

    Netty提供了很多编解码器,比如编解码字符串的StringEncoder和StringDecoder,编解码对象的ObjectEncoder和ObjectDecoder等。
    在这里插入图片描述

    3、decode(解码器)原理

    什么是Netty的解码器?

    • 它是一个InBound入站处理器,负责处理“入站数据”。
    • 它能将上一站Inbound入站处理器传过来的输入(Input) 数据进行解码或者格式转换,然后发送到下一站Inbound入站处理器。

    解码器的职责为:将输入类型为ByteBuf的数据进行解码,输出一个一个的Java POJO对象。

    Netty中的解码器,都直接或者间接地实现了入站处理接口ChannelInboundHandler。

    3.1、ByteToMessageDecoder

    ByteToMessageDecoder是一个非常重要的解码器基类,是一个抽象类,实现了解码处理的基础逻辑和流程。ByteToMessageDecoder继承自ChannelInboundHandlerAdapter适配器,是一个入站处理器,用于完成从ByteBuf到Java POJO对象的解码功能。ByteToMessageDecoder会完成ByteBuf释放工作,它会调用ReferenceCountUtil.release(in)方法将之前的 ByteBuf缓冲区的引用计数减1。

    解码的流程:它将上一站传过来的输入到ByteBuf中的数据进行解码,解码出一个List对象列表;然后,迭代List列表,逐个将Java POJO对象传入下一站Inbound入站处理器。
    在这里插入图片描述
    ByteToMessageDecoder的解码方法为decode(),是一个抽象方法。解码需要子类实现,在子类中完成;ByteToMessageDecoder在设计上使用了模板模式(Template Pattern)

    3.2、自定义解码器

    实现一个解码器,首先要继承ByteToMessageDecoder抽象类,然 后实现其基类的decode()抽象方法。总体来说,流程大致如下:

    1. 继承ByteToMessageDecoder抽象类。
    2. 实现基类的decode()抽象方法,将ByteBuf到目标POJO的解码逻辑写入此方法,以将ByteBuf中的二进制数据解码成一个一个的 Java POJO对象。
    3. 解码完成后,需要将解码后的Java POJO对象放入decode() 方法的List实参中,此实参是父类所传入的解码结果收集容器。
    4. 余下的工作都由父类ByteToMessageDecoder自动完成。在流水线 的处理过程中,父类在执行完子类的解码后,会将List收集 到的结果一个一个地传递到下一个Inbound入站处理器。
    @Slf4j
    public class IntegerDecode extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // int 占用4个字节
            while (in.readableBytes() >= 4) {
                int i = in.readInt();
                log.info("解码出一个整数: " + i);
                out.add(i);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class IntegerDecodeHandel extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Integer integer = (Integer) msg;
    
            //向通道写一个入站报文(数据包)
            System.out.println(integer);
            super.channelRead(ctx, msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    
        @Test
        public void integerDecodeHandelTest(){
            ChannelInitializer i =new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new IntegerDecode());
                    pipeline.addLast(new IntegerDecodeHandel());
                }
            };
            EmbeddedChannel channel = new EmbeddedChannel(i);
            ByteBuf buf = Unpooled.buffer();
            buf.writeInt(10);
            channel.writeInbound(buf);
            channel.flush();
            //通道关闭
            channel.close();
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3.2、ReplayingDecoder解码器

    上例中:IntegerDecode中会面临一个问题:需要对ByteBuf的长度进行检查,有足够的字节才能进行整数的读取。Netty中使用ReplayingDecoder也可以处理此问题

    ReplayingDecoder类是ByteToMessageDecoder的子类,作用是:

    • 在读取ByteBuf缓冲区的数据之前,需要检查缓冲区是否有足够的字节。

    • 若ByteBuf中有足够的字节,则会正常读取;反之,则会停止解码。

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ReplayingDecoder;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.List;
    
    @Slf4j
    public class IntegerReplayDecoder extends ReplayingDecoder<Integer> {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            int i = in.readInt();
            log.info("IntegerReplayDecoder解码出一个整数: " + i);
            out.add(i);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    ReplayingDecoder原理:内部定义一个新的二进制缓冲区类(类名为ReplayingDecoderBuffer),又对ByteBuf缓冲区进行装饰。该装饰器的特点是,在缓冲区真正读数据之前先进行长度的判断:如果长度合格,就读取数据;否则,抛出ReplayError。ReplayingDecoder捕获到ReplayError后会留着数据,等待下一次IO事件到来时再读取。

    4、encode(编码器)原理

    在Netty的业务处理完成后,业务处理的结果往往是某个Java POJO对象需要编码成最终的ByteBuf二进制类型,通过流水线写入底层的Java通道,这就需要用到Encoder(编码器)

    编码器是ChannelOutboundHandler的具体实现类。一个编码器将 出站对象编码之后,数据将被传递到下一个ChannelOutboundHandler 出站处理器进行后面的出站处理。

    由于最后只有ByteBuf才能写入通道中,因此可以肯定通道流水线上装配的第一个编码器一定是把数据编码成了ByteBuf类型

    为什么编码成的最终ByteBuf类型数据包的编码器是在流水线的头部,而不是在流水线的尾部呢?
    原因:出站处理的顺序是从后向前的。

    4.1、MessageToByteEncoder编码器

    MessageToByteEncoder是一个非常重要的编码器基类。MessageToByteEncoder的功能是将 一个Java POJO对象编码成一个ByteBuf数据包。它是一个抽象类,仅仅实现了编码的基础流程,在编码过程中通过调用encode()抽象方法来完成。需要子类去实现

    自定义编码器

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class IntegerEncode extends MessageToByteEncoder<Integer> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
            out.writeInt(msg);
            log.info("IntegerEncode:{}",msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    测试方法

    
        @Test
        public void integerEncodeHandelTest(){
            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new IntegerEncode());
                }
            };
            EmbeddedChannel channel = new EmbeddedChannel(i);
            channel.write(10);
            channel.flush();
            channel.write(11);
            channel.flush();
            channel.write(12);
            channel.flush();
    
            //取得通道的出站数据包
            ByteBuf outBuf = (ByteBuf) channel.readOutbound();
            while (null != outBuf) {
                System.out.println("outBuf:"+outBuf.readInt());
                outBuf = channel.readOutbound();
            }
            //通道关闭
            channel.close();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    在这里插入图片描述

    4.2、MessageToMessageEncoder编码器

    MessageToMessageEncoder编码器将某种POJO对象编码成另外一种POJO对象

    示例,将String转成Integer出站

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageEncoder;
    
    import java.util.List;
    
    public class StringToIntegerEncoder extends MessageToMessageEncoder<String> {
        @Override
        protected void encode(ChannelHandlerContext ctx, String msg, List<Object> list) throws Exception {
            char[] chars = msg.toCharArray();
            for (char aChar : chars) {
    
                //48 是0的编码,57 是9 的编码
                if (aChar >= 48 && aChar <= 57) {
                    list.add(new Integer(aChar));
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    测试方法

    
        @Test
        public void stringToIntegerEncodeHandelTest(){
            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new IntegerEncode());
                    pipeline.addLast(new StringToIntegerEncoder());
                }
            };
            EmbeddedChannel channel = new EmbeddedChannel(i);
            channel.write("10");
            channel.write("11");
            channel.write("12");
            channel.flush();
    
            //取得通道的出站数据包
            ByteBuf outBuf = (ByteBuf) channel.readOutbound();
            while (null != outBuf) {
                System.out.println("outBuf:"+outBuf.readInt());
                outBuf = channel.readOutbound();
            }
            //通道关闭
            channel.close();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    测试用例中除了需要使用StringToIntegerEncoder编码器外,还需要用到IntegerEncode编码器。StringToIntegerEncoder仅仅是编码的第一棒,负责将字符串编码成整数;IntegerEncode是编码的第二棒,将整数进一步变成ByteBuf数据包后才能最终写入通道。 由于出站处理的过程是从后向前的次序,因此Integer2ByteEncoder先加入流水线,String2IntegerEncoder后加入流水线。是有先后顺序的

    5、ByteToMessageCodec编解码器

    完成POJO到ByteBuf数据包的编解码器。基类为ByteToMessageCodec,它是一个抽象类。从功能上说,继承 ByteToMessageCodec就等同于继承了ByteToMessageDecoder和 MessageToByteEncoder这两个基类。

    编解码器ByteToMessageCodec同时包含了编码encode()和解码decode()两个抽象方法,完成了编码器和解码器的结合,这两个方法都需要子类实现。在使用时,加入流水线时也只需要加入 一次。

    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageCodec;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.List;
    
    @Slf4j
    public class IntegerEncodeAndDecode extends ByteToMessageCodec<Integer> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
            out.writeInt(msg);
            log.info("IntegerEncode:{}",msg);
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // int 占用4个字节
            while (in.readableBytes() >= 4) {
                int i = in.readInt();
                log.info("解码出一个整数: " + i);
                out.add(i);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    测试用例

    
        @Test
        public void integerEncodeAndDecodeTest(){
            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new IntegerEncodeAndDecode());
                }
            };
            EmbeddedChannel channel = new EmbeddedChannel(i);
            channel.write(1);
            channel.write(2);
            channel.write(3);
            channel.flush();
    
            //取得通道的出站数据包
            ByteBuf outBuf = (ByteBuf) channel.readOutbound();
            while (null != outBuf) {
                System.out.println("outBuf:"+outBuf.readInt());
                outBuf = channel.readOutbound();
            }
            //通道关闭
            channel.close();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    6、CombinedChannelDuplexHandler编解码组合器

    编码器和解码器组合起来,在类中更清晰可见。与继承相比,组合会带来更大的灵活性(编码
    器和解码器可以捆绑使用,也可以单独使用。)

    
    import com.example.netty.decode.IntegerReplayDecoder;
    import io.netty.channel.CombinedChannelDuplexHandler;
    
    /**
     * 编解码组合器
     * @author
     */
    public class IntegerDuplexHandler extends CombinedChannelDuplexHandler<IntegerReplayDecoder,IntegerEncode> {
    
        public IntegerDuplexHandler() {
            super(new IntegerReplayDecoder(), new IntegerEncode());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    7、protostuff

    如果要实现高效的编解码可以用protobuf,但是protobuf需要维护大量的proto文件比较麻烦,现在一般可以使用protostuff。MessageToByteEncoder的功能是将 一个Java POJO对象编码成一个ByteBuf数据包。它是一个抽象类,仅仅实现了编码的基础流程,在编码过程中通过调用encode()抽象方法来完成。需要子类去实现

    protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用写.proto文件来实现序列化。使用它也非常简单

     
     <dependency>
         <groupId>com.dyuproject.protostuffgroupId>
         <artifactId>protostuff-apiartifactId>
         <version>1.0.10version>
     dependency>
     <dependency>
         <groupId>com.dyuproject.protostuffgroupId>
         <artifactId>protostuff-coreartifactId>
         <version>1.0.10version>
     dependency>
     <dependency>
         <groupId>com.dyuproject.protostuffgroupId>
         <artifactId>protostuff-runtimeartifactId>
         <version>1.0.10version>
     dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    
    import com.dyuproject.protostuff.LinkedBuffer;
    import com.dyuproject.protostuff.ProtostuffIOUtil;
    import com.dyuproject.protostuff.Schema;
    import com.dyuproject.protostuff.runtime.RuntimeSchema;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * protostuff 序列化工具类,基于protobuf封装
     */
    public class ProtostuffUtil {
    
        private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
    
        private static <T> Schema<T> getSchema(Class<T> clazz) {
            @SuppressWarnings("unchecked")
            Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
            if (schema == null) {
                schema = RuntimeSchema.getSchema(clazz);
                if (schema != null) {
                    cachedSchema.put(clazz, schema);
                }
            }
            return schema;
        }
    
        /**
         * 序列化
         *
         * @param obj
         * @return
         */
        public static <T> byte[] serializer(T obj) {
            @SuppressWarnings("unchecked")
            Class<T> clazz = (Class<T>) obj.getClass();
            LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
            try {
                Schema<T> schema = getSchema(clazz);
                return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            } finally {
                buffer.clear();
            }
        }
    
        /**
         * 反序列化
         *
         * @param data
         * @param clazz
         * @return
         */
        public static <T> T deserializer(byte[] data, Class<T> clazz) {
            try {
                T obj = clazz.newInstance();
                Schema<T> schema = getSchema(clazz);
                ProtostuffIOUtil.mergeFrom(data, obj, schema);
                return obj;
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    测试类中测试方法

        @Test
        public void protostuffTestHandler(){
            ChannelInitializer i =new ChannelInitializer<EmbeddedChannel>() {
                protected void initChannel(EmbeddedChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new ProtostuffTestHandler());
                }
            };
            EmbeddedChannel channel = new EmbeddedChannel(i);
            byte[] userBytes = ProtostuffUtil.serializer(new User(1, "zhangsan"));
            channel.writeInbound(userBytes);
            channel.flush();
            //通道关闭
            channel.close();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * @author 
     */
    public class ProtostuffTestHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            User user = ProtostuffUtil.deserializer((byte[])msg, User.class);
    
            //向通道写一个入站报文(数据包)
            System.out.println("ProtostuffTestHandler:"+user);
            super.channelRead(ctx, msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    8、xml/json/protobuf性能对比

    对比项xmljsonprotobuf
    协议通用通用私有protocol buffer协议
    序列化数据大小2倍json1/2json大小
    序列化可读性可读(文本结构)可读 (文本结构)不可读(二进制)
    并发性一般
    学习成本高(需要维护proto文件,使用protostuff相对简单)
  • 相关阅读:
    洛谷-P5019-铺设道路
    408考研科目《数据结构》第三章第一节:栈和队列
    Lua 协程 + 过滤器实现生产者和消费者
    规范 Git 提交说明
    操作系统目录与文件系统---21
    随笔-就地过节
    【果蔬识别】Python+卷积神经网络算法+深度学习+人工智能+机器学习+TensorFlow+计算机课设项目+算法模型
    论文解读《Cauchy Graph Embedding》
    redis(13)持久化操作-AOF
    设计模式 --单例模式
  • 原文地址:https://blog.csdn.net/Extraordinarylife/article/details/126082088