• kafka-生产者拦截器(SpringBoot整合Kafka)


    1、生产者拦截器

    1.1、创建生产者拦截器

    package com.atguigu.kafka.interceptor;
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.springframework.stereotype.Component;
    import java.util.Map;
    //拦截器必须手动注册给kafka生产者(KafkaTemplate)
    @Component
    public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {
        //kafka生产者发送消息前执行:拦截发送的消息预处理
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
            System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()
            +",partition:"+producerRecord.partition()
            +",key = "+producerRecord.key()
            +",value = "+producerRecord.value());
            return null;
        }
    
        //kafka broker 给出应答后执行
        @Override
        public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
            //exception为空表示消息发送成功
            if(e == null){
                System.out.println("消息发送成功:topic = "+ recordMetadata.topic()
                        +",partition:"+recordMetadata.partition()
                        +",offset="+recordMetadata.offset()
                +",timestamp="+recordMetadata.timestamp());
            }
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    }
    
    

    1.2、KafkaTemplate配置生产者拦截器

    package com.atguigu.kafka.producer;
    
    import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
    import jakarta.annotation.PostConstruct;
    import jakarta.annotation.Resource;
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.KafkaTemplate;
    import java.io.IOException;
    
    @SpringBootTest
    class KafkaProducerApplicationTests {
    
        //装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中
        @Resource
        KafkaTemplate kafkaTemplate;
    
        @Resource
        MyKafkaInterceptor myKafkaInterceptor;
    
        @PostConstruct
        public void init() {
            kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);
        }
        @Test
        void contextLoads() throws IOException {
            kafkaTemplate.send("my_topic1", "spring-kafka-生产者拦截器");
    
            //回调是等kafka,ack以后才执行,需要阻塞
            System.in.read();
        }
    }
    
    

    1.3、使用Java代码创建主题分区副本

    package com.atguigu.kafka.config;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.stereotype.Component;
    @Component
    public class KafkaTopicConfig {
        @Bean
        public NewTopic myTopic1() {
            //相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)
            return TopicBuilder.name("my_topic1")//主题名称
                    .partitions(3)//主题分区
                    .replicas(3)//主题分区副本数
                    .build();//创建
        }
    }
    
    

    1.4、application.yml配置----v1版

    server:
      port: 8110
    
    # v1
    spring:
      kafka:
        bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
        producer: # producer 生产者
          retries: 0 # 重试次数 0表示不重试
          acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01-1/all)
          batch-size: 16384 # 批次大小 单位byte
          buffer-memory: 33554432 # 生产者缓冲区大小 单位byte
          key-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器
          value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器
    
    

    1.5、屏蔽 kafka debug 日志 logback.xml

    <configuration>      
        
        <logger name="org.apache.kafka.clients" level="debug" />
    configuration>
    
    

    1.6、引入spring-kafka依赖

    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
        <parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>3.0.5version>
            <relativePath/> 
        parent>
    
        
        
        <groupId>com.atguigu.kafkagroupId>
        <artifactId>kafka-producerartifactId>
        <version>0.0.1-SNAPSHOTversion>
        <name>kafka-producername>
        <description>kafka-producerdescription>
        <properties>
            <java.version>17java.version>
        properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starterartifactId>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
    
    
            <dependency>
                <groupId>org.springframework.kafkagroupId>
                <artifactId>spring-kafkaartifactId>
            dependency>
        dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.bootgroupId>
                    <artifactId>spring-boot-maven-pluginartifactId>
                plugin>
            plugins>
        build>
    
    project>
    
    
    

    1.7、控制台日志

    生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者拦截器
    消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717490776329
    
    [
      [
        {
          "partition": 0,
          "offset": 0,
          "msg": "spring-kafka-生产者拦截器",
          "timespan": 1717490776329,
          "date": "2024-06-04 08:46:16"
        }
      ]
    ]
    

    在这里插入图片描述

  • 相关阅读:
    Cesium中自定义材质material
    浮点数精度、域宽、填充
    JS中内置的日期类Date,显示系统时间、停止系统时间
    DirectX12_Windows_GameDevelop_4:Direct3D应用程序框架
    肝通宵写了三万字把SQL数据库的所有命令,函数,运算符讲得明明白白讲解,内容实在丰富,建议收藏+三连好评!
    抖音 UG 社招一面算法原题
    Python面试题:如何在 Python 中实现一个简单的 Web 服务器?
    【Qt按钮基类】QAbstractButton[ 所有按钮基类 ]
    Java面试之JavaWeb常用框架(offer 拿来吧你)
    CNN反向求导推导
  • 原文地址:https://blog.csdn.net/m0_65152767/article/details/139447764