最近 Mirror Maker
需要增加同步几个 topic
到下游 Kafka
集群。修改配置后,重启 Mirror Maker
服务后一直无法正常消费。报 (Re-join group)
的错误。这里整理并记录一下。
Mirror Maker
一直在卡在了这,然后上游 Kafka
服务没有明显的日志输出。过段时间后,MM
服务就自动退出了。cpu
使用率升高,使用率大概在 500%
。pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator (Re-)joining group
一开始,观察到服务没有别的异常输出,只是在频繁的打印 (Re-) join group
,由于无法登录服务器,所以无法打印服务的 jstack
信息,只是根据 CPU
的异常使用率,怀疑是与锁/GC 有关,只能根据现有的工具来排查问题。
JVM
既然怀疑是与 锁 / GC 有关,那么先从GC 入手,调大 JVM Heap
大小。然后观察服务情况。
重启服务后,发现问题没有解决,依然是服务无法正常消费数据,然后服务CPU 使用率高。
那么只可能是与锁有关了,但是无法打印服务的堆栈信息,只能求助 CDH
原厂专家了。
num.streams
专家介入后,很快发现我们的 num.streams
和 num.produces
这两个参数设置的很不合理。
我们生产上的配置:
num.streams
设置了 700num.producers
设置了400经过介绍,num.producers
这个配置,即使配置了也不会生效,默认是 1
,所以这个配合就算改了还好,但是确实是过大了。
影响最大的是 num.streams
这个配置,这个配置是:
每个线程实例化并使用一个消费者。MM
线程和消费者之间的1:1映射。
每个线程共享同一个生产者。线程和生产者之间的N:1映射。
这里我们一共起了 3 个 MM
实例,每个实例 700
个线程,也就是 700 个消费者,总共 2100 个消费者,但是我们需要消费的 topic
全部的分区数加起来预估可能也才 200 个左右。
综上,很明显我们消费者是远远大于分区数的。这里我们陷入了误区,因为一开始服务是能正常使用的,只是我们多订阅了几个 topic
,然后重启服务,就不可用了,一开始没往这个地方想,而且即使消费者数量多了,最多导致很多的线程空载,不会实际消费数据,应该也无所谓。
这里的解决方案也很简单, 降低 num.streams
为 50。
核心思想: 降低消费者的数量,减少 stop-the-world Rebalaning 次数。
负载平衡和调度是每个分布式系统的核心,Apache Kafka
也不例外。Kafka 客户端使用组管理 API 来形成协作客户端进程组。这些客户端形成组的能力由 Kafka 代理促进,该代理充当参与组的客户端的协调器。
客户端之间的实际负载分配发生在它们之间,这样就不会给 Kafka
服务带来压力,每当需要在客户端之间分配负载时,就会开始新一轮的再平衡,在此期间所有进程都会释放它们的资源。在此阶段结束时,重申组成员身份并选举组领导,每个客户端都被分配一组新资源。简而言之,这也称为stop-the-world rebalancing,这个短语可以追溯到垃圾收集文献。
这里引用了https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
的测试数据。
可以看到,当连接器及其任务启动或停止时 stop-the-world)的成本与集群中当前运行的任务数量成正比。
从上文得知,我们的消费者数量过大,就会导致 频繁的进行Rebalancer
,然后就会出现频繁的 (Re-) join group
。而且由于客户端代码的实现很多地方都是 synchronized
,更加重了锁竞争,导致CPU
使用率过高的问题。
直接相关
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
// send a join group request to the coordinator
log.info("(Re-)joining group");
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
.setMemberId(this.generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setProtocolType(protocolType())
.setProtocols(metadata())
.setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
);
log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
// Note that we override the request timeout using the rebalance timeout since that is the
// maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.
int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
.compose(new JoinGroupResponseHandler());
}
初始化加入组
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (joinFuture == null) {
// fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
// Note that this must come after the call to onJoinPrepare since we must be able to continue
// sending heartbeats if that callback takes some time.
disableHeartbeatThread();
state = MemberState.REBALANCING;
// a rebalance can be triggered consecutively if the previous one failed,
// in this case we would not update the start time.
if (lastRebalanceStartMs == -1L)
lastRebalanceStartMs = time.milliseconds();
joinFuture = sendJoinGroupRequest();
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer value) {
synchronized (AbstractCoordinator.this) {
} else {
log.info("Generation data was cleared by heartbeat thread. Rejoin failed.");
recordRebalanceFailure();
}
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
recordRebalanceFailure();
}
}
private void recordRebalanceFailure() {
state = MemberState.UNJOINED;
sensors.failedRebalanceSensor.record();
}
});
}
return joinFuture;
}
相关调用
/**
* Ensure the group is active (i.e., joined and synced)
*
* @param timer Timer bounding how long this method can block
* @throws KafkaException if the callback throws exception
* @return true iff the group is active
*/
boolean ensureActiveGroup(final Timer timer) {
// always ensure that the coordinator is ready because we may have been disconnected
// when sending heartbeats and does not necessarily require us to rejoin the group.
if (!ensureCoordinatorReady(timer)) {
return false;
}
startHeartbeatThreadIfNeeded();
return joinGroupIfNeeded(timer);
}
客户端最开始调用
public void poll(long now, long remainingMs) {
invokeCompletedOffsetCommitCallbacks();
if (subscriptions.partitionsAutoAssigned()) {
...
if (needRejoin()) {
...
ensureActiveGroup();
...
}
} else {
...
}
}
pollHeartbeat(now);
maybeAutoCommitOffsetsAsync(now);
}
MM
是一个 kafka conect。MM
一个线程对应一个消费者,这个线程会在开始时订阅需要消费的全部topic ,由内部的客户端来实现负载均衡。也就是会进行多次的rebalancer
和 (re-) join group
.MM
最好通过水平横向扩展实例来提高性能,因为 num.producers
配置是写死的。Kafka
消费者数量与 rebalancer时间 和 次数有绝对的影响,不要调到过高。MM
内部的消费线程是 CountDownLatch
实现的。在关闭 MM
服务时,也会出现频繁重平衡问题,因为关闭一个线程,就会触发重平衡。所以这个参数切记调小。