• 分布式运用之企业级日志ELFK+logstash的过滤模块


    一、ELFK集群部署(Filebeat+ELK

     在搭建ELK的基础上安装Filebeat服务,Filebeat服务可以布置在以下任意一台主机,本次实验将布置在apache服务器的节点上 

    步骤一:安装 Filebeat(在apache节点操作) 

    #上传软件包 filebeat-6.7.2-linux-x86_64.tar.gz 到/opt目录
     tar zxvf filebeat-6.7.2-linux-x86_64.tar.gz  
     mv filebeat-6.7.2-linux-x86_64/ /usr/local/filebeat

    步骤二:设置 filebeat 的主配置文件 

    1. cd /usr/local/filebeat
    2. vim filebeat.yml
    3. filebeat.prospectors:
    4. - type: log #指定 log 类型,从日志文件中读取消息
    5. enabled: true
    6. paths:
    7. #指定监控的日志文件
    8. - /var/log/httpd/access_log
    9. tags: ["filebeat"] #设置索引标签
    10. fields: #可以使用 fields 配置选项设置一些参数字段添加到 output 中
    11. service_name: apache
    12. log_type: access
    13. from: 192.168.73.107
    14. --------------Elasticsearch output-------------------
    15. (全部注释掉)
    16. ----------------Logstash output---------------------
    17. output.logstash:
    18. hosts: ["192.168.73.107:5044"] #指定 logstash 的 IP 和端口

     步骤三: 新建 Logstash 配置文件

    1. cd /etc/logstash/conf.d
    2. cp system.conf filebeat.conf ​
    3. vim filebeat.conf
    4. input {
    5. beats { port => "5044" }
    6. }
    7. output {
    8. elasticsearch {
    9. hosts => ["192.168.73.105:9200","192.168.73.106:9200"]
    10. index =>"%{[fields][service_name]}-%{+YYYY.MM.dd}"
    11. }
    12. stdout {
    13. codec => rubydebug
    14. }
    15. }

    filebeat 功能的引入测试 

    1)创建两个访问页面

    echo "

    this  is   filebeat test

    " >/var/www/html/test1.html
    echo "h1> this is a test    " >/var/www/html/test2.html

    2)启动filebeat和logstash 

      #启动 filebeat
     nohup ./filebeat -e -c filebeat.yml > filebeat.out &
     #-e:输出到标准输出,禁用syslog/文件输出
     #-c:指定配置文件
     #nohup:在系统后台不挂断地运行命令,退出终端不会影响程序的运行
     
     #启动 logstash
    cd /etc/logstash/conf.d
    logstash -f filebeat.conf

     

    3)进行访问测试 

    此时:屏幕输出的日志以及发生改变(Logstash的新建配置文件中output模块中stdout) 

     

    4)kibana中创建于apache相关的索引,并且通过时序,可以查看到访问日志的信息 

     

     

    解决日志与kibana时间不一致的方法 

     

     

    二、 Logstash的过滤模块 

    2.1 Logstash配置文件中的模块

    (1)input {}
    • 指定输入流,通过file、beats、kafka、redis中获取数据

    (2)filter {}

    常用插件:

    • grok:对若干个大文本字段进行再分割,分割成一些小字段 (?<字段名>正则表达式) 字段名:正则表示匹配到的内容
    • date:对数据中的时间进行统一格式化
    • mutate:对一些无用的字段进行剔除,或增加字段
    • mutiline:对多行数据进行统一编排,多行合并和拆分

    (3)ourput {}

    • elasticsearch stdout

    2.2 Filter(过滤模块)中的插件 

    而对于 Logstash 的 Filter,这个才是 Logstash 最强大的地方。Filter 插件也非常多,我们常用到的 grok、date、mutate、mutiline 四个插件。

    对于 filter 的各个插件执行流程,可以看下面这张图:

    grok插件(通过grok插件实现对字段的分割,使用通配符)

    这里就要用到 logstash 的 filter 中的 grok 插件。filebeat 发送给 logstash 的日志内容会放到message 字段里面,logstash 匹配这个 message 字段就可以了。

     2.3 kibana 的DEV Tools 中 Grok Debugger工具的运用

     

    在此我们以apache访问日志索引中的message字段为例 ,进行数据切片:

     假设我们要进行的切片信息为messge中的

    192.168.73.105 - - [13/Nov/2022:02:57:13 +0800] "GET /test1.html HTTP/1.1" 304 - "-" "Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0"

    数据切片的格式为: 

     匹配格式:(?<字段名>正则表达式) 

    切片案例一:分离访问用户的IP 

    (?%{IPV6}|%{IPV4} )(?.+)

     

    切片案例二:以上面的为基础,将时间分离出来 

    (?%{IPV4}|%{IPV6})[\s-]+\[(?.+)](?.+)

     

    切片案例三:取出状态码和http方法  

    (?%{IPV4}|%{IPV6})[\s-]+\[(?.+)] "GET (?.+")[\s](?\d+) -\s"-"\s(?.+)

     

    2.4 数据切片的实战演练 

     切片前的操作 

    1. vim apache_log.conf
    2. input {
    3. file{
    4. path => "/etc/httpd/logs/access_log"
    5. type => "access"
    6. start_position => "beginning"
    7. }
    8. }
    9. output {
    10. if [type] == "access" {
    11. elasticsearch {
    12. hosts => ["192.168.73.105:9200","192.168.73.105:9200"]
    13. index => "apache_access-%{+YYYY.MM.dd}"
    14. }
    15. }
    16. }
    17. logstash -f apache_log.conf

     

     对apche服务进行访问 

     

     进行切片分离 

    修改logstash的控制conf 

    1. vim apache_log.conf
    2. input {
    3. file{
    4. path => "/etc/httpd/logs/access_log"
    5. type => "access"
    6. start_position => "beginning"
    7. }
    8. }
    9. filter {
    10. grok {
    11. match => ["message","(?%{IPV4}|%{IPV6})[\s-]+\[(?.+)] \"GET (?.+\")[\s](?\d+) -\s\"-\"\s(?.+)"]
    12. }
    13. }
    14. output {
    15. if [type] == "access" {
    16. elasticsearch {
    17. hosts => ["192.168.73.105:9200","192.168.73.105:9200"]
    18. index => "apache_access-%{+YYYY.MM.dd}"
    19. }
    20. }
    21. }
    22. logstash -f apache_log.conf

     

     再次进行访问测试 

     

    三、filebeat中multiline 插件的引入 

      java错误日志一般都是一条日志很多行的,会把堆栈信息打印出来,当经过 logstash 解析后,每一行都会当做一条记录存放到 EslaticSearch中,那这种情况肯定是需要处理的。这里就需要使用 multiline 插件,对属于同一个条日志的记录进行拼接。

    3.1  安装 multiline 插件

    在线安装插件
    cd /usr/share/logstash
    bin/logstash-plugin install logstash-filter-multiline
     
    离线安装插件
    先在有网的机器上在线安装插件,然后打包,拷贝到服务器,执行安装命令
    bin/logstash-plugin install file:///usr/share/logstash/logstash-offline-plugins-6.7.2.zip
     
    检查下插件是否安装成功,可以执行以下命令查看插件列表
    bin/logstash-plugin list

    3.2  使用 multiline 插件 进行日志合并 

    日志合并的过程: 

    • 第一步:每一条日志的第一行开头都是一个时间,可以用时间的正则表达式匹配到第一行。

    • 第二步:然后将后面每一行的日志与第一行合并。

    • 第三步:当遇到某一行的开头是可以匹配正则表达式的时间的,就停止第一条日志的合并,开始合并第二条日志。

    • 第四步:重复第二步和第三步

    filter {
      multiline {
        pattern => "^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}.\d{3}"
        negate => true
        what => "previous"
      }
    }

    pattern: 用来匹配文本的表达式,也可以是grok表达式

    what: 如果pattern匹配成功的话,那么匹配行是归属于上一个事件,还是归属于下一个事件。previous: 归属于上一个事件,向上合并。next: 归属于下一个事件,向下合并

    negate:是否对 pattern 的结果取反。false: 不取反,是默认值。true: 取反。将多行事件扫描过程中的行匹配逻辑取反(如果pattern匹配失败,则认为当前行是多行事件的组成部分)

    3.3 多行被拆分

    Java 堆栈日志太长了,有 100 多行,被拆分了两部分,一部分被合并到了原来的那一条日志中,另外一部分被合并到了不相关的日志中。

    为了解决这个问题,可以通过配置 filebeat 的 multiline 插件来截断日志。至于为什么不用 logstash 的 multiline 插件呢?因为在 filter 中使用 multiline 没有截断的配置项。filebeat 的 multiline 配置项如下:

    multiline.type: pattern
    multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}.\d{3}'
    multiline.negate: true
    multiline.match: after
    multiline.max_lines: 50

    配置项说明:

    • multiline.pattern:希望匹配到的结果(正则表达式)

    • multiline.negate:值为 true 或 false。使用 false 代表匹配到的行合并到上一行;使用 true 代表不匹配的行合并到上一行

    • multiline.match:值为 after 或 before。after 代表合并到上一行的末尾;before 代表合并到下一行的开头

    • multiline.max_lines:合并的最大行数,默认 500

    • multiline.timeout:一次合并事件的超时时间,默认为 5s,防止合并消耗太多时间导致 filebeat 进程卡死

    四、mutate 插件

    当我们将日志解析出来后,Logstash 自身会传一些不相关的字段到 ES 中,这些字段对我们排查线上问题帮助不大。可以直接剔除掉。这里我们就要用到 mutate 插件了。它可以对字段进行转换,剔除等。

    一般会把把 log.offset 字段去掉,这个字段可能会包含很多无意义内容。

    filter{
       mutate {
          remove_field => ["host" "[log][offset]"]
       } 

     

  • 相关阅读:
    GoLand设置GOROOT报错The selected directory is not a valid home for Go SDK
    Oracle ADG状态查看与相关视图
    oracle之执行计划
    C# 集合(一) —— Array类
    【合集】MySQL的入门进阶强化——从 普通人 到 超级赛亚人 的 华丽转身
    【IPC 通信】信号处理接口 Signal API(2)
    x64内核实验3-页机制的研究(1)
    SpringBoot SSMP项目搭建保姆级教程
    类型擦除和桥接方法
    【Java 演示灵活导出任意http、https接口数据】
  • 原文地址:https://blog.csdn.net/weixin_42054864/article/details/133156750