eac520bc8e
Docker / docker (push) Successful in 1m29s
Previously only ~17 IDs were extracted from initial HTML per genre page (before the "voir plus" button). The EMAC API at api.arte.tv exposes all concerts with full pagination (e.g. 131 Metal, 187 Pop Rock). Also reuses metadata from EMAC response (title, subtitle, thumbnail, expiry) — skipping redundant player API calls for genre concerts. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
396 lines
13 KiB
Python
396 lines
13 KiB
Python
import re
|
|
import sqlite3
|
|
import time
|
|
import logging
|
|
import asyncio
|
|
import urllib.request
|
|
import json
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from urllib.parse import quote_plus
|
|
import tmdb as _tmdb
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
CACHE_TTL = 6 * 3600
|
|
DB_PATH = "data/arte_dl.db"
|
|
_cache: dict = {"data": [], "ts": 0}
|
|
_fetch_lock: asyncio.Lock | None = None
|
|
_refresh_task: asyncio.Task | None = None
|
|
|
|
|
|
def _get_fetch_lock() -> asyncio.Lock:
    """Return the module-wide fetch lock, creating it on first use."""
    global _fetch_lock
    lock = _fetch_lock
    if lock is None:
        # First caller creates the lock and publishes it for everyone else.
        lock = _fetch_lock = asyncio.Lock()
    return lock
|
|
|
|
|
|
import os as _os

# Make sure the directory that holds the SQLite file exists before anything
# connects to it. The directory is derived from DB_PATH so the two cannot
# drift apart; a bare filename falls back to the current directory.
_os.makedirs(_os.path.dirname(DB_PATH) or ".", exist_ok=True)
|
|
|
|
|
|
def _db():
    """Open a new SQLite connection to ``DB_PATH``.

    Rows come back as ``sqlite3.Row`` so callers can read columns by name
    (for example ``row["data"]``).
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def _init_concerts_cache_table():
    """Create the ``concerts_cache`` table if it does not exist yet.

    The ``CHECK (id = 1)`` constraint limits the table to a single row: the
    JSON-encoded concert list (``data``) and the time it was stored (``ts``).
    """
    conn = _db()
    try:
        # ``with conn`` only commits or rolls back the transaction; the
        # connection itself has to be closed explicitly.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS concerts_cache (
                    id INTEGER PRIMARY KEY CHECK (id = 1),
                    data TEXT NOT NULL,
                    ts REAL NOT NULL
                )
            """)
    finally:
        conn.close()


# Run once at import time so the cache helpers can rely on the table existing.
_init_concerts_cache_table()
|
|
|
|
|
|
def _load_db_cache() -> tuple[list, float]:
    """Load the persisted concert list and its timestamp from SQLite.

    Returns:
        ``(concerts, ts)`` read from row 1 of ``concerts_cache``, or
        ``([], 0.0)`` when the row is missing or cannot be read or decoded.
        Failures are logged as warnings and never raised.
    """
    try:
        conn = _db()
        try:
            row = conn.execute(
                "SELECT data, ts FROM concerts_cache WHERE id=1"
            ).fetchone()
        finally:
            # The sqlite3 context manager does not close connections, so
            # close explicitly instead of leaving it to garbage collection.
            conn.close()
        if row:
            return json.loads(row["data"]), row["ts"]
    except Exception as e:
        # Best-effort read: report the problem, then behave as "no cache".
        logger.warning("Failed to load concerts cache: %s", e)
    return [], 0.0
|
|
|
|
|
|
def _save_db_cache(data: list, ts: float):
    """Persist the concert list and its timestamp as row 1 of ``concerts_cache``.

    Args:
        data: Concert dicts; stored JSON-encoded.
        ts: Epoch time at which ``data`` was produced.

    Failures are logged as warnings and never raised (best-effort write).
    """
    try:
        # Serialise first so an unserialisable payload never opens a connection.
        payload = json.dumps(data)
        conn = _db()
        try:
            # ``with conn`` commits on success and rolls back on error ...
            with conn:
                conn.execute(
                    "INSERT OR REPLACE INTO concerts_cache (id, data, ts) VALUES (1, ?, ?)",
                    (payload, ts),
                )
        finally:
            # ... but it does not close the connection, so do that here.
            conn.close()
    except Exception as e:
        logger.warning("Failed to save concerts cache: %s", e)
|
|
|
|
# Player config endpoint; "{pid}" is replaced with a programme ID.
PLAYER_API = "https://api.arte.tv/api/player/v2/config/fr/{pid}"
# Site search page; "{q}" is replaced with the URL-quoted query.
SEARCH_URL = "https://www.arte.tv/fr/search/?q={q}"

# (category label, genre page URL) pairs scraped for concerts.
GENRE_PAGES = [
    ("Pop & Rock", "https://www.arte.tv/fr/p/pop-rock/"),
    ("Classique", "https://www.arte.tv/fr/p/classique/"),
    ("Electro", "https://www.arte.tv/fr/p/musiques-electroniques/"),
    ("Jazz", "https://www.arte.tv/fr/p/jazz"),
    ("Arts de la scène", "https://www.arte.tv/fr/p/arts-de-la-scene"),
    ("Hip-hop", "https://www.arte.tv/fr/p/hip-hop"),
    ("Metal", "https://www.arte.tv/fr/p/metal"),
    ("Opéra", "https://www.arte.tv/fr/p/opera"),
    ("World", "https://www.arte.tv/fr/p/world"),
    ("Baroque", "https://www.arte.tv/fr/p/musique-baroque/"),
]

# Additional pages scanned for programme IDs with a plain regex.
EXTRA_PAGES = [
    "https://www.arte.tv/fr/arte-concert/agenda/",
    "https://www.arte.tv/fr/arte-concert/",
]

# Category labels, in the same order as GENRE_PAGES.
CATEGORIES = [label for label, _page_url in GENRE_PAGES]

# Default request headers for HTML page fetches.
_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    # NOTE(review): presumably requests the site's RSC payload — confirm.
    "RSC": "1",
}

# Programme IDs look like "123456-789-A": six digits, three digits, one capital.
_PROG_RE = re.compile(r"\b(\d{6}-\d{3}-[A-Z])\b")
|
|
|
|
|
|
def _fetch_url(url: str, headers: dict | None = None) -> str:
    """Fetch *url* and return the response body as text.

    Args:
        url: Address to request.
        headers: Request headers; the module default ``_HEADERS`` is used
            when this is ``None`` or empty.

    Returns:
        The body decoded as UTF-8, with undecodable bytes replaced.

    Raises:
        Whatever ``urllib.request.urlopen`` raises (15 second timeout).
    """
    request = urllib.request.Request(url, headers=headers if headers else _HEADERS)
    with urllib.request.urlopen(request, timeout=15) as response:
        body = response.read()
    return body.decode("utf-8", errors="replace")
|
|
|
|
|
|
# Paginated zone-content endpoint; takes a zone ID and a 1-based page number.
EMAC_ZONE_API = "https://api.arte.tv/api/emac/v4/fr/web/zones/{zone_id}/content?page={page}"
# Headers for EMAC requests: same User-Agent as page fetches, plus an Origin.
_EMAC_HEADERS = {
    "User-Agent": _HEADERS["User-Agent"],
    "Origin": "https://www.arte.tv",
}
# Captures the zone ID from first-page zone-content URLs found in page source.
_ZONE_RE = re.compile(r"/api/emac/v4/fr/web/zones/([a-f0-9-]+)/content\?page=1")
|
|
|
|
|
|
def _zone_ids_from_page(url: str) -> list[str]:
    """Return the EMAC zone IDs referenced in the page at *url*.

    A failed fetch is logged as a warning and yields an empty list.
    """
    try:
        page_source = _fetch_url(url)
    except Exception as ex:
        logger.warning("Failed to fetch %s: %s", url, ex)
        return []
    return _ZONE_RE.findall(page_source)
|
|
|
|
|
|
def _fetch_zone_concerts(zone_id: str, category: str) -> list[dict]:
    """Fetch all concerts from a single EMAC zone (paginated).

    Args:
        zone_id: EMAC zone identifier to page through.
        category: Genre label; used only in the summary log line.

    Returns:
        One dict per item carrying a ``programmeId``, with the keys ``id``,
        ``title``, ``subtitle``, ``url``, ``thumbnail``, ``duration``,
        ``description`` and ``expiry``. Paging stops at the first page that
        fails to download or parse; concerts gathered so far are returned.
    """
    concerts = []
    page = 1
    while True:
        url = EMAC_ZONE_API.format(zone_id=zone_id, page=page)
        try:
            raw = _fetch_url(url, headers=_EMAC_HEADERS)
            data = json.loads(raw)
        except Exception as ex:
            logger.warning("EMAC zone %s page %d failed: %s", zone_id, page, ex)
            break
        if not isinstance(data, dict):
            # Valid JSON with an unexpected shape: treat it like a failed
            # page rather than raising out of the whole scrape.
            logger.warning(
                "EMAC zone %s page %d failed: %s", zone_id, page, "unexpected payload"
            )
            break

        # ``or []`` / ``or {}`` also cover keys that are present but null.
        for item in data.get("data") or []:
            if not isinstance(item, dict):
                continue
            pid = item.get("programId") or ""
            if not pid:
                continue
            img = item.get("mainImage") or {}
            avail = item.get("availability") or {}
            concerts.append({
                "id": pid,
                "title": item.get("title") or "",
                "subtitle": item.get("subtitle") or "",
                "url": item.get("url") or f"https://www.arte.tv/fr/videos/{pid}/",
                "thumbnail": img.get("url") or "",
                "duration": item.get("duration"),
                "description": item.get("shortDescription") or item.get("teaserText") or "",
                "expiry": avail.get("end") or "",
            })

        pagination = data.get("pagination")
        if not isinstance(pagination, dict):
            pagination = {}
        try:
            total_pages = int(pagination.get("pages") or 1)
        except (TypeError, ValueError):
            # Unusable page count: assume this was the only page.
            total_pages = 1
        if page >= total_pages:
            break
        page += 1

    logger.info(" %s (zone %s…) → %d concerts", category, zone_id[:8], len(concerts))
    return concerts
|
|
|
|
|
|
def _metadata_for_pid(pid: str) -> dict | None:
    """Look up one programme in the player API and map it to a concert dict.

    Returns:
        A dict with ``id``, ``title``, ``subtitle``, ``url``, ``thumbnail``,
        ``duration``, ``description`` and ``expiry``; or ``None`` when the
        request fails or the response lacks the expected structure (the
        failure is logged at debug level).
    """
    request_headers = {"User-Agent": _HEADERS["User-Agent"], "Accept": "application/json"}
    try:
        payload = json.loads(_fetch_url(PLAYER_API.format(pid=pid), headers=request_headers))
        attributes = payload["data"]["attributes"]
        metadata = attributes["metadata"]

        link = metadata.get("link") or {}
        images = metadata.get("images") or []
        duration = metadata.get("duration") or {}
        rights = attributes.get("rights") or {}

        return {
            "id": pid,
            "title": metadata.get("title") or "",
            "subtitle": metadata.get("subtitle") or "",
            # Fall back to the canonical video URL when no link is given.
            "url": link.get("url") or f"https://www.arte.tv/fr/videos/{pid}/",
            "thumbnail": images[0]["url"] if images else "",
            "duration": duration.get("seconds"),
            "description": metadata.get("description") or "",
            "expiry": rights.get("end") or "",
        }
    except Exception as ex:
        logger.debug("Failed to get metadata for %s: %s", pid, ex)
        return None
|
|
|
|
|
|
def _fetch_all_sync() -> list[dict]:
    """Scrape every known concert and return the enriched, sorted list.

    Steps:
      1. For each genre page, find its EMAC zone and page through it,
         recording which categories each programme appears in.
      2. Scan the extra pages for programme IDs not seen yet and resolve
         them through the player API.
      3. Attach ``categories`` to every concert and merge in whatever
         ``_tmdb.lookup`` returns.
      4. Sort by ``expiry`` descending (missing expiry sorts last).

    This function blocks on network I/O and thread pools.
    """
    by_id: dict[str, dict] = {}
    id_cats: dict[str, list[str]] = {}

    # Genre pages: use EMAC zone API (paginated) — gets ALL concerts, not just initial HTML
    for name, url in GENRE_PAGES:
        zone_ids = _zone_ids_from_page(url)
        if not zone_ids:
            logger.warning("No zone IDs found for %s (%s)", name, url)
            continue
        # All zones on a page are identical copies; use the first one
        for c in _fetch_zone_concerts(zone_ids[0], name):
            pid = c["id"]
            cats = id_cats.setdefault(pid, [])
            # A programme listed twice in the same zone must not get the
            # category appended twice.
            if name not in cats:
                cats.append(name)
            # First occurrence wins; later duplicates only add categories.
            by_id.setdefault(pid, c)

    # Extra pages: fall back to regex (no EMAC zones)
    for url in EXTRA_PAGES:
        try:
            extra_ids = set(_PROG_RE.findall(_fetch_url(url)))
        except Exception as ex:
            logger.warning("Failed to fetch %s: %s", url, ex)
            extra_ids = set()
        new_ids = extra_ids - set(by_id)
        logger.info(" %s → %d new IDs", url.split("/fr/")[1], len(new_ids))
        if new_ids:
            with ThreadPoolExecutor(max_workers=10) as pool:
                results = list(pool.map(_metadata_for_pid, sorted(new_ids)))
            for meta in results:
                # Skip failed lookups and programmes without a title.
                if meta and meta.get("title"):
                    by_id[meta["id"]] = meta

    logger.info("Total unique programme IDs: %d", len(by_id))
    concerts = list(by_id.values())
    for c in concerts:
        # Programmes found only on extra pages have no category.
        c["categories"] = id_cats.get(c["id"], [])

    # TMDB enrichment (concurrent, results cached in SQLite)
    def _enrich(c: dict) -> dict:
        t = _tmdb.lookup(c["id"], c.get("title", ""), c.get("subtitle", ""))
        if t:
            c.update(t)
        return c

    with ThreadPoolExecutor(max_workers=5) as pool:
        concerts = list(pool.map(_enrich, concerts))

    concerts.sort(key=lambda c: c.get("expiry") or "", reverse=True)
    return concerts
|
|
|
|
|
|
def _resolve_ids(ids: set[str], exclude: set[str] | None = None) -> list[dict]:
    """Resolve programme IDs to concert dicts through the player API.

    Args:
        ids: Programme IDs to look up.
        exclude: IDs to skip, if any.

    Returns:
        Metadata dicts for the IDs that resolved and have a non-empty title,
        in sorted-ID order.
    """
    wanted = sorted(ids - exclude) if exclude else sorted(ids)
    with ThreadPoolExecutor(max_workers=10) as pool:
        fetched = list(pool.map(_metadata_for_pid, wanted))
    return [entry for entry in fetched if entry and entry.get("title")]
|
|
|
|
|
|
def _search_sync(query: str) -> set[str]:
    """Return the programme IDs found on the site's search page for *query*.

    A failed request is logged as a warning and yields an empty set.
    """
    search_page = SEARCH_URL.format(q=quote_plus(query))
    try:
        page_source = _fetch_url(search_page)
    except Exception as ex:
        logger.warning("Search failed for %r: %s", query, ex)
        return set()
    return set(_PROG_RE.findall(page_source))
|
|
|
|
|
|
# ── public API ────────────────────────────────────────────────────────────────
|
|
|
|
async def _ensure_cache() -> list[dict]:
    """Return the concert list, delegating entirely to ``get_all_concerts``."""
    concerts = await get_all_concerts()
    return concerts
|
|
|
|
|
|
async def get_concerts_by_category(category: str) -> list[dict]:
    """Return the cached concerts whose ``categories`` include *category*."""
    concerts = await _ensure_cache()
    matching = []
    for concert in concerts:
        # A missing or null "categories" entry counts as "no categories".
        if category in (concert.get("categories") or []):
            matching.append(concert)
    return matching
|
|
|
|
|
|
async def _do_refresh():
    """Run a full scrape under the fetch lock and publish the result.

    On a non-empty scrape, updates the in-memory cache and persists it to the
    DB cache. An empty scrape leaves the existing cache untouched. Clears
    ``_refresh_task`` once the refresh has run to completion.
    """
    global _refresh_task
    async with _get_fetch_lock():
        # Another caller may have refreshed while this one waited for the lock.
        if _cache["data"] and time.time() - _cache["ts"] < CACHE_TTL:
            return
        # Inside a coroutine, get_running_loop() is the documented way to
        # obtain the current loop.
        loop = asyncio.get_running_loop()
        data = await loop.run_in_executor(None, _fetch_all_sync)
        if data:
            ts = time.time()
            _cache["data"] = data
            _cache["ts"] = ts
            # JSON encoding plus the SQLite write is blocking work; run it in
            # the executor so the event loop keeps serving requests.
            await loop.run_in_executor(None, _save_db_cache, data, ts)
            logger.info("Cache refreshed: %d concerts", len(data))
    _refresh_task = None
|
|
|
|
|
|
async def get_all_concerts() -> list[dict]:
    """Return every known concert, using the freshest cache available.

    Lookup order:
      1. In-memory cache, when populated and younger than ``CACHE_TTL``.
      2. DB cache: loaded into memory and returned immediately; if it is
         older than ``CACHE_TTL`` a background refresh is started unless one
         is already running.
      3. No cache at all: wait for a full refresh.
    """
    global _refresh_task
    started = time.time()

    memory_is_fresh = bool(_cache["data"]) and started - _cache["ts"] < CACHE_TTL
    if memory_is_fresh:
        return _cache["data"]

    persisted, persisted_ts = _load_db_cache()
    if not persisted:
        # First run or cleared DB: nothing to serve, so scrape before returning.
        await _do_refresh()
        return _cache["data"]

    _cache["data"] = persisted
    _cache["ts"] = persisted_ts
    logger.info("Concerts loaded from DB cache (%d concerts)", len(persisted))

    is_stale = started - persisted_ts >= CACHE_TTL
    refresh_idle = _refresh_task is None or _refresh_task.done()
    if is_stale and refresh_idle:
        # Serve the stale list now and let the refresh run in the background.
        _refresh_task = asyncio.create_task(_do_refresh())
    return _cache["data"]
|
|
|
|
|
|
async def fetch_concerts(page: int = 1, search: str = "", page_size: int = 24, category: str = "") -> dict:
    """Return one page of concerts, optionally filtered by category and search.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        search: Case-insensitive substring matched against title, subtitle
            and description. Without a category filter, the site's search is
            also queried and unknown programmes are appended to the results.
        page_size: Items per page; values below 1 are treated as 1.
        category: When set, only concerts listing this category are kept.

    Returns:
        A dict with ``concerts`` (the page slice), ``total``, ``page``,
        ``page_size`` and ``pages`` (always at least 1).
    """
    # Clamp paging inputs: a zero page_size would divide by zero below and a
    # page below 1 would produce negative slice offsets.
    page = max(1, page)
    page_size = max(1, page_size)

    all_c = await get_all_concerts()
    if category:
        all_c = [c for c in all_c if category in (c.get("categories") or [])]

    if search:
        q = search.lower()
        filtered = [
            c for c in all_c
            if any(q in (c.get(field) or "").lower() for field in ("title", "subtitle", "description"))
        ]
        # Remote search only when no category filter (results have no category info)
        if not category:
            loop = asyncio.get_running_loop()
            remote_ids = await loop.run_in_executor(None, _search_sync, search)
            new_ids = remote_ids - {c["id"] for c in all_c}
            if new_ids:
                extra = await loop.run_in_executor(None, _resolve_ids, new_ids, None)
                local_ids = {c["id"] for c in filtered}
                filtered.extend(c for c in extra if c["id"] not in local_ids)
    else:
        filtered = all_c

    start = (page - 1) * page_size
    return {
        "concerts": filtered[start : start + page_size],
        "total": len(filtered),
        "page": page,
        "page_size": page_size,
        # Ceiling division, never reporting fewer than one page.
        "pages": max(1, (len(filtered) + page_size - 1) // page_size),
    }
|
|
|
|
|
|
def get_versions(pid: str) -> list[dict]:
    """Return the versions of the first stream the player API lists for *pid*.

    Returns an empty list when there are no streams, the first stream has no
    versions, or the request or parsing fails (logged at debug level).
    """
    request_headers = {"User-Agent": _HEADERS["User-Agent"], "Accept": "application/json"}
    try:
        payload = json.loads(_fetch_url(PLAYER_API.format(pid=pid), headers=request_headers))
        streams = payload["data"]["attributes"].get("streams") or []
        if not streams:
            return []
        # Only the first stream is consulted.
        return streams[0].get("versions") or []
    except Exception as ex:
        logger.debug("Failed to get versions for %s: %s", pid, ex)
        return []
|
|
|
|
|
|
def select_lang_tag(versions: list[dict]) -> str:
    """Pick the UNFR language tag for a set of stream versions.

    Returns:
        ``"FRENCH"`` if any version has French audio; otherwise ``"VOSTFR"``
        if any version has French subtitles; otherwise ``"VO"`` (also for an
        empty list).
    """
    french_subtitles_seen = False
    for version in versions or ():
        # French audio outranks everything else, so stop at the first hit.
        if version.get("audioLanguage") == "fr":
            return "FRENCH"
        if version.get("subtitleLanguage") == "fr":
            french_subtitles_seen = True
    return "VOSTFR" if french_subtitles_seen else "VO"
|
|
|
|
|
|
async def invalidate_cache() -> int:
    """Expire both cache layers, reload, and return the concert count.

    Marks the in-memory cache as expired, deletes the persisted row, then
    calls ``get_all_concerts`` and returns the length of what it yields.
    A failure to clear the DB is logged as a warning and does not abort the
    reload.
    """
    # ts=0 fails the freshness check without discarding the list itself.
    _cache["ts"] = 0
    try:
        conn = _db()
        try:
            # ``with conn`` commits the delete; closing is done separately
            # because the sqlite3 context manager leaves the connection open.
            with conn:
                conn.execute("DELETE FROM concerts_cache")
        finally:
            conn.close()
    except Exception as ex:
        logger.warning("Failed to clear concerts cache: %s", ex)
    data = await get_all_concerts()
    return len(data)
|