摘要:本文讲述如何在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。
本文分享自华为云社区《Kafka也能实现消息延时了?》,作者:HuaweiCloudDeveloper 。
Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用,Kafka它虽有以上这么多的应用场景和优点,但也具备其缺陷,比如在延时消息场景下,Kafka就不具备这种能力,因此希望能在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。

分布式消息服务Kafka版:MySQL是目前最受欢迎的开源数据库之一,其性能卓越,搭配LAMP(Linux + Apache + MySQL + Perl/PHP/Python),成为WEB开发的高效解决方案。 云数据库 RDS for MySQL拥有稳定可靠、安全运行、弹性伸缩、轻松管理、经济实用等特点。
此方案实现,需要借助两个Topic来进行实现,一个Topic用于及时接收生产者们所产生的消息,另一个Topic则用于消费者拉取消息进行消费。另外在这两个Topic之间加上一个队列用于做延时的逻辑判断,如果消息满足了延时的条件,则将队列中的消息生产至我们的消费者需要拉取的Topic中。
Kafka消息延时方案架构图

Kafka消息延时实现思路
Kafka消息延时方案时序图

本项目中起到延时作用的类Delay.java其余类为官方提供用于测试生产和消费消息,如需使用官方测试的使用的生产消费代码相关配置介绍可以参考https://support.huaweicloud.com/devg-kafka/how-to-connect-kafka.html 。如需使用自己配置的生产者消费者,只配置Delay.java中的参数即可。
实现代码可参考Kafka消息延时
- package com.dms.delay;
-
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.time.Duration;
- import java.util.Arrays;
- import java.util.Date;
- import java.util.Properties;
- import java.util.concurrent.ConcurrentLinkedQueue;
-
- /**
- * Hello world!
- *
- */
- public class Delay {
-
- //缓存队列
- public static ConcurrentLinkedQueue<ConsumerRecord<String, String>> link = new ConcurrentLinkedQueue();
- //延迟时间(20秒),可根据需要设置延迟大小
- public static long delay = 20000L;
-
- /**
- *入口
- * @param args
- */
- public static void main( String[] args )
- {
- //延时主题(用于控制延时缓冲)
- String topic_delay = "topic_delay";
- //输出主题(直接供消费者消费)
- String topic_out = "topic_out";
- /*
- 消费线程
- */
- new Thread(new Runnable() {
- @Override
- public void run() {
- //消费者配置。请根据需要自行设置Kafka配置
- Properties props = new Properties();
- props.setProperty("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092");
- props.setProperty("group.id", "test");
- props.setProperty("enable.auto.commit", "true");
- props.setProperty("auto.commit.interval.ms", "1000");
- props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- //创建消费者
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- //指定消费主题
- consumer.subscribe(Arrays.asList(topic_delay));
- while (true) {
- //轮询消费
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
- //遍历当前轮询批次拉取到的消息
- for (ConsumerRecord<String, String> record : records){
- System.out.println(record);
- //将消息添加到缓存队列
- link.add(record);
- }
- }
- }
- }).start();
- /*
- 生产线程
- */
- new Thread(new Runnable() {
- @Override
- public void run() {
- //生产者配置(请根据需求自行配置)
- Properties props = new Properties();
- props.put("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092");
- props.put("linger.ms", 1);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- //创建生产者
- Producer<String, String> producer = new KafkaProducer<>(props);
- //持续从缓存队列中获取消息
- while(true){
- //如果缓存队列为空则放缓取值速度
- if(link.isEmpty()){
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- continue;
- }
- //获取缓存队列栈顶消息
- ConsumerRecord<String, String> record = link.peek();
- //获取该消息时间戳
- long timestamp = record.timestamp();
- Date now = new Date();
- long nowTime = now.getTime();
- if(timestamp+ Delay.delay <nowTime){
- //获取消息值
- String value = record.value();
- //生产者发送消息到输出主题
- producer.send(new ProducerRecord<String, String>(topic_out, "",value));
- //从缓存队列中移除该消息
- link.poll();
- }else {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }).start();
- }
- }
