• Kafka在SpringBoot中的实践


    Kafka作为一种高吞吐量的分布式发布订阅消息系统,目前已经越来越被广泛的应用。这里介绍下如何在Spring Boot下集成、应用

    abstract.png

    环境搭建

    我们使用Docker来进行实践,其中本机IP为192.168.2.101。其实这里还需要一个ZooKeeper实例用于保存元数据,这里就不赘述ZooKeeper相关配置了

    1. # 拉取镜像
    2. docker pull wurstmeister/kafka
    3. # 创建容器  
    4. docker run \
    5.  -e KAFKA_BROKER_ID=1 \
    6.  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ # KafKa监听端口
    7.  -e KAFKA_ZOOKEEPER_CONNECT=192.168.2.101:2181/kafka \ # ZooKeeper地址、端口
    8.  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.101:9092 \ # 暴露给客户端的地址、端口信息
    9.  -d -p 9092:9092 \
    10.  --name myKafka \
    11.  wurstmeister/kafka

    由于wurstmeister/kafka镜像使用Alpine Linux作为基础镜像环境,其使用的UTC时间与我们本地时间(北京时间)有8个小时的时差。故这里我们介绍下如何在Alpine Linux设置正确的时区,利用docker exec命令进入容器,然后进行如下设置

    1. # 安装 时区数据
    2. apk add tzdata
    3. # 查看 亚洲可用的时区
    4. ls /usr/share/zoneinfo/Asia
    5. # 复制 亚洲/上海 时区 到 /etc/localtime 下
    6. cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
    7. # 设置时区为 亚洲/上海
    8. echo "Asia/Shanghai" > /etc/timezone
    9. # 查看当前时间,验证是否生效
    10. date -R

    一切配置好了,现在我们通过命令脚本来验证下看看Kafka是否可以正常工作

    生产者操作如下

    1. # 进入容器
    2. docker exec -it myKafka /bin/bash
    3. # 切换目录
    4. cd /opt/kafka/bin
    5. # 使用kafka-console-producer.sh脚本 生产消息
    6. ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
    7. # 生产消息1
    8. one
    9. # 生产消息2
    10. hell world

    消费者操作如下

    1. # 进入容器
    2. docker exec -it myKafka /bin/bash
    3. # 切换目录
    4. cd /opt/kafka/bin
    5. # 使用kafka-console-consumer.sh脚本 消费消息
    6. ./kafka-console-consumer.sh --broker-list localhost:9092 --topic test

    效果如下所示,符合预期

    figure 1.jpeg

    SpringBoot集成Kafka

    依赖

    SpringBoot下使用Kafka很方便,直接添加依赖即可。

    1. <dependency>
    2.     <groupId>org.springframework.kafkagroupId>
    3.     <artifactId>spring-kafkaartifactId>
    4.     <version>2.6.4version>
    5. dependency>

    这里我们需要注意SpringBoot与spring-kafka之间的版本兼容性,具体地可以参考官网(https://spring.io/projects/spring-kafka ),下图红框、蓝框分别为spring-kafka、SpringBoot的版本要求。这里,我们的SpringBoot版本为2.4.1

    figure 2.jpeg

    其实,关于spring-kafka版本的选择问题还有一个小技巧,我们可以在POM依赖中不指定spring-kafka的版本信息,这样其会自动选择合适的版本

    配置

    在 application.properties 中添加相关的必要配置

    1. # Kafka
    2. # Kafka 地址、端口
    3. spring.kafka.bootstrap-servers=127.0.0.1:9092
    4. # 自定义Kafka分区器
    5. spring.kafka.producer.properties.partitioner.class=com.aaron.SpringBoot1.Kafka.KafkaPartitioner
    6. # 生产者 key、value的序列化器
    7. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    8. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    9. # 消费者 key、value的反序列化器
    10. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    11. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

    实践

    手动声明一个Topic,并设置分区数为4

    1. @Configuration
    2. public class KafkaConfig {
    3.     public static final String TOPIC_ALARM_IN = "topic_alarm_in";
    4.     /**
    5.      * 声明Topic,设置其分区数为4
    6.      * @return
    7.      */
    8.     @Bean
    9.     public NewTopic topic1() {
    10.         return TopicBuilder.name(TOPIC_ALARM_IN)
    11.                 .partitions(4)
    12.                 .build();
    13.     }
    14. }

    在上文的配置中,我们定义了一个自定义的Kafka分区器。我们需要实现Partitioner接口,在partition方法中实现我们的分区逻辑。如下所示

    1. /**
    2.  * 自定义Kafka分区器
    3.  */
    4. public class KafkaPartitioner implements Partitioner {
    5.     @Override
    6.     public void configure(Map<String, ?> configs) {
    7.     }
    8.     @Override
    9.     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    10.         // 获取该主题的分区数量
    11.         int size = cluster.partitionsForTopic(topic).size();
    12.         // 分区号
    13.         int index = -1;
    14.         switch ( (String) key) {
    15.             case "996":
    16.                 index = 1;
    17.                 break;
    18.             case "247":
    19.                 index = 2;
    20.                 break;
    21.             case "965":
    22.                 index = 3;
    23.                 break;
    24.             default:
    25.                 index = 0;
    26.         }
    27.         return index;
    28.     }
    29.     @Override
    30.     public void close() {
    31.     }
    32. }

    消息的生产者就比较简单了,我们直接使用kafkaTemplate发送即可,这里我们简单地对其进行了封装

    1. @Component
    2. public class Producer {
    3.     @Autowired
    4.     private KafkaTemplate<StringString> kafkaTemplate;
    5.     @Autowired
    6.     private KafkaSendResultHandler kafkaSendResultHandler;
    7.     public void sendMsg(String topic, String key, String value) {
    8.         try{
    9.             // 设置消息发送结果的回调
    10.             kafkaTemplate.setProducerListener(kafkaSendResultHandler);
    11.             kafkaTemplate.send(topic, key, value);
    12.         }catch (Exception e) {
    13.             e.printStackTrace();
    14.         }
    15.     }
    16. }

    其中,send方法会返回一个Future,可进一步地通过get方法获取发送结果。但我们既希望能够了解消息发送是否成功,又不希望被阻塞。幸好Kafka提供了一个回调接口用于处理发送结果,KafkaSendResultHandler实现如下所示

    1. @Component
    2. public class KafkaSendResultHandler implements ProducerListener {
    3.     @Override
    4.     public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
    5.         String info = "[发送成功]: ";
    6.         String resultStr = buildResult(producerRecord);
    7.         System.out.println( info + resultStr );
    8.     }
    9.     @Override
    10.     public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
    11.         String info = "[发送失败]: ";
    12.         String resultStr = buildResult(producerRecord);
    13.         System.out.println( info + resultStr );
    14.         exception.printStackTrace();
    15.         System.out.println();
    16.     }
    17.     private String buildResult(ProducerRecord<StringString> producerRecord) {
    18.         String topic = producerRecord.topic();
    19.         String key = producerRecord.key();
    20.         String value = producerRecord.value();
    21.         String str = : " + topic + ", : " + key + ",  :" + value;
    22.         return str;
    23.     }
    24. }

    这里为了简便,使用一个Controller用于控制消息的发送,并将id作为key

    1. @RequiredArgsConstructor(onConstructor = @__(@Autowired))
    2. @Controller
    3. @ResponseBody
    4. @RequestMapping("Kafka")
    5. public class KafkaController {
    6.     @Autowired
    7.     private Producer producer;
    8.     @RequestMapping("/saveAlarmIn")
    9.     public String test1(@RequestBody AlarmIn alarmIn) {
    10.         System.out.println("\n------------------------------------");
    11.         String topic = TOPIC_ALARM_IN;
    12.         String key = alarmIn.getId().toString();
    13.         try {
    14.             String jsonStr = new ObjectMapper().writeValueAsString(alarmIn);
    15.             producer.sendMsg(topic, key, jsonStr);
    16.         } catch (Exception e) {
    17.             e.printStackTrace();
    18.         }
    19.         return "OK";
    20.     }
    21. }
    22. ...
    23. /**
    24.  * 进入告警
    25.  */
    26. @Data
    27. @AllArgsConstructor
    28. @NoArgsConstructor
    29. @Builder
    30. public class AlarmIn {
    31.     /**
    32.      * ID
    33.      */
    34.     private Integer id;
    35.     /**
    36.      * 进入告警的人员姓名
    37.      */
    38.     private String personName;
    39.     /**
    40.      * 进入告警的区域名称
    41.      */
    42.     private String areaName;
    43.     /**
    44.      * 告警级别
    45.      */
    46.     private Integer level;
    47. }

    通过@KafkaListener注解即可实现消息的监听消费,具体地可通过topics、groupId等属性设置主题、消费者群组名等信息

    1. @Component
    2. public class Consumer {
    3.     @KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup1" )
    4.     public void g1c1(ConsumerRecord<StringString> record) {
    5.         AlarmIn alarmIn = parseAlarmIn(record);
    6.         int index = record.partition();
    7.         System.out.println("[myGroup1] : alarmIn: " + alarmIn + ", partition: " + index);
    8.     }
    9.     @KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup1" )
    10.     public void g1c2(ConsumerRecord<StringString> record) {
    11.         AlarmIn alarmIn = parseAlarmIn(record);
    12.         int index = record.partition();
    13.         System.out.println("[myGroup1] : alarmIn: " + alarmIn + ", partition: " + index);
    14.     }
    15.     @KafkaListener(topics = TOPIC_ALARM_IN, groupId = "myGroup2" )
    16.     public void g2c3(ConsumerRecord<StringString> record) {
    17.         AlarmIn alarmIn = parseAlarmIn(record);
    18.         int index = record.partition();
    19.         System.out.println("[myGroup2] : alarmIn: " + alarmIn + ", partition: " + index);
    20.     }
    21.     /**
    22.      * 解析进入告警信息
    23.      * @param record
    24.      * @return
    25.      */
    26.     private AlarmIn parseAlarmIn(ConsumerRecord<StringString> record) {
    27.         String key = record.key();
    28.         String value = record.value();
    29.         AlarmIn alarmIn = null;
    30.         try {
    31.             alarmIn = new ObjectMapper().readValue(value, AlarmIn.class);
    32.         } catch (Exception e) {
    33.             e.printStackTrace();
    34.         }
    35.         return alarmIn;
    36.     }
    37. }

    测试结果如下,符合预期

    figure 3.jpeg

    Note

    1. Kafka将一个消费群组内的所有消费者视为同一个整体,他们均订阅同一个主题。一条消息只会被群组内的一个消费者进行消费。但消费者组之间不会相互影响,换言之,如果另外一个消费者组也订阅了该主题,其同样也会收到该消息并进行处理。上文的测试结果也佐证了这一点

    2. Kafka中一个主题Topic虽然可以拥有多个分区,但一个分区不能被同一个消费者群组下的多个消费者进行消费。所以当 某个消费者群组中消费者的数量 多于 其订阅的主题Topic的分区数 时,该群组多出来的消费者只会被闲置、浪费

    figure 4.jpeg

    1. 上文我们自定义了一个Kafka的分区器,事实上这并不是必须的。一方面我们在发送时可以显式地将消息发送到指定的分区;另一方面,如果发送时未直接指定分区,Kafka也会使用默认的分区器进行分区。具体地,如果key不为null, 默认分区器则通过哈希计算来保证相同key键的消息可以被映射到同一个分区下;如果key为null,默认分区器则使用轮询算法将消息均衡地分布到各个分区

    参考文献

    1. Kafka权威指南 Neha Narkhede/Gwen Shapira/Todd Palino著

  • 相关阅读:
    Gateway Timeout504: 网关超时的完美解决方法
    ubuntu kill命令使用方法极简
    setTimeout引发的刨根问底
    Ubuntu22.04 + ROS2 Humble配置Moveit2环境
    CSS篇九
    C语言实现姿态四元数转欧拉角
    Worthington细胞分离技术丨基本原代细胞分离方法和材料
    搜索数据库中的一行信息
    Java枚举类 (详细解析java中的枚举类深入浅出)
    Debian10Standard无网络安装后,设置静态IP,安装openssh-server 221024记录
  • 原文地址:https://blog.csdn.net/lililidahaoren/article/details/126121180