Files
dragonpilot/selfdrive/athena/athenad.py
Comma Device 5c73e264e9 Release 260308
2026-03-08 23:26:57 +08:00

861 lines
28 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import base64
import bz2
import hashlib
import io
import json
import os
import queue
import random
import select
import socket
import subprocess
import sys
import tempfile
import threading
import time
from dataclasses import asdict, dataclass, replace
from datetime import datetime
from functools import partial
from queue import Queue
from typing import BinaryIO, Callable, Dict, List, Optional, Set, Union, cast
import requests
from jsonrpc import JSONRPCResponseManager, dispatcher
from websocket import (ABNF, WebSocket, WebSocketException, WebSocketTimeoutException,
create_connection)
import cereal.messaging as messaging
from cereal import log
from cereal.services import SERVICE_LIST
from openpilot.common.api import Api
from openpilot.common.basedir import PERSIST
from openpilot.common.file_helpers import CallbackReader
from openpilot.common.params import Params
from openpilot.common.realtime import set_core_affinity
from openpilot.system.hardware import HARDWARE, PC, AGNOS
from openpilot.selfdrive.loggerd.config import ROOT
from openpilot.selfdrive.loggerd.xattr_cache import getxattr, setxattr
from openpilot.selfdrive.statsd import STATS_DIR
from openpilot.common.swaglog import SWAGLOG_DIR, cloudlog
from openpilot.system.version import get_commit, get_origin, get_short_branch, get_version
# TODO: use socket constant when mypy recognizes this as a valid attribute
TCP_USER_TIMEOUT = 18
ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
LOCAL_PORT_WHITELIST = {8022}
LOG_ATTR_NAME = 'user.upload'
LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder)
RECONNECT_TIMEOUT_S = 70
RETRY_DELAY = 10 # seconds
MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately
MAX_AGE = 31 * 24 * 3600 # seconds
WS_FRAME_SIZE = 4096
NetworkType = log.DeviceState.NetworkType
UploadFileDict = Dict[str, Union[str, int, float, bool]]
UploadItemDict = Dict[str, Union[str, bool, int, float, Dict[str, str]]]
UploadFilesToUrlResponse = Dict[str, Union[int, List[UploadItemDict], List[str]]]
@dataclass
class UploadFile:
fn: str
url: str
headers: Dict[str, str]
allow_cellular: bool
@classmethod
def from_dict(cls, d: Dict) -> UploadFile:
return cls(d.get("fn", ""), d.get("url", ""), d.get("headers", {}), d.get("allow_cellular", False))
@dataclass
class UploadItem:
path: str
url: str
headers: Dict[str, str]
created_at: int
id: Optional[str] # noqa: A003 (to match the response from the remote server)
retry_count: int = 0
current: bool = False
progress: float = 0
allow_cellular: bool = False
@classmethod
def from_dict(cls, d: Dict) -> UploadItem:
return cls(d["path"], d["url"], d["headers"], d["created_at"], d["id"], d["retry_count"], d["current"],
d["progress"], d["allow_cellular"])
dispatcher["echo"] = lambda s: s
recv_queue: Queue[str] = queue.Queue()
send_queue: Queue[str] = queue.Queue()
upload_queue: Queue[UploadItem] = queue.Queue()
low_priority_send_queue: Queue[str] = queue.Queue()
log_recv_queue: Queue[str] = queue.Queue()
cancelled_uploads: Set[str] = set()
cur_upload_items: Dict[int, Optional[UploadItem]] = {}
def strip_bz2_extension(fn: str) -> str:
if fn.endswith('.bz2'):
return fn[:-4]
return fn
class AbortTransferException(Exception):
pass
class UploadQueueCache:
params = Params()
@staticmethod
def initialize(upload_queue: Queue[UploadItem]) -> None:
try:
upload_queue_json = UploadQueueCache.params.get("AthenadUploadQueue")
if upload_queue_json is not None:
for item in json.loads(upload_queue_json):
upload_queue.put(UploadItem.from_dict(item))
except Exception:
cloudlog.exception("athena.UploadQueueCache.initialize.exception")
@staticmethod
def cache(upload_queue: Queue[UploadItem]) -> None:
try:
queue: List[Optional[UploadItem]] = list(upload_queue.queue)
items = [asdict(i) for i in queue if i is not None and (i.id not in cancelled_uploads)]
UploadQueueCache.params.put("AthenadUploadQueue", json.dumps(items))
except Exception:
cloudlog.exception("athena.UploadQueueCache.cache.exception")
def handle_long_poll(ws: WebSocket, exit_event: Optional[threading.Event]) -> None:
end_event = threading.Event()
threads = [
threading.Thread(target=ws_manage, args=(ws, end_event), name='ws_manage'),
threading.Thread(target=ws_recv, args=(ws, end_event), name='ws_recv'),
threading.Thread(target=ws_send, args=(ws, end_event), name='ws_send'),
threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'),
threading.Thread(target=log_handler, args=(end_event,), name='log_handler'),
threading.Thread(target=stat_handler, args=(end_event,), name='stat_handler'),
] + [
threading.Thread(target=jsonrpc_handler, args=(end_event,), name=f'worker_{x}')
for x in range(HANDLER_THREADS)
]
for thread in threads:
thread.start()
try:
while not end_event.wait(0.1):
if exit_event is not None and exit_event.is_set():
end_event.set()
except (KeyboardInterrupt, SystemExit):
end_event.set()
raise
finally:
for thread in threads:
cloudlog.debug(f"athena.joining {thread.name}")
thread.join()
def jsonrpc_handler(end_event: threading.Event) -> None:
dispatcher["startLocalProxy"] = partial(startLocalProxy, end_event)
while not end_event.is_set():
try:
data = recv_queue.get(timeout=1)
if "method" in data:
cloudlog.event("athena.jsonrpc_handler.call_method", data=data)
response = JSONRPCResponseManager.handle(data, dispatcher)
send_queue.put_nowait(response.json)
elif "id" in data and ("result" in data or "error" in data):
log_recv_queue.put_nowait(data)
else:
raise Exception("not a valid request or response")
except queue.Empty:
pass
except Exception as e:
cloudlog.exception("athena jsonrpc handler failed")
send_queue.put_nowait(json.dumps({"error": str(e)}))
def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = True) -> None:
item = cur_upload_items[tid]
if item is not None and item.retry_count < MAX_RETRY_COUNT:
new_retry_count = item.retry_count + 1 if increase_count else item.retry_count
item = replace(
item,
retry_count=new_retry_count,
progress=0,
current=False
)
upload_queue.put_nowait(item)
UploadQueueCache.cache(upload_queue)
cur_upload_items[tid] = None
for _ in range(RETRY_DELAY):
time.sleep(1)
if end_event.is_set():
break
def cb(sm, item, tid, sz: int, cur: int) -> None:
# Abort transfer if connection changed to metered after starting upload
sm.update(0)
metered = sm['deviceState'].networkMetered
if metered and (not item.allow_cellular):
raise AbortTransferException
cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1)
def upload_handler(end_event: threading.Event) -> None:
sm = messaging.SubMaster(['deviceState'])
tid = threading.get_ident()
while not end_event.is_set():
cur_upload_items[tid] = None
try:
cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True)
if item.id in cancelled_uploads:
cancelled_uploads.remove(item.id)
continue
# Remove item if too old
age = datetime.now() - datetime.fromtimestamp(item.created_at / 1000)
if age.total_seconds() > MAX_AGE:
cloudlog.event("athena.upload_handler.expired", item=item, error=True)
continue
# Check if uploading over metered connection is allowed
sm.update(0)
metered = sm['deviceState'].networkMetered
network_type = sm['deviceState'].networkType.raw
if metered and (not item.allow_cellular):
retry_upload(tid, end_event, False)
continue
try:
fn = item.path
try:
sz = os.path.getsize(fn)
except OSError:
sz = -1
cloudlog.event("athena.upload_handler.upload_start", fn=fn, sz=sz, network_type=network_type, metered=metered, retry_count=item.retry_count)
response = _do_upload(item, partial(cb, sm, item, tid))
if response.status_code not in (200, 201, 401, 403, 412):
cloudlog.event("athena.upload_handler.retry", status_code=response.status_code, fn=fn, sz=sz, network_type=network_type, metered=metered)
retry_upload(tid, end_event)
else:
cloudlog.event("athena.upload_handler.success", fn=fn, sz=sz, network_type=network_type, metered=metered)
UploadQueueCache.cache(upload_queue)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError):
cloudlog.event("athena.upload_handler.timeout", fn=fn, sz=sz, network_type=network_type, metered=metered)
retry_upload(tid, end_event)
except AbortTransferException:
cloudlog.event("athena.upload_handler.abort", fn=fn, sz=sz, network_type=network_type, metered=metered)
retry_upload(tid, end_event, False)
except queue.Empty:
pass
except Exception:
cloudlog.exception("athena.upload_handler.exception")
def _do_upload(upload_item: UploadItem, callback: Optional[Callable] = None) -> requests.Response:
path = upload_item.path
compress = False
# If file does not exist, but does exist without the .bz2 extension we will compress on the fly
if not os.path.exists(path) and os.path.exists(strip_bz2_extension(path)):
path = strip_bz2_extension(path)
compress = True
with open(path, "rb") as f:
data: BinaryIO
if compress:
cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path)
compressed = bz2.compress(f.read())
size = len(compressed)
data = io.BytesIO(compressed)
else:
size = os.fstat(f.fileno()).st_size
data = f
return requests.put(upload_item.url,
data=CallbackReader(data, callback, size) if callback else data,
headers={**upload_item.headers, 'Content-Length': str(size)},
timeout=30)
# security: user should be able to request any message from their car
@dispatcher.add_method
def getMessage(service: str, timeout: int = 1000) -> Dict:
if service is None or service not in SERVICE_LIST:
raise Exception("invalid service")
socket = messaging.sub_sock(service, timeout=timeout)
ret = messaging.recv_one(socket)
if ret is None:
raise TimeoutError
# this is because capnp._DynamicStructReader doesn't have typing information
return cast(Dict, ret.to_dict())
@dispatcher.add_method
def getVersion() -> Dict[str, str]:
return {
"version": get_version(),
"remote": get_origin(''),
"branch": get_short_branch(''),
"commit": get_commit(default=''),
}
@dispatcher.add_method
def setNavDestination(latitude: int = 0, longitude: int = 0, place_name: Optional[str] = None, place_details: Optional[str] = None) -> Dict[str, int]:
destination = {
"latitude": latitude,
"longitude": longitude,
"place_name": place_name,
"place_details": place_details,
}
Params().put("NavDestination", json.dumps(destination))
return {"success": 1}
def scan_dir(path: str, prefix: str) -> List[str]:
files = []
# only walk directories that match the prefix
# (glob and friends traverse entire dir tree)
with os.scandir(path) as i:
for e in i:
rel_path = os.path.relpath(e.path, ROOT)
if e.is_dir(follow_symlinks=False):
# add trailing slash
rel_path = os.path.join(rel_path, '')
# if prefix is a partial dir name, current dir will start with prefix
# if prefix is a partial file name, prefix with start with dir name
if rel_path.startswith(prefix) or prefix.startswith(rel_path):
files.extend(scan_dir(e.path, prefix))
else:
if rel_path.startswith(prefix):
files.append(rel_path)
return files
@dispatcher.add_method
def listDataDirectory(prefix='') -> List[str]:
return scan_dir(ROOT, prefix)
@dispatcher.add_method
def reboot() -> Dict[str, int]:
sock = messaging.sub_sock("deviceState", timeout=1000)
ret = messaging.recv_one(sock)
if ret is None or ret.deviceState.started:
raise Exception("Reboot unavailable")
def do_reboot() -> None:
time.sleep(2)
HARDWARE.reboot()
threading.Thread(target=do_reboot).start()
return {"success": 1}
@dispatcher.add_method
def uploadFileToUrl(fn: str, url: str, headers: Dict[str, str]) -> UploadFilesToUrlResponse:
# this is because mypy doesn't understand that the decorator doesn't change the return type
response: UploadFilesToUrlResponse = uploadFilesToUrls([{
"fn": fn,
"url": url,
"headers": headers,
}])
return response
@dispatcher.add_method
def uploadFilesToUrls(files_data: List[UploadFileDict]) -> UploadFilesToUrlResponse:
files = map(UploadFile.from_dict, files_data)
items: List[UploadItemDict] = []
failed: List[str] = []
for file in files:
if len(file.fn) == 0 or file.fn[0] == '/' or '..' in file.fn or len(file.url) == 0:
failed.append(file.fn)
continue
path = os.path.join(ROOT, file.fn)
if not os.path.exists(path) and not os.path.exists(strip_bz2_extension(path)):
failed.append(file.fn)
continue
# Skip item if already in queue
url = file.url.split('?')[0]
if any(url == item['url'].split('?')[0] for item in listUploadQueue()):
continue
item = UploadItem(
path=path,
url=file.url,
headers=file.headers,
created_at=int(time.time() * 1000),
id=None,
allow_cellular=file.allow_cellular,
)
upload_id = hashlib.sha1(str(item).encode()).hexdigest()
item = replace(item, id=upload_id)
upload_queue.put_nowait(item)
items.append(asdict(item))
UploadQueueCache.cache(upload_queue)
resp: UploadFilesToUrlResponse = {"enqueued": len(items), "items": items}
if failed:
resp["failed"] = failed
return resp
@dispatcher.add_method
def listUploadQueue() -> List[UploadItemDict]:
items = list(upload_queue.queue) + list(cur_upload_items.values())
return [asdict(i) for i in items if (i is not None) and (i.id not in cancelled_uploads)]
@dispatcher.add_method
def cancelUpload(upload_id: Union[str, List[str]]) -> Dict[str, Union[int, str]]:
if not isinstance(upload_id, list):
upload_id = [upload_id]
uploading_ids = {item.id for item in list(upload_queue.queue)}
cancelled_ids = uploading_ids.intersection(upload_id)
if len(cancelled_ids) == 0:
return {"success": 0, "error": "not found"}
cancelled_uploads.update(cancelled_ids)
return {"success": 1}
@dispatcher.add_method
def primeActivated(activated: bool) -> Dict[str, int]:
return {"success": 1}
@dispatcher.add_method
def setBandwithLimit(upload_speed_kbps: int, download_speed_kbps: int) -> Dict[str, Union[int, str]]:
if not AGNOS:
return {"success": 0, "error": "only supported on AGNOS"}
try:
HARDWARE.set_bandwidth_limit(upload_speed_kbps, download_speed_kbps)
return {"success": 1}
except subprocess.CalledProcessError as e:
return {"success": 0, "error": "failed to set limit", "stdout": e.stdout, "stderr": e.stderr}
def startLocalProxy(global_end_event: threading.Event, remote_ws_uri: str, local_port: int) -> Dict[str, int]:
try:
if local_port not in LOCAL_PORT_WHITELIST:
raise Exception("Requested local port not whitelisted")
cloudlog.debug("athena.startLocalProxy.starting")
dongle_id = Params().get("DongleId").decode('utf8')
identity_token = Api(dongle_id).get_token()
ws = create_connection(remote_ws_uri,
cookie="jwt=" + identity_token,
enable_multithread=True)
ssock, csock = socket.socketpair()
local_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
local_sock.connect(('127.0.0.1', local_port))
local_sock.setblocking(False)
proxy_end_event = threading.Event()
threads = [
threading.Thread(target=ws_proxy_recv, args=(ws, local_sock, ssock, proxy_end_event, global_end_event)),
threading.Thread(target=ws_proxy_send, args=(ws, local_sock, csock, proxy_end_event))
]
for thread in threads:
thread.start()
cloudlog.debug("athena.startLocalProxy.started")
return {"success": 1}
except Exception as e:
cloudlog.exception("athenad.startLocalProxy.exception")
raise e
@dispatcher.add_method
def getPublicKey() -> Optional[str]:
if not os.path.isfile(PERSIST + '/comma/id_rsa.pub'):
return None
with open(PERSIST + '/comma/id_rsa.pub') as f:
return f.read()
@dispatcher.add_method
def getSshAuthorizedKeys() -> str:
return Params().get("GithubSshKeys", encoding='utf8') or ''
@dispatcher.add_method
def getGithubUsername() -> str:
return Params().get("GithubUsername", encoding='utf8') or ''
@dispatcher.add_method
def getSimInfo():
return HARDWARE.get_sim_info()
@dispatcher.add_method
def getNetworkType():
return HARDWARE.get_network_type()
@dispatcher.add_method
def getNetworkMetered() -> bool:
network_type = HARDWARE.get_network_type()
return HARDWARE.get_network_metered(network_type)
@dispatcher.add_method
def getNetworks():
return HARDWARE.get_networks()
@dispatcher.add_method
def takeSnapshot() -> Optional[Union[str, Dict[str, str]]]:
from openpilot.system.camerad.snapshot.snapshot import jpeg_write, snapshot
ret = snapshot()
if ret is not None:
def b64jpeg(x):
if x is not None:
f = io.BytesIO()
jpeg_write(f, x)
return base64.b64encode(f.getvalue()).decode("utf-8")
else:
return None
return {'jpegBack': b64jpeg(ret[0]),
'jpegFront': b64jpeg(ret[1])}
else:
raise Exception("not available while camerad is started")
def get_logs_to_send_sorted() -> List[str]:
# TODO: scan once then use inotify to detect file creation/deletion
curr_time = int(time.time())
logs = []
for log_entry in os.listdir(SWAGLOG_DIR):
log_path = os.path.join(SWAGLOG_DIR, log_entry)
time_sent = 0
try:
value = getxattr(log_path, LOG_ATTR_NAME)
if value is not None:
time_sent = int.from_bytes(value, sys.byteorder)
except (ValueError, TypeError):
pass
# assume send failed and we lost the response if sent more than one hour ago
if not time_sent or curr_time - time_sent > 3600:
logs.append(log_entry)
# excluding most recent (active) log file
return sorted(logs)[:-1]
def log_handler(end_event: threading.Event) -> None:
if PC:
cloudlog.warning("log_handler 在PC环境下不上传日志")
return
cloudlog.info("log_handler 启动")
log_files = []
last_scan = 0.
while not end_event.is_set():
try:
curr_scan = time.monotonic()
if curr_scan - last_scan > 10:
log_files = get_logs_to_send_sorted()
cloudlog.debug(f"log_handler 扫描到 {len(log_files)} 个日志文件待上传")
last_scan = curr_scan
# send one log
curr_log = None
if len(log_files) > 0:
log_entry = log_files.pop() # newest log file
cloudlog.info(f"log_handler 准备上传日志文件: {log_entry}")
try:
curr_time = int(time.time())
log_path = os.path.join(SWAGLOG_DIR, log_entry)
setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder))
cloudlog.debug(f"log_handler 设置日志文件属性: {log_path}")
with open(log_path) as f:
log_content = f.read()
log_size = len(log_content)
cloudlog.debug(f"log_handler 日志文件大小: {log_size} 字节")
jsonrpc = {
"method": "forwardLogs",
"params": {
"logs": log_content
},
"jsonrpc": "2.0",
"id": log_entry
}
cloudlog.info(f"log_handler 将日志放入发送队列: {log_entry}")
low_priority_send_queue.put_nowait(json.dumps(jsonrpc))
curr_log = log_entry
except OSError as e:
cloudlog.error(f"log_handler 打开日志文件失败: {str(e)}")
pass # file could be deleted by log rotation
# wait for response up to ~100 seconds
# always read queue at least once to process any old responses that arrive
for i in range(100):
if end_event.is_set():
cloudlog.info("log_handler 收到结束事件")
break
try:
log_resp = json.loads(log_recv_queue.get(timeout=1))
log_entry = log_resp.get("id")
log_success = "result" in log_resp and log_resp["result"].get("success")
cloudlog.info(f"log_handler 收到响应: 文件={log_entry}, 成功={log_success}")
if log_entry and log_success:
log_path = os.path.join(SWAGLOG_DIR, log_entry)
try:
setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME)
cloudlog.debug(f"log_handler 标记日志文件已上传: {log_path}")
except OSError as e:
cloudlog.error(f"log_handler 设置日志文件属性失败: {str(e)}")
pass # file could be deleted by log rotation
if curr_log == log_entry:
cloudlog.info(f"log_handler 当前日志文件处理完成: {curr_log}")
break
except queue.Empty:
if i % 10 == 0: # 每10次循环记录一次等待信息
cloudlog.debug(f"log_handler 等待响应中... ({i}/100)")
if curr_log is None:
break
except Exception as e:
cloudlog.exception(f"log_handler 异常: {str(e)}")
cloudlog.info("log_handler 结束")
def stat_handler(end_event: threading.Event) -> None:
cloudlog.info("stat_handler 启动")
while not end_event.is_set():
last_scan = 0.
curr_scan = time.monotonic()
try:
if curr_scan - last_scan > 10:
stat_filenames = list(filter(lambda name: not name.startswith(tempfile.gettempprefix()), os.listdir(STATS_DIR)))
cloudlog.debug(f"stat_handler 扫描到 {len(stat_filenames)} 个统计文件")
if len(stat_filenames) > 0:
stat_path = os.path.join(STATS_DIR, stat_filenames[0])
cloudlog.info(f"stat_handler 准备上传统计文件: {stat_path}")
with open(stat_path) as f:
stats_content = f.read()
stats_size = len(stats_content)
cloudlog.debug(f"stat_handler 统计文件大小: {stats_size} 字节")
jsonrpc = {
"method": "storeStats",
"params": {
"stats": stats_content
},
"jsonrpc": "2.0",
"id": stat_filenames[0]
}
cloudlog.info(f"stat_handler 将统计数据放入发送队列: {stat_filenames[0]}")
low_priority_send_queue.put_nowait(json.dumps(jsonrpc))
os.remove(stat_path)
cloudlog.info(f"stat_handler 已删除已处理的统计文件: {stat_path}")
last_scan = curr_scan
except Exception as e:
cloudlog.exception(f"stat_handler 异常: {str(e)}")
time.sleep(0.1)
cloudlog.info("stat_handler 结束")
def ws_proxy_recv(ws: WebSocket, local_sock: socket.socket, ssock: socket.socket, end_event: threading.Event, global_end_event: threading.Event) -> None:
while not (end_event.is_set() or global_end_event.is_set()):
try:
data = ws.recv()
local_sock.sendall(data)
except WebSocketTimeoutException:
pass
except Exception:
cloudlog.exception("athenad.ws_proxy_recv.exception")
break
cloudlog.debug("athena.ws_proxy_recv closing sockets")
ssock.close()
local_sock.close()
cloudlog.debug("athena.ws_proxy_recv done closing sockets")
end_event.set()
def ws_proxy_send(ws: WebSocket, local_sock: socket.socket, signal_sock: socket.socket, end_event: threading.Event) -> None:
while not end_event.is_set():
try:
r, _, _ = select.select((local_sock, signal_sock), (), ())
if r:
if r[0].fileno() == signal_sock.fileno():
# got end signal from ws_proxy_recv
end_event.set()
break
data = local_sock.recv(4096)
if not data:
# local_sock is dead
end_event.set()
break
ws.send(data, ABNF.OPCODE_BINARY)
except Exception:
cloudlog.exception("athenad.ws_proxy_send.exception")
end_event.set()
cloudlog.debug("athena.ws_proxy_send closing sockets")
signal_sock.close()
cloudlog.debug("athena.ws_proxy_send done closing sockets")
def ws_recv(ws: WebSocket, end_event: threading.Event) -> None:
last_ping = int(time.monotonic() * 1e9)
while not end_event.is_set():
try:
opcode, data = ws.recv_data(control_frame=True)
if opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY):
if opcode == ABNF.OPCODE_TEXT:
data = data.decode("utf-8")
recv_queue.put_nowait(data)
elif opcode == ABNF.OPCODE_PING:
last_ping = int(time.monotonic() * 1e9)
Params().put("LastAthenaPingTime", str(last_ping))
except WebSocketTimeoutException:
ns_since_last_ping = int(time.monotonic() * 1e9) - last_ping
if ns_since_last_ping > RECONNECT_TIMEOUT_S * 1e9:
cloudlog.exception("athenad.ws_recv.timeout")
end_event.set()
except Exception:
cloudlog.exception("athenad.ws_recv.exception")
end_event.set()
def ws_send(ws: WebSocket, end_event: threading.Event) -> None:
while not end_event.is_set():
try:
try:
data = send_queue.get_nowait()
except queue.Empty:
data = low_priority_send_queue.get(timeout=1)
for i in range(0, len(data), WS_FRAME_SIZE):
frame = data[i:i+WS_FRAME_SIZE]
last = i + WS_FRAME_SIZE >= len(data)
opcode = ABNF.OPCODE_TEXT if i == 0 else ABNF.OPCODE_CONT
ws.send_frame(ABNF.create_frame(frame, opcode, last))
except queue.Empty:
pass
except Exception:
cloudlog.exception("athenad.ws_send.exception")
end_event.set()
def ws_manage(ws: WebSocket, end_event: threading.Event) -> None:
params = Params()
onroad_prev = None
sock = ws.sock
while True:
onroad = params.get_bool("IsOnroad")
if onroad != onroad_prev:
onroad_prev = onroad
sock.setsockopt(socket.IPPROTO_TCP, TCP_USER_TIMEOUT, 16000 if onroad else 0)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 7 if onroad else 30)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 7 if onroad else 10)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 2 if onroad else 3)
if end_event.wait(5):
break
def backoff(retries: int) -> int:
return random.randrange(0, min(128, int(2 ** retries)))
def main(exit_event: Optional[threading.Event] = None):
try:
set_core_affinity([0, 1, 2, 3])
except Exception:
cloudlog.exception("failed to set core affinity")
params = Params()
dongle_id = params.get("DongleId", encoding='utf-8')
UploadQueueCache.initialize(upload_queue)
ws_uri = ATHENA_HOST + "/ws/v2/" + dongle_id
api = Api(dongle_id)
conn_start = None
conn_retries = 0
while exit_event is None or not exit_event.is_set():
try:
if conn_start is None:
conn_start = time.monotonic()
cloudlog.event("athenad.main.connecting_ws", ws_uri=ws_uri, retries=conn_retries)
ws = create_connection(ws_uri,
cookie="jwt=" + api.get_token(),
enable_multithread=True,
timeout=30.0)
cloudlog.event("athenad.main.connected_ws", ws_uri=ws_uri, retries=conn_retries,
duration=time.monotonic() - conn_start)
conn_start = None
conn_retries = 0
cur_upload_items.clear()
handle_long_poll(ws, exit_event)
except (KeyboardInterrupt, SystemExit):
break
except (ConnectionError, TimeoutError, WebSocketException):
conn_retries += 1
params.remove("LastAthenaPingTime")
except Exception:
cloudlog.exception("athenad.main.exception")
conn_retries += 1
params.remove("LastAthenaPingTime")
time.sleep(backoff(conn_retries))
if __name__ == "__main__":
main()