基本消息队列
工作消息队列

发布订阅
含有交换机,完整的消息驱动模型

基本的模型,只包含三个角色,生产者,队列,消费者

demo地址:
https://blog.csdn.net/m0_49194578/article/details/122247212
如果遇到不显示统计信息可以参考
https://www.freesion.com/article/20611339568/

SpringAMQP底层就是对RabiitMQ的封装

导入依赖
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
修改配置文件
server:
port: 8080
spring:
rabbitmq:
host: 159.203.111.111
port: 5672
virtual-host: / #虚拟主机
username: api
password: 12312312
@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitTest {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
String queue = "helloQueue";
String msg = "我是来自AMQP的消息";
rabbitTemplate.convertAndSend(queue,msg);
}
}
这是刚才的原api的消费者收到的

主动接收消息
public String receive(String queueName){
Message receive = rabbitTemplate.receive(queueName);
if (receive==null||receive.getBody()==null)
return "";
return new String(receive.getBody());
}
接收效果

这里有一个问题,由于这里只是主动接收,我们总不能死循环一直调用吧,所以我们需要使用另外一个方法,监听这这个队列;
方法如下
监听队列
@Component
@Slf4j
public class SpringRabbitListener {
// 声明监听的队列
@RabbitListener(queues = {"helloQueue","bagaQueue"})
public void listenSimpleQueueMessage(String message){
log.info("接收到了消息:【{}】",message);
}
}
然后我使用原来api的方法进行发送消息,接收效果如下


其实就是使用多个消费者同时监听一个队列
接收消息
@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(queues = "helloQueue")
public void listenSimpleQueueMessage1(String message){
log.info("消费者1接收到了消息:【{}】",message);
}
@RabbitListener(queues = "helloQueue")
public void listenSimpleQueueMessage2(String message){
log.info("消费者2接收到了消息:【{}】",message);
}
}
消息发送
for (int i = 0; i < 20; i++) {
channel.basicPublish("", queueName, null, (LocalTime.now() + ":【"+i+"】"+msg).getBytes());
Thread.sleep(500);
}
接收效果

@RabbitListener(
queuesToDeclare = @Queue(name = "helloQueue1",durable = "false",autoDelete = "true"))
public void listenSimpleQueueMessage1(String message) throws InterruptedException {
log.info("消费者1接收到了消息:【{}】",message);
}
可以看到默认是平均分配;这是默认的预取机制,这样有一个坏处,如果有某一台消费服务处理的比较慢,那么就会使得整体处理时间降低;
我们期望的效果是,处理快的服务器多处理几条,处理慢的少处理几条
预取会导致消费者会预先取很多条消息,然后再处理,这里我们只需要修改最大能预取的上限就可以了,这里我们设置为1,也就是说处理完手头的工作才能处理下一条,默认是无限的
server:
port: 8080
spring:
rabbitmq:
host: 159.203.91.216
port: 5672
virtual-host: / #虚拟主机
username: api
password: leavemealone
listener:
simple:
prefetch: 1
logging:
pattern:
dateformat: yyyy-MM-dd HH:mm:ss:SSS
我们修改配置文件,并且给消费者设置一个休眠时间模拟处理快慢的服务器
@RabbitListener(queues = "helloQueue")
public void listenSimpleQueueMessage1(String message) throws InterruptedException {
log.info("消费者1接收到了消息:【{}】",message);
Thread.sleep(50);
}
@RabbitListener(queues = "helloQueue")
public void listenSimpleQueueMessage2(String message) throws InterruptedException {
log.error("消费者2接收到了消息:【{}】",message);
Thread.sleep(200);
}
执行效果



FanoutExchange会把消息分发给绑定在交换机上的每一个队列


@Configuration
public class FanoutConfig {
// 配置类执行完成以后rabbit就拥有了这个交换机和队列
// 声明FanoutExchange交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("doria.fanout");
}
// 声明第一个队列
@Bean(value = "doriaQueue1")
public Queue fanoutQueue1() {
return new Queue("doria.queue1");
}
// 声明第二个队列
@Bean(value = "doriaQueue2")
public Queue fanoutQueue2() {
return new Queue("doria.queue2");
}
/**
* 绑定队列和交换机的方法
* 这里自动根据bean名称完成注入到方法参数并执行bean程序完成绑定,注意这里如果指定的bean名称出现异常
* 就会默认按类型注入,然后发现两个一样类型的bean从而报错,方法名或者name和value值可以指定bean名称
* Queue对象必须在Bena容器重否则会报错,根虚拟机"/"没有xxx队列
* @param doriaQueue1 队列
* @param fanoutExchange 交换机
* @return
*/
@Bean
public Binding bindingQueue1(Queue doriaQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(doriaQueue1).to(fanoutExchange);
}
@Bean
public Binding bindingQueue2(Queue doriaQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(doriaQueue2).to(fanoutExchange);
}
}
启动服务
查看管理平台
队列

交换机

点击进入交换机可以看到

@Component
@Slf4j
public class SpringRabbitListener {
// 这里写的就是我们绑定到交换机的队列的名称.原来我们绑定这个,当消息发送给队列的时候就能收到。
// 现在我们绑定了交换机,我们依然监听这个队列,需要改变的是我们发送时,这次我们发送给了交换机
// 交换机收到了消息就会发送给我们这个队列,所以只需要改变发送到交换机就可以了
@RabbitListener(queues = "doria.queue1")
public void listenFanoutQueue1(String message){
log.info("接收到了doria.queue1的消息:【{}】",message);
}
// 这里写的就是我们绑定到交换机的队列的名称
@RabbitListener(queues = "doria.queue2")
public void listenFanoutQueue2(String message){
log.info("接收到了doria.queue2的消息:【{}】",message);
}
}
@Test
public void sendToFanout() {
String exchangeName = "doria.fanout"; // 交换机名称
String message = "发送给所有人的消息!!!";
// 路由名称,路由键,消息;;;路由键以后再解释,这里先不写
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
效果如图:
两个队列都收到了消息


https://blog.csdn.net/Va1mZzz/article/details/120834699
@RabbitListener( // 配置监听
bindings = @QueueBinding( // 配置队列绑定
value = @Queue( // 配置队列
value = "doria.queue.annotation", // 队列名称
durable = "true", // 是否持久化,重启rabbitmq后队列是否继续存在
autoDelete = "false" // 当所有消费者客户端都断开链接后,是否自动删除队列.true是,false否(默认)
),
exchange = @Exchange( // 配置交换机
value = "doria.fanout", // 配置交换机名称
type = ExchangeTypes.FANOUT, // 配置交换机类型
durable = "true", // 是否持久化,重启rabbitmq后交换机是否继续存在
autoDelete = "false" // 当所有绑定队列都不再使用时是否自动删除交换机.true是,false否(默认)
),
key = "r" // 路由键
)
)
public void listenFanoutQueueAnnotation(String message){
log.info("接收到了doria.queue注解的消息:【{}】",message);
}
能和前面两个队列一样从相同的交换机中读取消息

一个注解,内部声明了队列,并建立绑定关系,就是这么神奇!!!
注意@QueueBinding注解的三个属性:
value: @Queue 注解,用于声明队列,value 为 queueName, durable 表示队列是否持久化, autoDelete 表示没有消费者之后队列是否自动删除
exchange: @Exchange 注解,用于声明 exchange, type 指定消息投递策略,我们这里用的 topic 方式
key: 在 topic 方式下,这个就是我们熟知的 routingKey
以上,就是在队列不存在时的使用姿势,看起来也不复杂
**1.**如果声明的交换机是一个现有的交换机,而现有的交换机类型,以及其他配置和这次在注解中声明的类型不一致时会爆出这个异常
如果你是新建交换机,则不会出现,总之,这个功能将会在交换机没有时自动创建交换机,如果有同名交换机,就会直接使用他,如果同名交换机但是有配置不一样,就会抛出异常
**2.**同样的,队列也遵从这一点,如果出现同名队列但是配置不一样,也会出现异常


描述:
消息在发布的时候可以指定一个routingKey,队列在声明绑定关系的时候也可以指定一个bindingKey,一个队列和交换机之间可以指定多个key,不同的队列之间也可以指定相同的key,也就是说它只要给所有队列指定相同的key就可以模范Fanout路由器了,只要key能命中就有资格路由,路由会把消息路由给所有key值匹配的队列;
为什么不给队列直接指定key?
因为一个队列可以绑定多个交换机,如果队列的key写死了,那他在和别的交换机绑定的时候怎么办,还是只能是这个key吗,所以这里叫做bindKey,一个交换机和一个队列有一个绑定Key交换机遵循key对消息进行路由
配置声明交换机队列绑定规则以及配置
这里我们就使用注解完成绑定了!:
不用去麻烦的配置了,直接在监听器上面描述,而且当监听器或者路由不存在的时候就会自动创建
@RabbitListener(
// 一个监听器也可以绑定多个路由器和队列的关系
bindings = {
@QueueBinding(
value = @Queue(
name = "red.queue.red",
autoDelete = "false",
durable = "true"
),
exchange = @Exchange(
name = "direct.exchange",
type = ExchangeTypes.DIRECT,
autoDelete = "false",
durable = "true"
),
key = {"red","all"}
)
})
public void directListenerRed(String message) {
log.info("[红方]收到了消息:【{}】",message);
}
@RabbitListener(
// 一个监听器也可以绑定多个路由器和队列的关系
bindings = {
@QueueBinding(
value = @Queue(
name = "red.queue.blue",
autoDelete = "false",
durable = "true"
),
exchange = @Exchange(
name = "direct.exchange",
type = ExchangeTypes.DIRECT,
autoDelete = "false",
durable = "true"
),
key = {"blue","all"}
)
})
public void directListenerBlue(String message) {
log.info("[蓝方]收到了消息:【{}】",message);
}
发送消息到交换机根据bindingKey进行路由
/**
* 发送消息到direct路由
*
* @param routingKey 路由key,对应绑定时指定的bindingKey
* @param message 消息内容
* @return 发送结果
*/
public boolean directSender(String routingKey, String message) {
String exchangeName = "direct.exchange";
rabbitTemplate.convertAndSend(exchangeName,routingKey,message);
return true;
}
发送测试
三次发送分别为
发送结果:


这个模式其实就相当于direct的升级版,当我们绑定多个的时候可以使用topic模式进行简化;
在队列和交换机进行绑定时,指定的bindingKey,可以使用通配符进行匹配,匹配到的就会进行路由分发;
*#:0或者多个单词
:一个单词
虽然发送的api和direct都是一样的,但是如果你使用derect的路由绑定依然不会触发,,因为路由类型不一样,它依然会全词匹配,只有启用Topic模式的交换机才会触发正则匹配

@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "listenerA",durable = "true",autoDelete = "false"),
exchange = @Exchange(name = "topic.exchange",durable = "true",autoDelete = "false",type = ExchangeTypes.TOPIC),
key = {"#.new","japan.weather"}
))
public void listenerA(String message){
log.info("我是A听众,我的订阅规则[{\"#.new\",\"japan.weather\"}],收到的消息是:{}",message);
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "listenerB",durable = "true",autoDelete = "false"),
exchange = @Exchange(name = "topic.exchange",durable = "true",autoDelete = "false",type = ExchangeTypes.TOPIC),
key = {"#.weather","china.#"}
))
public void listenerB(String message){
log.info("我是B听众,我的订阅规则[{\"#.weather\",\"china.#\"}],收到的消息是:{}",message);
}
/**
* 发送消息到topic路由
* @param topicRoutingKey 匹配规则,格式为xxx.xxx.xxx
* @param message 发送的消息
*/
public void topicSender(String topicRoutingKey, String message) {
String exchangeName = "topic.exchange";
rabbitTemplate.convertAndSend(exchangeName, topicRoutingKey, message);
}
效果:
测试1


发送japan.new日本新闻,监听所有新闻#.new的听众A收到新闻!
测试2


发送日本天气信息,订阅所有天气信息#.wearther的听众B和订阅日本天气信息japan.weather的听众A收到消息;
测试3


发送中国新闻,订阅中国所有信息china.#的B听众和订阅所有新闻#.new的A听众收到消息
注意不要使用自动创建队列同时监听的注解,那样一会发送出去就看不到了,会被秒接收然后报错,因为那边我们写的是String类型接收

可以看到,被jdk序列化了,jdk序列化长度太长而且不易阅读,而且性能低。我们引入自己的jackson


依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformatgroupId>
<artifactId>jackson-dataformat-xmlartifactId>
<version>2.9.10version>
dependency>
添加配置类
@Configuration
public class RabbitMQConfig {
/**
* 消息序列化转换
*
* @return
*/
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
可以看到消息已经变成Json格式了,消息转换回来同样也需要配置这个bean,注意
消息接收
@RabbitListener(
queuesToDeclare = @Queue(name = "helloQueue1",durable = "false",autoDelete = "false"))
public void listenSimpleQueueMessage1(Map<String,Object> message) throws InterruptedException {
log.info("消费者1接收到了消息:【{}】",message);
}
什么类型发的就什么类型接
// 删除队列,并删除绑定关系
rabbitAdmin.deleteQueue("helloQueue1");
// 删除交换和队列的绑定关系,当然需要从spring重拿bean
Queue queue1 = applicationContext.getBean("doriaQueue1", Queue.class);
// 注意这里上下两个bean的名称如果没有设置name就是方法名
FanoutExchange fanoutExchange = applicationContext.getBean("fanoutExchange", FanoutExchange.class);
rabbitAdmin.removeBinding(
BindingBuilder.bind(queue1).to(fanoutExchange)
);
// 单独删除交换机[这里是交换机名称,不是交换机bean的名称]
rabbitAdmin.deleteExchange("doria.fanout");