• SpringCloud-ES学习 第七天


    数据聚合

    • 聚合的种类
    • DSL实现聚合
    • RestAPI实现聚合

    聚合的分类 

    聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:

    • 桶(bucket)聚合:用来对文档做分组
      • TermAggregation:按照文档字段值分组
      • Date Histogram:按照日期划分,列如一周为一组,或者一月为一组
    • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
      • Avg:求平均值
      • Max:求最大值
      • Min:求最小值
      • Stats:同时求max、min、avg、sum等
    • 管道(pipeline)聚合:其他聚合的结果为基础做聚合

    总结:

    什么是聚合?

    • 聚合是对文档数据的统计、分析、计算

    聚合的常见种类有哪些?

    • Bucket:对文档数据分组,并统计每组数量
    • Metric:对文档数据做计算,列如avg
    • Pipeline:基于其他聚合结果再做聚合

    参与聚合的字段类型必须是:

    • keyword
    • 数值
    • 日期
    • 布尔

    DSL实现Bucket聚合

    现在,我们要统计所有数据中的酒店品牌有几种,此时可以根据酒店品牌的名称做聚合。类型为term类型,DSL示例:

    1. GET /hotel/_search
    2. {
    3. "size":0, # 设置size0,结果中不包含文档,只包含聚合结果,就是没有具体数据
    4. "aggs":{ # 定义聚合
    5. "brandAgg":{ # 给聚合起个名字
    6. "terms":{ # 聚合的类型,按照品牌值的聚合,所以选择term
    7. "field":"brand", # 参与聚合的字段
    8. "size": 20 # 希望获取的聚合的结果数量
    9. }
    10. }
    11. }

    结果示例:

    Bucket聚合- 自定义排序规则

    添加order字段

    1. GET /hotel/_search
    2. {
    3. "size": 0,
    4. "aggs": {
    5. "brandAggs": {
    6. "terms": {
    7. "field": "brand",
    8. "size": 20,
    9. "order": {
    10. "_count": "asc" #根据count字段升序排序
    11. }
    12. }
    13. }
    14. }
    15. }

    Bucket聚合-限定聚合范围

    默认情况下,bucket绝活是对索引库所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可:

    1. GET /hotel/_search
    2. {
    3. "query": {
    4. "range": {
    5. "price": {
    6. "gte": 10,
    7. "lte": 200
    8. }
    9. }
    10. },
    11. "size": 0,
    12. "aggs": {
    13. "brandAggs": {
    14. "terms": {
    15. "field": "brand",
    16. "size": 10,
    17. "order": {
    18. "_count": "desc"
    19. }
    20. }
    21. }
    22. }
    23. }

    总结:

    aggs代表聚合,与query同级,此时query的作用是?

    • 限定聚合查询文档范围

    聚合必须的三要素:

    • 聚合的名称
    • 聚合的类型
    • 聚合的字段

    聚合可配置属性有:

    • size:指定聚合结果数量
    • order:指定聚合结果排序方式
    • field:指定聚合的字段

    题外知识:

    实现多条件复合查询:

    通过bool连接多条条件。

    1. # 多条件查询
    2. GET /hotel/_search
    3. {
    4. "query": {
    5. "bool": {
    6. "must": [
    7. {
    8. "range": {
    9. "price": {
    10. "gte": 10,
    11. "lte": 200
    12. }
    13. }
    14. },{
    15. "term": {
    16. "brand":{
    17. "value": "7天酒店"
    18. }
    19. }
    20. }
    21. ]
    22. }
    23. }
    24. }

    DSl实现Metrics聚合

    聚合查询按照品牌分组的酒店的max,min,avg,sum分数

    1. GET /hotel/_search
    2. {
    3. "size": 0,
    4. "aggs": {
    5. "brandAggs": {
    6. "terms": {
    7. "field": "brand",
    8. "size": 10,
    9. "order": {
    10. "scoreAggs.avg": "desc"
    11. }
    12. },
    13. "aggs": {
    14. "scoreAggs": {
    15. "stats": {
    16. "field": "score"
    17. }
    18. }
    19. }
    20. }
    21. }
    22. }

     例子:数据聚合带过滤条件的数据聚合

    过滤条件

    1. private SearchRequest extracted(RequestParams requestParams, SearchRequest searchRequest) {
    2. //2.1构建query
    3. BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
    4. if (StringUtils.hasText(requestParams.getKey())){
    5. boolQuery.must(QueryBuilders.matchQuery("all", requestParams.getKey()));
    6. }else {
    7. boolQuery.must(QueryBuilders.matchAllQuery());
    8. }
    9. //keyword字符串过滤 品牌
    10. if (StringUtils.hasText(requestParams.getBrand())){
    11. boolQuery.filter(QueryBuilders.termQuery("brand", requestParams.getBrand()));
    12. }
    13. //keyword字符串过滤 城市
    14. if (StringUtils.hasText(requestParams.getCity())){
    15. boolQuery.filter(QueryBuilders.termQuery("city", requestParams.getCity()));
    16. }
    17. //keyword字符串过滤 星级
    18. if (StringUtils.hasText(requestParams.getStarName())) {
    19. boolQuery.filter(QueryBuilders.termQuery("starName", requestParams.getStarName()));
    20. }
    21. //range价格过滤 价格 gte是>= lte是<=
    22. if (requestParams.getMinPrice() != null && requestParams.getMaxPrice() != null) {
    23. boolQuery.filter(QueryBuilders.rangeQuery("price").gte(requestParams.getMinPrice()).lte(requestParams.getMaxPrice()));
    24. }
    25. //算分控制
    26. FunctionScoreQueryBuilder functionScoreQueryBuilder = QueryBuilders.functionScoreQuery(
    27. //原始查询
    28. boolQuery,
    29. //function score数组
    30. new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
    31. //一个具体的functionscore数组
    32. new FunctionScoreQueryBuilder.FilterFunctionBuilder(
    33. //过滤条件
    34. QueryBuilders.termQuery("isAD", true),
    35. //算分函数
    36. ScoreFunctionBuilders.weightFactorFunction(100)
    37. )
    38. });
    39. //2.2分页
    40. final int page = requestParams.getPage();
    41. final int size = requestParams.getSize();
    42. searchRequest.source().query(functionScoreQueryBuilder).from((page-1)*size).size(size);
    43. //2.3排序
    44. if (StringUtils.hasText(requestParams.getLocation())){
    45. final String location = requestParams.getLocation();
    46. searchRequest.source().sort(SortBuilders.
    47. geoDistanceSort("location", new GeoPoint(location))
    48. .order(SortOrder.ASC)
    49. .unit(DistanceUnit.KILOMETERS)
    50. );
    51. }
    52. return searchRequest;
    53. }

    数据聚合和解析结果

    1. @Override
    2. public Map> filters(RequestParams params) {
    3. try {
    4. //准备reuquest
    5. SearchRequest searchRequest = new SearchRequest("hotel");
    6. //准备DSl
    7. //query
    8. extracted(params,searchRequest);
    9. buildRequest(searchRequest);
    10. SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
    11. List list = getListByName(search,"brandAgg");
    12. List list1 = getListByName(search,"cityAgg");
    13. List list2 = getListByName(search,"starAgg");
    14. return Map.of(
    15. "brand",list,
    16. "city",list1,
    17. "starName",list2
    18. );
    19. } catch (IOException e) {
    20. throw new RuntimeException(e);
    21. }
    22. }
    23. private List getListByName(SearchResponse search,String key) {
    24. Aggregations aggregations = search.getAggregations();
    25. Terms terms = aggregations.get(key);
    26. Listextends Terms.Bucket> buckets = terms.getBuckets();
    27. return buckets.stream().map(MultiBucketsAggregation.Bucket::getKeyAsString).collect(Collectors.toList());
    28. }

    自动补全

    • 拼音分词器
    • 自定义分词器
    • 自动补全查询
    • 实现酒店搜索框自动补全

     拼音分词器安装

    下载连接1、微服务开发框架SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式微服务全技术栈课程_免费高速下载|百度网盘-分享无限制 (baidu.com)https://pan.baidu.com/s/169SFtYEvel44hRJhmFTRTQ#list/path=%2Fsharelink3232509500-496165211763170%2F1%E3%80%81%E5%BE%AE%E6%9C%8D%E5%8A%A1%E5%BC%80%E5%8F%91%E6%A1%86%E6%9E%B6SpringCloud%2BRabbitMQ%2BDocker%2BRedis%2B%E6%90%9C%E7%B4%A2%2B%E5%88%86%E5%B8%83%E5%BC%8F%E5%BE%AE%E6%9C%8D%E5%8A%A1%E5%85%A8%E6%8A%80%E6%9C%AF%E6%A0%88%E8%AF%BE%E7%A8%8B%2F%E5%AE%9E%E7%94%A8%E7%AF%87%2F%E5%AD%A6%E4%B9%A0%E8%B5%84%E6%96%99%2Fday07-Elasticsearch03%2F%E8%B5%84%E6%96%99&parentPath=%2Fsharelink3232509500-496165211763170

    解压缩之后拖放到Es的挂在目录中,然后重启es

    查询容器挂载目录

    docker inspect 容器id | grep Mounts -A 20 

    自定义分词器

     elasticsearch分词器(analyzer)的组成包含三部分:

    • character filters: 在tokenizer之前对文本进行处理。列如删除字符、替换字符
    • tokenizer:将文本呢按照一定的规则切割成词条(term)。列如keyword,就是部分此;还有ik_smart。
    • tokenizer filter:将tokenizer输出的词条做进一步处理。列如大小写转换、同义词处理、拼英处理等

    创建索引库的时候使用拼英分词器,搜索的时候需要注意如果再用拼英分词器就会搜索同音查询不对,所以搜索的时候换一个用ik_max_word

    1. # 自定义拼音分词器
    2. PUT /test
    3. {
    4. "settings": {
    5. "analysis": {
    6. "analyzer": {
    7. "my_analyzer": {
    8. "tokenizer": "ik_max_word",
    9. "filter": "py"
    10. }
    11. },
    12. "filter": {
    13. "py": {
    14. "type": "pinyin",
    15. "keep_full_pinyin": false,
    16. "keep_joined_full_pinyin": true,
    17. "keep_original": true,
    18. "limit_first_letter_length": 16,
    19. "remove_duplicated_term": true,
    20. "none_chinese_pinyin_tokenize": false
    21. }
    22. }
    23. }
    24. },
    25. "mappings": {
    26. "properties": {
    27. "name":{
    28. "type": "text",
    29. "analyzer": "my_analyzer"
    30. }
    31. }
    32. }
    33. }
    34. POST /test/_doc/1
    35. {
    36. "id":1,
    37. "name":"私自"
    38. }
    39. POST /test/_doc/2
    40. {
    41. "id":2,
    42. "name":"四字"
    43. }
    44. GET /test/_search
    45. {
    46. "query": {
    47. "match": {
    48. "name": "调入私自"
    49. }
    50. }
    51. }

    查询结果就会出错,因为查询也用的拼英分词器

     搜索的时候应该用ik_smart

    在定义索引库的时候

    1. # 自定义拼音分词器
    2. PUT /test
    3. {
    4. "settings": {
    5. "analysis": {
    6. "analyzer": {
    7. "my_analyzer": {
    8. "tokenizer": "ik_max_word",
    9. "filter": "py"
    10. }
    11. },
    12. "filter": {
    13. "py": {
    14. "type": "pinyin",
    15. "keep_full_pinyin": false,
    16. "keep_joined_full_pinyin": true,
    17. "keep_original": true,
    18. "limit_first_letter_length": 16,
    19. "remove_duplicated_term": true,
    20. "none_chinese_pinyin_tokenize": false
    21. }
    22. }
    23. }
    24. },
    25. "mappings": {
    26. "properties": {
    27. "name":{
    28. "type": "text",
    29. "analyzer": "my_analyzer",
    30. "search_analyzer": "ik_smart"
    31. }
    32. }
    33. }
    34. }
    35. DELETE /test
    36. POST /test/_doc/1
    37. {
    38. "id":1,
    39. "name":"私自"
    40. }
    41. POST /test/_doc/2
    42. {
    43. "id":2,
    44. "name":"四字"
    45. }
    46. GET /test/_search
    47. {
    48. "query": {
    49. "match": {
    50. "name": "调入私自"
    51. }
    52. }
    53. }

    查询结果

     

    自动补全

    completion suggester 查询

    elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:

    新建一个索引库,添加索引文档:

    1. PUT comtest
    2. {
    3. "mappings": {
    4. "properties": {
    5. "title":{
    6. "type": "completion"
    7. }
    8. }
    9. }
    10. }
    11. POST comtest/_doc
    12. {
    13. "title":["Sony","WH-1000XM3"]
    14. }
    15. POST comtest/_doc
    16. {
    17. "title":["SK-II","PITERA"]
    18. }
    19. POST comtest/_doc
    20. {
    21. "title":["Nintendo","switch"]
    22. }

    索引库查询:注意field要全部小写

    1. # 自动补全查询
    2. GET comtest/_search
    3. {
    4. "suggest": {
    5. "testcom": {
    6. "text": "s", #关键字
    7. "completion": {
    8. "FIELD": "title", # 补全字段
    9. "skip_duplicates":true, # 跳过重复的
    10. "size":10 # 获取前10条结果
    11. }
    12. }
    13. }
    14. }
    15. GET comtest/_search
    16. {
    17. "suggest": {
    18. "titlesuggest": {
    19. "text": "s",
    20. "completion": {
    21. "field": "title",
    22. "skip_duplicates": true,
    23. "size": 10
    24. }
    25. }
    26. }
    27. }

    酒店数据自动补全 例子

    实现思路如下:

    1.修改hotel索引库结构,设置自定义拼音分词器

    2.修改索引库的name、all字段,使用自定义分词器

    3.索引库添加一个新字段suggestion,类型为completion类型,使用自定义分词器

    4.给HotelDoc类添加suggestion字段,内容包含brand、business

    5.重新导入数据到hotel库

    1.修改hotel数据库索引结构

    2.修改索引库的name、all字段,使用自定义分词器

    3.索引库添加一个新字段suggestion,类型为completion类型,使用自定义分词器

    1. PUT /hotel
    2. {
    3. "settings": {
    4. "analysis": { #自定义分词器
    5. "analyzer": {
    6. "text_anlyzer":{
    7. "tokenizer": "ik_max_word",
    8. "filter": "py"
    9. },
    10. "completion_analyzer":{
    11. "tokenizer": "ik_max_word",
    12. "filter":"py"
    13. }
    14. },
    15. "filter":{
    16. "py":{ #拼英分词器过滤器
    17. "type":"pinyin",
    18. "keep_full_pinyin":false,
    19. "keep_joined_full_pinyin":true,
    20. "keep_original":true,
    21. "limit_first_letter_length":16,
    22. "remove_duplicated_term":true,
    23. "none_chinese_pinyin_tokenize":false
    24. }
    25. }
    26. }
    27. },
    28. "mappings": {
    29. "properties": {
    30. "id":{
    31. "type": "keyword"
    32. },
    33. "name":{
    34. "type": "text",
    35. "analyzer": "text_anlyzer",
    36. "search_analyzer": "ik_smart",
    37. "copy_to": "all"
    38. },
    39. "address":{
    40. "type": "keyword",
    41. "index": false
    42. },
    43. "price":{
    44. "type": "integer"
    45. },
    46. "score":{
    47. "type": "integer"
    48. },
    49. "brand":{
    50. "type": "keyword",
    51. "copy_to": "all"
    52. },
    53. "city":{
    54. "type": "keyword"
    55. },
    56. "starName":{
    57. "type": "keyword"
    58. },
    59. "business":{
    60. "type": "keyword",
    61. "copy_to": "all"
    62. },
    63. "location":{
    64. "type": "geo_point"
    65. },
    66. "pic":{
    67. "type": "keyword",
    68. "index": false
    69. },
    70. "all":{
    71. "type": "text",
    72. "analyzer": "text_anlyzer",
    73. "search_analyzer": "ik_smart"
    74. },
    75. "suggestion":{# 搜索补全
    76. "type": "completion",
    77. "analyzer": "completion_analyzer"
    78. }
    79. }
    80. }
    81. }

    4.给HotelDoc类添加suggestion字段,内容包含brand、business

    5.重新导入数据到hotel库

    1. @Data
    2. @NoArgsConstructor
    3. public class HotelDoc {
    4. private Long id;
    5. private String name;
    6. private String address;
    7. private Integer price;
    8. private Integer score;
    9. private String brand;
    10. private String city;
    11. private String starName;
    12. private String business;
    13. private String location;
    14. private String pic;
    15. private Object distance;
    16. private String isAD;
    17. private List suggestion;
    18. public HotelDoc(Hotel hotel) {
    19. this.id = hotel.getId();
    20. this.name = hotel.getName();
    21. this.address = hotel.getAddress();
    22. this.price = hotel.getPrice();
    23. this.score = hotel.getScore();
    24. this.brand = hotel.getBrand();
    25. this.city = hotel.getCity();
    26. this.starName = hotel.getStarName();
    27. this.business = hotel.getBusiness();
    28. this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
    29. this.pic = hotel.getPic();
    30. if (this.business.contains("/")){
    31. String[] split = this.business.split("/");
    32. this.suggestion = new ArrayList<>();
    33. this.suggestion.add(this.brand);
    34. Collections.addAll(this.suggestion,split);
    35. }else {
    36. this.suggestion = Arrays.asList(this.brand,this.business);
    37. }
    38. }
    39. }

    RestAPI实现自动补全

    先看请求参数构造的API:

     先创建一个request对象,然后创建dsl搜索对象,client发送请求

    1. /**
    2. * 实现字段补充查询
    3. */
    4. @Test
    5. void testSuggest() throws IOException {
    6. SearchRequest searchRequest = new SearchRequest("hotel");
    7. searchRequest.source().suggest(new SuggestBuilder().addSuggestion(
    8. "mysuggestion",
    9. SuggestBuilders.completionSuggestion("suggestion")
    10. .prefix("h")
    11. .skipDuplicates(true)
    12. .size(10)
    13. ));
    14. SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
    15. System.out.println(search);
    16. }

    结果解析:

    RestAPI实现

    1. /**
    2. * 实现字段补充查询
    3. */
    4. @Test
    5. void testSuggest() throws IOException {
    6. SearchRequest searchRequest = new SearchRequest("hotel");
    7. searchRequest.source().suggest(new SuggestBuilder().addSuggestion(
    8. "mysuggestion",
    9. SuggestBuilders.completionSuggestion("suggestion")
    10. .prefix("h")
    11. .skipDuplicates(true)
    12. .size(10)
    13. ));
    14. SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
    15. // 4.处理结果
    16. Suggest suggest = response.getSuggest();
    17. // 4.1根据名称获取补全结果
    18. CompletionSuggestion suggestion = suggest.getSuggestion("mysuggestion");
    19. // 4.2获取options并遍历
    20. for (Suggest.Suggestion.Entry.Option completionSuggestion : suggestion.getOptions()){
    21. // 4.3获取一个option中的text,也就是补全词条
    22. System.out.println(completionSuggestion.getText().toString());
    23. }
    24. }

     

    实现酒店搜索页面输入框的自动补全

    查看前端页面,可以发现当我们在输入框键入时,前端会发起ajax请求

    在服务端编写接口,接受该请求,返回补全结果的集合,类型为List

    数据同步

    数据同步问题分析

    elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步

     方案一:同步调用

     方案二:异步通知

     方案三:监听binlog

     方式一:同步调用

    • 优点:实现简单,粗暴
    • 缺点:业务耦合度高

    方式二:异步通知

    • 低耦合,实现难度一般
    • 依赖mq的可靠性

    方式二:监听binlog

    • 优点:完全接触服务间耦合
    • 缺点:开启binlog增加数据库负担,实现复杂度高

     案例:利用MQ实现mysql与elasticsearch数据同步

    利用课前资料提供的hotel-admin项目作为酒店管理的微服务。当酒店数据发生增删改时候,要求对elasticsearch中数据也要完成相同操作。

     步骤:

    • 导入课前资料提供的hotel-admin项目,启动并测试酒店数据的CRUD
    • 声明exchange、queue、Routingkey
    • 在hotel-admin中的增、删、改业务中完成消息发送
    • 在hotel-demo中完成消息监听,并更新elasticsearch中数据
    • 启动并测试数据同步功能

     

  • 相关阅读:
    Redis内存碎片:深度解析与优化策略
    Ansible 的脚本 --- playbook 剧本
    JS 实现对象监听 - Kaiqisan
    Linux调试器-gdb使用
    Spring源码(十三)reflush方法的finishBeanFactoryInitialization方法(一)
    flink的TwoPhaseCommitSinkFunction怎么做才能提供精准一次保证
    前端架构基础 vue04 vue中的网络请求
    Everest Group发布《2023年RPA供应商评估报告》:2家中国厂商持续上榜
    (02)Cartographer源码无死角解析-(09) gflags与glog简介、及其main函数讲解
    给字符串添加加粗标签(AC自动机+Python)
  • 原文地址:https://blog.csdn.net/weixin_46543456/article/details/127490747