Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)
,按照指定顺序调用它们.
public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
//该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中的。Producer确保在消息被序列化以计算分区**前**调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
//该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
void onAcknowledgement(RecordMetadata metadata, Exception exception);
//关闭interceptor,主要用于执行一些资源清理工作
void close();
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
Map<String, Object> props = new HashMap();
props.put("bootstrap.servers", "localhost:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RawSerializer.class);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, RawSerializer.class);
List<String> interceptors = new ArrayList<>();
interceptors.add("cn.jhs.kakfa.p.interceptor.TimeStampInterceptor"); // interceptor 1
interceptors.add("cn.jhs.kakfa.p.interceptor.CounterInterceptor"); // interceptor 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
String topic = "test-topic";
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message:" + i);
producer.send(record).get();
}
// 一定要关闭producer,这样才会调用interceptor的close方法
producer.close();
}
}
消费者(Consumer)在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理,处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程 序进行处理。
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
/**
该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。
该方法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或生成新的消息。
*/
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
/**
当消费者提交偏移量时,调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
调用者将忽略此方法抛出的任何异常。
*/
void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
/**
* 关闭Interceptor之前调用
*/
void close();
}
//如果有多个拦截器,用,分割即可
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
Kafka提供了生产者监听器 ProducerListener,他的作用类似于带回调的KafkaTemplate#send(callback) ; 可以监听到消息发送成功或者失败。ProducerListener 提供了onSuccess 成功回调,和 onError 失败回调,如下:
public interface ProducerListener<K, V> {
/**
* Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).
*/
default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
}
/**
* Invoked after an attempt to send a message has failed.
*/
default void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata,
Exception exception) {
}
}
public class MyProducerListener<K, V> implements ProducerListener<K, V> {
private FallbackHandler<K, V> fallbackHandler;
@Override
public void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
//fallbackHandler.process.
//write error metrics...
}
@Override
public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
//write success metrics...
}
}
public KafkaTemplate<Object, Object> buildKafkaTemplate(Map<String, Object> props) {
ProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(props);
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(factory);
MyProducerListener<Object, Object> listener1 = new MyProducerListener<>();
listener1.setFallbackHandler(fallbackHandler);
kafkaTemplate.setProducerListener(listener1);
return kafkaTemplate;
}
当@KafkaListener方法抛出异常时调用的错误处理程序.
@FunctionalInterface
public interface KafkaListenerErrorHandler {
/**
* Handle the error.
*/
Object handleError(Message<?> message, ListenerExecutionFailedException exception);
}
/**
* 可以通过:
* @org.springframework.kafka.annotation.KafkaListener(errorHandler="customKafkaListenerErrorHandler")
* 来引入该配置
*/
@Component
public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
//记录了所有的 kafka MessageListenerContainer
private final KafkaListenerEndpointRegistry endpointRegistry;
public CustomKafkaListenerErrorHandler(KafkaListenerEndpointRegistry endpointRegistry) {
this.endpointRegistry = endpointRegistry;
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
// 处理异常
// 暂停消费者
String listenerId = exception.getGroupId();
MessageListenerContainer listenerContainer = endpointRegistry.getListenerContainer(listenerId);
listenerContainer.pause();
//滑动窗口算法 ---
// 休眠一段时间(例如 30秒)
try {
Thread.sleep(30000); // 暂停 30 秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 恢复消费者
listenerContainer.resume();
return null;
}
}
@org.springframework.kafka.annotation.KafkaListener(errorHandler="customKafkaListenerErrorHandler")
public interface Callback {
//processed befeore listener...
void onCompletion(RecordMetadata metadata, Exception exception);
}
producer.send(producerRecord, (recordMetadata, exception) -> {
if (exception == null) {
System.out.println("Record written to offset " +
recordMetadata.offset() + " timestamp " +
recordMetadata.timestamp());
} else {
System.err.println("An error occurred");
exception.printStackTrace(System.err);
}
});