• kafka


    kafka带入集群当中,实现elkfk

    zookeeper集群+kafka集群

    1、zookeeper

    1.1概念

    zookeeper是一个开源的、分布式的,为分布式架构提供协调的apache的项目

    1.2作用

    保存元数据

    1.3zookeeper的工作机制

    观察者模式设计的分布式服务器管理架构

    负责存储和管理元数据,记录集群的变化,保存集群变化的信息

    1.4zookeeper的特点

    1、在集群中分为领导者和追随者,组成的集群
    
    2、只要有半数以上的节点正常工作,整个zookeeper就可以正常工作。zookeeper集群在部署时一般选择奇数台
    
    3、全局数据一致,每个zookeeper不论是领导者还是追随者,在访问他们的数据时都是一致的
    
    4、数据更新的原子性,一次更新数据,要么都成功,要么都失败
    
    5、数据更新的实时性
    
    6、领导者和追随者根据投票产生
    

    1.5选举机制

    1、服务器A启动,发起一次选举,A会投自己一票。A有一票不够半数,选举无法完成,A进入looking状态
    
    2、服务器B启动,再发起一次选举,服务器B也投自己一票,服务器A和B会做个比较,谁的myid大,如果A比B小,A会把票改投给B,B有两票,B自动当选为leader
    
    3、C启动,自动成为追随者,A也会成为追随者
    

    1.6部署zookeeper

    mysql1、2、3
    systemctl stop firewalld
    setenforce 0
    
    #时间同步
    yum -y install ntpdate
    ntpdate ntp.aliyun.com
    date
    
    #安装java环境
    yum -y install java
    java -version
    
    #zookeeper安装包解压
    tar -xf apache-zookeeper-3.5.7-bin.tar.gz
    mv apache-zookeeper-3.5.7-bin /opt/zookeeper
    
    cd zookeeper
    cd conf
    cp zoo_sample.cfg zoo.cfg
    
    #修改配置文件
    vim zoo.cfg
    2ticktime=2000
    #通信心跳时间,zookeeper服务端和客户端之间通信的间隔时间,单位是毫秒
    5initlimit=10
    #leader和follower初始连接时最多能容忍的心跳数,单位是秒
    8synclimit=5
    #leader和followe之间同步通信的超时时间,5*2的时间发生超时,leader就认为follower死了,会从集群中将其删除
    12datadir=/opt/zookeeper/data
    #zookeeper的数据保存目录
    13dataLogDir=/opt/zookeeper/logs
    #日志保存目录
    15clientPort=2181
    #客户端连接服务器的指定端口
    16行添加
    server.1=192.168.230.21:3188:3288
    server.2=192.168.230.22:3188:3288
    server.3=192.168.230.23:3188:3288
    
    server.1:数字id,也就是服务器对应的myid
    192.168.230.21:服务器的ip地址
    3188:zookeeper集群内部通信的端口
    3288:重新选举端口,万一leader挂了,就用这个端口进行内部通信,选举新的leader
    -------------------------------------------------------------------------------------------------------------
    
    #创建数据目录和日志目录
    mkdir /opt/zookeeper/data
    mkdir /opt/zookeeper/logs
    
    #在每个节点的dataDir指定的目录下创建一个 myid 的文件
    echo 1 > /mkdir /opt/zookeeper/data/myid
    echo 2 > /mkdir /opt/zookeeper/data/myid
    echo 3 > /mkdir /opt/zookeeper/data/myid
    
    #配置zookeeper启动脚本
    vim /etc/init.d/zookeeper
    
    #设置开机自启
    chmod +x /etc/init.d/zookeeper
    chkconfig --add zookeeper
    
    #启动zookeeper
    service zookeeper start
    
    #查看zookeeper状态
    service zookeeper status
    

    2、kafka

    2.1概念

    消息队列:MQ

    在高并发环境下,同步的请求来不及处理,请求太对会造成阻塞

    比如大量请求并发到数据库,出现too many connection报错,消息队列使用异步处理方式,可以缓解系统处理请求的压力

    2.2kafka的作用

    1、异步处理
    
    2、系统解耦
    
    每个系统之间独立运行,互相之间没有必然的依赖关系。
    
    微服务架构中的通信对于解耦来说至关重要,各个微服务之间独立运行,分别处理各自的请求和消息,提高整个系统的吞吐量和处理能力。比如电商的订单系统,网站的工单系统,是典型的一个消息队列场景
    
    3、负载均衡
    
    消息队列的负载均衡,把任务发送到多个消费者,多个消费者可以并行处理队列中的消息
    
    4、流量控制和限流
    
    通过延迟方法,处理生产速率和消费者的处理速度(代码控制)
    
    5、数据同步和分发
    
    跨系统的系统同步和日志收集
    
    6、任务调度和定时任务
    
    7、实时数据处理
    
    8、备份和恢复
    

    2.3消息队列的模式

    1、点对点,一对一,消费者消费完数据之后,生产者会自动清除已消毒的数据。一个生产者对应一个消费者(淘汰)
    
    2、发布/订阅模式(一对多模式,观察者模式):消费者数据在消费完之后不会被清除(保留一段时间)。生产者发布一个消息,可以是一个消费者使用,也可以是多个消费者同时使用(主流)
    
    kafka就是发布/订阅模式的消息队列,用于大数据的实时处理领域。RABBITMQ也是发布/订阅模式的消息队列,小集群内部使用
    

    2.4kafka的特性

    1、高吞吐量,低延迟。每秒可以处理几十万条数据,延迟只有几毫秒
    
    2、集群的可扩展性(支持热扩展)
    
    3、消息的持久化:生产者发布的消息可以保存到吸盘当中,防止数据丢失(有时间限制)
    
    3、容错性:挂了一个可以继续使用
    
    4、高并发:数千个客户端可以同时读写
    

    2.5kafka的组件

    1、topic:主题,kafka的基本单元,所有生产者发布的消息都是发到主题,消费者订阅主题,然后消费生产者发布的消息
    
    2、生产者:把消息发布到主题
    
    3、消费者:订阅主题消费,消费生产者发布的消息
    
    4、分区:每个主题都可以分成多个分区,每个分区都是数据的有序子集
    
    分区当中保留数据,按照偏移量来有序的存储数据,消费者可以根据偏分区当中的偏移量来消费指定分区当中的消息(一般不用)
    
    分区还有备份的作用,在创建主题时创建分区,创建分区时要在指定副本数。
    
    分区和执行的集群机器数量一般是保持一致的
    
    副本:备份分区中的消息,最少要2个,互为备份
    
    4、偏移量:消息在分区当中的唯一标识,跟踪和定位消息所在的位置。消费者可以根据偏移量来处理信息/5
    
    5、经纪人broker:处理生产者和消费者的请求(kafka自己),元数据的经纪人/6
    
    6、zookeeper:保存元数据
    

    2.6kafka的工作流程

    生产者将消息发布到指定的主题,每个消息都附带一个key和value

    主题是有多个分区的,生产者把消息写入一个分区(带偏移量)

    经纪人(kafka)分配和处理生产者的发布请求,偏移量也是经纪人分配(在分区中是唯一的)

    消费者订阅主题,获取全量的消费者的消费信息(默认模式),也可以从指定的分区获取消息(代码来完成,一般不用)

    生产者发布的消息会在本地保留一段时间,防止消费者有延迟或者处理速度过慢导致没有成功消费。保留时间默认是7天

    2.7部署kafka

    #kafka安装包解压
    tar -xf kafka_2.13-3.4.1.tgz
    mv kafka_2.13-3.4.1 /usr/local/kafka
    cd /usr/local/kafka/config
    
    #配置文件备份
    cp server.properties server.properties.bak
    
    #修改配置文件
    vim server.properties
    
    24broker.id=0(经纪人id要不一样)
    34listeners=PLAINTETX://192.168.230.21:9092
    #监听本地地址
    44num.network.threads=3
    #broker处理网络请求的线程数,一般不用修改
    48num.io.threads=8
    #数值一定要大于磁盘数,处理磁盘读写的线程数
    52socket.send.buffer.bytes=102400
    #发送套接字缓冲区的大小
    56socket.receive.buffer.bytes=102400
    #接收套接字缓冲区的大小
    60socket.request.buffer.bytes=102400
    #请求套接字缓冲区的大小
    67log.dirs=/usr/local/kafka/logs
    #kafka日志存放的路径
    72num.partitions=1
    #创建主题时经纪人指定的分区数,如果指定的分区数不同,这个值可以被覆盖
    111log.retention.hours=168
    #消息在本地持久化保留的时间是168小时
    118log.segent.bytes=1073741824
    #保存的持久化的文件的大小,超过这个值的也会被删除
    132 zookeeper.connect=192.168.230.21:2181,192.168.230.22:2181,192.168.230.23:2181
    #指定zookeeper集群的ip地址
    
    #声明变量
    vim /etc/profile
    export KAFKA_HOME=/usr/local/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    
    #立即生效
    source /etc/profile
    
    #主机名映射
    vim /etc/hosts
    192.168.230.21 mysql1
    192.168.230.22 mysql2
    192.168.230.23 mysql3
    
    #配置kafka启动脚本
    vim /etc/init.d/kafka
    
    #设置开机自启
    chmod +x /etc/init.d/kafka
    chkconfig --add kafka
    
    #启动kafka
    service kafka start
    
    #查看端口是否开启
    netstat -antp | grep 9092
    
    #创建主题
    kafka-topics.sh --create --bootstrap-server 192.168.230.21:9092,192.168.230.22:9092,192.168.230.23:9092 --replication-factor 2 --partitions 3 --topic test1
    --replication-factor 2:创建分区的副本数
    --partitions:分区数
    --topic:指定分区的名称
    
    #查看topic数
    kafka-topics.sh --list --bootstrap-server 192.168.230.21:9092,192.168.230.22:9092,192.168.230.23:9092
    
    #模拟发布消息
    kafka-console-producer.sh --broker-list 192.168.230.21;9092,192.168.230.22:9092,192.168.230.23:9092 --topic test1
    test1
    test2
    test3
    
    #消费者订阅主题
    kafka-console-consumer.sh --bootstrap-server 192.168.230.21:9092,192.168.230.22:9092,192.168.230.23:9092 --topic test1 --from-beginning
    
    #查看主题的详细信息
    kafka-topics.sh --describe --bootstrap-server 192.168.230.21:9092,192.168.230.22:9092,192.168.230.23:9092
    
    Topic: test1	TopicId: 3cMz2ZiYSr2rxy9jzGCyeQ	PartitionCount: 3	ReplicationFactor: 2	Configs: 
    	Topic: test1	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 2,1
    	Topic: test1	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1,0
    	Topic: test1	Partition: 2	Leader: 0	Replicas: 0,2	Isr: 0,2
    
    Topic:主题名称
    Partition:分区,后面的0 1 2是偏移量
    Leader:分区的领导者用来处理分区的读写操作,只有在指定写分区和指定读分区的时候才有工作,如果不是指定,全量展示则无意义
    Replicas:分区的副本,0 1 2对应的是broker id
    Isr:表示当前与领导者同步的副本
    -----------------------------------------------------------------------------------------------------------------------------------------------------------
    
    #删除主题
    kafka-topics.sh --delete --bootstrap-server 192.168.230.21:9092,192.168.230.22:9092,192.168.230.23:9092 --topic test1
    

    2.8kafka的消息堆积该如何解决

    产生的原因:
    消费者出现了延迟或者处理能力太差,导致消息堆积。
    
    方法:
    1、减少kafka持久化的保存时间
    
    2、修改主题的分区数,扩大分区的数量,提高消费者获取的通道
    
    修改分区数:kafka-topics.sh --bootstrap-server 192.168.230.21:9092,192.168.230.22:9092,192.168.230.23:9092 --alter --topic test1 --partitions 6
    
    3、可以指定多个消费者共同工作,处理消息的挤压
    
  • 相关阅读:
    Nginx实例配置详解
    JAVA泡泡堂网络游戏的设计与实现免费源代码+LW
    nginx--指令
    基于STM32单片机和AD9850的智能DDS函数信号发生器
    Axure RP 教程如何隐藏和显示小部件教程
    云原生简介 (Cloud Native)
    Linux 网络编程基础知识
    WEB组态编辑器插件(BY组态)介绍
    解题元宇宙,网络游戏中的多元通信方案
    animate动画库的使用步骤
  • 原文地址:https://blog.csdn.net/QChestnut/article/details/140931301