添加kafka stream依赖
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-streamartifactId>
dependency>
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-stream-binder-kafkaartifactId>
dependency>
application.yml中添加配置
--- #stream config
spring:
cloud:
stream:
binders:
myKafka1:
type: kafka
environment:
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
bindings:
helloFunc-in-0:
destination: hello-topic
group: hello-local-test-10
binder: myKafka1
consumer:
batch-mode: true
helloFunc-out-0:
destination: hello-topic
group: hello-local-test-10
binder: myKafka1
consumer:
batch-mode: true
# 注意 function 节点与stream 同级,而非子节点
function:
definition: helloFunc;
编写消费者:
@Slf4j
@Component
@RequiredArgsConstructor
public class HelloConsumer {
@Bean
public Consumer<Message<List<String>>> helloFunc() {
return message -> {
log.info("---------------------> ");
List<String> list = message.getPayload();
boolean result = this.handle(list);
if (result) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
acknowledgment.acknowledge();
}
} else {
throw new RuntimeException("消费数据出错!");
}
};
}
private boolean handle(List<String> list){
log.info("list size : {}", list.size());
if (!CollectionUtils.isEmpty(list)){
log.info("group first message : {}", list.get(0));
}
return true ;
}
}