From 2a5648506ee43051d003f2d6badf5f4f75ad35d6 Mon Sep 17 00:00:00 2001
From: ben
Date: Tue, 31 Mar 2026 20:54:56 -0400
Subject: [PATCH] initial ytdlp worker

---
 youdis/__init__.py          |   1 +
 youdis/adapters/__init__.py |   1 +
 youdis/adapters/discord.py  |   1 +
 youdis/main.py              | 301 ++++++++++++++++++++++++++++++++++++
 youdis/models.py            |  64 ++++++++
 5 files changed, 368 insertions(+)
 create mode 100644 youdis/__init__.py
 create mode 100644 youdis/adapters/__init__.py
 create mode 100644 youdis/adapters/discord.py
 create mode 100644 youdis/main.py
 create mode 100644 youdis/models.py

diff --git a/youdis/__init__.py b/youdis/__init__.py
new file mode 100644
index 0000000..5a9c47b
--- /dev/null
+++ b/youdis/__init__.py
@@ -0,0 +1 @@
+"""Youdis v2 backend package."""
diff --git a/youdis/adapters/__init__.py b/youdis/adapters/__init__.py
new file mode 100644
index 0000000..d464cd5
--- /dev/null
+++ b/youdis/adapters/__init__.py
@@ -0,0 +1 @@
+"""Frontend adapters for the youdis backend."""
diff --git a/youdis/adapters/discord.py b/youdis/adapters/discord.py
new file mode 100644
index 0000000..7beab65
--- /dev/null
+++ b/youdis/adapters/discord.py
@@ -0,0 +1 @@
+"""Discord adapter placeholder for the v2 backend."""
diff --git a/youdis/main.py b/youdis/main.py
new file mode 100644
index 0000000..e616544
--- /dev/null
+++ b/youdis/main.py
@@ -0,0 +1,301 @@
+"""FastAPI backend that runs a single yt-dlp download job at a time."""
+
+import asyncio
+import re
+from asyncio.subprocess import PIPE, STDOUT
+from collections import deque
+from contextlib import suppress
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from os import getenv
+from pathlib import Path
+from uuid import uuid4
+
+from fastapi import FastAPI, HTTPException
+
+from .models import CurrentJobResponse, HealthResponse, JobRequest, JobStatus, VersionResponse
+
+
+REPO_ROOT = Path(__file__).resolve().parent.parent
+DEFAULT_CONFIG = REPO_ROOT / "default-yt-dlp.conf"
+VERSION_FILE = REPO_ROOT / "youdis-version.txt"
+YTDLP_EXECUTABLE = getenv("YOUDIS_YTDLP_EXECUTABLE", "yt-dlp")
+
+# yt-dlp output markers; matched case-insensitively, capturing the file path.
+DESTINATION_PATTERN = re.compile(r"destination:(.*)", re.IGNORECASE)
+MERGE_PATTERN = re.compile(r"merging formats into(.*)", re.IGNORECASE)
+
+
+@dataclass
+class ManagedJob:
+    """Runtime bookkeeping for the one job the worker may be running."""
+
+    status: JobStatus
+    process: asyncio.subprocess.Process | None = None
+    task: asyncio.Task | None = None
+    cancel_requested: bool = False
+    # Most recent yt-dlp output lines, bounded to the last 25.
+    output_lines: deque[str] = field(default_factory=lambda: deque(maxlen=25))
+
+
+app = FastAPI(title="youdis", version="2")
+job_lock = asyncio.Lock()
+active_job: ManagedJob | None = None
+last_job: JobStatus | None = None
+
+
+def now_utc() -> datetime:
+    """Return the current time as a timezone-aware UTC datetime."""
+    return datetime.now(timezone.utc)
+
+
+def read_version() -> str:
+    """Return the version file's contents, or "unknown" if it cannot be read."""
+    try:
+        return VERSION_FILE.read_text(encoding="utf-8").strip()
+    except OSError:
+        return "unknown"
+
+
+def build_ytdlp_command(request: JobRequest) -> list[str]:
+    """Build the yt-dlp argument list for *request*."""
+    return [
+        YTDLP_EXECUTABLE,
+        "--config-locations",
+        str(DEFAULT_CONFIG),
+        # End option parsing so a URL starting with "-" is never read as a flag.
+        "--",
+        request.url,
+    ]
+
+
+def clone_status(status: JobStatus) -> JobStatus:
+    """Return an independent copy of *status*."""
+    return JobStatus(**status.model_dump())
+
+
+def update_status(job: ManagedJob, **changes: object) -> None:
+    """Apply *changes* to the job's status and refresh ``updated_at``."""
+    for key, value in changes.items():
+        setattr(job.status, key, value)
+    job.status.updated_at = now_utc()
+
+
+def classify_output_line(job: ManagedJob, line: str) -> None:
+    """Record one yt-dlp output line and update the job status from it."""
+    if not line:
+        return
+
+    job.output_lines.append(line)
+    message = line.strip()
+    if not message:
+        return
+
+    lowered = message.lower()
+    if "has already been recorded in the archive" in lowered:
+        update_status(
+            job,
+            disposition="archive_hit",
+            phase="downloading",
+            message="already in archive",
+        )
+        return
+
+    if "[download]" in lowered:
+        update_status(job, phase="downloading", message=message)
+        destination = DESTINATION_PATTERN.search(message)
+        if destination:
+            update_status(job, result_path=destination.group(1).strip())
+        return
+
+    merged = MERGE_PATTERN.search(message)
+    if merged:
+        # Searching the original text keeps the path's casing and cannot
+        # raise when the marker's own casing differs.
+        result_path = merged.group(1).strip().strip('"')
+        update_status(job, phase="postprocessing", message=message, result_path=result_path)
+        return
+
+    update_status(job, message=message)
+
+
+async def finalize_job(job: ManagedJob, returncode: int) -> None:
+    """Store the job's terminal state and free the active-job slot."""
+    global active_job, last_job
+
+    disposition = job.status.disposition
+    if job.cancel_requested:
+        state = "cancelled"
+        message = "cancelled"
+    elif returncode == 0 and disposition == "archive_hit":
+        state = "completed"
+        message = "already in archive"
+    elif returncode == 0:
+        state = "completed"
+        message = job.status.message or "completed"
+    else:
+        state = "failed"
+        message = job.status.message or f"yt-dlp exited with code {returncode}"
+
+    update_status(job, state=state, phase=None, returncode=returncode, message=message)
+
+    async with job_lock:
+        if active_job is job:
+            active_job = None
+        last_job = clone_status(job.status)
+
+
+async def run_job(job: ManagedJob, request: JobRequest) -> None:
+    """Run yt-dlp for *request* and track its progress on *job*.
+
+    Each handled outcome ends in ``finalize_job``, which frees the job slot.
+    """
+    command = build_ytdlp_command(request)
+    update_status(
+        job,
+        state="running",
+        phase="starting",
+        message="starting yt-dlp",
+        command=command,
+    )
+
+    try:
+        if not DEFAULT_CONFIG.exists():
+            update_status(
+                job,
+                state="failed",
+                phase=None,
+                message=f"default config not found: {DEFAULT_CONFIG}",
+                returncode=78,
+            )
+            await finalize_job(job, 78)
+            return
+
+        process = await asyncio.create_subprocess_exec(
+            *command,
+            stdout=PIPE,
+            stderr=STDOUT,
+        )
+    except OSError as exc:
+        # Any OSError (not only a missing executable) must finalize the job;
+        # otherwise the worker would report "busy" forever.
+        if isinstance(exc, FileNotFoundError):
+            message, returncode = "yt-dlp executable not found", 127
+        else:
+            message, returncode = f"could not start yt-dlp: {exc}", 126
+        update_status(job, state="failed", phase=None, message=message, returncode=returncode)
+        await finalize_job(job, returncode)
+        return
+
+    try:
+        job.process = process
+        if job.cancel_requested:
+            # The cancel arrived before there was a process to terminate.
+            with suppress(ProcessLookupError):
+                process.terminate()
+        else:
+            update_status(job, phase="running", message="yt-dlp running")
+
+        assert process.stdout is not None
+        while True:
+            line = await process.stdout.readline()
+            if not line:
+                break
+            classify_output_line(job, line.decode(errors="replace").strip())
+
+        returncode = await process.wait()
+        await finalize_job(job, returncode)
+    except Exception as exc:
+        # Do not leave yt-dlp running once nothing is reading its output.
+        if process.returncode is None:
+            with suppress(ProcessLookupError):
+                process.kill()
+            await process.wait()
+        update_status(
+            job,
+            state="failed",
+            phase=None,
+            message=f"backend runner error: {exc}",
+            returncode=1,
+        )
+        await finalize_job(job, 1)
+
+
+async def create_job(request: JobRequest) -> JobStatus:
+    """Start a job for *request*, or report ``busy`` if one is already active."""
+    global active_job
+    async with job_lock:
+        if active_job is not None:
+            busy_job = active_job.status
+            return JobStatus(
+                job_id=busy_job.job_id,
+                state="busy",
+                url=request.url,
+                message=f"busy with {busy_job.url}",
+                requester_id=request.requester_id,
+                requester_name=request.requester_name,
+                origin=request.origin,
+            )
+
+        status = JobStatus(
+            job_id=str(uuid4()),
+            state="accepted",
+            url=request.url,
+            message="accepted",
+            phase="queued",
+            requester_id=request.requester_id,
+            requester_name=request.requester_name,
+            origin=request.origin,
+        )
+        managed_job = ManagedJob(status=status)
+        managed_job.task = asyncio.create_task(run_job(managed_job, request))
+        active_job = managed_job
+        return clone_status(status)
+
+
+@app.get("/health", response_model=HealthResponse)
+async def health() -> HealthResponse:
+    """Report that the service is up."""
+    return HealthResponse(status="ok")
+
+
+@app.get("/version", response_model=VersionResponse)
+async def version() -> VersionResponse:
+    """Report the backend version and whether a job is active."""
+    return VersionResponse(version=read_version(), active_job=active_job is not None)
+
+
+@app.get("/jobs/current", response_model=CurrentJobResponse)
+async def current_job() -> CurrentJobResponse:
+    """Return the active job, else the most recently finished one, if any."""
+    async with job_lock:
+        if active_job is not None:
+            return CurrentJobResponse(active=True, job=clone_status(active_job.status))
+        if last_job is not None:
+            return CurrentJobResponse(active=False, job=clone_status(last_job))
+        return CurrentJobResponse(active=False, job=None)
+
+
+@app.post("/jobs", response_model=JobStatus)
+async def submit_job(request: JobRequest) -> JobStatus:
+    """Submit a download job."""
+    return await create_job(request)
+
+
+@app.post("/jobs/current/cancel", response_model=JobStatus)
+async def cancel_current_job() -> JobStatus:
+    """Request cancellation of the active job; 404 when there is none."""
+    async with job_lock:
+        if active_job is None:
+            raise HTTPException(status_code=404, detail="no active job")
+        job = active_job
+        job.cancel_requested = True
+        if job.process is None:
+            update_status(job, message="cancel requested before yt-dlp started")
+            return clone_status(job.status)
+
+        update_status(job, message="cancel requested", phase="running")
+        # yt-dlp may have exited already while the job is still being finalized.
+        with suppress(ProcessLookupError):
+            job.process.terminate()
+        return clone_status(job.status)
diff --git a/youdis/models.py b/youdis/models.py
new file mode 100644
index 0000000..97fb836
--- /dev/null
+++ b/youdis/models.py
@@ -0,0 +1,64 @@
+"""Pydantic request and response models for the youdis backend API."""
+
+from datetime import datetime, timezone
+from typing import Literal
+
+from pydantic import BaseModel, Field
+
+
+JobState = Literal["accepted", "busy", "running", "completed", "failed", "cancelled"]
+
+
+def _utc_now() -> datetime:
+    """Return the current time as a timezone-aware UTC datetime."""
+    return datetime.now(timezone.utc)
+
+
+class JobRequest(BaseModel):
+    """A request to download one URL, with optional requester metadata."""
+
+    url: str
+    requester_id: str | None = None
+    requester_name: str | None = None
+    origin: str | None = None
+    requested_at: datetime | None = None
+
+
+class JobStatus(BaseModel):
+    """Snapshot of a job's state as reported by the API."""
+
+    job_id: str
+    state: JobState
+    url: str
+    message: str | None = None
+    phase: str | None = None
+    disposition: str | None = None
+    requester_id: str | None = None
+    requester_name: str | None = None
+    origin: str | None = None
+    result_path: str | None = None
+    command: list[str] = Field(default_factory=list)
+    returncode: int | None = None
+    # Timezone-aware UTC; datetime.utcnow() is naive and deprecated since 3.12.
+    created_at: datetime = Field(default_factory=_utc_now)
+    updated_at: datetime = Field(default_factory=_utc_now)
+
+
+class CurrentJobResponse(BaseModel):
+    """The active job, or the last finished one when nothing is running."""
+
+    active: bool
+    job: JobStatus | None = None
+
+
+class HealthResponse(BaseModel):
+    """Response body of the health endpoint."""
+
+    status: Literal["ok"]
+
+
+class VersionResponse(BaseModel):
+    """Backend version plus whether a job is currently active."""
+
+    version: str
+    active_job: bool