• C++实现kafka的消费者客户端


    一、Kafka 消费者的逻辑

    配置客户端参数
    创建消费者实例
    订阅主题
    拉取消息
    消费消息
    提交消息位移
    关闭消费者实例
    1. 配置消费者客户端参数。
    2. 创建相应的消费者实例。
    3. 订阅主题。
    4. 拉取消息并消费;
    5. 提交消息位移;
    6. 关闭消费者实例;

    二、Kafka 的C++ API

    2.1、RdKafka::Conf

    enum ConfType{ 
    	CONF_GLOBAL, 	// 全局配置 
    	CONF_TOPIC 		// Topic配置 
    };
    enum ConfResult{ 
    	CONF_UNKNOWN = -2, 
    	CONF_INVALID = -1, 
    	CONF_OK = 0 
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. static Conf * create(ConfType type);
      创建配置对象。

    2. Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
      设置配置对象的属性值,成功返回CONF_OK,错误时错误信息输出到errstr。

    3. Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
      设置dr_cb属性值。

    4. Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
      设置event_cb属性值。

    5. Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr);
      设置用于自动订阅Topic的默认Topic配置。

    6. Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
      设置partitioner_cb属性值,配置对象必须是CONF_TOPIC类型。

    7. Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb,std::string &errstr);
      设置partitioner_key_pointer_cb属性值。

    8. Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr);
      设置socket_cb属性值。

    9. Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr);
      设置open_cb属性值。

    10. Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr);
      设置rebalance_cb属性值。

    11. Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr);
      设置offset_commit_cb属性值。

    12. Conf::ConfResult get(const std::string &name, std::string &value) const;
      查询单条属性配置值。

    2.2、RdKafka::Event

    enum Type{ 
    	EVENT_ERROR, //错误条件事件 
    	EVENT_STATS, // Json文档统计事件 
    	EVENT_LOG, // Log消息事件 
    	EVENT_THROTTLE // 来自Broker的throttle级信号事件 
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. virtual Type type() const =0;
      返回事件类型。
    2. virtual ErrorCode err() const =0;
      返回事件错误代码。
    3. virtual Severity severity() const =0;
      返回log严重级别。
    4. virtual std::string fac() const =0;
      返回log基础字符串。
    5. virtual std::string str () const =0;
      返回Log消息字符串。
    6. virtual int throttle_time() const =0;
      返回throttle时间。
    7. virtual std::string broker_name() const =0;
      返回Broker名称。
    8. virtual int broker_id() const =0;
      返回Broker ID。

    2.3、RdKafka::EventCb

    事件是从RdKafka传递错误、统计信息、日志等消息到应用程序的通用接口。

    virtual void event_cb(Event &event)=0; //  事件回调函数
    
    • 1

    C++封装示例:

    class ConsumerEventCb : public RdKafka::EventCb
    {
    public:
        void event_cb (RdKafka::Event &event)
        {
            switch (event.type())
            {
            case RdKafka::Event::EVENT_ERROR:
                if (event.fatal())
                {
                    std::cerr << "FATAL ";
                }
                std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                          event.str() << std::endl;
                break;
    
            case RdKafka::Event::EVENT_STATS:
                std::cerr << "\"STATS\": " << event.str() << std::endl;
                break;
    
            case RdKafka::Event::EVENT_LOG:
                fprintf(stderr, "LOG-%i-%s: %s\n",
                        event.severity(), event.fac().c_str(), event.str().c_str());
                break;
    
            case RdKafka::Event::EVENT_THROTTLE:
                std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
                          event.broker_name() << " id " << (int)event.broker_id() << std::endl;
                break;
    
            default:
                std::cerr << "EVENT " << event.type() <<
                          " (" << RdKafka::err2str(event.err()) << "): " <<
                          event.str() << std::endl;
                break;
            }
        }
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    2.4、RdKafka::TopicPartition

    1. static TopicPartition * create(const std::string &topic, int partition);
      创建一个TopicPartition对象。
    2. static TopicPartition *create (const std::string &topic, int partition,int64_t offset);
      创建TopicPartition对象。
    3. static void destroy (std::vector &partitions);
      销毁所有TopicPartition对象。
    4. const std::string & topic () const;
      返回Topic名称。
    5. int partition ();
      返回分区号。
    6. int64_t offset();
      返回位移。
    7. void set_offset(int64_t offset);
      设置位移。
    8. ErrorCode err();
      返回错误码。

    2.5、RdKafka::RebalanceCb

    virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * > &partitions)=0;
    
    • 1

    用于RdKafka::KafkaConsunmer的组再平衡回调函数;注册rebalance_cb回调函数会关闭rdkafka的自动分区赋值和再分配并替换应用程序的rebalance_cb回调函数。

    再平衡回调函数负责对基于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件更新rdkafka的分区分配,也能处理任意前两者错误除外其它再平衡失败错误。对于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件之外的其它再平衡失败错误,必须调用unassign()同步状态。
    没有再平衡回调函数,rdkafka也能自动完成再平衡过程,但注册一个再平衡回调函数可以使应用程序在执行其它操作时拥有更大的灵活性,例如从指定位置获取位移或手动提交位移。

    C++封装示例:

    class ConsumerRebalanceCb : public RdKafka::RebalanceCb
    {
    private:
        static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions)        // 打印当前获取的分区
        {
            for (unsigned int i = 0 ; i < partitions.size() ; i++)
                std::cerr << partitions[i]->topic() <<
                          "[" << partitions[i]->partition() << "], ";
            std::cerr << "\n";
        }
    
    public:
        void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                           RdKafka::ErrorCode err,
                           std::vector<RdKafka::TopicPartition*> &partitions)
        {
            std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
            printTopicPartition(partitions);
            if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
            {
                consumer->assign(partitions);
                partition_count = (int)partitions.size();
            }
            else
            {
                consumer->unassign();
                partition_count = 0;
            }
        }
    private:
        int partition_count;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    2.6、RdKafka::Message

    Message表示一条消费或生产的消息,或是事件。

    1. std::string errstr() const;
      如果消息是一条错误事件,返回错误字符串,否则返回空字符串。
    2. ErrorCode err() const;
      如果消息是一条错误事件,返回错误代码,否则返回0。
    3. Topic * topic() const;
      返回消息的Topic对象。如果消息的Topic对象没有显示使用RdKafka::Topic::create()创建,需要使用topic_name函数。
    4. std::string topic_name() const;
      返回消息的Topic名称。
    5. int32_t partition() const;
      如果分区可用,返回分区号。
    6. void * payload() const;
      返回消息数据。
    7. size_t len() const;
      返回消息数据的长度。
    8. const std::string * key() const;
      返回字符串类型的消息key。
    9. const void * key_pointer() const;
      返回void类型的消息key。
    10. size_t key_len() const;
      返回消息key的二进制长度。
    11. int64_t offset () const;
      返回消息或错误的位移。
    12. void * msg_opaque() const;
      返回RdKafka::Producer::produce()提供的msg_opaque。
    13. virtual MessageTimestamp timestamp() const = 0;
      返回消息时间戳。
    14. virtual int64_t latency() const = 0;
      返回produce函数内生产消息的微秒级时间延迟,如果延迟不可用,返回-1。
    15. virtual struct rd_kafka_message_s *c_ptr () = 0;
      返回底层数据结构的C rd_kafka_message_t句柄。
    16. virtual Status status () const = 0;
      返回消息在Topic Log的持久化状态。
    17. virtual RdKafka::Headers *headers () = 0;
      返回消息头。
    18. virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0;
      返回消息头,错误信息会输出到err。

    2.7、RdKafka::KafkaConsumer(核心)

    KafkaConsumer是高级API,要求Kafka 0.9.0以上版本,当前支持range和roundrobin分区分配策略。

    1. static KafkaConsumer * create(Conf *conf, std::string &errstr);
      创建KafkaConsumer对象,conf对象必须配置Consumer要加入的消费者组。使用KafkaConsumer::close()进行关闭。
    2. ErrorCode assignment(std::vector< RdKafka::TopicPartition * > &partitions);
      返回由RdKafka::KafkaConsumer::assign() 设置的当前分区。
    3. ErrorCode subscription(std::vector< std::string > &topics);
      返回由RdKafka::KafkaConsumer::subscribe() 设置的当前订阅Topic。
    4. ErrorCode subscribe(const std::vector< std::string > &topics);
      更新订阅Topic分区。
    5. ErrorCode unsubscribe();
      将当前订阅Topic取消订阅分区。
    6. ErrorCode assign(const std::vector< TopicPartition * > &partitions);
      将分配分区更新为partitions。
    7. ErrorCode unassign();
      停止消费并删除当前分配的分区。
    8. Message * consume(int timeout_ms);
      消费消息或获取错误事件,触发回调函数,会自动调用注册的回调函数,包括RebalanceCb、EventCb、OffsetCommitCb等。需要使用delete释放消息。应用程序必须确保consume在指定时间间隔内调用,为了执行等待调用的回调函数,即使没有消息。当RebalanceCb被注册时,在需要调用和适当处理内部Consumer同步状态时,确保consume在指定时间间隔内调用极为重要。应用程序必须禁止对KafkaConsumer对象调用poll函数。
      如果RdKafka::Message::err()是ERR_NO_ERROR,则返回正常的消息;如果RdKafka::Message::err()是ERR_NO_ERRO,返回错误事件;如果RdKafka::Message::err()是ERR_TIMED_OUT,则超时。
    9. ErrorCode commitSync();
      提交当前分配分区的位移,同步操作,会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数,其会在KafkaConsumer::consume()函数内调用并提交位移。
    10. ErrorCode commitAsync();
      异步提交位移。
    11. ErrorCode commitSync(Message *message);
      基于消息对单个topic+partition对象同步提交位移。
    12. virtual ErrorCode commitSync (std::vector &offsets) = 0;
      对指定多个TopicPartition同步提交位移。
    13. ErrorCode commitAsync(Message *message);
      基于消息对单个TopicPartition异步提交位移。
    14. virtual ErrorCode commitAsync (const std::vector &offsets) = 0;
      对多个TopicPartition异步提交位移。
    15. ErrorCode close();
      正常关闭,会阻塞直到四个操作完成(触发避免当前分区分配的局部再平衡,停止当前赋值消费,提交位移,离开分组)
    16. virtual ConsumerGroupMetadata *groupMetadata () = 0;
      返回本Consumer实例的Consumer Group的元数据。
    17. ErrorCode position (std::vector &partitions)
      获取TopicPartition对象中当前位移,会别填充TopicPartition对象的offset字段。
    18. ErrorCode seek (const TopicPartition &partition, int timeout_ms)
      定位TopicPartition的Consumer到位移。timeout_ms为0,会开始Seek并立即返回;timeout_ms非0,Seek会等待timeout_ms时间。
    19. ErrorCode offsets_store (std::vector &offsets)
      为TopicPartition存储位移,位移会在auto.commit.interval.ms时提交或是被手动提交。enable.auto.offset.store属性必须设置为fasle。

    三、Kafka 消费者客户端开发

    3.1、必要的参数配置(bootstrap.servers)

    在创建消费者的时候以下以下三个选项是必选的:

    1. bootstrap.servers:指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错。
    2. group.id:consumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的 ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。
    3. auto.offset.reset:这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来消费指定的 topic时,对于该参数的配置,会有不同的语义。
    参数描述
    none如果没有为消费者找到先前的offset的值,即没有自动维护偏移量,也没有手动维护偏移量,则抛出异常。
    earliest在各分区下有提交的offset时:从offset处开始消费。在各分区下无提交的offset时:从头开始消费
    latest在各分区下有提交的offset时:从offset处开始消费。在各分区下无提交的offset时:从最新的数据开始消费

    配置示例:

    std::string errorStr;
    RdKafka::Conf::ConfResult errorCode;
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    
    m_event_cb = new ConsumerEventCb;
    errorCode = m_config->set("event_cb", m_event_cb, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    
    m_rebalance_cb = new ConsumerRebalanceCb;
    errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    
    errorCode = m_config->set("enable.partition.eof", "false", errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    
    errorCode = m_config->set("group.id", m_groupID, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    3.2、订阅主题和分区

    订阅主题,可以订阅多个。

    ErrorCode subscribe (const std::vectorstd::string &topics);
    
    • 1

    也可以通过正则表达式方式一次订阅多个主题,比如 “topic-.*”, 则前缀为“topic-.”的主题都被订阅。

    示例:

    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    // 获取最新的消息数据
    errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Topic Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
    if(m_consumer == NULL)
    {
        std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
    }
    std::cout << "Created consumer " << m_consumer->name() << std::endl;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    3.3、消息消费

    void msg_consume(RdKafka::Message* msg, void* opaque)
    {
        switch (msg->err())
        {
        case RdKafka::ERR__TIMED_OUT:
            std::cerr << "Consumer error: " << msg->errstr() << std::endl;  // 超时
            break;
        case RdKafka::ERR_NO_ERROR:     // 有消息进来
            std::cout << " Message in-> topic:" << msg->topic_name() << "partition:["
                      << msg->partition() << "] at offset " << msg->offset()
                      << " key: " << msg->key() << " payload: "
                      << (char*)msg->payload() << std::endl;
            break;
        default:
            std::cerr << "Consumer error: " << msg->errstr() << std::endl;
            break;
        }
    }
    
    void KafkaConsumer::pullMessage()
    {
        // 订阅Topic
        RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
        if (errorCode != RdKafka::ERR_NO_ERROR)
        {
            std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
        }
        // 消费消息
        while(true)
        {
            RdKafka::Message *msg = m_consumer->consume(1000);
            msg_consume(msg, NULL);
            delete msg;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    3.4、完整示例代码

    KafkaConsumer.h

    #ifndef KAFKACONSUMER_H
    #define KAFKACONSUMER_H
    
    #pragma once
    
    #include 
    #include 
    #include 
    #include 
    #include "rdkafkacpp.h"
    
    class KafkaConsumer
    {
    public:/**
         * @brief KafkaConsumer
         * @param brokers
         * @param groupID
         * @param topics
         * @param partition
         */
        explicit KafkaConsumer(const std::string& brokers, const std::string& groupID,
                               const std::vector<std::string>& topics, int partition);
        void pullMessage();
        ~KafkaConsumer();
    protected:
        std::string m_brokers;
        std::string m_groupID;
        std::vector<std::string> m_topicVector;
        int m_partition;
        RdKafka::Conf* m_config;
        RdKafka::Conf* m_topicConfig;
        RdKafka::KafkaConsumer* m_consumer;
        RdKafka::EventCb* m_event_cb;
        RdKafka::RebalanceCb* m_rebalance_cb;
    };
    
    #endif // KAFKACONSUMER_H
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    KafkaConsumer.cpp

    #include "KafkaConsumer.h"
    
    class ConsumerEventCb : public RdKafka::EventCb
    {
    public:
        void event_cb (RdKafka::Event &event)
        {
            switch (event.type())
            {
            case RdKafka::Event::EVENT_ERROR:
                if (event.fatal())
                {
                    std::cerr << "FATAL ";
                }
                std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                          event.str() << std::endl;
                break;
    
            case RdKafka::Event::EVENT_STATS:
                std::cerr << "\"STATS\": " << event.str() << std::endl;
                break;
    
            case RdKafka::Event::EVENT_LOG:
                fprintf(stderr, "LOG-%i-%s: %s\n",
                        event.severity(), event.fac().c_str(), event.str().c_str());
                break;
    
            case RdKafka::Event::EVENT_THROTTLE:
                std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
                          event.broker_name() << " id " << (int)event.broker_id() << std::endl;
                break;
    
            default:
                std::cerr << "EVENT " << event.type() <<
                          " (" << RdKafka::err2str(event.err()) << "): " <<
                          event.str() << std::endl;
                break;
            }
        }
    };
    
    class ConsumerRebalanceCb : public RdKafka::RebalanceCb
    {
    private:
        static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions)        // 打印当前获取的分区
        {
            for (unsigned int i = 0 ; i < partitions.size() ; i++)
                std::cerr << partitions[i]->topic() <<
                          "[" << partitions[i]->partition() << "], ";
            std::cerr << "\n";
        }
    
    public:
        void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                           RdKafka::ErrorCode err,
                           std::vector<RdKafka::TopicPartition*> &partitions)
        {
            std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
            printTopicPartition(partitions);
            if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
            {
                consumer->assign(partitions);
                partition_count = (int)partitions.size();
            }
            else
            {
                consumer->unassign();
                partition_count = 0;
            }
        }
    private:
        int partition_count;
    };
    
    KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupID,
                                 const std::vector<std::string>& topics, int partition)
    {
        m_brokers = brokers;
        m_groupID = groupID;
        m_topicVector = topics;
        m_partition = partition;
    
        std::string errorStr;
        RdKafka::Conf::ConfResult errorCode;
        m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    
        m_event_cb = new ConsumerEventCb;
        errorCode = m_config->set("event_cb", m_event_cb, errorStr);
        if(errorCode != RdKafka::Conf::CONF_OK)
        {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        m_rebalance_cb = new ConsumerRebalanceCb;
        errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
        if(errorCode != RdKafka::Conf::CONF_OK)
        {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        errorCode = m_config->set("enable.partition.eof", "false", errorStr);
        if(errorCode != RdKafka::Conf::CONF_OK)
        {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        errorCode = m_config->set("group.id", m_groupID, errorStr);
        if(errorCode != RdKafka::Conf::CONF_OK)
        {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
        errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
        if(errorCode != RdKafka::Conf::CONF_OK)
        {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
        errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
        if(errorCode != RdKafka::Conf::CONF_OK)
        {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
    
        // partition.assignment.strategy  range,roundrobin
    
        m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
        // 获取最新的消息数据
        errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
        if(errorCode != RdKafka::Conf::CONF_OK)
        {
            std::cout << "Topic Conf set failed: " << errorStr << std::endl;
        }
        errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
        if(errorCode != RdKafka::Conf::CONF_OK)
        {
            std::cout << "Conf set failed: " << errorStr << std::endl;
        }
        m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
        if(m_consumer == NULL)
        {
            std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
        }
        std::cout << "Created consumer " << m_consumer->name() << std::endl;
    }
    
    void msg_consume(RdKafka::Message* msg, void* opaque)
    {
        switch (msg->err())
        {
        case RdKafka::ERR__TIMED_OUT:
            std::cerr << "Consumer error: " << msg->errstr() << std::endl;  // 超时
            break;
        case RdKafka::ERR_NO_ERROR:     // 有消息进来
            std::cout << " Message in-> topic:" << msg->topic_name() << "partition:["
                      << msg->partition() << "] at offset " << msg->offset()
                      << " key: " << msg->key() << " payload: "
                      << (char*)msg->payload() << std::endl;
            break;
        default:
            std::cerr << "Consumer error: " << msg->errstr() << std::endl;
            break;
        }
    }
    
    void KafkaConsumer::pullMessage()
    {
        // 订阅Topic
        RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
        if (errorCode != RdKafka::ERR_NO_ERROR)
        {
            std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
        }
        // 消费消息
        while(true)
        {
            RdKafka::Message *msg = m_consumer->consume(1000);
            msg_consume(msg, NULL);
            delete msg;
        }
    }
    
    KafkaConsumer::~KafkaConsumer()
    {
        m_consumer->close();
        delete m_config;
        delete m_topicConfig;
        delete m_consumer;
        delete m_event_cb;
        delete m_rebalance_cb;
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191

    main.cpp使用:

    #include "KafkaConsumer.h"
    
    int main(int argc, char* argv[])
    {
        std::string brokers = "127.0.0.1:9092";
        std::vector<std::string> topics;
        topics.push_back("test");
        // topics.push_back("test2");
        std::string group = "testGroup";
        if(argc >= 2) {
            group = argv[1];
        }
        std::cout << "group " << group << std::endl;
    
        KafkaConsumer consumer(brokers, group, topics, RdKafka::Topic::OFFSET_BEGINNING);
        consumer.pullMessage();
    
        RdKafka::wait_destroyed(5000);
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    CMakeLists.txt

    cmake_minimum_required(VERSION 2.8)
    
    project(KafkaConsumer)
    
    set(CMAKE_CXX_STANDARD 11)
    set(CMAKE_CXX_COMPILER "g++")
    set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS}")
    set(CMAKE_INCLUDE_CURRENT_DIR ON)
    
    # Kafka头文件路径
    include_directories(/usr/local/include/librdkafka)
    # Kafka库路径
    link_directories(/usr/local/lib)
    
    aux_source_directory(. SOURCE)
    
    #add_executable(${PROJECT_NAME} ${SOURCE})
    
    #TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)
    
    ADD_EXECUTABLE(${PROJECT_NAME} main.cpp KafkaConsumer.cpp) 
    TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    编译:

    mkdir build
    cd build
    cmake ..
    make
    
    • 1
    • 2
    • 3
    • 4

    四、位移提交

    Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
    提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
    从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。

    4.1、自动提交

    自动提交默认全部为同步提交。
    自动提交相关参数:

    参数描述
    enable.auto.commit (bool)如果为True,将自动定时提交消费者offset。默认为True。
    auto.commit.interval.ms(int)自动提交offset之间的间隔毫秒数。如果enable_auto_commit为true,默认值为 5000。

    当设置 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。

    但自动提交位移的一个问题在于,它可能会出现重复消费。

    4.2、手动提交

    手动提交可以自己选择是同步提交(commitSync)还是异步提交(commitAsync )。commitAsync 不能够替代 commitSync。commitAsync 的问题在于,出现问题时它不会自动重试,因为它是异步操作。

    手动提交,需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果。可以利用 commitSync 的自动重试来规避那些瞬时错误,同时不希望程序总处于阻塞状态,影响 TPS。

    同时使用 commitSync() 和 commitAsync():

    1. 对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保Consumer 关闭前能够保存正确的位移数据。
    2. 将两者结合后,既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性.

    4.3、提交API

    ErrorCode commitSync();
    
    • 1

    提交当前分配分区的位移,同步操作,会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数,其会在KafkaConsumer::consume()函数内调用并提交位移。

    ErrorCode commitAsync();
    
    • 1

    异步提交位移

    五、消费Rebalance机制

    当kafka遇到如下四种情况的时候,kafka会触发Rebalance机制:

    1. 消费组成员发生了变更,比如有新的消费者加入了消费组组或者有消费者宕机。
    2. 消费者无法在指定的时间之内完成消息的消费。
    3. 消费组订阅的Topic发生了变化
    4. 订阅的Topic的partition发生了变化

    总结

    RdKafka提供了两种消费者API,低级API的Consumer和高级API的KafkaConsumer。
    Kafka Consumer使用流程:
    (1)创建Kafka配置实例。
    (2)创建Topic配置实例。
    (3)设置Kafka配置实例Broker属性。
    (4)设置Topic配置实例属性。
    (5)注册回调函数。
    (6)创建Kafka Consumer客户端实例。
    (7)创建Topic实例。
    (8)订阅主题。
    (9)消费消息。
    (10)关闭消费者实例。
    (11)销毁释放RdKafka资源。

    在这里插入图片描述

  • 相关阅读:
    开发轻量级REST API样板 基于Node.js、MongoDB 通过Mongoose驱动
    kafka集群与redis集群的概念对应关系
    嵌入式软件开发工程师未来的薪资待遇是什么情况
    基础运维(一)YUM仓库
    『现学现忘』Docker基础 — 32、通过DockerFile的方式挂载数据卷
    模板 vs. 硬编码 HTML
    应用层协议——DNS、DHCP、HTTP、FTP
    shiro的简单介绍
    借助各大模型的优点生成原创视频(真人人声)Plus
    Charles通过Rewrite越过OPTIONS请求拦截
  • 原文地址:https://blog.csdn.net/Long_xu/article/details/128114358