• kafka集群穿透到公网实现过程


    1. 本地两台机器:

    kafka端口:9092
    r730_1:192.168.0.11
    r730_2:192.168.0.22

    2.未穿透到公网前kafka配置(只能进行内网读取):

    r730_1:
    listeners=SASL_PLAINTEXT://192.168.0.11:9092
    r730_2:
    listeners=SASL_PLAINTEXT://192.168.0.22:9092
    kafka会把上面两个配置写到zookeeper集群
    然后内网脚本代码里配置的是192.168.0.11:9092,脚本会通过该配置,到zookeeper集群拿到(192.168.0.11:9092,192.168.0.22:9092)其中一个配置,然后进行通信,达到负载均衡的效果。

    3.只进行公网穿透,没进行kafka外网配置,导致外网脚本不能连接kafka的场景:

    r730_1:
    listeners=SASL_PLAINTEXT://192.168.0.11:9092 (把这个穿透到云服务器的9092端口)
    r730_2:
    listeners=SASL_PLAINTEXT://192.168.0.22:9092(把这个穿透到云服务器的9093端口)

    然后外网代码里配置云服务器穿透后的IP地址: x.x.x.x:9092
    然后外网脚本会通过该穿透地址到zookeeper集群获取到(192.168.0.11:9092,192.168.0.22:9092)其中一个配置,然后对获取到的地址进行通信。
    当时因为获取到的是内网地址,导致外网脚本无法通过获取到的地址连接到kafka。

    4.进行公网穿透,进行kafka外网配置,但是没有进行内外网分流,导致内外网脚本都走公网,耗费云服务器带宽的场景:

    r730_1:
    listeners=SASL_PLAINTEXT://192.168.0.11:9092 (把这个穿透到云服务器的9092端口)
    advertised.listeners=SASL_PLAINTEXT:// x.x.x.x:9092 (这里填写穿透后的地址)

    r730_2:
    listeners=SASL_PLAINTEXT://192.168.0.22:9092(把这个穿透到云服务器的9093端口)
    advertised.listeners=SASL_PLAINTEXT:// x.x.x.x:9093(这里填写穿透后的地址)

    当填写了advertised.listeners地址以后,kafka会把advertised.listeners写入zookeeper,而不写入listeners地址。

    即:kafka会把(SASL_PLAINTEXT:// x.x.x.x:9092,SASL_PLAINTEXT:// x.x.x.x:9093)写入zookeeper。

    外网脚本配置地址为 x.x.x.x:9092,通过该地址连接到kafka,然后获取到(SASL_PLAINTEXT:// x.x.x.x:9092,SASL_PLAINTEXT:// x.x.x.x:9093)
    其中一个地址,然后连接外网地址进行通信,连接到kafka。

    内网脚本配置地址为192.168.0.11:9092,通过该地址连接到kafka,然后获取到(SASL_PLAINTEXT:// x.x.x.x:9092,SASL_PLAINTEXT:// x.x.x.x:9093)
    其中一个地址,由于内网脚本获取到的也是外网地址,所以内网脚本也会连接外网地址进行通信,这样会导致云服务器带宽压力大。

    5.进行公网穿透,进行kafka外网配置,并且通过 绑定域名 + 修改hosts,达到内外网分流的场景:

    r730_1:
    listeners=SASL_PLAINTEXT://192.168.0.11:9092 (把这个穿透到云服务器的9092端口)
    advertised.listeners=SASL_PLAINTEXT://kafka.xxx.cn:9092 (云服务器的9092绑定到kafka.xxx.cn)

    r730_2:
    listeners=SASL_PLAINTEXT://192.168.0.22:9092(把这个穿透到云服务器的9093端口)
    advertised.listeners=SASL_PLAINTEXT://kafka2.xxx.cn:9093(云服务器的9093绑定到kafka2.xxx.cn)

    然后zookeeper会把(SASL_PLAINTEXT://kafka.xxx.cn:9092,SASL_PLAINTEXT://kafka2.xxx.cn:9093)写入zookeeper。

    外网脚本配置地址为kafka.xxx.cn:9092,通过该地址连接到kafka,然后获取到(SASL_PLAINTEXT://kafka.xxx.cn:9092,SASL_PLAINTEXT://kafka2.xxx.cn:9093)
    其中一个地址,然后连接外网地址进行通信,连接到kafka。

    内网脚本配置地址为kafka.xxx.cn:9092,
    在内网服务器的hosts文件需配置:
    192.168.0.11 kafka.xxx.cn
    192.168.0.22 kafka2.xxx.cn

    内网脚本通过kafka.xxx.cn:9092连接到kafka,然后hosts做一个映射,通过内网IP地址192.168.0.11连接r_7301,然后获取到(SASL_PLAINTEXT://kafka.xxx.cn:9092,SASL_PLAINTEXT://kafka2.xxx.cn:9093)其中一个配置。

    由于在hosts里做了映射,所以相当于内网脚本获取的地址是(192.168.0.11:9092,192.168.0.22:9092)其中一个配置,这样就不会走外网地址了。效率更高,也节省云服务器带宽。

    6.docker容器运行kafka消费者或生产者 场景:
    使用 --add-host参数即可:

    docker run -it --add-host=192.168.0.11:kafka.xxx.cn --add-host=192.168.0.22:kafka2.xxx.cn myimage /bin/bash

    最终方案(序号5那种方法不行,那样只能让外网生产者正常生产,但是外网消费者不能正常消费。)

    最终方案参考文章:https://www.cnblogs.com/cnsre/p/14379007.html

    参考文章叙述:

    内外网分流 上面说的有外网ip的情况,直接配置外网ip有没有问题呢?
    如果既要内网访问,又要外网访问,本来可以走内网的流量都走外网网卡,显然不合适;而且有的环境可能被配置成这些kafka宿主机是没有外网访问权限的,即虽然他可以访问自己的外网ip,但是访问不了兄弟节点的外网ip。这时候就要配置内外网。
    配置1: {{< codes 配置1 配置2>}} {{}}

    listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
    listeners=INTERNAL://192.168.0.213:9092,EXTERNAL://192.168.0.213:19092
    advertised.listeners=INTERNAL://192.168.0.213:9092,EXTERNAL://101.89.163.9:19092
    inter.broker.listener.name=INTERNAL {{}} {{}}

    listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
    listeners=INTERNAL://192.168.0.213:9092,EXTERNAL://101.89.163.9:19092
    advertised.listeners=INTERNAL://192.168.0.213:9092,EXTERNAL://101.89.163.9:19092
    inter.broker.listener.name=INTERNAL {{}} {{}}
    注意这两的区别是listeners的EXTERNAL使用的ip不一样,一个使用内网ip,一个使用外网ip。

    如果你的kafka宿主机有外网网卡,只能用外网ip,若使用配置1,kafka通过listeners监听的两个端口都是内网网卡的数据,无法接收到外网网卡数据;
    如果你的kafka宿主机外网ip是映射来的,只能使用内网ip,原因也是上面说过的,不存在外网网卡,kafka启动监听就会报错,而使用内网ip有环境配置好的转发,可以接收到外网ip的数据。

    我的最终配置:(将节点1的9093端口和节点2的9094端口穿透到外网,用来当作外网连接kafka的端口。)
    节点1:

    listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
    listeners=INTERNAL://192.168.0.xx:9092,EXTERNAL://192.168.0.xx:9093
    #该条与inter.broker.listener.name不能共存
    #security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=PLAIN
    sasl.enabled.mechanisms=PLAIN
    advertised.listeners=INTERNAL://192.168.0.xx:9092,EXTERNAL://182.xxx.xxx.xxx:9093
    inter.broker.listener.name=INTERNAL
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    节点2:

    listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
    listeners=INTERNAL://192.168.0.xx:9092,EXTERNAL://192.168.0.xx:9094
    #该条与inter.broker.listener.name不能共存
    #security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=PLAIN
    sasl.enabled.mechanisms=PLAIN
    advertised.listeners=INTERNAL://192.168.0.xx:9092,EXTERNAL://182.xxx.xxx.xxx:9094
    inter.broker.listener.name=INTERNAL
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    后续补充:

    1.zookeeper命令

    连接zookeeper:
    zkCli.sh -server 127.0.0.1:2181
    看zookeeper集群里的节点:
    ls /brokers/ids
    看节点记录的kafka配置信息:
    get /brokers/ids/节点id
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.kafka脚本报错:pandas这个错是pandas版本不够,其中一个方案,可以在版本够高的本地环境把一小段数据跑掉,后面就正常了。
    line 97, in value_deserializer=lambda m: pickle.loads(m)
    ModuleNotFoundError: No module named ‘pandas.core.internals.managers’; ‘pandas.core.internals’ is not a package

    参考文章:
    https://blog.csdn.net/weixin_44198560/article/details/130244015

    https://www.ngui.cc/el/1990221.html?action=onClick

    https://blog.csdn.net/lsr40/article/details/84135959

    https://blog.csdn.net/weixin_42655822/article/details/109452272

  • 相关阅读:
    【背景渐变】 —— 就算没接触过也能 一遍学会哦
    『 CSS实战』CSS3 实现一些好玩的效果(1)
    【python】准点跑路人必备小程序~ 不信你用不到
    UNIAPP实战项目笔记34 购物车的单选和全选单选联动
    Oracle常用数值型函数
    MFC Windows 程序设计[327]之表格控件例程三(附源码)
    C51--PC通过串口(中断)点亮LED
    当AI遇到IoT:开启智能生活的无限可能
    ChatGPT的发展,是否会有越来越多的人失业?
    mybatis 如何实现批量更新呢?
  • 原文地址:https://blog.csdn.net/qq_44821149/article/details/133317275