import re import time import logging import asyncio import urllib.request import json from concurrent.futures import ThreadPoolExecutor from urllib.parse import quote_plus logger = logging.getLogger(__name__) CACHE_TTL = 6 * 3600 _cache: dict = {"data": [], "ts": 0} PLAYER_API = "https://api.arte.tv/api/player/v2/config/fr/{pid}" SEARCH_URL = "https://www.arte.tv/fr/search/?q={q}" GENRE_PAGES = [ ("Pop & Rock", "https://www.arte.tv/fr/p/pop-rock/"), ("Classique", "https://www.arte.tv/fr/p/classique/"), ("Electro", "https://www.arte.tv/fr/p/musiques-electroniques/"), ("Jazz", "https://www.arte.tv/fr/p/jazz"), ("Arts de la scène", "https://www.arte.tv/fr/p/arts-de-la-scene"), ("Hip-hop", "https://www.arte.tv/fr/p/hip-hop"), ("Metal", "https://www.arte.tv/fr/p/metal"), ("Opéra", "https://www.arte.tv/fr/p/opera"), ("World", "https://www.arte.tv/fr/p/world"), ("Baroque", "https://www.arte.tv/fr/p/musique-baroque/"), ] EXTRA_PAGES = [ "https://www.arte.tv/fr/arte-concert/agenda/", "https://www.arte.tv/fr/arte-concert/", ] CATEGORIES = [name for name, _ in GENRE_PAGES] _HEADERS = { "User-Agent": ( "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " "Chrome/120.0.0.0 Safari/537.36" ), "RSC": "1", } _PROG_RE = re.compile(r"\b(\d{6}-\d{3}-[A-Z])\b") def _fetch_url(url: str, headers: dict | None = None) -> str: req = urllib.request.Request(url, headers=headers or _HEADERS) with urllib.request.urlopen(req, timeout=15) as r: return r.read().decode("utf-8", errors="replace") def _prog_ids_from_page(url: str) -> set[str]: try: return set(_PROG_RE.findall(_fetch_url(url))) except Exception as ex: logger.warning("Failed to fetch %s: %s", url, ex) return set() def _metadata_for_pid(pid: str) -> dict | None: try: raw = _fetch_url( PLAYER_API.format(pid=pid), headers={"User-Agent": _HEADERS["User-Agent"], "Accept": "application/json"}, ) data = json.loads(raw) attrs = data["data"]["attributes"] meta = attrs["metadata"] url = (meta.get("link") or {}).get("url") or f"https://www.arte.tv/fr/videos/{pid}/" imgs = meta.get("images") or [] thumbnail = imgs[0]["url"] if imgs else "" duration_s = (meta.get("duration") or {}).get("seconds") rights = attrs.get("rights") or {} return { "id": pid, "title": meta.get("title") or "", "subtitle": meta.get("subtitle") or "", "url": url, "thumbnail": thumbnail, "duration": duration_s, "description": meta.get("description") or "", "expiry": rights.get("end") or "", } except Exception as ex: logger.debug("Failed to get metadata for %s: %s", pid, ex) return None def _fetch_all_sync() -> list[dict]: id_cats: dict[str, list[str]] = {} for name, url in GENRE_PAGES: ids = _prog_ids_from_page(url) logger.info(" %s → %d IDs", name, len(ids)) for pid in ids: id_cats.setdefault(pid, []).append(name) all_ids: set[str] = set(id_cats) for url in EXTRA_PAGES: ids = _prog_ids_from_page(url) logger.info(" %s → %d IDs", url.split("/fr/")[1], len(ids)) all_ids |= ids logger.info("Total unique programme IDs: %d", len(all_ids)) concerts = _resolve_ids(all_ids) for c in concerts: c["categories"] = id_cats.get(c["id"], []) concerts.sort(key=lambda c: c.get("expiry") or "", reverse=True) return concerts def _resolve_ids(ids: set[str], exclude: set[str] | None = None) -> list[dict]: to_fetch = ids - (exclude or set()) with ThreadPoolExecutor(max_workers=10) as pool: results = list(pool.map(_metadata_for_pid, sorted(to_fetch))) return [c for c in results if c and c.get("title")] def _search_sync(query: str) -> set[str]: url = SEARCH_URL.format(q=quote_plus(query)) try: html = _fetch_url(url) return set(_PROG_RE.findall(html)) except Exception as ex: logger.warning("Search failed for %r: %s", query, ex) return set() # ── public API ──────────────────────────────────────────────────────────────── async def get_all_concerts() -> list[dict]: now = time.time() if _cache["data"] and now - _cache["ts"] < CACHE_TTL: return _cache["data"] loop = asyncio.get_event_loop() data = await loop.run_in_executor(None, _fetch_all_sync) if data: _cache["data"] = data _cache["ts"] = now return _cache["data"] async def fetch_concerts(page: int = 1, search: str = "", page_size: int = 24, category: str = "") -> dict: all_c = await get_all_concerts() if category: all_c = [c for c in all_c if category in (c.get("categories") or [])] cached_ids = {c["id"] for c in all_c} if search: q = search.lower() local = [ c for c in all_c if q in (c.get("title") or "").lower() or q in (c.get("subtitle") or "").lower() or q in (c.get("description") or "").lower() ] # Remote search only when no category filter (results have no category info) if not category: loop = asyncio.get_event_loop() remote_ids = await loop.run_in_executor(None, _search_sync, search) new_ids = remote_ids - cached_ids if new_ids: extra = await loop.run_in_executor(None, _resolve_ids, new_ids, None) local_ids = {c["id"] for c in local} for c in extra: if c["id"] not in local_ids: local.append(c) filtered = local else: filtered = all_c start = (page - 1) * page_size return { "concerts": filtered[start : start + page_size], "total": len(filtered), "page": page, "page_size": page_size, "pages": max(1, (len(filtered) + page_size - 1) // page_size), } async def invalidate_cache() -> int: _cache["ts"] = 0 data = await get_all_concerts() return len(data)