org.springframework.boot
spring-boot-starter-parent
2.6.6
org.springframework.cloud
spring-cloud-starter-stream-kafka
org.springframework.kafka
spring-kafka
org.springframework.cloud
spring-cloud-stream
io.projectreactor.kafka
reactor-kafka
1.3.11
关键jar包:reactor-kafka
package com.kittlen.cloud.reactivekafka.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Component;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.Collections;

/**
 * Declares the reactive Kafka consumer beans: {@link ReceiverOptions} subscribed to the
 * topic named by the {@code kafka.consumer.topic} property, and a
 * {@link ReactiveKafkaConsumerTemplate} built from those options.
 *
 * @author kittlen
 * @version 1.0
 */
// @Configuration (rather than @Component) so the @Bean methods run in full,
// CGLIB-proxied mode and inter-bean references resolve through the container.
@Configuration
public class ReactiveConsumerConfig {

    /**
     * Builds receiver options from the Boot-managed {@code spring.kafka.consumer.*}
     * properties and subscribes them to a single topic.
     *
     * @param topic           topic name resolved from the {@code kafka.consumer.topic} property
     * @param kafkaProperties auto-configured Kafka properties
     * @return receiver options subscribed to {@code topic}
     */
    @Bean
    public ReceiverOptions<String, String> kafkaReceiverOptions(
            @Value(value = "${kafka.consumer.topic}") String topic, KafkaProperties kafkaProperties) {
        ReceiverOptions<String, String> basicReceiverOptions =
                ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic));
    }

    /**
     * Reactive consumer template wrapping the receiver options declared above.
     *
     * @param kafkaReceiverOptions the options bean from this class
     * @return template for reactive consumption with {@code String} keys and values
     */
    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate(
            ReceiverOptions<String, String> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }
}
package com.kittlen.cloud.reactivekafka.consumers;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverRecord;
/**
* @author kittlen
* @version 1.0
* @date 2022/40/04 10:40
*/
@Service
public class ReactiveConsumerService implements CommandLineRunner {
protected Log log = LogFactory.getLog(ReactiveConsumerService.class);
@Autowired
ReactiveKafkaConsumerTemplate<String, String> requestMsgReactiveKafkaConsumerTemplate;
private Flux<Mono<Boolean>> dgkConsummer() {
Flux<Mono<Boolean>> monoFlux = requestMsgReactiveKafkaConsumerTemplate
.receiveAutoAck()
.map(cr -> handler(cr))
.doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
return monoFlux;
}
//返回类型根据实际需求自己进行调整
//在该方法里面如果直接抛出异常,会直接导致停止对该topic的监听
protected Mono<Boolean> handler(ConsumerRecord<String, String> consumerRecord) {
try{
/*
* 对监听到的数据的处理逻辑
* */
return Mono.just(true);
}catch (Exception e) {
return Mono.error(e);
}
}
@Override
public void run(String... args) {
dgkConsummer().subscribe(m -> m.subscribe());
}
}
创建kafkaReceiverOptions时订阅多个topic
/**
 * Receiver options subscribed to several topics at once.
 *
 * @param kafkaProperties auto-configured Kafka properties
 * @return receiver options subscribed to {@code topic1} and {@code topic2}
 */
@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions(KafkaProperties kafkaProperties) {
    ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
    // Arrays.asList builds the fixed topic list directly; no Stream/collect pipeline needed.
    return basicReceiverOptions.subscription(java.util.Arrays.asList("topic1", "topic2"));
}
处理消息时根据topic进行判断
/**
 * Handles one consumed record, branching on the source topic when records from
 * multiple topics flow through the same pipeline.
 *
 * @param consumerRecord the record received from Kafka
 * @return {@code Mono.just(true)} on success, or an error Mono carrying the failure
 */
protected Mono<Boolean> handler(ConsumerRecord<String, String> consumerRecord) {
    try {
        if (consumerRecord.topic().equals("topic1")) {
            //***
        }
        /*
         * Processing logic for the consumed record goes here.
         */
        return Mono.just(true);
    } catch (Exception e) {
        return Mono.error(e);
    }
}
/**
 * Receiver options dedicated to {@code topic1}, built from the Boot consumer properties.
 *
 * @param kafkaProperties auto-configured Kafka properties
 * @return receiver options subscribed to {@code topic1}
 */
@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions1(KafkaProperties kafkaProperties) {
    return ReceiverOptions.<String, String>create(kafkaProperties.buildConsumerProperties())
            .subscription(Collections.singletonList("topic1"));
}
/**
 * Reactive consumer template driven by the {@code topic1} receiver options.
 *
 * @param kafkaReceiverOptions1 the topic1 options bean
 * @return template consuming from {@code topic1}
 */
@Bean
public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate1(ReceiverOptions<String, String> kafkaReceiverOptions1) {
    ReactiveKafkaConsumerTemplate<String, String> template =
            new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions1);
    return template;
}
/**
 * Receiver options dedicated to {@code topic2}, built from the Boot consumer properties.
 *
 * @param kafkaProperties auto-configured Kafka properties
 * @return receiver options subscribed to {@code topic2}
 */
@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions2(KafkaProperties kafkaProperties) {
    return ReceiverOptions.<String, String>create(kafkaProperties.buildConsumerProperties())
            .subscription(Collections.singletonList("topic2"));
}
/**
 * Reactive consumer template driven by the {@code topic2} receiver options.
 *
 * @param kafkaReceiverOptions2 the topic2 options bean
 * @return template consuming from {@code topic2}
 */
@Bean
public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate2(ReceiverOptions<String, String> kafkaReceiverOptions2) {
    ReactiveKafkaConsumerTemplate<String, String> template =
            new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions2);
    return template;
}
// Injected by bean name so topic1 and topic2 get independent consumer templates;
// @Autowired by type would be ambiguous with two ReactiveKafkaConsumerTemplate beans.
@Resource(name = "reactiveKafkaConsumerTemplate1")
ReactiveKafkaConsumerTemplate<String, String> requestMsgReactiveKafkaConsumerTemplate1;
// Consumer template dedicated to topic2.
@Resource(name = "reactiveKafkaConsumerTemplate2")
ReactiveKafkaConsumerTemplate<String, String> requestMsgReactiveKafkaConsumerTemplate2;
/**
 * Consumption pipeline for topic1: auto-acks each record and maps it through
 * {@link #handler1(ConsumerRecord)}. Each emitted {@code Mono} still needs its
 * own subscription to execute.
 */
private Flux<Mono<Boolean>> dgkConsummer1() {
    return requestMsgReactiveKafkaConsumerTemplate1
            .receiveAutoAck()
            .map(this::handler1)
            .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
}
// Adjust the return type to your actual needs.
/**
 * Handles one record consumed from topic1.
 *
 * @param consumerRecord the record received from Kafka
 * @return {@code Mono.just(true)} on success, or an error Mono carrying the failure
 */
protected Mono<Boolean> handler1(ConsumerRecord<String, String> consumerRecord) {
    Mono<Boolean> outcome;
    try {
        /*
         * Processing logic for records consumed from topic1 goes here.
         */
        outcome = Mono.just(true);
    } catch (Exception e) {
        outcome = Mono.error(e);
    }
    return outcome;
}
/**
 * Consumption pipeline for topic2: auto-acks each record and maps it through
 * {@link #handler2(ConsumerRecord)}. Each emitted {@code Mono} still needs its
 * own subscription to execute.
 */
private Flux<Mono<Boolean>> dgkConsummer2() {
    return requestMsgReactiveKafkaConsumerTemplate2
            .receiveAutoAck()
            .map(this::handler2)
            .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
}
// Adjust the return type to your actual needs.
/**
 * Handles one record consumed from topic2.
 *
 * @param consumerRecord the record received from Kafka
 * @return {@code Mono.just(true)} on success, or an error Mono carrying the failure
 */
protected Mono<Boolean> handler2(ConsumerRecord<String, String> consumerRecord) {
    Mono<Boolean> outcome;
    try {
        /*
         * Processing logic for records consumed from topic2 goes here.
         */
        outcome = Mono.just(true);
    } catch (Exception e) {
        outcome = Mono.error(e);
    }
    return outcome;
}
/**
 * Application-startup entry point: starts both topic pipelines. Each outer emission
 * is a {@code Mono} produced by the handler and must itself be subscribed to execute.
 */
@Override
public void run(String... args) {
    // BUG FIX: the original called dgkConsummer(), which belongs to the single-topic
    // example; this two-topic variant defines dgkConsummer1() and dgkConsummer2(),
    // so topic1's pipeline was never started.
    dgkConsummer1().subscribe(m -> m.subscribe());
    dgkConsummer2().subscribe(m -> m.subscribe());
}