Notes on consuming messages with Kafka.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.11</version>
</dependency>
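The logback-classic dependency supplies the SLF4J implementation behind the Logger used in the consumer below. If no configuration file is present, Logback falls back to a default console appender at DEBUG level; the following logback.xml under src/main/resources is only an optional sketch, not taken from the original post:

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>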
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition; // used by the commented-out assign()/seek() variants
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaCustomer {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    /**
     * Kafka bootstrap server address
     */
    public String bootstrapServer = "localhost:9092";

    /**
     * Topic to consume from
     */
    public String topic = "test1";
    public void kafkaDemo() {
        Properties properties = new Properties();
        // Broker address
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        // Deserializers for keys and values
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // ------------------ The consumer group id must be specified ----------------------------
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // earliest: start consuming from the earliest offset; latest: start from the newest offset (default)
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Minimum amount of data fetched per request, default 1 byte
        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        // Maximum amount of data fetched per request, default 50 MB
        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
        // Maximum time to wait when the minimum fetch size has not been reached, default 500 ms
        properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // Maximum number of records returned by a single poll() call, default 500
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // A. Subscribe to a list of topics and read from all of their partitions
        List<String> topicList = new ArrayList<>();
        topicList.add(topic);
        kafkaConsumer.subscribe(topicList);
        // B. Subscribe to a topic with explicitly assigned partitions
        // List<TopicPartition> topicPartitions = new ArrayList<>();
        // topicPartitions.add(new TopicPartition(topic, 0));
        // kafkaConsumer.assign(topicPartitions);
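        // C. (Optional sketch, not part of the original post) After assign(), the consumer can also
        //    seek to an explicit offset before polling. The partition number 0 and offset 100L
        //    below are placeholder values for illustration.
        // TopicPartition partition0 = new TopicPartition(topic, 0);
        // List<TopicPartition> seekPartitions = new ArrayList<>();
        // seekPartitions.add(partition0);
        // kafkaConsumer.assign(seekPartitions);
        // kafkaConsumer.seek(partition0, 100L);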
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic:" + record.topic() + "|partition:" + record.partition() + "|value:" + record.value());
                }
            }
        } catch (Exception e) {
            log.error(e.toString(), e);
        } finally {
            kafkaConsumer.close();
        }
    }
    public static void main(String[] args) {
        new KafkaCustomer().kafkaDemo();
    }
}
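The loop above relies on the consumer's default auto-commit behaviour (enable.auto.commit=true), so offsets are committed in the background on a timer. A common alternative is to commit offsets manually once a batch has been processed. The standalone class below is a minimal sketch of that pattern and is not part of the original post; the class name ManualCommitConsumerDemo and the group id "group2" are made up for illustration, while the broker address and topic reuse the values above.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitConsumerDemo {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        // Disable auto-commit so offsets only advance after a batch has been fully processed
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("test1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic:" + record.topic()
                            + "|partition:" + record.partition()
                            + "|value:" + record.value());
                }
                // Synchronously commit the offsets returned by the last poll()
                consumer.commitSync();
            }
        }
    }
}

commitSync() blocks until the broker acknowledges the commit; commitAsync() is the non-blocking alternative when throughput matters more than immediate confirmation.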
}