• 【Netty】Netty 编码器(十三)


    前言

    回顾Netty系列文章:

    编码器就是用来把出站数据从一种格式转换到另外一种格式,因此它实现了ChannelOutboundHandler,类似于解码器,Netty 也提供了一组类来帮助开发者快速上手编码器,当然,这些类提供的是与解码器相反的方法,如下所示:

    • 编码从消息到字节(MessageToByteEncoder)。
    • 编码从消息到消息(MessageToMessageEncoder)。

    一、MessageToByteEncoder 抽象类

    在上一篇文章中,我们知道了如何使用ByteToMessageDecoder来将字节转换成消息,现在可以使用MessageToByteEncoder实现相反的效果。

    MessageToByteEncoder 核心代码如下:

    public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
        private final TypeParameterMatcher matcher;
        private final boolean preferDirect;
    
        protected MessageToByteEncoder() {
            this(true);
        }
    
        protected MessageToByteEncoder(Class<? extends I> outboundMessageType) {
            this(outboundMessageType, true);
        }
    
        protected MessageToByteEncoder(boolean preferDirect) {
            this.matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
            this.preferDirect = preferDirect;
        }
    
        protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) {
            this.matcher = TypeParameterMatcher.get(outboundMessageType);
            this.preferDirect = preferDirect;
        }
    
        public boolean acceptOutboundMessage(Object msg) throws Exception {
            return this.matcher.match(msg);
        }
    
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            ByteBuf buf = null;
    
            try {
                if (this.acceptOutboundMessage(msg)) {
                    I cast = msg;
                    buf = this.allocateBuffer(ctx, msg, this.preferDirect);
    
                    try {
                        this.encode(ctx, cast, buf);
                    } finally {
                        ReferenceCountUtil.release(msg);
                    }
    
                    if (buf.isReadable()) {
                        ctx.write(buf, promise);
                    } else {
                        buf.release();
                        ctx.write(Unpooled.EMPTY_BUFFER, promise);
                    }
    
                    buf = null;
                } else {
                    ctx.write(msg, promise);
                }
            } catch (EncoderException var17) {
                throw var17;
            } catch (Throwable var18) {
                throw new EncoderException(var18);
            } finally {
                if (buf != null) {
                    buf.release();
                }
    
            }
    
        }
    
        protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, I msg, boolean preferDirect) throws Exception {
            return preferDirect ? ctx.alloc().ioBuffer() : ctx.alloc().heapBuffer();
        }
    
        protected abstract void encode(ChannelHandlerContext var1, I var2, ByteBuf var3) throws Exception;
    
        protected boolean isPreferDirect() {
            return this.preferDirect;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    在MessageToByteEncoder抽象类中,唯一要关注的是encode方法,该方法是开发者需要实现的唯一抽象方法。它与出站消息一起调用,将消息编码为ByteBuf,然后,将ByteBuf转发到ChannelPipeline中的下一个ChannelOutboundHandler。

    以下是MessageToByteEncoder 的使用示例:

    public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
            out.writeShort(msg);//将Short转成二进制字节流写入ByteBuf中
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    上述示例中,ShortToByteEncoder收到 Short 消息,编码它们,并把它们写入ByteBuf。然后,将ByteBuf转发到ChannelPipeline中的下一个ChannelOutboundHandler,每个 Short 将占有 ByteBuf 的两个字节。
    实现ShortToByteEncoder主要分为以下两步:

    • 实现继承自MessageToByteEncoder。
    • 写 Short 到 ByteBuf。

    上述的例子处理流程图如下:
    在这里插入图片描述

    Netty 也提供了很多MessageToByteEncoder类的子类来帮助开发者实现自己的编码器,例如:
    在这里插入图片描述

    二、MessageToMessageEncoder 抽象类

    MessageToMessageEncoder 抽象类用于将出站数据从一种消息转换为另一种消息。
    核心源码如下:

    public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
    
        private final TypeParameterMatcher matcher;
    
        /**
         * Create a new instance which will try to detect the types to match out of the type parameter of the class.
         */
        protected MessageToMessageEncoder() {
            matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
        }
    
        /**
         * Create a new instance
         *
         * @param outboundMessageType   The type of messages to match and so encode
         */
        protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {
            matcher = TypeParameterMatcher.get(outboundMessageType);
        }
    
        /**
         * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
         * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
         */
        public boolean acceptOutboundMessage(Object msg) throws Exception {
            return matcher.match(msg);
        }
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            CodecOutputList out = null;
            try {
                if (acceptOutboundMessage(msg)) {
                    out = CodecOutputList.newInstance();
                    @SuppressWarnings("unchecked")
                    I cast = (I) msg;
                    try {
                        encode(ctx, cast, out);
                    } finally {
                        ReferenceCountUtil.release(cast);
                    }
    
                    if (out.isEmpty()) {
                        throw new EncoderException(
                                StringUtil.simpleClassName(this) + " must produce at least one message.");
                    }
                } else {
                    ctx.write(msg, promise);
                }
            } catch (EncoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new EncoderException(t);
            } finally {
                if (out != null) {
                    try {
                        final int sizeMinusOne = out.size() - 1;
                        if (sizeMinusOne == 0) {
                            ctx.write(out.getUnsafe(0), promise);
                        } else if (sizeMinusOne > 0) {
                            // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
                            // See https://github.com/netty/netty/issues/2525
                            if (promise == ctx.voidPromise()) {
                                writeVoidPromise(ctx, out);
                            } else {
                                writePromiseCombiner(ctx, out, promise);
                            }
                        }
                    } finally {
                        out.recycle();
                    }
                }
            }
        }
    
        private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {
            final ChannelPromise voidPromise = ctx.voidPromise();
            for (int i = 0; i < out.size(); i++) {
                ctx.write(out.getUnsafe(i), voidPromise);
            }
        }
    
        private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
            final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
            for (int i = 0; i < out.size(); i++) {
                combiner.add(ctx.write(out.getUnsafe(i)));
            }
            combiner.finish(promise);
        }
    
    
        protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93

    同MessageToByteEncoder抽象类一样,MessageToMessageEncoder 唯一要关注的也是encode方法,该方法是开发者需要实现的唯一抽象方法。对于使用write()编写的每条消息,都会调用该消息,以将消息编码为一个或多个新的出站消息,然后将编码后的消息转发。
    下面是使用 MessageToMessageEncoder 的一个例子:

    public class IntegerToStringEncoder extends MessageToMessageEncoder <Integer> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
            out.add(String.ValueOf(msg));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    上述示例将 Integer 消息编码为 String 消息,主要分为两步:

    实现继承自MessageToMessageEncoder。
    将Integer 转为 String,并添加到 MessageBuf。

    上述例子的处理流程如下图所示:

    在这里插入图片描述

    总结

    通过上述文章的讲解,我们对于编码器和解码器应该都有了一定的认识,其实针对编码和解码,Netty 还提供了第三种方式,那就是编解码器。下节我们就来讲解一下。

  • 相关阅读:
    CSS中如何实现文字描边效果(Text Stroke)?
    Review-MyBatis
    在IOS 的开发中iBeacon和BLE的区别
    用moment插件分别取时间戳的年、月、日、时、分、秒
    101-视频与网络应用篇-教程内容
    【JavaEE基础与高级 第61章】Java中的注解和元注解的介紹使用、注解解析
    软件安装攻略:EmEditor编辑器下载安装与使用
    LeetCode50天刷题计划(Day 18—— 搜索旋转排序数组(8.50-12.00)
    Mysql中的目录和文件详解
    N皇后问题
  • 原文地址:https://blog.csdn.net/u011397981/article/details/130897340