• 使用Netty进行协议开发:多协议支持与自定义协议的实现


    为什么需要协议❓❓❓

    在TCP/IP中,数据传输是通过流的方式进行的,这意味着数据被分割成一系列的数据包,并通过网络传输。这样的流式传输方式虽然高效,但没有明确的消息边界,因此在接收端需要一种机制来确定消息的开始和结束位置。协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

    协议的作用就是定义通信双方之间的规则和约定,包括消息的格式、传输方式、错误处理等。通过协议,发送方可以将数据按照一定的格式打包成消息,并在消息中包含必要的元数据,如消息长度等信息。接收方则按照协议规定的格式和规则解析消息,从而正确地还原出发送方发送的数据。

    协议还可以定义数据的校验机制,以确保数据的完整性和可靠性。例如,在TCP协议中,每个数据包都有一个校验和字段,接收方可以通过校验和验证数据是否在传输过程中损坏或篡改。如果数据包损坏,接收方可以要求发送方重新发送。

    下雨天留客天留我不留,是中文一句著名的无标点符号句子,在没有标点符号情况下,这句话有数种拆解方式,而意思却是完全不同,所以常被用作讲述标点符号的重要性。

    • 一种解读,“下雨天留客,天留,我不留”
    • 另一种解读,下雨天,留客天,留我不?留

    如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用

    定长字节表示内容长度 + 实际内容
    
    • 1

    例如,一个中文字符两个字节,按照上述协议的规则,发送信息方式如下,就不会被接收方弄错意思了

    06下雨天06留客天06留我不02留
    
    • 1

    Netty支持丰富的协议,让程序员专注于业务🧱

    在Netty中,已经封装了许多协议的实现细节,这使得开发者可以专注于业务逻辑,而无需自行处理协议的实现。下面是一个基于HTTP协议通信的示例代码,演示了如何简化协议处理——打印http请求内容以及返回响应

    @Slf4j
    public class TestHttp {
        public static void main(String[] args) {
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
    
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.group(boss, worker);
                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        // Http服务器编解码器
                        ch.pipeline().addLast(new HttpServerCodec());
                        // 可以根据消息类型去区分处理
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                                // 读取请求
                                log.debug("收到消息:{}", msg.uri());
    
                                // 返回响应
                                DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                                byte[] bytes = "

    Hello, world!

    "
    .getBytes(); response.content().writeBytes(bytes); response.headers().setInt(CONTENT_LENGTH, bytes.length); // 写回响应 ctx.writeAndFlush(response); } }); } }); ChannelFuture channelFuture = bootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    借助浏览器向服务端发送请求:

    在这里插入图片描述
    在这里插入图片描述

    自定义协议👤

    如果你想自定义一个协议,在Netty框架中也可以很容易地实现。Netty提供了强大的API和工具,可以轻松地构建自定义协议的通信。

    1、前置准备

    当涉及消息传输和编解码时,为了实现更好的可扩展性和互操作性,定义一个公共消息体是很有帮助的。公共消息体是在通信双方之间共享的数据结构,用于在网络上进行消息的传输和解析。
    在这里插入图片描述
    抽象消息体

    package com.gw.core.message;
    
    import lombok.Data;
    
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    /**
     * Description: 抽象消息体
     *
     * @author YanAn
     * @date 2023/8/7 20:37
     */
    @Data
    public abstract class Message implements Serializable {
    
        /**
         * 根据消息类型字节,获得对应的消息 class
         * @param messageType 消息类型字节
         * @return 消息 class
         */
        public static Class<? extends Message> getMessageClass(int messageType) {
            return messageClasses.get(messageType);
        }
    
        private int sequenceId;
    
        private int messageType;
    
        public abstract int getMessageType();
        /**
         * 请求类型 byte 值
         */
        public static final int RPC_MESSAGE_TYPE_REQUEST = 0;
        /**
         * 响应类型 byte 值
         */
        public static final int RPC_MESSAGE_TYPE_RESPONSE = 1;
    
        private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
    
        static {
            messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
            messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    Rpc请求消息体

    package com.gw.core.message;
    
    import lombok.Getter;
    import lombok.ToString;
    
    /**
     * Description: Rpc请求体
     *
     * @author LinHuiBa-YanAn
     * @date 2023/8/7 20:39
     */
    @Getter
    @ToString(callSuper = true)
    public class RpcRequestMessage extends Message {
    
        /**
         * 调用的接口全限定名,服务端根据它找到实现
         */
        private String interfaceName;
        /**
         * 调用接口中的方法名
         */
        private String methodName;
        /**
         * 方法返回类型
         */
        private Class<?> returnType;
        /**
         * 方法参数类型数组
         */
        private Class[] parameterTypes;
        /**
         * 方法参数值数组
         */
        private Object[] parameterValue;
    
        public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
            super.setSequenceId(sequenceId);
            this.interfaceName = interfaceName;
            this.methodName = methodName;
            this.returnType = returnType;
            this.parameterTypes = parameterTypes;
            this.parameterValue = parameterValue;
        }
    
        @Override
        public int getMessageType() {
            return RPC_MESSAGE_TYPE_REQUEST;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    Rpc响应消息体

    package com.gw.core.message;
    
    import lombok.Data;
    import lombok.ToString;
    
    /**
     * Description: Rpc响应体
     *
     * @author YanAn
     * @date 2023/8/7 20:43
     */
    @Data
    @ToString(callSuper = true)
    public class RpcResponseMessage extends Message {
        /**
         * 返回值
         */
        private Object returnValue;
        /**
         * 异常值
         */
        private Exception exceptionValue;
    
        @Override
        public int getMessageType() {
            return RPC_MESSAGE_TYPE_RESPONSE;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    2、编解码器抽象类介绍

    MessageToMessageCodec是Netty框架中提供的一个编解码器抽象类,用于在处理网络通信中的消息时进行编码和解码的转换。可以看作是MessageToMessageDecoder和MessageToMessageEncoder的组合。MessageToMessageCodec实际上是ChannelDuplexHandler的一个子类,它同时实现了ChannelInboundHandlerChannelOutboundHandler接口。这使得它既可以在入站消息(从外部系统接收的消息)的解码过程中使用,也可以在出站消息(发送到外部系统的消息)的编码过程中使用。

    3、编解码器实现

    让我们先思考一下自定义协议需要哪些要素?

    • 魔数,用来在第一时间判定是否是无效数据包
    • 版本号,可以支持协议的升级
    • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk……
    • 指令类型,登录、注册、单聊、群聊等(业务相关)
    • 请求序号,为了双向通信,提供异步能力
    • 正文长度
    • 消息正文

    根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发

    package com.gw.core.protocol;
    
    import com.gw.core.config.Config;
    import com.gw.core.message.Message;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageCodec;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.List;
    
    /**
     * Description: 消息的编解码器
     *
     * @author LinHuiBa-YanAn
     * @date 2023/8/7 20:49
     */
    @Slf4j
    @ChannelHandler.Sharable
    public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
        @Override
        public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
            ByteBuf out = ctx.alloc().buffer();
           	// 1. 4个字节的魔数
          	out.writeBytes(new byte[]{1, 2, 3, 4});
          	// 2. 1 字节的版本
            out.writeByte(1);
          	// 3. 1 字节的序列化方式 jdk 0 , json 1
            out.writeByte(1);
          	// 4. 1 字节的消息指令类型
            out.writeByte(msg.getMessageType());
          	// 5. 4 个字节请求序号
            out.writeInt(msg.getSequenceId());
          	// 填充位(以上共11个字节,固加上一个字节做填充位)
            out.writeByte(0xff);
          	// 获取内容的字节数组
            byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
          	// 长度
            out.writeInt(bytes.length);
          	// 消息征文
            out.writeBytes(bytes);
            outList.add(out);
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
          	// 读取魔术
            int magicNum = in.readInt();
          	// 读取版本
            byte version = in.readByte();
          	// 读取序列化方法
            byte serializerAlgorithm = in.readByte();
          	// 读取消息指令类型
            byte messageType = in.readByte();
          	// 读取请求序号
            int sequenceId = in.readInt();
          	// 读取无意义的对齐填充
            in.readByte();
          	// 读取长度
            int length = in.readInt();
          	// 读取正文
            byte[] bytes = new byte[length];
            in.readBytes(bytes, 0, length);
    				
          	// 获取序列化算法
            Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
          	// 根据消息指令类型获取消息类型
            Class<? extends Message> messageClass = Message.getMessageClass(messageType);
          	// 将字节数组反序列化成消息指令对象
            Message message = algorithm.deserialize(messageClass, bytes);
            out.add(message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    测试编码器

    public static void main(String[] args) throws Exception {
        EmbeddedChannel channel = new EmbeddedChannel(
                new LoggingHandler(),
                new ProtocolFrameDecoder(),
                new MessageCodecSharable()
        );
        int sequenceId = SequenceIdGenerator.nextId();
    
        Class<HelloService> serviceClass = HelloService.class;
        Method method = HelloService.class.getMethod("sayHello", String.class);
        String[] parameterValue = new String[]{"yanan"};
        RpcRequestMessage message = new RpcRequestMessage(sequenceId, serviceClass.getName(), method.getName(), method.getReturnType(), method.getParameterTypes(), parameterValue);
        channel.writeOutbound(message);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在这里插入图片描述

    测试解码器

    public static void main(String[] args) throws Exception {
        EmbeddedChannel channel = new EmbeddedChannel(
                new LoggingHandler(),
                new ProtocolFrameDecoder(),
                new MessageCodecSharable()
        );
        int sequenceId = SequenceIdGenerator.nextId();
    
        Class<HelloService> serviceClass = HelloService.class;
        Method method = HelloService.class.getMethod("sayHello", String.class);
        String[] parameterValue = new String[]{"yanan"};
        RpcRequestMessage message = new RpcRequestMessage(sequenceId, serviceClass.getName(), method.getName(), method.getReturnType(), method.getParameterTypes(), parameterValue);
        // channel.writeOutbound(message);
        
        ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
        out.writeBytes(new byte[]{1, 2, 3, 4});
        out.writeByte(1);
        out.writeByte(Config.getSerializerAlgorithm().ordinal());
        out.writeByte(message.getMessageType());
        out.writeInt(message.getSequenceId());
        out.writeByte(0xff);
        byte[] bytes = Config.getSerializerAlgorithm().serialize(message);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    
        channel.writeInbound(out);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    在这里插入图片描述

  • 相关阅读:
    番外 1 : Java 环境下的 selenium 搭建
    数据结构-红黑树
    SpringBoot Actuator未授权访问漏洞修复
    Codeforces Round 929 (Div. 3 ABCDEFG题) 视频讲解
    云原生十二要素应用
    【Canvas】js用Canvas绘制阴阳太极图动画效果
    肽核酸(PNA)偶联穿膜肽(CCPs)(KFF)3K形成CCPs-PNA|肽核酸的使用方法
    SuperEdge易学易用系列-一键搭建SuperEdge集群
    19 【RTK Query】
    git clone报错SSL connect error
  • 原文地址:https://blog.csdn.net/m0_49183244/article/details/133102864