上一篇文章《基础入门案例》
整体核心
生产者:交换机绑定队列
# Server port of the producer application
server:
  port: 8080
# RabbitMQ connection settings
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 192.168.159.100
    port: 5672
<dependencies>
    <!-- Spring Boot starter for AMQP (RabbitMQ) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- Spring Boot web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Test-scoped dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
package com.vinjcent.rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
 * Order service of the producer application; publishes the id of each created
 * order to RabbitMQ.
 */
@SuppressWarnings("all")
@Service
public class OrderServiceImpl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Creates an order and publishes its id to the exchange
     * {@code fanout_order_exchange}.
     *
     * <p>The three parameters match how the test class invokes this method and how
     * the later versions of this service declare it; this demo does not read them yet.
     *
     * @param userId    id of the user placing the order
     * @param productId id of the ordered product
     * @param num       ordered quantity
     */
    public void createOrderFanout(String userId, String productId, int num) {
        // 1. Check whether the stock of the product is sufficient (not implemented here)
        // 2. Save the order (represented here by generating a random order id)
        String orderId = UUID.randomUUID().toString();
        // 3. Distribute the message through MQ
        // Arguments: 1) exchange  2) routing key / queue name  3) message content
        String exchangeName = "fanout_order_exchange";
        // Empty routing key: FanoutRabbitMQConfiguration binds the queues without one
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    }
}
package com.vinjcent.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the exchange {@code fanout_order_exchange},
 * three durable queues and one binding per queue.
 */
@Configuration
public class FanoutRabbitMQConfiguration {

    private static final String EXCHANGE_NAME = "fanout_order_exchange";
    private static final String ACCOUNT_QUEUE_NAME = "account.fanout.queue";
    private static final String EXPRESS_QUEUE_NAME = "express.fanout.queue";
    private static final String SMS_QUEUE_NAME = "sms.fanout.queue";

    // ---- 1. Exchange --------------------------------------------------------

    /** @return the fanout exchange, created with durable=true and autoDelete=false */
    @Bean
    public FanoutExchange fanoutExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    // ---- 2. Queues ----------------------------------------------------------

    /** @return the durable queue {@code account.fanout.queue} */
    @Bean
    public Queue fanout_accountQueue() {
        final boolean durable = true;
        return new Queue(ACCOUNT_QUEUE_NAME, durable);
    }

    /** @return the durable queue {@code express.fanout.queue} */
    @Bean
    public Queue fanout_expressQueue() {
        final boolean durable = true;
        return new Queue(EXPRESS_QUEUE_NAME, durable);
    }

    /** @return the durable queue {@code sms.fanout.queue} */
    @Bean
    public Queue fanout_smsQueue() {
        final boolean durable = true;
        return new Queue(SMS_QUEUE_NAME, durable);
    }

    // ---- 3. Bindings (queue -> exchange, no routing key) --------------------

    /** @return the binding of the account queue to the fanout exchange */
    @Bean
    public Binding fanout_accountBinding() {
        Queue queue = fanout_accountQueue();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    /** @return the binding of the express queue to the fanout exchange */
    @Bean
    public Binding fanout_expressBinding() {
        Queue queue = fanout_expressQueue();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    /** @return the binding of the sms queue to the fanout exchange */
    @Bean
    public Binding fanout_smsBinding() {
        Queue queue = fanout_smsQueue();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }
}
/**
 * Spring Boot test of the producer: triggers the order service so that a message
 * is published in fanout mode.
 */
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    OrderServiceImpl orderService;

    /** Places one demo order through the fanout flow. */
    @Test
    void contextLoads() {
        final String userId = "1";
        final String productId = "1";
        final int num = 12;
        orderService.createOrderFanout(userId, productId, num);
    }
}
消费者接收消息
# Web server port of the consumer application
server:
  port: 80
# RabbitMQ connection settings
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 192.168.159.100
    port: 5672
  application:
    # Application name
    name: springboot-order-rabbitmq-consumer
在 service 包下新建一个 fanout 包,添加以下三个类并注入 Spring 容器中,分别监听队列 account.fanout.queue、express.fanout.queue、sms.fanout.queue
(1)FanoutAccountConsumer.java
package com.vinjcent.rabbitmq.service.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * Consumer registered as a Spring service that listens on the queue
 * {@code account.fanout.queue}.
 */
@Service
@RabbitListener(queues = "account.fanout.queue")
public class FanoutAccountConsumer {

    /** Text printed in front of every received message. */
    private static final String OUTPUT_PREFIX = "account.fanout.queue===>";

    /**
     * Prints a received message to standard output.
     *
     * @param message the message payload as a string
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        String line = OUTPUT_PREFIX + message;
        System.out.println(line);
    }
}
(2)FanoutExpressConsumer.java
package com.vinjcent.rabbitmq.service.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * Consumer registered as a Spring service that listens on the queue
 * {@code express.fanout.queue}.
 */
@Service
@RabbitListener(queues = "express.fanout.queue")
public class FanoutExpressConsumer {

    /** Text printed in front of every received message. */
    private static final String OUTPUT_PREFIX = "express.fanout.queue===>";

    /**
     * Prints a received message to standard output.
     *
     * @param message the message payload as a string
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        String line = OUTPUT_PREFIX + message;
        System.out.println(line);
    }
}
(3)FanoutSMSConsumer.java
package com.vinjcent.rabbitmq.service.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
 * Consumer registered as a Spring service that listens on the queue
 * {@code sms.fanout.queue}.
 */
@Service
@RabbitListener(queues = "sms.fanout.queue")
public class FanoutSMSConsumer {

    /** Text printed in front of every received message. */
    private static final String OUTPUT_PREFIX = "sms.fanout.queue===>";

    /**
     * Prints a received message to standard output.
     *
     * @param message the message payload as a string
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        String line = OUTPUT_PREFIX + message;
        System.out.println(line);
    }
}
运行springboot-order-rabbitmq-consumer工程
查看控制台打印结果
1)生产者:交换机绑定队列
在 service 包下的 OrderServiceImpl.java 中添加如下方法,并新增一个配置类 DirectRabbitMQConfiguration.java
OrderServiceImpl.java
package com.vinjcent.rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
 * Order service of the producer application; this version adds the direct-mode
 * publishing method.
 */
@SuppressWarnings("all")
@Service
public class OrderServiceImpl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // fanout mode... (method omitted in this listing)

    /**
     * Creates an order and publishes its id to the exchange
     * {@code direct_order_exchange}, once with routing key {@code account} and
     * once with routing key {@code express}. Nothing is sent with the {@code sms} key.
     *
     * @param userId    id of the user placing the order (not read by this demo)
     * @param productId id of the ordered product (not read by this demo)
     * @param num       ordered quantity (not read by this demo)
     */
    public void createOrderDirect(String userId, String productId, int num) {
        // 1. Check whether the stock of the product is sufficient (not implemented here)
        // 2. Save the order (represented here by generating a random order id)
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功: " + orderId);
        // 3. Distribute the message through MQ
        // Arguments: 1) exchange  2) routing key / queue name  3) message content
        String exchangeName = "direct_order_exchange";
        // Push the message once per routing key
        rabbitTemplate.convertAndSend(exchangeName, "account", orderId);
        rabbitTemplate.convertAndSend(exchangeName, "express", orderId);
    }
}
DirectRabbitMQConfiguration.java
package com.vinjcent.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the exchange {@code direct_order_exchange},
 * three durable queues and one routing-key binding per queue.
 */
@Configuration
public class DirectRabbitMQConfiguration {

    private static final String EXCHANGE_NAME = "direct_order_exchange";
    private static final String ACCOUNT_QUEUE_NAME = "account.direct.queue";
    private static final String EXPRESS_QUEUE_NAME = "express.direct.queue";
    private static final String SMS_QUEUE_NAME = "sms.direct.queue";

    // ---- 1. Exchange (direct mode) ------------------------------------------

    /** @return the direct exchange, created with durable=true and autoDelete=false */
    @Bean
    public DirectExchange directExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    // ---- 2. Queues ----------------------------------------------------------

    /** @return the durable queue {@code account.direct.queue} */
    @Bean
    public Queue direct_accountQueue() {
        final boolean durable = true;
        return new Queue(ACCOUNT_QUEUE_NAME, durable);
    }

    /** @return the durable queue {@code express.direct.queue} */
    @Bean
    public Queue direct_expressQueue() {
        final boolean durable = true;
        return new Queue(EXPRESS_QUEUE_NAME, durable);
    }

    /** @return the durable queue {@code sms.direct.queue} */
    @Bean
    public Queue direct_smsQueue() {
        final boolean durable = true;
        return new Queue(SMS_QUEUE_NAME, durable);
    }

    // ---- 3. Bindings --------------------------------------------------------
    // Compared with the fanout configuration, each binding additionally carries
    // a routing key.

    /** @return the binding of the account queue with routing key {@code account} */
    @Bean
    public Binding direct_accountBinding() {
        Queue queue = direct_accountQueue();
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with("account");
    }

    /** @return the binding of the express queue with routing key {@code express} */
    @Bean
    public Binding direct_expressBinding() {
        Queue queue = direct_expressQueue();
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with("express");
    }

    /** @return the binding of the sms queue with routing key {@code sms} */
    @Bean
    public Binding direct_smsBinding() {
        Queue queue = direct_smsQueue();
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with("sms");
    }
}
/**
 * Spring Boot tests of the producer: one test per publishing mode
 * (fanout and direct).
 */
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    OrderServiceImpl orderService;

    /** Places one demo order through the fanout flow. */
    @Test
    void testFanout() {
        final String userId = "1";
        final String productId = "1";
        final int num = 12;
        orderService.createOrderFanout(userId, productId, num);
    }

    /** Places one demo order through the direct flow; run this test case. */
    @Test
    void testDirect() {
        final String userId = "1";
        final String productId = "1";
        final int num = 12;
        orderService.createOrderDirect(userId, productId, num);
    }
}
【注】要先启动生产者,再启动消费者;否则消费者所监听的队列及对应的交换机尚未创建,启动时会报错
2)消费者接收消息
在 service 包下新建一个 direct 包,添加以下类并注入 Spring 容器中
(1)DirectAccountConsumer.java监听队列account.direct.queue
/**
 * Consumer registered as a Spring service that listens on the queue
 * {@code account.direct.queue}.
 */
@Service
@RabbitListener(queues = "account.direct.queue")
public class DirectAccountConsumer {

    /** Text printed in front of every received message. */
    private static final String OUTPUT_PREFIX = "account.direct.queue===>";

    /**
     * Prints a received message to standard output.
     *
     * @param message the message payload as a string
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        String line = OUTPUT_PREFIX + message;
        System.out.println(line);
    }
}
(2)DirectExpressConsumer.java监听队列express.direct.queue
/**
 * Consumer registered as a Spring service that listens on the queue
 * {@code express.direct.queue}.
 */
@Service
@RabbitListener(queues = "express.direct.queue")
public class DirectExpressConsumer {

    /** Text printed in front of every received message. */
    private static final String OUTPUT_PREFIX = "express.direct.queue===>";

    /**
     * Prints a received message to standard output.
     *
     * @param message the message payload as a string
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        String line = OUTPUT_PREFIX + message;
        System.out.println(line);
    }
}
(3)DirectSMSConsumer.java监听队列sms.direct.queue
/**
 * Consumer registered as a Spring service that listens on the queue
 * {@code sms.direct.queue}.
 */
@Service
@RabbitListener(queues = "sms.direct.queue")
public class DirectSMSConsumer {

    /** Text printed in front of every received message. */
    private static final String OUTPUT_PREFIX = "sms.direct.queue===>";

    /**
     * Prints a received message to standard output.
     *
     * @param message the message payload as a string
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        String line = OUTPUT_PREFIX + message;
        System.out.println(line);
    }
}
运行springboot-order-rabbitmq-consumer
查看控制台打印结果
使用注解方式实现绑定
消费者
在 service 包下新建一个 topic 包,添加以下类并注入 Spring 容器中
(1)TopicAccountConsumer.java
package com.vinjcent.rabbitmq.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;
/**
 * Topic-mode consumer for the queue {@code account.topic.queue}.
 *
 * <p>The queue, the exchange {@code topic_order_exchange} and the binding key
 * {@code #.account.#} are all described by the listener annotation on this class.
 */
@Service
@RabbitListener(
        bindings = @QueueBinding(
                key = "#.account.#",
                exchange = @Exchange(type = ExchangeTypes.TOPIC, value = "topic_order_exchange"),
                value = @Queue(autoDelete = "false", durable = "true", value = "account.topic.queue")))
public class TopicAccountConsumer {

    /** Text printed in front of every received message. */
    private static final String OUTPUT_PREFIX = "account.topic.queue===>";

    /**
     * Prints a received message to standard output.
     *
     * @param message the message payload as a string
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        String line = OUTPUT_PREFIX + message;
        System.out.println(line);
    }
}
(2)TopicExpressConsumer.java
package com.vinjcent.rabbitmq.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;
/**
 * Topic-mode consumer for the queue {@code express.topic.queue}.
 *
 * <p>The queue, the exchange {@code topic_order_exchange} and the binding key
 * {@code *.express.#} are all described by the listener annotation on this class.
 */
@Service
@RabbitListener(
        bindings = @QueueBinding(
                key = "*.express.#",
                exchange = @Exchange(type = ExchangeTypes.TOPIC, value = "topic_order_exchange"),
                value = @Queue(autoDelete = "false", durable = "true", value = "express.topic.queue")))
public class TopicExpressConsumer {

    /** Text printed in front of every received message. */
    private static final String OUTPUT_PREFIX = "express.topic.queue===>";

    /**
     * Prints a received message to standard output.
     *
     * @param message the message payload as a string
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        String line = OUTPUT_PREFIX + message;
        System.out.println(line);
    }
}
(3)TopicSMSConsumer.java
package com.vinjcent.rabbitmq.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;
/**
 * Topic-mode consumer for the queue {@code sms.topic.queue}.
 *
 * <p>The queue, the exchange {@code topic_order_exchange} and the binding key
 * {@code sms.#} are all described by the listener annotation on this class.
 */
@Service
@RabbitListener(
        bindings = @QueueBinding(
                key = "sms.#",
                exchange = @Exchange(type = ExchangeTypes.TOPIC, value = "topic_order_exchange"),
                value = @Queue(autoDelete = "false", durable = "true", value = "sms.topic.queue")))
public class TopicSMSConsumer {

    /** Text printed in front of every received message. */
    private static final String OUTPUT_PREFIX = "sms.topic.queue===>";

    /**
     * Prints a received message to standard output.
     *
     * @param message the message payload as a string
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        String line = OUTPUT_PREFIX + message;
        System.out.println(line);
    }
}
推荐使用配置类的方式实现交换机与队列的绑定,以及路由key规则
生产者
在 service 包下的 OrderServiceImpl.java 中添加以下内容
package com.vinjcent.rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
 * Order service of the producer application; this version adds the topic-mode
 * publishing method next to the fanout and direct ones.
 */
@SuppressWarnings("all")
@Service
public class OrderServiceImpl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /** Fanout mode; body omitted in this listing. */
    public void createOrderFanout(String userId, String productId, int num) {
        // ......
    }

    /** Direct mode; body omitted in this listing. */
    public void createOrderDirect(String userId, String productId, int num) {
        // ......
    }

    /**
     * Creates an order and publishes its id to the exchange
     * {@code topic_order_exchange} with the routing key
     * {@code sms.express.account.xxxxx}.
     *
     * @param userId    id of the user placing the order (not read by this demo)
     * @param productId id of the ordered product (not read by this demo)
     * @param num       ordered quantity (not read by this demo)
     */
    public void createOrderTopic(String userId, String productId, int num) {
        // Steps 1 and 2 (stock check, saving the order) are only represented by
        // generating a random order id.
        final String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功: " + orderId);

        // Step 3: distribute the message through MQ.
        // convertAndSend arguments: exchange, routing key, message content.
        // The key is chosen so the message reaches all three topic queues,
        // whose binding patterns are:
        //   #.account.#
        //   *.express.#
        //   sms.#
        final String topicExchange = "topic_order_exchange";
        final String key = "sms.express.account.xxxxx";
        rabbitTemplate.convertAndSend(topicExchange, key, orderId);
    }
}
/**
 * Spring Boot tests of the producer: one test per publishing mode
 * (fanout, direct and topic).
 */
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    OrderServiceImpl orderService;

    /** Places one demo order through the fanout flow. */
    @Test
    void contextLoads() {
        final String userId = "1";
        final String productId = "1";
        final int num = 12;
        orderService.createOrderFanout(userId, productId, num);
    }

    /** Places one demo order through the direct flow. */
    @Test
    void testDirect() {
        final String userId = "1";
        final String productId = "1";
        final int num = 12;
        orderService.createOrderDirect(userId, productId, num);
    }

    /** Places one demo order through the topic flow; run this test case. */
    @Test
    void testTopic() {
        final String userId = "1";
        final String productId = "1";
        final int num = 12;
        orderService.createOrderTopic(userId, productId, num);
    }
}
下一篇文章《RabbitMQ高级 - 过期时间 TTL》