kafka端口:9092
r730_1:192.168.0.11
r730_2:192.168.0.22
r730_1:
listeners=SASL_PLAINTEXT://192.168.0.11:9092
r730_2:
listeners=SASL_PLAINTEXT://192.168.0.22:9092
kafka会把上面两个配置写到zookeeper集群。
然后内网脚本代码里配置的是192.168.0.11:9092,脚本会通过该配置,到zookeeper集群拿到(192.168.0.11:9092,192.168.0.22:9092)其中一个配置,然后进行通信,达到负载均衡的效果。
r730_1:
listeners=SASL_PLAINTEXT://192.168.0.11:9092 (把这个穿透到云服务器的9092端口)
r730_2:
listeners=SASL_PLAINTEXT://192.168.0.22:9092(把这个穿透到云服务器的9093端口)
然后外网代码里配置云服务器穿透后的IP地址: x.x.x.x:9092
然后外网脚本会通过该穿透地址到zookeeper集群获取到(192.168.0.11:9092,192.168.0.22:9092)其中一个配置,然后对获取到的地址进行通信。
当时因为获取到的是内网地址,导致外网脚本无法通过获取到的地址连接到kafka。
r730_1:
listeners=SASL_PLAINTEXT://192.168.0.11:9092 (把这个穿透到云服务器的9092端口)
advertised.listeners=SASL_PLAINTEXT:// x.x.x.x:9092 (这里填写穿透后的地址)
r730_2:
listeners=SASL_PLAINTEXT://192.168.0.22:9092(把这个穿透到云服务器的9093端口)
advertised.listeners=SASL_PLAINTEXT:// x.x.x.x:9093(这里填写穿透后的地址)
当填写了advertised.listeners地址以后,kafka会把advertised.listeners写入zookeeper,而不写入listeners地址。
即:kafka会把(SASL_PLAINTEXT:// x.x.x.x:9092,SASL_PLAINTEXT:// x.x.x.x:9093)写入zookeeper。
外网脚本配置地址为 x.x.x.x:9092,通过该地址连接到kafka,然后获取到(SASL_PLAINTEXT:// x.x.x.x:9092,SASL_PLAINTEXT:// x.x.x.x:9093)
其中一个地址,然后连接外网地址进行通信,连接到kafka。
内网脚本配置地址为192.168.0.11:9092,通过该地址连接到kafka,然后获取到(SASL_PLAINTEXT:// x.x.x.x:9092,SASL_PLAINTEXT:// x.x.x.x:9093)
其中一个地址,由于内网脚本获取到的也是外网地址,所以内网脚本也会连接外网地址进行通信,这样会导致云服务器带宽压力大。
r730_1:
listeners=SASL_PLAINTEXT://192.168.0.11:9092 (把这个穿透到云服务器的9092端口)
advertised.listeners=SASL_PLAINTEXT://kafka.xxx.cn:9092 (云服务器的9092绑定到kafka.xxx.cn)
r730_2:
listeners=SASL_PLAINTEXT://192.168.0.22:9092(把这个穿透到云服务器的9093端口)
advertised.listeners=SASL_PLAINTEXT://kafka2.xxx.cn:9093(云服务器的9093绑定到kafka2.xxx.cn)
然后zookeeper会把(SASL_PLAINTEXT://kafka.xxx.cn:9092,SASL_PLAINTEXT://kafka2.xxx.cn:9093)写入zookeeper。
外网脚本配置地址为kafka.xxx.cn:9092,通过该地址连接到kafka,然后获取到(SASL_PLAINTEXT://kafka.xxx.cn:9092,SASL_PLAINTEXT://kafka2.xxx.cn:9093)
其中一个地址,然后连接外网地址进行通信,连接到kafka。
内网脚本配置地址为kafka.xxx.cn:9092,
在内网服务器的hosts文件需配置:
192.168.0.11 kafka.xxx.cn
192.168.0.22 kafka2.xxx.cn
内网脚本通过kafka.xxx.cn:9092连接到kafka,然后hosts做一个映射,通过内网IP地址192.168.0.11连接r_7301,然后获取到(SASL_PLAINTEXT://kafka.xxx.cn:9092,SASL_PLAINTEXT://kafka2.xxx.cn:9093)其中一个配置。
由于在hosts里做了映射,所以相当于内网脚本获取的地址是(192.168.0.11:9092,192.168.0.22:9092)其中一个配置,这样就不会走外网地址了。效率更高,也节省云服务器带宽。
6.docker容器运行kafka消费者或生产者 场景:
使用 --add-host参数即可:
docker run -it --add-host=192.168.0.11:kafka.xxx.cn --add-host=192.168.0.22:kafka2.xxx.cn myimage /bin/bash
最终方案参考文章:https://www.cnblogs.com/cnsre/p/14379007.html
参考文章叙述:
内外网分流 上面说的有外网ip的情况,直接配置外网ip有没有问题呢?
如果既要内网访问,又要外网访问,本来可以走内网的流量都走外网网卡,显然不合适;而且有的环境可能被配置成这些kafka宿主机是没有外网访问权限的,即虽然他可以访问自己的外网ip,但是访问不了兄弟节点的外网ip。这时候就要配置内外网。
配置1: {{< codes 配置1 配置2>}} {{}}listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
listeners=INTERNAL://192.168.0.213:9092,EXTERNAL://192.168.0.213:19092
advertised.listeners=INTERNAL://192.168.0.213:9092,EXTERNAL://101.89.163.9:19092
inter.broker.listener.name=INTERNAL {{}} {{}}listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
listeners=INTERNAL://192.168.0.213:9092,EXTERNAL://101.89.163.9:19092
advertised.listeners=INTERNAL://192.168.0.213:9092,EXTERNAL://101.89.163.9:19092
inter.broker.listener.name=INTERNAL {{}} {{}}
注意这两的区别是listeners的EXTERNAL使用的ip不一样,一个使用内网ip,一个使用外网ip。如果你的kafka宿主机有外网网卡,只能用外网ip,若使用配置1,kafka通过listeners监听的两个端口都是内网网卡的数据,无法接收到外网网卡数据;
如果你的kafka宿主机外网ip是映射来的,只能使用内网ip,原因也是上面说过的,不存在外网网卡,kafka启动监听就会报错,而使用内网ip有环境配置好的转发,可以接收到外网ip的数据。
我的最终配置:(将节点1的9093端口和节点2的9094端口穿透到外网,用来当作外网连接kafka的端口。)
节点1:
listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
listeners=INTERNAL://192.168.0.xx:9092,EXTERNAL://192.168.0.xx:9093
#该条与inter.broker.listener.name不能共存
#security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
advertised.listeners=INTERNAL://192.168.0.xx:9092,EXTERNAL://182.xxx.xxx.xxx:9093
inter.broker.listener.name=INTERNAL
节点2:
listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
listeners=INTERNAL://192.168.0.xx:9092,EXTERNAL://192.168.0.xx:9094
#该条与inter.broker.listener.name不能共存
#security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
advertised.listeners=INTERNAL://192.168.0.xx:9092,EXTERNAL://182.xxx.xxx.xxx:9094
inter.broker.listener.name=INTERNAL
1.zookeeper命令
连接zookeeper:
zkCli.sh -server 127.0.0.1:2181
看zookeeper集群里的节点:
ls /brokers/ids
看节点记录的kafka配置信息:
get /brokers/ids/节点id
2.kafka脚本报错:pandas这个错是pandas版本不够,其中一个方案,可以在版本够高的本地环境把一小段数据跑掉,后面就正常了。
line 97, in value_deserializer=lambda m: pickle.loads(m)
ModuleNotFoundError: No module named ‘pandas.core.internals.managers’; ‘pandas.core.internals’ is not a package
参考文章:
https://blog.csdn.net/weixin_44198560/article/details/130244015
https://www.ngui.cc/el/1990221.html?action=onClick
https://blog.csdn.net/lsr40/article/details/84135959
https://blog.csdn.net/weixin_42655822/article/details/109452272