Spring-Kafka Configuration Parameters Explained: Batch Sending and Batch Consuming Messages
Configuration file
- This is the configuration I am currently using; almost every entry is commented
- Parameters I have not used are omitted; I will add them later as the need arises
spring:
  kafka:
    bootstrap-servers: localhost:9092 # used to bootstrap the connection to Kafka (no need to list every node; the rest are discovered dynamically)
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      buffer-memory: 33554432 # producer buffer capacity; default 32MB = 33554432
      batch-size: 163840 # batch size per request, in bytes; default 16KB = 16384
      retries: 1 # number of retries when a send fails
      acks: 1 # 1 = wait for the leader's acknowledgment only
      properties:
        linger:
          ms: 500 # instead of sending each record immediately, the producer waits up to this delay so more records can accumulate; works with batch-size, and the batch is sent as soon as either condition is met
        max:
          request:
            size: 1048576 # maximum size of a request, in bytes
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: radar # default consumer group
      max-poll-records: 2000 # maximum number of records pulled in one batch poll
      enable-auto-commit: false # auto-commit of consumed offsets; false = disabled
      auto-commit-interval: 4000 # auto-commit interval in ms (only relevant when auto-commit is enabled)
      auto-offset-reset: earliest
      heartbeat-interval: 10000 # expected time between heartbeats to the consumer coordinator, in ms
      fetch-max-wait: 500
    listener:
      ack-mode: manual_immediate # manual_immediate = commit immediately after a manual ack; batch = auto-ack per batch; record = auto-ack per record
      type: batch # batch consumption
      missing-topics-fatal: false # do not fail when a topic is missing; must be false for topic auto-creation
    template:
      default-topic: radar
      partitions: 7 # custom property (not a standard Spring Boot key), read by KafkaConfiguration below
      replications: 1 # custom property (not a standard Spring Boot key), read by KafkaConfiguration below
Batch message sending
- Batch sending mainly comes down to two settings: batch-size and linger.ms. batch-size is the number of bytes in one batch; once that many bytes have accumulated, the batch is sent. The default is 16384 (16KB). Depending on whether your workload favors low latency or high throughput, you can lower or raise it, and you can tune the value experimentally to find the right balance between latency and throughput. linger.ms is a delay in milliseconds; the default is 0, meaning records are sent immediately. When set, the producer waits up to that many milliseconds before sending the accumulated batch. When both batch-size and linger.ms are configured, the batch is sent as soon as either condition is satisfied.
- Unlike batch-size, linger.ms cannot be configured as a top-level key; it has to go under properties, and the same is true of several other parameters (see the sketch below).
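Because linger.ms and similar settings ultimately map to Kafka's ProducerConfig keys, the same tuning can also be done programmatically. The following is only a sketch of that equivalent wiring, not something the YAML approach requires; the class and bean names are illustrative:

package com.newatc.collect.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class BatchProducerConfig { // illustrative name

    @Bean
    public ProducerFactory<String, String> batchProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // the two batching knobs from the YAML; a batch goes out when either is met
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 163840); // bytes per partition batch
        props.put(ProducerConfig.LINGER_MS_CONFIG, 500);     // max wait for more records
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> batchKafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }
}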
Batch message consumption
- The key setting is listener.type: batch, which enables batch listening and consumption.
- max.poll.records is the number of records pulled in one batch; the default is 500. You can raise it as needed, but be careful: if a single poll pulls more than the consumer can process in time, it will block and cause problems of its own.
- I also disabled auto-commit with enable-auto-commit: false; with listener.ack-mode: manual_immediate, each manual acknowledgment after consuming a batch is committed immediately (a container-factory sketch follows below).
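For reference, the listener settings above correspond to the following container factory setup. Spring Boot already applies them from the YAML, so this is only a sketch of the manual equivalent; the consumerFactory parameter is assumed to be the auto-configured one:

package com.newatc.collect.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class BatchListenerConfig { // illustrative name

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) { // assumed auto-configured
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true); // equivalent of listener.type: batch
        // equivalent of listener.ack-mode: manual_immediate
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}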
Configuration class
- With Spring-Kafka you normally just maintain the configuration file; there is no need to create beans by hand.
- The configuration class here exists only to create a topic with the specified partition and replica counts automatically at application startup.
- If you only have one topic, or all your topics share the same partition and replica counts, you can also set this in Kafka's own server.properties and drop this configuration class entirely. Just set spring.kafka.listener.missing-topics-fatal to false so that a missing topic does not raise an error but is created automatically; see my other blog post for details.
package com.newatc.collect.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    @Value("${spring.kafka.template.partitions}")
    private Integer partitions;

    @Value("${spring.kafka.template.replications}")
    private Short replications;

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Integer getPartitions() {
        return partitions;
    }

    public void setPartitions(Integer partitions) {
        this.partitions = partitions;
    }

    public Short getReplications() {
        return replications;
    }

    public void setReplications(Short replications) {
        this.replications = replications;
    }

    // Declaring a NewTopic bean lets Spring's KafkaAdmin create the topic
    // with the given partition and replica counts at startup.
    @Bean
    public NewTopic topic() {
        return new NewTopic(topic, partitions, replications);
    }
}
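On spring-kafka 2.3 and later, the same NewTopic bean can also be built with the TopicBuilder fluent API, which reads a little more clearly; a sketch, assuming that version is available:

import org.springframework.kafka.config.TopicBuilder;

@Bean
public NewTopic topic() {
    // same effect as new NewTopic(topic, partitions, replications)
    return TopicBuilder.name(topic)
            .partitions(partitions)
            .replicas(replications)
            .build();
}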
Message producer
- Spring Kafka integrates very cleanly and takes care of most of the work; you can simply use KafkaTemplate directly.
package com.newatc.collect.config;

import com.newatc.collect.util.PartitionEnum;
import javax.annotation.Resource;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    // Send a record to the partition mapped from the message type
    public void sendData(String type, String data) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, PartitionEnum.getMap().get(type), type, data);
        kafkaTemplate.send(producerRecord);
    }

    // Same as sendData, but also logs the payload at debug level
    public void sendMessage(String type, String message) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, PartitionEnum.getMap().get(type), type, message);
        kafkaTemplate.send(producerRecord);
        log.debug("Sent {} data to kafka: {}", type, message);
    }
}
Calling the producer to send messages
- Usage is just as simple: inject KafkaProducer as a dependency.
@Autowired
private KafkaProducer producer;
producer.sendData(PartitionEnum.FLOW_STATS.getType(), JSONObject.toJSONString(flowStats));
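PartitionEnum is the author's own utility class and its source is not shown in this post. A minimal hypothetical sketch of what it might look like, assuming it simply maps each message type to a fixed partition index (the entries below are invented for illustration):

package com.newatc.collect.util;

import java.util.HashMap;
import java.util.Map;

// Hypothetical reconstruction: maps a message type to a fixed partition index
public enum PartitionEnum {

    FLOW_STATS("flowStats", 0),
    REAL_TIME_DATA("realTimeData", 1); // example entries; the real enum is not shown

    private static final Map<String, Integer> MAP = new HashMap<>();

    static {
        for (PartitionEnum e : values()) {
            MAP.put(e.type, e.partition);
        }
    }

    private final String type;
    private final int partition;

    PartitionEnum(String type, int partition) {
        this.type = type;
        this.partition = partition;
    }

    public String getType() {
        return type;
    }

    public static Map<String, Integer> getMap() {
        return MAP;
    }
}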
Message consumer
- The consumer side is also very simple: just use the @KafkaListener annotation.
- You can specify the topic and partition(s) to consume from.
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    @Resource
    private RealTimeDataClickHouseService realTimeDataClickHouseService; // the author's persistence service (not shown in the post)

    // batch listener pinned to partition 0 of the default topic
    @KafkaListener(
        groupId = "${spring.kafka.consumer.group-id}",
        topicPartitions = { @TopicPartition(topic = "${spring.kafka.template.default-topic}", partitions = { "0" }) }
    )
    public void receiverRealTimeDataRecord(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        int size = records.size();
        log.debug("RealTimeData RECV MSG COUNT: {}\n", size);
        List<String> data = new ArrayList<>();
        for (ConsumerRecord<String, String> consumerRecord : records) {
            data.add(consumerRecord.value());
        }
        realTimeDataClickHouseService.saveAll(data);
        log.debug("\n[RealTimeData] {} records consumed", size);
        ack.acknowledge(); // manual ack, committed immediately (manual_immediate)
    }
}
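If you do not need to pin consumption to partition 0, the topics attribute of @KafkaListener subscribes to all partitions of the topic instead. A minimal variant sketch (it would live inside the same KafkaConsumer class):

// subscribes to every partition of the default topic
@KafkaListener(topics = "${spring.kafka.template.default-topic}", groupId = "${spring.kafka.consumer.group-id}")
public void receiveAll(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    records.forEach(r -> log.debug("partition {} offset {}: {}", r.partition(), r.offset(), r.value()));
    ack.acknowledge(); // manual_immediate: committed right away
}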