• python-kafka客户端封装



    前言

    本文对 Python 的 kafka-python 包做了简单封装,方便 Kafka 初学者使用。安装方式:

    pip install kafka-python
    
    • 1

    封装代码

    kafka_helper.py

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import json
    import traceback
    from kafka import KafkaConsumer, KafkaProducer, TopicPartition
    from typing import List
    
    
    class KProducer:
        """Thin convenience wrapper around ``kafka.KafkaProducer``.

        Offers a blocking send, a fire-and-forget send, and a batched send that
        reports each record's outcome through callbacks.
        """

        def __init__(self, bootstrap_servers: List, key_serializer=lambda m: json.dumps(m).encode("ascii"),
                     value_serializer=lambda m: json.dumps(m).encode("ascii"), compression_type=None):
            """
            Create the underlying producer.

            :param bootstrap_servers: list of broker addresses
            :param key_serializer: callable turning a key into bytes; the default
                JSON-encodes it. Pass None to send raw bytes.
            :param value_serializer: callable turning a value into bytes; the default
                JSON-encodes it. Pass None to send raw bytes.
            :param compression_type: gzip, lz4, snappy, zstd, or None for no compression
            :raises Exception: if the producer cannot be created
            """
            # NOTE(review): every send() below omits the key. Confirm against
            # kafka-python whether the default key_serializer is then applied to
            # None (yielding b"null" keys, which would pin all records to one
            # partition).
            try:
                self.producer = KafkaProducer(
                    bootstrap_servers=bootstrap_servers,
                    buffer_memory=33554432,
                    batch_size=1048576,
                    max_request_size=1048576,
                    key_serializer=key_serializer,
                    value_serializer=value_serializer,
                    compression_type=compression_type,
                )
            except Exception as e:
                # Chain the cause so the original traceback is not lost.
                raise Exception("connect kafka failed, {}.".format(e)) from e
            print("connect success, kafka producer info {0}".format(bootstrap_servers))

        def sync_send(self, topic: str, data, timeout=10):
            """
            Send one record and block until the send result is available.

            :param topic: topic to publish to
            :param data: record value, passed through the value serializer
            :param timeout: seconds to wait for the send result
            :return: (partition, offset) of the stored record
            :raises Exception: if the send fails or the wait times out
            """
            try:
                future = self.producer.send(topic, data)
                record_metadata = future.get(timeout=timeout)  # block until the result arrives
            except Exception as e:
                raise Exception("Kafka sync send failed, {}.".format(e)) from e
            partition = record_metadata.partition  # partition the record landed in
            offset = record_metadata.offset  # position of the record within that partition
            print("save success, partition: {}, offset: {}".format(partition, offset))
            return partition, offset

        def async_send(self, topic: str, data):
            """
            Send one record without waiting for the result.

            :param topic: topic to publish to
            :param data: record value, passed through the value serializer
            :return: None
            :raises Exception: if handing the record to the producer fails
            """
            try:
                self.producer.send(topic, data)
            except Exception as e:
                raise Exception("Kafka asyn send failed, {}.".format(e)) from e
            print("send data:{}".format(data))

        def async_callback(self, topic: str, data):
            """
            Send every item of ``data`` asynchronously and report each outcome.

            :param topic: topic to publish to
            :param data: iterable of record values
            :return: None
            :raises Exception: if sending or flushing fails
            """
            try:
                for item in data:
                    future = self.producer.send(topic, item)
                    future.add_callback(self.__send_success).add_errback(self.__send_error)
                # Flush once after queueing everything; flushing inside the loop
                # would force one blocking round trip per record.
                self.producer.flush()
            except Exception as e:
                raise Exception("Kafka asyn send fail, {}.".format(e)) from e

        @staticmethod
        def __send_success(*args, **kwargs):
            """Success callback; accepts whatever the send future passes in."""
            print("save success")

        @staticmethod
        def __send_error(*args, **kwargs):
            """Failure callback; accepts whatever the send future passes in."""
            print("save error")

        def close(self):
            """Close the underlying producer."""
            self.producer.close()
    
    
    class KConsumer:
        """Thin convenience wrapper around ``kafka.KafkaConsumer`` for one topic.

        Auto-commit is disabled and this class never commits offsets itself, so
        callers that need committed progress must commit on the consumer
        returned by :meth:`get_consumer`.
        """

        def __init__(self, bootstrap_servers: List, topic: str, group_id: str, key_deserializer=None,
                     value_deserializer=None, auto_offset_reset="latest"):
            """
            Create the underlying consumer and subscribe it to ``topic``.

            :param bootstrap_servers: list of broker addresses
            :param topic: topic to subscribe to
            :param group_id: consumer group id
            :param key_deserializer: callable turning key bytes into an object, or None for raw bytes
            :param value_deserializer: callable turning value bytes into an object, or None for raw bytes
            :param auto_offset_reset: where to start when no offset is known ("latest" or "earliest")
            :raises Exception: if the consumer cannot be created or subscribed
            """
            self.topic = topic
            try:
                self.consumer = KafkaConsumer(
                    self.topic,
                    bootstrap_servers=bootstrap_servers,
                    group_id=group_id,
                    enable_auto_commit=False,
                    # NOTE(review): presumably ignored while enable_auto_commit is False.
                    auto_commit_interval_ms=1000,
                    session_timeout_ms=30000,
                    max_poll_records=50,
                    max_poll_interval_ms=30000,
                    metadata_max_age_ms=3000,
                    key_deserializer=key_deserializer,
                    value_deserializer=value_deserializer,
                    auto_offset_reset=auto_offset_reset,
                )
                self.consumer.subscribe(topics=[self.topic])
            except Exception as e:
                # format_exc() returns the traceback text; print_exc() returns
                # None, which made the message always end in "None".
                raise Exception(
                    "Kafka pconsumers set connect fail, {0}, {1}".format(e, traceback.format_exc())
                ) from e
            print("connect to kafka and subscribe topic success")

        def get_consumer(self):
            """
            Return the underlying iterable consumer.

            :return: consumer
            """
            return self.consumer

        def set_topic(self, topic: str):
            """
            Switch the subscription to another topic.

            :param topic: topic to subscribe to
            :return: None
            """
            # NOTE(review): after get_message_by_partition_offset() the consumer
            # holds a manual assignment; kafka-python may reject subscribe() in
            # that state. Confirm before mixing the two calls.
            self.topic = topic
            self.consumer.subscribe(topics=[self.topic])

        def get_message_by_partition_offset(self, partition, offset, timeout_ms=None):
            """
            Fetch the single message at ``offset`` of ``partition`` in the current topic.

            Drops the current subscription and switches the consumer to a manual
            assignment of that one partition.

            :param partition: partition number
            :param offset: position of the wanted message within the partition
            :param timeout_ms: maximum time to wait in milliseconds; None (default)
                iterates the consumer until a message arrives
            :return: the message, or None if none was received
            """
            self.consumer.unsubscribe()
            # Separate name so the integer parameter is not rebound.
            topic_partition = TopicPartition(self.topic, partition)
            self.consumer.assign([topic_partition])
            self.consumer.seek(topic_partition, offset=offset)
            if timeout_ms is None:
                for message in self.consumer:
                    return message
                return None
            # Bounded wait: one poll, at most one record.
            batches = self.consumer.poll(timeout_ms=timeout_ms, max_records=1)
            for records in batches.values():
                if records:
                    return records[0]
            return None
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138

    测试代码

    kafka_test.py

    from kafka_helper import KProducer,KConsumer
    import json
    
    def sync_send_test(bootstrap_servers, topic, json_format=True):
        """
        Send one sample record synchronously.

        :param bootstrap_servers: list of broker addresses
        :param topic: topic to publish to
        :param json_format: True to use the producer's default JSON serializers;
            False to disable the serializers and send pre-encoded UTF-8 JSON bytes
        :return: None
        """
        value = {
            "send_type": "sync_send",
            "name": "lady_killer",
            "age": 18
        }
        if json_format:
            p = KProducer(bootstrap_servers=bootstrap_servers)
            payload = value
        else:
            p = KProducer(bootstrap_servers=bootstrap_servers, key_serializer=None, value_serializer=None)
            payload = json.dumps(value).encode('utf-8')
        try:
            # sync_send takes (topic, data); the arguments were previously swapped.
            p.sync_send(topic, payload)
        finally:
            # Close the producer even when the send fails.
            p.close()
    
    def async_send_test(bootstrap_servers, topic, json_format=True):
        """
        Send one sample record asynchronously.

        :param bootstrap_servers: list of broker addresses
        :param topic: topic to publish to
        :param json_format: True to use the producer's default JSON serializers;
            False to disable the serializers and send pre-encoded UTF-8 JSON bytes
        :return: None
        """
        value = {
            "send_type": "async_send",
            "name": "lady_killer",
            "age": 18
        }
        if json_format:
            p = KProducer(bootstrap_servers=bootstrap_servers)
            payload = value
        else:
            p = KProducer(bootstrap_servers=bootstrap_servers, key_serializer=None, value_serializer=None)
            payload = json.dumps(value).encode('utf-8')
        try:
            # The producer method is async_send(topic, data); the old call used
            # the misspelled name "asyn_send" with the arguments swapped.
            p.async_send(topic, payload)
        finally:
            # Close the producer even when the send fails.
            p.close()
    
    def consumer_test(bootstrap_servers,topic):
        """
        Consume ``topic`` from the earliest offset and print every record.

        For each record, prints the type and raw value, then the value parsed
        as JSON. Runs for as long as the consumer keeps yielding records.

        :param bootstrap_servers: list of broker addresses
        :param topic: topic to read from
        :return: None
        """
        wrapper = KConsumer(
            bootstrap_servers=bootstrap_servers,
            topic=topic,
            group_id='test',
            auto_offset_reset="earliest",
        )
        records = wrapper.get_consumer()
        for record in records:
            print(type(record.value), record.value)
            print(json.loads(record.value))
    
    def get_one_msg(bootstrap_servers,topic,partition,offset):
        """
        Fetch the message at a given partition and offset and print it.

        :param bootstrap_servers: list of broker addresses
        :param topic: topic to read from
        :param partition: partition number
        :param offset: position of the wanted message within the partition
        :return: None
        """
        reader = KConsumer(
            bootstrap_servers=bootstrap_servers,
            topic=topic,
            group_id='test',
            auto_offset_reset="earliest",
        )
        found = reader.get_message_by_partition_offset(partition, offset)
        print(found)
    
    
    if __name__ == '__main__':
        bootstrap_servers = ["kafka:9092"]
        topic = "demodata"
        # Producer checks: JSON-serialized payloads first, then raw bytes;
        # within each pass the sync send runs before the async send.
        for json_format in (True, False):
            sync_send_test(bootstrap_servers=bootstrap_servers, topic=topic, json_format=json_format)
            async_send_test(bootstrap_servers=bootstrap_servers, topic=topic, json_format=json_format)
        # Consumer check: reads from the earliest offset and keeps iterating.
        consumer_test(bootstrap_servers=bootstrap_servers, topic=topic)
        # To fetch a single message instead:
        # get_one_msg(bootstrap_servers=bootstrap_servers,topic=topic,partition=0,offset=0)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    参考

    Kafka入门,这一篇就够了(安装,topic,生产者,消费者)

  • 相关阅读:
    SLA 、SLO & SLI
    【数据结构与算法 | 堆篇】力扣215
    Pipeline aggregations管道聚合- parent-2
    WSL2 安装与使用
    Vmware16安装CentOS7虚拟机
    抖音矩阵系统。抖音矩阵系统。抖音矩阵系统。抖音矩阵系统。抖音矩阵系统。抖音矩阵系统。
    技术应用:利用Lua脚本提升Redis操作效率与功能
    【操作系统】文件系统之文件共享与文件保护
    Python里的引用与拷贝规律
    10.前端打包与nginx部署
  • 原文地址:https://blog.csdn.net/lady_killer9/article/details/128908807