• kafka 自定义Interceptor(通过拦截器对消息进行定制化处理)



    1. 说明

      producer生成消息时,interceptor使得用户在消息发送前以及producer回调逻辑触发前对消息做定制化处理。producer允许用户通过配置interceptor.classes参数指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor

    1.1 configure(configs)

      可获取配置信息,初始化调用一次
    在这里插入图片描述

    1.2 onSend(ProducerRecord)

      方法会运行在用户主线程中,封装进org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback),确保在消息被序列化以及计算分区前调用。(最好不要修改消息所属的topic和分区
    在这里插入图片描述

    1.3 onAcknowledgement(RecordMetadata, Exception)

      方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用,即在producer回调逻辑触发之前调用。(不要添加复杂逻辑,否则会拖慢producer的消息发送效率
    在这里插入图片描述

    1.4 close

      关闭拦截器,执行资源清理工作
    在这里插入图片描述

    注:producer将按照指定顺序调用拦截器,并仅捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递


    2. 案例

      生成拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数到控制台

    拦截器一:

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    public class TimeInterceptor implements ProducerInterceptor<String, String> {
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return new ProducerRecord<String, String>(record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "----" + record.value());
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    
        }
    
        @Override
        public void close() {
    
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    拦截器二:

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    public class CounterInterceptor implements ProducerInterceptor<String, String> {
        private int success;
        private int error;
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return record;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (metadata != null) {
                success++;
            } else {
                error++;
            }
        }
    
        @Override
        public void close() {
            System.out.println("success:" + success);
            System.out.println("error:" + error);
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    生产者:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class InterceptorProducer {
        public static void main(String[] args) throws InterruptedException {
    
            Properties props = new Properties();
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.23:29092,192.168.0.114:29092,192.168.0.110:29092");
            //ack应答级别
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            //重试次数
            props.put("retries", 1);
            //批次大小
            props.put("batch.size", 16384);
            //等待时间
            props.put("linger.ms", 1);
            //RecordAccumulator缓冲区大小
            props.put("buffer.memory", 33554432);
            // key value 序列化
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            /*添加拦截器链*/
            ArrayList<String> classes = new ArrayList<>();
            classes.add("com.cz.kafka.interceptor.TimeInterceptor");
            classes.add("com.cz.kafka.interceptor.CounterInterceptor");
            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, classes);
            //创建生产者
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            //发送数据
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("charges", i + ",female,25.84,0,no,northwest,28923.13692"));
            }
            //关闭
            producer.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
  • 相关阅读:
    ML&DL:《Hyperparameter tuning for machine learning models机器学习模型的超参数调优》翻译与解读
    KEIL 5.38的ARM-CM3/4 ARM汇编设计学习笔记10 - STM32的SDIO学习2
    【系统设计系列】 DNS和CDN
    Tomcat深入浅出(一)
    Redis从入门到放弃(3):发布与订阅
    IDEA提交本地项目到Gitee远程仓库
    Nginx解决跨域问题的一些想法
    熟悉Redis命令行
    python商城
    安全漏洞笔记-Fastjson高危漏洞预警
  • 原文地址:https://blog.csdn.net/javahelpyou/article/details/125904310