• SpringCloud源码分析 (Eureka-Server-入口分析和处理Client状态请求) (五)


    Eureka Server 作为一个开箱即用的服务注册中心,提供了以下的功能,用以满足与Eureka Client 交互的需求:

    1. 处理Client状态请求,
    2. 处理客户端删除overriddenStatus,
    3. 处理Client注册请求, 续约请求, 下架请求, 全量下载 增量下载请求
    4. 定时清除过期Client

    需要注意的是, Eureka Server 同时也是一个Eureka Client ,在不禁止Eureka Server 的客户端行为时,它会向它配置文件中的其他Eureka Server 进行拉取注册表、服务注册和发送心跳等操作。

    1.自动配置入口

    <dependency>
        <groupId>org.springframework.cloudgroupId>
        <artifactId>spring-cloud-starter-netflix-eureka-serverartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    在这里插入图片描述

    1.1 @EnableEurekaServer

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    可以看到该类是一个配置类,在其中创建了Marker实例。

    通过上面分析我们知道通过@EnableEurekaServer注解,会创建Marker实例,从而让EurekaServerAutoConfiguration这个自动配置类生效

    1.2 EurekaServerAutoConfiguration

    在这里插入图片描述

    在这里插入图片描述

    该配置类具有一个条件注解:要求必须要有一个EurekaServerMarkerConæguration.Marker实例这个配置类才会起作用。

    在这里插入图片描述

    在这个Eureka Server自动配置类启动后,其会创建一个很重要的实例,InstanceRegistry。该实例及其父类中的很多方法是我们后面要调用到的。

    1.3 PeerEurekaNodes

    在这里插入图片描述

    服务器之间拷贝用得到, 代表的是服务端Eureka集群

    2.处理客户端状态修改请求

    现在我们分析第一个请求,先找到处理器,我们知道SpringCloud用的是jersey框架,与SpringMVC框架不同的是SpringMVC用的是Controller作为处理器,而jersey用的是Resource,所以我们找到InstanceResource:

    在这里插入图片描述

        @PUT
        @Path("status")
        public Response statusUpdate(
                @QueryParam("value") String newStatus,
                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
                @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
            try {
                if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
                    logger.warn("Instance not found: {}/{}", app.getName(), id);
                    return Response.status(Status.NOT_FOUND).build();
                }
                boolean isSuccess = registry.statusUpdate(app.getName(), id,
                        InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
                        "true".equals(isReplication));
    
                if (isSuccess) {
                    logger.info("Status updated: {} - {} - {}", app.getName(), id, newStatus);
                    return Response.ok().build();
                } else {
                    logger.warn("Unable to update status: {} - {} - {}", app.getName(), id, newStatus);
                    return Response.serverError().build();
                }
            } catch (Throwable e) {
                logger.error("Error updating instance {} for status {}", id,
                        newStatus);
                return Response.serverError().build();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    1. 修改了注册表中该instanceInfo的status
    2. 将新的状态记录到了overriddenInstanceStatusMap缓存中
    3. 本次修改记录到了recentlyChangedQueue缓存中
    2.1 根据微服务名称和instanceId从注册表中获取实例

    先看registry.getInstanceByAppAndId方法,根据微服务名称和instanceId查询注册表中的实例:

    //AbstractInstanceRegistry.java
    public InstanceInfo getInstanceByAppAndId(String appName, String id) {
        return this.getInstanceByAppAndId(appName, id, true);
    }
    
    //AbstractInstanceRegistry.java
    public InstanceInfo getInstanceByAppAndId(String appName, String id, boolean includeRemoteRegions) {
    	//registry就是我们服务端本地的注册表,双层map
    	//外层map,key是微服务名称,value是内层map
    	//内层map,key是InstanceInfo的Id,value是Lease续约对象,包装了InstanceInfo
    
    	//先根据微服务名获取内层map
        Map<String, Lease<InstanceInfo>> leaseMap = registry.get(appName);
        Lease<InstanceInfo> lease = null;
        if (leaseMap != null) {
    	    //再根据instanceId获取续约对象
            lease = leaseMap.get(id);
        }
        if (lease != null
                && (!isLeaseExpirationEnabled() || !lease.isExpired())) {
            //lease不空,并且:
            //isLeaseExpirationEnabled:续约过期是否开启,底层其实是判断是否开启了
            //    自我保护机制,如果开启了自我保护机制,这里就会返回false,关闭续约过期
            //    则!isLeaseExpirationEnabled()值就是true,不会进行第二个条件判断了
            //    也就意味着只要注册表中有这个实例信息,就直接返回,无论它是否过期
            
            //相反如果没有开启自我保护机制,则会开启续约过期,就需要检查当前实例
            //的续约信息有没有过期,只会返回没有过期的实例信息
            
            //把lease对象包装成InstanceInfo返回
            return decorateInstanceInfo(lease);
        } else if (includeRemoteRegions) {
    	    //上面只要没有找到,如果includeRemoteRegions为true
    	    //则会尝试去远程region的注册表中查找
    	    
    	    //从远程Region中获取有没有该实例的InstanceInfo
            for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
    	        //regionNameVSRemoteRegistry也是一个map
    	        //key是regionName,value是RemoteRegionRegistry,远程注册表
    	        
    	        //根据微服务名称获取Application
                Application application = remoteRegistry.getApplication(appName);
                if (application != null) {
    	            //根据instanceId获取对应的InstanceInfo
                    return application.getByInstanceId(id);
                }
            }
        }
        return null;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    服务端的注册表

    在这里插入图片描述
    双重Map, 外部的key为微服务名称, value为Map, 内层Map的key为InstanceId, Value为lease, 续约对象

    2.2 处理Client状态改变
    • 本地状态
    • 同步到其他Server

    statusUpdate

    在这里插入图片描述

    在这里插入图片描述

    1. 本地状态改变: AbstractInstanceRegistry.statusUpdate()

    2. 状态更新操作同步给集群中其他Server: PeerAwareInstanceRegistryImpl.statusUpdate()

    本地状态改变

    1. 将新的状态记录到了overriddenInstanceStatusMap缓存中
    2. 本次修改记录到了recentlyChangedQueue缓存中
    //AbstractInstanceRegistry.java
    public boolean statusUpdate(String appName, String id,
                                InstanceStatus newStatus, String lastDirtyTimestamp,
                                boolean isReplication) {
        try {
            read.lock();//更新操作为什么上读锁?
            //后面服务端很多流程分析,都会看到修改操作,加上了读锁
            //先简单提一下,流程全部分析完,最后还会单独说一下
            //读锁对应有一个写锁,读锁之间不互斥,写锁之间、读写锁之间是互斥的。
            //如果一个线程持有读锁,其他线程再去获取读锁都是可以的立即获取到的
            //  但是如果获取写锁就会阻塞了
            //相反,如果一个线程持有写锁,其他线程无论想获取读锁还是写锁都会阻塞。
            
            //所以这里修改操作用读锁,而读取操作却用写锁
            //首要原因是因为读的操作只有一个地方,而写的操作有很多地方
            //其次是希望写的操作之间不会互斥,可以同时进行,而读的操作进行的时
            //    候是不允许其他线程写的,保证读的时候数据的稳定性。
            //写的操作可以多个线程同时进行,所以它本身肯定需要保证线程安全
            
            //状态更新计数器+1
            STATUS_UPDATE.increment(isReplication);
            //根据微服务名称,从注册表中获取内层map
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> lease = null;
            if (gMap != null) {
    	        //根据instanceId获取续约对象
                lease = gMap.get(id);
            }
            if (lease == null) {
                return false;
            } else {
    	        //刷新续约时间
    	        //既然服务端已经收到了客户的请求,本身也相当于一次续约心跳
                lease.renew();
                //获取持有者 instanceInfo
                InstanceInfo info = lease.getHolder();
                // Lease is always created with its instance info object.
                // This log statement is provided as a safeguard, in case this invariant is violated.
                if (info == null) {
                    logger.error("Found Lease without a holder for instance id {}", id);
                }
                if ((info != null) && !(info.getStatus().equals(newStatus))) {
    	            // 状态不一样才进行更新
                    // Mark service as UP if needed
                    if (InstanceStatus.UP.equals(newStatus)) {
    	                // 状态是UP,如果是第一次启动,则记录一下启动的时间戳
                        lease.serviceUp();
                    }
                    // This is NAC overriden status
                    // 保存到一个维护覆盖状态的map,key是instanceId
                    overriddenInstanceStatusMap.put(id, newStatus);
                    // Set it for transfer of overridden status to replica on
                    // replica start up 设置它以便在副本启动时将覆盖状态转移到副本
                    // 为覆盖状态赋值
                    info.setOverriddenStatus(newStatus);
    
    				//复制时间戳
                    long replicaDirtyTimestamp = 0;
                    //直接修改status状态,而不标记dirty
                    info.setStatusWithoutDirty(newStatus);
                    //如果客户端传过来的lastDirtyTimestamp不为空(客户端的最新修改时间)
                    if (lastDirtyTimestamp != null) {
    	                //赋值给replicaDirtyTimestamp
                        replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
                    }
                    // If the replication's dirty timestamp is more than the existing one, just update
                    // it to the replica's.
                    // 比较一下客户端传过来的最新修改时间是不是比我服务端记录的还新
                    // 保存更新的
                    if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
                        info.setLastDirtyTimestamp(replicaDirtyTimestamp);
                    }
                    //设置操作类型为修改(增量更新时看到过)
                    info.setActionType(ActionType.MODIFIED);
                    //将其加入到最近更新队列!!!这是一个线程安全的先进先出的队列
                    recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                    //更新服务端的最新修改时间
                    //lastUpdatedTimestamp时间戳,这个时间戳是服务端专门用的
                    info.setLastUpdatedTimestamp();
                    //让一些缓存失效,不看了
                    invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
                }
                return true;
            }
        } finally {
    	    //最后锁释放
            read.unlock();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89

    状态更新操作同步给集群中其他Server

    在这里插入图片描述

    在这里插入图片描述

    replicateInstanceActionsToPeers()

    //PeerAwareInstanceRegistryImpl.java
    /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     * 将所有实例更改复制到对等eureka节点,但复制到该节点的流量除外。
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
    	//此时action = Action.StatusUpdate
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                	//下架
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                	//心跳
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                	//注册
                    node.register(info);
                    break;
                case StatusUpdate:
                	//状态更新
    	            //从注册表获取instanceInfo信息,之前跟过
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    //给节点同步状态更新操作
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                	//状态删除
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    节点同步状态更新操作

    /**
     * Send the status update of the instance.
     */
    public void statusUpdate(final String appName, final String id,
                             final InstanceStatus newStatus, final InstanceInfo info) {
        long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
        //batchingDispatcher执行器,底层是将任务放入队列,有专门的后台线程循环从队列取任务执行
        batchingDispatcher.process(
    		    //参数一:任务ID
                taskId("statusUpdate", appName, id),
                //参数二:真正要处理的任务
                new InstanceReplicationTask(targetHost, Action.StatusUpdate, info, null, false) {
                    @Override
                    public EurekaHttpResponse<Void> execute() {
    	                //直接看statusUpdate方法
                        return replicationClient.statusUpdate(appName, id, newStatus, info);
                    }
                },
                //参数三:到期时间
                expiryTime
        );
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    在这里插入图片描述

    最后通过Jersey框架发生put请求

    在这里插入图片描述

  • 相关阅读:
    springboot+vue音乐歌曲分享试听网站java
    <Linux基础I/O(2)>——《Linux》
    【探索Linux命令行】从基础指令到高级管道操作的介绍与实践
    达梦数据库查看锁以及解锁
    Spring AOP 实现方式与应用
    聊聊支付流程的设计与实现逻辑
    animate.css与vue中的v-if/v-show如何一起使用
    java毕业设计大学生数字云平台2021Mybatis+系统+数据库+调试部署
    CleanMyMac X4.11.2免费版专业的Mac电脑清理软件
    【AcWing16】【LeetCode】并查集Union Find-128/130/*1020-学完广度优先/深度优先要回来再看
  • 原文地址:https://blog.csdn.net/qq_43141726/article/details/124546544