• 任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK


    任务调度框架

    Java中如何实现定时任务

    比如:
    1.每天早上6点定时执行
    2.每月最后一个工作日,考勤统计
    3.每个月25号信用卡还款
    4.会员生日祝福
    5.每隔3秒,自动提醒

    10分钟的超时订单的自动取消,每隔30秒或1分钟查询一次订单,拿当前的时间上前推10分钟
    定时任务,资源会有误差的存在,如果使用定时任务
    定时任务,用于统计的时候最多。

    自动统计考勤,一般0点之后开始统计,可以使用定时任务

    nacos心跳

    晚上要求和采购部门生成采购单,达到最低预警值的时候,去发给采购部门

    我们可以通过任务调度框架实现上述的需求
    任务调度框架,可以实现定时任务,实现间隔多少时间的重复执行,实现指定日期的重复执行

    电商自动好评,间隔时间长的,误差几分钟,影响不大。

    java中任务调度框架:
    1、Spring Task spring自带的
    2、Quartz 古老的框架
    3、XXL-Job
    4、第三方云平台-比如说:阿里云-SchedulerX等等
    选择一个:Spring Task
    2个注解+
    包:task任务、job

    使用步骤:
    1、开关类,使用注解
    @EnableScheduling // 开启任务调度
    在这里插入图片描述
    2、定义定时任务

    @Scheduled(cron = "0/3 * * * ?")
    
    
    • 1
    • 2

    CORN表达式:
    秒 分 时 日 月 星期几 年 其中,只有年可以省略
    定时任务,需要重复执行的方法

    CORN表达式:
    特殊字符串,主要用来描述时间的,用于任务调度等
    https://cron.qqe2.com/

    / 间隔
    - 是连续
    , 枚举值
    L 最后,星期、日中用
    W 有效工作日
    LW 某个月最后一个工作日
    # 用于确定每个月第几个星期几,母亲节或父亲节
    4#2 某个月的第二个星期三  4代表星期三,中文的时候有可能不影响
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述

    在这里插入图片描述

    项目名:SpringTask01
    Spring Web、Lombok

    RabbitMq实现延迟:

    死信+延迟消息处理

    死信:RabbitMQ的队列中的消息,满足以下条件任意其一就会成为死信消息:
    1.消息被拒绝
    2.消息过期
    3.队列满了
    死信交换器:专门用来转发队列中的死信消息,将死信消息转发到指定的队列中

    十分钟未支付,取消订单?
    十分钟之后,消息会过期,过期后,通过死信交换器转发队列中的死信消息,将死信消息转发到指定的队列中,由消费者去处理。

    我们可以通过死信+死信交换器实现延迟消息处理
    RabbitMQ实现延迟消息处理有2种方式:
    1、死信+死信交换器 代码实现(可控,更方便一些)
    2、延迟消息插件

    1、死信+死信交换器 代码实现(可控,更方便一些)
    发送消息,到队列1,(产生延迟队列,产生死信)
    在这里插入图片描述
    一个消息,过一段时间,实现消费。

    核心:
    1.队列 是2个队列,第1个队列: 目的 产生死信(1.设置有效期2.不设置消费者),借助死信交换器,把产生的死信发到指定的队列中
    第二个对了:目的 消费死信,这里获取的信息,时间就是延迟的,延迟的就是上面的有效期
    2.1个交换器
    死信交换器,整个系统一般就一个,可以用来转发各个功能产生的死信,是Direct类型的交换,通过RK进行消息匹配到对应的队列中。
    RabbitMQ的消息的有效期有2种设置方式:
    1.设置队列上的有效期,整个队列中所有消息都使用
    2.可以在每个消息上设置有效期,这种适用于有多个不同有效期的消息

    在这里插入图片描述
    如果队列和消息都有有效期,谁短听谁的。

    代码:
    RabbitMQ02
    实现:
    1.pom

    
            
                org.springframework.boot
                spring-boot-starter-web
            
    
            
                org.projectlombok
                lombok
                true
            
            
                org.springframework.boot
                spring-boot-starter-test
                test
            
            
            
                org.springframework.boot
                spring-boot-starter-amqp
            
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    2.代码:
    config-RabbitMQConfig

    package com.yd.rabbitmq02.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    
    /**
     * @author MaoXiqi
     * @organization: Lucky
     * @create 2023-10-16 11:35:54
     * @description RabbitMQConfigApi
     */
    @Configuration
    public class RabbitMQConfig {
        // 1.创建2个队列
        @Bean
        public Queue createQ1() {
            // 1.设置队列 内部消息有效期 设置死信交换器 设置RK
            HashMap<String, Object> params = new HashMap<>();
            // 设置队列中每个消息的有效期 单位 毫秒
            params.put("x-message-ttl", 3000);
            // 设置对应的死信交换器
            params.put("x-dead-letter-exchange", "dead-ex-yd");
            // 设置交换器匹配的路由名称
            params.put("x-dead-letter-routing-key", "test");
            return QueueBuilder.durable("dl-q01").withArguments(params).build();
        }
        @Bean
        public Queue createQ2() {
            return new Queue("dl-q02");
        }
        // 2.创建1个交换器(1.fanout 2,direct 3.topic 4.header)-死信交换器direct类型
        @Bean
        public DirectExchange createDe() {
            return new DirectExchange("dead-ex-yd");
        }
        // 3.创建1个绑定
        @Bean
        public Binding createBd1(DirectExchange de) {
            return BindingBuilder.bind(createQ2()).to(de).with("test");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    在这里插入图片描述

    2.1.创建两个队列
    1、设置队列,内部消息有效期 设置死信交换器 设置RK 注意:参数名是固定的,值根据业务需求去改,值 单位是毫秒

    2.2创建1个交换器(1.fanout 2.direct 3.tipic
    4.header)-死信交换器direct类型

    2.3.创建一个绑定

    controller-DeadController

        @GetMapping("send")
        public String sendDead(String msg) {
            System.out.println("发送消息," + msg + ",发送时间:" + System.currentTimeMillis());
            template.convertAndSend("", "dl-q01", msg);
            return "ok";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    发送
    在这里插入图片描述
    监听消息-主要是为了消费:
    listener-DeadListener

        @RabbitListener(queues = "dl-q02")
        public void handler(String m) {
            System.out.println("延迟消息,"+m+",接受时间:" +System.currentTimeMillis());
        }
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述
    yml:

    spring:
      rabbitmq:
        host: 121.36.5.100
        port: 5672
        username: guest
        password: guest
    server:
      port: 8082
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    测试:
    在这里插入图片描述

    RabbitMQ事务:

    数据库中事务:保证数据一致性,特别是多个操作要么都成功,要么都失败
    RabbitMQ事务:一次性发多条消息,需要开启事务,
    RabbitMQ也有自己的事务,如果本次在这里插入图片描述

    在这里插入图片描述

    使用步骤:

    源码:RabbitMQ02
    1.创建配置类
    2.使用基于事务发送
    3.监听消息

    1.创建配置类
    config-RabbitMQTranConfig
    1.准备一个队列
    2.创建事务管理器

    // 创建事务管理器
        @Bean
        public RabbitTransactionManager createTran(ConnectionFactory factory) {
            return new RabbitTransactionManager(factory);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    2.controller-TranController
    开启事务
    发送消息

        // 事务
       @Transactional // 需要开启SpringBoot的事务机制
       @GetMapping("sendmsg")
       public String sendMsg(String msg, int count) {
           // 开启 RabbitMQ的通道的事务
           template.setChannelTransacted(true);
           // 发送消息
           for (int i = 0; i < count; i++) {
               template.convertAndSend("", "yd-tran-q01", msg + "--" + count);
               // 出错,看看 事务是否生效
               if (i==2) {
                   System.out.println(1/0);
               }
           }
           return "ok";
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在这里插入图片描述

    3.listener-DeadListener

        // 事务
        @RabbitListener(queues = "yd-tran-q01")
        public void handler2(String msg) {
            System.out.println("监听消息"+msg);
            // 处理业务逻辑 出错了
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    在这里插入图片描述

    手动ACK

    RabbitMQ怎么防止消息丢失:
    1.发送端没有发送过去
    解决:
    1.用事务
    2.confirm消息确认机制 万能:转人工处理
    2.MQ服务器丢失,MQ服务器蹦了,
    解决:开启持久化
    3.消费端消息丢失:
    解决:自动应答,改成开始手动ACK

    消息确认机制:默认是自动确认

    消息的发送和接收是异步

    RabbitMQ如何防止消息丢失:
    1.
    代码:
    config-RabbitMqTranConfig

        //消费消息的⼿动应答
        @Bean
        public Queue createQ4() {
            return new Queue("yd-ack-q01");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    controller-DeadController

        // 事务
        @Transactional // 需要开启SpringBoot的事务机制
        @GetMapping("sendmsg")
        public String sendMsg(String msg, int count) {
            // 开启 RabbitMQ的通道的事务
            template.setChannelTransacted(true);
            // 发送消息
            for (int i = 0; i < count; i++) {
                template.convertAndSend("", "yd-tran-q01", msg + "--" + count);
                // 出错,看看 事务是否生效
                if (i==2) {
                    System.out.println(1/0);
                }
            }
            return "ok";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    listener-DeadListener
    一般设置个上限,比如最多三次

        @RabbitListener(queues = "yd-ack-q01")
        public void handler3(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
            //消费者获取消息,默认采用的自动应答,就是获取就应答,这样MQ服务器就删除消息
            //还可以手动应答:结果:1.成功(MQ删除)2.失败(MQ消息)
            System.out.println("收到ACK消息,监听消息:" + msg);
            //拒绝消息 参数说明:1.消息id 2.结果 true 成功 false 拒绝 3.是否把消息添加回队列中
            channel.basicNack(tag,false,true);
    
            //成功消息 参数说明:1.消息id 2.结果 true 成功 false 拒绝
            //channel.basicAck(tag,true);
            // 处理业务逻辑 出错了
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述
    在这里插入图片描述

    消费消息的手动应答:
    RabbitMQ默认的消费者消息获取模式采用的是手动应答
    但是这种有缺陷,可能会出现,消息获取了但是业务出了问题,导致MQ也自动删除了消息,最终导致业务没有执行
    所以为了解决这种问题,可以开启手动应答模式,结合自己的业务执行情况,如果业务执行成功,那么就成功应答,如果失败了,就拒绝消息,同时把消息再加回队列,这样就可以再次消息再次处理(加个上限)
    在这里插入图片描述
    测试
    在这里插入图片描述

    RabbitMQ如何保证消息的幂等性:
    幂等性就是重复消费。
    解决:
    1.生成一个全局id,存入redis或者数据库,在消费者消费消息之前,查询一下该消息是否有小费过。
    2.
    在这里插入图片描述

    用户充值,重复消费,相当如充值了多次,是一定要杜绝的。

    在这里插入图片描述

    不要返回值的时候,可以用RabbitMQ替代OpenFegin,因为消费完就不回了。MQ默认是单向的
    短信发信可以用MQ,这个业务要做,做起来可能会很耗时,中间要经过运营商,过程不可控

    RabbitMQ应用场景
    消息通信,发送消息和接收消息,是异步
    1.实现服务通信
    用在微服务中,实现2个服务的通信,这种不带返回值的,只是为了执行另一个服务的方法执行
    2.解决耗时操作
    比如:邮件、短信、第三方接口等,比较耗时
    3.解耦
    4.提升性能
    5.重复代码封装
    6.削峰填谷(订单先下到redis中,再通过MQ和延迟队列,慢慢的从redis搬到mysql中)

    关键词:异步、解耦、延迟

  • 相关阅读:
    HTML5语义化标签解释说明
    聚L-精氨酸/纳米金/石墨烯/聚苯胺复合膜/铝粉/稀土粒子修饰多巴胺的制备
    来也科技飞扬季笔试 2023 届秋招专场 java
    深入浅出理解SVM支持向量机算法
    高精地图,养不起的 「 奢侈品 」
    Linux下的系统编程——信号(十一)
    Python安装selenium时报错:ERROR: No matching distribution found for selenium 附解决方法
    Android平台轻量级RTSP服务模块编码前后数据源对接探究
    现代C++学习指南-方向篇
    java基于Springboot+vue的在线听歌音乐网站与分享平台 elementui
  • 原文地址:https://blog.csdn.net/Sky_MaoXiaoqi/article/details/133852487