本质上是个队列,队列中存放的是message,还是一种跨进程的通信机制,用于上下游传递信息。是一种“逻辑解耦+物理解耦”的消息通信服务。使用MQ之后,消息发送上游只需要依赖MQ,不用关注其他服务
单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现,可靠性较高。但是高吞吐量场景较少使用
百万级TPS的吞吐量,分布式的,一个数据多个副本,少数机器宕机不会丢失数据,有第三方web管理界面。主要特点是基于Pull模式来处理消息消费,追求高吞吐量,用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。
出自alibaba,用Java实现,单机吞吐量十万级,可用性高,分布式架构。消息做到0丢失,MQ功能较为完善,扩展性较好。常用于金融互联网领域,尤其在电商里面的订单扣款,以及业务消峰,在大量交易涌入时,后端可能无法及时处理,使用RocketMQ在稳定性上更为可靠
MQ功能比较完善,健壮,稳定,易用,跨平台,支持多种语言;开源提供的管理界面,更新频率相当高
产生数据发送消息的程序
接受来自生成者的消息,将消息推送到队列中。交换机必须要知道如何处理他接收到的消息,将这些消息推送到特定队列还是推送到多个队列;又或者是将消息丢弃,由交换机类型决定
本质上是一个大的消息缓冲区,队列仅受主机的内存和磁盘限制的约束,许多生产者可以将消息发送到一个队列,许多消费者可以从一个队列接受数据
消费者大多是等待接受消息的程序,消费者和消费中间件在很多时候并不在同一机器上,同一个应用程序既可以是生产者又可以是消费者
Broker:接受和分发消息的应用,RabbitMQ Server就是Message Broker
Virtual host: 当多个用户使用同一个RabbitMQ Server提供服务时,可划分出多个vhost,每个用户在自己的vhost上创建exchange/queue等
Connection: publisher/consumer和broker之间的TCP连接
Channel:在connection内部建立的逻辑连接,如果支持多线程,每个thread创建单独的channel进行通讯,完全隔离,减少了建立TCP connection的开销
Exchange:交换机,根据分发规则,匹配查询表中的routing key,分发消息到队列中。常用的类型有:direct,topic(publish-subscribe)以及fanout(multicast)
Queue:消息最终被送到队列中等待被消费者取走
Binding:exchange和queue之间的虚拟连接,可以包含routing key,被保存到exchange中的查询表中,用于message的分发
针对CentOS7,需要安装对应的elang和rebbitmq server安装包
rpm -ivh erlang-23.2.3-1.el7.x86_64.rpm
yum install socat -y
(yum命令安装socat依赖)
rpm -ivh rabbitmq-server-3.8.11-1.el7.noarch.rpm
添加开机自启RabbitMQ服务(可不做):chkconfig rabbitmq-server on
启动服务:/sbin/service rabbitmq-server start
查看服务状态:/sbin/service rabbitmq-server status
停止RabbitMQ服务:/sbin/service rabbitmq-server stop
停止MQ服务后再开启web管理插件 rabbitmq-plugins enable rabbitmq_management
再次启动RabbitMQ服务,使用默认账号密码(guest)访问:http://主机IP:15672(如不能访问,需要先关闭防火墙)
需要添加一个新用户并且设置权限完成登录
创建账号:rabbitmqctl add_user admin hz1234
设置用户角色:rabbitmqctl set_user_tags admin administrator
设置用户权限(在vhost/下设置配置,读写权限)
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
列举当前用户和角色信息:rabbitmqctl list_users
<dependencies>
<!--rabbitmq的依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作IO流的依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
/**
* 生产者 发送消息
*/
public class Producer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
// 创建RabbitMQ连接工厂 设置工厂IP,用户名以及密码
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("linux1");
factory.setUsername("admin");
factory.setPassword("hz1234");
// 创建连接 获取信道channel 生成队列
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 参数列表: 队列name
// 队列中的消息是否持久化(默认将message存储在内存中)
// 该队列是否允许多个消费者消费(默认不允许)
// 是否自动删除,最后一个消费者断开连接之后,queue是否自动删除(true表示自动删除)
// 其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World";
// 参数列表:
// 发送到哪个交换机(交换机名String)
// 路由的key值是哪一个(queue name)
// 其他参数
// 消息体的字节类型
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println("message transmit is over...");
}
}
public class Consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建RabbitMQ连接工厂 设置工厂IP,用户名以及密码
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("linux1");
factory.setUsername("admin");
factory.setPassword("hz1234");
// 创建连接 获取信道channel 生成队列
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 成功接受消息的回调函数(lambda表达式)
DeliverCallback callback = (tag, message) -> {
System.out.println(new String(message.getBody()));
};
// 消息接受失败的回调函数
CancelCallback cancelCallback = tag -> {
System.out.println("消息被中断...");
};
// 接受消息basicConsume参数列表:
// 消费哪个队列
// 是否需要自动应答true(手动应答false)
// 消息接受成功的反馈
// 消息接受失败的反馈
channel.basicConsume(QUEUE_NAME, true, callback, cancelCallback);
}
}
Work Queues任务队列的主要思想是避免立即执行资源密集型任务,而不得不等待他完成。将任务封装成消息并将其发送到队列,在后台运行的工程进程将弹出任务并最终执行作业,有多个工作线程时,这些工作线程将一起处理这些任务
启动三个工作线程(consumer),一个消息发送线程(producer),注意同一个消息只能被某一个工作线程处理一次,不能被处理多次
/**
* 工作线程1:消费者
*/
public class Worker1 {
public static final String QUEUE_NAME = "Work Queues--轮询分发";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
// 接受消息的反馈
DeliverCallback receiveCallback = (tag, message) -> {
System.out.println("receive message:" + new String(message.getBody()));
};
// 取消接受的回调
CancelCallback cancelCallback = tag -> {
System.out.println(tag + "message cancel receive");
};
System.out.println("W1等待接受消息...");
channel.basicConsume(QUEUE_NAME, true, receiveCallback, cancelCallback);
}
}
/**
* 连接工厂创建信道的工具类
*/
public class RabbitMQUtils {
public static Channel getChannel() throws Exception{
// 创建RabbitMQ连接工厂 设置工厂IP,用户名以及密码
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("linux1");
factory.setUsername("admin");
factory.setPassword("hz1234");
// 创建连接 获取信道channel 生成队列
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
public class Prod {
public static final String QUEUE_NAME = "Work Queues--轮询分发";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System. in);
while(scanner.hasNext()){
String message = scanner.nextLine();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息:" + message);
}
}
}
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个时间较长的任务只完成部分突然就宕机了。而RabbitMQ向消费者传递一条消息,立即将这条消息标记为删除,因此会丢失正在处理中的消息。
为了保证消息在发送过程中不发生丢失,RabbitMQ引入消息应答机制,消息应答就是:消费者在接收到消息并且处理完该消息之后,告诉RabbitMQ他已经处理完成,此时RabbitMQ可以将这条消息进行删除
消息一旦发送立即被认为已经传送成功,由于消费者可能出现连接channel关闭,那么消息就丢失了,而生产者传递的消息数量并没有限制,可能会导致消费者这边还没来得及处理,消息的积压造成内存耗尽,最终这些消费者进程被系统杀死。该模式仅适用于消费者可以高效并以某种速率能够处理这些消息的情况下使用
手动应答的优点:可以批量应答并且减少网络拥堵
Channel.basicAck (tag, true)
(用于肯定确认):第二个参数如果设置为true表示批量应答channel上未应答的消息,如当前tag=1,2,3,4,此时来到4,之前的都会被应答;反之只有当前时刻tag=4会被应答Channel.basicNac(long var1, boolean var3, boolean var4)
(用于否定确认)Channel.basicReject(long var1, boolean var3)
(用于否定确认)如果消费者由于某些原因失去连接,导致消息未发送ACK确认,MQ知道该消息未被处理,将对其重新排队。如果此时其他消费者可以处理,将消息重新分发
public class autoAskProd {
public static final String TASK_QUEUE_NAME = "ack_queue1";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String msg = scanner.next();
channel.basicPublish("", TASK_QUEUE_NAME, null, msg.getBytes("UTF-8"));
System.out.println("producer send message: " + msg);
}
}
}
public class autoAskConsumer1 {
public static final String TASK_QUEUE_NAME = "ack_queue1";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
System.out.println("W1等待1s之后接受消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), "UTF-8");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接收到消息: " + msg);
// 接收到之后进行手动应答 basicAck(long var1, boolean var3)
// 第一个参数表示消息标记tag
// 第二个参数设置为false,代表只应答接收到的当前消息,为true则批量应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 手动应答设置为false
boolean autoAsk = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAsk, deliverCallback, consumerTag -> {});
}
}
public class autoAckConsumer2 {
public static final String TASK_QUEUE_NAME = "ack_queue1";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
System.out.println("W2等待10s之后接受消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), "UTF-8");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接收到消息: " + msg);
// 接收到之后进行手动应答 basicAck(long var1, boolean var3)
// 第一个参数表示消息标记tag
// 第二个参数设置为false,代表只应答接收到的当前消息,为true则批量应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 手动应答设置为false
boolean autoAsk = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAsk, deliverCallback, consumerTag -> {});
}
}
测试,当轮序接受消息时,由于消费者2等待时间较长,如果在等待期间消费者2宕机了,那么消费者2收到的消息不会丢失,MQ将其重新放入队列,交由消费者1进行处理。
之前创建的队列都是非持久化的(只存储在内存中),MQ一旦重启,队列就会被删除。
如何保证MQ服务停止后消息生成者发送的消息不丢失?
在声明队列时将durable参数设置为true和消息都标记为持久化
注意:如果之前声明的队列是非持久化的,需要删除原队列,或者新建一个持久化队列,否则会报错
// 队列持久化
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
// 消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN告诉MQ将消息存到磁盘
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8"));
MQ分发消息采用的是轮询方式,如果某个消费者处理消息速度很快,某个消费者处理消息很慢,就会导致某个消费者大部分时间处于空闲,有的则一直在忙碌。为了避免这种情况,在消费者方设置参数 channel.basicQos(1);
// 设置不公平分发(默认为0轮询分发)
int prefetchCount=1;
channel.basicQos(prefetchCount);
// 手动应答设置为false
boolean autoAsk = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAsk, deliverCallback, consumerTag -> {});
消息发送是异步的,那么channel中肯定不止一个消息来自其他消费者的手动确认,存在一个未确认的消息缓冲区。需要限制缓冲区的大小,避免缓冲区中无限制的未确认消息问题。
使用basicQos()
方法设置预取计数值来完成,定义channel上允许的未确认消息的最大数量。通常,增加预取值会提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是已传递但未处理的消息也会增多,从而增加了消费者的RAM消耗。
修改两个消费者中的prefetchCount预取值,较慢的为2,处理快的为10