• Elasticsearch集群搭建学习


    RestClient

    查询所有

    void testMatchAll() throws IOException {
            //1.准备Request
            SearchRequest request = new SearchRequest("hotel");
            //2.准备DSL
            request.source().query(QueryBuilders.matchAllQuery());
            //3.发送请求
            SearchResponse response = restClient.search(requ est , RequestOptions.DEFAULT);
    
            System.out.println(response);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    高亮

    高亮API包括请求DSL构建和结果解析两部分。

    构建:

    request.source().highlighter(new HghlightBuilder().field("name")
                                 //是否需要与字段匹配
                                 .requireFieldMatch(false))
    
    • 1
    • 2
    • 3
    @Test
        public void testHighLight() throws IOException {
            //1 准备Request
            SearchRequest request = new SearchRequest("hotel");
            //2 准备DSL
            request.source().query(QueryBuilders.matchQuery("name" , "如家"));
            //2.2 高亮
            request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
            //3 发送请求
            SearchResponse response = restClient.search(request , RequestOptions.DEFAULT);
            //4 解析响应
            SearchHits searchHits = response.getHits();
            //5 获取总条数
            long total = searchHits.getTotalHits().value;
            //4.2 文档数组
            SearchHit[] hits = searchHits.getHits();
            //4.3 遍历
            for(SearchHit hit : hits){
                //获取文档source
                String json = hit.getSourceAsString();
                //反序列化
                HotelDoc hotelDoc = JSON.parseObject(json , HotelDoc.class);
                //获取高亮结果
                Map<String , HighlightField> highlightFields = hit.getHighlightFields();
                //根据字段名获取高亮结果
                HighlightField highlightField = hightlightFields.get("name");
                //获取高亮值
                String name = highlightField.getFragments()[0].string();
                //覆盖非高亮结果
                hotelDoc.setName(name);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 所有搜索DSL的构建,记住一个API:

      • SearchRequest的source()方法。
    • 高亮结果解析是参考JSON结果,逐层解析

    算分控制

    FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(
    	//原始查询,相关性算分的查询
        boolQuery ,
        new FunctionSocreQueryBuilder.FilterFunctionBuilder[]{
            new FunctionScoreQueryBuilder.FilterFunctionBuilder(
            	//过滤条件
                QueryBuilders.termQuery("isAd" , true),
                //算分函数
                ScoreFuntionBuilders.weightFactorFuntion(10)
                
            )
        }
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    数据聚合

    聚合的分类

    聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:

    • 桶(Bucket)聚合:用来对文档做分组

      • TermAggregation:按照文档字段值分组
      • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
    • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等

      • Avg:求平均值
      • Max:求最大值
      • Min:求最小值
      • Stats:同时求max、min、avg、sum等
    • 管道(pipeline)聚合:其他聚合的结果为基础做聚合

    什么是聚合?

    • 聚合是对文档数据的统计、分析、计算

    聚合的常见种类有哪些?

    • Bucket:对文档数据分组,并统计每组数量
    • Metric:对文档数据做计算,例如avg
    • Pipeline:基于其他聚合结果再做聚合

    参与聚合的字段类型必须是:

    • keyword
    • 数值
    • 日期
    • 布尔

    DSL实现Bucket聚合

    GET /hotel/_search
    {
        "size":0, // 设置size为0,结果中不包含文档,只包含聚合结果
        "aggs":{ // 定义聚合
            "brandAgg":{ // 给聚合起个名字
                "terms":{ // 聚合的类型,按照品牌值聚合,所以选择term
                    "field":"brand", // 参与聚合的字段
                    "size":20 // 希望获取的聚合结果数量
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    aggs代表聚合,与query同级,此时query的作用是?

    • 限定聚合的文档范围

    聚合必须的三要素:

    • 聚合名称
    • 聚合类型
    • 聚合手段

    聚合可配置属性有:

    • size:指定聚合结果数量
    • order:指定聚合结果排序方式
    • field:指定聚合字段

    DSL实现Metrics聚合

    例如,我们要求获取每个品牌的用户评分的min、max、avg等值

    GET /hotel/_search
    {
     	"size":0,
        "aggs":{
        	"brandAgg":{
        		"terms":{
        			"field":"brand",
        			"size":20
        		},
        		"aggs":{// 是brands聚合的子聚合,也就是分组后对每组分别计算
        			"score_status":{ // 聚合名称
        				"stats":{ // 聚合类型,这里stats可以计算min、max、avg等
        					"field":"score"// 聚合字段,这里可以是score
        				}
        			}
        		}
        	}
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    RestAPI实现聚合

    @Test
        public void testAggregation() throws IOException {
            // 准备Request
            SearchRequest request = new SearchRequest("hotel");
            // 准备DSL
            // 设置size
            request.source().size(0);
            // 聚合
            request.source().aggregation(AggregationBuilders.terms("brandAgg").size(20).field("brand"));
            //发出请求
            SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
            //解析聚合结果
            Aggregations aggregations = response.getAggregations();
            //根据名称获取聚合结果
            Terms brandTerms = aggregations.get("brand_agg");
            //获取桶
            List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
            for (Terms.Bucket bucket : buckets) {
                //获取key,也就是品牌信息
                String brandName = bucket.getKeyAsString();
                System.out.println(brandName);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    实现对品牌、城市、星级的聚合

    @Test
        public Map<String , List<String>> testMap() throws IOException {
            // 准备Request
            SearchRequest request = new SearchRequest("hotel");
            // 准备DSL
            // 设置size
            request.source().size(0);
            // 聚合
            request.source().aggregation(AggregationBuilders.terms("brandAgg").size(20).field("brand"));
            request.source().aggregation(AggregationBuilders.terms("cityAgg").size(20).field("city"));
            request.source().aggregation(AggregationBuilders.terms("starAgg").size(20).field("starName"));
            //发出请求
            SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
            //解析聚合结果
            Aggregations aggregations = response.getAggregations();
            Map<String , List<String>> result = new HashMap<>();
            result.put("brandAgg" , getAggByName("brandAgg" , aggregations));
            result.put("cityAgg" , getAggByName("cityAgg" , aggregations));
            result.put("starAgg" , getAggByName("starAgg" , aggregations));
            return result;
        }
    
        private List<String> getAggByName(String name , Aggregations aggregations){
            List<String> result = new ArrayList<>();
            Terms terms = aggregations.get(name);
            for (Terms.Bucket bucket : terms.getBuckets()) {
                String keyAsString = bucket.getKeyAsString();
                result.add(keyAsString);
            }
            return result;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    拼音分词器

    如何使用拼音分词器?

    下载pinyin分词器

    解压并放到elasticsearch的plugin目录

    重启

    如何自定义分词器?

    创建索引库时,在settings中配置,可以包含三部分

    character filter

    tokenizer

    filter

    拼音分词器注意事项?

    为了避免搜索到同音字,搜索时不要使用拼音分词器

    自动补全

    elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:

    • 参与补全查询的字段必须是completion类型
    • 字段的内容一般是用来补全的多个词条形成的数组
    // 自动补全查询
    GET /test2/_search
    {
      "suggest":{
        "titleSuggest":{
          "text":"s",
          "completion":{
            "field":"title",
            "skip_duplicates":true,
            "size":10
          }
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    自动补全对字段的要求:

    • 类型是completion类型
    • 字段值是多词条的数组
    @Test
    public void testSuggest() throws IOException {
        // 准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 准备DSL
        request.source().suggest(new SuggestBuilder().addSuggestion(
            "suggestions" , SuggestBuilders.completionSuggestion("suggestion")
        	.prefix("h")
        	.skipDuplicates(true)
        	.size(10)));
        // 发起请求
        SearchResponse response = client.search(request,RequestOptions.DEFAULT);
        // 解析结果
        Suggest suggest = response.getSuggest();
        // 根据名称获取补全结果
        CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggestion");
        //获取options并遍历
        for(CompletionSuggestion.Entry.Option option : suggestion.getOptions()){
            String text = option.getText().string();
            System.out.println(text);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    数据同步

    • 同步调用
      • 优点:实现简单,粗暴
      • 缺点:业务耦合度高
    • 异步通知
      • 优点:低耦合,实现难度一般
      • 缺点:依赖mq的可靠性
    • 监听binlog
      • 优点:完全解除服务间耦合
      • 缺点:开启binlog增加数据库负担、实现复杂度高

    异步通知数据同步步骤:

    1. 定义config文件,声明队列和交换机bean,并绑定队列与交换机
    2. 在增加删除修改接口中发送mq消息到指定的增删改队列
    3. 定义监听器,监听mq消息并修改es文档

    集群搭建

    ES集群结构

    单机的es做数据存储,必然面临两个问题:海量数据存储问题,单点故障问题

    • 海量数据存储问题:将索引库从逻辑上拆分N个分片(shard),存储到多个节点
    • 单点故障问题:将分片数据在不同节点备份(replica)

    创建es集群

    首先编写一个docker-compose.yml文件,内容如下:

    version: '2.2'
    services:
      es01:
        image: elasticsearch:7.12.1
        container_name: es01
        environment:
          - node.name=es01
          - cluster.name=es-docker-cluster
          - discovery.seed_hosts=es02,es03
          - cluster.initial_master_nodes=es01,es02,es03
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        volumes:
          - data01:/usr/share/elasticsearch/data
        ports:
          - 9200:9200
        networks:
          - elastic
      es02:
        image: elasticsearch:7.12.1
        container_name: es02
        environment:
          - node.name=es02
          - cluster.name=es-docker-cluster
          - discovery.seed_hosts=es01,es03
          - cluster.initial_master_nodes=es01,es02,es03
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        volumes:
          - data02:/usr/share/elasticsearch/data
        ports:
          - 9201:9200
        networks:
          - elastic
      es03:
        image: elasticsearch:7.12.1
        container_name: es03
        environment:
          - node.name=es03
          - cluster.name=es-docker-cluster
          - discovery.seed_hosts=es01,es02
          - cluster.initial_master_nodes=es01,es02,es03
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        volumes:
          - data03:/usr/share/elasticsearch/data
        networks:
          - elastic
        ports:
          - 9202:9200
    volumes:
      data01:
        driver: local
      data02:
        driver: local
      data03:
        driver: local
    
    networks:
      elastic:
        driver: bridge
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

    vi /etc/sysctl.conf
    
    • 1

    添加下面的内容:

    vm.max_map_count=262144
    
    • 1

    然后执行命令,让配置生效:

    sysctl -p
    
    • 1

    通过docker-compose启动集群:

    docker-compose up -d
    
    • 1

    集群状态监控

    kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。

    这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro

    双击的cerebro.bat文件即可启动服务。

    访问http://localhost:9000 即可进入管理界面:

    在这里插入图片描述

    输入你的elasticsearch的任意节点的地址和端口,点击connect即可:

    在这里插入图片描述

    绿色的条,代表集群处于绿色(健康状态)。

    创建索引库

    1)利用kibana的DevTools创建索引库

    在DevTools中输入指令:

    PUT /itcast
    {
      "settings": {
        "number_of_shards": 3, // 分片数量
        "number_of_replicas": 1 // 副本数量
      },
      "mappings": {
        "properties": {
          // mapping映射定义 ...
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    2)利用cerebro创建索引库

    利用cerebro还可以创建索引库:

    在这里插入图片描述

    填写索引库信息:

    在这里插入图片描述

    点击右下角的create按钮:

    在这里插入图片描述

    查看分片效果

    回到首页,即可查看索引库分片效果:

    在这里插入图片描述

    ES集群中的节点角色

    elasticsearch中集群节点有不同的职责划分:

    在这里插入图片描述

    ES脑裂

    默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其他候选节点会选举一个称为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。

    为了避免脑裂,需要要求选票超过(eligible节点数量+1)/2才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimux_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题。

    • master eligible结点的作用?
      • 参与集群选主
      • 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
    • data结点的作用?
      • 数据的CRUD
    • coordinator结点的作用?
      • 路由请求到其他节点
      • 合并查询到的结果,返回给用户

    ES集群的分布式存储

    当新增文档时,应该保存到不同的分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片呢?elasticsearch会通过hash算法来计算文档应该存储到哪个分片:

    shard = hash(_routing) % number_of_shards
    
    • 1

    说明:

    • _routing默认是文档的id
    • 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!

    ES集群的分布式查询

    elasticsearch的查询分为两个阶段:

    • scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
    • gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户

    ES集群的故障转移

    集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这个叫做故障转移

  • 相关阅读:
    什么浏览器广告少?多御安全浏览器轻体验
    加密和验签
    Python中,我们可以使用pandas和numpy库对Excel数据进行预处理,包括读取数据、数据清洗、异常值剔除等
    taro(踩坑) npm run dev:weapp 微信小程序开发者工具预览报错
    程序员必看内容连续集之 SpringBoot03 SSM整合SpringBoot
    Git常用命令与分支管理
    【CNN记录】tensorflow slice和strided_slice
    Day08
    面试网络-0x01 http中的GET和POST区别?
    Notepad++使用技巧
  • 原文地址:https://blog.csdn.net/yuanyajieh/article/details/139202632