Files
dragonpilot/selfdrive/boardd/set_time.py
Comma Device 5c73e264e9 Release 260308
2026-03-08 23:26:57 +08:00

182 lines
7.5 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import os
import subprocess
import time
import datetime
import requests
from panda import Panda
from openpilot.common.params import Params
import cereal.messaging as messaging
PARAMS = Params()
LAST_TIME_KEY = "LastValidTime"
# 设置最小有效时间为2025年3月15日使用UTC时区
MIN_DATE_UTC = datetime.datetime(2025, 3, 15, tzinfo=datetime.timezone.utc)
NTP_SERVERS = [
"ntp.aliyun.com", # 阿里云
"ntp.tencent.com", # 腾讯
"cn.ntp.org.cn", # 中国 NTP 池
"ntp.ntsc.ac.cn" # 中国科学院国家授时中心
]
def save_last_valid_time(time_value, logger):
if time_value and time_value > MIN_DATE_UTC:
try:
timestamp = int(time_value.timestamp())
if timestamp > 0:
PARAMS.put(LAST_TIME_KEY, str(timestamp))
logger.debug(f"已保存有效时间戳: {timestamp} ({time_value.strftime('%Y-%m-%d %H:%M:%S')})")
except (ValueError, TypeError, OSError) as e:
logger.error(f"保存时间戳失败: {str(e)}")
def get_ntp_time(logger):
logger.info(f"开始尝试从 {len(NTP_SERVERS)} 个NTP服务器获取时间...")
for server in NTP_SERVERS:
try:
logger.debug(f"正在连接NTP服务器: {server}")
response = requests.get(f"https://{server}", timeout=2, verify=False)
if response.ok and 'date' in response.headers:
# 确保NTP时间使用UTC时区
server_time = datetime.datetime.strptime(response.headers['date'], '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=datetime.timezone.utc)
logger.info(f"NTP服务器 {server} 同步成功: {server_time.astimezone().strftime('%Y-%m-%d %H:%M:%S %Z')}")
return server_time
except Exception as e:
logger.debug(f"NTP服务器 {server} 同步失败: {str(e)}")
logger.warning("所有NTP服务器同步失败")
return None
def get_gps_time(logger):
try:
logger.info("正在尝试获取GPS时间...")
sm = messaging.SubMaster(['gpsLocationExternal'])
for i in range(50):
sm.update()
if sm.updated['gpsLocationExternal'] and sm.valid['gpsLocationExternal']:
log = sm['gpsLocationExternal']
if log.flags % 2 != 0:
gps_time = datetime.datetime.fromtimestamp(log.unixTimestampMillis / 1000.0, datetime.timezone.utc)
if gps_time and gps_time > MIN_DATE_UTC:
logger.info(f"GPS时间获取成功: {gps_time.astimezone().strftime('%Y-%m-%d %H:%M:%S %Z')}")
return gps_time
if i % 10 == 0:
logger.debug(f"等待GPS数据... ({i/10}/5秒)")
time.sleep(0.1)
logger.warning("等待GPS数据超时")
except Exception as e:
logger.error(f"GPS时间同步失败: {str(e)}")
return None
def get_last_valid_time(logger):
try:
logger.info("正在获取上次保存的有效时间...")
timestamp = PARAMS.get(LAST_TIME_KEY)
if timestamp:
# 使用UTC时区处理时间戳并增加5分钟
time_value = datetime.datetime.fromtimestamp(int(timestamp), datetime.timezone.utc) + datetime.timedelta(minutes=5)
# 转换为本地时间
local_time = time_value.astimezone()
logger.info(f"找到上次有效时间(+5分钟): {local_time.strftime('%Y-%m-%d %H:%M:%S %Z')}")
return time_value # 返回UTC时间保持系统时间统一性
logger.debug("未找到上次有效时间记录")
except Exception as e:
logger.error(f"获取上次有效时间失败: {str(e)}")
return None
def set_system_time(time_value, source, logger):
try:
if time_value and time_value > MIN_DATE_UTC:
if time_value.tzinfo is None:
time_value = time_value.replace(tzinfo=datetime.timezone.utc)
time_str = time_value.astimezone(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"正在设置系统时间为 {time_str} (来源: {source})...")
result = os.system(f"TZ=UTC date -s '{time_str}'") == 0
if result:
logger.info(f"✓ 系统时间设置成功: {time_str} (来源: {source})")
save_last_valid_time(time_value, logger)
return True
else:
logger.error(f"✗ 设置系统时间失败 (来源: {source})")
else:
logger.warning(f"无效的时间值 (来源: {source}): {time_value}")
return False
except Exception as e:
logger.error(f"设置系统时间出错 (来源: {source}): {str(e)}")
return False
def set_time(logger):
logger.info("=== 开始时间同步流程 ===")
sys_time = datetime.datetime.now(datetime.timezone.utc)
if sys_time > MIN_DATE_UTC:
logger.info(f"系统时间已有效: {sys_time.strftime('%Y-%m-%d %H:%M:%S %Z')}")
save_last_valid_time(sys_time, logger)
return
logger.warning(f"系统时间无效: {sys_time.strftime('%Y-%m-%d %H:%M:%S %Z')}, 开始尝试同步...")
# 1. 尝试网络时间同步
logger.info("【方法1】尝试网络时间同步...")
ntp_time = get_ntp_time(logger)
if ntp_time and set_system_time(ntp_time, "NTP", logger):
logger.info("=== 时间同步完成 ===")
return
# 2. 尝试使用上次保存的时间
logger.info("【方法2】尝试使用上次保存的有效时间...")
last_time = get_last_valid_time(logger)
if last_time and set_system_time(last_time, "上次有效时间", logger):
logger.info("=== 时间同步完成 ===")
return
# 3. 尝试GPS时间检查 ubloxd 进程是否运行)
logger.info("【方法3】尝试使用GPS时间...")
try:
# 直接检查 ubloxd 进程
ubloxd_running = subprocess.run(["ps", "-A"], capture_output=True, text=True).stdout.find("ubloxd") != -1
if ubloxd_running:
logger.info("ubloxd进程正在运行尝试获取GPS时间...")
gps_time = get_gps_time(logger)
if gps_time and set_system_time(gps_time, "GPS", logger):
logger.info("=== 时间同步完成 ===")
return
else:
logger.warning("ubloxd进程未运行无法获取GPS时间")
except Exception as e:
logger.error(f"检查ubloxd状态失败: {str(e)}")
# 4. 尝试Panda RTC时间
logger.info("【方法4】尝试使用Panda RTC时间...")
try:
ps = Panda.list()
if len(ps) > 0:
logger.info(f"找到 {len(ps)} 个Panda设备")
for i, s in enumerate(ps):
logger.debug(f"正在连接Panda设备 {i+1}/{len(ps)}: {s}")
with Panda(serial=s) as p:
if not p.is_internal():
logger.debug(f"设备 {s} 不是内部Panda跳过")
continue
logger.info(f"正在从内部Panda设备 {s} 获取时间...")
panda_time = p.get_datetime()
if panda_time:
logger.info(f"Panda RTC时间: {panda_time.strftime('%Y-%m-%d %H:%M:%S')}")
if set_system_time(panda_time, "Panda RTC", logger):
logger.info("=== 时间同步完成 ===")
return
else:
logger.warning(f"Panda设备 {s} 未返回有效时间")
else:
logger.warning("未找到Panda设备")
except Exception as e:
logger.error(f"获取Panda时间失败: {str(e)}")
logger.error("❌ 所有时间源都无法获取有效时间,时间同步失败")
logger.info("=== 时间同步流程结束 ===")
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger("时间同步")
set_time(logger)