• Java elasticsearch scroll模板实现


    一、scroll说明和使用场景

    scroll的使用场景:大数据量的检索和操作

    scroll顾名思义,就是游标的意思,核心的应用场景就是遍历 elasticsearch中的数据;

    通常我们遍历数据采用的是分页,elastcisearch还支持from size的方式进行分页查询,使用 from and size 的深度分页,比如说 ?size=10&from=10000,因为 100,000 排序的结果必须从每个分片上取出并重新排序最后返回 10 条。这个过程需要对每个请求页重新进行提取+排序,效率很低,消耗很大,所以默认的最大可分页的数据是10000,超过10000是不建议的;

    使用

    通过在url末尾带上scroll=1m表示开启一个游标,1m表示游标的有效期为1分钟

    POST /record/_search?scroll=1m
    {
      "from": 0,
      "size": 20
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    返回结果中会把scroll的id带上,再次查询的时候,直接用scroll id查询即可

    POST /_search/scroll
    {
        "scroll" : "1m", 
        "scroll_id" : "FGluY2x1ZGVfY29udGV4dF91dWlkDnF1ZXJ5VGhlbkZldGNoAhZuYmpMbVpwWFRUMnNFMUFFSHlSMHB3AAAAAALBy_0WUWxrNTRTaWNUcy1sOHQ0VUo5dzF6dxZoemFkZTlMeFQ4MmoyOW5SUG8ybE53AAAAAAN6ip8WMmk5TWZlQ21RQnFsNURwaXRzSGhCdw==" 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    二、基于ElasticsearchRestTemplate的实现

    这里我们定义了一个template如下,主要作用就是实现一个基于scroll的数据遍历模板,屏蔽开启scroll 以及 scroll遍历所有数据,通过Consumer钩子函数进行数据处理

    import lombok.extern.slf4j.Slf4j;
    import org.elasticsearch.index.query.BoolQueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.springframework.data.domain.PageRequest;
    import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
    import org.springframework.data.elasticsearch.core.SearchHit;
    import org.springframework.data.elasticsearch.core.SearchScrollHits;
    import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
    import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
    
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * scrollTemplate 模板,用于遍历整个Index的数据
     * @author xiuzhu
     * @Date 2023/7/28 13:12
     */
    @Slf4j
    public class ElasticSearchScrollTemplate<T> {
    
        ExecutorService executorService = new ThreadPoolExecutor(1, 4,
                                          30,TimeUnit.SECONDS,
                                          new LinkedBlockingQueue<Runnable>(5),
                                            Executors.defaultThreadFactory(),
                                            new ThreadPoolExecutor.CallerRunsPolicy()
                                        );
    
        ElasticsearchRestTemplate elasticSearchRestTemplate;
    
        Class<T> cls;
    
        String indexName;
    
        public ElasticSearchScrollTemplate(
                ElasticsearchRestTemplate template,
                Class<T> cls,
                String indexName
        ) {
            this.elasticSearchRestTemplate = template;
            this.cls = cls;
            this.indexName = indexName;
        }
    
        @FunctionalInterface
        public interface Consumer<T> {
            public void accept(List<T> objects);
        }
    
        public void execute(Consumer<T> consumer) {
            //构建查询条件
            NativeSearchQueryBuilder query = new NativeSearchQueryBuilder();
            BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
    
            query.withPageable(PageRequest.of(0, 300));
            query.withQuery(queryBuilder);
    
            //保留0.5分钟
            long scrollTimeInMillis = 30*1000;
    
            IndexCoordinates recordIndex = IndexCoordinates.of(indexName);
            SearchScrollHits<T> hits = elasticSearchRestTemplate.searchScrollStart(scrollTimeInMillis, query.build(), cls, recordIndex);
    
            // scrollId
            String scrollId = hits.getScrollId();
            List<T> recordEntityList = hits.stream().map(SearchHit::getContent).toList();
            long total = 0L;
    
            log.info("================ began scroll index={} ====================", indexName);
    
            executorService.submit(()->{
                consumer.accept(recordEntityList);
            });
    
            total = total + recordEntityList.size();
    
            log.info("================  has scroll index={} total={} ====================", indexName, total);
            while (!hits.isEmpty()) {
                hits = elasticSearchRestTemplate.searchScrollContinue(scrollId, scrollTimeInMillis, cls, recordIndex);
                List<T> entities = hits.stream().map(SearchHit::getContent).toList();
    
                executorService.submit(()->{
                    consumer.accept(entities);
                });
    
                total = total + entities.size();
                try {
                    //给系统留GC时间,不然容易内存溢出
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    log.error("sleep error", e);
                }
                log.info("================  has scroll index={} total={} ====================", indexName, total);
            }
            log.info("================ end scroll index={} ====================", indexName);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    使用参考:

    @Resource(name = "elasticSearchRestTemplate")
        ElasticsearchRestTemplate elasticsearchRestTemplate;
    
    new ElasticSearchScrollTemplate<>(
                            elasticsearchRestTemplate,
                            RecordEntity.class,
                            "record")
                    ).execute((entities)->{
                        entities.forEach(item->{
                            //这里进行数据的处理,比如修改数据
                            recordEntityService.save(item);
                            log.info("tag update success record={} api={}", item.getId());
    
                        });
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    本文由mdnice多平台发布

  • 相关阅读:
    JUC常见的线程池源码学习 02 ( ThreadPoolExecutor 线程池 )
    灯塔批量添加指纹信息
    three.js 字体精简处理
    [短记] speechbrain
    文件包含漏洞全面详解
    SpringSecurity + jwt + vue2 实现权限管理 , 前端Cookie.set() 设置jwt token无效问题(已解决)
    【总结】岛屿类问题(二维表格的dfs)
    刷题DAY22 | LeetCode 235-二叉搜索树的最近公共祖先 701-二叉搜索树中的插入操作 450-删除二叉搜索树中的节点
    不知道音频格式转换软件哪个好?打工人都在用的几款你别错过
    弄清数据库索引的来龙去脉
  • 原文地址:https://blog.csdn.net/Viogs/article/details/132670616