Files
arte-dl/arte_api.py
T
dev d3ce89f228
Docker / docker (push) Successful in 1m58s
perf: stale-while-revalidate cache + startup pre-warm
If DB has any concerts data (even expired), return it immediately and
refresh in background. Start pre-warming at container startup so the
scrape runs before the first user request.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 17:02:09 +02:00

334 lines
10 KiB
Python

import re
import sqlite3
import time
import logging
import asyncio
import urllib.request
import json
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import quote_plus
import tmdb as _tmdb
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Cache lifetime in seconds (6 hours); cache timestamps are compared against it.
CACHE_TTL = 6 * 3600
# SQLite database file that stores the persisted concerts cache.
DB_PATH = "data/arte_dl.db"
# In-memory cache: "data" is the concert list, "ts" the time.time() value of
# the refresh that produced it (0 means "never refreshed / expired").
_cache: dict = {"data": [], "ts": 0}
# Created lazily by _get_fetch_lock(); held by _do_refresh() during a scrape.
_fetch_lock: asyncio.Lock | None = None
# Background refresh task started by get_all_concerts(), or None.
_refresh_task: asyncio.Task | None = None
def _get_fetch_lock() -> asyncio.Lock:
    """Return the module-wide fetch lock, creating it on first use."""
    global _fetch_lock
    lock = _fetch_lock
    if lock is None:
        # First caller: build the lock and remember it for everyone else.
        lock = _fetch_lock = asyncio.Lock()
    return lock
# Make sure the directory that holds DB_PATH exists before the first connection
# is opened. The directory is derived from DB_PATH so the two cannot drift
# apart; "." covers a DB_PATH without a directory component.
import os as _os
_os.makedirs(_os.path.dirname(DB_PATH) or ".", exist_ok=True)
def _db():
    """Open a new SQLite connection to ``DB_PATH`` whose rows are ``sqlite3.Row`` objects."""
    connection = sqlite3.connect(DB_PATH)
    # sqlite3.Row lets callers read columns by name, e.g. row["data"].
    connection.row_factory = sqlite3.Row
    return connection
def _init_concerts_cache_table():
    """Create the ``concerts_cache`` table if it does not exist yet.

    The ``CHECK (id = 1)`` constraint restricts the table to a single row,
    which stores the JSON-encoded concert list (``data``) and its timestamp
    (``ts``).
    """
    conn = _db()
    try:
        # ``with conn`` only commits or rolls back the transaction; it does
        # not close the connection, so that is done explicitly below.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS concerts_cache (
                    id INTEGER PRIMARY KEY CHECK (id = 1),
                    data TEXT NOT NULL,
                    ts REAL NOT NULL
                )
            """)
    finally:
        conn.close()
# Executed at import time so the cache table exists before any read or write.
_init_concerts_cache_table()
def _load_db_cache() -> tuple[list, float]:
    """Read the persisted concerts cache from SQLite.

    Returns:
        ``(data, ts)`` with the decoded concert list and its stored
        timestamp, or ``([], 0.0)`` when no row exists or the read/decode
        fails.
    """
    try:
        conn = _db()
        try:
            # ``with conn`` manages the transaction only; the connection is
            # closed in the ``finally`` so it is not leaked.
            with conn:
                row = conn.execute(
                    "SELECT data, ts FROM concerts_cache WHERE id=1"
                ).fetchone()
        finally:
            conn.close()
        if row:
            return json.loads(row["data"]), row["ts"]
    except Exception as e:
        # Best-effort read: callers treat an empty result as "no cache", but
        # a broken or unreadable cache should still leave a trace in the log.
        logger.warning("Failed to load concerts cache: %s", e)
    return [], 0.0
def _save_db_cache(data: list, ts: float):
    """Persist the concert list and its timestamp as the single cache row.

    Args:
        data: Concert list; stored JSON-encoded.
        ts: Timestamp stored alongside the data.

    Failures are logged as warnings and otherwise ignored.
    """
    try:
        conn = _db()
        try:
            # ``with conn`` commits on success / rolls back on error; the
            # connection itself is closed in the ``finally``.
            with conn:
                conn.execute(
                    "INSERT OR REPLACE INTO concerts_cache (id, data, ts) VALUES (1, ?, ?)",
                    (json.dumps(data), ts),
                )
        finally:
            conn.close()
    except Exception as e:
        logger.warning("Failed to save concerts cache: %s", e)
# Player config endpoint; formatted with a programme ID by _metadata_for_pid()
# and get_versions().
PLAYER_API = "https://api.arte.tv/api/player/v2/config/fr/{pid}"
# Search page; formatted with the URL-quoted query by _search_sync().
SEARCH_URL = "https://www.arte.tv/fr/search/?q={q}"
# (category name, listing page URL) pairs scraped for programme IDs; the name
# becomes an entry in each matching concert's "categories" list.
GENRE_PAGES = [
    ("Pop & Rock", "https://www.arte.tv/fr/p/pop-rock/"),
    ("Classique", "https://www.arte.tv/fr/p/classique/"),
    ("Electro", "https://www.arte.tv/fr/p/musiques-electroniques/"),
    ("Jazz", "https://www.arte.tv/fr/p/jazz"),
    ("Arts de la scène", "https://www.arte.tv/fr/p/arts-de-la-scene"),
    ("Hip-hop", "https://www.arte.tv/fr/p/hip-hop"),
    ("Metal", "https://www.arte.tv/fr/p/metal"),
    ("Opéra", "https://www.arte.tv/fr/p/opera"),
    ("World", "https://www.arte.tv/fr/p/world"),
    ("Baroque", "https://www.arte.tv/fr/p/musique-baroque/"),
]
# Additional pages scraped for programme IDs; IDs found only here get no category.
EXTRA_PAGES = [
    "https://www.arte.tv/fr/arte-concert/agenda/",
    "https://www.arte.tv/fr/arte-concert/",
]
# Category names, in GENRE_PAGES order.
CATEGORIES = [name for name, _ in GENRE_PAGES]
# Default request headers used by _fetch_url() when the caller passes none.
# NOTE(review): the "RSC" header presumably requests a React Server Components
# payload from arte.tv — confirm.
_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "RSC": "1",
}
# Matches programme IDs of the form 6 digits, "-", 3 digits, "-", one uppercase
# letter, delimited by word boundaries.
_PROG_RE = re.compile(r"\b(\d{6}-\d{3}-[A-Z])\b")
def _fetch_url(url: str, headers: dict | None = None) -> str:
    """Fetch *url* and return the response body as text.

    Uses ``_HEADERS`` when *headers* is missing or empty, a 15-second
    timeout, and UTF-8 decoding with undecodable bytes replaced.
    """
    request_headers = headers if headers else _HEADERS
    request = urllib.request.Request(url, headers=request_headers)
    with urllib.request.urlopen(request, timeout=15) as response:
        body = response.read()
    return body.decode("utf-8", errors="replace")
def _prog_ids_from_page(url: str) -> set[str]:
    """Return the distinct programme IDs found in the page at *url*.

    Any failure is logged as a warning and yields an empty set.
    """
    try:
        matches = _PROG_RE.findall(_fetch_url(url))
    except Exception as ex:
        logger.warning("Failed to fetch %s: %s", url, ex)
        return set()
    return set(matches)
def _metadata_for_pid(pid: str) -> dict | None:
    """Fetch the player config for *pid* and flatten it into a concert dict.

    Returns:
        A dict with the keys ``id``, ``title``, ``subtitle``, ``url``,
        ``thumbnail``, ``duration``, ``description`` and ``expiry``, or
        ``None`` when the request or any step of the parsing fails.
    """
    try:
        payload = json.loads(
            _fetch_url(
                PLAYER_API.format(pid=pid),
                headers={"User-Agent": _HEADERS["User-Agent"], "Accept": "application/json"},
            )
        )
        attributes = payload["data"]["attributes"]
        metadata = attributes["metadata"]

        # Optional sub-objects: treat missing/null values as empty containers.
        link = metadata.get("link") or {}
        images = metadata.get("images") or []
        duration = metadata.get("duration") or {}
        rights = attributes.get("rights") or {}

        if images:
            thumbnail = images[0]["url"]
        else:
            thumbnail = ""

        return {
            "id": pid,
            "title": metadata.get("title") or "",
            "subtitle": metadata.get("subtitle") or "",
            # Fall back to the canonical video page when no link is provided.
            "url": link.get("url") or f"https://www.arte.tv/fr/videos/{pid}/",
            "thumbnail": thumbnail,
            "duration": duration.get("seconds"),
            "description": metadata.get("description") or "",
            "expiry": rights.get("end") or "",
        }
    except Exception as ex:
        logger.debug("Failed to get metadata for %s: %s", pid, ex)
        return None
def _fetch_all_sync() -> list[dict]:
    """Scrape all listing pages and build the full concert list (blocking).

    Collects programme IDs from every genre page (recording which categories
    each ID appeared under), adds the IDs from the extra pages, resolves all
    IDs to metadata, attaches the categories, merges in TMDB lookup results
    and sorts by the ``expiry`` string in descending order (entries without
    an expiry sort last).

    Returns:
        Concert dicts from ``_resolve_ids`` extended with a ``categories``
        list and whatever fields the TMDB lookup returned.
    """
    id_cats: dict[str, list[str]] = {}
    for name, url in GENRE_PAGES:
        ids = _prog_ids_from_page(url)
        # Separator added: the former format glued the name to the count.
        logger.info("  %s: %d IDs", name, len(ids))
        for pid in ids:
            id_cats.setdefault(pid, []).append(name)

    all_ids: set[str] = set(id_cats)
    for url in EXTRA_PAGES:
        ids = _prog_ids_from_page(url)
        logger.info("  %s: %d IDs", url.split("/fr/")[1], len(ids))
        all_ids |= ids
    logger.info("Total unique programme IDs: %d", len(all_ids))

    concerts = _resolve_ids(all_ids)
    for c in concerts:
        c["categories"] = id_cats.get(c["id"], [])

    def _enrich(c: dict) -> dict:
        """Merge the TMDB lookup result into *c*; never raises."""
        try:
            t = _tmdb.lookup(c["id"], c.get("title", ""), c.get("subtitle", ""))
            if t:
                c.update(t)
        except Exception as ex:
            # Enrichment is optional: one failing lookup must not abort the
            # whole scrape (pool.map would re-raise it otherwise).
            logger.warning("TMDB lookup failed for %s: %s", c["id"], ex)
        return c

    # TMDB enrichment runs concurrently.
    with ThreadPoolExecutor(max_workers=5) as pool:
        concerts = list(pool.map(_enrich, concerts))

    concerts.sort(key=lambda c: c.get("expiry") or "", reverse=True)
    return concerts
def _resolve_ids(ids: set[str], exclude: set[str] | None = None) -> list[dict]:
    """Resolve programme IDs to concert dicts using up to 10 worker threads.

    IDs listed in *exclude* are skipped. Entries whose metadata lookup
    returned nothing, or that have an empty title, are dropped.
    """
    pending = sorted(ids - exclude) if exclude else sorted(ids)
    with ThreadPoolExecutor(max_workers=10) as pool:
        resolved = [
            concert
            for concert in pool.map(_metadata_for_pid, pending)
            if concert and concert.get("title")
        ]
    return resolved
def _search_sync(query: str) -> set[str]:
    """Return the programme IDs found on the search page for *query* (blocking).

    Any failure is logged as a warning and yields an empty set.
    """
    search_url = SEARCH_URL.format(q=quote_plus(query))
    try:
        matches = _PROG_RE.findall(_fetch_url(search_url))
    except Exception as ex:
        logger.warning("Search failed for %r: %s", query, ex)
        return set()
    return set(matches)
# ── public API ────────────────────────────────────────────────────────────────
async def _ensure_cache() -> list[dict]:
    """Return the concert list; a thin alias for ``get_all_concerts()``."""
    concerts = await get_all_concerts()
    return concerts
async def get_concerts_by_category(category: str) -> list[dict]:
    """Return the concerts whose ``categories`` list contains *category*."""
    matching = []
    for concert in await _ensure_cache():
        # A missing or null "categories" value counts as "no categories".
        if category in (concert.get("categories") or []):
            matching.append(concert)
    return matching
async def _do_refresh():
    """Run a full scrape under the fetch lock and update both caches.

    After acquiring the lock the in-memory cache is re-checked, so a caller
    that waited behind another refresh returns without scraping again. The
    blocking scrape runs in the default executor. An empty scrape result
    leaves the existing caches untouched. Exceptions from the scrape
    propagate to whoever awaits this coroutine.
    """
    global _refresh_task
    try:
        async with _get_fetch_lock():
            if _cache["data"] and time.time() - _cache["ts"] < CACHE_TTL:
                return
            # get_running_loop() is the supported call inside a coroutine.
            loop = asyncio.get_running_loop()
            data = await loop.run_in_executor(None, _fetch_all_sync)
            if data:
                ts = time.time()
                _cache["data"] = data
                _cache["ts"] = ts
                _save_db_cache(data, ts)
                logger.info("Cache refreshed: %d concerts", len(data))
    finally:
        # Clear the handle only when this coroutine IS the tracked background
        # task. When awaited directly, another background task may still be
        # running and its only strong reference must not be dropped. Using
        # ``finally`` also covers the early-return and error paths.
        if _refresh_task is not None and _refresh_task is asyncio.current_task():
            _refresh_task = None
async def get_all_concerts() -> list[dict]:
    """Return the concert list, preferring cached data over a fresh scrape.

    Lookup order: fresh in-memory cache, then the SQLite cache (served even
    when expired, with a background refresh scheduled), and finally a scrape
    that is awaited before returning.
    """
    global _refresh_task
    now = time.time()

    # 1. Fresh in-memory data: nothing else to do.
    memory_is_fresh = bool(_cache["data"]) and now - _cache["ts"] < CACHE_TTL
    if memory_is_fresh:
        return _cache["data"]

    db_data, db_ts = _load_db_cache()

    # 2. Nothing persisted either (first run or cleared DB): scrape and wait.
    if not db_data:
        await _do_refresh()
        return _cache["data"]

    # 3. Persisted data exists: adopt it, even when it is expired.
    _cache["data"] = db_data
    _cache["ts"] = db_ts
    logger.info("Concerts loaded from DB cache (%d concerts)", len(db_data))

    is_stale = now - db_ts >= CACHE_TTL
    no_refresh_running = _refresh_task is None or _refresh_task.done()
    if is_stale and no_refresh_running:
        # Serve the stale data now; refresh in the background.
        _refresh_task = asyncio.create_task(_do_refresh())
    return _cache["data"]
async def fetch_concerts(page: int = 1, search: str = "", page_size: int = 24, category: str = "") -> dict:
    """Return one page of concerts, optionally filtered by category and search.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        search: Case-insensitive substring matched against title, subtitle
            and description. When no category is given, the remote search
            page is queried as well and unknown programmes are appended.
        page_size: Items per page; values below 1 are treated as 1.
        category: When non-empty, keep only concerts listing this category.

    Returns:
        Dict with ``concerts`` (the page slice), ``total``, ``page``,
        ``page_size`` and ``pages`` (at least 1).
    """
    # Clamp pagination inputs: page_size == 0 used to raise ZeroDivisionError
    # and page < 1 produced a negative slice start.
    page = max(1, page)
    page_size = max(1, page_size)

    all_c = await get_all_concerts()
    if category:
        all_c = [c for c in all_c if category in (c.get("categories") or [])]

    if search:
        q = search.lower()
        local = [
            c for c in all_c
            if q in (c.get("title") or "").lower()
            or q in (c.get("subtitle") or "").lower()
            or q in (c.get("description") or "").lower()
        ]
        # Remote search only when no category filter (results have no category info)
        if not category:
            loop = asyncio.get_running_loop()
            remote_ids = await loop.run_in_executor(None, _search_sync, search)
            # Only programmes that are not in the cached list need resolving.
            new_ids = remote_ids - {c["id"] for c in all_c}
            if new_ids:
                extra = await loop.run_in_executor(None, _resolve_ids, new_ids, None)
                local_ids = {c["id"] for c in local}
                local.extend(c for c in extra if c["id"] not in local_ids)
        filtered = local
    else:
        filtered = all_c

    start = (page - 1) * page_size
    return {
        "concerts": filtered[start : start + page_size],
        "total": len(filtered),
        "page": page,
        "page_size": page_size,
        "pages": max(1, (len(filtered) + page_size - 1) // page_size),
    }
def get_versions(pid: str) -> list[dict]:
    """Return the ``versions`` list of the first stream in the player config for *pid*.

    Yields an empty list when there are no streams, the first stream has no
    versions, or the request/parsing fails (logged at debug level).
    """
    try:
        payload = json.loads(
            _fetch_url(
                PLAYER_API.format(pid=pid),
                headers={"User-Agent": _HEADERS["User-Agent"], "Accept": "application/json"},
            )
        )
        streams = payload["data"]["attributes"].get("streams") or []
        if not streams:
            return []
        return streams[0].get("versions") or []
    except Exception as ex:
        logger.debug("Failed to get versions for %s: %s", pid, ex)
        return []
def select_lang_tag(versions: list[dict]) -> str:
    """Pick the UNFR language tag for a list of stream versions.

    Rules, checked in this order:
      * any version with French audio        -> "FRENCH"
      * any version with French subtitles    -> "VOSTFR"
      * anything else, including no versions -> "VO"
    """
    # Checks are ordered by priority: audio wins over subtitles.
    checks = (("audioLanguage", "FRENCH"), ("subtitleLanguage", "VOSTFR"))
    for field, tag in checks:
        for version in versions or ():
            if version.get(field) == "fr":
                return tag
    return "VO"
async def invalidate_cache() -> int:
    """Expire the in-memory cache, clear the persisted cache and rebuild.

    Sets the in-memory timestamp to 0, deletes the rows of
    ``concerts_cache`` and then calls ``get_all_concerts()``.

    Returns:
        Number of concerts returned by ``get_all_concerts()`` afterwards.
    """
    _cache["ts"] = 0
    try:
        conn = _db()
        try:
            # ``with conn`` commits or rolls back only; close explicitly so
            # the connection is not leaked.
            with conn:
                conn.execute("DELETE FROM concerts_cache")
        finally:
            conn.close()
    except Exception as e:
        # Still best-effort, but no longer silent: if the delete fails the
        # persisted rows remain and may be served again.
        logger.warning("Failed to clear concerts cache: %s", e)
    data = await get_all_concerts()
    return len(data)