RabbitMQ 是基于 Erlang 语言开发的开源消息通信中间件
RabbitMQ中的几个概念:
(1)channel:操作MQ的工具
(2)exchange:路由消息到队列中
(3)queue:缓存消息
(4)virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
MQ的官方文档中给出了5个 MQ 的 Demo 示例,对应了几种不同的用法:
(1)基本消息队列(BasicQueue)
(2) 工作消息队列(WorkQueue)
(3)发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:
1️⃣Fanout Exchange:广播
2️⃣Direct Exchange:路由
3️⃣Topic Exchange:主题
官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括三个角色:
(1)publisher:消息发布者,将消息发送到队列 queue
(2)queue:消息队列,负责接受并缓存消息
(3)consumer:订阅队列,处理队列中的消息
(1)基本消息队列的消息发送流程:
1️⃣建立 connection
2️⃣创建 channel
3️⃣利用 channel 声明队列
4️⃣利用 channel 向队列发送消息
public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.150.101"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); } }
(2)基本消息队列的消息接收流程:
1️⃣建立 connection
2️⃣创建 channel
3️⃣利用 channel 声明队列
4️⃣定义 consumer 的消费行为 handleDelivery()
5️⃣利用 channel 将消费者与队列绑定
public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.150.101"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } }