他的主要作用就是增加消费者的个数,可以提高消息处理速度,避免队列消息堆积。
首先修改一下之前的发送消息的代码,让他循环发送50次,但是不要一次性发完:
@Test
void LoopSend() throws InterruptedException {
String queueName = "hello";
String message = "Hello SpringAMQP";
for(int i =0 ;i<=50;i++){
rabbitTemplate.convertAndSend(queueName,message);
Thread.sleep(20);
}
@RabbitListener(queues = "hello")
public void LoopRecv1(String message) throws InterruptedException {
System.out.println("消费者1接收到了消息:【"+message+"】"+ LocalTime.now());
Thread.sleep(20);
@RabbitListener(queues = "hello")
public void LoopRecv2(String message) throws InterruptedException {
System.err.println("消费者2接收到了消息:【"+message+"】"+LocalTime.now());
Thread.sleep(200);
注意,最上面的方法我们把监听消息的注解给注释掉,表示这个方法在本次测试中不再起作用,我们真正要看的是下面的两个代码的执行结果。
好的,现在生产者的代码已经执行完毕,我们来到消费者这边查看日志:
这边出现了一个很神奇的现象,前几秒很和谐,属于是能者多劳,处理快的多处理,处理慢的少处理,但是等过了一半的时候,这边能者不劳了,只剩下一个消费者在消费,让原本一秒钟就能处理完的事情延长了五六秒。
那么为什么会出现消费者消费了一部分之后就不干了这种情况呢,其实我们认为的工作量是一共一起处理50条消息,但是在RabbitMQ做消息推送的时候,他默认采用的一种机制叫做【消息预取】机制,这个机制意思就是说,当我们有大量的消息来到我们的消息队列中时,各自的消费者的Channel,也就是用来操作RMQ的那个工具,会预先将消息队列中的消息拉取到各自的消费者中,但是此时消费者可能还没有消费完数据,但是Channel已经先给你占下了,所以在我们看到来是混在一起的一堆消息,其实他们早就各自分好了自己的工作,当某一个消费者处理完了之后,就不会再去处理其他的消息。
取消消息预取的方法就是通过修改配置文件中的preFetch这个值,可以控制预取消息的上限:
spring:
rabbitmq:
addresses: 192.168.80.4 # 主机名
port: 5672 # 端口号
username: admin # 用户名
password: 123456 # 密码
virtual-host: / # 用户允许访问的虚拟主机
listener:
simple:
prefetch: 1 # 每次只取一条消息,处理完成之后再取下一个
然后我们再次重复刚才重启项目和发送消息的操作,然后再次观察我的控制台的输出日志:
首先可以看到,这的处理事件确实是缩短了不少,并且输出的内容也是有规律了很多。
发布订阅模式与之前案例区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。注意是同一个消息,不是之前我们连接多个消费者,在之前的案例中我们的消息只能被一个消费者消费,消费完就删除了,现在要做的是让同一条消息被不同的消费者消费。
关键的部分并不在于如何绑定消费者,而是如何设置交换机,以及如何让消息的生产者将消息发送到交换机中去。
注意:exchange负责消息路由,而不是存储,路由失效则消息丢失
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue
利用SpringAMQP演示Fanout Exchange的使用
SpringAMQP提供了声明交换机、队列、绑定关系的API,例如:
在consumer服务创建一个类,添加创建交换机,声明队列,以及绑定交换机到队列的过程
package conf;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
// 声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
// 声明第一个队列
@Bean
public Queue queue1(){
return new Queue("fanout.queue1");
}
// 声明第二个队列
@Bean
public Queue queue2(){
return new Queue("fanout.queue2");
}
// 绑定第一个队列
@Bean
public Binding binding1(FanoutExchange fanoutExchange, Queue queue1){
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
// 绑定第二个队列
@Bean
public Binding binding2(FanoutExchange fanoutExchange, Queue queue2){
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
首先这是一个配置类,所以需要使用@Configuration的注解将其标识为一个配置类,然后在其中编写声明方法和绑定方法
首先,在Queue界面,已经出现了我们刚才声明并绑定的两个队列,然后来到Exchanges界面:
@RabbitListener(queues = "fanout.queue1")
public void FanoutQueue1(String message) throws InterruptedException {
System.err.println("fanoutQueue1接收到了消息:【"+message+"】"+LocalTime.now());
Thread.sleep(200);
}
@RabbitListener(queues = "fanout.queue2")
public void FanoutQueue2(String message) throws InterruptedException {
System.err.println("fanoutQueue2接收到了消息:【"+message+"】"+LocalTime.now());
Thread.sleep(200);
@Test
void SendFanoutExchangeTest(){
// 交换机名称
String changeName = "itcast.exchange";
// 消息
String message = "Hello Every One";
// 发送消息
rabbitTemplate.convertAndSend(changeName,"",message);
接下来,我们就直接运行生产者的代码,然后去消费者的服务哪里看一下日志的输出:
可以看到,我们这边已经接收到了对应的消息,并且是两个消费者都接收到了同一个消息。
Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(router)
利用SpringAMQP演示DirectExchange的使用
具体的实现方式,除了声明这一部分的代码不一样,剩下的步骤与之前是一样的。
@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct.queue1"),
exchange = @Exchange("itcast.direct"),
key = {"red","blue"}
))
public void DirectExchange1(String message){
System.out.println("fanoutQueue2接收到了消息:【"+message+"】"+LocalTime.now());
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct.queue2"),
exchange = @Exchange(value = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void DirectExchange2(String message){
System.out.println("fanoutQueue2接收到了消息:【"+message+"】"+LocalTime.now());
首先,方法的执行代码都是一样的,区别就在于RabbitListener的注解内容不一样,首先,使用bindings属性,属性的值是@QueueBinding,然后在这个值里面,使用value设置队列的名称,使用exchange设置路由器的名称和类型,其中默认的类型就是DIRECT,也就是DirectExchange类型,然后设置key,这个key就是BindingKey,等我们的生产者发送消息的时候,也要带上一个相同的RoutingKey才能发送到对应的路由器绑定的消费者中。
此时就多了两个队列,这就表示我们的监听的服务已经没有问题了。
@Test
void SendDirectExchangeTest(){
// 交换机名称
String changeName = "itcast.direct";
// 消息
String message = "Hello yellow";
// 发送消息
rabbitTemplate.convertAndSend(changeName,"yellow",message);
其实和之前没有多大的变动,首先我们的发送的路由器变了,其次就是我们的RoutingKey变了,这个地方我们的RoutingKey是什么,就发到与之相同的BindingKey中去,然后我们运行生产者的代码,并看一下消费者的日志:
这就是根据RoutingKey的不同选择具体由哪一个消费者去消费数据。
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以[.]分割。
Queue与Exchange指定BindingKey时可以使用通配符
使用SpringMAQP演示TopicExchange的使用
@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct.queue1"),
exchange = @Exchange(value = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void TopicExchange1(String message){
System.out.println("TopicQueue1接收到了消息:【"+message+"】"+LocalTime.now());
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct.queue2"),
exchange = @Exchange(value = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void TopicExchange2(String message){
System.out.println("TopicQueue2接收到了消息:【"+message+"】"+LocalTime.now());
则表示已经存在一个路由器,且类型并不是type属性指定的类型。
@Test
void SendTopicExchangeTest(){
// 交换机名称
String changeName = "itcast.topic";
// 消息
String message = "发送一个一条中国的消息";
// 发送消息
rabbitTemplate.convertAndSend(changeName,"china.news",message);
首先,我们向chines.news中发送消息,则两个消费者都会收到消息:
然后我们修改RoutingKey,改成china.weather,则只有TopicQueue1能收到消息:
因为TopicQueue1绑定的是china.#即任意china开头的组合。我们再次修改,将RoutingKey修改成Canada.news:
则这次就只有TopicQueue2收到了消息,因为他绑定的是任意开头,以news结尾的组合,然后我们再次修改RoutingKey为National.wealth:
这次的控制台没有任何东西输出,这是因为他并不符合任何的BindingKey,并且我们再来看一下UI界面:
在topic.queue队列中,并没有存在消息,这也说明了我们之前说过的一个路由器的特性,【路由器只负责转发数据,并不存储数据】,也就是说一条消息被路由器拿到了,那就算是被消费了,无论有没有正确的被接收。
消息转换器的概念其实一直存在,比如之前我们在使用SpringAMQP的API发送消息的时候,虽然我们发送的一直都是String类型的消息,但是其实他支持的数据类型是Object:
那么也就是说,这里的消息可以是任意的类型,比如一个Java对象,List对象,那么我们就来测试一下发送一个Java的对象到消息队列中。
由于绑定交换机之后消息就会自动的发送给交换机,所以我们用JavaBean的方式声明一个队列让消息在不消费的情况下可以保存:
@Bean
public Queue ObjectQueue(){
return new Queue("Object.queue");
@Test
void SendObject(){
// 交换机名称
String changeName = "object.topic";
// 消息
HashMap
message.put("姓名","柳岩");
message.put("age",21);
// 发送消息
rabbitTemplate.convertAndSend(changeName,message);
我们创建了一个HashMap类型的对象,并将这个对象发送到消息队列中,我们运行代码:
在Object.queue中确实有一个消息,我们点进去看看消息内容是什么:
首先我们看到消息体,是很长的一大串字符,然后我们在看到上面,消息的类型,是Java序列化对象。
也就是说,他将我们的Java的对象经过序列化之后,再存储到消息队列中,但是这个序列化的方式有问题,他默认使用的是JDK的序列化方式,这种方式产生的序列化后的数据非常的大,而数据越大,在消息队列中就越占资源,而且消息的传输速度也会下降。
那么现在的问题就变成了,如果修改他原本的序列化工具,将默认不好用的序列化工具转换成一个好用的序列化工具。其实方法很简单,首先我们要引入JSON序列化工具的依赖,就是之前我们使用的jackson的依赖,然后我们在启动类中声明一个MessageConverter类型的Bean,这个Bean的返回值就是我们的jackson的序列化工具。其实这就是一个自动装配替换默认配置的过程:
package org.example;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class);
}
@Bean
public MessageConverter jacksonConverter(){
return new Jackson2JsonMessageConverter();
}
注意,如果是你自己手动写的话,我们在导入MessageConverter的时候有很多的选项,也就是在很多的包里面都存在一个叫做MessageConverter的对象,但是我们要选择的接口是在如下的路径中:
import org.springframework.amqp.support.converter.MessageConverter;
你导错包的话会报错的,如果你发现你的返回值报错,就检查一下是否是因为导错包而导致的报错。
接下来,我们首先来到UI管理界面,我们把之前的序列化类给清理掉:
我们点击这个Purge Message,将之前的消息给清理掉:
现在这个队列就是空的了,然后我们继续运行刚才的发送对象消息的代码:
好的,首先是运行成功,然后我们来到UI管理界面中查看消息的状态:
首先,这次我们看到消息的类型复杂了很多,并且最终的类型是json类型,并且我们也可以清楚的看到消息的内容是什么了。
这就是我们将默认的序列化工具变成了jackson工具,并完成了消息对象转json的过程。
消息发送完成了,接下来就是要接受,或者说是消费这个消息,然后我们就需要修改consumer中的代码:
首先第一步就是导入依赖,不过由于我们在父工程中已经导入过了,所以在子模块中就不需要导入了。
然后就是设置JavaBean,使用jackson去替换默认的JDK的序列化工具。
@RabbitListener(queues = "Object.queue")
public void ReadObjectMessage(HashMap
System.out.println("对象接受到了,内容是:"+message.toString());
注意在形参上,因为之前我们是使用的String,所以消息参数一直都是String类型,但是现在,由于我们发送消息的时候使用的是HashMap的方式,所以现在我们接收消息也要使用HashMap的方式,与消息的发送类型是一眼的,然后我们重启服务器,并清空日志:
很好,我们这边也确实是收到了消息,这就表示我们的消息的接受者也完成了。