106 Using a Message Queue to Traverse a Large MySQL Table


    Preface

    Recently I ran into the following requirement: we have a very large MySQL table, roughly six million rows,

    and all of its records need to be read and shipped into ES.

    My first attempt was a script that read this big table page by page, processed each page, and pushed it to ES.

    The problem: the early pages came back quickly enough, but by roughly page 300 reading from MySQL had slowed to a crawl.

    Each deep page became extremely slow, because a limit offset, size query forces MySQL to scan and discard every row before the requested offset. This post records an alternative way to handle the job.

    The approach here is built on a message broker: moving the data from MySQL to Kafka via DataX/Spoon is fast,

    and a Java program consuming that queue from Kafka is fast as well. In the end, reading plus processing the six million rows took a bit over an hour in total, and part of that time went to business logic that is expensive in its own right.

     

     

    The table to be processed

    The table is defined as follows and holds about 6 million rows in total; a hedged sketch of how a table like this can be filled with test data follows the DDL.

    CREATE TABLE `student_all` (
      `id` int NOT NULL AUTO_INCREMENT,
      `field0` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field1` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field2` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field3` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field4` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field5` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field6` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field7` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field8` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field9` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field10` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field11` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field12` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field13` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field14` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field15` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field16` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field17` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field18` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field19` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field20` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field21` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field22` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field23` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field24` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field25` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field26` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field27` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field28` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `field29` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,
      `CREATED_AT` bigint NOT NULL,
      `UPDATED_AT` bigint NOT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=4379001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
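
    The consumer code further down references a Test05CreateMysqlBigTable helper that was used to generate the rows, but its source is not part of this post. Purely as a hedged illustration, a batch-insert generator for a table with this layout (hypothetical class name, dummy values, Spring JdbcTemplate assumed) could look roughly like this:

    import org.springframework.jdbc.core.JdbcTemplate;

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical generator: batch-inserts dummy rows into student_all.
    // Assumes a JdbcTemplate pointed at the same MySQL instance as the main program.
    public class StudentAllDataGenerator {
        // 30 generated columns field0..field29, matching the DDL above
        private static final int MAX_FIELD_IDX = 30;

        public static void fill(JdbcTemplate jdbcTemplate, int totalRows, int batchSize) {
            // build "insert into student_all (field0, ..., field29, CREATED_AT, UPDATED_AT) values (?, ..., ?)"
            StringBuilder cols = new StringBuilder();
            StringBuilder marks = new StringBuilder();
            for (int i = 0; i < MAX_FIELD_IDX; i++) {
                cols.append("field").append(i).append(", ");
                marks.append("?, ");
            }
            String sql = "insert into student_all (" + cols + "CREATED_AT, UPDATED_AT) values (" + marks + "?, ?)";

            List<Object[]> batch = new ArrayList<>(batchSize);
            for (int row = 0; row < totalRows; row++) {
                Object[] args = new Object[MAX_FIELD_IDX + 2];
                for (int i = 0; i < MAX_FIELD_IDX; i++) {
                    args[i] = "value_" + row + "_" + i;               // dummy payload
                }
                args[MAX_FIELD_IDX] = System.currentTimeMillis();     // CREATED_AT
                args[MAX_FIELD_IDX + 1] = System.currentTimeMillis(); // UPDATED_AT
                batch.add(args);
                if (batch.size() >= batchSize) {
                    jdbcTemplate.batchUpdate(sql, batch);             // flush one batch
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) {
                jdbcTemplate.batchUpdate(sql, batch);
            }
        }
    }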

     

     

    Paging through the data with MySQL

    The MySQL-based program is shown below; it is just simple MySQL pagination.

    The fields to extract are packed into documents and submitted to ES in bulk, page by page.

    Overall, the first batch of pages responds quickly, but the deeper the page, the slower the MySQL server gets, since every limit offset, size query has to walk past all of the previously paged rows again.

    /**
     * Test05PostQy2Es
     *
     * @author Jerry.X.He
     * @version 1.0
     * @date 2022/11/21 16:00
     */
    public class Test05PostEsFromMysql {

        private static String mysqlUrl = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&autoReconnectForPools=true";
        private static String mysqlUsername = "postgres";
        private static String mysqlPassword = "postgres";
        private static JdbcTemplate mysqlJdbcTemplate = JdbcTemplateUtils.getJdbcTemplate(mysqlUrl, mysqlUsername, mysqlPassword);

        private static RestHighLevelClient esClient = getEsClient();
        private static IndicesClient indicesClient = esClient.indices();

        // Test05PostQy2Es
        public static void main(String[] args) throws Exception {
            String esIndexName = "student_all_20221211";
            bulkEsData(esIndexName);
        }

        private static void bulkEsData(String esIndexName) throws Exception {
            String queryDbTableName = "student_all";
            List<String> fieldList = Arrays.asList("id", "field0", "field1", "field2", "field3", "field4", "field5", "field6", "field7", "field8", "field9", "field10", "field11", "field12", "field13", "field14", "field15", "field16", "field17", "field18", "field19", "field20", "field21", "field22", "field23", "field24", "field25", "field26", "field27", "field28", "field29", "CREATED_AT", "UPDATED_AT");
            String idKey = "id";
            String whereCond = "";
            // String orderBy = "order by id asc";
            String orderBy = "";
            AtomicInteger counter = new AtomicInteger(0);
            int pageSize = 1000;
            int startPage = 0;
            pageDo(queryDbTableName, whereCond, orderBy, pageSize, startPage, (pageNo, list) -> {
                BulkRequest bulkRequest = new BulkRequest();
                for (Map<String, Object> entity : list) {
                    IndexRequest indexRequest = new IndexRequest(esIndexName);
                    Map<String, Object> sourceMap = new LinkedHashMap<>();
                    List<String> allFieldsListed = new ArrayList<>();
                    for (String fieldName : fieldList) {
                        String fieldValue = String.valueOf(entity.get(fieldName));
                        sourceMap.put(fieldName, fieldValue);
                        allFieldsListed.add(Objects.toString(fieldValue, ""));
                    }
                    String id = String.valueOf(entity.get(idKey));
                    indexRequest.id(id);
                    sourceMap.put("_allFields", StringUtils.join(allFieldsListed, "$$"));
                    indexRequest.source(sourceMap);
                    bulkRequest.add(indexRequest);
                }
                try {
                    BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                    counter.addAndGet(list.size());
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println(" page : " + pageNo + ", flushed " + counter.get() + " records ");
            });
        }

        private static void pageDo(String tableName, String whereCond, String orderBy, int pageSize, int startPage,
                                   BiConsumer<Integer, List<Map<String, Object>>> func) {
            if (StringUtils.isNotBlank(whereCond) && (!whereCond.trim().toLowerCase().startsWith("where"))) {
                whereCond = " where " + whereCond;
            }
            if (StringUtils.isNotBlank(orderBy) && (!orderBy.trim().toLowerCase().startsWith("order"))) {
                orderBy = " order by " + orderBy;
            }
            String queryCountSql = String.format(" select count(*) from %s %s %s", tableName, whereCond, orderBy);
            Integer totalCount = mysqlJdbcTemplate.queryForObject(queryCountSql, Integer.class);
            Integer totalPage = (totalCount == null || totalCount == 0) ? 0 : (totalCount - 1) / pageSize + 1;
            for (int i = startPage; i < totalPage; i++) {
                int offset = i * pageSize;
                String queryPageSql = String.format(" select * from %s %s %s limit %s,%s ", tableName, whereCond, orderBy, offset, pageSize);
                List<Map<String, Object>> list = mysqlJdbcTemplate.queryForList(queryPageSql);
                func.accept(i, list);
            }
        }
    }

     

     

    Processing via the Kafka message broker

    First, Spoon/DataX moves the data from MySQL into Kafka.

    Then a script consumes the data from Kafka, processes it, and ships it to ES.

    Once the data has gone through the message queue, consuming it is much faster: a message queue does one simple thing, sequential reads over a log, which is exactly the access pattern a full traversal needs, so it has a clear advantage here.

     

    Here Spoon is used to move the data from MySQL to Kafka.

    My local environment is short on memory and other resources, so it only ingests about 30,000 rows per minute; a real production environment is much faster.

    In production, transferring the five-million-plus rows from MySQL to Kafka with DataX took roughly five to six minutes.

    (screenshot: e3cb2b641cfe4d208e11040f1b5fbc2a.png)
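
    Spoon/DataX handle this step with ready-made table-input and Kafka-producer steps, so no custom code is needed. Purely as a hedged illustration of what this step amounts to, a hand-rolled Java sketch (hypothetical class name; it reuses the broker address from the consumer below and assumes the topic already exists) might look like this:

    import com.alibaba.fastjson.JSONObject;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.jdbc.core.RowCallbackHandler;

    import java.sql.ResultSetMetaData;
    import java.util.Properties;

    // Hypothetical stand-in for the Spoon/DataX step: stream student_all rows into a Kafka topic as JSON.
    public class Mysql2KafkaSketch {

        public static void produce(JdbcTemplate mysqlJdbcTemplate, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.190:9092");  // same broker the consumer below points at
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Integer.MIN_VALUE asks MySQL Connector/J to stream the result set row by row
                // instead of buffering all rows in memory (passed through by Spring 4.3+).
                mysqlJdbcTemplate.setFetchSize(Integer.MIN_VALUE);
                RowCallbackHandler handler = rs -> {
                    ResultSetMetaData meta = rs.getMetaData();
                    JSONObject row = new JSONObject(true);
                    for (int i = 1; i <= meta.getColumnCount(); i++) {
                        row.put(meta.getColumnLabel(i), rs.getString(i));
                    }
                    // key each message by the primary key so the consumer can reuse it as the ES document id
                    producer.send(new ProducerRecord<>(topic, rs.getString("id"), row.toJSONString()));
                };
                mysqlJdbcTemplate.query("select * from student_all", handler);
                producer.flush();
            }
        }
    }

    The single streaming select avoids the deep-pagination cost entirely, which is the same property the Spoon/DataX readers rely on.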

     

     

    Shipping the data from Kafka to ES

    The program below simply copies the Kafka records into ES as-is; a real job would add extra business processing on top. This is just for demonstration.

    /**
     * Test05PostQy2Es
     *
     * @author Jerry.X.He
     * @version 1.0
     * @date 2022/11/21 16:00
     */
    public class Test05PostEsFromKafka {

        private static RestHighLevelClient esClient = getEsClient();
        private static IndicesClient indicesClient = esClient.indices();
        private static String esIndexName = "student_all_20221211";
        private static String groupId = "group-01";

        // Test05PostQy2Es
        public static void main(String[] args) throws Exception {
            bulkKafka2EsData(esIndexName, groupId);
        }

        private static void bulkKafka2EsData(String esIndexName, String groupId) throws Exception {
            List<Pair<String, String>> hjk2StdFieldMap = hjk2StdFieldMap();
            Properties properties = kafkaProperties(groupId);
            String idKey = "ID";
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            kafkaConsumer.subscribe(Arrays.asList("STUDENT_ALL_20221211"));
            AtomicInteger counter = new AtomicInteger(0);
            long start = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                if (records.isEmpty()) {
                    Thread.sleep(10 * 1000);
                    long spent = System.currentTimeMillis() - start;
                    System.out.println(" spent : " + (spent / 1000) + " s ");
                    continue;
                }
                BulkRequest bulkRequest = new BulkRequest();
                boolean isEmpty = true;
                for (ConsumerRecord<String, String> record : records) {
                    IndexRequest indexRequest = new IndexRequest(esIndexName);
                    String value = record.value();
                    JSONObject entity = JSON.parseObject(value);
                    // extract the id
                    String id = StringUtils.defaultIfBlank(entity.getString(idKey), "");
                    if (isFilterByQy(id)) {
                        continue;
                    }
                    Map<String, Object> sourceMap = new LinkedHashMap<>();
                    List<String> allFieldsListed = new ArrayList<>();
                    for (Pair<String, String> entry : hjk2StdFieldMap) {
                        String hjkKey = entry.getKey(), stdKey = entry.getValue();
                        String fieldValue = StringUtils.defaultIfBlank(entity.getString(hjkKey), "");
                        sourceMap.put(stdKey, fieldValue);
                        allFieldsListed.add(Objects.toString(fieldValue, ""));
                    }
                    indexRequest.id(id);
                    sourceMap.put("_allFields", StringUtils.join(allFieldsListed, "$$"));
                    isEmpty = false;
                    indexRequest.source(sourceMap);
                    bulkRequest.add(indexRequest);
                }
                if (isEmpty) {
                    continue;
                }
                try {
                    BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                    counter.addAndGet(bulkRequest.requests().size());
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println(" flushed " + counter.get() + " records ");
            }
        }

        private static List<Pair<String, String>> hjk2StdFieldMap() {
            List<Pair<String, String>> hjk2StdFieldMap = new ArrayList<>();
            hjk2StdFieldMap.add(new ImmutablePair<>("id", "id"));
            hjk2StdFieldMap.add(new ImmutablePair<>("CREATED_AT", "CREATED_AT"));
            hjk2StdFieldMap.add(new ImmutablePair<>("UPDATED_AT", "UPDATED_AT"));
            for (int i = 0; i < Test05CreateMysqlBigTable.maxFieldIdx; i++) {
                String fieldName = String.format("field%s", i);
                hjk2StdFieldMap.add(new ImmutablePair<>(fieldName, fieldName));
            }
            return hjk2StdFieldMap;
        }

        private static Properties kafkaProperties(String groupId) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "192.168.0.190:9092");
            properties.put("group.id", groupId);
            properties.put("enable.auto.commit", "true");
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("auto.offset.reset", "earliest");
            properties.put("session.timeout.ms", "30000");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return properties;
        }

        private static boolean isFilterByQy(String qy) {
            if (StringUtils.isBlank(qy)) {
                return true;
            }
            return false;
        }
    }
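
    One small gap worth noting: both classes obtain an IndicesClient but never call it, so the student_all_20221211 index is assumed to exist already. A minimal, hypothetical sketch of creating it up front with bulk-friendly settings (shard and replica counts are assumptions, dynamic mapping is left on) could look like this:

    import org.elasticsearch.client.IndicesClient;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.indices.CreateIndexRequest;
    import org.elasticsearch.client.indices.CreateIndexResponse;
    import org.elasticsearch.common.settings.Settings;

    // Hypothetical helper: create the target index with bulk-friendly settings before loading.
    public class CreateStudentAllIndex {

        public static void create(IndicesClient indicesClient, String esIndexName) throws Exception {
            CreateIndexRequest request = new CreateIndexRequest(esIndexName);
            request.settings(Settings.builder()
                    .put("index.number_of_shards", 3)       // assumption: adjust to the cluster size
                    .put("index.number_of_replicas", 0)     // replicas can be added back after the load
                    .put("index.refresh_interval", "30s")); // fewer refreshes while bulk indexing
            CreateIndexResponse response = indicesClient.create(request, RequestOptions.DEFAULT);
            System.out.println(" index created : " + response.isAcknowledged());
        }
    }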

     

     

    Installing the Kafka plugins for Spoon

    Taken from the article "Kettle安装Kafka Consumer和Kafka Producer插件" (installing the Kafka Consumer and Kafka Producer plugins for Kettle).

    1. Download the Kettle Kafka plugins from GitHub:
       Kafka Consumer:
       https://github.com/RuckusWirelessIL/pentaho-kafka-consumer/releases/tag/v1.7
       Kafka Producer:
       https://github.com/RuckusWirelessIL/pentaho-kafka-producer/releases/tag/v1.9
    2. Go into the Kettle installation directory and create a steps directory under the plugin directory.
    3. Unzip the downloaded plugins and put them into the steps directory.
    4. Restart spoon.bat.

     

     

     

     

    References

    Kettle安装Kafka Consumer和Kafka Producer插件

     

     

     

    Original article: https://blog.csdn.net/u011039332/article/details/128272380