• 【Netty】Netty 编解码器(十四)


    前言

    回顾Netty系列文章:

    其实针对编码和解码,Netty 还提供了第三种方式,那就是编解码器。编解码器顾名思义,就是结合了编码和解码功能的程序。

    编解码器能够把入站和出站的数据和信息转换都放在同一个类中,对于某些场景来说显得更实用。

    一、编解码器概述

    Netty 提供了抽象的编解码器类,能把一些成对的解码器和编码器组合在一起,以此来提供对字节和消息都相同的操作。这些类实现了ChannelOutboundHandler和ChannelInboundHandler接口。

    Netty 的编解码器抽象类主要有以下两种:

    • 实现从字节到消息的编解码(ByteToMessageCodec)。

    • 实现从消息到消息的编解码(MessageToMessageCodec)。

    二、ByteToMessageCodec 抽象类

    ByteToMessageCodec 抽象类用于将字节实时编码/解码为消息的编解码器,可以将其视为ByteToMessageDecoder和MessageToByteEncoder的组合。

    需要注意的是,ByteToMessageCodec 的子类绝不能使用 @Sharable 进行注释。

    ByteToMessageCodec 抽象类的核心源码如下:

    public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler {
    
        private final TypeParameterMatcher outboundMsgMatcher;
        private final MessageToByteEncoder<I> encoder;
    
        private final ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
            @Override
            public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
                ByteToMessageCodec.this.decode(ctx, in, out);
            }
    
            @Override
            protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
                ByteToMessageCodec.this.decodeLast(ctx, in, out);
            }
        };
    
        
        protected ByteToMessageCodec() {
            this(true);
        }
    
       
        protected ByteToMessageCodec(Class<? extends I> outboundMessageType) {
            this(outboundMessageType, true);
        }
    
      
        protected ByteToMessageCodec(boolean preferDirect) {
            ensureNotSharable();
            outboundMsgMatcher = TypeParameterMatcher.find(this, ByteToMessageCodec.class, "I");
            encoder = new Encoder(preferDirect);
        }
    
       
        protected ByteToMessageCodec(Class<? extends I> outboundMessageType, boolean preferDirect) {
            ensureNotSharable();
            outboundMsgMatcher = TypeParameterMatcher.get(outboundMessageType);
            encoder = new Encoder(preferDirect);
        }
    
       
        public boolean acceptOutboundMessage(Object msg) throws Exception {
            return outboundMsgMatcher.match(msg);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            decoder.channelRead(ctx, msg);
        }
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            encoder.write(ctx, msg, promise);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            decoder.channelReadComplete(ctx);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            decoder.channelInactive(ctx);
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            try {
                decoder.handlerAdded(ctx);
            } finally {
                encoder.handlerAdded(ctx);
            }
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            try {
                decoder.handlerRemoved(ctx);
            } finally {
                encoder.handlerRemoved(ctx);
            }
        }
    
        protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
    
        protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    
        protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.isReadable()) {
                decode(ctx, in, out);
            }
        }
    
        private final class Encoder extends MessageToByteEncoder<I> {
            Encoder(boolean preferDirect) {
                super(preferDirect);
            }
    
            @Override
            public boolean acceptOutboundMessage(Object msg) throws Exception {
                return ByteToMessageCodec.this.acceptOutboundMessage(msg);
            }
    
            @Override
            protected void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception {
                ByteToMessageCodec.this.encode(ctx, msg, out);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110

    在上述代码中,重点关注以下方法:

    • decode():这是必须要实现的抽象方法,将入站的 ByteBuf 转换为指定的消息格式,并将其转发到管道中的下一个ChannelInboundHandler。
    • encode():该方法是开发者需要实现的抽象方法。对于每个被编码并写入出站 ByteBuf 的消息来说,这个方法都将会被调用。
    • decodeLast():Netty 提供的这个默认实现只是简单地调用了decode()方法。当Channel的状态变为非活动时,这个方法将会被调用一次。可以重写该方法以提供特殊的处理。

    三、MessageToMessageCodec 抽象类

    MessageToMessageCodec 抽象类用于将消息实时编码/解码为消息的编解码器,可以将其视为MessageToMessageDecoder和MessageToMessageEncoder`的组合。

    MessageToMessageCodec 抽象类的核心源码如下:

    public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {
    
        private final MessageToMessageEncoder<Object> encoder = new MessageToMessageEncoder<Object>() {
    
            @Override
            public boolean acceptOutboundMessage(Object msg) throws Exception {
                return MessageToMessageCodec.this.acceptOutboundMessage(msg);
            }
    
            @Override
            @SuppressWarnings("unchecked")
            protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
                MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg, out);
            }
        };
    
        private final MessageToMessageDecoder<Object> decoder = new MessageToMessageDecoder<Object>() {
    
            @Override
            public boolean acceptInboundMessage(Object msg) throws Exception {
                return MessageToMessageCodec.this.acceptInboundMessage(msg);
            }
    
            @Override
            @SuppressWarnings("unchecked")
            protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
                MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg, out);
            }
        };
    
        private final TypeParameterMatcher inboundMsgMatcher;
        private final TypeParameterMatcher outboundMsgMatcher;
    
        protected MessageToMessageCodec() {
            inboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "INBOUND_IN");
            outboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "OUTBOUND_IN");
        }
    
     
        protected MessageToMessageCodec(
                Class<? extends INBOUND_IN> inboundMessageType, Class<? extends OUTBOUND_IN> outboundMessageType) {
            inboundMsgMatcher = TypeParameterMatcher.get(inboundMessageType);
            outboundMsgMatcher = TypeParameterMatcher.get(outboundMessageType);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            decoder.channelRead(ctx, msg);
        }
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            encoder.write(ctx, msg, promise);
        }
    
        public boolean acceptInboundMessage(Object msg) throws Exception {
            return inboundMsgMatcher.match(msg);
        }
    
        public boolean acceptOutboundMessage(Object msg) throws Exception {
            return outboundMsgMatcher.match(msg);
        }
    
        protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
                throws Exception;
    
        protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
                throws Exception;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    在上述代码中,重点关注以下几个方法:

    • decode():这是必须要实现的抽象方法,将入站消息(INBOUND_IN类型)解码为消息,这些消息将转发到ChannelPipeline中的下一个ChannelInboundHandler。

    • encode():该方法是开发者需要实现的抽象方法。将出站消息(OUTBOUND_IN类型)编码为消息,然后将消息转发到ChannelPipeline中的下一个ChannelOutboundHandler。

    请注意,如果消息是ReferenceCounted类型,则需要对刚刚通过的消息调用ReferenceCounted.ratain()。这个调用是必须的,因为MessageToMessageCodec将在编码/解码的消息上调用ReferenceCounted.ralease()。

    以下是 MessageToMessageCodec 的示例,将 Integer 解码为 Long,然后将 Long 编码为 Integer。

    public class NumberCodec extends MessageToMessageCodec<Integer,Long> {
    
        @Override
        protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
                out.add(msg.longValue());
        }
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Long msg, List<Object> out) throws Exception {
                out.add(msg.intValue())
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    四、ChannelDuplexHandler 类

    观察ByteToMessageCodec和MessageToMessageCodec 的源码,发现他们都继承自ChannelDuplexHandler 类。ChannelDuplexHandler 类是ChannelHandler的一个实现,表示 ChannelInboundHandler和 ChannelOutboundHandler的组合。如果ChannelHandler的实现需要拦截操作及状态更新,则这个 ChannelDuplexHandler 类会是一个很好的起点。

    public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {...}
    
    • 1

    五、CombinedChannelDuplexHandler 类

    CombinedChannelDuplexHandler 类是 ChannelDuplexHandler 类的子类,用于将ChannelInboundHandler和 ChannelOutboundHandler组合到一个ChannelHandler中去。
    在前面的文章示例中,编码器和解码器都是分开实现的。在不动现有代码的基础上,可以使用CombinedChannelDuplexHandler 类轻松实现一个编解码器,唯一要做的就是通过 CombinedChannelDuplexHandler 类来对解码器和编码器进行组合。
    例如,有一个解码器ByteToCharDecoder,代码如下:

    public class ByteToCharDecoder extends ByteToMessageDecoder {
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if(in.readableBytes() >= 2){
                out.add(in.readChar());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    ByteToCharDecoder用于从 ByteBuf 中提取 2 个字节长度的字符,并将它们作为 char 写入到 List 中,将会被自动装箱为 Character 对象。
    编码器 CharToByteEncoder的代码如下:

    public class CharToByteEncoder extends MessageToByteEncoder<Character> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {
            out.writeChar(msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    CharToByteEncoder 编码 char 消息到 ByteBuf。
    现在有了编码器和解码器了,需要将它们组成一个编解码器。CombinedByteCharCodec代码如下:

    public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
    
        public CombinedByteCharCodec(){
            super(new ByteToCharDecoder(),new CharToByteEncoder());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    CombinedByteCharCodec的参数是解码器和编码器,通过父类的构造器函数使它们结合起来。用上述方式组合编码器和解码器,使程序更简单、更灵活,避免编写多个编解码器类。
    当然,是否使用CombinedByteCharCodec取决于具体的项目风格,没有绝对的好坏。

    总结

    通过以上对于编解码器的分析,相信小伙伴们对于编码器、解码器以及编解码器都有所了解了,下节我们就自己来实现一个自定义的编解码器。

  • 相关阅读:
    《Docker极简教程》--Docker镜像--Docker镜像的管理
    分布式系统中进程和线程简介
    从零开始的C++(九)
    C++类与动态内存分配
    【数据分享】2022年全国各城市公交线路与站点数据
    我所遇到的web前端最常见的面试 - 后续不断更新
    多尺度特征融合
    【学习笔记】《Python深度学习》第五章:深度学习用于计算机视觉
    Android音乐播放器(三)轮播图
    centos docker下安装Minio服务
  • 原文地址:https://blog.csdn.net/u011397981/article/details/130905576