• kafka的java客户端-生产者


    kafka的java客户端-生产者

    生产者消息发送流程

    发送原理

    在消息发送的过程中,涉及俩个线程,main线程和sender线程,在main线程中创建一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafkabroker

    在这里插入图片描述

    生产者重要参数列表

    参数名称描述
    bootstrap.servers生产者连接集群所需的 broker 地址清单。例如
    hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker信息
    key.serializer和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名
    buffer.memoryRecordAccumulator 缓冲区总大小,默认 32m。
    batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加
    linger.ms如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms之间
    acks0:生产者发送过来的数据,不需要等数据落盘应答。
    1:生产者发送过来的数据,Leader收到数据后应答。
    -1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all是等价的
    max.in.flight.requests.per.connection允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字
    retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了
    retry.backoff.ms两次重试之间的时间间隔,默认是 100ms
    enable.idempotence是否开启幂等性,默认 true,开启幂等性
    compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd

    生产者发送消息

    依赖

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    生产者同步发送消息

    public class MyProducer {
    
        private final static String TOPIC_NAME = "my-topic-test-1";
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //1.创建kafka生产者配置对象
            Properties props = new Properties();
            //2.给 kafka 配置对象添加配置信息:bootstrap.servers
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094");
            //key,value 序列化(必须)
            //把发送的key从字符串序列化为字节数组
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            //把发送消息value从字符串序列化为字节数组
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // 3. 创建 kafka 生产者对象
            Producer<String, String> producer = new KafkaProducer<>(props);
    
            Order order = new Order((long) 1, 100);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, order.getId().toString(), JSON.toJSONString(order));
            //4. 调用 send 方法,发送消息
            RecordMetadata metadata = producer.send(producerRecord).get();
            producer.close();
            //=====阻塞=======
            System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
        }
    }
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public class Order {
        private Long id;
        private Integer num;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    在这里插入图片描述

    因为这里我们选择的是同步发送消息,在收到kafka的ack告知发送成功之前一直处于阻塞状态

    这里我们进入docker部署的kafka02容器中开启消费者,然后重新在运行一遍消费者

    在这里插入图片描述

    发送消息在指定分区上

    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,0, order.getId().toString(), JSON.toJSONString(order));
    
    • 1

    不指定分区,按照分区策略

    未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum

    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,order.getId().toString(), JSON.toJSONString(order));
    
    • 1

    带回调函数的异步发送

    默认就是异步发送

    回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

    生产者发消息,发送完后不用等待broker给回复,直接执行下面的业务逻辑。可以提供callback,让broker异步的调用callback,告知生产者,消息发送的结果

    public class MyProducer2 {
        public static void main(String[] args) throws InterruptedException {
    
            //1.创建kafka生产者配置对象
            Properties props = new Properties();
            //2.给 kafka 配置对象添加配置信息:bootstrap.servers
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094");
            //key,value 序列化(必须)
            //把发送的key从字符串序列化为字节数组
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            //把发送消息value从字符串序列化为字节数组
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // 3. 创建 kafka 生产者对象
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 5; i++) {
                Order order = new Order((long) i, 100);
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first", order.getId().toString(), JSON.toJSONString(order));
                //4. 调用 send 方法,发送消息
                producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            System.err.println("发送消息失败:" +
                                    exception.getStackTrace());
                        }
                        if (metadata != null) {
                            System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
                        }
                    }
                });
            }
            producer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    1. 开启 Kafka 消费者
    2. 运行idea中的代码

    在这里插入图片描述

    生产者ack参数配置

    在同步发消息的场景下:生产者发动broker上后,ack会有 3 种不同的选择

    1. acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
    2. acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失
    3. acks=-1或all: 需要等待 min.insync.replicas(默认为 1 ,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置
    props.put(ProducerConfig.ACKS_CONFIG, "1");
    
    • 1

    数据完全可靠条件 = ACK 级别设置为-1 + 分区副本大于等于2 + ISR 里应答的最小 副本 数量大于等于2

    public class CustomProducerAcks {
    
        public static void main(String[] args) {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接集群 bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
    
            // 指定对应的key和value的序列化类型 key.serializer
    //        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            // acks
            properties.put(ProducerConfig.ACKS_CONFIG,"1");
    
            // 重试次数
            properties.put(ProducerConfig.RETRIES_CONFIG,3);
    
            // 1 创建kafka生产者对象
            // "" hello
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            // 2 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
            }
    
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    生产者如何提高吞吐量

    在这里插入图片描述

    public class CustomProducerParameters {
    
        public static void main(String[] args) {
    
            // 配置
            Properties properties = new Properties();
    
            // 连接kafka集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
    
            // 序列化
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            // 缓冲区大小
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
    
            // 批次大小
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
    
            // linger.ms
            properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    
            // 压缩
            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
    
    
            // 1 创建生产者
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            // 2 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
            }
    
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    生产者数据去重

    幂等性

    幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
    精确一次( (Exactly Once) ) = 幂等性 + 至少一次( ( ack=-1 + 分区副本数>=2 + ISR 最小副本数量>=2) )

    如何使用幂等性

    开启参数 enable.idempotence 默认为 true,false关闭

    生产者事务

    在这里插入图片描述

    事务相关API

    // 1 初始化事务
    void initTransactions();
    // 2 开启事务
    void beginTransaction() throws ProducerFencedException;
    // 3 在事务内提交已经消费的偏移量(主要用于消费者)
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
    String consumerGroupId) throws
    ProducerFencedException;
    // 4 提交事务
    void commitTransaction() throws ProducerFencedException;
    // 5 放弃事务(类似于回滚事务的操作)
    void abortTransaction() throws ProducerFencedException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    单个 Producer,使用事务保证消息的仅一次发送

    public class CustomProducerTranactions {
    
        public static void main(String[] args) {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接集群 bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
    
            // 指定对应的key和value的序列化类型 key.serializer
    //        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // 指定事务id
            properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");
    
            // 1 创建kafka生产者对象
            // "" hello
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            kafkaProducer.initTransactions();
    
            kafkaProducer.beginTransaction();
    
            try {
                // 2 发送数据
                for (int i = 0; i < 5; i++) {
                    kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i));
                }
    
                int i = 1 / 0;
    
                kafkaProducer.commitTransaction();
            } catch (Exception e) {
                kafkaProducer.abortTransaction();
            } finally {
                // 3 关闭资源
                kafkaProducer.close();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
  • 相关阅读:
    NFTScan 浏览器再升级:优质数据服务新体验来袭
    【Linux】信号的产生
    低代码相关概念及钉钉宜搭初使用
    [C#] 允许当前应用程序通过防火墙
    JavaScript基础知识: 作用域和闭包
    『牛客|每日一题』逆波兰表达式
    金x软件有限公司安全测试岗位面试
    经典卷积神经网络模型 - InceptionNet
    读《Gaitset: Regarding gait as a set for cross-view gait recognition》
    自动装车系统车辆定位-激光雷达解决方案
  • 原文地址:https://blog.csdn.net/weixin_43296313/article/details/125525493