• 关于kafka-python的若干问题


    1.关于发送的三种方式
    
            1.发送即忘记:不关注消息是否成功到达,会有消息丢失情况
                例如:
    
    1. producer.send('my_test', msg, key='')
    2. producer.close()
            2.同步发送:开发同步发送消息的Producer程序关键在于producer的send方法返回的Future=对象,通过Future对象我们可以知道消息是否发送成功,如果发送成功,可以继续发送下一条消息。如果发送失败,可以再次发送或者存储起来
                例如:
    
    1. fu = producer.send('my_test', msg, key='')
    2. result = fu.get(timeout=10)
            3.异步发送:以回调函数的形式调用 send() 方法,当收到 broker 的响应,会触发回调函数执行。此方法既关注消息是否成功到达,又提高了消息的发送速度
                例如:
    
    1. producer.send(record, DemoProducerCallback);
    2. producer.close()
    2.如何从指定位置读取数据
    
        
    
    1. consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
    2. tp = TopicPartition("python_test", 0) # 参数是[topic名称,partition]
    3. consumer.assign([tp])
    4. consumer.seek(tp, 10) # 这里是设置偏移量
    5. consumer_data = next(consumer)
    3.怎么知道还有多少数据消费
    
        1.可视化界面
        2.通过获取分区end offset和当前 offset 进行计算即可:顺序消费机制
    
    1. def get_kafka_lag():
    2. consumer = KafkaConsumer('my_test',
    3. bootstrap_servers=['localhost:9092'],
    4. value_deserializer=bytes.decode,
    5. group_id='test',
    6. auto_offset_reset='earliest')
    7. topic = 'round_request'
    8. partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
    9. print("start to cal offset:")
    10. # total
    11. toff = consumer.end_offsets(partitions)
    12. toff = [(key.partition, toff[key]) for key in toff.keys()]
    13. toff.sort()
    14. print("total offset: {}".format(str(toff)))
    15. # current
    16. coff = [(x.partition, consumer.committed(x)) for x in partitions]
    17. coff.sort()
    18. print("current offset: {}".format(str(coff)))
    19. # cal sum and left
    20. toff_sum = sum([x[1] for x in toff])
    21. cur_sum = sum([x[1] for x in coff if x[1] is not None])
    22. left_sum = toff_sum - cur_sum
    23. print("kafka sum: {} kafka consume:{}".format(toff_sum, cur_sum))
    24. print("kafka left: {}".format(left_sum))
    25. return left_sum
            
    4.明明读取数据了,但显示剩余量为全部
    
        1-因为没有commit。默认enable_auto_commit=True,如果不起作用,可以增加consumer.commit()
        2-增设group_id,就会自动提交,而且可以在kafka-manager中找到消费者组下的信息[但所有请求都处理完成后才会更改lag值]
        
    5.kafka消息保留机制
    
        5.1消息保留机制
            log.retention.hours=168  #设置保留时间 ms>minutes>hours
            log.retention.bytes=1024 #设置日志大小,超过会删除
            log.segment.bytes=1024   #设置分段大小,超过会新建日志
        5.2修改某一主题数据存留时间
           修改: ./kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name topicName --alter --add-config log.retention.hours=120
           验证: ./kafka-topics.sh --zookeeper localhost:2181 --topic topicName --describe
    
        
    6.是否需要开启多进程?如何开启多进程?能否承受得起高并发?
        启动多个消费者进程,指定partition_id
        
    7.如何监控kafka:数据倾斜,Lag量增加~~
    8.多个消费者之间消费数据的逻辑?平均消费还是随机消费?
        同一个组内消费者随机消费
  • 相关阅读:
    PLS-00172 string literal too long ORA-06550 字段太长插入不进去数据库
    腾讯云CVM服务器操作系统镜像大全
    shell 重定向
    透视虎牙斗鱼三季报:游戏直播在各自“求变”中见分晓
    【学习笔记】ARC146/AGC020/
    CSDN客服体验记录20220817
    Vue页面路由参数的传递和获取
    SpringCloud 微服务(二)
    前端模块化导入导出
    Linux 基础IO
  • 原文地址:https://blog.csdn.net/csdn_kelly/article/details/126226411