Logstash的配置单独开一个,因为关于数据转换这块,其实内容比较多.也是Logstash存在的一个很重要的原因cuiyaonan2000@163.com
Logstash不只是一个input|filter|output的数据流,而是一个input|decode|filter|encode|output的数据流。
codec就是用来decode,encode 事件的。所以codec常用在input和output中常见的codec有:
plain:读取原始内容
dots:将内容简化为点进行输出
rubydebug:将Logstash Events按照ruby格式输出,方便调试
line:处理带有换行符的内容
json:处理json格式的内容
multiline:处理多行数据的内容
- #主要用于事件之间没有分隔的纯文本。
- input {
- stdin {
- codec => plain {}
- }
- }
-
-
- #logstash事件output到kafka默认的codec为json,如果设置codec为plain,除了message数据之外还会有一个主机名和时间戳的字段生成,如果只需要message字段,配置如下:
- output {
- kafka {
- codec => plain {
- format => "%{message}"
- }
- }
- }
- #如果数据为json格式,可直接使用该插件,从而省掉filter/grok的配置,降低过滤器的cpu消耗
- input {
- stdin {
- codec => json
- }
- }
用于合并多行数据
有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内容。这种日志通常都很难通过命令行解析的方式做分析。multiline插件用于解决此类问题。
示例:
tomcat的日志catalina.out有很多调试的日志,日志都以时间戳格式"20-Apr-2016 11:29:28.535"开头,那么我们可以配置如下:
- input {
-
- file{
- path => "/app/elk/logstash-5.6.11-access/llqlogs/5.out"
- tags => ["api-core"]
- codec => multiline {
- pattern => "^\d{2}\:\d{2}\:\d{2}\.\d{3}"
- auto_flush_interval => 10
- negate => true
- what => "previous"
- }
- stat_interval => "1"
- }
- }
pattern为正则表达式匹配
negate为布尔类型,true表示否定正则表达式,即不匹配正则表达式。false为匹配正则表达式。默认值为false
what表示如果正则匹配与否,事件属于上一个或者下一个事件。有两个值可选previous(上一个)或者next(下一个)
auto_flush_interval表示当多长时间没有新的数据,之前积累的多行数据转换为一个事件。这里的10表示10秒
以上的配置可以解释为:不匹配pattern时间戳格式开头的行数据,都归属到上一个事件中,即在下一个匹配的时间戳出现之前的所有行的输出都属于同一个事件,从而达到合并多行的目的,同时等待10秒没有新数据产生,那么最后一个时间戳格式后的所有行数据就是最后一个事件。
格式:
input {
stdin{ }
}
格式:
input {
http {
port => 端口号
}
}
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 端口号
codec => json_lines
}
}
只需要监听一个端口就可以了
input {
beats {
port => 5044
}
}
多用于调试
output {
stdout {
codec => rubydebug
}
}
实现将分散在多地的文件统一到一处的需求,比如将所有web机器的web日志收集到1个文件中,从而方便查阅信息
output {
file {
path => "文件路径"
codec => line {format => %{message}}
}
}
output {
elasticsearch {
hosts => ["http://192.168.3.12:9200"]
index => "logstash-%{+YYYY.MM.dd}"
document_type => "_doc"
user => "用户名"
password => "密码"
}
}