• SpringBoot整合Kafka的快速使用教程


           

    目录

    一、引入Kafka的依赖

    二、配置Kafka

    三、创建主题

    1、自动创建(不推荐)

    2、手动动创建

    四、生产者代码

    五、消费者代码  

    六、常用的KafKa的命令


            Kafka是一个高性能、分布式的消息发布-订阅系统,被广泛应用于大数据处理、实时日志分析等场景。Spring Boot作为目前最流行的Java开发框架之一,其简洁的配置和丰富的工具使得与Kafka的集成变得更加容易。本文将介绍如何使用Spring Boot整合Kafka,实现高效的数据处理和消息传递。

    一、引入Kafka的依赖

           
                org.springframework.cloud
                spring-cloud-starter-stream-kafka
           

    二、配置Kafka

    1. spring:
    2. kafka:
    3. bootstrap-servers: 156.65.20.76:9092,156.65.20.77:9092,156.65.20.78:9092 #指定Kafka集群的地址,这里有三个地址,用逗号分隔。
    4. listener:
    5. ack-mode: manual_immediate #设置消费者的确认模式为manual_immediate,表示消费者在接收到消息后立即手动确认。
    6. concurrency: 3 #设置消费者的并发数为3
    7. missing-topics-fatal: false #设置为false,表示如果消费者订阅的主题不存在,不会抛出异常。
    8. producer:
    9. key-serializer: org.apache.kafka.common.serialization.StringSerializer # 设置消息键的序列化器
    10. value-serializer: org.apache.kafka.common.serialization.StringSerializer #设置消息值的序列化器
    11. acks: 1 #一般就是选择1,兼顾可靠性和吞吐量 ,如果想要更高的吞吐量设置为0,如果要求更高的可靠性就设置为-1
    12. consumer:
    13. auto-offset-reset: earliest #设置为"earliest"表示将从最早的可用消息开始消费,即从分区的起始位置开始读取消息。
    14. enable-auto-commit: false #禁用了自动提交偏移量的功能,为了避免出现重复数据和数据丢失,一般都是手动提交
    15. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 设置消息键的反序列化器
    16. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #设置消息值的反序列化器

    注:kafka的acks有三个值,可以根据实际情况和需求平衡消息系统的吞吐量和数据安全性,来选择对应的值。

    • acks=0:这是最不可靠的模式。当设置为acks=0时,生产者在发送消息后不会等待任何服务器端的确认响应。这种模式下,生产者可以迅速继续发送下一批消息,效率最高,但风险也最大。如果在此模式下发生网络问题或broker故障,发送的消息可能会永久丢失,生产者无法得知消息是否成功到达Kafka broker。因此,这种配置适合于能够容忍少量数据丢失的场景,例如实时数据分析或生成非关键的实时报表。
    • acks=1:这是默认的配置模式,也是一种折衷方案。在这种模式下,生产者会等待分区的领导者节点(leader)确认消息已经成功写入磁盘,才会发送确认信息给生产者。这提高了数据的安全性,因为只要领导者节点保存了消息,即使跟随者(replicas)没有及时同步,消息也不会丢失。然而,如果领导者在同步给所有追随者之前崩溃,那么尚未同步的副本将无法获取该消息,仍然存在消息丢失的风险。
    • acks=all或-1:这是最可靠的模式。在这个模式下,生产者不仅需要领导者节点确认,还会等待所有同步副本(In-sync replicas, ISR)都确认写入消息后才会收到确认。这极大地增强了数据的持久性保证,确保了即使在多个节点故障的情况下,消息也不会丢失。此模式适用于数据可靠性要求非常高的场景,如金融交易系统或重要的日志记录

    三、创建主题

        1、自动创建(不推荐)

             不存在的主题,会自动创建,分区数和副本数均为默认值。而默认值可能会不符合某些场景的要求。

    在kafka的安装目录conf目录下找到该配置文件server.properties,添加如下配置:
    num.partitions=3 #默认3个分区
    auto.create.topics.enable=true #开启自动创建主题
    default.replication.factor=3 #默认3个副本

        2、手动动创建

             在kafka的安装目录bin目录下,执行如下命令: 

    //创建一个有三个分区和三个副本,名为zhuoye的主题
    ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye 

    四、生产者代码

    1. @Slf4j
    2. @Component
    3. public class ALiYunServiceImpl implents IALiYunService {
    4. @Autowired
    5. private KafkaTemplate kafkaTemplate;
    6. @Autowired
    7. private ExecutorService executorService;
    8. String topicName = "zhuoye";
    9. @Override
    10. public void queryECSMetricInfo() {
    11. //发送到kafka的消息集合,因为使用了多线程,并且在多线程中往该集合进行添加操作,所以需要线程安全的
    12. List messages = Collections.synchronizedList(new ArrayList<>());
    13. boolean flag = true;
    14. //获取上次查询时间
    15. Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;
    16. Long endTime = System.currentTimeMillis();
    17. try {
    18. //查询出所有的运行中的实例
    19. List cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");
    20. if (CollectionUtils.isEmpty(cloudInstances)) {
    21. return;
    22. }
    23. //定义计数器
    24. CountDownLatch latch = new CountDownLatch(cloudInstances.size());
    25. //遍历查询
    26. for (CloudInstanceAssetDto instance : cloudInstances) {
    27. executorService.submit(() -> {
    28. try {
    29. //获取内网流出带宽,并将结果封装到消息集合中
    30. dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,
    31. startTime, endTime, instance, messages);
    32. } catch (Exception e) {
    33. log.error("获取ECS的指标数据-多线程处理任务异常!", e);
    34. } finally {
    35. latch.countDown();
    36. }
    37. });
    38. }
    39. //等待任务执行完毕
    40. latch.await();
    41. //将最终的消息集合发送到kafka
    42. if (CollectionUtils.isNotEmpty(messages)) {
    43. for (int i = 0; i < messages.size(); i++) {
    44. if (StringUtils.isNotBlank(messages.get(i).getValue())
    45. && "noSuchInstance".equals(messages.get(i).getValue())) {
    46. continue;
    47. }
    48. kafkaTemplate.send(topicName, messages.get(i));
    49. }
    50. }
    51. } catch (Exception e) {
    52. flag = false;
    53. log.error("获取ECS的指标数据失败", e);
    54. }
    55. //更新记录上次查询时间
    56. if (flag) {
    57. QueryTimeRecord queryTimeRecord = new QueryTimeRecord();
    58. queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //开始时间往前推1分钟
    59. queryTimeRecordMapper.updateByBelongId(queryTimeRecord);
    60. }
    61. }

    这个时候,如果你想看有没有把消息发送到kafka的指定主题可以使用如下命令:

    kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye 

    五、消费者代码  

    1. @Slf4j
    2. @Component
    3. public class KafkaConsumer {
    4. // 消费监听
    5. @KafkaListener(topics = "zhuoye",groupId ="zhuoye-aliyunmetric")
    6. public void consumeExtractorChangeMessage(ConsumerRecord record, Acknowledgment ack){
    7. try {
    8. String value = record.value();
    9. //处理数据,存入openTsDb
    10. .................
    11. ................
    12. ack.acknowledge();//手动提交
    13. }catch (Exception e){
    14. log.error("kafa-topic【zhuoye】消费阿里云指标源消息【失败】");
    15. log.error(e.getMessage());
    16. }
    17. }
    18. }

    六、常用的KafKa的命令

    //创建主题
    ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye
    //查看kafka是否接收对应的消息
     kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye
    // 修改kafka-topic分区数
    ./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 6 --topic zhuoye
    // 查看topic分区数
    ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic zhuoye
    // 查看用户组消费情况
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group zhuoye-aliyunmetric --describe

  • 相关阅读:
    知识产权与标准化
    Matlab GUI编程技巧(十八)【实战一】:自定义线条图像绘制
    课时01 TOGAF9.2导入- 课前介绍-课程介绍
    【Spring框架】Spring概述及基本应用
    LeakCanary 源码详解(2)
    Spring系列15:Environment抽象
    Spring MVC视图解析器
    Windows下使用MySQL二进制包安装MySQL5.7
    mysql XA 分布式事务
    谱聚类原理及Python实现
  • 原文地址:https://blog.csdn.net/weixin_50348837/article/details/139250156