• Sentinel Source Code Analysis: Initialization


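    1. SphU

    Anyone who has used Sentinel knows that SphU is where everything starts.

    entry = SphU.entry(target, EntryType.IN); This line requests an access token (an Entry). If the token is granted, the target resource may be accessed; otherwise a BlockException is thrown and the resource cannot be accessed.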

     public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
            throws BlockException {
            // Note: the fourth argument (the token count to acquire) is 1
            return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
        }
    
    public static AsyncEntry asyncEntry(String name, int resourceType, EntryType trafficType, Object[] args)
            throws BlockException {
            return Env.sph.asyncEntryWithType(name, resourceType, trafficType, 1, false, args);
        }
    

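    These are the two ways to obtain an entry. The former suits the traditional thread-per-request model, such as Spring MVC and ordinary business code; the latter suits reactive, signal-driven processing, such as Spring WebFlux and Spring Cloud Gateway (SCG).

    That is not the topic here, though, because Sentinel initialization is shared by both. Later articles cover how each one is handled.

    Parameters:

    • name: the name of the protected resource
    • resourceType: the resource type, usually web or RPC
    • trafficType: the entry (traffic) type, IN or OUT; usually IN, meaning inbound traffic
    • args: arguments used by parameter flow control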

    2. Env.sph

    public class Env {
        public static final Sph sph = new CtSph();
        static {
            // If init fails, the process will exit.
            InitExecutor.doInit();
        }
    }
    

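    Env is Sentinel's trigger. The first call to SphU.entry causes the Env class to be loaded, and its static initializer then initializes Sentinel's basic components.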
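
    The trigger relies on ordinary JVM behavior: a class's static initializer runs once, the first time the class is actively used. The following is a minimal sketch of that mechanism only. LazyInitDemo, Holder and EVENTS are hypothetical names made up for illustration and are not Sentinel classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Env pattern; not Sentinel code.
final class LazyInitDemo {
    static final List<String> EVENTS = new ArrayList<>();

    static final class Holder {
        // Not a compile-time constant, so reading it forces class initialization.
        static final Object INSTANCE = new Object();

        static {
            // Runs once, when Holder is first initialized.
            EVENTS.add("init");
        }
    }

    static Object firstUse() {
        EVENTS.add("before-access");
        Object instance = Holder.INSTANCE; // first call triggers the static block
        EVENTS.add("after-access");
        return instance;
    }

    public static void main(String[] args) {
        firstUse();
        firstUse();
        System.out.println(EVENTS);
    }
}
```

    Running main prints [before-access, init, after-access, before-access, after-access]: the static block runs between the first "before" and "after", and never again.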

    /**
         * If one {@link InitFunc} throws an exception, the init process
         * will immediately be interrupted and the application will exit.
         *
         * The initialization will be executed only once.
         */
        public static void doInit() {
            if (!initialized.compareAndSet(false, true)) {
                return;
            }
            try {
                List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
                List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
                for (InitFunc initFunc : initFuncs) {
                    RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());
                    insertSorted(initList, initFunc);
                }
                for (OrderWrapper w : initList) {
                    w.func.init();
                    RecordLog.info("[InitExecutor] Executing {} with order {}",
                        w.func.getClass().getCanonicalName(), w.order);
                }
            } catch (Exception ex) {
                RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
                ex.printStackTrace();
            } catch (Error error) {
                RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
                error.printStackTrace();
            }
        }
    

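    SpiLoader.of(InitFunc.class).loadInstanceListSorted() uses the Java SPI approach to obtain every InitFunc implementation. Sentinel implements its own SPI loading mechanism rather than calling ServiceLoader.load. Let's look at that implementation.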

    public List<S> loadInstanceListSorted() {
            load();
    
            return createInstanceList(sortedClassList);
        }
     /**
         * Load the Provider class from Provider configuration file
         */
        public void load() {
            if (!loaded.compareAndSet(false, true)) {
                return;
            }
    		
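            // Build the SPI configuration file path: META-INF/services/ + the interface's fully qualified name
            String fullFileName = SPI_FILE_PREFIX + service.getName();
            // Pick the class loader used to load the SPI implementation classes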
            ClassLoader classLoader;
            if (SentinelConfig.shouldUseContextClassloader()) {
                classLoader = Thread.currentThread().getContextClassLoader();
            } else {
                classLoader = service.getClassLoader();
            }
            if (classLoader == null) {
                classLoader = ClassLoader.getSystemClassLoader();
            }
            Enumeration<URL> urls = null;
            try {
                // Use classLoader.getResources to locate the SPI configuration files, which list the implementation class names
                urls = classLoader.getResources(fullFileName);
            } catch (IOException e) {
                fail("Error locating SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader, e);
            }
    
            if (urls == null || !urls.hasMoreElements()) {
                RecordLog.warn("No SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader);
                return;
            }
    		
            // Load each implementation class listed for the SPI interface and obtain its Class object
            while (urls.hasMoreElements()) {
                URL url = urls.nextElement();
    
                InputStream in = null;
                BufferedReader br = null;
                try {
                    in = url.openStream();
                    br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
                    String line;
                    while ((line = br.readLine()) != null) {
                        if (StringUtil.isBlank(line)) {
                            // Skip blank line
                            continue;
                        }
    
                        line = line.trim();
                        int commentIndex = line.indexOf("#");
                        if (commentIndex == 0) {
                            // Skip comment line
                            continue;
                        }
    
                        if (commentIndex > 0) {
                            line = line.substring(0, commentIndex);
                        }
                        line = line.trim();
    
                        Class<S> clazz = null;
                        try {
                            // Obtain the Class object of the SPI implementation
                            clazz = (Class<S>) Class.forName(line, false, classLoader);
                        } catch (ClassNotFoundException e) {
                            fail("class " + line + " not found", e);
                        }
    
                        if (!service.isAssignableFrom(clazz)) {
                            fail("class " + clazz.getName() + "is not subtype of " + service.getName() + ",SPI configuration file=" + fullFileName);
                        }
    
                        classList.add(clazz);
                        Spi spi = clazz.getAnnotation(Spi.class);
                        String aliasName = spi == null || "".equals(spi.value()) ? clazz.getName() : spi.value();
                        if (classMap.containsKey(aliasName)) {
                            Class<? extends S> existClass = classMap.get(aliasName);
                            fail("Found repeat alias name for " + clazz.getName() + " and "
                                    + existClass.getName() + ",SPI configuration file=" + fullFileName);
                        }
                        classMap.put(aliasName, clazz);
    
                        if (spi != null && spi.isDefault()) {
                            if (defaultClass != null) {
                                fail("Found more than one default Provider, SPI configuration file=" + fullFileName);
                            }
                            defaultClass = clazz;
                        }
    
                        RecordLog.info("[SpiLoader] Found SPI implementation for SPI {}, provider={}, aliasName={}"
                                + ", isSingleton={}, isDefault={}, order={}",
                            service.getName(), line, aliasName
                                , spi == null ? true : spi.isSingleton()
                                , spi == null ? false : spi.isDefault()
                                , spi == null ? 0 : spi.order());
                    }
                } catch (IOException e) {
                    fail("error reading SPI configuration file", e);
                } finally {
                    closeResources(in, br);
                }
            }
    
            // Sort the implementation classes by their @Spi order
            sortedClassList.addAll(classList);
            Collections.sort(sortedClassList, new Comparator<Class<? extends S>>() {
                @Override
                public int compare(Class<? extends S> o1, Class<? extends S> o2) {
                    Spi spi1 = o1.getAnnotation(Spi.class);
                    int order1 = spi1 == null ? 0 : spi1.order();
    
                    Spi spi2 = o2.getAnnotation(Spi.class);
                    int order2 = spi2 == null ? 0 : spi2.order();
    
                    return Integer.compare(order1, order2);
                }
            });
        }
    

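    List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted(); In essence, this step reads the SPI configuration files, loads the class named on each line, and ends up with the list of Class objects for all implementations.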
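
    The per-line handling in load() (skip blank lines, drop # comments, trim) can be sketched on its own. This is a simplified illustration that works on a string instead of a classpath resource and does not call Class.forName. SpiFileParser and the com.example class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the line parsing only; not Sentinel's SpiLoader.
final class SpiFileParser {
    /** Returns the provider class names listed in the text of a provider configuration file. */
    static List<String> parseProviderNames(String fileContent) {
        List<String> names = new ArrayList<>();
        for (String raw : fileContent.split("\\R")) {
            int comment = raw.indexOf('#');
            // Keep only the part before a comment marker, then trim whitespace.
            String line = (comment >= 0 ? raw.substring(0, comment) : raw).trim();
            if (!line.isEmpty()) {
                names.add(line);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String content = "# providers\n\ncom.example.FirstInit  # trailing comment\n  com.example.SecondInit\n";
        System.out.println(parseProviderNames(content));
    }
}
```

    For the sample content, main prints [com.example.FirstInit, com.example.SecondInit].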

    [Figure: the InitFunc implementation classes]
    As you can see, there are quite a few implementations. What are they all for?

    for (OrderWrapper w : initList) {
                    w.func.init();
                    RecordLog.info("[InitExecutor] Executing {} with order {}",
                        w.func.getClass().getCanonicalName(), w.order);
                }
    

    The loop above iterates over the components and initializes each one in turn.
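
    Two ideas from doInit() can be isolated in a small sketch: the compareAndSet guard that makes initialization run at most once, and running tasks sorted by an order value. The sketch assumes that a smaller order value runs earlier. OrderedInitSketch and Task are hypothetical names, not Sentinel types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a run-once, ordered initializer.
final class OrderedInitSketch {
    /** Stand-in for an init function together with its order value. */
    static final class Task {
        final String name;
        final int order;

        Task(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    private final AtomicBoolean initialized = new AtomicBoolean(false);
    final List<String> executed = new ArrayList<>();

    /** Runs the tasks in ascending order; returns false if initialization already happened. */
    boolean doInit(List<Task> tasks) {
        if (!initialized.compareAndSet(false, true)) {
            return false; // a previous caller already won the race
        }
        List<Task> sorted = new ArrayList<>(tasks);
        sorted.sort(Comparator.comparingInt((Task t) -> t.order));
        for (Task t : sorted) {
            executed.add(t.name); // a real init function would do its work here
        }
        return true;
    }

    public static void main(String[] args) {
        OrderedInitSketch sketch = new OrderedInitSketch();
        List<Task> tasks = Arrays.asList(new Task("second", 0), new Task("first", -1));
        System.out.println(sketch.doInit(tasks) + " " + sketch.executed);
        System.out.println(sketch.doInit(tasks) + " " + sketch.executed);
    }
}
```

    The second doInit call returns false and leaves the executed list unchanged, which is the behavior the AtomicBoolean guard is there to provide.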

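    3. CommandCenterInitFunc

    As the name suggests, this is Sentinel's command center component. Its main job is to receive commands sent by the dashboard, parse them, and process them. For example, when a flow rule is added in the dashboard, the dashboard sends it to the Sentinel command center so that the rule takes effect.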

    @InitOrder(-1)
    public class CommandCenterInitFunc implements InitFunc {
    
        @Override
        public void init() throws Exception {
            CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
    
            if (commandCenter == null) {
                RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
                return;
            }
    
            commandCenter.beforeStart();
            commandCenter.start();
            RecordLog.info("[CommandCenterInit] Starting command center: "
                    + commandCenter.getClass().getCanonicalName());
        }
    }
    

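    CommandCenter commandCenter = CommandCenterProvider.getCommandCenter(); CommandCenterProvider loads the command center implementation. Once the command center receives a command, it hands the command to the matching command handler for processing.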

    public final class CommandCenterProvider {
    
        private static CommandCenter commandCenter = null;
    
        static {
            resolveInstance();
        }
    
        private static void resolveInstance() {
            CommandCenter resolveCommandCenter = SpiLoader.of(CommandCenter.class).loadHighestPriorityInstance();
    
            if (resolveCommandCenter == null) {
                RecordLog.warn("[CommandCenterProvider] WARN: No existing CommandCenter found");
            } else {
                commandCenter = resolveCommandCenter;
                RecordLog.info("[CommandCenterProvider] CommandCenter resolved: {}", resolveCommandCenter.getClass()
                    .getCanonicalName());
            }
        }
    
        /**
         * Get resolved {@link CommandCenter} instance.
         *
         * @return resolved {@code CommandCenter} instance
         */
        public static CommandCenter getCommandCenter() {
            return commandCenter;
        }
    
        private CommandCenterProvider() {}
    }
    

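    This is again just SPI loading of the CommandCenter implementations, as covered above, so I won't repeat it. Which implementations of the CommandCenter interface are there?

    [Figure: the CommandCenter implementation classes]
    The command center has two implementations: one receives commands through Netty, the other is a traditional HTTP server built on a plain ServerSocket. Internally they work in similar ways.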

    • NettyHttpCommandCenter
    • SimpleHttpCommandCenter

    We'll walk through the default SimpleHttpCommandCenter. Note that loadHighestPriorityInstance() loads the CommandCenter implementation with the highest priority.
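
    Picking the highest-priority implementation can be pictured as taking the minimum over an order value. The sketch below assumes that a smaller order means a higher priority. PrioritySketch, Provider and the order values are hypothetical and do not reflect the actual annotations on Sentinel's classes.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: choose one provider by priority.
final class PrioritySketch {
    static final class Provider {
        final String name;
        final int order;

        Provider(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    /** Returns the provider with the smallest order value, or empty if there is none. */
    static Optional<Provider> highestPriority(List<Provider> providers) {
        return providers.stream().min(Comparator.comparingInt((Provider p) -> p.order));
    }

    public static void main(String[] args) {
        List<Provider> providers = Arrays.asList(new Provider("providerA", 5), new Provider("providerB", 1));
        System.out.println(highestPriority(providers).map(p -> p.name).orElse("none"));
    }
}
```

    With the sample values, main prints providerB. An empty list yields an empty Optional, which corresponds to the "No existing CommandCenter found" branch in the provider code.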

    3.1 SimpleHttpCommandCenter

    commandCenter.beforeStart();

    @Override
        public void init() throws Exception {
            CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
    
            if (commandCenter == null) {
                RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
                return;
            }
    
            commandCenter.beforeStart();
            commandCenter.start();
            RecordLog.info("[CommandCenterInit] Starting command center: "
                    + commandCenter.getClass().getCanonicalName());
        }
    

    Once the CommandCenter instance has been resolved, beforeStart() is called.

    @Override
        @SuppressWarnings("rawtypes")
        public void beforeStart() throws Exception {
            // Register handlers
            Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
            registerCommands(handlers);
        }
    

    This is clearly SPI loading again. This time it fetches every CommandHandler implementation, that is, all of the command handlers.

     public Map<String, CommandHandler> namedHandlers() {
            Map<String, CommandHandler> map = new HashMap<String, CommandHandler>();
            // Same routine as before: load the classes through SPI
            List<CommandHandler> handlers = spiLoader.loadInstanceList();
            for (CommandHandler handler : handlers) {
                String name = parseCommandName(handler);
                if (!StringUtil.isEmpty(name)) {
                    map.put(name, handler);
                }
            }
            return map;
        }
    

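    CommandHandler has quite a few default implementations, but only a handful are used regularly, such as the flow rule and circuit-breaking rule command handlers. They are worth a look if you are interested and are not hard to follow.

    [Figure: the CommandHandler implementation classes]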
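
    The name-to-handler map built by namedHandlers() can be sketched as follows. The sketch assumes that each handler's name comes from an annotation on its class and that unnamed handlers are skipped. The @Named annotation, the Handler interface and the sample handlers are all hypothetical, not Sentinel's own types.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a command handler registry keyed by name.
final class HandlerRegistrySketch {
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Named {
        String value();
    }

    interface Handler {
        String handle(Map<String, String> params);
    }

    @Named("echo")
    static final class EchoHandler implements Handler {
        @Override
        public String handle(Map<String, String> params) {
            return String.valueOf(params.get("msg"));
        }
    }

    // No @Named annotation: this handler cannot be addressed and is skipped.
    static final class UnnamedHandler implements Handler {
        @Override
        public String handle(Map<String, String> params) {
            return "";
        }
    }

    /** Builds the lookup table from command name to handler. */
    static Map<String, Handler> namedHandlers(List<Handler> handlers) {
        Map<String, Handler> map = new HashMap<>();
        for (Handler handler : handlers) {
            Named named = handler.getClass().getAnnotation(Named.class);
            if (named != null && !named.value().isEmpty()) {
                map.put(named.value(), handler);
            }
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, Handler> handlers = namedHandlers(Arrays.asList(new EchoHandler(), new UnnamedHandler()));
        System.out.println(handlers.get("echo").handle(Collections.singletonMap("msg", "hi")));
    }
}
```

    Caching the handlers in a map means each incoming command costs a single lookup by name rather than a scan over all handlers.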

    /**
    	 *
    	 * @throws Exception
    	 */
    	@Override
        @SuppressWarnings("rawtypes")
        public void beforeStart() throws Exception {
            // Register handlers
            Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
            // Register (cache) the command handlers for later lookup
            registerCommands(handlers);
        }
    

    commandCenter.start();

    @Override
        public void start() throws Exception {
            // Number of processors available to the JVM
            int nThreads = Runtime.getRuntime().availableProcessors();
            // Create a thread pool: core threads = max threads = number of available processors
            this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(10),
                new NamedThreadFactory("sentinel-command-center-service-executor"),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        CommandCenterLog.info("EventTask rejected");
                        throw new RejectedExecutionException();
                    }
                });
            // Define a server initialization task that starts the command center and begins accepting commands
            Runnable serverInitTask = new Runnable() {
                int port;
    
                // Port this command center listens on; falls back to DEFAULT_PORT if parsing fails
                {
                    try {
                        port = Integer.parseInt(TransportConfig.getPort());
                    } catch (Exception e) {
                        port = DEFAULT_PORT;
                    }
                }
    
                @Override
                public void run() {
                    boolean success = false;
                    // Create the ServerSocket that receives commands
                    ServerSocket serverSocket = getServerSocketFromBasePort(port);
    
                    if (serverSocket != null) {
                        CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
                        socketReference = serverSocket;
                        // Start a ServerThread to accept incoming requests and dispatch them
                        executor.submit(new ServerThread(serverSocket));
                        success = true;
                        port = serverSocket.getLocalPort();
                    } else {
                        CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
                    }
    
                    if (!success) {
                        port = PORT_UNINITIALIZED;
                    }
    
                    TransportConfig.setRuntimePort(port);
                    executor.shutdown();
                }
    
            };
    
            new Thread(serverInitTask).start();
        }
    
    private static ServerSocket getServerSocketFromBasePort(int basePort) {
            int tryCount = 0;
            while (true) {
                try {
                    ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
                    server.setReuseAddress(true);
                    return server;
                } catch (IOException e) {
                    tryCount++;
                    try {
                        TimeUnit.MILLISECONDS.sleep(30);
                    } catch (InterruptedException e1) {
                        break;
                    }
                }
            }
            return null;
        }
    

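    If creating the server socket fails, the method retries automatically: the port number goes up by one after every three failed attempts (basePort + tryCount / 3), with a 30 ms pause between attempts.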
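
    The port arithmetic is easy to check in isolation. The sketch below only reproduces the expression basePort + tryCount / 3 and does not open any sockets. PortRetrySketch is a hypothetical name and 9000 is an arbitrary base port chosen for the example.

```java
// Sketch of the port selection arithmetic only; no sockets are opened.
final class PortRetrySketch {
    /** Port tried on the given zero-based attempt: integer division advances it every third failure. */
    static int candidatePort(int basePort, int tryCount) {
        return basePort + tryCount / 3;
    }

    public static void main(String[] args) {
        for (int tryCount = 0; tryCount <= 6; tryCount++) {
            System.out.println("attempt " + tryCount + " -> port " + candidatePort(9000, tryCount));
        }
    }
}
```

    Attempts 0 to 2 use port 9000, attempts 3 to 5 use 9001, and attempt 6 moves on to 9002, so each port gets three chances before the next one is tried.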

    executor.submit(new ServerThread(serverSocket));
    

    ServerThread.run

    @Override
            public void run() {
                while (true) {
                    Socket socket = null;
                    try {
                        socket = this.serverSocket.accept();
                        setSocketSoTimeout(socket);
                        HttpEventTask eventTask = new HttpEventTask(socket);
                        bizExecutor.submit(eventTask);
                    } catch (Exception e) {
                        CommandCenterLog.info("Server error", e);
                        if (socket != null) {
                            try {
                                socket.close();
                            } catch (Exception e1) {
                                CommandCenterLog.info("Error when closing an opened socket", e1);
                            }
                        }
                        try {
                            // In case of infinite log.
                            Thread.sleep(10);
                        } catch (InterruptedException e1) {
                            // Indicates the task should stop.
                            break;
                        }
                    }
                }
            }
    

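    The thread listens for connections on the serverSocket, accepts each request, wraps it in an HttpEventTask, and hands it to the thread pool for processing.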
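
    Because the business pool has a fixed number of threads and a bounded queue, it rejects work once both are full. The sketch below illustrates that behavior with a small pool. It relies on ThreadPoolExecutor's default AbortPolicy rather than the custom handler in start(); both throw RejectedExecutionException. BoundedPoolSketch and its parameters are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: how a fixed-size pool with a bounded queue rejects overflow.
final class BoundedPoolSketch {
    /** Submits blocking tasks and returns how many the pool rejected. */
    static int countRejected(int threads, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                // Each task blocks until released, so threads and queue stay occupied.
                pool.execute(() -> awaitQuietly(release));
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(countRejected(2, 1, 5));
    }
}
```

    With 2 threads and a queue of 1, the pool can hold 3 tasks, so submitting 5 blocking tasks yields 2 rejections.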

    The logic is as follows:
    HttpEventTask.run

    @Override
        public void run() {
            if (socket == null) {
                return;
            }
    
            PrintWriter printWriter = null;
            InputStream inputStream = null;
            try {
                long start = System.currentTimeMillis();
                inputStream = new BufferedInputStream(socket.getInputStream());
                OutputStream outputStream = socket.getOutputStream();
    
                printWriter = new PrintWriter(
                    new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
    
                String firstLine = readLine(inputStream);
    
                CommandCenterLog.info("[SimpleHttpCommandCenter] Socket income: " + firstLine
                    + ", addr: " + socket.getInetAddress());
                CommandRequest request = processQueryString(firstLine);
    
                if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
                    // Deal with post method
                    processPostRequest(inputStream, request);
                }
    
                // Validate the target command.
                String commandName = HttpCommandUtils.getTarget(request);
                if (StringUtil.isBlank(commandName)) {
                    writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
                    return;
                }
    
                // Find the matching command handler.
                System.out.println("sentinel dashboard request first line: " + firstLine);
                CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
                if (commandHandler != null) {
                    CommandResponse<?> response = commandHandler.handle(request);
                    handleResponse(response, printWriter);
                } else {
                    // No matching command handler.
                    writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
                }
    
                long cost = System.currentTimeMillis() - start;
                CommandCenterLog.info("[SimpleHttpCommandCenter] Deal a socket task: " + firstLine
                    + ", address: " + socket.getInetAddress() + ", time cost: " + cost + " ms");
            } catch (RequestException e) {
                writeResponse(printWriter, e.getStatusCode(), e.getMessage());
            } catch (Throwable e) {
                CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenter error", e);
                try {
                    if (printWriter != null) {
                        String errorMessage = SERVER_ERROR_MESSAGE;
                        e.printStackTrace();
                        if (!writtenHead) {
                            writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
                        } else {
                            printWriter.println(errorMessage);
                        }
                        printWriter.flush();
                    }
                } catch (Exception e1) {
                    CommandCenterLog.warn("Failed to write error response", e1);
                }
            } finally {
                closeResource(inputStream);
                closeResource(printWriter);
                closeResource(socket);
            }
        }
    

    In short, the task parses the command and uses the command name to find the matching CommandHandler, which then processes it.
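
    Extracting the command name from the request's first line can be sketched roughly as follows. This is a simplified illustration rather than Sentinel's processQueryString: it does no URL decoding and ignores POST bodies. RequestLineSketch and the echo command in the example are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of parsing an HTTP request line such as "GET /echo?msg=hi HTTP/1.1".
final class RequestLineSketch {
    /** Returns the request target (second token), or "" if the line is malformed. */
    private static String target(String firstLine) {
        String[] parts = firstLine.trim().split("\\s+");
        return parts.length < 2 ? "" : parts[1];
    }

    /** Command name: the path without the leading slash and without the query string. */
    static String commandName(String firstLine) {
        String target = target(firstLine);
        int query = target.indexOf('?');
        String path = query >= 0 ? target.substring(0, query) : target;
        return path.startsWith("/") ? path.substring(1) : path;
    }

    /** Query parameters as key/value pairs; values are not URL-decoded in this sketch. */
    static Map<String, String> queryParams(String firstLine) {
        Map<String, String> params = new LinkedHashMap<>();
        String target = target(firstLine);
        int query = target.indexOf('?');
        if (query < 0) {
            return params;
        }
        for (String pair : target.substring(query + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            params.put(eq >= 0 ? pair.substring(0, eq) : pair, eq >= 0 ? pair.substring(eq + 1) : "");
        }
        return params;
    }

    public static void main(String[] args) {
        String line = "GET /echo?msg=hi HTTP/1.1";
        System.out.println(commandName(line) + " " + queryParams(line));
    }
}
```

    For the sample line, main prints echo {msg=hi}. A blank command name corresponds to the BAD_REQUEST branch in HttpEventTask.run.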

    4. HeartbeatSenderInitFunc

    As the name suggests, this is the component that sends heartbeats.

     @Override
        public void init() {
            // Get the default HeartbeatSender
            HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
            if (sender == null) {
                RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
                return;
            }
            // Initialize the scheduler if needed (2 threads)
            initSchedulerIfNeeded();
            // Get the heartbeat interval
            long interval = retrieveInterval(sender);
            setIntervalIfNotExists(interval);
            scheduleHeartbeatTask(sender, interval);
        }
    

    HeartbeatSenderProvider.getHeartbeatSender(); obtains the heartbeat sender.

    private static void resolveInstance() {
            HeartbeatSender resolved = SpiLoader.of(HeartbeatSender.class).loadHighestPriorityInstance();
            if (resolved == null) {
                RecordLog.warn("[HeartbeatSenderProvider] WARN: No existing HeartbeatSender found");
            } else {
                heartbeatSender = resolved;
                RecordLog.info("[HeartbeatSenderProvider] HeartbeatSender activated: {}", resolved.getClass()
                    .getCanonicalName());
            }
        }
    

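    In the same way, the HeartbeatSender implementation is loaded through SPI.

    [Figure: the HeartbeatSender implementation classes]

    There are two implementations by default, and the one with the higher priority is loaded. SimpleHttpHeartbeatSender has the higher priority, so it is the one used by default.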

     @Override
        public void init() {
            // Get the default HeartbeatSender
            HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
            if (sender == null) {
                RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
                return;
            }
            // Initialize the scheduler if needed (2 threads)
            initSchedulerIfNeeded();
            // Get the heartbeat interval
            long interval = retrieveInterval(sender);
            setIntervalIfNotExists(interval);
            scheduleHeartbeatTask(sender, interval);
        }
    

    [Figure: the configuration item]

    This configuration item is loaded by default.

    private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
            pool.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        sender.sendHeartbeat();
                    } catch (Throwable e) {
                        RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
                    }
                }
            }, 5000, interval, TimeUnit.MILLISECONDS);
            RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
                + sender.getClass().getCanonicalName());
        }
    

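    A scheduled executor is then started to send heartbeats periodically. The first one goes out after a 5 s delay, and the interval should be 10 s by default.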
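
    The try/catch around sendHeartbeat() matters because scheduleAtFixedRate cancels all later runs if one run throws. The sketch below illustrates this with a task that fails every time yet keeps being scheduled. HeartbeatScheduleSketch and the short delays are hypothetical values chosen so the example finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a periodic task that survives its own failures.
final class HeartbeatScheduleSketch {
    /** Waits for at least `runs` executions of a failing task and returns the number observed. */
    static int runsDespiteFailures(int runs, long initialDelayMs, long intervalMs) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        AtomicInteger attempts = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        pool.scheduleAtFixedRate(() -> {
            try {
                attempts.incrementAndGet();
                throw new IllegalStateException("simulated heartbeat failure");
            } catch (Throwable t) {
                // Swallowed on purpose: an uncaught exception would cancel all later runs.
            } finally {
                done.countDown();
            }
        }, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(runsDespiteFailures(3, 10, 10) >= 3);
    }
}
```

    Without the catch block, the first exception would end the schedule and only one attempt would ever be recorded.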

      @Override
        public boolean sendHeartbeat() throws Exception {
            if (TransportConfig.getRuntimePort() <= 0) {
                RecordLog.info("[SimpleHttpHeartbeatSender] Command server port not initialized, won't send heartbeat");
                return false;
            }
            Endpoint addrInfo = getAvailableAddress();
            if (addrInfo == null) {
                return false;
            }
    
            SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());
            request.setParams(heartBeat.generateCurrentMessage());
            try {
                SimpleHttpResponse response = httpClient.post(request);
                if (response.getStatusCode() == OK_STATUS) {
                    return true;
                } else if (clientErrorCode(response.getStatusCode()) || serverErrorCode(response.getStatusCode())) {
                    RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo
                        + ", http status code: " + response.getStatusCode());
                }
            } catch (Exception e) {
                RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo, e);
            }
            return false;
        }
    

    Sending a heartbeat simply means posting some basic information about the current service to keep the heartbeat alive.

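    5. Summary

    Sentinel initialization involves a few core objects:

    • SphU: Sentinel's basic API
    • Env: component initialization
    • CommandCenterInitFunc: the command center, responsible for receiving commands
    • CommandHandler: the handler that ultimately processes a received command
    • HeartbeatSenderInitFunc: the heartbeat sender, which maintains the heartbeat with the dashboard, once every 10 s

    Simple, right?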

  • Original article: https://blog.csdn.net/qq_44787816/article/details/127776198