• 手写RPC框架--6.封装报文


    RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧)
    RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧)

    封装报文

    a.封装请求和负载

    服务调用方
     - 发送报文 writeAndFlush(objetc) 请求
       - 此处的object应该包含:DcyRpcRequest
         - 请求id(long)
         - 压缩类型(1个byte)
         - 序列化的方式(1个byte)
         - 消息类型(普通请求/心跳检测请求)(1个byte)
         - 负载payload:接口的名字,方法的名字,参数列表,返回值类型(1个byte)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在core模块下com.dcyrpc下创建transprt.message包:用于放置传输消息的类

    transprt.message包创建DcyRpcRequest类:服务调用方发起的请求内容

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public class DcyRpcRequest {
        // 请求的id
        private long requestId;
    
        // 请求的类型
        private byte requestType;
    
        // 压缩的类型
        private byte compressType;
    
        // 序列化的方式
        private byte serializeType;
    
        // 具体的消息体
        private RequestPayload requestPayload;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    transprt.message包创建RequestPayload类:请求调用方所请求的具体的消息体的描述

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public class RequestPayload implements Serializable{
        // 接口的名字(服务的名字) -- com.dcyrpc.DcyRpc
        private String interfaceName;
    
        // 调用方法的名字 -- sayHi()
        private String methodName;
    
        // 参数列表: 参数类型或具体的参数
        //  - 参数类型: 确定重载方法
        //  - 具体的参数: 执行方法的调用
        private Class<?>[] parametersType;
        private Object[] parametersValue;
    
        // 返回值类型
        private Class<?> returnType;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    b.封装请求实体

    在core模块下的com.dcyrpc下创建enumeration

    在包内创建RequestEnum 枚举:定义相关的请求类型

    /**
     * 标记请求类型的枚举类
     */
    public enum RequestType {
    
        REQUEST((byte) 1, "普通请求"), HEART((byte) 2, "心跳检测");
    
        private byte id;
    
        private String type;
    
        RequestEnum(byte id, String type) {
            this.id = id;
            this.type = type;
        }
    
        public byte getId() {
            return id;
        }
    
        public String getType() {
            return type;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    RpcConsumerInvocationHandler中的封装报文位置进行添加代码

    • 已添加@Builder注解,可以直接使用建造者设计模式
    // 略....
    /**
     * ---------------------------封装报文---------------------------
     */
    RequestPayload requestPayload = RequestPayload.builder()
            .interfaceName(interfaceRef.getName())
            .methodName(method.getName())
            .parametersType(method.getParameterTypes())
            .parametersValue(args)
            .returnType(method.getReturnType())
            .build();
    
    DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
            .requestId(1L)
            .compressType((byte) 1)
            .serializeType((byte) 1)
            .requestType(RequestType.REQUEST.getId())
            .requestPayload(requestPayload)
            .build();
    
    // 略....
    
    // 直接使用writeAndFlush 写出一个请求,这个请求的实例就会进入pipeline执行出栈的一系列操作
    channel.writeAndFlush(dcyRpcRequest).addListener((ChannelFutureListener) promise -> {
        // 略....
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    c.对请求进行编码

    * 4B magic(魔数值) -- Drpc.getBytes()
    * 1B version(版本) -- 1
    * 2B header length(首部的长度)
    * 4B full length(报文的总长度)
    * 1B serialize (序列化类型的长度)
    * 1B compress(压缩类型的长度)
    * 1B requestType
    * 8B requestId
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    image-20230903180240307

    在core模块channelhandler.handler包下创建DcyRpcRequestEncoder类:出站时,第一个经过的处理器

    • 继承 MessageToByteEncoder
    /**
     * 出站时,第一个经过的处理器
     *
     * 4B magic(魔数值) -- Drpc.getBytes()
     * 1B version(版本) -- 1
     * 2B header length(首部的长度)
     * 4B full length(报文的总长度)
     * 1B serialize (序列化类型的长度)
     * 1B compress(压缩类型的长度)
     * 1B requestType
     * 8B requestId
     *
     * full length = header - header length
     */
    @Slf4j
    public class DcyRpcRequestEncoder extends MessageToByteEncoder<DcyRpcRequest> {
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, DcyRpcRequest dcyRpcRequest, ByteBuf byteBuf) throws Exception {
            // 魔数值
            byteBuf.writeBytes(MessageFormatConstant.MAGIC);
            // 版本号
            byteBuf.writeByte(MessageFormatConstant.VERSION);
            // 2个字节的头部的长度
            byteBuf.writeShort(MessageFormatConstant.HEADER_LENGTH);
            // 总长度未知,不知道body的长度
            byteBuf.writerIndex(byteBuf.writerIndex() + MessageFormatConstant.FULL_FIELD_LENGTH);
            // 请求类型
            byteBuf.writeByte(dcyRpcRequest.getRequestType());
            // 序列化类型
            byteBuf.writeByte(dcyRpcRequest.getSerializeType());
            // 压缩类型
            byteBuf.writeByte(dcyRpcRequest.getCompressType());
            // 8个字节的请求id
            byteBuf.writeLong(dcyRpcRequest.getRequestId());
    
            // 写入请求体body(requestPayload)
            byte[] body = getBodyBytes(dcyRpcRequest.getRequestPayload());
            if (body != null) {
                byteBuf.writeBytes(body);
                byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length);
            }
            
            int bodyLength = body ==null ? 0 : body.length;
            
            // 重新处理报文的总长度
            // 先获取当前的写指针的位置
            int writerIndex = byteBuf.writerIndex();
            // 将写指针的位置移动到总长度的位置上
            byteBuf.writerIndex(MessageFormatConstant.MAGIC.length + MessageFormatConstant.VERSION_LENGTH + MessageFormatConstant.HEADER_FIELD_LENGTH);
            byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + bodyLength);
    
            // 将写指针归位
            byteBuf.writerIndex(writerIndex);
        }
    
        private byte[] getBodyBytes(RequestPayload requestPayload) {
            // 心跳请求没有payload
            if (requestPayload == null) {
                return null;
            }
            // 对象序列化成字节数组
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                ObjectOutputStream outputStream = new ObjectOutputStream(baos);
                outputStream.writeObject(requestPayload);
                return baos.toByteArray();
            } catch (IOException e) {
                log.error("序列化时出现异常");
                throw new RuntimeException(e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    transprt.message包创建MessageFormatConstant类:封装请求参数的相关常量

    public class MessageFormatConstant {
        // 魔数值
        public final static byte[] MAGIC = "Drpc".getBytes();
    
        // 版本号
        public final static byte VERSION = 1;
    
        // 首部的长度
        public final static short HEADER_LENGTH = (byte) (MAGIC.length + 1 + 2 + 4 + 1 + 1 + 1 + 8);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    d.日志处理器输出报文

    修改channelHandler包下的ConsumerChannelInitializer类:添加日志处理器 LoggingHandler

    // 略...
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                // netty自带的日志处理器
                .addLast(new LoggingHandler(LogLevel.DEBUG))
                // 消息编码器
                .addLast(new DcyRpcRequestEncoder())
                .addLast(new MySimpleChannelInboundHandler());
    
    }
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    e.编写报文解码器

    在core模块channelhandler.handler包下创建DcyRpcRequestDecoder类:对报文进行解码解析

    • 继承 LengthFieldBasedFrameDecoder
    /**
     * 解码器
     */
    @Slf4j
    public class DcyRpcRequestDecoder extends LengthFieldBasedFrameDecoder {
        public DcyRpcRequestDecoder() {
    
            // 找到当前报文的总长度,截取报文,截取出来的报文可以进行解析
            super(
                    // 最大帧的长度,超过这个maxFrameLength值,会直接丢弃
                    MessageFormatConstant.MAX_FRAME_LENGTH,
                    // 长度字段偏移量
                    MessageFormatConstant.LENGTH_FIELD_OFFSET,
                    // 长度的字段的长度
                    MessageFormatConstant.FULL_FIELD_LENGTH,
                    // todo 负载的适配长度
                    -(MessageFormatConstant.LENGTH_FIELD_OFFSET + MessageFormatConstant.FULL_FIELD_LENGTH),
                    // 跳过的字段
                    0);
        }
    
        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            Object decode = super.decode(ctx, in);
            if (decode instanceof ByteBuf) {
                ByteBuf byteBuf = (ByteBuf) decode;
                return decodeFrame(byteBuf);
            }
            return null;
        }
    
        private Object decodeFrame(ByteBuf byteBuf) {
            // 1.解析魔数
            byte[] magic = new byte[MessageFormatConstant.MAGIC.length];
            byteBuf.readBytes(magic);
    
            // 检测魔数值是否匹配
            for (int i = 0; i < magic.length; i++) {
                if (magic[i] != MessageFormatConstant.MAGIC[i]) {
                    throw new RuntimeException("获得的请求类型不匹配");
                }
            }
    
            // 2.解析版本号
            byte version = byteBuf.readByte();
            if (version > MessageFormatConstant.VERSION) {
                throw new RuntimeException("获得的请求版本不被支持");
            }
    
            // 3.解析头部的长度
            short headLength = byteBuf.readShort();
    
            // 4.解析总长度
            int fullLength = byteBuf.readInt();
    
            // 5.解析请求类型
            byte requestType = byteBuf.readByte();
    
            // 6.解析序列化类型
            byte serializeType = byteBuf.readByte();
    
            // 7.解析压缩型
            byte compressType = byteBuf.readByte();
    
            // 8.解析请求Id
            long requestId = byteBuf.readLong();
    
            DcyRpcRequest dcyRpcRequest = new DcyRpcRequest();
            dcyRpcRequest.setRequestType(requestType);
            dcyRpcRequest.setCompressType(compressType);
            dcyRpcRequest.setSerializeType(serializeType);
    
            // 心跳请求没有负载,此处可以判断并直接返回
            if (requestType == RequestType.HEART.getId()) {
                return dcyRpcRequest;
            }
    
            // 9.解析消息体payload
            int payloadLength = fullLength - headLength;
            byte[] payload = new byte[payloadLength];
            byteBuf.readBytes(payload);
    
            // 解压缩和反序列化
            // todo 解压缩
    
            // 反序列化
            try (ByteArrayInputStream bis = new ByteArrayInputStream(payload);
                 ObjectInputStream ois = new ObjectInputStream(bis)
            ) {
    
                RequestPayload requestPayload = (RequestPayload) ois.readObject();
                dcyRpcRequest.setRequestPayload(requestPayload);
            } catch (IOException | ClassNotFoundException e) {
                log.error("请求【{}】反序列化时出现异常", requestId, e);
                throw new RuntimeException(e);
            }
    
            return dcyRpcRequest;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    MessageFormatConstant类里定义相关的常量

    // 略...
    // 头部信息长度占用的字节数
    public final static int HEADER_FIELD_LENGTH = 2;
    
    // 最大帧长度
    public final static int MAX_FRAME_LENGTH = 1024 * 1024;
    
    // 长度字段偏移量
    public final static int LENGTH_FIELD_OFFSET = MAGIC.length + VERSION_LENGTH + HEADER_FIELD_LENGTH;
    
    // 总长度占用的字节数
    public final static int FULL_FIELD_LENGTH = 4;
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    修改DcyRpcBootstrapstart()方法

    • 添加 DcyRpcRequestDecoder解码器
    // 略...
    
    // 3.配置服务器
    serverBootstrap = serverBootstrap.group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // TODO 核心内容,需要添加很多入栈和出栈的handler
                    socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO))
                            // 对报文进行解码
                            .addLast(new DcyRpcRequestDecoder())
                }
            });
    
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    f.通过反射调用方法

    在core模块channelhandler.handler包下创建MethodCallHandler类:

    • 继承 SimpleChannelInboundHandler
    /**
     * 利用反射调用方法
     */
    @Slf4j
    public class MethodCallHandler extends SimpleChannelInboundHandler<DcyRpcRequest> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, DcyRpcRequest dcyRpcRequest) throws Exception {
            // 1.获取负载内容payload
            RequestPayload requestPayload = dcyRpcRequest.getRequestPayload();
    
            // 2.根据负载内容进行方法调用
            Object object = callTargetMethod(requestPayload);
    
            // 3.封装响应
    
            // 4.写出响应
            channelHandlerContext.channel().writeAndFlush(null);
        }
    
        /**
         * 通过反射调用方法
         * @param requestPayload
         * @return
         */
        private Object callTargetMethod(RequestPayload requestPayload) {
            String interfaceName = requestPayload.getInterfaceName();
            String methodName = requestPayload.getMethodName();
            Class<?>[] parametersType = requestPayload.getParametersType();
            Object[] parametersValue = requestPayload.getParametersValue();
    
            // 寻找暴露出去的具体实现
            ServiceConfig<?> serviceConfig = DcyRpcBootstrap.SERVERS_LIST.get(interfaceName);
    
            Object refImpl = serviceConfig.getRef();
    
            // 通过反射调用
            //  - 1.获取方法对象
            //  - 2.执行invoke方法
            Object returnValue = null;
            try {
                Class<?> refImplClass = refImpl.getClass();
                Method method = refImplClass.getMethod(methodName, parametersType);
                returnValue = method.invoke(refImpl, parametersValue);
            } catch (Exception e) {
                log.error("调用服务【{}】的方法【{}】时发生异常", interfaceName, methodName, e);
                throw new RuntimeException(e);
            }
    
            return returnValue;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    修改DcyRpcBootstrapstart()方法

    • 添加 MethodCallHandler 执行方法调用
    // 略...
    // 3.配置服务器
    serverBootstrap = serverBootstrap.group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // TODO 核心内容,需要添加很多入栈和出栈的handler
                    socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO))
                            // 对报文进行解码
                            .addLast(new DcyRpcRequestDecoder())
                            // 根据请求进行方法调用
                            .addLast(new MethodCallHandler());
                }
            });
    
    // 4.绑定端口
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
  • 相关阅读:
    Django REST Framework完整教程-认证与权限-JWT的使用
    数据中心网络规划设计,数据中心设计规范解读
    数字电路计算器
    六级高频词汇
    BFC(块格式化上下文)
    jmeter实现webservice接口测试
    win10系统图片查看器关联不上了
    IB中文:语言与文学中的非文学语篇
    执行ls /dev/pts为什么这么慢?
    PyTorch JIT和TorchScript,一个API提升推理性能50%
  • 原文地址:https://blog.csdn.net/weixin_46926189/article/details/132712077