• Springboot 集成kafka


    一、创建项目并导入pom依赖

    1. <dependency>
    2. <groupId>org.springframework.kafkagroupId>
    3. <artifactId>spring-kafkaartifactId>
    4. dependency>

    二、修改application.yml配置

    1. producer 生产端的配置

    1. spring:
    2. #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
    3. kafka:
    4. bootstrap-servers: 192.168.168.160:9092

    2. consumer 消费端的配置,需要给consumer配置一个group-id

    1. spring:
    2. #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
    3. kafka:
    4. bootstrap-servers: 192.168.168.160:9092
    5. #https://kafka.apache.org/documentation/#consumerconfigs
    6. consumer:
    7. group-id: auto-dev #消费者组

    三、生产者生产消息,消费者消费消息

    1. 简单消费

    producer生产者中使用自动注入的方式创建KafkaTemplate 对象

    1. @Autowired
    2. private KafkaTemplate kafkaTemplate;
    3. @Test
    4. void sendMessage(){
    5. // 第一个参数为topic,第二个为消息体
    6. kafkaTemplate.send("ifun","hello");
    7. }

    consumer消费消息,使用@KafkaListener注解监听topic为ifun中的消息,可以监听多个topic

    1. @Component
    2. @Slf4j
    3. public class ConsumerListener {
    4. // 消费监听
    5. @KafkaListener(topics = {"ifun"})
    6. public void onMessage(ConsumerRecord record){
    7. // 消费的哪个topic、partition的消息,打印出消息内容
    8. log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
    9. }
    10. }

    2. 带回调的生产者,两种方式

    1. @Test
    2. void sendCallBackMessageOne(){
    3. kafkaTemplate.send("ifun","hello callback one").addCallback(success -> {
    4. // 消息发送到的topic
    5. String topic = success.getRecordMetadata().topic();
    6. // 消息发送到的分区
    7. int partition = success.getRecordMetadata().partition();
    8. // 消息在分区内的offset
    9. long offset = success.getRecordMetadata().offset();
    10. log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);
    11. }, failure -> {
    12. log.info("send fail:message:{} ", failure.getMessage());
    13. });
    14. }
    15. @Test
    16. void sendCallBackMessageTwo(){
    17. kafkaTemplate.send("ifun", "hello callback two").addCallback(new ListenableFutureCallback>() {
    18. @Override
    19. public void onFailure(Throwable ex) {
    20. log.info("send fail:message:{} ", ex.getMessage());
    21. }
    22. @Override
    23. public void onSuccess(SendResult result) {
    24. String topic = result.getRecordMetadata().topic();
    25. int partition = result.getRecordMetadata().partition();
    26. long offset = result.getRecordMetadata().offset();
    27. log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);
    28. }
    29. });
    30. }

    回调补充,全局回调,需要继承ProducerListener,并重写onSuccess和onError方

    1. @Component
    2. @Slf4j
    3. public class KafkaSendResultHandler implements ProducerListener {
    4. @Override
    5. public void onSuccess(ProducerRecord producerRecord,
    6. RecordMetadata recordMetadata) {
    7. String topic = recordMetadata.topic();
    8. int partition = recordMetadata.partition();
    9. long offset = recordMetadata.offset();
    10. log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);
    11. }
    12. @Override
    13. public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
    14. log.info("send fail : {}", exception.getMessage());
    15. }
    16. }

    3. 配置自定义分区策略

    application.yml中需要指定分区策略的class

    1. spring:
    2. #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
    3. kafka:
    4. bootstrap-servers: 192.168.168.160:9092
    5. #https://kafka.apache.org/documentation/#producerconfigs
    6. producer:
    7. properties:
    8. partitioner.class: com.ifun.kafka.producer.config.CustomPartitioner

    分区类的实现

    1. @Component
    2. @Slf4j
    3. public class CustomPartitioner implements Partitioner {
    4. @Override
    5. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    6. // 自定义分区规则(这里假设全部发到0号分区)
    7. log.info("自定义分区策略 topic:{} key:{} value:{}",topic,key,value.toString());
    8. return 0;
    9. }
    10. @Override
    11. public void close() {
    12. }
    13. @Override
    14. public void configure(Map configs) {
    15. }
    16. }

    4. kafka事务提交

    如果在发送消息的时候需要创建事务,可以使用KafkaTemplate的executeInTransaction方法来声明事务。

    application.yml增加transaction配置

    1. java.lang.IllegalStateException: Producer factory does not support transactions
    2. org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
    3. java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
    4. 第一个异常是你没有配置transactions
    5. 第二个异常是因为你配置的acks不为all
    6. 第三个是正常的send方法,但是抛异常了,需要加@Transactional 注解
    7. spring:
    8. #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
    9. kafka:
    10. bootstrap-servers: 192.168.168.160:9092
    11. #https://kafka.apache.org/documentation/#producerconfigs
    12. producer:
    13. properties:
    14. partitioner.class: com.ifun.kafka.producer.config.CustomPartitioner
    15. acks: all
    16. transaction-id-prefix: "IFUN_TX"

    发送消息代码

    1. @Test
    2. @Transactional
    3. void sendWithException(){
    4. kafkaTemplate.send("ifun","不带事务提交!");
    5. kafkaTemplate.executeInTransaction(oper->{
    6. oper.send("ifun","带事务的提交");
    7. throw new RuntimeException("fail 1");
    8. });
    9. throw new RuntimeException("fail 2");
    10. }

    带事务的提交消息发送失败

     不带事务的消息被成功消费

     5. 消费者配置更详细的配置

    @KafkaListener注解说明:

    1. id:唯一标识。如果没有配置,取application.yml中的 consumer.groupId
    2. idIsGroup :默认true,true的话代表该consumer分组group!
    3. groupId:消费者分组。如果不填,取id (idIsGroup=true)作为分组。否则取application.yml中的 consumer.groupId
    4. topic 与 topicPartitions 不能共用。
    5. topic:类似于subscripe订阅模式。
    6. topicPartitions类似于assign手动分配模式。
    1. @KafkaListener(id = "ifun-001",groupId = "ifun-01", topicPartitions={
    2. @TopicPartition(topic = "ifun1",partitions = {"0"}),
    3. @TopicPartition(topic = "ifun2",
    4. partitions = {"0"},
    5. partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
    6. }
    7. )
    8. public void onTopicsMessage(ConsumerRecord record){
    9. log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
    10. }

    解释:这里定义了消费者id为ifun-001,消费者组id为ifun-01,同时监听两个topic,ifun1和ifun2,其中监听ifun1的0号分区,ifun2的0号和1号分区,其中1号分区开始的offset为8,也就是说如果next-offset大于8就会消费,小于8不会消费。

    6. 消费者批量消费

    需要在application.yml中开启批量消费,sping.kafka.listener.type: batch 监听类型为batch,spring.kafka.consumer.max-poll-records 批量消费每次最多消费多少条消息,接收消息的时候需要使用List来接收。

    1. spring:
    2. #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
    3. kafka:
    4. bootstrap-servers: 192.168.168.160:9092
    5. #https://kafka.apache.org/documentation/#consumerconfigs
    6. listener:
    7. #batch single
    8. type: batch
    9. consumer:
    10. group-id: auto-dev #消费者组
    11. max-poll-records: 3
    1. @KafkaListener(topics = {"ifun"})
    2. public void onBatchMessage(List> records){
    3. log.info("批量消费");
    4. for (ConsumerRecord record : records) {
    5. log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
    6. }
    7. }

     7. 消费端手动ack

    设置spring.kafka.consumer.enable-auto-commit 为false 的时候 spring.kafka.listener.ack-mode 才会生效,设置为手动的manual表示手动

    1. spring:
    2. #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
    3. kafka:
    4. bootstrap-servers: 192.168.168.160:9092
    5. #https://kafka.apache.org/documentation/#consumerconfigs
    6. listener:
    7. #batch single
    8. type: batch
    9. # 手动确认模式 RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE;
    10. ack-mode: manual
    11. consumer:
    12. #消费者组 id
    13. group-id: auto-dev
    14. max-poll-records: 3
    15. #是否自动提交偏移量offset
    16. enable-auto-commit: false

    消费代码

    1. @KafkaListener(topics = {"ifun"})
    2. public void onBatchMessage(List> records, Acknowledgment ack){
    3. try{
    4. log.info("批量消费");
    5. for (ConsumerRecord record : records) {
    6. log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
    7. }
    8. }finally {
    9. ack.acknowledge();
    10. }
    11. }

    如果没有ack,那么会出现如下情况:

    1. 如果在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。
    2. 如果在消费的过程中有几条或者一批数据数据没有提交offset,后面其他的消息消费后正常提交offset,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序也不会重新消费。
    3. 消费者如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你不想提交offset的消息时你尝试重新初始化一个客户端消费者,即可再次消费这个未提交offset的数据。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在你重新初始化客户端消费者之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。
       

    9. 消费异常捕获

    配置ConsumerAwareListenerErrorHandler 处理类,在listener上设置errorHandler属性为ConsumerAwareListenerErrorHandler的BeanName

    1. @Bean
    2. public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
    3. return (message, exception, consumer) -> {
    4. System.out.println("消费异常:"+message.getPayload());
    5. return null;
    6. };
    7. }
    1. @KafkaListener(topics = {"ifun"}, errorHandler = "consumerAwareErrorHandler")
    2. public void onBatchMessage(List> records, Acknowledgment ack){
    3. try{
    4. log.info("批量消费");
    5. for (ConsumerRecord record : records) {
    6. log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
    7. throw new RuntimeException("消费异常");
    8. }
    9. }finally {
    10. ack.acknowledge();
    11. }
    12. }

    显示结果如下:

    10. 配置消息过滤器

    消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

    需要为监听器工厂配置一个RecordFilterStrategy,返回true的时候消息将被抛弃,返回false会正常抵达监听器。

    然后在监听器上设置containerFactory属性为配置的过滤器工厂类

    1. @Bean
    2. public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
    3. ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    4. factory.setConsumerFactory(consumerFactory);
    5. // 被过滤的消息将被丢弃
    6. factory.setAckDiscarded(true);
    7. // 消息过滤策略
    8. factory.setRecordFilterStrategy(consumerRecord -> {
    9. if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
    10. return false;
    11. }
    12. //返回true消息则被过滤
    13. return true;
    14. });
    15. return factory;
    16. }
    1. @KafkaListener(topics = {"ifun"},containerFactory = "filterContainerFactory")
    2. public void onMessage(ConsumerRecord record){
    3. log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
    4. }

    过滤结果

    11. 消息转发

    Topic A 收到消息后将消息转发给Topic B,使用@SendTo注解即可

    1. @KafkaListener(topics = {"ifun"})
    2. @SendTo("ifun1")
    3. public String onMessage(ConsumerRecord record){
    4. log.info("topic {} 收到需要转发的消息:{}",record.topic(), record.value());
    5. return record.value()+" 【forward message】";
    6. }
    7. @KafkaListener(topics = {"ifun1"})
    8. public void onIFun1Message(ConsumerRecord record){
    9. log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
    10. }

     结果如下,可以看到消息先记过第一个topic,然后转发给了第二个topic

     12. 设置json序列化方式生产和消费消息

    消费端配置如下

    1. spring:
    2. #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
    3. kafka:
    4. bootstrap-servers: 192.168.168.160:9092
    5. #https://kafka.apache.org/documentation/#consumerconfigs
    6. consumer:
    7. #消费者组 id
    8. group-id: auto-dev
    9. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    10. value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    11. properties:
    12. spring:
    13. json:
    14. trusted:
    15. # 配置json反序列化信任的包
    16. packages: '*'

    生产端配置如下

    1. spring:
    2. #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
    3. kafka:
    4. bootstrap-servers: 192.168.168.160:9092
    5. #https://kafka.apache.org/documentation/#producerconfigs
    6. producer:
    7. #key的编解码方法
    8. key-serializer: org.apache.kafka.common.serialization.StringSerializer
    9. #value的编解码方法
    10. value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

    编写消息实体类

    1. @Data
    2. public class UserInfo implements Serializable {
    3. private Long id;
    4. private String name;
    5. private Integer age;
    6. }

    发送消息

    1. @Test
    2. void sendMessage(){
    3. UserInfo userInfo = new UserInfo();
    4. userInfo.setAge(21);
    5. userInfo.setId(1L);
    6. userInfo.setName("Jack");
    7. kafkaTemplate.send("ifun",userInfo);
    8. }

    消费消息

    1. @KafkaListener(topics = {"ifun"})
    2. public void onMessage(UserInfo userInfo){
    3. log.info("消息:{}",userInfo);
    4. }

    消费结果如下:

     注意:发送的类要和消费的类的全类名一致才行,不能是类名一样,字段一样,但是包名不一样,这样会抛异常。

    四、kafka其他配置

    1. spring:
    2. #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
    3. kafka:
    4. bootstrap-servers: xx.xx.xx.xx:9092
    5. #https://kafka.apache.org/documentation/#producerconfigs
    6. #生产者配置
    7. producer:
    8. bootstrap-servers: xx.xx.xx.xx:9092
    9. #生产者发送消息失败重试次数
    10. retries: 1
    11. # 同一批次内存大小(默认16K)
    12. batch-size: 16384
    13. #生产者内存缓存区大小(300M = 300*1024*1024)
    14. buffer-memory: 314572800
    15. #acks=0:无论成功还是失败,只发送一次。无需确认
    16. #acks=1:即只需要确认leader收到消息
    17. #acks=all或-1:ISR + Leader都确定收到
    18. acks: 1
    19. #key的编解码方法
    20. key-serializer: org.apache.kafka.common.serialization.StringSerializer
    21. #value的编解码方法
    22. value-serializer: org.apache.kafka.common.serialization.StringSerializer
    23. #开启事务,但是要求ack为all,否则无法保证幂等性
    24. #transaction-id-prefix: "IFUN_TX"
    25. #额外的,没有直接有properties对应的参数,将存放到下面这个Map对象中,一并初始化
    26. properties:
    27. #自定义拦截器,注意,这里是classes(先于分区器)
    28. interceptor.classes: cn.com.controller.TimeInterceptor
    29. #自定义分区器
    30. #partitioner.class: com.alibaba.cola.kafka.test.customer.inteceptor.MyPartitioner
    31. #即使达不到batch-size设定的大小,只要超过这个毫秒的时间,一样会发送消息出去
    32. linger.ms: 1000
    33. #最大请求大小,200M = 200*1024*1024
    34. max.request.size: 209715200
    35. #Producer.send()方法的最大阻塞时间(115秒)
    36. max.block.ms: 115000
    37. #该配置控制客户端等待请求响应的最长时间。
    38. #如果超时之前仍未收到响应,则客户端将在必要时重新发送请求,如果重试次数(retries)已用尽,则会使请求失败。
    39. #此值应大于replica.lag.time.max.ms(broker配置),以减少由于不必要的生产者重试而导致消息重复的可能性。
    40. request.timeout.ms: 115000
    41. #等待send回调的最大时间。常用语重试,如果一定要发送,retries则配Integer.MAX
    42. #如果超过该时间:TimeoutException: Expiring 1 record(s) .. has passed since batch creation
    43. delivery.timeout.ms: 120000
    44. #https://kafka.apache.org/documentation/#consumerconfigs
    45. #消费者配置
    46. consumer:
    47. bootstrap-servers: xx.xx.xx.xx:9092
    48. #消费者组id
    49. group-id: default-group
    50. #消费方式: earliest:从头开始消费 latest:从最新的开始消费,默认latest
    51. auto-offset-reset: earliest
    52. #是否自动提交偏移量offset
    53. enable-auto-commit: false
    54. #前提是 enable-auto-commit=true。自动提交的频率
    55. auto-commit-interval: 1s
    56. #key 解码方式
    57. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    58. #value 解码方式
    59. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    60. #最大消费记录数
    61. max-poll-records: 2
    62. properties:
    63. #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
    64. session.timeout.ms: 120000
    65. #最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
    66. max.poll.interval.ms: 300000
    67. #配置控制客户端等待请求响应的最长时间。
    68. #如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
    69. #或者如果重试次数用尽,则请求失败。
    70. request.timeout.ms: 60000
    71. #服务器返回的最大数据量,不能超过admin的message.max.bytes单条数据最大大小
    72. max.partition.fetch.bytes: 1048576
    73. #订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
    74. allow.auto.create.topics: true
    75. # 如果设置的json解码器,需要配置所信任的包名
    76. spring:
    77. json:
    78. trusted:
    79. packages: '*'
    80. #监听器配置
    81. listener:
    82. #当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
    83. #manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
    84. ack-mode: manual_immediate
    85. #如果至少有一个topic不存在,true启动失败。false忽略
    86. missing-topics-fatal: true
    87. #单条消费 single 批量消费batch
    88. #批量消费需要配合 consumer.max-poll-records
    89. type: batch
    90. #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲
    91. concurrency: 2
    92. template:
    93. default-topic: "default-topic"

    五、总结

    kafka的简单使用就到此结束了,和rabbitmq还是有挺大的区别的。大家快去试试吧。

  • 相关阅读:
    Unity2021+VS2022调试
    ASM使用小抄
    Java shp 转 GeoJson
    教你一绝招:如何快速提高学习成绩--这样学习,你离考取重点高中或名牌大学很近了
    Apache Dubbo 云原生可观测性的探索与实践
    ZNYQ初体验,持续记录中...
    《DevOps实践指南》- 读书笔记(三)
    赛桨PaddleScience v1.0 Beta:基于飞桨核心框架的科学计算通用求解器
    Java可重入锁(GPT编写)
    面试中的MySQL主从复制|手撕MySQL|对线面试官
  • 原文地址:https://blog.csdn.net/axibazZ/article/details/126902871