1、安装前准备
由于RabbitMQ
使用的是Erlang
语言开发的,因此在安装RabbitMQ
之前需要安装Erlang
环境,Erlang
与RabbitMQ
的下载地址分别为:
Erlang
:https://github.com/rabbitmq/erlang-rpm/releases
RabbitMQ
:https://github.com/rabbitmq/rabbitmq-server/releases
注意:
RabbitMQ
与Erlang
安装时是有版本对于关系,可以查看:https://www.rabbitmq.com/which-erlang.html
从版本对应关系可以看出,RabbitMQ
的最新3.9.13
需要Erlang
的最低23.2
版本,另外下载的时候还需要注意一点,安装的系统为Centos7
,因此在下载RabbitMQ
与Erlang
版本的时候需要下载xxx.el7
的版本,我这里下载版本如下:
rabbitmq-server-3.9.13-1.el7.noarch.rpm
erlang-23.3.4.11-1.el7.x86_64.rpm
将下载好的文件上传到服务器。
2、安装Erlang
使用如下命令安装Erlang
rpm -ivh erlang-23.3.4.11-1.el7.x86_64.rpm
执行命令后,如果出现这样的提示,则需要根据提示下载对应的依赖。
我这里安装提示缺少了libcrypto.so.10(OPENSSL_1.0.2)(64bit)
依赖,可以到https://pkgs.org/下载
将下载好的版本上传到服务器通过如下命令进行安装
rpm -ivh openssl-libs-1.0.2k-19.el7.x86_64.rpm --force
安装结果:
再执行安装Erlang
命令,出现如下结果则表示安装成功:
可以使用命令erl -version
查看Erlang
版本。
3、安装RabbitMQ
上一步我们已经把Erlang
安装成功,现在安装RabbitMQ
,如下:
rpm -ivh rabbitmq-server-3.9.13-1.el7.noarch.rpm
检查是否安装成功rabitmqctl version
4、运行RabbitMQ服务器
4.1、启动服务器
- 添加开机启动
RabbitMQ
服务
chkconfig rabbitmq-server on
- 启动启动
systemctl start rabbitmq-server
- 查看服务器状态
systemctl status rabbitmq-server
如上结果表示RabbitMQ
服务已经成功启动
- 停止服务器
systemctl stop rabbitmq-server
4.2、安装WEB插件
RabbitMQ
默认提供了WEB
插件,方便通过页面进行RabbitMQ
管理,需要执行如下命令启用WEB
插件,启用之前如果RabbitMQ
服务已经启动,则先停止服务。
rabbitmq-plugins enable rabbitmq_management
重新启动RabbitMQ
服务,通过地址:http://192.168.247.136:15672然后访问RabbitMQ
RabbitMQ
默认提供了一个guest
账户,默认的账户没有权限登录不了,因此接下来我们需要创建用户。
4、创建用户
- 创建账号
rabbitmqctl add_user admin 123456
- 设置用户角色
rabbitmqctl set_user_tags admin administrator
- 设置用户权限
命令格式:rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
上面命令表示用户具有/vhost1
这个virtual host
中的所有资源配置、读、写权限
- 查看用户和角色
rabbitmqctl list_users
可以看到用户已经添加成功并设置了角色,现在就可以使用用户登录RabbitMQ
了。
- 删除用户
rabbitmqctl delete_user guest
5、RabbitMQ集群
在实际的工作场景,一台机器肯定是应付不了的,因此就需要多台机器搭建集群环境,这里我准备了3台虚拟机环境。搭建集群步骤如下:
- 修改3台机器的主机名称
vim /etc/hostname
名称根据自己的需要取名就行,这里我取名为node1
、node2
、node3
- 配置各个节点的
hosts
文件,让各个节点都能互相识别
vim /etc/hosts
192.168.247.133 node1
192.168.247.134 node2
192.168.247.135 node3
- 确保各个节点的cookie文件使用的是同一个值
在node1
上执行远程操作命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
会提示是否继续,然后输入node2
的密码
The authenticity of host 'node2 (192.168.247.134)' can't be established.
ECDSA key fingerprint is a2:a0:00:25:7a:3e:45:d1:42:5f:9f:90:09:22:92:17.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'node2,192.168.247.134' (ECDSA) to the list of known hosts.
root@node2's password:
.erlang.cookie 100% 20 0.0KB/s 00:00
将node3
的同步到node1
,与上面操作一样
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
- 分别在三台机器上执行以下命令
rabbitmq-server -detached
- 将
node2
链接到node1
,在node2
机器下执行如下命令
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
- 将
node3
链接到node2
,在node2
机器下执行如下命令
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app
当然节点node3
也可以链接到node1
。
- 查看集群状态
rabbitmqctl cluster_status
------------------------
Cluster status of node rabbit@node2 ...
Basics
Cluster name: rabbit@node1
Disk Nodes
rabbit@node1
rabbit@node2
rabbit@node3
Running Nodes
rabbit@node1
rabbit@node2
rabbit@node3
Versions
rabbit@node1: RabbitMQ 3.8.5 on Erlang 21.3.8.1
rabbit@node2: RabbitMQ 3.8.5 on Erlang 21.3.8.1
rabbit@node3: RabbitMQ 3.8.5 on Erlang 21.3.8.1
这里是在node2
上查看集群状态,可以看出有3个节点,由于搭建集群时,将RabbitMQ
重置过,需要重新添加用户并设置权限。到这里我们的集群环境就已经搭建成功,可以登录RabbitMQ
查看搭建结果:
可以看到有3台RabbitMQ
的服务了。
6、在Java中使用
6.1、引入RabbitMQ
包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.1</version>
</dependency>
6.1、定义RabbitMQ
链接工具类
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* ClassName RabbitMQUtil
*
* @Description RabbitMQ连接工具来
* @Author SIE_LiBiao
* @Date 2022/2/11 12:09
* @Version 1.0
*/
public class RabbitMQUtil {
private RabbitMQUtil(){
}
public static Channel getChannel() throws Exception{
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机
connectionFactory.setHost("192.168.247.136");
//设置用户名
connectionFactory.setUsername("admin");
//设置密码
connectionFactory.setPassword("123456");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
return channel;
}
}
6.2、消息生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.tenghu.rc.utils.RabbitMQUtil;
import java.nio.charset.StandardCharsets;
/**
* ClassName Producers
*
* @Description 消息生产者
* @Author SIE_LiBiao
* @Date 2022/2/12 19:26
* @Version 1.0
*/
public class Producers {
/**
* 交换机
*/
public static final String HELLO_EXCHANGE = "hello_exchange";
/**
* 队列
*/
public static final String HELLO_QUEUE = "hello_queue";
public static void main(String[] args) throws Exception {
//获取信道
Channel channel = RabbitMQUtil.getChannel();
//消息发布确认
channel.confirmSelect();
//创建交换机
channel.exchangeDeclare(HELLO_EXCHANGE, BuiltinExchangeType.DIRECT);
//创建队列
channel.queueDeclare(HELLO_QUEUE,false,false,false,null);
//绑定交换机与队列
channel.queueBind(HELLO_QUEUE,HELLO_EXCHANGE,"");
//发送消息
channel.basicPublish(HELLO_EXCHANGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello word!".getBytes(StandardCharsets.UTF_8));
if(channel.waitForConfirms()){
System.out.println("消息发布成功!");
}
}
}
执行后,可以在管理页面看到对应的交换机与队列,队列里面有一个未被消费的消息。
进入队列页面就可以看到刚才执行发送的一条消息。
6.2、消息消费者
import com.rabbitmq.client.Channel;
import com.tenghu.rc.utils.RabbitMQUtil;
/**
* ClassName Consumer
*
* @Description 消息消费者
* @Author SIE_LiBiao
* @Date 2022/2/12 19:37
* @Version 1.0
*/
public class Consumer {
/**
* 队列
*/
public static final String HELLO_QUEUE = "hello_queue";
public static void main(String[] args) throws Exception {
//获取信道
Channel channel = RabbitMQUtil.getChannel();
//接收消息
channel.basicConsume(HELLO_QUEUE, false, (consumerTag, message) -> {
System.out.println("获取消息:" + new String(message.getBody()));
//手动确认消息被消费
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {
System.out.println("消息被取消时回调!");
});
}
}
执行后控制台输出结果:获取消息:hello word!
,消息消费成功后,我们手动确认消息已经被消费了,再去管理页面查看队列里面的消息已经不存在了。