• ELK Installation, Deployment, and Debugging (8): Logstash Configuration Syntax Explained


input {
  # input plugins
}
filter {
  # filter plugins
}
output {
  # output plugins
}

1. Reading files
The file input uses the filewatch Ruby gem to watch for file changes, and records the read progress (a timestamp) of each watched log file in a .sincedb database file. The sincedb file lives under /plugins/inputs/file by default, with a name like .sincedb_234534534sdfgsfd23, inside the Logstash data directory, which defaults to LOGSTASH_HOME/data; a quick way to inspect it is sketched below.
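A minimal sketch for checking the read offset, assuming LOGSTASH_HOME is /usr/local/logstash as used throughout this article:

find /usr/local/logstash/data -name ".sincedb*"
# each record holds the watched file's inode, device numbers, and the byte offset read so far
cat /usr/local/logstash/data/plugins/inputs/file/.sincedb_*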

Experiment 1: read the local /var/log/secure as the input log and print to standard output.

vi /usr/local/logstash/2logstash-1.conf
input {
  file {
    path => ["/var/log/secure"]
    type => "ly_system"
    start_position => "beginning"
    # read from the beginning of the file; if omitted, reading starts from the end by default
    # if you do not want the whole existing file as input, leave this option unset
  }
}
output {
  stdout {
    codec => rubydebug
  }
}


1. For reference, the configuration saved on 10.10.10.74 in f-kafka-logs-es.conf (the pipeline currently running):

[root@localhost logstash]# cat f-kafka-logs-es.conf
input {
  kafka {
    bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"
    topics => ["osmessages"]
  }
}
output {
  elasticsearch {
    hosts => ["10.10.10.65:9200","10.10.10.66:9200","10.10.10.67:9200"]
    index => "osmessageslog-%{+YYYY-MM-dd}"
  }
}

2. Stop the running logstash service:
kill -9 13508
3. Start logstash with 2logstash-1.conf as the configuration file:
nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/2logstash-1.conf &
4. Check the output:

tail -f nohup.out
{
          "type" => "ly_system",
          "path" => "/var/log/secure",
      "@version" => "1",
          "host" => "localhost.localdomain",
       "message" => "Aug 31 08:43:56 localhost sshd[6920]: Accepted password for root from 172.16.17.234 port 1909 ssh2",
    "@timestamp" => 2023-08-31T00:52:13.054Z
}

Experiment 2: adding fields and tags in the input plugin

[root@localhost logstash]# cat 2logstash-1.conf
input {
  file {
    path => ["/var/log/secure"]
    type => "ly_system"
    start_position => "beginning"
    add_field => {"I'm " => "10.10.10.74"}
    tags => ["74","logstash1"]
    # read from the beginning of the file; if omitted, reading starts from the end by default
    # if you do not want the whole existing file as input, leave this option unset
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

***************************
add_field => {"I'm " => "10.10.10.74"}  adds a new, user-defined field.
tags => ["74","logstash1"]  tags is a built-in field that can be used to attach labels.
***************************
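Tags are handy for routing events later in the pipeline; a minimal sketch of a conditional output, assuming the "74" tag set above:

output {
  if "74" in [tags] {
    stdout { codec => rubydebug }
  }
}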
Output:

{
      "@version" => "1",
          "host" => "localhost.localdomain",
          "I'm " => "10.10.10.74",
          "path" => "/var/log/secure",
       "message" => "Aug 31 09:03:33 localhost sshd[14339]: Accepted password for root from 172.16.17.234 port 2684 ssh2",
    "@timestamp" => 2023-08-31T01:03:53.586Z,
          "type" => "ly_system",
          "tags" => [
        [0] "74",
        [1] "logstash1"
    ]
}

Experiment 3: reading syslog with the input plugin.
Two steps are needed:
1. vi /etc/rsyslog.conf
*.* @@10.10.10.74:5514       # 10.10.10.74 is the logstash server; this makes the local rsyslog client forward all logs to port 5514 there (@@ forwards over TCP, a single @ would use UDP)
2. Restart rsyslog:
systemctl restart rsyslog

The logstash configuration is shown below. Start it first; once running it opens port 5514 and listens there.

[root@localhost logstash]# cat rsyslog-logstash.conf
input {
  syslog {
    port => "5514"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
[root@localhost logstash]#

Check the log:

tail -f nohup.out
[2023-08-31T09:51:20,356][INFO ][logstash.inputs.syslog   ][main][1ac4f1a43da057380f8444a587ee7cb01fe84a0702afb9d46abc9667eeb0ea0c] Starting syslog tcp listener {:address=>"0.0.0.0:5514"}
[2023-08-31T09:51:20,390][INFO ][logstash.inputs.syslog   ][main][1ac4f1a43da057380f8444a587ee7cb01fe84a0702afb9d46abc9667eeb0ea0c] Starting syslog udp listener {:address=>"0.0.0.0:5514"}

On the log source server 10.10.10.56, set up the rsyslog client the same way:
1. vi /etc/rsyslog.conf
*.* @@10.10.10.74:5514       # forward local logs to port 5514 on the logstash server 10.10.10.74
2. Restart rsyslog:

[root@node1 ~]# systemctl restart rsyslog
[root@node1 ~]# service status rsyslog
The service command supports only basic LSB actions (start, stop, restart, try-restart, reload, force-reload, status). For other actions, please try to use systemctl.
[root@node1 ~]# systemctl status rsyslog
● rsyslog.service - System Logging Service
   Loaded: loaded (/usr/lib/systemd/system/rsyslog.service; enabled; vendor preset: enabled)
   Active: active (running) since 四 2023-08-31 10:00:47 CST; 31s ago

Logstash output:

{
    "facility_label" => "system",
          "@version" => "1",
         "timestamp" => "Aug 31 10:02:01",
          "facility" => 3,
              "host" => "10.10.10.56",
         "logsource" => "node1",
          "priority" => 30,
        "@timestamp" => 2023-08-31T02:02:01.000Z,
          "severity" => 6,
    "severity_label" => "Informational",
           "message" => "Removed slice User Slice of liuyang.\n",
           "program" => "systemd"
}
{
    "facility_label" => "security/authorization",
          "@version" => "1",
         "timestamp" => "Aug 31 10:02:01",
          "facility" => 10,
              "host" => "10.10.10.56",
         "logsource" => "node1",
          "priority" => 87,
        "@timestamp" => 2023-08-31T02:02:01.000Z,
          "severity" => 7,
    "severity_label" => "Debug",
               "pid" => "17805",
           "message" => "pam_limits(crond:session): unknown limit item 'noproc'\n",
           "program" => "crond"
}

From the output above, logstash has parsed the received logs in detail, splitting the time, hostname, program, and message body into separate fields.
"timestamp" is the time recorded in the source log.
"@timestamp" is the time logstash captured the log; it differs from the source time by 8 hours, which is a timezone matter (@timestamp is stored in UTC).

**********************************************
Experiment 4: reading TCP network data
The configuration below combines "LogStash::Inputs::TCP" with "LogStash::Filters::Grok" to reproduce the rsyslog log reading of Experiment 3.

[root@localhost logstash]# cat tcp-logstash.conf
input {
  tcp {
    port => "5514"
  }
}
filter {
  grok {
    match => {"message" => "%{SYSLOGLINE}"}
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}


Start the logstash service:
nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/tcp-logstash.conf &
Check the log:
[2023-08-31T10:08:39,596][INFO ][logstash.inputs.tcp      ][main][e17c63be3a5b12883f975a9f5eaf27f19639714f6267583b2142379ed6c8f22a] Starting tcp input listener {:address=>"0.0.0.0:5514", :ssl_enable=>"false"}

Port 5514 is up.

The client again uses rsyslog, with the same configuration as before.

Logstash output:

{
          "port" => 58526,
       "message" => [
        [0] "<30>Aug 31 10:17:01 node1 systemd: Started Session 351785 of user liuyang.",
        [1] "Started Session 351785 of user liuyang."
    ],
       "program" => "systemd",
     "logsource" => "node1",
          "host" => "10.10.10.56",
     "timestamp" => "Aug 31 10:17:01",
      "@version" => "1",
    "@timestamp" => 2023-08-31T02:10:24.759Z
}
{
          "port" => 58526,
       "message" => [
        [0] "<85>Aug 31 10:17:04 node1 polkitd[1172]: Registered Authentication Agent for unix-process:20549:1866610077 (system bus name :1.703886 [/usr/bin/pkttyagent --notify-fd 5 --fallback], object path /org/freedesktop/PolicyKit1/AuthenticationAgent, locale zh_CN.UTF-8)",
        [1] "Registered Authentication Agent for unix-process:20549:1866610077 (system bus name :1.703886 [/usr/bin/pkttyagent --notify-fd 5 --fallback], object path /org/freedesktop/PolicyKit1/AuthenticationAgent, locale zh_CN.UTF-8)"
    ],
       "program" => "polkitd",
     "logsource" => "node1",
           "pid" => "1172",
          "host" => "10.10.10.56",
     "timestamp" => "Aug 31 10:17:04",
      "@version" => "1",
    "@timestamp" => 2023-08-31T02:10:26.949Z
}


We can see that [0] is the complete raw message, while [1] is the parsed remainder; fields such as pid, logsource, the time, and port have been split out.
The TCP approach behaves much like the rsyslog input.
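If the two-element message array is not wanted, grok's overwrite option keeps only the parsed text; a minimal sketch building on the config above:

filter {
  grok {
    match => {"message" => "%{SYSLOGLINE}"}
    overwrite => ["message"]
    # without overwrite, message becomes [raw, parsed]; with it, only the parsed text remains
  }
}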


******************************************************
Experiment 5: feeding logs into logstash with nc

Client (log source): 10.10.10.56
Server (logstash): 10.10.10.74, configured exactly as in Experiment 4:

[root@localhost logstash]# cat tcp-logstash.conf
input {
  tcp {
    port => "5514"
  }
}
filter {
  grok {
    match => {"message" => "%{SYSLOGLINE}"}
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}

On the client, open a connection from the command line:
nc 10.10.10.74 5514
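Lines typed into the nc session are sent verbatim; an existing log can also be streamed through the same port (the file path here is illustrative):

tail -f /var/log/messages | nc 10.10.10.74 5514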

Check the logs on the logstash server:

{
          "port" => 59322,
       "message" => [
        [0] "Aug 27 12:34:57 node1 supervisord: 2023-08-27 12:34:57,594 INFO supervisord started with pid 4760",
        [1] "2023-08-27 12:34:57,594 INFO supervisord started with pid 4760"
    ],
       "program" => "supervisord",
     "logsource" => "node1",
          "host" => "10.10.10.56",
     "timestamp" => "Aug 27 12:34:57",
      "@version" => "1",
    "@timestamp" => 2023-08-31T02:16:27.986Z
}
{
          "port" => 59322,
       "message" => [
        [0] "Aug 27 12:34:58 node1 supervisord",
        [1] "supervisord"
    ],
     "logsource" => "node1",
          "host" => "10.10.10.56",
     "timestamp" => "Aug 27 12:34:58",
      "@version" => "1",
    "@timestamp" => 2023-08-31T02:16:27.992Z
}


Experiment complete.


Experiment 6: the codec plugin
A codec can be applied at both input and output to process data:
input -> decode -> filter -> encode -> output, where decoding and encoding are done by the codec.
Codecs support plain, json, json_lines, and other formats.
1. The plain codec
plain is a pass-through codec: whatever format comes in is what goes out.

[root@localhost logstash]# vi codec1-logstash.log
input {
  stdin {
  }
}
output {
  stdout {
    codec => "plain"
    # the earlier tests used the rubydebug codec, which prints events in a JSON-like format
  }
}


[root@localhost logstash]# /usr/local/logstash/bin/logstash -f /usr/local/logstash/codec1-logstash.log
Do not use nohup here, or the stdin input will not be captured.

hello             # typed at the keyboard; the line below it is logstash's output
2023-08-31T02:28:31.068Z localhost.localdomain hello
nihao
2023-08-31T02:28:37.572Z localhost.localdomain nihao
Only two fields were added: a timestamp and the hostname.

Compare with the earlier rubydebug output:

hello             # typed at the keyboard; the block below is logstash's output

{
       "message" => "hello",
      "@version" => "1",
          "host" => "localhost.localdomain",
    "@timestamp" => 2023-08-29T02:22:53.965Z
}

2. The json codec
If the data sent to logstash is JSON, you must add codec => json in the input section to parse it.
To make logstash emit JSON, add codec => json in the output section.

[root@localhost logstash]# vi codec2-logstash.log
input {
  stdin {
  }
}
output {
  stdout {
    codec => "json"   # output in JSON format
  }
}


JSON mode emits key: value pairs.
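For the input side mentioned above, a minimal sketch of parsing incoming JSON, building on the same stdin setup:

input {
  stdin {
    codec => "json"
    # each incoming JSON object is decoded into event fields rather than kept as one message string
  }
}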


3. The json_lines codec
If the JSON content is long and arrives as newline-delimited lines, use the json_lines codec, which treats each line as one JSON event.
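A hedged sketch of json_lines on a tcp input (reusing port 5514 from the earlier experiments purely for illustration):

input {
  tcp {
    port => "5514"
    codec => "json_lines"
    # each newline-terminated line is decoded as a separate JSON event
  }
}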


Experiment 7: filter plugins
1. grok regex capture
grok is a powerful filter plugin that parses arbitrary text with regular expressions, turning unstructured data into structured data that is easy to query.

Grok pattern reference and examples:
https://help.aliyun.com/zh/sls/user-guide/grok-patterns

Grok syntax:
%{SYNTAX:SEMANTIC}
SYNTAX is the matching pattern: for example, the NUMBER pattern matches digits, and the IP pattern matches addresses like 127.0.0.1.

Example 1: for the input 172.16.213.132 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039,
in %{IP:clientip}, IP is the pattern (match an IP address) and clientip names the captured content.
The result is clientip: 172.16.213.132.

Example 2:
%{HTTPDATE:timestamp} captures 07/Feb/2018:16:24:19 +0800

Example 3:
%{QS:referrer} captures the quoted string
"GET / HTTP/ 1.1"

IP, HTTPDATE, and QS above are all patterns predefined inside grok, under
/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns
[root@localhost patterns]# ls
aws     bind  exim       grok-patterns  httpd  junos         maven        mcollective-patterns  nagios      rails  ruby
bacula  bro   firewalls  haproxy        java   linux-syslog  mcollective  mongodb               postgresql  redis  squid
This directory holds many ready-made pattern files; grok-patterns is the base set we use.

vi grok-patterns shows a short excerpt:

IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
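When none of the bundled patterns fits, grok also accepts an inline Oniguruma named capture; a small sketch (the field name queue_id is illustrative):

filter {
  grok {
    match => ["message", "(?<queue_id>[0-9A-F]{10,11})"]
    # (?<name>regex) captures the match into a field without editing any pattern file
  }
}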

Online grok debuggers:
grokdebug.herokuapp.com (may be unreachable without a proxy)
https://www.5axxw.com/tools/v2/grok.html
Neither of these worked well for me, so I ran my own in Docker on 10.10.10.56, where Docker is already installed:
docker pull epurs/grokdebugger:latest
docker images
docker run -d --name grokdebugger -p 8082:80 epurs/grokdebugger
Then browse to http://10.10.10.56:8082

In the debugger, the input box takes the raw log line and the pattern box takes the grok expression.

[root@localhost logstash]# vi grok1-logstash.log
input {
  stdin {
  }
}
filter {
  grok {
    match => ["message","%{IP:clientip}"]
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}


    [root@localhost logstash]# /usr/local/logstash/bin/logstash -f /usr/local/logstash/grok1-logstash.log

172.16.213.132 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039        # input
The output:
{
      "clientip" => "172.16.213.132",
    "@timestamp" => 2023-08-31T07:36:18.564Z,
      "@version" => "1",
       "message" => "172.16.213.132 [07/Feb/2018:16:24:19 +0800]\"GET / HTTP/ 1.1\" 403 5039",
          "host" => "localhost.localdomain"
}

Test 2:
[root@localhost logstash]# vi grok2-logstash.log

input {
  stdin {
  }
}
filter {
  grok {
    match => ["message","%{IP:client-ip} \[%{HTTPDATE:timestamp1}\]%{QS:refer}\ %{NUMBER:nu} %{NUMBER:bytes1}"]
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}


Output:

172.16.213.132 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
{
          "host" => "localhost.localdomain",
    "timestamp1" => "07/Feb/2018:16:24:19 +0800",
         "refer" => "\"GET / HTTP/ 1.1\"",
            "nu" => "403",
      "@version" => "1",
       "message" => "172.16.213.132 [07/Feb/2018:16:24:19 +0800]\"GET / HTTP/ 1.1\" 403 5039",
    "@timestamp" => 2023-08-31T07:46:51.778Z,
        "bytes1" => "5039",
     "client-ip" => "172.16.213.132"
}


We can see that message has been split into five parts, so the original message field can be removed. There are now two timestamps; @timestamp (the log collection time) is arguably redundant, but Kibana sorts by @timestamp, so instead we can assign timestamp1's value to @timestamp.

[root@localhost logstash]# vi grok-delete-logstash.log
input {
  stdin {
  }
}
filter {
  grok {
    match => ["message","%{IP:client-ip} \[%{HTTPDATE:timestamp1}\]%{QS:refer}\ %{NUMBER:nu} %{NUMBER:bytes1}"]
    remove_field => ["message"]
  }
  date {
    match => ["timestamp1","dd/mmm/yyyy:HH:mm:ss Z"]
    # note: the month abbreviation should be "MMM" (uppercase); lowercase "mmm" fails to parse,
    # which is why "_dateparsefailure" shows up in the tags of the output below
  }
  mutate {
    remove_field => ["timestamp1"]
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}

Output:

172.16.213.132 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
{
    "@timestamp" => 2023-08-31T08:16:31.119Z,
         "refer" => "\"GET / HTTP/ 1.1\"",
            "nu" => "403",
          "host" => "localhost.localdomain",
     "client-ip" => "172.16.213.132",
          "tags" => [
        [0] "_dateparsefailure"
    ],
        "bytes1" => "5039",
      "@version" => "1"
}

The above uses the grok, date, and mutate plugins.

Date handling: the date plugin
The date plugin parses a field with a given time format and assigns the result to @timestamp:
date {
    match => ["timestamp1","dd/MMM/yyyy:HH:mm:ss Z"]
   }

This parses timestamp1 as dd/MMM/yyyy:HH:mm:ss Z and assigns the value to @timestamp. Note the capitalized MMM (abbreviated month name): the lowercase mmm used in the run above cannot parse the month, which is why "_dateparsefailure" appeared in the tags.

The mutate plugin: modifying data
1. Regex replacement of matched field content: gsub
gsub replaces whatever a regular expression matches inside a field; it works only on string fields. Example:
filter {
  mutate {
    gsub => ["field_name_1","/","_"]
    # replace every "/" character in the field_name_1 field with "_"
  }
}
Worked example:
[root@localhost logstash]# cat grok-mutate-logstash.log

input {
  stdin {
  }
}
filter {
  grok {
    match => ["message","%{IP:client-ip} \[%{HTTPDATE:timestamp1}\]%{QS:refer}\ %{NUMBER:nu} %{NUMBER:bytes1}"]
  }
  date {
    match => ["timestamp1","dd/mmm/yyyy:HH:mm:ss Z"]
  }
  mutate {
    gsub => ["message","/","_"]
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}

    /usr/local/logstash/bin/logstash -f /usr/local/logstash/grok-mutate-logstash.log

172.16.213.132 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
{
         "refer" => "\"GET / HTTP/ 1.1\"",
          "tags" => [
        [0] "_dateparsefailure"
    ],
     "client-ip" => "172.16.213.132",
        "bytes1" => "5039",
       "message" => "172.16.213.132 [07_Feb_2018:16:24:19 +0800]\"GET _ HTTP_ 1.1\" 403 5039",
    "@timestamp" => 2023-08-31T08:35:00.815Z,
      "@version" => "1",
          "host" => "localhost.localdomain",
            "nu" => "403",
    "timestamp1" => "07/Feb/2018:16:24:19 +0800"
}


Every "/" has been replaced with "_".

2. Splitting a string into an array: split
split divides a string field into an array on a separator:

filter {
  mutate {
    split => ["field_name_2","|"]
    # split the field_name_2 field into an array on the "|" separator
  }
}

Sample input: 172.16.213.132|[07/Feb/2018:16:24:19 +0800]|"GET / HTTP/ 1.1"|403|5039
Example:

[root@localhost logstash]# cat grok-mutate2-logstash.log
input {
  stdin {
  }
}
filter {
  grok {
    match => ["message","%{IP:client-ip} \[%{HTTPDATE:timestamp1}\]%{QS:refer}\ %{NUMBER:nu} %{NUMBER:bytes1}"]
  }
  date {
    match => ["timestamp1","dd/mmm/yyyy:HH:mm:ss Z"]
  }
  mutate {
    split => ["message","|"]
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}


Result:

/usr/local/logstash/bin/logstash -f /usr/local/logstash/grok-mutate2-logstash.log
172.16.213.132|[07/Feb/2018:16:24:19 +0800]|"GET / HTTP/ 1.1"|403|5039
{
       "message" => [
        [0] "172.16.213.132",
        [1] "[07/Feb/2018:16:24:19 +0800]",
        [2] "\"GET / HTTP/ 1.1\"",
        [3] "403",
        [4] "5039"
    ],
    "@timestamp" => 2023-08-31T08:39:17.562Z,
          "host" => "localhost.localdomain",
      "@version" => "1",
          "tags" => [
        [0] "_grokparsefailure"
    ]
}


message has been split into five parts and can now be referenced as an array. (The "_grokparsefailure" tag appears because the pipe-delimited input no longer matches the grok pattern.)

3. Renaming a field: rename
mutate {
    rename => {"message" => "message_new"}
  }
Example omitted; a stand-in sketch follows.
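A minimal sketch in place of the omitted example (the target name status_code is illustrative), reusing the nu field from the grok tests above:

filter {
  mutate {
    rename => {"nu" => "status_code"}
    # nu disappears from the event and its value reappears under status_code
  }
}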

4. Removing a field: remove_field

mutate {
    remove_field => ["message"]
  }

Combined example:

  mutate {
    rename => {"nu" => "number"}
    gsub => ["refer","/","_"]
    remove_field => ["timestamp1"]
    split => ["client-ip","."]
  }
Renaming, replacing, removing, and splitting can all be written in a single mutate block.

GeoIP lookup
GeoIP is a free IP geolocation database; given an IP address it returns location information including country, region/city, and coordinates. This plugin is very useful for map visualizations and regional statistics.
filter {
  geoip {
    source => "ip_field"
    # ip_field is the field that holds the IP address to look up
  }
}
Example logstash configuration:
[root@localhost logstash]# cat grok-geoip.log

input {
  stdin {
  }
}
filter {
  grok {
    match => ["message","%{IP:client-ip} \[%{HTTPDATE:timestamp1}\]%{QS:refer}\ %{NUMBER:nu} %{NUMBER:bytes1}"]
  }
  date {
    match => ["timestamp1","dd/mmm/yyyy:HH:mm:ss Z"]
  }
  geoip {
    source => "client-ip"
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}

Sample inputs:
172.16.213.132 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
114.114.114.114 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
Note that private addresses such as 172.16.213.132 will not resolve in the GeoIP database; the output below is for a public IP.
Output:

202.97.224.68 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039
{
    "timestamp1" => "07/Feb/2018:16:24:19 +0800",
        "bytes1" => "5039",
         "geoip" => {
              "latitude" => 45.75,
           "region_name" => "Heilongjiang",
         "country_code2" => "CN",
          "country_name" => "China",
             "longitude" => 126.65,
              "location" => {
            "lon" => 126.65,
            "lat" => 45.75
        },
         "country_code3" => "CN",
           "region_code" => "HL",
        "continent_code" => "AS",
                    "ip" => "202.97.224.68",
              "timezone" => "Asia/Shanghai"
    },
       "message" => "202.97.224.68 [07/Feb/2018:16:24:19 +0800]\"GET / HTTP/ 1.1\" 403 5039",
    "@timestamp" => 2023-09-01T01:27:01.443Z,
     "client-ip" => "202.97.224.68",
         "refer" => "\"GET / HTTP/ 1.1\"",
            "nu" => "403",
      "@version" => "1",
          "host" => "localhost.localdomain",
          "tags" => [
        [0] "_dateparsefailure"
    ]
}

longitude and latitude are the coordinates.

The output above is rather verbose; let us trim it.

Trimming the geoip output:
geoip {
  source => "client-ip"
  fields => ["ip","country_code3","longitude","latitude","region_name"]
  # keep only the listed fields in the geoip object
  }
[root@localhost logstash]# cat grok-geoip2.log

input {
  stdin {
  }
}
filter {
  grok {
    match => ["message","%{IP:client-ip} \[%{HTTPDATE:timestamp1}\]%{QS:refer}\ %{NUMBER:nu} %{NUMBER:bytes1}"]
  }
  date {
    match => ["timestamp1","dd/mmm/yyyy:HH:mm:ss Z"]
  }
  geoip {
    source => "client-ip"
    fields => ["ip","country_code3","longitude","latitude","region_name"]
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}

Output:

    202.97.224.68 [07/Feb/2018:16:24:19 +0800]"GET / HTTP/ 1.1" 403 5039

{
            "nu" => "403",
    "@timestamp" => 2023-09-01T01:30:39.227Z,
      "@version" => "1",
          "tags" => [
        [0] "_dateparsefailure"
    ],
         "geoip" => {
                   "ip" => "202.97.224.68",
        "country_code3" => "CN",
             "latitude" => 45.75,
          "region_name" => "Heilongjiang",
            "longitude" => 126.65
    },
       "message" => "202.97.224.68 [07/Feb/2018:16:24:19 +0800]\"GET / HTTP/ 1.1\" 403 5039",
     "client-ip" => "202.97.224.68",
          "host" => "localhost.localdomain",
         "refer" => "\"GET / HTTP/ 1.1\"",
    "timestamp1" => "07/Feb/2018:16:24:19 +0800",
        "bytes1" => "5039"
}

Logstash output plugins

file: write events to a file on disk
elasticsearch: send log data to an ES cluster
graphite: store and graph metric data
Output to redis, email, exec, nagios, and more is also supported; a sketch of a redis output follows.
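As a taste of one of those other outputs, a hedged sketch of a redis output (the host and key are illustrative):

output {
  redis {
    host => ["10.10.10.74"]
    data_type => "list"
    key => "logstash-list"
    # events are pushed onto the "logstash-list" list as JSON strings
  }
}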

1. Standard output

    output {
      stdout {
        codec => "rubydebug"
      }
    }


2. Saving to a file

    output {
      file {
        path => "/data/log3/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
      }
    }

Example:
    [root@localhost logstash]# cat file-log.log

input {
  stdin {
  }
}
output {
  file {
    path => "/data/log3/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
  }
}


/usr/local/logstash/bin/logstash -f /usr/local/logstash/file-log.log
After typing on stdin:
114.114.114.114 [07/Feb/2018:16:24:19 +0800]\"GET / HTTP/ 1.1\" 403 5039
asdfasdf

Contents of the file under /data/log3/2023-09-01/:

[root@localhost 2023-09-01]# cat localhost.localdomain_01.log
{"@timestamp":"2023-09-01T01:57:05.295Z","message":"114.114.114.114 [07/Feb/2018:16:24:19 +0800]\"GET / HTTP/ 1.1\" 403 5039","host":"localhost.localdomain","@version":"1"}
[root@localhost 2023-09-01]# tail -f localhost.localdomain_01.log
{"@timestamp":"2023-09-01T01:57:05.295Z","message":"114.114.114.114 [07/Feb/2018:16:24:19 +0800]\"GET / HTTP/ 1.1\" 403 5039","host":"localhost.localdomain","@version":"1"}
{"@timestamp":"2023-09-01T01:57:53.440Z","message":"asdfasdf","host":"localhost.localdomain","@version":"1"}

Notice that the output adds some fields, such as @timestamp, @version, and host, to what was typed.
To make the output identical to the input, use a codec to control the encoding.

Logstash configuration:
[root@localhost logstash]# cat file2-log.log

input {
  stdin {
  }
}
output {
  file {
    path => "/data/log3/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
    codec => line { format => "%{message}"}
  }
}


[root@localhost logstash]# /usr/local/logstash/bin/logstash -f /usr/local/logstash/file2-log.log
Typed on stdin:
adfasdfasdf
ELK大规模日志实时处理系统从入门到企业应用实战视频课程

Check the output:

[root@localhost 2023-09-01]# tail -f localhost.localdomain_02.log
adfasdfasdf
ELK大规模日志实时处理系统从入门到企业应用实战视频课程

[root@localhost 2023-09-01]# pwd
/data/log3/2023-09-01

The output now matches the input.


8. Case study: collecting Apache access logs with ELK
There are several ways to collect logs with ELK:
1. Leave the source log format untouched and use logstash grok to filter and cleanse it before output.
   Advantage: no impact on the business system. Drawback: logstash can become a bottleneck.
2. Change the source log output format to what is required, so logstash only collects and forwards.
   Advantage: relieves the pressure on logstash, but some work is needed to adapt the source log format.

Architecture for collecting Apache logs with ELK:

apache(filebeat)  --  kafka(zookeeper)  --  logstash  --  ES cluster
We will use the second approach, changing the source log output format; a sketch of the logstash leg follows.
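A minimal sketch of the logstash stage of this pipeline, reusing the Kafka and ES hosts from earlier sections; the topic name apache-access and the json codec are assumptions that depend on how filebeat and Apache are actually configured:

input {
  kafka {
    bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"
    topics => ["apache-access"]   # assumed topic; must match the filebeat output
    codec => "json"               # assumes the source already emits JSON (approach 2)
  }
}
output {
  elasticsearch {
    hosts => ["10.10.10.65:9200","10.10.10.66:9200","10.10.10.67:9200"]
    index => "apache-accesslog-%{+YYYY-MM-dd}"
  }
}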
