环境 | IP |
---|---|
ElasticSearch、Logstash、Kafka、Zookeeper | 172.16.3.226/21 |
ElasticSearch、Logstash、Kafka、Zookeeper | 172.16.3.227/21 |
ElasticSearch、Logstash、Kafka、Zookeeper | 172.16.3.228/21 |
Kibana、FileBeat、Nginx、Tomcat | 172.16.4.184/21 |
软件包下载地址
ELFK版本:点击这里可以搜索自己想要下载的版本
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.5-x86_64.rpm
https://artifacts.elastic.co/downloads/kibana/kibana-7.17.5-x86_64.rpm
https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.5-x86_64.rpm
https://artifacts.elastic.co/downloads/logstash/logstash-7.17.5-x86_64.rpm
https://dlcdn.apache.org/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz
https://downloads.apache.org/kafka/3.2.1/kafka_2.12-3.2.1.tgz
1、yum localinstall elasticsearch-7.17.5-x86_64.rpm -y
2、cd /etc/elasticsearch
3、备份一下 elasticsearch 默认配置文件
cp elasticsearch.yml{,.bak}
# systemctl cat elasticsearch 有兴趣可以查一下elasticsearch的启动信息
4、修改 elasticsearch 配置文件
egrep -v "^#|^$" elasticsearch.yml
cluster.name: chinaedu-elk
node.name: chinaedu-elk226
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 172.16.3.226
http.port: 9200
discovery.seed_hosts: ["172.16.3.226"]
相关参数说明:
cluster.name:
集群名称,若不指定,则默认是"elasticsearch",⽇志⽂件的前缀也是集群名称。
node.name:
指定节点的名称,可以⾃定义,推荐使⽤当前的主机名,要求集群唯⼀。
path.data:
数据路径。
path.logs:
⽇志路径
network.host:
ES服务监听的IP地址
http.port:
ES服务对外暴露的端口
discovery.seed_hosts:
服务发现的主机列表,对于单点部署⽽⾔,主机列表和"network.host"字段配置相同
即可。
5、启动 elasticsearch
systemctl daemon-reload
systemctl start elasticsearch
elasticsearch
yum localinstall elasticsearch-7.17.5-x86_64.rpm -y
1、修改 elasticsearch 配置
cp /etc/elasticsearch/elasticsearch.yml{,.bak}
egrep -v "^#|^$" /etc/elasticsearch/elasticsearch.yml
cluster.name: chinaedu-elk
node.name: elk226
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
discovery.seed_hosts: ["172.16.3.226","172.16.3.227","172.16.3.228"]
cluster.initial_master_nodes: ["172.16.3.226","172.16.3.227","172.16.3.228"]
温馨提示:
"node.name"各个节点配置要区分清楚,建议写对应的主机名称。
2、将 3.226 上的elasticsearch配置文件同步到其他主机
scp /etc/elasticsearch/elasticsearch.yml root@172.16.3.227:/etc/elasticsearch/
scp /etc/elasticsearch/elasticsearch.yml root@172.16.3.228:/etc/elasticsearch/
3、3.227 配置
...
node.name: elk227
4、3.228 配置
...
node.name: elk228
5、所有节点启用 elasticsearch
# 在启动之前先删除 elasticsearch 生成的数据
rm -rf /var/{log,lib}/elasticsearch/* /tmp/*
systemctl daemon-reload
systemctl start elasticsearch
6、启动完成后可以验证一下elasticsearch 是否正常
curl 127.0.0.1:9200
curl 127.0.0.1:9200/_cat/nodes?v
elasticsearch7 中开始免费了账号密码认证功能,下面是xpack方式开启集群密码认证
1、在es的任一节点下生成p12文件,在es目录下执行命令
/usr/share/elasticsearch/bin/elasticsearch-certutil ca -out /etc/elasticsearch/cert/elastic-certificates.p12 -pass ""
2、生成p12文件后,将p12文件复制到其他节点的机器中,尽量保持p12的目录路径一致
scp -r /etc/elasticsearch/cert/ root@172.16.3.228:/etc/elasticsearch/cert/
scp -r /etc/elasticsearch/cert/ root@172.16.3.227:/etc/elasticsearch/cert/
3、所有主机修改 elastic-certificates.p12 权限以及属组
chown root.elasticsearch /etc/elasticsearch/cert/ -R && chmod 660 /etc/elasticsearch/cert/*
4、在所有节点的es config 目录下的elasticsearch.yml 文件新增如下配置(注意p12文件的目录路径):
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: p12文件的绝对目录路径
xpack.security.transport.ssl.truststore.path: p12文件的绝对目录路径
5、重启所有es
systemctl daemon-reload
systemctl restart elasticsearch
6、配置/自动生成密码,es中默认有5个用户
- 随机生成 -
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords auto
Changed password for user apm_system
PASSWORD apm_system = BKZDPuXJI2LCLkhueRLr
Changed password for user kibana_system
PASSWORD kibana_system = 8dOH6NAG6We7gtSMatgG
Changed password for user kibana
PASSWORD kibana = 8dOH6NAG6We7gtSMatgG
Changed password for user logstash_system
PASSWORD logstash_system = XrRbfLgxFYS8tvHPgaGh
Changed password for user beats_system
PASSWORD beats_system = DyOfdQ7XQWLcAtuZ99yV
Changed password for user remote_monitoring_user
PASSWORD remote_monitoring_user = i50tI88A8JS82i89n72A
Changed password for user elastic
PASSWORD elastic = wk9KI8qgCo5IDm2BLino
- 手动配置 -
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive
会提示每个密码都要输入两遍
kind: Deployment
apiVersion: apps/v1
metadata:
name: cerebro
labels:
k8s.kuboard.cn/name: cerebro
spec:
replicas: 1
selector:
matchLabels:
k8s.kuboard.cn/name: cerebro
template:
metadata:
labels:
k8s.kuboard.cn/name: cerebro
spec:
containers:
- name: cerebro
image: lmenezes/cerebro:latest
imagePullPolicy: IfNotPresent
restartPolicy: Always
revisionHistoryLimit: 10
---
kind: Service
apiVersion: v1
metadata:
name: cerebro-nginx
spec:
ports:
- name: yerc7y
protocol: TCP
port: 9000
targetPort: 9000
selector:
k8s.kuboard.cn/name: cerebro
type: NodePort
1、yum localinstall kibana-7.17.5-x86_64.rpm -y
2、cp /etc/kibana/kibana.yml{,.bak}
3、修改 Kibana 配置
egrep -v "^*#|^$" kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
elasticsearch.username: "kibana_system"
elasticsearch.password: "8dOH6NAG6We7gtSMatgG"
i18n.locale: "zh-CN"
4、启动 Kibana
systemctl daemon-reload
systemctl start kibana.service
1、配置 Nginx Yum源
yum install yum-utils -y
cat > /etc/yum.repos.d/nginx.repo << 'EOF'
[nginx-stable]
name=nginx stable repo
baseurl=http://nginx.org/packages/centos/$releasever/$basearch/
gpgcheck=1
enabled=1
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true
[nginx-mainline]
name=nginx mainline repo
baseurl=http://nginx.org/packages/mainline/centos/$releasever/$basearch/
gpgcheck=1
enabled=0
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true
EOF
2、安装 Nginx
yum-config-manager --enable nginx-mainline
yum install nginx -y
3、启动 Nginx
systemctl start nginx
yum -y localinstall filebeat-7.17.5-x86_64.rpm
cp /etc/filebeat/filebeat.yml{,.bak}
cat /etc/filebeat/filebeat.yml # 收集单个日志
filebeat.inputs:
- type: filestream
enabled: true # 是否启用当前输入类型,默认为true
id: my-filestream-id
paths:
- /var/log/nginx/*.log
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "nginx-access-%{+yyyy.MM.dd}"
setup.ilm.enabled: false # 关闭索引生命周期
setup.template.enabled: false # 允许自动生成index模板
setup.template.overwrite: true # 如果存在模块则覆盖
systemctl start filebeat.service
- 访问几次 Nginx服务产生一些日志;;;
curl 127.0.0.1
查看 elasticsearch 是否有nginx-access索引;
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "nginx-access"
cat /etc/filebeat/filebeat.yml # 收集多个日志
filebeat.inputs:
- type: filestream
enabled: true # 是否启用当前输入类型,默认为true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
tags: ["access"] # 新建 tags 字段可以用于判断
- type: filestream
enabled: true # 是否启用当前输入类型,默认为true
id: error-nginx-id
paths:
- /var/log/nginx/error.log
tags: ["error"] # 新建 tags 字段可以用于判断
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
indices:
- index: "nginx-access-%{+yyyy.MM.dd}"
when.contains:
tags: "access"
- index: "nginx-error-%{+yyyy.MM.dd}"
when.contains:
tags: "error"
setup.ilm.enabled: false # 关闭索引生命周期
setup.template.enabled: false # 允许自动生成index模板
setup.template.overwrite: true # 如果存在模块则覆盖
systemctl start filebeat.service
- 访问几次 Nginx服务产生一些日志;;;
curl 127.0.0.1
查看 elasticsearch 是否有nginx-access、nginx-error索引;
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | egrep "nginx-access|nginx-error"
7.17.5版本可能遇到的问题:
(1)input源配置⼀旦超过4个,写⼊ES时,就可能会复现出部分数据⽆法写⼊的问题;
有两种解决⽅案:
⽅案⼀: 拆成多个filebeat实例。运⾏多个filebeat实例时需要指定数据路径"--path.data"。
filebeat -e -c ~/config/23-systemLog-to-es.yml --path.data /tmp/filebeat
⽅案⼆: ⽇志聚合思路解决问题。
1)部署服务
yum -y install rsyslog
2)修改配置⽂件
vim /etc/rsyslog.conf
...
$ModLoad imtcp
$InputTCPServerRun 514
...
*.* /var/log/oldboyedu.log
3)重启服务并测试
systemctl restart rsyslog
logger "1111"
1、修改 nginx 输出格式
vim /etc/nginx/nginx.conf
log_format oldboyedu_nginx_json '{"@timestamp":"$time_iso8601",'
'"host":"$server_addr",'
'"clientip":"$remote_addr",'
'"SendBytes":$body_bytes_sent,'
'"responsetime":$request_time,'
'"upstreamtime":"$upstream_response_time",'
'"upstreamhost":"$upstream_addr",'
'"http_host":"$host",'
'"uri":"$uri",'
'"domain":"$host",'
'"xff":"$http_x_forwarded_for",'
'"referer":"$http_referer",'
'"tcp_xff":"$proxy_protocol_addr",'
'"http_user_agent":"$http_user_agent",'
'"status":"$status"}';
access_log /var/log/nginx/access.log oldboyedu_nginx_json;
2、定义Filebeat配置文件识别json格式
cat /etc/filebeat/filebeat.yaml
filebeat.inputs:
- type: filestream
enabled: true
id: access-nginx-json-id
paths:
- /var/log/nginx/access.log
tags: ["access"]
# 以JSON格式解析message字段的内容
parsers:
- ndjson:
keys_under_root: true
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "nginx-access-json-%{+yyyy.MM.dd}"
3、启动 Filebeat
4、查一下 elasticsearch 索引是否存在
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "nginx-access-json"
1、还原 Nginx 日志默认配置;;;
2、cat /etc/filebeat/filebeat.yml
filebeat.config.modules:
# 指定模块配置文件路径,${path.config} 代表 /etc/filebeat
path: ${path.config}/modules.d/nginx.yml
# 是否开启热加载功能
reload.enabled: true
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "nginx-access-modlues-%{+yyyy.MM.dd}"
setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true
3、filebeat -c filebeat.yml modules list # 查看支持的模块
4、filebeat -c filebeat.yml modules enable nginx # 启用 Nginx 模块
# 5、filebeat -c filebeat.yml modules disable nginx # 禁用 Nginx 模块
6、修改 nginx 模块配置
egrep -v "^*#|^$" /etc/filebeat/modules.d/nginx.yml
- module: nginx
access:
enabled: true
var.paths: ["/var/log/nginx/access.log"]
error:
enabled: false
var.paths: ["/var/log/nginx/error.log"]
ingress_controller:
enabled: false
7、启动Filebeat
8、查一下 elasticsearch 索引是否存在
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "nginx-access-modlues"
1、cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: filestream
enabled: true # 是否启用当前输入类型,默认为true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
tags: ["access"] # 新建 tags 字段可以用于判断
- type: filestream
enabled: true # 是否启用当前输入类型,默认为true
id: error-nginx-id
paths:
- /var/log/nginx/error.log
tags: ["error"] # 新建 tags 字段可以用于判断
include_lines: ['\[error\]'] # 收集包含[error]字段的信息
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
indices:
- index: "nginx-access-%{+yyyy.MM.dd}"
when.contains:
tags: "access"
- index: "nginx-error-%{+yyyy.MM.dd}"
when.contains:
tags: "error"
setup.ilm.enabled: false # 关闭索引生命周期
setup.template.enabled: false # 允许自动生成index模板
setup.template.overwrite: true # 如果存在模块则覆盖
2、启动filebeat
1、这里安装 tomcat 步骤忽略
2、配置 Tomcat beat文件
egrep -v "^*#|^$" /etc/filebeat/filebeat.yml
filebeat.config.modules:
path: ${path.config}/modules.d/tomcat.yml
reload.enabled: true
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "tomcat-modlues-%{+yyyy.MM.dd}"
setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true
3、filebeat -c filebeat.yml modules list # 查看支持的模块
4、filebeat -c filebeat.yml modules enable tomcat # 启用 tomcat 模块
# 5、filebeat -c filebeat.yml modules disable tomcat # 禁用 tomcat 模块
6、修改 Tomcat 模块配置
egrep -v "^*#|^$" /etc/filebeat/modules.d/tomcat.yml
- module: tomcat
log:
enabled: true
var.input: file
var.paths:
- /data/logs/tomcat/catalina.out
7、启动 Filebea
8、查一下 elasticsearch 索引是否存在
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "tomcat-modlues"
1、egrep -v "^*#|^$" /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
tags: ["catalina"]
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "catalina.out-tomcat-%{+yyyy.MM.dd}"
setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true
2、启动 Filebeat
systemctl enable filebeat
3、查一下 elasticsearch 索引是否存在
curl -u elastic:chinaedu -XGET http://172.16.3.226:9200/_cat/indices?v | grep "catalina.out-tomcat"
1、修改 server.xml 模拟 tomcat 报错
164 "%r" %s %b" />
167
168 > # 在/Host后面新增一些内容
2、多启动几次 tomcat 生成一些报错日志,然后将 server.xml 配置还原,再起启动
3、egrep -v "^*#|^$" /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
tags: ["catalina"]
parsers:
- multiline:
# 指定多行匹配的类型,可选值为"pattern","count"
type: pattern
# 指定匹配模式,匹配以2个数字开头的
pattern: '^\d{2}'
# 下面两个参数,参考官方架构图即可;
# https://www.elastic.co/guide/en/beats/filebeat/7.17/multiline-examples.html
negate: true
match: after
output.elasticsearch:
hosts: ["http://172.16.3.226:9200","http://172.16.3.227:9200","http://172.16.3.228:9200"]
username: "elastic"
password: "chinaedu"
index: "catalina.out-error-%{+yyyy.MM.dd}"
setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true
1、cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: filestream
enabled: true
paths:
- /var/log/nginx/access.log
tags: ["firewalld"]
output.file:
# 文件保存的路径
path: "/tmp/filebeat"
# 本地保存的文件名字
filename: filebeat-nginx-access.log
# 指定文件的滚动大小,默认为20M
rotate_every_kb: 102400
# 指定保存文件个数,默认是7个,有效值默为2-1024个
number_of_files: 7
# 指定文件的权限
permissions: 0600
2、启动Filebeat
ll /tmp/filebeat/
总用量 8
-rw------- 1 root root 5209 8月 26 15:25 filebeat-nginx-access.log
1、安装logstash
yum localinstall logstash-7.17.5-x86_64.rpm -y
2、创建软连接,在全局下可以执行logstash命令
ln -sv /usr/share/logstash/bin/logstash /usr/local/bin
(1)编写配置⽂件
cat > conf.d/01-stdin-to-stdout.conf <<'EOF'
input {
stdin {}
}
output {
stdout {}
}
EOF
(2)检查配置⽂件语法
logstash -tf conf.d/01-stdin-to-stdout.conf
(3)启动logstash实例
logstash -f conf.d/01-stdin-to-stdout.conf
注释: Nginx 输出日志的格式:
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'"$status" "$body_bytes_sent" "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for" '
'"$request_length" "$request_time" '
'"$host" "$upstream_addr" "$upstream_status" '
'"$upstream_response_length" "$upstream_response_time"';
access_log /var/log/nginx/access.log main;
(1)filebeat配置:
filebeat.inputs:
- type: filestream
enabled: true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
fields:
type_index: nginx-access
fields_under_root: true
output.logstash:
hosts: ["172.16.3.226:5044","172.16.3.227:5044","172.16.3.228:5044"]
(2)logstash配置:
input {
beats {
port => 5044
}
}
filter {
grok {
# 参考文档: https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html
# 正则模式可参考: https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/grok-patterns
match => {
"message" => '%{IP:client} - (%{USERNAME:user}|-) \[%{HTTPDATE:timestamp}\] "%{WORD:request_verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}" "%{NUMBER:status}" "%{NUMBER:bytes}" "(?:%{URI:referrer}|-)" "?(%{DATA:user_agent}|-)" "?(%{DATA:http_x_forwarded_for}|-)" "%{NUMBER:request_length}" "?(%{BASE10NUM:request_time}|-)" "%{HOSTNAME:hostname}" "%{NOTSPACE:upstream_addr}" "(%{NUMBER:upstream_status}|-)" "(%{NUMBER:upstream_response_length}|-)" "?(%{BASE10NUM:upstream_response_time}|-)"'
}
}
mutate {
# 参考文档:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-mutate.html
# 将指定字段转换成相应对数据类型.
convert => [
"bytes", "integer", # 转换成int类型,这样就可以对字段进行算术运算,如果不转换则默认是字符串类型。
"request_time", "integer",
"upstream_connect_time", "integer",
# "[geoip][coordinates]", "float",
"upstream_response_time", "integer",
"request_length", "integer",
"upstream_response_length", "integer",
"response", "integer",
"upstream_status", "integer"
]
}
mutate {
gsub => [
"bytes", "-", "0",
"request_time", "-", "0",
"upstream_connect_time", "-", "0",
"upstream_response_time", "-", "0",
"request_length", "-", "0",
"upstream_response_length", "-", "0",
"upstream_status", "-", "0"
]
}
mutate {
# 删除不要的字段
remove_field => [ "message","@version","agent","ecs","tags","input" ]
}
}
output {
#stdout {}
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
input {
beats {
port => 5044
}
}
filter {
grok {
match => {
"message" => '%{IP:client} - (%{USERNAME:user}|-) \[%{HTTPDATE:timestamp}\] "%{WORD:request_verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}" "%{NUMBER:status}" "%{NUMBER:bytes}" "(?:%{URI:referrer}|-)" "?(%{DATA:user_agent}|-)" "?(%{DATA:http_x_forwarded_for}|-)" "%{NUMBER:request_length}" "?(%{BASE10NUM:request_time}|-)" "%{HOSTNAME:hostname}" "%{NOTSPACE:upstream_addr}" "%{NUMBER:upstream_status}" "%{NUMBER:upstream_response_length}" "?(%{BASE10NUM:upstream_response_time}|-)"'
}
}
mutate {
# 参考文档:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-mutate.html
# 将指定字段转换成相应对数据类型.
convert => [
"bytes", "integer",
"request_time", "integer",
"upstream_connect_time", "integer",
# "[geoip][coordinates]", "float",
"upstream_response_time", "integer",
"request_length", "integer",
"upstream_response_length", "integer",
"response", "integer",
"upstream_status", "integer"
]
}
mutate {
gsub => [
"bytes", "-", "0",
"request_time", "-", "0",
"upstream_connect_time", "-", "0",
"upstream_response_time", "-", "0",
"request_length", "-", "0",
"upstream_response_length", "-", "0",
"upstream_status", "-", "0"
]
}
mutate {
# 删除不要的字段
remove_field => [ "message","@version","agent","ecs","tags","input" ]
}
#参考文档: https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-date.html
date {
# 匹配时间字段并解析"timestamp"
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
output {
#stdout {}
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
(1)Filebeat配置:
filebeat.inputs:
- type: filestream
enabled: true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
fields:
type_index: nginx-access # 定义索引名称
fields_under_root: true # 把fields设置为顶级字段,否则elasticsearch无法识别。
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
fields:
type_index: catalina-access # 定义索引名称
fields_under_root: true # 把fields设置为顶级字段,否则elasticsearch无法识别。
output.logstash:
hosts: ["172.16.3.226:5044","172.16.3.227:5044","172.16.3.228:5044"]
(2)logstash配置:
input {
beats {
port => 5044
}
}
output {
# stdout {}
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}" # [type_index] 获取Filebeat设置的名称
user => "elastic"
password => "chinaedu"
}
}
Page View(简称:"PV")
⻚⾯访问或点击量。
kibana界⾯⿏标依次点击如下:
(1)菜单栏;
(2)dashboards
(3)创建新的仪表板
(4)创建可视化
(5)选择指标
(6)选择索引模式(例如"nginx-access-*")
(7)指标栏中选择:
选择函数:计数
显示名称: 空
(8)保存到库
标题:lms-saas 总访问量
客户端IP:
通常指的是访问Web服务器的客户端IP地址,但要注意,客户端IP数量并不难代表UV。
kibana界⾯⿏标依次点击如下:
(1)菜单栏;
(2)dashboards
(3)创建新的仪表板
(4)创建可视化
(5)指标
(6)选择索引模式(例如"nginx-access-*")
(7)指标栏中选择:
选择函数: 唯⼀计数
选择字段: clientip.keyword
显示名称: 空
(8)保存到库
标题:lms-saas IP
带宽:
统计nginx返回给客户端⽂件⼤⼩的字段进⾏累计求和。
kibana界⾯⿏标依次点击如下:
(1)菜单栏;
(2)dashboards
(3)创建新的仪表板
(4)创建可视化
(5)指标
(6)选择索引模式(例如"nginx-access-*")
(7)指标栏中选择:
选择函数: 求和
选择字段: bytes
显示名称: 空
值格式:字节(1024)
(8)保存到库
标题:lms-saas 总流量
访问资源统计:
对URI的访问次数统计。
kibana界⾯⿏标依次点击如下:
(1)菜单栏;
(2)dashboards
(3)创建新的仪表板
(4)创建可视化
(5)⽔平条形图
(6)选择索引模式(例如"nginx-access-*")
(7)"垂直轴"
选择函数:排名最前值
字段: request.keyword
值数目:5
排名依据:访问量
排名方向:降序
高级:取消"将其他值分为其他"
显示名称: 空
(8)"水平轴"
聚合: 计数
显示名称: 空
IP的TopN统计:
统计访问量的客户端IP最⼤的是谁。
kibana界⾯⿏标依次点击如下:
(1)菜单栏;
(2)dashboards
(3)创建新的仪表板
(4)创建可视化
(5)饼图
(6)切片依据:
选择函数:排名最前
选择字段:client.keyword
高级:取消"将其他值分为其他"
显示名称: 空
(7)大小调整依据:
选择函数:计数
(8)保存到库:
标题:lms-saas 客户端IP top5
IP的TopN统计:
统计访问量的客户端IP最⼤的是谁。
kibana界⾯⿏标依次点击如下:
(1)菜单栏;
(2)dashboards
(3)创建新的仪表板
(4)创建可视化
(5)圆环图
(6)切片依据:
选择函数:排名最前值
选择字段:upstream_addr.keyword
高级:取消"将其他值分为其他"
显示名称: 空
(7)大小调整依据:
选择函数:计数
(8)保存到库:
标题:lms-saas upstream Top5
(1)解压 zookeeper 软件包
tar -xf jdk-8u333-linux-x64.tar.gz -C /usr/local/
tar -xf apache-zookeeper-3.8.0-bin.tar.gz -C /usr/local/
(2)创建环境变量
cat >> /etc/profile << 'EOF'
export JAVA_HOME=/usr/local/jdk1.8.0_333
export PATH=$PATH:$JAVA_HOME/bin
export ZK_HOME=/usr/local/apache-zookeeper-3.8.0-bin/
export PATH=$PATH:$ZK_HOME/bin
EOF
source /etc/profile
(3)创建zookeeper配置文件
cp /usr/local/apache-zookeeper-3.8.0-bin/conf/{zoo_sample.cfg,zoo.cfg}
egrep -v "^#|^$" /usr/local/apache-zookeeper-3.8.0-bin/conf/zoo.cfg
dataDir=/tmp/zookeeper
dataLogDir=/var/log/zookeeper
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
(4)启动zookeeper节点
zkServer.sh start
zkServer.sh status # 查看zk服务的状态信息
zkServer.sh stop
zkServer.sh restart
zookeeper配置文件解释:
dataDir ZK数据存放目录。.
dataLogDir ZK日志存放目录。
clientPort 客户端连接ZK服务的端口。
tickTime ZK服务器之间或客户端与服务器之间维持心跳的时间间隔。
initLimit 允许follower(相对于Leaderer言的“客户端”)连接并同步到Leader的初始化连接时间,以tickTime为单位。当初始化连接时间超过该值,则表示连接失败。
syncLimit Leader与Follower之间发送消息时,请求和应答时间⻓度。如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。
(1)解压 kafka 软件包
tar zxf kafka_2.12-3.2.1.tgz -C /usr/local/
(2)配置环境变量
cat >> /etc/profile << 'EOF'
export KAFKA_HOME=/usr/local/kafka_2.12-3.2.1
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile
(3)修改kafka配置文件
cp /usr/local/kafka_2.12-3.2.1/config/server.properties{,.bak}
egrep -v "^#|^$" /usr/local/kafka_2.12-3.2.1/config/server.properties
broker.id=226
listeners=PLAINTEXT://172.16.3.226:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.3.226:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
(4)启动kafka
kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.2.1/config/server.properties
kafka-server-stop.sh # 关闭Kafka服务
(5)验证kafka节点,是否正常工作
1.启动生产者
kafka-console-producer.sh --topic chinaedu-linux80 --bootstrap-server 172.16.3.226:9092
>AAAAAAAA
>BBBBBBB
>CCCCCCCC
2、启动消费者
kafka-console-consumer.sh --topic chinaedu-linux80 --bootstrap-server 172.16.3.226:9092 --from-beginning
AAAAAAAA
BBBBBBB
CCCCCCCC
温馨提示:
"--topic":要生成消息的主题id。
"--bootstrap-server":指定kafka节点的地址跟端口
"--from-beginning":代表从该topic的最开始位置读取数据,若不加该参数,则默认从topic的末尾读取。
kafka配置文件解释:
broker.id 每个server需要单独配置broker id,如果不配置系统会自动配置。需要和上一步ID一致
listeners 监听地址,格式PLAINTEXT://IP:端口。
num.network.threads 接收和发送网络信息的线程数。
num.io.threads 服务器用于处理请求的线程数,其中可能包括磁盘I/O。
socket.send.buffer.bytes 套接字服务器使用的发送缓冲区(SO_SNDBUF)
socket.receive.buffer.bytes 套接字服务器使用的接收缓冲区(SO_RCVBUF)
socket.request.max.bytes 套接字服务器将接受的请求的最大大小(防止OOM)
log.dirs 日志文件目录。
num.partitions partition数量。
num.recovery.threads.per.data.dir 在启动时恢复日志、关闭时刷盘日志每个数据目录的线程的数量,默认1。
offsets.topic.replication.factor 偏移量话题的复制因子(设置更高保证可用),为了保证有效的复制,偏移话题的复制因子是可配置的,在偏移话题的第一次请求的时候可用的broker的数量至少为复制因子的大小,否则要么话题创建失败,要么复制因子取可用broker的数量和配置复制因子的最小值。
log.retention.hours 日志文件删除之前保留的时间(单位小时),默认168
log.segment.bytes 单个日志文件的大小,默认1073741824
log.retention.check.interval.ms 检查日志段以查看是否可以根据保留策略删除它们的时间间隔。
zookeeper.connect ZK主机地址,如果zookeeper是集群则以逗号隔开。
zookeeper.connection.timeout.ms 连接到Zookeeper的超时时间。
(1)filebeat配置
filebeat.inputs:
- type: filestream
enabled: true
id: access-nginx-id
paths:
- /var/log/nginx/access.log
fields:
type_index: nginx-access
fields_under_root: true
output.kafka:
hosts: ["172.16.3.226:9092"]
topic: "log"
(2)logstash配置
input {
kafka {
codec => json
topics => ["log"]
group_id => "log"
consumer_threads => 8
bootstrap_servers => "172.16.3.226:9092"
}
}
output {
stdout {}
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
(3)Kibana展示,参考下图:
(1)解压 zookeeper 软件包
tar -xf jdk-8u333-linux-x64.tar.gz -C /usr/local/
tar -xf apache-zookeeper-3.8.0-bin.tar.gz -C /usr/local/
(2)创建环境变量
cat >> /etc/profile << 'EOF'
export JAVA_HOME=/usr/local/jdk1.8.0_333
export PATH=$PATH:$JAVA_HOME/bin
export ZK_HOME=/usr/local/apache-zookeeper-3.8.0-bin/
export PATH=$PATH:$ZK_HOME/bin
EOF
source /etc/profile
(3)创建zookeeper配置文件
cp /usr/local/apache-zookeeper-3.8.0-bin/conf/{zoo_sample.cfg,zoo.cfg}
egrep -v "^#|^$" /usr/local/apache-zookeeper-3.8.0-bin/conf/zoo.cfg
dataDir=/tmp/zookeeper
dataLogDir=/var/log/zookeeper
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
server.1=172.16.3.226:2888:3888
server.2=172.16.3.227:2888:3888
server.3=172.16.3.228:2888:3888
(3)创建data、log目录
mkdir -p /tmp/zookeeper /var/log/zookeeper
echo 1 > /tmp/zookeeper/myid # 每台 kafka 机器都要做成唯一的ID,3.226机器
echo 2 > /tmp/zookeeper/myid # 每台 kafka 机器都要做成唯一的ID,3.227机器
echo 3 > /tmp/zookeeper/myid # 每台 kafka 机器都要做成唯一的ID,3.228机器
(5)启动zookeeper节点
zkServer.sh start
zkServer.sh status # 查看zk服务的状态信息
zkServer.sh stop
zkServer.sh restart
(1)解压 kafka 软件包
tar zxf kafka_2.12-3.2.1.tgz -C /usr/local/
(2)配置环境变量
cat >> /etc/profile << 'EOF'
export KAFKA_HOME=/usr/local/kafka_2.12-3.2.1
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile
(3)修改kafka配置文件
cp /usr/local/kafka_2.12-3.2.1/config/server.properties{,.bak}
egrep -v "^#|^$" /usr/local/kafka_2.12-3.2.1/config/server.properties
broker.id=226
listeners=PLAINTEXT://172.16.3.226:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.3.226:2181,172.16.3.227:2181,172.16.3.228:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
(4)227 配置
...
broker.id=227
listeners=PLAINTEXT://172.16.3.227:9092
(5)228 配置
...
broker.id=228
listeners=PLAINTEXT://172.16.3.228:9092
(6)启动kafka
kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.2.1/config/server.properties
kafka-server-stop.sh # 关闭Kafka服务
(7)验证kafka是否是集群模式:
zkCli.sh ls /brokers/ids | grep "^\["
[226, 227, 228]
(1)filebeat配置
filebeat.inputs:
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
fields:
type_index: catalina-out
fields_under_root: true
output.kafka:
hosts: ["172.16.3.226:9092","172.16.3.227:9092","172.16.3.228:9092"]
topic: "log"
(2)logstash配置
input {
kafka {
codec => json
topics => ["log"]
group_id => "log"
consumer_threads => 8
bootstrap_servers => "172.16.3.226:9092,172.16.3.227:9092,172.16.3.228:9092"
}
}
output {
stdout {}
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
(3)Kibana展示,参考下图:
(1)filebeat配置文件:
filebeat.inputs:
- type: filestream
enabled: true
id: nginx-access-id
paths:
- /var/log/nginx/access.log
fields:
type_index: nginx-access
fields_under_root: true
- type: filestream
enabled: true
id: catalina-tomcat-id
paths:
- /data/logs/tomcat/catalina.out
fields:
type_index: catalina-out
fields_under_root: true
- type: filestream
enabled: true
id: mysql-slowlog-id
paths:
- /data/mysql/logs/slowquery.log
fields:
type_index: mysql-slowlog
fields_under_root: true
parsers:
- multiline:
type: pattern
pattern: '^# Time: '
negate: true
match: after
output.kafka:
hosts: ["172.16.3.226:9092","172.16.3.227:9092","172.16.3.228:9092"]
topic: "log"
(2)logstash配置文件:
input {
kafka {
codec => json
topics => ["log"]
group_id => "log"
consumer_threads => 8
bootstrap_servers => "172.16.3.226:9092,172.16.3.227:9092,172.16.3.228:9092"
}
}
filter {
if "nginx" in [type_index] {
grok {
match => ['message', '%{IPV4:remote_addr} - ?(%{DATA:remote_user}|-) \[%{HTTPDATE:timestamp}\] "%{WORD:http_method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" "%{NUMBER:response}" "(?:%{NUMBER:bytes}|-)" "(?:%{URI:referrer}|-)" "?(%{DATA:user_agent}|-)" "?(%{DATA:http_x_forwarded_for}|-)" "%{NUMBER:request_length}" "?(%{BASE10NUM:request_time}|-)" "%{HOSTNAME:hostname}" "%{NOTSPACE:upstream_addr}" "%{NUMBER:upstream_status}" "%{NUMBER:upstream_response_length}" "?(%{BASE10NUM:upstream_response_time}|-)"']
}
mutate {
gsub => [
"bytes", "-", "0",
"request_time", "-", "0",
"upstream_connect_time", "-", "0",
"upstream_response_time", "-", "0",
"request_length", "-", "0",
"upstream_response_length", "-", "0",
"upstream_status", "-", "0"
]
}
mutate {
convert => [
"bytes", "integer",
"request_time", "integer",
"upstream_connect_time", "integer",
# "[geoip][coordinates]", "float",
"upstream_response_time", "integer",
"request_length", "integer",
"upstream_response_length", "integer",
"response", "integer",
"upstream_status", "integer"
]
}
mutate {
remove_field => [ "msg" , "message" ]
}
}
}
output {
# stdout {}
if "nginx" in [type_index] {
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "logstash-%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
else {
elasticsearch {
hosts => ["172.16.3.226:9200","172.16.3.227:9200","172.16.3.228:9200"]
index => "%{[type_index]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "chinaedu"
}
}
}
(3)通过Kibana查看es是否收集到日志