Note: the example code in this article is written in Python.
Parameter descriptions:
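As a minimal sketch of the KafkaProducer constructor parameters the examples in this article touch on (the values shown are illustrative, not recommendations):

```python
from kafka3 import KafkaProducer

# Illustrative constructor showing the parameters discussed in this article.
producer = KafkaProducer(
    bootstrap_servers=["127.0.0.1:9092"],  # Kafka cluster addresses
    acks=1,                                # 0 / 1 / "all": how many acknowledgments to wait for
    linger_ms=0,                           # how long to wait while batching records
    compression_type=None,                 # None / "gzip" / "snappy" / "lz4"
    buffer_memory=32 * 1024 * 1024,        # record accumulator size in bytes
    retries=0,                             # resend attempts on transient errors
)
```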
Example: create a Kafka producer and send messages to the Kafka broker asynchronously.
```python
from kafka3 import KafkaProducer


def producer(topic: str, msg: str, partition=0):
    """
    :function: producer, writes data to Kafka
    :param topic: topic to write to
    :param msg: message to write
    :param partition: partition to write to
    :return:
    """
    print("Producing data......")
    # Initialize the producer; bootstrap_servers takes the Kafka cluster addresses.
    # Setting acks=0 is also asynchronous, but it is not used in production
    # because it risks data loss:
    # producer = KafkaProducer(bootstrap_servers=["170.22.70.174:9092", "170.22.70.178:9092", "170.22.70.179:9092"], acks=0)
    producer = KafkaProducer(bootstrap_servers=["170.22.70.174:9092", "170.22.70.178:9092", "170.22.70.179:9092"])
    # Encode the message as UTF-8 bytes before sending.
    future = producer.send(topic=topic, value=bytes(msg, 'utf-8'), partition=partition)
    # close() flushes any pending messages before shutting down.
    producer.close()


if __name__ == '__main__':
    msg = "this is producer01"
    topic = "first"
    producer(topic, msg)
```
- """
- 带回调函数的异步发送
- 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),
- 如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
- """
```python
from kafka3 import KafkaProducer


def producer(topic: str, msg: str, partition=0):
    """
    :function: producer, writes data to Kafka
    :param topic: topic to write to
    :param msg: message to write
    :param partition: partition to write to
    :return:
    """
    print("Producing data......")

    # Callback invoked when the send succeeds
    def on_send_success(record_metadata):
        print("Message delivered to topic:", record_metadata.topic)
        print("Partition:", record_metadata.partition)
        print("Offset:", record_metadata.offset)

    # Callback invoked when the send fails
    def on_send_error(excp):
        print("Error while sending message:", excp)
        # Add real error handling here as needed

    # Initialize the producer; bootstrap_servers takes the Kafka cluster addresses.
    producer = KafkaProducer(bootstrap_servers=["170.22.70.174:9092", "170.22.70.178:9092", "170.22.70.179:9092"])
    # Encode the message as UTF-8 bytes, then attach the callbacks to the Future.
    future = producer.send(topic=topic, value=bytes(msg, 'utf-8'), partition=partition)
    future.add_callback(on_send_success)
    future.add_errback(on_send_error)
    producer.close()
```
Example: synchronous send. Block on the Future returned by send() until the broker acknowledges the message:

```python
from kafka3 import KafkaProducer


def producer(topic: str, msg: str, partition=0):
    """
    :function: producer, writes data to Kafka
    :param topic: topic to write to
    :param msg: message to write
    :param partition: partition to write to
    :return:
    """
    print("Producing data......")
    # Initialize the producer; bootstrap_servers takes the Kafka cluster addresses.
    # acks="all" can be combined with this synchronous pattern for the strongest
    # delivery guarantee; the blocking itself comes from future.get() below:
    # producer = KafkaProducer(bootstrap_servers=["170.22.70.174:9092", "170.22.70.178:9092", "170.22.70.179:9092"], acks="all")
    producer = KafkaProducer(bootstrap_servers=["170.22.70.174:9092", "170.22.70.178:9092", "170.22.70.179:9092"])
    # Encode the message as UTF-8 bytes before sending.
    future = producer.send(topic=topic, value=bytes(msg, 'utf-8'), partition=partition)
    # Block until the Future resolves, with a 10-second timeout;
    # this is what makes the send synchronous.
    future.get(timeout=10)
    producer.close()
```
Specify the partition explicitly: send all data to partition 0.
```python
from kafka3 import KafkaProducer
from kafka3.errors import KafkaError


# Send to an explicitly specified partition
def producer_01(topic: str, msg: str, partition=0):
    """
    :function: send to a specified partition
    :param topic: topic to write to
    :param msg: message to write
    :param partition: partition to write to
    :return:
    """
    # Initialize the producer; bootstrap_servers takes the Kafka cluster addresses.
    producer = KafkaProducer(bootstrap_servers=["170.22.70.174:9092", "170.22.70.178:9092", "170.22.70.179:9092"])
    # Encode the message as UTF-8 bytes before sending.
    future = producer.send(topic=topic, value=bytes(msg, 'utf-8'), partition=partition)
    try:
        # Block until the send completes
        sendResult = future.get(timeout=10)
        print(f"Message: {msg}\nPartition: {sendResult.partition}\nOffset: {sendResult.offset}\n")
        # Close the producer
        producer.close()
    except KafkaError as e:
        print(f"Message: {msg} failed to send\nError: {e}\n")


msg = "this is partition"
topic = "first"
for i in range(5):
    producer_01(topic, msg + str(i))
```
If no partition is specified but a key is provided, the partition is computed from the hash of the key modulo the topic's partition count.

```python
from kafka3 import KafkaProducer
from kafka3.errors import KafkaError


def producer_02(topic: str, msg: str, key: str):
    """
    :function: partition derived from the message key
    :param topic: topic to write to
    :param msg: message to write
    :param key: key of the message
    :return:
    """
    # Initialize the producer; bootstrap_servers takes the Kafka cluster addresses.
    producer = KafkaProducer(bootstrap_servers=["170.22.70.174:9092", "170.22.70.178:9092", "170.22.70.179:9092"])
    # Encode the key and message as UTF-8 bytes; no partition is passed,
    # so the partition is chosen from the key's hash.
    future = producer.send(topic=topic, key=bytes(key, 'utf-8'), value=bytes(msg, 'utf-8'))
    try:
        # Block until the send completes
        sendResult = future.get(timeout=10)
        print(f"Message: {msg}\nPartition: {sendResult.partition}\nOffset: {sendResult.offset}\n")
        # Close the producer
        producer.close()
    except KafkaError as e:
        print(f"Message: {msg} failed to send\nError: {e}\n")


msg = "this is partition"
topic = "first"
key = "a"
for i in range(5):
    producer_02(topic, msg + str(i), key)
```
Custom partitioning: records whose payload contains "hello" go to partition 0; everything else goes to partition 1.

```python
from kafka3 import KafkaProducer
from kafka3.errors import KafkaError


def producer_03(topic: str, msg: str):
    """
    :function: custom partitioning
    :param topic: topic to write to
    :param msg: message to write
    :return:
    """
    # Custom partitioner
    def my_partitioner(msg):
        if "hello" in str(msg):
            return 0
        else:
            return 1

    # Initialize the producer; bootstrap_servers takes the Kafka cluster addresses.
    producer = KafkaProducer(bootstrap_servers=["170.22.70.174:9092", "170.22.70.178:9092", "170.22.70.179:9092"])
    # Encode the message as UTF-8 bytes; pick the partition with the custom partitioner.
    future = producer.send(topic=topic, value=bytes(msg, 'utf-8'), partition=my_partitioner(msg))

    try:
        # Block until the send completes
        sendResult = future.get(timeout=10)
        print(f"Message: {msg}\nPartition: {sendResult.partition}\nOffset: {sendResult.offset}\n")
        # Close the producer
        producer.close()
    except KafkaError as e:
        print(f"Message: {msg} failed to send\nError: {e}\n")


topic = "first"
msg = "hello this is partition"   # goes to partition 0
msg1 = "this is partition"        # goes to partition 1
producer_03(topic, msg)
producer_03(topic, msg1)
```
- """
- 生产者提高吞吐量
- 1、linger.ms:等待时间,修改为5-100ms
- 2、compression.type:压缩snappy
- 3、RecordAccumulator:缓冲区大小,修改为64m
- """
```python
from kafka3 import KafkaProducer
from kafka3.errors import KafkaError


def producer(topic: str, msg: str):
    """
    :function: producer, writes data to Kafka
    :param topic: topic to write to
    :param msg: message to write
    :return:
    """
    # Initialize the producer; bootstrap_servers takes the Kafka cluster addresses.
    producer = KafkaProducer(bootstrap_servers=["170.22.70.174:9092", "170.22.70.178:9092", "170.22.70.179:9092"],
                             linger_ms=5,                     # wait up to 5 ms to batch records
                             compression_type="snappy",       # compress batches with snappy
                             buffer_memory=64 * 1024 * 1024   # 64 MB record accumulator buffer
                             )
    # Encode the message as UTF-8 bytes before sending.
    future = producer.send(topic=topic, value=bytes(msg, 'utf-8'))
    try:
        # Block until the send completes
        sendResult = future.get(timeout=10)
        print(f"Message: {msg}\nPartition: {sendResult.partition}\nOffset: {sendResult.offset}\n")
        # Close the producer
        producer.close()
    except KafkaError as e:
        print(f"Message: {msg} failed to send\nError: {e}\n")
```
Note: data reliability is governed mainly by the acks setting. Reliability under each acks value:
- acks=0: the producer does not wait for any broker acknowledgment. Fastest, but records can be lost if the broker fails before persisting them.
- acks=1: the producer waits for the partition leader to persist the record. Records can still be lost if the leader fails before followers replicate them.
- acks=-1 ("all"): the producer waits until the leader and all in-sync replicas (ISR) have persisted the record.

In summary, fully reliable delivery requires: acks set to -1 ("all") + partition replication factor >= 2 + at least 2 in-sync replicas required to acknowledge (guaranteed by the min.insync.replicas parameter).
Setting acks in Python:
```python
# Valid acks values: 0, 1, "all"
producer = KafkaProducer(bootstrap_servers=["170.22.70.174:9092", "170.22.70.178:9092", "170.22.70.179:9092"], acks=0)
```
So how do we guarantee that a record is stored exactly once? This requires idempotence.
1. Idempotence: however many times the producer resends the same record to the broker, the broker persists it only once.
2. How idempotence works: every record carries a <PID, Partition, SeqNumber> key, and the broker persists at most one record per distinct key. PID is a producer ID regenerated on each producer restart, so idempotence only deduplicates within a single partition during a single producer session.
3. Enabling idempotence: set enable_idempotence=True when creating the producer, as shown below.
```python
from kafka import KafkaProducer

# Create a KafkaProducer instance with idempotence enabled
producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
    acks="all",              # require acknowledgment from all in-sync replicas
    enable_idempotence=True  # enable the idempotent producer
)
```
Note: enabling transactions requires idempotence to be enabled.
Transaction state is stored in a dedicated internal topic, __transaction_state; the leader of the partition that a producer's transactional.id hashes to acts as that producer's transaction coordinator.
Caveat: before using transactions, the producer must be given a unique transactional.id. With it, a client that crashes can resume and finish its unfinished transactions after restarting.
```python
from kafka import KafkaProducer
from kafka.errors import KafkaError

# Create a KafkaProducer instance with transactions enabled
producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
    enable_idempotence=True,                 # transactions require idempotence
    transactional_id="my_transactional_id"   # unique transactional.id, required for transactions
)

# Initialize transactions
producer.init_transactions()

# Begin a transaction
producer.begin_transaction()

try:
    # Send messages inside the transaction
    for i in range(3):
        key = b"my_key"
        value = b"my_value_%d" % i
        producer.send("my_topic", key=key, value=value)

    # Commit the transaction
    producer.commit_transaction()

except KafkaError as e:
    # Roll back the transaction
    producer.abort_transaction()
    print(f"Failed to send messages: {e}")

finally:
    # Close the KafkaProducer instance
    producer.close()
```
Note: ordering can only be guaranteed within a single partition; there is no ordering guarantee across partitions.
1. Before Kafka 1.x, single-partition ordering required max.in.flight.requests.per.connection = 1 (only one unacknowledged request in flight per connection).
2. From Kafka 1.x onward, single-partition ordering requires:
   - without idempotence: max.in.flight.requests.per.connection must be set to 1;
   - with idempotence: max.in.flight.requests.per.connection can be up to 5, because the broker caches the metadata of the last five batches from each producer and restores order by sequence number if batches arrive out of order.
A sketch of the corresponding producer settings follows this list.
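A minimal sketch of these two configurations, assuming a client version that supports enable_idempotence (the address is illustrative):

```python
from kafka import KafkaProducer

# With idempotence enabled, up to 5 in-flight requests per connection still
# preserve per-partition ordering (the broker reorders by sequence number).
ordered_producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
    enable_idempotence=True,
    acks="all",
    max_in_flight_requests_per_connection=5,
)

# Without idempotence, ordering requires a single in-flight request per connection.
strictly_serial_producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
    max_in_flight_requests_per_connection=1,
)
```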