• kafka与zookeeper的集群


    基础准备

    systemctl stop firewalld && systemctl disable firewalld
    setenforce 0
    sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config
    
    vi /etc/hosts
    ip1 node1
    ip2 node2
    ip3 node3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    zk安装配置

    #全节点
    yum install -y java-1.8.0-openjdk.x86_64 java-1.8.0-openjdk-devel.x86_64
    
    #环境变量
    vi /etc/profile
    export KAFKA_HOME=/kafka_2.12-2.0.0
    export ZK_HOME=/apache-zookeeper-3.7.1-bin
    export KE_HOME=/efak-web-3.0.1
    export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64
    export JRE_HOME=$JAVA_HOME/jre
    export PATH=$PATH:$KAFKA_HOME/bin:$ZK_HOME/bin:$KE_HOME/bin:$JAVA_HOME/bin:$JRE_HOME/bin
    
    tar -xvf apache-zookeeper-3.7.1-bin.tar.gz
    mkdir -p /apache-zookeeper-3.7.1-bin/data
    mkdir -p /apache-zookeeper-3.7.1-bin/log
    
    cp conf/zoo_sample.cfg conf/zoo.cfg
    
    cat zoo.cfg
    # 未修改的省略
    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    dataDir=/apache-zookeeper-3.7.1-bin/data
    # the port at which the clients will connect
    clientPort=2181
    
    ## Metrics Providers
    #
    # https://prometheus.io Metrics Exporter
    # metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
    # metricsProvider.httpPort=7000
    # metricsProvider.exportJvmInfo=true
    dataLogdir=/apache-zookeeper-3.7.1-bin/log
    server.1=ip1:2888:3888
    server.2=ip2:2888:3888
    server.3=ip3:2888:3888
    
    #节点1至3
    echo "1" > /apache-zookeeper-3.7.1-bin/data/myid
    echo "3" > /apache-zookeeper-3.7.1-bin/data/myid
    echo "3" > /apache-zookeeper-3.7.1-bin/data/myid
    
    #启动、停止
    ./zkServer.sh start
    ./zkServer.sh stop
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    kafka安装配置

    #全节点
    tar -xvf kafka_2.12-2.0.0.tgz
    
    vi config/server.properties
    
    log.dirs=/var/log/kafka-logs
    zookeeper.connect=ip1:2181,ip2:2181,ip3:2181
    #1至3节点
    broker.id:0
    listeners=PLAINTEXT://ip1:9092
    
    broker.id:1
    
    listeners=PLAINTEXT://ip2:9092
    
    broker.id:2
    listeners=PLAINTEXT://ip3:9092
    
    #启动顺序:先启动zookeeper,后启动kafka
    #关闭顺序:先关闭kafka,后关闭zookeeper (可使用kill命令直接关闭)
    cd /kafka_2.12-2.0.0
    kafka-server-start.sh -daemon config/server.properties &
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    监控

    启用JMX

    #全节点
    vi /kafka_2.12-2.0.0/bin/kafka-server-start.sh
    export JMX_PORT="9999"
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    可视化eagle

    tar -zvxf v3.0.1.tar.gz
    cd kafka-eagle-bin-3.0.1 && tar -zxvf kafka-eagle-web-3.0.1-bin.tar.gz
    
    yum install -y mariadb*
    
    mysqladmin -uroot -p password Mdb123#
    
    MariaDB [(none)]>create user eagle@localhost identified by 'kafka123#';
    MariaDB [(none)]>select user,host from mysql.user;
    MariaDB [(none)]>create database ke;
    MariaDB [(none)]>exit
    
    vi /etc/profile
    export KE_HOME=/efak-web-3.0.1
    export PATH=$PATH:$KAFKA_HOME/bin:$ZK_HOME/bin
    
    
    vi /efak-web-3.0.1/conf/system-config.properties
    
    cluster1.zk.list=ip1:2181,ip2:2181,ip3:2181
    efak.driver=com.mysql.cj.jdbc.Driver
    efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    efak.username=efak
    efak.password=kafka123#
    
    ke.sh start
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    在这里插入图片描述

    集群启动

    按照以上配置,首先启动zookeeper服务,确保znode节点都能够相互通信、协作,然后启动kafka服务,broker按照不同topic、partition选举为不同leader、follower,实现消息传递和存储任务的分布式协作。

    zookeeper介绍

    zookeeper是一个分布式的协调服务,主要用于维护集群的元数据信息和配置信息。kafka集群依赖其存储、管理自身元数据、配置。

    zookeeper在kafka中的作用

    在这里插入图片描述

    1、管理broker节点:broker的上下线、topic信息、partition信息、副本;

    2、存储、管理集群的元数据信息和配置:broker的ip地址、端口、topic、partition的分配;

    3、监控broker节点状态:实现fail over和load balance;

    kafka介绍

    kafka是一个高性能、低延迟、分布式的消息传递系统,特点在于实时处理数据。集群由多个成员节点broker组成,每个节点都可以独立处理消息传递和存储任务。

    在这里插入图片描述

    路由策略

    发布消息由key、value组成,真正的消息是value,key是标识路由消息时所要存放的Partition:

    1、若已指定partition,消息则直接写入到指定partition;

    2、若未指定partition但指定了key,则通过对key的hash值与partition数量取模,结果就是对应的partition;

    3、若partition和key均未指定,则使用轮询算法选出一个partition;

    写入过程

    1、 producer向kafka集群提交连接请求,任意broker节点都会返回broker controller的通信URL(配置文件中的listeners地址);

    2、producer向broker controller询问指定topic所对应的partition的leader列表地址;

    3、broker controller从zk中查找,返回指定topic所对应的partition的leader列表地址;

    4、producer根据消息路由策略,查找符合要求的partition leader,然后发送消息;

    发送ack机制

    1、 acks=0:producer发送的消息到发送端的buffer中就直接返回了,至于这个消息是否真的发送到broker,producer不关心,(类似udp协议);

    2、ack=1:producer发送的消息一定要存储到对应的partition的leader副本日志文件中才算成功,若失败,则会retry。在这种模式下,当消息已经存储在leader副本中,但是follower副本还没来得及同步,leader副本的broker节点挂了,消息才会丢失;

    3、当acks=-1或者all:producer发送的消息一定要存储到对应的partition的所有副本日志文件中才算消息发送成功,若失败,则会retry。在这种模式下,所有副本的broker节点都挂了,才会丢失;

    消费过程

    1、consumer向kafka集群提交连接请求,任意broker节点都会返回broker controller的通信URL(配置文件中的listeners地址);

    2、consumer指定要消费的topic,向broker controller发送poll请求;

    3、broker controller为consumer分配一个或多个partition leader,并将该partitioin的当前offset发送给consumer;

    4、consumer消费完后,向broker发送新的offset;

    5、broker在相应的consumer_offset中更新offset值;

    6、重复1-5,直到consumer停止请求消息。

    特点

    1、producer发布的所有消息会一致保存在kafka集群中,不管消息是否被消费;

    2、可以通过设置保留时间来清理过期的数据;

    3、consumer可以重置offset,从而可以灵活消费存储在broker上的消息;

  • 相关阅读:
    2009-2018年各省涉农贷款数据(wind)
    基于Django中间件引发的编程思想
    学习了解Angular 2 架构
    OTA: Optimal Transport Assignment for Object Detection 论文和代码学习
    Onetable:统一的表格式元数据表示
    论文写作--总结
    基于Or-Tools的整数规划问题求解
    从入门到进阶!当下火爆的大数据技术及算法怎么还能不知道 一起来学习互联网巨头的大数据架构实践!
    QGIS开发笔记(二):Windows安装版二次开发环境搭建(上):安装OSGeo4W运行依赖其Qt的基础环境Demo
    【图像融合】基于双树复小波变换的像素级图像融合算法附matlab代码
  • 原文地址:https://blog.csdn.net/hy19930118/article/details/133702677