package com.atguigu.spring.kafka.consumer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootTest
class SpringKafkaConsumerApplicationTests {
@Resource
KafkaTemplate kafkaTemplate;
@Test
void contextLoads() {
for (int i = 0; i < 10; i++) {
kafkaTemplate.send("my_topic1",i%6,"", "消费者组"+i);
}
}
}
server:
port: 8120
# v1
spring:
Kafka:
bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
consumer:
# read-committed读事务已提交的消息 解决脏读问题
isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
enable-auto-commit: true
# 消费者提交ack时多长时间批量提交一次
auto-commit-interval: 1000
# 消费者第一次消费主题消息时从哪个位置开始
auto-offset-reset: earliest #指定Offset消费:earliest | latest | none
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListener {
@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")
public void onMessage1(ConsumerRecord<String, String> record) {
System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()
+",partition:"+record.partition()
+",offset = "+record.offset()
+",key = "+record.key()
+",value = "+record.value());
}
@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")
public void onMessage2(ConsumerRecord<String, String> record) {
System.out.println("my_group1消费者2获取到消息:topic = "+ record.topic()
+",partition:"+record.partition()
+",offset = "+record.offset()
+",key = "+record.key()
+",value = "+record.value());
}
@KafkaListener(topics ={"my_topic1"},groupId = "my_group2")
public void onMessage3(ConsumerRecord<String, String> record) {
System.out.println("my_group2消费者获取到消息:topic = "+ record.topic()
+",partition:"+record.partition()
+",offset = "+record.offset()
+",key = "+record.key()
+",value = "+record.value());
}
}
package com.atguigu.spring.kafka.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringKafkaConsumerApplication.class, args);
}
}
<configuration>
<logger name="org.apache.kafka.clients" level="debug" />
configuration>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0modelVersion>
<parent>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-parentartifactId>
<version>3.0.5version>
<relativePath/>
parent>
<groupId>com.atguigugroupId>
<artifactId>spring-kafka-consumerartifactId>
<version>0.0.1-SNAPSHOTversion>
<name>spring-kafka-consumername>
<description>spring-kafka-consumerdescription>
<properties>
<java.version>17java.version>
properties>
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starterartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
<scope>testscope>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-maven-pluginartifactId>
plugin>
plugins>
build>
project>
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v3.0.5)
my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
my_group2消费者获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
my_group2消费者获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
my_group1消费者1获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
my_group1消费者1获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6