Files
dragonpilot/common/logging_extra.py
Comma Device 5c73e264e9 Release 260308
2026-03-08 23:26:57 +08:00

314 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
增强日志系统模块,提供以下核心功能:
类构成:
1. SwagLogger - 扩展的标准日志记录器
- bind()/bind_global():上下文变量绑定
- event():结构化事件记录
- ctx():上下文管理器
- timestamp():高精度时间戳记录
- 支持模块名自动注入
- 提供多级别日志方法info/error/warning/debug
2. SwagFormatter - 增强型日志格式化器
- 支持上下文变量注入
- 异常信息自动格式化
- 多数据类型处理(字典/字符串/异常对象)
- 统一日志格式:时间 | 级别 | 模块 | 消息
3. SwaglogRotatingFileHandler - 增强型滚动日志处理器
- 自定义日志滚动策略
- 支持UTF-8编码
- 自动处理日志文件备份
辅助功能:
- JSON安全序列化json_robust_dumps支持复杂对象序列化
- 线程安全的上下文管理
- 日志文件自动清理
"""
import datetime
import os
import time # 新增导入
import json
import logging
from threading import local
from collections import OrderedDict
from logging.handlers import RotatingFileHandler
from datetime import timezone
class SwagLogger(logging.Logger):
def __init__(self):
logging.Logger.__init__(self, "swaglog")
self.global_ctx = {}
self.log_local = local()
self.log_local.ctx = {}
def get_ctx(self):
if not hasattr(self.log_local, 'ctx'):
self.log_local.ctx = {}
return {**self.log_local.ctx, **self.global_ctx}
def bind(self, **kwargs):
if not hasattr(self.log_local, 'ctx'):
self.log_local.ctx = {}
self.log_local.ctx.update(kwargs)
def bind_global(self, **kwargs):
self.global_ctx.update(kwargs)
def event(self, name, **kwargs):
"""记录事件日志"""
try:
event_data = {"event": name}
event_data.update(kwargs)
if 'error' in kwargs:
self.error(event_data)
else:
self.info(event_data)
except Exception as e:
self.error(f"Failed to log event {name}: {str(e)}")
def ctx(self):
from contextlib import contextmanager
import copy
@contextmanager
def _ctx():
old_ctx = getattr(self.log_local, 'ctx', {})
self.log_local.ctx = copy.copy(old_ctx) or {}
try:
yield
finally:
self.log_local.ctx = old_ctx
return _ctx()
def timestamp(self, event_name):
if "LOG_TIMESTAMPS" in os.environ:
t = time.monotonic()
tstp = {"timestamp": {"event": event_name, "time": t*1e9}}
self.debug(tstp)
def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False,
stacklevel=1, module_name=None):
if module_name:
self.bind(module=module_name)
if isinstance(msg, str) and module_name:
msg_dict = {
"msg": msg,
"module": module_name
}
msg = msg_dict
args = ()
return super()._log(level, msg, args, exc_info, extra, stack_info, stacklevel)
def info(self, msg, *args, **kwargs):
return self._log(logging.INFO, msg, args, **kwargs)
def error(self, msg, *args, **kwargs):
return self._log(logging.ERROR, msg, args, **kwargs)
def warning(self, msg, *args, **kwargs):
return self._log(logging.WARNING, msg, args, **kwargs)
def debug(self, msg, *args, **kwargs):
return self._log(logging.DEBUG, msg, args, **kwargs)
class SwagFormatter(logging.Formatter):
def __init__(self, swaglogger=None):
super().__init__()
self.swaglogger = swaglogger
def format(self, record):
try:
# 简化消息处理逻辑
if isinstance(record.msg, dict):
msg = record.msg
else:
msg = {'msg': str(record.msg)}
# 只在必要时合并上下文
if self.swaglogger:
ctx = self.swaglogger.get_ctx()
if ctx and isinstance(msg, dict):
msg = {**ctx, **msg}
# 提取消息内容
msg_content = msg.get('msg', str(msg))
record.msg = msg_content
record.raw_msg = msg
# 简化时间戳格式化
current_time = time.localtime()
microsecond = int(time.time() * 1000) % 1000
timestamp = f"{time.strftime('%Y-%m-%d %H:%M:%S', current_time)}.{microsecond:03d}"
level_name = logging.getLevelName(record.levelno)
module = getattr(record, 'module', 'unknown')
return f"{timestamp} | {level_name} | {module} | {msg_content}"
except Exception as e:
return f"FormatterError: {str(e)}"
class SwaglogRotatingFileHandler(RotatingFileHandler):
def __init__(self, filename, max_bytes=0, backup_count=0, interval=None, encoding='utf8', startup_time=None):
super().__init__(
filename,
mode='a',
maxBytes=max_bytes,
backupCount=backup_count,
encoding=encoding,
delay=True
)
self.interval = interval
self.last_rollover = time.time()
self.process_count = 0
self.startup_time = startup_time or time.strftime("%Y%m%d_%H%M%S")
# 解析基础文件名
base_name = os.path.basename(filename)
parts = base_name.split('.')
self.base_name = parts[0]
self.hex_ts = parts[1] if len(parts) > 1 else ''
self.rollover_count = 0
def get_next_filename(self):
"""生成下一个日志文件名"""
timestamp = time.strftime("%Y%m%d_%H%M%S")
return os.path.join(
os.path.dirname(self.baseFilename),
f"{self.base_name}.{self.hex_ts}.{timestamp}.{self.rollover_count:03d}.log"
)
def rotation_filename(self, default_name):
"""重写文件名生成方法"""
return self.get_next_filename()
def doRollover(self):
"""执行日志滚动"""
if self.stream:
self.stream.close()
self.stream = None
next_name = self.get_next_filename()
while os.path.exists(next_name) and self.rollover_count < 1000:
self.rollover_count += 1
next_name = self.get_next_filename()
if self.rollover_count >= 1000:
raise RuntimeError("Rollover count exceeded maximum limit")
self.baseFilename = next_name
self.rollover_count += 1
self.last_rollover = time.time()
self.stream = self._open()
class CustomSwaglogRotatingFileHandler(SwaglogRotatingFileHandler):
def __init__(self, filename, max_bytes=0, backup_count=0, interval=None, encoding='utf8'):
super().__init__(filename, max_bytes, backup_count, interval, encoding)
self.process_count = 0
base_name = os.path.basename(filename)
parts = base_name.split('.')
self.base_name = parts[0]
self.hex_ts = parts[1] if len(parts) > 1 else hex(int(time.time()))[2:]
self.rollover_count = 0
self.current_size = 0
self.has_content = False
self._pending_file = None
self.stream = None # 初始化时不创建文件
def _get_next_filename(self):
"""生成下一个日志文件名"""
timestamp = time.strftime("%Y%m%d_%H%M%S")
next_filename = os.path.join(
os.path.dirname(self.baseFilename),
f"{self.base_name}.{self.hex_ts}.{timestamp}.{self.rollover_count:03d}.log"
)
while os.path.exists(next_filename) and self.rollover_count < 1000:
self.rollover_count += 1
next_filename = os.path.join(
os.path.dirname(self.baseFilename),
f"{self.base_name}.{self.hex_ts}.{timestamp}.{self.rollover_count:03d}.log"
)
if self.rollover_count >= 1000:
raise RuntimeError("Rollover count exceeded maximum limit")
return next_filename
def shouldRollover(self, record):
"""检查是否应该滚动日志文件"""
if not self.has_content:
return False
if self.stream is None: # 如果文件未创建,不需要滚动
return False
size_exceeded = self.maxBytes > 0 and self.current_size >= self.maxBytes
time_exceeded = self.interval and time.time() - self.last_rollover >= self.interval
return size_exceeded or time_exceeded
def emit(self, record):
"""确保只写入有效内容"""
if record and record.msg:
msg = self.format(record)
if msg:
try:
# 第一次写入时才创建文件
if self.stream is None:
self._pending_file = self._get_next_filename()
self.stream = open(self._pending_file, self.mode, encoding=self.encoding)
self.baseFilename = self._pending_file
self.last_rollover = time.time()
self.stream.write(msg + '\n')
self.stream.flush()
self.current_size += len(msg) + 1
self.has_content = True
self.process_count += 1
# 使用 shouldRollover 判断是否需要滚动
if self.shouldRollover(record):
self.doRollover()
except Exception as e:
self.handleError(record)
def doRollover(self):
if not self.has_content: # 如果当前文件没有内容,不执行滚动
return
if self.stream:
self.stream.close()
self.stream = None
self._pending_file = self._get_next_filename()
self.rollover_count += 1
self.current_size = 0
self.has_content = False
def json_robust_dumps(obj):
"""增强型JSON序列化"""
def handler(o): # ✅ 修正内部函数参数名冲突问题
if isinstance(o, (datetime.datetime, datetime.date)): # 参数重命名为 o
return o.isoformat()
if hasattr(o, '__dict__'):
return vars(o)
return repr(o)
return json.dumps(obj, default=handler, ensure_ascii=False)
class NiceOrderedDict(OrderedDict):
def __str__(self):
return json_robust_dumps(self)
if __name__ == "__main__":
log = SwagLogger()
console = logging.StreamHandler()
console.setFormatter(SwagFormatter(log))
log.addHandler(console)
log.info("测试日志")
log.bind(module="test")
log.info("带模块的测试日志")
log.error("错误日志测试")