• OUT了吧,Kafka能实现消息延时了


    摘要:本文讲述如何在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。

    本文分享自华为云社区《Kafka也能实现消息延时了?》,作者:HuaweiCloudDeveloper 。

    1、背景

    Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用,Kafka它虽有以上这么多的应用场景和优点,但也具备其缺陷,比如在延时消息场景下,Kafka就不具备这种能力,因此希望能在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。

    2、开发环境

    3、云服务介绍

    分布式消息服务Kafka版:MySQL是目前最受欢迎的开源数据库之一,其性能卓越,搭配LAMP(Linux + Apache + MySQL + Perl/PHP/Python),成为WEB开发的高效解决方案。 云数据库 RDS for MySQL拥有稳定可靠、安全运行、弹性伸缩、轻松管理、经济实用等特点。

    4、方案设计

    i、方案简述

    此方案实现,需要借助两个Topic来进行实现,一个Topic用于及时接收生产者们所产生的消息,另一个Topic则用于消费者拉取消息进行消费。另外在这两个Topic之间加上一个队列用于做延时的逻辑判断,如果消息满足了延时的条件,则将队列中的消息生产至我们的消费者需要拉取的Topic中。

    ii、方案架构图

    Kafka消息延时方案架构图

    Kafka消息延时实现思路

    1. 生产者将生产消息存入topic_delay主题中进行存储。
    2. 将topic_delay主题中的所有消息拉取至ConcurrentLinkedQueue队列中。
    3. 取值判断是否满足延时要求。
      a. 如果满足延时要求,则将消息生产至topic_out主题中,并将queue队列中的值移除。
      b. 如果不满足延时要求,则等待自定义时间后重试判断。
    4. 消费者最终从topic_out主题中拉取消息进行消费。

    iii、方案时序图

    Kafka消息延时方案时序图

    5、代码参数指南

    本项目中起到延时作用的类Delay.java其余类为官方提供用于测试生产和消费消息,如需使用官方测试的使用的生产消费代码相关配置介绍可以参考https://support.huaweicloud.com/devg-kafka/how-to-connect-kafka.html 。如需使用自己配置的生产者消费者,只配置Delay.java中的参数即可。

    Delay.java参数详情

    1. delay:自定义延时时间,单位ms。
    2. topic_delay变量:用于临时存储消息的topic名称。
    3. topic_out变量:用于消费者拉取消息消费的topic名称。
    4. 关于消费者和生产者配置可按需配置,可参考Kafka官方文档:https://kafka.apache.org/documentation/#producerconfigs

    6、代码实现

    实现代码可参考Kafka消息延时

    1. package com.dms.delay;
    2. import org.apache.kafka.clients.consumer.ConsumerRecord;
    3. import org.apache.kafka.clients.consumer.ConsumerRecords;
    4. import org.apache.kafka.clients.consumer.KafkaConsumer;
    5. import org.apache.kafka.clients.producer.KafkaProducer;
    6. import org.apache.kafka.clients.producer.Producer;
    7. import org.apache.kafka.clients.producer.ProducerRecord;
    8. import java.time.Duration;
    9. import java.util.Arrays;
    10. import java.util.Date;
    11. import java.util.Properties;
    12. import java.util.concurrent.ConcurrentLinkedQueue;
    13. /**
    14. * Hello world!
    15. *
    16. */
    17. public class Delay {
    18. //缓存队列
    19. public static ConcurrentLinkedQueue<ConsumerRecord<String, String>> link = new ConcurrentLinkedQueue();
    20. //延迟时间(20秒),可根据需要设置延迟大小
    21. public static long delay = 20000L;
    22. /**
    23. *入口
    24. * @param args
    25. */
    26. public static void main( String[] args )
    27. {
    28. //延时主题(用于控制延时缓冲)
    29. String topic_delay = "topic_delay";
    30. //输出主题(直接供消费者消费)
    31. String topic_out = "topic_out";
    32. /*
    33. 消费线程
    34. */
    35. new Thread(new Runnable() {
    36. @Override
    37. public void run() {
    38. //消费者配置。请根据需要自行设置Kafka配置
    39. Properties props = new Properties();
    40. props.setProperty("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092");
    41. props.setProperty("group.id", "test");
    42. props.setProperty("enable.auto.commit", "true");
    43. props.setProperty("auto.commit.interval.ms", "1000");
    44. props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    45. props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    46. //创建消费者
    47. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    48. //指定消费主题
    49. consumer.subscribe(Arrays.asList(topic_delay));
    50. while (true) {
    51. //轮询消费
    52. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
    53. //遍历当前轮询批次拉取到的消息
    54. for (ConsumerRecord<String, String> record : records){
    55. System.out.println(record);
    56. //将消息添加到缓存队列
    57. link.add(record);
    58. }
    59. }
    60. }
    61. }).start();
    62. /*
    63. 生产线程
    64. */
    65. new Thread(new Runnable() {
    66. @Override
    67. public void run() {
    68. //生产者配置(请根据需求自行配置)
    69. Properties props = new Properties();
    70. props.put("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092");
    71. props.put("linger.ms", 1);
    72. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    73. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    74. //创建生产者
    75. Producer<String, String> producer = new KafkaProducer<>(props);
    76. //持续从缓存队列中获取消息
    77. while(true){
    78. //如果缓存队列为空则放缓取值速度
    79. if(link.isEmpty()){
    80. try {
    81. Thread.sleep(2000);
    82. } catch (InterruptedException e) {
    83. e.printStackTrace();
    84. }
    85. continue;
    86. }
    87. //获取缓存队列栈顶消息
    88. ConsumerRecord<String, String> record = link.peek();
    89. //获取该消息时间戳
    90. long timestamp = record.timestamp();
    91. Date now = new Date();
    92. long nowTime = now.getTime();
    93. if(timestamp+ Delay.delay <nowTime){
    94. //获取消息值
    95. String value = record.value();
    96. //生产者发送消息到输出主题
    97. producer.send(new ProducerRecord<String, String>(topic_out, "",value));
    98. //从缓存队列中移除该消息
    99. link.poll();
    100. }else {
    101. try {
    102. Thread.sleep(1000);
    103. } catch (InterruptedException e) {
    104. e.printStackTrace();
    105. }
    106. }
    107. }
    108. }
    109. }).start();
    110. }
    111. }

    7、结果反馈

    点击关注,第一时间了解华为云新鲜技术~

  • 相关阅读:
    go 端口转发 代理V2 --chatGPT
    前端面试--贡献给刚毕业的你们
    再见,Visual Basic——曾经风靡一时的编程语言
    [环境]Ubuntu20.04安装Ceres
    你还不会判定表和因果图法的概念及运用?一篇文章教会你
    后端工程师之路(8)-springboot
    Linux 系统为何产生大量的 core 文件?
    网络编程
    Fabric.js 复制粘贴元素
    Linux下查找JDK默认安装路径
  • 原文地址:https://blog.csdn.net/devcloud/article/details/125502953