• 初识RabbitMQ-概念、安装和使用


     摘要:整理列下RabbitMQ相关的内容,包括基本概念、安装(Linux和MacOS)和使用,记录下...

            消息队列看作是一个按照顺序存放消息的容器,当使用消息的时候,直接从容器中按照顺序取出消息使用即可。对系统而言,消息队列的作用主要体现为如下三点

    1. 通过异步处理提高系统性能(减少响应所需时间)

      用户请求数据存入消息队列之后,立即返回结果,异步的对消息进行消费。需要注意的是后续的处理可能会失败。

    2. 削峰/限流

      将短时间高并发产生的事务消息存储在消息队列中,避免对系统产生的冲击

    3. 降低系统耦合性。

      消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息,生产者和消费者之间不存在直接耦合。

    消息队列基础知识总结 | JavaGuide「Java学习 + 面试指南」一份涵盖大部分 Java 程序员所需要掌握的核心知识。准备 Java 面试,首选 JavaGuide!icon-default.png?t=N7T8https://javaguide.cn/high-performance/message-queue/message-queue.html

    1. RabbitMQ初识

            RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。

            RabbitMQ具有特点如下:可靠性、灵活的路由、扩展性、高可用性、支持多种协议、多语言客户端、易用的管理界面和插件机制

    具体介绍可参阅官网介绍:

    RabbitMQ: One broker to queue them all | RabbitMQicon-default.png?t=N7T8https://www.rabbitmq.com/

                                                     RabbitMQ 的整体模型架构如下

            RabbitMQ的核心概念

    • Producer: 消息生产者,就是投递消息的一方;
    • Message:消息一般分成消息体(playLoad)和消息头(Label)两部分;
    • Exchange:交换机用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者),或许会被直接丢弃掉。RabbitMQ交换机有4中类型对应不同的路由策略direct(默认),fanout, topic, 和 headers,后面介绍。
    • Binding:绑定交换机和队列,在绑定的时候一般会指定一个 BindingKey(绑定建)。注意BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,fanout类型就会无视BindingKey。
    • Routing key: 路由键,决定路由规则
    • Queue:队列,存储消息直到发送给消费者。RabbitMQ 中消息只能存储在 队列 中,这一点和 Kafka 这种消息中间件相反。注意多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理。
    • Connection:连接服务端
    • Channel:信道,读写数据
    • Consumer:消费者,也就是接收消息的一方;其连接到 RabbitMQ 服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
    • Broker:服务实例/节点,大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
    • Virtual host:虚拟主机,用于区分不同的服务,类似于不同域名,不会相互影响

    2. RabbitMQ安装

    2.1 Linux下安装RabbitMQ

            建议在一个新建的阿里云的Cent OS 7.6上安装,不要对yum换源,否则可能会安装失败。实际使用CentOS 7.9 依然OK

    • 设置编码为UTF-8
    1. ## 命令解析
    2. ## echo命令用于输出文本到标准输出。
    3. ## "export LC_ALL=en_US.UTF-8" 是要输出的文本,这里设置了LC_ALL环境变量的值为en_US.UTF-8,表示使用UTF-8编码作为默认的语言环境。
    4. ## >> /etc/profile将输出内容追加到/etc/profile文件的末尾。在这里,/etc/profile是一个系统级的shell配置文件,它在用户登录时被执行。
    5. [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# echo "export LC_ALL=en_US.UTF-8" >> /etc/profile
    6. ## 命令解析
    7. ## source命令用于在当前shell环境中**执行指定的脚本文件**,这里是执行/etc/profile文件。
    8. ## 执行source /etc/profile可以立即使新设置的环境变量生效,而不需要重新登录系统。
    9. [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# source /etc/profile
    • 下载安装 RabbitMQ 的脚本(script.rpm.sh)并通过bash执行
    1. ## 命令解析
    2. ## curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh: curl命令是一个用于在命令行下传输数据的工具。-s 参数表示静默模式,不输出进度或错误信息。https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh是一个URL,指向了一个脚本文件。
    3. ## |: 管道符号将前一个命令的输出作为后一个命令的输入。
    4. ## sudo bash: 使用了sudo命令以超级用户的权限执行后面的命令。bash 是一个Unix shell,也是一个命令行解释器,它用于执行脚本
    5. [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
    6. Detected operating system as centos/7.
    7. Checking for curl...
    8. Detected curl...
    9. Downloading repository file: https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/config_file.repo?os=centos&dist=7&source=script
    10. done.
    11. Installing pygpgme to verify GPG signatures...
    12. Loaded plugins: fastestmirror
    13. Loading mirror speeds from cached hostfile
    14. * base: mirrors.aliyun.com
    15. * extras: mirrors.aliyun.com
    16. * updates: mirrors.aliyun.com
    17. base | 3.6 kB 00:00:00
    18. docker-ce-stable | 3.5 kB 00:00:00
    19. epel | 4.7 kB 00:00:00
    20. extras | 2.9 kB 00:00:00
    21. mysql-connectors-community | 2.6 kB 00:00:00
    22. mysql-tools-community | 2.6 kB 00:00:00
    23. mysql57-community | 2.6 kB 00:00:00
    24. nginx-stable | 2.9 kB 00:00:00
    25. rabbitmq_rabbitmq-server-source/signature | 836 B 00:00:00
    26. Retrieving key from https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
    27. Importing GPG key 0x4D206F89:
    28. Userid : "https://packagecloud.io/rabbitmq/rabbitmq-server (https://packagecloud.io/docs#gpg_signing) "
    29. Fingerprint: 8c69 5b02 19af deb0 4a05 8ed8 f4e7 8920 4d20 6f89
    30. From : https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
    31. rabbitmq_rabbitmq-server-source/signature | 1.0 kB 00:00:00 !!!
    32. updates | 2.9 kB 00:00:00
    33. (1/3): epel/x86_64/updateinfo | 1.0 MB 00:00:00
    34. (2/3): epel/x86_64/primary_db | 7.0 MB 00:00:00
    35. (3/3): docker-ce-stable/7/x86_64/primary_db | 142 kB 00:00:00
    36. rabbitmq_rabbitmq-server-source/primary | 175 B 00:00:02
    37. Package pygpgme-0.3-9.el7.x86_64 already installed and latest version
    38. Nothing to do
    39. Installing yum-utils...
    40. Loaded plugins: fastestmirror
    41. Loading mirror speeds from cached hostfile
    42. * base: mirrors.aliyun.com
    43. * extras: mirrors.aliyun.com
    44. * updates: mirrors.aliyun.com
    45. Package yum-utils-1.1.31-54.el7_8.noarch already installed and latest version
    46. Nothing to do
    47. Generating yum cache for rabbitmq_rabbitmq-server...
    48. Importing GPG key 0x4D206F89:
    49. Userid : "https://packagecloud.io/rabbitmq/rabbitmq-server (https://packagecloud.io/docs#gpg_signing) "
    50. Fingerprint: 8c69 5b02 19af deb0 4a05 8ed8 f4e7 8920 4d20 6f89
    51. From : https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
    52. Generating yum cache for rabbitmq_rabbitmq-server-source...
    53. The repository is setup! You can now install packages.
    1. ## 命令解析
    2. ## curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh: 这部分使用了curl命令,同样是用于从指定的URL下载数据。-s参数表示静默模式,不输出进度或错误信息。https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh是一个URL,指向了一个脚本文件。
    3. ## |: 管道符号,它的作用是将前一个命令的输出作为后一个命令的输入。
    4. ## sudo bash: sudo命令以超级用户的权限执行后面的命令。bash则是用于执行脚本的命令行解释器。
    5. [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
    6. Detected operating system as centos/7.
    7. Checking for curl...
    8. Detected curl...
    9. Downloading repository file: https://packagecloud.io/install/repositories/rabbitmq/erlang/config_file.repo?os=centos&dist=7&source=script
    10. done.
    11. Installing pygpgme to verify GPG signatures...
    12. Loaded plugins: fastestmirror
    13. Loading mirror speeds from cached hostfile
    14. * base: mirrors.aliyun.com
    15. * extras: mirrors.aliyun.com
    16. * updates: mirrors.aliyun.com
    17. rabbitmq_erlang-source/signature | 819 B 00:00:00
    18. Retrieving key from https://packagecloud.io/rabbitmq/erlang/gpgkey
    19. Importing GPG key 0xDF309A0B:
    20. Userid : "https://packagecloud.io/rabbitmq/erlang (https://packagecloud.io/docs#gpg_signing) "
    21. Fingerprint: 2ebd e413 d3ce 5d35 bcd1 5b7c 71c6 3471 df30 9a0b
    22. From : https://packagecloud.io/rabbitmq/erlang/gpgkey
    23. rabbitmq_erlang-source/signature | 951 B 00:00:00 !!!
    24. rabbitmq_rabbitmq-server/x86_64/signature | 833 B 00:00:00
    25. rabbitmq_rabbitmq-server/x86_64/signature | 1.8 kB 00:00:00 !!!
    26. rabbitmq_erlang-source/primary | 175 B 00:00:02
    27. Package pygpgme-0.3-9.el7.x86_64 already installed and latest version
    28. Nothing to do
    29. Installing yum-utils...
    30. Loaded plugins: fastestmirror
    31. Loading mirror speeds from cached hostfile
    32. * base: mirrors.aliyun.com
    33. * extras: mirrors.aliyun.com
    34. * updates: mirrors.aliyun.com
    35. rabbitmq_rabbitmq-server-source/signature | 836 B 00:00:00
    36. rabbitmq_rabbitmq-server-source/signature | 1.0 kB 00:00:00 !!!
    37. Package yum-utils-1.1.31-54.el7_8.noarch already installed and latest version
    38. Nothing to do
    39. Generating yum cache for rabbitmq_erlang...
    40. Importing GPG key 0xDF309A0B:
    41. Userid : "https://packagecloud.io/rabbitmq/erlang (https://packagecloud.io/docs#gpg_signing) "
    42. Fingerprint: 2ebd e413 d3ce 5d35 bcd1 5b7c 71c6 3471 df30 9a0b
    43. From : https://packagecloud.io/rabbitmq/erlang/gpgkey
    44. Generating yum cache for rabbitmq_erlang-source...
    45. The repository is setup! You can now install packages.
    • 安装指定版本的 RabbitMQ 软件包
    1. ## 命令解析
    2. ## rabbitmq-server-3.8.2-1.el7.noarch: 这是要安装的软件包的名称和版本。在这里,rabbitmq-server是软件包的名称,3.8.2-1.el7.noarch是软件包的版本。3.8.2是RabbitMQ的版本号,1.el7表示适用于CentOS 7版本的软件包,noarch表示这个软件包是与特定架构无关的,可以在任何架构的系统上运行
    3. [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# sudo yum install rabbitmq-server-3.8.2-1.el7.noarch
    4. Loaded plugins: fastestmirror
    5. Loading mirror speeds from cached hostfile
    6. * base: mirrors.aliyun.com
    7. * extras: mirrors.aliyun.com
    8. * updates: mirrors.aliyun.com
    9. Resolving Dependencies
    10. --> Running transaction check
    11. ---> Package rabbitmq-server.noarch 0:3.8.2-1.el7 will be installed
    12. --> Processing Dependency: erlang >= 21.3 for package: rabbitmq-server-3.8.2-1.el7.noarch
    13. --> Running transaction check
    14. ---> Package erlang.x86_64 0:23.3.4.11-1.el7 will be installed
    15. --> Finished Dependency Resolution
    16. Dependencies Resolved
    17. ====================================================================================================================================================================================================
    18. Package Arch Version Repository Size
    19. ====================================================================================================================================================================================================
    20. Installing:
    21. rabbitmq-server noarch 3.8.2-1.el7 rabbitmq_rabbitmq-server 12 M
    22. Installing for dependencies:
    23. erlang x86_64 23.3.4.11-1.el7 rabbitmq_erlang 19 M
    24. Transaction Summary
    25. ====================================================================================================================================================================================================
    26. Install 1 Package (+1 Dependent package)
    27. Total download size: 31 M
    28. Installed size: 47 M
    29. **Is this ok [y/d/N]: y**
    30. Downloading packages:
    31. (1/2): erlang-23.3.4.11-1.el7.x86_64.rpm | 19 MB 00:00:03
    32. (2/2): rabbitmq-server-3.8.2-1.el7.noarch.rpm | 12 MB 00:00:04
    33. ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    34. Total 7.6 MB/s | 31 MB 00:00:04
    35. Running transaction check
    36. Running transaction test
    37. Transaction test succeeded
    38. Running transaction
    39. Installing : erlang-23.3.4.11-1.el7.x86_64 1/2
    40. Installing : rabbitmq-server-3.8.2-1.el7.noarch 2/2
    41. Verifying : erlang-23.3.4.11-1.el7.x86_64 1/2
    42. Verifying : rabbitmq-server-3.8.2-1.el7.noarch 2/2
    43. Installed:
    44. rabbitmq-server.noarch 0:3.8.2-1.el7
    45. Dependency Installed:
    46. erlang.x86_64 0:23.3.4.11-1.el7
    47. Complete!
    • 安装完成后启动
    1. ## 启动RabbitMQ服务
    2. ## systemctl: 是一个系统服务管理工具,用于管理系统的服务。它可以启动、停止、重启和管理系统服务的状态;
    3. ## start: systemctl命令的一个子命令,用于启动指定的系统服务;
    4. ## rabbitmq-server: 这是要启动的服务的名称,即RabbitMQ。
    5. [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# systemctl start rabbitmq-server
    6. ## 检查RabbitMQ的状态
    7. [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# rabbitmqctl status
    8. Status of node rabbit@iZuf6eqvrwmawrgxqo5kw7Z ...
    9. Runtime
    10. OS PID: 14601
    11. OS: Linux
    12. Uptime (seconds): 24
    13. RabbitMQ version: 3.8.2
    14. Node name: rabbit@iZuf6eqvrwmawrgxqo5kw7Z
    15. Erlang configuration: Erlang/OTP 23 [erts-11.2.2.10] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:64] [hipe]
    16. Erlang processes: 264 used, 1048576 limit
    17. Scheduler run queue: 1
    18. Cluster heartbeat timeout (net_ticktime): 60
    19. Plugins
    20. Enabled plugin file: /etc/rabbitmq/enabled_plugins
    21. Enabled plugins:
    22. Data directory
    23. Node data directory: /var/lib/rabbitmq/mnesia/rabbit@iZuf6eqvrwmawrgxqo5kw7Z
    24. Config files
    2.2 MacOS下安装RabbitMQ
    • 首先,使用更新 brew update命令Homebrew
    1. ## brew update 是Homebrew(一个 macOS 上的包管理器)中用于更新本地Homebrew软件包索引的命令
    2. ## 检查远程仓库以查看是否有可用的更新版本或者新的软件包,并将本地的软件包索引与远程仓库同步**。
    3. chenzh12@chenzh12deiMac ~ % brew update
    4. ==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/bottles-portable-ruby/portable-ruby-3.1.4.arm64_big_sur.bottle.tar.gz
    5. ####################################################################################################################################################### 100.0%
    6. ==> Pouring portable-ruby-3.1.4.arm64_big_sur.bottle.tar.gz
    7. ==> Homebrew collects anonymous analytics.
    8. Read the analytics documentation (and how to opt-out) here:
    9. https://docs.brew.sh/Analytics
    10. No analytics have been recorded yet (nor will be during this `brew` run).
    11. ==> homebrew/core is old and unneeded, untapping to save space...
    12. Untapping homebrew/core...
    13. Untapped 2 commands and 7039 formulae (7,050 files, 931.5MB).
    14. ==> homebrew/cask is old and unneeded, untapping to save space...
    15. Untapping homebrew/cask...
    16. Untapped 4432 casks (4,214 files, 511.3MB).
    17. ==> Downloading https://formulae.brew.sh/api/formula_tap_migrations.jws.json
    18. ####################################################################################################################################################### 100.0%
    19. Updated 3 taps (homebrew/services, homebrew/core and homebrew/cask).
    20. ==> New Formulae
    21. c-blosc2 ingress2gateway libscfg msieve policy_sentry redict valkey
    22. dissent jnv logdy navidrome protoc-gen-js rtabmap vfox
    23. ffmpeg@6 jtbl manim overarch rage rustcat whisperkit-cli
    24. gitu liblc3 mantra parsedmarc ratchet tartufo yo
    25. ==> New Casks
    26. arctic ente-auth hhkb-studio loungy starnet2 yandex-music
    27. boltai fujifilm-x-raw-studio irpf2024 outfox toneprint
    28. capcut godspeed juxtacode phoenix-code viable
    29. clearvpn halloy lookaway requestly xcodepilot
    30. ==> Outdated Formulae
    31. ca-certificates gettext icu4c libtiff libxdmcp maven pcre2 xz
    32. cairo glib jpeg-turbo libx11 libxext openjdk redis zstd
    33. fontconfig harfbuzz libpng libxcb little-cms2 openssl@1.1 xorgproto
    34. You have 23 outdated formulae installed.
    35. You can upgrade them with brew upgrade
    36. or list them with brew outdated.
    • 安装Erlang语言环境: brew install erlang

            MacOS安装erlang语言环境可能会出现多次失败,需要网络状况良好,此外还需要运行xcode-select --install,提前安装Command Line Tools

    1. chenzh12@chenzh12deiMac ~ % brew install erlang
    2. Warning: You are using macOS 11.
    3. We (and Apple) do not provide support for this old version.
    4. It is expected behaviour that some formulae will fail to build in this old version.
    5. It is expected behaviour that Homebrew will be buggy and slow.
    6. Do not create any issues about this on Homebrew's GitHub repositories.
    7. Do not create any issues even if you think this message is unrelated.
    8. Any opened issues will be immediately closed without response.
    9. Do not ask for help from Homebrew or its maintainers on social media.
    10. You may ask for help in Homebrew's discussions but are unlikely to receive a response.
    11. Try to figure out the problem yourself and submit a fix as a pull request.
    12. We will review it but may or may not accept it.
    13. ==> Fetching dependencies for erlang: ca-certificates, openssl@3, m4, libtool, unixodbc, cmake, jpeg-turbo, libpng, xz, zstd, libtiff, pcre2, pkg-config and wxwidgets
    14. ==> Fetching ca-certificates
    15. ==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/ca-certificates-2024-03-11.all.bottle.tar.gz
    16. Already downloaded: /Users/chenzh12/Library/Caches/Homebrew/downloads/17b42fcbbed5f1d5a5f959ad26b218ab108748f29913580d4bbe585be13af894--ca-certificates-2024-03-11.all.bottle.tar.gz
    17. ==> Fetching openssl@3
    18. ==> Downloading https://raw.githubusercontent.com/Homebrew/homebrew-core/47f48c875570ca368d8e1ad0e2c7035403e8db90/Formula/o/openssl@3.rb
    19. ####################################################################################################################################################### 100.0%
    20. ==> Downloading https://github.com/openssl/openssl/commit/e9d7083e241670332e0443da0f0d4ffb52829f08.patch?full_index=1
    21. ####################################################################################################################################################### 100.0%
    22. ==> Downloading https://github.com/openssl/openssl/releases/download/openssl-3.2.1/openssl-3.2.1.tar.gz
    23. ==> Downloading from https://objects.githubusercontent.com/github-production-release-asset-2e65be/7634677/43abcd22-856f-405a-9126-4942d3d35f1f?X-Amz-Algorithm
    24. ####################################################################################################################################################### 100.0%
    25. ==> Fetching m4
    26. ==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/m4-1.4.19.arm64_big_sur.bottle.tar.gz
    27. ####################################################################################################################################################### 100.0%
    28. ==> Fetching libtool
    29. ==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/libtool-2.4.7.arm64_big_sur.bottle.1.tar.gz
    30. ####################################################################################################################################################### 100.0%
    31. ==> Fetching unixodbc
    32. ==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/unixodbc-2.3.12.arm64_big_sur.bottle.tar.gz
    33. ####################################################################################################################################################### 100.0%
    34. ==> Fetching cmake
    35. ==> Downloading https://raw.githubusercontent.com/Homebrew/homebrew-core/47f48c875570ca368d8e1ad0e2c7035403e8db90/Formula/c/cmake.rb
    36. ####################################################################################################################################################### 100.0%
    37. ==> Downloading https://github.com/Kitware/CMake/releases/download/v3.29.2/cmake-3.29.2.tar.gz
    • 安装RabbitMQ Server:brew install rabbitmq
    1. chenzh12@chenzh12deiMac ~ % brew install rabbitmq
    2. ==> Downloading https://formulae.brew.sh/api/cask.jws.json
    3. #=O#- # #
    4. Warning: You are using macOS 11.
    5. We (and Apple) do not provide support for this old version.
    6. It is expected behaviour that some formulae will fail to build in this old version.
    7. It is expected behaviour that Homebrew will be buggy and slow.
    8. Do not create any issues about this on Homebrew's GitHub repositories.
    9. Do not create any issues even if you think this message is unrelated.
    10. Any opened issues will be immediately closed without response.
    11. Do not ask for help from Homebrew or its maintainers on social media.
    12. You may ask for help in Homebrew's discussions but are unlikely to receive a response.
    13. Try to figure out the problem yourself and submit a fix as a pull request.
    14. We will review it but may or may not accept it.
    15. ==> Fetching rabbitmq
    16. ==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/rabbitmq-3.13.1.all.bottle.tar.gz
    17. ############################################################################################################################################################################################################################################# 100.0%
    18. ==> Pouring rabbitmq-3.13.1.all.bottle.tar.gz
    19. ==> Caveats
    20. Management UI: http://localhost:15672
    21. Homebrew-specific docs: https://rabbitmq.com/install-homebrew.html
    22. To start rabbitmq now and restart at login:
    23. brew services start rabbitmq
    24. Or, if you don't want/need a background service you can just run:
    25. CONF_ENV_FILE="/opt/homebrew/etc/rabbitmq/rabbitmq-env.conf" /opt/homebrew/opt/rabbitmq/sbin/rabbitmq-server
    26. ==> Summary
    27. 🍺 /opt/homebrew/Cellar/rabbitmq/3.13.1: 1,523 files, 35.9MB
    28. ==> Running `brew cleanup rabbitmq`...
    29. Disable this behaviour by setting HOMEBREW_NO_INSTALL_CLEANUP.
    30. Hide these hints with HOMEBREW_NO_ENV_HINTS (see `man brew`).
    • 默认并不会将RabbitMQ加到环境变量中,所以要进行小幅的配置
    1. ## 进入当前用户的home目录输入 cd ~
    2. ## 编辑.bash_profile文件 vim .bash_profile
    3. ## 添加 export PATH=$PATH:/usr/local/opt/rabbitmq/sbin 保存并关闭文件
    4. ## 更新刚配置的环境变量,输入source .bash_profile
    5. ## 启动RabbitMQ的服务端,输入rabbitmq-server
    6. chenzh12@chenzh12deiMac ~ % cd ~
    7. chenzh12@chenzh12deiMac ~ % vim .bash_profile
    8. chenzh12@chenzh12deiMac ~ % source .bash_profile
    9. chenzh12@chenzh12deiMac ~ % rabbitmq-server
    10. 2024-04-17 11:13:58.161801+08:00 [notice] <0.44.0> Application syslog exited with reason: stopped
    11. 2024-04-17 11:13:58.164286+08:00 [notice] <0.247.0> Logging: switching to configured handler(s); following messages may not be visible in this log output
    12. ## ## RabbitMQ 3.13.1
    13. ## ##
    14. ########## Copyright (c) 2007-2024 Broadcom Inc and/or its subsidiaries
    15. ###### ##
    16. ########## Licensed under the MPL 2.0. Website: https://rabbitmq.com
    17. Erlang: 26.2.4 [jit]
    18. TLS Library: OpenSSL - OpenSSL 3.2.1 30 Jan 2024
    19. Release series support status: supported
    20. Doc guides: https://www.rabbitmq.com/docs/documentation
    21. Support: https://www.rabbitmq.com/docs/contact
    22. Tutorials: https://www.rabbitmq.com/tutorials
    23. Monitoring: https://www.rabbitmq.com/docs/monitoring
    24. Logs: /opt/homebrew/var/log/rabbitmq/rabbit@localhost.log
    25. Config file(s): (none)
    26. Starting broker... completed with 7 plugins.
    • 关闭rabbitmq: rabbitmqctl stop
    1. chenzh12@chenzh12deiMac ~ % rabbitmqctl stop
    2. Stopping and halting node rabbit@localhost ...
    2.3 Windows安装

    不推荐,因为要求系统用户名和计算机名必须是英文,而Win10改名比较麻烦,而且可能会有其他坑,而且和未来的实际工作场景严重不符,没有Windows作为服务器的。

    3. RabbitMQ应用

    3.1 RabbitMQ的管理后台

            开启Web管理后台:rabbitmq-plugins enable rabbitmq_management

    1. [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# rabbitmq-plugins enable rabbitmq_management
    2. Enabling plugins on node rabbit@iZuf6eqvrwmawrgxqo5kw7Z:
    3. rabbitmq_management
    4. The following plugins have been configured:
    5. rabbitmq_management
    6. rabbitmq_management_agent
    7. rabbitmq_web_dispatch
    8. Applying plugin configuration to rabbit@iZuf6eqvrwmawrgxqo5kw7Z...
    9. The following plugins have been enabled:
    10. rabbitmq_management
    11. rabbitmq_management_agent
    12. rabbitmq_web_dispatch
    13. started 3 plugins.

            添加admin用户

    1. [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# rabbitmqctl add_user admin password
    2. Adding user "admin" ...
    3. [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# rabbitmqctl set_user_tags admin administrator
    4. Setting tags for user "admin" to [administrator] ...

    注:默认用户只有guest,远端无法访问

    添加admin用户后,需要在ECS安全组中打开端口15672,然后在登录页面(IP:15672)使用admin的密码进行登录

            Admin→Virtual Hosts→Set permission在虚拟主机需要给新创建的admin添加权限

            使用Java创建生产者和消费者连接RabbitMQ服务,生产者Send向名为'hello'的消息队列发送消息,消费者Recv从指定的队列接收消息并进行处理。

    1. /**
    2. * 描述: 生产者连接到RabbitMQ服务端,然后发送一条消息,然后退出。
    3. */
    4. public class Send {
    5. private final static String QUEUE_NAME = "hello"; // 队列名
    6. public static void main(String[] args) throws IOException, TimeoutException {
    7. // 创建连接工厂,用于配置RabbitMQ连接。
    8. ConnectionFactory factory = new ConnectionFactory();
    9. // 配置RabbitMQ连接,服务端地址用于连接
    10. factory.setHost("IP"); //运行RabbitMQ的阿里云实例ip
    11. factory.setUsername("admin"); // 1.需要给ECS实例开端口5672;2.默认创建的guest用户权限虽高,但不支持远端登录,此处使用自己创建的admin;3.Virtual Hosts添加admin
    12. factory.setPassword("password");
    13. // 建立连接
    14. Connection connection = factory.newConnection();
    15. // 获得信道
    16. Channel channel = connection.createChannel();
    17. // 声明队列,参数解释:队列名称、是否持久化、是否独占、是否自动删除、其他参数(如消息 TTL、最大长度等)
    18. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    19. // 发布消息
    20. String message = "Hello World!7";
    21. channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8")); // 参数解释:交换机名称(默认交换机为空字符串)、队列名称、消息属性、消息内容(字节数组)
    22. System.out.println("发送消息:" + message);
    23. // 关闭连接
    24. channel.close();
    25. connection.close();
    26. }
    27. }
    28. /**
    29. * 描述:消费者接收消息,并打印,持续运行
    30. */
    31. public class Recv {
    32. private final static String QUEUE_NAME = "hello"; // 队列名
    33. public static void main(String[] args) throws IOException, TimeoutException {
    34. //创建连接工厂,用于配置 RabbitMQ 连接的参数
    35. ConnectionFactory factory = new ConnectionFactory();
    36. //设置RabbitMQ地址
    37. factory.setHost("IP");
    38. factory.setUsername("admin");
    39. factory.setPassword("password");
    40. //建立连接
    41. Connection connection = factory.newConnection();
    42. //获得信道
    43. Channel channel = connection.createChannel();
    44. //声明队列
    45. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    46. //接收消息并消费,开始接收指定队列中的消息并消费。参数包括队列名称、是否自动确认消息、消费者对象
    47. channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){ // 使用了匿名内部类DefaultConsumer,并重写了其handleDelivery方法,用于处理接收到的消息
    48. @Override
    49. public void handleDelivery(String consumerTag, Envelope envelope,
    50. BasicProperties properties, byte[] body) throws IOException {
    51. String message = new String(body, "UTF-8");
    52. System.out.println("收到消息:" + message);
    53. }
    54. });
    55. }
    56. }

            当生产者发送消息并被消费这消费之后,可以在RabbitMQ管理后台中直观看到队列中的消息变化:

    3.2 多个消费者

            当存在多个消费者时,RabbitMQ如何分配队列中的的消息?本节介绍循环调度和公平派遣两种策略。

    注:IDEA设置多实例并行运行,Run/Debug Configurations→Modify options→Allow mutiple instances开启

    RabbitMQ循环调度

            循环调度是指当多个消费者同时订阅同一个队列,并且队列中有多条消息时,RabbitMQ 将以循环的方式将消息分发给各个消费者,即逐条将消息轮流发送给不同的消费者。这种行为是 RabbitMQ 的默认行为,称为"轮询"分发(Round-Robin),用于在多个消费者之间均匀地分配消息负载。

            在循环调度中,消息将平均分布到每个消费者上,每个消费者一次处理一个消息,直到队列中的所有消息都被处理完毕

    RabbitMQ公平派遣(Fair Dispatch)

            公平派遣(Fair Dispatch)是一种消息分发策略,旨在确保每个消费者都能平等地获得工作,而不会出现某些消费者繁忙而其他消费者闲置的情况。在工作队列模式中,通常使用公平派遣来提高系统的整体性能和公平性。公平派遣的关键在于使用消息预取(Prefetch Count)机制,需要注意公平派遣需要增加消息确认机制。消费者可以使用channel.basicQos(number)设置每次从队列中取出的消息数,并使用方法channel.basicAck()手动确认消息被消费,这样就不会出现有的消费者繁忙有的限制的情况。

    1. /**
    2. * 描述:消费者,接收前面的批量消息
    3. */
    4. public class Worker {
    5. private final static String TASK_QUEUE_NAME = "task_queue"; // 存储队列的名称
    6. public static void main(String[] args) throws IOException, TimeoutException {
    7. //创建连接工厂
    8. ConnectionFactory factory = new ConnectionFactory();
    9. //设置RabbitMQ地址
    10. factory.setHost("localhost"); // 启用本地RabbitMQ客户端,使用默认guest账户登录
    11. //建立连接
    12. Connection connection = factory.newConnection();
    13. //获得信道
    14. final Channel channel = connection.createChannel();
    15. //声明队列
    16. channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    17. System.out.println("开始接收消息");
    18. **channel.basicQos(1);** // 设置每次从队列中获取的消息数量为1,这样可以确保每个消费者在处理完一条消息之前不会接收到新的消息,这也称为"公平派遣"。
    19. // 从队列中消费消息。参数解释:队列名称、是否自动确认消息、消费者对象(这里使用了 DefaultConsumer,重写了其 handleDelivery 方法来处理消息)
    20. channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
    21. @Override
    22. public void handleDelivery(String consumerTag, Envelope envelope,
    23. BasicProperties properties, byte[] body) throws IOException { // 方法参数解释:消费者标签、信封(包含消息元数据,如交换机、路由键等信息)、消息属性、消息内容
    24. String message = new String(body, "UTF-8");
    25. System.out.println("收到了消息:" + message);
    26. try {
    27. doWork(message);
    28. }finally {
    29. System.out.println("消息处理完成");
    30. **channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息已经被消费,参数解释:消息标识符、是否批量确认**
    31. }
    32. }
    33. });
    34. }
    35. private static void doWork(String task) { //根据字符(消息内容)处理任务,含有'.'则等待一秒
    36. char[] chars = task.toCharArray();
    37. for (char ch : chars) {
    38. if (ch == '.') {
    39. try {
    40. Thread.sleep(1000);
    41. } catch (InterruptedException e) {
    42. e.printStackTrace();
    43. }
    44. }
    45. }

    4. 交换机的工作模式

            RabbitMQ 常用的 Exchange Type 有 fanoutdirecttopicheaders 这四种(AMQP 规范里还提到两种 Exchange Type,分别为 system 与 自定义,这里不予以描述)

            fanout:广播,这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的;fanout 类型的交换机会把所有发送到该交换机的消息路由到所有与它绑定的队列中,不需要做任何判断操作,所以也是速度最快的。

    1. /**
    2. * 描述:发送日志信息
    3. */
    4. public class EmitLog {
    5. private static final String EXCHANGE_NAME = "logs"; // 交换机名称
    6. public static void main(String[] args) throws IOException, TimeoutException {
    7. ConnectionFactory factory = new ConnectionFactory();
    8. factory.setHost("localhost");
    9. Connection connection = factory.newConnection();
    10. Channel channel = connection.createChannel();
    11. // 声明了一个名为EXCHANGE_NAME的交换机,交换机类型为****FANOUT**
    12. **channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); **
    13. String message = "info:Hello World4"; // 发送内容
    14. // **发布消息到指定的交换机。参数依次为交换机名称、路由键(对于fanout类型的交换机,路由键为空)、消息属性、消息内容的字节数组。字符串消息内容转换为UTF-8编码的字节数组进行发送
    15. channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));**
    16. System.out.println("发送了消息:" + message);
    17. channel.close();
    18. connection.close();
    19. }
    20. }
    21. /**
    22. * 描述:接收日志消息
    23. */
    24. public class ReceiveLogs {
    25. private static final String EXCHANGE_NAME = "logs";
    26. public static void main(String[] args) throws IOException, TimeoutException {
    27. ConnectionFactory factory = new ConnectionFactory();
    28. factory.setHost("localhost");
    29. Connection connection = factory.newConnection();
    30. Channel channel = connection.createChannel();
    31. // 声明了一个名为EXCHANGE_NAME的交换机,交换机类型为****FANOUT**
    32. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    33. String queueName = channel.queueDeclare().getQueue(); // 声明了临时队列,并获取到队列的名称。临时队列是在没有指定队列名称的情况下创建的,每次连接时都会生成一个不同的队列名。
    34. channel.queueBind(queueName, EXCHANGE_NAME, ""); // 完成交换机和队列的绑定**
    35. System.out.println("开始接收消息");
    36. Consumer consumer = new DefaultConsumer(channel) {
    37. @Override
    38. public void handleDelivery(String consumerTag, Envelope envelope,
    39. BasicProperties properties, byte[] body) throws IOException {
    40. String message = new String(body, "UTF-8");
    41. System.out.println("收到消息:" + message);
    42. }
    43. };
    44. channel.basicConsume(queueName, true, consumer); // 消费指定队列中的消息。参数依次为队列名称、是否自动确认消息、消费者对象
    45. }
    46. }

            direct:根据RoutingKey匹配消息路由到指定队列;把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。例如,channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"))中的路由键"Info"与交换机绑定队列的channel.queueBind(queueName, EXCHANGE_NAME, "info")**方法的绑定键"Info"完全匹配。

    1. public class EmitLogDirect {
    2. private static final String EXCHANGE_NAME = "direct_logs"; // 交换机名称
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("localhost");
    6. Connection connection = factory.newConnection();
    7. Channel channel = connection.createChannel();
    8. // **声明了一个名为EXCHANGE_NAME的交换机,交换机类型为****DIRECT**
    9. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    10. // 发布消息到指定的交换机。参数依次为交换机名称、路由键(对于fanout类型的交换机,路由键为空)、消息属性、消息内容的字节数组。字符串消息内容转换为UTF-8编码的字节数组进行发送
    11. String message = "info:Hello World!";
    12. **channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));**
    13. System.out.println("发送了消息," + "等级为info,消息内容:" + message);
    14. message = "warning:Hello World!";
    15. channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));
    16. System.out.println("发送了消息," + "等级为warning,消息内容:" + message);
    17. message = "error:Hello World!";
    18. channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
    19. System.out.println("发送了消息," + "等级为error,消息内容:" + message);
    20. channel.close();
    21. connection.close();
    22. }
    23. }
    24. /**
    25. * 描述:接收info,warning,error3个等级的日志
    26. */
    27. public class ReceiveLogsDirect1 {
    28. private static final String EXCHANGE_NAME = "direct_logs";
    29. public static void main(String[] args) throws IOException, TimeoutException {
    30. ConnectionFactory factory = new ConnectionFactory();
    31. factory.setHost("localhost");
    32. Connection connection = factory.newConnection();
    33. Channel channel = connection.createChannel();
    34. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    35. //生成一个随机的临时的queue
    36. String queueName = channel.queueDeclare().getQueue();
    37. //一个交换机同时绑定3个queue
    38. channel.queueBind(queueName, EXCHANGE_NAME, "info"); // 绑定Routingkey
    39. channel.queueBind(queueName, EXCHANGE_NAME, "warning");
    40. channel.queueBind(queueName, EXCHANGE_NAME, "error");**
    41. System.out.println("开始接收消息");
    42. Consumer consumer = new DefaultConsumer(channel) {
    43. @Override
    44. public void handleDelivery(String consumerTag, Envelope envelope,
    45. BasicProperties properties, byte[] body) throws IOException {
    46. String message = new String(body, "UTF-8");
    47. System.out.println("收到消息:" + message);
    48. }
    49. };
    50. channel.basicConsume(queueName, true, consumer);
    51. }
    52. }
    53. /**
    54. * 描述:接收error等级的日志
    55. */
    56. public class ReceiveLogsDirect2 {
    57. private static final String EXCHANGE_NAME = "direct_logs";
    58. public static void main(String[] args) throws IOException, TimeoutException {
    59. ConnectionFactory factory = new ConnectionFactory();
    60. factory.setHost("localhost");
    61. Connection connection = factory.newConnection();
    62. Channel channel = connection.createChannel();
    63. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    64. //生成一个随机的临时的queue
    65. String queueName = channel.queueDeclare().getQueue();
    66. //一个交换机绑定1个queue
    67. channel.queueBind(queueName, EXCHANGE_NAME, "error");
    68. System.out.println("开始接收消息");
    69. Consumer consumer = new DefaultConsumer(channel) {
    70. @Override
    71. public void handleDelivery(String consumerTag, Envelope envelope,
    72. BasicProperties properties, byte[] body) throws IOException {
    73. String message = new String(body, "UTF-8");
    74. System.out.println("收到消息:" + message);
    75. }
    76. };
    77. channel.basicConsume(queueName, true, consumer);
    78. }
    79. }

            topic:生产者指定RoutingKey消息根据消费端指定的队列,通过模糊匹配的方式进行相应转发;匹配规则:'*'可以代替一个单词,'#'可以替代零个或者多个单词

    1. /**
    2. * 描述:topic模式交换机,发送消息
    3. */
    4. public class EmitLogTopic {
    5. private static final String EXCHANGE_NAME = "topic_logs";
    6. public static void main(String[] args) throws IOException, TimeoutException {
    7. ConnectionFactory factory = new ConnectionFactory();
    8. factory.setHost("localhost");
    9. Connection connection = factory.newConnection();
    10. Channel channel = connection.createChannel();
    11. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 交换机类型TOPIC
    12. String message = "Animal World";
    13. String[] routingKeys = new String[9];
    14. routingKeys[0] = "quick.orange.rabbit";
    15. routingKeys[1] = "lazy.orange.elephant";
    16. routingKeys[2] = "quick.orange.fox";
    17. routingKeys[3] = "lazy.brown.fox";
    18. routingKeys[4] = "lazy.pink.rabbit";
    19. routingKeys[5] = "quick.brown.fox";
    20. routingKeys[6] = ".orange."; // **匹配到ReceiveLogsTopic1,则**
    21. routingKeys[7] = "quick.orange.male.rabbit";
    22. routingKeys[8] = "lazy.orange.male.rabbit";
    23. for (int i = 0; i < routingKeys.length; i++) {
    24. channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null, message.getBytes("UTF-8"));
    25. System.out.println("发送了:" + message+" routingKey:"+routingKeys[i]);
    26. }
    27. channel.close();
    28. connection.close();
    29. }
    30. }
    31. 开始接收消息
    32. 收到消息:Animal World routingKey: quick.orange.rabbit
    33. 收到消息:Animal World routingKey: lazy.orange.elephant
    34. 收到消息:Animal World routingKey: quick.orange.fox
    35. 收到消息:Animal World routingKey: quick.orange.rabbit
    36. 收到消息:Animal World routingKey: lazy.orange.elephant
    37. 收到消息:Animal World routingKey: quick.orange.fox
    38. 收到消息:Animal World routingKey: .orange.
    39. /**
    40. * 描述: 特定路由键
    41. */
    42. public class ReceiveLogsTopic1 {
    43. private static final String EXCHANGE_NAME = "topic_logs";
    44. public static void main(String[] args) throws IOException, TimeoutException {
    45. ConnectionFactory factory = new ConnectionFactory();
    46. factory.setHost("localhost");
    47. Connection connection = factory.newConnection();
    48. Channel channel = connection.createChannel();
    49. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    50. //生成一个随机的临时的queue
    51. String queueName = channel.queueDeclare().getQueue();
    52. String routingKey = "*.orange.*";
    53. channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    54. System.out.println("开始接收消息");
    55. Consumer consumer = new DefaultConsumer(channel) {
    56. @Override
    57. public void handleDelivery(String consumerTag, Envelope envelope,
    58. BasicProperties properties, byte[] body) throws IOException {
    59. String message = new String(body, "UTF-8");
    60. System.out.println("收到消息:" + message + " routingKey: " + envelope.getRoutingKey());
    61. }
    62. };
    63. channel.basicConsume(queueName, true, consumer);
    64. }
    65. }
    66. /**
    67. * 描述:特定路由键2
    68. */
    69. public class ReceiveLogsTopic2 {
    70. private static final String EXCHANGE_NAME = "topic_logs";
    71. public static void main(String[] args) throws IOException, TimeoutException {
    72. ConnectionFactory factory = new ConnectionFactory();
    73. factory.setHost("localhost");
    74. Connection connection = factory.newConnection();
    75. Channel channel = connection.createChannel();
    76. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    77. //生成一个随机的临时的queue
    78. String queueName = channel.queueDeclare().getQueue();
    79. String routingKey = "*.*.rabbit";
    80. channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    81. String routingKey2 = "lazy.#";
    82. channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
    83. System.out.println("开始接收消息");
    84. Consumer consumer = new DefaultConsumer(channel) {
    85. @Override
    86. public void handleDelivery(String consumerTag, Envelope envelope,
    87. BasicProperties properties, byte[] body) throws IOException {
    88. String message = new String(body, "UTF-8");
    89. System.out.println("收到消息:" + message + " routingKey: " + envelope.getRoutingKey());
    90. }
    91. };
    92. channel.basicConsume(queueName, true, consumer);
    93. }
    94. }

            headers:不推荐,根据发送消息内容中的headers属性来匹配。

    5. Spring Boot 整合RabbitMQ

    • 首先,安装依赖,并配置RabbitMQ
    1. **pom.xml**
    2. <dependency>
    3. <groupId>org.springframework.bootgroupId>
    4. <artifactId>spring-boot-starter-amqpartifactId>
    5. dependency>
    6. **aplication.properties**
    7. ## 配置RabbitMQ的属性
    8. server.port=8080
    9. spring.application.name=producer
    10. spring.rabbitmq.addresses=127.0.0.1:5672
    11. spring.rabbitmq.username=guest
    12. spring.rabbitmq.password=guest
    13. spring.rabbitmq.virtual-host=/
    14. spring.rabbitmq.connection-timeout=15000
    • 然后,生产者发送消息到指定的交换机,并根据指定的路由键将消息路由到相应的队列中。消息被发送到 RabbitMQ 服务器上,并等待消费者来处理。

            Spring AMQP 提供的一个方法,用于向 RabbitMQ 发送消息方法如下:

    1. rabbitmqTemplate.convertAndSend(exchange, routingKey, message);
    2. 其中:
    3. exchange 是要发送消息到的交换机的名称。
    4. routingKey 是消息的路由键,用于将消息路由到匹配的队列。
    5. message 是要发送的消息内容。
    1. /**
    2. * 描述: rabbitmq配置类
    3. */
    4. @Configuration //标识这是一个配置类
    5. public class TopicRabbitConfig {
    6. @Bean
    7. public Queue queue1() {
    8. return new Queue("queue1");
    9. } // 定义了一个名为队列"queue1"
    10. @Bean
    11. public Queue queue2() {
    12. return new Queue("queue2");
    13. }
    14. @Bean
    15. TopicExchange exchange() {
    16. return new TopicExchange("bootExchange");
    17. } //定义了一个名为bootExchange的Topic交换机
    18. **// 绑定到交换机上,并指定路由键**
    19. @Bean
    20. Binding bingdingExchangeMessage1(Queue queue1, TopicExchange exchange) { // queue1队列绑定到交换机上,并指定路由键为"dog.red"
    21. return BindingBuilder.bind(queue1).to(exchange).with("dog.red");
    22. }
    23. @Bean
    24. Binding bingdingExchangeMessage2(Queue queue2, TopicExchange exchange) { // queue2队列绑定到交换机上,并指定路由键为"dog.#"
    25. return BindingBuilder.bind(queue2).to(exchange).with("dog.#");
    26. }
    27. }
    28. /**
    29. * 描述:发送消息
    30. */
    31. @Component
    32. public class MsgSender {
    33. @Autowired
    34. private AmqpTemplate rabbitmqTemplate; // @Autowired注解将AmqpTemplate接口的实例注入到rabbitmqTemplate字段中,用于发送消息到 abbitMQ
    35. public void send1() {
    36. String message = "This is message1, routing key is dog.red";
    37. System.out.println("发送了:" + message);
    38. this.**rabbitmqTemplate.convertAndSend**("bootExchange","dog.red",message); // 将消息发送到名为"bootExchange"的交换机,并使用"dog.red"作为路由键
    39. }
    40. public void send2() {
    41. String message = "This is message2, routing key is dog.black";
    42. System.out.println("发送了:" + message);
    43. this.**rabbitmqTemplate.convertAndSend**("bootExchange","dog.black",message);
    44. }
    45. }
    • 最后,消费者接收消息,并执行相应操作。
    1. /**
    2. * 描述:消费者1
    3. */
    4. @Component
    5. @RabbitListener(queues = "queue1") // **标记一个方法或类是RabbitMQ消息的监听器**,并指定监听的队列名称
    6. public class Receiver1 {
    7. @RabbitHandler // **标记RabbitMQ消息处理方法**。当消费者接收到消息时,Spring将调用该方法来处理消息。方法的参数类型为String,表示接收到的消息内容。
    8. public void process(String message) {
    9. System.out.println("Receiver1: " + message);
    10. }
    11. }
    12. /**
    13. * 描述:消费者2
    14. */
    15. @Component
    16. @RabbitListener(queues = "queue2")
    17. public class Receiver2 {
    18. @RabbitHandler
    19. public void process(String message) {
    20. System.out.println("Receiver2: " + message);
    21. }
    22. }
    23. 当send1和send2发送消息后,消费者收到的结果如下:
    24. *Receiver1: This is message1, routing key is dog.red
    25. Receiver2: This is message1, routing key is dog.red
    26. Receiver2: This is message2, routing key is dog.black*
  • 相关阅读:
    【知识点】怎么确定时间复杂度与空间复杂度
    华为机试真题 Java 实现【矩阵最大值】
    使用XnView MP快速查看图片某个像素点的RGB像素值
    Matlab之查询子字符串在字符串中的起始位置函数strfind
    抓包工具mitmprox
    第 5 章理解 ScrollView 并构建 Carousel UI
    docker打包部署自己的应用
    坦克车机器人操作学习总结开始篇
    Abnova丨A4GNT多克隆抗体中英文说明
    flink sql热加载自定义函数 不重启flink集群
  • 原文地址:https://blog.csdn.net/weixin_61933613/article/details/138095181