// Scope of a configuration object created by Conf::create().
enum ConfType{
CONF_GLOBAL, // global configuration
CONF_TOPIC // topic-level configuration
};
// Result of a Conf::set()/get() call; CONF_OK means success, otherwise
// the error text is written to the errstr out-parameter.
enum ConfResult{
CONF_UNKNOWN = -2,
CONF_INVALID = -1,
CONF_OK = 0
};
static Conf * create(ConfType type);
创建配置对象。
Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
设置配置对象的属性值,成功返回CONF_OK,错误时错误信息输出到errstr。
Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
设置dr_cb属性值。
Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
设置event_cb属性值。
Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr);
设置用于自动订阅Topic的默认Topic配置。
Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
设置partitioner_cb属性值,配置对象必须是CONF_TOPIC类型。
Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb,std::string &errstr);
设置partitioner_key_pointer_cb属性值。
Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr);
设置socket_cb属性值。
Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr);
设置open_cb属性值。
Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr);
设置rebalance_cb属性值。
Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr);
设置offset_commit_cb属性值。
Conf::ConfResult get(const std::string &name, std::string &value) const;
查询单条属性配置值。
// Event types delivered to EventCb::event_cb().
enum Type{
EVENT_ERROR, // error condition event
EVENT_STATS, // JSON-document statistics event
EVENT_LOG, // log message event
EVENT_THROTTLE // broker throttle-level signal event
};
事件是从RdKafka传递错误、统计信息、日志等消息到应用程序的通用接口。
virtual void event_cb(Event &event)=0; // 事件回调函数
C++封装示例:
// Event callback: receives errors, statistics, log lines and throttle
// notifications from librdkafka and writes them to stderr.
class ConsumerEventCb : public RdKafka::EventCb
{
public:
// Called by rdkafka for every emitted event.
void event_cb (RdKafka::Event &event)
{
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
// Fatal errors are unrecoverable for this client instance.
if (event.fatal())
{
std::cerr << "FATAL ";
}
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
case RdKafka::Event::EVENT_STATS:
// event.str() carries the statistics JSON document.
std::cerr << "\"STATS\": " << event.str() << std::endl;
break;
case RdKafka::Event::EVENT_LOG:
fprintf(stderr, "LOG-%i-%s: %s\n",
event.severity(), event.fac().c_str(), event.str().c_str());
break;
case RdKafka::Event::EVENT_THROTTLE:
// Broker asked the client to slow down for throttle_time() ms.
std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
event.broker_name() << " id " << (int)event.broker_id() << std::endl;
break;
default:
// Unknown/unhandled event type: dump type, error and text.
std::cerr << "EVENT " << event.type() <<
" (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
}
}
};
virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * > &partitions)=0;
用于RdKafka::KafkaConsumer的组再平衡回调函数;注册rebalance_cb回调函数会关闭rdkafka的自动分区赋值和再分配,改由应用程序的rebalance_cb回调函数负责。
再平衡回调函数负责对基于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件更新rdkafka的分区分配,也能处理任意前两者错误除外其它再平衡失败错误。对于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件之外的其它再平衡失败错误,必须调用unassign()同步状态。
没有再平衡回调函数,rdkafka也能自动完成再平衡过程,但注册一个再平衡回调函数可以使应用程序在执行其它操作时拥有更大的灵活性,例如从指定位置获取位移或手动提交位移。
C++封装示例:
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions) // 打印当前获取的分区
{
for (unsigned int i = 0 ; i < partitions.size() ; i++)
std::cerr << partitions[i]->topic() <<
"[" << partitions[i]->partition() << "], ";
std::cerr << "\n";
}
public:
void rebalance_cb (RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions)
{
std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
printTopicPartition(partitions);
if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
{
consumer->assign(partitions);
partition_count = (int)partitions.size();
}
else
{
consumer->unassign();
partition_count = 0;
}
}
private:
int partition_count;
};
Message表示一条消费或生产的消息,或是事件。
KafkaConsumer是高级API,要求Kafka 0.9.0以上版本,当前支持range和roundrobin分区分配策略。
在创建消费者的时候,auto.offset.reset 属性有以下三个可选值:
| 参数 | 描述 |
|---|---|
| none | 如果没有为消费者找到先前的offset的值,即没有自动维护偏移量,也没有手动维护偏移量,则抛出异常。 |
| earliest | 在各分区下有提交的offset时:从offset处开始消费。在各分区下无提交的offset时:从头开始消费 |
| latest | 在各分区下有提交的offset时:从offset处开始消费。在各分区下无提交的offset时:从最新的数据开始消费 |
配置示例:
// --- Global consumer configuration example ---
std::string errorStr;
RdKafka::Conf::ConfResult errorCode;
// Create the global (CONF_GLOBAL) configuration object.
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
// Register the event callback (errors, stats, logs, throttling).
m_event_cb = new ConsumerEventCb;
errorCode = m_config->set("event_cb", m_event_cb, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
// Register the rebalance callback (disables automatic assign/unassign).
m_rebalance_cb = new ConsumerRebalanceCb;
errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
// Do not surface end-of-partition as an error event.
errorCode = m_config->set("enable.partition.eof", "false", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
// Consumer group id (required for KafkaConsumer).
errorCode = m_config->set("group.id", m_groupID, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
// Initial broker list, e.g. "host1:9092,host2:9092".
errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
// Cap a single partition fetch at ~1 MB.
errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
订阅主题,可以订阅多个。
ErrorCode subscribe (const std::vector<std::string> &topics);
也可以通过正则表达式方式一次订阅多个主题,比如 “topic-.*”,则前缀为“topic-”的主题都被订阅。
示例:
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
// 获取最新的消息数据
errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Topic Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
if(m_consumer == NULL)
{
std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
}
std::cout << "Created consumer " << m_consumer->name() << std::endl;
// Handle one message (or event) returned by KafkaConsumer::consume().
// opaque is unused here (reserved for user context).
void msg_consume(RdKafka::Message* msg, void* opaque)
{
    switch (msg->err())
    {
    case RdKafka::ERR__TIMED_OUT:
        // consume() timed out without receiving a message.
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        break;
    case RdKafka::ERR_NO_ERROR: // a proper message arrived
    {
        // Message::key() returns a const std::string* that is NULL for
        // keyless messages; the original streamed the pointer itself
        // (printing an address) instead of the key text.
        const std::string *key = msg->key();
        std::cout << " Message in-> topic:" << msg->topic_name() << "partition:["
                  << msg->partition() << "] at offset " << msg->offset()
                  << " key: " << (key ? *key : std::string("(null)"))
                  << " payload: "
                  // payload() is not guaranteed NUL-terminated; bound by len().
                  << std::string(static_cast<const char*>(msg->payload()), msg->len())
                  << std::endl;
        break;
    }
    default:
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        break;
    }
}
// Subscribe to the configured topics and consume messages forever.
void KafkaConsumer::pullMessage()
{
// Subscribe to every topic in m_topicVector.
RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
if (errorCode != RdKafka::ERR_NO_ERROR)
{
std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
}
// Consume loop: poll with a 1000 ms timeout; every Message object
// returned by consume() must be deleted by the caller.
// NOTE(review): this loop never exits — presumably intended for the
// example; a production consumer would need a stop condition.
while(true)
{
RdKafka::Message *msg = m_consumer->consume(1000);
msg_consume(msg, NULL);
delete msg;
}
}
KafkaConsumer.h
#ifndef KAFKACONSUMER_H
#define KAFKACONSUMER_H
#pragma once
// The original listing had four bare "#include" directives whose header
// names were lost; restored to the standard headers this class needs.
#include <iostream>
#include <cstdio>
#include <string>
#include <vector>
#include "rdkafkacpp.h"

/**
 * @brief Thin wrapper around RdKafka::KafkaConsumer that configures the
 *        client, subscribes to a set of topics and pulls messages in a loop.
 */
class KafkaConsumer
{
public:
    /**
     * @brief KafkaConsumer
     * @param brokers   bootstrap broker list, e.g. "127.0.0.1:9092"
     * @param groupID   consumer group id
     * @param topics    topics to subscribe to
     * @param partition partition hint stored in m_partition
     */
    explicit KafkaConsumer(const std::string& brokers, const std::string& groupID,
                           const std::vector<std::string>& topics, int partition);
    // Subscribe and consume messages in an endless loop.
    void pullMessage();
    ~KafkaConsumer();
protected:
    std::string m_brokers;                  // bootstrap broker list
    std::string m_groupID;                  // consumer group id
    std::vector<std::string> m_topicVector; // subscribed topics
    int m_partition;                        // partition hint (see ctor)
    RdKafka::Conf* m_config;                // global configuration (owned)
    RdKafka::Conf* m_topicConfig;           // default topic configuration (owned)
    RdKafka::KafkaConsumer* m_consumer;     // consumer handle (owned)
    RdKafka::EventCb* m_event_cb;           // event callback (owned)
    RdKafka::RebalanceCb* m_rebalance_cb;   // rebalance callback (owned)
};
#endif // KAFKACONSUMER_H
KafkaConsumer.cpp
#include "KafkaConsumer.h"
// Event callback: writes errors, statistics, log lines and throttle
// notifications reported by librdkafka to stderr.
class ConsumerEventCb : public RdKafka::EventCb
{
public:
    void event_cb (RdKafka::Event &ev)
    {
        const RdKafka::Event::Type kind = ev.type();
        if (kind == RdKafka::Event::EVENT_ERROR)
        {
            // Fatal errors are unrecoverable for this client instance.
            if (ev.fatal())
                std::cerr << "FATAL ";
            std::cerr << "ERROR (" << RdKafka::err2str(ev.err()) << "): "
                      << ev.str() << std::endl;
        }
        else if (kind == RdKafka::Event::EVENT_STATS)
        {
            // ev.str() carries the statistics JSON document.
            std::cerr << "\"STATS\": " << ev.str() << std::endl;
        }
        else if (kind == RdKafka::Event::EVENT_LOG)
        {
            fprintf(stderr, "LOG-%i-%s: %s\n",
                    ev.severity(), ev.fac().c_str(), ev.str().c_str());
        }
        else if (kind == RdKafka::Event::EVENT_THROTTLE)
        {
            // Broker asked the client to slow down.
            std::cerr << "THROTTLED: " << ev.throttle_time() << "ms by "
                      << ev.broker_name() << " id " << (int)ev.broker_id()
                      << std::endl;
        }
        else
        {
            // Unknown/unhandled event type.
            std::cerr << "EVENT " << ev.type()
                      << " (" << RdKafka::err2str(ev.err()) << "): "
                      << ev.str() << std::endl;
        }
    }
};
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions) // 打印当前获取的分区
{
for (unsigned int i = 0 ; i < partitions.size() ; i++)
std::cerr << partitions[i]->topic() <<
"[" << partitions[i]->partition() << "], ";
std::cerr << "\n";
}
public:
void rebalance_cb (RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions)
{
std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
printTopicPartition(partitions);
if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
{
consumer->assign(partitions);
partition_count = (int)partitions.size();
}
else
{
consumer->unassign();
partition_count = 0;
}
}
private:
int partition_count;
};
/**
 * @brief Construct the consumer: build the global and topic configuration,
 *        register the event/rebalance callbacks and create the
 *        RdKafka::KafkaConsumer handle.
 * @param brokers   bootstrap broker list, e.g. "127.0.0.1:9092"
 * @param groupID   consumer group id
 * @param topics    topics to subscribe to later in pullMessage()
 * @param partition partition hint stored in m_partition
 */
KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupID,
                             const std::vector<std::string>& topics, int partition)
{
    m_brokers = brokers;
    m_groupID = groupID;
    m_topicVector = topics;
    m_partition = partition;

    std::string errorStr;

    // Report (but do not abort on) a failed property set — preserves the
    // original best-effort behavior while removing six copy/pasted blocks.
    auto checkConf = [&errorStr](RdKafka::Conf::ConfResult rc, const char *what) {
        if (rc != RdKafka::Conf::CONF_OK)
            std::cout << what << " set failed: " << errorStr << std::endl;
    };

    // Global (CONF_GLOBAL) configuration.
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    m_event_cb = new ConsumerEventCb;
    checkConf(m_config->set("event_cb", m_event_cb, errorStr), "Conf");

    m_rebalance_cb = new ConsumerRebalanceCb;
    checkConf(m_config->set("rebalance_cb", m_rebalance_cb, errorStr), "Conf");

    // Do not surface end-of-partition as an error event.
    checkConf(m_config->set("enable.partition.eof", "false", errorStr), "Conf");
    checkConf(m_config->set("group.id", m_groupID, errorStr), "Conf");
    checkConf(m_config->set("bootstrap.servers", m_brokers, errorStr), "Conf");
    // Cap a single partition fetch at ~1 MB.
    checkConf(m_config->set("max.partition.fetch.bytes", "1024000", errorStr), "Conf");

    // Topic-level (CONF_TOPIC) configuration, attached as the default for
    // subscribed topics. (partition.assignment.strategy: range,roundrobin)
    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    // With no committed offset, start consuming from the latest message.
    checkConf(m_topicConfig->set("auto.offset.reset", "latest", errorStr), "Topic Conf");
    checkConf(m_config->set("default_topic_conf", m_topicConfig, errorStr), "Conf");

    m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
    if (m_consumer == NULL)
    {
        std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
    }
    else
    {
        // The original dereferenced m_consumer unconditionally here,
        // crashing whenever create() failed.
        std::cout << "Created consumer " << m_consumer->name() << std::endl;
    }
}
// Handle one message (or event) returned by KafkaConsumer::consume().
// opaque is unused here (reserved for user context).
void msg_consume(RdKafka::Message* msg, void* opaque)
{
    switch (msg->err())
    {
    case RdKafka::ERR__TIMED_OUT:
        // consume() timed out without receiving a message.
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        break;
    case RdKafka::ERR_NO_ERROR: // a proper message arrived
    {
        // Message::key() returns a const std::string* that is NULL for
        // keyless messages; the original streamed the pointer itself
        // (printing an address) instead of the key text.
        const std::string *key = msg->key();
        std::cout << " Message in-> topic:" << msg->topic_name() << "partition:["
                  << msg->partition() << "] at offset " << msg->offset()
                  << " key: " << (key ? *key : std::string("(null)"))
                  << " payload: "
                  // payload() is not guaranteed NUL-terminated; bound by len().
                  << std::string(static_cast<const char*>(msg->payload()), msg->len())
                  << std::endl;
        break;
    }
    default:
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        break;
    }
}
void KafkaConsumer::pullMessage()
{
// 订阅Topic
RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
if (errorCode != RdKafka::ERR_NO_ERROR)
{
std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
}
// 消费消息
while(true)
{
RdKafka::Message *msg = m_consumer->consume(1000);
msg_consume(msg, NULL);
delete msg;
}
}
/**
 * @brief Tear down the consumer: leave the group, then release the
 *        heap-allocated configuration and callback objects.
 */
KafkaConsumer::~KafkaConsumer()
{
    // create() may have failed in the constructor, leaving m_consumer NULL;
    // the original called close() unconditionally and would crash.
    if (m_consumer != NULL)
    {
        m_consumer->close(); // leave the group / finish pending work
    }
    delete m_config;
    delete m_topicConfig;
    delete m_consumer;
    delete m_event_cb;
    delete m_rebalance_cb;
}
main.cpp使用:
#include "KafkaConsumer.h"
// Example entry point: consume topic "test" from a local broker.
int main(int argc, char* argv[])
{
std::string brokers = "127.0.0.1:9092";
std::vector<std::string> topics;
topics.push_back("test");
// topics.push_back("test2");
std::string group = "testGroup";
// Optional first CLI argument overrides the consumer group id.
if(argc >= 2) {
group = argv[1];
}
std::cout << "group " << group << std::endl;
// NOTE(review): the 4th constructor parameter is "partition", but an
// offset constant (OFFSET_BEGINNING) is passed here — confirm intent.
KafkaConsumer consumer(brokers, group, topics, RdKafka::Topic::OFFSET_BEGINNING);
consumer.pullMessage();
// Wait up to 5s for all rdkafka handles to be destroyed.
RdKafka::wait_destroyed(5000);
return 0;
}
CMakeLists.txt
# Build configuration for the KafkaConsumer example (links librdkafka++).
cmake_minimum_required(VERSION 2.8)
project(KafkaConsumer)
# Compile as C++11 (also forced via CMAKE_CXX_FLAGS below).
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_COMPILER "g++")
set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS}")
set(CMAKE_INCLUDE_CURRENT_DIR ON)
# Kafka header file path
include_directories(/usr/local/include/librdkafka)
# Kafka library path
link_directories(/usr/local/lib)
aux_source_directory(. SOURCE)
#add_executable(${PROJECT_NAME} ${SOURCE})
#TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)
ADD_EXECUTABLE(${PROJECT_NAME} main.cpp KafkaConsumer.cpp)
TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)
编译:
mkdir build
cd build
cmake ..
make
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
自动提交默认全部为同步提交。
自动提交相关参数:
| 参数 | 描述 |
|---|---|
| enable.auto.commit (bool) | 如果为True,将自动定时提交消费者offset。默认为True。 |
| auto.commit.interval.ms(int) | 自动提交offset之间的间隔毫秒数。如果enable_auto_commit为true,默认值为 5000。 |
当设置 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。
但自动提交位移的一个问题在于,它可能会出现重复消费。
手动提交可以自己选择是同步提交(commitSync)还是异步提交(commitAsync )。commitAsync 不能够替代 commitSync。commitAsync 的问题在于,出现问题时它不会自动重试,因为它是异步操作。
手动提交,需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果。可以利用 commitSync 的自动重试来规避那些瞬时错误,同时不希望程序总处于阻塞状态,影响 TPS。
同时使用 commitSync() 和 commitAsync():
ErrorCode commitSync();
提交当前分配分区的位移,同步操作,会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数,其会在KafkaConsumer::consume()函数内调用并提交位移。
ErrorCode commitAsync();
异步提交位移
当kafka遇到如下四种情况的时候,kafka会触发Rebalance机制:
RdKafka提供了两种消费者API,低级API的Consumer和高级API的KafkaConsumer。
Kafka Consumer使用流程:
(1)创建Kafka配置实例。
(2)创建Topic配置实例。
(3)设置Kafka配置实例Broker属性。
(4)设置Topic配置实例属性。
(5)注册回调函数。
(6)创建Kafka Consumer客户端实例。
(7)创建Topic实例。
(8)订阅主题。
(9)消费消息。
(10)关闭消费者实例。
(11)销毁释放RdKafka资源。
