• kafka-生产者事务-数据传递语义&事务介绍&事务消息发送


    1、kafka数据传递语义

    kafka发送消息时是否需要重试

    1. 仅发送一次:生产者发送消息后不重试,只发送一次 可能丢失消息 效率最高
    2. 至少一次:生产者发送消息后重试,可能重试多次 效率差
    3. 精准一次发送:生产者发送消息后无论是否重复发送 发送了多少次,在 kafka broker 中只保存一次消息,通过幂等性 + 生产者事务来实现

    kafak天然支持幂等性,每个消息头中带了一个唯一的标志 kafka broker 根据此标志判断消息是否已经发送过,生产者事务可以保证数据没有最终发送成功时,消费者不可以消费,如果生产者发送消息时出现异常会自动回滚(清除之前发送的事务中的消息)

    kafka天然幂等性:但是指的是生产者事务 生产消息时的幂等性,发送消息时消息中带唯一标识、broker接收到消息时如果重复不再保存,事务没提交消费者不能消费改消息

    2、kafka生产者事务

    保证消息生产的幂等性
    一组消息要么一起成功 被消费者消息 要么一起失败都不能被消费者消费

    1. 配置ack为-1 分区所有副本均落盘成功
    2. 配置生产者重试(发送失败可以继续发送:需要保证发送失败后再次发送消息到kafka实现 精准一次发送)
    3. 需要给事务分配事务id(区分一个事务中的多条消息)

    3、事务消息发送

    3.1、application.yml配置

    server:
      port: 8110
    
    # v1
    spring:
      kafka:
        bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
        producer: # producer 生产者
          retries: 1 # 重试次数 0表示不重试
          acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)
          transaction-id-prefix: tx_  # 事务id前缀:配置后producer自动开启事务
          batch-size: 16384 # 批次大小 单位byte
          buffer-memory: 33554432 # 生产者缓冲区大小 单位byte
          key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器
          value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器
    
    

    3.2、创建生产者监听器

    package com.atguigu.kafka.listener;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.springframework.kafka.support.ProducerListener;
    @Component
    public class MyKafkaProducerListener implements ProducerListener<String,String> {
    
        //生产者 ack 配置为 0 只要发送即成功
        //ack为 1  leader落盘  broker ack之后 才成功
        //ack为 -1 分区所有副本全部落盘  broker ack之后 才成功
        @Override
        public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
            //ProducerListener.super.onSuccess(producerRecord, recordMetadata);
            System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()
            +",partition = "+producerRecord.partition()
            +",key = "+producerRecord.key()
            +",value = "+producerRecord.value()
            +",offset = "+recordMetadata.offset());
        }
    
        //消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息
        @Override
        public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
            System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()
                    +",partition = "+producerRecord.partition()
                    +",key = "+producerRecord.key()
                    +",value = "+producerRecord.value()
                    +",offset = "+recordMetadata.offset());
            System.out.println("异常信息:" + exception.getMessage());
        }
    }
    

    3.3、创建生产者拦截器

    package com.atguigu.kafka.interceptor;
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.springframework.stereotype.Component;
    import java.util.Map;
    //拦截器必须手动注册给kafka生产者(KafkaTemplate)
    @Component
    public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {
        //kafka生产者发送消息前执行:拦截发送的消息预处理
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
            System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()
            +",partition:"+producerRecord.partition()
            +",key = "+producerRecord.key()
            +",value = "+producerRecord.value());
            return null;
        }
    
        //kafka broker 给出应答后执行
        @Override
        public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
            //exception为空表示消息发送成功
            if(e == null){
                System.out.println("消息发送成功:topic = "+ recordMetadata.topic()
                        +",partition:"+recordMetadata.partition()
                        +",offset="+recordMetadata.offset()
                +",timestamp="+recordMetadata.timestamp());
            }
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    }
    

    3.4、发送消息测试

    package com.atguigu.kafka.producer;
    
    import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
    import jakarta.annotation.PostConstruct;
    import jakarta.annotation.Resource;
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.KafkaTemplate;
    import java.io.IOException;
    
    @SpringBootTest
    class KafkaProducerApplicationTests {
    
        //装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中
        @Resource
        KafkaTemplate kafkaTemplate;
    
        @Resource
        MyKafkaInterceptor myKafkaInterceptor;
    
        @PostConstruct
        public void init() {
            kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);
        }
        @Test
        void contextLoads() throws IOException {
            kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");
            //回调是等kafka,ack以后才执行,需要阻塞
            System.in.read();
        }
    
        //kafka事务支持spring-tx的事务注解
        //单元测试中的事务会自动回滚
    
        @Test
        void testTransaction() throws  IOException {
    
           //多个消息的发送在一个事务中执行
            kafkaTemplate.executeInTransaction((var1) -> {
                //通过一个事务中的operations对象来发送消息,执行事务操作
                var1.send("my_topic1",0,"", "spring-kafka-事务1");
                var1.send("my_topic1",0,"", "spring-kafka-事务2");
                int i = 1/0;
                var1.send("my_topic1",0,"", "spring-kafka-事务3");
                return "发送消息失败";
            });
            System.in.read();
        }
    }
    
    

    3.5、使用Java代码创建主题分区副本

    package com.atguigu.kafka.config;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.stereotype.Component;
    @Component
    public class KafkaTopicConfig {
        @Bean
        public NewTopic myTopic1() {
            //相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)
            return TopicBuilder.name("my_topic1")//主题名称
                    .partitions(3)//主题分区
                    .replicas(3)//主题分区副本数
                    .build();//创建
        }
    }
    

    3.6、屏蔽 kafka debug 日志 logback.xml

    <configuration>      
        
        <logger name="org.apache.kafka.clients" level="debug" />
    configuration>
    

    3.7、引入spring-kafka依赖

    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
        <parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>3.0.5version>
            <relativePath/> 
        parent>
    
        
        
        <groupId>com.atguigu.kafkagroupId>
        <artifactId>kafka-producerartifactId>
        <version>0.0.1-SNAPSHOTversion>
        <name>kafka-producername>
        <description>kafka-producerdescription>
        <properties>
            <java.version>17java.version>
        properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starterartifactId>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
    
    
            <dependency>
                <groupId>org.springframework.kafkagroupId>
                <artifactId>spring-kafkaartifactId>
            dependency>
        dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.bootgroupId>
                    <artifactId>spring-boot-maven-pluginartifactId>
                plugin>
            plugins>
        build>
    
    project>
    

    3.8、控制台日志

    生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务1
    生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务2
    MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务1,offset = -1
    异常信息:Failing batch since transaction was aborted
    MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务2,offset = -1
    异常信息:Failing batch since transaction was aborted
    
    java.lang.ArithmeticException: / by zero
    

    在这里插入图片描述

  • 相关阅读:
    能链科技深耕苏州,受邀参加中国金融科技产业峰会
    Linux磁盘挂载解挂硬盘
    E9000服务器更改初始密码
    java框架-Springboot3-基础特性+核心原理
    【论文精读】TextDiffuser-2:释放语言模型用于文本渲染的力量
    SpringCloud:分布式事务Seata部署和集成
    el-date-picker的使用,及解决切换type时面板样式错乱问题
    Linux下的的GDB调试技巧二 —— 基本功能
    337. 打家劫舍 III
    什么是shell?模拟实现shell(深刻理解shell的内建命令)
  • 原文地址:https://blog.csdn.net/m0_65152767/article/details/139452792