目录
4.1 创建队列时为队列设置过期时间(每条信息都是相同时间)
1.创建springboot项目
- //rabbitmq依赖
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
2.创建两个子项目producer生产者服务 consumer消费者服务
3.生产者服务
①配置文件application.properties
- server.port=7000
-
- #rabbitMQ配置 rabbitMQ所在ip
- spring.rabbitmq.host=192.168.1.88
- # 都有默认值
- spring.rabbitmq.port=5672
- spring.rabbitmq.virtual-host=/
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
②使用工具类RabbitTemplate发送消息到队列
- package com.wzh.producer.controller;
-
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.util.HashMap;
- import java.util.Map;
-
-
- @RestController
- public class ProducerController {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @GetMapping("saveOrder")
- public void saveOrder(){
- System.out.println("保存订单到数据库");
- Map
map = new HashMap<>(); - map.put("orderId","110");
- map.put("price",2500);
- map.put("num",3);
- map.put("phone","15700085997");
- // 交换机 路由key
- rabbitTemplate.convertAndSend("topic_exchange","a.orange.b",map);
- }
-
- }
③创建主启动类
- package com.wzh.producer;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
- @SpringBootApplication
- public class ProducerApp {
- public static void main(String[] args) {
- SpringApplication.run(ProducerApp.class,args);
- }
- }
④运行并访问7000端口
然后rabbitmq客户端就可以查看到,这里都是提前创建好的队列,交换机等。
4.消费者服务
①配置文件application.properties
- server.port=7001
-
- #rabbitMQ配置
- spring.rabbitmq.host=192.168.1.88
- spring.rabbitmq.port=5672
- spring.rabbitmq.virtual-host=/
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
②使用工具类RabbitTemplate发送消息到队列
- package com.wzh.rabbitmq;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.util.Map;
-
-
- @Component
- public class MyListener {
- //队列名
- @RabbitListener(queues = "topic_queue01")
- public void test(Map
msg) { - System.out.println(msg);
- //进行相关的业务处理
- }
③创建主启动类
- package com.wzh.consumer;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
- @SpringBootApplication
- public class ConsumerApp {
- public static void main(String[] args) {
- SpringApplication.run(ConsumerApp.class,args);
- }
- }
④运行主启动类
首先确定消息可能在哪些位置丢失---不同的位置可以有不同的解决方案。
confirm确认机制
1. 手动开启确认机制(producer服务的配置文件)
spring.rabbitmq.publisher-confirm-type=correlated
2. 为rabbitTemplate设置确认回调函数
package com.wzh; import com.wzh.producer.ProducerApp; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; /** * @ProjectName: springboot-rabbitmq * @Package: com.wzh * @ClassName: TestRabbitMQ * @Author: 王振华 * @Description: * @Date: 2022/9/20 19:56 * @Version: 1.0 */ @SpringBootTest(classes=ProducerApp.class) public class TestRabbitMQ { //springboot集成了rabbitMQ提供了一个工具类,该类封装了消息的发送 @Resource private RabbitTemplate rabbitTemplate; //测试确认机制 @Test public void testConfirm(){ //为rabbitTemplate设置确认回调函数 //ConfirmCallback函数式接口 匿名内部类 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { //不管是否到达交换机 都会触发该方法 @Override public void confirm(CorrelationData correlationData, boolean b, String s) { if(b==false){ //根据自己的业务完成相应的代码 System.out.println("消息发送失败---订单回滚"); } } }); //这里用的是路由模式 rabbitTemplate.convertAndSend("direct_exchange","info","hello springboot"); } }查看消费者
package com.wzh.consumer.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @ProjectName: springboot-rabbitmq * @Package: com.wzh.consumer.rabbitmq * @ClassName: Consumer * @Author: 王振华 * @Description: * @Date: 2022/9/20 15:27 * @Version: 1.0 */ @Component public class Consumer { @RabbitListener(queues = "router_queue01") public void queue01(String s){ System.out.println(s); } @RabbitListener(queues = "router_queue02") public void queue02(String s){ System.out.println(s); } }消费者的参数必须与生产者的参数类型一致
returning机制: 如果消息无法到达队列,则会触发returning机制。如果能从交换机到队列则不会触发returning机制。
默认rabbitMQ不开启该机制。
①开启returning机制(生产者配置文件)
spring.rabbitmq.publisher-returns=true
②为rabbitTemplate设置returning回调函数
- /**
- * 1.开启returning机制 spring.rabbitmq.publisher-returns=true
- * 2.为rabbitTemplate设置returning回调函数
- */
- @Test
- public void testReturning(){
- rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
- //该方法只有从交换机到队列失败时才会触发
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- //根据自己的业务完成相应的代码
- System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~");
- }
- });
- //这里用的是路由模式
- rabbitTemplate.convertAndSend("direct_exchange","error","hello springboot2");
- }
③消费者监听结果
队列持久化---》创建的时候设置持久化
搭建rabbitmq集群--保证高可用
①修改为手动确认模式
改为手动确认(消费者服务配置文件)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
②当业务处理完毕后在确认消息给队列让其删除该消息
- package com.wzh.consumer.rabbitmq;
-
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
- import java.util.Map;
-
- /**
- * @ProjectName: springboot-rabbitmq
- * @Package: com.wzh.consumer.rabbitmq
- * @ClassName: Consumer
- * @Author: 王振华
- * @Description:
- * @Date: 2022/9/20 15:27
- * @Version: 1.0
- */
- @Component
- public class Consumer {
- @RabbitListener(queues = "router_queue01")
- public void queue01(String s){
- System.out.println(s);
- }
-
- @RabbitListener(queues = "router_queue02")
- //Message message 封装了信息类, Channel channel 信道
- public void queue02(Message message, Channel channel){
- byte[] body = message.getBody();
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- System.out.println("接受到消息"+new String(body));
- try {
-
- System.out.println("处理业务代码");
- //这里会因为硬件问题 或其他业务代码问题 导致处理不完 出现异常了 走catch
- System.out.println("业务处理完毕");
- //设置手动确认---队列会把该信息移除。
- //long deliveryTag,消息的标记
- // boolean multiple: 是否把之前没有确认的消息以前确认掉。
- channel.basicAck(deliveryTag,true);
- }catch (Exception e){
- //1.让队列在给我发一次。
- //long deliveryTag, boolean multiple,
- // boolean requeue: 继续发给我 直接扔掉
- try {
- channel.basicNack(deliveryTag,true,true);
- } catch (IOException ioException) {
- ioException.printStackTrace();
- }
-
- }
-
-
- }
- }
-
如何保证消息的可靠性。
设置confirm和returning机制
设置队列和交互机的持久化
搭建rabbitMQ服务集群
消费者改为手动确认机制。
如果队列里有10w条数据,消费者不管能不能消化,会直接把10w条消息都接受,会导致消费者服务垮掉
设置消费者消费消息的条数
消费者端必须为手动确认模式。
① 修改每次拉取消息的条数
②测试
package com.wzh.consumer.rabbitmq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Map; /** * @ProjectName: springboot-rabbitmq * @Package: com.wzh.consumer.rabbitmq * @ClassName: Consumer * @Author: 王振华 * @Description: * @Date: 2022/9/20 15:27 * @Version: 1.0 */ @Component public class Consumer { @RabbitListener(queues = "test03") public void queue01(Message message, Channel channel) throws IOException { byte[] body = message.getBody(); System.out.println("消息的内容:"+new String(body)); //手动确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); } @RabbitListener(queues = "router_queue02") //Message message 封装了信息类, Channel channel 信道 public void queue02(Message message, Channel channel){ byte[] body = message.getBody(); long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println("接受到消息"+new String(body)); try { System.out.println("处理业务代码"); //这里会因为硬件问题 或其他业务代码问题 导致处理不完 出现异常了 走catch System.out.println("业务处理完毕"); //设置手动确认---队列会把该信息移除。 //long deliveryTag,消息的标记 // boolean multiple: 是否把之前没有确认的消息以前确认掉。 channel.basicAck(deliveryTag,true); }catch (Exception e){ //1.让队列在给我发一次。 //long deliveryTag, boolean multiple, // boolean requeue: 继续发给我 直接扔掉 try { channel.basicNack(deliveryTag,true,true); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
TTL:time to live
可以为整个队列设置也可以单独为某条信息设置
创建队列
创建交换机
点击test01进行 交换机与队列绑定
测试
- //测试过期时间
- @Test
- public void test01(){
- for(int i=0;i<10;i++) {
- if(i<5){
- //前五个 同时创建 后五个 每个间隔两秒 创建 过期时间也不一样
- rabbitTemplate.convertAndSend("test01", "aaa", "hello springboot"+i);
- }else {
- try{
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- rabbitTemplate.convertAndSend("test01", "aaa", "hello springboot" + i);
- }
- }
- }
当前消息队列为普通队列,没有在创建队列的时候设置过期时间
- @Test
- public void test02() {
- MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setExpiration("10000");
- return message;
- }
- };
- rabbitTemplate.convertAndSend("test01", "aaa", "Hello Springboot",messagePostProcessor);
- }
创建普通队列
创建死信队列
创建普通交换机
创建死信交换机
队列绑定交换机
死信交换机与死信队列绑定
测试
- @Test
- public void test02() {
- for (int i = 0; i < 10; i++)
- rabbitTemplate.convertAndSend("pt_exchange", "dead", "Hello Springboot-------"+i);
- }
- }
这里的判断订单状态 是因为 如果支付系统第29分分钟去支付,支付的比较慢,最后在第31分钟支付成功了。消息30分钟加入死信队列执行库存回滚,就会出错。
1. 如何防止消息被重复消费
2.如何保证消息的可靠性
3.rabbitMQ消息积压过多