摘要:整理列下RabbitMQ相关的内容,包括基本概念、安装(Linux和MacOS)和使用,记录下...
消息队列看作是一个按照顺序存放消息的容器,当使用消息的时候,直接从容器中按照顺序取出消息使用即可。对系统而言,消息队列的作用主要体现为如下三点
通过异步处理提高系统性能(减少响应所需时间)
用户请求数据存入消息队列之后,立即返回结果,异步的对消息进行消费。需要注意的是后续的处理可能会失败。
削峰/限流
将短时间高并发产生的事务消息存储在消息队列中,避免对系统产生的冲击
降低系统耦合性。
消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息,生产者和消费者之间不存在直接耦合。
RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
RabbitMQ具有特点如下:可靠性、灵活的路由、扩展性、高可用性、支持多种协议、多语言客户端、易用的管理界面和插件机制。
具体介绍可参阅官网介绍:
RabbitMQ: One broker to queue them all | RabbitMQhttps://www.rabbitmq.com/
RabbitMQ 的整体模型架构如下
RabbitMQ的核心概念
建议在一个新建的阿里云的Cent OS 7.6上安装,不要对yum换源,否则可能会安装失败。实际使用CentOS 7.9 依然OK
- ## 命令解析
- ## echo命令用于输出文本到标准输出。
- ## "export LC_ALL=en_US.UTF-8" 是要输出的文本,这里设置了LC_ALL环境变量的值为en_US.UTF-8,表示使用UTF-8编码作为默认的语言环境。
- ## >> /etc/profile将输出内容追加到/etc/profile文件的末尾。在这里,/etc/profile是一个系统级的shell配置文件,它在用户登录时被执行。
- [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# echo "export LC_ALL=en_US.UTF-8" >> /etc/profile
- ## 命令解析
- ## source命令用于在当前shell环境中**执行指定的脚本文件**,这里是执行/etc/profile文件。
- ## 执行source /etc/profile可以立即使新设置的环境变量生效,而不需要重新登录系统。
- [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# source /etc/profile
- ## 命令解析
- ## curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh: curl命令是一个用于在命令行下传输数据的工具。-s 参数表示静默模式,不输出进度或错误信息。https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh是一个URL,指向了一个脚本文件。
- ## |: 管道符号将前一个命令的输出作为后一个命令的输入。
- ## sudo bash: 使用了sudo命令以超级用户的权限执行后面的命令。bash 是一个Unix shell,也是一个命令行解释器,它用于执行脚本
- [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
- Detected operating system as centos/7.
- Checking for curl...
- Detected curl...
- Downloading repository file: https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/config_file.repo?os=centos&dist=7&source=script
- done.
- Installing pygpgme to verify GPG signatures...
- Loaded plugins: fastestmirror
- Loading mirror speeds from cached hostfile
- * base: mirrors.aliyun.com
- * extras: mirrors.aliyun.com
- * updates: mirrors.aliyun.com
- base | 3.6 kB 00:00:00
- docker-ce-stable | 3.5 kB 00:00:00
- epel | 4.7 kB 00:00:00
- extras | 2.9 kB 00:00:00
- mysql-connectors-community | 2.6 kB 00:00:00
- mysql-tools-community | 2.6 kB 00:00:00
- mysql57-community | 2.6 kB 00:00:00
- nginx-stable | 2.9 kB 00:00:00
- rabbitmq_rabbitmq-server-source/signature | 836 B 00:00:00
- Retrieving key from https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
- Importing GPG key 0x4D206F89:
- Userid : "https://packagecloud.io/rabbitmq/rabbitmq-server (https://packagecloud.io/docs#gpg_signing)
" - Fingerprint: 8c69 5b02 19af deb0 4a05 8ed8 f4e7 8920 4d20 6f89
- From : https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
- rabbitmq_rabbitmq-server-source/signature | 1.0 kB 00:00:00 !!!
- updates | 2.9 kB 00:00:00
- (1/3): epel/x86_64/updateinfo | 1.0 MB 00:00:00
- (2/3): epel/x86_64/primary_db | 7.0 MB 00:00:00
- (3/3): docker-ce-stable/7/x86_64/primary_db | 142 kB 00:00:00
- rabbitmq_rabbitmq-server-source/primary | 175 B 00:00:02
- Package pygpgme-0.3-9.el7.x86_64 already installed and latest version
- Nothing to do
- Installing yum-utils...
- Loaded plugins: fastestmirror
- Loading mirror speeds from cached hostfile
- * base: mirrors.aliyun.com
- * extras: mirrors.aliyun.com
- * updates: mirrors.aliyun.com
- Package yum-utils-1.1.31-54.el7_8.noarch already installed and latest version
- Nothing to do
- Generating yum cache for rabbitmq_rabbitmq-server...
- Importing GPG key 0x4D206F89:
- Userid : "https://packagecloud.io/rabbitmq/rabbitmq-server (https://packagecloud.io/docs#gpg_signing)
" - Fingerprint: 8c69 5b02 19af deb0 4a05 8ed8 f4e7 8920 4d20 6f89
- From : https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
- Generating yum cache for rabbitmq_rabbitmq-server-source...
-
- The repository is setup! You can now install packages.
- ## 命令解析
- ## curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh: 这部分使用了curl命令,同样是用于从指定的URL下载数据。-s参数表示静默模式,不输出进度或错误信息。https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh是一个URL,指向了一个脚本文件。
- ## |: 管道符号,它的作用是将前一个命令的输出作为后一个命令的输入。
- ## sudo bash: sudo命令以超级用户的权限执行后面的命令。bash则是用于执行脚本的命令行解释器。
- [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
- Detected operating system as centos/7.
- Checking for curl...
- Detected curl...
- Downloading repository file: https://packagecloud.io/install/repositories/rabbitmq/erlang/config_file.repo?os=centos&dist=7&source=script
- done.
- Installing pygpgme to verify GPG signatures...
- Loaded plugins: fastestmirror
- Loading mirror speeds from cached hostfile
- * base: mirrors.aliyun.com
- * extras: mirrors.aliyun.com
- * updates: mirrors.aliyun.com
- rabbitmq_erlang-source/signature | 819 B 00:00:00
- Retrieving key from https://packagecloud.io/rabbitmq/erlang/gpgkey
- Importing GPG key 0xDF309A0B:
- Userid : "https://packagecloud.io/rabbitmq/erlang (https://packagecloud.io/docs#gpg_signing)
" - Fingerprint: 2ebd e413 d3ce 5d35 bcd1 5b7c 71c6 3471 df30 9a0b
- From : https://packagecloud.io/rabbitmq/erlang/gpgkey
- rabbitmq_erlang-source/signature | 951 B 00:00:00 !!!
- rabbitmq_rabbitmq-server/x86_64/signature | 833 B 00:00:00
- rabbitmq_rabbitmq-server/x86_64/signature | 1.8 kB 00:00:00 !!!
- rabbitmq_erlang-source/primary | 175 B 00:00:02
- Package pygpgme-0.3-9.el7.x86_64 already installed and latest version
- Nothing to do
- Installing yum-utils...
- Loaded plugins: fastestmirror
- Loading mirror speeds from cached hostfile
- * base: mirrors.aliyun.com
- * extras: mirrors.aliyun.com
- * updates: mirrors.aliyun.com
- rabbitmq_rabbitmq-server-source/signature | 836 B 00:00:00
- rabbitmq_rabbitmq-server-source/signature | 1.0 kB 00:00:00 !!!
- Package yum-utils-1.1.31-54.el7_8.noarch already installed and latest version
- Nothing to do
- Generating yum cache for rabbitmq_erlang...
- Importing GPG key 0xDF309A0B:
- Userid : "https://packagecloud.io/rabbitmq/erlang (https://packagecloud.io/docs#gpg_signing)
" - Fingerprint: 2ebd e413 d3ce 5d35 bcd1 5b7c 71c6 3471 df30 9a0b
- From : https://packagecloud.io/rabbitmq/erlang/gpgkey
- Generating yum cache for rabbitmq_erlang-source...
-
- The repository is setup! You can now install packages.
- ## 命令解析
- ## rabbitmq-server-3.8.2-1.el7.noarch: 这是要安装的软件包的名称和版本。在这里,rabbitmq-server是软件包的名称,3.8.2-1.el7.noarch是软件包的版本。3.8.2是RabbitMQ的版本号,1.el7表示适用于CentOS 7版本的软件包,noarch表示这个软件包是与特定架构无关的,可以在任何架构的系统上运行
- [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# sudo yum install rabbitmq-server-3.8.2-1.el7.noarch
- Loaded plugins: fastestmirror
- Loading mirror speeds from cached hostfile
- * base: mirrors.aliyun.com
- * extras: mirrors.aliyun.com
- * updates: mirrors.aliyun.com
- Resolving Dependencies
- --> Running transaction check
- ---> Package rabbitmq-server.noarch 0:3.8.2-1.el7 will be installed
- --> Processing Dependency: erlang >= 21.3 for package: rabbitmq-server-3.8.2-1.el7.noarch
- --> Running transaction check
- ---> Package erlang.x86_64 0:23.3.4.11-1.el7 will be installed
- --> Finished Dependency Resolution
-
- Dependencies Resolved
-
- ====================================================================================================================================================================================================
- Package Arch Version Repository Size
- ====================================================================================================================================================================================================
- Installing:
- rabbitmq-server noarch 3.8.2-1.el7 rabbitmq_rabbitmq-server 12 M
- Installing for dependencies:
- erlang x86_64 23.3.4.11-1.el7 rabbitmq_erlang 19 M
-
- Transaction Summary
- ====================================================================================================================================================================================================
- Install 1 Package (+1 Dependent package)
-
- Total download size: 31 M
- Installed size: 47 M
- **Is this ok [y/d/N]: y**
- Downloading packages:
- (1/2): erlang-23.3.4.11-1.el7.x86_64.rpm | 19 MB 00:00:03
- (2/2): rabbitmq-server-3.8.2-1.el7.noarch.rpm | 12 MB 00:00:04
- ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- Total 7.6 MB/s | 31 MB 00:00:04
- Running transaction check
- Running transaction test
- Transaction test succeeded
- Running transaction
- Installing : erlang-23.3.4.11-1.el7.x86_64 1/2
- Installing : rabbitmq-server-3.8.2-1.el7.noarch 2/2
- Verifying : erlang-23.3.4.11-1.el7.x86_64 1/2
- Verifying : rabbitmq-server-3.8.2-1.el7.noarch 2/2
-
- Installed:
- rabbitmq-server.noarch 0:3.8.2-1.el7
-
- Dependency Installed:
- erlang.x86_64 0:23.3.4.11-1.el7
-
- Complete!
- ## 启动RabbitMQ服务
- ## systemctl: 是一个系统服务管理工具,用于管理系统的服务。它可以启动、停止、重启和管理系统服务的状态;
- ## start: systemctl命令的一个子命令,用于启动指定的系统服务;
- ## rabbitmq-server: 这是要启动的服务的名称,即RabbitMQ。
- [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# systemctl start rabbitmq-server
- ## 检查RabbitMQ的状态
- [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# rabbitmqctl status
- Status of node rabbit@iZuf6eqvrwmawrgxqo5kw7Z ...
- Runtime
-
- OS PID: 14601
- OS: Linux
- Uptime (seconds): 24
- RabbitMQ version: 3.8.2
- Node name: rabbit@iZuf6eqvrwmawrgxqo5kw7Z
- Erlang configuration: Erlang/OTP 23 [erts-11.2.2.10] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:64] [hipe]
- Erlang processes: 264 used, 1048576 limit
- Scheduler run queue: 1
- Cluster heartbeat timeout (net_ticktime): 60
-
- Plugins
-
- Enabled plugin file: /etc/rabbitmq/enabled_plugins
- Enabled plugins:
-
-
- Data directory
-
- Node data directory: /var/lib/rabbitmq/mnesia/rabbit@iZuf6eqvrwmawrgxqo5kw7Z
-
- Config files
- ## brew update 是Homebrew(一个 macOS 上的包管理器)中用于更新本地Homebrew软件包索引的命令
- ## 检查远程仓库以查看是否有可用的更新版本或者新的软件包,并将本地的软件包索引与远程仓库同步**。
- chenzh12@chenzh12deiMac ~ % brew update
- ==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/bottles-portable-ruby/portable-ruby-3.1.4.arm64_big_sur.bottle.tar.gz
- ####################################################################################################################################################### 100.0%
- ==> Pouring portable-ruby-3.1.4.arm64_big_sur.bottle.tar.gz
- ==> Homebrew collects anonymous analytics.
- Read the analytics documentation (and how to opt-out) here:
- https://docs.brew.sh/Analytics
- No analytics have been recorded yet (nor will be during this `brew` run).
-
- ==> homebrew/core is old and unneeded, untapping to save space...
- Untapping homebrew/core...
- Untapped 2 commands and 7039 formulae (7,050 files, 931.5MB).
- ==> homebrew/cask is old and unneeded, untapping to save space...
- Untapping homebrew/cask...
- Untapped 4432 casks (4,214 files, 511.3MB).
- ==> Downloading https://formulae.brew.sh/api/formula_tap_migrations.jws.json
- ####################################################################################################################################################### 100.0%
- Updated 3 taps (homebrew/services, homebrew/core and homebrew/cask).
- ==> New Formulae
- c-blosc2 ingress2gateway libscfg msieve policy_sentry redict valkey
- dissent jnv logdy navidrome protoc-gen-js rtabmap vfox
- ffmpeg@6 jtbl manim overarch rage rustcat whisperkit-cli
- gitu liblc3 mantra parsedmarc ratchet tartufo yo
- ==> New Casks
- arctic ente-auth hhkb-studio loungy starnet2 yandex-music
- boltai fujifilm-x-raw-studio irpf2024 outfox toneprint
- capcut godspeed juxtacode phoenix-code viable
- clearvpn halloy lookaway requestly xcodepilot
- ==> Outdated Formulae
- ca-certificates gettext icu4c libtiff libxdmcp maven pcre2 xz
- cairo glib jpeg-turbo libx11 libxext openjdk redis zstd
- fontconfig harfbuzz libpng libxcb little-cms2 openssl@1.1 xorgproto
-
- You have 23 outdated formulae installed.
- You can upgrade them with brew upgrade
- or list them with brew outdated.
MacOS安装erlang语言环境可能会出现多次失败,需要网络状况良好,此外还需要运行xcode-select --install,提前安装Command Line Tools
- chenzh12@chenzh12deiMac ~ % brew install erlang
- Warning: You are using macOS 11.
- We (and Apple) do not provide support for this old version.
- It is expected behaviour that some formulae will fail to build in this old version.
- It is expected behaviour that Homebrew will be buggy and slow.
- Do not create any issues about this on Homebrew's GitHub repositories.
- Do not create any issues even if you think this message is unrelated.
- Any opened issues will be immediately closed without response.
- Do not ask for help from Homebrew or its maintainers on social media.
- You may ask for help in Homebrew's discussions but are unlikely to receive a response.
- Try to figure out the problem yourself and submit a fix as a pull request.
- We will review it but may or may not accept it.
-
- ==> Fetching dependencies for erlang: ca-certificates, openssl@3, m4, libtool, unixodbc, cmake, jpeg-turbo, libpng, xz, zstd, libtiff, pcre2, pkg-config and wxwidgets
- ==> Fetching ca-certificates
- ==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/ca-certificates-2024-03-11.all.bottle.tar.gz
- Already downloaded: /Users/chenzh12/Library/Caches/Homebrew/downloads/17b42fcbbed5f1d5a5f959ad26b218ab108748f29913580d4bbe585be13af894--ca-certificates-2024-03-11.all.bottle.tar.gz
- ==> Fetching openssl@3
- ==> Downloading https://raw.githubusercontent.com/Homebrew/homebrew-core/47f48c875570ca368d8e1ad0e2c7035403e8db90/Formula/o/openssl@3.rb
- ####################################################################################################################################################### 100.0%
- ==> Downloading https://github.com/openssl/openssl/commit/e9d7083e241670332e0443da0f0d4ffb52829f08.patch?full_index=1
- ####################################################################################################################################################### 100.0%
- ==> Downloading https://github.com/openssl/openssl/releases/download/openssl-3.2.1/openssl-3.2.1.tar.gz
- ==> Downloading from https://objects.githubusercontent.com/github-production-release-asset-2e65be/7634677/43abcd22-856f-405a-9126-4942d3d35f1f?X-Amz-Algorithm
- ####################################################################################################################################################### 100.0%
- ==> Fetching m4
- ==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/m4-1.4.19.arm64_big_sur.bottle.tar.gz
- ####################################################################################################################################################### 100.0%
- ==> Fetching libtool
- ==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/libtool-2.4.7.arm64_big_sur.bottle.1.tar.gz
- ####################################################################################################################################################### 100.0%
- ==> Fetching unixodbc
- ==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/unixodbc-2.3.12.arm64_big_sur.bottle.tar.gz
- ####################################################################################################################################################### 100.0%
- ==> Fetching cmake
- ==> Downloading https://raw.githubusercontent.com/Homebrew/homebrew-core/47f48c875570ca368d8e1ad0e2c7035403e8db90/Formula/c/cmake.rb
- ####################################################################################################################################################### 100.0%
- ==> Downloading https://github.com/Kitware/CMake/releases/download/v3.29.2/cmake-3.29.2.tar.gz
- chenzh12@chenzh12deiMac ~ % brew install rabbitmq
- ==> Downloading https://formulae.brew.sh/api/cask.jws.json
- #=O#- # #
- Warning: You are using macOS 11.
- We (and Apple) do not provide support for this old version.
- It is expected behaviour that some formulae will fail to build in this old version.
- It is expected behaviour that Homebrew will be buggy and slow.
- Do not create any issues about this on Homebrew's GitHub repositories.
- Do not create any issues even if you think this message is unrelated.
- Any opened issues will be immediately closed without response.
- Do not ask for help from Homebrew or its maintainers on social media.
- You may ask for help in Homebrew's discussions but are unlikely to receive a response.
- Try to figure out the problem yourself and submit a fix as a pull request.
- We will review it but may or may not accept it.
-
- ==> Fetching rabbitmq
- ==> Downloading https://mirrors.ustc.edu.cn/homebrew-bottles/rabbitmq-3.13.1.all.bottle.tar.gz
- ############################################################################################################################################################################################################################################# 100.0%
- ==> Pouring rabbitmq-3.13.1.all.bottle.tar.gz
- ==> Caveats
- Management UI: http://localhost:15672
- Homebrew-specific docs: https://rabbitmq.com/install-homebrew.html
-
- To start rabbitmq now and restart at login:
- brew services start rabbitmq
- Or, if you don't want/need a background service you can just run:
- CONF_ENV_FILE="/opt/homebrew/etc/rabbitmq/rabbitmq-env.conf" /opt/homebrew/opt/rabbitmq/sbin/rabbitmq-server
- ==> Summary
- 🍺 /opt/homebrew/Cellar/rabbitmq/3.13.1: 1,523 files, 35.9MB
- ==> Running `brew cleanup rabbitmq`...
- Disable this behaviour by setting HOMEBREW_NO_INSTALL_CLEANUP.
- Hide these hints with HOMEBREW_NO_ENV_HINTS (see `man brew`).
- ## 进入当前用户的home目录输入 cd ~
- ## 编辑.bash_profile文件 vim .bash_profile
- ## 添加 export PATH=$PATH:/usr/local/opt/rabbitmq/sbin 保存并关闭文件
- ## 更新刚配置的环境变量,输入source .bash_profile
- ## 启动RabbitMQ的服务端,输入rabbitmq-server
- chenzh12@chenzh12deiMac ~ % cd ~
- chenzh12@chenzh12deiMac ~ % vim .bash_profile
- chenzh12@chenzh12deiMac ~ % source .bash_profile
- chenzh12@chenzh12deiMac ~ % rabbitmq-server
- 2024-04-17 11:13:58.161801+08:00 [notice] <0.44.0> Application syslog exited with reason: stopped
- 2024-04-17 11:13:58.164286+08:00 [notice] <0.247.0> Logging: switching to configured handler(s); following messages may not be visible in this log output
-
- ## ## RabbitMQ 3.13.1
- ## ##
- ########## Copyright (c) 2007-2024 Broadcom Inc and/or its subsidiaries
- ###### ##
- ########## Licensed under the MPL 2.0. Website: https://rabbitmq.com
-
- Erlang: 26.2.4 [jit]
- TLS Library: OpenSSL - OpenSSL 3.2.1 30 Jan 2024
- Release series support status: supported
-
- Doc guides: https://www.rabbitmq.com/docs/documentation
- Support: https://www.rabbitmq.com/docs/contact
- Tutorials: https://www.rabbitmq.com/tutorials
- Monitoring: https://www.rabbitmq.com/docs/monitoring
-
- Logs: /opt/homebrew/var/log/rabbitmq/rabbit@localhost.log
-
-
- Config file(s): (none)
-
- Starting broker... completed with 7 plugins.
-
- chenzh12@chenzh12deiMac ~ % rabbitmqctl stop
- Stopping and halting node rabbit@localhost ...
不推荐,因为要求系统用户名和计算机名必须是英文,而Win10改名比较麻烦,而且可能会有其他坑,而且和未来的实际工作场景严重不符,没有Windows作为服务器的。
开启Web管理后台:rabbitmq-plugins enable rabbitmq_management
- [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# rabbitmq-plugins enable rabbitmq_management
- Enabling plugins on node rabbit@iZuf6eqvrwmawrgxqo5kw7Z:
- rabbitmq_management
- The following plugins have been configured:
- rabbitmq_management
- rabbitmq_management_agent
- rabbitmq_web_dispatch
- Applying plugin configuration to rabbit@iZuf6eqvrwmawrgxqo5kw7Z...
- The following plugins have been enabled:
- rabbitmq_management
- rabbitmq_management_agent
- rabbitmq_web_dispatch
-
- started 3 plugins.
添加admin用户
- [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# rabbitmqctl add_user admin password
- Adding user "admin" ...
- [root@iZuf6eqvrwmawrgxqo5kw7Z ~]# rabbitmqctl set_user_tags admin administrator
- Setting tags for user "admin" to [administrator] ...
-
注:默认用户只有guest,远端无法访问
添加admin用户后,需要在ECS安全组中打开端口15672,然后在登录页面(IP:15672)使用admin的密码进行登录
Admin→Virtual Hosts→Set permission在虚拟主机需要给新创建的admin添加权限
使用Java创建生产者和消费者连接RabbitMQ服务,生产者Send向名为'hello'的消息队列发送消息,消费者Recv从指定的队列接收消息并进行处理。
- /**
- * 描述: 生产者连接到RabbitMQ服务端,然后发送一条消息,然后退出。
- */
- public class Send {
- private final static String QUEUE_NAME = "hello"; // 队列名
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 创建连接工厂,用于配置RabbitMQ连接。
- ConnectionFactory factory = new ConnectionFactory();
- // 配置RabbitMQ连接,服务端地址用于连接
- factory.setHost("IP"); //运行RabbitMQ的阿里云实例ip
- factory.setUsername("admin"); // 1.需要给ECS实例开端口5672;2.默认创建的guest用户权限虽高,但不支持远端登录,此处使用自己创建的admin;3.Virtual Hosts添加admin
- factory.setPassword("password");
- // 建立连接
- Connection connection = factory.newConnection();
- // 获得信道
- Channel channel = connection.createChannel();
- // 声明队列,参数解释:队列名称、是否持久化、是否独占、是否自动删除、其他参数(如消息 TTL、最大长度等)
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- // 发布消息
- String message = "Hello World!7";
- channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8")); // 参数解释:交换机名称(默认交换机为空字符串)、队列名称、消息属性、消息内容(字节数组)
- System.out.println("发送消息:" + message);
- // 关闭连接
- channel.close();
- connection.close();
- }
- }
-
- /**
- * 描述:消费者接收消息,并打印,持续运行
- */
- public class Recv {
- private final static String QUEUE_NAME = "hello"; // 队列名
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接工厂,用于配置 RabbitMQ 连接的参数
- ConnectionFactory factory = new ConnectionFactory();
- //设置RabbitMQ地址
- factory.setHost("IP");
- factory.setUsername("admin");
- factory.setPassword("password");
- //建立连接
- Connection connection = factory.newConnection();
- //获得信道
- Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //接收消息并消费,开始接收指定队列中的消息并消费。参数包括队列名称、是否自动确认消息、消费者对象
- channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){ // 使用了匿名内部类DefaultConsumer,并重写了其handleDelivery方法,用于处理接收到的消息
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("收到消息:" + message);
- }
- });
- }
- }
-
当生产者发送消息并被消费这消费之后,可以在RabbitMQ管理后台中直观看到队列中的消息变化:
当存在多个消费者时,RabbitMQ如何分配队列中的的消息?本节介绍循环调度和公平派遣两种策略。
注:IDEA设置多实例并行运行,Run/Debug Configurations→Modify options→Allow mutiple instances开启
RabbitMQ循环调度
循环调度是指当多个消费者同时订阅同一个队列,并且队列中有多条消息时,RabbitMQ 将以循环的方式将消息分发给各个消费者,即逐条将消息轮流发送给不同的消费者。这种行为是 RabbitMQ 的默认行为,称为"轮询"分发(Round-Robin),用于在多个消费者之间均匀地分配消息负载。
在循环调度中,消息将平均分布到每个消费者上,每个消费者一次处理一个消息,直到队列中的所有消息都被处理完毕
RabbitMQ公平派遣(Fair Dispatch)
公平派遣(Fair Dispatch)是一种消息分发策略,旨在确保每个消费者都能平等地获得工作,而不会出现某些消费者繁忙而其他消费者闲置的情况。在工作队列模式中,通常使用公平派遣来提高系统的整体性能和公平性。公平派遣的关键在于使用消息预取(Prefetch Count)机制,需要注意公平派遣需要增加消息确认机制。消费者可以使用channel.basicQos(number)设置每次从队列中取出的消息数,并使用方法channel.basicAck()手动确认消息被消费,这样就不会出现有的消费者繁忙有的限制的情况。
- /**
- * 描述:消费者,接收前面的批量消息
- */
- public class Worker {
-
- private final static String TASK_QUEUE_NAME = "task_queue"; // 存储队列的名称
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //设置RabbitMQ地址
- factory.setHost("localhost"); // 启用本地RabbitMQ客户端,使用默认guest账户登录
- //建立连接
- Connection connection = factory.newConnection();
- //获得信道
- final Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- System.out.println("开始接收消息");
- **channel.basicQos(1);** // 设置每次从队列中获取的消息数量为1,这样可以确保每个消费者在处理完一条消息之前不会接收到新的消息,这也称为"公平派遣"。
- // 从队列中消费消息。参数解释:队列名称、是否自动确认消息、消费者对象(这里使用了 DefaultConsumer,重写了其 handleDelivery 方法来处理消息)
- channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException { // 方法参数解释:消费者标签、信封(包含消息元数据,如交换机、路由键等信息)、消息属性、消息内容
- String message = new String(body, "UTF-8");
- System.out.println("收到了消息:" + message);
- try {
- doWork(message);
- }finally {
- System.out.println("消息处理完成");
- **channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息已经被消费,参数解释:消息标识符、是否批量确认**
- }
- }
- });
- }
-
- private static void doWork(String task) { //根据字符(消息内容)处理任务,含有'.'则等待一秒
- char[] chars = task.toCharArray();
- for (char ch : chars) {
- if (ch == '.') {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 这四种(AMQP 规范里还提到两种 Exchange Type,分别为 system 与 自定义,这里不予以描述)
fanout:广播,这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的;fanout 类型的交换机会把所有发送到该交换机的消息路由到所有与它绑定的队列中,不需要做任何判断操作,所以也是速度最快的。
- /**
- * 描述:发送日志信息
- */
- public class EmitLog {
- private static final String EXCHANGE_NAME = "logs"; // 交换机名称
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明了一个名为EXCHANGE_NAME的交换机,交换机类型为****FANOUT**
- **channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); **
- String message = "info:Hello World4"; // 发送内容
- // **发布消息到指定的交换机。参数依次为交换机名称、路由键(对于fanout类型的交换机,路由键为空)、消息属性、消息内容的字节数组。字符串消息内容转换为UTF-8编码的字节数组进行发送
- channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));**
- System.out.println("发送了消息:" + message);
- channel.close();
- connection.close();
- }
- }
- /**
- * 描述:接收日志消息
- */
- public class ReceiveLogs {
- private static final String EXCHANGE_NAME = "logs";
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明了一个名为EXCHANGE_NAME的交换机,交换机类型为****FANOUT**
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
- String queueName = channel.queueDeclare().getQueue(); // 声明了临时队列,并获取到队列的名称。临时队列是在没有指定队列名称的情况下创建的,每次连接时都会生成一个不同的队列名。
- channel.queueBind(queueName, EXCHANGE_NAME, ""); // 完成交换机和队列的绑定**
- System.out.println("开始接收消息");
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("收到消息:" + message);
- }
- };
- channel.basicConsume(queueName, true, consumer); // 消费指定队列中的消息。参数依次为队列名称、是否自动确认消息、消费者对象
- }
- }
direct:根据RoutingKey匹配消息路由到指定队列;把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。例如,channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"))中的路由键"Info"与交换机绑定队列的channel.queueBind(queueName, EXCHANGE_NAME, "info")**方法的绑定键"Info"完全匹配。
- public class EmitLogDirect {
- private static final String EXCHANGE_NAME = "direct_logs"; // 交换机名称
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // **声明了一个名为EXCHANGE_NAME的交换机,交换机类型为****DIRECT**
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- // 发布消息到指定的交换机。参数依次为交换机名称、路由键(对于fanout类型的交换机,路由键为空)、消息属性、消息内容的字节数组。字符串消息内容转换为UTF-8编码的字节数组进行发送
- String message = "info:Hello World!";
-
- **channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));**
- System.out.println("发送了消息," + "等级为info,消息内容:" + message);
-
- message = "warning:Hello World!";
- channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));
- System.out.println("发送了消息," + "等级为warning,消息内容:" + message);
-
- message = "error:Hello World!";
- channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
- System.out.println("发送了消息," + "等级为error,消息内容:" + message);
- channel.close();
- connection.close();
- }
- }
- /**
- * 描述:接收info,warning,error3个等级的日志
- */
- public class ReceiveLogsDirect1 {
- private static final String EXCHANGE_NAME = "direct_logs";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- //生成一个随机的临时的queue
- String queueName = channel.queueDeclare().getQueue();
- //一个交换机同时绑定3个queue
- channel.queueBind(queueName, EXCHANGE_NAME, "info"); // 绑定Routingkey
- channel.queueBind(queueName, EXCHANGE_NAME, "warning");
- channel.queueBind(queueName, EXCHANGE_NAME, "error");**
-
- System.out.println("开始接收消息");
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("收到消息:" + message);
- }
- };
- channel.basicConsume(queueName, true, consumer);
- }
- }
- /**
- * 描述:接收error等级的日志
- */
- public class ReceiveLogsDirect2 {
- private static final String EXCHANGE_NAME = "direct_logs";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- //生成一个随机的临时的queue
- String queueName = channel.queueDeclare().getQueue();
- //一个交换机绑定1个queue
- channel.queueBind(queueName, EXCHANGE_NAME, "error");
-
- System.out.println("开始接收消息");
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("收到消息:" + message);
- }
- };
- channel.basicConsume(queueName, true, consumer);
- }
- }
topic:生产者指定RoutingKey消息根据消费端指定的队列,通过模糊匹配的方式进行相应转发;匹配规则:'*'可以代替一个单词,'#'可以替代零个或者多个单词;
- /**
- * 描述:topic模式交换机,发送消息
- */
- public class EmitLogTopic {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 交换机类型TOPIC
-
- String message = "Animal World";
-
- String[] routingKeys = new String[9];
- routingKeys[0] = "quick.orange.rabbit";
- routingKeys[1] = "lazy.orange.elephant";
- routingKeys[2] = "quick.orange.fox";
- routingKeys[3] = "lazy.brown.fox";
- routingKeys[4] = "lazy.pink.rabbit";
- routingKeys[5] = "quick.brown.fox";
- routingKeys[6] = ".orange."; // **匹配到ReceiveLogsTopic1,则**
- routingKeys[7] = "quick.orange.male.rabbit";
- routingKeys[8] = "lazy.orange.male.rabbit";
-
- for (int i = 0; i < routingKeys.length; i++) {
- channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null, message.getBytes("UTF-8"));
- System.out.println("发送了:" + message+" routingKey:"+routingKeys[i]);
- }
-
- channel.close();
- connection.close();
- }
- }
- 开始接收消息
- 收到消息:Animal World routingKey: quick.orange.rabbit
- 收到消息:Animal World routingKey: lazy.orange.elephant
- 收到消息:Animal World routingKey: quick.orange.fox
- 收到消息:Animal World routingKey: quick.orange.rabbit
- 收到消息:Animal World routingKey: lazy.orange.elephant
- 收到消息:Animal World routingKey: quick.orange.fox
- 收到消息:Animal World routingKey: .orange.
- /**
- * 描述: 特定路由键
- */
- public class ReceiveLogsTopic1 {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
- //生成一个随机的临时的queue
- String queueName = channel.queueDeclare().getQueue();
- String routingKey = "*.orange.*";
- channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
-
- System.out.println("开始接收消息");
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("收到消息:" + message + " routingKey: " + envelope.getRoutingKey());
- }
- };
- channel.basicConsume(queueName, true, consumer);
- }
- }
- /**
- * 描述:特定路由键2
- */
- public class ReceiveLogsTopic2 {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
- //生成一个随机的临时的queue
- String queueName = channel.queueDeclare().getQueue();
- String routingKey = "*.*.rabbit";
- channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
- String routingKey2 = "lazy.#";
- channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
-
- System.out.println("开始接收消息");
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("收到消息:" + message + " routingKey: " + envelope.getRoutingKey());
- }
- };
- channel.basicConsume(queueName, true, consumer);
- }
- }
headers:不推荐,根据发送消息内容中的headers属性来匹配。
- **pom.xml**
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
-
- **aplication.properties**
- ## 配置RabbitMQ的属性
- server.port=8080
- spring.application.name=producer
-
- spring.rabbitmq.addresses=127.0.0.1:5672
- spring.rabbitmq.username=guest
- spring.rabbitmq.password=guest
- spring.rabbitmq.virtual-host=/
- spring.rabbitmq.connection-timeout=15000
Spring AMQP 提供的一个方法,用于向 RabbitMQ 发送消息方法如下:
- rabbitmqTemplate.convertAndSend(exchange, routingKey, message);
- 其中:
- exchange 是要发送消息到的交换机的名称。
- routingKey 是消息的路由键,用于将消息路由到匹配的队列。
- message 是要发送的消息内容。
- /**
- * 描述: rabbitmq配置类
- */
- @Configuration //标识这是一个配置类
- public class TopicRabbitConfig {
-
- @Bean
- public Queue queue1() {
- return new Queue("queue1");
- } // 定义了一个名为队列"queue1"
-
- @Bean
- public Queue queue2() {
- return new Queue("queue2");
- }
-
- @Bean
- TopicExchange exchange() {
- return new TopicExchange("bootExchange");
- } //定义了一个名为bootExchange的Topic交换机
- **// 绑定到交换机上,并指定路由键**
- @Bean
- Binding bingdingExchangeMessage1(Queue queue1, TopicExchange exchange) { // queue1队列绑定到交换机上,并指定路由键为"dog.red"
- return BindingBuilder.bind(queue1).to(exchange).with("dog.red");
- }
-
- @Bean
- Binding bingdingExchangeMessage2(Queue queue2, TopicExchange exchange) { // queue2队列绑定到交换机上,并指定路由键为"dog.#"
- return BindingBuilder.bind(queue2).to(exchange).with("dog.#");
- }
- }
- /**
- * 描述:发送消息
- */
- @Component
- public class MsgSender {
- @Autowired
- private AmqpTemplate rabbitmqTemplate; // @Autowired注解将AmqpTemplate接口的实例注入到rabbitmqTemplate字段中,用于发送消息到 abbitMQ
-
- public void send1() {
- String message = "This is message1, routing key is dog.red";
- System.out.println("发送了:" + message);
- this.**rabbitmqTemplate.convertAndSend**("bootExchange","dog.red",message); // 将消息发送到名为"bootExchange"的交换机,并使用"dog.red"作为路由键
- }
-
- public void send2() {
- String message = "This is message2, routing key is dog.black";
- System.out.println("发送了:" + message);
- this.**rabbitmqTemplate.convertAndSend**("bootExchange","dog.black",message);
- }
- }
- /**
- * 描述:消费者1
- */
- @Component
- @RabbitListener(queues = "queue1") // **标记一个方法或类是RabbitMQ消息的监听器**,并指定监听的队列名称
- public class Receiver1 {
-
- @RabbitHandler // **标记RabbitMQ消息处理方法**。当消费者接收到消息时,Spring将调用该方法来处理消息。方法的参数类型为String,表示接收到的消息内容。
- public void process(String message) {
- System.out.println("Receiver1: " + message);
- }
- }
- /**
- * 描述:消费者2
- */
- @Component
- @RabbitListener(queues = "queue2")
- public class Receiver2 {
-
- @RabbitHandler
- public void process(String message) {
- System.out.println("Receiver2: " + message);
- }
- }
-
- 当send1和send2发送消息后,消费者收到的结果如下:
- *Receiver1: This is message1, routing key is dog.red
- Receiver2: This is message1, routing key is dog.red
- Receiver2: This is message2, routing key is dog.black*