• Zookeeperr集群+kafka集群


    Zookeeperr集群+kafka集群

    一、Zookeeper

    1、概念

    结构:分布式,为分布式架构提供协调服务的apache的项目

    作用:保存元数据

    工作机制:观察者模式设计的分布式服务器管理架构。负责存储和管理元数据,记录集群的变化,保存集群变化的信息

    2、特点

    2.1 在集群中分为领导者和追随者,组成的集群

    2.2 只要有半数以上的节点正常工作,整个zookeeper就可以正常工作,zookeeper在部署时一般选择奇数台

    2.3 全局数据一致,每个zookeeper不论是领导者还是追随者,在访问他们的数据时都是一致的

    2.4 数据更新的原子性,一次更新数据,要么都成功,要么都失败

    2.5 数据更新实时性

    2.6 领导者和追随者根据投票产生

    拓展:选举机制

    2.6.1 选举机制:

    A B C

    1、服务器A启动,发起一次选举,A会投自己一票。A有一票,不够半数。选举无法完成,A进入looking

    2、服务器B启动,再发起一次选举。服务器B也投自己一票

    服务器A和B会比较myid,谁的myid大,如果A比B小,A会把票改投给B,B自动当选为领导

    3、C启动,自动成为追随者,A也会成为追随者

    3、安装zookeeper

    mysql1-7 192.168.100.17

    mysql2-8 192.168.100.18

    mysql3-9 192.168.100.19

    # mysql1、2、3
    systemctl stop firewalld
    setenforce 0
    yum -y install ntpdate
    ntpdate ntp.aliyun.com
    date
    
    # 部署java环境  apache都是基于java环境的
    yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
    
    java -version
    # 1.80_412
    cd /opt   # 拖包apache-zookeeper-3.5.7-bin.tar.gz
    tar -xf apache-zookeeper-3.5.7-bin.tar.gz
    mv apache-zookeeper-3.5.7-bin /opt/zookeeper
    cd /opt/zookeeper/conf
    cp zoo_samlpe.cfg zoo.cfg
    vim zoo.cfg
    tickTime=2000
    # 通信心跳时间,zookeeper服务端和客户端之间通信的间隔时间,单位是毫秒
    initLimit=10
    # leader和follower初始连接时,最多能容忍的心跳数。单位秒
    syncLimit=5
    # leader和follower之间同步通信的超时时间,如果5*2时间内发生超时,leader就认为follower死了,会从集群中将其删除daraDir=/opt/zookeeper/data
    下方插入
    dataLogDir=/opt/zookeeper/logs
    
    clientPort=2181
    下方插入
    server.1=192.168.100.17:3188:3288
    server.2=192.168.100.18:3188:3288
    server.3=192.168.100.19:3188:3288
    
    # server.1 表示数字id,也就是数字服务器对应的myid
    # 192.168.100.17 服务器ip地址
    # 3188 zookeeper服务器内部通信的端口
    # 3288 重新选举端口,万一leader挂了,用这个端口进行内部通信,选举新的leader
    wq!
    
    mkdir /opt/zookeeper/data
    mkdir /opt/zookeeper/logs
    
    # 1
    echo 1 > /opt/zookeeper/data/myid
    
    # 2
    echo 2 > /opt/zookeeper/data/myid
    
    # 3
    echo 3 > /opt/zookeeper/data/myid
    
    vim /etc/init.d/zookeeper
    #!/bin/bash
    #chkconfig:2345 20 90
    #description:Zookeeper Service Control Script
    ZK_HOME='/opt/zookeeper'
    case $1 in
    start)
    	echo "---------- zookeeper 启动 ------------"
    	$ZK_HOME/bin/zkServer.sh start
    ;;
    stop)
    	echo "---------- zookeeper 停止 ------------"
    	$ZK_HOME/bin/zkServer.sh stop
    ;;
    restart)
    	echo "---------- zookeeper 重启 ------------"
    	$ZK_HOME/bin/zkServer.sh restart
    ;;
    status)
    	echo "---------- zookeeper 状态 ------------"
    	$ZK_HOME/bin/zkServer.sh status
    ;;
    *)
        echo "Usage: $0 {start|stop|restart|status}"
    esac
    wq!
    
    chmod +x /etc/init.d/zookeeper
    chkconfig --add zookeeper
    service zookeeper start
    service zookeeper status    # 查看zookeeper的状态
    

    二、Kafka

    1、概述

    消息队列:MQ

    应用场景:在高并发环境下,同步的请求来不及处理,请求太多会造成阻塞

    经典报错:大量请求到数据库,too many connection报错,比较常见,请求数太多,处理不了

    消息队列,使用异步处理方式,可以缓解系统处理请求的压力

    2、作用

    1、异步处理

    2、系统解耦

    解耦:每个系统之间独立运行,互相之间没有必然的依赖关系

    解耦的工作环境:微服务架构中的通信对于解耦来说至关重要,各微服务之间独立运行,分别处理各自的请求和消息,提高整个系统的吞吐量和处理能力。(电商的订单系统,网站的工单系统,典型的一个消息队列场景)

    3、负载均衡

    消息队列的负载均衡:把任务发送到多个消费者,多个消费者可以并行处理队列中的消息

    4、流量控制和限流

    通过延迟方式,处理生产速率和消费者的处理速度(代码控制)

    5、数据同步和分发。

    跨系统的数据同步和日志收集

    6、任务调度和定时任务(开发)

    7、实时数据处理

    8、备份和恢复

    3、消息队列的模式

    1、点对点 一对一 消费者消费玩数据之后,生产者会自动清除已消费的数据。一个生产者对应一个消费者(淘汰)

    2、发布订阅模式(一对多,又叫观察者模式,消费者数据在消费玩之后不会被清除——保留一段时间)即生产者发布一个消息,可以是一个消费者使用,也可以是多个消费者同时使用(主流)kafka就是发布订阅模式的消息队列,主要用于大数据实时处理领域

    RabbitMQ也是发布定义模式的消息,小集群内部使用

    4、特性

    1、高吞吐量,低延迟,每秒可以处理几十万条数据,延迟只有几毫秒

    2、集群的可扩展性(热扩展)

    3、消息持久化:生产者发布的消息可以保存到磁盘当中,防止数据丢失(有时间限制,非永久)

    4、容错性:挂了一个可以继续使用

    5、高并发:数千个客户端可以同时读写

    5、kafka的组件:

    1、topic 主题:kafka的基本单元,所有生产者发布的消息都是发到主题。消费者订阅主题,然后消费生产者发布的消息

    2、生产者和消费者

    生产者:把消息发布到主题

    消费者:订阅生产者发布到消息

    3、分区:每个主题都可以分成多个分区,每个分区都是数据的有序子集。分区当中保留数据,按照偏移量来有序的存储数据,消费者可以根据分区的偏移量来消费指定分区里面的信息(一般不用)

    4、偏移量:消息在分区当中的唯一标识,跟踪和定位消息所在的位置,消费者可以根据偏移量来处理信息

    分区还有备份的作用:我们在创建主题时创建分区,创建分区是要指定副本数。分区和我们执行的集群机器数量一般是保持一致的

    副本:备份分区中的消息,最少要两个,互为备份

    5、经纪人(broker):经纪人处理生产者和消费者的请求(kafka)、元数据(zookeeper)

    6、zookeeper:保存元数据(ip地址、集群信息)

    6、kafka工作流程:

    生产者将消息发布到指定的主题,每个消息都附带一个key和value

    主题是有多个分区的,生产者把消息写入一个分区(带偏移量)

    经纪人(kafka)分配和处理生产者的发布请求,偏移量也是经纪人分配(在分区中偏移量唯一)

    消费者订阅主题,一是获取全量的消费者信息(默认模式),也可以从指定的分区获取消息(代码完成,一般不用)

    生产者发布的消息会在本地保留一段时间,防止消费者有延迟或者处理速度过慢,导致没有成功消费(默认保留时间:七天)

    7、安装kafka集群

    mysql1-7 192.168.100.17

    mysql2-8 192.168.100.18

    mysql3-9 192.168.100.19

    # mysql1、2、3
    cd /opt   拖包kafka_2.13-3.4.1.tgz
    tar -xf kafka_2.13-3.4.1.tgz 
    mv kafka_2.13-3.4.1 /usr/local/kafka
    cd /usr/local/kafka/config
    cp server.properties  server.properties.bak.2024.8.4
    
    # 1
    vim server.properties
    24 broker.id=0  1  2 不能相同
    34 listeners=PLAINTEXT://192.168.100.17:9092
    # 不改需要做映射,改就不用了,改的话指向自己的主机192.168.100.17:9092
    44 num.network.threads=3
    # broker处理网络的线程数,一般不用改
    48 num.io.threads=8
    # 这个的数值一定要大于磁盘数,处理磁盘读写的线程数,如:磁盘为3,则设置4
    52 socket.send.buffer.bytes=102400
    # 发送套接字缓冲区的大小
    53 socket.receive.buffer.bytes=102400
    # 接受套接字缓冲区的大小
    56 socket.request.max.bytes=104857600
    # 请求套接字缓冲区的大小
    
    62 log.dirs=/usr/local/kafka/logs
    67 num.partitions=1
    # 创建主题时,经纪人指定的分区数,如果指定的分区数不同,这个值可以被覆盖
    105 log.retention.hours=168
    # 消息在本地持久化保留的时间,168的单位是小时
    112 #log.segment.bytes=1073741824
    # 保存的持久化文件的大小,超过这个值的也会被删除
    125 zookeeper.connect=192.168.100.17:2181,192.168.100.18:2181,192.168.100.19:2181
    
    
    # 2
    vim server.properties
    24 broker.id=1  # 不能相同
    34 listeners=PLAINTEXT://192.168.100.18:9092
    125 zookeeper.connect=192.168.100.17:2181,192.168.100.18:2181,192.168.100.19:2181
    
    # 3
    vim server.properties
    24 broker.id=2  # 不能相同
    34 listeners=PLAINTEXT://192.168.100.19:9092
    125 zookeeper.connect=192.168.100.17:2181,192.168.100.18:2181,192.168.100.19:2181
    
    wq!   #(1、2、3)
    
    # mysql1、2、3
    vim /etc/profile    # 在最后一行添加
    export KAFKA_HOME=/usr/local/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    wq!
    source /etc/profile
    
    # 配置kafka启动脚本
    vim /etc/init.d/kafka
    #!/bin/bash
    #chkconfig:2345 22 88
    #description:Kafka Service Control Script
    KAFKA_HOME='/usr/local//kafka'
    case $1 in
    start)
    	echo "---------- Kafka 启动 ------------"
    	${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
    ;;
    stop)
    	echo "---------- Kafka 停止 ------------"
    	${KAFKA_HOME}/bin/kafka-server-stop.sh
    ;;
    restart)
    	$0 stop
    	$0 start
    ;;
    status)
    	echo "---------- Kafka 状态 ------------"
    	count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
    	if [ "$count" -eq 0 ];then
            echo "kafka is not running"
        else
            echo "kafka is running"
        fi
    ;;
    *)
        echo "Usage: $0 {start|stop|restart|status}"
    esac
    wq!
    
    chmod +x /etc/init.d/kafka
    chkconfig --add kafka
    service kafka start
    netstat -antp | grep 9092
    
    # 1
    # 如何创建主题
    kafka-topics.sh --create --bootstrap-server 192.168.100.17:9092,192.168.100.18:9092,192.168.100.19:9092 --replication-factor 2 --partitions 3 --topic test1
    
    # --replication-factor 2   创建分区的副本数2
    # --partitions 3   分区数
    # --topic test1    指定主题的名称
    
    # 查看topic数
    kafka-topics.sh --list --bootstrap-server 192.168.100.17:9092,192.168.100.18:9092,192.168.100.19:9092 
    
    # 发布消息命令
    kafka-console-producer.sh --broker-list 192.168.100.17:9092,192.168.100.18:9092,192.168.100.19:9092 --topic test1     # 回车
    >halo
    >world
    >nice
    
    # 2
    kafka-console-consumer.sh --bootstrap-server 192.168.100.17:9091,192.168.100.18:9092,192.168.100.19:9092 --topic test1 --from-beginning    # 回车
    出现halo、world、nice
    
    返回1 继续发布消息,2会同步收到消息
    
    

    查看kafka的详细情况

    kafka-topics.sh --describe --bootstrap-server 192.168.100.17:9092,192.168.100.18:9092,192.168.100.19:9092
    

    ----详细情况展示

    Topic: test1 Topicld: VNuBK4MSRiOSrlaoHlWLnQ PartitionCount: 3 ReplicationFactor: 2 configs.
    Topic: test1Replicas:0,2 lsr: 0,2Partition:0 Leader:0
    Topic: test1Partition:1Leader: 2Replicas:2,1 lsr: 2,1
    Topic: test1Partition:2 Leader:1Replicas:1.0 lsr:1.0
    

    ----解释说明

    topic:主题名称

    Partition:分区,后面会跟偏移量

    leader:分区的领导者,用来处理分区的读写操作,只有在指定写分区和指定读分区时才工作,全量展示,无意义。

    Replicas:副本,0 1 2-------对应broker id

    ISR:当前与领导者同步的副本。0 1 2

    如何删除主题

    kafka-topics.sh --delete --bootstrap-server 192.168.100.17:9092,192.168.100.18:9092,192.168.100.19:9092
    

    总结:zookeeper就是保存集群的元数据

    kafka工作流程?组件的作用?

    8、kafka的消息堆积改如何解决?

    原因:消费者出现延迟或者处理能力太慢了,导致消息堆积

    解决办法:

    1、减少kafka持久化的保存时间

    2、修改主题的分区数,扩大分区的数量,提高消费者获取的通道

    3、可以指定多个消费者共同工作,处理消息的积压

    如何修改分区数?

    kafka-topics.sh --bootstrap-server 192.168.100.17:9092,192.168.100.18:9092,192.168.100.19:9092 --alter --topic test1 --partitions 6
    
    # 查看分区数
    kafka-topics.sh --describe --bootstrap-server 192.168.100.17:9092,192.168.100.18:9092,192.168.100.19:9092
    # 此时变成6个分区
    
  • 相关阅读:
    钉钉OA自定义审批流的创建和使用
    springboot基于微信小程序的宿舍管理系统毕业设计源码
    电商项目常用的五个设计模式场景及分析实现
    Kafka 万亿级消息实践之资源组流量掉零故障排查分析
    【开箱即用】开发了一个基于环信IM聊天室的Vue3插件,从而快速实现仿直播间聊天窗功能
    数据分析er看过来,五款工具有你需要的
    数据筛选 query 函数介绍(Pandas)
    C++-STL-map:map插入元素的几种方式【用数组方式插入数据】【用insert函数插入pair数据】【用insert函数插入value_type数据】
    视觉SLAM笔记一之经典框架解读:视觉里程估计、后端优化、回环检测、建图
    【驱动开发】LED灯的亮灭——通过字符设备驱动的分步实现编写LED驱动,实现设备文件和设备的绑定
  • 原文地址:https://blog.csdn.net/2401_84592402/article/details/140961428