• Elasticsearch7.17 七 :Logstash与FileBeat详解以及ELK整合


    Logstash与FileBeat详解以及ELK整合

    ELK架构

    ELK架构分为两种,一种是经典的ELK,另外一种是加上消息队列(Redis或Kafka或RabbitMQ)和Nginx结构。
    经典的ELK主要是由Filebeat + Logstash + Elasticsearch + Kibana组成,如下图:(早期的ELK只有Logstash + Elasticsearch + Kibana)
    在这里插入图片描述
    此架构主要适用于数据量小的开发环境,存在数据丢失的危险。
    整合消息队列+Nginx架构:
    在这里插入图片描述

    Logstash介绍和使用

    Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的存储库中。
    Logstash核心概念
    Pipeline:

    • 包含了input—filter-output三个阶段的处理流程
    • 插件生命周期管理
    • 队列管理

    Logstash Event

    • 数据在内部流转时的具体表现形式。数据在input 阶段被转换为Event,在 output被转化成目标格式数据
    • Event 其实是一个Java Object,在配置文件中,对Event 的属性进行增删改查

    Codec (Code / Decode)
    将原始数据decode成Event;将Event encode成目标数据
    在这里插入图片描述
    插件
    Logstash的配置文件主要分为三个部分input 、filter 、output 。每个部分都可以配置多个插件
    Input Plugins:https://www.elastic.co/guide/en/logstash/7.17/input-plugins.html

    常见的输入插件:
    Stdin / File
    Beats / Log4J /Elasticsearch / JDBC / Kafka /Rabbitmq /Redis
    JMX/ HTTP / Websocket / UDP / TCP
    Google Cloud Storage / S3
    Github / Twitter

    Output Plugins:https://www.elastic.co/guide/en/logstash/7.17/output-plugins.html

    常见的输出插件
    Elasticsearch
    Email / Pageduty
    Influxdb / Kafka / Mongodb / Opentsdb / Zabbix
    Http / TCP / Websocket

    Filter Plugins:https://www.elastic.co/guide/en/logstash/7.17/filter-plugins.html

    事件处理插件
    Mutate 一操作Event的字段
    Metrics — Aggregate metrics
    Ruby 一执行Ruby 代码‘’

    Codec Plugins:https://www.elastic.co/guide/en/logstash/7.17/codec-plugins.html

    事件编译插件
    Line / Multiline
    JSON / Avro / Cef (ArcSight Common Event Format)
    Dots / Rubydebug

    Logstash安装和使用

    下载并解压logstash: https://www.elastic.co/cn/downloads/past-releases#logstash
    选择版本:7.17.3
    测试:运行最基本的logstash

    cd logstash-7.17.3
    #linux
    #-e选项表示,直接把配置放在命令中,这样可以有效快速进行测试
    bin/logstash -e 'input { stdin { } } output { stdout {} }'
    #windows
    .\bin\logstash.bat -e "input { stdin { } } output { stdout {} }"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Logstash导入数据到ES

    测试数据集下载:https://grouplens.org/datasets/movielens/
    在这里插入图片描述
    下载好数据集放到对应的目录 解压
    在这里插入图片描述

    编写logstash-movie.conf配置文件

    input {
      file {
        path => "/usr/local/es/dataset/moves/movies.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
    filter {
      csv {
        separator => ","
        columns => ["id","content","genre"]
      }
    
      mutate {
        split => { "genre" => "|" }
        remove_field => ["path", "host","@timestamp","message"]
      }
    
      mutate {
    
        split => ["content", "("]
        add_field => { "title" => "%{[content][0]}"}
        add_field => { "year" => "%{[content][1]}"}
      }
    
      mutate {
        convert => {
          "year" => "integer"
        }
        strip => ["title"]
        remove_field => ["path", "host","@timestamp","message","content"]
      }
    
    }
    output {
       elasticsearch {
         hosts => "http://192.168.10.114:9200"
         index => "movies"
         document_id => "%{id}"
         user => "elastic"
         password => "123456"
       }
      stdout {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    运行logstash

    # linux
    bin/logstash -f logstash-movie.conf
    
    • 1
    • 2

    在这里插入图片描述
    可以看到原始数据按照这个格式输出。然后来看下es上有没有对应的数据
    在这里插入图片描述
    在这里插入图片描述

    Logstash从数据库中导入数据到ES

    测试数据:

    #user表
    CREATE TABLE `user` (
      `id` int NOT NULL AUTO_INCREMENT,
      `name` varchar(50) DEFAULT NULL,
      `address` varchar(50) CHARACTER DEFAULT NULL,
      `last_updated` bigint DEFAULT NULL,
      `is_deleted` int DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    #插入数据
    INSERT INTO user(name,address,last_updated,is_deleted) VALUES("张三","广州天河",unix_timestamp(NOW()),0)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    拷贝jdbc依赖到logstash-7.17.3/driver目录下
    Logstash配置文件

    input {
      jdbc {
        jdbc_driver_library => "/usr/local/es/logstash-7.17.3/driver/mysql-connector-java-5.1.48.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://xxx:3306/user?useSSL=false"
        jdbc_user => "root"
        jdbc_password => "xxx"
         #启用追踪,如果为true,则需要指定tracking_column
        use_column_value => true
        tracking_column => "last_updated"
         #追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
        tracking_column_type => "numeric"
         #记录最后一次运行的结果
        record_last_run => true
          #上面运行结果的保存位置
        last_run_metadata_path => "jdbc-position.txt"
        statement => "SELECT * FROM user_elk where last_updated >:sql_last_value;"
         #定时执行
        schedule => " * * * * * *"
      }
    }
    output {
      elasticsearch {
        document_id => "%{id}"
        document_type => "_doc"
        index => "users"
        hosts => ["http://192.168.10.114:9200"]
        user => "elastic"
        password => "123456"
      }
      stdout{
        codec => rubydebug
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    运行Logstash
    bin/logstash -f config/mysql_data.conf
    在这里插入图片描述
    说明成功,查看es数据
    在这里插入图片描述
    当对数据进行增删改查的时候 Logstash回同步的把数据库的变动更新到ES当中,因为 Logstash会定时去执行配置文件中的sql,SELECT * FROM user_elk where last_updated >:sql_last_value :sql_last_value表示最后一次执行成功的时间戳的值,这个值记录在jdbc-position.txt 也是在配置文件中配好的。
    而且删除必须是 逻辑删除才会奏效。
    测试

    # 更新
    update user_elk set address="广州白云山",last_updated=unix_timestamp(NOW()) where name="张三"
    #删除
    update user_elk set is_deleted=1,last_updated=unix_timestamp(NOW()) where name="张三"
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述
    在这里插入图片描述
    当我们在数据中进行删除以后,就不能在查出数据,所以要在es中进行过滤

    # 添加别名 进行过滤
    POST _aliases
    {
    "actions": [
      {
        "add": {
          "index": "users",
          "alias": "show_users",
          "filter": {
            "term": {
              "is_deleted": "0"
            }
          }
        }
      }
    ]  
    }
    # 查询
    GET show_users/_search
    {
      "query": {
        "term": {
          "name.keyword": {
            "value": "张三"
          }
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    Beats介绍和使用

    Beats 是一个免费且开放的平台,集合了多种单一用途的数据采集器。它们从成百上千或成千上万台机器和系统向 Logstash 或 Elasticsearch 发送数据
    在这里插入图片描述
    FileBeat简介
    FileBeat专门用于转发和收集日志数据的轻量级采集工具。它可以作为代理安装在服务器上,FileBeat监视指定路径的日志文件,收集日志数据,并将收集到的日志转发到Elasticsearch或者Logstash。
    FileBeat的工作原理
    启动FileBeat时,会启动一个或者多个输入(Input),这些Input监控指定的日志数据位置。FileBeat会针对每一个文件启动一个Harvester。Harvester读取每一个文件的日志,将新的日志发送到libbeat,libbeat将数据收集到一起,并将数据发送给输出(Output)。
    logstash vs FileBeat

    • Logstash是在jvm上运行的,资源消耗比较大。而FileBeat是基于golang编写的,功能较少但资源消耗也比较小,更轻量级。
    • Logstash 和Filebeat都具有日志收集功能,Filebeat更轻量,占用资源更少
    • Logstash 具有Filter功能,能过滤分析日志
    • 一般结构都是Filebeat采集日志,然后发送到消息队列、Redis、MQ中,然后Logstash去获取,利用Filter功能过滤分析,然后存储到Elasticsearch中
    • FileBeat和Logstash配合,实现背压机制。当将数据发送到Logstash或 Elasticsearch时,Filebeat使用背压敏感协议,以应对更多的数据量。如果Logstash正在忙于处理数据,则会告诉Filebeat 减慢读取速度。一旦拥堵得到解决,Filebeat就会恢复到原来的步伐并继续传输数据。

    FileBeat的安装和使用

    下载并解压Filebeat
    下载地址:https://www.elastic.co/cn/downloads/past-releases#filebeat

    编辑配置
    修改 filebeat.yml 以设置连接信息:

    output.elasticsearch:
      hosts: ["192.168.10.111:9200","192.168.10.112:9200","192.168.10.114:9200"]
      username: "elastic"
      password: "123456"
    setup.kibana:
      host: "192.168.10.111:5601"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    ** 启用和配置数据收集模块**

    # 查看可以模块列表
    ./filebeat modules list
    #启用 Logstash 模块
    ./filebeat modules enable logstash
    #在 modules.d/logstash.yml 文件中修改设置
    - module: logstash
      log:
        enabled: true
        var.paths: ["/usr/local/es/logstash-7.17.3/logs/*.log"]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    启动 Filebeat

    # setup命令加载Kibana仪表板。 如果仪表板已经设置,则忽略此命令。 
    ./filebeat setup
    # 启动Filebeat
    ./filebeat -e 
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

  • 相关阅读:
    python 基于 Web3.py 和 Infura 网关采集链上数据
    buildadmin+tp8表格操作(7)表格的事件监听
    定义Student类
    流体的压力
    C#开发的OpenRA游戏之金钱系统(2)
    1.Python是什么?——跟老吕学Python编程
    【PAT甲级】1006 Sign In and Sign Out
    Go实现本地日期时间格式和时间戳相互转换
    csp初赛总结 & 那些年编程走过的坑 & 初高中信竞常考语法算法点
    手把手教你用qt链接sqlserver数据库
  • 原文地址:https://blog.csdn.net/admin522043032/article/details/125514333