目录
创建一个父工程和两个子工程consumer和publisher

命名为simple.queue

- <dependencies>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <!--AMQP依赖,包含RabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <!--单元测试-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- </dependency>
- </dependencies>
- spring:
- rabbitmq:
- host: 192.168.200.129 # 你的虚拟机IP
- port: 5672 # 端口
- virtual-host: / # 虚拟主机
- username: admin # 用户名
- password: 123456 # 密码
- @SpringBootTest
- public class SpringAmqpTest {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSimpleQueue() {
- // 队列名称
- String queueName = "simple.queue";
- // 消息
- String message = "hello, rabbitmq!";
- // 发送消息
- rabbitTemplate.convertAndSend(queueName, message);
- }
- }
查看消息


- spring:
- rabbitmq:
- host: 192.168.200.129 # 你的虚拟机IP
- port: 5672 # 端口
- virtual-host: / # 虚拟主机
- username: admin # 用户名
- password: 123456 # 密码
- @Component
- public class SpringRabbitListener {
- // 利用RabbitListener来声明要监听的队列信息
- // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
- // 可以看到方法体中接收的就是消息体的内容
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueueMessage(String msg) throws InterruptedException {
- System.out.println("spring 消费者接收到消息:【" + msg + "】");
- }
- }
结果

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。 此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了
命名为work.queue

- /**
- * workQueue
- * 向队列中不停发送消息,模拟消息堆积。
- */
- @Test
- public void testWorkQueue() throws InterruptedException {
- // 队列名称
- String queueName = "work.queue";
- // 消息
- String message = "hello, message_";
- for (int i = 0; i < 50; i++) {
- // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
- rabbitTemplate.convertAndSend(queueName, message + i);
- Thread.sleep(20);
- }
- }
- @RabbitListener(queues = "work.queue")
- public void listenWorkQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1接收到消息:【" + msg + "】");
-
- }
-
- @RabbitListener(queues = "work.queue")
- public void listenWorkQueue2(String msg) throws InterruptedException {
- System.err.println("消费者2........接收到消息:【" + msg + "】");
-
- }
结果

如果消费者睡眠时间不同
- @RabbitListener(queues = "work.queue")
- public void listenWorkQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1接收到消息:【" + msg + "】");
- Thread.sleep(20);
- }
-
- @RabbitListener(queues = "work.queue")
- public void listenWorkQueue2(String msg) throws InterruptedException {
- System.err.println("消费者2........接收到消息:【" + msg + "】");
- Thread.sleep(200);
- }

消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
- spring:
- rabbitmq:
- listener:
- simple:
- prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢。而最终总的执行耗时也大大提升。 正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

在订阅模型中,多了一个exchange角色,而且过程略有变化:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。 在广播模式下,消息发送流程是这样的:

创建两个队列fanout.queue1和fanout.queue2,绑定到交换机hmall.fanout

创建一个名为fanout的交换机,类型是Fanout
3、 绑定交换机
生产者
- /**
- * Fanout交换机
- */
- @Test
- public void testFanoutExchange() {
- // 交换机名称
- String exchangeName = "mq.fanout";
- // 消息
- String message = "hello, everyone!";
- rabbitTemplate.convertAndSend(exchangeName, "", message);
- }
消费者
- @RabbitListener(queues = "fanout.queue1")
- public void listenFanoutQueue1(String msg) {
- System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
- }
-
- @RabbitListener(queues = "fanout.queue2")
- public void listenFanoutQueue2(String msg) {
- System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
- }

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

在Direct模型下:
RoutingKey(路由key)RoutingKey。Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息 首先在控制台声明两个队列direct.queue1和direct.queue2



一个队列绑定两个RoutingKey
生产者:RoutingKey为red
- /**
- * Direct交换机
- */
- @Test
- public void testSendDirectExchange() {
- // 交换机名称
- String exchangeName = "mq.direct";
- // 消息
- String message = "hello direct red";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "red", message);
- }
消费者
- @RabbitListener(queues = "direct.queue1")
- public void listenDirectQueue1(String msg) {
- System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
- }
-
- @RabbitListener(queues = "direct.queue2")
- public void listenDirectQueue2(String msg) {
- System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
- }
两个队列都收到消息
RoutingKey为blue
- /**
- * Direct交换机
- */
- @Test
- public void testSendDirectExchange() {
- // 交换机名称
- String exchangeName = "mq.direct";
- // 消息
- String message = "hello direct blue";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "blue", message);
- }
只有队列2收到消息
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。 只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!
BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
通配符规则:
#:匹配一个或多个词*:匹配不多不少恰好1个词举例:
item.#:能够匹配item.spu.insert 或者 item.spuitem.*:只能匹配item.spu 
示例
publisher发送的消息使用的RoutingKey共有四种:
china.news 代表有中国的新闻消息;china.weather 代表中国的天气消息;japan.news 则代表日本新闻japan.weather 代表日本的天气消息;解释:
topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
china.newschina.weathertopic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
china.newsjapan.news


生产者:RoutingKey为china.news
- /**
- * topicExchange
- */
- @Test
- public void testSendTopicExchange() {
- // 交换机名称
- String exchangeName = "mq.topic";
- // 消息
- String message = "hello topis china.news";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
- }
消费者
- @RabbitListener(queues = "topic.queue1")
- public void listenTopicQueue1(String msg){
- System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
- }
-
- @RabbitListener(queues = "topic.queue2")
- public void listenTopicQueue2(String msg){
- System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
- }

RoutingKey为china.people
- /**
- * topicExchange
- */
- @Test
- public void testSendTopicExchange() {
- // 交换机名称
- String exchangeName = "mq.topic";
- // 消息
- String message = "hello topis china.people";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "china.people", message);
- }
只有消费者1收到消息

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
1、Queue:用于声明队列,可以用工厂类QueueBuilder构建
2、Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
3、Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

示例:创建Fanout交换机队列
- @Configuration
- public class FanoutConfig {
- /**
- * 声明交换机
- * @return Fanout类型交换机
- */
- @Bean
- public FanoutExchange fanoutExchange2(){
- return new FanoutExchange("mq.fanout2");
- }
-
- /**
- * 第1个队列
- */
- @Bean
- public Queue fanoutQueue3(){
- return new Queue("fanout.queue3");
- }
-
- /**
- * 绑定队列和交换机1
- */
- @Bean
- public Binding bindingQueue1(Queue fanoutQueue3, FanoutExchange fanoutExchange3){
- return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange3);
- }
-
- /**
- * 第2个队列
- */
- @Bean
- public Queue fanoutQueue4(){
- return new Queue("fanout.queue4");
- }
-
- /**
- * 绑定队列和交换机2
- */
- @Bean
- public Binding bindingQueue2(){
- return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange2());
- }
- }
direct示例
- @Configuration
- public class DirectConfig {
-
- /**
- * 声明交换机
- * @return Direct类型交换机
- */
- @Bean
- public DirectExchange directExchange(){
- return ExchangeBuilder.directExchange("mq.direct2").build();
- }
-
- /**
- * 第1个队列
- */
- @Bean
- public Queue directQueue1(){
- return new Queue("direct.queue1");
- }
-
- /**
- * 绑定队列和交换机
- */
- @Bean
- public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
- return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
- }
- /**
- * 绑定队列和交换机
- */
- @Bean
- public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
- return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
- }
-
- /**
- * 第2个队列
- */
- @Bean
- public Queue directQueue2(){
- return new Queue("direct.queue2");
- }
-
- /**
- * 绑定队列和交换机
- */
- @Bean
- public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
- return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
- }
- /**
- * 绑定队列和交换机
- */
- @Bean
- public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
- return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
- }
- }
direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding
- /**
- * 注解声明交换机
- * @param msg
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue3"),//队列名称
- exchange = @Exchange(name = "mq.direct", //交换机名称
- type = ExchangeTypes.DIRECT),//交换机类型
- key = {"red", "blue"}//RoutingKey
- ))
- public void listenDirectQueue3(String msg){
- System.out.println("消费者3接收到direct.queue3的消息:【" + msg + "】");
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue4"),
- exchange = @Exchange(name = "mq.direct", type = ExchangeTypes.DIRECT),
- key = {"red", "yellow"}
- ))
- public void listenDirectQueue4(String msg){
- System.out.println("消费者4接收到direct.queue4的消息:【" + msg + "】");
- }
队列
交换机
- @Test
- public void testSendMap() throws InterruptedException {
- // 准备消息
- Map<String,Object> msg = new HashMap<>();
- msg.put("name", "张三");
- msg.put("age", 21);
- // 发送消息
- rabbitTemplate.convertAndSend("object.queue", msg);
- }
当发送的数据为Objiet类型时会出现乱码现象,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring采用的序列化方式是JDK序列化

在publisher和consumer两个服务中都引入依赖:
- <dependencies>
- <!--mq消息转换为json-->
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- <version>2.9.10</version>
- </dependency>
- </dependencies>
注意:如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
配置消息转换器,在publisher和consumer两个服务的启动类中添加一个Bean即可:
- @Bean
- public MessageConverter messageConverter(){
- // 1.定义消息转换器
- Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
- // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
- jackson2JsonMessageConverter.setCreateMessageIds(true);
- return jackson2JsonMessageConverter;
- }
结果

消费者接收Object
我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:
- @RabbitListener(queues = "object.queue")
- public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
- System.out.println("消费者接收到object.queue消息:【" + msg + "】");
- }
