"""Download queue for Arte programmes: yt-dlp worker, SQLite history, release naming."""

import asyncio
import logging
import re
import sqlite3
import threading
import unicodedata
import uuid
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path

import yt_dlp

from arte_api import get_versions, select_lang_tag

logger = logging.getLogger(__name__)

OUTPUT_DIR = "/data/Arte"
# Programme id embedded in the URL: six digits, three digits, one capital letter.
_PID_RE = re.compile(r"\b(\d{6}-\d{3}-[A-Z])\b")

DB_PATH = "data/arte_dl.db"
Path("data").mkdir(exist_ok=True)


def _db():
    """Open a new SQLite connection to ``DB_PATH`` with ``sqlite3.Row`` rows."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


@contextmanager
def _db_session():
    """Yield a connection; commit on success, roll back on error, always close.

    ``with sqlite3.Connection`` only manages the transaction and leaves the
    connection open, so the explicit ``close()`` here prevents leaking one
    connection (and file handle) per query.
    """
    conn = _db()
    try:
        with conn:
            yield conn
    finally:
        conn.close()


# ── Release naming ─────────────────────────────────────────────────────────────

def _slugify(s: str) -> str:
    """Normalize a string to a dot-separated scene-style slug."""
    # Strip accents (NFD decompose, then drop combining marks).
    s = unicodedata.normalize("NFD", s)
    s = "".join(c for c in s if unicodedata.category(c) != "Mn")
    # Apostrophe before a letter → dot + upper-cased letter (L'Amour → L.Amour).
    s = re.sub(r"['’]([A-Za-z])", lambda m: "." + m.group(1).upper(), s)
    # Spaces / underscores → dot.
    s = re.sub(r"[\s_]+", ".", s)
    # Keep only alphanumerics, dot and hyphen.
    s = re.sub(r"[^A-Za-z0-9.\-]", "", s)
    # Collapse runs of dots.
    s = re.sub(r"\.{2,}", ".", s)
    return s.strip(".")


def build_release_name(title: str, subtitle: str, year: int | None,
                       info: dict, lang_tag: str = "VO") -> str:
    """Build a scene-style release file name.

    Format: Title.Event.Year.LANG.Resolution.WEB-DL.x264.AAC-ReMoRa.mkv

    Args:
        title: Programme title; a standalone occurrence of *year* is removed.
        subtitle: Optional secondary title, appended after the title slug.
        year: Release year, or ``None`` to omit the year component.
        info: Mapping read for ``"height"`` and ``"vcodec"``.
        lang_tag: Language tag inserted after the year.

    Returns:
        The file name, always ending in ``-ReMoRa.mkv``.
    """
    year_str = str(year) if year else ""
    sub = subtitle or ""
    if year:
        # Avoid repeating the year, which gets its own component below.
        year_re = r"\b" + year_str + r"\b"
        title = re.sub(year_re, "", title).strip()
        sub = re.sub(year_re, "", sub).strip()

    # Join only non-empty slugs so an empty title cannot yield a leading dot.
    name = ".".join(p for p in (_slugify(title), _slugify(sub)) if p)

    height = info.get("height") or 0
    if height >= 2160:
        res = "2160p"
    elif height >= 1080:
        res = "1080p"
    elif height >= 720:
        res = "720p"
    else:
        # Unknown height falls back to 1080p.
        res = f"{height}p" if height else "1080p"

    vcodec = (info.get("vcodec") or "").lower()
    is_hevc = any(tag in vcodec for tag in ("hevc", "h265", "hev1", "hvc1"))
    # Anything that is not recognisably HEVC is labelled x264.
    vc = "HEVC" if is_hevc else "x264"

    parts = [name, year_str, lang_tag, res, "WEB-DL", vc, "AAC"]
    base = ".".join(p for p in parts if p)
    return f"{base}-ReMoRa.mkv"


class DownloadManager:
    """Serial download queue backed by SQLite, with in-memory live status."""

    def __init__(self):
        # Live per-download status, keyed by download id; guarded by _lock
        # because the progress hook updates it from the executor thread.
        self._active: dict[str, dict] = {}
        self._lock = threading.Lock()
        self._queue: asyncio.Queue = asyncio.Queue()
        self._init_db()

    def _init_db(self):
        """Create the tables and add columns missing from an older schema."""
        with _db_session() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS downloads (
                    id          TEXT PRIMARY KEY,
                    url         TEXT NOT NULL,
                    title       TEXT NOT NULL,
                    subtitle    TEXT NOT NULL DEFAULT '',
                    year        INTEGER,
                    category    TEXT NOT NULL DEFAULT '',
                    filename    TEXT,
                    state       TEXT NOT NULL DEFAULT 'queued',
                    progress    REAL DEFAULT 0,
                    speed       TEXT DEFAULT '',
                    eta         INTEGER,
                    started_at  TEXT,
                    finished_at TEXT,
                    error       TEXT
                )
            """)
            # Add only the columns that are absent instead of swallowing
            # whatever error ALTER TABLE might raise.
            existing = {
                row["name"]
                for row in conn.execute("PRAGMA table_info(downloads)")
            }
            for col, definition in [
                ("subtitle", "TEXT NOT NULL DEFAULT ''"),
                ("year", "INTEGER"),
                ("category", "TEXT NOT NULL DEFAULT ''"),
            ]:
                if col not in existing:
                    conn.execute(
                        f"ALTER TABLE downloads ADD COLUMN {col} {definition}"
                    )
            conn.execute("""
                CREATE TABLE IF NOT EXISTS auto_dl_categories (
                    category TEXT PRIMARY KEY,
                    added_at TEXT NOT NULL
                )
            """)

    # ------------------------------------------------------------------ public

    def get_watched_categories(self) -> list[str]:
        """Return the watched categories, oldest first."""
        with _db_session() as conn:
            rows = conn.execute(
                "SELECT category FROM auto_dl_categories ORDER BY added_at"
            ).fetchall()
        return [r["category"] for r in rows]

    def watch_category(self, category: str):
        """Add *category* to the watch list; a no-op if already present."""
        with _db_session() as conn:
            conn.execute(
                "INSERT OR IGNORE INTO auto_dl_categories (category, added_at) VALUES (?,?)",
                (category, datetime.now().isoformat()),
            )

    def unwatch_category(self, category: str):
        """Remove *category* from the watch list."""
        with _db_session() as conn:
            conn.execute(
                "DELETE FROM auto_dl_categories WHERE category=?", (category,)
            )

    def already_enqueued(self, url: str) -> bool:
        """Return True if *url* has a download row that is not in error state."""
        with _db_session() as conn:
            row = conn.execute(
                "SELECT id FROM downloads WHERE url=? AND state != 'error' LIMIT 1",
                (url,),
            ).fetchone()
        return row is not None

    def already_downloaded(self, url: str) -> bool:
        """Return True if *url* has a download row in state 'done'."""
        with _db_session() as conn:
            row = conn.execute(
                "SELECT id FROM downloads WHERE url=? AND state='done' LIMIT 1",
                (url,),
            ).fetchone()
        return row is not None

    async def enqueue(self, url: str, title: str, subtitle: str,
                      year: int | None, category: str) -> str:
        """Record a new download as 'queued' and hand it to the worker.

        Returns:
            The generated download id (a UUID4 string).
        """
        dl_id = str(uuid.uuid4())
        now = datetime.now().isoformat()
        with _db_session() as conn:
            conn.execute(
                "INSERT INTO downloads (id, url, title, subtitle, year, category, state, started_at) VALUES (?,?,?,?,?,?,'queued',?)",
                (dl_id, url, title, subtitle, year, category, now),
            )
        with self._lock:
            self._active[dl_id] = {"state": "queued", "progress": 0, "title": title}
        await self._queue.put((dl_id, url, title, subtitle, year, category))
        return dl_id

    async def resume_pending(self):
        """Re-queue downloads interrupted by a container restart."""
        with _db_session() as conn:
            rows = conn.execute(
                "SELECT id, url, title, subtitle, year, category FROM downloads"
                " WHERE state IN ('queued', 'downloading') ORDER BY started_at"
            ).fetchall()
            # Interrupted downloads restart from scratch.
            conn.execute(
                "UPDATE downloads SET state='queued', progress=0, speed='', eta=NULL"
                " WHERE state='downloading'"
            )
        for r in rows:
            with self._lock:
                self._active[r["id"]] = {
                    "state": "queued", "progress": 0, "title": r["title"],
                }
            await self._queue.put((
                r["id"], r["url"], r["title"], r["subtitle"] or "",
                r["year"], r["category"] or "",
            ))
        if rows:
            logger.info("Resumed %d pending download(s)", len(rows))

    async def start_worker(self):
        """Run queued jobs one at a time in the default executor, forever."""
        loop = asyncio.get_running_loop()
        while True:
            job = await self._queue.get()
            try:
                await loop.run_in_executor(None, self._run, *job)
            except Exception:
                # _run records its own failures; anything that still escapes
                # must not kill the worker and stall every later job.
                logger.exception("Download job %s failed unexpectedly", job[0])
            finally:
                self._queue.task_done()

    def status(self, dl_id: str) -> dict:
        """Return a copy of the live status, or ``{"state": "unknown"}``."""
        with self._lock:
            return dict(self._active.get(dl_id, {"state": "unknown"}))

    def history(self) -> list[dict]:
        """Return up to 200 download rows, most recently started first."""
        with _db_session() as conn:
            rows = conn.execute(
                "SELECT * FROM downloads ORDER BY started_at DESC LIMIT 200"
            ).fetchall()
        return [dict(r) for r in rows]

    # ----------------------------------------------------------------- private

    def _set(self, dl_id: str, **kw):
        """Merge *kw* into the live status of *dl_id* under the lock."""
        with self._lock:
            self._active.setdefault(dl_id, {}).update(kw)

    def _run(self, dl_id: str, url: str, title: str, subtitle: str,
             year: int | None, category: str = ""):
        """Download one item, rename it to its release name, record the outcome.

        Blocking; called from the worker via ``run_in_executor``. Any exception
        raised during preparation or download is recorded as state 'error'
        in both the live status and the database rather than propagated.
        """
        out_dir = f"{OUTPUT_DIR}/{category}" if category else OUTPUT_DIR

        # MKV internal title: "Artist - Concert Title (year)"
        name_part = f"{title} - {subtitle}" if subtitle else title
        mkv_title = f"{name_part} ({year})" if year else name_part

        # For HLS, yt-dlp downloads video then audio separately.
        # After the first stream finishes, stay in "processing" to avoid
        # resetting progress to 0% when the audio stream starts.
        finished_once = False

        def hook(d):
            nonlocal finished_once
            if d["status"] == "downloading" and not finished_once:
                dl = d.get("downloaded_bytes") or 0
                total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
                # Cap at 99% until yt-dlp reports the stream as finished.
                pct = min(dl / total * 100, 99.0) if total > 0 else 0.0
                self._set(
                    dl_id,
                    state="downloading",
                    progress=round(pct, 1),
                    speed=d.get("_speed_str", ""),
                    eta=d.get("eta"),
                )
            elif d["status"] == "finished":
                finished_once = True
                self._set(dl_id, state="processing", progress=100)

        try:
            Path(out_dir).mkdir(parents=True, exist_ok=True)

            self._set(dl_id, state="downloading")
            with _db_session() as conn:
                conn.execute(
                    "UPDATE downloads SET state='downloading' WHERE id=?", (dl_id,)
                )

            # Determine the language tag before downloading. This runs inside
            # the try so a failed lookup marks the job as errored instead of
            # leaving it stuck in 'downloading'.
            lang_tag = "VO"
            pid_m = _PID_RE.search(url)
            if pid_m:
                lang_tag = select_lang_tag(get_versions(pid_m.group(1)))

            ffmpeg_out = ["-metadata", f"title={mkv_title}"]
            if lang_tag == "VOSTFR":
                # Append disposition after title metadata
                ffmpeg_out += ["-disposition:s:0", "default"]

            ydl_opts = {
                "outtmpl": f"{out_dir}/%(title)s.%(ext)s",
                "format": "bestvideo[vcodec^=avc1]+bestaudio/bestvideo+bestaudio/best",
                "merge_output_format": "mkv",
                "postprocessor_args": {"ffmpeg_o": ffmpeg_out},
                "progress_hooks": [hook],
                "quiet": True,
                "no_warnings": True,
            }
            if lang_tag == "VOSTFR":
                # NOTE(review): confirm that yt-dlp's Python API honours
                # "embedsubtitles"; embedding may instead require an explicit
                # FFmpegEmbedSubtitle postprocessor entry — TODO verify.
                ydl_opts.update({
                    "writesubtitles": True,
                    "subtitleslangs": ["fr"],
                    "embedsubtitles": True,
                })

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                orig_path = Path(ydl.prepare_filename(info))
                # yt-dlp renames to .mkv after merge; prepare_filename may return .mp4
                if not orig_path.exists():
                    orig_path = orig_path.with_suffix(".mkv")

            release_name = build_release_name(title, subtitle, year, info, lang_tag)
            dest_path = orig_path.parent / release_name
            if orig_path.exists() and orig_path != dest_path:
                # Path.replace overwrites an existing destination.
                orig_path.replace(dest_path)
            filename = str(dest_path)

            self._set(dl_id, state="done", progress=100)
            with _db_session() as conn:
                conn.execute(
                    "UPDATE downloads SET state='done', progress=100, filename=?, finished_at=? WHERE id=?",
                    (filename, datetime.now().isoformat(), dl_id),
                )

        except Exception as exc:
            logger.exception("Download %s failed", dl_id)
            self._set(dl_id, state="error", error=str(exc))
            with _db_session() as conn:
                conn.execute(
                    "UPDATE downloads SET state='error', error=?, finished_at=? WHERE id=?",
                    (str(exc), datetime.now().isoformat(), dl_id),
                )