import re import time import logging import asyncio import urllib.request import json from concurrent.futures import ThreadPoolExecutor logger = logging.getLogger(__name__) CACHE_TTL = 6 * 3600 _cache: dict = {"data": [], "ts": 0} PLAYER_API = "https://api.arte.tv/api/player/v2/config/fr/{pid}" GENRE_PAGES = [ ("classique", "https://www.arte.tv/fr/arte-concert/classique/"), ("jazz", "https://www.arte.tv/fr/arte-concert/jazz/"), ("rock-pop", "https://www.arte.tv/fr/arte-concert/rock-pop/"), ("opéra", "https://www.arte.tv/fr/arte-concert/opera/"), ("musique du monde","https://www.arte.tv/fr/arte-concert/musique-du-monde/"), ("électronique", "https://www.arte.tv/fr/arte-concert/electronica/"), ("agenda", "https://www.arte.tv/fr/arte-concert/agenda/"), ("", "https://www.arte.tv/fr/arte-concert/"), ] _HEADERS = { "User-Agent": ( "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " "Chrome/120.0.0.0 Safari/537.36" ), "RSC": "1", } _PROG_RE = re.compile(r"\b(\d{6}-\d{3}-[A-Z])\b") def _fetch_url(url: str, headers: dict | None = None) -> str: req = urllib.request.Request(url, headers=headers or _HEADERS) with urllib.request.urlopen(req, timeout=15) as r: return r.read().decode("utf-8", errors="replace") def _prog_ids_from_page(url: str) -> set[str]: try: html = _fetch_url(url) return set(_PROG_RE.findall(html)) except Exception as ex: logger.warning("Failed to fetch %s: %s", url, ex) return set() def _metadata_for_pid(pid: str) -> dict | None: try: raw = _fetch_url( PLAYER_API.format(pid=pid), headers={"User-Agent": _HEADERS["User-Agent"], "Accept": "application/json"}, ) data = json.loads(raw) attrs = data["data"]["attributes"] meta = attrs["metadata"] url = (meta.get("link") or {}).get("url") or f"https://www.arte.tv/fr/videos/{pid}/" imgs = meta.get("images") or [] thumbnail = imgs[0]["url"] if imgs else "" duration_s = (meta.get("duration") or {}).get("seconds") rights = attrs.get("rights") or {} return { "id": pid, "title": meta.get("title", ""), "subtitle": meta.get("subtitle", ""), "url": url, "thumbnail": thumbnail, "duration": duration_s, "description": meta.get("description", "") or "", "expiry": rights.get("end", ""), } except Exception as ex: logger.debug("Failed to get metadata for %s: %s", pid, ex) return None def _fetch_all_sync() -> list[dict]: # 1 — collect programme IDs across all genre pages all_ids: set[str] = set() for _genre, url in GENRE_PAGES: ids = _prog_ids_from_page(url) logger.info(" %s → %d IDs", url.split("/fr/")[1], len(ids)) all_ids |= ids logger.info("Total unique programme IDs: %d", len(all_ids)) # 2 — fetch metadata concurrently concerts: list[dict] = [] with ThreadPoolExecutor(max_workers=10) as pool: results = list(pool.map(_metadata_for_pid, sorted(all_ids))) for c in results: if c and c["title"]: concerts.append(c) concerts.sort(key=lambda c: c.get("expiry", ""), reverse=True) return concerts async def get_all_concerts() -> list[dict]: now = time.time() if _cache["data"] and now - _cache["ts"] < CACHE_TTL: return _cache["data"] loop = asyncio.get_event_loop() data = await loop.run_in_executor(None, _fetch_all_sync) if data: _cache["data"] = data _cache["ts"] = now return _cache["data"] async def fetch_concerts(page: int = 1, search: str = "", page_size: int = 24) -> dict: all_c = await get_all_concerts() filtered = all_c if search: q = search.lower() filtered = [ c for c in all_c if q in c["title"].lower() or q in c.get("subtitle", "").lower() or q in c.get("description", "").lower() ] start = (page - 1) * page_size return { "concerts": filtered[start : start + page_size], "total": len(filtered), "page": page, "page_size": page_size, "pages": max(1, (len(filtered) + page_size - 1) // page_size), } async def invalidate_cache() -> int: _cache["ts"] = 0 data = await get_all_concerts() return len(data)