需要服务方和调用方同时在线(同步)
异步通讯、自动补偿与重试、分布式事务、解决流量削峰问题、系统的解耦
消息队列MQ(Rabbit MQ、rocketMQ)
优点是异步操作可以减少客户端等待的时间,但缺点是容易消耗CPU资源,就是开启线程池,也会造成客户端长时间的等待。
详细说明:
VirtualHost(虚拟消息服务器)每个VirtualHost相当于一个相对独立的RabbitMQ服务器,像mysql有数据库的概念并且可以指定用户对库和表等操作的权限。每个VirtualHost之间是相互隔离的,exchange-交换机、queue-队列、message-消息 不能互通
拿数据库(用MySQL)来对比:RabbitMq相当于MySQL,RabbitMq中的VirtualHost就相当于MySQL中的一个db库,你可以在mysql中新建很多库,设置库的权限操作。
15672:通过 http://serverip:15672 访问 RabbitMQ 的 Web 管理界面,默认用户名密码都是 guest。(注意:RabbitMQ 3.0之前的版本默认端口是55672)
5672:RabbitMQ监听AMQP0-9-1和AMQP 1.0协议客户端的连接端口
25672:25672端口用于节点间和CLI工具通信(Erlang分发服务器端口),并从动态范围分配(默认情况下仅限于单个端口,计算方式为AMQP 0-9-1和AMQP 1.0端口+20000),默认情况下通过 RABBITMQ_NODE_PORT 计算是25672,也可以通过RABBITMQ_DIST_PORT环境变量配置
基本消息模型:一个生产者丶默认交换机丶一个队列丶一个消费者。
work消息模型:一个生产者丶默认交换机丶一个队列丶多个消费者。
fanout广播模式/发布/订阅模式:多个消费者,每一个消费这都有自己的队列,每个队列都绑定到交换机,生产者发送消息到交换机-交换机发送到哪个队列
Routing路由模式(direct):在某种场景下,我们希望不同的消息被不同的队列消费,这个时候我们就要用到direct类型的exchange,生产者向交换机发送消息—交换机根据路由key发送给队列-队列的消费者接收消息
Topics(主题模型):Topics模式和direct路由模式类似,区别在于Topic类型的交换机可以匹配通配符
符号(通配符):#表示匹配一个或者多个词
*表示匹配一个词
不会,因为mq大都有阈值设置,超过了就拒绝接收。
不会,mq队列会自动保存。
不会,如果存在多个的话,会存在轮询消费,无论是点对点还是工作队列,还是其他各种类型。需要设置消费模式,广播消费(每一个消费者都会消费)和集群消费(只有一个消费者才会消费)。
通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调。
通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)
第一步、生产者提交给消息服务器时,使用确认机制。
生产者收不到确认的情况:
解决办法:每次的生产者消息发送都通过日志表记录下来,后续采用手动补偿即可
第二步、消息服务器对应的队列、交换机等都持久化,保证数据的不丢失
如果已经设置了持久化,则给生产者确认的时候就是持久化成功后的确认。这样就算硬盘坏了、持久化的过程断电了,都不 会影响到消息的丢失,因为生产者收不到确认证明没发送成功。
第三步、消费者采用消息确认机制,保证数据的不丢失
1)、消息队列到消费者的方式有首次主动拉取、后续生产者发送时的主动推送和消费者发生异常时的重试三种方式。
2)、消息应答的方式默认为自动,即消费者收到消息时,队列即删除。如果消费者出现了断电等情况,也会被直接删除。 所以要改成手动确认删除后,队列才会真正删除,这样保证了消息的不丢失。
channel.basicQos(1);【每次推送一个,消费成功后再推送下一个】----每次只会给消费者推送1条消息,等待手动ack确认后才会继续发送,手动确认ack操作。
优点:实现能者多劳的公平性了
将收到的消息根据路由规则路由到特定队列
产生原因:
a、生产者向消费者投递和消费的速率不匹配导致。
b、消费者挂了,导致队列阻塞。
解决办法: 核心—提高系统吞吐量才是关键
a、增加broker分区缓存不同的消息/增加MQ的队列数
b、消费者应该使用集群,提高消费的速率
c、生产者投递消息到MQ服务器如果满的情况下,直接换入死信队列,由死信消费者消费。
d、执行客户端回调监听的方法将消息存放到数据库记录,后期实现补偿
Tips:
死信队列是所有的队列都是有的,属于备胎,也有消费者消费的。
如果MQ的内存、硬盘都满了,那死信队列也是没用的,备胎都没了。
死信队列又称为备胎队列,和正常队列一样拥有交换机、路由key和消费者。死信队列不能够和正常队列存放在同一个服务器中,应该分开服务器存放,保证服务的高可用。
应用场景:
1).主队列中的消息过期了还没消费
2).主队列满了【mq是以个数来计算的,不是以内容大小计算】,生产者拒绝接受消息
3).主队列多次消费都是失败
异步操作,获取回调消费结果,需要实现RabbitTemplate.ConfirmCallback 接口,然后重写 confirm()方法。
全局唯一ID + (Redis/DB)
生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后:
a、使用setnx命令,将messageId作为key放到redis中:setnx(messageId,1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃。
b、将全局消息ID存入数据库,若添加成功则说明首次执行,否则为重复消费
a.代码问题抛出数据异常,不需要重试-----可记录好日志写文件或数据库,新版本发布后人工补偿
b.消费者获取消息后,调用第三方接口,但是调用第三方接口失败,则需要重试------可配置最多5次,每次间隔3s,重试多次还失败的情况下,写到死信队列或数据库表中后期做人工补偿。
如何发起重试:1.消费者方法报错,2.手动ack告诉消费失败
重试多次还失败的情况下,写到死信队列或数据库表中后期做人工补偿。
在分布式系统不可能有强一致性的问题,因为网络抖动等原因,短暂数据延迟是正常的,但是最终的结果一定要是同步的。
Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由LinkedIn公司开发,使用Scala语言编写,目前是Apache的开源项目。
跟RabbitMQ、RocketMQ等目前流行的开源消息中间件相比,Kakfa具有高吞吐、低延迟等特点,在大数据、日志收集等应用场景下被广泛使用。
基本概念:
一个典型的 Kafka 体系架构包括若干 Producer(可以是服务器日志,业务数据,页面前端产生的 page view 等等),若干 broker(Kafka 支持水平扩展,一般 broker 数量越多,集群吞吐率越高),若干 Consumer (Group),以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 consumer group 发生变化时进行 rebalance。Producer 使用 push(推) 模式将消息发布到 broker,Consumer 使用 pull(拉) 模式从 broker 订阅并消费消息。
参考文章:点击查看
如此达到相互感知,而且kafka扩展也非常的方便。
生产者在投递消息的时候传递key,然后根据key计算hash值存在到具体的broker中,如果是相同的key,最终投递消息都是同一个broker中。
@KafkaListener(topicPartitions = {@TopicPartition(topic = "kaico", partitions = {"0"})})
public void receive(ConsumerRecord<?, ?> consumer) {
System.out.println("topic名称:" + consumer.topic() + ",key:" +
consumer.key() + "," +
"分区位置:" + consumer.partition()
+ ", 下标" + consumer.offset());
}
队列中的消息本身都是有顺序,都遵循了FIFO的原则。
单台消费者是不存在消息的顺序的问题,但是单机版本的消费吞吐比较低,所以一般消费者肯定要集群。在多个消费者消费同一个队列中的消息时候,有可能产生消息顺序行为错乱的问题。
出现顺序乱的原因或背景
解决思想
Kafka保证消息顺序性
Kafka是一款分布式消息发布与订阅系统,特点是高性能、高吞吐量。
Kafka最开始的应用场景就是针对流数据、运营数据等做日志收集,用户画像、日志收集监控。适用于大数据的场景。
MQ英文全拼为message queue,直译为消息队列。作用:异步、解耦、削峰(就是突然来的高并发,MQ也可以不用慌忙。削峰的好处就是避免高并发压垮系统的关键组件,如核心服务或者数据库。)
queue就是来源于数据结构的FIFO队列。而Topic是个抽象的概念,每个Topic底层对应N个queue,而数据也真实存在queue上的。
不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。
4.6版本默认48小时后会删除不再使用的CommitLog文件
消费模型由Consumer决定,消费维度为Topic。
集群消费
1.一条消息只会被同Group中的一个Consumer消费
2.多个Group同时消费一个Topic时,每个Group都会有一个Consumer消费到数据
广播消费
消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式
broker端属性 longPollingEnable 标记是否开启长轮询。默认开启
事件驱动方式是建立好长连接,由事件(发送数据)的方式来实时推送。
如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈。所以采取了pull的方式。
Consumer首次请求Broker
Broker中是否有符合条件的消息
有
没有
通过Topic在多Broker中分布式存储实现。
producer端
发送端指定message queue发送消息到相应的broker,来达到写入时的负载均衡
提升写入吞吐量,当多个producer同时向一个broker写入数据的时候,性能会下降
消息分布在多broker中,为负载消费做准备
默认策略是随机选择:
其他实现:
也可以自定义实现MessageQueueSelector接口中的select方法
MessageQueue select(final List
consumer端
采用的是平均分配算法来进行负载均衡。
其他负载均衡算法
Consumer和queue会优先平均分配,如果Consumer少于queue的个数,则会存在部分Consumer消费多个queue的情况,如果Consumer等于queue的个数,那就是一个Consumer消费一个queue,如果Consumer个数大于queue的个数,那么会有部分Consumer空余出来,白白的浪费了。
影响消息正常发送和消费的重要原因是网络的不确定性。
引起重复消费的原因(下面五个方面分析)
首先多个queue只能保证单个queue里的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是无法绝对保证消息的有序性的。所以总结如下:
同一topic,同一个QUEUE,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个queue里的消息。
Rocket MQ给我们提供了MessageQueueSelector
接口,可以自己重写里面的接口,实现自己的算法,举个最简单的例子:判断i % 2 == 0,那就都放到queue1里,否则放到queue2里。
for (int i = 0; i < 5; i++) {
Message message = new Message("orderTopic", ("hello!" + i).getBytes());
producer.send(
// 要发的那条消息
message,
// queue 选择器 ,向 topic中的哪个queue去写消息
new MessageQueueSelector() {
// 手动 选择一个queue
@Override
public MessageQueue select(
// 当前topic 里面包含的所有queue
List<MessageQueue> mqs,
// 具体要发的那条消息
Message msg,
// 对应到 send() 里的 args,也就是2000前面的那个0
Object arg) {
// 向固定的一个queue里写消息,比如这里就是向第一个queue里写消息
if (Integer.parseInt(arg.toString()) % 2 == 0) {
return mqs.get(0);
} else {
return mqs.get(1);
}
}
},
// 自定义参数:0
// 2000代表2000毫秒超时时间
i, 2000);
}
首先在如下三个部分都可能会出现丢失消息的情况:
Producer端如何保证消息不丢失?
producer.setRetryTimesWhenSendFailed(10);//设置发送失败后重试次数
Broker端如何保证消息不丢失?
flushDiskType = SYNC_FLUSH
Consumer端如何保证消息不丢失?
完全消费正常后在进行手动ack确认。
问题:
下游消费系统如果宕机了,导致几百万条消息在消息中间件里积压,此时怎么处理?
你们线上是否遇到过消息积压的生产故障?如果没遇到过,你考虑一下如何应对?
解答:
首先要找到是什么原因导致的消息堆积,是Producer太多了,Consumer太少了导致的还是说其他情况,总之先定位问题。
然后看下消息消费速度是否正常,正常的话,可以通过上线更多consumer临时解决消息堆积问题。
只有在开发环境中遇到消息堆积的问题,原因是其他系统在发送告警消息的时候是采用定时批量发送导致的。在RocketMQ操作平台直接清除消息。
RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。
不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),18次(默认18次,网上所有文章都说是16次,无一例外。但是我没搞懂为啥是16次,这不是18个时间吗 ?)才会进入死信队列(%DLQ%+ConsumerGroup)。
源码如下:
public class MessageStoreConfig {
// 每隔如下时间会进行重试,到最后一次时间重试失败的话就进入死信队列了。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}
问题
你们用的是RocketMQ?RocketMQ很大的一个特点是对分布式事务的支持,你说说他在分布式事务支持这块机制的底层原理?
解答
分布式系统中的事务可以使用TCC(Try、Confirm、Cancel)、2pc来解决分布式系统中的消息原子性
RocketMQ 4.3+提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致性。
RocketMQ实现方式:
**Half Message:**预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
检查事务状态: Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,Broker会定时去回调在重新检查。
超时: 如果超过回查次数,默认回滚消息。
也就是他并未真正进入Topic的queue,而是用了临时queue来放所谓的half message,等提交事务后才会真正的将half message转移到topic下的queue。
答:
开发
运维
其实就是send消息的时候queue的选择。源码在如下:
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue()
Broker主从架构以及多副本策略。Master收到消息后会同步给Slave,这样一条消息就不止一份了,Master宕机了还有slave中的消息可用,保证了MQ的可靠性和高可用性。而且Rocket MQ4.5.0开始就支持了Dlegder模式,基于raft的,做到了真正意义的HA。
这么问明显在坑你,因为Broker会向所有的NameServer上注册自己的信息,而不是某一个,是每一个,全部!
发送重试:
主要通过发送者这边通过代码控制重试次数:
在设置的时间内没有发送成功,就自动重试。
消费者重试:
消费者重试的话只有在集群消费的时候才会重试,广播消费模式是不会重试的。
如下图:消费的时候如果返回RECONSUME_LATER参数,则表示消费失败,按照默认的时间间隔会重复消费。
返回CONSUME_SUCCESS则表示消费成功。
增加我们队列的数量即可,但每个队列必须要1:1匹配消费者。