"""Scrape, enrich and cache the ARTE Concert catalogue.

The catalogue is gathered from arte.tv genre pages (through the paginated
EMAC zone API) plus a few extra pages (through programme-ID regex matching
and the Player API).  Results are kept in an in-memory cache and mirrored in
a single-row SQLite table so a restart can serve data immediately.

Importing this module creates the ``data`` directory and the
``concerts_cache`` table if they do not exist yet.
"""

import asyncio
import json
import logging
import os as _os
import re
import sqlite3
import time
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from urllib.parse import quote_plus

import tmdb as _tmdb

logger = logging.getLogger(__name__)

CACHE_TTL = 6 * 3600  # seconds
DB_PATH = "data/arte_dl.db"

# In-memory cache: "data" is the list of concert dicts, "ts" the epoch time
# at which that list was produced.
_cache: dict = {"data": [], "ts": 0}
_fetch_lock: asyncio.Lock | None = None
_refresh_task: asyncio.Task | None = None


def _get_fetch_lock() -> asyncio.Lock:
    """Return the module-wide refresh lock, creating it on first use."""
    global _fetch_lock
    if _fetch_lock is None:
        _fetch_lock = asyncio.Lock()
    return _fetch_lock


_os.makedirs("data", exist_ok=True)


def _db():
    """Open a new SQLite connection to ``DB_PATH`` with ``sqlite3.Row`` rows.

    The caller owns the connection and is responsible for closing it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


@contextmanager
def _db_session():
    """Yield a connection that is committed on success and always closed.

    ``with conn:`` on a sqlite3 connection only commits or rolls back the
    transaction; it does not close the connection, so it is closed
    explicitly here.
    """
    conn = _db()
    try:
        with conn:
            yield conn
    finally:
        conn.close()


def _init_concerts_cache_table():
    """Create the single-row ``concerts_cache`` table if it is missing."""
    with _db_session() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS concerts_cache (
                id INTEGER PRIMARY KEY CHECK (id = 1),
                data TEXT NOT NULL,
                ts REAL NOT NULL
            )
        """)


_init_concerts_cache_table()


def _load_db_cache() -> tuple[list, float]:
    """Load the persisted concert list and its timestamp.

    Returns:
        ``(concerts, ts)``; ``([], 0.0)`` when no row exists or the read
        fails (the failure is logged, not raised).
    """
    try:
        with _db_session() as conn:
            row = conn.execute(
                "SELECT data, ts FROM concerts_cache WHERE id=1"
            ).fetchone()
        if row:
            return json.loads(row["data"]), row["ts"]
    except Exception as e:
        # Best-effort: a broken cache must not prevent a fresh scrape.
        logger.warning("Failed to load concerts cache: %s", e)
    return [], 0.0


def _save_db_cache(data: list, ts: float):
    """Persist ``data`` (as JSON) and ``ts`` in the single cache row.

    Failures are logged and otherwise ignored.
    """
    try:
        with _db_session() as conn:
            conn.execute(
                "INSERT OR REPLACE INTO concerts_cache (id, data, ts) VALUES (1, ?, ?)",
                (json.dumps(data), ts),
            )
    except Exception as e:
        logger.warning("Failed to save concerts cache: %s", e)


PLAYER_API = "https://api.arte.tv/api/player/v2/config/fr/{pid}"
SEARCH_URL = "https://www.arte.tv/fr/search/?q={q}"

GENRE_PAGES = [
    ("Pop & Rock", "https://www.arte.tv/fr/p/pop-rock/"),
    ("Classique", "https://www.arte.tv/fr/p/classique/"),
    ("Electro", "https://www.arte.tv/fr/p/musiques-electroniques/"),
    ("Jazz", "https://www.arte.tv/fr/p/jazz"),
    ("Arts de la scène", "https://www.arte.tv/fr/p/arts-de-la-scene"),
    ("Hip-hop", "https://www.arte.tv/fr/p/hip-hop"),
    ("Metal", "https://www.arte.tv/fr/p/metal"),
    ("Opéra", "https://www.arte.tv/fr/p/opera"),
    ("World", "https://www.arte.tv/fr/p/world"),
    ("Baroque", "https://www.arte.tv/fr/p/musique-baroque/"),
]

EXTRA_PAGES = [
    "https://www.arte.tv/fr/arte-concert/agenda/",
    "https://www.arte.tv/fr/arte-concert/",
]

CATEGORIES = [name for name, _ in GENRE_PAGES]

_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "RSC": "1",
}

# Headers used for every Player API request.
_PLAYER_HEADERS = {
    "User-Agent": _HEADERS["User-Agent"],
    "Accept": "application/json",
}

# Programme IDs look like "123456-789-A".
_PROG_RE = re.compile(r"\b(\d{6}-\d{3}-[A-Z])\b")


def _fetch_url(url: str, headers: dict | None = None) -> str:
    """GET ``url`` (15 s timeout) and return the body decoded as UTF-8.

    ``headers`` defaults to ``_HEADERS``.  Undecodable bytes are replaced.
    Network and HTTP errors from ``urllib`` propagate to the caller.
    """
    req = urllib.request.Request(url, headers=headers or _HEADERS)
    with urllib.request.urlopen(req, timeout=15) as r:
        return r.read().decode("utf-8", errors="replace")


EMAC_ZONE_API = "https://api.arte.tv/api/emac/v4/fr/web/zones/{zone_id}/content?page={page}"
_EMAC_HEADERS = {"User-Agent": _HEADERS["User-Agent"], "Origin": "https://www.arte.tv"}
_ZONE_RE = re.compile(r"/api/emac/v4/fr/web/zones/([a-f0-9-]+)/content\?page=1")


def _zone_ids_from_page(url: str) -> list[str]:
    """Return the EMAC zone IDs referenced in the page at ``url``.

    Returns an empty list (and logs a warning) when the fetch fails.
    """
    try:
        html = _fetch_url(url)
        return _ZONE_RE.findall(html)
    except Exception as ex:
        logger.warning("Failed to fetch %s: %s", url, ex)
        return []


def _fetch_zone_concerts(zone_id: str, category: str) -> list[dict]:
    """Fetch all concerts from a single EMAC zone (paginated).

    Args:
        zone_id: EMAC zone identifier.
        category: Genre name, used only for logging.

    Returns:
        One dict per item that has a ``programId``.  Pagination stops at the
        last page reported by the API, or at the first page that fails to
        fetch or parse (concerts collected so far are still returned).
    """
    concerts = []
    page = 1
    while True:
        url = EMAC_ZONE_API.format(zone_id=zone_id, page=page)
        try:
            raw = _fetch_url(url, headers=_EMAC_HEADERS)
            data = json.loads(raw)
        except Exception as ex:
            logger.warning("EMAC zone %s page %d failed: %s", zone_id, page, ex)
            break
        if not isinstance(data, dict):
            # Valid JSON but not the expected object: treat it like a failed page.
            logger.warning(
                "EMAC zone %s page %d failed: %s", zone_id, page, "unexpected payload"
            )
            break

        for item in data.get("data") or []:
            pid = item.get("programId") or ""
            if not pid:
                continue
            img = item.get("mainImage") or {}
            avail = item.get("availability") or {}
            concerts.append({
                "id": pid,
                "title": item.get("title") or "",
                "subtitle": item.get("subtitle") or "",
                "url": item.get("url") or f"https://www.arte.tv/fr/videos/{pid}/",
                "thumbnail": img.get("url") or "",
                "duration": item.get("duration"),
                "description": item.get("shortDescription") or item.get("teaserText") or "",
                "expiry": avail.get("end") or "",
            })

        # "pagination" / "pages" may be missing or null; treat both as a
        # single page instead of crashing the whole scrape.
        pagination = data.get("pagination") or {}
        if page >= (pagination.get("pages") or 1):
            break
        page += 1

    logger.info(" %s (zone %s…) → %d concerts", category, zone_id[:8], len(concerts))
    return concerts


def _metadata_for_pid(pid: str) -> dict | None:
    """Build a concert dict for ``pid`` from the Player API.

    Returns ``None`` when the request fails or the response lacks the
    expected structure (logged at debug level).
    """
    try:
        raw = _fetch_url(PLAYER_API.format(pid=pid), headers=_PLAYER_HEADERS)
        data = json.loads(raw)
        attrs = data["data"]["attributes"]
        meta = attrs["metadata"]
        url = (meta.get("link") or {}).get("url") or f"https://www.arte.tv/fr/videos/{pid}/"
        imgs = meta.get("images") or []
        thumbnail = imgs[0]["url"] if imgs else ""
        duration_s = (meta.get("duration") or {}).get("seconds")
        rights = attrs.get("rights") or {}
        return {
            "id": pid,
            "title": meta.get("title") or "",
            "subtitle": meta.get("subtitle") or "",
            "url": url,
            "thumbnail": thumbnail,
            "duration": duration_s,
            "description": meta.get("description") or "",
            "expiry": rights.get("end") or "",
        }
    except Exception as ex:
        logger.debug("Failed to get metadata for %s: %s", pid, ex)
        return None


def _fetch_all_sync() -> list[dict]:
    """Scrape every source and return the full, enriched concert list.

    Blocking (network I/O); meant to be run in an executor.  Each concert
    dict gets a ``categories`` list (empty for concerts found only on the
    extra pages) and whatever fields ``tmdb.lookup`` returns.  The result is
    sorted by ``expiry`` string, descending.
    """
    by_id: dict[str, dict] = {}
    id_cats: dict[str, list[str]] = {}

    # Genre pages: use EMAC zone API (paginated) — gets ALL concerts, not
    # just the ones present in the initial HTML.
    for name, url in GENRE_PAGES:
        zone_ids = _zone_ids_from_page(url)
        if not zone_ids:
            logger.warning("No zone IDs found for %s (%s)", name, url)
            continue
        # All zones on a page are identical copies; use the first one.
        concerts = _fetch_zone_concerts(zone_ids[0], name)
        for c in concerts:
            pid = c["id"]
            id_cats.setdefault(pid, []).append(name)
            if pid not in by_id:
                by_id[pid] = c

    # Extra pages: fall back to regex (no EMAC zones).
    for url in EXTRA_PAGES:
        try:
            extra_ids = set(_PROG_RE.findall(_fetch_url(url)))
        except Exception as ex:
            logger.warning("Failed to fetch %s: %s", url, ex)
            extra_ids = set()
        new_ids = extra_ids - set(by_id)
        logger.info(" %s → %d new IDs", url.split("/fr/")[1], len(new_ids))
        if new_ids:
            with ThreadPoolExecutor(max_workers=10) as pool:
                results = list(pool.map(_metadata_for_pid, sorted(new_ids)))
            for meta in results:
                if meta and meta.get("title"):
                    by_id[meta["id"]] = meta

    logger.info("Total unique programme IDs: %d", len(by_id))

    concerts = list(by_id.values())
    for c in concerts:
        c["categories"] = id_cats.get(c["id"], [])

    # TMDB enrichment, run concurrently.  Any caching of lookups happens
    # inside the tmdb module (not visible from this file).
    def _enrich(c: dict) -> dict:
        t = _tmdb.lookup(c["id"], c.get("title", ""), c.get("subtitle", ""))
        if t:
            c.update(t)
        return c

    with ThreadPoolExecutor(max_workers=5) as pool:
        concerts = list(pool.map(_enrich, concerts))

    concerts.sort(key=lambda c: c.get("expiry") or "", reverse=True)
    return concerts


def _resolve_ids(ids: set[str], exclude: set[str] | None = None) -> list[dict]:
    """Fetch Player API metadata for ``ids`` minus ``exclude``.

    Entries whose lookup failed or that have no title are dropped.
    """
    to_fetch = ids - (exclude or set())
    with ThreadPoolExecutor(max_workers=10) as pool:
        results = list(pool.map(_metadata_for_pid, sorted(to_fetch)))
    return [c for c in results if c and c.get("title")]


def _search_sync(query: str) -> set[str]:
    """Return the programme IDs found on the arte.tv search page for ``query``.

    Returns an empty set (and logs a warning) when the request fails.
    """
    url = SEARCH_URL.format(q=quote_plus(query))
    try:
        html = _fetch_url(url)
        return set(_PROG_RE.findall(html))
    except Exception as ex:
        logger.warning("Search failed for %r: %s", query, ex)
        return set()


# ── public API ────────────────────────────────────────────────────────────────

async def _ensure_cache() -> list[dict]:
    """Alias for :func:`get_all_concerts`."""
    return await get_all_concerts()


async def get_concerts_by_category(category: str) -> list[dict]:
    """Return the cached concerts whose ``categories`` contain ``category``."""
    data = await _ensure_cache()
    return [c for c in data if category in (c.get("categories") or [])]


async def _do_refresh():
    """Full scrape under lock; updates in-memory + DB cache.

    Does nothing if the in-memory cache became fresh while waiting for the
    lock.  An empty scrape result leaves the existing cache untouched.
    """
    global _refresh_task
    async with _get_fetch_lock():
        if _cache["data"] and time.time() - _cache["ts"] < CACHE_TTL:
            return
        loop = asyncio.get_running_loop()
        data = await loop.run_in_executor(None, _fetch_all_sync)
        if data:
            ts = time.time()
            _cache["data"] = data
            _cache["ts"] = ts
            _save_db_cache(data, ts)
            logger.info("Cache refreshed: %d concerts", len(data))
    _refresh_task = None


async def get_all_concerts() -> list[dict]:
    """Return the full concert list, using the freshest cache available.

    Order of preference: fresh in-memory cache; DB cache (served immediately
    even when stale, with a background refresh scheduled); otherwise a
    blocking scrape.
    """
    global _refresh_task
    now = time.time()

    # In-memory cache hit
    if _cache["data"] and now - _cache["ts"] < CACHE_TTL:
        return _cache["data"]

    # Try DB cache — return immediately even if stale, refresh in background
    db_data, db_ts = _load_db_cache()
    if db_data:
        _cache["data"] = db_data
        _cache["ts"] = db_ts
        logger.info("Concerts loaded from DB cache (%d concerts)", len(db_data))
        if now - db_ts >= CACHE_TTL:
            # Stale — serve now, refresh silently in background
            if _refresh_task is None or _refresh_task.done():
                _refresh_task = asyncio.create_task(_do_refresh())
        return _cache["data"]

    # No cache at all — must wait for the scrape (first run or cleared DB)
    await _do_refresh()
    return _cache["data"]


async def fetch_concerts(page: int = 1, search: str = "", page_size: int = 24, category: str = "") -> dict:
    """Return one page of concerts, optionally filtered.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        search: Case-insensitive substring matched against title, subtitle
            and description.  Without a ``category`` filter, the arte.tv
            search page is also queried and unknown programmes are appended.
        page_size: Items per page; values below 1 are treated as 1.
        category: When set, keep only concerts tagged with this category.

    Returns:
        Dict with keys ``concerts``, ``total``, ``page``, ``page_size`` and
        ``pages`` (at least 1).
    """
    # Guard against ZeroDivisionError and negative slice offsets.
    page = max(1, page)
    page_size = max(1, page_size)

    all_c = await get_all_concerts()

    if category:
        all_c = [c for c in all_c if category in (c.get("categories") or [])]

    cached_ids = {c["id"] for c in all_c}

    if search:
        q = search.lower()
        local = [
            c for c in all_c
            if q in (c.get("title") or "").lower()
            or q in (c.get("subtitle") or "").lower()
            or q in (c.get("description") or "").lower()
        ]
        # Remote search only when no category filter (results have no category info)
        if not category:
            loop = asyncio.get_running_loop()
            remote_ids = await loop.run_in_executor(None, _search_sync, search)
            new_ids = remote_ids - cached_ids
            if new_ids:
                extra = await loop.run_in_executor(None, _resolve_ids, new_ids, None)
                local_ids = {c["id"] for c in local}
                local.extend(c for c in extra if c["id"] not in local_ids)
        filtered = local
    else:
        filtered = all_c

    start = (page - 1) * page_size
    return {
        "concerts": filtered[start : start + page_size],
        "total": len(filtered),
        "page": page,
        "page_size": page_size,
        "pages": max(1, (len(filtered) + page_size - 1) // page_size),
    }


def get_versions(pid: str) -> list[dict]:
    """Fetch available stream versions from Arte Player API for a programme ID.

    Returns the ``versions`` list of the first stream, or ``[]`` when there
    are no streams or the request fails (logged at debug level).
    """
    try:
        raw = _fetch_url(PLAYER_API.format(pid=pid), headers=_PLAYER_HEADERS)
        data = json.loads(raw)
        streams = data["data"]["attributes"].get("streams") or []
        return streams[0].get("versions") or [] if streams else []
    except Exception as ex:
        logger.debug("Failed to get versions for %s: %s", pid, ex)
        return []


def select_lang_tag(versions: list[dict]) -> str:
    """
    Determine UNFR language tag from stream versions.
    FR audio → FRENCH, non-FR + FR subs → VOSTFR, otherwise → VO.
    """
    if not versions:
        return "VO"
    if any(v.get("audioLanguage") == "fr" for v in versions):
        return "FRENCH"
    if any(v.get("subtitleLanguage") == "fr" for v in versions):
        return "VOSTFR"
    return "VO"


async def invalidate_cache() -> int:
    """Expire the in-memory cache, clear the DB cache and reload.

    Returns:
        Number of concerts available after the reload.
    """
    _cache["ts"] = 0
    try:
        with _db_session() as conn:
            conn.execute("DELETE FROM concerts_cache")
    except Exception as e:
        # Best-effort: the reload below still runs if the delete fails.
        logger.warning("Failed to clear concerts cache: %s", e)
    data = await get_all_concerts()
    return len(data)