• 手写RPC框架--8.压缩报文


    RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧)
    RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧)

    对报文进行压缩

    a.报文压缩

    在core模块下创建compress

    在该包下创建Compressor接口:压缩与解压缩的接口

    /**
     * 压缩与解压缩的接口
     */
    public interface Compressor {
        /**
         * 压缩
         * @param bytes
         * @return
         */
        byte[] compress(byte[] bytes);
    
        /**
         * 解压缩
         * @param bytes
         * @return
         */
        byte[] decompress(byte[] bytes);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在该包下创建CompressWrapper类:压缩与解压缩的包装类

    /**
     * 压缩与解压缩的包装类
     */
    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    public class CompressWrapper {
        private byte code;
        private String type;
        private Compressor compressor;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在该包下创建CompressorFactory类:压缩工厂类

    /**
     * 压缩工厂类
     */
    public class CompressorFactory {
    
        private final static Map<String, CompressWrapper> COMPRESSOR_CACHE = new ConcurrentHashMap<>(8);
        private final static Map<Byte, CompressWrapper> COMPRESSOR_CODE = new ConcurrentHashMap<>(8);
    
        static {
            CompressWrapper gzip = new CompressWrapper((byte) 1, "gzip", new GzipCompressor());
    
            COMPRESSOR_CACHE.put("gzip", gzip);
    
            COMPRESSOR_CODE.put((byte) 1, gzip);
        }
    
        /**
         * 使用工厂方法获取一个CompressWrapper
         * @param compressorType 压缩的类型
         * @return
         */
        public static CompressWrapper getCompressor(String compressorType) {
            return COMPRESSOR_CACHE.get(compressorType);
        }
    
        public static CompressWrapper getCompressor(byte compressorCode) {
            return COMPRESSOR_CODE.get(compressorCode);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    在common的exceptions中创建CompressException类:压缩异常处理

    public class compressException extends RuntimeException{
        public compressException() {
            super();
        }
    
        public compressException(Throwable cause) {
            super(cause);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在该包下创建impl包,并创建GzipCompressor类:gzip压缩的实现。实现Compressor接口

    /**
     * gzip压缩的实现
     */
    @Slf4j
    public class GzipCompressor implements Compressor {
        @Override
        public byte[] compress(byte[] bytes) {
    
            try (
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
            ) {
                gzipOutputStream.write(bytes);
                gzipOutputStream.finish();
                byte[] result = byteArrayOutputStream.toByteArray();
    
                log.info("对字节数组进行压缩, 长度【{}】压缩至【{}】", bytes.length, result.length);
                return result;
            } catch (IOException e) {
                log.error("对字节数组进行压缩时发生异常", e);
                throw new CompressException(e);
            }
        }
    
        @Override
        public byte[] decompress(byte[] bytes) {
    
            try (
                    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                    GZIPInputStream gzipInputStream = new GZIPInputStream(bais);
            ) {
                byte[] result = gzipInputStream.readAllBytes();
                log.info("对字节数组进行解压, 长度【{}】解压至【{}】", bytes.length, result.length);
                return result;
            } catch (IOException e) {
                log.error("对字节数组进行解压时发生异常", e);
                throw new CompressException(e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    DcyRpcBootstrap类中添加COMPRESS_TYPE的默认常量,也可以通过方法进行修改

    // 略...
    
    public static String COMPRESS_TYPE = "gizp";
    
    // 略...
    
    /**
     * 配置压缩的方式
     * @param compressType
     * @return
     */
    public DcyRpcBootstrap compress(String compressType) {
        COMPRESS_TYPE = compressType;
        return this;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    修改RpcConsumerInvocationHandler类:修改填写压缩的代码

    // 略...
    DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
            .requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
            .compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
            .serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
            .requestType(RequestType.REQUEST.getId())
            .requestPayload(requestPayload)
            .build();
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    修改DcyRpcRequestEncoder类,在请求类,对请求添加有关压缩的代码

    // 略...
    // 1.根据配置的序列化方式进行序列化
    Serializer serializer = SerializerFactory.getSerializer(dcyRpcRequest.getSerializeType()).getSerializer();
    byte[] body = serializer.serializer(dcyRpcRequest.getRequestPayload());
    
    // 2.根据配置的压缩方式进行压缩
    Compressor compressor = CompressorFactory.getCompressor(dcyRpcRequest.getCompressType()).getCompressor();
    body = compressor.compress(body);
    
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    修改DcyRpcRequestDecoder类,在响应类,对请求添加有关解压缩的代码

    // 略...
    // 解压缩和反序列化
    // 解压缩
    Compressor compressor = CompressorFactory.getCompressor(compressType).getCompressor();
    payload = compressor.decompress(payload);
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    修改DcyRpcResponseEncoder类,在响应类,对响应添加有关压缩的代码

    // 略...
    // 写入请求体body(requestPayload)
    // 对响应做序列化器
    Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
    byte[] body = serializer.serializer(dcyRpcResponse.getBody());
    
    // 对响应做压缩
    Compressor compressor = CompressorFactory.getCompressor(dcyRpcResponse.getCompressType()).getCompressor();
    body = compressor.compress(body);
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    修改DcyRpcResponseDecoder类,在请求类,对响应添加有关解压缩的代码

    // 略...
    // 解压缩和反序列化
    // 解压缩
    Compressor compressor = CompressorFactory.getCompressor(compressType).getCompressor();
    payload = compressor.decompress(payload);
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    b.负载均衡

    当一个服务节点无法支撑现有的访问量时,会部署多个节点,组成一个集群然后通过负载均衡,将请求分发给这个集群下的每个服务节点,从而达到多个服务节点共同分担请求压力的目的。

    负载均衡可以根据不同的标准进行分类。常见的负载均衡分类:

    • 1.根据工作层级
      • 数据链路层负载均衡:通过分析数据链路层的信息(如MAC地址)进行负载均衡。
      • 传输层负载均衡:通过分析传输层信息(如TCP/UDP端口号)进行负载均衡。例如:L4负载均衡
      • 应用层负载均衡:通过分析应用层信息(如HTTP请求)进行负载均衡。例如:L7负载均衡
    • 2.根据分配策略
      • 轮询:按顺序将请求分配给服务器,当分配到最后一个服务器后,进行重新分配
      • 加权轮询:类似于轮询,但服务器被分配一个权重,根据权重来分配请求
      • 随机:随机选择一个服务器来处理请求
      • 加权随机:类似于随机,但服务器被分配一个权重,根据权重来随机分配请求
      • 最少连接:将请求分配给当前连接数最少的服务器
      • 最短响应时间:将请求分配给响应时间最短的服务器
      • 哈希:根据某个哈希值(如请求的源IP地址)来分配请求,相同哈希值的请求会被分配到同一个服务器
      • 自适应:根据服务器的运行时指标(如CPU利用率、内存使用量等)动态调整分配策略
    • 3.根据部署位置
      • 客户端负载均衡:在客户端实现负载均衡,客户端直接选择合适的服务器进行请求
      • 服务器端负载均衡:在服务器端实现负载均衡,通常有一个负载均衡器设备或软件来分配请求给后端服务器
      • 全局负载均衡:在全局范围内实现负载均衡,主要用于夸数据中心或地理位置分布的场景,例如:使用DNS解析不同地域的请求到不同的服务器

    在core模块下,修改com.dcyrpc.discovery.impl包的ZookeeperRegistry类:修改成,拉取合适的服务列表

    • 修改lookup()方法

    • 根据服务名称,返回一个服务列表

    // 略...
    /**
     * 服务发现
     *  - 拉取合适的服务列表
     * @param serviceName 服务名称
     * @return 服务列表
     */
    @Override
    public List<InetSocketAddress> lookup(String serviceName) {
        // 1.找到对应服务的节点
        String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName;
    
        // 2.从zk中获取它的子节点,
        List<String> children = ZookeeperUtils.getChildren(zooKeeper, serviceNode, null);
        // 获取所有的可用的服务列表
        List<InetSocketAddress> inetSocketAddressList = children.stream().map(ipString -> {
            String[] ipAndPort = ipString.split(":");
            String ip = ipAndPort[0];
            int port = Integer.valueOf(ipAndPort[1]);
            return new InetSocketAddress(ip, port);
        }).toList();
    
        if (inetSocketAddressList.size() == 0){
            throw new DiscoveryException("未发现任何可用的服务主机");
        }
    
        return inetSocketAddressList;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    在core模块下的com.dcyrpc创建loadbalancer包,在包创建LoadBalancer类:负载均衡器的接口

    /**
     * 负载均衡接口
     * - 根据服务列表找到一个可用的服务
     */
    public interface LoadBalancer {
        /**
         * 根据服务名找到一个可用的服务
         * @param serviceName 服务名称
         * @return 服务地址
         */
        InetSocketAddress selectServiceAddress(String serviceName);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    loadbalancer包下,创建Selector类:轮询的负载均衡器的算法接口

    public interface Selector {
    
        /**
         * 根据服务列表执行一种算法,获取一个服务节点
         * @return 服务节点
         */
        InetSocketAddress getNext();
    
        // todo 服务动态上下线需要进行reBalance
        void reBalance();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在common的exceptions中创建LoadBalancerException类:负载均衡异常处理

    /**
     * 负载均衡异常处理
     */
    public class LoadBalancerException extends RuntimeException{
        public LoadBalancerException() {
            super();
        }
    
        public LoadBalancerException(String message) {
            super(message);
        }
    
        public LoadBalancerException(Throwable cause) {
            super(cause);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    DcyRpcBootstrap类中添加getRegistry()方法

    // 略... 
    public static LoadBalancer LOAD_BALANCER;
    
    // 略... 
    public DcyRpcBootstrap registry(RegistryConfig registryConfig) {
        // 维护一个zookeeper实例,但是,如果这样写就会将zookeeper和当前的工程耦合
        // 使用 registryConfig 获取一个注册中心
        this.registry = registryConfig.getRegistry();
        DcyRpcBootstrap.LOAD_BALANCER = new RoundRobinLoadBalancer();
        return this;
    }
    
    // 略... 
    public Registry getRegistry() {
        return registry;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    loadbalancer包下,创建RoundRobinLoadBalancer类:轮询的负载均衡器策略

    • 实现LoadBalancer接口
    • 创建RoundRobinSelector内部类,实现Selector接口
    /**
     * 轮询的负载均衡策略
     */
    @Slf4j
    public class RoundRobinLoadBalancer implements LoadBalancer {
    
        private Registry registry;
    
        // 一个服务会匹配一个selector
        private Map<String, Selector> cache = new ConcurrentHashMap<>(8);
    
        public RoundRobinLoadBalancer() {
            this.registry = DcyRpcBootstrap.getInstance().getRegistry();
        }
    
        @Override
        public InetSocketAddress selectServiceAddress(String serviceName) {
    
            // 优先从缓存中获取一个选择器
            Selector selector = cache.get(serviceName);
    
            // 如果为空,则需要为这个service创建一个selector
            if (selector == null) {
                // 这个负载均衡器,内部要维护服务列表,作为缓存
                List<InetSocketAddress> serviceList = this.registry.lookup(serviceName);
    
                // 提供一些算法
                selector = new RoundRobinSelector(serviceList);
    
                // 将selector放入缓存当中
                cache.put(serviceName, selector);
            }
    
            // 获取可用节点
            return selector.getNext();
        }
    
        private static class RoundRobinSelector implements Selector {
    
            private List<InetSocketAddress> serviceList;
            private AtomicInteger index;
    
            public RoundRobinSelector(List<InetSocketAddress> serviceList) {
                this.serviceList = serviceList;
                this.index = new AtomicInteger(0);
            }
    
            @Override
            public InetSocketAddress getNext() {
                if (serviceList == null || serviceList.size() == 0) {
                    log.error("进行负载均衡选取节点时发生服务列表为空");
                    throw new LoadBalancerException();
                }
    
                InetSocketAddress address = serviceList.get(index.get());
    
                // index如果到了最后一个位置,重置
                if (index.get() == serviceList.size() - 1) {
                    index.set(0);
                } else {
                    // 游标后移一位
                	index.incrementAndGet();
                }
                
                return address;
            }
    
            @Override
            public void reBalance() {
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    修改RpcConsumerInvocationHandler类:修改invoke()方法

    • 添加负载均衡器的方法
    // 略...
    // 1.发现服务,从注册中心,寻找一个可用的服务
    //  - 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
    
    // 获取当前配置的负载均衡器,选取一个可用的节点
    InetSocketAddress address = DcyRpcBootstrap.LOAD_BALANCER.selectServiceAddress(interfaceRef.getName());
    if (log.isInfoEnabled()){
        log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
    }
    
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在core下com.dcyrpc.discovery.impl包下,修改ZookeeperRegistry类:将固定的端口号改成通过启动类获取DcyRpcBootstrap.PORT

    // 创建本机的临时节点,ip:port
    // 服务提供方的端口(一般自己设定),还需要获取ip的方法
    // /dcyrpc-metadata/providers/com.dcyrpc.DcyRpc/192.168.195.1:8088
    // TODO:后续处理端口问题
    String node = parentNode + "/" + NetUtils.getIp() + ":" + DcyRpcBootstrap.PORT;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    DcyRpcBootstrap类中

    • 添加静态端口常量
    • 修改start()方法:重写新的端口
    // 略...
    public static int PORT = 8088;
    // 略...
    
    
    public void start() {
        // 略...
        // 4.绑定端口
        ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
        // 略...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    c.使用模板方法优化负载均衡

    在core模块下的loadbalancer包下,创建AbstractLoadBalancer抽象类

    public abstract class AbstractLoadBalancer implements LoadBalancer{
    
        // 一个服务会匹配一个selector
        private Map<String, Selector> cache = new ConcurrentHashMap<>(8);
    
        @Override
        public InetSocketAddress selectServiceAddress(String serviceName) {
    
            // 优先从缓存中获取一个选择器
            Selector selector = cache.get(serviceName);
    
            // 如果为空,则需要为这个service创建一个selector
            if (selector == null) {
                // 这个负载均衡器,内部要维护服务列表,作为缓存
                List<InetSocketAddress> serviceList = DcyRpcBootstrap.getInstance().getRegistry().lookup(serviceName);
    
                // 提供一些算法
                selector = getSelector(serviceList);
    
                // 将selector放入缓存当中
                cache.put(serviceName, selector);
            }
    
            // 获取可用节点
            return selector.getNext();
        }
    
        /**
         * 由子类进行拓展
         * @param serviceList 服务列表
         * @return 负载均衡算法选择器
         */
        protected abstract Selector getSelector(List<InetSocketAddress> serviceList);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    在core模块下的loadbalancer包下,修改RoundRobinLoadBalancer

    • 继承AbstractLoadBalancer抽象类
    /**
     * 轮询的负载均衡策略
     */
    @Slf4j
    public class RoundRobinLoadBalancer extends AbstractLoadBalancer {
    
        @Override
        protected Selector getSelector(List<InetSocketAddress> serviceList) {
            return new RoundRobinSelector(serviceList);
        }
        
    	/**
         * 轮询的负载均衡具体实现算法
         */
        private static class RoundRobinSelector implements Selector {
    
            private List<InetSocketAddress> serviceList;
            private AtomicInteger index;
    
            public RoundRobinSelector(List<InetSocketAddress> serviceList) {
                this.serviceList = serviceList;
                this.index = new AtomicInteger(0);
            }
    
            @Override
            public InetSocketAddress getNext() {
                if (serviceList == null || serviceList.size() == 0) {
                    log.error("进行负载均衡选取节点时发生服务列表为空");
                    throw new LoadBalancerException();
                }
    
                InetSocketAddress address = serviceList.get(index.get());
    
                // index如果到了最后一个位置,重置
                if (index.get() == serviceList.size() - 1) {
                    index.set(0);
                } else {
                    // 游标后移一位
                    index.incrementAndGet();
                }
    
    
                return address;
            }
    
            @Override
            public void reBalance() {
    
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    d.一致性hash-负载均衡算法

    d.1) 介绍

    按照传统的hash算法思路,我们需要构建一张hash表,将服务器挂载在hash表中:根据请求的要素,与服务器的数量取余

    • 有缓存
    • 长连接

    在这里插入图片描述

    但是,这样的方式会存在很多问题,如动态扩容的问题。比如,随着业务量增长,将原有的六个服务扩容至八个,此时,我们不仅要修改路由表,还要修改hash的路由策略。

    一致性hash借鉴了hash算法的部分能力做了如下的设计:

    • 1.将hash值均匀的分布在一个区间,我们一般将区间设置为整形的取值范围(-2的31次方 ~ 2的31次方 -1)当然这个范围也可以是(0 ~ 2的32次方 -1),只要是一个合理的容易计算的足够大的范围即可。
    • 2.将这个区间构建成一个环,构建成环不一定必须要链表,其实很多的有序的数据结构都可以,比如数组,比如红黑树,只要加上一点点逻辑,就是数完最后一个回到第一个节点就可以了。
    • 3.将服务器按照自身的特点,计算hash值,并将其挂载在hash表中。

    在这里插入图片描述

    当请求进来以后,根据请求的部分特征,如url、请求id,请求来源等信息进行hash运算,看请求落在哪个范围,然后顺时针找到第一个服务器即可,这样最大的好处就是当有新的服务加入集群只需要将服务挂载在hash环即可,但是后自然会有流量进入该服务器,而不需要修改任何的逻辑,因为我们的hash环足够大,所以可以容纳的机器也很多。

    **问题:**但是此时会出现一个问题,如果节点过少,hash分布不均匀会产生严重的流量倾斜:

    在这里插入图片描述

    为了解决这个问题,我们就需要引入虚拟节点的概念,我们可以将一个真实节点化身为n个(比如128)虚拟节点,每个虚拟节点都指向同一个服务,分别对虚拟节点进行hash,可以让一个服务的虚拟节点大致均匀的分布在hash环上,

    在这里插入图片描述

    d.2) 实现

    在core模块下的loadbalancer.impl包中,创建ConsistentHashBalancer类,继承AbstractLoadBalancer

    /**
     * 一致性哈希的负载均衡策略
     */
    @Slf4j
    public class ConsistentHashBalancer extends AbstractLoadBalancer {
    
        @Override
        protected Selector getSelector(List<InetSocketAddress> serviceList) {
            return new ConsistentHashSelector(serviceList, 128);
        }
    
    
        /**
         * 一致性哈希的具体实现算法
         */
        private static class ConsistentHashSelector implements Selector {
    
            // hash环用来存储服务器节点
            private SortedMap<Integer, InetSocketAddress> circle = new TreeMap<>();
            // 虚拟节点的个数
            private int virtualNodes;
    
            public ConsistentHashSelector(List<InetSocketAddress> serviceList, int virtualNodes) {
                // 将节点转换为虚拟节点,进行过载
                this.virtualNodes = virtualNodes;
                for (InetSocketAddress inetSocketAddress : serviceList) {
                    // 把每一个节点加入到哈希环中
                    addNodeToCircle(inetSocketAddress);
                }
            }
    
    
            @Override
            public InetSocketAddress getNext() {
                // 获取请求的要素
                DcyRpcRequest dcyRpcRequest = DcyRpcBootstrap.REQUEST_THREAD_LOCAL.get();
                // 对请求的要素做处理,
    
                // 根据请求的一些特征来选择服务器 id
                String requestId = Long.toString(dcyRpcRequest.getRequestId());
    
                // 对请求的id做hash,字符串默认的hash不太好
                int hash = hash(requestId);
    
                // 判断该hash值是否能直接落在一个服务器上(是否和服务器的hash一样)
                if (!circle.containsKey(hash)) {
                    // 寻找最近的节点
                    SortedMap<Integer, InetSocketAddress> tailMap = circle.tailMap(hash);
                    hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
                }
    
                return circle.get(hash);
            }
    
            @Override
            public void reBalance() {
    
            }
    
            /**
             * 将每一个节点挂载到哈希环
             * @param inetSocketAddress 节点的地址
             */
            private void addNodeToCircle(InetSocketAddress inetSocketAddress) {
                // 为每一个节点生成匹配的虚拟节点进行挂载
                for (int i = 0; i < virtualNodes; i++) {
                    int hash = hash(inetSocketAddress.toString() + "-" + i);
                    // 挂载到hash环上
                    circle.put(hash, inetSocketAddress);
                    log.info("hash为【{}】的节点已经挂载到了哈希环上", hash);
                }
            }
    
            private void removeNodeFromCircle(InetSocketAddress inetSocketAddress) {
                // 为每一个节点生成匹配的虚拟节点进行挂载
                for (int i = 0; i < virtualNodes; i++) {
                    int hash = hash(inetSocketAddress.toString() + "-" + i);
                    // 挂载到hash环上
                    circle.remove(hash);
                }
            }
    
            /**
             * 具体的哈希算法
             * @param s
             * @return
             */
            private int hash(String s) {
                MessageDigest md;
                try {
                    md = MessageDigest.getInstance("md5");
                } catch (NoSuchAlgorithmException e) {
                    throw new RuntimeException(e);
                }
    
                byte[] digest = md.digest(s.getBytes());
                int res = 0;
                // md5得到的结果是一个字节数组,但是我们要int 4个字节
                for (int i = 0; i < digest.length; i++) {
                    res = res << 8;
                    if (digest[i] < 0){
                        res = res | (digest[i] & 255);
                    } else {
                        res = res | digest[i];
                    }
                    System.out.println(Integer.toBinaryString(digest[i]));
                }
    
                return res;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112

    DcyRpcBootstrap类中

    • 添加 ThreadLocal 本地线程
    • LOAD_BALANCER 改为 一致性哈希
    // 略...
    public static final ThreadLocal<DcyRpcRequest> REQUEST_THREAD_LOCAL = new ThreadLocal<>();
    // 略...
    public DcyRpcBootstrap registry(RegistryConfig registryConfig) {
        // 维护一个zookeeper实例,但是,如果这样写就会将zookeeper和当前的工程耦合
        // 使用 registryConfig 获取一个注册中心
        this.registry = registryConfig.getRegistry();
        DcyRpcBootstrap.LOAD_BALANCER = new ConsistentHashBalancer();
        return this;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    RpcConsumerInvocationHandler类中:

    • 将请求存入本地线程,需要在合适的时候调用remove方法
    • 调整 invoke()方法的逻辑顺序
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    
        /**
         * ---------------------------封装报文---------------------------
         */
        // 1.封装报文
        RequestPayload requestPayload = RequestPayload.builder()
                .interfaceName(interfaceRef.getName())
                .methodName(method.getName())
                .parametersType(method.getParameterTypes())
                .parametersValue(args)
                .returnType(method.getReturnType())
                .build();
    
        // TODO 需要对各个请求的操作做处理
    
        DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
                .requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
                .compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
                .serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
                .requestType(RequestType.REQUEST.getId())
                .requestPayload(requestPayload)
                .build();
    
        // 请求存入本地线程
        DcyRpcBootstrap.REQUEST_THREAD_LOCAL.set(dcyRpcRequest);
    
        // 2.发现服务,从注册中心拉取服务列表,并通过客户端负载均衡寻找一个可用的服务
        //  - 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
    
        // 获取当前配置的负载均衡器,选取一个可用的节点
        InetSocketAddress address = DcyRpcBootstrap.LOAD_BALANCER.selectServiceAddress(interfaceRef.getName());
        if (log.isInfoEnabled()){
            log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
        }
    
        // 2.尝试获取一个可用的通道
        Channel channel = getAvailableChannel(address);
        if (log.isInfoEnabled()){
            log.info("获取了和【{}】建立的连接通道,准备发送数据", address);
        }
    
        /**
         * ---------------------------异步策略---------------------------
         */
    
        // 4.写出报文
        CompletableFuture<Object> completableFuture = new CompletableFuture<>();
        // 将completableFuture暴露出去
        DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);
    
        // 直接使用writeAndFlush 写出一个请求,这个请求的实例就会进入pipeline执行出栈的一系列操作
        channel.writeAndFlush(dcyRpcRequest).addListener((ChannelFutureListener) promise -> {
            // 需要处理异常
            if (!promise.isSuccess()) {
                completableFuture.completeExceptionally(promise.cause());
            }
        });
    
        // 清理threadLocal
        DcyRpcBootstrap.REQUEST_THREAD_LOCAL.remove();
    
        // 如果没有地方处理这个completableFuture,这里会阻塞等待 complete 方法的执行
        // 在Netty的pipeline中最终的handler的处理结果 调用complete
        // 5.获得响应的结果
        return completableFuture.get(10, TimeUnit.SECONDS);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    e.实现心跳检测

    定期向所有的channel发送一个简单的请求即可,如果能得到回应说明连接是正常的。

    其中我们要在心跳探测的过程中完成以下几项工作:

    1、如果可以正常访问,记录响应时间,以备后用。

    2、如果不能正常访问,则进行重试,重试三次依旧不能访问,则从健康服务列表中剔除,以后的访问不会使用该连接。

    注意:重试的等待时间我们选取一个合适范围内的随机时间,这样可以避免局域网络问题导致的大面积同时重试,产生重试风暴

    在core模块下的transport.message包的DcyRpcRequest请求类上:添加时间戳变量

    // 时间戳
    private long timeStamp;
    
    • 1
    • 2

    在core模块下的transport.message包的DcyRpcResponse响应类上:添加时间戳变量

    // 时间戳
    private long timeStamp;
    
    • 1
    • 2

    修改RpcConsumerInvocationHandler类:

    • 请求体增加时间戳
    • 修改请求类型的id
    // 略...
    DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
            .requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
            .compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
            .serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
            .requestType(RequestType.REQUEST.getId())
            .timeStamp(new Date().getTime())
            .requestPayload(requestPayload)
            .build();
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    修改MySimpleChannelInboundHandler类:

    • 修改请求类型的id
    // 从全局的挂起的请求中,寻找与之匹配的待处理 completeFuture
    CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(dcyRpcResponse.getRequestId());
    
    • 1
    • 2

    修改DcyRpcRequestEncoder请求类编码器:

    • 添加时间戳
    • 添加 RequestPayload是否为空 的判断逻辑
    // 略...
    // 8个字节的请求id
    byteBuf.writeLong(dcyRpcRequest.getRequestId());
    // 时间戳
    byteBuf.writeLong(dcyRpcRequest.getTimeStamp());
    
    // 写入请求体body(requestPayload)
    // 1.根据配置的序列化方式进行序列化
    byte[] body = null;
    if (dcyRpcRequest.getRequestPayload() != null) {
        Serializer serializer = SerializerFactory.getSerializer(dcyRpcRequest.getSerializeType()).getSerializer();
        body = serializer.serializer(dcyRpcRequest.getRequestPayload());
    
    	// 2.根据配置的压缩方式进行压缩
        Compressor compressor = CompressorFactory.getCompressor(dcyRpcRequest.getCompressType()).getCompressor();
        body = compressor.compress(body);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    修改DcyRpcRequestDecoder响应类解码器:

    • 添加时间戳
    • 添加 payload是否为空 的判断逻辑
    // 略...
    // 9.时间戳
    long timeStamp = byteBuf.readLong();
    // 略...
    dcyRpcRequest.setTimeStamp(timeStamp);
    // 略...
    // 解压缩
    if (payload != null && payload.length != 0) {
        Compressor compressor = CompressorFactory.getCompressor(compressType).getCompressor();
        payload = compressor.decompress(payload);
    
        // 反序列化
        Serializer serializer = SerializerFactory.getSerializer(serializeType).getSerializer();
        RequestPayload requestPayload = serializer.deserialize(payload, RequestPayload.class);
        dcyRpcRequest.setRequestPayload(requestPayload);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    修改DcyRpcResponseEncoder响应类编码器:

    • 添加时间戳
    • 添加 body是否为空 的判断逻辑
    // 略...
    // 时间戳
    byteBuf.writeLong(dcyRpcResponse.getTimeStamp());
    
    // 写入请求体body(requestPayload)
    // 对响应做序列化器
    byte[] body = null;
    if (dcyRpcResponse.getBody() != null) {
        Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
        body = serializer.serializer(dcyRpcResponse.getBody());
    
        // 对响应做压缩
        Compressor compressor = CompressorFactory.getCompressor(dcyRpcResponse.getCompressType()).getCompressor();
        body = compressor.compress(body);
    }
    // 略...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    修改DcyRpcResponseDecoder响应类解码器:

    • 添加时间戳
    • 添加 payload是否为空 的判断逻辑
    // 略...
    // 9.解析时间戳
    long timeStamp = byteBuf.readLong();
    // 略...
    dcyRpcResponse.setTimeStamp(timeStamp);
    // 略...
    if (payload != null && payload.length > 0) {
    // 解压缩和反序列化
    // 解压缩
    Compressor compressor = CompressorFactory.getCompressor(compressType).getCompressor();
    payload = compressor.decompress(payload);
    
    Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
    Object body = serializer.deserialize(payload, Object.class);
    
    dcyRpcResponse.setBody(body);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在core模块下的transport.message包的MessageFormatConstant常量上:

    • 修改HEADER_LENGTH头部信息的长度
    // 头部信息的长度
    public final static short HEADER_LENGTH = (byte) (MAGIC.length + 1 + 2 + 4 + 1 + 1 + 1 + 8 + 8);
    
    • 1
    • 2

    修改MethodCallHandler类的channelRead0()方法

    • 添加 请求类型是否为心跳请求 的判断逻辑
    // 2.根据负载内容进行方法调用
    Object result = null;
    if (dcyRpcRequest.getRequestType() != RequestType.HEART_BEAT.getId()) {
        result = callTargetMethod(requestPayload);
        log.info("请求【{}】已经在Provider端完成方法调用", dcyRpcRequest.getRequestId());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    DcyRpcBootstrap类中

    • 添加静态的响应时间的TreeMap(有序)
    • 开启心跳检测
    // 略...
    public static final TreeMap<InetSocketAddress, Channel> ANSWER_TIME_CHANNEL_CACHE = new TreeMap<>();
    // 略...
    public DcyRpcBootstrap reference(ReferenceConfig<?> reference) {
        // 启动心跳检测
        log.info("开始心跳检测");
        HeartbeatDetector.detectHeartbeat(reference.getInterface().getName());
    	// 略...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在core模块下创建core包,

    在该包下创建HeartbeatDetector类:心跳检测器

    • 通过线程池开启守护线程,每隔2秒发送心跳请求
    /**
     * 心跳检测器
     */
    @Slf4j
    public class HeartbeatDetector {
        public static void detectHeartbeat(String serviceName) {
            // 从注册中心拉取服务列表并建立连接
            Registry registry = DcyRpcBootstrap.getInstance().getRegistry();
            List<InetSocketAddress> addresses = registry.lookup(serviceName);
    
            // 将连接进行缓存
            for (InetSocketAddress address : addresses) {
                try {
                    if (!DcyRpcBootstrap.CHANNEL_CACHE.containsKey(address)) {
                        Channel channel = NettyBootstrapInitializer.getBootstrap().connect(address).sync().channel();
                        DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
    
            // 定期发送消息
            Thread thread = new Thread(() -> {
                new Timer().scheduleAtFixedRate(new MyTimerTask(), 0, 2000);
            }, "dcyRpc-HeartbeatDetector-thread");
    
            thread.setDaemon(true);
            thread.start();
        }
    
        private static class MyTimerTask extends TimerTask {
    
            @Override
            public void run() {
    
                // 每次启动将响应时长的map清空
                DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.clear();
    
                // 遍历所有的channel
                Map<InetSocketAddress, Channel> channelCache = DcyRpcBootstrap.CHANNEL_CACHE;
                for (Map.Entry<InetSocketAddress, Channel> entry : channelCache.entrySet()) {
                    Channel channel = entry.getValue();
    
                    long start = System.currentTimeMillis();
                    // 构建心跳请求
                    DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
                            .requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
                            .compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
                            .serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
                            .requestType(RequestType.HEART_BEAT.getId())
                            .timeStamp(start)
                            .build();
    
                    CompletableFuture<Object> completableFuture = new CompletableFuture<>();
                    DcyRpcBootstrap.PENDING_REQUEST.put(dcyRpcRequest.getRequestId(), completableFuture);
                    channel.writeAndFlush(dcyRpcRequest).addListener((ChannelFutureListener) promise -> {
                        if (!promise.isSuccess()) {
                            completableFuture.completeExceptionally(promise.cause());
                        }
                    });
    
                    long endTime = 0L;
                    try {
                        completableFuture.get();
                        endTime = System.currentTimeMillis();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
    
                    Long time = endTime - start;
    
                    // 使用TreeMap进行缓存
                    DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.put(time, channel);
    
                    log.info("和【{}】服务器的响应时间是【{}】", entry.getKey() ,time);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    f.最短响应时间的负载均衡策略

    在core模块下com.dcyrpc.loadbalancer.impl包下,创建MinResponseTimeLoadBalancer类:最短响应时间的负载均衡策略

    • 继承AbstractLoadBalancer
    /**
     * 最短响应时间的负载均衡策略
     */
    @Slf4j
    public class MinResponseTimeLoadBalancer extends AbstractLoadBalancer {
        @Override
        protected Selector getSelector(List<InetSocketAddress> serviceList) {
            return new MinResponseTimeSelector(serviceList);
        }
    
        /**
         * 最短响应时间的负载均衡具体实现算法
         */
        private static class MinResponseTimeSelector implements Selector {
    
            public MinResponseTimeSelector(List<InetSocketAddress> serviceList) {
            }
    
            @Override
            public InetSocketAddress getNext() {
                Map.Entry<Long, Channel> entry = DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.firstEntry();
                if (entry != null) {
                    return (InetSocketAddress) entry.getValue().remoteAddress();
                }
    
                // 直接从缓存中获取一个可用
                Channel channel = (Channel) DcyRpcBootstrap.CHANNEL_CACHE.values().toArray()[0];
    
                return (InetSocketAddress) channel.remoteAddress();
            }
    
            @Override
            public void reBalance() {
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
  • 相关阅读:
    Android Glide限定onlyRetrieveFromCache取内存缓存submit超时阻塞方式,Kotlin
    [运维]如何快速压缩一个数据库的硬盘占用大小(简单粗暴但有效)
    工程化:Plugin 介绍
    Spring实例化源码解析(一)
    IOT Core-设备接入网关
    Python基础-6-字典
    前端css、js、bootstrap、vue2.x、ajax查漏补缺(1)
    jSignature 横屏手写签名
    C语言之动态内存管理篇(1)
    matplotlib画latex表格
  • 原文地址:https://blog.csdn.net/weixin_46926189/article/details/132763631