• 5、Kafka海量日志收集系统架构设计


     

    5.1、 Kafka海量日志收集实战-log4j2日志输出实战

     5.1.1、新建SpringBoot工程并引入引入maven相关依赖。

    pom.xml内容如下:

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <parent>
    6. <artifactId>Kafka</artifactId>
    7. <groupId>com.lvxiaosha</groupId>
    8. <version>0.0.1-SNAPSHOT</version>
    9. </parent>
    10. <modelVersion>4.0.0</modelVersion>
    11. <artifactId>collector</artifactId>
    12. <properties>
    13. <maven.compiler.source>17</maven.compiler.source>
    14. <maven.compiler.target>17</maven.compiler.target>
    15. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    16. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    17. <java.version>17</java.version>
    18. </properties>
    19. <dependencies>
    20. <dependency>
    21. <groupId>org.springframework.boot</groupId>
    22. <artifactId>spring-boot-starter-web</artifactId>
    23. <!-- 排除spring-boot-starter-logging -->
    24. <exclusions>
    25. <exclusion>
    26. <groupId>org.springframework.boot</groupId>
    27. <artifactId>spring-boot-starter-logging</artifactId>
    28. </exclusion>
    29. </exclusions>
    30. </dependency>
    31. <dependency>
    32. <groupId>org.springframework.boot</groupId>
    33. <artifactId>spring-boot-starter-test</artifactId>
    34. <scope>test</scope>
    35. </dependency>
    36. <dependency>
    37. <groupId>org.projectlombok</groupId>
    38. <artifactId>lombok</artifactId>
    39. </dependency>
    40. <!-- log4j2 -->
    41. <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-log4j2 -->
    42. <dependency>
    43. <groupId>org.springframework.boot</groupId>
    44. <artifactId>spring-boot-starter-log4j2</artifactId>
    45. <version>2.7.3</version>
    46. <exclusions>
    47. <exclusion>
    48. <groupId>org.slf4j</groupId>
    49. <artifactId>slf4j-log4j12</artifactId>
    50. </exclusion>
    51. </exclusions>
    52. </dependency>
    53. <dependency>
    54. <groupId>com.lmax</groupId>
    55. <artifactId>disruptor</artifactId>
    56. <version>3.4.4</version>
    57. </dependency>
    58. <dependency>
    59. <groupId>com.alibaba</groupId>
    60. <artifactId>fastjson</artifactId>
    61. <version>2.0.12</version>
    62. </dependency>
    63. </dependencies>
    64. <build>
    65. <finalName>collector</finalName>
    66. <!-- 打包时包含properties、xml -->
    67. <resources>
    68. <resource>
    69. <directory>src/main/java</directory>
    70. <includes>
    71. <include>**/*.properties</include>
    72. <include>**/*.xml</include>
    73. </includes>
    74. <!-- 是否替换资源中的属性-->
    75. <filtering>true</filtering>
    76. </resource>
    77. <resource>
    78. <directory>src/main/resources</directory>
    79. </resource>
    80. </resources>
    81. <plugins>
    82. <plugin>
    83. <groupId>org.springframework.boot</groupId>
    84. <artifactId>spring-boot-maven-plugin</artifactId>
    85. <configuration>
    86. <mainClass>com.lvxiaosha.collector.CollectorApplication</mainClass>
    87. </configuration>
    88. <executions>
    89. <execution>
    90. <goals>
    91. <goal>repackage</goal>
    92. </goals>
    93. </execution>
    94. </executions>
    95. </plugin>
    96. </plugins>
    97. </build>
    98. </project>

     5.1.2、添加工具类:

    1. import java.lang.management.ManagementFactory;
    2. import java.lang.management.RuntimeMXBean;
    3. import java.net.InetAddress;
    4. import java.net.NetworkInterface;
    5. import java.net.SocketAddress;
    6. import java.net.UnknownHostException;
    7. import java.nio.channels.SocketChannel;
    8. import java.util.Enumeration;
    9. import java.util.regex.Matcher;
    10. import java.util.regex.Pattern;
    11. /**
    12. * $NetUtil
    13. * @author lvxiaosha
    14. * @since 2022年9月9日 下午4:59:02
    15. */
    16. public class NetUtil {
    17. public static String normalizeAddress(String address){
    18. String[] blocks = address.split("[:]");
    19. if(blocks.length > 2){
    20. throw new IllegalArgumentException(address + " is invalid");
    21. }
    22. String host = blocks[0];
    23. int port = 80;
    24. if(blocks.length > 1){
    25. port = Integer.valueOf(blocks[1]);
    26. } else {
    27. address += ":"+port; //use default 80
    28. }
    29. String serverAddr = String.format("%s:%d", host, port);
    30. return serverAddr;
    31. }
    32. public static String getLocalAddress(String address){
    33. String[] blocks = address.split("[:]");
    34. if(blocks.length != 2){
    35. throw new IllegalArgumentException(address + " is invalid address");
    36. }
    37. String host = blocks[0];
    38. int port = Integer.valueOf(blocks[1]);
    39. if("0.0.0.0".equals(host)){
    40. return String.format("%s:%d",NetUtil.getLocalIp(), port);
    41. }
    42. return address;
    43. }
    44. private static int matchedIndex(String ip, String[] prefix){
    45. for(int i=0; i<prefix.length; i++){
    46. String p = prefix[i];
    47. if("*".equals(p)){ //*, assumed to be IP
    48. if(ip.startsWith("127.") ||
    49. ip.startsWith("10.") ||
    50. ip.startsWith("172.") ||
    51. ip.startsWith("192.")){
    52. continue;
    53. }
    54. return i;
    55. } else {
    56. if(ip.startsWith(p)){
    57. return i;
    58. }
    59. }
    60. }
    61. return -1;
    62. }
    63. public static String getLocalIp(String ipPreference) {
    64. if(ipPreference == null){
    65. ipPreference = "*>10>172>192>127";
    66. }
    67. String[] prefix = ipPreference.split("[> ]+");
    68. try {
    69. Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
    70. Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
    71. String matchedIp = null;
    72. int matchedIdx = -1;
    73. while (interfaces.hasMoreElements()) {
    74. NetworkInterface ni = interfaces.nextElement();
    75. Enumeration<InetAddress> en = ni.getInetAddresses();
    76. while (en.hasMoreElements()) {
    77. InetAddress addr = en.nextElement();
    78. String ip = addr.getHostAddress();
    79. Matcher matcher = pattern.matcher(ip);
    80. if (matcher.matches()) {
    81. int idx = matchedIndex(ip, prefix);
    82. if(idx == -1) continue;
    83. if(matchedIdx == -1){
    84. matchedIdx = idx;
    85. matchedIp = ip;
    86. } else {
    87. if(matchedIdx>idx){
    88. matchedIdx = idx;
    89. matchedIp = ip;
    90. }
    91. }
    92. }
    93. }
    94. }
    95. if(matchedIp != null) return matchedIp;
    96. return "127.0.0.1";
    97. } catch (Exception e) {
    98. return "127.0.0.1";
    99. }
    100. }
    101. public static String getLocalIp() {
    102. return getLocalIp("*>10>172>192>127");
    103. }
    104. public static String remoteAddress(SocketChannel channel){
    105. SocketAddress addr = channel.socket().getRemoteSocketAddress();
    106. String res = String.format("%s", addr);
    107. return res;
    108. }
    109. public static String localAddress(SocketChannel channel){
    110. SocketAddress addr = channel.socket().getLocalSocketAddress();
    111. String res = String.format("%s", addr);
    112. return addr==null? res: res.substring(1);
    113. }
    114. public static String getPid(){
    115. RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
    116. String name = runtime.getName();
    117. int index = name.indexOf("@");
    118. if (index != -1) {
    119. return name.substring(0, index);
    120. }
    121. return null;
    122. }
    123. public static String getLocalHostName() {
    124. try {
    125. return (InetAddress.getLocalHost()).getHostName();
    126. } catch (UnknownHostException uhe) {
    127. String host = uhe.getMessage();
    128. if (host != null) {
    129. int colon = host.indexOf(':');
    130. if (colon > 0) {
    131. return host.substring(0, colon);
    132. }
    133. }
    134. return "UnknownHost";
    135. }
    136. }
    137. }
    1. import java.util.ArrayList;
    2. import java.util.List;
    3. import com.alibaba.fastjson.JSON;
    4. import com.alibaba.fastjson.JSONObject;
    5. import com.alibaba.fastjson.serializer.SerializerFeature;
    6. import lombok.extern.slf4j.Slf4j;
    7. /**
    8. * $FastJsonConvertUtil
    9. * @author lvxiaosha
    10. * @since 2022年9月9日 下午4:53:28
    11. */
    12. @Slf4j
    13. public class FastJsonConvertUtil {
    14. private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
    15. SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty };
    16. /**
    17. * <B>方法名称:</B>将JSON字符串转换为实体对象<BR>
    18. * <B>概要说明:</B>将JSON字符串转换为实体对象<BR>
    19. * @author lvxiaosha
    20. * @since 202299日 下午4:53:49
    21. * @param data JSON字符串
    22. * @param clzss 转换对象
    23. * @return T
    24. */
    25. public static <T> T convertJSONToObject(String data, Class<T> clzss) {
    26. try {
    27. T t = JSON.parseObject(data, clzss);
    28. return t;
    29. } catch (Exception e) {
    30. log.error("convertJSONToObject Exception", e);
    31. return null;
    32. }
    33. }
    34. /**
    35. * <B>方法名称:</B>将JSONObject对象转换为实体对象<BR>
    36. * <B>概要说明:</B>将JSONObject对象转换为实体对象<BR>
    37. * @author lvxiaosha
    38. * @since 202299日 下午4:54:32
    39. * @param data JSONObject对象
    40. * @param clzss 转换对象
    41. * @return T
    42. */
    43. public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) {
    44. try {
    45. T t = JSONObject.toJavaObject(data, clzss);
    46. return t;
    47. } catch (Exception e) {
    48. log.error("convertJSONToObject Exception", e);
    49. return null;
    50. }
    51. }
    52. /**
    53. * <B>方法名称:</B>将JSON字符串数组转为List集合对象<BR>
    54. * <B>概要说明:</B>将JSON字符串数组转为List集合对象<BR>
    55. * @author lvxiaosha
    56. * @since 202299日 下午4:54:50
    57. * @param data JSON字符串数组
    58. * @param clzss 转换对象
    59. * @return List<T>集合对象
    60. */
    61. public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) {
    62. try {
    63. List<T> t = JSON.parseArray(data, clzss);
    64. return t;
    65. } catch (Exception e) {
    66. log.error("convertJSONToArray Exception", e);
    67. return null;
    68. }
    69. }
    70. /**
    71. * <B>方法名称:</B>将List<JSONObject>转为List集合对象<BR>
    72. * <B>概要说明:</B>将List<JSONObject>转为List集合对象<BR>
    73. * @author lvxiaosha
    74. * @since 202299日 下午4:55:11
    75. * @param data List<JSONObject>
    76. * @param clzss 转换对象
    77. * @return List<T>集合对象
    78. */
    79. public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) {
    80. try {
    81. List<T> t = new ArrayList<T>();
    82. for (JSONObject jsonObject : data) {
    83. t.add(convertJSONToObject(jsonObject, clzss));
    84. }
    85. return t;
    86. } catch (Exception e) {
    87. log.error("convertJSONToArray Exception", e);
    88. return null;
    89. }
    90. }
    91. /**
    92. * <B>方法名称:</B>将对象转为JSON字符串<BR>
    93. * <B>概要说明:</B>将对象转为JSON字符串<BR>
    94. * @author lvxiaosha
    95. * @since 202299日 下午4:55:41
    96. * @param obj 任意对象
    97. * @return JSON字符串
    98. */
    99. public static String convertObjectToJSON(Object obj) {
    100. try {
    101. String text = JSON.toJSONString(obj);
    102. return text;
    103. } catch (Exception e) {
    104. log.error("convertObjectToJSON Exception", e);
    105. return null;
    106. }
    107. }
    108. /**
    109. * <B>方法名称:</B>将对象转为JSONObject对象<BR>
    110. * <B>概要说明:</B>将对象转为JSONObject对象<BR>
    111. * @author lvxiaosha
    112. * @since 202299日 下午4:55:55
    113. * @param obj 任意对象
    114. * @return JSONObject对象
    115. */
    116. public static JSONObject convertObjectToJSONObject(Object obj){
    117. try {
    118. JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
    119. return jsonObject;
    120. } catch (Exception e) {
    121. log.error("convertObjectToJSONObject Exception", e);
    122. return null;
    123. }
    124. }
    125. public static String convertObjectToJSONWithNullValue(Object obj) {
    126. try {
    127. String text = JSON.toJSONString(obj, featuresWithNullValue);
    128. return text;
    129. } catch (Exception e) {
    130. log.error("convertObjectToJSONWithNullValue Exception", e);
    131. return null;
    132. }
    133. }
    134. }
    1. import org.jboss.logging.MDC;
    2. import org.springframework.context.EnvironmentAware;
    3. import org.springframework.core.env.Environment;
    4. import org.springframework.stereotype.Component;
    5. @Component
    6. public class InputMDC implements EnvironmentAware {
    7. private static Environment environment;
    8. @Override
    9. public void setEnvironment(Environment environment) {
    10. InputMDC.environment = environment;
    11. }
    12. public static void putMDC() {
    13. MDC.put("hostName", NetUtil.getLocalHostName());
    14. MDC.put("ip", NetUtil.getLocalIp());
    15. MDC.put("applicationName", environment.getProperty("spring.application.name"));
    16. }
    17. }

     5.1.3、编写IndexController

    1. import com.lvxiaosha.collector.util.InputMDC;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.web.bind.annotation.RequestMapping;
    4. import org.springframework.web.bind.annotation.RestController;
    5. @Slf4j
    6. @RestController
    7. public class IndexController {
    8. /**
    9. * [%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}]
    10. * [%level{length=5}]
    11. * [%thread-%tid]
    12. * [%logger]
    13. * [%X{hostName}]
    14. * [%X{ip}]
    15. * [%X{applicationName}]
    16. * [%F,%L,%C,%M]
    17. * [%m] ## '%ex'%n
    18. * -----------------------------------------------
    19. * [2019-09-18T14:42:51.451+08:00]
    20. * [INFO]
    21. * [main-1]
    22. * [org.springframework.boot.web.embedded.tomcat.TomcatWebServer]
    23. * []
    24. * []
    25. * []
    26. * [TomcatWebServer.java,90,org.springframework.boot.web.embedded.tomcat.TomcatWebServer,initialize]
    27. * [Tomcat initialized with port(s): 8001 (http)] ## ''
    28. *
    29. * ["message",
    30. * "\[%{NOTSPACE:currentDateTime}\]
    31. * \[%{NOTSPACE:level}\]
    32. * \[%{NOTSPACE:thread-id}\]
    33. * \[%{NOTSPACE:class}\]
    34. * \[%{DATA:hostName}\]
    35. * \[%{DATA:ip}\]
    36. * \[%{DATA:applicationName}\]
    37. * \[%{DATA:location}\]
    38. * \[%{DATA:messageInfo}\]
    39. * ## (\'\'|%{QUOTEDSTRING:throwable})"]
    40. * @return
    41. */
    42. @RequestMapping(value = "/index")
    43. public String index() {
    44. InputMDC.putMDC();
    45. log.info("我是一条info日志");
    46. log.warn("我是一条warn日志");
    47. log.error("我是一条error日志");
    48. return "idx";
    49. }
    50. @RequestMapping(value = "/err")
    51. public String err() {
    52. InputMDC.putMDC();
    53. try {
    54. int a = 1/0;
    55. } catch (Exception e) {
    56. log.error("算术异常", e);
    57. }
    58. return "err";
    59. }
    60. }

     5.1.4、添加application.properties和log4j2.xml配置文件

    application.properties内容:

    1. server.servlet.context-path=/
    2. server.port=8001
    3. spring.application.name=collector
    4. server.servlet.encoding.charset=UTF-8
    5. spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    6. spring.jackson.time-zone=GMT+8
    7. spring.jackson.default-property-inclusion=NON_NULL

    log4j2.xml内容:

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" >
    3. <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
    4. <!--
    5. Properties里面可以定义一些变量供下面使用
    6. Property的LOG_HOME定义了日志输出的目录名称
    7. PropertyFILE_NAME定义了日志输出的文件名称,这里和项目名称一致。
    8. PropertyFILE_NAME定义了日志输出的格式。
    9. [%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
    10. [%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] %d表示时间,后面的中括号里面定义了时间格式。
    11. [%level{length=5}] %level表示日志级别。
    12. [%thread-%tid] 线程id
    13. [%logger] 日志输出的具体信息
    14. [%X{hostName}] %X表示自定义hostName
    15. [%X{ip}] %X表示自定义ip
    16. [%X{applicationName}] %X表示自定义applicationName
    17. [%F,%L,%C,%M] %F表示当前执行的类是哪个文件,%L表示行号,%C代表class,%M代表Method
    18. [%m] ## '%ex'%n %m表示message,是日志输出的内容。 %ex表示异常信息 %n表示换行
    19. -->
    20. <Properties>
    21. <Property name="LOG_HOME">D:\1-code\0-learning\1-back-end\996-dev\all-learning\Kafka\collector\logs</Property>
    22. <property name="FILE_NAME">collector</property>
    23. <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property>
    24. <!-- 控制台显示的日志最低级别 -->
    25. <property name="console_print_level">DEBUG</property>
    26. </Properties>
    27. <!-- Appenders表示输出的组件有哪些 -->
    28. <Appenders>
    29. <!--定义输出到控制台的日志格式为上面定义的patternLayout格式-->
    30. <Console name="CONSOLE" target="SYSTEM_OUT">
    31. <!-- 设置控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)-->
    32. <ThresholdFilter level="${console_print_level}" onMatch="ACCEPT" onMismatch="DENY"/>
    33. <PatternLayout pattern="${patternLayout}"/>
    34. </Console>
    35. <!--
    36. fileName定义了日志路径及名称,最后输出的文件是:logs/app-collector.log
    37. app-collector.log里面输出我们的全量日志
    38. filePattern定义了日志的时间节点
    39. -->
    40. <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
    41. <PatternLayout pattern="${patternLayout}" />
    42. <Policies>
    43. <TimeBasedTriggeringPolicy interval="1"/>
    44. <!-- 500MB一个日志文件 -->
    45. <SizeBasedTriggeringPolicy size="500MB"/>
    46. </Policies>
    47. <DefaultRolloverStrategy max="20"/>
    48. </RollingRandomAccessFile>
    49. <!--
    50. fileName定义了日志路径及名称,最后输出的文件是:logs/error-collector.log
    51. error-collector.log里面输出我们的错误日志
    52. filePattern定义了日志的时间节点
    53. -->
    54. <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
    55. <PatternLayout pattern="${patternLayout}" />
    56. <!--
    57. Filters对日志进行过滤,只有warn级别及以上级别的日志才会收集。
    58. -->
    59. <Filters>
    60. <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
    61. </Filters>
    62. <Policies>
    63. <TimeBasedTriggeringPolicy interval="1"/>
    64. <SizeBasedTriggeringPolicy size="500MB"/>
    65. </Policies>
    66. <DefaultRolloverStrategy max="20"/>
    67. </RollingRandomAccessFile>
    68. </Appenders>
    69. <Loggers>
    70. <logger name="com.cm.server.busi" level="debug" additivity="false">
    71. <appender-ref ref="CONSOLE"/>
    72. <appender-ref ref="appAppender"/>
    73. <appender-ref ref="errorAppender"/>
    74. </logger>
    75. <!-- 业务相关 异步logger -->
    76. <AsyncLogger name="com.lvxiaosha.*" level="info" includeLocation="true">
    77. <AppenderRef ref="appAppender"/>
    78. </AsyncLogger>
    79. <AsyncLogger name="com.lvxiaosha.*" level="info" includeLocation="true">
    80. <AppenderRef ref="errorAppender"/>
    81. </AsyncLogger>
    82. <Root level="info">
    83. <Appender-Ref ref="CONSOLE"/>
    84. <Appender-Ref ref="appAppender"/>
    85. <AppenderRef ref="errorAppender"/>
    86. </Root>
    87. </Loggers>
    88. </Configuration>

    5.2、Kafka海量日志搜集实战_filebeat日志收集实战

    5.2.1 Filebeat8.2.3安装

    参考文章:Filebeat 的学习笔记_白居不易.的博客-CSDN博客_filebeat 无法执行二进制文件

    下载地址:https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.2.3-linux-x86_64.tar.gz

    cd /home/software/11-Filebeat/

    tar -zxvf filebeat-8.2.3-linux-x86_64.tar.gz -C /usr/local/

    cd /usr/local

    mv filebeat-8.2.3-linux-x86_64/ filebeat-8.2.3

    ## 配置filebeat,可以参考filebeat.full.yml中的配置。

    vim /usr/local/filebeat-8.2.3/filebeat.yml

    1. ###################### Filebeat Configuration Example #########################
    2. filebeat.prospectors:
    3. - input_type: log
    4. paths:
    5. ## app-服务名称.log, 为什么写死,防止发生轮转抓取历史数据
    6. - /usr/local/logs/app-collector.log
    7. #定义写入 ES 时的 _type
    8. document_type: "app-log"
    9. multiline:
    10. #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
    11. pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串)
    12. negate: true # 是否匹配到
    13. match: after # 合并到上一行的末尾
    14. max_lines: 2000 # 最大的行数
    15. timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的日志
    16. fields:
    17. logbiz: collector
    18. logtopic: app-log-collector ## 按服务划分用作kafka topic
    19. evn: dev
    20. - input_type: log
    21. paths:
    22. - /usr/local/logs/error-collector.log
    23. document_type: "error-log"
    24. multiline:
    25. #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
    26. pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串)
    27. negate: true # 是否匹配到
    28. match: after # 合并到上一行的末尾
    29. max_lines: 2000 # 最大的行数
    30. timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的日志
    31. fields:
    32. logbiz: collector
    33. logtopic: error-log-collector ## 按服务划分用作kafka topic
    34. evn: dev
    35. output.kafka:
    36. enabled: true
    37. hosts: ["192.168.110.130:9092"]
    38. topic: '%{[fields.logtopic]}'
    39. partition.hash:
    40. reachable_only: true
    41. compression: gzip
    42. max_message_bytes: 1000000
    43. required_acks: 1
    44. logging.to_files: true

    filebeat启动:

    ## 检查配置是否正确

    cd /usr/local/filebeat-8.2.3

    ./filebeat test config

    ## 启动zookeeper:

    zkServer.sh start

    查看zookeeper状态:

    zkServer.sh status

    ## 启动kafka

    /usr/local/kafka_2.13/bin/kafka-server-start.sh /usr/local/kafka_2.13/config/server.properties &

    ## 查看topic列表:

    kafka-topics.sh --bootstrap-server 192.168.110.130:9092 --list

    ## 创建topic

    kafka-topics.sh --bootstrap-server 192.168.110.130:9092 --create --topic app-log-collector --partitions 1  --replication-factor 1

    kafka-topics.sh --bootstrap-server 192.168.110.130:9092 --create --topic error-log-collector --partitions 1  --replication-factor 1  


    ## 查看topic情况

    kafka-topics.sh --bootstrap-server 192.168.110.130:9092 --topic app-log-collector --describe

    kafka-topics.sh --bootstrap-server 192.168.110.130:9092 --topic error-log-collector --describe

    ## 启动filebeat

    cd /usr/local/filebeat-8.2.3/

    filebeat &

    ps -ef | grep filebeat

    ## 将后端的SpringBoot程序打成jar包,放到和filebeat同一个虚拟机上,启动后端jar包服务。会看到有日志输出到/usr/local/logs目录下面:

    然后访问虚拟机上的接口,打印一下日志:

     http://192.168.110.130:8001/index

    http://192.168.110.130:8001/err

    这样,filebeat就能收集到后端jar包输出的日志了。

    filebeat收集到数据之后,就要看看kafka里面有没有数据。

     

    我们可以看到log文件大小不是0,说明有数据。具体数据接下来可以通过logstash来看。

    5.2.2 logstash8.2.3基础语法与使用

    logstash-script.conf文件内容:

    1. ## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
    2. input {
    3. kafka {
    4. ## app-log-服务名称
    5. topics_pattern => "app-log-.*"
    6. bootstrap_servers => "192.168.110.130:9092"
    7. codec => json
    8. consumer_threads => 1 ## 增加consumer的并行消费线程数
    9. decorate_events => true
    10. #auto_offset_rest => "latest"
    11. group_id => "app-log-group"
    12. }
    13. kafka {
    14. ## error-log-服务名称
    15. topics_pattern => "error-log-.*"
    16. bootstrap_servers => "192.168.110.130:9092"
    17. codec => json
    18. consumer_threads => 1
    19. decorate_events => true
    20. #auto_offset_rest => "latest"
    21. group_id => "error-log-group"
    22. }
    23. }
    24. filter {
    25. ## 时区转换
    26. ruby {
    27. code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
    28. }
    29. if "app-log" in [fields][logtopic]{
    30. grok {
    31. ## 表达式
    32. match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    33. }
    34. }
    35. if "error-log" in [fields][logtopic]{
    36. grok {
    37. ## 表达式
    38. match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    39. }
    40. }
    41. }
    42. ## 测试输出到控制台:
    43. output {
    44. stdout { codec => rubydebug }
    45. }
    46. ## elasticsearch:
    47. output {
    48. if "app-log" in [fields][logtopic]{
    49. ## es插件
    50. elasticsearch {
    51. # es服务地址
    52. hosts => ["192.168.110.130:9200"]
    53. # 用户名密码
    54. user => "elastic"
    55. password => "123456"
    56. ## 索引名,+ 号开头的,就会自动认为后面是时间格式:
    57. ## javalog-app-service-2019.01.23
    58. index => "app-log-%{[fields][logbiz]}-%{index_time}"
    59. # 是否嗅探集群ip:一般设置true;http://192.168.110.130:9200/_nodes/http?pretty
    60. # 通过嗅探机制进行es集群负载均衡发日志消息
    61. sniffing => true
    62. # logstash默认自带一个mapping模板,进行模板覆盖
    63. template_overwrite => true
    64. }
    65. }
    66. if "error-log" in [fields][logtopic]{
    67. elasticsearch {
    68. hosts => ["192.168.110.130:9200"]
    69. user => "elastic"
    70. password => "1TdhblkFcdhx2a"
    71. index => "error-log-%{[fields][logbiz]}-%{index_time}"
    72. sniffing => true
    73. template_overwrite => true
    74. }
    75. }
    76. }

    logstash安装:

    # 进入logstash上传目录

    cd /home/software/12-logstash/

    ## 解压安装
    tar -zxvf logstash-8.2.3-linux-x86_64.tar.gz -C /usr/local/

    ## 创建script文件夹

    cd  /usr/local/logstash-8.2.3/

    mkdir script

    ## 上传logstash-srcipt.conf文件到/usr/local/logstash-8.2.3/script/目录下

    ## conf下配置文件说明:
    # logstash配置文件:/config/logstash.yml
    # JVM参数文件:/config/jvm.options
    # 日志格式配置文件:log4j2.properties
    # 制作Linux服务参数:/config/startup.options

     # JVM参数文件

    vim /usr/local/logstash-8.2.3/config/jvm.options

     工作中配置一半内存或60%的内存。比如虚拟机是10g内存,那么这块就配6g。

    ## 配置文件说明:
    vim /usr/local/logstash-8.2.3/config/logstash.yml

    --path.config 或 –f :logstash启动时使用的配置文件
    --configtest 或 –t:测试 Logstash 读取到的配置文件语法是否能正常解析
    --log或-l:日志输出存储位置
    --pipeline.workers 或 –w:运行 filter 和 output 的 pipeline 线程数量。默认是 CPU 核数。
    --pipeline.batch.size 或 –b:每个 Logstash pipeline 线程,在执行具体的 filter 和 output 函数之前,最多能累积的日志条数。
    --pipeline.batch.delay 或 –u:每个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。
    --verbose:输出调试日志
    --debug:输出更多的调试日志

    ## 启动配置 比如启动时的java位置、LS的home等
    vim /usr/local/logstash-8.2.3/config/startup.options

    ## 数据收集目录:/usr/local/logstash-8.2.3/data
    ## 插件目录:/usr/local/logstash-8.2.3/vendor/bundle/jruby/1.9/gems
    ## 查看插件命令:
    /usr/local/logstash-8.2.3/bin/logstash-plugin list
    ## 更新插件命令:
    /usr/local/logstash-8.2.3/bin/logstash-plugin update logstash-xxxx-xxxxx
    ## 安装插件命令:
    /usr/local/logstash-8.2.3/bin/logstash-plugin install logstash-xxxx-xxxxx
    ## 插件地址: https://github.com/logstash-plugins

    logstash语法与基本使用:

    1. Logstash设计了自己的DSL包括区域,注释。数据类型(布尔值,字符串,数组,哈希),条件判断字段引用等。
    2. Logstash用{}来定义区域。区域内可以包含插件区域定义,你可以在一个区域内定义多个插件。插件区域内则可以定义键值对设置。
    3. 格式、语法、使用方式:
    1. # 注释.
    2. input {
    3. ...
    4. }
    5. filter {
    6. ...
    7. }
    8. output {
    9. ...
    10. }
    1. ## 两个input设置:
    2. input {
    3. file {
    4. path => "/var/log/messages"
    5. type => "syslog"
    6. }
    7. file {
    8. path => "/var/log/apache/access.log"
    9. type => "apache"
    10. }
    11. }
    1. ## 数据类型:
    2. ## bool类型
    3. debug => true
    4. ## string类型
    5. host => "hostname"
    6. ## number类型
    7. port => 6789
    8. ## array or list类型
    9. path => ["/var/log/message","/var/log/*.log"]
    10. ## hash类型
    11. match => {
    12. "field1" => "value1"
    13. "field2" => "value2"
    14. }
    15. ## codec类型
    16. codec => "json"
    17. ##字段引用方式:
    18. {
    19. "agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
    20. "ip": "192.168.24.44",
    21. "request": "/index.html"
    22. "response": {
    23. "status": 200,
    24. "bytes": 52353
    25. },
    26. "ua": {
    27. "os": "Windows 7"
    28. }
    29. }
    30. ##获取字段值:
    31. [response][status]
    32. [ua][os]
    1. ## 条件判断condition:
    2. if EXPRESSION {
    3. ...
    4. } else if EXPRESSION {
    5. ...
    6. } else {
    7. ...
    8. }
    9. ==(等于), !=(不等于), <(小于), >(大于), <=(小于等于), >=(大于等于), =~(匹配正则), !~(不匹配正则)
    10. in(包含), not in(不包含), and(与), or(或), nand(非与), xor(非或)
    11. ()(复合表达式), !()(对复合表达式结果取反)
    1. ## 使用环境变量(缺失报错):
    2. input {
    3. tcp {
    4. port => "${TCP_PORT}"
    5. }
    6. }
    7. ## 使用环境变量(缺失使用默认值):
    8. input {
    9. tcp {
    10. port => "${TCP_PORT:54321}"
    11. }
    12. }

            4.logstash例子:

    1. ## input 从标准输入流:
    2. input { stdin { } }
    3. ## 输入数据之后 如何进行处理:
    4. filter {
    5. ## grok:解析元数据插件,这里从input输入进来的所有数据默认都会存放到 "message" 字段中
    6. ## grok提供很多正则表达式,地址为:http://grokdebug.herokuapp.com/patterns
    7. ## 比如:%{COMBINEDAPACHELOG} 表示其中一种正则表达式 Apache的表达式
    8. grok {
    9. match => { "message" => "%{COMBINEDAPACHELOG}" }
    10. }
    11. ## date:日期格式化
    12. date {
    13. match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    14. }
    15. }
    16. ## output 从标准输出流:
    17. output {
    18. elasticsearch { hosts => ["192.168.110.130:9200"] }
    19. stdout { codec => rubydebug }
    20. }

            5.file插件使用:

    1. ## file插件
    2. input {
    3. file {
    4. path => ["/var/log/*.log", "/var/log/message"]
    5. type => "system"
    6. start_position => "beginning"
    7. }
    8. }
    9. ## 其他参数:
    10. discover_interval ## 表示每隔多久检测一下文件,默认15
    11. exclude ## 表示排除那些文件
    12. close_older ## 文件超过多长时间没有更新,就关闭监听 默认3600s
    13. ignore_older ## 每次检查文件列表 如果有一个文件 最后修改时间超过这个值 那么就忽略文件 86400s
    14. sincedb_path ## sincedb保存文件的位置,默认存在home下(/dev/null
    15. sincedb_write_interval ## 每隔多久去记录一次 默认15
    16. stat_interval ## 每隔多久查询一次文件状态 默认1
    17. start_position ## 从头开始读取或者从结尾开始读取

    5.2.3 ElasticSearch8.2.3安装

    ElasticSearch8.2.3安装请参照之前的文章:1.1 Centos7中安装ElasticSearch8.2.3_Iamlvxiaosha的博客-CSDN博客

     

    切换esuser用户:

    su esuser

    后台启动ES:

    cd /usr/local/elasticsearch-8.2.3/bin/

    ./elasticsearch  -d

    5.2.3 Kibana8.2.3安装

    下载地址:https://artifacts.elastic.co/downloads/kibana/kibana-8.2.3-linux-x86_64.tar.gz

    ## 解压cd /home/software/13-kibana/目录下的kibana

    tar -zxvf /home/software/13-kibana/kibana-8.2.3-linux-x86_64.tar.gz -C /usr/local/


    ## 进入kibana目录,修改配置文件

    vim /usr/local/kibana-8.2.3/config/kibana.yml


    ## 修改配置如下:

    server.port: 5601

    server.host: "192.168.110.130"

    elasticsearch.hosts: ["http://192.168.110.130:9200"]


    ## 启动:

    # 和ElasticSearch的启动用户一致。

    su esuser

    /usr/local/kibana-8.2.3/bin/kibana &

    如果出现下面的报错:

    解决方案:

    给 Kibana 的安装目录授权:

    chown -R esuser:esuser /usr/local/kibana-8.2.3/


    ## 指定配置文件启动:
    nohup /usr/local/kibana-8.2.3/bin/kibana -c /usr/local/kibana-8.2.3/config/kibana.yml > /dev/null 2>&1 &

    ## 访问:
    http://192.168.110.130:5601/app/kibana (5601为kibana默认端口)

    Logstash启动:

    Logstash后台启动方式:

    nohup /usr/local/logstash-8.2.3/bin/logstash -f /usr/local/logstash-8.2.3/script/logstash-script.conf &

    Logstash控制台启动方式:

    /usr/local/logstash-8.2.3/bin/logstash -f /usr/local/logstash-8.2.3/script/logstash-script.conf

    控制台启动方式可以用于测试阶段,方便看到启动信息,以及控制台打印的Logstash数据。

    如果启动Logstash出现如下报错:

    使用logstash配置output输出是出现的错误,网上百度的答案基本上是密码配置,或者网络故障。

    我遇到的是,由于我的elasticsearch和logstash 配置在一台服务器上,而且用了127.0.0.1 所以在logstash配置文件需要写成hosts => "127.0.0.1:9200" 不然就报错了。

    解决方案:

    logstash-script.conf文件内容修改如下:

    1. ## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
    2. input {
    3. kafka {
    4. ## app-log-服务名称
    5. topics_pattern => "app-log-.*"
    6. bootstrap_servers => "127.0.0.1:9092"
    7. codec => json
    8. consumer_threads => 1 ## 增加consumer的并行消费线程数
    9. decorate_events => true
    10. #auto_offset_rest => "latest"
    11. group_id => "app-log-group"
    12. }
    13. kafka {
    14. ## error-log-服务名称
    15. topics_pattern => "error-log-.*"
    16. bootstrap_servers => "127.0.0.1:9092"
    17. codec => json
    18. consumer_threads => 1
    19. decorate_events => true
    20. #auto_offset_rest => "latest"
    21. group_id => "error-log-group"
    22. }
    23. }
    24. filter {
    25. ## 时区转换
    26. ruby {
    27. code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
    28. }
    29. if "app-log" in [fields][logtopic]{
    30. grok {
    31. ## 表达式
    32. match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    33. }
    34. }
    35. if "error-log" in [fields][logtopic]{
    36. grok {
    37. ## 表达式
    38. match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    39. }
    40. }
    41. }
    42. ## 测试输出到控制台:
    43. output {
    44. stdout { codec => rubydebug }
    45. }
    46. ## elasticsearch:
    47. output {
    48. if "app-log" in [fields][logtopic]{
    49. ## es插件
    50. elasticsearch {
    51. # es服务地址
    52. hosts => ["127.0.0.1:9200"]
    53. # 用户名密码
    54. user => "elastic"
    55. password => "123456"
    56. ## 索引名,+ 号开头的,就会自动认为后面是时间格式:
    57. ## javalog-app-service-2019.01.23
    58. index => "app-log-%{[fields][logbiz]}-%{index_time}"
    59. # 是否嗅探集群ip:一般设置true;http://192.168.110.130:9200/_nodes/http?pretty
    60. # 通过嗅探机制进行es集群负载均衡发日志消息
    61. sniffing => true
    62. # logstash默认自带一个mapping模板,进行模板覆盖
    63. template_overwrite => true
    64. }
    65. }
    66. if "error-log" in [fields][logtopic]{
    67. elasticsearch {
    68. hosts => ["127.0.0.1:9200"]
    69. user => "elastic"
    70. password => "1TdhblkFcdhx2a"
    71. index => "error-log-%{[fields][logbiz]}-%{index_time}"
    72. sniffing => true
    73. template_overwrite => true
    74. }
    75. }
    76. }

    我们再次访问虚拟机上的接口:

     http://192.168.110.130:8001/index

    http://192.168.110.130:8001/err

    这样,Logstash控制台就能打印出日志了

  • 相关阅读:
    M4a文件解析(一)---某些播放器不能播放m4a(如炬芯播放器)
    动态规划解题步骤
    卡尔曼滤波器的通俗理解
    python基础之函数lambda表达式
    AWS SAA-C03 #157
    CIPU落地专有云:是“小众需求”还是“机会之门”?
    NCCL源码解析②:Bootstrap网络连接的建立
    2022/7/20 LocalDateTime将年月日转化为时间戳
    【高等数学基础进阶】不定积分-练习 & 定积分与反常积分-补充
    AI智能网关在工业物联网领域有哪些应用优势
  • 原文地址:https://blog.csdn.net/Xx13624558575/article/details/126783303