• SpringBoot使用kafka事务-消费者方


    前言

    在上一篇文章中,写到了如何在springboot中生产者如何使用kafka的事务,详情链接:Springboot使用kafka事务-生产者方

    那么,这一篇就接着上篇所写的内容,讲解一下再springboot中消费者如何使用kafka的事务。

    实现

    在springboot中kafka的消费者配置也和生产者一样,有两种配置的方式:

    • 第一种是使用springboot提供的自定装配机制
    • 第二种是自定义配置

    自动装配机制

    在springboot的配置文件中加入以下代码即可实现

       spring:
       	kafka:
       		bootstrap-servers: localhost:9092
    	    consumer:
    	      group-id: test_group #默认组id  后面会配置多个消费者组
    	      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    	      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    	      isolation-level: read_committed
    	      enable-auto-commit: false #关闭自动提交
    	      auto-commit-interval: 100
    	      max-poll-records: 20 #批量消费 一次接收的最大数量
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这样就实现了事务的自动状态,特别注意的是配置文件中的isolation-level属性,这个属性一定要设置读已提交的事务级别,这样才能配合生产者实现事务的特性。

    使用

    这种配置方式的使用就很简单了,
    第一:新建一个管理类,类名上用Component注解标识为需要springboot管理

    @Component
    public class kafkaConfigs {
    }
    
    • 1
    • 2
    • 3

    第二:使用springboot提供的KafkaListener注解,即可使用

        @KafkaListener
        public void testListener(String data) {
            log.info("接受到的数据为: {} ",data);
        }
    
    • 1
    • 2
    • 3
    • 4

    全部代码如下:

    @Component
    public class kafkaConfigs {
    	@KafkaListener
        public void testListener(String data) {
            log.info("接受到的数据为: {} ",data);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    缺点

    自动装配机制是很方便的,但是在一些场景下,我们需要连接多个kafka的地址来实现不同的业务,而且有的场景之下我们并不需要kafka的事务管理机制,所以这就需要用到我们的第二种方法,自定义配置了。

    自定义配置

    这次,我们使用springboot为我们提供的KafkaListener注解来实现这个功能。
    在yml配置文件中加入第二个kakfa的连接地址,并且将事务紫隔离级别去掉即可。

       spring:
       	kafka:
       		bootstrap-servers: localhost:9092
       		bootstrap-servers-2: localhost2:9092
    	    consumer:
    	      group-id: test_group #默认组id  后面会配置多个消费者组
    	      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    	      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    	      enable-auto-commit: false #关闭自动提交
    	      auto-commit-interval: 100
    	      max-poll-records: 20 #批量消费 一次接收的最大数量
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    注意bootstrap-servers-2这个key,是我们自定义的key,它在kafka的自动配置包里面是没有的。

    使用

    自定义配置的使用和第一种使用方式大同小异,具体为:
    第一:新建一个管理类,类名上用Component注解标识为需要springboot管理

    @Component
    public class kafkaConfigs {
    }
    
    • 1
    • 2
    • 3

    第二:使用springboot提供的KafkaListener注解,并且在这里标识需要使用到的kafka连接地址以及事务隔离级别

        @KafkaListener(topics = "my-topics2" , groupId = "my-group2",properties = {"bootstrap.servers=${spring.kafka.bootstrap-servers-2}","isolation.level=read_committed"})
        public void testListener1(String data) {
            log.info("接受到的数据为: {} ",data);
        }
    
    • 1
    • 2
    • 3
    • 4

    全代码如下:

    @Component
    public class kafkaConfigs {
        @KafkaListener(topics = "my-topics2" , groupId = "my-group2",properties = {"bootstrap.servers=${spring.kafka.bootstrap-servers-2}","isolation.level=read_committed"})
        public void testListener1(String data) {
            log.info("接受到的数据为: {} ",data);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    可以看到,我们使用了properties属性指定了需要连接的kafka地址,并且指定了事务的隔离级别,这样就实现了一个具有事务功能的消费者,并且对其他方法不产生任何影响。

    总结

    以上,我们使用两种方式配置springboot中kafka消费者如何使用事务,读者可以结合上篇文章结合食用,效果更佳!


    上篇链接:Springboot使用kafka事务-生产者方

  • 相关阅读:
    做个清醒的程序员之拒绝工作
    【DevPress】V2.4.4版本发布,增加数据看板功能
    关于12306网站抢票的架构设计
    教育、卫生和社会服务-省级面板数据数据(1994-2019年)
    Mac安装GYM遇到的一些坑
    第一章 - 第10节- 计算机网络 - 课后习题
    导入csv文件表头字符串出现zwnbsp字符(零宽度空白字符)处理
    【计算机网络】计算机网络体系结构
    云原生技术如何应用到智慧城市数字底座建设中?
    第二个Maven工程_java培训
  • 原文地址:https://blog.csdn.net/qq_43252643/article/details/132649776