• Spring Boot集成kafka的相关配置


    引入依赖:

    额外依赖只需要这一个,kafka-client 不是springboot 的东西,那是原生的 kafka 客户端, kafka-test也不需要,是用代码控制broker的东西。

    1. <dependency>
    2. <groupId>org.springframework.kafka</groupId>
    3. <artifactId>spring-kafka</artifactId>
    4. </dependency>

    yml配置:

    也可以用java类Config 方式配置,如果没有特殊要求,可以只用spring配置的方式

    1. server:
    2. port: 8080
    3. spring:
    4. kafka:
    5. # Kafka服务器,支持集群
    6. bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092
    7. # 生产者配置
    8. producer:
    9. # 消息发送重试次数,注意会引起重复消费,消费者需要做幂等
    10. retries: 3
    11. # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
    12. # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
    13. # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
    14. # 开启事务时,必须设置为all
    15. acks: all
    16. # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
    17. batch-size: 16384
    18. # 生产者内存缓冲区的大小。
    19. buffer-memory: 1024000
    20. # 键的序列化方式
    21. key-serializer: org.apache.kafka.common.serialization.StringSerializer
    22. # 值的序列化方式
    23. value-serializer: org.apache.kafka.common.serialization.StringSerializer
    24. # 消费者配置
    25. consumer:
    26. # 消费组ID,同一个消费组不会重复消费数据
    27. group-id: testGroup
    28. # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
    29. #auto-commit-interval: 1S
    30. # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
    31. # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
    32. # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
    33. # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    34. auto-offset-reset: latest
    35. # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
    36. enable-auto-commit: false
    37. # 键的序列化方式
    38. key-serializer: org.apache.kafka.common.serialization.StringSerializer
    39. # 值的序列化方式
    40. value-serializer: org.apache.kafka.common.serialization.StringSerializer
    41. # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500
    42. # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
    43. # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
    44. # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
    45. # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
    46. # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
    47. max-poll-records: 3
    48. properties:
    49. # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
    50. max:
    51. poll:
    52. interval:
    53. ms: 600000
    54. # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
    55. session:
    56. timeout:
    57. ms: 10000
    58. listener:
    59. # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
    60. concurrency: 4
    61. # 自动提交关闭,需要设置手动消息确认
    62. ack-mode: manual_immediate
    63. # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
    64. missing-topics-fatal: false
    65. # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
    66. poll-timeout: 600000
    67. #批量提交
    68. # type: batch

    业务代码:

    1. 简单生产

      1. @RestController
      2. public class kafkaProducer {
      3. @Autowired
      4. private KafkaTemplate<String, Object> kafkaTemplate;
      5. @GetMapping("/kafka/normal/{message}")
      6. public void sendNormalMessage(@PathVariable("message") String message) {
      7. kafkaTemplate.send("testTopic", message);
      8. }
      9. }
    2. 简单消费

    注意加上@Component,被spring管理监听才有效

    1. @Component
    2. public class KafkaConsumer {
    3. //监听消费
    4. @KafkaListener(topics = {"testTopic"})
    5. public void onNormalMessage(ConsumerRecord<String, Object> record) {
    6. System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
    7. record.value());
    8. }
    9. }

    主题使用配置的方式:

    1. 在yml配置里加上topic

      1. server:
      2. port: 8080
      3. spring:
      4. kafka:
      5. # Kafka服务器,支持集群
      6. bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092
      7. # 自定义配置
      8. myconfig:
      9. topic: testTopic
    2. 简单生产+配置主题

      1. @RestController
      2. public class kafkaProducer {
      3. @Autowired
      4. private KafkaTemplate<String, Object> kafkaTemplate;
      5. @Value("${spring.kafka.myconfig.topic}")
      6. private String topic;
      7. @GetMapping("/kafka/normal/{message}")
      8. public void sendNormalMessage(@PathVariable("message") String message) {
      9. kafkaTemplate.send(topic, message);
      10. }
      11. }
    3. 简单消费+配置主题

    注意这里不能用@Value注解的方式,会报错

    1. @Component
    2. public class KafkaConsumer {
    3. //监听消费
    4. @KafkaListener(topics = {"${spring.kafka.myconfig.topic}"})
    5. public void onNormalMessage(ConsumerRecord<String, Object> record) {
    6. System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
    7. record.value());
    8. }
    9. }

    如果是多个主题需要消费, 可以使用Spring的SpEl表达式

    1. server:
    2. port: 8080
    3. spring:
    4. kafka:
    5. # Kafka服务器,支持集群
    6. bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092
    7. # 自定义配置
    8. myconfig:
    9. topic: testTopic1,testTopic2
    @KafkaListener(topics = {"#{'${spring.kafka.myconfig.topic}'.split(',')}"})
    1. @Component
    2. public class KafkaConsumer {
    3. //监听消费
    4. @KafkaListener(topics = {"#{'${spring.kafka.myconfig.topic}'.split(',')}"})
    5. public void onNormalMessage(ConsumerRecord<String, Object> record) {
    6. System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +
    7. record.value());
    8. }
    9. }

     Kafka数据重复和数据丢失的解决方案_kafka相同数据_谢小涛的博客-CSDN博客

  • 相关阅读:
    修改/etc/fstab文件导致Linux无法正常启动解决方法
    C++:STL--List
    chattr设置文件只读
    js 正则校验输入是否为空字符串
    Linux信号量
    D2ETR: Decoder-Only DETR with Computationally Efficient Cross-Scale Attention
    python之Flask入门
    golang设计模式——访问者模式
    windows下找出java程序占用cpu很高进程对应的线程 并找到问题代码
    DevC++的调试方法
  • 原文地址:https://blog.csdn.net/u011974797/article/details/133790694