• SpringBoot整合RabbitMQ学习笔记


    SpringBoot整合RabbitMQ学习笔记

    以下三种类型的消息,生产者和消费者需各自启动一个服务,模拟生产者服务发送消息,消费者服务监听消息,分布式开发。

    一 Fanout类型信息

    在这里插入图片描述
    在这里插入图片描述

    1. . RabbitMQ创建交换机和队列
      在RabbitMQ控制台,新建交换机hmall.fanout,新建两个队列,fanout.queue1和fanout.queue2,并将连个队列和交换机进行绑定即可。
      操作如下图所示:
      一下操作可以通过代码实现,具体参考配置类
      (1)创建队列
      在这里插入图片描述

    (2)创建交换机
    在这里插入图片描述
    (3)绑定
    在这里插入图片描述2. 代码实现
    (1)引入依赖

    <dependency>
    	<groupId>org.springframework.book</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4

    (2)配置MQ配置信息

    spring:
    	rabbitmq:
    		host: 192.168.150.101 #主机ip
    		port: 5672 #端口
    		virtual-host: /hmall #虚拟主机
    		username: hmall #用户名
    		password: 123 #密码
    		exchange: hmall.fanout
    		producer:
    			queue1: fanout.queue1
    		
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    (3)声明队列和交换机配置

    @Component
    public class FanoutConfg{
    
    	@Value("${spring.rabbitmq.exchange}")
    	private String exchange
    	@Value("${spring.rabbitmq.producer.queue1}")
    	private String queueName1
    	// 声明fanout交换机
    	@Bean
    	public FanoutExchange fanoutExchange(){
    		return new FanoutExchange(exchanage);
    	}
    	// 声明队列
    	@Bean
    	public Queue fanoutQueue1(){
    		return new Queue(queueName1);
    	}
    	//绑定队列和交换机
    	@Bean
    	public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
    		return BindingBuilder,build(fanoutQueue1).to(fanoutExchange);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    (4)生产者

    @Component
    public class RabbitMqProduce {
    	@Autowired
    	private RabbitTemplate rabbitTemplete;
    	@value("${spring.rabbitmq.producer.queue})
    	private String queueName;
    	/**
    	* 入参说明:
    	* 第一个参数:queueName:队列名称
    	* 第二个参数:路由键,fanout类型不需要路由键
    	* 第三个参数:msg 消息题内容
    	*/
    	public void send(String msg){
    		
    		rabbitTemplete.covertAndSend(queueName,null,msg);
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    (4)消费者

    @Component
    public class RabbitMqListener {
    	
    	@RabbitListener(queues="${spring.rabbitmq.producer.queue}")
    	public void counsume(String msg){
    		System.out.pringln("消费者收到 fanout.queue队列发的消息",msg);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    (5)测试类

    @SpringBootTest
    public class SpringBootTest{
    	@AUtowired
    	private RabbitMqProduce producer;
    	
    	@Test
    	public void testSendFanoutMsg(){
    		producer.send("fanout类型发送消息!!!");
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    二 direct类型发送消息在这里插入图片描述

    在这里插入图片描述

    1. 控制台操作
      (1)交换机和队列的创建参考fanout的操作
      (2)绑定:与fanout不同的是 给交换机绑定队列的同时需要指定路由键,如下图所示:
      在这里插入图片描述
    2. 代码实现
      (1)依赖引入参考fanout类型的消息
      (2)mq消息配置
    spring:
    	rabbitmq:
    		host: 192.168.150.101 #主机ip
    		port: 5672 #端口
    		virtual-host: /hmall #虚拟主机
    		username: hmall #用户名
    		password: 123 #密码
    		exchange: hmall.direct
    		producer:
    			queue1: direct.queue1
    			queue2: direct.queue2
    			routingKey1: red
    			routingKey2: red2
    		
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    (3)MQ配置类
    以下配置可以在消费者注解上实现

    @Component
    public class RabbitMqConfig {
    	@Autowired
    	private RabbitTemplate rabbitTemplete;
    
    	@value("${spring.rabbitmq.producer.exchange}")
    	private String exchange;
    	
    	@value("${spring.rabbitmq.producer.queue1}")
    	private String queueName1;
    	
    	@value("${spring.rabbitmq.producer.queue2}")
    	private String queueName2;
    	
    	@value("${spring.rabbitmq.producer.routingKey1}")
    	private String routingKey1;
    
    	@value("${spring.rabbitmq.producer.routingKey2}")
    	private String routingKey2;
    
    	// 创建交换机
    	@Bean("directExchange")
    	public Exchange getExchange(){
    		return ExchangeBuilder
    		.topicExchange(exchange) // 交换机类型,交换机名称
    		.durable(true) //ture为持久化,存到磁盘,false存到内存
    		.build();
    	}
    	// 创建队列
    	@Bean("directQueue1")
    	public Queue getDirectQueue1(){
    		retuen new Queue(queueName1);
    	}
    
    	// 交换机绑定队列
    	@bean
    	public Binging bindDirectQueue1(@Qualifier("directExchange") Exchange exchange,@Qualifier("directQueue1") Queue queue){
    		return BindingBuilder
    		.bind(queue)
    		.to(exchange)
    		.with(routingKey1)
    		.noargs();	
    	}
    
    
    	// 创建队列
    	@Bean("directQueue2")
    	public Queue getDirectQueue2(){
    		retuen new Queue(queueName2);
    	}
    
    	// 交换机绑定队列
    	@bean
    	public Binging bindDirectQueue2(@Qualifier("directExchange") Exchange exchange,@Qualifier("directQueue2") Queue queue){
    		return BindingBuilder
    		.bind(queue)
    		.to(exchange)
    		.with(routingKey2)
    		.noargs();	
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    (4)生产者发送消息

    @Component
    public class RabbitMqProduce {
    	@Autowired
    	private RabbitTemplate rabbitTemplete;
    	
    	@value("${spring.rabbitmq.producer.queue1})
    	private String queueName1;
    	@value("${spring.rabbitmq.producer.queue2})
    	private String queueName2;
    
    	@value("${spring.rabbitmq.producer.routingKey2})
    	private String routingKey1;
    
    	@value("${spring.rabbitmq.producer.routingKey1})
    	private String routingKey2;
    	/**
    	* 入参说明:
    	* 第一个参数:queueName:队列名称
    	* 第二个参数:路由键,fanout类型不需要路由键
    	* 第三个参数:msg 消息题内容
    	*/
    	public void sendQueue1(String msg){
    		
    		rabbitTemplete.covertAndSend(queueName1,routingKey1,msg);
    	}
    	public void sendQueue2(String msg){
    		
    		rabbitTemplete.covertAndSend(queueName2,routingKey2,msg);
    	}
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    (5)消费者监听消息
    第一种:已经编写了配置类

    @Component
    public class RabbitMqListener {
    	
    	@RabbitListener(queues="${spring.rabbitmq.producer.queue1}")
    	public void counsume(String msg){
    		System.out.pringln("消费者收到 direct.queue1队列发的消息",msg);
    	}
    	@RabbitListener(queues="${spring.rabbitmq.producer.queue2}")
    	public void counsume(String msg){
    		System.out.pringln("消费者收到 direct.queue2队列发的消息",msg);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    第二种:在注解上配置交换机和队列以及路由键

    @Component
    public class RabbitMqListener {
    	
    	@RabbitListener(bindings = 
    	@QueueBinding(
    		value = Queue(name="${spring.rabbitmq.producer.queue1}",durable="true"),
    		exchange = @Exchange(name="${spring.rabbitmq.producer.exchange)",type=ExchangeType.DIRECT),
    		key = {"${spring.rabbitmq.producer.routingKey1}","${spring.rabbitmq.producer.routingKey2}"}	
    	))
    	public void counsume(String msg){
    		System.out.pringln("消费者收到 direct.queue1队列发的消息",msg);
    	}
    		@RabbitListener(bindings = 
    	@QueueBinding(
    		value = Queue(name="${spring.rabbitmq.producer.queue2}",durable="true"),
    		exchange = @Exchange(name="${spring.rabbitmq.producer.exchange)",type=ExchangeType.DIRECT),
    		key = {"${spring.rabbitmq.producer.routingKey1}","${spring.rabbitmq.producer.routingKey2}"}	
    	))
    	public void counsume(String msg){
    		System.out.pringln("消费者收到 direct.queue2队列发的消息",msg);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    (6)测试类

    @SpringBootTest
    public class SpringBootTest{
    	@AUtowired
    	private RabbitMqProduce producer;
    	
    	@Test
    	public void testSendDirectMsg(){
    		producer.send("direct类型发送消息!!!");
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    三 Topic类型消息

    在这里插入图片描述
    在这里插入图片描述

    1. 控制台操作
      参考前面的创建交换机,队列,以及绑定关系操作
    2. 代码实现
      (1)依赖引入参考fanout类型的消息
      (2)mq消息配置
      路由键使用通配符进行匹配,#代表多个,*代表一个
    spring:
    	rabbitmq:
    		host: 192.168.150.101 #主机ip
    		port: 5672 #端口
    		virtual-host: /hmall #虚拟主机
    		username: hmall #用户名
    		password: 123 #密码
    		exchange: hmall.topic
    		producer:
    			queue1: topic.queue1
    			queue2: topic.queue2
    			routingKey1: china.#
    			routingKey2: #.news
    		
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    (3)MQ配置类

    @Component
    public class RabbitMqConfig {
    	@Autowired
    	private RabbitTemplate rabbitTemplete;
    
    	@value("${spring.rabbitmq.producer.exchange}")
    	private String exchange;
    	
    	@value("${spring.rabbitmq.producer.queue1}")
    	private String queueName1;
    	
    	@value("${spring.rabbitmq.producer.queue2}")
    	private String queueName2;
    	
    	@value("${spring.rabbitmq.producer.routingKey1}")
    	private String routingKey1;
    
    	@value("${spring.rabbitmq.producer.routingKey2}")
    	private String routingKey2;
    
    	// 创建交换机
    	@Bean("topicExchange")
    	public Exchange getExchange(){
    		return ExchangeBuilder
    		.topicExchange(exchange) // 交换机类型,交换机名称
    		.durable(true) //ture为持久化,存到磁盘,false存到内存
    		.build();
    	}
    	// 创建队列
    	@Bean("topicQueue1")
    	public Queue getDirectQueue1(){
    		retuen new Queue(queueName1);
    	}
    
    	// 交换机绑定队列
    	@bean
    	public Binging bindDirectQueue1(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue1") Queue queue){
    		return BindingBuilder
    		.bind(queue)
    		.to(exchange)
    		.with(routingKey1)
    		.noargs();	
    	}
    
    
    	// 创建队列
    	@Bean("topicQueue2")
    	public Queue getDirectQueue2(){
    		retuen new Queue(queueName2);
    	}
    
    	// 交换机绑定队列
    	@bean
    	public Binging bindDirectQueue2(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue2") Queue queue){
    		return BindingBuilder
    		.bind(queue)
    		.to(exchange)
    		.with(routingKey2)
    		.noargs();	
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    (4)生产者发送消息

    @Component
    public class RabbitMqProduce {
    	@Autowired
    	private RabbitTemplate rabbitTemplete;
    	
    	@value("${spring.rabbitmq.producer.queue1})
    	private String queueName1;
    	@value("${spring.rabbitmq.producer.queue2})
    	private String queueName2;
    
    	@value("${spring.rabbitmq.producer.routingKey2})
    	private String routingKey1;
    
    	@value("${spring.rabbitmq.producer.routingKey1})
    	private String routingKey2;
    	/**
    	* 入参说明:
    	* 第一个参数:queueName:队列名称
    	* 第二个参数:路由键,fanout类型不需要路由键
    	* 第三个参数:msg 消息题内容
    	*/
    	public void sendQueue1(String msg){
    		
    		rabbitTemplete.covertAndSend(queueName1,routingKey1,msg);
    	}
    	public void sendQueue2(String msg){
    		
    		rabbitTemplete.covertAndSend(queueName2,routingKey2,msg);
    	}
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    (5)消费者监听消息

    @Component
    public class RabbitMqListener {
    	
    	@RabbitListener(queues="${spring.rabbitmq.producer.queue1}")
    	public void counsume(String msg){
    		System.out.pringln("消费者收到 topic.queue1队列发的消息",msg);
    	}
    	@RabbitListener(queues="${spring.rabbitmq.producer.queue2}")
    	public void counsume(String msg){
    		System.out.pringln("消费者收到 topic.queue2队列发的消息",msg);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    (6)测试类

    @SpringBootTest
    public class SpringBootTest{
    	@AUtowired
    	private RabbitMqProduce producer;
    	
    	@Test
    	public void testSendDirectMsg(){
    		producer.send("direct类型发送消息!!!");
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    四 消息转换器

    MQ会把消息体变成字节码
    在这里插入图片描述
    解决办法:使用消息转换器,实现如下:

    1. 在生产者和消费者两个服务引入依赖
    <dependency>
    	<groupId>com.fasterxml.jackson</groupId>
    	<artifactId>jasckson-databind</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    1. 在生产者和消费者两个服务编写消息转换器配置
    @Component
    public class JacksonMessageConvertor{
    	@Bean
    	public MessageCoverter jacksonMessageConvertor(){
    		return new Jackson2JsonMessageConverter();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    1. 消息体
      对于生产者来说,是map类型的,则生成者接收的时候也是map类型
      例如:
    @Component
    public class RabbitMqListener {
    	
    	@RabbitListener(queues="${spring.rabbitmq.producer.queue1}")
    	public void counsume(Map<String,Objecct> msg){
    		System.out.pringln("消费者收到 topic.queue1队列发的消息",msg);
    	}
    	
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    五 案例演示

    支付服务支付成功后通知交易服务进行后续操作
    在这里插入图片描述
    生产者和消费者两个服务都需要进行1,2,3步骤

    1. 添加依赖
    <!--mq依赖-->
    <dependency>
    	<groupId>org.springframework.book</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--消息转换器依赖-->
    <dependency>
    	<groupId>com.fasterxml.jackson</groupId>
    	<artifactId>jasckson-databind</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. 添加MQ配置信息
    spring:
    	rabbitmq:
    		host: 192.168.150.101 #主机ip
    		port: 5672 #端口
    		virtual-host: /hmall #虚拟主机
    		username: hmall #用户名
    		password: 123 #密码
    		exchange: pay.topic
    		queue: mark.order.pay.queue
    		routKingKey: pay.success
    		
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    1. 消息转换器配置类
    @Component
    public class JacksonMessageConvertor{
    	@Bean
    	public MessageCoverter jacksonMessageConvertor(){
    		return new Jackson2JsonMessageConverter();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    1. 生产者
      (1)生产者的配置
    @Component
    public class Rabbitroducer {
    	@Autowired
    	private RabbitTemplate rabbitTemplete;
    	
    	@value("${spring.rabbitmq.queue})
    	private String queueName;
    
    	@value("${spring.rabbitmq.routingKey})
    	private String routingKey;
    
    	/**
    	* 入参说明:
    	* 第一个参数:queueName:队列名称
    	* 第二个参数:路由键,fanout类型不需要路由键
    	* 第三个参数:msg 消息题内容
    	*/
    	public void sendMsg(String msg){
    		// 发送消息
    		rabbitTemplete.covertAndSend(queueName,routingKey, msg);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    (2)业务代码支付成功发送消息

    public class payOrderServiceImpl impletement PayOrderService{
    	@Autowrid
    	private RabbitProducer payProducer;
    	@Overirid
    	@Transactional(rollback = Exception.class)
    	public void payOrder(PayOrderDto payOrder){
    		// 一些列操作最终交易成功
    		// 发送消息通知
    		try{
    			payProducer.send(payOrder.getId());
    		}catch(AmqpException e){
    			log.error("交易成功,发送消息异常:{}",e.getMessages(););
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    1. 消费者
    @Component
    public class PaySatusListener {
    
    	@Autowired
    	private OrderService orderService;
    	
    	@RabbitListener(bindings = 
    	@QueueBinding(
    		value = Queue(name="${spring.rabbitmq.queue}",durable="true"),
    		exchange = @Exchange(name="${spring.rabbitmq.exchange)",type=ExchangeType.TOPIC),
    		key = {"${spring.rabbitmq.routingKey}"}	
    	))
    	public void listenOrderPay(Long orderId){
    		//标记订单为已支付
    		orderService.markOrderPaySuccess(orderId);
    	}
    		
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
  • 相关阅读:
    C# 学习第七弹——数组
    MySQL数据库管理
    【JSON】java获取json数组格式中的值
    论坛万能粘贴手(可将任意文件转为文本)
    mysql分页查询遇到order by发生的血案
    为什么使用Hooks?
    ElasticSearch知识点
    Dubbo3应用开发——架构的演变过程
    神经网络在科研中的应用,神经网络技术及其应用
    JAVA自行车在线租赁管理系统2021计算机毕业设计Mybatis+系统+数据库+调试部署
  • 原文地址:https://blog.csdn.net/hcyxsh/article/details/134218813