• Kafka消息分区&producer拦截器&无消息丢失(八)


    上篇文章说了,acks,1代表什么都不管,即使配置了回调也不会起作用,0代表不会等待replic副本里的不会持久化,只要broker leader持久化成功则返回给producer。-1代表all,则表示全部持久化成功才返回成功给producer,Retries,batch.size:kafka,linger.ms,buffer.memory,compression.type等参数。

    producer参数---Kafka从入门到精通(七)https://blog.csdn.net/ke1ying/article/details/126089250

    • 消息分区机制

    producer发送过程有个很重要的步骤,就是确定发送的消息在哪个topic分区中。Producer提供了分区策略和对应的分区器(partitioner)供用户使用。新版本的会把相同key的消息发送到partition上,如果没有指定key,则会通过轮询分配均匀在topic所在分区,而对于旧版本的无法分配均匀。

    自定义分区机制:

    对于有key的消息,java版本的producer会通过自己的算法计算key的哈希值,然后在总分区取模分配到目标分区。但有的时候用户想实现自己的分区策略,而这又是默认partitioner无法实现的,那么此刻就可以用producer提供的自定义分区策略。

    1. /**
    2. * @author keying
    3. */
    4. public class AuditPartitioner implements Partitioner {
    5. private Random random;
    6. @Override
    7. public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    8. String key = (String) keyObj;
    9. List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
    10. int auditPartition = partitionInfoList.size() - 1;
    11. return key == null || key.isEmpty() ||
    12. !key.contains("audit") ? random.nextInt(partitionInfoList.size() - 1) : auditPartition;
    13. //return 0;
    14. }
    15. @Override
    16. public void close() {
    17. }
    18. @Override
    19. public void configure(Map<String, ?> configs) {
    20. random = new Random();
    21. }
    22. }

    若自定义分区机制,则需要做两件事:

    1. 先定义一个类,实现org.apache.kafka.clients.producer.Partitioner接口,主要重写partition方法。
    2. 在构造kafkaProducer的时候propertites设置partitioner参数。

    Partition方法里主要接受参数有topic,key和value,还有集群元数据信息,一起来确定目标分区,而close方法则是用于关闭partitioner的,主要是为了关闭那些创建partitioner时初始化的系统资源等。

    举个例子如何实现自定义的partitioner呢,假设我们有个类似审计功能,审计功能发送kafka的时候可以给他分配字符串“audit”,我们想让这类消息发到topic最后一个分区上,便于后续统一处理,而对于相同topic下的其他消息则采用随机发送的策略发送到其他分区上。

    所以,用户可以根据key来指定一些策略,还可以根据value信息做一些定制化分区策略。

    • 消息序列化

    网络中发送数据都是以字节的方式,kafka也不例外,它可以是字符串,一个整数,一个数组或者其他任意对象类型。序列化器(serializer)负责在producer发送将消息转换成字节数组,而与之相反,解序列化器(deserializer)则用于将consumer接受到的字节数组转换成相应的对象。

    Kafka1.0.0默认提供十几种序列化器,常见的serializer用的是StringSerializer,然后其他的还有LongSerializer,IntegerSerializer等。如果是复杂的类型,比如Avro则需要自定义序列化。

    • Producer拦截器

    Producer拦截器相当于一个新的功能,他可以在producer发送消息之后以及回调之前有机会对消息做些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor

    按序作用于同一条消息从而形成一个拦截器,intercetpor的实现接口是producerInterceptor,其定义方法如下:

    onSend(producerRecord):该方法封装进kafkaProducer.send方法中,即他运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法对消息做任何处理,但最好不要修改消息的所属topic和分区,否则影响分区计算。

    onAcknowledgement(recordMetadata,Exception):该消息会在被应答之前或者消息发送失败时候调用,并且通常在producer回调触发之前调用。OnAcknoewledgement运行在producer的I/O线程中,因此不要在该方法放入很重的逻辑,否则会拖慢producer的消息发送效率。

    Close:关闭interceptor,主要做一些资源清理工作。

    如前所述,interceptor可能运行在多个线程中,因此具体实现时候需要用户自行确认保护线程安全。若指定多个interceptor,则producer将按照指定顺序调用他们,同时把每个interceptor中捕获的异常记录到错误日志中而不是向上传递。

    1. /**
    2. * @author keying
    3. * @date 2022-08-07 17:24:21
    4. */
    5. public class OneInterceptor implements ProducerInterceptor<String, String> {
    6. @Override
    7. public ProducerRecord<String, String> onSend(ProducerRecord record) {
    8. return new ProducerRecord(record.topic(), record.partition(),
    9. record.timestamp(), System.currentTimeMillis() + "," + record.value().toString());
    10. }
    11. @Override
    12. public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    13. }
    14. @Override
    15. public void close() {
    16. }
    17. @Override
    18. public void configure(Map<String, ?> configs) {
    19. }
    20. }
    21. /**
    22. * @author keying
    23. * @date 2022-08-07 17:27:40
    24. */
    25. public class TwoInterceptor implements ProducerInterceptor<String, String> {
    26. private int errorCounter = 0;
    27. private int successCounter = 0;
    28. @Override
    29. public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    30. return null;
    31. }
    32. @Override
    33. public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    34. if (exception == null) {
    35. successCounter++;
    36. } else {
    37. errorCounter++;
    38. }
    39. }
    40. @Override
    41. public void close() {
    42. }
    43. @Override
    44. public void configure(Map<String, ?> configs) {
    45. System.out.println("成功:"+successCounter);
    46. System.out.println("失败:"+errorCounter);
    47. }
    48. }

    上面例子是实现一个简单的双inteceptor组成的拦截器,第一个拦截器会在消息发送前将时间戳加入到value,第二个拦截器则会统计成功和失败的次数。

    • 无消息丢失配置

    Producer采用的是异步发送消息机制,kafkaProducer.send方法仅仅把消息放入缓冲区,由一个专属的I/O线程负责提取缓冲区的消息并封装到batch中,然后发送出去。显然,整个过程存在数据丢失的窗口,若I/O线程在发送之前崩溃,则数据会丢失。

    另一个问题则是消息会乱序,比如客户端依次发送两条消息到不同的分区:

    Producer.send(records1);和producer.send(records2);

    若此刻某些原因,网络出现瞬时抖动,导致records1发送失败,同时kafka又配置了重试机制,max.in.flight.requests.per.connection大于1(默认是5),这样会造成消息乱序,而实际场景很多情况需要包装按顺序消费。

    所以这两个问题,kafka该如何规避呢?首先消息丢失很容易想到kafka的同步发送,但这样性能会很差,并不在实际场景中推荐使用。如何配置保证消息不会丢失呢?

    Block.on.buffer.full = true

    Acks=all 或者 -1

    Retries=Integer.MAX_VALUE

    Max.in.flight.request.per.connection=1

    使用回调机制的send发送消息

    CallBack逻辑中显式立即关闭producer,使用close(0)

    Unclean.leader.election.enable=false

    Replication.factor=3

    Min.insync.replicas = 2

    Replication.factor>min.insync.replicas

    Enable.auto.commit=false

    Producer端配置:

    Block.on.buffer.full = true,实际上这个参数在kafka0.9.0版本已经被标记为deprecated的,并且使用max.block.ms替代,但还是推荐用户显示的设置它为true,使得内存缓冲区被填满时producer处于阻塞状态,并且停止接受新消息而不是抛出异常。否则producer生产速度过快会耗尽缓冲区,新版本0.10.0.0不用管这个参数,直接设置max.block.ms参数。

    Acks = all很好理解,就是所有leader broker和副本replict里的follower都收到消息,才回复producer消息成功发送。

    Retries=Integer.MAX_VALUE:这里设置无限大有点极端,想表达的是无线重试,但放心这里不会重试那些无法恢复的错误,只会重试那些可恢复的异常,所以可以放心的设置比较大的值,保证消息不会丢失。

    max.in.flight.request.per.connection=1:设置为1防止消息在topic下乱序,这个设置的效果限制了producer在单个broker上连续发送的未响应请求数量。因此如果设置成1,则producer在某个broker发送响应之前将无法再给broker发送producer请求。

    使用带回调的send,普通的send官方解释是fire and forget,只管把消息发出去,不管后续,如果发送失败,不会收到任何通知,这里肯定要带回调的send发送。

    CallbackBack逻辑中显式处理立刻关闭producer:在calllback失败处逻辑立即使用kafkaProcuer.close(0),这样做的目的就是为了防止消息乱序问题。若不使用close关闭,默认情况下producer会被允许将未完成的消息发送出去,这样可能造成消息乱序。

    Broker端配置:

    Unclean.leader.election.eable = false:关闭unclean leader选举,即不允许非isr中的副本被选举成leader,从而避免broker端因为日志水位截断造成数据丢失。

    Replication.factor>=3 :设置成3主要参考业界的三备份原则,强调多个副本才好。

    Min.insync.replias>1:用于控制某条消息至少被写入ISR中多个副本才算成功,大于1代表提升持久性,只有在acks设置成-1或者all的时候才生效。

    确保 replication.factors>min.insync.replicas :若两者相等,则只要有一个副本挂掉,则分区无法正常使用,虽然持久性很高,但可用性被降低,建议 replication.factory = min.insync.replicas + 1。

  • 相关阅读:
    【Effective Modern Cpp】条款9:优先考虑别名声明而非typedef
    Java.lang.Byte类之parseByte(String s, int radix)方法的简介说明
    【动态规划刷题 15】最长定差子序列&& 最长的斐波那契子序列的长度
    MySQL 行变列
    STL 作业:名单1(爱思创)
    【机械仿真】基于matlab GUI直齿圆柱齿轮应力计算【含Matlab源码 2077期】
    GPT-4:思考的曙光还是数据的缩影?
    9月19日作业
    简单而高效:使用PHP爬虫从网易音乐获取音频的方法
    唯一刊登学生历史学术论文的期刊
  • 原文地址:https://blog.csdn.net/ke1ying/article/details/126212509