目录
2、获取topic并给kafka的目标topic里面写入数据:
kafka简单说明:
作为一个消息队列,主要就是由生产者Producer、消费者Consumer这两种角色进行队列的写入和队列的消费:
- 安装:
- pip install pykafka
- #支持python3和python2
-
- 连接本地kafka:
- >>> from pykafka import KafkaClient
- # kafka在服务器上则进行修改:
- >>> client = KafkaClient(hosts="127.0.0.1:9092")
- 获取所有topic:
- >>> client.topics
- {'my.test':
0x19bc8c0 (name=my.test)>} - >>> topic = client.topics['my.test']
-
- 创建producer对象,并对目标topic进行消息写入,也就是生产者写入数据:
- >>> with topic.get_producer() as producer:
- ... for i in range(4):
- ... producer.produce('test message ' + i ** 2)
- 创建消费者对象进行消费消息,可以指定group组也可以不指定,两个相同的组会消费相同的数据:
- >>> consumer = topic.get_simple_consumer(consumer_group='testwtgroup',
- auto_commit_enable=True)
- >>> for message in consumer:
- if message is not None:
- print message.offset, message.value
- 结果:
- 0 test message 0
- 1 test message 1
- 2 test message 4
- 3 test message 9
-
- # 消费者另一种方式,会根据指定的groupid进行动态分配,保证相同的组不会消费到相同数据:
- >>> balanced_consumer = topic.get_balanced_consumer(
- consumer_group='testgroup',
- auto_commit_enable=True,
- zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'
- )
- # 统计kafka消息总数,找了一圈都没找到相关实现,最后发现其实就是用最后的偏移量的数据-最初偏移量的数据
- # 原理参考kafka的官网,通过命令行实现统计总的消息数据:
- # kafka-topics.sh --bootstrap-server {IP:port} --list
- # 查看kafka的数据 --time-1 表示要获取指定topic所有分区当前的最大位移,--time-2 表示获取当前最早位移
- # 下面用参数time -2可进行替换,[--time-1] - [--time-2] 就是partition里面当前实际的数据(相减),
- # kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list {IP:port} --topic {target_topic} --time -1
-
- offsets = topic.earliest_available_offsets()
- offsets2 = topic.latest_available_offsets()
- # print(offsets2)
- # print(offsets[0][0][0])
- # print(offsets2[0][0][0])
- a = offsets2[0][0][0] - offsets[0][0][0]
- b = offsets2[1][0][0] - offsets[1][0][0]
- c = offsets2[2][0][0] - offsets[2][0][0]
- print(a+b+c)
-
- 结果:
- 44064
pykafka官方地址: