import asyncio import re import sqlite3 import threading import unicodedata import uuid from datetime import datetime from pathlib import Path import yt_dlp from arte_api import get_versions, select_lang_tag OUTPUT_DIR = "/data/Arte" _PID_RE = re.compile(r"\b(\d{6}-\d{3}-[A-Z])\b") DB_PATH = "data/arte_dl.db" Path("data").mkdir(exist_ok=True) def _db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn # ── Release naming ───────────────────────────────────────────────────────────── def _slugify(s: str) -> str: """Normalize a string to dot-separated scene-style slug.""" # Strip accents (NFD decompose then drop combining marks) s = unicodedata.normalize("NFD", s) s = "".join(c for c in s if unicodedata.category(c) != "Mn") # Apostrophe before letter → .Letter (L'Amour → .L.Amour) s = re.sub(r"['’]([A-Za-z])", lambda m: "." + m.group(1).upper(), s) # Spaces / underscores → dot s = re.sub(r"[\s_]+", ".", s) # Keep only alphanumeric, dot, hyphen s = re.sub(r"[^A-Za-z0-9.\-]", "", s) # Collapse multiple dots s = re.sub(r"\.{2,}", ".", s) return s.strip(".") def build_release_name(title: str, subtitle: str, year: int | None, info: dict, lang_tag: str = "VO") -> str: """Build a proper UNFR/scene release name. Format: Title.Event.Year.LANG.Resolution.WEB-DL.x264.AAC-ReMoRa.mkv """ t = re.sub(r"\b" + str(year) + r"\b", "", title).strip() if year else title name = _slugify(t) sub = subtitle or "" if year: sub = re.sub(r"\b" + str(year) + r"\b", "", sub).strip() sub_slug = _slugify(sub) if sub_slug: name = f"{name}.{sub_slug}" year_str = str(year) if year else "" height = info.get("height") or 0 if height >= 2160: res = "2160p" elif height >= 1080: res = "1080p" elif height >= 720: res = "720p" else: res = f"{height}p" if height else "1080p" vcodec = (info.get("vcodec") or "").lower() if "hevc" in vcodec or "h265" in vcodec or "hev1" in vcodec or "hvc1" in vcodec: vc = "HEVC" elif "avc" in vcodec or "h264" in vcodec: vc = "x264" else: vc = "x264" parts = [name, year_str, lang_tag, res, "WEB-DL", vc, "AAC"] base = ".".join(p for p in parts if p) return f"{base}-ReMoRa.mkv" class DownloadManager: def __init__(self): self._active: dict[str, dict] = {} self._lock = threading.Lock() self._queue: asyncio.Queue = asyncio.Queue() self._init_db() def _init_db(self): with _db() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS downloads ( id TEXT PRIMARY KEY, url TEXT NOT NULL, title TEXT NOT NULL, filename TEXT, state TEXT NOT NULL DEFAULT 'queued', progress REAL DEFAULT 0, speed TEXT DEFAULT '', eta INTEGER, started_at TEXT, finished_at TEXT, error TEXT ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS auto_dl_categories ( category TEXT PRIMARY KEY, added_at TEXT NOT NULL ) """) # ------------------------------------------------------------------ public def get_watched_categories(self) -> list[str]: with _db() as conn: rows = conn.execute( "SELECT category FROM auto_dl_categories ORDER BY added_at" ).fetchall() return [r["category"] for r in rows] def watch_category(self, category: str): with _db() as conn: conn.execute( "INSERT OR IGNORE INTO auto_dl_categories (category, added_at) VALUES (?,?)", (category, datetime.now().isoformat()), ) def unwatch_category(self, category: str): with _db() as conn: conn.execute("DELETE FROM auto_dl_categories WHERE category=?", (category,)) def already_enqueued(self, url: str) -> bool: with _db() as conn: row = conn.execute( "SELECT id FROM downloads WHERE url=? AND state != 'error' LIMIT 1", (url,) ).fetchone() return row is not None def already_downloaded(self, url: str) -> bool: with _db() as conn: row = conn.execute( "SELECT id FROM downloads WHERE url=? AND state='done' LIMIT 1", (url,) ).fetchone() return row is not None async def enqueue(self, url: str, title: str, subtitle: str, year: int | None, category: str) -> str: dl_id = str(uuid.uuid4()) now = datetime.now().isoformat() with _db() as conn: conn.execute( "INSERT INTO downloads (id, url, title, state, started_at) VALUES (?,?,?,'queued',?)", (dl_id, url, title, now), ) with self._lock: self._active[dl_id] = {"state": "queued", "progress": 0, "title": title} await self._queue.put((dl_id, url, title, subtitle, year, category)) return dl_id async def start_worker(self): loop = asyncio.get_running_loop() while True: job = await self._queue.get() dl_id, url, title, subtitle, year, category = job await loop.run_in_executor(None, self._run, dl_id, url, title, subtitle, year, category) def status(self, dl_id: str) -> dict: with self._lock: return dict(self._active.get(dl_id, {"state": "unknown"})) def history(self) -> list[dict]: with _db() as conn: rows = conn.execute( "SELECT * FROM downloads ORDER BY started_at DESC LIMIT 200" ).fetchall() return [dict(r) for r in rows] # ----------------------------------------------------------------- private def _set(self, dl_id: str, **kw): with self._lock: self._active.setdefault(dl_id, {}).update(kw) def _run(self, dl_id: str, url: str, title: str, subtitle: str, year: int | None, category: str = ""): out_dir = f"{OUTPUT_DIR}/{category}" if category else OUTPUT_DIR Path(out_dir).mkdir(parents=True, exist_ok=True) self._set(dl_id, state="downloading") with _db() as conn: conn.execute("UPDATE downloads SET state='downloading' WHERE id=?", (dl_id,)) # Determine language tag from Arte Player API before downloading pid_m = _PID_RE.search(url) lang_tag = "VO" if pid_m: versions = get_versions(pid_m.group(1)) lang_tag = select_lang_tag(versions) # MKV internal title: "Artist - Concert Title (year)" name_part = f"{title} - {subtitle}" if subtitle else title mkv_title = f"{name_part} ({year})" if year else name_part # For HLS, yt-dlp downloads video then audio separately. # After the first stream finishes, stay in "processing" to avoid # resetting progress to 0% when the audio stream starts. finished_once = [False] def hook(d): if d["status"] == "downloading" and not finished_once[0]: dl = d.get("downloaded_bytes") or 0 total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 pct = min(dl / total * 100, 99.0) if total > 0 else 0.0 self._set( dl_id, state="downloading", progress=round(pct, 1), speed=d.get("_speed_str", ""), eta=d.get("eta"), ) elif d["status"] == "finished": finished_once[0] = True self._set(dl_id, state="processing", progress=100) ffmpeg_out = ["-metadata", f"title={mkv_title}"] ydl_opts = { "outtmpl": f"{out_dir}/%(title)s.%(ext)s", "format": "bestvideo[vcodec^=avc1]+bestaudio/bestvideo+bestaudio/best", "merge_output_format": "mkv", "postprocessor_args": {"ffmpeg_o": ffmpeg_out}, "progress_hooks": [hook], "quiet": True, "no_warnings": True, } if lang_tag == "VOSTFR": ydl_opts.update({ "writesubtitles": True, "subtitleslangs": ["fr"], "embedsubtitles": True, }) # Append disposition after title metadata ffmpeg_out += ["-disposition:s:0", "default"] try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) orig_path = Path(ydl.prepare_filename(info)) # yt-dlp renames to .mkv after merge; prepare_filename may return .mp4 if not orig_path.exists(): orig_path = orig_path.with_suffix(".mkv") release_name = build_release_name(title, subtitle, year, info, lang_tag) dest_path = orig_path.parent / release_name if orig_path.exists() and orig_path != dest_path: if dest_path.exists(): dest_path.unlink() orig_path.rename(dest_path) filename = str(dest_path) self._set(dl_id, state="done", progress=100) with _db() as conn: conn.execute( "UPDATE downloads SET state='done', progress=100, filename=?, finished_at=? WHERE id=?", (filename, datetime.now().isoformat(), dl_id), ) except Exception as exc: self._set(dl_id, state="error", error=str(exc)) with _db() as conn: conn.execute( "UPDATE downloads SET state='error', error=?, finished_at=? WHERE id=?", (str(exc), datetime.now().isoformat(), dl_id), )