• 【解决】elasticsearch:Could not parse aggregation keyed as [%s]问题


    背景#

    在做elasticsearch集群从原来的2.x版本升级到更新版本如6.x过程中,由于需要在原来的应用中,同时连接2.x的集群以及6.x的集群来做在线动态灰度切流量,保证流量平滑切换,有问题可随时回切;一般在应用侧比较常规的做法是使用elasticsearch提供rest的sdk:Java High Level REST Client,可以最大程度的保证多版本的兼容性;但在切换过程中也遇到了比较一些兼容性的问题,今天主要来讨论一下关于聚合结果处理的问题;

    直方图统计2.x集群返回结果无法解决的问题#

    在应用中有一个根据价格区间的直方图统计服务,统计查询语句如下:

    //POST /my-index/my-type/_search
    {
     "from": 0,
     "size": 0,
     "timeout": "10000ms",
     "aggregations": {
       "statPrice": {
         "histogram": {
           "field": "price",
           "interval": 10000,
           "min_doc_count": 1
         }
       }
     }
    }
    

    正常的出参数据如下:

    {
        "_shards":{
            "total":16,
            "failed":0,
            "successful":16,
            "skipped":0
        },
        "hits":{
            "hits":[
    
            ],
            "total":243399987,
            "max_score":0
        },
        "took":539,
        "timed_out":false,
        "aggregations":{
            "statPrice":{
                "buckets":[
                    {
                        "doc_count":123,
                        "key":0.0
                    },
                    {
                        "doc_count":345,
                        "key":1000.0
                    },
                    {
                        "doc_count":780,
                        "key":2000.0
                    }
                ]
            }
        }
    }
    

    在应用中通过rest api调用,6.x集群可以正常的获取数据,没有任务问题,这里因为我们使用的elasticsearch-rest-high-level-client的版本就是6.x的,这个跟es服务的版本是一样;但是在2.x的老集群上调用时,返回异常; 提示Could not parse aggregation keyed as [statPrice],详细异常如下

    Caused by: org.elasticsearch.common.ParsingException: Could not parse aggregation keyed as [statPrice]
    	at org.elasticsearch.search.aggregations.Aggregations.fromXContent(Aggregations.java:146) ~[elasticsearch-6.3.2.jar:6.3.2]
    	at org.elasticsearch.action.search.SearchResponse.innerFromXContent(SearchResponse.java:289) ~[elasticsearch-6.3.2.jar:6.3.2]
    	at org.elasticsearch.action.search.SearchResponse.fromXContent(SearchResponse.java:248) ~[elasticsearch-6.3.2.jar:6.3.2]
    	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
    	at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$2(RestHighLevelClient.java:508) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
    	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:539) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
    	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:508) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
    	at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:404) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
    	at com.jd.b2b.ware.admin.compare.core.support.IncrementSyncHandler.statCount(IncrementSyncHandler.java:133) ~[b2b-ware-admin-compare-0.0.1-SNAPSHOT.jar:?]
    	... 48 more
    

    类:org.elasticsearch.search.aggregations.Aggregations.fromXContent的部分代码

     public static Aggregations fromXContent(XContentParser parser) throws IOException {
            final List aggregations = new ArrayList<>();
            XContentParser.Token token;
            while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                if (token == XContentParser.Token.START_OBJECT) {
                    SetOnce typedAgg = new SetOnce<>();
                    String currentField = parser.currentName();
                    // TYPED_KEYS_DELIMITER为#, 使用 typed_keys 参数在聚合名称前面加上聚合名称类型前缀时使用的分隔符
                    parseTypedKeysObject(parser, Aggregation.TYPED_KEYS_DELIMITER, Aggregation.class, typedAgg::set);
                    if (typedAgg.get() != null) {
                        aggregations.add(typedAgg.get());
                    } else {
                        throw new ParsingException(parser.getTokenLocation(),
                                String.format(Locale.ROOT, "Could not parse aggregation keyed as [%s]", currentField));
                    }
                }
            }
            return new Aggregations(aggregations);
        }
    

    通过对报错处的代码debug以及分析,这里会对结果的聚合名称根据#号,取到对应的聚合类型的名称;先是翻看了官方文档,了解了一下typed_keys Aggregations 参数的说明;该参数就是指定返回聚合类型;官方文档最早出现是在7.x的版本,但我们在6.x的集群上同样支持该参数;并且增加该参数与比2.x,出参结果除了在集合名称中有聚合类型外,其他都是一样的;

    POST /my-index/my-type/_search?typed_keys  (或则typed_keys=true 都可以)
    

    聚合结果的对比

    这里又仔细回去检查了我们自己的代码,并没有设置typed_keys参数;本着刨根问底的精神,进行了详细debug,在org.elasticsearch.client.Request#search找到了该参数的设置;这个参数是固定给入的,并且没有参数可以进行覆盖修改;

    static Request search(SearchRequest searchRequest) throws IOException {
            String endpoint = endpoint(searchRequest.indices(), searchRequest.types(), "_search");
            Params params = Request.Params.builder();
            params.putParam("typed_keys", "true");
            params.withRouting(searchRequest.routing());
            ........
    

    到此问题原因基本以找到,解决方案就比较如搞;

    方案一:使用LowLevelClient#

    使用LowLevelClient应该是解决这类兼容性问题最通用的办法,因为这个LowLevelClient就是个httpClient,通过给服务端送查询DSL,再获取到出参字符串,基本简单的json字符串解析即可;代码片段如下:

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.from(0).size(0).timeout(TimeValue.timeValueMillis(10000L));
    
    //数据聚合
    searchSourceBuilder.query(queryBuilder);
    AggregationBuilder aggregationBuilder = AggregationBuilders
            .histogram("statPrice")
            .field("price")
            .interval(1000)
            .minDocCount(1);
    searchSourceBuilder.aggregation(aggregationBuilder);
    log.info("入参:{}", searchSourceBuilder.toString());
    //从highLevelClient中获取到LowLevelClient
    RestClient client = highLevelClient.getLowLevelClient();
    Map params = new HashMap<>();
    HttpEntity entity = new NStringEntity(searchSourceBuilder.toString(), ContentType.APPLICATION_JSON);
    Response response = client.performRequest("POST","/my-index/my-type/_search",
            params, entity);
    String responseBody = EntityUtils.toString(response.getEntity());
    //后续的业务处理
    

    方案二:聚合名称前缀聚合类型#

    针对这个场景,还有一个比较简便的办法;在访问2.x集群时,查询的DSL语句的聚合名字前加上他们需要的前缀即可;查询入参的原来聚合名称是statPrice,现在改成histogram#statPrice即可;

    // 2.x集群
    //POST /my-index/my-type/_search
     {
      "from": 0,
      "size": 0,
      "timeout": "10000ms",
      "aggregations": {
        "histogram#statPrice": {
          "histogram": {
            "field": "price",
            "interval": 10000,
            "min_doc_count": 1
          }
        }
      }
    }
    

    后续的处理就跟6.x集群一样的,也是通过statPrice去取聚合的结果;具体还有哪么前缀可以参考 org.elasticsearch.client.RestHighLevelClient#getDefaultNamedXContents这里的注册信息,可以在6.x等更高的版本中加入typed_keys=true参数,执行一下查询,查看返回的前缘是什么,即可;

    参考#

  • 相关阅读:
    Oracle中序列、索引、触发器查询
    Redis 实战缓存
    git代码回退方法简要总结
    由于找不到packet.dll,无法继续执行代码的多种解决方法分享
    镜像站制作 centos8
    Opengl之立方体贴图
    向量数据库是如何检索的?基于 Feder 的 HNSW 可视化实现
    平面口径天线简谈
    【Qt】控件探幽——QLineEdit
    瑞士军刀Netcat
  • 原文地址:https://www.cnblogs.com/salted/p/17782853.html