• Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码


    一、main方式入口

    路径:org.elasticsearch.bootstrap.Elasticsearch

      /**
         * 启动 elasticsearch 的主入口点。
         */
        public static void main(final String[] args) {
    
            Bootstrap bootstrap = initPhase1();
            assert bootstrap != null;
    
            try {
                initPhase2(bootstrap);
                initPhase3(bootstrap);
            } catch (NodeValidationException e) {
                bootstrap.exitWithNodeValidationException(e);
            } catch (Throwable t) {
                bootstrap.exitWithUnknownException(t);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这里初始化会有三个初始化阶段。可以直接看initPhase3

    二、Elasticsearch初始化第三阶段

     /**
         *初始化的第三阶段
         *阶段 3 包含初始化安全管理器后的所有内容。到目前为止,该系统一直是单线程的。此阶段可以生成线程、写入日志,并受安全管理器策略的约束。
         * 在第 3 阶段结束时,系统已准备好接受请求,主线程已准备好终止。这意味着:
         *    节点组件已构建并启动
         *    清理已完成(例如安全设置已关闭)
         *    除主线程外,至少有一个线程处于活动状态,并且在主线程终止后将保持活动状态
         *    已通知父 CLI 进程系统已准备就绪
         */
        private static void initPhase3(Bootstrap bootstrap) throws IOException, NodeValidationException {
            //调用checkLucene()函数进行Lucene的检查
            checkLucene();
            //创建一个Node对象,并重写validateNodeBeforeAcceptingRequests方法,用于在接受请求之前进行节点验证。
            Node node = new Node(bootstrap.environment()) {
                @Override
                protected void validateNodeBeforeAcceptingRequests(
                    final BootstrapContext context,
                    final BoundTransportAddress boundTransportAddress,
                    List<BootstrapCheck> checks
                ) throws NodeValidationException {
                    BootstrapChecks.check(context, boundTransportAddress, checks);
                }
            };
            //使用bootstrap.spawner()和之前创建的node对象实例化一个Elasticsearch对象,并将其赋值给INSTANCE变量。
            INSTANCE = new Elasticsearch(bootstrap.spawner(), node);
            //关闭安全设置
            IOUtils.close(bootstrap.secureSettings());
            //启动INSTANCE对象,node会启动,并保持一个存活线程
            INSTANCE.start();
            //如果命令行参数指定了daemonize,则移除控制台输出的日志配置。
            if (bootstrap.args().daemonize()) {
                LogConfigurator.removeConsoleAppender();
            }
            //发送CLI标记,表示服务器已经准备好接受请求。
            bootstrap.sendCliMarker(BootstrapInfo.SERVER_READY_MARKER);
            //如果命令行参数指定了daemonize,则关闭流;否则,启动CLI监视线程。
            if (bootstrap.args().daemonize()) {
                bootstrap.closeStreams();
            } else {
                startCliMonitorThread(System.in);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    其中INSTANCE.start();如下,代表node启动,并且存活线程运行

    private void start() throws NodeValidationException {
            node.start();
            keepAliveThread.start();
        }
    
    • 1
    • 2
    • 3
    • 4

    1、构造node节点对象时构造restController

    public Node(Environment environment) {
            this(environment, PluginsService.getPluginsServiceCtor(environment), true);
        }
    /**
         * Constructs a node
         * 节点的初始化
         */
        protected Node(
            final Environment initialEnvironment,
            final Function<Settings, PluginsService> pluginServiceCtor,
            boolean forbidPrivateIndexSettings
        ) {
       		 //省略代码。。。。
       			//里面会初始化restController
                ActionModule actionModule = new ActionModule(
                    settings,
                    clusterModule.getIndexNameExpressionResolver(),
                    settingsModule.getIndexScopedSettings(),
                    settingsModule.getClusterSettings(),
                    settingsModule.getSettingsFilter(),
                    threadPool,
                    pluginsService.filterPlugins(ActionPlugin.class),
                    client,
                    circuitBreakerService,
                    usageService,
                    systemIndices,
                    tracer,
                    clusterService,
                    reservedStateHandlers
                );
                modules.add(actionModule);
                //restController存入到networkModule,而NetworkModule是用于处理注册和绑定所有网络相关类的模块
                //末尾有 actionModule.initRestHandlers初始化hander
                final RestController restController = actionModule.getRestController();
                final NetworkModule networkModule = new NetworkModule(
                    settings,
                    pluginsService.filterPlugins(NetworkPlugin.class),
                    threadPool,
                    bigArrays,
                    pageCacheRecycler,
                    circuitBreakerService,
                    namedWriteableRegistry,
                    xContentRegistry,
                    networkService,
                    restController,
                    actionModule::copyRequestHeadersToThreadContext,
                    clusterService.getClusterSettings(),
                    tracer
                );
                //省略代码。。。。
     			//初始化Rest的Handler
                actionModule.initRestHandlers(() -> clusterService.state().nodesIfRecovered());
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    2、在node构建对象最后执行初始化RestHanders的操作

      public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
           //省略代码。。。这里只选几个经常用到的
            registerHandler.accept(new RestGetIndicesAction());
            registerHandler.accept(new RestIndicesStatsAction());
            registerHandler.accept(new RestCreateIndexAction());    
            registerHandler.accept(new RestDeleteIndexAction());
            registerHandler.accept(new RestGetIndexTemplateAction());
            registerHandler.accept(new RestPutIndexTemplateAction());
            registerHandler.accept(new RestDeleteIndexTemplateAction());
            registerHandler.accept(new RestPutMappingAction());
            registerHandler.accept(new RestGetMappingAction());
            registerHandler.accept(new RestGetFieldMappingAction());
            registerHandler.accept(new RestIndexAction());
            registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder()));
            //省略代码
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    三、以注册在hander中的RestGetIndicesAction对象为例介绍

    
    /**
     * The REST handler for get index and head index APIs.
     * 用于获取索引和头索引 API 的 REST 处理程序。
     */
    @ServerlessScope(Scope.PUBLIC)
    public class RestGetIndicesAction extends BaseRestHandler {
    	//代表路由匹配规则,通过这个规则知道要调用这个实例,每一个实例路由规则都是不一样的
        @Override
        public List<Route> routes() {
            return List.of(new Route(GET, "/{index}"), new Route(HEAD, "/{index}"));
        }
        @Override
        public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
            // starting with 7.0 we don't include types by default in the response to GET requests
            if (request.getRestApiVersion() == RestApiVersion.V_7
                && request.hasParam(INCLUDE_TYPE_NAME_PARAMETER)
                && request.method().equals(GET)) {
                deprecationLogger.compatibleCritical("get_indices_with_types", TYPES_DEPRECATION_MESSAGE);
            }
    
            String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
            final GetIndexRequest getIndexRequest = new GetIndexRequest();
            getIndexRequest.indices(indices);
            getIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, getIndexRequest.indicesOptions()));
            getIndexRequest.local(request.paramAsBoolean("local", getIndexRequest.local()));
            getIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getIndexRequest.masterNodeTimeout()));
            getIndexRequest.humanReadable(request.paramAsBoolean("human", false));
            getIndexRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
            getIndexRequest.features(GetIndexRequest.Feature.fromRequest(request));
            final var httpChannel = request.getHttpChannel();
            return channel -> new RestCancellableNodeClient(client, httpChannel).admin()
                .indices()
                .getIndex(getIndexRequest, new RestChunkedToXContentListener<>(channel));
        }
    
       
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    1、继承了BaseRestHandler,routes方法做路由规则,父类调用子类的prepareRequest实现

    public abstract class BaseRestHandler implements RestHandler {
        /**
         * {@inheritDoc}
         */
        @Override
        public abstract List<Route> routes();
    
        @Override
        public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
            //调用prepareRequest方法,准备请求以供执行,并且会对请求参数进行处理。
            final RestChannelConsumer action = prepareRequest(request, client);
            //过滤未使用的参数,将未使用的参数收集到一个有序集合中。
            final SortedSet<String> unconsumedParams = request.unconsumedParams()
                .stream()
                .filter(p -> responseParams(request.getRestApiVersion()).contains(p) == false)
                .collect(Collectors.toCollection(TreeSet::new));
            //验证未使用的参数是否有效,如果存在无效参数,则抛出IllegalArgumentException异常。
            if (unconsumedParams.isEmpty() == false) {
                final Set<String> candidateParams = new HashSet<>();
                candidateParams.addAll(request.consumedParams());
                candidateParams.addAll(responseParams(request.getRestApiVersion()));
                throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
            }
            //验证请求是否包含请求体,并且请求体是否已被消耗,如果不满足条件,则抛出IllegalArgumentException异常。
            if (request.hasContent() && request.isContentConsumed() == false) {
                throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
            }
            //增加使用计数
            usageCount.increment();
            //执行action,将结果传递给channel。
            action.accept(channel);
        }
         /**
         *准备要执行的请求。
         * 实现应在返回可运行对象以进行实际执行之前使用所有请求参数。
         * 未使用的参数将立即终止请求的执行。
         * 但是,某些参数仅用于处理响应;实现可以覆盖 {@link BaseRestHandlerresponseParams()} 来指示此类参数。
         */
        protected abstract RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException;
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    2、BaseRestHandler实现的是RestHandler接口

    /**
     * Handler for REST requests
     */
    @FunctionalInterface
    public interface RestHandler {
        /**
         * 处理rest请求
         */
        void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception;
        /**
         * 此 RestHandler 负责处理的 {@link 路由} 的列表。
         */
        default List<Route> routes() {
            return Collections.emptyList();
        }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    其中调用RestHandler接口的handerRequest的上游是

     @Override
        public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        //使用 OPTIONS 方法的请求应在其他地方处理,而不是通过调用 {@code RestHandlerhandleRequest} 使用 OPTIONS 方法的 HTTP 请求绕过 authn,因此此健全性检查可防止调度未经身份验证的请求
            if (request.method() == Method.OPTIONS) {
                handleException(
                    request,
                    channel,
                    new ElasticsearchSecurityException("Cannot dispatch OPTIONS request, as they are not authenticated")
                );
                return;
            }
            if (enabled == false) {
                doHandleRequest(request, channel, client);
                return;
            }
        }
    
        private void doHandleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
            threadContext.sanitizeHeaders();
            // operator privileges can short circuit to return a non-successful response
            if (operatorPrivilegesService.checkRest(restHandler, request, channel, threadContext)) {
                try {
                    restHandler.handleRequest(request, channel, client);
                } catch (Exception e) {
                    logger.debug(() -> format("Request handling failed for REST request [%s]", request.uri()), e);
                    throw e;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    其他注册在hander中的API和RestGetIndicesAction类似

  • 相关阅读:
    conda创建环境、安装包到环境迁移
    ubuntu 20.04 docker 安装 mysql
    技术分享 | 开发板网口热插拔自动获取IP地址
    GUI编程--PyQt5--QLineEdit
    java 基础(核心知识搭配代码)
    3dmax已渲染的图怎么在后期进行调节灯混呢?
    【数据科学赛】2023全球智能汽车AI挑战赛 #¥95000 #LLM文档问答 #视频理解
    R语言基于ARMA-GARCH过程的VaR拟合和预测
    传统单节点网站的 Serverless 上云
    r语言绘制动态统计图:绘制世界各国的人均GDP,出生时的期望寿命和人口气泡图动画动态gif图
  • 原文地址:https://blog.csdn.net/weixin_43113679/article/details/133956546