目录
系统版本:Red Hat Enterprise Linux Server release 6.8
此服务器上安装了elk和kafka伪集群
Elk启动正常
Kafka伪集群启动正常
Elk与kafka的连接是由logstash的配置文件完成的,在此配置文件的input中配置kafka的信息;filter中配置过滤器的信息,以方便查看日志时进行筛选;output中则配置elasticsearch的信息。
切至logstash的安装目录的config目录
cd /home/XX/logstash-6.2.4/config
创建kafkas.conf文件
sudo vim kafkas.conf

修改kafkas.conf,添加以下行:
- input {
-
- kafka{
-
- bootstrap_servers => ["kafka的ip:port,若kafka是集群,使用逗号隔开"]
-
- client_id => "logback-kafka"
-
- auto_offset_reset => "latest"
-
- consumer_threads => 5
-
- decorate_events => true
-
- topics => ["test"]
-
- codec => "json"
-
- }
-
- }
-
- filter {
-
- grok {
-
- match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}\s+%{LOGLEVEL:severity}\s+\[%{DATA:service},%{DATA:trace},%{DATA:span},%{DATA:exportable}\]\s+%{DATA:pid}\s+---\s+\[%{DATA:thread}\]\s+%{DATA:class}\s+:\s+%{GREEDYDATA:rest}" }
-
- }
-
- }
-
- output {
-
- elasticsearch {
-
- hosts => ["e的ip:port"]
- index => "XX(索引名)-%{+YYYY-MM-dd}"
-
- }
-
- }
查看logstash进程号
- ps -ef | grep logstash
-
- 杀死进程
-
- sudo kill -9 进程号

重启logstash
切至logstash的bin目录,执行命令:
sudo ./logstash -f ../config/kafkas.conf
启动成功后,执行命令:Ctrl+z,使服务进入后台运行,或直接使用后台运行方式启动logstash,命令:
sudo nohup ./logstash -f ../config/kafkas.conf &
登录kibana
路径:kibana的ip:port
点击Discover,出现kafka中的日志,证明配置成功