"""Fetch, cache and paginate the ARTE Concert catalogue through yt-dlp."""

import asyncio
import logging
import time

import yt_dlp

logger = logging.getLogger(__name__)

# Lifetime of a successful fetch, in seconds (6 hours).
CACHE_TTL = 6 * 3600
# In-process cache: "data" holds the normalized concert dicts, "ts" the
# time.time() value associated with them (0 means "never fetched/invalidated").
_cache: dict = {"data": [], "ts": 0}

ARTE_CONCERT_URL = "https://www.arte.tv/fr/videos/RC-014034/arte-concert/"


def _best_thumbnail(entry: dict) -> str:
    """Return the URL of the widest thumbnail of *entry*, or ``""``.

    Looks at the ``thumbnails`` list first and falls back to the single
    ``thumbnail`` field when the list is missing, empty, or its widest item
    carries no URL.
    """
    thumbs = entry.get("thumbnails") or []
    if thumbs:
        # ``width`` may be present with a None value; ``or 0`` keeps the
        # comparison between ints only instead of raising TypeError.
        widest = max(thumbs, key=lambda t: t.get("width") or 0)
        url = widest.get("url")
        if url:
            return url
    return entry.get("thumbnail") or ""


def _normalize(e: dict) -> dict | None:
    """Convert a raw extractor entry into the dict shape served to callers.

    Returns ``None`` when *e* is empty or has no ``id``. ``title`` and
    ``description`` are always strings (``""`` when missing or None) so that
    callers can safely call string methods on them.
    """
    if not e or not e.get("id"):
        return None
    video_id = e["id"]
    url = (
        e.get("url")
        or e.get("webpage_url")
        or f"https://www.arte.tv/fr/videos/{video_id}/"
    )
    return {
        "id": video_id,
        "title": e.get("title") or "",
        "url": url,
        "thumbnail": _best_thumbnail(e),
        "duration": e.get("duration"),
        "description": e.get("description") or "",
        "upload_date": e.get("upload_date") or "",
        "release_timestamp": e.get("release_timestamp"),
    }


def _fetch_sync() -> list:
    """Blocking fetch of every concert reachable from ``ARTE_CONCERT_URL``.

    Runs a flat yt-dlp extraction, expands sub-collections one level deep and
    returns the de-duplicated (by ``id``) list of normalized entries. Errors
    are logged rather than raised, so the result may be partial or empty.
    """
    concerts: list = []
    seen: set = set()
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": True,
        "ignoreerrors": True,
    }

    def _collect(entries, ydl, depth: int = 0) -> None:
        """Append normalized *entries* to ``concerts``, recursing one level."""
        for e in entries or []:
            if not e:
                continue
            etype = e.get("_type", "")
            # sub-collection → recurse one level
            if etype in ("playlist", "url_transparent") and depth < 1:
                sub_url = e.get("url") or e.get("webpage_url")
                if sub_url:
                    try:
                        info = ydl.extract_info(sub_url, download=False)
                        is_collection = bool(info) and "entries" in info
                        if is_collection:
                            _collect(info.get("entries"), ydl, depth + 1)
                    except Exception as ex:
                        # Best effort: one broken sub-collection must not
                        # abort the whole crawl.
                        logger.debug("sub-collection error: %s", ex)
                        continue
                    if is_collection or not info:
                        continue
                    # The URL resolved to a single item rather than a
                    # collection: keep the entry itself instead of silently
                    # dropping it.
            entry = _normalize(e)
            if entry and entry["id"] not in seen:
                seen.add(entry["id"])
                concerts.append(entry)

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            info = ydl.extract_info(ARTE_CONCERT_URL, download=False)
            if info:
                _collect(info.get("entries", []), ydl)
        except Exception as ex:
            logger.error("fetch error: %s", ex)
    return concerts


async def get_all_concerts() -> list:
    """Return all concerts, served from the cache while it is fresh.

    A refresh runs ``_fetch_sync`` in the default executor so the event loop
    is not blocked. An empty fetch result never overwrites the cache, so the
    previous (possibly stale) data is returned in that case.
    """
    now = time.time()
    if _cache["data"] and now - _cache["ts"] < CACHE_TTL:
        return _cache["data"]
    # We are inside a coroutine, so a running loop is guaranteed to exist.
    loop = asyncio.get_running_loop()
    data = await loop.run_in_executor(None, _fetch_sync)
    if data:
        _cache["data"] = data
        _cache["ts"] = now
    return _cache["data"]


async def fetch_concerts(page: int = 1, search: str = "", page_size: int = 24) -> dict:
    """Return one page of concerts, optionally filtered by *search*.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        search: Case-insensitive substring matched against title and
            description; an empty string disables filtering.
        page_size: Items per page; values below 1 are treated as 1.

    Returns:
        A dict with ``concerts`` (the page slice), ``total`` (matches across
        all pages), ``page``, ``page_size`` and ``pages`` (at least 1).
    """
    # Clamp instead of failing: page_size == 0 used to raise
    # ZeroDivisionError and page < 1 produced a negative slice start.
    page = max(1, page)
    page_size = max(1, page_size)

    all_c = await get_all_concerts()
    filtered = all_c
    if search:
        q = search.lower()
        filtered = [
            c
            for c in all_c
            if q in (c.get("title") or "").lower()
            or q in (c.get("description") or "").lower()
        ]

    total = len(filtered)
    start = (page - 1) * page_size
    return {
        "concerts": filtered[start : start + page_size],
        "total": total,
        "page": page,
        "page_size": page_size,
        "pages": max(1, (total + page_size - 1) // page_size),
    }


async def invalidate_cache() -> int:
    """Force a refresh of the cache and return the number of concerts held.

    Only the timestamp is reset, so if the refresh yields nothing the
    previously cached data is kept and counted.
    """
    _cache["ts"] = 0
    data = await get_all_concerts()
    return len(data)