Files
dragonpilot/common/swaglog.py
Comma Device 5c73e264e9 Release 260308
2026-03-08 23:26:57 +08:00

497 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
日志系统核心模块,提供以下功能:
模块职责:
1. 实现跨进程的日志收集和转发机制
2. 提供结构化日志记录能力
3. 统一管理控制台输出和日志服务端转发
设计目标:
- 解耦日志产生与写入操作
- 支持集中式日志管理通过logmessaged服务
- 提供模块化上下文追踪能力
使用示例:
1. 基础日志记录:
cloudlog.info("系统启动")
2. 带模块标识的日志:
cloudlog.warning("传感器异常", module="sensor")
3. 结构化日志:
cloudlog.error({
"event": "network_error",
"retry_count": 3,
"endpoint": "api/v1/connect"
})
主要组件及其职责:
1. UnixDomainSocketHandler - 基于ZMQ的日志转发处理器
▹ 实现IPC通信管理连接/重连机制)
▹ 处理多进程资源隔离PID检测
▹ 非阻塞式网络传输
2. SwagLogManager - 日志系统管理中枢
▹ 处理器配置(控制台/套接字)
▹ 动态日志方法包装debug/info/warning/error
▹ 调用栈元数据自动注入
3. SwagFormatter - 结构化日志格式化在logging_extra.py实现
▹ 上下文信息整合
▹ 自定义目录标记提取
▹ 多进程安全格式化
全局实例说明:
- log_manager: 单例日志管理器(线程安全初始化)
- cloudlog/log: 统一日志接口(支持上下文绑定)
数据流向:
应用模块 → SwagLogger → 控制台输出
↳→ UnixDomainSocketHandler → ZMQ IPC → logmessaged
↳→ 文件系统/网络存储
扩展性说明:
1. 新增日志处理器通过_setup_handlers()添加新Handler
2. 定制日志格式修改SwagFormatter实现
3. 调整日志级别通过DP参数或环境变量实时生效
"""
import time # 新增导入
import json
import logging
import os
import traceback
import warnings
import zmq
from pathlib import Path
from openpilot.common.logging_extra import (
SwagLogger, SwagFormatter, json_robust_dumps,CustomSwaglogRotatingFileHandler
)
from openpilot.common.params import Params
MEDIA_PATH = "/data/media/0/c2_logs/swaglog"
DEFAULT_PATH = "/data/log/"
SWAGLOG_DIR = MEDIA_PATH if os.path.exists("/data/media/0") else DEFAULT_PATH
# 日志滚动配置
LOG_CONFIG = {
'INTERVAL': 60,
'MAX_BYTES': 1024 * 128,
'BACKUP_COUNT': 2500,
'ENCODING': 'utf8',
'MAX_AGE': 4 * 24 * 3600, # 4天
'CLEAN_INTERVAL': 6 * 3600 # 6小时清理一次
}
def clean_old_logs():
"""清理过期日志文件"""
try:
current_time = time.time()
for log_file in Path(SWAGLOG_DIR).glob("*.log"):
if current_time - log_file.stat().st_mtime > LOG_CONFIG['MAX_AGE']:
log_file.unlink()
except Exception as e:
print(f"CleanLogError: {str(e)}")
def formatted_print(level, module, message):
"""使用统一格式打印消息"""
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(f"{current_time} | {level} | {module} | {message}")
class UnixDomainSocketHandler(logging.Handler):
"""Unix域套接字处理器用于日志转发"""
def __init__(self, formatter):
super().__init__()
self.setFormatter(formatter)
self.pid = None
self.zctx = None
self.sock = None
self.swaglogger = getattr(formatter, 'swaglogger', None) # 添加这行
def __del__(self):
if self.sock is not None:
self.sock.close()
if self.zctx is not None:
self.zctx.term()
def connect(self):
self.zctx = zmq.Context()
self.sock = self.zctx.socket(zmq.PUSH)
self.sock.setsockopt(zmq.LINGER, 10)
self.sock.connect("ipc:///tmp/logmessage")
self.pid = os.getpid()
def emit(self, record):
# 检查进程ID是否变化或未初始化如果是则重新连接
if self.pid is None or os.getpid() != self.pid:
warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*<zmq.*>")
try:
self.connect()
except zmq.error.ZMQError:
return
try:
# 获取模块名优先从record.module获取
module = getattr(record, 'module', 'unknown')
# 如果消息是字典且包含module字段使用该字段
if isinstance(record.msg, dict) and 'module' in record.msg:
module = record.msg['module']
# 简化消息结构
msg_content = record.msg
if isinstance(msg_content, dict):
msg_content = msg_content.get('msg', str(msg_content))
# 去除 ANSI 颜色代码
import re
msg_content = re.sub(r'\x1b\[[0-9;]*m', '', str(msg_content))
raw_data = {
'level': record.levelno,
'msg': msg_content, # 使用去除颜色代码后的消息
'module': module,
'timestamp': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 使用本地时间
}
# 编码并发送消息
payload = b'\x01' + json_robust_dumps(raw_data).encode('utf-8')
self.sock.send(payload, zmq.NOBLOCK)
except zmq.error.Again:
pass # 队列满时正常忽略
except Exception as e:
# 简化异常处理,只记录错误类型和消息
print(f"LogEmitError({type(e).__name__}): {str(e)}")
def format(self, record):
try:
if isinstance(record.msg, dict):
msg = record.msg.copy()
else:
raw_msg = str(record.msg)
try:
msg = json.loads(raw_msg)
except json.JSONDecodeError:
msg = {'msg': raw_msg}
if self.swaglogger and isinstance(msg, dict):
ctx = self.swaglogger.get_ctx() or {}
msg = {**ctx, **msg}
# 提取实际消息内容
if isinstance(msg, dict):
msg_content = msg.get('msg', str(msg))
else:
msg_content = str(msg)
# 设置记录的消息为提取的内容
record.msg = msg_content
return super().format(record)
except Exception as e:
return f"FormatterError: {str(e)}"
# class SwaglogRotatingFileHandler(logging.handlers.BaseRotatingHandler):
# """滚动日志文件处理器,支持大小和时间触发滚动"""
# def __init__(self, base_filename, interval=60, max_bytes=1024*256, backup_count=2500, encoding=None, startup_time=None):
# super().__init__(base_filename, mode="a", encoding=encoding, delay=True)
# self.base_filename = base_filename
# self.interval = interval # 秒
# self.max_bytes = max_bytes
# self.backup_count = backup_count
# # 保存启动时间,如果未提供则使用当前时间
# self.startup_time = startup_time or time.strftime("%Y%m%d_%H%M%S")
# self.log_files = self.get_existing_logfiles()
# log_indexes = [f.split(".")[-1] for f in self.log_files]
# self.last_file_idx = max([int(i) for i in log_indexes if i.isdigit()] or [-1])
# self.last_rollover = None
# self.doRollover()
# def _open(self):
# self.last_rollover = time.time()
# self.last_file_idx += 1
# # 在文件名中添加启动时间
# next_filename = f"{self.base_filename}.{self.startup_time}.{self.last_file_idx:010}"
# stream = open(next_filename, self.mode, encoding=self.encoding)
# self.log_files.insert(0, next_filename)
# return stream
# def get_existing_logfiles(self):
# log_files = list()
# base_dir = os.path.dirname(self.base_filename)
# # 修改文件匹配逻辑,考虑启动时间
# for fn in os.listdir(base_dir):
# fp = os.path.join(base_dir, fn)
# if fp.startswith(self.base_filename) and os.path.isfile(fp):
# log_files.append(fp)
# return sorted(log_files)
# def shouldRollover(self, record):
# size_exceeded = self.max_bytes > 0 and self.stream.tell() >= self.max_bytes
# time_exceeded = self.interval > 0 and time.time() - self.last_rollover >= self.interval
# return size_exceeded or time_exceeded
# def doRollover(self):
# if self.stream:
# self.stream.close()
# self.stream = self._open()
# if self.backup_count > 0:
# while len(self.log_files) > self.backup_count:
# to_delete = self.log_files.pop()
# if os.path.exists(to_delete): # 安全检查
# os.remove(to_delete)
class AnsiColorStripFormatter(logging.Formatter):
"""去除ANSI颜色代码的格式化器"""
def __init__(self, orig_formatter):
super().__init__()
self.orig_formatter = orig_formatter
def format(self, record):
import re
# 先使用原始格式化器格式化
formatted = self.orig_formatter.format(record)
# 去除所有ANSI颜色代码
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return ansi_escape.sub('', formatted)
def get_file_handler():
"""获取文件日志处理器"""
Path(SWAGLOG_DIR).mkdir(parents=True, exist_ok=True)
base_filename = os.path.join(SWAGLOG_DIR, "swaglog")
startup_time = time.strftime("%Y%m%d_%H%M%S")
handler = CustomSwaglogRotatingFileHandler( # 改用 CustomSwaglogRotatingFileHandler
base_filename,
interval=LOG_CONFIG['INTERVAL'],
max_bytes=LOG_CONFIG['MAX_BYTES'],
backup_count=LOG_CONFIG['BACKUP_COUNT'],
encoding=LOG_CONFIG['ENCODING']
)
return handler
def add_file_handler(log):
"""
添加文件日志处理器到swaglog
当logmessaged不运行时可用于存储日志
"""
handler = get_file_handler()
# 使用AnsiColorStripFormatter包装原始格式化器去除颜色代码
orig_formatter = SwagFormatter(log)
handler.setFormatter(AnsiColorStripFormatter(orig_formatter))
# 根据dp_log_level设置文件日志级别
params = Params()
dp_log_level = params.get("dp_log_level", encoding='utf8')
if dp_log_level is not None:
level_map = {
"0": logging.WARNING,
"1": logging.INFO,
"2": logging.DEBUG
}
handler.setLevel(level_map.get(dp_log_level, logging.INFO))
else:
handler.setLevel(logging.INFO)
log.addHandler(handler)
class SwagLogManager:
"""日志管理器,处理日志配置和格式化"""
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super(SwagLogManager, cls).__new__(cls)
return cls._instance
def __init__(self):
# 确保初始化代码只执行一次
if SwagLogManager._initialized:
return
SwagLogManager._initialized = True
self.logger = SwagLogger()
self.logger.setLevel(logging.DEBUG)
# 清除所有现有处理器
self.logger.handlers.clear()
# 设置日志处理器
self._setup_handlers()
self._wrap_log_methods()
def _get_console_log_level(self):
"""获取控制台日志级别"""
params = Params()
dp_log_level = params.get("dp_log_level", encoding='utf8')
if dp_log_level is not None:
level_map = {"0": "warning", "1": "info", "2": "debug"}
print_level = level_map.get(dp_log_level, "warning")
else:
print_level = os.environ.get('LOGPRINT', 'warning')
return {
'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING
}.get(print_level, logging.WARNING)
def _setup_handlers(self):
clean_old_logs() # 启动时清理
"""设置日志处理器"""
# 清除所有现有处理器
self.logger.handlers.clear()
# 使用控制台和文件处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(self._get_console_log_level())
console_handler.setFormatter(SwagFormatter(swaglogger=self.logger))
self.logger.addHandler(console_handler)
# 根据logmessaged可用性决定使用哪个处理器
if self._is_logmessaged_available():
# 只使用socket_handler
socket_handler = UnixDomainSocketHandler(SwagFormatter(swaglogger=self.logger))
socket_handler.setLevel(logging.DEBUG)
self.logger.addHandler(socket_handler)
formatted_print("INFO", "swaglog", "logmessaged可用使用网络日志转发")
else:
add_file_handler(self.logger)
formatted_print("INFO", "swaglog", "logmessaged不可用已添加本地文件日志备份")
def _is_logmessaged_available(self):
"""检查logmessaged服务是否可用"""
try:
# 尝试连接logmessaged服务
test_ctx = zmq.Context()
test_sock = test_ctx.socket(zmq.PUSH)
test_sock.setsockopt(zmq.LINGER, 0) # 不等待,立即返回
test_sock.setsockopt(zmq.RCVTIMEO, 100) # 100ms超时
test_sock.connect("ipc:///tmp/logmessage")
# 尝试发送一个测试消息
test_data = {
'level': logging.INFO,
'msg': 'Testing logmessaged connection', # 直接使用字符串作为消息内容
'module': 'swaglog',
'timestamp': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 使用本地时间
}
payload = b'\x01' + json_robust_dumps(test_data).encode('utf-8')
test_sock.send(payload, zmq.NOBLOCK)
# 清理资源
test_sock.close()
test_ctx.term()
return True
except zmq.error.Again:
# 队列满但服务可用
return True
except Exception:
# 连接失败,服务不可用
return False
def _get_caller_info(self):
"""获取调用者信息 - 简化版"""
try:
for frame in traceback.extract_stack()[-5:-1]: # 限制搜索范围
if not frame.filename.endswith('swaglog.py'):
return {'file': frame.filename, 'line': frame.lineno}
except:
pass
return None
def _create_log_record(self, level_name, formatted_msg, current_module):
"""创建日志记录 - 简化版"""
return logging.LogRecord(
name=current_module or 'unknown',
level=logging.getLevelName(level_name),
pathname='', # 简化,不需要完整路径
lineno=0,
msg=formatted_msg,
args=(),
exc_info=None,
func=None
)
def _wrap_log_method(self, original_method, level_name):
def wrapped_method(msg, *args, **kwargs):
if not msg:
return None
try:
# 提取模块名
module = kwargs.pop('module', None)
if not module:
for frame in traceback.extract_stack()[-5:-1]:
if not frame.filename.endswith('swaglog.py'):
module = Path(frame.filename).stem
break
# 确保模块名不为空
if not module:
module = 'unknown'
# 简化消息处理
if isinstance(msg, dict):
msg_payload = msg.copy() # 创建副本避免修改原始数据
# 确保消息中包含模块名
if 'module' not in msg_payload:
msg_payload['module'] = module
else:
msg_str = str(msg)
if args:
try:
msg_str = msg_str % args
except:
pass
msg_payload = {'msg': msg_str, 'module': module}
# 检查是否为启动日志,如果是则强制打印到控制台
is_startup_log = False
if isinstance(msg_payload.get('msg'), str):
msg_content = msg_payload.get('msg', '')
is_startup_log = '启动' in msg_content or 'start' in msg_content.lower()
# 创建并处理记录
record = self._create_log_record(level_name, msg_payload, module)
# 确保记录对象有module属性
record.module = module
# 确保raw_msg属性存在供SwagFormatter使用
record.raw_msg = msg_payload
# 对于启动日志,如果当前日志级别不足以显示,则使用格式化器格式化后打印
if is_startup_log and level_name in ('INFO', 'DEBUG') and self._get_console_log_level() > logging.INFO:
# 使用与SwagFormatter一致的格式化方式
formatter = SwagFormatter(swaglogger=self.logger)
formatted_msg = formatter.format(record)
print(formatted_msg)
else:
self.logger.handle(record)
except Exception as e:
formatted_print("ERROR", "swaglog", f"LogError: {e}")
return None
return wrapped_method
def _wrap_log_methods(self):
"""包装所有日志方法"""
original_methods = {
'debug': self.logger.debug,
'info': self.logger.info,
'warning': self.logger.warning,
'error': self.logger.error
}
for level, method in original_methods.items():
setattr(self.logger, level, self._wrap_log_method(method, level.upper()))
# 创建全局日志实例
log_manager = SwagLogManager()
cloudlog = log = log_manager.logger