• 【ELK使用指南 2】常用的 Logstash filter 插件详解(附应用实例)


    一、logstash filter过滤插件的常用模块简介

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    Logstash 中的 filter 模块用于对输入的数据进行处理和转换

    它提供了多个插件,可以根据需要选择适当的插件来应用于数据流

    常见的 Logstash filter 插件包括:

    1. grok:用于将非结构化的日志数据解析为结构化的数据。它使用自定义的正则表达式模式匹配和提取字段。
    2. mutate:用于修改事件中的字段,例如重命名字段、删除字段、添加字段、替换字段值等。
    3. date:用于解析日期和时间字段,并将其转换为特定的格式。
    4. geoip:根据 IP 地址解析并添加地理位置信息字段,例如国家、省份、城市等。
    5. dns:根据域名解析并添加 IP 地址信息字段,例如解析主机名到 IP 地址。
    6. json:用于解析和格式化 JSON 数据。
    7. csv:用于解析和格式化 CSV 数据。
    8. grokdiscovery:用于动态生成 grok 模式,这对于处理多种日志格式非常有用。

    二、grok 正则捕获插件

    2.1 grok插件的作用

    将大文本字段分片成若干个小字段

    2.2 内置正则表达式

    内置正则格式:%{内置正则表达式:小字段名}

    1.举个例子,用内置正则匹配消息记录

    192.168.80.10 GET /index.html 15824 0.043
    
    %{IP:client_id_address} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:http_response_time}
    
    • 1
    • 2
    • 3
    登录kibana,进入Grok Debugger界面
    
    • 1

    在这里插入图片描述

    2.官方提供的常量

    Logstash 官方也给了一些常用的常量来表达那些正则表达式.

    地址:https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/ecs-v1/grok-patterns

    USERNAME [a-zA-Z0-9._-]+
    USER %{USERNAME}
    EMAILLOCALPART [a-zA-Z][a-zA-Z0-9_.+-=:]+
    EMAILADDRESS %{EMAILLOCALPART}@%{HOSTNAME}
    INT (?:[+-]?(?:[0-9]+))
    BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
    NUMBER (?:%{BASE10NUM})
    BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
    BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
    
    POSINT \b(?:[1-9][0-9]*)\b
    NONNEGINT \b(?:[0-9]+)\b
    WORD \b\w+\b
    NOTSPACE \S+
    SPACE \s*
    DATA .*?
    GREEDYDATA .*
    QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>(?>\\.|[^\\]+)+)|))
    UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
    # URN, allowing use of RFC 2141 section 2.3 reserved characters
    URN urn:[0-9A-Za-z][0-9A-Za-z-]{0,31}:(?:%[0-9a-fA-F]{2}|[0-9A-Za-z()+,.:=@;$_!*'/?#-])+
    
    # Networking
    MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
    CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
    WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
    COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
    IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
    IPV4 (?[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
    URIPROTO [A-Za-z]([A-Za-z0-9+\-.]+)+
    URIHOST %{IPORHOST}(?::%{POSINT:port})?
    # uripath comes loosely from RFC1738, but mostly from what Firefox
    # doesn't turn into %XX
    URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_\-]*)+
    #URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
    URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*
    URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
    URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
    
    # Months: January, Feb, 3, 03, 12, December
    MONTH \b(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\b
    MONTHNUM (?:0?[1-9]|1[0-2])
    MONTHNUM2 (?:0[1-9]|1[0-2])
    MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
    
    # Days: Monday, Tue, Thu, etc...
    DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
    
    # Years?
    YEAR (?>\d\d){1,2}
    HOUR (?:2[0123]|[01]?[0-9])
    MINUTE (?:[0-5][0-9])
    # '60' is a leap second in most time standards and thus is valid.
    SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
    TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
    # datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
    DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
    DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
    ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
    ISO8601_SECOND (?:%{SECOND}|60)
    TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
    DATE %{DATE_US}|%{DATE_EU}
    DATESTAMP %{DATE}[- ]%{TIME}
    TZ (?:[APMCE][SD]T|UTC)
    DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
    DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
    DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
    DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}
    
    # Syslog Dates: Month Day HH:MM:SS
    SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
    PROG [\x21-\x5a\x5c\x5e-\x7e]+
    SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
    SYSLOGHOST %{IPORHOST}
    SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
    HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
    
    # Shortcuts
    QS %{QUOTEDSTRING}
    
    # Log formats
    SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
    
    # Log Levels
    LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96

    2.3 自定义正则表达式

    自定义正则格式:(?<小字段名>自定义正则表达式)

    #举个例子
    #目标消息记录
    192.168.80.11 - - [14/Oct/2023:11:53:02 +0800] "GET /jxl.html HTTP/1.1" 200 17 "-" "curl/7.29.0" "-"
    
    #按照以下格式显示
      "remote_addr": 
      "response_code": 
      "http_method": 
      "refer_url": 
      "http_version": 
      "request_uri": 
      "user_agent": 
      "access_time": 
    
    
    #使用的正则表达式
    %{IP:remote_addr} - - \[(?<access_time>.+)\] \"%{WORD:http_method} %{URIPATHPARAM:request_uri} (?<http_version>.+)\" %{NUMBER:response_code} [0-9]+ \"(?<refer_url>.+)\" \"(?<user_agent>.+)\" .*
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在这里插入图片描述

    三、mutate 数据修改插件

    3.1 mutate插件的作用

    对Logstash收集的日志事件字段进行格式化处理。

    3.2 常用的配置选项

    配置选项功能
    rename重命名字段名
    add_field添加字段
    remove_field删除字段
    replace替换字段的值
    gsub修改字段值的字符
    convert修改字段值的数据类型

    3.3 mutate插件应用实例

    #将字段old_field重命名为new_field
    filter {
        mutate {
    	    #写法1,使用中括号括起来
            rename => ["old_field" => "new_field"]
    
            #写法2,使用大括号{}括起来
    	    rename => { "old_field" => "new_field" }		
        }
    }
    
    
    #添加字段
    filter {
        mutate {
            add_field => {
            	"f1" => "field1"
            	"f2" => "field2"
            }
        }
    }
    
    
    #将字段删除
    filter {
        mutate {
            remove_field  =>  ["message", "@version", "tags"]
        }
    }
    
    
    #将filedName1字段数据类型转换成string类型,filedName2字段数据类型转换成float类型
    filter {
        mutate {
            #写法1,使用中括号括起来
            convert  =>  ["filedName1", "string"]
    		
            #写法2,使用大括号{}括起来
    		convert => { "filedName2" => "float" }
        }
    }
    
    
    #将filedName字段中所有"/“字符替换为”_"
    filter {
        mutate {
            gsub => ["filedName", "/" , "_"]
        }
    }
    
    
    #将filedName字段中所有",“字符后面添加空格
    filter {
        mutate {
            gsub => ["filedName", "," , ", "]
        }
    }
    
    
    #将filedName字段以"|"为分割符拆分数据成为数组
    filter {
        mutate {
            split => ["filedName", "|"]
        }
    }
    
    
    #合并 “filedName1” 和 “ filedName2” 两个字段
    filter {
        merge  { "filedName2" => "filedName1" }
    }
    
    
    #用新值替换filedName字段的值
    filter {
        mutate {
            replace => { "filedName" => "new_value" }
        }
    }
    
    
    #添加字段first,值为message数组的第一个元素的值
    filter {
        mutate {
            split => ["message", "|"]
            add_field => {
                "first" => "%{[message][0]}"    
            } 
        }
    }
    
    
    #有条件的添加标签
    filter {
        #在日志文件路径包含 access 的条件下添加标签内容
        if [path] =~ "access" {
            mutate {
                add_tag => ["Nginx Access Log"]
            }
        }
        
        #在日志文件路径是 /var/log/nginx/error.log 的条件下添加标签内容
        if [path] == "/var/log/nginx/error.log" {
            mutate {
                add_tag => ["Nginx Error Log"]
            }
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110

    四、multiline 多行合并插件

    4.1 multiline插件的作用

    将多行日志内容合并成一整行

    4.2 常用配置项

    在这里插入图片描述

    pattern:通过正则表达式匹配行内容。

    在这里插入图片描述

    negate

    false:不取反,表示正则匹配的行做合并操作。

    true:取反,表示正则不匹配的行做合并操作。
    在这里插入图片描述

    what

    previous表示向上合并

    next表示向下合并

    4.3 multiline插件应用实例

    Step1 安装multiline插件

    方式一:在线安装

    cd /usr/share/logstash
    
    bin/logstash-plugin install logstash-filter-multiline
    
    • 1
    • 2
    • 3

    方式二:离线安装

    先在有网的机器上在线安装插件,然后打包,拷贝到服务器,执行安装命令。

    bin/logstash-plugin prepare-offline-pack --overwrite --output logstash-filter-multiline.zip logstash-filter-multiline
    
    bin/logstash-plugin install file:///usr/share/logstash/logstash-filter-multiline.zip
    
    • 1
    • 2
    • 3

    Step2 使用multiline插件

    第一步:每一条日志的第一行开头都是一个时间,可以用时间的正则表达式匹配到第一行

    第二步:然后将后面每一行的日志与第一行合并

    第三步:当遇到某一行的开头是可以匹配正则表达式的时间的,就停止第一条日志的合并,开始合并第二条日志;

    第四步:重复第二步和第三步。

    filter {
      multiline {
        pattern => "^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}.\d{3}"
        negate => true
        what => "previous"
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述

    五、date 时间处理插件

    5.1 date插件的作用

    将Logstash收集的日志事件时间@timestamp日志实际的时间进行格式统一。

    需要先配置grok插件模块获取日志时间的小字段

    5.2 常用的配置项

    在这里插入图片描述

    match:用于配置具体的匹配内容规则

    前半部分内容表示匹配实际日志当中的时间戳的名称,后半部分则用于匹配实际日志当中的时间戳格式

    这个地方是整条配置的核心内容,如果此处规则匹配是无效的,则生成后的日志时间戳将会被input插件读取的时间替代。

    如果时间格式匹配失败,会生成一个tags字段,字段值为 _dateparsefailure,需要重新检查上边的match配置解析是否正确。

    在这里插入图片描述

    target将匹配的时间戳存储到给定的目标字段中

    如果未提供,则默认更新事件的@timestamp字段。
    在这里插入图片描述

    timezone:当需要配置的date里面没有时区信息,而且不是UTC时间,需要设置timezone参数。

    5.3 date插件应用实例

    #新建子配置文件,用于测试
    vim  /etc/logstash/conf.d/test.conf
    
    #在filter块中,启用date插件
    filter {
        date {
            match => ["access_time", "dd/MMM/YYYY:HH:mm:ss Z", "UNIX", "yyyy-MM-dd HH:mm:ss", "dd-MMM-yyyy HH:mm:ss"]
                    target => "@timestamp"
                    timezone => "Asia/Shanghai"
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述

    #运行配置文件
    #还需要启动filebeat收集日志,这里不再赘述
    logstash -f test.conf
    
    然后登录到kibana,查看视图的变化
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

  • 相关阅读:
    Numpy+PyTorch基础《python深度学习----基于pytorch》
    Pygame中实现图片的移动
    Linux下PCIE设备分析软件
    clickhouse中SummingMergeTree
    Swappin.gifts 在 Ambire dApp 中推出独家促销活动
    网络安全(黑客)自学
    家庭用户无线上网案例(AC通过三层口对AP进行管理)
    第二十一章 构建和配置 Nginx (UNIX® Linux macOS) - 为CSP构建Nginx的过程
    超视频时代音视频架构建设与演进
    单片机编程原则
  • 原文地址:https://blog.csdn.net/q2524607033/article/details/133904921