• kafka 消息偏移量


    [oswatch@yyjk tmp]$ cat consumerkafka_mario.py
    #!/usr/bin/env python
    # coding=utf-8
    from  kafka import KafkaConsumer
    from  kafka import TopicPartition
    import time
    def get_kafka_reviews(bootstrap_servers,topics):
        # print type(self.bootstrap_servers)
        consumer = KafkaConsumer(bootstrap_servers=[bootstrap_servers],auto_offset_reset='latest', enable_auto_commit=False)
        consumer.subscribe(topics=(topics))  #订阅要消费的主题
        print consumer.topics()
        print "111111",consumer.position(TopicPartition(topic=u'NewProxyBaseData', partition=0)) #获取当前主题的最新偏移量
        print "222222",consumer.position(TopicPartition(topic=u'NewProxyBaseData', partition=1)) #获取当前主题的最新偏移量
        print "333333",consumer.position(TopicPartition(topic=u'NewProxyBaseData', partition=2)) #获取当前主题的最新偏移量
        time.sleep(30)

        review_list =[]
        for message in consumer:
            print message
            #print '====%s:%d:%d:key-%s value=%s=='%(message.topic,message.partition,message.offset,message.key,message.value)
            review_list.append(message.value)
        return  review_list

    print get_kafka_reviews('1.1.1.27:9092','NewProxyBaseData')
    [oswatch@yyjk tmp]$ python consumerkafka_mario.py
    set([u'NewMongoErrorCount', u'PROXY7GeneratorIndex', u'IndexTimeOut', u'MARIOREGISTER', u'BASERECORD', u'ProxyTimeOut', u'NewProxyTimeOut', u'ProxyGatherCompleted', u'NewProxyIndexPrepare', u'mutableAlertInfo', u'NewProxyGather', u'PROXY6', u'PROXY7', u'PROXY4', u'PROXY6GeneratorIndex', u'PROXY2', u'NewProxyNoBaseData', u'PROXY0', u'PROXY1', u'PROXY1GeneratorIndex', u'ProxyBaseData', u'NewMongoReInsert', u'PROXY8', u'PROXY9', u'PROXY10GeneratorIndex', u'ProxyAlert', u'ProxyGather', u'IndexOnTime', u'EXALERTSETINFO', u'RANGEDATA', u'PROXY9GeneratorIndex', u'PROXY100', u'PROXY100GeneratorIndex', u'NewAlertCalStandBy', u'index', u'MessageStandbyBank', u'PROXY5GeneratorIndex', u'BaseRecord', u'alert', u'PROXY10', u'PROXY5', u'databus', u'NewProxyIndexDetail', u'datasend', u'alerttest', u'PROXY2GeneratorIndex', u'PUSHALERTSETSEND', u'indextest', u'PROXY3GeneratorIndex', u'SubIndexTimeOut', u'NewProxyGatherCompleted', u'LUFAX', u'DATAFLOW', u'PROXY3', u'register', u'NewMongoReUpsert', u'MarioAgent', u'MessageStandby', u'PROXY8GeneratorIndex', u'IndexGatherTimeOut', u'GatherIndexTimeOut', u'ALERTINFO', u'NewProxyBaseData', u'luohantest', u'PROXY4GeneratorIndex', u'ProxyIndexDetail', u'registerbank', u'NewProxyGatherNoCore'])
    111111 584672460
    222222 574728466
    333333 581566016

  • 相关阅读:
    【CPP】Introduction
    【Python】牛客HJ43迷宫问题
    【Linux】生产消费模型 + 线程池
    2023.11.13 信息学日志
    安装Selenium
    微服务治理-含服务线上稳定性保障建设治理
    python 并发请求,转发
    数据治理:为什么不见BI作关联分析
    oracle 数据库实验三
    MySQL如何改进LRU算法
  • 原文地址:https://blog.csdn.net/zhaoyangjian724/article/details/126362392