import sqlite3 import threading import uuid from datetime import datetime from pathlib import Path import yt_dlp from fastapi import BackgroundTasks OUTPUT_DIR = "/data/Arte" DB_PATH = "arte_dl.db" def _db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn class DownloadManager: def __init__(self): self._active: dict[str, dict] = {} self._lock = threading.Lock() self._init_db() def _init_db(self): with _db() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS downloads ( id TEXT PRIMARY KEY, url TEXT NOT NULL, title TEXT NOT NULL, filename TEXT, state TEXT NOT NULL DEFAULT 'queued', progress REAL DEFAULT 0, speed TEXT DEFAULT '', eta INTEGER, started_at TEXT, finished_at TEXT, error TEXT ) """) # ------------------------------------------------------------------ public def enqueue(self, url: str, title: str, bg: BackgroundTasks) -> str: dl_id = str(uuid.uuid4()) now = datetime.now().isoformat() with _db() as conn: conn.execute( "INSERT INTO downloads (id, url, title, state, started_at) VALUES (?,?,?,'queued',?)", (dl_id, url, title, now), ) with self._lock: self._active[dl_id] = {"state": "queued", "progress": 0, "title": title} bg.add_task(self._run, dl_id, url) return dl_id def status(self, dl_id: str) -> dict: with self._lock: return dict(self._active.get(dl_id, {"state": "unknown"})) def history(self) -> list[dict]: with _db() as conn: rows = conn.execute( "SELECT * FROM downloads ORDER BY started_at DESC LIMIT 200" ).fetchall() return [dict(r) for r in rows] def already_downloaded(self, url: str) -> bool: with _db() as conn: row = conn.execute( "SELECT id FROM downloads WHERE url=? AND state='done' LIMIT 1", (url,) ).fetchone() return row is not None # ----------------------------------------------------------------- private def _set(self, dl_id: str, **kw): with self._lock: self._active.setdefault(dl_id, {}).update(kw) def _run(self, dl_id: str, url: str): Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True) self._set(dl_id, state="downloading") with _db() as conn: conn.execute("UPDATE downloads SET state='downloading' WHERE id=?", (dl_id,)) # For HLS, yt-dlp downloads video then audio separately. # After the first stream finishes, stay in "processing" — don't reset # to "downloading" when the audio stream starts. finished_once = [False] def hook(d): if d["status"] == "downloading" and not finished_once[0]: dl = d.get("downloaded_bytes") or 0 total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 pct = min(dl / total * 100, 99.0) if total > 0 else 0.0 self._set( dl_id, state="downloading", progress=round(pct, 1), speed=d.get("_speed_str", ""), eta=d.get("eta"), ) elif d["status"] == "finished": finished_once[0] = True self._set(dl_id, state="processing", progress=100) ydl_opts = { "outtmpl": f"{OUTPUT_DIR}/%(title)s.%(ext)s", "format": "bestvideo[vcodec^=avc1]+bestaudio/bestvideo+bestaudio/best", "merge_output_format": "mp4", "progress_hooks": [hook], "quiet": True, "no_warnings": True, } try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) filename = ydl.prepare_filename(info) self._set(dl_id, state="done", progress=100) with _db() as conn: conn.execute( "UPDATE downloads SET state='done', progress=100, filename=?, finished_at=? WHERE id=?", (filename, datetime.now().isoformat(), dl_id), ) except Exception as exc: self._set(dl_id, state="error", error=str(exc)) with _db() as conn: conn.execute( "UPDATE downloads SET state='error', error=?, finished_at=? WHERE id=?", (str(exc), datetime.now().isoformat(), dl_id), )