• 公司实战 ElasticSearch+Kafka+Redis+MySQL


    一、需求

    前一段时间公司要进行数据转移,将我们ES数据库中的数据转移到客户的服务器上,并且使用定时将新增的数据同步,在这过程中学到了很多,在此记录一下!

    二、技术栈

    Mysql + Redis + ElasticSearch + Kafka

    三、方案

    为了降低服务器的压力,在每天的零时进行推送数据,推送前比较上一次推送记录在Redis中的数据,此记录为ES数据库中的时间字段,每次推送结束前都会将最新的时间更新在这个key中,如果获取ES数据库中的字段与key一样,说明今日无数据更新。

    因为ES索引的数据量在千万以上,所以没有选择分页,而是选择了ES的滚轮查询。

     public static void getDayData(RestHighLevelClient client,
                                        KafkaTemplate kafkaTemplate,
                                        RedisUtil redisUtil,
                                        String field,
                                        String indexName,
                                        String topic) {
            //发送创建索引所需的相关信息  索引名 属性 分片
            HashMap<String, Object> map1 = new HashMap<>();
            map1.put("indices", indexName);
            map1.put("mappings", ElasticSearchUtil.getIndexMappings(client, indexName));
            map1.put("settings", ElasticSearchUtil.getIndexSettingsShards(client, indexName));
            kafkaTemplate.send(str, JSON.toJSONString(map1));
    
    
            int i = 0;
            final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(30L));
            SearchRequest request = new SearchRequest(indexName);
            request.scroll(scroll);
            SearchSourceBuilder builder = new SearchSourceBuilder();
            //查询此索引的所有数据
    
            builder.query(
                    QueryBuilders.rangeQuery(field)
                            .gt(redisUtil.hget(indexName,"push_time"))
                            ).sort(field, SortOrder.ASC);
    
            builder.size(1000);
            request.source(builder);
            SearchResponse response = null;
            try {
                response = client.search(request, RequestOptions.DEFAULT);
            } catch (Exception e) {
                e.printStackTrace();
            }
            String scrollId = response.getScrollId();
            SearchHit[] hits = response.getHits().getHits();
            // 没有新增数据
            if(hits == null)
                log.info("索引 {} 今日无新增数据",indexName);
    
            for (SearchHit hit : hits) {
                Map<String, Object> map = hit.getSourceAsMap();
                map.put("_id", hit.getId());
                kafkaTemplate.send(topic, JSON.toJSONString(map));
                i++;
            }
            //完成第一次后 更新key
            redisUtil.hset(indexName, "push_time", hits[hits.length - 1].getSourceAsMap().get(field));
            //通过在循环中调用搜索滚动 API 来检索所有搜索命中 直到不返回任何文件
            while (hits != null && hits.length > 0) {
                // 处理返回的搜索结果
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(scroll);
                try {
                    response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                scrollId = response.getScrollId();
                hits = response.getHits().getHits();
                for (SearchHit hit : hits) {
                    Map<String, Object> map = hit.getSourceAsMap();
                    map.put("_id", hit.getId());
                    kafkaTemplate.send(topic, JSON.toJSONString(map));
                    i++;
                    System.out.println(i);
                }
                //从第二次开始 每次都要更新key
                redisUtil.hset(indexName, "push_time", hits[hits.length - 1].getSourceAsMap().get(field));
            }
            log.info("索引 {} 总共推送了 {} 条", indexName, i);
            // 滚动完成后清除滚动上下文
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollId);
            try {
                client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    TimeValue.timeValueSeconds(30L)
    builder.size(1000)

    这个参数最开始设置的是5L,但是条件查询的大小设置为了1000,可能会出现到了预计的时间但是没有找到1000条数据从而产生报错,所以尽可能将滚轮滚动的时间设置大一些,反正搜索完就会进行下一次滚动,不会产生拉低效率的问题!


    在正式发送数据之前要提前将要发送的索引的信息(字段属性、分片信息)发送至Kafka的消费端,这样做的目的是如果客户服务器没有该索引就手动创建索引,一般情况来说,我们是不允许在消费端自动创建索引的,会造成字段属性出错。

    获取索引属性信息和分片的工具类

        /**
         * 获取 索引 mappings
         * @param client
         * @param index
         * @return
         */
        public static Map<String, Object> getIndexMappings(RestHighLevelClient client, String index) {
            GetMappingsRequest request = new GetMappingsRequest();
            request.indices(index);
            GetMappingsResponse resp = null;
            try {
                resp = client.indices().getMapping(request, RequestOptions.DEFAULT);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return resp.mappings().get(index).getSourceAsMap();
        }
    
    
        /**
         * 获取 索引 settings的分片
         *
         * @param client
         * @param index
         * @return
         */
        public static Map<String, String> getIndexSettingsShards(RestHighLevelClient client, String index) {
            Map<String, String> resMap = new HashMap<>();
            GetSettingsRequest request = new GetSettingsRequest();
            request.indices(index);
            try {
                GetSettingsResponse resp = client.indices().getSettings(request, RequestOptions.DEFAULT);
                Settings settings = resp.getIndexToSettings().get(index);
                System.out.println(settings);
                resMap.put("number_of_shards", settings.get("index.number_of_shards"));
            } catch (Exception e) {
                e.printStackTrace();
            }
            return resMap;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    为了更好的区分和控制,发送创建索引消息的Topic与发送数据的Topic不是同一个,这时候存在一个问题,就是创建索引完成的时间无法控制,从而无法发送数据
    我们使用到了JUC的辅助类CountDownLatch,作为一个减数器,如果索引创建完毕,减数器减一,释放锁,非常好用!

    @KafkaListener(id = "IPAttack", topics = "IPAttack")
        public void IPAttackContinuous(List<String> records) throws InterruptedException {
            BulkProcessor bulkProcessor = GetBulkProcessor.getBulkProcessor(client);
    
            // 等待 index 创建
            if (!ElasticSearchUtil.isExistsIndex(client, index)) {
                log.error("索引: {} 还未创建", index);
                //加锁 减数器的值设置为1
                cdl = new CountDownLatch(1);
                //减数器归0才能执行下面的代码
                cdl.await();
            }
            //批量入库
            for (String record : records) {
                Map map = JSONObject.parseObject(record, Map.class);
                String _id = map.get("_id").toString();
                map.remove("_id");
    
                bulkProcessor.add(new IndexRequest(index).id(_id).source(map));
            }
            bulkProcessor.flush();
            bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    我们发现如果没有创建好索引,线程会阻塞导致无法执行下面的代码!

      @KafkaListener(id = "CreateIndex",topics = "CreateIndex")
        public void createIndexListener(String record){
            Map map = JSON.parseObject(record,Map.class);
            String index = map.get("indices").toString();
            if(!ElasticSearchUtil.isExistsIndex(client,index)){
                log.info("索引: {} 开始创建", index);
                CreateIndexRequest indices = new CreateIndexRequest(index);
                indices.settings((Map<String, ?>) map.get("settings"));
                indices.mapping((Map<String, ?>)map.get("mappings"));
                try{
                    client.indices().create(indices, RequestOptions.DEFAULT);
                }catch (Exception e){
                    e.printStackTrace();
                }
                //创建索引完毕释放锁
                if(DnsReceive.cdl != null){
                    DnsReceive.cdl.countDown();
                }
                log.info("索引: {} 创建完成", index);
            }else{
                log.info("已经存在索引:{}",index);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    countDown()方法执行后,说明索引创建完毕,此时减数器减一,发送数据的Topic接收到就会开始批量数据入库
    数据推送完毕后,可以将此次推送的数据量、索引名等等信息记录在MySQL中,这边还没有要求所以没有写



    四、总结

    整体下来锻炼了逻辑思维和写代码的能力,完成以后又想了一遍觉得其实没有那么难,但对于小白刚入职场的我来说,是一次历练,无论对于我想问题的方式还是排错的切入点都有很好的帮助!

  • 相关阅读:
    superset支持Kylin4.0.0(兼容处理日期分组功能)
    一文搞懂PKI/CA
    html5&css&js代码 007 文章排版 颜真卿《述张长史笔法十二意》
    自定义http状态码
    tf.compat.v1.global_variables
    C++【哈希】
    java毕业设计爱心互助及物品回收管理系统Mybatis+系统+数据库+调试部署
    2023年第十三届中国国际储能大会(CIES2023)-核心PPT资料下载
    华为十年大佬带你开启springboot实战之旅,从源码到项目,一步到位!
    LOWORD, HIWORD, LOBYTE, HIBYTE的解释
  • 原文地址:https://blog.csdn.net/G823909/article/details/128138229