• elasticsearch DSL部分 源码解析


    DSL查询条件解析源码

    参考文章:https://www.jianshu.com/p/b77e80d6c18e

    先了解es的调用链路——看一次搜索的时序图

    一次查询分为三部分

    1. 将用户请求restRequest 转发到应该处理本次请求的RestAction

      img

      以search查询为例:一次调用链路:

      Netty4HttpRequestHandler#channelRead0() 调用 Netty4HttpServerTransport#dispatchRequest()

      Netty4HttpServerTransport#dispatchRequest() 这里是接口

      RestController#dispatchRequest()实现了上边的接口

      接着调用了RestController#tryAllHandlers()

      RestController#tryAllHandlers() 调用 RestController#dispatchRequest()

      RestController#dispatchRequest() 调用 BaseRestHandler#handleRequest()

      BaseRestHandler#handleRequest() 调用子类RestSearchAction#prepareRequest()

    2. 将RestRequest(http层面的请求)转换成 SearchRequest(es内部认识能处理的请求),然后调用search(),此时相当于进入到了controller层。

      img

    3. 还要根据请求,来找到要处理的服务层,对应es的是TransportSearchAction,调用其子类的execute()方法来执行搜索。

      img

    清楚时序图以后,再来看DSL查询条件解析的部分

    实际上对应时序图中的第二部分,对应的代码在RestSearchAction类中

    RestSearchAction类继承了BaseRestHandler类,重写了prepareRequest方法。

      @Override
        public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.remoteAddress(new TransportAddress((InetSocketAddress)request.getRemoteAddress()));
            IntConsumer setSize = size -> searchRequest.source().size(size);
            // 下边的 parser -> 是一个lambda表达式。理解起来不太容易。要清楚 parser是在RestRequest#withContentOrSourceParamParserOrNull()方法中完成构建的!知道这一点很重要,因为这个parser是非常重要的。它将请求中的条件进行了解析。并作为了参数给了parseSearchRequest()方法,在该方法里边,去构造了查询条件。
            request.withContentOrSourceParamParserOrNull(parser ->
                //重点在这里,将RestRequest转换成了SearchRequest
                parseSearchRequest(searchRequest, request, parser, setSize));
    
            return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    可以看下一下RestRequest#withContentOrSourceParamParserOrNull()方法

    从RestRequest对象中,获取请求内容,转成XContentParser对象。

    public final void withContentOrSourceParamParserOrNull(CheckedConsumer<XContentParser, IOException> withParser) throws IOException {
            if (hasContentOrSourceParam()) {
                Tuple<XContentType, BytesReference> tuple = contentOrSourceParam();
                BytesReference content = tuple.v2();
                XContentType xContentType = tuple.v1();
                try (InputStream stream = content.streamInput();
                     // 在这里创建了 parser出来,注意这里
                     XContentParser parser = xContentType.xContent()
                         .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
                    withParser.accept(parser);
                }
            } else {
                withParser.accept(null);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    接着再看 parseSearchRequest() 方法源码

     /**
         * Parses the rest request on top of the SearchRequest, preserving values that are not overridden by the rest request.
         *
         * @param requestContentParser body of the request to read. This method does not attempt to read the body from the {@code request}
         *        parameter
         * @param setSize how the size url parameter is handled. {@code udpate_by_query} and regular search differ here.
         */
        public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
                                              XContentParser requestContentParser,
                                              IntConsumer setSize) throws IOException {
    
            if (searchRequest.source() == null) {
                searchRequest.source(new SearchSourceBuilder());
            }
            // 获取到索引的索引
            searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));
            if (requestContentParser != null) {
                // 这里是重点!正是这个方法,将http的查询条件,转换成了es的查询条件。
                searchRequest.source().parseXContent(requestContentParser, true);
            }
    
            final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
            searchRequest.setBatchedReduceSize(batchedReduceSize);
            searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", searchRequest.getPreFilterShardSize()));
    
            if (request.hasParam("max_concurrent_shard_requests")) {
                // only set if we have the parameter since we auto adjust the max concurrency on the coordinator
                // based on the number of nodes in the cluster
                final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests",
                    searchRequest.getMaxConcurrentShardRequests());
                searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
            }
    
            if (request.hasParam("allow_partial_search_results")) {
                // only set if we have the parameter passed to override the cluster-level default
                searchRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", null));
            }
    
            // do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
            // from the REST layer. these modes are an internal optimization and should
            // not be specified explicitly by the user.
            String searchType = request.param("search_type");
            if ("query_and_fetch".equals(searchType) ||
                    "dfs_query_and_fetch".equals(searchType)) {
                throw new IllegalArgumentException("Unsupported search type [" + searchType + "]");
            } else {
                searchRequest.searchType(searchType);
            }
            parseSearchSource(searchRequest.source(), request, setSize);
            searchRequest.requestCache(request.paramAsBoolean("request_cache", null));
    
            String scroll = request.param("scroll");
            if (scroll != null) {
                searchRequest.scroll(new Scroll(parseTimeValue(scroll, null, "scroll")));
            }
    
            searchRequest.types(Strings.splitStringByCommaToArray(request.param("type")));
            searchRequest.routing(request.param("routing"));
            searchRequest.preference(request.param("preference"));
            searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    进入parseXContent()方法

     /**
         * Parse some xContent into this SearchSourceBuilder, overwriting any values specified in the xContent. Use this if you need to set up
         * different defaults than a regular SearchSourceBuilder would have and use {@link #fromXContent(XContentParser, boolean)} if you have
         * normal defaults.
         *
         * @param parser The xContent parser.
         * @param checkTrailingTokens If true throws a parsing exception when extra tokens are found after the main object.
         */
        public void parseXContent(XContentParser parser, boolean checkTrailingTokens) throws IOException {
            XContentParser.Token token = parser.currentToken();
            String currentFieldName = null;
            if (token != XContentParser.Token.START_OBJECT && (token = parser.nextToken()) != XContentParser.Token.START_OBJECT) {
                throw new ParsingException(parser.getTokenLocation(), "Expected [" + XContentParser.Token.START_OBJECT +
                        "] but found [" + token + "]", parser.getTokenLocation());
            }
            while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                if (token == XContentParser.Token.FIELD_NAME) {
                    currentFieldName = parser.currentName();
                    // 如果是查询条件的参数,例如:from size 
                } else if (token.isValue()) {
                    if (FROM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        from = parser.intValue();
                    } else if (SIZE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        size = parser.intValue();
                    } else if (TIMEOUT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        timeout = TimeValue.parseTimeValue(parser.text(), null, TIMEOUT_FIELD.getPreferredName());
                    } else if (TERMINATE_AFTER_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        terminateAfter = parser.intValue();
                    } else if (MIN_SCORE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        minScore = parser.floatValue();
                    } else if (VERSION_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        version = parser.booleanValue();
                    } else if (SEQ_NO_PRIMARY_TERM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        seqNoAndPrimaryTerm = parser.booleanValue();
                    } else if (EXPLAIN_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        explain = parser.booleanValue();
                    } else if (TRACK_SCORES_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        trackScores = parser.booleanValue();
                    } else if (TRACK_TOTAL_HITS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        trackTotalHits = parser.booleanValue();
                    } else if (_SOURCE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        fetchSourceContext = FetchSourceContext.fromXContent(parser);
                    } else if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        storedFieldsContext =
                            StoredFieldsContext.fromXContent(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), parser);
                    } else if (SORT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        sort(parser.text());
                    } else if (PROFILE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        profile = parser.booleanValue();
                    } else {
                        throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
                                parser.getTokenLocation());
                    }
                    // 如果是查询条件 例如:query  agg
                } else if (token == XContentParser.Token.START_OBJECT) {
                    if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        // 构造查询条件
                        queryBuilder = parseInnerQueryBuilder(parser);
                    } else if (POST_FILTER_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        postQueryBuilder = parseInnerQueryBuilder(parser);
                    } else if (_SOURCE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        fetchSourceContext = FetchSourceContext.fromXContent(parser);
                    } else if (SCRIPT_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        scriptFields = new ArrayList<>();
                        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                            scriptFields.add(new ScriptField(parser));
                        }
                    } else if (INDICES_BOOST_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        DEPRECATION_LOGGER.deprecated(
                            "Object format in indices_boost is deprecated, please use array format instead");
                        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                            if (token == XContentParser.Token.FIELD_NAME) {
                                currentFieldName = parser.currentName();
                            } else if (token.isValue()) {
                                indexBoosts.add(new IndexBoost(currentFieldName, parser.floatValue()));
                            } else {
                                throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token +
                                    " in [" + currentFieldName + "].", parser.getTokenLocation());
                            }
                        }
                    } else if (AGGREGATIONS_FIELD.match(currentFieldName, parser.getDeprecationHandler())
                            || AGGS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        aggregations = AggregatorFactories.parseAggregators(parser);
                    } else if (HIGHLIGHT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        highlightBuilder = HighlightBuilder.fromXContent(parser);
                    } else if (SUGGEST_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        suggestBuilder = SuggestBuilder.fromXContent(parser);
                    } else if (SORT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        sorts = new ArrayList<>(SortBuilder.fromXContent(parser));
                    } else if (RESCORE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        rescoreBuilders = new ArrayList<>();
                        rescoreBuilders.add(RescorerBuilder.parseFromXContent(parser));
                    } else if (EXT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        extBuilders = new ArrayList<>();
                        String extSectionName = null;
                        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                            if (token == XContentParser.Token.FIELD_NAME) {
                                extSectionName = parser.currentName();
                            } else {
                                SearchExtBuilder searchExtBuilder = parser.namedObject(SearchExtBuilder.class, extSectionName, null);
                                if (searchExtBuilder.getWriteableName().equals(extSectionName) == false) {
                                    throw new IllegalStateException("The parsed [" + searchExtBuilder.getClass().getName() + "] object has a "
                                            + "different writeable name compared to the name of the section that it was parsed from: found ["
                                            + searchExtBuilder.getWriteableName() + "] expected [" + extSectionName + "]");
                                }
                                extBuilders.add(searchExtBuilder);
                            }
                        }
                    } else if (SLICE.match(currentFieldName, parser.getDeprecationHandler())) {
                        sliceBuilder = SliceBuilder.fromXContent(parser);
                    } else if (COLLAPSE.match(currentFieldName, parser.getDeprecationHandler())) {
                        collapse = CollapseBuilder.fromXContent(parser);
                    } else {
                        throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
                                parser.getTokenLocation());
                    }
                } else if (token == XContentParser.Token.START_ARRAY) {
                    if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        storedFieldsContext = StoredFieldsContext.fromXContent(STORED_FIELDS_FIELD.getPreferredName(), parser);
                    } else if (DOCVALUE_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        docValueFields = new ArrayList<>();
                        while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                            docValueFields.add(FieldAndFormat.fromXContent(parser));
                        }
                    } else if (INDICES_BOOST_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                            indexBoosts.add(new IndexBoost(parser));
                        }
                    } else if (SORT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        sorts = new ArrayList<>(SortBuilder.fromXContent(parser));
                    } else if (RESCORE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        rescoreBuilders = new ArrayList<>();
                        while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                            rescoreBuilders.add(RescorerBuilder.parseFromXContent(parser));
                        }
                    } else if (STATS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        stats = new ArrayList<>();
                        while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                            if (token == XContentParser.Token.VALUE_STRING) {
                                stats.add(parser.text());
                            } else {
                                throw new ParsingException(parser.getTokenLocation(), "Expected [" + XContentParser.Token.VALUE_STRING +
                                        "] in [" + currentFieldName + "] but found [" + token + "]", parser.getTokenLocation());
                            }
                        }
                    } else if (_SOURCE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                        fetchSourceContext = FetchSourceContext.fromXContent(parser);
                    } else if (SEARCH_AFTER.match(currentFieldName, parser.getDeprecationHandler())) {
                        searchAfterBuilder = SearchAfterBuilder.fromXContent(parser);
                    } else {
                        throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
                                parser.getTokenLocation());
                    }
                } else {
                    throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
                            parser.getTokenLocation());
                }
            }
            if (checkTrailingTokens) {
                boolean success;
                try {
                    token = parser.nextToken();
                    success = token == null;
                } catch (JsonParseException exc) {
                    success = false;
                }
                if (success == false) {
                    DEPRECATION_LOGGER.deprecated("Found extra tokens after the _search request body, " +
                        "an error will be thrown in the next major version");
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
  • 相关阅读:
    老胡的周刊(第107期)
    vue中一个页面引入多个相同组件重复请求的问题?
    视频需求超平常数 10 倍,却节省 60% 的 IT 成本投入是种什么样体验?
    第13章 Centreon备份与恢复
    使用FFmpeg合并多个ts视频文件转为mp4格式
    单页面应用与多页面应用的区别?
    RabbitMQ无法删除unsynchronized队列及解决办法
    Oracle 11g_FusionOS_安装文档
    Gateway 接口参数加解密
    FANUC机器人零点复归的报警原因分析和零点标定相关步骤
  • 原文地址:https://blog.csdn.net/star1210644725/article/details/126100841