• ElasticSearch(九):ELK 架构


    日志收集——》格式化分析——》检索和可视化——》风险告警

    • ELK架构
      • 经典的ELK
      • 整合消息队列+Nginx架构
    • 什么是Logstash
      • Logstash核心概念
      • Logstash数据传输原理
      • Logstash配置文件结构
      • Logstash Queue
      • Logstash导入数据到ES
      • 同步数据库数据到Elasticsearch
    • 什么是Beats
      • FileBeat简介
      • FileBeat的工作原理
      • logstash vs FileBeat
      • Filebeat安装
    • ELK整合实战
      • 案例:采集tomcat服务器日志
      • 使用FileBeats将日志发送到Logstash
      • 配置Logstash接收FileBeat收集的数据并打印
      • Logstash输出数据到Elasticsearch
      • 利用Logstash过滤器解析日志
      • 输出到Elasticsearch指定索引

    ELK架构

    ELK架构分为两种,一种是经典的ELK,另外一种是加上消息队列(Redis或Kafka或RabbitMQ)和Nginx结构。

    经典的ELK 数据量小的开发环境,存在数据丢失的危险

    经典的ELK主要是由Filebeat + Logstash + Elasticsearch + Kibana组成,如下图:(早期的ELK只有Logstash + Elasticsearch + Kibana)

    在这里插入图片描述

    整合消息队列+Nginx架构 生产环境,可以处理大数据量,并且不会丢失数据

    这种架构,主要加上了Redis或Kafka或RabbitMQ做消息队列,保证了消息的不丢失

    在这里插入图片描述

    什么是Logstash

    Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的存储库中。

    https://www.elastic.co/cn/logstash/

    应用:ETL工具 / 数据采集处理引擎

    在这里插入图片描述

    Logstash核心概念

    Pipeline

    包含了input—filter-output三个阶段的处理流程
    插件生命周期管理
    队列管理

    Logstash Event

    数据在内部流转时的具体表现形式。数据在input 阶段被转换为Event,在 output被转化成目标格式数据
    Event 其实是一个Java Object,在配置文件中,对Event 的属性进行增删改查

    Codec (Code / Decode)

    将原始数据decode成Event; 将Event encode成目标数据

    在这里插入图片描述

    Logstash数据传输原理

    1. 数据采集与输入:Logstash支持各种输入选择,能够以连续的流式传输方式,轻松地从日志、指标、Web应用以及数据存储中采集数据。
    2. 实时解析和数据转换:通过Logstash过滤器解析各个事件,识别已命名的字段来构建结构,并将它们转换成通用格式,最终将数据从源端传输到存储库中。
    3. 存储与数据导出:Logstash提供多种输出选择,可以将数据发送到指定的地方。

    Logstash通过管道完成数据的采集与处理,管道配置中包含input、output和filter(可选)插件,input和output用来配置输入和输出数据源、filter用来对数据进行过滤或预处理。在这里插入图片描述

    Logstash配置文件结构

    参考:https://www.elastic.co/guide/en/logstash/7.17/configuration.html

    Logstash的管道配置文件对每种类型的插件都提供了一个单独的配置部分,用于处理管道事件

    input {
      stdin { }
    }
    
    filter {
      grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
      }
      date {
        match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
    }
    
    output {
      elasticsearch { hosts => ["localhost:9200"]}  
      stdout { codec => rubydebug }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    运行

    bin/logstash -f logstash-demo.conf

    Input Plugins

    https://www.elastic.co/guide/en/logstash/7.17/input-plugins.html
    一个 Pipeline可以有多个input插件
    Stdin / File
    Beats / Log4J /Elasticsearch / JDBC / Kafka /Rabbitmq /Redis
    JMX/ HTTP / Websocket / UDP / TCP
    Google Cloud Storage / S3
    Github / Twitter

    Output Plugins

    https://www.elastic.co/guide/en/logstash/7.17/output-plugins.html
    将Event发送到特定的目的地,是 Pipeline 的最后一个阶段。
    常见 Output Plugins:

    • Elasticsearch
    • Email / Pageduty
    • Influxdb / Kafka / Mongodb / Opentsdb / Zabbix
    • Http / TCP / Websocket

    Filter Plugins

    https://www.elastic.co/guide/en/logstash/7.17/filter-plugins.html
    处理Event
    内置的Filter Plugins:

    • Mutate 一操作Event的字段
    • Metrics — Aggregate metrics
    • Ruby 一执行Ruby 代码

    Codec Plugins

    https://www.elastic.co/guide/en/logstash/7.17/codec-plugins.html
    将原始数据decode成Event;将Event encode成目标数据
    内置的Codec Plugins:

    • Line / Multiline
    • JSON / Avro / Cef (ArcSight Common Event Format)
    • Dots / Rubydebug

    Logstash Queue

    • In Memory Queue
      进程Crash,机器宕机,都会引起数据的丢失
    • Persistent Queue
      机器宕机,数据也不会丢失; 数据保证会被消费; 可以替代 Kafka等消息队列缓冲区的作用
    queue.type: persisted #(默认是memory)
    queue.max_bytes: 4gb
    
    • 1
    • 2

    在这里插入图片描述

    Logstash安装

    logstash官方文档: https://www.elastic.co/guide/en/logstash/7.17/installing-logstash.html

    1)下载并解压logstash

    下载地址: https://www.elastic.co/cn/downloads/past-releases#logstash
    选择版本:7.17.5

    wget https://artifacts.elastic.co/downloads/logstash/logstash-7.17.5-linux-x86_64.tar.gz

    tar -zxvf logstash-7.17.5-linux-x86_64.tar.gz

    2)测试:运行最基本的logstash管道

    cd logstash-7.17.5
    #-e选项表示,直接把配置放在命令中,这样可以有效快速进行测试

    bin/logstash -e 'input { stdin { } } output { stdout {} }'
    
    • 1

    在这里插入图片描述

    Codec Plugin测试

    #single line
    bin/logstash -e "input{stdin{codec=>line}}output{stdout{codec=> rubydebug}}"
    bin/logstash -e "input{stdin{codec=>json}}output{stdout{codec=> rubydebug}}"
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    Codec Plugin —— Multiline

    设置参数:

    • pattern: 设置行匹配的正则表达式
    • what : 如果匹配成功,那么匹配行属于上一个事件还是下一个事件
      previous / next
    • negate : 是否对pattern结果取反
      true / false
    # 多行数据,异常
    Exception in thread "main" java.lang.NullPointerException
            at com.example.myproject.Book.getTitle(Book.java:16)
            at com.example.myproject.Author.getBookTitles(Author.java:25)
            at com.example.myproject.Bootstrap.main(Bootstrap.java:14)
    
    # multiline-exception.conf
    input {
      stdin {
        codec => multiline {
          pattern => "^\s"
          what => "previous"
        }
      }
    }
    
    filter {}
    
    output {
      stdout { codec => rubydebug }
    }
    
    #执行管道
    bin/logstash -f multiline-exception.conf
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在这里插入图片描述

    Input Plugin —— File

    https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-file.html

    • 支持从文件中读取数据,如日志文件
    • 文件读取需要解决的问题:只被读取一次。重启后需要从上次读取的位置继续(通过sincedb 实现)
    • 读取到文件新内容,发现新文件
    • 文件发生归档操作(文档位置发生变化,日志rotation),不能影响当前的内容读取

    Filter Plugin

    Filter Plugin可以对Logstash Event进行各种处理,例如解析,删除字段,类型转换

    • Date: 日期解析
    • Dissect: 分割符解析
    • Grok: 正则匹配解析
    • Mutate: 处理字段。重命名,删除,替换
    • Ruby: 利用Ruby 代码来动态修改Event

    Filter Plugin - Mutate

    对字段做各种操作:

    • Convert : 类型转换
    • Gsub : 字符串替换
    • Split / Join /Merge: 字符串切割,数组合并字符串,数组合并数组
    • Rename: 字段重命名
    • Update / Replace: 字段内容更新替换
    • Remove_field: 字段删除

    Logstash导入数据到ES

    • 1)测试数据集下载:https://grouplens.org/datasets/movielens/
      https://files.grouplens.org/datasets/movielens/ml-25m.zip
      在这里插入图片描述

    • 2)准备logstash-movie.conf配置文件

    input {
      file {
        path => "/home/es/logstash-7.17.3/dataset/movies.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    } 
    filter {
      csv {
        separator => ","
        columns => ["id","content","genre"]
      }
    
      mutate {
        split => { "genre" => "|" }
        remove_field => ["path", "host","@timestamp","message"]
      }
    
      mutate {
        split => ["content", "("]
        add_field => { "title" => "%{[content][0]}"}
        add_field => { "year" => "%{[content][1]}"}
      }
    
      mutate {
        convert => {
          "year" => "integer"
        }
        strip => ["title"]
        remove_field => ["path", "host","@timestamp","message","content"]
      }
    
    }
    output {
       elasticsearch {
         hosts => "http://localhost:9200"
         index => "movies"
         document_id => "%{id}"
         user => "elastic"
         password => "123456"
       }
      stdout {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 3)运行logstash
      bin/logstash -f logstash-movie.conf

    在这里插入图片描述

    get /movies/_search
    
    {
      "took" : 0,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 10000,
          "relation" : "gte"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "movies",
            "_type" : "_doc",
            "_id" : "6747",
            "_score" : 1.0,
            "_source" : {
              "id" : "6747",
              "year" : 1960,
              "title" : "Adventures of Huckleberry Finn, The",
              "genre" : [
                "Adventure",
                "Children"
              ],
              "@version" : "1"
            }
          },
          {
            "_index" : "movies",
            "_type" : "_doc",
            "_id" : "6748",
            "_score" : 1.0,
            "_source" : {
              "id" : "6748",
              "year" : 1979,
              "title" : "Brood, The",
              "genre" : [
                "Horror"
              ],
              "@version" : "1"
            }
          },
          {
            "_index" : "movies",
            "_type" : "_doc",
            "_id" : "6749",
            "_score" : 1.0,
            "_source" : {
              "id" : "6749",
              "year" : 1937,
              "title" : "Prince and the Pauper, The",
              "genre" : [
                "Adventure",
                "Drama"
              ],
              "@version" : "1"
            }
          },
          ....
        ]
    }  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    同步数据库数据到Elasticsearch

    需求: 将数据库中的数据同步到ES,借助ES的全文搜索,提高搜索速度

    • 需要把新增用户信息同步到Elasticsearch中
    • 用户信息Update 后,需要能被更新到Elasticsearch
    • 支持增量更新
    • 用户注销后,不能被ES所搜索到

    实现思路

    • 基于canal同步数据(项目实战中讲解)
    • 借助JDBC Input Plugin将数据从数据库读到Logstash
      • 需要自己提供所需的 JDBC Driver;
      • JDBC Input Plugin 支持定时任务 Scheduling,其语法来自 Rufus-scheduler,其扩展了 Cron,使用 Cron 的语法可以完成任务的触发;
      • JDBC Input Plugin 支持通过 Tracking_column / sql_last_value 的方式记录 State,最终实现增量的更新;
      • https://www.elastic.co/cn/blog/logstash-jdbc-input-plugin

    JDBC Input Plugin实现步骤

    • 1)拷贝jdbc依赖到logstash-7.17.3/drivers目录下
      /srv/soft/logstash-7.17.5/drivers/mysql-connector-java-5.1.49.jar
    • 2)准备mysql-demo.conf配置文件
    input {
      jdbc {
        jdbc_driver_library => "/srv/soft/logstash-7.17.5/drivers/mysql-connector-java-5.1.49.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/db-es-test?useSSL=false"
        jdbc_user => "xxxxxxxx"
        jdbc_password => "xxxxx"
        #启用追踪,如果为true,则需要指定tracking_column
        use_column_value => true
        #指定追踪的字段,
        tracking_column => "last_updated"
        #追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
        tracking_column_type => "numeric"
        #记录最后一次运行的结果
        record_last_run => true
        #上面运行结果的保存位置
        last_run_metadata_path => "jdbc-position.txt"
        statement => "SELECT * FROM user where last_updated >:sql_last_value;"
        schedule => " * * * * * *"
      }
    }
    output {
      elasticsearch {
        document_id => "%{id}"
        document_type => "_doc"
        index => "users"
        hosts => ["http://localhost:9200"]
        user => "elastic"
        password => "123456"
      }
      stdout{
        codec => rubydebug
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 3)运行logstash
      bin/logstash -f mysql-demo.conf

    测试、新增、更新、删除

    #user表
    CREATE TABLE `user` (
      `id` int NOT NULL AUTO_INCREMENT,
      `name` varchar(50) DEFAULT NULL,
      `address` varchar(50) CHARACTER DEFAULT NULL,
      `last_updated` bigint DEFAULT NULL,
      `is_deleted` int DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    #插入数据
    INSERT INTO user(name,address,last_updated,is_deleted) VALUES("张三","广州天河",unix_timestamp(NOW()),0)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    ES查询过滤删除数据

    # 创建 alias,只显示没有被标记 deleted的用户
    POST /_aliases
    {
      "actions": [
        {
          "add": {
            "index": "users",
            "alias": "view_users",
            "filter" : { "term" : { "is_deleted" : 0} }
          }
        }
      ]
    }
    
    # 通过 Alias查询,查不到被标记成 deleted的用户
    POST view_users/_search
    
    POST view_users/_search
    {
      "query": {
        "term": {
          "name.keyword": {
            "value": "张三"
          }
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    轻量型数据采集器 Beats

    https://www.elastic.co/guide/en/beats/libbeat/7.17/index.html

    Beats 是一个免费且开放的平台,集合了多种单一用途的数据采集器。它们从成百上千或成千上万台机器和系统向 Logstash 或 Elasticsearch 发送数据。

    在这里插入图片描述

    FileBeat简介

    FileBeat专门用于转发和收集日志数据的轻量级采集工具。它可以作为代理安装在服务器上,FileBeat监视指定路径的日志文件,收集日志数据,并将收集到的日志转发到Elasticsearch或者Logstash。

    FileBeat的工作原理

    启动FileBeat时,会启动一个或者多个输入(Input),这些Input监控指定的日志数据位置。FileBeat会针对每一个文件启动一个Harvester(收割机)。Harvester读取每一个文件的日志,将新的日志发送到libbeat,libbeat将数据收集到一起,并将数据发送给输出(Output)。

    在这里插入图片描述

    Logstash vs FileBeat

    • Logstash是在jvm上运行的,资源消耗比较大。而FileBeat是基于golang编写的,功能较少但资源消耗也比较小,更轻量级。
    • Logstash 和 Filebeat都具有日志收集功能,Filebeat更轻量,占用资源更少
    • Logstash 具有Filter功能,能过滤分析日志
    • 一般结构都是Filebeat采集日志,然后发送到消息队列、Redis、MQ中,然后Logstash去获取,利用Filter功能过滤分析,然后存储到Elasticsearch中
    • FileBeat和Logstash配合,实现背压机制。当将数据发送到Logstash或 Elasticsearch时,Filebeat使用背压敏感协议,以应对更多的数据量。如果Logstash正在忙于处理数据,则会告诉Filebeat 减慢读取速度。一旦拥堵得到解决,Filebeat就会恢复到原来的步伐并继续传输数据。

    Filebeat安装

    https://www.elastic.co/guide/en/beats/filebeat/7.17/filebeat-installation-configuration.html

    1)下载并解压Filebeat

    下载地址:https://www.elastic.co/cn/downloads/past-releases#filebeat
    选择版本:7.17.5

    https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.5-linux-x86_64.tar.gz
    tar zxvf filebeat-7.17.5-linux-x86_64.tar.gz

    2)编辑配置

    修改 filebeat.yml 以设置连接信息:

    # ---------------------------- Elasticsearch Output ----------------------------
    output.elasticsearch:
      # Array of hosts to connect to.
      hosts: ["localhost:9200"]
    
      # Protocol - either `http` (default) or `https`.
      #protocol: "https"
    
      # Authentication credentials - either API key or username/password.
      #api_key: "id:api_key"
      #username: "elastic"
      #password: "changeme"
    
    # ------------------------------ Logstash Output -------------------------------
    #output.logstash:
      # The Logstash hosts
      #hosts: ["localhost:5044"]
    
      # Optional SSL. By default is off.
      # List of root certificates for HTTPS server verifications
      #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
    
      # Certificate for SSL client authentication
      #ssl.certificate: "/etc/pki/client/cert.pem"
    
      # Client Certificate Key
      #ssl.key: "/etc/pki/client/cert.key"
    setup.kibana:
      host: "xxxxx.xxxx.com:5601"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    3) 启用和配置数据收集模块

    从安装目录中,运行:

    查看模块列表 ./filebeat modules list
    启用nginx模块 ./filebeat modules enable nginx
    启用 Logstash 模块 ./filebeat modules enable logstash

    如果需要更改nginx日志路径,修改modules.d/nginx.yml

    - module: nginx
      access:
        var.paths: ["/var/log/nginx/access.log*"]
    
    • 1
    • 2
    • 3

    在 modules.d/logstash.yml 文件中修改设置

    - module: logstash
      log:
        enabled: true
        var.paths: ["/srv/soft/logstash-7.17.5/logs/*.log"]
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述
    在这里插入图片描述

    4)启动 Filebeat

    setup命令加载Kibana仪表板。 如果仪表板已经设置,则忽略此命令
    ./filebeat setup

    启动Filebeat
    ./filebeat -e

    ELK整合

    案例:采集tomcat服务器日志

    Tomcat服务器运行过程中产生很多日志信息,通过Logstash采集并存储日志信息至ElasticSearch中

    使用FileBeats将日志发送到Logstash

    1)创建配置文件filebeat-logstash.yml,配置FileBeats将数据发送到Logstash

    • pattern:正则表达式
    • negate:true 或 false;默认是false,匹配pattern的行合并到上一行;true,不匹配pattern的行合并到上一行
    • match:after 或 before,合并到上一行的末尾或开头

    vim filebeat-logstash.yml
    chmod 644 filebeat-logstash.yml

    #因为Tomcat的web log日志都是以IP地址开头的,所以我们需要修改下匹配字段。
    # 不以ip地址开头的行追加到上一行
    filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - /home/es/apache-tomcat-8.5.33/logs/*access*.*
      multiline.pattern: '^\\d+\\.\\d+\\.\\d+\\.\\d+ '
      multiline.negate: true
      multiline.match: after
    
    output.logstash:
      enabled: true
      hosts: ["x.x.x.x:5044"]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2)启动FileBeat,并指定使用指定的配置文件

    ./filebeat -e -c filebeat-logstash.yml

    可能出现的异常:

    异常1:

    Exiting: error loading config file: config file ("filebeat-logstash.yml") can only be writable by the owner but the permissions are "-rw-rw-r--" (to fix the permissions use: 'chmod go-w /home/es/filebeat-7.17.3-linux-x86_64/filebeat-logstash.yml')
    因为安全原因不要其他用户写的权限,去掉写的权限就可以了
    chmod 644 filebeat-logstash.yml

    异常2:

    Failed to connect to backoff(async(tcp://192.168.65.204:5044)): dial tcp 192.168.65.204:5044: connect: connection refused
    FileBeat将尝试建立与Logstash监听的IP和端口号进行连接。但此时,我们并没有开启并配置Logstash,所以FileBeat是无法连接到Logstash的。

    配置Logstash接收FileBeat收集的数据并打印

    vim config/filebeat-console.conf

    # 配置从FileBeat接收数据
    input {
        beats {
          port => 5044
        }
    }
    
    output {
        stdout {
          codec => rubydebug
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    测试logstash配置是否正确
    bin/logstash -f config/filebeat-console.conf --config.test_and_exit

    启动logstash
    bin/logstash -f config/filebeat-console.conf --config.reload.automatic
    reload.automatic:修改配置文件时自动重新加载

    测试访问tomcat,logstash是否接收到了Filebeat传过来的tomcat日志

    在这里插入图片描述

    Logstash输出数据到Elasticsearch

    如果我们需要将数据输出值ES而不是控制台的话,我们修改Logstash的output配置。

    vim config/filebeat-elasticSearch.conf

    input {
        beats {
          port => 5044
        }
    }
    
    output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        user => "elastic"
        password => "123456"
      }
      stdout{
        codec => rubydebug
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    启动logstash

    bin/logstash -f config/filebeat-elasticSearch.conf --config.reload.automatic

    ES中会生成一个以logstash开头的索引,测试日志是否保存到了ES

    get logstash-2022.07.28-000001/_search
    
    response
    {
      "took" : 0,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 1,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "logstash-2022.07.28-000001",
            "_type" : "_doc",
            "_id" : "vL9QRIIBp7dqYq6zWFCS",
            "_score" : 1.0,
            "_source" : {
              "@version" : "1",
              "ecs" : {
                "version" : "1.12.0"
              },
              "log" : {
                "flags" : [
                  "multiline"
                ],
                "file" : {
                  "path" : "/srv/soft/nginx/logs/access.log"
                },
                "offset" : 27904175
              },
              "tags" : [
                "beats_input_codec_plain_applied"
              ],
              "@timestamp" : "2022-07-28T10:18:04.953Z",
              "agent" : {
                "version" : "7.17.5",
                "type" : "filebeat",
                "ephemeral_id" : "4aec03c2-a44d-41f2-a1f8-f0016b78bf16",
                "name" : "k8s-node1",
                "id" : "a47818d0-10a8-46ae-a732-7bf00ca4edab",
                "hostname" : "k8s-node1"
              },
              "message" : "120.244.232.221 - - [28/Jul/2022:18:17:55 +0800] \"GET / HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\"\n120.244.232.221 - - [28/Jul/2022:18:17:55 +0800] \"GET /css/chunk-vendors.0cf3eefd.css HTTP/1.1\" 200 586691 \"http://test.local.com/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\"\n120.244.232.221 - - [28/Jul/2022:18:17:56 +0800] \"GET /api/v5/mall/mall.json?mall_version=v3 HTTP/1.1\" 304 0 \"http://test.local.com/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\"",
              "input" : {
                "type" : "log"
              },
              "host" : {
                "name" : "k8s-node1"
              }
            }
          }
        ]
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    思考:日志信息都保证在message字段中,是否可以把日志进行解析一个个的字段?例如:IP字段、时间、请求方式、请求URL、响应结果

    利用Logstash过滤器解析日志

    从日志文件中收集到的数据包含了很多有效信息,比如IP、时间等,在Logstash中可以配置过滤器Filter对采集到的数据进行过滤处理,Logstash中有大量的插件可以供我们使用。
    查看Logstash已经安装的插件

    ➜  logstash-7.17.5 bin/logstash-plugin list
    Using JAVA_HOME defined java: /srv/soft/jdk1.8.0_131/
    WARNING: Using JAVA_HOME while Logstash distribution comes with a bundled JDK.
    DEPRECATION: The use of JAVA_HOME is now deprecated and will be removed starting from 8.0. Please configure LS_JAVA_HOME instead.
    logstash-codec-avro
    logstash-codec-cef
    logstash-codec-collectd
    logstash-codec-dots
    logstash-codec-edn
    logstash-codec-edn_lines
    logstash-codec-es_bulk
    logstash-codec-fluent
    logstash-codec-graphite
    logstash-codec-json
    logstash-codec-json_lines
    logstash-codec-line
    logstash-codec-msgpack
    logstash-codec-multiline
    logstash-codec-netflow
    logstash-codec-plain
    logstash-codec-rubydebug
    logstash-filter-aggregate
    logstash-filter-anonymize
    logstash-filter-cidr
    logstash-filter-clone
    logstash-filter-csv
    logstash-filter-date
    logstash-filter-de_dot
    logstash-filter-dissect
    logstash-filter-dns
    logstash-filter-drop
    logstash-filter-elasticsearch
    logstash-filter-fingerprint
    logstash-filter-geoip
    logstash-filter-grok
    logstash-filter-http
    logstash-filter-json
    logstash-filter-kv
    logstash-filter-memcached
    logstash-filter-metrics
    logstash-filter-mutate
    logstash-filter-prune
    logstash-filter-ruby
    logstash-filter-sleep
    logstash-filter-split
    logstash-filter-syslog_pri
    logstash-filter-throttle
    logstash-filter-translate
    logstash-filter-truncate
    logstash-filter-urldecode
    logstash-filter-useragent
    logstash-filter-uuid
    logstash-filter-xml
    logstash-input-azure_event_hubs
    logstash-input-beats
    └── logstash-input-elastic_agent (alias)
    logstash-input-couchdb_changes
    logstash-input-dead_letter_queue
    logstash-input-elasticsearch
    logstash-input-exec
    logstash-input-file
    logstash-input-ganglia
    logstash-input-gelf
    logstash-input-generator
    logstash-input-graphite
    logstash-input-heartbeat
    logstash-input-http
    logstash-input-http_poller
    logstash-input-imap
    logstash-input-jms
    logstash-input-pipe
    logstash-input-redis
    logstash-input-s3
    logstash-input-snmp
    logstash-input-snmptrap
    logstash-input-sqs
    logstash-input-stdin
    logstash-input-syslog
    logstash-input-tcp
    logstash-input-twitter
    logstash-input-udp
    logstash-input-unix
    logstash-integration-elastic_enterprise_search
     ├── logstash-output-elastic_app_search
     └──  logstash-output-elastic_workplace_search
    logstash-integration-jdbc
     ├── logstash-input-jdbc
     ├── logstash-filter-jdbc_streaming
     └── logstash-filter-jdbc_static
    logstash-integration-kafka
     ├── logstash-input-kafka
     └── logstash-output-kafka
    logstash-integration-rabbitmq
     ├── logstash-input-rabbitmq
     └── logstash-output-rabbitmq
    logstash-output-cloudwatch
    logstash-output-csv
    logstash-output-elasticsearch
    logstash-output-email
    logstash-output-file
    logstash-output-graphite
    logstash-output-http
    logstash-output-lumberjack
    logstash-output-nagios
    logstash-output-null
    logstash-output-pipe
    logstash-output-redis
    logstash-output-s3
    logstash-output-sns
    logstash-output-sqs
    logstash-output-stdout
    logstash-output-tcp
    logstash-output-udp
    logstash-output-webhdfs
    logstash-patterns-core
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115

    Grok插件

    Grok是一种将非结构化日志解析为结构化的插件。这个工具非常适合用来解析系统日志、Web服务器日志、MySQL或者是任意其他的日志格式。

    https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html

    Grok语法

    Grok是通过模式匹配的方式来识别日志中的数据,可以把Grok插件简单理解为升级版本的正则表达式。它拥有更多的模式,默认Logstash拥有120个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配。
    grok模式的语法是:

    %{SYNTAX:SEMANTIC}
    
    • 1

    SYNTAX(语法)指的是Grok模式名称,SEMANTIC(语义)是给模式匹配到的文本字段名。

    例如:

    %{NUMBER:duration} %{IP:client}
    duration表示:匹配一个数字,client表示匹配一个IP地址。

    默认在Grok中,所有匹配到的的数据类型都是字符串,如果要转换成int类型(目前只支持int和float),可以这样:%{NUMBER:duration:int} %{IP:client}

    常用的Grok模式

    https://help.aliyun.com/document_detail/129387.html?scm=20140722.184.2.173

    用法

    filter {
      grok {
        match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:time}\] \"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}\" %{NUMBER:http_status_code} %{NUMBER:size} \"(?\S+)\" %{QS:http_user_agent} \"(?\S+)\" \"(?\S+)\" \"(?\S+)\" %{NUMBER:request_time} %{NUMBER:upstream_response_time}" }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    nginx

        log_format main '$remote_addr - $remote_user [$time_local] '
                       '"$request" $status $body_bytes_sent '
                       '"$http_referer" "$http_user_agent" '
                       '"$upstream_http_x_sticky_vk" "$cookie_UID" "$upstream_cookie_UID" '
                       '$request_time $upstream_response_time ';
        access_log  logs/access.log  main;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    比如,nginx日志
    120.244.232.221 - - [28/Jul/2022:19:05:27 +0800] "GET /api/v5/mall/mall.json?mall_version=v3 HTTP/1.1" 304 0 "http://test.local.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" "-" "-" "-" 0.032 0.032

    解析后的字段

    字段名说明
    client IP浏览器端IP
    timestamp请求的时间戳
    method请求方式(GET/POST)
    uri请求的链接地址
    status服务器端响应状态
    length响应的数据长度

    grok模式

    %{IP:ip} - - \[%{HTTPDATE:time}\] \"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}\" %{NUMBER:http_status_code} %{NUMBER:size} \"(?\S+)\" %{QS:http_user_agent} \"(?\S+)\" \"(?\S+)\" \"(?\S+)\" %{NUMBER:request_time} %{NUMBER:upstream_response_time}

    为了方便测试,我们可以使用Kibana来进行Grok开发:

    在这里插入图片描述

    修改Logstash配置文件

    vim config/filebeat-console.conf

    input {
        beats {
          port => 5044
        }
    }
    
    filter {
      grok {
        match => { 
        "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status:int} %{INT:length:int}" 
        }
    }
    }
    
    output {
        stdout {
          codec => rubydebug
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    启动logstash测试

    bin/logstash -f config/filebeat-console.conf --config.reload.automatic

    使用mutate插件过滤掉不需要的字段

    mutate {
        enable_metric => "false"
        remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]
    }
    
    • 1
    • 2
    • 3
    • 4

    要将日期格式进行转换,我们可以使用Date插件来实现。该插件专门用来解析字段中的日期,官方说明文档:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-date.html
    用法如下:

    date {
        match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]
        target => "date"
    }
    
    • 1
    • 2
    • 3
    • 4

    将date字段转换为「年月日 时分秒」格式。默认字段经过date插件处理后,会输出到@timestamp字段,所以,我们可以通过修改target属性来重新定义输出字段。

    输出到Elasticsearch指定索引

    index来指定索引名称,默认输出的index名称为:logstash-%{+yyyy.MM.dd}。但注意,要在index中使用时间格式化,filter的输出必须包含 @timestamp字段,否则将无法解析日期。

    output {
      elasticsearch {
        index => "tomcat_web_log_%{+YYYY-MM}"
        hosts => ["http://localhost:9200"]
        user => "elastic"
        password => "123456"
      }
      stdout{
        codec => rubydebug
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    注意:index名称中,不能出现大写字符
    完整的Logstash配置文件

    vim config/filebeat-filter-es.conf

    input {
        beats {
        port => 5044
        }
    }
    
    filter {
        grok {
        match => { 
        "message" => "%{IP:ip} - - \[%{HTTPDATE:time:date}\] \"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}\" %{NUMBER:http_status_code} %{NUMBER:size} \"(?\S+)\" %{QS:http_user_agent} \"(?\S+)\" \"(?\S+)\" \"(?\S+)\" %{NUMBER:request_time} %{NUMBER:upstream_response_time}" 
        }
    }
    mutate {
        enable_metric => "false"
        remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]
    }
    date {
        match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]
        target => "date"
        }
    }
    
    output {
        stdout {
        codec => rubydebug
    }
    elasticsearch {
        index => "tomcat_web_log_%{+YYYY-MM}"
        hosts => ["http://localhost:9200"]
        user => "elastic"
        password => "123456"
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    启动logstash

    bin/logstash -f config/filebeat-filter-es.conf --config.reload.automatic

    get nginx_log_2022-07/_search
    
    {
      "took" : 67,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 3,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "nginx_log_2022-07",
            "_type" : "_doc",
            "_id" : "wb-HRIIBp7dqYq6zSFDd",
            "_score" : 1.0,
            "_source" : {
              "request" : "/",
              "http_referer" : "-",
              "request_time" : "0.001",
              "cookie_UID" : "-",
              "http_status_code" : "304",
              "time" : "28/Jul/2022:19:18:02 +0800",
              "http_user_agent" : "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\"",
              "method" : "GET",
              "upstream_response_time" : "0.000",
              "size" : "0",
              "upstream_cookie_UID" : "-",
              "upstream_http_x_sticky_vk" : "-",
              "http_version" : "1.1",
              "ip" : "120.244.232.221",
              "@timestamp" : "2022-07-28T11:18:05.177Z"
            }
          },
          {
            "_index" : "nginx_log_2022-07",
            "_type" : "_doc",
            "_id" : "wr-JRIIBp7dqYq6zflAU",
            "_score" : 1.0,
            "_source" : {
              "request" : "/",
              "http_referer" : "-",
              "request_time" : "0.001",
              "cookie_UID" : "-",
              "http_status_code" : "304",
              "time" : "28/Jul/2022:19:20:20 +0800",
              "http_user_agent" : "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\"",
              "method" : "GET",
              "upstream_response_time" : "0.000",
              "size" : "0",
              "upstream_cookie_UID" : "-",
              "upstream_http_x_sticky_vk" : "-",
              "http_version" : "1.1",
              "ip" : "120.244.232.221",
              "@timestamp" : "2022-07-28T11:20:30.188Z"
            }
          },
          {
            "_index" : "nginx_log_2022-07",
            "_type" : "_doc",
            "_id" : "w7-NRIIBp7dqYq6zi1Do",
            "_score" : 1.0,
            "_ignored" : [
              "message.keyword"
            ],
            "_source" : {
              "request" : "/",
              "http_referer" : "-",
              "request_time" : "0.000",
              "message" : """120.244.232.221 - - [28/Jul/2022:19:24:47 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" "-" "-" "-" 0.000 0.004 
    120.244.232.221 - - [28/Jul/2022:19:24:47 +0800] "GET /api/v5/mall/mall.json?mall_version=v3 HTTP/1.1" 304 0 "http://test.local.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" "-" "-" "-" 0.050 0.052 """,
              "cookie_UID" : "-",
              "http_status_code" : "304",
              "time" : "28/Jul/2022:19:24:47 +0800",
              "http_user_agent" : "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\"",
              "method" : "GET",
              "upstream_response_time" : "0.004",
              "size" : "0",
              "upstream_cookie_UID" : "-",
              "upstream_http_x_sticky_vk" : "-",
              "http_version" : "1.1",
              "ip" : "120.244.232.221",
              "@timestamp" : "2022-07-28T11:24:55.203Z"
            }
          }
        ]
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96

    索引模式

    在这里插入图片描述

    在这里插入图片描述

  • 相关阅读:
    聊聊spring中bean的作用域
    Linux 命令学习 -磁盘分区和格式化
    16 C++设计模式之职责链(Chain of Responsibility)模式
    JS--数组类型 Array 1
    Spring Cloud Consul 从入门到精通
    一幅长文细学Vue(五)——组件高级(上)
    小样本学习导论
    接口的详解
    Vue 3 基础(二)基础 1
    第六十章 符号概览
  • 原文地址:https://blog.csdn.net/menxu_work/article/details/126032167