码农知识堂 - 1000bd
  •   Python
  •   PHP
  •   JS/TS
  •   JAVA
  •   C/C++
  •   C#
  •   GO
  •   Kotlin
  •   Swift
  • pykafka的基本使用及统计kafka消息总数


    目录

    1、pykafka的安装及连接kafka

    2、获取topic并给kafka的目标topic里面写入数据:

    3、对写入的消息进行消费:

    4、统计kafka消息总数:

    kafka简单说明:

    作为一个消息队列,主要就是由生产者Producer、消费者Consumer这两种角色进行队列的写入和队列的消费:

    1、pykafka的安装及连接kafka

    1. 安装:
    2. pip install pykafka
    3. #支持python3和python2
    4. 连接本地kafka:
    5. >>> from pykafka import KafkaClient
    6. # kafka在服务器上则进行修改:
    7. >>> client = KafkaClient(hosts="127.0.0.1:9092")

    2、获取topic并给kafka的目标topic里面写入数据:

    1. 获取所有topic:
    2. >>> client.topics
    3. {'my.test': 0x19bc8c0 (name=my.test)>}
    4. >>> topic = client.topics['my.test']
    5. 创建producer对象,并对目标topic进行消息写入,也就是生产者写入数据:
    6. >>> with topic.get_producer() as producer:
    7. ... for i in range(4):
    8. ... producer.produce('test message ' + i ** 2)

    3、对写入的消息进行消费:

    1. 创建消费者对象进行消费消息,可以指定group组也可以不指定,两个相同的组会消费相同的数据:
    2. >>> consumer = topic.get_simple_consumer(consumer_group='testwtgroup',
    3. auto_commit_enable=True)
    4. >>> for message in consumer:
    5. if message is not None:
    6. print message.offset, message.value
    7. 结果:
    8. 0 test message 0
    9. 1 test message 1
    10. 2 test message 4
    11. 3 test message 9
    12. # 消费者另一种方式,会根据指定的groupid进行动态分配,保证相同的组不会消费到相同数据:
    13. >>> balanced_consumer = topic.get_balanced_consumer(
    14. consumer_group='testgroup',
    15. auto_commit_enable=True,
    16. zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'
    17. )

    4、统计kafka消息总数:

    1. # 统计kafka消息总数,找了一圈都没找到相关实现,最后发现其实就是用最后的偏移量的数据-最初偏移量的数据
    2. # 原理参考kafka的官网,通过命令行实现统计总的消息数据:
    3. # kafka-topics.sh --bootstrap-server {IP:port} --list
    4. # 查看kafka的数据 --time-1 表示要获取指定topic所有分区当前的最大位移,--time-2 表示获取当前最早位移
    5. # 下面用参数time -2可进行替换,[--time-1] - [--time-2] 就是partition里面当前实际的数据(相减),
    6. # kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list {IP:port} --topic {target_topic} --time -1
    7. offsets = topic.earliest_available_offsets()
    8. offsets2 = topic.latest_available_offsets()
    9. # print(offsets2)
    10. # print(offsets[0][0][0])
    11. # print(offsets2[0][0][0])
    12. a = offsets2[0][0][0] - offsets[0][0][0]
    13. b = offsets2[1][0][0] - offsets[1][0][0]
    14. c = offsets2[2][0][0] - offsets[2][0][0]
    15. print(a+b+c)
    16. 结果:
    17. 44064

    pykafka官方地址:

    pykafka · PyPI

  • 相关阅读:
    领跑物联网芯片市场|乐鑫 IoT 芯片全球出货量突破 10 亿颗!
    WebRTC Pacer
    03-RabbitMQ之基础概念
    Java-贪吃蛇游戏
    那些有趣好玩强大的Python库
    网站中接入手机验证码和定时任务(含源码)
    MATLAB初步进行机器学习
    数据结构实验6 :图的存储与遍历(邻接矩阵的深度优先遍历DFS和邻接表的广度优先遍历BFS)
    css圣杯布局和双飞翼布局
    HTML5+CSS3+Vue小实例:输入框打字放大特效
  • 原文地址:https://blog.csdn.net/m0_37570494/article/details/127677941
  • 最新文章
  • 攻防演习之三天拿下官网站群
    数据安全治理学习——前期安全规划和安全管理体系建设
    企业安全 | 企业内一次钓鱼演练准备过程
    内网渗透测试 | Kerberos协议及其部分攻击手法
    0day的产生 | 不懂代码的"代码审计"
    安装scrcpy-client模块av模块异常,环境问题解决方案
    leetcode hot100【LeetCode 279. 完全平方数】java实现
    OpenWrt下安装Mosquitto
    AnatoMask论文汇总
    【AI日记】24.11.01 LangChain、openai api和github copilot
  • 热门文章
  • 十款代码表白小特效 一个比一个浪漫 赶紧收藏起来吧!!!
    奉劝各位学弟学妹们,该打造你的技术影响力了!
    五年了,我在 CSDN 的两个一百万。
    Java俄罗斯方块,老程序员花了一个周末,连接中学年代!
    面试官都震惊,你这网络基础可以啊!
    你真的会用百度吗?我不信 — 那些不为人知的搜索引擎语法
    心情不好的时候,用 Python 画棵樱花树送给自己吧
    通宵一晚做出来的一款类似CS的第一人称射击游戏Demo!原来做游戏也不是很难,连憨憨学妹都学会了!
    13 万字 C 语言从入门到精通保姆级教程2021 年版
    10行代码集2000张美女图,Python爬虫120例,再上征途
Copyright © 2022 侵权请联系2656653265@qq.com    京ICP备2022015340号-1
正则表达式工具 cron表达式工具 密码生成工具

京公网安备 11010502049817号