• centos 上安装 kafka 与 python 调用


    step0: 环境准备

    1、 安装jdk 1.8 以上版本

    yum -y install java-1.8.0-openjdk.x86_64
    
    • 1

    2、 安装配置ZooKeeper

    wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.2/apache-zookeeper-3.8.2-bin.tar.gz --no-check-certificate
    tar -zxf apache-zookeeper-3.8.2-bin.tar.gz 
    
    
    # 存放数据的路径 根据需要修改
    mkdir /你存放数据的路径/data
    mkdir /你存放日志的路径/logs
    cd apache-zookeeper-3.8.2-bin/conf
    cp zoo_sample.cfg zoo.cfg
    vim zoo.cfg
    # 增加如下内容
    # dataDir = /你存放数据的路径/data(自定义目录)
    # dataLogDir = /你存放日志的路径/logs(自定义目录)
    cd ../bin
    ./zkServer.sh start
    #./zkServer.sh stop
    #./zkServer.sh restart
    #./zkServer.sh status
    # 如果启动失败 Starting zookeeper ... FAILED TO START 可能是server 的端口号被占用 需要指定下端口号
    # vim zoo.cfg 
    # admin.serverPort=9091
    
    ./zkCli.sh 
    ./zkServer.sh stop
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    step1: 下载kafka

    wget https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz --no-check-certificate
    tar -xzf kafka_2.13-3.5.0.tgz
    cd kafka_2.13-3.5.0
    # 新建一个会话
    # yum install tmux -y
    tmux 
    # Start the ZooKeeper service
    ./bin/zookeeper-server-start.sh config/zookeeper.properties 
    #  Start the Kafka broker service
    bin/kafka-server-start.sh config/server.properties
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    step3: 验证安装

    KafkaProducer

    pip install kafka-python
    
    • 1
    import logging
    import time
    from kafka import KafkaProducer
    
    
    def on_send_success(record_metadata):
        print(record_metadata.topic)
        print(record_metadata.partition)
        print(record_metadata.offset)
    
    
    def on_send_error(excp):
        logging.error('I am an errback', exc_info=excp)
        # handle exception
    
    # 将yoursIp替换为启动kafka 服务的ip
    producer = KafkaProducer(bootstrap_servers="yoursIp:9092")
    i = 0
    while True:
        ts = int(time.time() * 1000)
        future = producer.send(topic="test", value=bytes(str(i), encoding="utf-8"), key=bytes(str(i), encoding="utf-8"),
                               timestamp_ms=ts).add_callback(on_send_success).add_errback(on_send_error)
        record_metadata = future.get(timeout=60)
        producer.flush()
        print(i)
        print(record_metadata)
    
        i += 1
        time.sleep(1)
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    在这里插入图片描述

    KafkaConsummer

    from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
    import time
    
    tp = TopicPartition("test", 0)
    consummer = KafkaConsumer(bootstrap_servers=["yoursIp:9092"], auto_offset_reset='earliest',
                              enable_auto_commit=False, group_id="my-group")
    consummer.assign([tp])
    print("starting offset is {}".format(consummer.position(tp)))
    for message in consummer:
        print(message, message.offset)
    
        consummer.commit({tp: OffsetAndMetadata(message.offset + 1, message)})
        time.sleep(0.15)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在这里插入图片描述
    重新启动后,consumer从上一次记录的offset开始继续消费消息。之后每一次consumer重新启动,consumer都会从上一次停止的地方继续开始消费。

    参考链接

    • https://kafka.apache.org/quickstart
    • https://blog.csdn.net/qq_54780911/article/details/124293670
    • https://cloud.tencent.com/developer/article/1700375
    • https://juejin.cn/post/6924584866327560199
    • https://kafka-python.readthedocs.io/
  • 相关阅读:
    Go 字符串处理
    GEO生信数据挖掘(二)下载基因芯片平台文件及注释
    基于springboot实现新生宿舍管理系统演示【项目源码+论文说明】分享
    MongoDB安装及进程介绍
    2022的七夕,奉上7个精美的表白代码,同时教大家快速改源码自用
    030:vue中使用md5进行数据加密示例
    leetcode:2053. 数组中第 K 个独一无二的字符串(python3解法)
    Python练习分割字符串
    Linux驱动调试之printk的使用
    T1065 奇数求和(信息学一本通C++)
  • 原文地址:https://blog.csdn.net/Magicapprentice/article/details/133169498