• 03 当前机器的虚拟机[NAT网络]映射出 9200 到宿主机, 局域网的其他机器访问 宿主机+9200 访问不到 kafka 的服务


    前言

    上下文是这样的 : 我开发机器上面安装了一个 虚拟机, 这个虚拟机网络基于 NAT 配置的

    我本地机器 局域网 ip 为 192.168.0.101, 虚拟机的 ip 为 192.168.220.133

    然后 在 vmware 中增加了 NAT 网络映射, 将 192.168.220.133:9200 映射到了 192.168.0.101:9200, 然后 我本地的 kafka客户端 是能够正常访问到 kafka 的服务器的数据, 但是局域网的其他机器 的 kafka客户端 不能够正确的访问到 192.168.0.101:9200 的数据

    呵呵 这里就这个问题 简单剖析一下

    kafka 服务器基于 2.4.1, 客户端基于 2.2.0

    测试用例

    1. /**
    2. * Test06KafkaProducer
    3. *
    4. * @author Jerry.X.He <970655147@qq.com>
    5. * @version 1.0
    6. * @date 2022-05-28 10:14
    7. */
    8. public class Test06KafkaConsumer {
    9. // Test06KafkaProducer
    10. public static void main(String[] args) {
    11. Properties props = new Properties();
    12. props.put("bootstrap.servers", "192.168.0.101:9092");
    13. props.put("group.id", "test2");
    14. // props.put("enable.auto.commit", "true");
    15. props.put("enable.auto.commit", "false");
    16. props.put("auto.commit.interval.ms", "1000");
    17. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    18. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    19. props.put("auto.offset.reset", "earliest");
    20. String topic = "test20220528";
    21. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    22. consumer.subscribe(Arrays.asList(topic));
    23. try {
    24. while (true) {
    25. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100_000));
    26. for (ConsumerRecord<String, String> record : records) {
    27. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    28. }
    29. }
    30. } finally {
    31. consumer.close();
    32. }
    33. }
    34. }

    问题的现象就是 我本地机器跑这个测试用例能够正常的消费到数据 

    但是 在局域网的另外一台机器 拿不到消费不到 kafka 的数据 

    kafka 客户端和服务器的网络交互

    我们大致拆解一下在一次普通的 客户端 和 服务端 交互的过程中的一些网络交互的信息 

    如下客户端 连接 服务端 存在三种配置信息

    -1 来自于客户端 properties 配置的 bootstrap.servers 配置项, 在 ”this.metadata.bootstrap(addresses, time.milliseconds())“ 中初始化 

    0 来自于客户端请求的 MetadataRequest 的响应信息, 数据来自于 kafka 服务器的 metadataSnapshot, 在 "NetworkClient.poll" 中初始化 

    2147483647 来自于客户端请求的 FindCoordinatorRequest 的响应信息, 数据来自于 kafka 服务器的 metadataSnapshot, 在 ”ConsumerCoordinator.poll“ 中初始化 

    1. -1 -> bootstrap.servers 配置的连接信息, 数据来自于 客户端配置
    2. 0 -> MetadataResponse 响应的连接信息, 数据来自于 metadataSnapshot
    3. 2147483647 -> FindCoordinatorRequest 响应的连接信息, 数据来自于 metadataSnapshot

    我们大致来看一下这些 节点配置信息 对应的相关发送的请求 

    这里仅仅是抽样一个结果, 并不去做 具体的为什么 这个请求 使用这个配置信息的解释 

    1. -1 - org.apache.kafka.common.requests.FindCoordinatorRequest$Builder
    2. 0 - org.apache.kafka.common.requests.FindCoordinatorRequest$Builder
    3. 2147483647 - org.apache.kafka.common.requests.JoinGroupRequest$Builder
    4. 2147483647 - org.apache.kafka.common.requests.JoinGroupRequest$Builder
    5. 2147483647 - org.apache.kafka.common.requests.SyncGroupRequest$Builder
    6. 2147483647 - org.apache.kafka.common.requests.OffsetFetchRequest$Builder
    7. 0 - org.apache.kafka.common.requests.ListOffsetRequest$Builder
    8. 0 - org.apache.kafka.common.requests.FetchRequest$Builder
    9. 0 - org.apache.kafka.common.requests.FetchRequest$Builder
    10. 2147483647 - org.apache.kafka.common.requests.HeartbeatRequest$Builder

    为什么 局域网的其他机器访问不到映射出来的 kafka 的服务?

    这里先直接 阐述 原因 

    上面的 三类节点信息, bootstrap.servers 的配置来自于客户端配置, 客户端得到的是 192.168.0.101:9200, 这个服务不管是在本机, 还是在同局域网的其他机器 都是是可以访问的 

    但是 后面的两类, 是来自于 kafka 服务器的 metadataSnapshot, 服务器返回的是 advertised.listener/advertised.listener.host.name+advertised.listener.host.port 配置的信息 

    客户端 得到的是 192.168.220.133:9200, 这个服务是本机可以访问, 但是局域网其他机器是访问不到的, 因此 出现了访问不到 kafka 的服务的问题  

    这个通常来说, 就是增加一层代理路由, 或者 调整虚拟机的网络配置 来解决 

    FindCoordinatorResponse/MetadataResponse 的响应

    MetadataResponse 的 连接信息来自于 metadataCache, 数据存储于 metadataCache.metadataSnapshot 

    FindCoordinatorResponse 的 连接信息来自于 metadataCache, 数据存储于 metadataCache.metadataSnapshot 

    metadataSnapshot 的更新流程大致如下 

    可以看到的是 获取到的节点信息来自于 kafka 服务器配置的 advertised 的相关配置信息 

    1. FindCoordinatorRequest 获取的数据来自于 metadataSnapshot
    2. kafkaController.sendUpdateMetadataRequest 发生事件的时候 更新 metadataSnapshot 的请求
    3. aliveNodes 来自于 broker 的相关信息, 这个 brokerInfo 来自于 KafkaController.initializeControllerContext
    4. KafkaController.initializeControllerContext 的 aliveNodes 来自于 zkClient 向 "/brokers/ids/*" 获取 brokerId 列表, 然后在依次获取 "brokers/ids/$id" 的数据信息, 作为 Broker 的元数据信息
    5. broker 注册到 zk 的信息来自于 KafkaServer 启动的时候向 zk 注册的, endpoints 的相关信息来自于 "advertised.listeners", "advertised.host.name:advertised.host.port", "listeners", "listeners.host.name:listeners.host.port"

  • 相关阅读:
    初识设计模式之单例模式
    全渠道电商 | 国内知名的药妆要如何抓住风口实现快速增长?
    005. 字符串分割[100 分]
    【云原生之Docker实战】使用Docker部署Restyaboard项目管理工具
    马里奥游戏 java
    redis复习总结
    441.排列硬币
    1.机器学习概念及相关术语解释
    OpenSign 开源 PDF 电子签名解决方案
    【Linux 网络】高级 IO -- 详解
  • 原文地址:https://blog.csdn.net/u011039332/article/details/125237601