• Zookeeper集群 + Kafka集群


    目录

    一、Zookeeper概述

    1、Zookeeper定义

    2、Zookeeper 工作机制

    3、Zookeeper的特点

    4、Zookeeper应用场景

    二、Zookeeper集群部署

    1、环境准备

    2、安装Zookeeper

    3、修改配置文件

    4、创建数据目录和日志目录,并在data目录中创建一个 myid 的文件

    5、编写启动脚本,并启动

    三、Kafka概述

    1、为什么需要消息队列(MQ)

    2、使用消息队列的好处

    3、消息队列的两种模式

    4、Kafka的系统架构

    四、Kafka群集部署

    1、下载安装包

    2、安装 Kafka并修改配置文件

    3、修改环境变量

    4、配置Kafka启动脚本并启动

    五、Filebeat+Kafka+ELK结构部署

    1、在filebeat节点上修改配置文件

    2、在 Logstash 组件所在节点上新建一个 Logstash 配置文件

    3、最后启动logstash并验证


    一、Zookeeper概述

    1、Zookeeper定义

    Zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。

    2、Zookeeper 工作机制

    Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。也就是说 Zookeeper = 文件系统 + 通知机制。

    Zookeeper的主要作用:用于解决分布式应用集群中的应用系统的统一管理

    • 作为文件系统,用于注册各种分布式应用,存储管理分布式应用的元信息和应用的配置信息
    • 作为通知系统,如果节点或者服务本身的状态出现问题或服务器下线都会通知客户端。

    3、Zookeeper的特点

    (1)Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群

    (2)Zookeeper集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。

    (3)全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。

    (4)更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行,即先进先出

    (5)数据更新原子性,一次数据更新要么成功,要么失败。

    (6)实时性,在一定时间范围内,Client能读到最新数据。

    Zookeeper数据结构

    ZooKeeper数据模型的结构与Linux文件系统很类似,整体上可以看作是一棵树,每个节点称做一个ZNode。每一个ZNode默认能够存储1MB的数据,每个ZNode都可以通过其路径唯一标识。

    4、Zookeeper应用场景

    提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等

    ●统一命名服务
    在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。例如:IP不容易记住,而域名容易记住。

    ●统一配置管理
    (1)分布式环境下,配置文件同步非常常见。一般要求一个集群中,所有节点的配置信息是一致的,比如Kafka集群。对配置文件修改后,希望能够快速同步到各个节点上。
    (2)配置管理可交由ZooKeeper实现。可将配置信息写入ZooKeeper上的一个Znode。各个客户端服务器监听这个Znode。一旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器。

    ●统一集群管理
    (1)分布式环境中,实时掌握每个节点的状态是必要的。可根据节点实时状态做出一些调整。
    (2)ZooKeeper可以实现实时监控节点状态变化。可将节点信息写入ZooKeeper上的一个ZNode。监听这个ZNode可获取它的实时状态变化。

    ●服务器动态上下线
    客户端能实时洞察到服务器上下线的变化。

    ●软负载均衡
    在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。

    二、Zookeeper集群部署

    环境:

    准备 3 台服务器做 Zookeeper 集群

    192.168.3.104

    192.168.3.105

    192.168.3.106

    1、环境准备

    三台服务器都要做

    1. //关闭防火墙
    2. systemctl stop firewalld
    3. systemctl disable firewalld
    4. setenforce 0
    5. //安装 JDK
    6. yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
    7. java -version
    8. //下载安装包
    9. 官方下载地址:https://archive.apache.org/dist/zookeeper/
    10. cd /opt
    11. wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz

    2、安装Zookeeper

    1. cd /opt
    2. tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
    3. mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7

    3、修改配置文件

    1. cd /usr/local/zookeeper-3.5.7/conf/
    2. cp zoo_sample.cfg zoo.cfg
    3. vim zoo.cfg
    4. tickTime=2000 #通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
    5. initLimit=10 #Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
    6. syncLimit=5 #Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
    7. dataDir=/usr/local/zookeeper-3.5.7/data ●修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
    8. dataLogDir=/usr/local/zookeeper-3.5.7/logs ●添加,指定存放日志的目录,目录需要单独创建
    9. clientPort=2181 #客户端连接端口
    10. #添加集群信息
    11. server.1=192.168.3.104:3188:3288
    12. server.2=192.168.3.105:3188:3288
    13. server.3=192.168.3.106:3188:3288

    4、创建数据目录和日志目录,并在data目录中创建一个 myid 的文件

    1. mkdir /usr/local/zookeeper-3.5.7/data
    2. mkdir /usr/local/zookeeper-3.5.7/logs
    3. //server4上输入:
    4. echo 1 > /usr/local/zookeeper-3.5.7/data/myid
    5. //server5上输入:
    6. echo 2 > /usr/local/zookeeper-3.5.7/data/myid
    7. //server6上输入:
    8. echo 3 > /usr/local/zookeeper-3.5.7/data/myid

    5、编写启动脚本,并启动

    1. vim /etc/init.d/zookeeper
    2. #!/bin/bash
    3. #chkconfig: 2345 20 90
    4. #description:Zookeeper Service Control Script
    5. ZK_HOME='/usr/local/zookeeper-3.5.7'
    6. case $1 in
    7. start)
    8. echo "---------- zookeeper 启动 ------------"
    9. $ZK_HOME/bin/zkServer.sh start
    10. ;;
    11. stop)
    12. echo "---------- zookeeper 停止 ------------"
    13. $ZK_HOME/bin/zkServer.sh stop
    14. ;;
    15. restart)
    16. echo "---------- zookeeper 重启 ------------"
    17. $ZK_HOME/bin/zkServer.sh restart
    18. ;;
    19. status)
    20. echo "---------- zookeeper 状态 ------------"
    21. $ZK_HOME/bin/zkServer.sh status
    22. ;;
    23. *)
    24. echo "Usage: $0 {start|stop|restart|status}"
    25. esac
    26. // 设置开机自启
    27. chmod +x /etc/init.d/zookeeper
    28. chkconfig --add zookeeper
    29. //分别启动 Zookeeper
    30. service zookeeper start
    31. //查看当前状态
    32. service zookeeper status

    三、Kafka概述

    Kafka 定义

    Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据领域的实时计算以及日志收集。

    Kafka的特性

    高吞吐量、低延迟
    Kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。每个 topic 可以分多个 Partition,Consumer Group 对 Partition 进行消费操作,提高负载均衡能力和消费能力。

    可扩展性
    kafka 集群支持热扩展

    持久性、可靠性
    消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

    容错性
    允许集群中节点失败(多副本情况下,若副本数量为 n,则允许 n-1 个节点失败)

    高并发
    支持数千个客户端同时读写

    1、为什么需要消息队列(MQ)

    主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection 错误,引发雪崩效应。
    我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队列常应用于异步处理,流量削峰,应用解耦,消息通讯等场景。

    当前比较常见的 MQ 中间件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka、Pulsar 等

    2、使用消息队列的好处

    (1)解耦
    允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

    (2)可恢复性
    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

    (3)缓冲
    有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

    (4)灵活性 & 峰值处理能力
    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

    (5)异步通信
    很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

    3、消息队列的两种模式

    (1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)


    消息生产者生产消息发送到消息队列中,然后消息消费者从消息队列中取出并且消费消息。消息被消费以后,消息队列中不再有存储,所以消息消费者不可能消费到已经被消费的消息。消息队列支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

    (2)发布/订阅模式(一对多,又叫观察者模式,消费者消费数据之后不会清除消息)


    消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
    发布/订阅模式是定义对象间一种一对多的依赖关系,使得每当一个对象(目标对象)的状态发生改变,则所有依赖于它的对象(观察者对象)都会得到通知并自动更新。

    4、Kafka的系统架构

    (1)Broker
    一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。

    (2)Topic
    可以理解为一个队列,生产者和消费者面向的都是一个 topic。
    类似于数据库的表名或者 ES 的 index
    物理上不同 topic 的消息分开存储

    (3)Partition
    为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分割为一个或多个 partition,每个 partition 是一个有序的队列。Kafka 只保证 partition 内的记录是有序的,而不保证 topic 中不同 partition 的顺序。

    四、Kafka群集部署

    在之前部署了Zookeeper的服务器上部署,因为Kafka依赖Zookeeper来存储管理的元信息和配置信息。在高版本Kafka中,慢慢的就脱离了对Zookeeper的依赖。

    1、下载安装包

    1. cd /opt
    2. wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz

    2、安装 Kafka并修改配置文件

    1. cd /opt/
    2. tar zxvf kafka_2.13-2.7.1.tgz
    3. mv kafka_2.13-2.7.1 /usr/local/kafka
    4. //修改配置文件
    5. cd /usr/local/kafka/config/
    6. cp server.properties{,.bak}
    7. vim server.properties
    8. broker.id=021行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2
    9. listeners=PLAINTEXT://192.168.80.10:909231行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
    10. zookeeper.connect=192.168.3.104:2181,192.168.3.105:2181,192.168.3.106:2181123行,配置连接Zookeeper集群地址

    3、修改环境变量

    1. vim /etc/profile
    2. export KAFKA_HOME=/usr/local/kafka
    3. export PATH=$PATH:$KAFKA_HOME/bin
    4. source /etc/profile

    4、配置Kafka启动脚本并启动

    1. vim /etc/init.d/kafka
    2. #!/bin/bash
    3. #chkconfig:2345 22 88
    4. #description:Kafka Service Control Script
    5. KAFKA_HOME='/usr/local/kafka'
    6. case $1 in
    7. start)
    8. echo "---------- Kafka 启动 ------------"
    9. ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
    10. ;;
    11. stop)
    12. echo "---------- Kafka 停止 ------------"
    13. ${KAFKA_HOME}/bin/kafka-server-stop.sh
    14. ;;
    15. restart)
    16. $0 stop
    17. $0 start
    18. ;;
    19. status)
    20. echo "---------- Kafka 状态 ------------"
    21. count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
    22. if [ "$count" -eq 0 ];then
    23. echo "kafka is not running"
    24. else
    25. echo "kafka is running"
    26. fi
    27. ;;
    28. *)
    29. echo "Usage: $0 {start|stop|restart|status}"
    30. esac
    31. //设置开机自启
    32. chmod +x /etc/init.d/kafka
    33. chkconfig --add kafka
    34. //分别启动 Kafka
    35. service kafka start

    五、Filebeat+Kafka+ELK结构部署

    环境
    主机名IP地址部署的应用
    node01192.168.3.101ElasticSearch、kibana
    node02192.168.3.102ElasticSearch
    apache192.168.3.100Logstash、Apache
    filebeat192.168.3.103Filebeat
    server4192.168.3.104Zookeeper、Kafka
    server5192.168.3.105Zookeeper、Kafka
    server6192.168.3.106Zookeeper、Kafka

    ELK+Filebeat请看上一篇文章,Kafka部署请看上方

    将Kafka与ELK+Filebeat结合起来,主要修改Filebeat节点上的配置文件即可

    1、在filebeat节点上修改配置文件

    1. cd /usr/local/filebeat
    2. vim filebeat.yml
    3. filebeat.input:
    4. - type: log
    5. enabled: true
    6. paths:
    7. - /var/log/httpd/access_log
    8. tags: ["access"]
    9. - type: log
    10. enabled: true
    11. paths:
    12. - /var/log/httpd/error_log
    13. tags: ["error"]
    14. ......
    15. #添加输出到 Kafka 的配置
    16. output.kafka:
    17. enabled: true
    18. hosts: ["192.168.3.104:9092","192.168.3.105:9092","192.168.3.106:9092"] #指定 Kafka 集群配置
    19. topic: "httpd" #指定 Kafka 的 topic

    2、在 Logstash 组件所在节点上新建一个 Logstash 配置文件

    1. cd /etc/logstash/conf.d/
    2. vim kafka.conf
    3. input {
    4. kafka {
    5. bootstrap_servers => "192.168.3.104:9092,192.168.3.105:9092,192.168.3.106:9092" #kafka集群地址
    6. topics => "httpd" #拉取的kafka的指定topic
    7. type => "httpd_kafka" #指定 type 字段
    8. codec => "json" #解析json格式的日志数据
    9. auto_offset_reset => "latest" #拉取最近数据,earliest为从头开始拉取
    10. decorate_events => true #传递给elasticsearch的数据额外增加kafka的属性数据
    11. }
    12. }
    13. output {
    14. if "access" in [tags] {
    15. elasticsearch {
    16. hosts => ["192.168.3.100:9200"]
    17. index => "httpd_access-%{+YYYY.MM.dd}"
    18. }
    19. }
    20. if "error" in [tags] {
    21. elasticsearch {
    22. hosts => ["192.168.3.100:9200"]
    23. index => "httpd_error-%{+YYYY.MM.dd}"
    24. }
    25. }
    26. stdout { codec => rubydebug }
    27. }

    3、最后启动logstash并验证

    1. #启动 logstash
    2. logstash -f kafka.conf
    3. 然后可以访问http://192.168.3.101:9200去查看
    4. 最后访问http://192.168.3.101:5601去添加索引
  • 相关阅读:
    “蔚来杯“2022牛客暑期多校训练营2
    Word文件不能编辑是什么原因?
    【深度学习】使用Pytorch实现的用于时间序列预测的各种深度学习模型类
    RPA-3 之B/S第二讲:selenium获取单元素的方法
    flutter背景图片设置
    多微服务合并为一个服务
    牛牛P50591 解的分配问题,高精度组合数
    2022“杭电杯”中国大学生算法设计超级联赛(10)签到题5题
    VPS是什么?详解亚马逊云科技Amazon Lightsail(VPS)虚拟专用服务器
    Docker 哲学 - docker save | load | export | import 及实践
  • 原文地址:https://blog.csdn.net/x74188/article/details/133976439