log4j2 在 2.11.0 之后的版本,已经内置了 KafkaAppender 支持可以将打印的日志直接发送到 kafka 中,在这之前如果想要集中收集应用的日志,就需要自定义一个 Layout 来实现,相对来说还是比较麻烦的。
官网文档:Log4j – Log4j 2 Appenders
- <!-- kafka client -->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>2.0.0</version>
- </dependency>
-
- <!-- 支持 scala -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api-scala_2.12</artifactId>
- <version>11.0</version>
- </dependency>
-
- <!-- 核心依赖 -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- <version>2.20.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>2.20.0</version>
- </dependency>
注意这里有个 syncSend 控制着是否异步发送,false 使用异步发送也就是会攒小批发送,拥有更高的吞吐量,但相对来说延迟也会增加,建议生产环境开启,本地环境关闭,否则可能会出现程序结束了直接退出,导致 kafka 的批攒的数据没有来得及发送,自然也会导致数据丢失。
此外,如果不想每个类的日志都采集到 kafaka 里面,我们可以定义个类,通过这个类发送的日志才收集到 kafka 里面,可以参考下面的配置例子。
- <?xml version="1.0" encoding="UTF-8"?>
- <Configuration status="INFO">
- <Properties>
- <Property name="kafkaServers">localhost:9092</Property>
- </Properties>
- <Appenders>
- <!-- 定义 Kafka Appender -->
- <Kafka name="KafkaAppender" syncSend="false" topic="recomm-system-log">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
- <!-- 在此处配置 Kafka 的连接信息 -->
- <Property name="bootstrap.servers">${kafkaServers}</Property>
- </Kafka>
-
- <Console name="ConsoleAppender" target="SYSTEM_OUT">
- <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
- </Console>
-
- </Appenders>
- <Loggers>
-
- <Root level="info">
- <!-- 将日志记录到控制台 Appender -->
- <AppenderRef ref="ConsoleAppender" />
- </Root>
-
- <Logger name="log2kafka.KafkaSender$">
- <Appender-ref ref="KafkaAppender"/>
- </Logger>
- </Loggers>
- </Configuration>
这里用的是 scala,如果是 java 基本大同小异
- package log2kafka
- import org.apache.logging.log4j.scala.Logging
-
- object KafkaSender extends Logging {
-
- def send(msg:Any): Unit ={
- logger.info(msg.toString)
- }
-
- def main(args: Array[String]): Unit = {
-
- logger.info("print msg to kafka")
-
- }
-
- }
kafka 命令行查看数据:
- (base) ➜ temp kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testlog
- 2023-08-30 19:42:05 INFO KafkaSender$:12 - print msg to kafka