178 lines
5.5 KiB
Python
178 lines
5.5 KiB
Python
import os
|
|
import time
|
|
import subprocess
|
|
from functools import wraps
|
|
from pathlib import Path
|
|
|
|
from flask import render_template, request, session
|
|
from openpilot.system.hardware import PC
|
|
from openpilot.system.hardware.hw import Paths
|
|
from openpilot.selfdrive.loggerd.uploader import listdir_by_creation
|
|
|
|
from tools.lib.route import SegmentName
|
|
|
|
# path to sunnypilot screen recordings and error logs
|
|
if PC:
|
|
SCREENRECORD_PATH = os.path.join(str(Path.home()), ".comma", "media", "0", "dashcam", "")
|
|
ERROR_LOGS_PATH = os.path.join(str(Path.home()), ".comma", "media", "0", "crash_logs", "")
|
|
C2_LOGS_PATH = os.path.join(str(Path.home()), ".comma", "media", "0", "c2_logs", "")
|
|
GPX_RECORD_PATH = os.path.join(str(Path.home()), ".comma", "media", "0", "gpx_logs", "")
|
|
PIN_PATH = os.path.join(str(Path.home()), ".comma", "otp", "")
|
|
else:
|
|
SCREENRECORD_PATH = "/data/media/0/dashcam/"
|
|
ERROR_LOGS_PATH = "/data/media/0/crash_logs/"
|
|
C2_LOGS_PATH = "/data/media/0/c2_logs/"
|
|
GPX_RECORD_PATH = "/data/media/0/gpx_logs/"
|
|
PIN_PATH = "/data/otp/"
|
|
|
|
|
|
def login_required(f):
|
|
@wraps(f)
|
|
def decorated_route(*args, **kwargs):
|
|
if not session.get("logged_in"):
|
|
session["previous_page"] = request.url
|
|
return render_template("login.html")
|
|
return f(*args, **kwargs)
|
|
return decorated_route
|
|
|
|
def is_valid_segment(segment):
|
|
try:
|
|
segment_to_segment_name(Paths.log_root(), segment)
|
|
return True
|
|
except AssertionError:
|
|
return False
|
|
|
|
|
|
def segment_to_segment_name(data_dir, segment):
|
|
fake_dongle = "ffffffffffffffff"
|
|
return SegmentName(str(os.path.join(data_dir, fake_dongle + "|" + segment)))
|
|
|
|
|
|
def all_segment_names():
|
|
segments = []
|
|
for segment in listdir_by_creation(Paths.log_root()):
|
|
try:
|
|
segments.append(segment_to_segment_name(Paths.log_root(), segment))
|
|
except AssertionError:
|
|
pass
|
|
return segments
|
|
|
|
|
|
def all_routes():
|
|
segment_names = all_segment_names()
|
|
route_names = [segment_name.route_name for segment_name in segment_names]
|
|
route_times = [route_name.time_str for route_name in route_names]
|
|
unique_routes = list(dict.fromkeys(route_times))
|
|
return sorted(unique_routes, reverse=True)
|
|
|
|
|
|
def segments_in_route(route):
|
|
segment_names = [segment_name for segment_name in all_segment_names() if segment_name.time_str == route]
|
|
segments = [segment_name.time_str + "--" + str(segment_name.segment_num) for segment_name in segment_names]
|
|
return segments
|
|
|
|
|
|
def ffmpeg_mp4_concat_wrap_process_builder(file_list, cameratype, chunk_size=1024*512):
|
|
command_line = ["ffmpeg"]
|
|
if not cameratype == "qcamera":
|
|
command_line += ["-f", "hevc"]
|
|
command_line += ["-r", "20"]
|
|
command_line += ["-i", "concat:" + file_list]
|
|
command_line += ["-c", "copy"]
|
|
command_line += ["-map", "0"]
|
|
if not cameratype == "qcamera":
|
|
command_line += ["-vtag", "hvc1"]
|
|
command_line += ["-f", "mp4"]
|
|
command_line += ["-movflags", "empty_moov"]
|
|
command_line += ["-"]
|
|
return subprocess.Popen(
|
|
command_line, stdout=subprocess.PIPE,
|
|
bufsize=chunk_size
|
|
)
|
|
|
|
|
|
def ffmpeg_mp4_wrap_process_builder(filename):
|
|
"""Returns a process that will wrap the given filename
|
|
inside a mp4 container, for easier playback by browsers
|
|
and other devices. Primary use case is streaming segment videos
|
|
to the vidserver tool.
|
|
filename is expected to be a pathname to one of the following
|
|
/path/to/a/qcamera.ts
|
|
/path/to/a/dcamera.hevc
|
|
/path/to/a/ecamera.hevc
|
|
/path/to/a/fcamera.hevc
|
|
"""
|
|
basename = filename.rsplit("/")[-1]
|
|
extension = basename.rsplit(".")[-1]
|
|
command_line = ["ffmpeg"]
|
|
if extension == "hevc":
|
|
command_line += ["-f", "hevc"]
|
|
command_line += ["-r", "20"]
|
|
command_line += ["-i", filename]
|
|
command_line += ["-c", "copy"]
|
|
command_line += ["-map", "0"]
|
|
if extension == "hevc":
|
|
command_line += ["-vtag", "hvc1"]
|
|
command_line += ["-f", "mp4"]
|
|
command_line += ["-movflags", "empty_moov"]
|
|
command_line += ["-"]
|
|
return subprocess.Popen(
|
|
command_line, stdout=subprocess.PIPE
|
|
)
|
|
|
|
|
|
def ffplay_mp4_wrap_process_builder(file_name):
|
|
command_line = ["ffmpeg"]
|
|
command_line += ["-i", file_name]
|
|
command_line += ["-c", "copy"]
|
|
command_line += ["-map", "0"]
|
|
command_line += ["-f", "mp4"]
|
|
command_line += ["-movflags", "empty_moov"]
|
|
command_line += ["-"]
|
|
return subprocess.Popen(
|
|
command_line, stdout=subprocess.PIPE
|
|
)
|
|
def get_file_info(full_path, name, base_path=""):
|
|
info = {
|
|
"name": name,
|
|
"type": "file",
|
|
"size": "",
|
|
"mtime": "",
|
|
"path": os.path.relpath(full_path, base_path) if base_path else name
|
|
}
|
|
|
|
try:
|
|
stat = os.stat(full_path)
|
|
info["mtime"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stat.st_mtime))
|
|
|
|
if os.path.isdir(full_path):
|
|
info["type"] = "directory"
|
|
else:
|
|
size = stat.st_size
|
|
if size < 1024:
|
|
info["size"] = f"{size} B"
|
|
elif size < 1024*1024:
|
|
info["size"] = f"{size/1024:.1f} KB"
|
|
else:
|
|
info["size"] = f"{size/(1024*1024):.1f} MB"
|
|
except Exception:
|
|
pass
|
|
|
|
return info
|
|
|
|
def list_files(path, single=False):
|
|
if not os.path.exists(path):
|
|
return []
|
|
|
|
files = []
|
|
try:
|
|
entries = os.listdir(path) if single else listdir_by_creation(path)
|
|
|
|
for name in entries:
|
|
full_path = os.path.join(path, name)
|
|
info = get_file_info(full_path, name, path)
|
|
files.append(info)
|
|
|
|
return sorted(files, key=lambda x: x["name"], reverse=True)
|
|
except Exception:
|
|
return [] |