A fully featured logging system is a must for day-to-day product and project development. Anyone who has done bug fixing knows first-hand how much a detailed, well-kept log can speed up troubleshooting.
During a recent project architecture meeting we discussed the selection and implementation of logging technology. Since I was not yet familiar with this area, I decided to spend some time learning and practicing: take the common, mature wheels for a spin and study how they work…
Logs generally fall into:
Project module structure:
simple-log-solution-scl
/-- canal: canal project source code
/-- simple-canal-example: contains the SimpleCanalClientExample client class for testing canal
/-- simple-canal-kafka-example: canal integrated with Kafka; Spring Boot consumes the data from Kafka
/-- simple-logfile-example: plain log.info output to console and file, plus the Meituan open-source library that uses AOP and annotations to produce operation logs
/-- simple-logstash-elk-example: a hands-on, entry-level lightweight logging solution
/-- simple-filebeat-elk-example: the logging solution for massive data volumes after introducing Filebeat and Kafka
/-- README.md
/-- ...
Repository: https://github.com/sichaolong/simple-log-solution-scl
Reference: Meituan tech team, https://tech.meituan.com/2021/09/16/operational-logbook.html
1. Recording operation logs with the Canal middleware
2. Recording logs to files
3. Hands-on with ELK logging solutions
Canal is an open-source component that parses MySQL incremental logs (binlog) and provides incremental data subscription and consumption. By listening to the database binlog it knows at the lowest level which data was changed, and operation logs can then be recorded based on those changes.
The advantage of this approach is that it is completely decoupled from the business logic. The drawback is just as obvious: it is very limited and can only record operation logs for database changes. If an operation involves an RPC call to another team's service, there is no database to listen to. For example, when sending a notification to a user, the notification service is usually a shared internal component, so the only option is to record the "notification sent" operation log manually at the point of the RPC call.
See Alibaba's open-source canal component at https://github.com/alibaba/canal: canal subscribes to the binlog, and other listeners simply consume from canal. Concretely it can support the following:
Canal's working principle is based on MySQL master-slave replication, which roughly works as follows:
canal uses the master/slave communication protocol to disguise itself as a replica database and then subscribes to the master's binlog.
Following the QuickStart doc (https://github.com/alibaba/canal/wiki/QuickStart), the first step is to enable binlog in my.cnf / my.ini; I am doing this directly on Windows.
# Do NOT edit the my-default.ini under C:\Program Files\MySQL\MySQL Server 5.7 !!!
# The my.ini to edit is the one in C:\ProgramData\MySQL\MySQL Server 5.7, NOT the one in C:\Program Files\MySQL\MySQL Server 5.7 !!!
# Enable the MySQL binlog
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not clash with canal's slaveId
# Don't forget to restart MySQL afterwards!!! Then log in as root and run show master status to check that binlog is enabled
Next, allow canal to subscribe to the binlog as a slave:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
# On Windows, open a terminal (Win + R) and run the following to restart MySQL
net stop mysql57
net start mysql57
Download the deployer package from https://github.com/alibaba/canal/releases/tag/canal-1.1.5, then configure and start canal following the docs. The server log is in logs/canal/canal.log and the instance log in logs/example/example.log.
Note: the installation path must not contain Chinese characters!
See the docs: https://github.com/alibaba/canal/wiki/ClientExample
Start canal-server, run the SimpleCanalClientExample test class, modify a row in a database table, and watch the console:
...
empty count : 36
empty count : 37
empty count : 38
empty count : 39
empty count : 40
empty count : 41
================> binlog[mysql-bin.000001:345] , name[mpj,product] , eventType : UPDATE
-------> before
id : 1 update=false
product_name : iPhone 13 update=false
unti_price : 4869 update=false
-------> after
id : 1 update=false
product_name : iPhone 13 update=false
unti_price : 48690 update=true
empty count : 1
empty count : 2
empty count : 3
empty count : 4
empty count : 5
empty count : 6
empty count : 7
empty count : 8
empty count : 9
...
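For reference, here is a minimal sketch of what a TCP client such as SimpleCanalClientExample boils down to, assuming canal-server runs locally on the default port 11111 with the default example instance (the full version in the repository and in the ClientExample wiki additionally parses the row data into the before/after output shown above):
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalClientSketch {

    public static void main(String[] args) {
        // connect to the local canal-server and the default "example" instance
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");   // subscribe to all schemas and tables
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(100); // fetch up to 100 entries
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
                if (batchId == -1 || entries.isEmpty()) {
                    Thread.sleep(1000);       // nothing new, same as the "empty count" loop above
                } else {
                    for (CanalEntry.Entry entry : entries) {
                        // ROWDATA entries carry the before/after images printed above
                        System.out.println(entry.getHeader().getLogfileName() + " : "
                                + entry.getHeader().getTableName() + " : " + entry.getEntryType());
                    }
                }
                connector.ack(batchId);       // acknowledge so canal advances its cursor
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            connector.disconnect();
        }
    }
}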
canal-admin provides operations-oriented features for canal, such as centralized configuration management and node maintenance, through a relatively friendly web UI so that more users can operate it quickly and safely. In short, you can manage canal-server instances remotely from the admin web UI.
The core concepts of canal-admin are:
A brief explanation:
canal-server needs to be configured to work with canal-admin; modify the following parts of canal.properties:
# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
See https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart; finally run startup.bat to start it. The UI is available at http://127.0.0.1:8089/ with the default credentials admin/123456.
The canal.properties configuration of canal-server can then be edited online.
For more configuration options see https://github.com/alibaba/canal/wiki/AdminGuide
Since version 1.1.1, canal server can deliver the binlog data it receives directly to an MQ. The MQ systems supported by default are:
(1) Install ZooKeeper
See https://github.com/alibaba/canal/wiki/Zookeeper-QuickStart. Configure and start it following the doc, then open a terminal and run telnet 127.0.0.1 2181
to verify the ZooKeeper service.
Optionally, install the zkui web UI:
# 1. Clone the code:
git clone https://github.com/DeemOpen/zkui.git
# 2. Building from source requires Maven:
wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
cd zkui/
yum install -y maven
mvn clean install
# 3. Edit the default values in the config file
vim config.cfg
serverPort=9090 # port to serve on
zkServer=xxx:2181
sessionTimeout=300000
# 4. Start the program in the background; 2.0-SNAPSHOT changes between releases, check the actual jar produced in the target directory
nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &
# 5. Open in a browser:
http://xxx:9090/
(2) Install Kafka
See https://kafka.apache.org/downloads and https://github.com/alibaba/canal/wiki/Kafka-QuickStart
# The Kafka download bundles a ZooKeeper; you can use the standalone ZooKeeper installed in the previous step or the bundled one. To start the bundled ZooKeeper:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
# Then start Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties
# Verify that it started successfully
# Create a topic
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# List topics
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
Install the Kafka web console: https://github.com/xxd763795151/kafka-console-ui
Modify the canal-server configuration to set the topic that canal publishes to the MQ:
1. Specify which topic canal delivers to: go to the XXX\canal-service\conf\example directory and edit instance.properties,
changing canal.mq.topic=example to canal.mq.topic=tst-topic (the topic to deliver to).
2. Switch canal to Kafka mode:
go to XXX\canal-service\conf
and edit canal.properties,
changing canal.serverMode = tcp to canal.serverMode = kafka
(supported modes: tcp, kafka, RocketMQ).
3. Specify the MQ address canal delivers to:
go to XXX\canal-service\conf
and edit canal.properties:
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
4. Restart:
go to the XXX\canal-service\bin directory
and run startup.bat.
Test by modifying a row in a database table and checking whether canal delivers the change to the example topic in the MQ; in my test Kafka received the messages.
To test consuming the example messages from Kafka, the Spring Boot + Kafka integration code is in the simple-canal-kafka-example module.
Note that in the Kafka consumer.properties configuration the groupId must match the one set in the test class, otherwise the messages will not be consumed correctly.
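As an illustration (the actual classes and group id in simple-canal-kafka-example may differ), a minimal Spring Kafka listener for the example topic could look like this, assuming the spring-kafka dependency is on the classpath:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class CanalBinlogListener {

    private static final Logger log = LoggerFactory.getLogger(CanalBinlogListener.class);

    // groupId must match the one configured in consumer.properties / spring.kafka.consumer.group-id,
    // otherwise the messages are not consumed as expected
    @KafkaListener(topics = "example", groupId = "canal-group")
    public void onMessage(String message) {
        // canal typically delivers the binlog change as a JSON string;
        // here we just print it, a real handler would parse it and write an operation log
        log.info("received canal binlog message: {}", message);
    }
}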
Recording operations this way requires solving three problems.
With SLF4J's MDC utility, the operator can be put into the logging context and printed in every log line. First, in the user interceptor, put the user identifier into the MDC. For what MDC is, see https://blog.csdn.net/taiyangdao/article/details/82860105
@Component
public class UserInterceptor extends HandlerInterceptorAdapter {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//get the user identifier
String userNo = getUserNo(request);
//put the user ID into the MDC context
MDC.put("userId", userNo);
return super.preHandle(request, response, handler);
}
private String getUserNo(HttpServletRequest request) {
// obtain the currently logged-in user via SSO, Cookie, or auth info
return null;
}
}
// Next, format the userId into the log output; %X{userId} reads the user identifier from the MDC.
<pattern>"%d{yyyy-MM-dd HH:mm:ss.SSS} %t %-5level %X{userId} %logger{30}.%method:%L - %msg%n"</pattern>
By adjusting the logging configuration, the operation logs can be routed into a dedicated log file.
<!-- logs of different business concerns are written to different files -->
<appender name="businessLogAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>logs/business.logFile>
<append>trueappend>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFOlevel>
<onMatch>ACCEPTonMatch>
<onMismatch>DENYonMismatch>
filter>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/业务A.%d.%i.logfileNamePattern>
<maxHistory>90maxHistory>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>10MBmaxFileSize>
timeBasedFileNamingAndTriggeringPolicy>
rollingPolicy>
<encoder>
<pattern>"%d{yyyy-MM-dd HH:mm:ss.SSS} %t %-5level %X{userId} %logger{30}.%method:%L - %msg%n"pattern>
<charset>UTF-8charset>
encoder>
appender>
<logger name="businessLog" additivity="false" level="INFO">
<appender-ref ref="businessLogAppender"/>
logger>
Then record the business log separately in the Java code:
//declare the dedicated business logger
private final Logger businessLog = LoggerFactory.getLogger("businessLog");
//write the log entry
businessLog.info("modified the delivery address");
You can either use a hand-rolled LogUtils helper or an AOP aspect to generate the log template; both are covered below. The logs then end up in a separate file, and a log collector can ship them to Elasticsearch or a database. Next, let's look at how to generate readable operation logs.
private OnesIssueDO updateAddress(updateDeliveryRequest request) {
DeliveryOrder deliveryOrder = deliveryQueryService.queryOldAddress(request.getDeliveryOrderNo());
// update delivery info: phone, recipient, address
doUpdate(request);
String logContent = getLogContent(request, deliveryOrder);
LogUtils.logRecord(request.getOrderNo(), logContent, request.getOperator());
return onesIssueDO;
}
private String getLogContent(updateDeliveryRequest request, DeliveryOrder deliveryOrder) {
String template = "User %s changed the order's delivery address from “%s” to “%s”";
return String.format(template, request.getUserName(), deliveryOrder.getAddress(), request.getAddress());
}
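The LogUtils class used above is the author's own wrapper and is not shown in the article. A minimal sketch of what such a helper might look like, writing to the dedicated businessLog logger configured earlier (the parameter names are assumptions based on the call site):
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LogUtils {

    // the dedicated logger bound to businessLogAppender in the logback configuration
    private static final Logger BUSINESS_LOG = LoggerFactory.getLogger("businessLog");

    private LogUtils() {
    }

    public static void logRecord(String bizNo, String content, String operator) {
        // one line per operation: business key, operator, and the rendered content
        BUSINESS_LOG.info("bizNo={}, operator={}, content={}", bizNo, operator, content);
    }
}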
As you can see, the example above needs two methods plus a getLogContent helper to record a single operation log. Once the business grows more complex, keeping the logging inside the business code clutters the business logic: calls to LogUtils.logRecord() end up scattered across many business methods, and helpers like getLogContent() are spread over all kinds of business classes, which is a disaster for readability and maintainability. The next section shows how to avoid this.
To solve this, operation logs are usually recorded with AOP so that they are decoupled from the business logic. Here is a simple AOP logging example:
@LogRecord(content="modified the delivery address")
public void modifyAddress(updateDeliveryRequest request){
// update delivery info: phone, recipient, address
doUpdate(request);
}
Recording a fixed message on the annotation decouples the operation log from the business logic and keeps the business code clean. You may have noticed, though, that while this decouples the logging code, the recorded text is not what we want: it is static, with no dynamic parts, whereas the operation log we actually need is "User %s changed the order's delivery address from '%s' to '%s'". Next, let's see how to generate dynamic operation logs elegantly with AOP.
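To make the idea concrete, here is a minimal sketch of the aspect behind such an annotation, assuming a hand-rolled @LogRecord annotation with a single content attribute (the open-source implementations linked below additionally support SpEL templates, which is how the dynamic text is produced):
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface LogRecord {
    String content();
}

@Aspect
@Component
class LogRecordAspect {

    private static final Logger BUSINESS_LOG = LoggerFactory.getLogger("businessLog");

    // intercept every method annotated with @LogRecord
    @Around("@annotation(logRecord)")
    public Object around(ProceedingJoinPoint pjp, LogRecord logRecord) throws Throwable {
        Object result = pjp.proceed();            // run the business method first
        BUSINESS_LOG.info(logRecord.content());   // record the fixed text only if it succeeded
        return result;
    }
}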
Meituan's open-source library: https://github.com/mouzt/mzt-biz-log/blob/master/doc/document-2.x.md; another open-source library: https://github.com/qqxx6661/logRecord. The code used for this practice lives in simple-logfile-example.
To be continued…
For the evolution of the ELK architecture see https://www.cnblogs.com/tanwentao/p/15749435.html
Most projects today use a microservice architecture. In the early phase, in order to ship on schedule, no log collection and analysis platform was built: logs were kept on each server's local disk and had to be hunted down host by host. As the number of services grew, with every service deployed as a cluster and the number of servers increasing quickly, many problems surfaced:
Normally you would install the services on a Linux server, or pull the es, kibana, and logstash images with Docker; here I practice directly on Windows.
See https://blog.csdn.net/weixin_45728976/article/details/106129058 and try to keep the three versions identical; I already had Elasticsearch 7.13.3 installed, so the other two use the same version.
Elasticsearch: https://www.elastic.co/cn/downloads/elasticsearch
Logstash: https://www.elastic.co/cn/downloads/logstash
Kibana (China mirror): https://mirrors.huaweicloud.com/kibana/7.13.3/
If the official downloads are too slow, download from https://elasticsearch.cn/download/ instead. Start Elasticsearch and open localhost:9200 to verify it; start Kibana and open localhost:5601 to verify it.
Logstash must be started with a configuration file: logstash -f ../config/logstash.conf
The content of logstash.conf is below. Note that if Elasticsearch has X-Pack authentication enabled, the username and password must be configured here, otherwise Logstash fails with the error shown later…
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 4560
codec => json_lines
}
}
output {
elasticsearch {
hosts => "localhost:9200"
# name of the Elasticsearch index to write to
index => "springboot-%{+YYYY.MM.dd}"
user => "elastic"
password => "jKr3gNyLcDnvv1ostjP0"
}
}
Then install the plugin: logstash-plugin install logstash-codec-json_lines
Error: "Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.13/security-minimal-setup.html to enable security." In other words, the local Kibana can no longer talk to Elasticsearch directly: you have to set up a user and password in Elasticsearch and configure those credentials in the Kibana configuration file before commands can be executed.
Fix: enable Elasticsearch security in elasticsearch.yml as the official guide describes: https://www.elastic.co/guide/en/elasticsearch/reference/7.13/security-minimal-setup.html
# run elasticsearch-setup-passwords auto to generate random passwords
Changed password for user apm_system
PASSWORD apm_system = xjs3ShoWt64oI6aXH31J
Changed password for user kibana_system
PASSWORD kibana_system = jRpUJIrydkAKVQeqyicg
Changed password for user kibana
PASSWORD kibana = jRpUJIrydkAKVQeqyicg
Changed password for user logstash_system
PASSWORD logstash_system = iIttpbbbgQPtbyyVVh9J
Changed password for user beats_system
PASSWORD beats_system = EXCCkvKRu3qACJ92flvf
Changed password for user remote_monitoring_user
PASSWORD remote_monitoring_user = FCoK0QL2cG5Z7DqjuHaB
Changed password for user elastic
PASSWORD elastic = jKr3gNyLcDnvv1ostjP0
Make sure all three services start successfully.
1. Create a new Spring Boot project and add the dependency:
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>5.3</version>
</dependency>
2. Add a logback-spring.xml configuration file so that logback ships its logs to Logstash:
<configuration scan="true" scanPeriod="10 seconds">
    <contextName>logback-spring</contextName>
    <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
    <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
    <conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
    <property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>
    <appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logging.path}/web_debug.log</file>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/web-debug-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>debug</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logging.path}/web_info.log</file>
        <encoder>
            <pattern>"%d{yyyy-MM-dd HH:mm:ss.SSS} %t %-5level %X{name} %logger{30}.%method:%L - %msg%n"</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/web-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>info</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logging.path}/web_warn.log</file>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/web-warn-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>warn</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logging.path}/web_error.log</file>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logging.path}/web-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>127.0.0.1:4560</destination>
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>
    </appender>
    <root level="INFO">
        <appender-ref ref="CONSOLE" />
        <appender-ref ref="DEBUG_FILE" />
        <appender-ref ref="INFO_FILE" />
        <appender-ref ref="WARN_FILE" />
        <appender-ref ref="ERROR_FILE" />
        <appender-ref ref="LOGSTASH" />
    </root>
    <springProfile name="pro">
        <root level="info">
            <appender-ref ref="CONSOLE" />
            <appender-ref ref="DEBUG_FILE" />
            <appender-ref ref="INFO_FILE" />
            <appender-ref ref="ERROR_FILE" />
            <appender-ref ref="WARN_FILE" />
            <appender-ref ref="LOGSTASH" />
        </root>
    </springProfile>
</configuration>
3. Start the project, generate some requests, and check whether the logs show up in Kibana.
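For the test I used a trivial controller along these lines (the path and messages are arbitrary), so that each request produces log lines at several levels:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class LogTestController {

    private static final Logger log = LoggerFactory.getLogger(LogTestController.class);

    @GetMapping("/log-test")
    public String logTest() {
        // these lines go to the console, the rolling files, and the LOGSTASH appender,
        // so they should show up in the springboot-yyyy.MM.dd index in Kibana
        log.info("this is an info log");
        log.warn("this is a warn log");
        log.error("this is an error log");
        return "ok";
    }
}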
For browsing the Elasticsearch data in Kibana see https://blog.csdn.net/liuming690452074/article/details/120106397
Logstash reported one error: BadResponseCodeError, :message=>"Got response code '401' contacting Elasticsearch at URL 'http://localhost:9200/'"
This is because X-Pack is enabled on Elasticsearch, so the credentials must be configured in logstash.conf. Fix: https://elasticsearch.cn/question/2345
The entry-level ELK setup above has two main drawbacks:
The upgraded ELK architecture therefore adds Kafka and Filebeat. Filebeat is a lightweight log collection agent that uses few resources, which makes it well suited to collecting logs on each server and forwarding them to Logstash; it is also the tool the official docs recommend. The basic flow is that Filebeat reads the xxx.log files produced by the application. Filebeat consists of two main components, prospectors and harvesters, which together send file changes to the configured output, Kafka; Logstash then consumes from Kafka. The benefit of adding Kafka is that if another business needs to analyze the logs it can simply consume them from Kafka too, keeping coupling between business modules low.
Integration reference: https://blog.51cto.com/zhongmayisheng/4292182
Create a new Spring Boot project with the following pom dependencies:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.4</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
Create a log4j2.xml file under the resources directory:
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
    <Properties>
        <Property name="LOG_HOME">./simple-filebeat-elk-example/logs</Property>
        <property name="FILE_NAME">collector</property>
        <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}" />
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}" />
            <Filters>
                <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <AsyncLogger name="henu.soft.*" level="info" includeLocation="true">
            <AppenderRef ref="appAppender"/>
        </AsyncLogger>
        <AsyncLogger name="henu.soft.*" level="info" includeLocation="true">
            <AppenderRef ref="errorAppender"/>
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>
    </Loggers>
</Configuration>
Start the project and check the logs printed to the console and written to the file. The machine name, local IP address, and module name at the front of each line come from a NetUtil helper class: the values are stored in log4j2's MDC and then read back via the %X{...} placeholders in the pattern defined in the XML file.
[2022-09-13T17:21:37.471+08:00] [INFO] [http-nio-6677-exec-1-30] [henu.soft.filebeat.test.controller.TestController] [DESKTOP-VJBPC13] [10.0.248.35] [simple-filebeat-elk-example] [TestController.java,21,henu.soft.filebeat.test.controller.TestController,test] [我是一条info日志] ## ''
[2022-09-13T17:21:37.474+08:00] [WARN] [http-nio-6677-exec-1-30] [henu.soft.filebeat.test.controller.TestController] [DESKTOP-VJBPC13] [10.0.248.35] [simple-filebeat-elk-example] [TestController.java,23,henu.soft.filebeat.test.controller.TestController,test] [我是一条warn日志] ## ''
[2022-09-13T17:21:37.475+08:00] [ERROR] [http-nio-6677-exec-1-30] [henu.soft.filebeat.test.controller.TestController] [DESKTOP-VJBPC13] [10.0.248.35] [simple-filebeat-elk-example] [TestController.java,25,henu.soft.filebeat.test.controller.TestController,test] [我是一条error日志] ## ''
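The NetUtil class itself is not shown here; conceptually it just puts the host name, IP, and application name into log4j2's MDC (ThreadContext) so that %X{hostName}, %X{ip}, and %X{applicationName} in the patternLayout can read them. A minimal sketch of the idea (note that ThreadContext is thread-local, so in a real web app this typically happens per request, e.g. in a servlet filter, or with the log4j2 isThreadContextMapInheritable property enabled):
import java.net.InetAddress;

import org.apache.logging.log4j.ThreadContext;

public class MdcBootstrap {

    // call this before the first log statement on the thread that will log
    public static void initLogContext(String applicationName) throws Exception {
        InetAddress localHost = InetAddress.getLocalHost();
        // the keys must match the %X{...} placeholders in the log4j2 patternLayout
        ThreadContext.put("hostName", localHost.getHostName());
        ThreadContext.put("ip", localHost.getHostAddress());
        ThreadContext.put("applicationName", applicationName);
    }
}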
Start Kafka and create two topics that will receive Filebeat's output:
## create the topics
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1
# describe a topic
kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-collector --describe
Modify the filebeat.yaml configuration:
###################### Filebeat Configuration Example #########################
# This file is an example configuration file highlighting only the most common
# options. The filebeat.reference.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html
# For more available modules and options, please see the filebeat.reference.yml sample
# configuration file.
# ============================== Filebeat inputs ===============================
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.
- type: log
# Change to true to enable this input configuration.
enabled: false
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/*.log
#- c:\programdata\elasticsearch\logs\*
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
#exclude_lines: ['^DBG']
# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
#include_lines: ['^ERR', '^WARN']
# Exclude files. A list of regular expressions to match. Filebeat drops the files that
# are matching any regular expression from the list. By default, no files are dropped.
#exclude_files: ['.gz$']
# Optional additional fields. These fields can be freely picked
# to add additional information to the crawled log files for filtering
#fields:
# level: debug
# review: 1
### Multiline options
# Multiline can be used for log messages spanning multiple lines. This is common
# for Java Stack Traces or C-Line Continuation
# The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
#multiline.pattern: ^\[
# Defines if the pattern set under pattern should be negated or not. Default is false.
#multiline.negate: false
# Match can be set to "after" or "before". It is used to define if lines should be append to a pattern
# that was (not) matched before or after or as long as a pattern is not matched based on negate.
# Note: After is the equivalent to previous and before is the equivalent to to next in Logstash
#multiline.match: after
# filestream is an experimental input. It is going to replace log input in the future.
- type: filestream
# Change to true to enable this input configuration.
enabled: false
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/*.log
#- c:\programdata\elasticsearch\logs\*
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
#exclude_lines: ['^DBG']
# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
#include_lines: ['^ERR', '^WARN']
# Exclude files. A list of regular expressions to match. Filebeat drops the files that
# are matching any regular expression from the list. By default, no files are dropped.
#prospector.scanner.exclude_files: ['.gz$']
# Optional additional fields. These fields can be freely picked
# to add additional information to the crawled log files for filtering
#fields:
# level: debug
# review: 1
# ============================== Filebeat modules ==============================
# the inputs below are shipped to Kafka
- type: log
enabled: true
paths:
## app-<service name>.log; the path is written out explicitly to avoid re-reading rotated historical files
- E:\项目\simple-log-solution-scl\simple-filebeat-elk-example\logs\app-collector.log
# the _type value used when writing to ES
document_type: "app-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # pattern matching lines that start with a timestamp such as 2017-11-15 08:04:23:889
pattern: '^\[' # pattern matching lines that start with '['
negate: true # negate the match
match: after # append non-matching lines to the end of the previous line
max_lines: 2000 # maximum number of lines to merge
timeout: 2s # stop waiting for further lines if no new log event arrives within this time
fields:
logbiz: collector
logtopic: app-log-collector ## split per service, used as the Kafka topic
evn: dev
- type: log
enabled: true
paths:
- E:\项目\simple-log-solution-scl\simple-filebeat-elk-example\logs\error-collector.log
document_type: "error-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # pattern matching lines that start with a timestamp such as 2017-11-15 08:04:23:889
pattern: '^\[' # pattern matching lines that start with '['
negate: true # negate the match
match: after # append non-matching lines to the end of the previous line
max_lines: 2000 # maximum number of lines to merge
timeout: 2s # stop waiting for further lines if no new log event arrives within this time
fields:
logbiz: collector
logtopic: error-log-collector ## split per service, used as the Kafka topic
evn: dev
output.kafka:
enabled: true
hosts: ["localhost:9092"]
topic: '%{[fields.logtopic]}'
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
required_acks: 1
logging.to_files: true
filebeat.config.modules:
# Glob pattern for configuration loading
path: ${path.config}/modules.d/*.yml
# Set to true to enable config reloading
reload.enabled: false
# Period on which files under path should be checked for changes
#reload.period: 10s
# ======================= Elasticsearch template setting =======================
setup.template.settings:
index.number_of_shards: 1
#index.codec: best_compression
#_source.enabled: false
# ================================== General ===================================
# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:
# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]
# Optional fields that you can specify to add additional information to the
# output.
#fields:
# env: staging
# ================================= Dashboards =================================
# These settings control loading the sample dashboards to the Kibana index. Loading
# the dashboards is disabled by default and can be enabled either by setting the
# options here or by using the `setup` command.
#setup.dashboards.enabled: false
# The URL from where to download the dashboards archive. By default this URL
# has a value which is computed based on the Beat name and version. For released
# versions, this URL points to the dashboard archive on the artifacts.elastic.co
# website.
#setup.dashboards.url:
# =================================== Kibana ===================================
# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
setup.kibana:
# Kibana Host
# Scheme and port can be left out and will be set to the default (http and 5601)
# In case you specify and additional path, the scheme is required: http://localhost:5601/path
# IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
#host: "localhost:5601"
# Kibana Space ID
# ID of the Kibana Space into which the dashboards should be loaded. By default,
# the Default Space will be used.
#space.id:
# =============================== Elastic Cloud ================================
# These settings simplify using Filebeat with the Elastic Cloud (https://cloud.elastic.co/).
# The cloud.id setting overwrites the `output.elasticsearch.hosts` and
# `setup.kibana.host` options.
# You can find the `cloud.id` in the Elastic Cloud web UI.
#cloud.id:
# The cloud.auth setting overwrites the `output.elasticsearch.username` and
# `output.elasticsearch.password` settings. The format is `:`.
#cloud.auth:
# ================================== Outputs ===================================
# Configure what output to use when sending the data collected by the beat.
# ---------------------------- Elasticsearch Output ----------------------------
#output.elasticsearch:
# Array of hosts to connect to.
#hosts: ["localhost:9200"]
# Protocol - either `http` (default) or `https`.
#protocol: "https"
# Authentication credentials - either API key or username/password.
#api_key: "id:api_key"
#username: "elastic"
#password: "changeme"
# ------------------------------ Logstash Output -------------------------------
#output.logstash:
# The Logstash hosts
#hosts: ["localhost:5044"]
# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"
# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"
# ================================= Processors =================================
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
# ================================== Logging ===================================
# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
#logging.level: debug
# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publisher", "service".
#logging.selectors: ["*"]
# ============================= X-Pack Monitoring ==============================
# Filebeat can export internal metrics to a central Elasticsearch monitoring
# cluster. This requires xpack monitoring to be enabled in Elasticsearch. The
# reporting is disabled by default.
# Set to true to enable the monitoring reporter.
#monitoring.enabled: false
# Sets the UUID of the Elasticsearch cluster under which monitoring data for this
# Filebeat instance will appear in the Stack Monitoring UI. If output.elasticsearch
# is enabled, the UUID is derived from the Elasticsearch cluster referenced by output.elasticsearch.
#monitoring.cluster_uuid:
# Uncomment to send the metrics to Elasticsearch. Most settings from the
# Elasticsearch output are accepted here as well.
# Note that the settings should point to your Elasticsearch *monitoring* cluster.
# Any setting that is not set is automatically inherited from the Elasticsearch
# output configuration, so if you have the Elasticsearch output configured such
# that it is pointing to your Elasticsearch monitoring cluster, you can simply
# uncomment the following line.
#monitoring.elasticsearch:
# ============================== Instrumentation ===============================
# Instrumentation support for the filebeat.
#instrumentation:
# Set to true to enable instrumentation of filebeat.
#enabled: false
# Environment in which filebeat is running on (eg: staging, production, etc.)
#environment: ""
# APM Server hosts to report instrumentation results to.
#hosts:
# - http://localhost:8200
# API Key for the APM Server(s).
# If api_key is set then secret_token will be ignored.
#api_key:
# Secret token for the APM Server(s).
#secret_token:
# ================================= Migration ==================================
# This allows to enable 6.7 migration aliases
#migration.6_to_7.enabled: true
Then run the application and check whether Filebeat ships the info and error logs to Kafka.
The test works; the messages look like this:
{
"@timestamp":"2022-09-13T10:06:30.941Z",
"@metadata":{
"beat":"filebeat",
"type":"_doc",
"version":"7.13.3"
},
"host":{
"os":{
"platform":"windows",
"version":"10.0",
"family":"windows",
"name":"Windows 10 Home China",
"kernel":"10.0.22000.856 (WinBuild.160101.0800)",
"build":"22000.856",
"type":"windows"
},
"id":"3df61e5a-95b0-4ec7-9bce-09f84e227cea",
"ip":[
"fe80::89c5:bb7b:b3f2:6c0d",
"169.254.108.13",
"fe80::8bd:3d8:3849:82b9",
"169.254.130.185",
"fe80::7131:1b4b:74d3:a5b9",
"169.254.165.185",
"fe80::ec12:81d5:90ba:7d33",
"169.254.125.51",
"fe80::28b6:1903:76a0:4456",
"192.168.26.1",
"fe80::997:1c7f:24d3:25b0",
"192.168.24.1",
"fe80::39c6:c542:cea1:ea07",
"10.0.248.35",
"fe80::740d:2bcd:3383:e18c",
"172.19.208.1"
],
"mac":[
"00:e0:4c:85:3e:a3",
"00:ff:7d:50:95:ea",
"3c:f0:11:ea:74:88",
"3e:f0:11:ea:74:87",
"00:50:56:c0:00:01",
"00:50:56:c0:00:08",
"3c:f0:11:ea:74:87",
"00:15:5d:e2:40:e5"
],
"hostname":"DESKTOP-VJBPC13",
"architecture":"x86_64",
"name":"DESKTOP-VJBPC13"
},
"log":{
"offset":5596,
"file":{
"path":"E:\\项目\\simple-log-solution-scl\\simple-filebeat-elk-example\\logs\\app-collector.log"
}
},
"message":"[2022-09-13T18:06:25.424+08:00] [ERROR] [http-nio-6677-exec-4-33] [henu.soft.filebeat.test.controller.TestController] [DESKTOP-VJBPC13] [10.0.248.35] [simple-filebeat-elk-example] [TestController.java,25,henu.soft.filebeat.test.controller.TestController,test] [我是一条error日志] ## ''",
"input":{
"type":"log"
},
"fields":{
"evn":"dev",
"logbiz":"collector",
"logtopic":"app-log-collector"
},
"agent":{
"hostname":"DESKTOP-VJBPC13",
"ephemeral_id":"8dad3af5-228c-45f0-bb4c-95a4293ac26b",
"id":"658d9af1-076e-424e-a48d-b05bdaf601a3",
"name":"DESKTOP-VJBPC13",
"type":"filebeat",
"version":"7.13.3"
},
"ecs":{
"version":"1.8.0"
}
}
Logstash's job here is to take the Kafka messages as input, transform them, and output them to Elasticsearch, so it needs the Kafka and Elasticsearch connection settings. Create a logstash-script.conf file with the following content:
input {
kafka {
## app-log-<service name>
topics_pattern => "app-log-.*"
bootstrap_servers => "localhost:9092"
codec => json
consumer_threads => 1 ## number of parallel consumer threads
decorate_events => true
#auto_offset_rest => "latest"
group_id => "app-log-group"
}
kafka {
## error-log-<service name>
topics_pattern => "error-log-.*"
bootstrap_servers => "localhost:9092"
codec => json
consumer_threads => 1
decorate_events => true
#auto_offset_rest => "latest"
group_id => "error-log-group"
}
}
filter {
## time zone conversion
ruby {
code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
}
if "app-log" in [fields][logtopic]{
grok {
## grok expression matching the Spring Boot log format defined earlier
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
if "error-log" in [fields][logtopic]{
grok {
## grok expression
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
}
}
}
## for testing, output to the console:
output {
stdout { codec => rubydebug }
}
## elasticsearch:
output {
if "app-log" in [fields][logtopic]{
## elasticsearch output plugin
elasticsearch {
# Elasticsearch address
hosts => ["192.168.11.35:9200"]
# username and password
user => "elastic"
password => "123456"
## index name; when the suffix starts with +, the rest is treated as a date format, e.g.:
## javalog-app-service-2019.01.23
index => "app-log-%{[fields][logbiz]}-%{index_time}"
# whether to sniff the cluster nodes (usually true); see http://192.168.11.35:9200/_nodes/http?pretty
# sniffing lets Logstash load-balance log messages across the ES cluster
sniffing => true
# Logstash ships with a default mapping template; overwrite it
template_overwrite => true
}
}
if "error-log" in [fields][logtopic]{
elasticsearch {
hosts => ["192.168.11.35:9200"]
user => "elastic"
password => "123456"
index => "error-log-%{[fields][logbiz]}-%{index_time}"
sniffing => true
template_overwrite => true
}
}
}
Start Logstash with logstash -f ..\script\logstash-script.conf, then start Elasticsearch and Kibana, hit the app, and check that the logs are printed and can be seen in Kibana.