• kafka配合ElasticStack技术栈的搭配使用


    今日内容:
        - kafka生产环境调优;
        - kafka配合ElasticStack技术栈的搭配使用;
        - zookeeper集群部署;
        - zookeeper的ACL;
        - zookeeper的调优;
        - PB级别项目;
        - ES8集群搭建/elk; (待定...)
        


    订阅1个的topic:
        老男孩: 10
        
    多个不同的主题分配如下: (linux82-elk: 3)
        c1: 0 3 6 9
        c2: 1 4 7
        c3: 2 5 8


    订阅多个的topic:
        老男孩: 10 t1
        Linux: 10 t2 
        Python: 10  t3
        
    多个不同的主题分配如下: (linux82-elk: 3)
        c1: t1_0 ...  t2_0 ...  t3_0 ...
        c2: t1_1 ...  t2_1 ...
        c3: t1_2  ...  t2_2 ...
        
        
        
        

    订阅1个的topic:
        老男孩: 10
        
    多个不同的主题分配如下: (linux82-elk: 3)
        c1: 0 1 2
        c2: 3 4 5
        c3: 6 7 8 9
        
    10 / 3 = 3  .. 1 ===> 
    999 / 100 = 9 .. 99

    JBOD ---> RAID 0
    ---> RAID

    BOND ---> ...


    注意的参数:
        log.dirs
        auto.create.topics.enable
        zookeeper.connect
        num.io.threads
        
        
    ElasticStack集成kafka实战案例:
        1.创建topic
    kafka-topics.sh --bootstrap-server 10.0.0.103:9092 --create --topic oldboyedu-linux82-kafka-000001

        2.使用filebeat收集日志到kafka集群:
    cat > config/28-nginx-to-kafka.yaml < filebeat.inputs:
    - type: log
      paths:
        - /var/log/nginx/access.log*
      json.keys_under_root: true


    output.kafka:
      # 写入kafka集群的地址
      hosts: 
      - 10.0.0.102:9092
      - 10.0.0.103:9092

      # 写入集群的topic
      topic: "oldboyedu-linux82-kafka-000001"
    EOF


        3.使用logstash收集kafka日志
    cat > config/18-kafka-to-es.conf < input { 
       kafka {
         # 指定kafka的集群
         bootstrap_servers => "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092"
         # 从哪个topic消费数据
         topics => ["oldboyedu-linux82-kafka-000001"]
         # 指定消费者组
         group_id => "oldboyedu-linux82-logstash"
       }


    filter {
      json {
        source => "message"
        remove_field => ["tags","@version","ecs","agent","input","message"]
      }
      
      geoip {
         source => "clientip"
      }

      date {
         match => [
            "@oldboyedu-timestamp",
            "yyyy-MM-dd'T'HH:mm:ssZ"
         ]
      }

      useragent {
        source => "http_user_agent"
        target => "oldboyedu-linux82-useragent"
      }

      
    }

    output { 
      stdout {}

      elasticsearch {
          hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
          index => "oldboyedu-linux82-project-kafka"
          user => "elastic"
          password => "123456"
      }
    }
    EOF

        
        
    zookeeper集群部署:
        1.创建zookeeper的数据目录
    install -d /oldboyedu/data/zk
    data_rsync.sh /oldboyedu/data/zk/


        2.修改单点zk的配置文件
    vim /oldboyedu/softwares/zk/conf/zoo.cfg                          
    ...
    # 定义最小单元的时间范围tick。
    tickTime=2000
    # 启动时最长等待tick数量。
    initLimit=5
    # 数据同步时最长等待的tick时间进行响应ACK
    syncLimit=2
    # 指定数据目录
    dataDir=/oldboyedu/data/zk
    # 监听端口
    clientPort=2181
    # 开启四字命令允许所有的节点访问。
    4lw.commands.whitelist=*
    # server.ID=A:B:C[:D]
    # ID:
    #    zk的唯一编号。
    # A:
    #    zk的主机地址。
    # B:
    #    leader的选举端口,是谁leader角色,就会监听该端口。
    # C: 
    #    数据通信端口。
    # D:
    #    可选配置,指定角色。
    server.101=10.0.0.101:2888:3888
    server.102=10.0.0.102:2888:3888
    server.103=10.0.0.103:2888:3888

        3.同步数据
    data_rsync.sh /oldboyedu/softwares/apache-zookeeper-3.8.0-bin/
        
        
        4.创建myid文件
    for ((host_id=101;host_id<=103;host_id++)) do ssh 10.0.0.${host_id} "echo ${host_id} > /oldboyedu/data/zk/myid";done
        
        5.所有节点启动zk服务
    zkServer.sh start
    zkServer.sh  status

        
        6.链接方式
    zkCli.sh -server 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181

    zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181/oldboyedu-linux82-kafka3.2.1
        
    zookeeper的leader选举流程:    
        myid:
            唯一标识一个zookeeper节点。
        zxid:
            唯一事务的标识。用于记录唯一的写操作!
        
        选举流程:
            (1)相比较zxid,若zxid较大,则会成为新的leader;
            (2)如果zxid比较不出来,则比较myid,myid较大者会有限成为新的leader;
        
        
    使用zkWeb管理zookeeper集群:
        1.运行zkWeb
    java -jar zkWeb-v1.2.1.jar &>/dev/null &

        2.访问webUI
    略。

    临时znode:
        当前的会话退出时,znode会默认等待30秒后自动消失,等待时间是可以修改的哟。
        
    持久的znode:
        不会随着客户端的退出而删除znode。


    docker


        

  • 相关阅读:
    【2023提前批 之 面经】~ 锐捷
    Spring Authorization Server优化篇:Redis值序列化器添加Jackson Mixin,解决Redis反序列化失败问题
    LeetCode 算法:反转链表 c++
    iOS应用闪退或崩溃的解决方法
    20篇NeurIPS论文精选:语言大模型的六大趋势
    react笔记-03react-router篇
    Socks5 与 HTTP 代理在网络安全中的应用
    AMD发布22.9.2驱动,支持《禁闭求生(Grounded)》
    流式结构化数据计算语言的进化与新选择
    面试 10年+ Java开发的感受和自省!
  • 原文地址:https://blog.csdn.net/lpx1249115962/article/details/132650159