• Elasticsearch 8.9 Master节点处理请求源码


    大家看可以看ElasticSearch源码:Rest请求与Master节点处理流程(1)

    在这里插入图片描述

    这个图非常好,下午的讲解代码在各个类和方法之间流转,都体现这个图上

    一、Master节点处理请求的逻辑

    不是所有的请求都需要Master节点处理,但是有些请求必须让Master节点处理,比如创建index,下面的3就是用创建索引做的示例

    1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest

    主节点在 Elasticsearch 集群中负责集群的管理和协调工作。当节点需要执行某些操作时,它将创建相应的 MasterNodeRequest 实现类的实例,填充请求的参数和数据,并将其发送给主节点。主节点根据不同的 MasterNodeRequest 实现类的类型,执行相应的操作

    /**
     * A based request for master based operation.
     * 在master上
     */
    public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest {
    
        public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30);
    
        protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT;
    
        protected MasterNodeRequest() {}
    
        protected MasterNodeRequest(StreamInput in) throws IOException {
            super(in);
            masterNodeTimeout = in.readTimeValue();
        }
    
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeTimeValue(masterNodeTimeout);
        }
    
        /**
         * A timeout value in case the master has not been discovered yet or disconnected.
         */
        @SuppressWarnings("unchecked")
        public final Request masterNodeTimeout(TimeValue timeout) {
            this.masterNodeTimeout = timeout;
            return (Request) this;
        }
    
        /**
         * A timeout value in case the master has not been discovered yet or disconnected.
         */
        public final Request masterNodeTimeout(String timeout) {
            return masterNodeTimeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".masterNodeTimeout"));
        }
    
        public final TimeValue masterNodeTimeout() {
            return this.masterNodeTimeout;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    这里有点模糊,后面学到数据节点向主节点请求或者同步什么时,我再挂个链接

    2、Master节点处理来自客户端的请求(以创建索引请求举例)

    (1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)

    至于请求如何找到RestCreateIndexAction的,可以参考Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码

    @ServerlessScope(Scope.PUBLIC)
    public class RestCreateIndexAction extends BaseRestHandler {
     	//省略代码  
        @Override
        public List<Route> routes() {
            return List.of(new Route(PUT, "/{index}"));
        }
    
        @Override
        public String getName() {
            return "create_index_action";
        }
    
        @Override
        public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
            CreateIndexRequest createIndexRequest;
            if (request.getRestApiVersion() == RestApiVersion.V_7) {
                createIndexRequest = prepareRequestV7(request);
            } else {
                createIndexRequest = prepareRequest(request);
            }
            return channel -> client.admin().indices().create(createIndexRequest, new RestToXContentListener<>(channel));
        }
      //省略代码  
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    (2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法

    TransportMasterNodeAction 主要用于处理来自节点的各种管理操作请求,如创建索引、删除索引、更新集群设置等。
    当节点(数据节点)发送请求到主节点时,请求会被传递给相应的 TransportMasterNodeAction 实现类进行处理。实现类会根据请求的类型,执行相应的操作逻辑,并返回执行结果给主节点。

    
    /**
     * 需要在主节点上执行的操作的基类。
     * A base class for operations that needs to be performed on the master node.
     *
     */
    public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extends
        HandledTransportAction<Request, Response>
        implements
            ActionWithReservedState<Request> {
       //省略代码     
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    /**
     * 创建索引操作
     */
    public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {
        @Override
        protected void masterOperation(
            Task task,
            final CreateIndexRequest request,
            final ClusterState state,
            final ActionListener<CreateIndexResponse> listener
        ) {
        //省略代码
         createIndexService.createIndex(
                updateRequest,
                listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName))
            );
        }
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)

    这个场景为主节点和数据节点分离的情况

    1、首先通过nodeClient执行doExecute()

    client.admin().indices().createcreate方法调用IndicesAdmin类的create方法,再调用execute方法的入参是 CreateIndexAction.INSTANCE

    static class IndicesAdmin implements IndicesAdminClient {
    	  @Override
          public void create(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {
                execute(CreateIndexAction.INSTANCE, request, listener);
    	 }
    
    	
    	 @Override
    	public <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(
    	    ActionType<Response> action,
    	    Request request
    	) {
    	    return client.execute(action, request);
    	 }
    	
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    调用的是AbstractClientexecute方法

      /**
         * This is the single execution point of *all* clients.
         * 这是所有客户端的单个执行点。
         */
        @Override
        public final <Request extends ActionRequest, Response extends ActionResponse> void execute(
            ActionType<Response> action,
            Request request,
            ActionListener<Response> listener
        ) {
            try {
                doExecute(action, request, listener);
            } catch (Exception e) {
                assert false : new AssertionError(e);
                listener.onFailure(e);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    doExecute方法调用的是NodeClient类的方法

      @Override
        public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
            ActionType<Response> action,
            Request request,
            ActionListener<Response> listener
        ) {
            // Discard the task because the Client interface doesn't use it.
            try {
                executeLocally(action, request, listener);
            } catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {
                listener.onFailure(e);
            }
        }
     	/**
         *在本地执行 {@link ActionType},返回用于跟踪它的 {@link Task},并链接 {@link ActionListener}。如果在侦听响应时不需要访问任务,则首选此方法。这是用于实现 {@link 客户端} 接口的方法。
         */
        public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(
            ActionType<Response> action,
            Request request,
            ActionListener<Response> listener
        ) {
        	//注册并执行任务
            return taskManager.registerAndExecute(
                "transport",
                transportAction(action),
                request,
                localConnection,
                new SafelyWrappedActionListener<>(listener)
            );
        }   
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    之后调用TaskManager.java的方法

    2、创建一个task任务异步执行TransportAction

    public <Request extends ActionRequest, Response extends ActionResponse> Task registerAndExecute(
            String type,
            TransportAction<Request, Response> action,
            Request request,
            Transport.Connection localConnection,
            ActionListener<Response> taskListener
        ) { 
            //检查请求是否有父任务,如果有,则注册子连接。
            final Releasable unregisterChildNode;
            if (request.getParentTask().isSet()) {
                unregisterChildNode = registerChildConnection(request.getParentTask().getId(), localConnection);
            } else {
                unregisterChildNode = null;
            }
            //创建一个新的跟踪上下文
            try (var ignored = threadPool.getThreadContext().newTraceContext()) {
                final Task task;
                //注册一个任务,并捕获可能的取消任务异常。
                try {
                    task = register(type, action.actionName, request);
                } catch (TaskCancelledException e) {
                    Releasables.close(unregisterChildNode);
                    throw e;
                }
                //执行操作,并在操作完成时调用相应的监听器。
                action.execute(task, request, new ActionListener<>() {
                    @Override
                    public void onResponse(Response response) {
                        try {
                            release();
                        } finally {
                            taskListener.onResponse(response);
                        }
                    }
                    //根据操作的成功或失败情况,取消子任务并释放资源。
                    @Override
                    public void onFailure(Exception e) {
                        try {
                            if (request.getParentTask().isSet()) {
                                cancelChildLocal(request.getParentTask(), request.getRequestId(), e.toString());
                            }
                            release();
                        } finally {
                            taskListener.onFailure(e);
                        }
                    }
    
                    @Override
                    public String toString() {
                        return this.getClass().getName() + "{" + taskListener + "}{" + task + "}";
                    }
    
                    private void release() {
                        Releasables.close(unregisterChildNode, () -> unregister(task));
                    }
                });
                //返回任务对象。
                return task;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    下面是TransportAction.java类中的方法

        /**
         * Use this method when the transport action should continue to run in the context of the current task
         * 当传输操作应继续在当前任务的上下文中运行时,请使用此方法
         */
        public final void execute(Task task, Request request, ActionListener<Response> listener) {
            final ActionRequestValidationException validationException;
            //对请求进行验证,如果验证过程中出现异常,则记录错误日志并通知监听器执行失败。
            try {
                validationException = request.validate();
            } catch (Exception e) {
                assert false : new AssertionError("validating of request [" + request + "] threw exception", e);
                logger.warn("validating of request [" + request + "] threw exception", e);
                listener.onFailure(e);
                return;
            }
            if (validationException != null) {
                listener.onFailure(validationException);
                return;
            }
            //检查是否存在任务且请求需要存储结果,如果满足条件,则创建一个TaskResultStoringActionListener实例,用于在任务完成后将结果存储起来。
            if (task != null && request.getShouldStoreResult()) {
                listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
            }
            //创建一个请求过滤器链(RequestFilterChain),然后调用proceed方法,将任务、动作名称、请求和监听器传递给过滤器链进行处理。
            RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
            requestFilterChain.proceed(task, actionName, request, listener);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
     @Override
            public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
                int i = index.getAndIncrement();
                try {
                    if (i < this.action.filters.length) {
                        this.action.filters[i].apply(task, actionName, request, listener, this);
                    } else if (i == this.action.filters.length) {
                    //`this.action.doExecute(task, request, listener);` 中`action`对应的是`TransportMasterNodeAction`。
                        this.action.doExecute(task, request, listener);
                    } else {
                        listener.onFailure(new IllegalStateException("proceed was called too many times"));
                    }
                } catch (Exception e) {
                    logger.trace("Error during transport action execution.", e);
                    listener.onFailure(e);
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    this.action.doExecute(task, request, listener);action对应的是TransportMasterNodeAction

    3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法

    TransportMasterNodeAction继承HandledTransportAction
    HandledTransportAction继承自TransportAction

    public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extends
        HandledTransportAction<Request, Response>
        implements
            ActionWithReservedState<Request> {
     @Override
        protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
           //省略代码
            new AsyncSingleAction(task, request, listener).doStart(state);
        }
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
     protected void doStart(ClusterState clusterState) {
    	  threadPool.executor(executor).execute(ActionRunnable.wrap(delegate, l -> executeMasterOperation(task, request, clusterState, l)));
      }
    private void executeMasterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)
            throws Exception {
           //调用子类实现
            masterOperation(task, request, state, listener);
        }
    //子类实现   
    protected abstract void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)
            throws Exception;
       
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引

    其中创建索引的actionTransportCreateIndexAction

     @Override
        protected void masterOperation(
            Task task,
            final CreateIndexRequest request,
            final ClusterState state,
            final ActionListener<CreateIndexResponse> listener
        ) {
    	createIndexService.createIndex(
                updateRequest,
                listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName))
            );
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    之后调用createIndexService.createIndex创建索引

    三、快速找到像TransportCreateIndexAction类的小窍门

    在上面

    static class IndicesAdmin implements IndicesAdminClient {
    	  @Override
          public void create(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {
                execute(CreateIndexAction.INSTANCE, request, listener);
    	 }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    记住CreateIndexAction.INSTANCE,在直接去ActionModule.java找到下面这句,就知道实际执行的是TransportCreateIndexActionmasterOperation 方法,

        actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
    
    
    • 1
    • 2
  • 相关阅读:
    循环神经网络
    家用电脑做服务器,本地服务器搭建,公网IP申请,路由器改桥接模式,拨号上网
    移远通信推出高性能九合一5G组合天线
    haas506 2.0开发教程-高级组件库-modem.sms(仅支持2.2以上版本)
    Mybatis动态SQL和分页
    go-10-字符串操作
    【java学习】构造方法重载(26)
    基于django | 创建app,并启动django
    HttpContext.TraceIdentifier那严谨的设计
    Spring容器生命周期--SmartLifecycle的用法
  • 原文地址:https://blog.csdn.net/weixin_43113679/article/details/133968671