kafka是一款分布式、支持分区的、多副本,基于zookeeper协调的分布式消息中间件。其最大特性就是能够实时处理大量消息。
kafka 每次把生产的数据发送向Leader分区,并顺序写入到磁盘,然后Leader分区会将数据同步到各个从分区Follower,即使主分区挂了,也不会影响服务的正常运行。
Kafka中有三种发送消息的方式:
只发不管结果(fire-and-forget):只调用接口发送消息到Kafka服务器,但不管成功写入与否。由于Kafka是高可用的,因此大部分情况下消息都会写入,但在异常情况下会丢消息。
同步发送(Synchronous send):调用send()方法返回一个Future对象,我们可以使用它的get()方法来判断消息发送成功与否。会阻塞当前线程,执行效率低,但可靠性最高。
异步发送(Asynchronous send):调用send()时提供一个回调方法,不会阻塞当前线程,当接收到broker结果后回调此方法。可靠性不如同步发送,可能因为等问题不保证100%被回调。
acks:用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。
acks = 1, 默认值即为1.生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入leader副本,比如在leader副本崩溃、重新选举新的leader副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为新选举的leader副本中并没有这条对应的消息。acks设置为1,是消息可靠性和吞吐量之间的折中方案。
acks = 0, 生产者发送消息之后不需要等待任何服务的相应。如果在消息从发送到写入kafka的过程中出现某些异常,导致kafka并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks设置为0可以达到最大的吞吐量。
acks = -1 或acks = all。 生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够对来自服务端的成功响应。在其他配置环境相同的情况下,acks设置为-1(all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有leader副本,这样就退化成了acks = 1 的情况。要获得更高的消息可靠性需要配合min.insync.replicas等参数的联动。
max.request.size: 用来限制生产者客户端能发送的消息的最大值,默认为1048576B,即1MB。
retries和retry.bakcoff.ms: retries参数用来配置生产者重试的次数。默认值为0,即在发生异常的时候不进行任何重试动作。retry.backoff.ms默认值为100,用来设置两次重试之间的时间间隔。避免无效的频繁重试。
compression.type:这个参数用来指定消息的压缩方式,默认值为“none”,默认情况下,消息不会被压缩。可配置为“gzip”、“snappy”、和“lz4”。
connections.max.idle.ms: 用来指定在多久之后关闭闲置的连接,默认值是540000(ms),及9分钟。
linger.ms: 用来指定生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入ProducerBatch的时间,默认值为0.生产者客户端会在ProducerBatch被填满或等待时间超过linger.ms值时发送出去。
receive.buffer.bytes: 用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为131072(B),即32KB。
send.buffer.bytes: 用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即32KB。
request.timeout.ms: 用来配置Producer等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择重试。注意: 这个参数需要比broker端参数raplica.lag.time.max.ms的值要大,这样可以减少因客户端重试而引起的消息重复的概率。
消费者主动的去kafka集群拉取消息时,也是从Leader分区去拉取数据。
多个消费者组成一个消费者组,并且同一个消费者组内的消费者不能消费同一个分区中的消息,以免偏移混乱,可以消费不同分区中的消息。
一般建议一个消费者对应一个分区,以提升响应性能。
fetch.min.bytes
一次拉取的最小字节数(1B)
fetch.max.bytes
一次拉取的最大数据量(50M)
fetch.max.wait.ms
拉取时的最大等待时长(500ms)
max.partition.fetch.bytes
每个分区一次拉取的最大数据量(1MB)
max.poll.records
一次拉取的最大条数(500)
connections.max.idle.ms
网络连接的最大闲置时长(540000ms)
request.timeout.ms
一次请求等待响应的最大超时时间,consumer 等待请求响应的最长时间(30000ms)
metadata.max.age.ms
元数据在限定时间内没有进行更新,则会被强制更新(300000)
reconnect.backoff.ms
尝试重新连接指定主机之前的退避时间(50ms)
retry.backoff.ms
尝试重新拉取数据的重试间隔(100ms)
isolation.level
隔离级别!决定消费者能读到什么样的数据read_uncommitted:可以消费到 LSO(LastStableOffset)位置; read_committed:可以消费到 HW(High Watermark)位置
max.poll.interval.ms
超过时限没有发起 poll 操作,则消费组认为该消费者已离开消费组
kafka是基于Java/Scala的软件,所以在安装kafka之前需要先安装java。Linux下安装更方便,且更能利用网络的性能,建议在Linux下安装。
直接安装默认的即可。
sudo apt install default-jre
新版的kafka默认已经集成了zookeeper,所以只要下载稳定版解压即可。
mkdir kafka
cd kafka
wget https://dlcdn.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
sudo tar -xzf kafka_2.13-3.3.1.tgz
kafka的部署分为两种方式,一种方式是基于zookeeper,另一种方式是单独部署。
zookeeper的配置文件config/zookeeper.properties,一般不需要改动,如果有端口冲突,可以配置新的端口号。
sudo ./bin/zookeeper-server-start.sh config/zookeeper.properties
kafka配置文件config/server.properties的常用配置如下:
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper地址
zookeeper.connect=172.18.1.22:2181
# 配置连接Zookeeper集群地址,可以将一个broker分布到多个zookeeper上
# zookeeper.connect=172.18.1.22:2181,172.18.1.23:2181,172.18.1.24:2181
sudo bin/kafka-server-start.sh config/server.properties
sudo bin/kafka-server-start.sh config/server1.properties
sudo bin/kafka-server-start.sh config/server2.properties
sudo bin/kafka-server-start.sh config/server3.properties
sudo bin/kafka-server-start.sh config/server1.properties
sudo bin/kafka-server-start.sh config/server2.properties
sudo bin/kafka-server-start.sh config/server3.properties
单一模式,也即KRaft模式,不需要zookeeper。这是kafka新版本提供的功能,此模式避免与zookeeper通信,可以提升管理性能。
KAFKA_CLUSTER_ID=“$(bin/kafka-storage.sh random-uuid)”
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
sudo ./bin/kafka-server-start.sh config/kraft/server.properties
advertised.listeners=PLAINTEXT://172.18.11.46:9092,需要指定IP,否则其他电脑无法访问
配置config/kraft/server.properties文件
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# @前面的序列号为node.id, roles=controller的都要配置在此处, 以考虑最终controller的投票
controller.quorum.voters=1@localhost:9093,2@172.18.11.12:9093,3@172.18.11.13:9093
# 默认broker,partition
bin/kafka-topics.sh --create --topic quickstart --bootstrap-server localhost:9092
# 多集群
bin/kafka-topics.sh --create --topic quickstart --bootstrap-server localhost:9092,172.18.11.12:9092,,172.18.11.13:9092
# 指定broker,partition,并且指定3个副本
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic quickstart-events
sudo bin/kafka-topics.sh --list --bootstrap-server localhost:9092
## 多集群
sudo bin/kafka-topics.sh --list --bootstrap-server localhost:9092,172.18.11.12:9092,,172.18.11.13:9092
sudo bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
生产消息
# KRaft模式
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
# 多zookeeper模式
$ bin/kafka-console-producer.sh --broker-list localhost:9092 localhost:9093 localhost:9094 --topic my-replicated-topic
消费消息
# KRaft模式
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
删除主题
先要确保配置文件config/kraft/server.properties中配置了可删除。
auto.create.topics.enable = false
delete.topic.enable=true
删除指令
# KRaft模式
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic quickstart-events
kafka默认提供的是Java接口,但是C++,Python的接口也都有提供。
pip install kafka-python
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
def producer_demo():
# 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode())
# 发送三条消息
for i in range(0, 3):
future = producer.send(
'kafka_demo',
key='count_num', # 同一个key值,会被送至同一个分区
value=str(i),
partition=1) # 向分区1发送消息
print("send {}".format(str(i)))
try:
# kafka默认是异步的, 消费会先发送到消息缓存中,根据配置的时间来定时将消息缓存发送到服务器
# 默认立即发往服务器
future.get(timeout=10) # 监控是否发送成功
except kafka_errors: # 发送失败抛出kafka_errors
traceback.format_exc()
def consumer_demo():
consumer = KafkaConsumer(
'kafka_demo',
bootstrap_servers=':9092',
group_id='test'
)
for message in consumer:
print("receive, key: {}, value: {}".format(
json.loads(message.key.decode()),
json.loads(message.value.decode())
)
)
consumer_demo()
producer_demo()