When a project we have deployed to production runs into problems (in small and medium-sized companies), we often need to pull the log files from the server to locate and analyze the issue. In larger or more complex distributed scenarios, however, this approach quickly falls short, and centralized log management is needed to collect and aggregate the logs from all servers. This is where ELK comes in: through a set of open-source frameworks it provides a complete solution for collecting, storing, analyzing, and visualizing the logs from every node in one place.
ELK is the acronym formed from the initials of three open-source frameworks: Elasticsearch (stores and searches data), Logstash (collects, transforms, and filters data), and Kibana (visualizes data). Today it is usually referred to as the Elastic Stack (ELK plus Beats).
Index: an index is a container for documents. In early versions of ES an index (similar to a database in an RDBMS) also contained types (similar to tables in an RDBMS); types were gradually removed in later versions, so an index now plays the role of both database and table.
Query DSL: communicates over a RESTful API using JSON and covers different search needs such as full-text search, range queries, boolean queries, and aggregations.
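The same Query DSL can be written by hand as JSON and sent over HTTP, or assembled with the Java client's builders. The minimal sketch below (the field names `message`, `@timestamp`, and `level` are only illustrative) builds a bool query plus an aggregation and prints the JSON body that would be sent to ES:

```java
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

// Sketch: assemble a bool query (full-text match + range filter) and a terms aggregation
// with the Java builders, then print the Query DSL JSON they produce.
public class QueryDslSketch {
    public static void main(String[] args) {
        SearchSourceBuilder source = new SearchSourceBuilder()
                .query(QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("message", "timeout"))          // full-text search
                        .filter(QueryBuilders.rangeQuery("@timestamp").gte("now-7d"))) // range query
                .aggregation(AggregationBuilders.terms("by_level").field("level"));    // aggregation
        // toString() renders the request body exactly as it would be sent to ES over HTTP
        System.out.println(source);
    }
}
```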

In real project development at our company, we collect logs with a logback -> RabbitMQ -> ELK pipeline, which gives us centralized log management. On top of that, ES searches feed a visual dashboard that shows how actively users access the system over different time periods.
Note: logback is a logging framework (log4j is another logging framework), whereas slf4j is a logging facade interface.
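Application code should only depend on the slf4j facade; logback is just the binding behind it and does the actual writing, so the appenders configured below apply without touching the calling code. A minimal sketch (the class name `LoginService` is made up for illustration):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Code logs through the slf4j API; logback (selected via the classpath) handles output.
public class LoginService {
    private static final Logger log = LoggerFactory.getLogger(LoginService.class);

    public void login(String userId) {
        // Parameterized logging: the message is only formatted if the level is enabled
        log.info("user {} logged in", userId);
    }
}
```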
The core implementation is as follows. First, add the required Maven dependencies:
```xml
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>5.1</version>
</dependency>
<dependency>
    <groupId>net.logstash.log4j</groupId>
    <artifactId>jsonevent-layout</artifactId>
    <version>1.7</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.3.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.3.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty4-client</artifactId>
    <version>6.3.1</version>
</dependency>

<!-- Also needed for the RestHighLevelClient, the sniffer, and the AmqpAppender used below -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.3.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client-sniffer</artifactId>
    <version>6.3.1</version>
</dependency>
<dependency>
    <!-- version managed by the Spring Boot parent -->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
```
Next, configure the logging-related properties in application.yml:

```yaml
logging:
  config: classpath:logback-prod.xml  # logback config file; not needed for local development
  file: logs/${logback.log.file}      # file that stores the log output

# RabbitMQ connection settings used when logback ships logs via AMQP
logback:
  log:
    path: "./logs/"
    file: logback_amqp.log
  amqp:
    host: 10.225.225.225
    port: 5672
    username: admin
    password: admin
```
- "1.0" encoding="UTF-8"?>
- <configuration scan="true" scanPeriod="60 seconds" debug="false">
- <include resource="org/springframework/boot/logging/logback/base.xml" />
- <contextName>logbackcontextName>
-
- <springProperty scope="context" name="log.path" source="logback.log.path" />
- <springProperty scope="context" name="log.file" source="logback.log.file" />
- <springProperty scope="context" name="logback.amqp.host" source="logback.amqp.host"/>
- <springProperty scope="context" name="logback.amqp.port" source="logback.amqp.port"/>
- <springProperty scope="context" name="logback.amqp.username" source="logback.amqp.username"/>
- <springProperty scope="context" name="logback.amqp.password" source="logback.amqp.password"/>
-
-
- <appender name="stash-amqp" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
-
- <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
- <providers>
- <pattern>
-
- <pattern>
- {
- "time": "%date{ISO8601}",
- "thread": "%thread",
- "level": "%level",
- "class": "%logger{60}",
- "message": "%message",
- "application": "application"
- }
- pattern>
- pattern>
- providers>
- encoder>
- <host>${logback.amqp.host}host>
- <port>${logback.amqp.port}port>
- <username>${logback.amqp.username}username>
- <password>${logback.amqp.password}password>
- <declareExchange>truedeclareExchange>
- <exchangeType>fanoutexchangeType>
- <exchangeName>ex_common_application_LogexchangeName>
- <generateId>truegenerateId>
- <charset>UTF-8charset>
- <durable>truedurable>
- <deliveryMode>PERSISTENTdeliveryMode>
- appender>
-
- <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
- <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
-
- <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%npattern>
- encoder>
- appender>
-
-
- <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>${log.path}/${log.file}file>
- <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
-
- <fileNamePattern>${log.path}/%d{yyyy/MM}/${log.file}.%i.zipfileNamePattern>
-
- <MaxHistory>30MaxHistory>
-
- <totalSizeCap>5GBtotalSizeCap>
-
- <maxFileSize>300MBmaxFileSize>
- rollingPolicy>
- <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
-
- <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%npattern>
- encoder>
- appender>
-
-
- <root level="info">
-
- <appender-ref ref="file" />
- <appender-ref ref="stash-amqp" />
- root>
- configuration>
Configure the Elasticsearch connection in the application's yml file:

```yaml
elasticsearch:
  protocol: http
  hostList: 10.225.225.225:9200  # elasticsearch cluster - single node here
  connectTimeout: 5000
  socketTimeout: 5000
  connectionRequestTimeout: 5000
  maxConnectNum: 10
  maxConnectPerRoute: 10
  username: # empty username
  password: # empty password
```
The Elasticsearch configuration class:
```java
package com.bierce;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.sniff.ElasticsearchHostsSniffer;
import org.elasticsearch.client.sniff.HostsSniffer;
import org.elasticsearch.client.sniff.SniffOnFailureListener;
import org.elasticsearch.client.sniff.Sniffer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName: ElasticSearchConfiguration
 * @Description: ES configuration class
 */
@Configuration
public class ElasticSearchConfiguration {

    @Value("${elasticsearch.protocol}") // protocol, HTTP here
    private String protocol;
    @Value("${elasticsearch.hostList}") // cluster addresses, comma-separated if there is more than one
    private String hostList;
    @Value("${elasticsearch.connectTimeout}") // connect timeout
    private int connectTimeout;
    @Value("${elasticsearch.socketTimeout}") // socket timeout
    private int socketTimeout;
    @Value("${elasticsearch.connectionRequestTimeout}") // timeout for obtaining a connection from the pool
    private int connectionRequestTimeout;
    @Value("${elasticsearch.maxConnectNum}") // maximum total connections
    private int maxConnectNum;
    @Value("${elasticsearch.maxConnectPerRoute}") // maximum connections per route
    private int maxConnectPerRoute;
    @Value("${elasticsearch.username:}")
    private String username;
    @Value("${elasticsearch.password:}")
    private String password;

    // Configure the restHighLevelClient bean.
    // When the Spring container shuts down, RestHighLevelClient.close() is called to clean up.
    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {

        String[] split = hostList.split(",");
        HttpHost[] httphostArray = new HttpHost[split.length];
        SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();

        // Split each cluster address into ip and port and put it into the array
        for (int i = 0; i < split.length; i++) {
            String hostName = split[i];
            httphostArray[i] = new HttpHost(hostName.split(":")[0], Integer.parseInt(hostName.split(":")[1]), protocol);
        }

        // Build the client builder and register the failure listener on the RestClient
        RestClientBuilder builder = RestClient.builder(httphostArray).setFailureListener(sniffOnFailureListener);
        // Timeouts for the asynchronous connections
        builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
            @Override
            public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                requestConfigBuilder.setConnectTimeout(connectTimeout);
                requestConfigBuilder.setSocketTimeout(socketTimeout);
                requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout);
                return requestConfigBuilder;
            }
        });
        // Connection authentication
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        }
        // Connection pool sizing for the async HTTP client
        builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.setMaxConnTotal(maxConnectNum);
                httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
                // Set the credentials if a username and password are configured
                if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                return httpClientBuilder;
            }
        });

        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);
        RestClient restClient = restHighLevelClient.getLowLevelClient();

        HostsSniffer hostsSniffer = new ElasticsearchHostsSniffer(restClient,
                ElasticsearchHostsSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT,
                ElasticsearchHostsSniffer.Scheme.HTTP);
        /* Sniffing on failure means the node list is refreshed not only on the regular
         * schedule but also right after a failure, on the assumption that the node will
         * recover and we want to find out as soon as possible. The delay after a failure
         * can be customized with setSniffAfterFailureDelayMillis when building the
         * Sniffer; it only takes effect because the failure listener was registered above.
         */
        Sniffer sniffer = Sniffer.builder(restClient)
                .setSniffAfterFailureDelayMillis(30000)
                .setHostsSniffer(hostsSniffer)
                .build();
        // Attach the sniffer to the failure listener. The sniffer must stay open for the
        // lifetime of the client and should be closed (before the client) on shutdown.
        sniffOnFailureListener.setSniffer(sniffer);

        return restHighLevelClient;
    }
}
```
Searching the relevant data through the API provided by ES:
```java
package com.bierce;

/**
 * @ClassName: UserVisitInfo
 * @Description: Holds a user's visit statistics for the system
 */
public class UserVisitInfo {
    private String dayOfWeek; // day of the week
    private Long docCount;    // number of visits

    public UserVisitInfo() {
    }
    public UserVisitInfo(String dayOfWeek, Long docCount) {
        super();
        this.dayOfWeek = dayOfWeek;
        this.docCount = docCount;
    }
    public String getDayOfWeek() {
        return dayOfWeek;
    }
    public void setDayOfWeek(String dayOfWeek) {
        this.dayOfWeek = dayOfWeek;
    }
    public Long getDocCount() {
        return docCount;
    }
    public void setDocCount(Long docCount) {
        this.docCount = docCount;
    }
    @Override
    public String toString() {
        return "UserVisitInfo [dayOfWeek=" + dayOfWeek + ", docCount=" + docCount + "]";
    }
}
```
```java
package com.bierce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @ClassName: VisitUserCountSearchTemplate
 * @Description: Computes how actively users visit the system over a given period
 */
@Service
public class VisitUserCountSearchTemplate {

    private static final String INDEX_PREFIX = "user-visit-";

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * @Title: getUserActivityInfo
     * @Description: Returns the user activity (visit count per day of week) between two dates
     * @param startDate start of the period
     * @param endDate end of the period
     * @return one entry per day of week with its document count
     */
    public List<UserVisitInfo> getUserActivityInfo(String startDate, String endDate) throws IOException {

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        // Only the aggregation buckets are needed, not the hits themselves
        searchSourceBuilder.size(0);

        // Aggregation parameters
        String aggregationName = "timeslice";
        String rangeField = "@timestamp";
        String termField = "keyword";
        Script script = new Script("doc['@timestamp'].value.dayOfWeek");
        String[] ignoredAppCode = {"AMQP", "Test"};

        // Restrict to the requested date range and drop the ignored applications
        QueryBuilder timeQueryBuilder = QueryBuilders.boolQuery()
                .must(QueryBuilders.rangeQuery(rangeField).gte(startDate).lte(endDate))
                .mustNot(QueryBuilders.termsQuery(termField, ignoredAppCode))
                .filter(QueryBuilders.existsQuery(termField));

        // Bucket the matching documents by day of week (1-7)
        HistogramAggregationBuilder dayOfWeekAggregationBuilder = AggregationBuilders.histogram(aggregationName)
                .script(script).interval(1).extendedBounds(1, 7);

        searchSourceBuilder.query(timeQueryBuilder);
        searchSourceBuilder.aggregation(dayOfWeekAggregationBuilder);

        // Search across all user-visit-* indices
        SearchRequest searchRequest = new SearchRequest(INDEX_PREFIX + "*").source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest);

        List<UserVisitInfo> userActivityInfoList = new ArrayList<>();
        Aggregations aggregations = searchResponse.getAggregations();
        Histogram dayOfWeekHistogram = aggregations.get(aggregationName);
        List<? extends Histogram.Bucket> buckets = dayOfWeekHistogram.getBuckets();

        for (Histogram.Bucket bucket : buckets) {
            String dayOfWeek = bucket.getKeyAsString();
            long docCount = bucket.getDocCount();
            UserVisitInfo item = new UserVisitInfo(dayOfWeek, docCount);
            userActivityInfoList.add(item);
        }

        return userActivityInfoList;
    }
}
```
The data is then returned to the frontend for page rendering. The finished dashboard can filter by the chosen conditions and display how actively users accessed the system over different time periods.
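As a sketch of how the result might be exposed to the frontend (the controller name and URL path below are made up for illustration, and error handling is omitted), a simple Spring MVC endpoint can return the list as JSON for the dashboard to render:

```java
package com.bierce;

import java.io.IOException;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller exposing the aggregation result to the dashboard as JSON
@RestController
public class UserActivityController {

    @Autowired
    private VisitUserCountSearchTemplate visitUserCountSearchTemplate;

    // e.g. GET /api/user-activity?startDate=2023-01-01&endDate=2023-01-31
    @GetMapping("/api/user-activity")
    public List<UserVisitInfo> userActivity(@RequestParam String startDate,
                                            @RequestParam String endDate) throws IOException {
        return visitUserCountSearchTemplate.getUserActivityInfo(startDate, endDate);
    }
}
```

Kibana can of course chart the same aggregation directly; an endpoint like this is only needed when the dashboard lives inside the application itself.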