上下文是这样的 : 我开发机器上面安装了一个 虚拟机, 这个虚拟机网络基于 NAT 配置的
我本地机器 局域网 ip 为 192.168.0.101, 虚拟机的 ip 为 192.168.220.133
然后 在 vmware 中增加了 NAT 网络映射, 将 192.168.220.133:9200 映射到了 192.168.0.101:9200, 然后 我本地的 kafka客户端 是能够正常访问到 kafka 的服务器的数据, 但是局域网的其他机器 的 kafka客户端 不能够正确的访问到 192.168.0.101:9200 的数据
呵呵 这里就这个问题 简单剖析一下
kafka 服务器基于 2.4.1, 客户端基于 2.2.0
- /**
- * Test06KafkaProducer
- *
- * @author Jerry.X.He <970655147@qq.com>
- * @version 1.0
- * @date 2022-05-28 10:14
- */
- public class Test06KafkaConsumer {
-
- // Test06KafkaProducer
- public static void main(String[] args) {
-
- Properties props = new Properties();
- props.put("bootstrap.servers", "192.168.0.101:9092");
- props.put("group.id", "test2");
- // props.put("enable.auto.commit", "true");
- props.put("enable.auto.commit", "false");
- props.put("auto.commit.interval.ms", "1000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("auto.offset.reset", "earliest");
-
- String topic = "test20220528";
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(topic));
- try {
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100_000));
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
- }
- } finally {
- consumer.close();
- }
- }
-
- }
问题的现象就是 我本地机器跑这个测试用例能够正常的消费到数据
但是 在局域网的另外一台机器 拿不到消费不到 kafka 的数据
我们大致拆解一下在一次普通的 客户端 和 服务端 交互的过程中的一些网络交互的信息
如下客户端 连接 服务端 存在三种配置信息
-1 来自于客户端 properties 配置的 bootstrap.servers 配置项, 在 ”this.metadata.bootstrap(addresses, time.milliseconds())“ 中初始化
0 来自于客户端请求的 MetadataRequest 的响应信息, 数据来自于 kafka 服务器的 metadataSnapshot, 在 "NetworkClient.poll" 中初始化
2147483647 来自于客户端请求的 FindCoordinatorRequest 的响应信息, 数据来自于 kafka 服务器的 metadataSnapshot, 在 ”ConsumerCoordinator.poll“ 中初始化
- -1 -> bootstrap.servers 配置的连接信息, 数据来自于 客户端配置
- 0 -> MetadataResponse 响应的连接信息, 数据来自于 metadataSnapshot
- 2147483647 -> FindCoordinatorRequest 响应的连接信息, 数据来自于 metadataSnapshot
我们大致来看一下这些 节点配置信息 对应的相关发送的请求
这里仅仅是抽样一个结果, 并不去做 具体的为什么 这个请求 使用这个配置信息的解释
- -1 - org.apache.kafka.common.requests.FindCoordinatorRequest$Builder
- 0 - org.apache.kafka.common.requests.FindCoordinatorRequest$Builder
- 2147483647 - org.apache.kafka.common.requests.JoinGroupRequest$Builder
- 2147483647 - org.apache.kafka.common.requests.JoinGroupRequest$Builder
- 2147483647 - org.apache.kafka.common.requests.SyncGroupRequest$Builder
- 2147483647 - org.apache.kafka.common.requests.OffsetFetchRequest$Builder
- 0 - org.apache.kafka.common.requests.ListOffsetRequest$Builder
- 0 - org.apache.kafka.common.requests.FetchRequest$Builder
- 0 - org.apache.kafka.common.requests.FetchRequest$Builder
- 2147483647 - org.apache.kafka.common.requests.HeartbeatRequest$Builder
这里先直接 阐述 原因
上面的 三类节点信息, bootstrap.servers 的配置来自于客户端配置, 客户端得到的是 192.168.0.101:9200, 这个服务不管是在本机, 还是在同局域网的其他机器 都是是可以访问的
但是 后面的两类, 是来自于 kafka 服务器的 metadataSnapshot, 服务器返回的是 advertised.listener/advertised.listener.host.name+advertised.listener.host.port 配置的信息
客户端 得到的是 192.168.220.133:9200, 这个服务是本机可以访问, 但是局域网其他机器是访问不到的, 因此 出现了访问不到 kafka 的服务的问题
这个通常来说, 就是增加一层代理路由, 或者 调整虚拟机的网络配置 来解决
MetadataResponse 的 连接信息来自于 metadataCache, 数据存储于 metadataCache.metadataSnapshot
FindCoordinatorResponse 的 连接信息来自于 metadataCache, 数据存储于 metadataCache.metadataSnapshot
metadataSnapshot 的更新流程大致如下
可以看到的是 获取到的节点信息来自于 kafka 服务器配置的 advertised 的相关配置信息
- FindCoordinatorRequest 获取的数据来自于 metadataSnapshot
- kafkaController.sendUpdateMetadataRequest 发生事件的时候 更新 metadataSnapshot 的请求
- aliveNodes 来自于 broker 的相关信息, 这个 brokerInfo 来自于 KafkaController.initializeControllerContext
- KafkaController.initializeControllerContext 的 aliveNodes 来自于 zkClient 向 "/brokers/ids/*" 获取 brokerId 列表, 然后在依次获取 "brokers/ids/$id" 的数据信息, 作为 Broker 的元数据信息
- broker 注册到 zk 的信息来自于 KafkaServer 启动的时候向 zk 注册的, endpoints 的相关信息来自于 "advertised.listeners", "advertised.host.name:advertised.host.port", "listeners", "listeners.host.name:listeners.host.port"
完