• 摸鱼大数据——Kafka——kafka tools工具使用


    可以在可视化的工具通过点击来操作kafka完成主题的创建,分区等操作

    注意: 安装完后桌面不会有快捷方式,需要去电脑上搜索,或者去自己选的安装位置找到发送快捷方式到桌面!

     连接配置

    创建主题

    删除主题

    主题下的数据查看

    数据显示问题说明

    修改工具的数据显示类型

    发送消息数据到kafka

    Kafka的Python API的操作

    模块安装

    纯Python的方式操作Kafka。

    准备工作:在node1的节点上安装一个python用于操作Kafka的库

    安装kafka-python 模模块 ,模块中提供了操作kafka的方法

    在线安装

    在node1上安装就可以,需要保证服务器能够连接网络

     安装命令: python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

    离线安装

    将kafka_python-2.0.2-py2.py3-none-any.whl安装包上传服务器software目录下进行安装

     安装命令: pip install kafka_python-2.0.2-py2.py3-none-any.whl

    模块使用

    API使用的参考文档: Usage — kafka-python 2.0.2-dev documentation

    模块中封装了两个类,

    一个是生成者类KafkaProducer,提供了向kafka写数据的方法

    另一个是消费者类KafkaConsumer,提供了读取kafka数据的方法

    完成生产者代码

    生成者类KafkaProducer,提供了向kafka写数据的方法

     send(topic,valu)方法: 发送消息
     topic参数:指定向哪个主题发送消息
     value参数:指定发送的消息数据 ,数据类型要求是bytes类型

    示例:

     # 导包
     from kafka import KafkaProducer
     ​
     # 编写代码
     if __name__ == '__main__':
         # 创建生产者对象并指定对应服务器
         producer = KafkaProducer(bootstrap_servers=['node1:9092'])
         # 发送消息
         for i in range(1,101):
             future = producer.send('kafka', f'hi_kafka_{i}'.encode())
             # 获取元数据
             record_metadata = future.get()
             # 从元数据中获取主题,分区,偏移
             print(record_metadata.topic)
             print(record_metadata.partition)
             print(record_metadata.offset)

    完成消费者代码

    消费者类KafkaConsumer,提供了读取kafka数据的方法

     KafkaConsumer(topic,bootstrap_servers)
     第一个参数:指定消费者连接的主题,
     第二个参数:指定消费者连接的kafka服务器

    示例:

     # 导包
     from kafka import KafkaConsumer
     ​
     # 编写代码
     if __name__ == '__main__':
     ​
         # 创建消费者对象
         consumer = KafkaConsumer('kafka',bootstrap_servers=['node1:9092'])
         # 遍历对象
         for message in consumer:
     ​
             # 格式化打印,设置相关参数
             # 因为value是二进制,需要decode解码
             print ("主题:%s,分区:%d,偏移:%d : key=%s value=%s"
                    % (message.topic, message.partition,message.offset, message.key, message.value.decode('utf8')))
     ​

    可能遇到的错误:

     原因: 服务器环境有问题。是因为服务器上既安装了kafka-python的第三方依赖,同时还安装kafka的第三方依赖。可以通过pip list | grep kafka进行确定
     解决办法: 先将这两个第三方依赖全部卸载,然后再重新执行如下命令
     python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

  • 相关阅读:
    GrapeCity Documents for Excel:GcExcel 5.1.0
    怎么把登记表做成二维码?用二维码登记信息的方法
    jQuery事件对象
    软文推广如何提升产品转化率,媒介盒子分享
    AutoCAD DWG,DXF文件导出高清图片、PDF
    HTML+CSS鲜花网页制作 DW静态网页设计 简单的个人网页制作
    018python-绝对路径、相对路径
    java将list转化成树
    【Overload游戏引擎细节分析】standard材质Shader
    Kubernetes 部署 nfs-subdir-external-provisioner
  • 原文地址:https://blog.csdn.net/weixin_65694308/article/details/140452831