其他模块和上文保持一致https://blog.csdn.net/weixin_59334478/article/details/127740411?spm=1001.2014.3001.5501
ReceiveServiceImpl实现类
- package com.it.rabbitmq.impl;
-
- import com.it.rabbitmq.ReceiveService;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.Resource;
-
- @Service("receiveService")
- public class ReceiveServiceImpl implements ReceiveService {
- //注入amqp的模板类,利用这个对象来发送和接收消息
- @Resource
- private AmqpTemplate amqpTemplate;
-
-
- @Override
- public void receiveMessage() {
- /**
- * 发送消息
- * 参数1为交换机名称
- * 参数2位RoutingKey
- * 参数3为具体发送的消息数据
- */
- String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");
- System.out.println(message);
- }
-
- /**
- * @RabbitListener:用于标记当前方法是一个rabbitmq的消息监听方法,作用是持续性的接收消息
- * 这个方法不需要手动调用,spring会自动监听
- * 属性queues:用于指定一个已经存在的队列名称,用于队列的监听
- * @param message 参数就是接收到的具体消息数据
- */
- @RabbitListener(queues = {"bootDirectQueue"})
- public void directReceive(String message) {
- System.out.println("监听器接收的消息:"+message);
- }
-
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue(),
- exchange = @Exchange(name="fanoutExchange",type = "fanout")
- )})
- public void fanoutReceive1(String message) {
- System.out.println("fanoutReceive1监听器接收的消息:"+message);
- }
-
-
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue(),
- exchange = @Exchange(name="fanoutExchange",type = "fanout")
- )})
- public void fanoutReceive2(String message) {
- System.out.println("fanoutReceive2监听器接收的消息:"+message);
- }
-
- }


1.SendService接口
- package com.it.rabbitmq;
-
- public interface SendService {
- void sendMessage(String message);
- void sendFanoutMessage(String message);
- }
SendServiceImpl实现类
- package com.it.rabbitmq.impl;
-
- import com.it.rabbitmq.SendService;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.Resource;
-
- @Service("sendService")
- public class SendServiceImpl implements SendService {
- //注入amqp的模板类,利用这个对象来发送和接收消息
- @Resource
- private AmqpTemplate amqpTemplate;
-
- @Override
- public void sendMessage(String message) {
- /**
- * 发送消息
- * 参数1为交换机名称
- * 参数2位RoutingKey
- * 参数3为具体发送的消息数据
- */
- amqpTemplate.convertAndSend("bootDirectExchange", "bootDirectRouting", message);
- }
-
- @Override
- public void sendFanoutMessage(String message) {
- amqpTemplate.convertAndSend("fanoutExchange","",message);
- }
- }
2.RabbitMQConfig类
- package com.it.rabbitmq.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitMQConfig {
-
- //配置一个direct类型的交换机
- @Bean
- public DirectExchange directExchange() {
- return new DirectExchange("bootDirectExchange");
-
- }
-
- //配置一个队列
- @Bean
- public Queue directQueue() {
- return new Queue("bootDirectQueue");
- }
-
- /**
- * 配置一个交换机和队列的绑定
- *
- * @param directQueue 需要绑定的队列对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
- * @param directQueue 需要绑定的交换机对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
- * @return
- */
- @Bean
- public Binding directBinding(Queue directQueue, DirectExchange directExchange) {
- //完成绑定:参数1为需要绑定的队列,参数2为需要绑定的交换机,参数3为需要绑定的routingkey
- return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");
- }
-
- //配置一个fanout类型的交换机
- @Bean
- public FanoutExchange fanoutExchange() {
- return new FanoutExchange("fanoutExchange");
- }
-
-
- }
3.运行主函数
- package com.it;
-
- import com.it.rabbitmq.SendService;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.ApplicationContext;
-
- @SpringBootApplication
- public class RabbitmqSpringbootApplication {
-
- public static void main(String[] args) {
-
- ApplicationContext ac = SpringApplication.run(RabbitmqSpringbootApplication.class, args);
- SendService sendService = (SendService) ac.getBean("sendService");
- // sendService.sendMessage("boot的测试数据");
-
- sendService.sendFanoutMessage("boot的fanout测试数据!");
-
- }
-
- }


ReceiveServiceImpl实现类
- package com.it.rabbitmq.impl;
-
- import com.it.rabbitmq.ReceiveService;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.Resource;
-
- @Service("receiveService")
- public class ReceiveServiceImpl implements ReceiveService {
- //注入amqp的模板类,利用这个对象来发送和接收消息
- @Resource
- private AmqpTemplate amqpTemplate;
-
-
- @Override
- public void receiveMessage() {
- /**
- * 发送消息
- * 参数1为交换机名称
- * 参数2位RoutingKey
- * 参数3为具体发送的消息数据
- */
- String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");
- System.out.println(message);
- }
-
- /**
- * @RabbitListener:用于标记当前方法是一个rabbitmq的消息监听方法,作用是持续性的接收消息
- * 这个方法不需要手动调用,spring会自动监听
- * 属性queues:用于指定一个已经存在的队列名称,用于队列的监听
- * @param message 参数就是接收到的具体消息数据
- */
- @RabbitListener(queues = {"bootDirectQueue"})
- public void directReceive(String message) {
- System.out.println("监听器接收的消息:"+message);
- }
-
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue(),
- exchange = @Exchange(name="fanoutExchange",type = "fanout")
- )})
- public void fanoutReceive1(String message) {
- System.out.println("fanoutReceive1监听器接收的消息:"+message);
- }
-
-
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue(),
- exchange = @Exchange(name="fanoutExchange",type = "fanout")
- )})
- public void fanoutReceive2(String message) {
- System.out.println("fanoutReceive2监听器接收的消息:"+message);
- }
-
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue("topic1"),
- key = {"aa"},
- exchange = @Exchange(name="topicExchange",type = "topic")
- )})
- public void topicReceive1(String message){
- System.out.println("topic1消费者---aa---"+message);
- }
-
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue("topic2"),
- key = {"aa.*"},
- exchange = @Exchange(name="topicExchange",type = "topic")
- )})
- public void topicReceive2(String message){
- System.out.println("topic2消费者---aa---"+message);
- }
-
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue("topic3"),
- key = {"aa.#"},
- exchange = @Exchange(name="topicExchange",type = "topic")
- )})
- public void topicReceive3(String message){
- System.out.println("topic3消费者---aa---"+message);
- }
-
-
- }

1.SendService接口
- package com.it.rabbitmq;
-
- public interface SendService {
- void sendMessage(String message);
- void sendFanoutMessage(String message);
- void sendTopicMessage(String message);
- }
SendServiceImpl类
- package com.it.rabbitmq.impl;
-
- import com.it.rabbitmq.SendService;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.Resource;
-
- @Service("sendService")
- public class SendServiceImpl implements SendService {
- //注入amqp的模板类,利用这个对象来发送和接收消息
- @Resource
- private AmqpTemplate amqpTemplate;
-
- @Override
- public void sendMessage(String message) {
- /**
- * 发送消息
- * 参数1为交换机名称
- * 参数2位RoutingKey
- * 参数3为具体发送的消息数据
- */
- amqpTemplate.convertAndSend("bootDirectExchange", "bootDirectRouting", message);
- }
-
- @Override
- public void sendFanoutMessage(String message) {
- amqpTemplate.convertAndSend("fanoutExchange","",message);
- }
-
- @Override
- public void sendTopicMessage(String message) {
- amqpTemplate.convertAndSend("topicExchange","",message);
- }
-
- }
2.RabbitMQConfig类,提前声明一个topic的交换机
- package com.it.rabbitmq.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitMQConfig {
-
- //配置一个direct类型的交换机
- @Bean
- public DirectExchange directExchange() {
- return new DirectExchange("bootDirectExchange");
-
- }
-
- //配置一个队列
- @Bean
- public Queue directQueue() {
- return new Queue("bootDirectQueue");
- }
-
- /**
- * 配置一个交换机和队列的绑定
- *
- * @param directQueue 需要绑定的队列对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
- * @param directQueue 需要绑定的交换机对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
- * @return
- */
- @Bean
- public Binding directBinding(Queue directQueue, DirectExchange directExchange) {
- //完成绑定:参数1为需要绑定的队列,参数2为需要绑定的交换机,参数3为需要绑定的routingkey
- return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");
- }
-
- //配置一个fanout类型的交换机
- @Bean
- public FanoutExchange fanoutExchange() {
- return new FanoutExchange("fanoutExchange");
- }
-
- //配置一个topic类型的交换机
- @Bean
- public TopicExchange topicExchange() {
- return new TopicExchange("topicExchange");
- }
-
- }
3.运行主函数
- package com.it;
-
- import com.it.rabbitmq.SendService;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.ApplicationContext;
-
- @SpringBootApplication
- public class RabbitmqSpringbootApplication {
-
- public static void main(String[] args) {
-
- ApplicationContext ac = SpringApplication.run(RabbitmqSpringbootApplication.class, args);
- SendService sendService = (SendService) ac.getBean("sendService");
- // sendService.sendMessage("boot的测试数据");
-
- // sendService.sendFanoutMessage("boot的fanout测试数据!");
-
- sendService.sendTopicMessage("boot的topic测试数据,key:aa");
- }
-
- }
功能测试:

查看接收类

修改发送信息的routingkey


修改发送信息的routingkey
