Files
dragonpilot/system/logmessaged.py
Comma Device 5c73e264e9 Release 260308
2026-03-08 23:26:57 +08:00

365 lines
13 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
日志服务守护进程(V3.2)核心架构:
输入层:
■ ZMQ IPC接收ipc:///tmp/logmessage
■ 16进制等级标识 ■ JSON/字符串自动解析
■ 非阻塞IO设计 ■ 多线程安全
■ 大日志拦截(>2MB) ■ 流量控制机制
■ 自动重连机制 ■ 进程隔离保护
处理层:
■ 智能日志过滤 ■ 模块化处理
■ 日志格式标准化 ■ 内存保护机制
■ 异常自动恢复 ■ 性能优化
■ 日志级别动态调整 ■ 消息大小限制
■ 上下文信息注入 ■ 调用栈追踪
输出层:
├─ 主日志:/data/media/0/c2_logs/logmessage/swaglog.*.log
│ └─ 滚动策略128KB/保留1500份
│ └─ 保留时间4天
│ └─ 自动清理机制
└─ 实时通道:
├─ logMessage全量日志
└─ errorLogMessageERROR级别
监控体系:
■ 目录健康检查 ■ 队列状态监控
■ 资源自动回收 ■ 性能指标统计
■ 错误预警机制 ■ 系统状态报告
■ 日志过滤统计 ■ 处理器状态跟踪
特性:
■ 支持动态日志配置 ■ 多级别日志处理
■ 高性能日志处理 ■ 跨平台兼容
■ 自动清理机制 ■ 实时日志推送
■ 模块化设计 ■ 可扩展架构
■ 线程安全 ■ 异常处理
■ 日志压缩 ■ 远程日志支持
"""
#!/usr/bin/env python3
import os
import zmq
import json
import time
import datetime
import logging
from typing import NoReturn
from pathlib import Path
import cereal.messaging as messaging
from openpilot.common.logging_extra import (
SwagFormatter, SwaglogRotatingFileHandler,SwagLogger,CustomSwaglogRotatingFileHandler
)
from openpilot.common.params import Params
# Global configuration constants.
# Directory that receives the rotated swaglog files.
DEFAULT_LOG_DIR = "/data/media/0/c2_logs/logmessage/"

# Rotation / retention settings for the log files.
LOG_CONFIG = {
    "INTERVAL": 5 * 60,              # rotation interval, in seconds
    "MAX_BYTES": 128 * 1024,         # size limit of a single log file (128 KiB)
    "BACKUP_COUNT": 1500,            # maximum number of log files kept
    "MAX_AGE": 4 * 24 * 60 * 60,     # maximum age of a log file (4 days), in seconds
    "ENCODING": "utf8",              # encoding used for the log files
    "CLEAN_INTERVAL": 6 * 60 * 60,   # time between clean-up passes (6 hours), in seconds
}
def create_log_handler(boot_ts: float):
    """Build the rotating file handler used for the on-disk log.

    The file is created inside ``DEFAULT_LOG_DIR`` and is named
    ``swaglog.<boot time in hex>.<YYYYmmdd_HHMMSS>.000.log``. Rotation
    settings are taken from ``LOG_CONFIG``.

    Args:
        boot_ts: Boot timestamp in seconds; only its integer part is used.

    Returns:
        A ``CustomSwaglogRotatingFileHandler``, or ``None`` when anything goes
        wrong (the error is printed with a ``HandlerInitError:`` prefix).
    """
    try:
        boot_tag = hex(int(boot_ts))[2:]
        os.makedirs(DEFAULT_LOG_DIR, exist_ok=True)

        wall_clock = time.strftime('%Y%m%d_%H%M%S')
        log_path = os.path.join(
            DEFAULT_LOG_DIR,
            f"swaglog.{boot_tag}.{wall_clock}.000.log",
        )
        rotation_options = {
            'interval': LOG_CONFIG['INTERVAL'],
            'max_bytes': LOG_CONFIG['MAX_BYTES'],
            'backup_count': LOG_CONFIG['BACKUP_COUNT'],
            'encoding': LOG_CONFIG['ENCODING'],
        }
        return CustomSwaglogRotatingFileHandler(log_path, **rotation_options)
    except Exception as err:
        # Best effort: the caller decides what to do when no handler exists.
        print(f"HandlerInitError: {str(err)}")
        return None
def clean_old_logs(log_dir=None, max_age=None):
    """Delete ``*.log`` files that have not been modified for too long.

    Each file is handled independently, so a file that disappears between
    listing and deletion (for example because it was rotated away) or that
    cannot be removed does not stop the remaining files from being cleaned.

    Args:
        log_dir: Directory to scan. Defaults to ``DEFAULT_LOG_DIR``.
        max_age: Maximum allowed age in seconds, measured against the file's
            modification time. Defaults to ``LOG_CONFIG['MAX_AGE']``.

    Returns:
        The number of files that were removed.
    """
    directory = Path(DEFAULT_LOG_DIR if log_dir is None else log_dir)
    age_limit = LOG_CONFIG['MAX_AGE'] if max_age is None else max_age
    removed = 0

    try:
        now = time.time()
        candidates = list(directory.glob("*.log"))
    except Exception as e:
        # Listing failed (e.g. permission problem): nothing can be cleaned.
        print(f"CleanLogError: {str(e)}")
        return removed

    for log_file in candidates:
        try:
            if now - log_file.stat().st_mtime > age_limit:
                log_file.unlink()
                removed += 1
        except FileNotFoundError:
            # Already gone; somebody else removed it first.
            continue
        except OSError as e:
            # Report and keep going with the other files.
            print(f"CleanLogError: {str(e)}")
    return removed
def get_system_boottime(stat_path: str = '/proc/stat') -> float:
    """Return the system boot time as a Unix timestamp in seconds.

    The value is read from the ``btime`` line of ``stat_path``. When the file
    cannot be read, contains no ``btime`` line, or the line is malformed, the
    current time is returned instead, so the result is always a float.

    Args:
        stat_path: File to read the ``btime`` line from.

    Returns:
        The boot timestamp, or ``time.time()`` as a fallback.
    """
    try:
        with open(stat_path, 'r') as f:
            for line in f:
                if line.startswith('btime'):
                    return float(line.split()[1])
    except (OSError, ValueError, IndexError):
        # Unreadable file or malformed btime line: use the fallback below.
        pass
    # Also reached when the file has no btime line at all.
    return time.time()
def get_log_level():
    """Return the global logging level selected by the ``dp_log_level`` param.

    The stored value maps ``"0"`` to WARNING, ``"1"`` to INFO and ``"2"`` to
    DEBUG. A missing or unrecognised value yields INFO.
    """
    stored = Params().get("dp_log_level", encoding='utf8')
    if stored is None:
        return logging.INFO

    levels_by_setting = {
        "0": logging.WARNING,
        "1": logging.INFO,
        "2": logging.DEBUG,
    }
    return levels_by_setting.get(stored, logging.INFO)
class FilteredLogFormatter(logging.Formatter):
    """Formatter that renders records as pipe-separated lines and filters them.

    ``format`` returns an empty string for records rejected by ``_should_log``
    and otherwise a line of the form
    ``<timestamp> | <level> | <module> | <message>``.
    """

    # Messages longer than this (in characters) are only kept at ERROR or above.
    MAX_MESSAGE_SIZE = 2 * 1024 * 1024
    # Keys of a dict message that are not rendered as ``key=value`` parameters.
    _RESERVED_KEYS = frozenset({'event', 'msg', 'module', 'timestamp'})

    def __init__(self, swaglogger=None):
        """Set up the filter tables and counters.

        Args:
            swaglogger: Accepted for interface compatibility; not used.
        """
        super().__init__()
        # Modules whose records are always kept, regardless of level or size.
        self.critical_modules = {
            'controlsd', 'pandad', 'plannerd', 'radard',
            'thermald', 'uploader', 'manager', 'locationd',
            'modeld', 'sensord', 'monitoringd', 'logmessaged'
        }
        # Modules that need at least the given level to be kept.
        self.reduced_modules = {
            'boardd': logging.WARNING,
            'camerad': logging.WARNING,
            'ui': logging.WARNING
        }
        self.global_level = get_log_level()
        # Counters describing how many records were seen / rejected.
        self.filtered_count = 0
        self.total_count = 0
        self.last_stats_time = time.time()

    def format(self, record):
        """Render ``record`` as one log line, or ``""`` when it is filtered out.

        Dict messages are rendered as ``<event> | k1=v1, k2=v2`` using the
        ``event`` key (``unknown_event`` when absent) and every key that is not
        reserved. Any other message is rendered with ``str()``.

        The timestamp comes from ``record.created`` so that seconds and
        milliseconds always belong to the same instant and formatting the same
        record twice gives the same line.

        Returns:
            The formatted line, ``""`` for a filtered record, or a
            ``FormatError: ...`` string when formatting itself fails.
        """
        try:
            self.total_count += 1

            if isinstance(record.msg, dict):
                event_msg = record.msg.get('event', 'unknown_event')
                param_str = ', '.join(
                    f"{k}={v}"
                    for k, v in record.msg.items()
                    if k not in self._RESERVED_KEYS
                )
                msg_content = f"{event_msg} | {param_str}"
                # Filtering is based on the size of the untouched message.
                original_length = len(str(record.msg))
            else:
                msg_content = str(record.msg)
                original_length = len(msg_content)

            if not self._should_log(record.module, record.levelno, original_length):
                self.filtered_count += 1
                return ""

            created = record.created
            millis = int(created * 1000) % 1000
            date_part = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(created))
            timestamp = f"{date_part}.{millis:03d}"
            level_name = logging.getLevelName(record.levelno)
            # Same fallback as _should_log, so a missing module cannot break
            # the width formatting below.
            module_name = "unknown" if record.module is None else record.module
            return f"{timestamp} | {level_name:<7} | {module_name:<15} | {msg_content}"
        except Exception as e:
            return f"FormatError: {str(e)[:100]}"

    def _should_log(self, module: str, log_level: int, message_size: int) -> bool:
        """Decide whether a record should be kept.

        Checks are applied in this order:
          1. records from a critical module are always kept;
          2. records below the global level are dropped;
          3. oversized messages are kept only at ERROR or above;
          4. reduced modules need at least their configured level.

        Args:
            module: Module name (case-insensitive); ``None`` is treated as
                ``"unknown"``.
            log_level: Numeric logging level of the record.
            message_size: Length of the message.
        """
        if module is None:
            module = "unknown"
        module_lower = module.lower()

        if module_lower in self.critical_modules:
            return True
        if log_level < self.global_level:
            return False
        if message_size > self.MAX_MESSAGE_SIZE:
            return log_level >= logging.ERROR

        min_level = self.reduced_modules.get(module_lower, logging.DEBUG)
        return log_level >= min_level
def _parse_log_payload(dat: bytes):
    """Decode one raw payload into a ``(level, module, msg)`` tuple.

    Two layouts are understood:
      * a leading ``0x01`` byte followed by a JSON object carrying ``level``,
        ``msg`` and ``module``;
      * a leading level byte followed by text. When the text is a JSON object,
        the module comes from ``daemon`` (or the stem of ``filename``) and the
        message from ``msg``; otherwise the whole text is the message and the
        module is ``text_log``.

    The level is clamped to the DEBUG..CRITICAL range.

    Raises:
        Exception: whatever the JSON-layout decoding raises for a malformed
            payload; the caller reports it and skips the payload.
    """
    if dat.startswith(b'\x01'):
        raw_data = json.loads(dat[1:].decode('utf-8', errors='replace'))
        level = min(max(int(raw_data.get('level', 10)), logging.DEBUG), logging.CRITICAL)
        msg = raw_data.get('msg', '')
        module = raw_data.get('module', 'unknown').strip() or 'unknown'
        return level, module, msg

    level = min(max(int(dat[0]), logging.DEBUG), logging.CRITICAL)
    raw_msg = dat[1:].decode('utf-8', errors='replace')
    try:
        msg_dict = json.loads(raw_msg)
        # Prefer the daemon field as the module name.
        module = msg_dict.get('daemon') or \
            msg_dict.get('filename', 'text_log').split('/')[-1].split('.')[0]
        module = module.lower().strip()
        msg = msg_dict.get('msg', raw_msg)
    except Exception:
        # Not structured (or not the expected shape): treat it as plain text.
        module = 'text_log'
        msg = raw_msg
    return level, module, msg


def main() -> NoReturn:
    """Run the log daemon: receive, filter, store and publish log messages.

    Messages are pulled from ``ipc:///tmp/logmessage``, written through the
    rotating file handler and published on ``logMessage`` (and on
    ``errorLogMessage`` for ERROR and above). Old log files are cleaned at
    start-up and then every ``LOG_CONFIG['CLEAN_INTERVAL']`` seconds, checked
    each time the loop comes around.

    The function returns only when the file handler cannot be created; a fatal
    error terminates the process with exit status 1, SIGINT/SIGTERM with 0.
    """
    import signal
    import atexit

    # Bound up-front so cleanup() can always refer to them, even when start-up
    # fails half-way through.
    ctx = None
    sock = None
    handler = None

    def cleanup():
        """Release socket, context and file handler; safe to call repeatedly."""
        nonlocal ctx, sock, handler
        steps = ((sock, 'close'), (ctx, 'term'), (handler, 'close'))
        # Mark everything as released first so a second call is a no-op.
        sock = ctx = handler = None
        for resource, method in steps:
            if resource is None:
                continue
            try:
                getattr(resource, method)()
            except Exception as e:
                print(f"清理错误: {str(e)}")

    def signal_handler(signum, frame):
        """Release resources and exit when a termination signal arrives."""
        print(f"收到信号 {signum},正在退出...")
        cleanup()
        os._exit(0)

    atexit.register(cleanup)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        # Input and output sockets.
        ctx = zmq.Context()
        sock = ctx.socket(zmq.PULL)
        sock.bind("ipc:///tmp/logmessage")
        log_sock = messaging.pub_sock("logMessage")
        error_sock = messaging.pub_sock("errorLogMessage")

        # On-disk log set-up.
        boot_timestamp = get_system_boottime()
        Path(DEFAULT_LOG_DIR).mkdir(parents=True, exist_ok=True)
        clean_old_logs()

        handler = create_log_handler(boot_timestamp)
        if handler is None:
            print("无法创建日志处理器")
            # The finally clause releases the socket and the context.
            return

        # Make our handler the only one attached to the root logger.
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)  # let every level reach the handler
        logger.handlers.clear()
        logger.propagate = False
        formatter = FilteredLogFormatter(None)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # Write a start-up marker directly through the handler.
        test_record = logging.LogRecord(
            name='logmessaged',
            level=logging.INFO,
            pathname='',
            lineno=0,
            msg='日志系统启动成功',
            args=(),
            exc_info=None
        )
        handler.emit(test_record)
        handler.flush()

        last_clean_time = time.time()
        while True:
            # Periodic clean-up of expired log files.
            if time.time() - last_clean_time > LOG_CONFIG['CLEAN_INTERVAL']:
                clean_old_logs()
                last_clean_time = time.time()

            try:
                dat = sock.recv_multipart()[0]
                if not dat:
                    continue

                try:
                    level, module, msg = _parse_log_payload(dat)

                    # Cheap rejection before a record is even built.
                    if not formatter._should_log(module, level, len(str(msg))):
                        continue

                    record = logging.LogRecord(
                        name=module,
                        level=level,
                        pathname='',
                        lineno=0,
                        msg=msg,
                        args=(),
                        exc_info=None
                    )
                    # LogRecord derives module from pathname; use the sender's.
                    record.module = module

                    formatted = formatter.format(record)
                    # NOTE(review): the nesting of this section was rebuilt
                    # from a copy of the file that had lost its indentation -
                    # confirm that publishing belongs inside both checks.
                    if formatted:
                        handler.emit(record)
                        if len(formatted) <= 2*1024*1024:
                            log_msg = messaging.new_message()
                            log_msg.logMessage = formatted
                            log_sock.send(log_msg.to_bytes())
                            if level >= logging.ERROR:
                                error_msg = messaging.new_message()
                                error_msg.errorLogMessage = formatted
                                error_sock.send(error_msg.to_bytes())
                except Exception as parse_error:
                    print(f"日志解析错误: {str(parse_error)}")
                    continue
            except Exception as recv_error:
                print(f"接收错误: {str(recv_error)}")
                time.sleep(0.1)
    except Exception as e:
        print(f"FatalError: {str(e)}")
        # os._exit skips the finally clause, so release resources here.
        cleanup()
        os._exit(1)
    finally:
        cleanup()
if __name__ == "__main__":
main()