• kafka Interceptors and Listeners


    Interceptors

    ProducerInterceptor

    https://www.cnblogs.com/huxi2b/p/7072447.html

    Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。

    对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain),按照指定顺序调用它们.

    API

    public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
       
       	//该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中的。Producer确保在消息被序列化以计算分区**前**调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
        ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    
       
       //该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
        void onAcknowledgement(RecordMetadata metadata, Exception exception);
    
       //关闭interceptor,主要用于执行一些资源清理工作
        void close();
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    demo

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Map<String, Object> props = new HashMap();
            props.put("bootstrap.servers", "localhost:9092");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RawSerializer.class);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, RawSerializer.class);
            List<String> interceptors = new ArrayList<>();
            interceptors.add("cn.jhs.kakfa.p.interceptor.TimeStampInterceptor"); // interceptor 1
            interceptors.add("cn.jhs.kakfa.p.interceptor.CounterInterceptor"); // interceptor 2
            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
    
            String topic = "test-topic";
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message:" + i);
                producer.send(record).get();
            }
            // 一定要关闭producer,这样才会调用interceptor的close方法
            producer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    ConsumerInterceptor

    https://blog.csdn.net/warybee/article/details/121980296

    消费者(Consumer)在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理,处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程 序进行处理。

    • ConsumerInterceptor允许拦截甚至更改消费者接收到的消息。
    • 常用在于将第三方组件引入 消费者应用程序,用于定制的监控、日志处理等。
    • ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了 错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。
    • 如果有多个拦截器,则该方法按照KafkaConsumer的configs中配置的顺序调用。
    • 从调用 KafkaConsumer.poll(long) 的同一线程调用 ConsumerInterceptor 回调。

    API

    public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    
        /**
            该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。
            该方法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或生成新的消息。
         */
        ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
    
        /**
          当消费者提交偏移量时,调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
          调用者将忽略此方法抛出的任何异常。
         */
        void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
    
        /**
         *  关闭Interceptor之前调用
         */
        void close();
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    配置

    //如果有多个拦截器,用,分割即可
    configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
    
    • 1
    • 2

    Listeners

    ProducerListener

    https://blog.csdn.net/u014494148/article/details/125344184

    Kafka提供了生产者监听器 ProducerListener,他的作用类似于带回调的KafkaTemplate#send(callback) ; 可以监听到消息发送成功或者失败。ProducerListener 提供了onSuccess 成功回调,和 onError 失败回调,如下:

    API

    public interface ProducerListener<K, V> {
    
    	/**
    	 * Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).
    	 */
    	default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
    	}
    
    	/**
    	 * Invoked after an attempt to send a message has failed.
    	 */
    	default void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata,
    			Exception exception) {
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    自定义Listener

    public class MyProducerListener<K, V> implements ProducerListener<K, V> {
    
        private FallbackHandler<K, V> fallbackHandler;
    
        @Override
        public void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
           //fallbackHandler.process.
           //write error metrics...
        }
    
        @Override
        public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
           //write success metrics...
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    demo(KafkaTemplate.setProducerListener())

        public KafkaTemplate<Object, Object> buildKafkaTemplate(Map<String, Object> props) {
            ProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(props);
            KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(factory);
            MyProducerListener<Object, Object> listener1 = new MyProducerListener<>();
            listener1.setFallbackHandler(fallbackHandler);
            kafkaTemplate.setProducerListener(listener1);
            return kafkaTemplate;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    KafkaListenerErrorHandler

    当@KafkaListener方法抛出异常时调用的错误处理程序.

    API

    @FunctionalInterface
    public interface KafkaListenerErrorHandler {
    
    	/**
    	 * Handle the error.
    	 */
    	Object handleError(Message<?> message, ListenerExecutionFailedException exception);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    自定义CustomKafkaListenerErrorHandler(当异常过多时,暂停消费)

    /**
     * 可以通过:
     * @org.springframework.kafka.annotation.KafkaListener(errorHandler="customKafkaListenerErrorHandler")
     * 来引入该配置
     */
    @Component
    public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
    	
    	//记录了所有的 kafka MessageListenerContainer
        private final KafkaListenerEndpointRegistry endpointRegistry;
    
        public CustomKafkaListenerErrorHandler(KafkaListenerEndpointRegistry endpointRegistry) {
            this.endpointRegistry = endpointRegistry;
        }
    
        @Override
        public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
            // 处理异常
            // 暂停消费者
            String listenerId = exception.getGroupId();
            MessageListenerContainer listenerContainer = endpointRegistry.getListenerContainer(listenerId);
            listenerContainer.pause();
    
            //滑动窗口算法 ---
            // 休眠一段时间(例如 30秒)
            try {
                Thread.sleep(30000); // 暂停 30 秒
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
    
            // 恢复消费者
            listenerContainer.resume();
    
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    demo

    @org.springframework.kafka.annotation.KafkaListener(errorHandler="customKafkaListenerErrorHandler")
    
    • 1

    Callback

    producer.Callback

    public interface Callback {
    	//processed befeore listener...
       void onCompletion(RecordMetadata metadata, Exception exception);
    }
    
    • 1
    • 2
    • 3
    • 4

    demo

    producer.send(producerRecord, (recordMetadata, exception) -> {
          if (exception == null) {
              System.out.println("Record written to offset " +
                      recordMetadata.offset() + " timestamp " +
                      recordMetadata.timestamp());
          } else {
              System.err.println("An error occurred");
              exception.printStackTrace(System.err);
          }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 相关阅读:
    【微信小程序】页面tabBar切换、下拉刷新
    Human Guided Ground-truth Generation for Realistic Image Super-resolution
    【UV打印机】理光喷头组合说明(24H)
    vue项目created()被调用多次的坑
    Mybatics-连接配置
    数据结构之堆的实现(图解➕源代码)
    Javase | 集合-上
    IP地址相关
    面试害怕考到JVM? 看这一篇就够了~
    基于java项目 服务器远程debug开启教程
  • 原文地址:https://blog.csdn.net/it_freshman/article/details/136472203