365 lines
13 KiB
Python
Executable File
365 lines
13 KiB
Python
Executable File
"""
|
||
日志服务守护进程(V3.2)核心架构:

输入层:
    ■ ZMQ IPC接收(ipc:///tmp/logmessage)
    ■ 首字节为日志等级(0x01 前缀表示 JSON 载荷)    ■ JSON/字符串自动解析
    ■ 非阻塞IO设计          ■ 多线程安全
    ■ 大日志拦截(>2MB)      ■ 流量控制机制
    ■ 自动重连机制          ■ 进程隔离保护

处理层:
    ■ 智能日志过滤          ■ 模块化处理
    ■ 日志格式标准化        ■ 内存保护机制
    ■ 异常自动恢复          ■ 性能优化
    ■ 日志级别动态调整(dp_log_level 参数)    ■ 消息大小限制
    ■ 上下文信息注入        ■ 调用栈追踪

输出层:
    ├─ 主日志:/data/media/0/c2_logs/logmessage/swaglog.<启动时间戳hex>.<YYYYmmdd_HHMMSS>.000.log
    │   ├─ 滚动策略:128KB/保留1500份
    │   ├─ 保留时间:4天
    │   └─ 自动清理机制(每6小时检查一次)
    └─ 实时通道:
        ├─ logMessage(全量日志)
        └─ errorLogMessage(ERROR 及以上级别)

监控体系:
    ■ 目录健康检查          ■ 队列状态监控
    ■ 资源自动回收          ■ 性能指标统计
    ■ 错误预警机制          ■ 系统状态报告
    ■ 日志过滤统计          ■ 处理器状态跟踪

特性:
    ■ 支持动态日志配置      ■ 多级别日志处理
    ■ 高性能日志处理        ■ 跨平台兼容
    ■ 自动清理机制          ■ 实时日志推送
    ■ 模块化设计            ■ 可扩展架构
    ■ 线程安全              ■ 异常处理
    ■ 日志压缩              ■ 远程日志支持
|
||
"""
|
||
#!/usr/bin/env python3
|
||
import os
|
||
import zmq
|
||
import json
|
||
import time
|
||
import datetime
|
||
import logging
|
||
from typing import NoReturn
|
||
from pathlib import Path
|
||
import cereal.messaging as messaging
|
||
from openpilot.common.logging_extra import (
|
||
SwagFormatter, SwaglogRotatingFileHandler,SwagLogger,CustomSwaglogRotatingFileHandler
|
||
)
|
||
from openpilot.common.params import Params
|
||
|
||
# Directory that receives the rotating swaglog files written by this daemon.
DEFAULT_LOG_DIR = "/data/media/0/c2_logs/logmessage/"

# Rotation / retention settings, read by create_log_handler(), clean_old_logs() and main().
LOG_CONFIG = dict(
    INTERVAL=300,             # rollover interval passed to the file handler (seconds)
    MAX_BYTES=128 * 1024,     # per-file size limit (128 KiB)
    BACKUP_COUNT=1500,        # maximum number of rotated files kept
    MAX_AGE=4 * 24 * 3600,    # files older than this (by mtime) are deleted (4 days)
    ENCODING='utf8',          # text encoding of the log files
    CLEAN_INTERVAL=6 * 3600,  # how often the main loop re-runs the cleanup (6 hours)
)
|
||
|
||
def create_log_handler(boot_ts: float):
    """Create the rotating swaglog file handler for the current boot.

    The file is placed in DEFAULT_LOG_DIR (created on demand) and named
    ``swaglog.<boot_ts as hex>.<local YYYYmmdd_HHMMSS>.000.log``; the rotation
    settings are taken from LOG_CONFIG.

    Args:
        boot_ts: boot time in seconds since the epoch.

    Returns:
        A CustomSwaglogRotatingFileHandler, or None if anything failed
        (the error is printed, not raised).
    """
    try:
        boot_hex = hex(int(boot_ts))[2:]
        os.makedirs(DEFAULT_LOG_DIR, exist_ok=True)

        started_at = time.strftime('%Y%m%d_%H%M%S')
        log_path = os.path.join(DEFAULT_LOG_DIR, f"swaglog.{boot_hex}.{started_at}.000.log")
        rotation_options = dict(
            interval=LOG_CONFIG['INTERVAL'],
            max_bytes=LOG_CONFIG['MAX_BYTES'],
            backup_count=LOG_CONFIG['BACKUP_COUNT'],
            encoding=LOG_CONFIG['ENCODING'],
        )
        return CustomSwaglogRotatingFileHandler(log_path, **rotation_options)
    except Exception as err:
        print(f"HandlerInitError: {str(err)}")
        return None
|
||
|
||
def clean_old_logs(log_dir=None, max_age=None) -> int:
    """Delete expired ``*.log`` files (清理过期日志文件).

    A file is expired when its modification time is more than ``max_age``
    seconds in the past. Only regular files are considered, so a directory whose
    name happens to end in ``.log`` is left alone.

    Args:
        log_dir: directory to sweep; defaults to DEFAULT_LOG_DIR.
        max_age: maximum age in seconds; defaults to LOG_CONFIG['MAX_AGE'].

    Returns:
        The number of files deleted. Errors are printed, never raised. A failure
        on one file (for example, it vanished or cannot be removed) does not stop
        the rest of the sweep.
    """
    # Resolve defaults lazily so the module-level config is read at call time.
    if log_dir is None:
        log_dir = DEFAULT_LOG_DIR
    if max_age is None:
        max_age = LOG_CONFIG['MAX_AGE']

    try:
        candidates = list(Path(log_dir).glob("*.log"))
    except OSError as e:
        print(f"CleanLogError: {str(e)}")
        return 0

    now = time.time()
    removed = 0
    for log_file in candidates:
        try:
            if log_file.is_file() and now - log_file.stat().st_mtime > max_age:
                log_file.unlink()
                removed += 1
        except OSError as e:
            # Per-file handling: keep sweeping the remaining files.
            print(f"CleanLogError: {str(e)}")
    return removed
|
||
|
||
def get_system_boottime(stat_path: str = '/proc/stat') -> float:
    """Return the system boot time as a Unix timestamp in seconds.

    Reads the ``btime`` entry of /proc/stat. Falls back to the current time
    when the file cannot be read (for example on non-Linux systems), contains
    no ``btime`` line, or holds an unparsable value. The result is therefore
    always a float, never None.

    Args:
        stat_path: path of the procfs stat file (overridable, e.g. for tests).
    """
    try:
        with open(stat_path, 'r') as f:
            for line in f:
                if line.startswith('btime'):
                    return float(line.split()[1])
    except (OSError, ValueError, IndexError):
        # Deliberate best-effort: any read/parse problem uses the fallback below.
        pass
    return time.time()  # 回退到当前时间
|
||
|
||
def get_log_level(params=None) -> int:
    """Return the global logging threshold configured by the ``dp_log_level`` param.

    Mapping: "0" -> WARNING, "1" -> INFO, "2" -> DEBUG. A missing, unrecognised
    or unreadable value yields INFO.

    Args:
        params: optional object with ``get(key, encoding=...)`` (normally an
            openpilot ``Params``); a new ``Params()`` is created when omitted.
    """
    level_map = {
        "0": logging.WARNING,
        "1": logging.INFO,
        "2": logging.DEBUG,
    }
    try:
        if params is None:
            params = Params()
        dp_log_level = params.get("dp_log_level", encoding='utf8')
    except Exception as e:
        # An unavailable param store must not stop the logging daemon from starting.
        print(f"LogLevelError: {str(e)}")
        return logging.INFO

    if dp_log_level is None:
        return logging.INFO
    # Tolerate surrounding whitespace/newlines in the stored value.
    return level_map.get(str(dp_log_level).strip(), logging.INFO)
|
||
|
||
class FilteredLogFormatter(logging.Formatter):
    """Formatter that filters records and renders them as one text line.

    Output shape: ``YYYY-mm-dd HH:MM:SS.mmm | LEVEL   | module          | message``.
    ``format()`` returns an empty string for records rejected by ``_should_log``.
    """

    # Keys of a dict-style message that are not rendered as ``key=value`` params.
    _DICT_META_KEYS = frozenset({'event', 'msg', 'module', 'timestamp'})
    # Messages larger than this are only kept at ERROR level or above.
    _MAX_MESSAGE_SIZE = 2 * 1024 * 1024

    def __init__(self, swaglogger=None):
        """Initialise the filter tables and the global level.

        Args:
            swaglogger: accepted for interface compatibility; not used.
        """
        super().__init__()
        # Records from these modules are always kept (level and size are not checked).
        self.critical_modules = {
            'controlsd', 'pandad', 'plannerd', 'radard',
            'thermald', 'uploader', 'manager', 'locationd',
            'modeld', 'sensord', 'monitoringd', 'logmessaged'
        }
        # Per-module minimum level for noisy modules.
        self.reduced_modules = {
            'boardd': logging.WARNING,
            'camerad': logging.WARNING,
            'ui': logging.WARNING
        }
        self.global_level = get_log_level()

        # Filtering statistics. Only filtered_count is updated by this class;
        # total_count and last_stats_time are initialised here and not maintained by it.
        self.filtered_count = 0
        self.total_count = 0
        self.last_stats_time = time.time()

    def _render_message(self, record) -> tuple[str, int]:
        """Return (message text, size used by the size filter) for *record*."""
        if isinstance(record.msg, dict):
            event_msg = record.msg.get('event', 'unknown_event')
            params = {k: v for k, v in record.msg.items() if k not in self._DICT_META_KEYS}
            param_str = ', '.join(f"{k}={v}" for k, v in params.items())
            # The size filter uses the size of the original dict, not the rendered text.
            return f"{event_msg} | {param_str}", len(str(record.msg))
        # getMessage() applies %-style record.args; with no args it is str(record.msg).
        text = record.getMessage()
        return text, len(text)

    def format(self, record):
        try:
            msg_content, original_length = self._render_message(record)

            if not self._should_log(record.module, record.levelno, original_length):
                self.filtered_count += 1
                return ""

            # Take the time from the record itself. This keeps seconds and milliseconds
            # consistent (two separate clock reads could straddle a second boundary) and
            # gives the same timestamp each time the same record is formatted.
            seconds = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(record.created))
            timestamp = f"{seconds}.{int(record.msecs):03d}"
            level_name = logging.getLevelName(record.levelno)
            module = record.module if record.module is not None else "unknown"

            return f"{timestamp} | {level_name:<7} | {module:<15} | {msg_content}"
        except Exception as e:
            return f"FormatError: {str(e)[:100]}"

    def _should_log(self, module: str, log_level: int, message_size: int) -> bool:
        """Decide whether a record should be logged (判断是否应该记录日志).

        Order of checks: critical modules always pass; then the global level;
        then oversized messages (kept only at ERROR+); then per-module minimums.
        """
        if module is None:
            module = "unknown"
        module_lower = module.lower()

        # 1. Critical modules bypass every other check.
        if module_lower in self.critical_modules:
            return True

        # 2. Global level threshold.
        if log_level < self.global_level:
            return False

        # 3. Oversized messages are only kept when they are errors.
        if message_size > self._MAX_MESSAGE_SIZE:
            return log_level >= logging.ERROR

        # 4. Reduced-verbosity modules.
        min_level = self.reduced_modules.get(module_lower, logging.DEBUG)
        return log_level >= min_level
|
||
|
||
# Formatted lines longer than this (in characters) are not published on cereal.
_MAX_PUBLISH_LEN = 2 * 1024 * 1024


def _parse_log_payload(dat: bytes):
    """Decode one raw IPC payload into ``(level, module, msg)``.

    Two wire formats are accepted:
      * a leading 0x01 byte followed by a JSON object with optional
        ``level``/``msg``/``module`` keys;
      * a leading level byte followed by UTF-8 text. The text is tried as a JSON
        object (module taken from ``daemon``, else the basename of ``filename``;
        message from ``msg``). Otherwise it is kept verbatim under ``text_log``.

    Levels are clamped to [DEBUG, CRITICAL]. A malformed 0x01/JSON payload raises,
    and the caller reports it.
    """
    if dat.startswith(b'\x01'):
        raw_data = json.loads(dat[1:].decode('utf-8', errors='replace'))
        level = min(max(int(raw_data.get('level', 10)), logging.DEBUG), logging.CRITICAL)
        msg = raw_data.get('msg', '')
        module = raw_data.get('module', 'unknown').strip() or 'unknown'
        return level, module, msg

    level = min(max(int(dat[0]), logging.DEBUG), logging.CRITICAL)
    raw_msg = dat[1:].decode('utf-8', errors='replace')
    try:
        msg_dict = json.loads(raw_msg)
        # Prefer the daemon name; otherwise use the source file's basename without extension.
        module = msg_dict.get('daemon') or \
            msg_dict.get('filename', 'text_log').split('/')[-1].split('.')[0]
        module = module.lower().strip()
        msg = msg_dict.get('msg', raw_msg)
    except Exception:
        # Not a JSON object (or unexpected field types): treat as plain text.
        module = 'text_log'
        msg = raw_msg
    return level, module, msg


def _publish_log(log_sock, error_sock, formatted: str, level: int) -> None:
    """Publish *formatted* on logMessage, and also on errorLogMessage for ERROR and above.

    Lines longer than _MAX_PUBLISH_LEN characters are not published on either channel.
    """
    if len(formatted) > _MAX_PUBLISH_LEN:
        return
    log_msg = messaging.new_message()
    log_msg.logMessage = formatted
    log_sock.send(log_msg.to_bytes())

    if level >= logging.ERROR:
        error_msg = messaging.new_message()
        error_msg.errorLogMessage = formatted
        error_sock.send(error_msg.to_bytes())


def main() -> None:
    """Run the log daemon.

    Receives payloads on ipc:///tmp/logmessage, writes accepted records through
    the rotating file handler, and republishes them on logMessage/errorLogMessage.
    Returns only if the file handler cannot be created. Otherwise it loops until
    SIGINT/SIGTERM (process exits with 0) or a fatal error (process exits with 1).
    """
    import signal
    import atexit

    # Bound before cleanup() is defined so the closure can always see them,
    # even when initialisation fails part-way.
    ctx = None
    sock = None
    handler = None

    def cleanup():
        """Release resources (清理资源); safe to call more than once."""
        nonlocal ctx, sock, handler
        try:
            if sock is not None:
                sock.close()
            if ctx is not None:
                ctx.term()
            if handler is not None:
                handler.close()
        except Exception as e:
            print(f"清理错误: {str(e)}")
        finally:
            sock = ctx = handler = None

    def signal_handler(signum, frame):
        """Handle an exit signal: release resources, then exit immediately."""
        print(f"收到信号 {signum},正在退出...")
        cleanup()
        os._exit(0)

    atexit.register(cleanup)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        # Input socket and realtime output channels.
        ctx = zmq.Context()
        sock = ctx.socket(zmq.PULL)
        sock.bind("ipc:///tmp/logmessage")
        log_sock = messaging.pub_sock("logMessage")
        error_sock = messaging.pub_sock("errorLogMessage")

        # Log directory and initial retention sweep.
        boot_timestamp = get_system_boottime()
        Path(DEFAULT_LOG_DIR).mkdir(parents=True, exist_ok=True)
        clean_old_logs()

        handler = create_log_handler(boot_timestamp)
        if handler is None:
            print("无法创建日志处理器")
            return  # the finally clause releases the socket and context

        formatter = FilteredLogFormatter(None)
        handler.setFormatter(formatter)

        # Make this handler the root logger's only handler, accepting every level.
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
        logger.handlers.clear()
        logger.propagate = False
        logger.addHandler(handler)

        # Startup marker. It is attributed to logmessaged (a critical module),
        # so the level/module filter cannot drop it.
        test_record = logging.LogRecord(
            name='logmessaged',
            level=logging.INFO,
            pathname='',
            lineno=0,
            msg='日志系统启动成功',
            args=(),
            exc_info=None
        )
        test_record.module = 'logmessaged'
        handler.emit(test_record)
        handler.flush()

        last_clean_time = time.time()
        while True:
            # Periodic retention sweep (checked whenever a message arrives).
            if time.time() - last_clean_time > LOG_CONFIG['CLEAN_INTERVAL']:
                clean_old_logs()
                last_clean_time = time.time()

            try:
                # Join all frames so multipart messages are not cut to their first frame.
                dat = b''.join(sock.recv_multipart())
                if not dat:
                    continue

                try:
                    level, module, msg = _parse_log_payload(dat)
                    if not formatter._should_log(module, level, len(str(msg))):
                        continue

                    record = logging.LogRecord(
                        name=module,
                        level=level,
                        pathname='',
                        lineno=0,
                        msg=msg,
                        args=(),
                        exc_info=None
                    )
                    record.module = module

                    formatted = formatter.format(record)
                    if formatted:
                        handler.emit(record)
                        _publish_log(log_sock, error_sock, formatted, level)
                except Exception as parse_error:
                    print(f"日志解析错误: {str(parse_error)}")
            except Exception as recv_error:
                print(f"接收错误: {str(recv_error)}")
                time.sleep(0.1)

    except Exception as e:
        print(f"FatalError: {str(e)}")
        # os._exit() skips both finally and atexit, so release resources first.
        cleanup()
        os._exit(1)
    finally:
        cleanup()


if __name__ == "__main__":
    main()
|