• 基于kafka的日志收集分析平台


    目录

    基于kafka的日志收集分析平台架构图

     数据走向流程

    一、项目目的

    二、项目环境

    三、项目步骤

    准备好3台虚拟机搭建nginx集群

    配置好三台nginx机器的静态ip地址,防止dhcp模式动态获得ip地址对我们服务器造成影响

    三台机器都配置好dns

     dns解析顺序:

    修改主机名

    每一台机器上面写好域名解析

    安装基本软件,解决依赖关系

    安装时间同步服务

    关闭防火墙

    关闭selinux

    nginx搭建

    nginx配置文件修改

    新建我们的配置文件

    语法检测,检测配置文件语法是否正确

    使用三台虚拟机搭建kafka和zookeeper集群

    安装java和kafka

    配置kafka

    配置zookeeper

    启动kafka

    启动zookeeper

    创建一个topic来测试kafka

    创建topic : 

    创建生产者

    创建消费者

    filebeat部署

    yum安装filebeat

    测试filebeat能否生产数据

    启动filebeat服务

     接下来用kafka自带的消费者程序来测试一下我们能否消费到filebeat生产的nginxlog主题里面的数据

     可以到filebeat的记录数据的文件里面看一下消费者有没有成功消费到数据。(/var/lib/filebeat/registry/filebeat/)

     编写python脚本,模拟消费者消费数据,然后将所需字段提取出来整理后放入数据库里面

    日志收集平台详细架构图:


    基于kafka的日志收集分析平台架构图

     数据走向流程

    一、项目目的

            主要是为了模拟企业在大数据背景下的日志收集、存储,分析,消费等流程。

    二、项目环境

    Windows10机器(测试用)、Linux(centos7)、Nginx(1.20.1)、Filebeat(7.17.5)、kafka(1.12)、zookeeper(3.6.3)、Pycharm2020.3、mysql(5.7.34)

    三、项目步骤

    准备好3台虚拟机搭建nginx集群

    配置好三台nginx机器的静态ip地址,防止dhcp模式动态获得ip地址对我们服务器造成影响

    三台机器都配置好dns

     dns解析顺序:

          1、浏览器的缓存
          2、本地hosts文件  --linux(/etc/hosts)
          3、找本地域名服务器  -- linux(/etc/resolv.conf)

    修改主机名

    1. [root@nginx-kafka01 /]# cat /etc/hostname
    2. nginx-kafka01
    1. [root@nginx-kafka02 ~]# cat /etc/hostname
    2. nginx-kafka02
    1. [root@nginx-kafka03 ~]# cat /etc/hostname
    2. nginx-kafka03

    每一台机器上面写好域名解析

    1. [root@nginx-kafka01 /]# cat /etc/hosts
    2. 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
    3. ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
    4. 192.168.44.181 nginx-kafka01
    5. 192.168.44.182 nginx-kafka02
    6. 192.168.44.183 nginx-kafka03

    安装基本软件,解决依赖关系

       yum install wget lsof vim -y

    安装时间同步服务

    1. yum -y install chrony

    设置开机自启,然后开机服务

    1. vim /etc/selinux/config
    2. SELINUX=disabled
    systemctl enable chronyd
    systemctl start chronyd

    关闭防火墙

    1. [root@nginx-kafka01 ~]# systemctl stop firewalld
    2. [root@nginx-kafka01 ~]# systemctl disable firewalld

    关闭selinux

    selinux是linux里面的一个安全子系统,里面有许多关于安全的规则,很麻烦,会影响项目运行。

    1. vim /etc/selinux/config
    2. SELINUX=disabled

            selinux关闭 需要重启机器才能生效,可以看到selinux处于禁用状态

    1. [root@nginx-kafka01 /]# getenforce
    2. Disabled

    nginx搭建

    安装好epel源,本次nginx安装使用yum安装,以一台机器示例:

    1. yum install epel-release -y
    2. yum install nginx -y

    设置开机自启

    systemctl enable nginx

    启动nginx服务

    systemctl start nginx

    查看nginx是否启动成功

    1. [root@nginx-kafka01 /]# ps aux | grep nginx
    2. root 2098 0.0 0.0 40056 984 ? Ss 7月24 0:00 nginx: master process /usr/sbinnginx
    3. nginx 2179 0.0 0.0 40060 1180 ? S 7月24 0:00 nginx: worker process

    nginx配置文件修改

    1. vim nginx.conf
    2. listen 80 default_server;
    3. 修改成:
    4. listen 80;

    新建我们的配置文件

    1. vim /etc/nginx/conf.d/sc.conf
    2. server {
    3. listen 80 default_server;
    4. server_name www.sc.com;
    5. root /usr/share/nginx/html;
    6. access_log /var/log/nginx/sc/access.log main;
    7. location / {
    8. }
    9. }

    语法检测,检测配置文件语法是否正确

    1. [root@nginx-kafka01 html]# nginx -t
    2. nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
    3. nginx: [emerg] open() "/var/log/nginx/sc/access.log" failed (2: No such file or directory)
    4. nginx: configuration file /etc/nginx/nginx.conf test failed
    5. [root@nginx-kafka01 html]# mkdir /var/log/nginx/sc
    6. [root@nginx-kafka01 html]# nginx -t
    7. nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
    8. nginx: configuration file /etc/nginx/nginx.conf test is successful
    9. #重新加载nginx
    10. nginx -s reload

    使用三台虚拟机搭建kafka和zookeeper集群

    以一台机器示例

    安装java和kafka

    yum install java wget  -y
    wget   https://mirrors.bfsu.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz 

    解压缩

    tar  xf  kafka_2.12-2.8.1.tgz

    配置kafka

    修改config /server.properties:

    设置broker节点,这代表这台kafka机器

    broker.id=1
    
    zookeeper.connect=192.168.44.181:2181,192.168.44.182:2181,192.168.44.183:2181
    

    配置zookeeper

    进入安装zookeeper的目录

    将配置文件copy一份然后改名为zoo.cfg添加如下三行

    1. server.1=192.168.0.94:3888:4888
    2. server.2=192.168.0.95:3888:4888
    3. server.3=192.168.0.96:3888:4888

    3888和4888都是端口  一个用于数据传输,一个用于检验存活性和选举

    创建/tmp/zookeeper目录 ,在目录中添加myid文件,文件内容就是本机指定的zookeeper id内容
    如:在192.168.44.181机器上
    echo 1 > /tmp/zookeeper/myid

    myid里面的id号要和broker节点号一致,分别设置三台机器。

    查看三台zookeeper的leader和follower情况

    可以看到我设置的kafka02是leader,kafka01和kafka03是follower

    启动kafka

    bin/kafka-server-start.sh -daemon config/server.properties

    启动zookeeper

    [root@nginx-kafka01 bin]# ./zkCli.sh 
    

     此时我们应该看到三个brokers的id

    1. [zk: localhost:2181(CONNECTED) 3] ls /
    2. [admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, sc, zookeeper]
    3. [zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
    4. [1, 2, 3]

    创建broker

    1. [zk: localhost:2181(CONNECTED) 3] create /sc/yy
    2. Created /sc/yy
    3. [zk: localhost:2181(CONNECTED) 4] ls /sc
    4. [page, xx, yy]
    5. [zk: localhost:2181(CONNECTED) 5] set /sc/yy 90
    6. [zk: localhost:2181(CONNECTED) 6] get /sc/yy
    7. 90

    创建一个topic来测试kafka

    创建topic : 

    bin/kafka-topics.sh --create --zookeeper 192.168.44.181:2181 --replication-factor 1 --partitions 1 --topic test
    

     效果图:

    创建生产者

    bin/kafka-console-producer.sh --broker-list 192.168.44.181:9092 --topic test
    

    创建消费者

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.44.181:9092 --topic test
    

    消费成功效果图:

    消费者消费到了生产者产生的数据

    filebeat部署

    安装

    rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

    编辑 vim/etc/yum.repos.d/fb.repo

    1. [elastic-7.x]
    2. name=Elastic repository for 7.x packages
    3. baseurl=https://artifacts.elastic.co/packages/7.x/yum
    4. gpgcheck=1
    5. gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
    6. enabled=1
    7. autorefresh=1
    8. type=rpm-md

    yum安装filebeat

    yum  install  filebeat -y

    设置开机自启

    systemctl enable filebeat

    修改filebeat配置文件,filebeat的配置文件是yml格式的,

    首先将filebeat的配置文件filebeat.yml备份一份为filebeat.yml.bak

    [root@nginx-kafka01 filebeat]# cp filebeat.yml filebeat.yml.bak
    

    然后再将filebeat.yml文件清空,加上我们自己配置的一些配置

    1. filebeat.inputs:
    2. - type: log
    3. # Change to true to enable this input configuration.
    4. enabled: true
    5. # Paths that should be crawled and fetched. Glob based paths.
    6. paths:
    7. - /var/log/nginx/sc/access.log
    8. #==========------------------------------kafka-----------------------------------
    9. output.kafka:
    10. hosts: ["192.168.44.181:9092","192.168.44.182:9092","192.168.44.183:9092"]
    11. topic: nginxlog
    12. keep_alive: 10s

    配置好了配置文件,就可以通过filebeat来收集nginx的日志了

    测试filebeat能否生产数据

    创建主题 :nginxlog(这个主题是我们在filebeat指定好的,filebeat会将生产的数据都吐到这个主题里面)

    bin/kafka-topics.sh --create --zookeeper 192.168.44.181:2181 --replication-factor 3 --partitions 1 --topic nginxlog

    启动filebeat服务

    systemctl start  filebeat

    可以看到,filebeat进程是启动成功了的

     接下来用kafka自带的消费者程序来测试一下我们能否消费到filebeat生产的nginxlog主题里面的数据

    [root@nginx-kafka01 kafka_2.12-2.8.1]# bin/kafka-topics.sh --create --zookeeper 192.168.44.181:2181 --replication-factor 3 --partitions 1 --topic nginxlog
    

    消费成功效果图:

     这个时候,我们可以刷新一下我们nginx的静态页面,没有问题的话,我们消费的数据会10秒刷新一次。

     效果图:

     

     可以到filebeat的记录数据的文件里面看一下消费者有没有成功消费到数据。(/var/lib/filebeat/registry/filebeat/)

    [root@nginx-kafka01 filebeat]# less log.json 
    

     编写python脚本,模拟消费者消费数据,然后将所需字段提取出来整理后放入数据库里面

    1. [root@nginx-kafka01 opt]# cat python_consumer.py
    2. import json
    3. import requests
    4. import time
    5. taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="
    6. #查询ip地址的信息(省份和运营商isp),通过taobao网的接口
    7. def resolv_ip(ip):
    8. response = requests.get(taobao_url+ip)
    9. if response.status_code == 200:
    10. tmp_dict = json.loads(response.text)
    11. prov = tmp_dict["data"]["region"]
    12. isp = tmp_dict["data"]["isp"]
    13. return prov,isp
    14. return None,None
    15. #将日志里读取的格式转换为我们指定的格式
    16. def trans_time(dt):
    17. #把字符串转成时间格式
    18. timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
    19. #timeStamp = int(time.mktime(timeArray))
    20. #把时间格式转成字符串
    21. new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
    22. return new_time
    23. #从kafka里获取数据,清洗为我们需要的ip,时间,带宽
    24. from pykafka import KafkaClient
    25. client = KafkaClient(hosts="192.168.44.181:9092,192.168.44.182:9092,192.168.44.183:9092")
    26. topic = client.topics['nginxlog']
    27. balanced_consumer = topic.get_balanced_consumer(
    28. consumer_group='testgroup',
    29. auto_commit_enable=True,
    30. zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
    31. )
    32. consumer = topic.get_simple_consumer()
    33. for message in balanced_consumer:
    34. if message is not None:
    35. line = json.loads(message.value.decode("utf-8"))
    36. log = line["message"]
    37. tmp_lst = log.split()
    38. ip = tmp_lst[0]
    39. dt = tmp_lst[3].replace("[","")
    40. bt = tmp_lst[9]
    41. dt = trans_time(dt)
    42. prov, isp = resolv_ip(ip)
    43. if prov and isp:
    44. print(prov, isp,dt)

    效果图:

     

    日志收集平台详细架构图:

    附带一些原理

     

     

  • 相关阅读:
    弘辽科技:拼多多商品发布被驳回是为什么?被驳回怎么办?
    C++中使用嵌套循环遍历多维数组
    $19服务:DTCStatusMask和statusofDTC bit 定义
    Python中的 if __name__ ==‘main‘
    工程物料管理信息化建设(十二)——关于工程物料管理系统最后的思考
    基于划分的方法、K-均值算法、K-medoids、K-prototype(机器学习)
    vsftp部署匿名及本地登录的注意点
    Simulink模型加密共享
    FPGA——三速自适应以太网设计(1)基本模块
    使用Python进行页面开发——Django常用Web工具
  • 原文地址:https://blog.csdn.net/qq_48391148/article/details/125881430