• logstash输出到es模式action实践


    logstash输出es

    模块

    logstash一共有 input,filter,output 三个模块

    配置

    conf示例

    input {
      jdbc {
        jdbc_connection_string => "jdbc:mysql:/XXX
        jdbc_user => "XXX"
        jdbc_password => "XXX"
        jdbc_driver_library => "/data/logstash-7.1.1/logstash-core/lib/jars/mysql-connector-java-8.0.19.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_paging_enabled => true
        jdbc_page_size => "10000"
        jdbc_default_timezone =>"Asia/Shanghai"
        statement => "select XXX from user  where id >=:sql_last_value limit 10000"
        schedule => "*/5 * * * * *"
        use_column_value => true
        tracking_column => "id"
        tracking_column_type => "numeric"
        last_run_metadata_path => "/data/logstash-7.1.1/bin/prod/user_prod"
      }
    }
    
    output {
      elasticsearch {
        hosts => ["XXX:9200"]
        user => "XXX"
        password => "XXX"
        index => "XXX_index"
        action => "update"
        doc_as_upsert => "true"
        document_id => "%{id}"
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    output插件配置

    output {
      elasticsearch {
        hosts => ["XXX:9200"]
        user => "XXX"
        password => "XXX"
        index => "XXX_index"
        action => "update"
        doc_as_upsert => "true"
        document_id => "%{id}"
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    action类型

    • index:新增覆盖
    • delete:通过id删除文档需要指定 document_id
    • update:修改/修改或新增,upsert
    • create:新增不覆盖

    action对应文档看 Elasticsearch bulk API documentation

    几种 output es 的 action 配置

    1. 全量同步建立索引
    output {
      elasticsearch {
        hosts => ["XXX:9200"]
        user => "XXX"
        password => "XXX"
        index => "XXX_index"
        action => "index"
        document_id => "%{id}"
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    注意:

    • 若指定 document_id 会先查询该文档,存在更新不存在新增
    • document_id 不指定 index 模式会自动生成 _id
    • 对于日志写入等不存在修改场景的业务库尽量不要指定 document_id 提高写入效率
    • index 模式是默认 action
    1. 删除指定文档
    output {
      elasticsearch {
        hosts => ["XXX:9200"]
        user => "XXX"
        password => "XXX"
        index => "XXX_index"
        action => "delete"
        document_id => "%{id}"
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. 修改指定文档
    output {
      elasticsearch {
        hosts => ["XXX:9200"]
        user => "XXX"
        password => "XXX"
        index => "XXX_index"
        action => "update"
        document_id => "%{id}"
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. 修改指定文档若文档不存在则新增
    output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "logstash-%{+YYYY.MM.dd}"
        document_id => "%{id}"
        action => "update"
        doc_as_upsert => true
        # 不能同时设置 doc_as_upsert => ture 和 upsert
        # upsert => '{"test":"hello world"}'
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    注意:

    • doc_as_upsert为true,使用input的值做为文档的值
    • scripted_upsert为true,使用script作为文档的值
    • 不能同时设置 doc_as_upsert 为 true 和 upsert 为json字符串
    output {
      elasticsearch {
        scripted_upsert => "true"
        script => "ctx._source.message = params.event.get('message')"
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 设置 upsert:Set upsert content for update mode. Create a new document with this parameter as json string if document_id doesn’t exists;为更新模式设置upsert内容。如果document_id不存在,则使用此参数作为json字符串创建新文档
    1. 只新增文档,文档若存在失败
    output {
      elasticsearch {
        hosts => ["XXX:9200"]
        user => "XXX"
        password => "XXX"
        index => "XXX_index"
        action => "create"
        document_id => "%{id}"
        manage_template => true
        template => "/data/logstash-7.1.1/bin/temp/xxx.json"
        template_overwrite => "true"
        template_name => "test"
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    注意:

    • template_overwrite 设置为true,模板名字一样的时候,logstash的该模板(template_name)会覆盖es中的该命名模板
    • manage_template打开/关闭模板管理,默认true若为false需要手动预加载模板到es
    预加载
    curl -ss -XPUT "http://localhost:9200/_template/indexName/" -H 'Content-Type: application/json' -d @"/data/logstash-7.1.1/bin/temp/xxx.json";
    
    • 1
    • 2
    • 如果打开模板管理,template给出模板的路径
    • template_name是在ES中保存模板的名称

    命令

    nohup sh logstash --path.data=XXX_test -f XXX.conf > XXX.out&
    
    • 1
  • 相关阅读:
    8.spark自适应查询-AQE之自适应调整Shuffle分区数量
    适合汽车音频系统的ADAU1977WBCPZ、ADAU1978WBCPZ、ADAU1979WBCPZ四通道 ADC,24-bit,音频
    类和对象12:容器方法
    【云原生】Docker Compose 构建 Jenkins
    Linux进程替换实现一个简单的shell
    @Autowired 和 @Resource 的区别(为什么更推荐使用@Resource ?)
    代码随想录-day2
    SQLite的DBSTAT 虚拟表(三十六)
    建筑施工员证怎么考?报名条件及报考时间是什么?
    Linux FrameBuffer(三)- struct fb_fix_screeninfo 和 struct fb_var_screeninfo 详解
  • 原文地址:https://blog.csdn.net/white_while/article/details/127405423