• Kafka3.x核心速查手册二客户端使用篇-1、从基础的客户端说起


    ​ 这一部分主要是从客户端使用的角度来理解Kakfa的重要机制。重点依然是要建立自己脑海中的Kafka消费模型。Kafka的HighLevel API使用是非常简单的,所以梳理模型时也要尽量简单化,主线清晰,细节慢慢扩展。

    # 一、从基础的客户端说起
    
    • 1

    ​ Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

    <dependency>
    			<groupId>org.apache.kafkagroupId>
    			<artifactId>kafka_2.13artifactId>
    			<version>3.2.0version>
    		dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    1、消息发送者

    ​ 然后可以使用Kafka提供的Producer类,快速发送消息。

    public class MyProducer {
        private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
        private static final String TOPIC = "disTopic";
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //PART1:设置发送者相关属性
            Properties props = new Properties();
            // 此处配置的是kafka的端口
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            // 配置key的序列化类
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            // 配置value的序列化类
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
            
            Producer<String,String> producer = new KafkaProducer<>(props);
            for(int i = 0; i < 5; i++) {
                //Part2:构建消息
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
                //Part3:发送消息
                //单向发送:不关心服务端的应答。
            	producer.send(record);
            	System.out.println("message "+i+" sended");
                //同步发送:获取服务端应答消息前,会阻塞当前线程。
                RecordMetadata recordMetadata = producer.send(record).get();
                String topic = recordMetadata.topic();
                long offset = recordMetadata.offset();
                String message = recordMetadata.toString();
                System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);
                //异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if(null != e){
                            System.out.println("消息发送失败,"+e.getMessage());
                            e.printStackTrace();
                        }else{
                            String topic = recordMetadata.topic();
                            long offset = recordMetadata.offset();
                            String message = recordMetadata.toString();
                            System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);
                        }
                    }
                });
            }
            producer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    ​ 整体来说,构建Producer分为三个步骤:

    1. 通过属性定制Producer :可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。在这个类中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
    2. 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。
    3. 使用Producer发送消息。:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

    2、消息消费者

    ​ 接下来可以使用Kafka提供的Consumer类,快速消费消息。

    public class MyConsumer {
        private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
        private static final String TOPIC = "disTopic";
    
        public static void main(String[] args) {
            //PART1:设置发送者相关属性
            Properties props = new Properties();
            //kafka地址
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            //每个消费者要指定一个group
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
            //key序列化类
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            //value序列化类
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            Consumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                //PART2:拉取消息
                // 100毫秒超时时间
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
                //PART3:处理消息
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());
                }
                //提交offset,消息就不会重复推送。
                consumer.commitSync();
    //            consumer.commitAsync();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    ​ 整体来说,Consumer同样是分为三个步骤:

    1. 通过属性定制Consumer :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
    2. 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
    3. 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。

    ​ 以Producer、Consumer为主体的这一层Kafka的客户端API,在Kafka中成为HighLevel API。这一层API封装了Kafka运行的很多底层细节,使用起来是非常简单的,基本就是固定的按照这三个大的步骤进行。在具体使用过程中,最大的变数基本上来自于生产者和消费者的第一部分,也就是属性定制的部分。这些属性极大的影响了客户端程序的执行方式。

    渔与鱼:对于这些属性,你并不需要煞有介事的强行去记忆,随时可以根据ProducerConfig和ConsumerConfig以及他们的父类CommonClientConfig去理解,大部分的属性都配有非常简明扼要的解释。然后,尝试自己建立一个消息流转模型,去理解其中比较重要的一些属性。接下来,就只找几个相对比较重要的属性着重解释一下。

  • 相关阅读:
    StrongSORT(deepsort强化版)浅实战+代码解析
    解决 el-tree setChecked 方法偶尔失效的方法
    一位平凡毕业生的大学四年
    【Java】CompletableFuture学习记录
    History库源码分析-Action 动作类型
    Unity中动画系统的性能优化
    优咔科技创新连接方案助力高质量5G车联服务
    Redis缓存使用技巧和设计方案
    Linux:文件搜索
    HTML5期末大作业:基于 html css js仿腾讯课堂首页
  • 原文地址:https://blog.csdn.net/roykingw/article/details/126570720