前言
原本应用的日志是全部输出到os的stdout,也就是控制台输出。因其它团队要求也要保留日志文件,便于他们用其他工具统一采集,另一方面还要保留控制台输出,便于出问题的时候自己直接看pod日志。具体需求如下:
- 日志支持同时控制台输出和文件输出
- 控制台的输出级别可以高点,比如WARNING,个人这边的实际情况是WARNING或ERROR就能判断大部分问题。日志文件的输出级别设置为INFO,如果控制台日志找不到问题,可以具体看日志文件的内容。
- 因为用到了多进程,所以写文件的时候要保证多进程安全,避免日志内容不会缺失。
- 日志文件可以设置自动分割,避免长时间不清理导致硬盘存储资源浪费。
因为不允许随便使用第三方包,所以只能用标准库的logging。一开始想的方法比较挫——对文件加锁,但改来改去发现根本不能给别人review。翻python官方文档的时候发现logging库有个QueueHandler
和QueueListener
,简单试了下感觉逻辑还算清楚,遂简单整理了下代码。
示例代码
目录结构如下,main.py是入口脚本,logs目录和app.log将有程序运行时自动生成,主要日志功能放在pkg/log.py
文件中。pkg/__init__.py
为空文件,仅用于标识为python包。
.
├── main.py
├── logs
│ └── app.log
└── pkg
├── __init__.py
└── log.py
pkg/log.py
内容如下,主要提供logger
已经配置好的日志对象,该对象先将日志记录到QueueHandler,然后QueueListener从队列中取日志,并分别输出到控制台和日志文件中。close_log_queue()
方法将在主进程结束时调用。
import logging
from logging.handlers import TimedRotatingFileHandler, QueueHandler, QueueListener
import sys
import os
# from queue import Queue
from multiprocessing import Queue
log_queue = Queue(-1)
queue_listener = ""
logdir = "logs"
logfile = f"{logdir}/app.log"
if not os.path.exists(logdir):
os.makedirs(logdir, exist_ok=True)
def set_formatter():
"""设置日志格式化器"""
fmt = "%(asctime)s | %(levelname)s | %(name)s | %(filename)s:%(lineno)d | %(funcName)s | %(message)s"
datefmt = "%Y-%m-%d %H:%M:%S"
return logging.Formatter(fmt, datefmt=datefmt)
def set_queue_handler():
# 不要给QueueHandler重复设置formatter, 会引起重复嵌套
handler = QueueHandler(log_queue)
handler.setLevel(logging.INFO)
return handler
def set_stream_handler(formatter: logging.Formatter):
# 输出到控制台的日志处理器
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.WARNING)
handler.setFormatter(formatter)
return handler
def set_timed_rotating_file_handler(formatter: logging.Formatter):
# 输出到文件的日志处理器, 每天生成一个新文件, 最多保留10个文件
handler = TimedRotatingFileHandler(logfile, when="midnight", backupCount=10, encoding="utf-8")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
return handler
def close_log_queue():
# 关闭队列监听器
global queue_listener
if queue_listener:
queue_listener.stop()
def get_logger(name: str = "mylogger", level: int = logging.INFO):
logger = logging.getLogger(name)
logger.setLevel(level)
formatter = set_formatter()
stream_handler = set_stream_handler(formatter)
file_handler = set_timed_rotating_file_handler(formatter)
queue_handler = set_queue_handler()
logger.addHandler(queue_handler)
global queue_listener
if not queue_listener:
queue_listener = QueueListener(log_queue, stream_handler, file_handler, respect_handler_level=True)
queue_listener.start()
return logger
logger = get_logger()
if __name__ == "__main__":
logger.info("test")
close_log_queue()
main.py
内容如下,主要是创建子进程调用logger,观察日志输出是否正常。
from multiprocessing import Process
from pkg.log import logger, close_log_queue
import os
class MyProcess(Process):
def __init__(self, delay):
self.delay = delay
super().__init__()
def run(self):
for i in range(self.delay):
logger.info(f"pid: {os.getpid()}, {i}")
if __name__ == '__main__':
logger.info(f"main process pid: {os.getpid()}")
for i in range(10):
p = MyProcess(10000)
p.start()
p.join()
logger.info("main process end")
close_log_queue()
执行输出大致如下所示:
$ tail logs/app.log
2024-01-22 23:10:17 | INFO | mylogger | main.py:12 | run | pid: 7908, 1
2024-01-22 23:10:17 | INFO | mylogger | main.py:12 | run | pid: 7908, 2
2024-01-22 23:10:17 | INFO | mylogger | main.py:12 | run | pid: 7908, 3
2024-01-22 23:10:17 | INFO | mylogger | main.py:12 | run | pid: 7908, 4
2024-01-22 23:10:17 | INFO | mylogger | main.py:12 | run | pid: 7908, 5
2024-01-22 23:10:17 | INFO | mylogger | main.py:12 | run | pid: 7908, 6
2024-01-22 23:10:17 | INFO | mylogger | main.py:12 | run | pid: 7908, 7
2024-01-22 23:10:17 | INFO | mylogger | main.py:12 | run | pid: 7908, 8
2024-01-22 23:10:17 | INFO | mylogger | main.py:12 | run | pid: 7908, 9
2024-01-22 23:10:17 | INFO | mylogger | main.py:21 | | main process end
补充
logging还内置很多其它handler,比如按文件大小自动切割,日志通过HTTP请求输出,日志输出到syslog等,可按照自己需求进行定制。