• RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)


    RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)

    1. RabbitMQ简介及AMQP协议

    • 开源的消息代理和队列服务器。基于AMQP(Advanced Message Queuing Protocol 高级消息队列协议)。
    • 底层基于Erlang语言编写;
    • 开源,性能优秀,稳定性保障;
    • 与SpringAMQP完美的整合,API丰富
    • 集群模式丰富,表达式配置,HA模式(负载均衡),镜像队列模型
    • 保证数据不丢失的前提做到高可靠性、可用性

    Publisher
    Server
    Virtual Host
    Exchange
    Queue
    Consumer

    2. RabbitMQ安装及使用

    在这里插入图片描述

    启动 
    rabbitmq-server start &
    rabbitmq 默认端口号5672,查看是否启动
    lsof -i:5672
    安装插件rabbitmq-management后才能登录
    查看插件列表
    rabbitmq-plugins list
    启动插件
    rabbitmq-plugins enable rabbitmq-management
    rabbit-management管控台的端口号15672
    浏览器访问:http://localhost:15672/
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3. RabbitMQ核心概念

    Producer
    Consumer
    Exchange

    AMQP核心概念

    • Server:又称Broker,接受客户端的了解,实现AMQP实体服务;
    • Connection:连接,应用程序与Broker的网络连接;类似pg的jdbcTemplate
    • Channel:网络信道,几乎所有的操作都在Channel中进行。Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。类似pg的Session。
    • Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级,延迟等高等特性;Body则就是消息体内容。
    • Virtual host:虚拟地址,用于进行逻辑分离,最上层的消息路由。 一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange和Queue。每一个Virtual Host是独立的单元,用于隔离不同的项目应用;
    • Exchange:交换机,接受生产者的消息,根据路由键转发消息到绑定的队列;
    • Binding:Exchange和Queue之间的虚拟连接,bingding中可以包含routing key;
    • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息;
    • Queue:也成为Message Queue,消息队列,保存消息并将它们转发给消费者;

    在这里插入图片描述

    4. 与SpringBoot2.x整合-急速入门

    1. 引入相关pom
    2. 配置application.yml

    3. 对象Message 实现接口 implements Serilizable
    4. Producer,RabbitTemplate自动装配@Autowired
    消费者自动监听 @RabbitHandler @RabbitListener
    5. 生产者发消息:converAndSend(exchange, routeKey, MessageObject, CorrelationData); 交换机,路由key,消息体内容,消息唯一id

    消费者监听消息:
    @RabbitListener指定bindings,queue,exchange,routingKey. 这个注解强大的地方是当交换机,队列,绑定关系,路由键不存在时自动创建;

    	@RabbitListener(bindings= @QueueBinding(
    	value=@Queue(value="order-queue",durable="true"),
    	exchange= @Exchange(name="order-exchange",durable="true",type="topic",
    	key="order.*")
    	)
    	@RabbitHandler
    	public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel){ #**消息体,headers,通道**
    		// 消费者操作
    		// ...
    		Long deliveryTag = (Long)headers.get(AmqpHdears.DELIVEY_TAG);
    		// 当配置为手动签收时,必须要手动ACK,否则管理控制台即使消费了还是显示队列有数据。
    		channel.basicAck(deliveryTag,false); // 手动确认提交
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    可以在管理界面:http://xx.xx.xx.xx:15672/ 创建交换机,队列,交换机和队列进行绑定,或者删除;

    路由key RoutingKey: order.*, order.#匹配差别;
    order.#: order.111 order.111.abcd 只能匹配前者;
    order.#: order.111 order.111.abcd 都能匹配;

    <!--rabbitmq依赖-->
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    # springboot整合rabbitmq基本配置
    spring.rabbitmq.addresses=xx.xx.xx.xx:5672
    # address相当于host,port
    # spring.rabbitmq.host=xx.xx.xx.xx
    # spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    # 保证100%投递模式
    # 发送消息后等待消息确认
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    
    # springboot整合rabbitmq消费端配置
    # 并发数
    spring.rabbitmq.listener.simple.concurrency=5
    # 自动签收 NONE AUTO manual
    spring.rabbitmq.listener.simple.acknowledge-model=AUTO
    spring.rabbitmq.listener.simple.max-concurrency=15
    spring.rabbitmq.listener.simple.idle-event-interval=10000
    spring.rabbitmq.listener.simple.retry.enabled=true
    # 高峰期海量数据压过来,则进行限流,表示最大100条
    #spring.rabbitmq.listener.simple.prefetch=100
    # 不建议事务,性能太差了
    #spring.rabbitmq.listener.simple.transaction-size=1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    5. 保证100%的消息可靠性投递方案落地实现

    蓝色框为生产者
    红色框为消费者
    为保证100%生产者消息投递成功,当Sender在收到需要发送的消息时

    1. 先存储到业务数据库Biz DB;
    2. 存储到消息数据库 MSG DB,修改状态为0;
    3. 发送消息到Broker
    4. 等待Broker返回确认收到的状态
    5. 更新MSG DB中消息状态为1;
    6. 对于网络闪断/超时等长时间未收到状态返回,则更新状态为失败-1;多次时可更新状态为2进行人工确认等。
    7. 定时任务从MSG DB查询对失败的消息-1进行重新发送;

    这种情况只能保证100%发送到队列,但可能会重复推送,需要消费者业务端自动去确认收到重复消息的处理。
    在这里插入图片描述

    3.2 pg 预先把消息存进message记录表

    记录消息体,重试次数,投递状态,下一次投递时间或者超时时间,下次重试投递时间;
    在这里插入图片描述在这里插入图片描述

    3.3 定时任务补救重发

    在这里插入图片描述在这里插入图片描述

    3.4 RabbitMQOrderSender 发送消息确认

    在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述
    在这里插入图片描述

    3.5 使用3部曲

    生产者:

    1. pom
    2. yml配置(host port username password virtualhost
      生产者发送时指定交换机,路由键发送;
      消费者可以直接使用注解指定交换机,路由键,队列不存在会自动创建;)
    3. @RabbitTemplate
    4. 当在docker部署时,存在超时访问不到,可以在mq队列把docker机器的网段配置上就ok

    参考

  • 相关阅读:
    Win10怎么重启资源管理器?重启资源管理器快捷键是什么
    西班牙学者研究利用超材料进行光量子计算
    一个新工具 nolyfill
    神经网络的研究与应用论文,有关神经网络的论文
    issac gym安装与运行 (一)
    如何部署会议室多屏同步显示系统
    React Native 开发常见问题及注意事项
    【MATLAB源码-第53期】m代码基于粒子群算法(PSO)的三维路径规划,显示最优路径和适应度曲线。
    王学岗————直播推流(软便)03x264集成与camera推流
    proxmox PVE 安装 黑群晖
  • 原文地址:https://blog.csdn.net/qq_40985985/article/details/128013229