在连接Kafka服务器消费数据前,需要创建Kafka消费者进行拉取数据,需要配置相应的参数,比如设置消费者所属的消费者组名称、连接的broker服务器地址、序列号和反序列化的方式等配置。
public KafkaConsumer<String, String> getConsumer() {
Properties props = new Properties();
// kafka集群所需的broker地址
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092);
// kafka消费者群组名称
props.put("group.id", "group_demo");
// 消费者从broker端获取的消息格式都是byte[]数组类型,key和value需要进行反序列化。
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
return consumer;
}
更多消费者配置可参考官网:
https://kafka.apache.org/documentation/#consumerconfigs
消费者可使用 subscribe()
方法订阅一个主题。订阅主题时,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。对于以正则表达式的形式订阅主题,在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效
subscribe() 的几个重载方法如下:
上述重载方法中的参数ConsumerRebalance-Listener
,这个是用来设置相应的再均衡监听器的。
比如订阅多个主题的代码如下:
// 订阅多个主题
consumer.subscribe(Arrays.asList("topic1","topic2",...));
通过上述方式,就实现了Kafka创建消费者,并利用subscribe方式订阅指定主题下的数据。接下来,利用poll
方法就可以拉取相应数据了。
除了以主题的形式订阅外,Kafka提供了可以更细致的订阅主题下某个分区的功能。我们知道Kafka主题包含了多个分区来实现负载均衡,分区对于消费者来说,可以提高并发度,提高效率。因此对于特定的场景需求时,Kafka允许指定某个分区下的数据进行消费,所设计的函数就是assign
。
此方法的具体定义如下:
public void assign(Collection partitions)
这个方法只接受一个参数partitions,用来指定需要订阅的分区集合。这里的TopicPartition类只有2个属性:topic和partition,分别代表分区所属的主题和自身的分区编号:
比如需要订阅topic主题分区编号为0的分区,代码如下所示:
consumer.assign(Arrays.asList(new TopicPartition("topic", 0))
如果我们事先并不知道主题中有多少个分区怎么办?Kafka 提供了partitionsFor()
方法可以用来查询指定主题的元数据信息,代码如下所示:
List<TopicPartition> list = new ArrayList<>();
// 根据指定主题,获取到所有分区信息
List<PartitionInfo> partitions = consumer.partitionsFor(topic);
// 为每个分区设置对应的起始时间
for (PartitionInfo par : partitions) {
list.add(new TopicPartition(topic, par.partition()));
}
for (TopicPartition topicPartition : list)
{
consumer.assign(Arrays.asList(topicPartition)); // 订阅主题中指定的分区
}
上述代码利用consumer.partitionsFor(topic)
获取指定主题下的所有分区信息,然后利用new TopicPartition
方法封装主题下的每个分区,最后对于每个分区调用consumer.assign
进行订阅。
取消订阅主题可以采用unsubscribe()
方法。
consumer.ubunsubscribe();
这里有一个有趣的问题,subscribe()
方法有对应的unsubscribe()
方法取消订阅某个主题,但是assign()
却没有unassign
方法来取消订阅主题下的某个分区。因此,对于取消某个分区的订阅,如果我们不希望使用unsubscribe()
上升到主题的维度,而仍然想仅仅考虑这个分区,该采用什么方式呢?
可以使用将订阅的分区置空的方式:
consumer.assgin(new ArrayList<TopicPartition>())
不过似乎unsubscribe()
方法也可以取消通过 assign(Collection) 方式实现的订阅。
Kafka订阅主题或分区分别使用subscribe
方法和assign
方法。
区别:
subscribe
方法在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系,当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。assign
方法用来指定订阅主题下的某个分区,在使用时是不具备消费者自动均衡的功能的。这一点从 assign()
方法的参数中就可以看出端倪,subscribe()
方法中有 ConsumerRebalanceListener
类型参数,而 assign()
方法却没有。取消订阅:
unsubscribe
方法。consumer.ubunsubscribe();
consumer.assgin(new ArrayList())
或consumer.ubunsubscribe()