目录
二、利用SpringAMQP实现HelloWorld中的基础消息队列功能
1、因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中
三、在consumer中编写消费逻辑,监听simple.queue
(一)利用SpringAMQP演示FanoutExchange的使用
AMQP是一种高级消息队列协议。
SpringAMQP是基于Spring Framework的AMQP扩展,提供了一个抽象层,使得使用AMQP进行消息传递变得更加简单。
SpringAMQP支持多种消息传递模式,包括点对点、发布/订阅和请求/响应等。
SpringAMQP提供了许多高级功能,例如队列管理、消息确认、事务和消息过滤等。
SpringAMQP提供了集成测试工具和基于Spring Boot的自动配置,使得集成AMQP变得更加容易。
总之,SpringAMQP是一个灵活、可扩展的AMQP实现,它使得使用消息队列时变得更加容易和高效。
- <!-- AMQP依赖,包含RabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- logging:
- pattern:
- dateformat: MM-dd HH:mm:ss:SSS
- spring:
- rabbitmq:
- host: 192.168.248.152
- port: 5672
- virtual-host: /
- username: itcast
- password: 123456
-
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class SpringAmqpTest {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendMassage(){
- String queue = "simple.queue";
- String massage="aaaaaaa";
- rabbitTemplate.convertAndSend(queue,massage);
- }
- }
- logging:
- pattern:
- dateformat: MM-dd HH:mm:ss:SSS
- spring:
- rabbitmq:
- host: 192.168.248.152
- port: 5672
- virtual-host: /
- username: itcast
- password: 123456
-
- package cn.itcast.mq.listener;
-
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class SpringRabbitListener {
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueue(String msg){
- System.out.println("消费者接收到消息:"+msg);
- }
- }
注意:
消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class SpringAmqpTest {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSendMassage(){
- String queue = "simple.queue";
- String massage="HelloWorld";
-
- for (int i = 0; i < 50; i++) {
- rabbitTemplate.convertAndSend(queue,massage);
- }
- }
- }
- @Component
- public class SpringRabbitListener {
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueue(String msg){
- System.out.println("消费者0接收到消息:"+msg+ LocalTime.now());
- try {
- Thread.sleep(20);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueue1(String msg){
- System.err.println("消费者1接收到消息__________-:"+msg+ LocalTime.now());
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
我们发现,虽然消费者0更快,但是它并没有承担更多的工作量;
这是因为消费预取机制会让消费者事先分配好要处理的消息,而不是按能力分配;
可以在yml文件中修改
- listener:
- simple:
- prefetch: 1 #表示预取上限为1
- @Configuration
- public class FanoutConfig {
- ///1
- @Bean
- public FanoutExchange fanoutExchange(){
- return new FanoutExchange("itcast.fanout");
- }
-
- @Bean
- public Queue fanoutQueue1(){
- return new Queue("fanout.queue1");
- }
-
- @Bean
- public Binding bindingQueue1(FanoutExchange exchange,Queue fanoutQueue1){
- return BindingBuilder.bind(fanoutQueue1).to(exchange);
- }
-
- ///2
- @Bean
- public Queue fanoutQueue2(){
- return new Queue("fanout.queue2");
- }
-
- @Bean
- public Binding bindingQueue2(FanoutExchange exchange,Queue fanoutQueue2){
- return BindingBuilder.bind(fanoutQueue2).to(exchange);
- }
- }
绑定成功
- @RabbitListener(queues = "fanout.queue1")
- public void listenFanoutQueue1(String msg){
- System.err.println("消费者1接收到消息__________-:"+msg+ LocalTime.now());
- }
-
- @RabbitListener(queues = "fanout.queue2")
- public void listenFanoutQueue2(String msg){
- System.err.println("消费者2接收到消息__________-:"+msg+ LocalTime.now());
- }
- @Test
- public void sendFanoutMassage(){
- String exchangeName = "itcast.fanout";
- String message = "Hello Every One";
- rabbitTemplate.convertAndSend(exchangeName,"",message);
- }
发现两个消费者都接收到了消息
实现:
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2"),
- exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
- key = {"red","blue"}
- ))
- public void listenDirectQueue1(String msg){
- System.err.println("消费者1接收到消息__________-:"+msg+ LocalTime.now());
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2"),
- exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
- key = {"red","yellow"}
- ))
- public void listenDirectQueue2(String msg){
- System.out.println("消费者2接收到消息__________-:"+msg+ LocalTime.now());
- }
- @Test
- public void sendDirectMassage(){
- String exchangeName = "itcast.direct";
- String message = "Hello Every One1111";
- rabbitTemplate.convertAndSend(exchangeName,"blue",message);
- }
- @Test
- public void sendDirectMassage(){
- String exchangeName = "itcast.direct";
- String message = "Hello Every One1111";
- rabbitTemplate.convertAndSend(exchangeName,"red",message);
- }
基于@RabbitListener注解声明队列和交换机有哪些常见注解
利用SpringAMQP演示TopicExchange的使用
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue1"),
- exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
- key = "china.#"
- ))
- public void listenTopicQueue1(String msg){
- System.out.println("消费者1接收到消息aaaaaa__-:"+msg+ LocalTime.now());
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue2"),
- exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
- key = "#.news"
- ))
- public void listenTopicQueue2(String msg){
- System.err.println("消费者2接收到消息a__-:"+msg+ LocalTime.now());
- }
- @Test
- public void sendTopicMassage(){
- String exchangeName = "itcast.topic";
- String message = "Hello Every One12222";
- rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
- }
- @Bean
- public Queue objectQueue(){
- return new Queue("object.queue");
- }
- @Test
- public void sendObjectMassage(){
- Map
message = new HashMap<>(); - message.put("name","11");
- message.put("age","22");
- rabbitTemplate.convertAndSend("object.queue",message);
- }
对象被序列化了,这种方式性能差,不安全(容易被注入)
引入依赖
-
-
com.fasterxml.jackson.core -
jackson-databind -
添加配置Bean
- @Bean
- public Jackson2JsonMessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
编写配置Bean
- @Bean
- public Jackson2JsonMessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
编写消费者代码
- @RabbitListener(queues = "object.queue")
- public void listenObjectQueue1(Map
msg) { - System.err.println("消费者接收到消息___da_______-:"+msg+ LocalTime.now());
- }
验证
注意: