• Spring Boot集成RabbitMQ快速入门Demo


    1.什么是RabbitMQ

    RabbitMQ是一款使用Erlang语言开发的,基于AMQP协议的消息中间件,作为一款优秀的消息系统,RabbitMQ有高并发、可扩展等优势,并适用于大型系统中各个模块之间的通信。

    RabbitMQ的特点为:

    • 持久化、传输确认、发布确认等功能保证消息可靠

    • 支持多种消息分发模式,处理更加灵活

    • 提供可视化管理界面,使用方便

    • 支持集群部署,保证服务高可用

    2.RabbitMQ环境搭建

    version: '3'
    services:
      rabbitmq:
        image: registry.cn-hangzhou.aliyuncs.com/zhengqing/rabbitmq:3.7.8-management # 原镜像`rabbitmq:3.7.8-management` 【 注:该版本包含了web控制页面 】
        container_name: rabbitmq # 容器名为'rabbitmq'
        hostname: my-rabbit
        restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
        environment: # 设置环境变量,相当于docker run命令中的-e
          TZ: Asia/Shanghai
          LANG: en_US.UTF-8
          RABBITMQ_DEFAULT_VHOST: my_vhost # 默认虚拟主机(vhost)名称
          RABBITMQ_DEFAULT_USER: admin # 登录账号
          RABBITMQ_DEFAULT_PASS: admin # 登录密码
        volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
          - "./rabbitmq/data:/var/lib/rabbitmq"
        ports: # 映射端口
          - "5672:5672"
          - "15672:15672"

    运行

    docker-compose -f docker-compose-rabbitmq.yml -p rabbitmq up -d

    web管理端:http://127.0.0.1:15672 登录账号密码:admin/admin

    27a7e34328fda7e39066d77e99a7d4a8.png

    3.代码工程

    实验目的:实现通过rabbitmq发送和接收消息

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>springboot-demo</artifactId>
            <groupId>com.et</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
        <artifactId>rabbitmq</artifactId>
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-autoconfigure</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
        </dependencies>
    </project>

    application.properties

    1. server.port=8088
    2. #rabbitmq
    3. spring.rabbitmq.host=localhost
    4. spring.rabbitmq.port=5672
    5. spring.rabbitmq.username=admin
    6. spring.rabbitmq.password=admin
    7. spring.rabbitmq.virtual-host=my_vhost

    config

    简单使用

    1. package com.et.rabbitmq.config;
    2. import org.springframework.amqp.core.Queue;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. @Configuration
    6. public class RabbitConfig {
    7. @Bean
    8. public Queue Queue() {
    9. return new Queue("hello");
    10. }
    11. }

    topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由地绑定不同的队列。首先对topic规则进行配置,这里使用两个队列来测试。

    1. package com.et.rabbitmq.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.Queue;
    5. import org.springframework.amqp.core.TopicExchange;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. @Configuration
    9. public class TopicRabbitConfig {
    10. public final static String TOPIC_ONE = "topic.one";
    11. public final static String TOPIC_TWO = "topic.two";
    12. public final static String TOPIC_EXCHANGE = "topicExchange";
    13. @Bean
    14. public Queue queue_one(){
    15. return new Queue(TOPIC_ONE);
    16. }
    17. @Bean
    18. public Queue queue_two(){
    19. return new Queue(TOPIC_TWO);
    20. }
    21. @Bean
    22. TopicExchange exchange(){
    23. return new TopicExchange(TOPIC_EXCHANGE);
    24. }
    25. @Bean
    26. Binding bindingExchangeOne(Queue queue_one, TopicExchange exchange){
    27. return BindingBuilder.bind(queue_one).to(exchange).with("topic.one");
    28. }
    29. @Bean
    30. Binding bindingExchangeTwo(Queue queue_two, TopicExchange exchange){
    31. //# 表示零个或多个词
    32. //* 表示一个词
    33. return BindingBuilder.bind(queue_two).to(exchange).with("topic.#");
    34. }
    35. }

    Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

    1. package com.et.rabbitmq.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.FanoutExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. @Configuration
    9. public class FanoutRabbitConfig {
    10. @Bean
    11. public Queue AMessage() {
    12. return new Queue("fanout.A");
    13. }
    14. @Bean
    15. public Queue BMessage() {
    16. return new Queue("fanout.B");
    17. }
    18. @Bean
    19. public Queue CMessage() {
    20. return new Queue("fanout.C");
    21. }
    22. @Bean
    23. FanoutExchange fanoutExchange() {
    24. return new FanoutExchange("fanoutExchange");
    25. }
    26. @Bean
    27. Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
    28. return BindingBuilder.bind(AMessage).to(fanoutExchange);
    29. }
    30. @Bean
    31. Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
    32. return BindingBuilder.bind(BMessage).to(fanoutExchange);
    33. }
    34. @Bean
    35. Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
    36. return BindingBuilder.bind(CMessage).to(fanoutExchange);
    37. }
    38. }

    receiver

    1. package com.et.rabbitmq.receiver;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.messaging.handler.annotation.Payload;
    6. import org.springframework.stereotype.Component;
    7. import org.springframework.stereotype.Service;
    8. @Service
    9. @Slf4j
    10. public class HelloReceiver {
    11. @RabbitListener(queues = "hello")
    12. public void process(String hello) {
    13. System.out.println("Receiver : " + hello);
    14. }
    15. @RabbitListener(queues = {"topic.one"})
    16. public void receiveTopic1(@Payload String fileBody) {
    17. log.info("topic1:" + fileBody);
    18. }
    19. @RabbitListener(queues = {"topic.two"})
    20. public void receiveTopic2(@Payload String fileBody) {
    21. log.info("topic2:" + fileBody);
    22. }
    23. @RabbitListener(queues = {"fanout.A"})
    24. public void fanoutA(@Payload String fileBody) {
    25. log.info("fanoutA:" + fileBody);
    26. }
    27. @RabbitListener(queues = {"fanout.B"})
    28. public void fanoutB(@Payload String fileBody) {
    29. log.info("fanoutB:" + fileBody);
    30. }
    31. @RabbitListener(queues = {"fanout.C"})
    32. public void fanoutC(@Payload String fileBody) {
    33. log.info("fanoutC:" + fileBody);
    34. }
    35. }

    sender

    package com.et.rabbitmq.sender;

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import java.util.Date;

    /**
     * Sends a timestamped greeting using the routing key {@code "hello"}.
     */
    @Component
    public class HelloSender {

        /** Template used to convert and send the message. */
        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Builds the text {@code "hello " + <current date>}, prints it to standard
         * output prefixed with {@code "Sender : "}, and sends it with the routing key
         * {@code "hello"}.
         */
        public void send() {
            final Date now = new Date();
            final String payload = "hello " + now;
            System.out.println("Sender : " + payload);
            rabbitTemplate.convertAndSend("hello", payload);
        }
    }
    1. package com.et.rabbitmq.sender;
    2. import com.et.rabbitmq.config.TopicRabbitConfig;
    3. import org.springframework.amqp.core.AmqpTemplate;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.stereotype.Component;
    6. @Component
    7. public class TopicSender {
    8. @Autowired
    9. private AmqpTemplate rabbitTemplate;
    10. //两个消息接受者都可以收到
    11. public void send_one() {
    12. String context = "Hi, I am message one";
    13. System.out.println("Sender : " + context);
    14. this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.one",context);
    15. }
    16. //只有TopicReceiverTwo都可以收到
    17. public void send_two() {
    18. String context = "Hi, I am message two";
    19. System.out.println("Sender : " + context);
    20. this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.two",context);
    21. }
    22. }

    DemoApplication.java

    1. package com.et.quartz;
    2. import org.springframework.boot.SpringApplication;
    3. import org.springframework.boot.autoconfigure.SpringBootApplication;
    4. @SpringBootApplication
    5. public class DemoApplication {
    6. public static void main(String[] args) {
    7. SpringApplication.run(DemoApplication.class, args);
    8. }
    9. }

    以上只是一些关键代码,所有代码请参见下面代码仓库

    代码仓库

    • https://github.com/Harries/springboot-demo

    4.测试

    简单使用

    1. @Test
    2. public void hello() throws Exception {
    3. helloSender.send();
    4. Thread.sleep(50000);
    5. }

    Topic Exchange

    1. @Test
    2. public void topicOne() throws Exception {
    3. topicSender.send_one();
    4. Thread.sleep(50000);
    5. }
    6. @Test
    7. public void topicTwo() throws Exception {
    8. topicSender.send_two();
    9. Thread.sleep(50000);
    10. }

    Fanout Exchange

    1. @Test
    2. public void sendFanout() throws InterruptedException {
    3. String context = "hi, fanout msg ";
    4. System.out.println("Sender : " + context);
    5. this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
    6. Thread.sleep(50000);
    7. }

    5.参考链接

    • https://www.rabbitmq.com/

    • https://spring.io/projects/spring-amqp

  • 相关阅读:
    2023 彩虹全新 SUP 模板,卡卡云模板修复版
    数据库 explain 关键字解析
    Nginx
    理解C#里面的集合有哪些?怎么用,什么是安全集合?
    java基于ssm+Vue儿童福利院管理系统 element 前后端分离
    Java两周半速成之路(第十天)
    51单片机数字电压表仿真设计_LCD显示(仿真+程序+原理图+PCB+设计报告+讲解)
    计算机考研考研院校难度等级,建议收藏
    如何测试服务器的速度
    Fabric.js在vue2中使用
  • 原文地址:https://blog.csdn.net/dot_life/article/details/138174172