• Redis分布式锁(中)


    作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

    联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

    我们在不久前介绍了SpringBoot定时任务,最近又一起探究了如何使用Redis实现简单的消息队列,都是一些不错的小知识点。为了能跟前面的内容产生联动,这次我们打算把Redis分布式锁相关的介绍融合进定时任务的案例中,学起来更带劲~

    Redis的锁长啥样?

    上一篇我们粗略介绍了JVM锁,比如synchronized关键字和ReentrantLock,它们都是实实在在已经实现的锁,而且还有标志位啥的。但Redis就是一个内存...怎么作为锁呢?

    有一点大家要明确,Redis之所以能用来做分布式锁,肯定不只是因为它是一片内存,否则JVM本身也占有内存,为什么无法自己实现分布式锁呢?

    我个人的理解是,要想自定义一个分布式锁,必须至少满足几个条件:

    • 独立于多节点系统之外的一片内存
    • 唯一性(可以通过单线程,也可以通过选举机制,能保证唯一即可)
    • 当然,如果性能高一点,甚至支持高可用就更好啦

    以上三点Redis都能满足。在上面三个条件下,其实怎么设计锁,完全取决于个人如何定义锁。就好比现实生活中,通常我们理解的锁就是有个钥匙孔、需要插入钥匙的金属小物件。然而锁的形态可不止这么一种,随着科技的发展,什么指纹锁、虹膜锁层出不穷,但归根结底它们之所以被称为“锁”,是因为都保证了“唯一”。

    如果我们能设计一种逻辑,它能造成某个场景下的“唯一事件”,那么它就可以被称为“锁”。比如,某家很有名的网红店,一天只接待一位客人。门口没有营业员,就放了一台取号机,里面放了一张票。你如果去迟了,票就没了,你就进不了这家店。这个场景下,没票的顾客进不去,被锁在门外。此时,取票机造成了“唯一事件”,那么它就可以叫做“锁”。

    而Redis提供了setnx指令,如果某个key当前不存在则设置成功并返回true,否则不再重复设置,直接返回false。这不就是编程界的取号机吗?当然,实际用到的命令可不止这一个,具体如何实现,大家等下看代码即可。

    Demo构思

    在我看来,同样需要使用锁,动机可能完全相反:

    • 在保证线程安全的前提下,尽量让所有线程都执行成功
    • 在保证线程安全的前提下,只让一个线程执行成功

    前者适用于秒杀等场景。作为商家,当然希望在不发生线程安全问题的前提下,让每一个订单都生效,直到商品售罄。此时分布式锁的写法可以是“不断重试”“阻塞等待”,即:递归或while true循环尝试获取、阻塞等待。

    而后者适用于分布式系统或多节点项目的定时任务,比如同一份代码部署在A、B两台服务器上,而数据库共用同一个。如果不做限制,那么在同一时刻,两台服务器都会去拉取列表执行,会发生任务重复执行的情况。

    此时可以考虑使用分布式锁,在cron触发的时刻只允许一个线程去往数据库拉取任务:

    在实现Redis分布式锁控制定时任务唯一性的同时,我们引入之前的Redis消息队列。注意,这与Redis分布式锁本身无关,就是顺便复习一遍Redis消息队列而已,大家可以只实现Redis分布式锁+定时任务的部分。

    整个Demo的结构大致如图:

    当然,实际项目中一般是这样的:

    分布式锁为什么难设计?

    首先,要和大家说一下,但凡牵涉到分布式的处理,没有一个是简单的,上面的Demo设计也不过是玩具,用来启发 大家的思路。

    为什么要把demo设计得这么复杂呢?哈哈,因为这是我在上一家公司自己设计的,遇到了很多坑...拿出来自嘲一番,与各位共勉。

    我当时的设计思路是:

    由于小公司没有用什么Elastic-Job啥的,就是很普通的多节点部署。为了避免任务重复执行,我想设计一个分布式锁。但因为当时根本不知道redisson,所以就自己百度了redis实现分布式锁的方式,然后依葫芦画瓢自己手写了一个 。

    但我写完redis分布式锁后,在实际测试过程中发现还需要考虑锁的失效时间...

    这里有两个问题:

    • 为什么要设置锁的过期时间?
    • 锁的过期时间设置多久合适?

    最简单的实现方案是这样的,一般没问题:

    但极端的情况下(项目在任务进行时重启或意外宕机),可能当前任务来不及解锁就挂了(死锁),那么下一个任务就会一直被锁在方法外等待。就好比厕所里有人被熏晕了,没法开门,而外面的人又进不去...

    此时需要装一个自动解锁的门,到时间自动开门,也就是要给锁设置一个过期时间。但紧接着又会有第二个问题:锁的失效时间设多长合适?

    很难定。

    因为随着项目的发展,定时任务的执行时间很可能是变化的。

    如果设置时间过长,极端点,定为365天。假设任务正常执行,比如10分钟就结束,此时执行完毕的任务自己会主动解锁。但万一和上面一样宕机了,虽说你设置了过期时间,但下一个任务需要等一年才能执行...本质上和没有设置过期时间一样!就好比...你自己想想什么例子合适,能加深你的理解哦。

    如果设置时间过短,上一个人还没拉完,门就“咔嚓”一声开了,尴尬不,重复执行了。

    终上所述,我当时之所以设计得这么复杂,就是想尽量缩短任务执行的时间,让它尽可能短(拉取后直接丢给队列,自己不处理),这样锁的时间一般设置30分钟就没啥问题。另外,对于死锁问题,我当时没有考虑宕机的情况,只考虑了意外重启…问题还有很多,文末会再总结。

    请大家阅读下面代码时思考两个问题:

    • Demo如何处理锁的过期时间
    • Demo如何防止死锁

    项目搭建

    新建一个空的SpringBoot项目。

    拷贝下方代码,构建工程:

    构建完以后,拷贝一份,修改端口号为8081,避免和原先的冲突

    统一管理Redis Key:RedisKeyConst

    1. /**
    2. * 统一管理Redis Key
    3. *
    4. * @author mx
    5. */
    6. public abstract class RedisKeyConst {
    7. /**
    8. * 分布式锁的KEY
    9. */
    10. public static final String RESUME_PULL_TASK_LOCK = "resume_pull_task_lock";
    11. /**
    12. * 简历异步解析任务队列
    13. */
    14. public static final String RESUME_PARSE_TASK_QUEUE = "resume_parse_task_queue";
    15. }

    Redis消息队列:RedisMessageQueueConsumer

    1. /**
    2. * 消费者,异步获取简历解析结果并存入数据库
    3. *
    4. * @author mx
    5. */
    6. @Slf4j
    7. @Component
    8. public class RedisMessageQueueConsumer implements ApplicationListener<ContextRefreshedEvent> {
    9. @Autowired
    10. private RedisService redisService;
    11. @Autowired
    12. private AsyncResumeParser asyncResumeParser;
    13. @Autowired
    14. private ObjectMapper objectMapper;
    15. @Override
    16. public void onApplicationEvent(ContextRefreshedEvent event) {
    17. log.info("开始监听RedisMessageQueue...");
    18. CompletableFuture.runAsync(() -> {
    19. // 大循环,不断监听队列任务(阻塞式)
    20. while (true) {
    21. // 阻塞监听
    22. ResumeCollectionDTO resumeCollectionDTO = (ResumeCollectionDTO) redisService.popQueue(RedisKeyConst.RESUME_PARSE_TASK_QUEUE, 5, TimeUnit.SECONDS);
    23. if (resumeCollectionDTO != null) {
    24. int rePullCount = 0;
    25. int retryCount = 0;
    26. log.info("从队列中取出:{}", resumeCollectionDTO.getName());
    27. log.info(">>>>>>>>>>>>>>>>>>>开始拉取简历:{}", resumeCollectionDTO.getName());
    28. Long asyncPredictId = resumeCollectionDTO.getAsyncPredictId();
    29. // 小循环,针对每一个任务多次调用第三方接口,直到获取最终结果或丢弃任务
    30. while (true) {
    31. try {
    32. PredictResult result = asyncResumeParser.getResult(asyncPredictId);
    33. rePullCount++;
    34. // 如果已经解析完毕
    35. if (result.getStatus() == 2) {
    36. // 保存数据库
    37. try {
    38. log.info("简历:{}解析成功", resumeCollectionDTO.getName());
    39. log.info("resultJson:{}", result.getResultJson());
    40. ResumeCollectionDO resumeCollectionDO = objectMapper.readValue(result.getResultJson(), ResumeCollectionDO.class);
    41. log.info("<<<<<<<<<<<<<<<<<<<保存简历:{}到数据库", resumeCollectionDO);
    42. // 归零
    43. rePullCount = 0;
    44. retryCount = 0;
    45. break;
    46. } catch (Exception e) {
    47. discardTask(resumeCollectionDTO);
    48. log.info("<<<<<<<<<<<<<<<<<<<保存简历失败,丢弃任务");
    49. rePullCount = 0;
    50. retryCount = 0;
    51. break;
    52. }
    53. }
    54. // 远程服务还未解析完毕,重试
    55. else {
    56. try {
    57. if (rePullCount <= 3) {
    58. //3次重试,时间为1s间隔
    59. TimeUnit.SECONDS.sleep(1);
    60. log.info("简历:{}尚未解析完毕, 准备进行第{}次重试, 停顿1s后进行", resumeCollectionDTO.getName(), rePullCount);
    61. } else if (rePullCount > 3 && rePullCount <= 6) {
    62. // 说明任务比较耗时,加长等待时间
    63. TimeUnit.SECONDS.sleep(2);
    64. log.info("简历:{}尚未解析完毕, 准备进行第{}次重试, 停顿2s后进行", resumeCollectionDTO.getName(), rePullCount);
    65. } else if (rePullCount > 6 && rePullCount <= 8) {
    66. // 说明任务比较耗时,加长等待时间
    67. TimeUnit.SECONDS.sleep(3);
    68. log.info("简历:{}尚未解析完毕, 准备进行第{}次重试, 停顿3s后进行", resumeCollectionDTO.getName(), rePullCount);
    69. } else {
    70. discardTask(resumeCollectionDTO);
    71. log.info("<<<<<<<<<<<<<<<<<<<多次拉取仍未得到结果, 丢弃简历:{}", resumeCollectionDTO.getName());
    72. retryCount = 0;
    73. rePullCount = 0;
    74. break;
    75. }
    76. } catch (InterruptedException e) {
    77. discardTask(resumeCollectionDTO);
    78. log.info("<<<<<<<<<<<<<<<<<<<任务中断异常, 简历:{}", resumeCollectionDTO.getName());
    79. rePullCount = 0;
    80. retryCount = 0;
    81. break;
    82. }
    83. }
    84. } catch (Exception e) {
    85. if (retryCount > 3) {
    86. discardTask(resumeCollectionDTO);
    87. log.info("<<<<<<<<<<<<<<<<<<<简历:{}重试{}次后放弃, rePullCount:{}, retryCount:{}", resumeCollectionDTO.getName(), retryCount, rePullCount, retryCount);
    88. rePullCount = 0;
    89. retryCount = 0;
    90. break;
    91. }
    92. retryCount++;
    93. log.info("简历:{}远程调用异常, 准备进行第{}次重试...", resumeCollectionDTO.getName(), retryCount);
    94. }
    95. }
    96. log.info("break......");
    97. }
    98. }
    99. });
    100. }
    101. private void discardTask(ResumeCollectionDTO task) {
    102. // 根据asyncPredictId删除任务...
    103. log.info("丢弃任务:{}...", task.getName());
    104. }
    105. }

    实体类:DO+DTO

    1. @Data
    2. @NoArgsConstructor
    3. @AllArgsConstructor
    4. public class ResumeCollectionDO {
    5. /**
    6. * 简历id
    7. */
    8. private Long id;
    9. /**
    10. * 简历名称
    11. */
    12. private String name;
    13. }
    14. @Data
    15. @NoArgsConstructor
    16. @AllArgsConstructor
    17. public class ResumeCollectionDTO implements Serializable {
    18. /**
    19. * 简历id
    20. */
    21. private Long id;
    22. /**
    23. * 异步解析id,稍后根据id可获取最终解析结果
    24. */
    25. private Long asyncPredictId;
    26. /**
    27. * 简历名称
    28. */
    29. private String name;
    30. }

    分布式锁:RedisService

    1. public interface RedisService {
    2. /**
    3. * 向队列插入消息
    4. *
    5. * @param queue 自定义队列名称
    6. * @param obj 要存入的消息
    7. */
    8. void pushQueue(String queue, Object obj);
    9. /**
    10. * 从队列取出消息
    11. *
    12. * @param queue 自定义队列名称
    13. * @param timeout 最长阻塞等待时间
    14. * @param timeUnit 时间单位
    15. * @return
    16. */
    17. Object popQueue(String queue, long timeout, TimeUnit timeUnit);
    18. /**
    19. * 尝试上锁
    20. *
    21. * @param lockKey
    22. * @param value
    23. * @param expireTime
    24. * @param timeUnit
    25. * @return
    26. */
    27. boolean tryLock(String lockKey, String value, long expireTime, TimeUnit timeUnit);
    28. /**
    29. * 根据MACHINE_ID解锁(只能解自己的)
    30. *
    31. * @param lockKey
    32. * @param value
    33. * @return
    34. */
    35. boolean unLock(String lockKey, String value);
    36. /**
    37. * 释放锁,不管是不是自己的
    38. *
    39. * @param lockKey
    40. * @param value
    41. * @return
    42. */
    43. boolean releaseLock(String lockKey, String value);
    44. }
    45. @Slf4j
    46. @Component
    47. public class RedisServiceImpl implements RedisService {
    48. @Autowired
    49. private RedisTemplate redisTemplate;
    50. /**
    51. * 向队列插入消息
    52. *
    53. * @param queue 自定义队列名称
    54. * @param obj 要存入的消息
    55. */
    56. @Override
    57. public void pushQueue(String queue, Object obj) {
    58. redisTemplate.opsForList().leftPush(queue, obj);
    59. }
    60. /**
    61. * 从队列取出消息
    62. *
    63. * @param queue 自定义队列名称
    64. * @param timeout 最长阻塞等待时间
    65. * @param timeUnit 时间单位
    66. * @return
    67. */
    68. @Override
    69. public Object popQueue(String queue, long timeout, TimeUnit timeUnit) {
    70. return redisTemplate.opsForList().rightPop(queue, timeout, timeUnit);
    71. }
    72. /**
    73. * 尝试上锁
    74. *
    75. * @param lockKey
    76. * @param value
    77. * @param expireTime
    78. * @param timeUnit
    79. * @return
    80. */
    81. @Override
    82. public boolean tryLock(String lockKey, String value, long expireTime, TimeUnit timeUnit) {
    83. Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, value);
    84. if (lock != null && lock) {
    85. redisTemplate.expire(lockKey, expireTime, timeUnit);
    86. return true;
    87. } else {
    88. return false;
    89. }
    90. }
    91. /**
    92. * 根据MACHINE_ID解锁(只能解自己的)
    93. *
    94. * @param lockKey
    95. * @param value
    96. * @return
    97. */
    98. @Override
    99. public boolean unLock(String lockKey, String value) {
    100. String machineId = (String) redisTemplate.opsForValue().get(lockKey);
    101. if (StringUtils.isNotEmpty(machineId) && machineId.equals(value)) {
    102. redisTemplate.delete(lockKey);
    103. return true;
    104. }
    105. return false;
    106. }
    107. /**
    108. * 释放锁,不管是不是自己的
    109. *
    110. * @param lockKey
    111. * @param value
    112. * @return
    113. */
    114. @Override
    115. public boolean releaseLock(String lockKey, String value) {
    116. Boolean delete = redisTemplate.delete(lockKey);
    117. if (delete != null && delete) {
    118. log.info("Spring启动,节点:{}成功释放上次简历汇聚定时任务锁", value);
    119. return true;
    120. }
    121. return false;
    122. }
    123. }

    定时任务:ResumeCollectionTask

    1. @Slf4j
    2. @Component
    3. @EnableScheduling
    4. public class ResumeCollectionTask implements ApplicationListener<ContextRefreshedEvent> {
    5. /**
    6. * 当这份代码被部署到不同的服务器,启动时为每台机器分配一个唯一的机器ID
    7. */
    8. private static String MACHINE_ID = IdUtil.randomUUID();
    9. @Autowired
    10. private RedisService redisService;
    11. @Autowired
    12. private AsyncResumeParser asyncResumeParser;
    13. @Scheduled(cron = "0 */1 * * * ?")
    14. // @Scheduled(fixedDelay = 60 * 1000L)
    15. public void resumeSchedule() {
    16. // 尝试上锁,返回truefalse,锁的过期时间设置为10分钟(实际要根据项目调整,这也是自己实现Redis分布式锁的难点之一)
    17. boolean lock = redisService.tryLock(RedisKeyConst.RESUME_PULL_TASK_LOCK, MACHINE_ID, 10, TimeUnit.MINUTES);
    18. // 如果当前节点成功获取锁,那么整个系统只允许当前程序去MySQL拉取待执行任务
    19. if (lock) {
    20. log.info("节点:{}获取锁成功,定时任务启动", MACHINE_ID);
    21. try {
    22. collectResume();
    23. } catch (Exception e) {
    24. log.info("定时任务异常:", e);
    25. } finally {
    26. redisService.unLock(RedisKeyConst.RESUME_PULL_TASK_LOCK, MACHINE_ID);
    27. log.info("节点:{}释放锁,定时任务结束", MACHINE_ID);
    28. }
    29. } else {
    30. log.info("节点:{}获取锁失败,放弃定时任务", MACHINE_ID);
    31. }
    32. }
    33. /**
    34. * 任务主体:
    35. * 1.从数据库拉取符合条件的HR邮箱
    36. * 2.从HR邮箱拉取附件简历
    37. * 3.调用远程服务异步解析简历
    38. * 4.插入待处理任务到数据库,作为记录留存
    39. * 5.把待处理任务的id丢到Redis Message Queue,让Consumer去异步处理
    40. */
    41. private void collectResume() throws InterruptedException {
    42. // 跳过12两步,假设已经拉取到简历
    43. log.info("节点:{}从数据库拉取任务简历", MACHINE_ID);
    44. List<ResumeCollectionDO> resumeCollectionList = new ArrayList<>();
    45. resumeCollectionList.add(new ResumeCollectionDO(1L, "张三的简历.pdf"));
    46. resumeCollectionList.add(new ResumeCollectionDO(2L, "李四的简历.html"));
    47. resumeCollectionList.add(new ResumeCollectionDO(3L, "王五的简历.doc"));
    48. // 模拟数据库查询耗时
    49. TimeUnit.SECONDS.sleep(3);
    50. log.info("提交任务到消息队列:{}", resumeCollectionList.stream().map(ResumeCollectionDO::getName).collect(Collectors.joining(",")));
    51. for (ResumeCollectionDO resumeCollectionDO : resumeCollectionList) {
    52. // 上传简历异步解析,得到异步结果id
    53. Long asyncPredictId = asyncResumeParser.asyncParse(resumeCollectionDO);
    54. // 把任务插入数据库
    55. // 略...
    56. // 把任务丢到Redis Message Queue
    57. ResumeCollectionDTO resumeCollectionDTO = new ResumeCollectionDTO();
    58. BeanUtils.copyProperties(resumeCollectionDO, resumeCollectionDTO);
    59. resumeCollectionDTO.setAsyncPredictId(asyncPredictId);
    60. redisService.pushQueue(RedisKeyConst.RESUME_PARSE_TASK_QUEUE, resumeCollectionDTO);
    61. }
    62. }
    63. /**
    64. * 项目重启后先尝试删除之前的锁(如果存在),防止死锁等待
    65. *
    66. * @param event the event to respond to
    67. */
    68. @Override
    69. public void onApplicationEvent(ContextRefreshedEvent event) {
    70. redisService.releaseLock(RedisKeyConst.RESUME_PULL_TASK_LOCK, MACHINE_ID);
    71. }
    72. }

    模拟第三方服务(异步)

    1. /**
    2. * 第三方提供给的简历解析服务
    3. *
    4. * @author mx
    5. */
    6. @Service
    7. public class AsyncResumeParser {
    8. @Autowired
    9. private ObjectMapper objectMapper;
    10. /**
    11. * 模拟分配异步任务结果id,不用深究,没啥意义,反正每个任务都会得到一个id,稍后根据id返回最终解析结果
    12. */
    13. private static final AtomicLong ASYNC_RESULT_ID = new AtomicLong(1000);
    14. /**
    15. * 解析结果
    16. */
    17. private static final Map<Long, String> results = new HashMap<>();
    18. /**
    19. * 模拟第三方服务异步解析,返回解析结果
    20. *
    21. * @param resumeCollectionDO
    22. * @return
    23. */
    24. public Long asyncParse(ResumeCollectionDO resumeCollectionDO) {
    25. long asyncPredictId = ASYNC_RESULT_ID.getAndIncrement();
    26. try {
    27. String resultJson = objectMapper.writeValueAsString(resumeCollectionDO);
    28. results.put(asyncPredictId, resultJson);
    29. return asyncPredictId;
    30. } catch (JsonProcessingException e) {
    31. e.printStackTrace();
    32. }
    33. return -1L;
    34. }
    35. /**
    36. * 根据异步id返回解析结果,但此时未必已经解析成功
    37. *

    38. * 解析状态
    39. * 0 初始化
    40. * 1 处理中
    41. * 2 调用成功
    42. * 3 调用失败
    43. *
    44. * @param asyncPredictId
    45. * @return
    46. */
    47. public PredictResult getResult(Long asyncPredictId) throws ParseErrorException, InterruptedException {
    48. // 随机模拟异步解析的状态
    49. int value = ThreadLocalRandom.current().nextInt(100);
    50. if (value >= 85) {
    51. // 模拟解析完成
    52. TimeUnit.SECONDS.sleep(1);
    53. String resultJson = results.get(asyncPredictId);
    54. return new PredictResult(resultJson, 2);
    55. } else if (value <= 5) {
    56. // 模拟解析异常
    57. TimeUnit.SECONDS.sleep(1);
    58. throw new ParseErrorException("简历解析异常");
    59. }
    60. // 如果时间过短,返回status=1,表示解析中
    61. TimeUnit.SECONDS.sleep(1);
    62. return new PredictResult("", 1);
    63. }
    64. }
    65. /**
    66. * 解析异常
    67. *
    68. * @author mx
    69. */
    70. public class ParseErrorException extends Exception {
    71. /**
    72. * Constructs a new exception with {@code null} as its detail message.
    73. * The cause is not initialized, and may subsequently be initialized by a
    74. * call to {@link #initCause}.
    75. */
    76. public ParseErrorException() {
    77. }
    78. /**
    79. * Constructs a new exception with the specified detail message. The
    80. * cause is not initialized, and may subsequently be initialized by
    81. * a call to {@link #initCause}.
    82. *
    83. * @param message the detail message. The detail message is saved for
    84. * later retrieval by the {@link #getMessage()} method.
    85. */
    86. public ParseErrorException(String message) {
    87. super(message);
    88. }
    89. }
    90. /**
    91. * 第三方返回值
    92. *
    93. * @author mx
    94. */
    95. @Data
    96. @NoArgsConstructor
    97. @AllArgsConstructor
    98. public class PredictResult {
    99. /**
    100. * 解析结果
    101. */
    102. private String resultJson;
    103. /**
    104. * 解析状态
    105. * 0 初始化
    106. * 1 处理中
    107. * 2 调用成功
    108. * 3 调用失败
    109. */
    110. private Integer status;
    111. }

    模拟异常

    在项目运行过程中,启动这个测试类的方法,即可观察不一样的现象。

    1. @SpringBootTest
    2. class RedisDistributedLockApplicationTests {
    3. @Autowired
    4. private RedisService redisService;
    5. /**
    6. * 作为失败案例(因为不存在777L这个解析任务,AsyncResumeParse.results会返回null)
    7. * 观察RedisMessageQueueConsumer的处理方式
    8. */
    9. @Test
    10. void contextLoads() {
    11. ResumeCollectionDTO resumeCollectionDTO = new ResumeCollectionDTO();
    12. resumeCollectionDTO.setId(666L);
    13. resumeCollectionDTO.setAsyncPredictId(777L);
    14. resumeCollectionDTO.setName("测试1号");
    15. redisService.pushQueue(RedisKeyConst.RESUME_PARSE_TASK_QUEUE, resumeCollectionDTO);
    16. }
    17. }

    pom.xml

    1. server:
    2. port: 8080
    3. spring:
    4. redis:
    5. host:
    6. password:
    7. database:

    效果展示

    啥都不说了,都在代码里了。大家自己拷贝到本地,动手玩一下,加深对Redis锁和Redis消息队列的理解。

    只有一个定时任务能去数据库拉取任务,到时多节点部署大致是下面这样(redis一般是独立部署的,和节点代码无关):

    后话

    上面展示的代码其实存在很多问题,我们会在下一篇指出并讨论解决方案。

    本文仅提供思路,开阔大家的眼界,千万别在自己项目中使用!!!!我当年被这个坑惨了,花里胡哨的,尤其Consumer里一大堆的sleep(),是非常low的!!

    对于异步调用的结果,不要循环等待,而应该分为几步:

    1. 调用异步接口,得到异步结果唯一id
    2. 将结果id保存到任务表中,作为一个任务
    3. 启动定时任务,根据id拉取最终结果(如果还没有结果,不更改状态,等下一个定时任务处理)

    分布式定时任务可以考虑xxl-job或elastic-job,分布式锁推荐使用redisson。

  • 相关阅读:
    如何使用VS创建QVTKOpenGLNativeWidget应用
    unity基础1-事件执行顺序、自定义事件
    【Vue五分钟】五分钟了解--Vue过渡
    拓展认知边界:如何给大语言模型添加额外的知识
    基于FPGA的ECG信号采集,存储以及传输系统verilog实现
    【CSDN创作活动】——竞赛那些事
    gtk显示4通道rgba图像
    基于Matlab实现logistic方法(源码+数据)
    手把手教你CSP系列之style-src
    面试题53:vue数据的双向绑定原理(如何实现vue的双向绑定)
  • 原文地址:https://blog.csdn.net/smart_an/article/details/134436847