• Netty粘包拆包、自定义Protostuff编解码器


    1、Netty粘包拆包

    TCP是一个“流”协议,所谓流,就是没有界限的一长串二进制数据。TCP作为传输层协议并不了解上层业务数据的具体含义,会根据TCP缓冲区的实际情况进行数据包的划分;所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

    例如,TCP缓冲区是1024个字节大小,如果应用一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题;如果应用一次请求发送的数据量比较大,超过了缓冲区大小,TCP就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。

    半包的主要原因:

    • 发送方写入数据 > 套接字缓冲区大小
    • 发送的数据大于协议的 MTU(Maximum Transmission Unit,最大传输单元),必须拆包

    粘包的主要原因:

    • 发送方每次写入数据 < 套接字缓冲区大小
    • 接收方读取套接字缓冲区数据不够及时

    如下图所示,一次性发送服务端50条消息,客户端收到的消息
    在这里插入图片描述
    解决半包粘包问题的常用三种解码器(一次解码器:结果是字节)
    在这里插入图片描述

    二次解码器:将 Java 对象与字节流转化方便存储或传输

    • 一次解码器:ByteToMessageDecoder
      • io.netty.buffer.ByteBuf (原始数据流)-> io.netty.buffer.ByteBuf (用户数据)
    • 二次解码器:MessageToMessageDecoder
      • io.netty.buffer.ByteBuf (用户数据)-> Java Object
      • 常用的二次解码器,json、Protobuf、xml等

    1.1、LineBasedFrameDecoder(回车换行分包)

    Netty中,提供了一个开箱即用、使用换行符分割字符串的解码器——LineBasedFrameDecoder,它是一个最为基础的Netty内置解码器。这个解码器的工作原理,依次遍历ByteBuf数据包中的可读字节,判断在二进制字节流中是否存在换行符"\n"或者"\r\n"的字节码。如果有,就以此位置为结束位置,把从可读索引到结束位置之间的字节作为解码成功后的ByteBuf数据包。

    LineBasedFrameDecoder支持配置一个最大长度值,表示解码出来的ByteBuf能包含的最大字节数。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常。
    注意顺序
    在这里插入图片描述

    1.2、DelimiterBasedFrameDecoder(特殊分隔符分包)

    DelimiterBasedFrameDecoder解码器不仅可以使用换行符,还可以使用其他特殊字符作为数据包的分隔符,例如制表符"\t"。

    DelimiterBasedFrameDecoder与LineBasedFrameDecoder都支持配置一个最大长度值,同时还支持解码后的数据包是否去掉分隔符,分隔符
    在这里插入图片描述

    pipeline.addLast(new DelimiterBasedFrameDecoder(10240,false,Unpooled.copiedBuffer(“!”.getBytes(“UTF-8”))));

    构造方法

    public DelimiterBasedFrameDecoder(
    	int maxFrameLength, //解码的数据包的最大长度
    	Boolean stripDelimiter, //解码后的数据包是否去掉分隔符
    	ByteBuf delimiter //分隔符
    	) 
    
    • 1
    • 2
    • 3
    • 4
    • 5

    1.3、LengthFieldBasedFrameDecoder(固定长度字段报文来分包)

    LengthFieldBasedFrameDecoder可以翻译为“长度字段数据包解码器”。传输内容中的Length(长度)字段的值是指存放在数据包中要传输内容的字节数。

    消息分为两部分,一部分为消息头部,一部分为实际的消息体。其中消息头部是固定长度的,消息体是可变的,且消息头部一般会包含一个Length字段
    在这里插入图片描述

    LengthFieldBasedFrameDecoderLengthFieldPrepender需要配合起来使用,其实本质上来讲,这两者一个是解码,一个是编码的关系。

    • LengthFieldPrepender(如果我们在发送消息的时候采用:消息长度字段+原始消息的形式,我们就可以使用 LengthFieldPrepender。 LengthFieldPrepender 可以将待发送消息的长度(二进制字节长度)写到 ByteBuf 的前两个字节)

    原理:

    • 在生成的数据包中添加一个长度字段,用于记录当前数据包的长度。

    • LengthFieldBasedFrameDecoder会按照参数指定的包长度偏移量数据对接收到的数据进行解码,从而得到目标消息体数据;

    • LengthFieldPrepender则会在响应的数据前面添加指定的字节数据,这个字节数据中保存了当前消息体的整体字节数据长度。

    LengthFieldBasedFrameDecoder构造方法参数

    • maxFrameLength:发送数据包的最大长度
    • lengthFieldOffset:长度域的偏移量。长度域位于整个数据包字节数组中的开始下标。
    • lengthFieldLength:长度域的字节数长度。
    • lengthAdjustment:长度域的偏移量矫正。如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。矫正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长。
    • initialBytesToStrip:丢弃的起始字节数。丢弃处于此索引值前面的字节。

    在这里插入图片描述

    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
    // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
    ch.pipeline().addLast(new LengthFieldPrepender(2));

    2、自定义Protostuff编解码器

    protostuff是一个开箱即用的序列化库,使用非常简单。protostuff基于Google Protobuf,不需要自己写.proto文件即可实现对象的序列化与反序列化。

    相对json等文本序列化库,protostuff是二进制的,因此性能比json等方式高

    2.1、pom

     
     <dependency>
         <groupId>com.dyuproject.protostuffgroupId>
         <artifactId>protostuff-apiartifactId>
         <version>1.0.10version>
     dependency>
     <dependency>
         <groupId>com.dyuproject.protostuffgroupId>
         <artifactId>protostuff-coreartifactId>
         <version>1.0.10version>
     dependency>
     <dependency>
         <groupId>com.dyuproject.protostuffgroupId>
         <artifactId>protostuff-runtimeartifactId>
         <version>1.0.10version>
     dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2.2、protostuff 序列化工具类,基于protobuf封装

    import com.dyuproject.protostuff.LinkedBuffer;
    import com.dyuproject.protostuff.ProtostuffIOUtil;
    import com.dyuproject.protostuff.Schema;
    import com.dyuproject.protostuff.runtime.RuntimeSchema;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * protostuff 序列化工具类,基于protobuf封装
     */
    public class ProtostuffUtil {
    
        private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
    
        private static <T> Schema<T> getSchema(Class<T> clazz) {
            @SuppressWarnings("unchecked")
            Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
            if (schema == null) {
                schema = RuntimeSchema.getSchema(clazz);
                if (schema != null) {
                    cachedSchema.put(clazz, schema);
                }
            }
            return schema;
        }
    
        /**
         * 序列化
         *
         * @param obj
         * @return
         */
        public static <T> byte[] serializer(T obj) {
            @SuppressWarnings("unchecked")
            Class<T> clazz = (Class<T>) obj.getClass();
            LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
            try {
                Schema<T> schema = getSchema(clazz);
                return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            } finally {
                buffer.clear();
            }
        }
    
        /**
         * 反序列化
         *
         * @param data
         * @param clazz
         * @return
         */
        public static <T> T deserializer(byte[] data, Class<T> clazz) {
            try {
                T obj = clazz.newInstance();
                Schema<T> schema = getSchema(clazz);
                ProtostuffIOUtil.mergeFrom(data, obj, schema);
                return obj;
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
    
        public static void main(String[] args) {
            byte[] userBytes = ProtostuffUtil.serializer(new User(1, "zhangsan"));
            User user1 = ProtostuffUtil.deserializer(userBytes, User.class);
            System.out.println(user1);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    2.3、EchoResponse

    @Data
    public class EchoResponse {
        private String responseId;
        private Object responseObj;
        private Class<?> responseObjClass;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.4、EchoRequest

    @Data
    public class EchoRequest {
        private String requestId;
        private Object requestObj;
        private Class<?> requestObjClass;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    User

    @Data
    public class User implements Serializable {
        private Integer age;
    
        private String name;
    
        public User(){
    
        }
        public User(Integer age, String name) {
            this.age = age;
            this.name = name;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2.5、ProtostuffEncoder

    import com.example.netty.protostuff.ProtostuffUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    /**
     * @author
     */
    public class ProtostuffEncoder extends MessageToByteEncoder<Object> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            // 直接生成序列化对象
            // 需要注意的是,使用protostuff序列化时,不需要知道pojo对象的具体类型也可以进行序列化时
            // 在反序列化时,只要提供序列化后的字节数组和原来pojo对象的类型即可完成反序列化
            byte[] array = ProtostuffUtil.serializer(msg);
            out.writeBytes(array);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    2.6、ProtostuffDecoder

    
    import com.example.netty.protostuff.ProtostuffUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
    
    /**
     * PojoDecoder继承自Netty中的MessageToMessageDecoder类,
     * 并重写抽象方法decode(ChannelHandlerContext ctx, ByteBuf msg, List out)
     * 首先从数据报msg(数据类型取决于继承MessageToMessageDecoder时填写的泛型类型)中获取需要解码的byte数组
     * 然后调用使用序列化工具类将其反序列化(解码)为Object对象 将解码后的对象加入到解码列表out中,这样就完成了解码操作
     * @author
     */
    public class ProtostuffDecoder extends MessageToMessageDecoder<ByteBuf> {
    
        /**
         * 需要反序列对象所属的类型
         */
        private Class<?> genericClass;
    
        /**
         * 构造方法,传入需要反序列化对象的类型
         * @param genericClass
         */
        public ProtostuffDecoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
            // ByteBuf的长度
            int length = msg.readableBytes();
            // 构建length长度的字节数组
            byte[] array = new byte[length];
            // 将ByteBuf数据复制到字节数组中
            msg.readBytes(array);
            // 反序列化对象
            Object obj = ProtostuffUtil.deserializer(array, this.genericClass);
            // 添加到反序列化对象结果列表
            out.add(obj);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    2.7、NettyChatServer

    
    import com.example.netty.decode.ProtostuffDecoder;
    import com.example.netty.encode.ProtostuffEncoder;
    import com.example.netty.protostuff.EchoRequest;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    
    import java.io.UnsupportedEncodingException;
    
    public class NettyChatServer {
    
        private static final int PORT = 8080;
    
        public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
            final ServerHandler serverHandler = new ServerHandler();
            // 负责服务器通道新连接的IO事件的监听,但是不做任何处理
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            // 负责传输通道的IO事件的处理和数据传输(真正干活的)
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try{
                // Netty的服务引导类
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                // 设置反应器轮询组
                serverBootstrap.group(bossGroup , workerGroup)
                        // 设置nio类型的通道
                        .channel(NioServerSocketChannel.class)
                        .childOption(ChannelOption.SO_REUSEADDR,true)
                        // 设置监听端口
                        .localAddress(PORT)
                        // 装配子通道流水线
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            /**
                             * 有连接到达时会创建一个通道
                             * @param ch 通道
                             * @throws Exception
                             */
                            @Override
                            protected void initChannel(SocketChannel ch) throws UnsupportedEncodingException {
                                // 流水线的职责:负责管理通道中的处理器
                                ChannelPipeline pipeline = ch.pipeline();
                                // 发送数据包的最大长度
                                int maxFrameLength = 1024*1024;
                                // 长度域的偏移量
                                int lengthFieldOffset = 0;
                                // 长度域的字节数长度
                                int lengthFieldLength = 2;
                                // 长度域的偏移量矫正
                                int lengthAdjustment = 0;
                                // 丢弃的起始字节数
                                int initialBytesToStrip =2;
                                // 固定长度字段报文来分包
                                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(maxFrameLength, lengthFieldOffset,
                                        lengthFieldLength, lengthAdjustment, initialBytesToStrip));
                                // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
                                ch.pipeline().addLast(new LengthFieldPrepender(2));
                                // 自定义解码器
                                pipeline.addLast(new ProtostuffDecoder(EchoRequest.class));
                                // 自定义编码器
                                pipeline.addLast(new ProtostuffEncoder());
                                // 向“子通道”(传输通道)流水线添加一个处理器,
                                pipeline.addLast(serverHandler);
                            }
                        });
                // 开始绑定服务器,通过调用sync同步方法阻塞直到绑定成功
                ChannelFuture channelFuture = serverBootstrap.bind().sync();
                // 等待通道关闭的异步任务结束,服务监听通道会一直等待通道关闭的异步任务结束
                channelFuture.channel().closeFuture().sync();
            } finally {
                // 优雅关闭EventLoopGroup
                // 释放掉所有资源,包括创建的线程
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    2.8、ServerHandler

    
    import com.example.netty.protostuff.EchoRequest;
    import com.example.netty.protostuff.EchoResponse;
    import com.example.netty.protostuff.User;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    import java.util.UUID;
    
    /**
     * @author
     */
    @ChannelHandler.Sharable
    public class ServerHandler  extends ChannelInboundHandlerAdapter {
    
        private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            clientChannels.add(channel);
    
            String sendMsg = "客户[" + channel.remoteAddress() + "]上线\n";
            System.out.print(sendMsg);
            clientChannels.forEach(clientChannel -> {
                if(clientChannel != channel) {
                    clientChannel.writeAndFlush(sendMsg);
                }
            });
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            if(clientChannels.contains(channel)) {
                clientChannels.remove(channel);
    
                String sendMsg = "客户[" + channel.remoteAddress() + "]异常下线\n";
                System.out.print(sendMsg);
                clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 接收到的对象的类型为EchoRequest
            for (int i = 0; i < 50; i++) {
                EchoRequest req = (EchoRequest) msg;
                System.out.println(req.getRequestId() + " : " + req.getRequestObj());
                // 创建需要传输的user对象
                User user = new User();
                user.setName("server");
                user.setAge(i);
                // 创建传输的user对象载体EchoRequest对象
                EchoResponse resp = new EchoResponse();
                // 设置responseId
                resp.setResponseId(UUID.randomUUID().toString());
                // 设置需要传输的对象
                resp.setResponseObj(user);
                // 设置需要传输的对象的类型
                resp.setResponseObjClass(resp.getResponseObj().getClass());
                // 调用writeAndFlush将数据发送到socketChannel
                ctx.writeAndFlush(resp);
            }
    
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            Channel channel = ctx.channel();
            clientChannels.remove(channel);
    
            String msg = cause.getMessage();
            String sendMsg = "客户[" + channel.remoteAddress() + "]异常: " + msg + "\n";
            System.out.print(sendMsg);
            clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    2.9、NettyChatClient

    
    import com.example.netty.decode.ProtostuffDecoder;
    import com.example.netty.encode.ProtostuffEncoder;
    import com.example.netty.protostuff.EchoResponse;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    import java.io.UnsupportedEncodingException;
    import java.net.InetSocketAddress;
    
    public class NettyChatClient {
    
        private static final int PORT = 8080;
    
        public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
    
            final ClientHandler clientHandler = new ClientHandler();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try{
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(workerGroup)
                        .channel(NioSocketChannel.class)
                        .remoteAddress(new InetSocketAddress(PORT))
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // 发送数据包的最大长度
                                int maxFrameLength = 1024*1024;
                                // 长度域的偏移量
                                int lengthFieldOffset = 0;
                                // 长度域的字节数长度
                                int lengthFieldLength = 2;
                                // 长度域的偏移量矫正
                                int lengthAdjustment = 0;
                                // 丢弃的起始字节数
                                int initialBytesToStrip =2;
                                // 固定长度字段报文来分包
                                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(maxFrameLength, lengthFieldOffset,
                                        lengthFieldLength, lengthAdjustment, initialBytesToStrip));
                                // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
                                ch.pipeline().addLast(new LengthFieldPrepender(2));
                                // 自定义解码器
                                pipeline.addLast(new ProtostuffDecoder(EchoResponse.class));
                                // 自定义编码器
                                pipeline.addLast(new ProtostuffEncoder());
                                pipeline.addLast(clientHandler);
                            }
                        });
    
                ChannelFuture channelFuture = bootstrap.connect().sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    2.10、ClientHandler

    
    import com.example.netty.protostuff.EchoRequest;
    import com.example.netty.protostuff.EchoResponse;
    import com.example.netty.protostuff.User;
    import io.netty.channel.*;
    import lombok.extern.slf4j.Slf4j;
    import java.util.UUID;
    
    /**
     * @author
     */
    @Slf4j
    @ChannelHandler.Sharable
    public class ClientHandler  extends SimpleChannelInboundHandler<EchoResponse> {
    
        @Override
        public void channelRead0(ChannelHandlerContext ctx, EchoResponse resp){
            System.out.println(resp.getResponseId() + " : " + resp.getResponseObj());
            User user = (User) resp.getResponseObj();
            System.out.println(user);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            Channel channel = ctx.channel();
            String msg = cause.getMessage();
            System.out.print("群聊[" + channel.remoteAddress() + "]异常: " + msg);
        }
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            // 创建需要传输的user对象
            User user = new User();
            user.setName("client");
            user.setAge(11);
            // 创建传输的user对象载体EchoRequest对象
            EchoRequest req = new EchoRequest();
            // 设置requestId
            req.setRequestId(UUID.randomUUID().toString());
            // 设置需要传输的对象
            req.setRequestObj(user);
            // 设置需要传输的对象的类型
            req.setRequestObjClass(req.getRequestObj().getClass());
            // 调用writeAndFlush将数据发送到socketChannel
            ctx.writeAndFlush(req);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    在这里插入图片描述

  • 相关阅读:
    高通XBL阶段读取分区
    从零开始封装 vue 组件
    只需三步,教你搭建一个进销存管理系统!
    二维数组的认识和使用
    45.讲位图:如何实现网页爬虫中的URL去重功能
    JavaScript高级知识-this的指向
    机器学习算法一之DBSCAN聚类原理与实现(二维及三维)
    元素居中展示
    try-with-resources
    MyBatis-Plus学习笔记
  • 原文地址:https://blog.csdn.net/Extraordinarylife/article/details/126106171