Files
arte-dl/downloader.py
T
2026-05-02 19:15:40 +02:00

210 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
import sqlite3
import threading
import unicodedata
import uuid
from datetime import datetime
from pathlib import Path
import yt_dlp
from fastapi import BackgroundTasks
# Directory that finished downloads are written to; DownloadManager._run
# creates it on demand before starting yt-dlp.
OUTPUT_DIR = "/data/Arte"
# SQLite database file opened by _db(). The path is relative, so it is
# resolved against the process's current working directory.
DB_PATH = "arte_dl.db"
def _db():
    """Open a new SQLite connection to ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` objects so columns can be read by
    name. The caller owns the returned connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
# ── Release naming ─────────────────────────────────────────────────────────────
# Letters that have no canonical decomposition: NFD leaves them untouched, so
# without an explicit mapping the ASCII filter in _slugify would delete them
# outright ("Cœur" would become "Cur").
_SLUG_TRANSLITERATIONS = str.maketrans({
    "\u0153": "oe",  # œ
    "\u0152": "Oe",  # Œ
    "\u00e6": "ae",  # æ
    "\u00c6": "Ae",  # Æ
    "\u00df": "ss",  # ß
    "\u00f8": "o",   # ø
    "\u00d8": "O",   # Ø
    "\u0142": "l",   # ł
    "\u0141": "L",   # Ł
})


def _slugify(s: str) -> str:
    """Normalize a string to a dot-separated, ASCII-only scene-style slug.

    Accents are stripped, ligatures and a few non-decomposable letters are
    transliterated, whitespace and underscores become dots, and every other
    character outside ``A-Za-z0-9.-`` is removed.

    Args:
        s: Arbitrary text, e.g. a programme title.

    Returns:
        The slug, without leading or trailing dots. May be empty when the
        input contains no usable characters.
    """
    # Strip accents (NFD decompose then drop combining marks).
    s = unicodedata.normalize("NFD", s)
    s = "".join(c for c in s if unicodedata.category(c) != "Mn")
    # Transliterate what NFD cannot decompose (œ, æ, ß, ...).
    s = s.translate(_SLUG_TRANSLITERATIONS)
    # Apostrophe before a letter -> dot + upper-cased letter
    # (L'Amour -> L.Amour). The typographic apostrophe U+2019 is accepted as
    # well; it is written as an escape to keep the source free of
    # look-alike characters.
    s = re.sub(r"['\u2019]([A-Za-z])", lambda m: "." + m.group(1).upper(), s)
    # Spaces / underscores -> dot.
    s = re.sub(r"[\s_]+", ".", s)
    # Keep only alphanumeric, dot, hyphen.
    s = re.sub(r"[^A-Za-z0-9.\-]", "", s)
    # Collapse multiple dots.
    s = re.sub(r"\.{2,}", ".", s)
    return s.strip(".")
def build_release_name(title: str, subtitle: str, year: int | None, info: dict) -> str:
    """Build a dot-separated, scene-style release file name.

    Layout: ``Title.Subtitle.Year.Resolution.WEB-DL.Codec.AAC-ReMoRa.mp4``.
    Components that end up empty (subtitle, year, or a title that slugifies
    to nothing) are skipped, so the name never starts with a dot.

    Args:
        title: Programme title.
        subtitle: Optional secondary title; may be empty or ``None``.
        year: Release year. When truthy, standalone occurrences of it are
            removed from title and subtitle so it appears only once.
        info: Metadata dict as returned by yt-dlp; only the ``"height"`` and
            ``"vcodec"`` keys are read.

    Returns:
        The release file name, always ending in ``-ReMoRa.mp4``.
    """

    def without_year(text: str) -> str:
        # Drop the year from free text to avoid "Title.2024.2024".
        if not year:
            return text
        return re.sub(r"\b" + str(year) + r"\b", "", text).strip()

    title_slug = _slugify(without_year(title or ""))
    subtitle_slug = _slugify(without_year(subtitle or ""))
    year_str = str(year) if year else ""

    # Resolution from the yt-dlp info: snap to the usual tiers, keep odd
    # sub-720 heights verbatim, and assume 1080p when the height is unknown.
    height = info.get("height") or 0
    for tier in (2160, 1080, 720):
        if height >= tier:
            res = f"{tier}p"
            break
    else:
        res = f"{height}p" if height else "1080p"

    # Video codec: hev1/hvc1/hevc/h265 mean H.265; everything else
    # (including avc1/h264 and unknown codecs) is labelled x264.
    vcodec = (info.get("vcodec") or "").lower()
    is_hevc = any(tag in vcodec for tag in ("hevc", "h265", "hev1", "hvc1"))
    vc = "HEVC" if is_hevc else "x264"

    parts = [title_slug, subtitle_slug, year_str, res, "WEB-DL", vc, "AAC"]
    base = ".".join(p for p in parts if p)
    return f"{base}-ReMoRa.mp4"
class DownloadManager:
    """Queue yt-dlp downloads as background tasks and track their state.

    Live status (progress, speed, ETA) is kept in an in-memory dict guarded
    by a lock. Lifecycle milestones (queued, downloading, done, error) are
    additionally written to the ``downloads`` table of the SQLite database.
    """

    def __init__(self):
        # dl_id -> live status fields of downloads enqueued on this instance.
        self._active: dict[str, dict] = {}
        # Serializes every read and write of _active.
        self._lock = threading.Lock()
        self._init_db()

    @staticmethod
    def _execute(sql: str, params: tuple = ()) -> list:
        """Run one statement in its own transaction and return all rows.

        ``with conn`` only commits (or rolls back on error); it does not
        close the connection, so the connection is closed explicitly here
        instead of lingering until garbage collection.
        """
        conn = _db()
        try:
            with conn:
                return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    def _init_db(self):
        """Create the ``downloads`` table if it does not exist yet."""
        self._execute("""
            CREATE TABLE IF NOT EXISTS downloads (
                id TEXT PRIMARY KEY,
                url TEXT NOT NULL,
                title TEXT NOT NULL,
                filename TEXT,
                state TEXT NOT NULL DEFAULT 'queued',
                progress REAL DEFAULT 0,
                speed TEXT DEFAULT '',
                eta INTEGER,
                started_at TEXT,
                finished_at TEXT,
                error TEXT
            )
        """)

    # ------------------------------------------------------------------ public
    def enqueue(self, url: str, title: str, subtitle: str, year: int | None,
                bg: BackgroundTasks) -> str:
        """Register a download and schedule it on ``bg``.

        Args:
            url: Page or stream URL handed to yt-dlp.
            title: Title stored with the record and used for the release name.
            subtitle: Secondary title used for the release name.
            year: Release year used for the release name, or ``None``.
            bg: Task collection on which ``_run`` is scheduled.

        Returns:
            The generated download id (a UUID4 string).
        """
        dl_id = str(uuid.uuid4())
        now = datetime.now().isoformat()
        self._execute(
            "INSERT INTO downloads (id, url, title, state, started_at) VALUES (?,?,?,'queued',?)",
            (dl_id, url, title, now),
        )
        with self._lock:
            self._active[dl_id] = {"state": "queued", "progress": 0, "title": title}
        bg.add_task(self._run, dl_id, url, title, subtitle, year)
        return dl_id

    def status(self, dl_id: str) -> dict:
        """Return a copy of the live status of ``dl_id``.

        Only the in-memory state is consulted; ids not known to this
        instance yield ``{"state": "unknown"}``.
        """
        with self._lock:
            return dict(self._active.get(dl_id, {"state": "unknown"}))

    def history(self) -> list[dict]:
        """Return up to 200 download records, most recently started first."""
        rows = self._execute(
            "SELECT * FROM downloads ORDER BY started_at DESC LIMIT 200"
        )
        return [dict(r) for r in rows]

    def already_downloaded(self, url: str) -> bool:
        """Return True if a record for ``url`` is in state ``'done'``."""
        rows = self._execute(
            "SELECT id FROM downloads WHERE url=? AND state='done' LIMIT 1", (url,)
        )
        return bool(rows)

    # ----------------------------------------------------------------- private
    def _set(self, dl_id: str, **kw):
        """Merge ``kw`` into the live status of ``dl_id`` under the lock."""
        with self._lock:
            self._active.setdefault(dl_id, {}).update(kw)

    def _run(self, dl_id: str, url: str, title: str, subtitle: str, year: int | None):
        """Download ``url`` with yt-dlp, rename the result, record the outcome.

        Any exception is caught and stored as state ``'error'`` (in memory
        and in the database) rather than propagated.
        """
        self._set(dl_id, state="downloading")
        self._execute("UPDATE downloads SET state='downloading' WHERE id=?", (dl_id,))

        # For HLS, yt-dlp downloads video then audio separately.
        # After the first stream finishes, stay in "processing" to avoid
        # resetting progress to 0% when the audio stream starts.
        first_stream_done = False

        def hook(d):
            nonlocal first_stream_done
            if d["status"] == "downloading" and not first_stream_done:
                dl = d.get("downloaded_bytes") or 0
                total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
                # Cap at 99% so 100% is only shown once a stream has finished.
                pct = min(dl / total * 100, 99.0) if total > 0 else 0.0
                self._set(
                    dl_id,
                    state="downloading",
                    progress=round(pct, 1),
                    speed=d.get("_speed_str", ""),
                    eta=d.get("eta"),
                )
            elif d["status"] == "finished":
                first_stream_done = True
                self._set(dl_id, state="processing", progress=100)

        ydl_opts = {
            "outtmpl": f"{OUTPUT_DIR}/%(title)s.%(ext)s",
            "format": "bestvideo[vcodec^=avc1]+bestaudio/bestvideo+bestaudio/best",
            "merge_output_format": "mp4",
            "progress_hooks": [hook],
            "quiet": True,
            "no_warnings": True,
        }
        try:
            # Inside the try so that an unusable output directory is recorded
            # as an error instead of leaving the record stuck in 'queued'.
            Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                orig_path = Path(ydl.prepare_filename(info))
            # Rename to proper release name.
            release_name = build_release_name(title, subtitle, year, info)
            dest_path = orig_path.parent / release_name
            if orig_path.exists() and orig_path != dest_path:
                # replace() overwrites an existing destination in one step,
                # without a separate exists()/unlink() window.
                orig_path.replace(dest_path)
            # NOTE(review): dest_path is recorded even when orig_path was not
            # found on disk — confirm that is intended.
            filename = str(dest_path)
            self._set(dl_id, state="done", progress=100)
            self._execute(
                "UPDATE downloads SET state='done', progress=100, filename=?, finished_at=? WHERE id=?",
                (filename, datetime.now().isoformat(), dl_id),
            )
        except Exception as exc:
            self._set(dl_id, state="error", error=str(exc))
            self._execute(
                "UPDATE downloads SET state='error', error=?, finished_at=? WHERE id=?",
                (str(exc), datetime.now().isoformat(), dl_id),
            )