• Java实现从Redis中批量读取数据


    一.背景概述

    本周接到一个新的需求:从用户dau日志文件中读取用户uid,然后到Redis中获取对应的用户数据。用户的uid存储于login_day_20220913.txt文件,共1亿2千多万条数据,数量达1.4G。

    要求:尽量在2小时内获得结果,在数据处理过程中,Redis服务器QPS尽量低,不超过某个阈值,不然会触发监控报警。数据从Redis从库读取,只提供一个端口。

    二.分析与实现

    由于之前做过相同数据量的统计需求,所以从一开始就确定单线程完成此次数据处理也是可以的。实际上,对多线程和并发的使用需要慎之又慎,特别是在业务繁忙的系统或环境下。

    接触Redis的朋友都知道,Redis是支持批量读取的,其中常用的两个方法:mget()和hmget()。

    本次处理的数据不是哈希结构,所以确定使用mget()。

    此时,我自然而然地问了同事一个问题,那就是mget批量处理数据的最佳参数范围是多少?因为mget()接受一个字符串数组参数,也就是说字符串数组的长度最佳为多少?

    同事并没有给我明确的答案,只是说他们日常每批次处理10000条,建议我自己可以尝试一下,于是我打算试试50000条数据。

    主要代码如下:

    1. package com.sina.weibo;
    2. import com.sina.weibo.util.FileUtils;
    3. import com.sina.weibo.util.ListUtil;
    4. import org.apache.commons.lang3.time.StopWatch;
    5. import redis.clients.jedis.Jedis;
    6. import java.util.ArrayList;
    7. import java.util.LinkedHashSet;
    8. import java.util.List;
    9. import java.util.concurrent.TimeUnit;
    10. public class Application {
    11. /** dau数据读取路径 */
    12. private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt";
    13. /** 结果输出路径 */
    14. private static String outputPath = "/data1/bingqing5/importcampusdata/output/campus_data.txt";
    15. /** 已处理过的uid数据存储路径 */
    16. private static String processedUidDataPath = "/data1/bingqing5/importcampusdata/process/processed_uid.txt";
    17. public static void main(String[] args) {
    18. StopWatch stopWatch = new StopWatch();
    19. // 开始时间
    20. stopWatch.start();
    21. System.out.println("================程序开始===============");
    22. transfer(dauDataPath, processedUidDataPath, outputPath);
    23. System.out.println("================程序结束===============");
    24. // 结束时间
    25. stopWatch.stop();
    26. // 统计执行时间(秒)
    27. System.out.println("执行时长:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒.");
    28. }
    29. private static void transfer(String dauDataPath, String processedUidDataPath, String outputPath) {
    30. List dauDataList = FileUtils.readInfoFromFile(dauDataPath);
    31. List> bucket = ListUtil.splitList(dauDataList, 50000);
    32. Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn",50000);
    33. List processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath);
    34. LinkedHashSet linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList);
    35. for (List list : bucket) {
    36. List jsonStrList = jedis.mget(list.toArray(new String[list.size()]));
    37. for (int i = 0; i < list.size(); i++) {
    38. if (!linkedHashSet.contains(list.get(i))) {
    39. String uid = list.get(i);
    40. FileUtils.appendInfoToFile(processedUidDataPath, uid);
    41. String jsonStr = jsonStrList.get(i);
    42. if (jsonStr == null || jsonStr == "") continue;
    43. String content = uid + "\t" + jsonStr;
    44. FileUtils.appendInfoToFile(outputPath, content);
    45. }
    46. }
    47. System.out.println(list.size());
    48. }
    49. }
    50. }

    三.发现问题与屡次改进

    3.1.QPS过高而且波动很大

    上述代码上线后没多久,就被同事找来,说QPS过高,开始的时候瞬间达到近100k,之后稳定在70k~100k之间。因为担心影响其他业务,于是把jar包暂停,着手优化。

    于是,我多次修改如下代码:

    List> bucket = ListUtil.splitList(dauDataList, 50000);
    

    将50000,调整为10000,5000,1000,500,100等值逐一尝试。

    QPS确实逐步降下来了,但是即便是每次处理1000条,QPS也有40K左右。

    3.2.程序中断,抛异常

    最终以每批次读取500条数据,将代码上线。但是程序总是中断报错,抛出异常:

    而这时候已处理的数据量达到几千万条。

    最初怀疑是因为jedis对象没有调用close方法,于是修改代码如下:

    1. package com.sina.weibo;
    2. import com.sina.weibo.util.FileUtils;
    3. import com.sina.weibo.util.ListUtil;
    4. import org.apache.commons.lang3.time.StopWatch;
    5. import redis.clients.jedis.Jedis;
    6. import java.util.ArrayList;
    7. import java.util.LinkedHashSet;
    8. import java.util.List;
    9. import java.util.concurrent.TimeUnit;
    10. public class Application {
    11. /** dau数据读取路径 */
    12. private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt";
    13. /** 结果输出路径 */
    14. private static String outputPath = "/data1/bingqing5/importcampusdata/output/campus_data.txt";
    15. /** 已处理过的uid数据存储路径 */
    16. private static String processedUidDataPath = "/data1/bingqing5/importcampusdata/process/processed_uid.txt";
    17. public static void main(String[] args) {
    18. StopWatch stopWatch = new StopWatch();
    19. // 开始时间
    20. stopWatch.start();
    21. System.out.println("================程序开始===============");
    22. transfer(dauDataPath, processedUidDataPath, outputPath);
    23. System.out.println("================程序结束===============");
    24. // 结束时间
    25. stopWatch.stop();
    26. // 统计执行时间(秒)
    27. System.out.println("执行时长:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒.");
    28. }
    29. private static void transfer(String dauDataPath, String processedUidDataPath, String outputPath) {
    30. List dauDataList = FileUtils.readInfoFromFile(dauDataPath);
    31. List> bucket = ListUtil.splitList(dauDataList, 50000);
    32. List processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath);
    33. LinkedHashSet linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList);
    34. for (List list : bucket) {
    35. Jedis jedis = new Jedis(rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000);
    36. List jsonStrList = jedis.mget(list.toArray(new String[list.size()]));
    37. for (int i = 0; i < list.size(); i++) {
    38. if (!linkedHashSet.contains(list.get(i))) {
    39. String uid = list.get(i);
    40. FileUtils.appendInfoToFile(processedUidDataPath, uid);
    41. String jsonStr = jsonStrList.get(i);
    42. if (jsonStr == null || jsonStr == "") continue;
    43. String content = uid + "\t" + jsonStr;
    44. FileUtils.appendInfoToFile(outputPath, content);
    45. }
    46. }
    47. jedis.close();
    48. System.out.println(list.size());
    49. }
    50. }
    51. }

     修改后跑程序依旧没有任何改善,继续修改,代码如下:

    1. package com.sina.weibo;
    2. import com.sina.weibo.util.FileUtils;
    3. import com.sina.weibo.util.ListUtil;
    4. import org.apache.commons.lang3.time.StopWatch;
    5. import redis.clients.jedis.Jedis;
    6. import java.util.ArrayList;
    7. import java.util.LinkedHashSet;
    8. import java.util.List;
    9. import java.util.concurrent.TimeUnit;
    10. public class A {
    11. /** dau数据读取路径 */
    12. private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt";
    13. /** 结果输出路径 */
    14. private static String outputPath = "/data1/bingqing5/importcampusdata/output/campus_data.txt";
    15. /** 已处理过的uid数据存储路径 */
    16. private static String processedUidDataPath = "/data1/bingqing5/importcampusdata/process/processed_uid.txt";
    17. public static void main(String[] args) {
    18. StopWatch stopWatch = new StopWatch();
    19. // 开始时间
    20. stopWatch.start();
    21. System.out.println("================程序开始===============");
    22. transfer(dauDataPath, processedUidDataPath, outputPath);
    23. System.out.println("================程序结束===============");
    24. // 结束时间
    25. stopWatch.stop();
    26. // 统计执行时间(秒)
    27. System.out.println("执行时长:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒.");
    28. }
    29. private static void transfer(String dauDataPath, String processedUidDataPath, String outputPath) {
    30. List dauDataList = FileUtils.readInfoFromFile(dauDataPath);
    31. List> bucket = ListUtil.splitList(dauDataList, 50000);
    32. List processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath);
    33. LinkedHashSet linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList);
    34. for (List list : bucket) {
    35. Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000);
    36. List jsonStrList = jedis.mget(list.toArray(new String[list.size()]));
    37. for (int i = 0; i < list.size(); i++) {
    38. if (!linkedHashSet.contains(list.get(i))) {
    39. String uid = list.get(i);
    40. FileUtils.appendInfoToFile(processedUidDataPath, uid);
    41. String jsonStr = jsonStrList.get(i);
    42. if (jsonStr == null || jsonStr == "") continue;
    43. String content = uid + "\t" + jsonStr;
    44. FileUtils.appendInfoToFile(outputPath, content);
    45. }
    46. }
    47. try {
    48. Thread.sleep(100);
    49. } catch (InterruptedException e) {
    50. e.printStackTrace();
    51. } finally {
    52. jedis.close();
    53. }
    54. System.out.println(list.size());
    55. }
    56. }
    57. }

    上线以后,观测发现QPS区域稳定,但是程序会空跑,也就是从头开始将已处理的数据也要逐一读取一次,很多时候都没有跑到上次程序处理的地方就已经被迫退出。

    linkedHashSet本来是用来标记上次程序运行停止的地方,但是似乎并没有完全发挥作用。

    于是修改代码,加入一个新的list集合,用于存放还没有处理过的数据,代码如下:

    1. package com.sina.weibo;
    2. import com.sina.weibo.util.FileUtils;
    3. import com.sina.weibo.util.ListUtil;
    4. import org.apache.commons.lang3.time.StopWatch;
    5. import redis.clients.jedis.Jedis;
    6. import java.util.ArrayList;
    7. import java.util.LinkedHashSet;
    8. import java.util.List;
    9. import java.util.concurrent.TimeUnit;
    10. /**
    11. * @author bingqing5
    12. * @date 2022/09/14 15:00
    13. * @version 1.0
    14. */
    15. public class Application {
    16. /** dau数据读取路径 */
    17. private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt";
    18. /** 结果输出路径 */
    19. private static String outputPath = "/data1/bingqing5/importcampusdata/output/campus_data.txt";
    20. /** 已处理过的uid数据存储路径 */
    21. private static String processedUidDataPath = "/data1/bingqing5/importcampusdata/process/processed_uid.txt";
    22. public static void main(String[] args) {
    23. StopWatch stopWatch = new StopWatch();
    24. // 开始时间
    25. stopWatch.start();
    26. System.out.println("================程序开始===============");
    27. // transfer(dauDataPath, processedUidDataPath, outputPath);
    28. List dauDataList = FileUtils.readInfoFromFile(dauDataPath);
    29. // List> bucket = ListUtil.splitList(dauDataList, 50000);
    30. // Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000);
    31. List processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath);
    32. LinkedHashSet linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList);
    33. List uidList = new ArrayList<>();
    34. for (String uid : dauDataList) {
    35. if (linkedHashSet.contains(uid)) {
    36. continue;
    37. } else {
    38. uidList.add(uid);
    39. }
    40. }
    41. List> bucket;
    42. if (uidList.size() != 0) {
    43. bucket = ListUtil.splitList(uidList, 10000);
    44. } else {
    45. bucket = new ArrayList<>();
    46. }
    47. for (List list : bucket) {
    48. Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000);
    49. List jsonStrList = jedis.mget(list.toArray(new String[list.size()]));
    50. for (int i = 0; i < list.size(); i++) {
    51. if (!linkedHashSet.contains(list.get(i))) {
    52. String uid = list.get(i);
    53. FileUtils.appendInfoToFile(processedUidDataPath, uid);
    54. String jsonStr = jsonStrList.get(i);
    55. if (jsonStr == null || jsonStr == "") continue;
    56. String content = uid + "\t" + jsonStr;
    57. FileUtils.appendInfoToFile(outputPath, content);
    58. }
    59. }
    60. try {
    61. Thread.sleep(100);
    62. } catch (InterruptedException e) {
    63. e.printStackTrace();
    64. } finally {
    65. jedis.close();
    66. }
    67. System.out.println(list.size());
    68. }
    69. System.out.println("================程序结束===============");
    70. // 结束时间
    71. stopWatch.stop();
    72. // 统计执行时间(秒)
    73. System.out.println("执行时长:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒.");
    74. }
    75. }

    终于这次修改后,上线代码,代码平稳运行。

    此时查看QPS,发现10000的批读取量,QPS文档在25K以下,此前同样的数据量,QPS能达到40K。

     3.3.内存消耗过大

    在上次修改后,程序平稳运行,期间我查看了机器状态,发现我跑的jar包竟然消耗了32%左右的内存,那台机器也不过62G的总内存。虽然不缺内存资源,但是还是决定趁着程序在跑的期间,回顾一下代码。

    List> bucket = ListUtil.splitList(dauDataList, 10000);

    上面这行代码是将所有的用户uid数据按照10000的大小均等分割,每次遍历,要重复创建同一类Jedis对象,也会消耗大量内存。

    另外,下面这段程序:

    1. List uidList = new ArrayList<>();
    2. for (String uid : dauDataList) {
    3. if (linkedHashSet.contains(uid)) {
    4. continue;
    5. } else {
    6. uidList.add(uid);
    7. }
    8. }

    已经对处理过的数据做过筛选,在循环中再次做如下判断:

    1. if (!linkedHashSet.contains(list.get(i))) {
    2. }

    也是多次一举,会增加耗时。

    综合以上考虑,我做了修改,代码如下:

    1. package com.sina.weibo;
    2. import com.sina.weibo.util.FileUtils;
    3. import com.sina.weibo.util.ListUtil;
    4. import org.apache.commons.lang3.time.StopWatch;
    5. import redis.clients.jedis.Jedis;
    6. import java.util.ArrayList;
    7. import java.util.LinkedHashSet;
    8. import java.util.List;
    9. import java.util.concurrent.TimeUnit;
    10. /**
    11. * @author bingqing5
    12. * @date 2022/09/14 15:00
    13. * @version 1.0
    14. */
    15. public class Application {
    16. /** dau数据读取路径 */
    17. private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt";
    18. /** 结果输出路径 */
    19. // private static String outputPath = "/data1/bingqing5/redis_test/output/campus_data.txt";
    20. private static String outputPath = "/data1/bingqing/redis_test/output/campus_data.txt";
    21. /** 已处理过的uid数据存储路径 */
    22. // private static String processedUidDataPath = "/data1/bingqing5/redis_test/process/processed_uid.txt";
    23. private static String processedUidDataPath = "/data1/bingqing/redis_test/process/processed_uid.txt";
    24. public static void main(String[] args) {
    25. StopWatch stopWatch = new StopWatch();
    26. // 开始时间
    27. stopWatch.start();
    28. System.out.println("================程序开始===============");
    29. transfer(dauDataPath, processedUidDataPath, outputPath);
    30. System.out.println("================程序结束===============");
    31. // 结束时间
    32. stopWatch.stop();
    33. // 统计执行时间(秒)
    34. System.out.println("执行时长:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒.");
    35. }
    36. private static void transfer(String dauDataPath, String processedUidDataPath, String outputPath) {
    37. List dauDataList = FileUtils.readInfoFromFile(dauDataPath);
    38. Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000);
    39. List processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath);
    40. LinkedHashSet linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList);
    41. List uidList = new ArrayList<>();
    42. for (String uid : dauDataList) {
    43. if (linkedHashSet.contains(uid)) {
    44. continue;
    45. } else {
    46. uidList.add(uid);
    47. }
    48. }
    49. List> bucket;
    50. if (uidList.size() != 0) {
    51. bucket = ListUtil.splitList(uidList, 50000);
    52. } else {
    53. bucket = new ArrayList<>();
    54. }
    55. for (List list : bucket) {
    56. List jsonStrList = jedis.mget(list.toArray(new String[list.size()]));
    57. for (int i = 0; i < list.size(); i++) {
    58. String uid = list.get(i);
    59. FileUtils.appendInfoToFile(processedUidDataPath, uid);
    60. String jsonStr = jsonStrList.get(i);
    61. if (jsonStr == null || jsonStr == "") continue;
    62. String content = uid + "\t" + jsonStr;
    63. FileUtils.appendInfoToFile(outputPath, content);
    64. }
    65. try {
    66. Thread.sleep(100);
    67. } catch (InterruptedException e) {
    68. e.printStackTrace();
    69. } finally {
    70. jedis.close();
    71. }
    72. System.out.println(list.size());
    73. }
    74. }
    75. }

    修改代码以后,替换掉原先运行的jar包,接着运行。发现内存消耗明显降低,稳定占总内存的20%。

    然后尝试修改了mget参数量,修改为50000条,再次运行程序发现QPS稳定在40K左右。

     四.总结

    本篇算是笔者刚接触Redis不久的一篇随手记。通过本次需求的开发经历,让我对Redis有了直观的了解,同时也理解了代码优化在实际生产工作和开发中的潜在价值。

    关于Redis,在快速直接从Redis读取数据的场景中,尤其是数据量大的时候,为了防止QPS过高,最好在处理一批次数据后空出一定的时间间隔,比如可以让线程暂时休眠一定时间间隔,再进行下批次读取和处理。

    关于代码优化,尽量创建可重复使用的对象,非必要不添加同类对象,避免大量创建对象带来的资源消耗,本次经历也算是很鲜明的体会到这点。

  • 相关阅读:
    IMU标定之---Allan方差
    JsonCpp 使用指导
    从零开始 Spring Boot 17:MyBatis Plus 续
    【最小的k个数】
    AQS(AbstractQueuedSynchronizer)框架之ReentrantLock
    刷代码随想录有感(66):回溯算法——组合问题的优化(剪枝)
    Linux学习第22天:Linux中断驱动开发(一): 突如其来
    position属性练习:画一个爱心送给你。
    【cmake实战十】c++从动态库(dll)导出类
    String的方法介绍以及实现
  • 原文地址:https://blog.csdn.net/lbq15735104044/article/details/126891550