目录
二,kafka broker ,producer ,consumer,offset
3.4, offset加锁控制consumer执行顺序是否可以解决1:N
3.5,如果不能,那么我这时其他业务需要重新消费数据如何处理?
3.6,如果consumer比分区多呢?注定有一个消费者consumer被浪费
rabbitmq
mq 技术选型和横向对比
首先,kafka 是一个消息中间件。我们从一个本质的点聊起,我们有一个系统 service,如果这两个服务之间直接调用的话,它们之间会相互约束,耦合性比较强,而且未来的拓展不好,一方有调整的时候,另一方会受到影响。

这时候我们加入一个消息系统,一方发送消息,另一方去取,就起到了我们所谓的消峰,异步,限流,解耦的效果。

现在我们聊的是消息的中间件,其实中间件包括很多,包括有存储的,有缓存的,他们的作用都有一个相通性,就是他们最终都会与分布式挂钩。
那说道中间件,会有几个词汇的需求:可靠的、可扩展的、高性能的等等,关于分布式对中间件大概有这么几个要求。在 redis 中,我们会有 AKF 微服务划分原则,那 kafka 和 AKF 划分有什么联系呢,等待会儿我讲完看一下效果吧,如果讲的不明白的话,后面排一个分布式系统划分分享。
我们先举一个简单的例子,然后去挑这个例子的毛病,把毛病挑出来之后,我们就知道为什么 kafka 会被设计成现在这个样子了。
现在我们假设,整家公司只有一个系统。所有消息被调用的时候,都被打到同一个单机队列中,这个时候确实能够达到我们常说的削峰填谷的作用,但是随着我们业务的增大,并发量的随之增大,这个单机的队列它会有单点问题,会有性能问题。这也是我们在使用很多技术的时候通常会遇到的两个问题。这是两个独立的问题,这两个问题我们可以独立的解决,解决方案也可以独立的应用,但很多时候我们会把他们整合应用。所以继续往下讲你们应该能找到这种感觉。
性能问题
我们要先解决性能问题,因为 kafka 的关键词里面,先是 topic,然后是 partition,再是副本。
性能问题是怎么解决的?
以 AKF 的角度来说,它有三个维度:
x 轴解决的是单点问题,是高可用的,y 轴解决的是业务划分的,z 轴解决的是分片、分治的。
那我们如何把这三个轴对应到 kafka 的特性上?

首先 y 轴将消息按业务划分,比如在一个电商系统,可以把订单拆出一个队列、用户行为拆成一个队列、广告推荐拆成一个队列。这样划分之后,不同业务的生产和消费都不会相互影响,业务之间的隔离性会比较好。而且按照业务划分之后,不同的业务数据会被部署在整个 kafka 集群中的不同的节点上,每个节点只关注自己的一套东西的话,资源的利用率会高。因此如果将我们上面说道的单机版的这个消息队列从逻辑上进行拆分的话,第一个拆出来的就是 topic,我们说的 y 轴其实就是 topic。
可以把 topic 就理解成是不同的业务。但是如果这么划分的话,假设说我的用户行为日志binlog存在某个 topic 中,如果我的日志量太大了,如果全部存放在一个节点(中的一个进程)上的话,工作起来 IO 上会卡顿受限,而且单看自己它会有一个性能瓶颈。
那怎么解决这个性能瓶颈,当我们已经进行过业务拆分之后,怎么再进行更细粒度的拆分,拆分分区,这时候就是我们的 partition对应的是z轴。在一个 topic 下,会有多个 partition。
这个分区partition其实作用的就是我们的 z 轴,因为在业务使用的时候,假如说我们希望用 kafka 去存 用户行为日志,我们会期望把这些日志由一个变成多个,由多台机器,更大的能力去承载它。一般我们聊到 z 轴的时候,z 轴其实是对 y 轴一个细分,那细分的时候,可以使用的手段可以是 range,hash,或者是做一个映射,拆解的方案会有很多。

使用分区解决性能问题引出的问题如何解决
还是拿刚才推荐系统中收集用户行为的例子来说,我要把所有的用户行为日志打散到多台里面去,这个多台应该怎么打才合适?也就是说,当使用的数据从一个线性的队列散落到多个地方的时候,如何去保证前面的生产最后消费的数据的一致性?就是说,在以前没有分区的概念的时候,数据长什么样,我只需要按顺序一个一个推进来,这边取到的就是原有的样子。一旦一变多,会带来一致性的问题,那我们的生产方和消费方,如何组建这个先后关系,保证它仍然是有序的?
如果不加分区,数据是什么顺序进来,就是什么顺序被消费,但由于单机顶不住大数据量的压力,必然会引出多机就有了分区,解决一个问题的同时引出了另一个问题,也就是分区出现之后要考虑的顺序性,一致性问题。

在 AKF z 轴划分的时候,如果你学过大数据,也可以引入分治这个概念了,大数据必然会有分治,也就是 map 阶段,map by key,也会有聚集 reduce,就是相同的数据要打到一起去,然后收集起来进行后续的处理,其实 kafka 中依然是这个原理。在生产用户行为日志的时候,有 展现,点击,收藏 三种不同的用户行为,只要他们各自有自己的顺序。也就是在分而治之的时候,将无关的打开,分散出去,将有关的放在一起,一定要保证顺序。
将无关的打开:扔到不同的分区中,后续无关的并行。
将有关的放在一起:将关联性的东西打到同一个分区中
因为如果不这么做,必然要引入分布式锁,或者消息ID的顺序性,就会变得很慢,所谓的并发还是串行执行,有一个先后执行的顺序。将并行最大化,让不同的数据打开。
不可能50个表就创建50个topic,
所以这时候,z 轴应该如何实现?要规划好整个数据路由,将无关的数据放在不同的 partition 中,这样我们就达到了一个后续无关数据的并行度。
无关的数据是可以并行计算的,如果有关的话,我们会需要在他们之间加上一个分布式锁,或者通过 log id 等方式去保证他们的顺序性,这样会变得很慢,所谓的并发最终还会退化为串行。所以我们希望将并行最大化,将无关的数据分散到不同的分区里,以追求并发、并行处理,有关联的数据,一定要按照原有顺序发送到同一个分区里。
topic 是逻辑概念,partition 是最后物理的对应,partition 是topic的子域:一个 topic 下面有 partition0,1,2,3…
然后我们看到还有一个 x 轴,这个 x 轴什么意思,x 轴做的是可靠性的保证。
现在通过 y 轴和 z 轴,将这些消息打散到不同的分区了,实现的是计算的并行,提升了性能,但是这个时候,如果某一个分区消失了,挂掉了,这时候你堆积的消息会丢失,这时候需要我们给它做副本。在做副本的时候,我们横向走下这个过程。

无论 redis 也好,kafka 也好, 你像纯内存的 redis,它必须有一个持久化,他会认为磁盘是自己可靠性的来源,如果不在单机的维度,而是看集群的维度的话,单物理节点可能也会丢失,会挂,这时候为了解决节点的问题,我们还需要网络的维度提供可靠性,那 redis 就会有单机的持久化、网络的持久化,基于网络的主从复制集群。那他的主从复制集群就来自于它的 x 轴。其实 kafka 也一样,kafka 分区的数据会持久化到磁盘当中去,而且利用顺序读写这种高性能的磁盘 IO,x轴一般是出主机的、异地的备份,因为如果将全量备份放在同一个节点上意义不大。
x 轴的拆分会比较容易出现数据一致性、数据的同步问题,会有一系列分布式集群下的解决方案,比如 mysql 会有读写分离的策略,为了解决一致性和复杂性上的痛点,kafka 做了一个决策就是:只允许在主片上增删改,从片只允许读(做减法)。
规律性问题:单机到分布式,分布式中间件的实现,必然参照AKF,两者是有关联关系的
由单机到分布式,或者说由传统到中间件,基本都是按照这个思路来设计的。我们用到的各种分布式中间件的实现,和 AKF 这一侧是有必然的这么一个参照和关联的关系的。
3.5,分治的顺序性问题解决方案->offset保证分区内的顺序性
这些都是方法论,来推导出 kafka 中有 topic,partition,有序性的这些概念,然后我们再把有序性做一个延伸:下面我们来讲 offset
我们知道,分区内部是有序的(纯队列),分区外部是无序的,那这个顺序是怎么得到的?

我们用 partition1 做一个简单的描述。假设我们的上游发来三个消息,msg 1,2,3,这三个消息会按照接收的顺序(顺序是由生产者维护,最简单的是单个生产者生产)存在 kafka 队列中。当消费者来消费的时候,每个消费者会维护一个自己消费到的位置,也就是偏移量offset。
统一批数据打下来之后,可以由直接业务消费订单处理,也可以由另外一批人重复消费这个数据进行分析。
偏移量offset(消费者知道自己上次消费到哪),本地的持久化。这两个使kafka能适用多种场景
如何去管理这些分布式的东西的一致性?
上面的例子中,只有一个 topic 和这个 topic 下的两个 partition,实际上企业中会有很多个 topic,每个业务下都会有很多 topic,每个 topic 下会有几百个 partition,如何去管理这些分布式的东西的一致性?
在经验中,主从是管理成本最低的,它的协调成本会比主主要低,这样会引发一系列分布式协调的问题,包括选主,包括如何保证数据的一致性,常用的解决方案有 zookeeper,或者 etcd。那我们的 kafka 会依赖 zookeeper,主要是依赖它的分布式协调,不过要注意千万不要把 zookeeper 当做是分布式存储来使用。
kafka 对 zookeeper 的依赖
kafka 的 broker 依赖于 zookeeper
使用 zk 存储 broker 的元数据,将 broker 当做是一个进程。
使用到 zk 的选举机制
老版本的producer获取这些信息的时候会去zk里获取,生产者多的情况下会把zk的leader角色的网卡打满。
新版本放弃这个从zk获取,该从broker
broker依赖zk选主之后,会产生一些源数据metadata
broker,topic,partition客户端和集群相关的数据,同步到集群中。
在业务层次上,自己完成铜须,不需要依赖zk
分布式 角色之间进行通讯,不要因为业务需求,让ZK成为负担。

亲缘性:producer嵌入到tomcat,tomcat会接到很多请求,每个请求要吧数据大到kafka做后续分析,但是没有对tomcat请求做亲缘性或者顺序性的绑定,就没有顺序性的保证。
tomcat抢锁,t1抢到后,按理说是先执行t1,在执行t2,但是由于没有在tomcat请求做亲缘性,由于producer往broker中打数据的过程中,由于网络通讯,调度,虽然可以在业务上可以使用分布式锁保证执行顺序,后续发送无法保证。
在锁的外面producer发送,或者是做切面,切到锁的外面去了,并发下多个producer往一个topic里打数据的时候,因为在锁外执行,无法保证数据的顺序性。
- lock
- sql
- unlock
- producer
在并发下,使用分布式锁操作数据库,使用producer往kafka打数据的时候(这个操作时在锁外还是锁内),数据无关无所谓,数据有相关性需要放在锁内。
- lock
- sql
- producer
- unlock
-
-
consumer 单分区保证顺序,多分区也不影响

如果前面生产者producer把业务都处理完了,不着急做最终结果消费consumer单机也是可以的。

如果追求消费consumer的性能,也可以使用
分区和consumer可行性:1:1 ;N:1

无法保证一致性问题,消费同样的数据注定有执行快慢,会更新错数据。所以保证有序性,绝对不允许有1:N发生。
offset执行排序,加锁控制consumer执行顺序岂不是可以解决这个问题?
但会降低执行效率,这就是为什么放弃了1:N的情况
ES分析数据需要读到对应分区中,曾经所有的数据。这时候就出现了分组的概念。
组和组是隔离的,组内是不允许出现1:N的,组和组之间是无妨的。因为目标地址是不一样的,有一套业务是不需要分组的,只有多套业务情况下是需要分组的。


- kafka的broker的partition保存了producer发送来的数据。
-
- 数据是可以重复利用的;
-
- 在单一使用场景下,先要保证,即使追求性能,用多个consumer,
- 也应该注意,不能一个分区由多个consumer消费;
-
- 数据的重复利用是站在Group上的,但是Group内要保障上述描述。
consumer挂了,会不会造成重复消费的情况,或者是数据的丢失没有被消费?
1,丢失:读到了8号offset,在正要入库的时候挂了,假设他记住了偏移量=8,再次启动后会认为8已经消费了,直接消费9,就造成了数据丢失。
2,重复:读到了8号offset,入库成功了,但是还没有持久化,这时候挂了在重启,认为8没有被消费,就造成了重复消费的问题
围绕offset也就是消费的进度,节奏,频率,顺序先后
在没有人为干涉的时候runtime在内存里维护了自己的offset(偏移量的进度)
offset持久化分问题上也有一个新老版本过度的问题
1,0.8之前默认通过客户端的方式是维护到zk中(zk是分布式协调,不是存储)
2,tpoic自己维护offset(默认50个分区),你内存中维护
3,第三方,可以是redis,mysql
是不是应该在runtime的时候,拉取分区的数据对外界写成功后,更新offset;
拉取回来之后,立刻更新offset,然后在去处理;
根据业务不同,追求的一致性问题来处理这个问题,
1,异步的,5秒之内,先干活,后持久化offset(重复消费),4秒的时候挂了,但是没更新;
2,同步的,业务操作和offser的持久化(性能差)
3,没控制好顺序,offset持久化了,但是业务写失败了(丢失)
重复消费,幂等可以解决,但是幂等解决不了丢失

AKF是指可拓展模型,有3个坐标轴,分别是x,y,z

x:水平扩展,通过绝对平等的复制 (双活高可用)
y:将系统按业务拆分(单个系统拆分成有独立功能的小系统:用户,订单,售后啊业务划分)
z:基于用户分片,分治,使划分出来的系统相互隔离,又相对完整(跟多租户系统类似,分片,分治)