497 lines
18 KiB
Python
497 lines
18 KiB
Python
"""
|
||
日志系统核心模块,提供以下功能:
|
||
|
||
模块职责:
|
||
1. 实现跨进程的日志收集和转发机制
|
||
2. 提供结构化日志记录能力
|
||
3. 统一管理控制台输出和日志服务端转发
|
||
|
||
设计目标:
|
||
- 解耦日志产生与写入操作
|
||
- 支持集中式日志管理(通过logmessaged服务)
|
||
- 提供模块化上下文追踪能力
|
||
|
||
使用示例:
|
||
1. 基础日志记录:
|
||
cloudlog.info("系统启动")
|
||
|
||
2. 带模块标识的日志:
|
||
cloudlog.warning("传感器异常", module="sensor")
|
||
|
||
3. 结构化日志:
|
||
cloudlog.error({
|
||
"event": "network_error",
|
||
"retry_count": 3,
|
||
"endpoint": "api/v1/connect"
|
||
})
|
||
|
||
主要组件及其职责:
|
||
1. UnixDomainSocketHandler - 基于ZMQ的日志转发处理器
|
||
▹ 实现IPC通信管理(连接/重连机制)
|
||
▹ 处理多进程资源隔离(PID检测)
|
||
▹ 非阻塞式网络传输
|
||
|
||
2. SwagLogManager - 日志系统管理中枢
|
||
▹ 处理器配置(控制台/套接字)
|
||
▹ 动态日志方法包装(debug/info/warning/error)
|
||
▹ 调用栈元数据自动注入
|
||
|
||
3. SwagFormatter - 结构化日志格式化(在logging_extra.py实现)
|
||
▹ 上下文信息整合
|
||
▹ 自定义目录标记提取
|
||
▹ 多进程安全格式化
|
||
|
||
全局实例说明:
|
||
- log_manager: 单例日志管理器(线程安全初始化)
|
||
- cloudlog/log: 统一日志接口(支持上下文绑定)
|
||
|
||
数据流向:
|
||
应用模块 → SwagLogger → 控制台输出
|
||
↳→ UnixDomainSocketHandler → ZMQ IPC → logmessaged
|
||
↳→ 文件系统/网络存储
|
||
|
||
扩展性说明:
|
||
1. 新增日志处理器:通过_setup_handlers()添加新Handler
|
||
2. 定制日志格式:修改SwagFormatter实现
|
||
3. 调整日志级别:通过DP参数或环境变量实时生效
|
||
"""
|
||
import time # 新增导入
|
||
import json
|
||
import logging
|
||
import os
|
||
import traceback
|
||
import warnings
|
||
import zmq
|
||
from pathlib import Path
|
||
from openpilot.common.logging_extra import (
|
||
SwagLogger, SwagFormatter, json_robust_dumps,CustomSwaglogRotatingFileHandler
|
||
)
|
||
from openpilot.common.params import Params
|
||
|
||
MEDIA_PATH = "/data/media/0/c2_logs/swaglog"
|
||
DEFAULT_PATH = "/data/log/"
|
||
SWAGLOG_DIR = MEDIA_PATH if os.path.exists("/data/media/0") else DEFAULT_PATH
|
||
|
||
# 日志滚动配置
|
||
LOG_CONFIG = {
|
||
'INTERVAL': 60,
|
||
'MAX_BYTES': 1024 * 128,
|
||
'BACKUP_COUNT': 2500,
|
||
'ENCODING': 'utf8',
|
||
'MAX_AGE': 4 * 24 * 3600, # 4天
|
||
'CLEAN_INTERVAL': 6 * 3600 # 6小时清理一次
|
||
}
|
||
|
||
def clean_old_logs():
|
||
"""清理过期日志文件"""
|
||
try:
|
||
current_time = time.time()
|
||
for log_file in Path(SWAGLOG_DIR).glob("*.log"):
|
||
if current_time - log_file.stat().st_mtime > LOG_CONFIG['MAX_AGE']:
|
||
log_file.unlink()
|
||
except Exception as e:
|
||
print(f"CleanLogError: {str(e)}")
|
||
|
||
def formatted_print(level, module, message):
|
||
"""使用统一格式打印消息"""
|
||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
print(f"{current_time} | {level} | {module} | {message}")
|
||
|
||
|
||
class UnixDomainSocketHandler(logging.Handler):
|
||
"""Unix域套接字处理器,用于日志转发"""
|
||
def __init__(self, formatter):
|
||
super().__init__()
|
||
self.setFormatter(formatter)
|
||
self.pid = None
|
||
self.zctx = None
|
||
self.sock = None
|
||
self.swaglogger = getattr(formatter, 'swaglogger', None) # 添加这行
|
||
|
||
def __del__(self):
|
||
if self.sock is not None:
|
||
self.sock.close()
|
||
if self.zctx is not None:
|
||
self.zctx.term()
|
||
|
||
def connect(self):
|
||
self.zctx = zmq.Context()
|
||
self.sock = self.zctx.socket(zmq.PUSH)
|
||
self.sock.setsockopt(zmq.LINGER, 10)
|
||
self.sock.connect("ipc:///tmp/logmessage")
|
||
self.pid = os.getpid()
|
||
|
||
def emit(self, record):
|
||
# 检查进程ID是否变化或未初始化,如果是则重新连接
|
||
if self.pid is None or os.getpid() != self.pid:
|
||
warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*<zmq.*>")
|
||
try:
|
||
self.connect()
|
||
except zmq.error.ZMQError:
|
||
return
|
||
|
||
try:
|
||
# 获取模块名,优先从record.module获取
|
||
module = getattr(record, 'module', 'unknown')
|
||
|
||
# 如果消息是字典且包含module字段,使用该字段
|
||
if isinstance(record.msg, dict) and 'module' in record.msg:
|
||
module = record.msg['module']
|
||
|
||
# 简化消息结构
|
||
msg_content = record.msg
|
||
if isinstance(msg_content, dict):
|
||
msg_content = msg_content.get('msg', str(msg_content))
|
||
|
||
# 去除 ANSI 颜色代码
|
||
import re
|
||
msg_content = re.sub(r'\x1b\[[0-9;]*m', '', str(msg_content))
|
||
|
||
raw_data = {
|
||
'level': record.levelno,
|
||
'msg': msg_content, # 使用去除颜色代码后的消息
|
||
'module': module,
|
||
'timestamp': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 使用本地时间
|
||
}
|
||
|
||
# 编码并发送消息
|
||
payload = b'\x01' + json_robust_dumps(raw_data).encode('utf-8')
|
||
self.sock.send(payload, zmq.NOBLOCK)
|
||
except zmq.error.Again:
|
||
pass # 队列满时正常忽略
|
||
except Exception as e:
|
||
# 简化异常处理,只记录错误类型和消息
|
||
print(f"LogEmitError({type(e).__name__}): {str(e)}")
|
||
|
||
def format(self, record):
|
||
try:
|
||
if isinstance(record.msg, dict):
|
||
msg = record.msg.copy()
|
||
else:
|
||
raw_msg = str(record.msg)
|
||
try:
|
||
msg = json.loads(raw_msg)
|
||
except json.JSONDecodeError:
|
||
msg = {'msg': raw_msg}
|
||
|
||
if self.swaglogger and isinstance(msg, dict):
|
||
ctx = self.swaglogger.get_ctx() or {}
|
||
msg = {**ctx, **msg}
|
||
|
||
# 提取实际消息内容
|
||
if isinstance(msg, dict):
|
||
msg_content = msg.get('msg', str(msg))
|
||
else:
|
||
msg_content = str(msg)
|
||
|
||
# 设置记录的消息为提取的内容
|
||
record.msg = msg_content
|
||
|
||
return super().format(record)
|
||
except Exception as e:
|
||
return f"FormatterError: {str(e)}"
|
||
|
||
# class SwaglogRotatingFileHandler(logging.handlers.BaseRotatingHandler):
|
||
# """滚动日志文件处理器,支持大小和时间触发滚动"""
|
||
# def __init__(self, base_filename, interval=60, max_bytes=1024*256, backup_count=2500, encoding=None, startup_time=None):
|
||
# super().__init__(base_filename, mode="a", encoding=encoding, delay=True)
|
||
# self.base_filename = base_filename
|
||
# self.interval = interval # 秒
|
||
# self.max_bytes = max_bytes
|
||
# self.backup_count = backup_count
|
||
# # 保存启动时间,如果未提供则使用当前时间
|
||
# self.startup_time = startup_time or time.strftime("%Y%m%d_%H%M%S")
|
||
# self.log_files = self.get_existing_logfiles()
|
||
# log_indexes = [f.split(".")[-1] for f in self.log_files]
|
||
# self.last_file_idx = max([int(i) for i in log_indexes if i.isdigit()] or [-1])
|
||
# self.last_rollover = None
|
||
# self.doRollover()
|
||
|
||
# def _open(self):
|
||
# self.last_rollover = time.time()
|
||
# self.last_file_idx += 1
|
||
# # 在文件名中添加启动时间
|
||
# next_filename = f"{self.base_filename}.{self.startup_time}.{self.last_file_idx:010}"
|
||
# stream = open(next_filename, self.mode, encoding=self.encoding)
|
||
# self.log_files.insert(0, next_filename)
|
||
# return stream
|
||
|
||
# def get_existing_logfiles(self):
|
||
# log_files = list()
|
||
# base_dir = os.path.dirname(self.base_filename)
|
||
# # 修改文件匹配逻辑,考虑启动时间
|
||
# for fn in os.listdir(base_dir):
|
||
# fp = os.path.join(base_dir, fn)
|
||
# if fp.startswith(self.base_filename) and os.path.isfile(fp):
|
||
# log_files.append(fp)
|
||
# return sorted(log_files)
|
||
|
||
# def shouldRollover(self, record):
|
||
# size_exceeded = self.max_bytes > 0 and self.stream.tell() >= self.max_bytes
|
||
# time_exceeded = self.interval > 0 and time.time() - self.last_rollover >= self.interval
|
||
# return size_exceeded or time_exceeded
|
||
|
||
# def doRollover(self):
|
||
# if self.stream:
|
||
# self.stream.close()
|
||
# self.stream = self._open()
|
||
|
||
# if self.backup_count > 0:
|
||
# while len(self.log_files) > self.backup_count:
|
||
# to_delete = self.log_files.pop()
|
||
# if os.path.exists(to_delete): # 安全检查
|
||
# os.remove(to_delete)
|
||
|
||
class AnsiColorStripFormatter(logging.Formatter):
|
||
"""去除ANSI颜色代码的格式化器"""
|
||
def __init__(self, orig_formatter):
|
||
super().__init__()
|
||
self.orig_formatter = orig_formatter
|
||
|
||
def format(self, record):
|
||
import re
|
||
# 先使用原始格式化器格式化
|
||
formatted = self.orig_formatter.format(record)
|
||
# 去除所有ANSI颜色代码
|
||
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
||
return ansi_escape.sub('', formatted)
|
||
|
||
def get_file_handler():
|
||
"""获取文件日志处理器"""
|
||
Path(SWAGLOG_DIR).mkdir(parents=True, exist_ok=True)
|
||
base_filename = os.path.join(SWAGLOG_DIR, "swaglog")
|
||
startup_time = time.strftime("%Y%m%d_%H%M%S")
|
||
|
||
handler = CustomSwaglogRotatingFileHandler( # 改用 CustomSwaglogRotatingFileHandler
|
||
base_filename,
|
||
interval=LOG_CONFIG['INTERVAL'],
|
||
max_bytes=LOG_CONFIG['MAX_BYTES'],
|
||
backup_count=LOG_CONFIG['BACKUP_COUNT'],
|
||
encoding=LOG_CONFIG['ENCODING']
|
||
)
|
||
return handler
|
||
|
||
def add_file_handler(log):
|
||
"""
|
||
添加文件日志处理器到swaglog
|
||
当logmessaged不运行时可用于存储日志
|
||
"""
|
||
handler = get_file_handler()
|
||
# 使用AnsiColorStripFormatter包装原始格式化器,去除颜色代码
|
||
orig_formatter = SwagFormatter(log)
|
||
handler.setFormatter(AnsiColorStripFormatter(orig_formatter))
|
||
|
||
# 根据dp_log_level设置文件日志级别
|
||
params = Params()
|
||
dp_log_level = params.get("dp_log_level", encoding='utf8')
|
||
if dp_log_level is not None:
|
||
level_map = {
|
||
"0": logging.WARNING,
|
||
"1": logging.INFO,
|
||
"2": logging.DEBUG
|
||
}
|
||
handler.setLevel(level_map.get(dp_log_level, logging.INFO))
|
||
else:
|
||
handler.setLevel(logging.INFO)
|
||
|
||
log.addHandler(handler)
|
||
|
||
class SwagLogManager:
|
||
"""日志管理器,处理日志配置和格式化"""
|
||
_instance = None
|
||
_initialized = False
|
||
|
||
def __new__(cls):
|
||
if cls._instance is None:
|
||
cls._instance = super(SwagLogManager, cls).__new__(cls)
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
# 确保初始化代码只执行一次
|
||
if SwagLogManager._initialized:
|
||
return
|
||
SwagLogManager._initialized = True
|
||
|
||
self.logger = SwagLogger()
|
||
self.logger.setLevel(logging.DEBUG)
|
||
|
||
# 清除所有现有处理器
|
||
self.logger.handlers.clear()
|
||
|
||
# 设置日志处理器
|
||
self._setup_handlers()
|
||
self._wrap_log_methods()
|
||
|
||
def _get_console_log_level(self):
|
||
"""获取控制台日志级别"""
|
||
params = Params()
|
||
dp_log_level = params.get("dp_log_level", encoding='utf8')
|
||
|
||
if dp_log_level is not None:
|
||
level_map = {"0": "warning", "1": "info", "2": "debug"}
|
||
print_level = level_map.get(dp_log_level, "warning")
|
||
else:
|
||
print_level = os.environ.get('LOGPRINT', 'warning')
|
||
|
||
return {
|
||
'debug': logging.DEBUG,
|
||
'info': logging.INFO,
|
||
'warning': logging.WARNING
|
||
}.get(print_level, logging.WARNING)
|
||
|
||
def _setup_handlers(self):
|
||
clean_old_logs() # 启动时清理
|
||
"""设置日志处理器"""
|
||
# 清除所有现有处理器
|
||
self.logger.handlers.clear()
|
||
|
||
# 使用控制台和文件处理器
|
||
console_handler = logging.StreamHandler()
|
||
console_handler.setLevel(self._get_console_log_level())
|
||
console_handler.setFormatter(SwagFormatter(swaglogger=self.logger))
|
||
self.logger.addHandler(console_handler)
|
||
|
||
|
||
# 根据logmessaged可用性决定使用哪个处理器
|
||
if self._is_logmessaged_available():
|
||
# 只使用socket_handler
|
||
socket_handler = UnixDomainSocketHandler(SwagFormatter(swaglogger=self.logger))
|
||
socket_handler.setLevel(logging.DEBUG)
|
||
self.logger.addHandler(socket_handler)
|
||
formatted_print("INFO", "swaglog", "logmessaged可用,使用网络日志转发")
|
||
else:
|
||
add_file_handler(self.logger)
|
||
formatted_print("INFO", "swaglog", "logmessaged不可用,已添加本地文件日志备份")
|
||
|
||
def _is_logmessaged_available(self):
|
||
"""检查logmessaged服务是否可用"""
|
||
try:
|
||
# 尝试连接logmessaged服务
|
||
test_ctx = zmq.Context()
|
||
test_sock = test_ctx.socket(zmq.PUSH)
|
||
test_sock.setsockopt(zmq.LINGER, 0) # 不等待,立即返回
|
||
test_sock.setsockopt(zmq.RCVTIMEO, 100) # 100ms超时
|
||
test_sock.connect("ipc:///tmp/logmessage")
|
||
|
||
# 尝试发送一个测试消息
|
||
test_data = {
|
||
'level': logging.INFO,
|
||
'msg': 'Testing logmessaged connection', # 直接使用字符串作为消息内容
|
||
'module': 'swaglog',
|
||
'timestamp': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 使用本地时间
|
||
}
|
||
payload = b'\x01' + json_robust_dumps(test_data).encode('utf-8')
|
||
test_sock.send(payload, zmq.NOBLOCK)
|
||
|
||
# 清理资源
|
||
test_sock.close()
|
||
test_ctx.term()
|
||
return True
|
||
except zmq.error.Again:
|
||
# 队列满但服务可用
|
||
return True
|
||
except Exception:
|
||
# 连接失败,服务不可用
|
||
return False
|
||
|
||
|
||
def _get_caller_info(self):
|
||
"""获取调用者信息 - 简化版"""
|
||
try:
|
||
for frame in traceback.extract_stack()[-5:-1]: # 限制搜索范围
|
||
if not frame.filename.endswith('swaglog.py'):
|
||
return {'file': frame.filename, 'line': frame.lineno}
|
||
except:
|
||
pass
|
||
return None
|
||
|
||
def _create_log_record(self, level_name, formatted_msg, current_module):
|
||
"""创建日志记录 - 简化版"""
|
||
return logging.LogRecord(
|
||
name=current_module or 'unknown',
|
||
level=logging.getLevelName(level_name),
|
||
pathname='', # 简化,不需要完整路径
|
||
lineno=0,
|
||
msg=formatted_msg,
|
||
args=(),
|
||
exc_info=None,
|
||
func=None
|
||
)
|
||
|
||
def _wrap_log_method(self, original_method, level_name):
|
||
def wrapped_method(msg, *args, **kwargs):
|
||
if not msg:
|
||
return None
|
||
|
||
try:
|
||
# 提取模块名
|
||
module = kwargs.pop('module', None)
|
||
if not module:
|
||
for frame in traceback.extract_stack()[-5:-1]:
|
||
if not frame.filename.endswith('swaglog.py'):
|
||
module = Path(frame.filename).stem
|
||
break
|
||
|
||
# 确保模块名不为空
|
||
if not module:
|
||
module = 'unknown'
|
||
|
||
# 简化消息处理
|
||
if isinstance(msg, dict):
|
||
msg_payload = msg.copy() # 创建副本避免修改原始数据
|
||
# 确保消息中包含模块名
|
||
if 'module' not in msg_payload:
|
||
msg_payload['module'] = module
|
||
else:
|
||
msg_str = str(msg)
|
||
if args:
|
||
try:
|
||
msg_str = msg_str % args
|
||
except:
|
||
pass
|
||
msg_payload = {'msg': msg_str, 'module': module}
|
||
|
||
# 检查是否为启动日志,如果是则强制打印到控制台
|
||
is_startup_log = False
|
||
if isinstance(msg_payload.get('msg'), str):
|
||
msg_content = msg_payload.get('msg', '')
|
||
is_startup_log = '启动' in msg_content or 'start' in msg_content.lower()
|
||
|
||
# 创建并处理记录
|
||
record = self._create_log_record(level_name, msg_payload, module)
|
||
# 确保记录对象有module属性
|
||
record.module = module
|
||
|
||
# 确保raw_msg属性存在,供SwagFormatter使用
|
||
record.raw_msg = msg_payload
|
||
|
||
# 对于启动日志,如果当前日志级别不足以显示,则使用格式化器格式化后打印
|
||
if is_startup_log and level_name in ('INFO', 'DEBUG') and self._get_console_log_level() > logging.INFO:
|
||
# 使用与SwagFormatter一致的格式化方式
|
||
formatter = SwagFormatter(swaglogger=self.logger)
|
||
formatted_msg = formatter.format(record)
|
||
print(formatted_msg)
|
||
else:
|
||
self.logger.handle(record)
|
||
|
||
except Exception as e:
|
||
formatted_print("ERROR", "swaglog", f"LogError: {e}")
|
||
return None
|
||
return wrapped_method
|
||
|
||
def _wrap_log_methods(self):
|
||
"""包装所有日志方法"""
|
||
original_methods = {
|
||
'debug': self.logger.debug,
|
||
'info': self.logger.info,
|
||
'warning': self.logger.warning,
|
||
'error': self.logger.error
|
||
}
|
||
|
||
for level, method in original_methods.items():
|
||
setattr(self.logger, level, self._wrap_log_method(method, level.upper()))
|
||
|
||
# 创建全局日志实例
|
||
log_manager = SwagLogManager()
|
||
cloudlog = log = log_manager.logger
|