Files
youdis/youdis/main.py
2026-04-02 12:27:22 -04:00

312 lines
9.5 KiB
Python

"""FastAPI backend worker managing yt-dlp subprocess jobs and exposing job state."""
import asyncio
import tempfile
from asyncio.subprocess import PIPE, STDOUT
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from os import getenv
from pathlib import Path
from uuid import uuid4

from fastapi import FastAPI, HTTPException

from .env import load_project_dotenv
from .models import CurrentJobResponse, HealthResponse, JobRequest, JobStatus, VersionResponse
# Must run before the getenv() lookups below (presumably it loads the project's
# .env file into the environment; the helper lives in the sibling env module).
load_project_dotenv()

# Directory two levels above this module; bundled files are resolved from here.
REPO_ROOT = Path(__file__).resolve().parent.parent
DEFAULT_CONFIG = REPO_ROOT.joinpath("default-yt-dlp.conf")
VERSION_FILE = REPO_ROOT.joinpath("youdis-version.txt")

# Program launched for every job; overridable through the environment.
YTDLP_EXECUTABLE = getenv("YOUDIS_YTDLP_EXECUTABLE", default="yt-dlp")

# Preferred locations are tried first; the repo-local fallbacks are used when the
# preferred ones cannot be created or written (see ensure_writable_directory).
PREFERRED_CONFIG_ROOT = Path(getenv("YOUDIS_CONFIG_DIR", default="/config"))
PREFERRED_DOWNLOAD_ROOT = Path(getenv("YOUDIS_DOWNLOAD_DIR", default="/downloads"))
LOCAL_RUNTIME_ROOT = REPO_ROOT.joinpath(".runtime")
FALLBACK_CONFIG_ROOT = LOCAL_RUNTIME_ROOT.joinpath("config")
FALLBACK_DOWNLOAD_ROOT = LOCAL_RUNTIME_ROOT.joinpath("downloads")

# yt-dlp output template, joined onto the download root when a command is built.
OUTPUT_TEMPLATE = (
    "%(uploader)s/%(playlist_title)s/"
    "%(playlist_index)s%(playlist_index& - )s"
    "%(title)s.%(ext)s"
)
def _new_output_buffer() -> deque[str]:
    """Create the bounded buffer holding the 25 most recent output lines."""
    return deque(maxlen=25)


@dataclass
class ManagedJob:
    """Runtime bookkeeping for a single job: its status plus process handles."""

    # Status model for the job; updated in place while the job runs.
    status: JobStatus
    # Subprocess handle; stays None until the process has been spawned.
    process: asyncio.subprocess.Process | None = None
    # asyncio task driving the job.
    task: asyncio.Task | None = None
    # Set when a cancel was requested; consulted when the job is finalized.
    cancel_requested: bool = False
    # Rolling window of recent output lines (older lines are dropped).
    output_lines: deque[str] = field(default_factory=_new_output_buffer)
# FastAPI application object that the route decorators in this module attach to.
app = FastAPI(title="youdis", version="2")
# Held while active_job / last_job are swapped or snapshotted
# (see create_job, finalize_job, current_job, cancel_current_job).
job_lock = asyncio.Lock()
# The job currently being handled, or None when the worker is idle.
active_job: ManagedJob | None = None
# Status snapshot of the most recently finalized job, or None if there is none yet.
last_job: JobStatus | None = None
def now_utc() -> datetime:
    """Return the current time as a timezone-aware UTC datetime.

    ``datetime.now()`` without a ``tz`` argument returns naive *local* time, which
    contradicts this helper's name and yields timestamps that shift with the
    host's timezone setting. Passing ``timezone.utc`` makes the value unambiguous.
    """
    return datetime.now(timezone.utc)
def read_version() -> str:
    """Return the version string stored in ``VERSION_FILE``.

    Returns:
        The file's content with surrounding whitespace removed, or ``"unknown"``
        when the file is missing, unreadable, not valid UTF-8, or empty.
    """
    # Read first and handle failure afterwards: an exists() pre-check can race
    # with the file being removed and does not cover unreadable files.
    try:
        content = VERSION_FILE.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return "unknown"
    return content.strip() or "unknown"
def ensure_writable_directory(preferred: Path, fallback: Path) -> Path:
    """Return the first of *preferred* / *fallback* that exists and is writable.

    Each candidate is created if necessary (including parents) and then probed by
    creating a temporary file inside it, which is removed again immediately.

    Args:
        preferred: Directory to try first.
        fallback: Directory to try when *preferred* cannot be used.

    Returns:
        The first candidate that passed the probe.

    Raises:
        OSError: If neither candidate can be created and written to. The error
            raised by the last attempt is attached as ``__cause__`` so the real
            reason (permissions, read-only filesystem, ...) is not lost.
    """
    last_error: OSError | None = None
    for candidate in (preferred, fallback):
        try:
            candidate.mkdir(parents=True, exist_ok=True)
            # mkdir succeeding does not prove write access; create a probe file.
            with tempfile.NamedTemporaryFile(dir=candidate, prefix=".youdis-write-", delete=True):
                pass
        except OSError as exc:
            last_error = exc
            continue
        return candidate
    raise OSError(
        f"no writable runtime directory available for {preferred} or {fallback}"
    ) from last_error
def resolve_runtime_paths() -> tuple[Path, Path]:
    """Choose the config and download directories to use.

    Returns:
        ``(config_root, download_root)``; each is the preferred location when it
        is writable and the repo-local fallback otherwise.

    Raises:
        OSError: Propagated from ensure_writable_directory when neither
            candidate of a pair is usable.
    """
    candidate_pairs = (
        (PREFERRED_CONFIG_ROOT, FALLBACK_CONFIG_ROOT),
        (PREFERRED_DOWNLOAD_ROOT, FALLBACK_DOWNLOAD_ROOT),
    )
    # The generator resolves the config pair first, then the download pair.
    config_root, download_root = (
        ensure_writable_directory(first_choice, second_choice)
        for first_choice, second_choice in candidate_pairs
    )
    return config_root, download_root
def build_ytdlp_command(request: JobRequest, config_root: Path, download_root: Path) -> list[str]:
    """Build the yt-dlp argument vector for *request*.

    Args:
        request: Job request; only ``request.url`` is read here.
        config_root: Directory in which the download archive file is kept.
        download_root: Directory the output template is rooted at.

    Returns:
        The argument list, starting with the configured executable and ending
        with the requested URL.
    """
    archive_file = config_root / "archive.txt"
    return [
        YTDLP_EXECUTABLE,
        "--config-locations",
        str(DEFAULT_CONFIG),
        "--download-archive",
        str(archive_file),
        "--output",
        str(download_root / OUTPUT_TEMPLATE),
        # End-of-options marker: the URL comes from the requester, and without
        # this a value starting with "-" would be parsed by yt-dlp as an option
        # (e.g. --exec) instead of as a URL.
        "--",
        request.url,
    ]
def clone_status(status: JobStatus) -> JobStatus:
    """Return a new JobStatus instance built from the dumped fields of *status*."""
    field_values = status.model_dump()
    return JobStatus(**field_values)
def update_status(job: ManagedJob, **changes: object) -> None:
    """Set the given fields on ``job.status`` and refresh its ``updated_at``.

    Args:
        job: Job whose status object is modified in place.
        **changes: Attribute name / value pairs to assign on the status.
    """
    for attribute in changes:
        setattr(job.status, attribute, changes[attribute])
    # Stamped on every call, even when no changes were passed.
    job.status.updated_at = now_utc()
def classify_output_line(job: ManagedJob, line: str) -> None:
    """Record one line of process output and update the job status from it.

    Recognized lines (matched case-insensitively):
      * archive hit      -> disposition "archive_hit", phase "downloading"
      * "[download]"     -> phase "downloading"; a "Destination:" line also
                            updates result_path
      * merge notice     -> phase "postprocessing" plus result_path
    Any other non-blank line only replaces the status message.

    Args:
        job: Job whose output buffer and status are updated.
        line: One output line; empty strings are ignored entirely.
    """
    if not line:
        return
    # The raw line is buffered even if it turns out to be whitespace only.
    job.output_lines.append(line)

    text = line.strip()
    if not text:
        return
    folded = text.lower()

    if "has already been recorded in the archive" in folded:
        update_status(
            job,
            disposition="archive_hit",
            phase="downloading",
            message="already in archive",
        )
    elif "[download]" in folded:
        update_status(job, phase="downloading", message=text)
        if "destination:" in folded:
            # Everything after the first colon is taken as the path.
            destination = text.partition(":")[2].strip()
            update_status(job, result_path=destination)
    elif "merging formats into" in folded:
        # NOTE(review): the test above is case-insensitive but this split is
        # case-sensitive and uses the first "into" in the line.
        merged_path = text.split("into", 1)[1].strip().strip('"')
        update_status(job, phase="postprocessing", message=text, result_path=merged_path)
    else:
        update_status(job, message=text)
async def finalize_job(job: ManagedJob, returncode: int) -> None:
    """Write the terminal state of *job* and publish it as the last job.

    The final state is chosen in this order: a requested cancel wins, then a
    non-zero return code means failure, otherwise the job completed (reported as
    "already in archive" when an archive hit was recorded).

    Args:
        job: The job that has ended.
        returncode: Exit code to store on the status.
    """
    global active_job, last_job

    recorded_disposition = job.status.disposition
    if job.cancel_requested:
        state, message = "cancelled", "cancelled"
    elif returncode != 0:
        state = "failed"
        message = job.status.message or f"yt-dlp exited with code {returncode}"
    elif recorded_disposition == "archive_hit":
        state, message = "completed", "already in archive"
    else:
        state = "completed"
        message = job.status.message or "completed"
    update_status(job, state=state, phase=None, returncode=returncode, message=message)

    async with job_lock:
        # Only release the slot if it still belongs to this job.
        if active_job is job:
            active_job = None
        last_job = clone_status(job.status)
def _terminate_quietly(process: asyncio.subprocess.Process) -> None:
    """Ask *process* to terminate, tolerating that it may already have exited."""
    if process.returncode is not None:
        return
    try:
        process.terminate()
    except ProcessLookupError:
        # The process ended between the returncode check and the signal.
        pass


async def _stop_process(process: asyncio.subprocess.Process, grace_seconds: float = 5.0) -> None:
    """Make sure *process* is no longer running: terminate, then kill if needed."""
    _terminate_quietly(process)
    try:
        await asyncio.wait_for(process.wait(), timeout=grace_seconds)
    except asyncio.TimeoutError:
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
        await process.wait()


async def _fail_job(job: ManagedJob, message: str, returncode: int) -> None:
    """Mark *job* as failed with *message* and finalize it with *returncode*."""
    update_status(job, state="failed", phase=None, message=message, returncode=returncode)
    await finalize_job(job, returncode)


async def run_job(job: ManagedJob, request: JobRequest) -> None:
    """Run yt-dlp for *request* and keep ``job.status`` up to date until it ends.

    Setup failures are reported through the job status with these return codes:
    78 (default config missing), 127 (executable not found), 73 (other OSError
    during path setup or spawn). Errors while the process runs are reported
    with return code 1.

    Differences from a plain "spawn and read" loop, all to avoid leaving a
    yt-dlp process running that no job refers to any more:
      * a cancel that arrived before the process handle was published is
        honored right after the spawn;
      * on a runner error or task cancellation the process is stopped before
        the job slot is released;
      * an output line longer than the stream limit is skipped instead of
        aborting the job.

    Args:
        job: The managed job to drive; ``job.process`` is set once spawned.
        request: The request the job was created from.
    """
    update_status(
        job,
        state="running",
        phase="starting",
        message="starting yt-dlp",
    )
    try:
        if not DEFAULT_CONFIG.exists():
            await _fail_job(job, f"default config not found: {DEFAULT_CONFIG}", 78)
            return
        config_root, download_root = resolve_runtime_paths()
        command = build_ytdlp_command(request, config_root, download_root)
        update_status(
            job,
            command=command,
            archive_path=str(config_root / "archive.txt"),
            result_path=str(download_root),
        )
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=PIPE,
            stderr=STDOUT,
        )
    except FileNotFoundError:
        await _fail_job(job, "yt-dlp executable not found", 127)
        return
    except OSError as exc:
        await _fail_job(job, f"runtime path setup failed: {exc}", 73)
        return

    try:
        job.process = process
        if job.cancel_requested:
            # The cancel came in while the process was being spawned, when
            # there was no handle to terminate yet. Stop it now.
            _terminate_quietly(process)
        else:
            update_status(job, phase="running", message="yt-dlp running")

        stream = process.stdout
        if stream is None:
            # Explicit check instead of assert, which is stripped under -O.
            raise RuntimeError("yt-dlp stdout pipe was not created")
        while True:
            try:
                raw_line = await stream.readline()
            except ValueError:
                # Line exceeded the StreamReader limit; readline() has already
                # discarded the buffered data, so reading can simply continue.
                continue
            if not raw_line:
                break
            classify_output_line(job, raw_line.decode(errors="replace").strip())
        returncode = await process.wait()
        await finalize_job(job, returncode)
    except asyncio.CancelledError:
        # Task cancelled (e.g. shutdown): do not leave the process running.
        _terminate_quietly(process)
        raise
    except Exception as exc:
        # Stop the process before the slot is released; otherwise it would keep
        # running while a new job is accepted.
        await _stop_process(process)
        await _fail_job(job, f"backend runner error: {exc}", 1)
async def create_job(request: JobRequest) -> JobStatus:
    """Start a job for *request*, or report that the worker is busy.

    Returns:
        * When a job is already active: a status with state ``"busy"`` carrying
          the active job's id and the new request's url / requester fields.
        * Otherwise: a copy of the new job's status (state ``"accepted"``,
          phase ``"queued"``); the job itself runs in a background task.
    """
    global active_job
    async with job_lock:
        # Fields copied from the request into either kind of response.
        from_request = {
            "url": request.url,
            "requester_id": request.requester_id,
            "requester_name": request.requester_name,
            "origin": request.origin,
        }
        if active_job is not None:
            running = active_job.status
            return JobStatus(
                job_id=running.job_id,
                state="busy",
                message=f"busy with {running.url}",
                **from_request,
            )

        accepted = JobStatus(
            job_id=str(uuid4()),
            state="accepted",
            message="accepted",
            phase="queued",
            **from_request,
        )
        new_job = ManagedJob(status=accepted)
        new_job.task = asyncio.create_task(run_job(new_job, request))
        active_job = new_job
        return clone_status(accepted)
@app.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
    """Health endpoint: always answers with status "ok"."""
    payload = HealthResponse(status="ok")
    return payload
@app.get("/version", response_model=VersionResponse)
async def version() -> VersionResponse:
    """Report the version string, whether a job is active, and the yt-dlp executable."""
    current_version = read_version()
    # Read without taking job_lock; this is a point-in-time flag only.
    has_active_job = active_job is not None
    return VersionResponse(
        version=current_version,
        active_job=has_active_job,
        ytdlp_executable=YTDLP_EXECUTABLE,
    )
@app.get("/jobs/current", response_model=CurrentJobResponse)
async def current_job() -> CurrentJobResponse:
    """Return the active job's status, else the last finished job's, else nothing.

    ``active`` is True only when the returned status belongs to a job that is
    still registered as active.
    """
    async with job_lock:
        if active_job is not None:
            is_active, source = True, active_job.status
        elif last_job is not None:
            is_active, source = False, last_job
        else:
            return CurrentJobResponse(active=False, job=None)
        # Hand out a copy so the response is detached from the live status.
        return CurrentJobResponse(active=is_active, job=clone_status(source))
@app.post("/jobs", response_model=JobStatus)
async def submit_job(request: JobRequest) -> JobStatus:
    """Submit a job request; the response is whatever create_job returns."""
    outcome = await create_job(request)
    return outcome
@app.post("/jobs/current/cancel", response_model=JobStatus)
async def cancel_current_job() -> JobStatus:
    """Request cancellation of the active job.

    Sets ``cancel_requested`` on the job (finalize_job reports such a job as
    "cancelled" once it ends) and, if the process has been spawned, asks it to
    terminate.

    Returns:
        A copy of the job's status after the cancel request was recorded.

    Raises:
        HTTPException: 404 when there is no active job.
    """
    async with job_lock:
        job = active_job
        if job is None:
            raise HTTPException(status_code=404, detail="no active job")

        job.cancel_requested = True
        if job.process is None:
            update_status(job, message="cancel requested before yt-dlp started")
            return clone_status(job.status)

        update_status(job, message="cancel requested", phase="running")
        try:
            job.process.terminate()
        except ProcessLookupError:
            # The process exited on its own just before this request was
            # handled, while the job was still registered as active. There is
            # nothing left to signal, and the request should not fail.
            pass
        return clone_status(job.status)