一个broker代表着一个独立的kafka实例,多个组成集群
topic(话题)的概念是比partition(分区)大的,一个topic由多个partition组成,并且这几个partition都不一定在一个broker里面
bin/kafka-topics.sh --create --topic topicname --replication-factor 1 --partitions 1 --zookeeper localhost:2181
- 1
所以创建一个topic的方法就是:create topic --create --topic topicName --副本数 --分区数 --zk地址
首先要知道这个多副本指的是 分区的副本。然后多个副本之间还有叫做leader的家伙,其他副本是follower,我们发送的消息会被发送到leader上面,然后follow副本才能从leader副本中拉取消息进行同步。
- 生产者和消费者只与leader副本交互!,你可以理解为其他副本只是leader副本的拷贝,它们的存在只是为了保证消息存储的安全性。当leader副本发生故障时会从follow种选举出一个leader,但是follower副本如果有和leader同步程度达不到要求参加不了leader的选举。
一个分组只可以被消费者组的一个消费者所消费
一个消费组中的不同消费者消费的分区一定不会重复!
我们知道的消息引擎模型有:点对点模型和发布/订阅模型。传统的消息引擎就是这两大类。这两大类消息引擎,都有各自适合的应用场景,也都有不适应的场景。
点对点的模型,每消费一个消息之后,被消费的消息就会被删除。如果我们需要多个消费者消费同一个消息队列时,就不能使用点对点模型了。
发布订阅模型,支持多个消费者消费同一个消息队列,但是发布订阅模型中,消费者订阅了一个主题后,就要订阅主题的所有分区。这总方式既不灵活,也会影响消息的真是投递效果。
消费者组就避开了上述两种模型的缺陷,有兼容了他们的优点。
首先消费者之间彼此独立,互不影响。可以订阅同一个主题并且互不干扰。再加上Broker端的消息留存机制,kafka的消费者组就完美的解决了上面的问题。kafka使用一种消费者组(Consumer Group)机制,就同时实现了传统消息引擎系统的两大模型:如果所有的消费者实例都属于一个消费者组那就是点对点模型,如果所有消费者实例各自是独立的消费者那就是发布订阅模型。
因为上面消费组的第三个特性。所以消费者组的消费者实例数最好等于该消费者组订阅的主题中的分区数。如果实例数量多于分区数,那多余的实例将永远不会工作,除非有其他实例挂掉。
在某些业务场景下面,Kafka消息队列的消费顺序必须严格按照生产的顺序来
如何保证的呢
首先要知道保存消息的真正地方是分区,我们发送的消息都被放在了Partition
而我们的partition又存在于topic这个概念中
每次添加消息到分区都会采用尾加法,如上图所示,kafka只能保证partition分区中的消息有序!
消息被追加到partition的时候都会分配一个特定的偏移量offset,kafka通过偏移量来保证消息在具体某一个partition中的顺序性。
由于kafka只能保证单个topic的单个partition中消息是有序的,所以有两个解决办法
消费完自动提交offset:消费者仅拉取了消息,没有消费的时候挂了,offset自动提交了,消息丢失。
不自动提交offset:消费者拉取且消费了消息,没有提交offset的时候挂了,这里就会产生重复消费。
我们知道消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nI2PFw95-1661398485859)(https://gitee.com/perfectws/typora-img-plus/raw/master/img/202208231735389.jpeg)]
当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。
解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset 。但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。
leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失
我们知道 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。
试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。
设置 acks = all
解决办法就是我们设置 acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。
acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置 acks = all 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。
设置 replication.factor >= 3
为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
设置 min.insync.replicas > 1
一般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。
但是,为了保证整个 Kafka 服务的高可用性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想一下假如两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 replication.factor = min.insync.replicas + 1。
设置 unclean.leader.election.enable = false
Kafka 0.11.0.0版本开始 unclean.leader.election.enable 参数的默认值由原来的true 改为false
我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性
#####原因:
消费消息服务做幂等校验,比如 Redis 的set、MySQL 的主键等天然的幂等功能。这种方法最有效。
将enable.auto.commit
参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:
什么时候提交offset合适?
.\bin\windows\kafka-server-start.bat .\config\server.properties
生产者:
package com.xiaour.spring.boot.kafka.producer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.UUID;
/**
* @Author: Xiaour
* @Description:
* @Date: 2018/5/22 15:07
*/
@Component
public class Producer {
@Autowired
private KafkaTemplate kafkaTemplate;
private static Gson gson = new GsonBuilder().create();
//发送消息方法
public void send() {
Message message = new Message();
message.setId("KFK_"+System.currentTimeMillis());
message.setMsg(UUID.randomUUID().toString());
message.setSendTime(new Date());
kafkaTemplate.send("test", gson.toJson(message));
}
}
消费者:
package com.xiaour.spring.boot.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @Author: Xiaour
* @Description:
* @Date: 2018/5/22 15:03
*/
@Component
public class Consumer {
@KafkaListener(topics = {"test"})
public void listen(ConsumerRecord<?, ?> record){
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("---->"+record);
System.out.println("---->"+message);
}
}
}
测试Controller
package com.xiaour.spring.boot.kafka.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: Xiaour
* @Description:
* @Date: 2018/5/22 15:13
*/
@RestController
@RequestMapping("/kafka")
public class SendController {
@Autowired
private Producer producer;
@RequestMapping(value = "/send")
public String send() {
producer.send();
return "{\"code\":0}";
}
}
yml配置:
server:
servlet:
context-path: /
port: 8080
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
#生产者的配置,大部分我们可以使用默认的,这里列出几个比较重要的属性
producer:
#每批次发送消息的数量
batch-size: 16
#设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
retries: 0
#producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
buffer-memory: 33554432
#key序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#消费者的配置
consumer:
#Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
auto-offset-reset: latest
#是否开启自动提交
enable-auto-commit: true
#自动提交的时间间隔
auto-commit-interval: 100
#key的解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#value的解码方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#在/usr/local/etc/kafka/consumer.properties中有配置
group-id: test-consumer-group
Record为:---->ConsumerRecord(topic = test, partition = 0, offset = 2, CreateTime = 1661394117248, serialized key size = -1, serialized value size = 109, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":"KFK_1661394117247","msg":"e13c3bfd-5370-4a30-bab3-09b21b0a2009","sendTime":"Aug 25, 2022 10:21:57 AM"})
value为:---->{"id":"KFK_1661394117247","msg":"e13c3bfd-5370-4a30-bab3-09b21b0a2009","sendTime":"Aug 25, 2022 10:21:57 AM"}
####源码解析:
value为:---->{“id”:“KFK_1661394117247”,“msg”:“e13c3bfd-5370-4a30-bab3-09b21b0a2009”,“sendTime”:“Aug 25, 2022 10:21:57 AM”}
####源码解析:
- 生产者:send方法
- [外链图片转存中...(img-3F4JvW9z-1661398485861)]
- 本次使用的是send(string topic,V data),这里面Kafka存的key就默认为null了。
- 假如想要指定partition的话,就要用send(String topic,Integer partition,K key,V data),这里可以直接江K key设为null(有点需要注意,这里我们使用的序列化方式是StringSerializer,所以这里send方法里面的kye和value都必须是string类型的,Integer类型和实体类型都是不行的。