• Kafka - 异步/同步发送API



    在这里插入图片描述


    异步发送

    普通异步发送

    需求:创建Kafka生产者,采用异步的方式发送到Kafka broker

    异步发送流程

    在这里插入图片描述

    Code

    
    <dependency>
        <groupId>org.apache.kafkagroupId>
        <artifactId>kafka-clientsartifactId>
        <version>3.6.0version>
    dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    package com.artisan.pc;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @author 小工匠
     * @version 1.0
     * @mark: show me the code , change the world
     */
    public class CustomProducer {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            // 1. 创建kafka生产者的配置对象
            Properties properties = new Properties();
    
            // 2. 给kafka配置对象添加配置信息
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");
    
            // key,value序列化
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // 3. 创建kafka生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    
            // 4. 调用send方法,发送消息
            for (int i = 0; i < 10; i++) {
                RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();
                System.out.println(art.offset());
                System.out.println("over - " + i);
            }
    
            // 5. 关闭资源
            kafkaProducer.close();
    
        }
    
    }
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    输出

    31
    over - 0
    32
    over - 1
    33
    over - 2
    34
    over - 3
    35
    over - 4
    36
    over - 5
    37
    over - 6
    38
    over - 7
    39
    over - 8
    40
    over - 9
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    忽略我这个offset … 我都发了好多次了…

    看控制台的吧

    在这里插入图片描述


    回调函数的异步发送

    回调函数callback()会在producer收到ack时调用,为异步调用

    该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。

    • 如果Exception为null,说明消息发送成功,
    • 如果Exception不为null,说明消息发送失败

    带回调函数的异步发送流程

    在这里插入图片描述

    注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

    Code

    package com.artisan.pc;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @author 小工匠
     * @version 1.0
     * @mark: show me the code , change the world
     */
    public class CustomProducerWithCallBack {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            // 1. 创建kafka生产者的配置对象
            Properties properties = new Properties();
    
            // 2. 给kafka配置对象添加配置信息
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");
    
            // key,value序列化
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // 3. 创建kafka生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    
            // 4. 调用send方法,发送消息
            for (int i = 0; i < 10; i++) {
                // 添加回调
                // 该方法在Producer收到ack时调用,为异步调用
                kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> {
                    // 没有异常,输出信息到控制台
                    System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());
                });
            }
    
            // 5. 关闭资源
            kafkaProducer.close();
    
        }
    
    }
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    在这里插入图片描述

    控制台

    在这里插入图片描述


    同步发送API

    同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
    由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

    在这里插入图片描述

    package com.artisan.pc;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @author 小工匠
     * @version 1.0
     * @mark: show me the code , change the world
     */
    public class CustomProducerSync {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            // 1. 创建kafka生产者的配置对象
            Properties properties = new Properties();
    
            // 2. 给kafka配置对象添加配置信息
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");
    
            // key,value序列化
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // 3. 创建kafka生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    
            // 4. 调用send方法,发送消息
            for (int i = 0; i < 10; i++) {
                // 通过Future接口的get实现同步阻塞
                kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ;
            }
    
            // 5. 关闭资源
            kafkaProducer.close();
    
        }
    
    }
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    在这里插入图片描述

  • 相关阅读:
    C++类模板是一种通用的编程工具,可以创建可以适用于多种数据类型的类
    充分复用离线空闲算力,降低了实时计算资源开支
    OKLink携手CertiK在港举办Web3生态安全主题论坛
    springBoot 属性绑定
    极智AI | 讲解 TensorRT 怎么实现 torch.select 层
    常用音频接口:TDM,PDM,I2S,PCM
    yolov8封装进入ROS系统
    2022新版PMP考试有哪些变化?
    指向指针的指针pp指向指针p
    【2023,学点儿新Java-46】条件运算符:语法格式及示例;基础练习:获取两个数/三个数中的较大值;星期运算 | 附:测试代码 位运算符的使用 | 运算符优先级
  • 原文地址:https://blog.csdn.net/yangshangwei/article/details/134045040