Files
dragonpilot/system/fleetmanager/helpers.py
Comma Device 5c73e264e9 Release 260308
2026-03-08 23:26:57 +08:00

178 lines
5.5 KiB
Python

import os
import time
import subprocess
from functools import wraps
from pathlib import Path
from flask import render_template, request, session
from openpilot.system.hardware import PC
from openpilot.system.hardware.hw import Paths
from openpilot.selfdrive.loggerd.uploader import listdir_by_creation
from tools.lib.route import SegmentName
# path to sunnypilot screen recordings and error logs
if PC:
SCREENRECORD_PATH = os.path.join(str(Path.home()), ".comma", "media", "0", "dashcam", "")
ERROR_LOGS_PATH = os.path.join(str(Path.home()), ".comma", "media", "0", "crash_logs", "")
C2_LOGS_PATH = os.path.join(str(Path.home()), ".comma", "media", "0", "c2_logs", "")
GPX_RECORD_PATH = os.path.join(str(Path.home()), ".comma", "media", "0", "gpx_logs", "")
PIN_PATH = os.path.join(str(Path.home()), ".comma", "otp", "")
else:
SCREENRECORD_PATH = "/data/media/0/dashcam/"
ERROR_LOGS_PATH = "/data/media/0/crash_logs/"
C2_LOGS_PATH = "/data/media/0/c2_logs/"
GPX_RECORD_PATH = "/data/media/0/gpx_logs/"
PIN_PATH = "/data/otp/"
def login_required(f):
@wraps(f)
def decorated_route(*args, **kwargs):
if not session.get("logged_in"):
session["previous_page"] = request.url
return render_template("login.html")
return f(*args, **kwargs)
return decorated_route
def is_valid_segment(segment):
try:
segment_to_segment_name(Paths.log_root(), segment)
return True
except AssertionError:
return False
def segment_to_segment_name(data_dir, segment):
fake_dongle = "ffffffffffffffff"
return SegmentName(str(os.path.join(data_dir, fake_dongle + "|" + segment)))
def all_segment_names():
segments = []
for segment in listdir_by_creation(Paths.log_root()):
try:
segments.append(segment_to_segment_name(Paths.log_root(), segment))
except AssertionError:
pass
return segments
def all_routes():
segment_names = all_segment_names()
route_names = [segment_name.route_name for segment_name in segment_names]
route_times = [route_name.time_str for route_name in route_names]
unique_routes = list(dict.fromkeys(route_times))
return sorted(unique_routes, reverse=True)
def segments_in_route(route):
segment_names = [segment_name for segment_name in all_segment_names() if segment_name.time_str == route]
segments = [segment_name.time_str + "--" + str(segment_name.segment_num) for segment_name in segment_names]
return segments
def ffmpeg_mp4_concat_wrap_process_builder(file_list, cameratype, chunk_size=1024*512):
command_line = ["ffmpeg"]
if not cameratype == "qcamera":
command_line += ["-f", "hevc"]
command_line += ["-r", "20"]
command_line += ["-i", "concat:" + file_list]
command_line += ["-c", "copy"]
command_line += ["-map", "0"]
if not cameratype == "qcamera":
command_line += ["-vtag", "hvc1"]
command_line += ["-f", "mp4"]
command_line += ["-movflags", "empty_moov"]
command_line += ["-"]
return subprocess.Popen(
command_line, stdout=subprocess.PIPE,
bufsize=chunk_size
)
def ffmpeg_mp4_wrap_process_builder(filename):
"""Returns a process that will wrap the given filename
inside a mp4 container, for easier playback by browsers
and other devices. Primary use case is streaming segment videos
to the vidserver tool.
filename is expected to be a pathname to one of the following
/path/to/a/qcamera.ts
/path/to/a/dcamera.hevc
/path/to/a/ecamera.hevc
/path/to/a/fcamera.hevc
"""
basename = filename.rsplit("/")[-1]
extension = basename.rsplit(".")[-1]
command_line = ["ffmpeg"]
if extension == "hevc":
command_line += ["-f", "hevc"]
command_line += ["-r", "20"]
command_line += ["-i", filename]
command_line += ["-c", "copy"]
command_line += ["-map", "0"]
if extension == "hevc":
command_line += ["-vtag", "hvc1"]
command_line += ["-f", "mp4"]
command_line += ["-movflags", "empty_moov"]
command_line += ["-"]
return subprocess.Popen(
command_line, stdout=subprocess.PIPE
)
def ffplay_mp4_wrap_process_builder(file_name):
command_line = ["ffmpeg"]
command_line += ["-i", file_name]
command_line += ["-c", "copy"]
command_line += ["-map", "0"]
command_line += ["-f", "mp4"]
command_line += ["-movflags", "empty_moov"]
command_line += ["-"]
return subprocess.Popen(
command_line, stdout=subprocess.PIPE
)
def get_file_info(full_path, name, base_path=""):
info = {
"name": name,
"type": "file",
"size": "",
"mtime": "",
"path": os.path.relpath(full_path, base_path) if base_path else name
}
try:
stat = os.stat(full_path)
info["mtime"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stat.st_mtime))
if os.path.isdir(full_path):
info["type"] = "directory"
else:
size = stat.st_size
if size < 1024:
info["size"] = f"{size} B"
elif size < 1024*1024:
info["size"] = f"{size/1024:.1f} KB"
else:
info["size"] = f"{size/(1024*1024):.1f} MB"
except Exception:
pass
return info
def list_files(path, single=False):
if not os.path.exists(path):
return []
files = []
try:
entries = os.listdir(path) if single else listdir_by_creation(path)
for name in entries:
full_path = os.path.join(path, name)
info = get_file_info(full_path, name, path)
files.append(info)
return sorted(files, key=lambda x: x["name"], reverse=True)
except Exception:
return []