RabbitMQ是一个消息中间件,它接受并且转发消息。RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的。它有以下几个核心概念:
1. 生产者
产生数据发送消息的程序是生产者
2. 交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息
推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推
送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
3. 队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存
储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可
以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
4. 消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费
者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
Broker: 接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Connection: publisher/consumer 和 broker 之间的 TCP 连接
Channel: 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast)
Queue: 消息最终被送到这里等待 consumer 取走
环境: ubuntu20
sudo apt-get install erlang
安装完毕后输入erl
查看是否安装成功
sudo apt-get install rabbitmq-server
开启:service rabbmitmq-server start
关闭:service rabbmitmq-server stop
重启:service rabbmitmq-server restart
查看状态:service rabbitmq-server status
开启web管理插件:sudo rabbitmq-plugins enable rabbitmq_management
创建账号:sudo rabbitmqctl add_user admin 123
设置用户角色: sudo rabbitmqctl set_user_tags admin administrator
设置用户权限:sudo rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
浏览器输入:localhost:15672
,输入刚刚设置的用户名和密码,即可反问web页面
环境: windows10
在官网下载rabbitmq-server和对应版本的eralng
https://www.rabbitmq.com/
然后找到对应的版本下载,注意不同版本的rabbitmq需要对应不同的erlang
下载完之后,右键管理员运行,然后一路next安装即可
将erlang和rabmitmq的安装位置加入系统变量
命令行测试erlang是否安装成功:
重点:如果你的电脑名称含有中文,打开命令行,进入rabbitmq的sbin目录进行以下操作:
rabbitmq-service.bat remove (管理员权限)
set RABBITMQ_BASE=D:\rabbitmq\data (设置的路径中不要含有中文)
rabbitmq-service.bat install (管理员权限)
rabbitmq-plugins enable rabbitmq_management
rabbitmq-service.bat start (如果服务未启动)
打开浏览器,输入http://localhost:15672, 用户名和密码都是guest, 成功登录
也可以创建另外一个管理员用户:
创建账号:rabbitmqctl add_user admin 123
设置用户角色: rabbitmqctl set_user_tags admin administrator
设置用户权限:rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
使用idea创建一个maven工程,在pom文件中导入以下坐标:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
创建两个类,一个Producer.java, 一个Consumer.java
package cn.edu.xd.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("admin");
factory.setPassword("123");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
try (Connection
connection
=
factory.newConnection(); Channel channel =
connection.createChannel()) {
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true:可以多个消费者消费
* 4.是否自动删除 最后一个消费者断开连接以后 该队列是否自动删除 true:自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world";
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完毕");
}
}
}
package cn.edu.xd.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息 ......... ");
//推送的消息如何进行消费的接口回调
//lambda表达式实现
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
//匿名内部类实现
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("消费者信息被中断");
}
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
在浏览器输入http://localhost:15672, 输入用户名和密码登录,发现消息队列为空
运行Producer.java, 再次在Web页面查看队列:
运行Consumer.java, 再次在Web页面查看队列: