上篇文章说了,acks,1代表什么都不管,即使配置了回调也不会起作用,0代表不会等待replic副本里的不会持久化,只要broker leader持久化成功则返回给producer。-1代表all,则表示全部持久化成功才返回成功给producer,Retries,batch.size:kafka,linger.ms,buffer.memory,compression.type等参数。
producer参数---Kafka从入门到精通(七)https://blog.csdn.net/ke1ying/article/details/126089250
producer发送过程有个很重要的步骤,就是确定发送的消息在哪个topic分区中。Producer提供了分区策略和对应的分区器(partitioner)供用户使用。新版本的会把相同key的消息发送到partition上,如果没有指定key,则会通过轮询分配均匀在topic所在分区,而对于旧版本的无法分配均匀。
自定义分区机制:
对于有key的消息,java版本的producer会通过自己的算法计算key的哈希值,然后在总分区取模分配到目标分区。但有的时候用户想实现自己的分区策略,而这又是默认partitioner无法实现的,那么此刻就可以用producer提供的自定义分区策略。
-
- /**
- * @author keying
- */
- public class AuditPartitioner implements Partitioner {
-
- private Random random;
-
- @Override
- public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- String key = (String) keyObj;
- List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
- int auditPartition = partitionInfoList.size() - 1;
- return key == null || key.isEmpty() ||
- !key.contains("audit") ? random.nextInt(partitionInfoList.size() - 1) : auditPartition;
- //return 0;
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void configure(Map<String, ?> configs) {
- random = new Random();
- }
- }
若自定义分区机制,则需要做两件事:
Partition方法里主要接受参数有topic,key和value,还有集群元数据信息,一起来确定目标分区,而close方法则是用于关闭partitioner的,主要是为了关闭那些创建partitioner时初始化的系统资源等。
举个例子如何实现自定义的partitioner呢,假设我们有个类似审计功能,审计功能发送kafka的时候可以给他分配字符串“audit”,我们想让这类消息发到topic最后一个分区上,便于后续统一处理,而对于相同topic下的其他消息则采用随机发送的策略发送到其他分区上。
所以,用户可以根据key来指定一些策略,还可以根据value信息做一些定制化分区策略。
网络中发送数据都是以字节的方式,kafka也不例外,它可以是字符串,一个整数,一个数组或者其他任意对象类型。序列化器(serializer)负责在producer发送将消息转换成字节数组,而与之相反,解序列化器(deserializer)则用于将consumer接受到的字节数组转换成相应的对象。
Kafka1.0.0默认提供十几种序列化器,常见的serializer用的是StringSerializer,然后其他的还有LongSerializer,IntegerSerializer等。如果是复杂的类型,比如Avro则需要自定义序列化。
Producer拦截器相当于一个新的功能,他可以在producer发送消息之后以及回调之前有机会对消息做些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor
按序作用于同一条消息从而形成一个拦截器,intercetpor的实现接口是producerInterceptor,其定义方法如下:
onSend(producerRecord):该方法封装进kafkaProducer.send方法中,即他运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法对消息做任何处理,但最好不要修改消息的所属topic和分区,否则影响分区计算。
onAcknowledgement(recordMetadata,Exception):该消息会在被应答之前或者消息发送失败时候调用,并且通常在producer回调触发之前调用。OnAcknoewledgement运行在producer的I/O线程中,因此不要在该方法放入很重的逻辑,否则会拖慢producer的消息发送效率。
Close:关闭interceptor,主要做一些资源清理工作。
如前所述,interceptor可能运行在多个线程中,因此具体实现时候需要用户自行确认保护线程安全。若指定多个interceptor,则producer将按照指定顺序调用他们,同时把每个interceptor中捕获的异常记录到错误日志中而不是向上传递。
-
- /**
- * @author keying
- * @date 2022-08-07 17:24:21
- */
- public class OneInterceptor implements ProducerInterceptor<String, String> {
-
- @Override
- public ProducerRecord<String, String> onSend(ProducerRecord record) {
- return new ProducerRecord(record.topic(), record.partition(),
- record.timestamp(), System.currentTimeMillis() + "," + record.value().toString());
- }
-
- @Override
- public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
-
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void configure(Map<String, ?> configs) {
-
- }
- }
-
-
-
-
- /**
- * @author keying
- * @date 2022-08-07 17:27:40
- */
- public class TwoInterceptor implements ProducerInterceptor<String, String> {
-
- private int errorCounter = 0;
- private int successCounter = 0;
-
- @Override
- public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
- return null;
- }
-
- @Override
- public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- successCounter++;
- } else {
- errorCounter++;
- }
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void configure(Map<String, ?> configs) {
- System.out.println("成功:"+successCounter);
- System.out.println("失败:"+errorCounter);
- }
- }
上面例子是实现一个简单的双inteceptor组成的拦截器,第一个拦截器会在消息发送前将时间戳加入到value,第二个拦截器则会统计成功和失败的次数。
Producer采用的是异步发送消息机制,kafkaProducer.send方法仅仅把消息放入缓冲区,由一个专属的I/O线程负责提取缓冲区的消息并封装到batch中,然后发送出去。显然,整个过程存在数据丢失的窗口,若I/O线程在发送之前崩溃,则数据会丢失。
另一个问题则是消息会乱序,比如客户端依次发送两条消息到不同的分区:
Producer.send(records1);和producer.send(records2);
若此刻某些原因,网络出现瞬时抖动,导致records1发送失败,同时kafka又配置了重试机制,max.in.flight.requests.per.connection大于1(默认是5),这样会造成消息乱序,而实际场景很多情况需要包装按顺序消费。
所以这两个问题,kafka该如何规避呢?首先消息丢失很容易想到kafka的同步发送,但这样性能会很差,并不在实际场景中推荐使用。如何配置保证消息不会丢失呢?
Block.on.buffer.full = true
Acks=all 或者 -1
Retries=Integer.MAX_VALUE
Max.in.flight.request.per.connection=1
使用回调机制的send发送消息
CallBack逻辑中显式立即关闭producer,使用close(0)
Unclean.leader.election.enable=false
Replication.factor=3
Min.insync.replicas = 2
Replication.factor>min.insync.replicas
Enable.auto.commit=false
Producer端配置:
Block.on.buffer.full = true,实际上这个参数在kafka0.9.0版本已经被标记为deprecated的,并且使用max.block.ms替代,但还是推荐用户显示的设置它为true,使得内存缓冲区被填满时producer处于阻塞状态,并且停止接受新消息而不是抛出异常。否则producer生产速度过快会耗尽缓冲区,新版本0.10.0.0不用管这个参数,直接设置max.block.ms参数。
Acks = all很好理解,就是所有leader broker和副本replict里的follower都收到消息,才回复producer消息成功发送。
Retries=Integer.MAX_VALUE:这里设置无限大有点极端,想表达的是无线重试,但放心这里不会重试那些无法恢复的错误,只会重试那些可恢复的异常,所以可以放心的设置比较大的值,保证消息不会丢失。
max.in.flight.request.per.connection=1:设置为1防止消息在topic下乱序,这个设置的效果限制了producer在单个broker上连续发送的未响应请求数量。因此如果设置成1,则producer在某个broker发送响应之前将无法再给broker发送producer请求。
使用带回调的send,普通的send官方解释是fire and forget,只管把消息发出去,不管后续,如果发送失败,不会收到任何通知,这里肯定要带回调的send发送。
CallbackBack逻辑中显式处理立刻关闭producer:在calllback失败处逻辑立即使用kafkaProcuer.close(0),这样做的目的就是为了防止消息乱序问题。若不使用close关闭,默认情况下producer会被允许将未完成的消息发送出去,这样可能造成消息乱序。
Broker端配置:
Unclean.leader.election.eable = false:关闭unclean leader选举,即不允许非isr中的副本被选举成leader,从而避免broker端因为日志水位截断造成数据丢失。
Replication.factor>=3 :设置成3主要参考业界的三备份原则,强调多个副本才好。
Min.insync.replias>1:用于控制某条消息至少被写入ISR中多个副本才算成功,大于1代表提升持久性,只有在acks设置成-1或者all的时候才生效。
确保 replication.factors>min.insync.replicas :若两者相等,则只要有一个副本挂掉,则分区无法正常使用,虽然持久性很高,但可用性被降低,建议 replication.factory = min.insync.replicas + 1。