• 手写RPC框架--7.封装响应


    RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧)
    RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧)

    封装响应

    a.封装响应

    在core模块下的com.dcyrpcenumeration

    在包内创建ResponseCode 枚举:定义响应码枚举

    /**
     * 响应码枚举
     */
    public enum ResponseCode {
    
        SUCCESS((byte) 1, "成功"), FAIL((byte) 2, "失败");
    
        private byte code;
        private String desc;
    
        ResponseCode(byte code, String desc) {
            this.code = code;
            this.desc = desc;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在core模块下com.dcyrpctransprt.message包下

    创建DcyRpcResponse类:服务提供方回复的响应

    /**
     * 服务提供方回复的响应
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public class DcyRpcResponse {
        // 请求的id
        private long requestId;
    
        // 压缩的类型
        private byte compressType;
    
        // 序列化的方式
        private byte serializeType;
    
        // 响应码类型:1 成功,2 异常
        private byte code;
    
        // 具体的消息体
        private Object body;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    在core模块channelhandler.handler包下创建DcyRpcResponseEncoder类:对调用结果进行编码

    /**
     * 编码器
     *
     * 4B magic(魔数值) -- Drpc.getBytes()
     * 1B version(版本) -- 1
     * 2B header length(首部的长度)
     * 4B full length(报文的总长度)
     * 1B serialize (序列化类型的长度)
     * 1B compress(压缩类型的长度)
     * 1B code(响应码)
     * 8B requestId
     *
     * Object body
     */
    @Slf4j
    public class DcyRpcResponseEncoder extends MessageToByteEncoder<DcyRpcResponse> {
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, DcyRpcResponse dcyRpcResponse, ByteBuf byteBuf) throws Exception {
            // 4个字节魔数值
            byteBuf.writeBytes(MessageFormatConstant.MAGIC);
            // 1个字节版本号
            byteBuf.writeByte(MessageFormatConstant.VERSION);
            // 2个字节的头部的长度
            byteBuf.writeShort(MessageFormatConstant.HEADER_LENGTH);
            // 总长度未知,不知道body的长度
            byteBuf.writerIndex(byteBuf.writerIndex() + MessageFormatConstant.FULL_FIELD_LENGTH);
            // 响应码
            byteBuf.writeByte(dcyRpcResponse.getCode());
            // 序列化类型
            byteBuf.writeByte(dcyRpcResponse.getSerializeType());
            // 压缩类型
            byteBuf.writeByte(dcyRpcResponse.getCompressType());
            // 8个字节的请求id
            byteBuf.writeLong(dcyRpcResponse.getRequestId());
    
            // 写入请求体body(requestPayload)
            byte[] body = getBodyBytes(dcyRpcResponse.getBody());
            if (body != null) {
                byteBuf.writeBytes(body);
                byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length);
            }
    
            int bodyLength = body ==null ? 0 : body.length;
    
            // 重新处理报文的总长度
            // 先获取当前的写指针的位置
            int writerIndex = byteBuf.writerIndex();
            // 将写指针的位置移动到总长度的位置上
            byteBuf.writerIndex(MessageFormatConstant.MAGIC.length + MessageFormatConstant.VERSION_LENGTH + MessageFormatConstant.HEADER_FIELD_LENGTH);
            byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + bodyLength);
    
            // 将写指针归位
            byteBuf.writerIndex(writerIndex);
        }
    
        private byte[] getBodyBytes(Object body) {
            // 心跳请求没有payload
            if (body == null) {
                return null;
            }
            // 对象序列化成字节数组
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                ObjectOutputStream outputStream = new ObjectOutputStream(baos);
                outputStream.writeObject(body);
                return baos.toByteArray();
            } catch (IOException e) {
                log.error("序列化时出现异常");
                throw new RuntimeException(e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    修改DcyRpcBootstrapstart()方法

    • 添加 DcyRpcResponseEncoder 响应编码器
    // 略....
    // 3.配置服务器
    serverBootstrap = serverBootstrap.group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // TODO 核心内容,需要添加很多入栈和出栈的handler
                    socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO))
                            // 对报文进行解码
                            .addLast(new DcyRpcRequestDecoder())
                            // 根据请求进行方法调用
                            .addLast(new MethodCallHandler())
                            // 对响应结果进行编码
                            .addLast(new DcyRpcResponseEncoder());
                }
            });
    // 略....
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在core模块channelhandler.handler包下创建DcyRpcResponseEncoder类:服务请求方对响应结果进行解码

    /**
     * 解码器
     */
    @Slf4j
    public class DcyRpcResponseDecoder extends LengthFieldBasedFrameDecoder {
        public DcyRpcResponseDecoder() {
    
            // 找到当前报文的总长度,截取报文,截取出来的报文可以进行解析
            super(
                    // 最大帧的长度,超过这个maxFrameLength值,会直接丢弃
                    MessageFormatConstant.MAX_FRAME_LENGTH,
                    // 长度字段偏移量
                    MessageFormatConstant.MAGIC.length + MessageFormatConstant.VERSION_LENGTH + MessageFormatConstant.HEADER_FIELD_LENGTH,
                    // 长度的字段的长度
                    MessageFormatConstant.FULL_FIELD_LENGTH,
                    // todo 负载的适配长度
                    -(MessageFormatConstant.MAGIC.length + MessageFormatConstant.VERSION_LENGTH + MessageFormatConstant.HEADER_FIELD_LENGTH + MessageFormatConstant.FULL_FIELD_LENGTH),
                    // 跳过的字段
                    0);
        }
    
        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            Object decode = super.decode(ctx, in);
            if (decode instanceof ByteBuf) {
                ByteBuf byteBuf = (ByteBuf) decode;
                return decodeFrame(byteBuf);
            }
            return null;
        }
    
        private Object decodeFrame(ByteBuf byteBuf) {
            // 1.解析魔数
            byte[] magic = new byte[MessageFormatConstant.MAGIC.length];
            byteBuf.readBytes(magic);
    
            // 检测魔数值是否匹配
            for (int i = 0; i < magic.length; i++) {
                if (magic[i] != MessageFormatConstant.MAGIC[i]) {
                    throw new RuntimeException("获得的请求类型不匹配");
                }
            }
    
            // 2.解析版本号
            byte version = byteBuf.readByte();
            if (version > MessageFormatConstant.VERSION) {
                throw new RuntimeException("获得的请求版本不被支持");
            }
    
            // 3.解析头部的长度
            short headLength = byteBuf.readShort();
    
            // 4.解析总长度
            int fullLength = byteBuf.readInt();
    
            // 5.解析响应码
            byte responseCode = byteBuf.readByte();
    
            // 6.解析序列化类型
            byte serializeType = byteBuf.readByte();
    
            // 7.解析压缩型
            byte compressType = byteBuf.readByte();
    
            // 8.解析请求Id
            long requestId = byteBuf.readLong();
    
            DcyRpcResponse dcyRpcResponse = new DcyRpcResponse();
            dcyRpcResponse.setCode(responseCode);
            dcyRpcResponse.setCompressType(compressType);
            dcyRpcResponse.setSerializeType(serializeType);
            dcyRpcResponse.setRequestId(requestId);
    
            // 9.解析消息体payload
            int bodyLength = fullLength - headLength;
            byte[] body = new byte[bodyLength];
            byteBuf.readBytes(body);
    
            // 解压缩和反序列化
            // todo 解压缩
    
            // 反序列化
            try (ByteArrayInputStream bis = new ByteArrayInputStream(body);
                 ObjectInputStream ois = new ObjectInputStream(bis)
    
            ) {
                Object object = ois.readObject();
                dcyRpcResponse.setBody(object);
            } catch (IOException | ClassNotFoundException e) {
                log.error("请求【{}】反序列化时出现异常", requestId, e);
                throw new RuntimeException(e);
            }
    
            return dcyRpcResponse;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96

    修改channelHandler包下的ConsumerChannelInitializer类:添加入站的解码器 DcyRpcResponseDecoder()

    public class ConsumerChannelInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline()
                    // netty自带的日志处理器
                    .addLast(new LoggingHandler(LogLevel.INFO))
                    // 消息编码器
                    .addLast(new DcyRpcRequestEncoder())
                    // 入站的解码器
                    .addLast(new DcyRpcResponseDecoder())
                    // 处理结果
                    .addLast(new MySimpleChannelInboundHandler());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    修改channelhandler.handler包下的MySimpleChannelInboundHandler类:将响应结果的ByteBuf改成DcyRpcResponse

    /**
     * 处理响应结果
     */
    public class MySimpleChannelInboundHandler extends SimpleChannelInboundHandler<DcyRpcResponse> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, DcyRpcResponse dcyRpcResponse) throws Exception {
            // 异步
            // 服务提供方,给予的结果
            Object returnValue = dcyRpcResponse.getBody();
            // 从全局的挂起的请求中,寻找与之匹配的待处理 completeFuture
            CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(1L);
            completableFuture.complete(returnValue);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在core模块channelhandler.handler包的MethodCallHandler类:封装响应结果

    // 略...
    // 3.封装响应
    DcyRpcResponse dcyRpcResponse = DcyRpcResponse.builder()
            .code(ResponseCode.SUCCESS.getCode())
            .requestId(dcyRpcRequest.getRequestId())
            .compressType(dcyRpcRequest.getCompressType())
            .serializeType(dcyRpcRequest.getSerializeType())
            .body(result)
            .build();
    
    // 4.写出响应
    channelHandlerContext.channel().writeAndFlush(dcyRpcResponse);
    
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    b.请求id生成器(雪花算法)

    在当前项目中,我们需要给请求一个唯一标识,用来标识一个请求和响应的关联关系,我们要求请求的id必须唯一,且不能占用过大的空间,可用的方案如下:

    • 1.自增id,单机的自增id不能解决不重复的问题,微服务情况下我们需要一个稳定的发号服务才能保证,但是这样做性能偏低。

    • 2.uuid,将uuid作为唯一标识占用空间太大

    • 3.雪花算法,最优解。

    雪花算法(snowflake)最早是twitter内部使用分布式环境下的唯一ID生成算法,他使用64位long类型的数据存储

    id,具体如下:

    0 - 0000000000 0000000000 0000000000 0000000000 0 - 0000000000 - 000000000000
    符号位 时间戳 机器码 序列号
    最高位表示符号位,其中0代表整数,1代表负数,而id一般都是正数,所以最高位为0。
    
    • 1
    • 2
    • 3

    通过雪花算法实现 – (世界上没有一片雪花是一样的) 5+5+42+12=64位

    • 机房号(数据中心)5bit
    • 机器号 5bit
    • 时间戳(long)原本64位表示的时间,必须减少到42位,可以自由选择一个时间。如: 公司的成立日期
    • 序列化 12bit:同一个机房的同一个机器号的同一个时间可能因并发量需要很多个Id
    时间戳 (42) 机房号 (5) 机器号 (5) 序列号 (12)
    101010101010101010101010101010101010101011 10101 10101 101011010101
    
    • 1
    • 2

    在common块下的com.dcy包下创建IdGenerator

    /**
     * 请求id的生成器:雪花算法
     */
    public class IdGenerator {
    
        // 起始时间戳
        private static final long START_STAMP = DateUtil.get("2022-1-1").getTime();
    
        // 机房号
        public static final long DATA_CENTER_BIT = 5L;
    
        // 机器号
        public static final long MACHINE_BIT = 5L;
    
        // 序列化号
        public static final long SEQUENCE_BIT = 5L;
    
        // 机房号的最大值
        public static final long DATA_CENTER_MAX = ~(-1L << DATA_CENTER_BIT);
    
        // 机器号的最大值
        public static final long MACHINE_MAX = ~(-1L << MACHINE_BIT);
    
        // 序列号的最大值
        public static final long SEQUENCE_MAX = ~(-1L << SEQUENCE_BIT);
    
        // 时间戳需要左移的位数
        public static final long TIMESTAMP_LEFT = DATA_CENTER_BIT + MACHINE_BIT + SEQUENCE_BIT;
    
        // 机房号需要左移的位数
        public static final long DATA_CENTER_LEFT = MACHINE_BIT + SEQUENCE_BIT;
    
        // 机器号需要左移的位数
        public static final long MACHINE_LEFT = SEQUENCE_BIT;
    
        private long dataCenterId;
        private long machineId;
        private LongAdder sequenceId = new LongAdder();
    
        private long lastTimeStamp = -1;
    
        public IdGenerator(long dataCenterId, long machineId) {
            //参数是否合法
            if (dataCenterId > DATA_CENTER_MAX || machineId > MACHINE_MAX) {
                throw new IllegalArgumentException("传入的数据中心编号和机器编号不合法");
            }
            this.dataCenterId = dataCenterId;
            this.machineId = machineId;
        }
    
        public long getId() {
            // 1.处理时间戳的问题
            long currentTime = System.currentTimeMillis();
            long timeStamp = currentTime - START_STAMP;
    
            // 2.判断时钟回拨
            if (timeStamp < lastTimeStamp) {
                throw new RuntimeException("服务器进行了时钟回调");
            }
    
            // 3.对sequenceId做一些处理:如果是同一个时间节点,必须自增
            if (timeStamp == lastTimeStamp) {
                sequenceId.increment();
                if (sequenceId.sum() >= SEQUENCE_MAX) {
                    timeStamp = getNextTimeStamp();
                    sequenceId.reset();
                }
            } else {
                sequenceId.reset();
            }
            // 执行结束将时间戳赋值给lastTimeStamp
            lastTimeStamp = timeStamp;
            long sequence = sequenceId.sum();
            return timeStamp << TIMESTAMP_LEFT | dataCenterId << DATA_CENTER_LEFT | machineId << MACHINE_LEFT | sequence;
        }
    
        private long getNextTimeStamp() {
            // 获取当前的时间戳
            long current = System.currentTimeMillis() - START_STAMP;
            // 如果一样就一直循环,直到下一个时间戳
            while (current == lastTimeStamp) {
                current = System.currentTimeMillis() - START_STAMP;
            }
            return current;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    在common块下的com.dcy包下创建DateUtil类:用于时间日期相关的工具类

    /**
     * 时间日期相关的工具类
     */
    public class DateUtil {
        public static Date get(String pattern) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            try {
                return sdf.parse(pattern);
            } catch (ParseException e) {
                throw new RuntimeException(e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    DcyRpcBootstrap类中添加IdGenerator

    // 略...
    private int port = 8088;
    
    public static final IdGenerator ID_GENERATOR = new IdGenerator(1, 2);
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5

    RpcConsumerInvocationHandler类的封装报文位置,将requestId的值设置为通过id生成器获取

    // 略...
    DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
                    .requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
                    .compressType((byte) 1)
                    .serializeType((byte) 1)
                    .requestType(RequestType.REQUEST.getId())
                    .requestPayload(requestPayload)
                    .build();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    c.抽象序列化

    在core模块下创建serialize

    在该包下创建Serializer接口:序列化器

    public interface Serializer {
        /**
         * 序列化
         * @param object 待序列化的对象实例
         * @return 字节数组
         */
        byte[] serializer(Object object);
    
        /**
         * 反序列化
         * @param bytes 待反序列化的字节数组
         * @param clazz 目标类的class对象
         * @return 目标实例
         * @param  目标类泛型
         */
        <T> T deserialize(byte[] bytes, Class<T> clazz);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    serialize包下创建impl包,创建JdkSerializer类,实现Serializer接口:jdk序列化器

    @Slf4j
    public class JdkSerializer implements Serializer{
        @Override
        public byte[] serializer(Object object) {
            // 心跳请求没有payload
            if (object == null) {
                return null;
            }
            // 对象序列化成字节数组
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 ObjectOutputStream outputStream = new ObjectOutputStream(baos)
            ) {
                outputStream.writeObject(object);
                return baos.toByteArray();
            } catch (IOException e) {
                log.error("序列化对象【{}】时出现异常", object);
                throw new SerializeException(e);
            }
        }
    
        @Override
        public <T> T deserialize(byte[] bytes, Class<T> clazz) {
            if (bytes == null || clazz == null) {
                return null;
            }
    
            // 字节数组转成对象序列化
            try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                 ObjectInputStream objectInputStream = new ObjectInputStream(bais)
            ) {
                return (T) objectInputStream.readObject();
            } catch (IOException | ClassNotFoundException e) {
                log.error("反序列化对象【{}】时出现异常", clazz);
                throw new SerializeException(e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    在common的exceptions中创建SerializeException类:序列化异常处理

    public class SerializeException extends RuntimeException{
        public SerializeException() {
            super();
        }
    
        public SerializeException(String message) {
            super(message);
        }
    
        public SerializeException(Throwable cause) {
            super(cause);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    d.建立序列化工厂

    consumer包下的Application启动类中添加序列化方法

    // 略...
    DcyRpcBootstrap.getInstance()
                    .application("first-dcyrpc-consumer")
                    .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
                    .serialize("jdk")
                    .reference(reference);
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    DcyRpcBootstrap类中,添加序列化配置项和方法

    // 略...
    public static String SERIALIZE_TYPE = "jdk";
    // 略...
    /**
     * 配置序列化的方式
     * @param serializeType
     * @return
     */
    public DcyRpcBootstrap serialize(String serializeType) {
        SERIALIZE_TYPE = serializeType;
        return this;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在core模块下的serialize包下创建SerializerWrapper类:序列化包装类

    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    public class SerializerWrapper {
        private byte code;
        private String type;
        private Serializer serializer;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在core模块下的serialize包下创建SerializerFactory类:序列化工厂类

    /**
     * 序列化工厂类
     */
    public class SerializerFactory {
    
        private final static Map<String, SerializerWrapper> SERIALIZER_CACHE = new ConcurrentHashMap<>(8);
        private final static Map<Byte, SerializerWrapper> SERIALIZER_CACHE_CODE = new ConcurrentHashMap<>(8);
    
        static {
            SerializerWrapper jdk = new SerializerWrapper((byte) 1, "jdk", new JdkSerializer());
            SerializerWrapper json = new SerializerWrapper((byte) 2, "json", new JsonSerializer());
    
            SERIALIZER_CACHE.put("jdk", jdk);
            SERIALIZER_CACHE.put("json", json);
    
            SERIALIZER_CACHE_CODE.put((byte) 1, jdk);
            SERIALIZER_CACHE_CODE.put((byte) 2, json);
        }
    
        /**
         * 使用工厂方法获取一个SerializerWrapper
         * @param serializeType 序列化的类型
         * @return
         */
        public static SerializerWrapper getSerializer(String serializeType) {
            return SERIALIZER_CACHE.get(serializeType);
        }
    
        public static SerializerWrapper getSerializer(byte serializeCode) {
            return SERIALIZER_CACHE_CODE.get(serializeCode);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    修改DcyRpcRequestEncoder类,在请求类,对请求添加有关序列化的代码

    //略...
    // 8个字节的请求id
    byteBuf.writeLong(dcyRpcRequest.getRequestId());
    
    // 写入请求体body(requestPayload)
    // 1.根据配置的序列化方式进行序列化
    Serializer serializer = SerializerFactory.getSerializer(dcyRpcRequest.getSerializeType()).getSerializer();
    byte[] body = serializer.serializer(dcyRpcRequest.getRequestPayload());
    
    // 2.根据配置的压缩方式进行压缩
    
    if (body != null) {
        byteBuf.writeBytes(body);
        byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length);
    }
    //略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    修改RpcConsumerInvocationHandler类:修改填写序列化器的代码

    // 略...
    DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
            .requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
            .compressType((byte) 1)
            .serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
            .requestType(RequestType.REQUEST.getId())
            .requestPayload(requestPayload)
            .build();
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    修改DcyRpcRequestDecoder类,在响应类,对请求添加反序列化的代码

    // 略...
    // 9.解析消息体payload
    int payloadLength = fullLength - headLength;
    byte[] payload = new byte[payloadLength];
    byteBuf.readBytes(payload);
    
    // 解压缩和反序列化
    // todo 解压缩
    
    // 反序列化
    Serializer serializer = SerializerFactory.getSerializer(serializeType).getSerializer();
    RequestPayload requestPayload = serializer.deserialize(payload, RequestPayload.class);
    
    dcyRpcRequest.setRequestPayload(requestPayload);
    
    return dcyRpcRequest;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    修改DcyRpcResponseEncoder类:在响应类,对响应添加序列化的代码

    // 略...
    // 8个字节的请求id
    byteBuf.writeLong(dcyRpcResponse.getRequestId());
    
    // 写入请求体body(requestPayload)
    // 对响应做序列化器
    Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
    byte[] body = serializer.serializer(dcyRpcResponse.getBody());
    
    if (body != null) {
        byteBuf.writeBytes(body);
        byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length);
    }
    // 略...
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    修改DcyRpcResponseDecoder类,在请求类,对响应添加反序列化的代码

    // 略...
    // 9.解析消息体payload
    int payloadLength = fullLength - headLength;
    byte[] payload = new byte[payloadLength];
    byteBuf.readBytes(payload);
    
    // 解压缩和反序列化
    // todo 解压缩
    
    Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
    Object body = serializer.deserialize(payload, Object.class);
    
    dcyRpcResponse.setBody(body);
    
    return dcyRpcResponse;
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    e.hessian的序列化方式(拓展)

    Hessian序列化是一种支持动态类型、跨语言、基于对象传输的网络协议,Java对象序列化的二进制流可以被其他语言(如,c++,python)。特性如下:

    • 1.自描述序列化类型。不依赖外部描述文件或者接口定义,用一个字节表示常用的基础类型,极大缩短二进制流。
    • 2.语言无关,支持脚本语言
    • 3.协议简单,比Java原生序列化高效
    • 4.相比hessian1,hessian2中增加了压缩编码,其序列化二进制流大小是Java序列化的50%,序列化耗时是Java序列化的30%,反序列化耗时是Java序列化的20%

    序列化操作:

    • 1.new一个Hessian2Output,传入一个OutputStream
    • 2.writeObject(),传入具体要序列化的对象
    • 3.flush()

    反序列化操作:

    • 1.new一个Hessian2Output,传入一个InputStream
    • 2.readObject()
    /**
     * hessian序列化器
     */
    @Slf4j
    public class HessianSerializer implements Serializer {
        @Override
        public byte[] serializer(Object object) {
            // 心跳请求没有payload
            if (object == null) {
                return null;
            }
    
            // 对象序列化成字节数组
            try (
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ) {
    
                Hessian2Output hessian2Output = new Hessian2Output(baos);
                hessian2Output.writeObject(object);
                hessian2Output.flush();
    
                log.info("对象使用hessian【{}】完成了序列化", object);
                return baos.toByteArray();
            } catch (IOException e) {
                log.error("使用hessian序列化对象【{}】时出现异常", object);
                throw new SerializeException(e);
            }
        }
    
        @Override
        public <T> T deserialize(byte[] bytes, Class<T> clazz) {
            if (bytes == null || clazz == null) {
                return null;
            }
    
            // 字节数组转成对象序列化
            try (
                ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
            ) {
                HessianInput hessianInput = new HessianInput(bais);
                T t = (T) hessianInput.readObject();
                log.info("对象使用hessian【{}】完成了反序列化", clazz);
                return t;
            } catch (IOException e) {
                log.error("使用hessian反序列化对象【{}】时出现异常", clazz);
                throw new SerializeException(e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
  • 相关阅读:
    Java【手撕滑动窗口】LeetCode 438. “字符串中所有异位词“, 图文详解思路分析 + 代码
    【文章学习系列之模型】Koopa
    JAVA设计模式8:装饰模式,动态地将责任附加到对象上,扩展对象的功能
    ruoyi登录功能源码分析
    浅谈web前端工程师hr面试经典问题20+
    Redis客户端访问
    计算机毕业设计(附源码)python校园疫情防控系统
    常用的卷积神经网络模型,卷积神经网络改进算法
    vue3的element-plus的el-dialog的样式修改无效问题
    简单版小米侧边栏案列
  • 原文地址:https://blog.csdn.net/weixin_46926189/article/details/132746785