原文链接:
Kafka Tutorial: Creating a Kafka Producer in Java (cloudurable.com)
Kafka Tutorial: Creating a Kafka Consumer in Java (cloudurable.com)
承接着上文我们配置好了Kafka之后,我们现在用java 来实现向Kafka里发送message 和消费message。
首先Kafka的依赖我们需要加一下:
添加kafka-clients依赖就可以了。其他的jar,会自动download 下来。
<artifactId>kafka-clients
|
第一步就是要创建一个Producer.创建一个Producer,我们需要知道我们要连的kafka 的bootstrap server和要往哪个topic里面发送消息。这里假设我们的服务器有3台:"localhost:9092,localhost:9093,localhost:9094"
Topic我们来mock一个:"my-example-topic"
下面是我们创建KafkaProducer的代码。
private static Producer Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); return new org.apache.kafka.clients.producer.KafkaProducer<>(props); } |
这里用到了序列化,这里Kafka 的key 和value 都需要序列化,这样才能持久化到硬盘。这里我们使用的序列化类是:StringSerializer和LongSerializer.
第二步就是我们开始往Topic里发送消息。
使用如下的代码:
static void runProducer(final int sendMessageCount) throws Exception { final Producer long time = System.currentTimeMillis(); try { for (long index = time; index < time + sendMessageCount; index++) { final ProducerRecord RecordMetadata metadata = producer.send(record).get(); long elapsedTime = System.currentTimeMillis() - time; System.out.printf("sent record(key=%s value=%s " + "meta(partition=%d, offset=%d) time=%d \n", record.key(),record.value(),metadata.partition(), metadata.offset(),elapsedTime); LOGGER.info("sent record(key=" + record.key() + ", value=%s " + record.value() + "), meta(partition= " + metadata.partition() + ", offset=" + metadata.offset() + ", time= " +elapsedTime + ")"); } } finally { producer.flush(); producer.close(); } } |
然后run一个main 方法就可以运行了。
接下来我们来写消费端。
第一步:同样的,我们也要有一个KafkaConsumer。定义如下:
private static Consumer final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Create the consumer using props. final Consumer new KafkaConsumer<>(props); // Subscribe to the topic. consumer.subscribe(Collections.singletonList(TOPIC)); return consumer; } |
跟生产端一样,这里我们使用了StringDeserializer和LongDeserializer来反序列化Value和Key。
第二步:有了KafkaConsumer,我们就可以消费消息了。
static void runConsumer() { final Consumer final int giveUp = 100; int noRecordsCount = 0; while (true) { final ConsumerRecords consumer.poll(1000); if(consumerRecords.count() == 0) { noRecordsCount ++; if(noRecordsCount > giveUp) break; else continue; } // System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(),record.partition(), record.offset()); consumerRecords.forEach(record -> { LOGGER.info("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(),record.partition(), record.offset()); }); consumer.commitSync(); } consumer.close(); System.out.printf("Done."); } |
这样我们有了生产者,消费者,也有了Kafka Server.我们就可以做一个回路测试了。
消费者: