效果如下
封装如下
import os
import datetime
from loguru import logger
from config import BASE_DIR
class Logings:
    """Per-name file logger built on top of loguru.

    Every distinct ``file_name`` gets one loguru file handler that writes to
    ``{BASE_DIR}/logs/{YYYY-MM-DD}/{file_name}.log``. Handlers are cached at
    class level, so creating several instances with the same name does not
    register the handler twice. Before each log call the current date is
    compared with the date the handler was created on; once the day has
    advanced, the old handler is removed and a new one is registered under
    the new day's directory.
    """

    # file_name -> (handler id from logger.add, bound logger, creation date)
    __logger = dict()

    def __init__(self, file_name: str = 'info') -> None:
        """Create (or reuse) the logger that writes to ``{file_name}.log``."""
        self.file_name = file_name
        cached = Logings.__logger.get(file_name)
        if cached is not None:
            # A handler for this name already exists; reuse its bound logger.
            self.logger = cached[1]
            return
        date = datetime.datetime.now().strftime('%Y-%m-%d')
        self.__add_logger(date)

    def __add_logger(self, date: str) -> None:
        """Register a file handler under ``logs/{date}`` and cache it."""
        # One directory per day under the project path: logs/{date}
        logpath = f'{BASE_DIR}/logs/{date}'
        # exist_ok avoids failing when the directory appears between a
        # separate existence check and the creation call.
        os.makedirs(logpath, exist_ok=True)
        # Capture the name by value so the filter does not depend on (or keep
        # alive) this particular instance.
        name = self.file_name
        handler_id = logger.add(
            f'{logpath}/{name}.log',  # target file
            format="{time} | {level}| {message}",
            encoding='utf-8',
            retention='7 days',  # how long rotated files are kept
            backtrace=True,  # extended traceback
            diagnose=True,  # show variable values in tracebacks
            enqueue=True,  # write through a queue (asynchronous)
            # Rotate by file size.
            # NOTE(review): loguru's size parser appears to read a lowercase
            # "b" as bits, which would make this 128,000 bytes rather than
            # 1024 kilobytes - confirm, and use "KB"/"MB" if bytes were meant.
            rotation="1024kb",
            # Only accept records bound with this logger's name.
            filter=lambda record: record["extra"].get("name") == name,
        )
        self.logger = logger.bind(name=name)
        Logings.__logger[name] = (handler_id, self.logger, date)

    def __check_update_handler(self) -> None:
        """Swap the file handler for a new one once the date has advanced."""
        cached = Logings.__logger.get(self.file_name)
        if cached is None:
            return
        date = datetime.datetime.now().strftime('%Y-%m-%d')
        # ISO dates (YYYY-MM-DD) compare correctly as plain strings.
        if date > cached[2]:
            logger.remove(cached[0])
            self.__add_logger(date)

    # In the methods below, opt(depth=1) makes loguru attribute the record to
    # the caller of the wrapper instead of to the wrapper method itself.

    def info(self, msg, *args, **kwargs):
        """Log ``msg`` at INFO level; extra arguments are passed to loguru."""
        self.__check_update_handler()
        return self.logger.opt(depth=1).info(msg, *args, **kwargs)

    def debug(self, msg, *args, **kwargs):
        """Log ``msg`` at DEBUG level; extra arguments are passed to loguru."""
        self.__check_update_handler()
        return self.logger.opt(depth=1).debug(msg, *args, **kwargs)

    def warning(self, msg, *args, **kwargs):
        """Log ``msg`` at WARNING level; extra arguments are passed to loguru."""
        self.__check_update_handler()
        return self.logger.opt(depth=1).warning(msg, *args, **kwargs)

    def error(self, msg, *args, **kwargs):
        """Log ``msg`` at ERROR level; extra arguments are passed to loguru."""
        self.__check_update_handler()
        return self.logger.opt(depth=1).error(msg, *args, **kwargs)

    def exception(self, msg, *args, exc_info=True, **kwargs):
        """Log ``msg`` at ERROR level together with exception information.

        ``exc_info`` is forwarded to loguru's ``opt(exception=...)``: the
        default ``True`` attaches the exception currently being handled,
        ``False`` logs the message without a traceback.
        """
        self.__check_update_handler()
        bound = self.logger.opt(depth=1, exception=exc_info)
        return bound.error(msg, *args, **kwargs)
# Shared module-level loggers; each writes to its own <file_name>.log file.
info_log = Logings(file_name='info')
error_log = Logings(file_name='error')
req_log = Logings(file_name='request')
class TaskDispatch:
    """Declare request-related class attributes and a per-class logger.

    The attributes below are class-level defaults; ``logger`` is replaced on
    each instance by a ``Logings`` named after the instance's class.
    """

    sc_type = None
    req_data = {}  # a single dict object defined at class level
    req_header = None
    logger = None

    def __init__(self, **kwargs):
        # kwargs are accepted but not used in this method.
        # The logger name is the concrete class name, so a subclass logs
        # under its own name.
        self.logger = Logings(type(self).__name__)