• 【分布式】分布式中间件RabbitMQ、 cache注解


    分布式


    分布式 消息中间件 — RabittMQ


    前面Cfeng列举了抢红包的秒杀式系统,可以看到要想抗住瞬时高并发,必须借助缓存的help,因为数据库是扛不住秒级千万级请求的,只是这里的秒杀不是先查询再进Cache,而是先进Cache后才利用相关的DetailService进入数据库 【使用的#@EnableAsync直接启用多线程异步存入数据库】

    流量带来的除了高并发(数据库压力),还有就是接口阻塞的高延迟

    在介绍RabbitMQ之前,再大概叙述一下SpringBoot中使用缓存的其他用法

    SpringBoot 缓存

    首先,需要明确,处理远程的缓存Cache的Redis,还有本地缓存Caffeine,提升系统插叙性能,不经常变动的信息最适合放在缓存中【redis、本地缓存】,Spring提供了完整的Cache解决方案,直接使用注解可以方便实现数据和方法结果的缓存

    @EnableCaching 启用缓存

    开启缓存功能,一般在配置类中加入该注解,其余的比如@EnableSecurity,@EnableAsync,使用该注解之后,其他注解才生效,是通过aop拦截加入Cache功能

    @Cacheable 赋予缓存 【一次】

    该注解可以放在缓存上,也可以放在类上【表明所有方法支持环迅】,Spring会在其调用之后将结果缓存起来,下次利用同样参数访问,不再执行直接返回结果,键值对 【key支持的策略可以自定义】

    • value: 或者cacheNames 都可以指定Cache的名词,一个Cache可以看作Redis的一个库,指定在哪个库
    • key: 自定义方法返回值在Cache存储的Key,默认策略SimpleKeyGenerator,可以使用root对象生成key
    • condition : 缓存使用条件, true时走缓存,false直接执行方法,不缓存结果
    • unless: 结果缓存与否,调用方法后才执行,condition为空或者true,才会再次判断unless,unless为false,则结果入Cache

    @CachePut 结果放Cache

    @CachePut可以标注在类或者方法上,被标注的方法每次都会被调用, 执行完毕后,结果丢入缓存,但是抛出的时候,condition为false,unless为true不会进入

    @CacheEvict 缓存清理

    @CacheEvict可以标注在类和方法上,标注在类上,相当于所有方法加上该注解,当标注的方法被调用的时候,会清除指定的缓存, 默认清除CacheNames指定的缓存中key参数指定的信息,如果allEntries为true,清除cacheNames库中所有的信息

    @Caching 缓存注解组【复合注解 上all】

    当want在一个方法上同时使用@Cacheable、@CachePut和@CacheEvict时,可以直接使用@Caching这个复合注解实现

    @CacheConfig 提取公共配置

    这个注解标注在类上,将其他的几个注解(@Cacheable、@CachePut、@CacheEvic)公共参数提取出来放在@CacheConfig中

    比如cacheNames、cacheManager、cacheResolver等参数如果都一样,就可以使用@CacheConfig指定,当然还是可以在注解中进行覆盖的

    缓存功能就是依靠aop实现,通过Cache拦截器的invoke拦截所有的待缓存的方法执行; 本地缓存的弱点就是分布式项目无法共享【同步锁也是只能单机】,可以搭建Redis集群实现Cache共享

    Redis作为缓存 cacheNames --> Hash

    当引入Redis后,在配置Cahce时,同样还是@EnableCaching,只是配置CacheManager,不再使用ConcurrentHashMapCacheManager等,而是使用Redis的相关的CacheManager,比如RedissionSpringCacheManager

    选择数据库后,指定缓存的CacheNames ----- 对应的就是Redis一个库中的Hash存储,一个Cache对应库中的一个Hash,上面的那些注解缓存的方法返回值都是放在库中Hash对应的Map中

    RabbitMQ

    MVVM架构最后的Middleware起重要作用,Redis作为高并发请求解决方法,能够提高查询的性能,降低频率,减轻压力

    但随着流量的增长,带来的不只是高并发下数据库的压力,还有就是传统应用系统接口和服务处理模块都是 采用高耦合和同步的方式,导致接口由于线程阻塞延长了整体的响应时间 — 高延迟

    RabbitMQ时目前应用最广泛的分布式消息中间件,在微服务中发挥重要作用,可以通过异步通信的方式降低应用系统接口层面的整体响应时间; 除此之外,RabbitMQ适用于多种场景:

    • 业务服务模块解耦
    • 异步通信
    • 消息分发
    • 高并发限流
    • 超时业务和数据延迟处理

    RabbitMQ intro

    RabbitMQ作为分布式消息中间件,实现了高级消息队列协议 AMQP,(advanced message queuing protocol) ,执行单节点或者多节点集群的部署,满足目前高并发、大规模和高可用的要求

    https://www.rabbitmq.com 可以看到官方的介绍,开源的消息代理中间件,主要就是存储分发消息【之前Cfeng的STOMP协议就是其中一种】,异步通信,解耦业务模块; 其服务端采用Erlang开发,支持多种编程语言调用其API接口, 可以通过其后端管理控制台 实现RabbitMQ的队列、交换机、路由、消息和服务节点的管理,可以通过客户端管理响应的用户 — 分配操作权限和数据管理权限

    典型应用场景

    RabbitMQ在实际生产环境中具有很广泛的应用

    异步通信和服务解耦

    在用户注册 的实际场景中,传统的企业级应用处理用户注册的流程:

     | ->   输入用户名、邮箱、密码等信息 --> 校验信息 ->  写入数据库【用户表】 ----|
    用户            
    |<-- 提示用户注册成功 <-- 发送短信确认 <--短信验证 --------------------------|
    
    • 1
    • 2
    • 3

    用户输入信息后,点击注册之后就提交后台进行处理, 后台会对信息进行校验,比如验证码,校验成功后写入数据库表,除非注册时直接使用手机号,那么后面还需要验证手机号的合法性,【可以调用 相关的第三方服务发送验证码】 , 通过之后才会提示注册成功 (cfeng目前还没有验证手机 ----> 所以注册流程很快)

    从流程中可以发现: 用户点击注册按钮之后 ,需要经过漫长的等待时间, 写入数据库 + 邮箱验证 + 短信确认 所有的时间之和,如果邮箱和短信出现问题,那么流程终止, 所以这里可以优化一下,异步方式 ---- ( 就像之前Cfeng的抢红包系统 写入数据库的操作异步方式, 如果按照传统的同步方式,那么要写入数据库后才返回给用户,但是这里实际就可以异步操作的,这里的异步是多线程,和异步通信不同)

    核心的业务逻辑在于“判断用户注册信息的合法并将信息写入数据库”,而发送邮件和短信验证不属于用户注册的核心流程, 所以可以将这个服务解耦,采用RabbitMQ进行异步通信

     | ->   输入用户名、邮箱、密码等信息 --> 校验信息 ->   ----|  ----异步-->   发送邮件给用户
    用户            
    |<-- 提示用户注册成功 <---写入数据库【用户表】-------------|  ----异步-->   发送短信进行确认
    
    • 1
    • 2
    • 3

    就是将同步的线路 一条线走到底 的业务服务进行解耦 【但是这里不是采用多线程@EnableAsync】,而是采用RabbitMQ进行异步通信,系统的响应时间就 只是有写入数据库 的时间,整体响应时间降低了,实现了低延迟

    接口限流和消息分发

    在 商城用户抢购商品 场景,商城会开启抢购秒杀活动

    用户会守株待兔,商品数量有限,同时点击,巨大的流量【Cfeng介绍的抢红包系统也是同样的瞬时高并发】

    在这里插入图片描述

    按照这里的流程来看,和之前Cfeng介绍的红包秒杀系统完全是类似的, 首先就是开始时刻蜂拥的 巨大的请求流量【Cfeng压测的1000,实际可能上亿】,就算是缓存可能也hold不了

    传统方式就是巨大的流量直接到达后端,获取到库存之后,判断库存(total),当前用户抢到之后,异步方式告知用户抢到商品,同时需要异步更新库存,同时需要将当前的记录写入数据表

    仔细分析 —> 和Cfeng的红包秒杀相比; 这里需要先查询数据库得出库存,同时需要减1更新【这里如果没有🔒会出现问题】,数据不一致,等待时间长,系统挂掉, 这种处理方式只能适用于请求量很少的情况,高并发则不行 【使用Cache优化、RabbitMQ引用】

    在这里插入图片描述

    使用RabbitMQ则可以在前后端交互的时候,由RabbitMQ顶住所有的巨大的流量,转移流量,放入排队入队列,异步限流,同时之前的异步操纵由RabbitMQ接管,异步分发

    • 接口限流: 前端产生高并发请求,不会直接到达后端,而是像高峰期地铁限流操作一样,按照到达顺序排队【加入RabbitMQ的队列】,接口限流
    • 消息异步分发: 商品库存充足时,当前用户可以抢到商品,之后异步通信发短信等告知用户抢购成功,告知用户尽快付款,实现异步分发
    业务延迟处理

    在 “ 春运12306抢票” 场景, 就需要业务延迟处理

    还是上面的商品业务,用户点击之后,还是需要进行订单确认付款的,如果24小时未支付则取消订单,当使用12306抢到火车票之后,12306官方会提醒用户在30分钟内付款,正常情况下用户会立刻付款,输入支付密码,官方通过短信告知付款成功

    但是有点用户过半小时还是没有支付车票,导致系统自动取消该订单,延迟处理业务在付款场景十分常见,传统应用,就是采用定时器去定时获取没有付款的订单,判断时间是否超过30分钟,【类比验证码的比较,使用Time比较即可】

    在这里插入图片描述

    可以看到这里就是商品会设置一个字段记录付款状态,1代表已付款,0代表未付款,可以开启一个新的线程异步扫描购票订单数据库表(订单),找出其中状态为0的订单,根据当前时间和创建时间推算,如果超过,则直接废弃,将商品返回供其他用户调用

    可以看到这里的关键 ---- 定时器10s扫描订单表,当高并发、大数据量场景,定时器需要频繁扫描数据库表,数据量非常大,大批量的用户不付款,那么数据量一直增加,带来巨大的压力,压垮数据库 【早期的抢票软件春运高峰期进程网站崩溃,一直不响应,就是因为数据量过大,导致内存、CPU、网络和数据库服务等负载过高引起】

    在这里插入图片描述

    RabbitMQ引入,优化了业务延迟处理,不需要再使用定时器扫描数据库表,依靠RabbitMQ的延迟队列即可

    延迟队列: 可以延迟一定时间再处理的业务逻辑; 付款的延迟处理都是可以蚕蛹RabbitMQ来处理优化的

    RabbitMQ 后端控制台

    为在项目中使用RabbitMQ,需要在本地安装RabbitMQ

    emmm… 按照常规途径下载极其缓慢,所以Cfeng这里就直接在服务器上面时候使用docker拉取

    docker search rabbitmq
    
    docker pull rabbitmq
    
    docker run -d --hostname hadoopCfeng -p 5672:5672 -p 15672:15672 f2f051ea4689  
    
    ##进入容器内部下载插件
    docker exec -it 8935172ccc0c bash
    
    rabbitmq-plugins enable rabbitmq_management
    
    exit退出容器
    
    
    
    ### 解决Management API returned status code 500 
    docker exec -it 68c2bc39d55f bash
    
    cd /etc/rabbitmq/conf.d/
    
    echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
    
    exit 退出容器
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    之后就可以在本机访问rabbitMQ的后端控制台了 15672即可访问,同时这里也可以开放15672端口供远程访问

    初始的默认的登录账户密码都是guest

    在这里插入图片描述

    可以在管理位置addUser即可

    • Windows版本RabbitMQ下载

    直接安装应用相比docker拉取实在是慢很多,Cfeng很久都没有直接下载安装的经历了,都是直接网盘下载,速度稍微快点【主要是官网下一半会断掉】

    Erlang下载安装:

    Index - Erlang/OTP 找到安装包下载即可,安装一路Next,为了方便使用其bin的命令,所以配置环境变量

    这里Erlang是要给rabbitmq提供支持的,就像java环境一样,rabbitmq会自动从电脑上查找ERLANG_HOME环境变量的位置, 所以这里先配置home

    新建环境变量
    %ERLANG_HOME%    安装路径
    
    在path后直接跟上   : %ERLANG_HOME%/bin  
    
    • 1
    • 2
    • 3
    • 4

    rabbitmq下载安装

    https://www.rabbitmq.com/download.html 下载安装包,一路next安装,也可以配置其环境变量方便使用rabbitmq命令

    和上面的配置方式一样,当然you 也可以先配置一个HOME的变量【bin上一级目录路径】,在%XX_HOME%/bin即可 rabbitMQ是sbin

    进入sbin目录下载插件:

    rabbitmq-plugins enable rabbitmq_management

    下载之后 : rabbitmq-server start 就可以启动rabiitMQ, 登录localhost:15672即可访问,和linux上是类似的

    可以借助多个rabbitMQ搭建集群,这里Cfeng在虚拟机上和服务器上分别都下载了RabbitMQ

    connections — 链接管理

    channels ---- 通道管理

    exchanges ---- 交换机管理

    queues ----- 队列管理

    admin — 用户管理 : 可以在这里CRUD用户

    消息队列中可以看到队列的详情: 名称、绑定的路由和交换机、持久化策略、消费者实例,还可以清空队列、删除队列等操作

    Spring事件驱动模型 — 传统解决方案

    在没有rabbitMQ之前,构建消息模型、实现消息异步分发和业务模块解耦的解决方案 — Spring事件驱动模型 ApplicationEvent和 ApplicationListener

    事件驱动模型 — 事件驱动范式实现业务模块之间的交互 — > 交互分为同步和异步【事件也是一种消息,比如websocket的连接和断开】,传统模型就是依靠事件驱动实现异步通信的,和RabbitMQ非常相似【发送接收消息一模一样】

    ApplicationPulisher  ------>   ApplicationEvent ---->   ApplicationListener
    发送器 生产者                       事件-消息				 监听者 --- 消费者
    
    • 1
    • 2

    可以参见Cfeng之前的博客了解事件驱动, 模型中3者是绑定在一起的,而rabbitMQ是 生产者 + 消息 + 交换机 + 路由对应队列 + 消费者

    Spring事件驱动 demo ----- 登录成功事件监听

    这里首先先以传统的Spring事件驱动为基础,演示👇场景

    将用户登录成功的消息封装成实体对象,由生产者异步发送给消费者,消费者监听到消息之后进行相关的处理,执行其他业务逻辑 将用户登录成功的消息封装成实体对象,由生产者异步发送给消费者,消费者监听到消息之后进行相关的处理,执行其他业务逻辑 将用户登录成功的消息封装成实体对象,由生产者异步发送给消费者,消费者监听到消息之后进行相关的处理,执行其他业务逻辑

    首先封装登录成功的事件实体LoginEvent, 作为事件消息需要 继承ApplicationEvent

     * 演示传统的Spring事件驱动
     * 封装登录成功的消息实体LoginEvent: 包括登录用户名,时间、IP
     */
    
    @Getter
    @Setter
    @ToString
    public class LoginEvent extends ApplicationEvent implements Serializable {
    
        private static final long serialVersionUID = 2620840985270980987L;
    
        private String userName;
    
        private String logTime;
    
        private String ip; //登陆IP
    
        //需要重写构造方法
        public LoginEvent(Object source) {
            super(source);
        }
    
        public LoginEvent(Object source, String userName, String logTime, String ip) {
            super(source);
            this.userName = userName;
            this.logTime = logTime;
            this.ip = ip;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    消息的通信可以类比之前的STOMP聊天服务; 接下来就是监听消息的监听器消费者Consumer, 需要实现ApplicationListener 并且绑定事件源LoginEvent

     * 事件驱动 -- 消费者
     */
    
    @Component
    @EnableAsync //异步多线程
    @Slf4j
    public class Consumer implements ApplicationListener<LoginEvent> {
    
    
        //监听消费消息
        @Override
        @Async //开启新的线程执行
        public void onApplicationEvent(LoginEvent event) {
            log.info("Spring事件驱动---- 消费者接收消息: {}",event);
            //后续的相关逻辑,比如写入数据库
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    事件【消息】还需要发送者,就像简单的聊天服务,需要发送者将消息发送给消费者,生产者Publisher需要通过ApplicationEventPublisher组件实现消息的发送

     * 事件发送者的生产者
     */
    
    @Component
    @Slf4j
    @RequiredArgsConstructor
    public class Producer {
    
        //消息发送组件Publisher,自动配置的
        private ApplicationEventPublisher publisher;
    
        //发送消息
        public void sendMessage(String userName, String ip) throws Exception {
            //消息实体
            LoginEvent event = new LoginEvent(this, userName,new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),ip);
            //发送消息,触发事件
            publisher.publishEvent(event);
            log.info("spring消息驱动 生产者发送消息 : {}",event);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    消息生产者、消息、消费者都创建OK了,接下来就是测试: 调用生产者发送消息

     * 测试Spring的事件驱动模型, 事件驱动模型主要就是利用ApplicationEventPublisher发送想要发送的消息,封装消息实体,消费者监听这个事件就可以接收事件【消息】进行处理
     */
    
    @SpringBootTest
    public class SpringEventTest {
    
        @Resource
        private Producer producer;
    
        @Test
        public void test() throws Exception {
            //发送消息
            producer.sendMessage("cfeng","127.0.0.1");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    不要设置同名,@Resource先按照名称注入

    2022-09-15 14:54:03.336  INFO 2112 --- [         task-1] c.s.springEventDriven.consumer.Consumer  : Spring事件驱动---- 消费者接收消息: LoginEvent(userName=cfeng, logTime=2022-09-15 14:54:03, ip=127.0.0.1)
    2022-09-15 14:54:03.335  INFO 2112 --- [           main] c.s.springEventDriven.producer.Producer  : spring消息驱动 生产者发送消息 : LoginEvent(userName=cfeng, logTime=2022-09-15 14:54:03, ip=127.0.0.1)
    
    • 1
    • 2

    传统模型的消息交互就是生产者利用ApplicationEventPublisher 触发事件(消息), 消费者就可以监听消息,获取消息处理, 这里的监听操作是设置异步方式,也就是SE中Cfeng的小demo的多线程监听,Spring简化了开启线程的操作,注解即可搞定

    接下来将使用具体的代码来实现SpringBoot操作RabbitMQ,完成各项功能🌳

  • 相关阅读:
    spring boot 面试题
    2024最新群智能优化算法:人工原生动物优化器(Artificial Protozoa Optimizer ,APO))求解23个函数,MATLAB代码
    公众号选题方向有哪些?
    HBase(超级无敌详细PROMAX讲解版)
    【数据结构】图的实现
    python函数式编程
    第一百六十回 SliverPadding组件
    【PAT甲级 - C++题解】1007 Maximum Subsequence Sum
    java健身房管理系统设计计算机毕业设计MyBatis+系统+LW文档+源码+调试部署
    linux 安装mysql8.0 超详细教程(实战多次)
  • 原文地址:https://blog.csdn.net/a23452/article/details/126871976