• 【Netty】自定义解码器、编码器、编解码器(十五)


    前言

    回顾Netty系列文章:

    我们今天继续来分析 Netty 的编解码器,这次我们要自己动手实现自定义的编码器、解码器和编解码器。
    废话不多说,进入正文吧。

    一、自定义基于换行的解码器

    1.1 LineBasedFrameDecoder 类

    LineBasedFrameDecoder 类是基于换行的,意味着只要在接收数据时遇到以换行符\n或者回车换行符\r\n结尾时,就表明数据已经接收完成可以被处理了。

    LineBasedFrameDecoder 类继承自 ByteToMessageDecoder,并重写了 decode方法。

    public class LineBasedFrameDecoder extends ByteToMessageDecoder {
    
        /** 帧的最大长度限制  */
        private final int maxLength;
        /** 帧超长时是否抛出异常 */
        private final boolean failFast;
        private final boolean stripDelimiter;
    
        /** 如果超出长度则为True,表明需要丢弃输入的数据  */
        private boolean discarding;
        private int discardedBytes;
    
        /** 最后扫描的位置 */
        private int offset;
    
      
        public LineBasedFrameDecoder(final int maxLength) {
            this(maxLength, true, false);
        }
    
        public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
            this.maxLength = maxLength;
            this.failFast = failFast;
            this.stripDelimiter = stripDelimiter;
        }
    
        @Override
        protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            Object decoded = decode(ctx, in);
            if (decoded != null) {
                out.add(decoded);
            }
        }
    
        protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
            final int eol = findEndOfLine(buffer);
            if (!discarding) {
                if (eol >= 0) {
                    final ByteBuf frame;
                    final int length = eol - buffer.readerIndex();
                    final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
    
                    if (length > maxLength) {
                        buffer.readerIndex(eol + delimLength);
                        fail(ctx, length);
                        return null;
                    }
    
                    if (stripDelimiter) {
                        frame = buffer.readRetainedSlice(length);
                        buffer.skipBytes(delimLength);
                    } else {
                        frame = buffer.readRetainedSlice(length + delimLength);
                    }
    
                    return frame;
                } else {
                    final int length = buffer.readableBytes();
                    if (length > maxLength) {
                        discardedBytes = length;
                        buffer.readerIndex(buffer.writerIndex());
                        discarding = true;
                        offset = 0;
                        if (failFast) {
                            fail(ctx, "over " + discardedBytes);
                        }
                    }
                    return null;
                }
            } else {
                if (eol >= 0) {
                    final int length = discardedBytes + eol - buffer.readerIndex();
                    final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                    buffer.readerIndex(eol + delimLength);
                    discardedBytes = 0;
                    discarding = false;
                    if (!failFast) {
                        fail(ctx, length);
                    }
                } else {
                    discardedBytes += buffer.readableBytes();
                    buffer.readerIndex(buffer.writerIndex());
                    // 跳过缓冲区中的所有内容,需要再次将offset 设置为0
                    offset = 0;
                }
                return null;
            }
        }
    
        private void fail(final ChannelHandlerContext ctx, int length) {
            fail(ctx, String.valueOf(length));
        }
    
        private void fail(final ChannelHandlerContext ctx, String length) {
            ctx.fireExceptionCaught(
                    new TooLongFrameException(
                            "frame length (" + length + ") exceeds the allowed maximum (" + maxLength + ')'));
        }
    
        /**
         * 返回找到的行尾缓冲区的索引
         * 如果在缓冲区中未找到行尾,则返回 -1
         */
        private int findEndOfLine(final ByteBuf buffer) {
            int totalLength = buffer.readableBytes();
            int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF);
            if (i >= 0) {
                offset = 0;
                
                // 判断是否是回车符
                if (i > 0 && buffer.getByte(i - 1) == '\r') {
                    i--;
                }
            } else {
                offset = totalLength;
            }
            return i;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119

    从上述代码可以看出,LineBasedFrameDecoder是通过查找回车换行符来找到数据结束的标志的。

    1.2 定义解码器

    定义了解码器MyLineBasedFrameDecoder,该解码器继承自LineBasedFrameDecoder,因此可以使用LineBasedFrameDecoder上的所有功能。
    代码如下:

    public class MyLineBasedFrameDecoder extends LineBasedFrameDecoder {
    
    	private final static int MAX_LENGTH = 1024; // 帧的最大长度
    
    	public MyLineBasedFrameDecoder() {
    		super(MAX_LENGTH);
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在上述代码中,通过MAX_LENGTH常量,来限制解码器帧的大小。超过该常量值的限制的话,则会抛出TooLongFrameException异常。

    1.3 定义 ChannelHandler

    ChannelHandler 定义如下:

    public class MyLineBasedFrameDecoderServerHandler extends ChannelInboundHandlerAdapter {
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) {
    		// 接收msg消息,此处已经无需解码了
    		System.out.println("Client -> Server: " + msg);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    MyLineBasedFrameDecoderServerHandler业务非常简单,把收到的消息打印出来即可。

    1.4 定义 ChannelInitializer

    定义一个 ChannelInitializer 用于容纳解码器 MyLineBasedFrameDecoder 和 MyLineBasedFrameDecoderServerHandler,代码如下:

    public class MyLineBasedFrameDecoderChannelInitializer extends ChannelInitializer<SocketChannel> {
    
    	@Override
    	protected void initChannel(SocketChannel channel) {
    		// 基于换行符号
    		channel.pipeline().addLast(new MyLineBasedFrameDecoder());
    
    		// 解码转String
    		channel.pipeline().addLast(new StringDecoder());
    
    		// 自定义ChannelHandler
    		channel.pipeline().addLast(new MyLineBasedFrameDecoderServerHandler());
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    注意添加到ChannelPipeline的ChannelHandler的顺序,MyLineBasedFrameDecoder 在前,MyLineBasedFrameDecoderServerHandler 在后,意味着数据先经过MyLineBasedFrameDecoder 解码,然后再交给MyLineBasedFrameDecoderServerHandler 处理。
    StringDecoder实现将数据转换为字符串。

    1.5 编写服务器

    定义服务器 MyLineBasedFrameDecoderServer代码如下:

    public class MyLineBasedFrameDecoderServer {
    
        public static int DEFAULT_PORT = 8023;
    
        public static void main(String[] args) throws Exception {
            int port = DEFAULT_PORT;
    
            // 多线程事件循环器
            EventLoopGroup bossGroup = new NioEventLoopGroup(1); // boss
            EventLoopGroup workerGroup = new NioEventLoopGroup(); // worker
    
            try {
                // 启动NIO服务的引导程序类
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup) // 设置EventLoopGroup
                        .channel(NioServerSocketChannel.class) // 指明新的Channel的类型
                        .childHandler(new MyLineBasedFrameDecoderChannelInitializer()) // 指定ChannelHandler
                        .option(ChannelOption.SO_BACKLOG, 128) // 设置的ServerChannel的一些选项
                        .childOption(ChannelOption.SO_KEEPALIVE, true); // 设置的ServerChannel的子Channel的选项
                // 绑定端口,开始接收进来的连接
                ChannelFuture f = b.bind(port).sync();
                System.out.println("MyLineBasedFrameDecoderServer已启动,端口:" + port);
                // 等待服务器 socket 关闭 。
                // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
                f.channel().closeFuture().sync();
            } finally {
                // 优雅的关闭
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    MyLineBasedFrameDecoderServer 中唯一需要注意的是,在 ServerBootstrap 中指定MyLineBasedFrameDecoderChannelInitializer,这样服务器就能应用咱们自定义的编码器和ChannelHandler了。

    1.6 编写客户端

    为了测试服务器,编写了一个简单的 TCP 客户端,代码如下:

    public class TcpClient {
    
        public static void main(String[] args) throws IOException {
            Socket socket = null;
            OutputStream out = null;
            try {
                socket = new Socket("localhost", 8023);
                out = socket.getOutputStream();
                // 请求服务器  
                String lines = "床前明月光\r\n疑是地上霜\r\n举头望明月\r\n低头思故乡\r\n";
                byte[] outputBytes = lines.getBytes("UTF-8");
                out.write(outputBytes);
                out.flush();
            } finally {
                // 关闭连接  
                out.close();
                socket.close();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    上述客户端在启动后会发送一段文本,而后关闭连接。该文本每句用回车换行符\r\n结尾,这样服务器就能一句一句地解析了。

    1.7 测试

    先启动服务器,观察控制台,可以看到如下输出的内容:

    MyLineBasedFrameDecoderServer已启动,端口:8023
    
    • 1

    然后启动客户端。启动完成之后,再次观察服务器的控制台,可以看到如下输出内容:

    MyLineBasedFrameDecoderServer已启动,端口:8023
    Client -> Server: 床前明月光
    Client -> Server: 疑是地上霜
    Client -> Server: 举头望明月
    Client -> Server: 低头思故乡
    
    • 1
    • 2
    • 3
    • 4
    • 5

    上述的输出内容说明,MyLineBasedFrameDecoderServerHandler接收到了 4 条数据。那么为啥客户端发送了 1 条数据,到这里就变成了 4 条了呢?这是因为在前面介绍的MyLineBasedFrameDecoderChannelInitializer中,MyLineBasedFrameDecoder先被添加到ChannelPipeline,然后才添加到MyLineBasedFrameDecoderServerHandler,意味着数据先经过解码,再交给MyLineBasedFrameDecoderServerHandler处理,而在数据解码过程中,MyLineBasedFrameDecoderServerHandler是按照换行解码的,而客户端所发送的数据里面又包含了 4 个回车换行符,因此,数据被解码为了 4 条。

    二、自定义编码器

    2.1 定义消息通信协议

    消息通信协议是连接客户端和服务器的密语,只有熟知双方的通信协议,客户端和服务器才能正常识别消息的内容。常见的消息通信协议有 HTTP、MQTT、XMPP、STOMP、AMQP和 RTMP等。

    下面展示了消息通信协议的内容格式:

    类型名称字节序列取值范围备注
    消息头msgType00x00-0xff消息类型
    消息头len1-40-2147483647消息体长度
    消息体body变长0-消息体

    从上述协议中可以看出,消息主要是由消息头和消息体组成,并说明如下:

    • msgType 表示消息的类型。在本节示例中,请求用EMGW_LOGIN_REQ(0x00),响应用EMGW_LOGIN_RES(0x01)表示。
    • len 表示消息体的长度。
    • body 表示消息体。

    定义了如下MsgType枚举类型来表示消息类型:

    public enum MsgType {
        EMGW_LOGIN_REQ((byte) 0x00),
        EMGW_LOGIN_RES((byte) 0x01);
    
        private byte value;
    
        public byte getValue() {
            return value;
        }
    
        private MsgType(byte value) {
            this.value = value;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    消息头类 MsgHeader定义如下:

    public class MsgHeader {
        private byte msgType; // 消息类型
        private int len; // 长度
    
        public MsgHeader() {
        }
    
        public MsgHeader(byte msgType, int len) {
            this.msgType = msgType;
            this.len = len;
        }
    
        public byte getMsgType() {
            return msgType;
        }
    
        public void setMsgType(byte msgType) {
            this.msgType = msgType;
        }
    
        public int getLen() {
            return len;
        }
    
        public void setLen(int len) {
            this.len = len;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    消息类 Msg 定义如下:

    public class Msg {
    
        private MsgHeader msgHeader = new MsgHeader();
        private String body;
    
        public MsgHeader getMsgHeader() {
            return msgHeader;
        }
    
        public void setMsgHeader(MsgHeader msgHeader) {
            this.msgHeader = msgHeader;
        }
    
        public String getBody() {
            return body;
        }
    
        public void setBody(String body) {
            this.body = body;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    2.2 定义编码器

    public class MyEncoder extends MessageToByteEncoder<Msg> {
    
    	@Override
    	protected void encode(ChannelHandlerContext ctx, Msg msg, ByteBuf out) throws Exception {
    		if (msg == null | msg.getMsgHeader() == null) {
    			throw new Exception("The encode message is null");
    		}
    
    		// 获取消息头
    		MsgHeader header = msg.getMsgHeader();
    
    		// 获取消息体
    		String body = msg.getBody();
    		byte[] bodyBytes = body.getBytes(Charset.forName("utf-8"));
    
    		// 计算消息体的长度
    		int bodySize = bodyBytes.length;
    
    		System.out.printf("MyEncoder header: %s, body: %s", header.getMsgType(), body);
    
    		out.writeByte(MsgType.EMGW_LOGIN_RES.getValue());
    		out.writeInt(bodySize);
    		out.writeBytes(bodyBytes);
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    MyEncoder会将 Msg 消息转为 ByteBuf 类型。

    2.3 定义解码器

    public class MyDecoder extends LengthFieldBasedFrameDecoder {
    	
    	private static final int MAX_FRAME_LENGTH = 1024 * 1024;
    	private static final int LENGTH_FIELD_LENGTH = 4;
    	private static final int LENGTH_FIELD_OFFSET = 1;
    	private static final int LENGTH_ADJUSTMENT = 0;
    	private static final int INITIAL_BYTES_TO_STRIP = 0;
    	
    	private static final int HEADER_SIZE = 5;
    	private byte msgType; // 消息类型
    	private int len; // 长度
    
    	public MyDecoder() {
    		super(MAX_FRAME_LENGTH,
    				LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH,
    				LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);
    	}
    
    	@Override
    	protected Msg decode(ChannelHandlerContext ctx, ByteBuf in2) throws Exception {
    		ByteBuf in = (ByteBuf) super.decode(ctx, in2);
    		if (in == null) {
    			return null;
    		}
    
    		// 校验头长度
    		if (in.readableBytes() < HEADER_SIZE) {
    			return null;
    		}
    
    		msgType = in.readByte();
    		len = in.readInt();
    
    		// 校验消息体长度
    		if (in.readableBytes() < len) {
    			return null;
    		}
    
    		ByteBuf buf = in.readBytes(len);
    		byte[] req = new byte[buf.readableBytes()];
    		buf.readBytes(req);
    		String body = new String(req, "UTF-8");
    		
    		// ByteBuf转为Msg类型
    		Msg msg = new Msg();
    		MsgHeader msgHeader = new MsgHeader(msgType, len);
    		msg.setBody(body);
    		msg.setMsgHeader(msgHeader);
    		return msg;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    MyDecoder集成自 Netty 内嵌的解码器LengthFieldBasedFrameDecoder。LengthFieldBasedFrameDecoder是一种基于灵活长度的解码器。在数据包中,加了一个长度字段,保存上层包的长度。解码时,会按照这个长度,进行上层 ByteBuf 应用包的提取。其中,初始化LengthFieldBasedFrameDecoder时,需要指定以下参数:

    • maxFrameLength:发送数据包最大的长度。
    • lengthFieldOffset:长度域偏移量,指的是长度域位于整个数据包字节数组中的下标。
    • lengthFieldLength:长度域的字节长度。
    • lengthAdjustment:长度域的偏移量矫正。
    • initialBytesToStrip:丢弃的初始字节数。丢弃处于有效数据前面的字节数量。

    2.4 定义服务器 ChannelHandler

    public class MyServerHandler extends SimpleChannelInboundHandler<Object> {
    
    	@Override
    	protected void channelRead0(ChannelHandlerContext ctx, Object obj) throws Exception {
    		Channel incoming = ctx.channel();
    
    		if (obj instanceof Msg) {
    			Msg msg = (Msg) obj;
    			System.out.println("Client->Server:" + incoming.remoteAddress() + msg.getBody());
    			incoming.write(obj);
    		}
    	}
    
    	@Override
    	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    		ctx.flush();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    MyServerHandler逻辑比较简单,只是把收到的消息内容打印出来。

    2.5 定义客户端 ChannelHandler

    public class MyClientHandler extends SimpleChannelInboundHandler<Object> {
    
    	@Override
    	protected void channelRead0(ChannelHandlerContext ctx, Object obj) throws Exception {
    		Channel incoming = ctx.channel();
    
    		if (obj instanceof Msg) {
    			Msg msg = (Msg) obj;
    			System.out.println("Server->Client:" + incoming.remoteAddress() + msg.getBody());
    		} else {
    			System.out.println("Server->Client:" + incoming.remoteAddress() + obj.toString());
    		}
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    MyClientHandler逻辑比较简单,只是把收到的消息内容打印出来。

    2.6定义服务器的主程序

    public class MyServer {
    
    	private int port;
    
    	public MyServer(int port) {
    		this.port = port;
    	}
    
    	public void run() throws Exception {
    		EventLoopGroup bossGroup = new NioEventLoopGroup();
    		EventLoopGroup workerGroup = new NioEventLoopGroup();
    		try {
    			ServerBootstrap b = new ServerBootstrap();
    			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
    					.childHandler(new ChannelInitializer<SocketChannel>() {
    						@Override
    						public void initChannel(SocketChannel ch) throws Exception {
    							ch.pipeline().addLast("decoder", new MyDecoder());
    							ch.pipeline().addLast("encoder", new MyEncoder());
    							ch.pipeline().addLast(new MyServerHandler());
    						}
    					}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
    
    			ChannelFuture f = b.bind(port).sync();
    
    			System.out.println("Server start listen at " + port);
    
    			f.channel().closeFuture().sync();
    
    		} finally {
    			workerGroup.shutdownGracefully();
    			bossGroup.shutdownGracefully();
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		int port;
    		if (args.length > 0) {
    			port = Integer.parseInt(args[0]);
    		} else {
    			port = 8082;
    		}
    		new MyServer(port).run();
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    注意添加到ChannelPipeline的ChannelHandler的顺序,MyDecoder在前,MyEncoder在后,业务处理的MyServerHandler在最后。

    2.7 定义客户端主程序

    public class MyClient {
    
    	private String host;
    	private int port;
    
    	public MyClient(String host, int port) {
    		this.host = host;
    		this.port = port;
    	}
    
    	public void run() throws InterruptedException {
    
    		EventLoopGroup workerGroup = new NioEventLoopGroup();
    
    		try {
    			Bootstrap b = new Bootstrap();
    			b.group(workerGroup);
    			b.channel(NioSocketChannel.class);
    			b.option(ChannelOption.SO_KEEPALIVE, true);
    			b.handler(new ChannelInitializer<SocketChannel>() {
    				@Override
    				public void initChannel(SocketChannel ch) throws Exception {
    					ch.pipeline().addLast("decoder", new MyDecoder());
    					ch.pipeline().addLast("encoder", new MyEncoder());
    					ch.pipeline().addLast(new MyClientHandler());
    
    				}
    			});
    
    			// 启动客户端
    			ChannelFuture f = b.connect(host, port).sync();
    
    			while (true) {
    
    				// 发送消息给服务器
    				Msg msg = new Msg();
    				MsgHeader msgHeader = new MsgHeader();
    				msgHeader.setMsgType(MsgType.EMGW_LOGIN_REQ.getValue());
    				String body = "床前明月光,疑是地上霜。";
    
    				byte[] bodyBytes = body.getBytes(Charset.forName("utf-8"));
    				int bodySize = bodyBytes.length;
    				msgHeader.setLen(bodySize);
    				msg.setMsgHeader(msgHeader);
    				msg.setBody(body);
    
    				f.channel().writeAndFlush(msg);
    				Thread.sleep(2000);
    			}
    		} finally {
    			workerGroup.shutdownGracefully();
    		}
    	}
    
    	public static void main(String[] args) throws InterruptedException {
    		new MyClient("localhost", 8082).run();
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    注意添加到ChannelPipeline的ChannelHandler的顺序,MyDecoder在前,MyEncoder在后,业务处理的MyClientHandler在最后。
    上述的客户端程序,会每隔 2 秒给服务器发送一条消息。

    2.8 测试

    分别运行服务器和客户端程序。
    客户端输出如下:
    arduino复制代码MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
    MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
    MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
    MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
    MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
    MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。

    服务端输出如下:
    arduino复制代码Server start listen at 8082
    Client->Server:/127.0.0.1:62927床前明月光,疑是地上霜。
    MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:62927床前明月光,疑是地上霜。
    MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:62927床前明月光,疑是地上霜。
    MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:62927床前明月光,疑是地上霜。
    MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:62927床前明月光,疑是地上霜。
    MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:62927床前明月光,疑是地上霜。

    三、自定义编解码器

    前面我们实现了编码器 MyEncoder 和 解码器 MyDecoder。这些代码无须做任何改动。

    3.1 自定义编解码器

    使用CombinedChannelDuplexHandler 类对编码器 MyEncoder 和 解码器 MyDecoder进行组合。代码如下:

    public class MyCodec extends CombinedChannelDuplexHandler<MyDecoder, MyEncoder> {
    	public MyCodec() {
    		super(new MyDecoder(), new MyEncoder());
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.2 使用编解码器

    分别修改 MyServer 和 MyClient 类,添加编解码器,修改代码如下:

    // 添加编解码器
    ch.pipeline().addLast("codec", new MyCodec());
    
    • 1
    • 2

    上述代码将原来的 MyEncoder 和 MyDecoder从ChannelPipeline中剔除掉了,取而代之是MyEncoder。

    3.3 测试

    分别运行服务器和客户端。
    客户端输出如下:

    arduino复制代码MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
    MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
    MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
    MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
    MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
    
    • 1
    • 2
    • 3
    • 4
    • 5

    服务端输出如下:

    Server start listen at 8082
    Client->Server:/127.0.0.1:56181床前明月光,疑是地上霜。
    MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:56181床前明月光,疑是地上霜。
    MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:56181床前明月光,疑是地上霜。
    MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:56181床前明月光,疑是地上霜。
    MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:56181床前明月光,疑是地上霜。
    MyEncoder header: 1, body: 床前明月光,疑是地上霜。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    总结

    以上就是关于一些自定义的编码器、解码器、编解码器的示例应用,我们下节继续深入 Netty 源码。

  • 相关阅读:
    无胁科技-TVD每日漏洞情报-2022-11-11
    项目部署spring
    mac安装zookeeper
    什么是BeanDefination
    让代码变得优雅简洁的神器:Java8 Stream流式编程
    大三第十一周学习笔记
    机器学习 | 模型评估和选择 各种评估指标总结——错误率精度-查准率查全率-真正例率假正例率 PR曲线ROC曲线
    week9|查阅文章 Mac os M2芯片 arm64架构|安装paddlepaddle问题
    超级无敌详细使用ubuntu搭建hadoop完全分布式集群
    关键任务:网络安全事件中真正重要的是什么
  • 原文地址:https://blog.csdn.net/u011397981/article/details/130906350