pom文件引入kafka
项目用的是java8&java11 ,spring boot 2.X,对应kafka2.X
如果项目是java17 ,spring boot3.X,对应kafka3.X
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
<version>2.5.4.RELEASEversion>
dependency>
注意:在之前的版本中,需要引入apache下的kafka,现在spring已经将kafka整合入springframework中了,只引这一个包就可以
kafka:
bootstrap-servers: 192.168.2.91:9090,192.168.2.91:9091,192.168.2.91:9092 # 集群的地址
producer:
retries: 1 #设置大于0的值,则客户端会将发送失败的记录重新发送。
batch-size: 16384 #16KB
buffer-memory: 33554432 #32MB
ack-s: all #指定消息key和消息体的编码方式。 0,1,all
key-serializer: org.apache.kafka.common.serialization.StringSerializer #键的序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化
consumer:
group-id: default
enable-auto-commit: false # 自动提交
auto-commit-interval: 100 # 自动提交次数
auto-offset-reset: earliest #当默认的消费组启动的时候,会从默认的第一个消费组开始消费。
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500 #一次最多拉500条消息。
listener:
# RECORD, 每条记录被监听
# BATCH, 批量
# TIME,
# COUNT,
# COUNT_TIME,
# MANUAL, 手动通知
# MANUAL_IMMEDIATE; 立即手动通知
ack-mode: RECORD
(1) spring 自动创建创建topic
@Configuration
public class KafkaTopicConfig {
/**
* 创建TopicName为topic.quick.initial的Topic并设置分区数为8以及副本数为1
* 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
* @return
*/
@Bean
public NewTopic newTopic() {
return new NewTopic("topic.quick.initial",8, (short) 1);
}
@Bean
public NewTopic newDefaultTopic() {
return new NewTopic("topic.quick.default",2, (short) 1);
}
}
(2)手动创建topic(不常用)
**
* 手动创建topic
* @return
*/
@Bean
public AdminClient adminClient() {
return AdminClient.create(kafkaAdmin().getConfigurationProperties());
}
public KafkaAdmin kafkaAdmin() {
Map<String, Object> props = new HashMap<>();
//配置Kafka实例的连接地址
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.91:9090");
KafkaAdmin admin = new KafkaAdmin(props);
return admin;
}
代码如下:
启动后项目后,打开offset
就可以看见我们创建的topic了
@Component
public class KafkaReceiver {
/**
* id 该listener的id ,默认也是listener的组(groupId)
* topics 该listener监听的topic(可以同时监听多个topic)
*/
@KafkaListener(id = "rollback_default_test", topics = {"topic.quick.default"})
public void receiveSk(ConsumerRecord<String, String> record) {
System.out.println(record);
System.out.println("我收到了普通消息");
}
}
kafka的发送消息很简单
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/userGets")
public Object gets() {
// send 第一个参数为topic的名称,第二个参数为我们要发送的信息
kafkaTemplate.send("topic.quick.default","1231235");
return "发送成功";
}
控制台打印如下消息