• kafka—消费者


    一、消费者工作流程

    消费者Consumer采用从broker中主动拉取数据,Kafka采用这种方式

    1. 生产者Producer向每一个分区的leader发送数据,follower主动跟leader同步数据保证数据的可靠性
    2. 消费者Consumer消费某一个分区的数据,一个消费者可以消费多个分区的数据
    3. 每个分区的数据只能有一个消费者组中的一个消费者消费,即同一个分区不能有消费者组中的两个消费者同时消费
    4. 每个消费者的offset(分区中数据的偏移量),由消费者保存在主题中。如果某台消费者宕机了(挂了)重启的之后通过offset得到以前消费数据的位置

    二、消费者组

    Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同

    特点:

    • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
    • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
    • 如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息
    • 消费者组之间互不影响。所有的消费者都属于某个消费者 组,即消费者组是逻辑上的一个订阅者

    1.消费者组初始化流程

    coordinator:辅助实现消费者组的初始化和分区的分配
    coordinator节点选择 = groupid的hashcode值 % __consumer_offsets的分区数量
    例如: groupid的hashcode值 = 1,__consumer_offsets为50,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset

    1. 所有者的消费者都会主动的向消费者发送请求加入消费者组当中
    2. coordinator从多个消费者中选择一个消费者作为leader(老大)
    3. coordinator将之前所有消费者信息发给leader,其中包括主题情况等
    4. 消费者leader会负责制度消费方案
    5. 消费者leader把消费方案发给coordinator
    6. coordinator把消费者方案分别发给各个消费者consumer

    2.特殊情况☆☆☆☆☆

    每3秒 每个消费者都会和coordinator保持心跳(默认3s),一旦超时超过45s(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms5分钟)超过了五分钟,也会触发再平衡

    再平衡:把挂掉的消费者的任务分配给其他消费者

    3.消费者组详细消费流程

    1. 首先创建一个消费者网络连接的客户端去跟kafka集群进行交互,然后调用sendFetches方法用来抓取数据,进行初始化,需要设置以下参数
      (1)参数一:Fetch.min.bytes 每批次最小抓取数据大小 默认1字节,可以设置
      (2)参数二:fetch.max.wait.ms 每批数据未到最小抓取数据的大小的超时时间,默认为500ms
      (3)参数三:Fetch.max.bytes 每批次最
      大抓取大小,默认50M
    2. 再调用send方法发送请求,通过onSuccess回调方法,把对应的数据拉去回来,将一批一批的数据 放入消息队列queue中
    3. 消费者从队列中拉去数据,Max.poll.records一次拉取数据返回消息的最大条数,默认500条
    4. 生产者对数据进行序列号,那么消费者则对数据进行反序列化,通过拦截器,最后进行数据处理

    三、快速入门

    在 IDEA 中编写生产者和消费者程序,生产者往主题为first3中发送数据,消费者从主题为first3中拉去数据

    注意:运行程序之前,需要启动zk和kafka集群

    生产者 CustomProducer01 类

    package com.kafka.producer;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    /**
     * @author wangbo
     * @version 1.0
     */
    
    /**
     * 异步发送,创建不带回调函数的API代码
     */
    public class CustomProducer01 {
        public static void main(String[] args) {
            //配置
            Properties properties = new Properties();
    
            //连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //写两个节点是为了防止客户挂掉,另一个能够正常工作
    
            //指定对应的key和value的序列化类型
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            // 1.创建kafka生成对象
            //  表示 k的数据类型,和v的数据类型
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    
            // 2.发送数据
            for (int i = 0; i<5;i++){
                //第一个参数为生产者的主题名,第二个生产者生产的数据value。还有其他配置选项
                kafkaProducer.send(new ProducerRecord<String, String>("first3","kafka"));
            }
    
            // 3.关闭资源
            kafkaProducer.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    消费者 CustomConsumer_01 类

    package com.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    /**
     * @author wangbo
     * @version 1.0
     */
    
    /**
     * 1. 启动集群的zk和kafka
     * 2. 运行CustomConsumer_01消费者消费数据
     * 3. 运行CustomProducer01生产者生产数据 注意主题要对上
     */
    
    public class CustomConsumer_01 {
        public static void main(String[] args) {
            //配置
            Properties properties = new Properties();
    
            //连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //多写一个,避免其中一台挂掉,保证数据的可靠性
    
            //反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            //配置消费者组ID 可以任意起
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    
            //1.创建一个消费者 "","hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            //2.订阅主题 first3
            ArrayList<String> topics = new ArrayList<String>();
            topics.add("first3");
            kafkaConsumer.subscribe(topics);
    
            //3.消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据
    
                //循环打印消费的数据 consumerRecords.for
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
  • 相关阅读:
    汇编语言之源程序
    二、图像处理
    windows系统执行ps1(powershell)脚本文件无法识别命令的问题
    三大传统批发投资领域何去何从?
    PyTorch神经网络-激励函数
    [附源码]计算机毕业设计ssm校园二手交易平台
    混合IP-SDN环境的仿真实验
    SaltStack 常用的一些命令
    vue或webpack加载highcharts与highcharts-3d
    java计算机毕业设计宠物云寄养系统源码+系统+lw文档+mysql数据库+部署
  • 原文地址:https://blog.csdn.net/weixin_44604159/article/details/127538779