• Kafka学习之:mac 上基础使用 python 来使用 kafka 的生产者和消费者进行数据处理


    前提

    • 我的配置是 M1 芯片 Macbook pro

    • 你的 kafka 处于启动状态,如果尚未启动,则通过以下命令依次运行 zookeeperkafka,如果有安装问题可以参考上一篇文章

      brew services start zookeeper
      brew services start kafka
      
      • 1
      • 2

    python 环境配置

    • 首先,确保安装了confluent-kafka

      pip install confluent-kafka
      
      • 1
    • 使用以下Python脚本创建一个新的Kafka主题:

      from confluent_kafka.admin import AdminClient, NewTopic
      
      # Kafka服务器配置
      admin_client = AdminClient({
          "bootstrap.servers": "localhost:9092"
      })
      
      # 创建新主题的配置
      topic_list = [NewTopic("my_new_topic", num_partitions=3, replication_factor=1)]
      # 注意: replication_factor 和 num_partitions 可能需要根据你的Kafka集群配置进行调整; 
      
      # 创建主题
      fs = admin_client.create_topics(topic_list)
      
      # 处理结果
      for topic, f in fs.items():
          try:
              f.result()  # The result itself is None
              print(f"Topic {topic} created")
          except Exception as e:
              print(f"Failed to create topic {topic}: {e}")
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 注意 replication_factor 不能超过 broker 的数量
      • 具体原因可以参考视频
      • 通过 python 只能创建 broker 的主题,而不能控制创建多个 broker,增加或管理brokers的过程需要在集群的配置和部署阶段进行,而不能通过像confluent_kafka这样的客户端库来实现。

    Kafka 生产消费者模型

    生产者 producer

    """
     @file: producer.py
     @Time    : 2024/3/29
     @Author  : Peinuan qin
     """
    from confluent_kafka import Producer
    import json
    
    # Kafka配置
    config = {
        'bootstrap.servers': 'localhost:9092'
    }
    
    # 创建生产者
    producer = Producer(**config)
    
    # 模拟的用户活动数据
    data = {'user_id': 1234, 'activity': 'page_view', 'page': 'homepage'}
    
    # 发送数据
    producer.produce('user_activities', key=str(data['user_id']), value=json.dumps(data))
    producer.flush()
    print("Data sent to Kafka")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    Data sent to Kafka

    检查当前存在的所有 topic / 是否自动创建 topic

    • 可以用如下命令来检查已经存在的 topics

      kafka-topics --list --bootstrap-server localhost:9092
      
      • 1
    • 对于上述 producer 中,我的 server.property 中的 auto.create.topics.enable 设置为 True,这意味着如果当前 topics 不存在会自动创建。

    • 检查 auto.create.topics.enable 的方式:

      grep "auto.create.topics.enable" /path/to/your/kafka/config/server.properties
      
      • 1
    • 一般 /path/to/your/kafka/config/server.properties 在我的 上篇文章 中提到了, m1 芯片的 mac 的地址是在 /opt/homebrew/etc/kafka/server.properties,所以对应的查看命令就是:

      grep "auto.create.topics.enable" /opt/homebrew/etc/kafka/server.properties
      
      • 1
    • 如果这一行在你的 server.properties 中并不存在,则默认为 true,如果想更改,需要在 server.properties 中加入 auto.create.topics.enable=false 然后保存更改,重新启动 kafka

    • 当你设置成 auto.create.topics.enable=false,再次运行上面的代码,但是 topic 换成一个新的

    producer.produce('user_activities1', key=str(data['user_id']), value=json.dumps(data))
    
    • 1
    • 你会发现执行结果还是下面内容,并且没有报错:

      Data sent to Kafka

    • 但是当你列出所有的 topic,却发现其实 user_activities1 并没有创建成功

    为什么 producer 要通过 key, value 来发布数据

    键(Key)
    • 分区选择: 键主要用于决定消息被发送到主题的哪个分区。如果为消息指定了键,Kafka会对键进行哈希处理,根据哈希值将消息均匀分配到不同的分区。这种方式确保了相同键的所有消息都会被发送到同一个分区中,保证了消息的顺序性。如果没有指定键,消息会以轮询的方式分配到所有分区,这可能不会保证相同键的消息顺序。

    • 日志压缩: 键还用于日志压缩(log compaction)功能。在这个模式下,Kafka保证每个键在分区日志中只保留最后一次更新的值。这对于维护长期运行的聚合状态非常有用。

    值(Value)
    • 消息内容: 值部分承载了消息的实际内容。这是生产者想要发送给消费者的数据。值可以是任何格式的数据,比如字符串、JSON对象、序列化后的字节码等。

    • 使用场景示例

      • 订单系统:在一个订单系统中,订单ID可以作为键,而订单的详细信息(如客户信息、订单项、价格等)作为值。使用订单ID作为键确保了相同订单的更新会被顺序地发送到同一个分区,并且通过日志压缩,Kafka可以只保留订单的最新状态。
      • 用户行为跟踪:在用户行为跟踪应用中,用户ID 可以作为键,用户的行为(如点击、浏览等)作为值。这样,相同用户的所有行为都会被顺序地记录在同一个分区中,便于后续进行用户行为分析。

    消费者 consumer

    """
     @file: consumer.py
     @Time    : 2024/3/29
     @Author  : Peinuan qin
     """
    
    from confluent_kafka import Consumer, KafkaError
    
    # Kafka配置
    config = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'user-activity-group',
        'auto.offset.reset': 'earliest'
    }
    
    # 创建消费者
    consumer = Consumer(**config)
    consumer.subscribe(['user_activities'])
    
    # 读取数据
    try:
        while True:
            msg = consumer.poll(timeout=1.0)  # 1秒超时
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    continue
                else:
                    print(msg.error())
                    break
            # 成功接收消息
            print('Received message: {}'.format(msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        pass
    finally:
        # 清理操作
        consumer.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 可以看到我们订阅了 'user_activities' 这个 topic,从其中源源不断地取数据来进行消费(处理)

    • 但是同时在 consumer 的代码中定义了一个 group.id,而这个是 producer 中没有的,这样做的原因是:

      • 负载均衡:在同一个消费者组中,每个消费者可以负责消费特定的分区中的消息,这样可以在消费者之间分摊负载。如果一个消费者组中有多个消费者实例,Kafka会尽量平衡地将分区分配给每个消费者,确保每个分区只被组内的一个消费者消费。这意味着增加消费者可以提高消费的并行度,加快处理速度。

      • 容错和高可用性: 如果某个消费者失败,它负责的分区会被重新分配给同一消费者组内的其他消费者,这样可以确保消息的持续消费,提高了系统的容错能力。

      • 消息广播: 通过使用不同的消费者组,可以实现消息的广播模式,即相同的消息可以被多个消费者组独立消费。

    • 如果不是认为还不是很清楚,可以 参考视频

    consumer 得到的 message 有哪些方法?

    print("msg dict:", dir(msg))
    
    • 1
    msg dict: ['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__len__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'error', 'headers', 'key', 'latency', 'leader_epoch', 'offset', 'partition', 'set_headers', 'set_key', 'set_value', 'timestamp', 'topic', 'value']
    
    • 1
    1. Key: 消息的键(如果有)。键用于消息的分区内排序和日志压缩。

    2. Value: 消息的实际内容或负载。

    3. Topic: 消息所属的主题。

    4. Partition: 消息所在的分区号。Kafka中的每个主题可以被分割成多个分区,分区号从0开始。

    5. Offset: 消息在其分区中的偏移量。偏移量是一个递增的序列号,用于唯一标识分区中的每条消息。

    6. Timestamp: 消息的时间戳。它可以是消息创建时的时间戳(生产者发送消息的时间)或者是消息被追加到日志的时间戳。时间戳的具体含义取决于Kafka生产者的配置。

    7. Headers: 消息头部,是键值对的集合,可以用来存储与消息相关的附加信息。生产者可以添加任意多的键值对作为消息的一部分,消费者可以读取这些信息进行相应的处理。

    8. Serialized Key Size: 键的序列化后的大小(以字节为单位)。如果消息没有键,通常这个值是-1。

    9. Serialized Value Size: 值的序列化后的大小(以字节为单位)。

    10. Leader Epoch (Kafka 0.11.0及以上版本): 分区领导者的纪元号。这是一个内部使用的字段,用于Kafka的复制机制,以确保数据一致性。

    为什么 consumer 拿到的内容需要 decode
    • 消费者在接收到Kafka中的消息后需要进行解码(decoding),原因在于消息的生产者在发送消息到Kafka之前通常会对消息的键(key)和值(value)进行编码(encoding)。编码和解码是为了在网络上传输数据时确保数据的一致性和完整性,同时也支持消息的有效存储。

    • 一般使用 utf-8 进行编解码

    • 上述的 producer 中没有显式调用 encoder 是因为 json.dumps 本身就是序列化的过程,也就是编码的过程。 但好习惯应该是对 keyvalue 都进行 encode

      producer.produce('user_activities1', key=str(data['user_id']).encode("utf-8"), value=json.dumps(data).encode("utf-8"))
      
      • 1
  • 相关阅读:
    后端jar包部署常见运行和停止命令
    漏洞复现----41、Spring Data Rest 远程命令执行漏洞(CVE-2017-8046)
    【LeetCode热题100】【子串】最小覆盖子串
    【git】超详细使用指令
    Spring之IOC容器(依赖注入)&基本介绍&基本配置&多模块化
    安卓Java面试题 91- 100
    电脑d盘不见了怎么恢复?
    内网-win1
    使用数据库维护数据来源,动态切换数据源的工具:dynamic-datasource
    【R语言】概率密度图
  • 原文地址:https://blog.csdn.net/qq_42902997/article/details/137159213