Files
dev eac520bc8e
Docker / docker (push) Successful in 1m29s
feat: use EMAC zone API for paginated genre scraping
Previously only ~17 IDs were extracted from initial HTML per genre page
(before the "voir plus" button). The EMAC API at api.arte.tv exposes all
concerts with full pagination (e.g. 131 Metal, 187 Pop Rock).

Also reuses metadata from EMAC response (title, subtitle, thumbnail,
expiry) — skipping redundant player API calls for genre concerts.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-18 17:24:27 +02:00

396 lines
13 KiB
Python

import re
import sqlite3
import time
import logging
import asyncio
import urllib.request
import json
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import quote_plus
import tmdb as _tmdb
logger = logging.getLogger(__name__)
CACHE_TTL = 6 * 3600
DB_PATH = "data/arte_dl.db"
_cache: dict = {"data": [], "ts": 0}
_fetch_lock: asyncio.Lock | None = None
_refresh_task: asyncio.Task | None = None
def _get_fetch_lock() -> asyncio.Lock:
global _fetch_lock
if _fetch_lock is None:
_fetch_lock = asyncio.Lock()
return _fetch_lock
import os as _os
_os.makedirs("data", exist_ok=True)
def _db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def _init_concerts_cache_table():
with _db() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS concerts_cache (
id INTEGER PRIMARY KEY CHECK (id = 1),
data TEXT NOT NULL,
ts REAL NOT NULL
)
""")
_init_concerts_cache_table()
def _load_db_cache() -> tuple[list, float]:
try:
with _db() as conn:
row = conn.execute("SELECT data, ts FROM concerts_cache WHERE id=1").fetchone()
if row:
return json.loads(row["data"]), row["ts"]
except Exception:
pass
return [], 0.0
def _save_db_cache(data: list, ts: float):
try:
with _db() as conn:
conn.execute(
"INSERT OR REPLACE INTO concerts_cache (id, data, ts) VALUES (1, ?, ?)",
(json.dumps(data), ts),
)
except Exception as e:
logger.warning("Failed to save concerts cache: %s", e)
PLAYER_API = "https://api.arte.tv/api/player/v2/config/fr/{pid}"
SEARCH_URL = "https://www.arte.tv/fr/search/?q={q}"
GENRE_PAGES = [
("Pop & Rock", "https://www.arte.tv/fr/p/pop-rock/"),
("Classique", "https://www.arte.tv/fr/p/classique/"),
("Electro", "https://www.arte.tv/fr/p/musiques-electroniques/"),
("Jazz", "https://www.arte.tv/fr/p/jazz"),
("Arts de la scène", "https://www.arte.tv/fr/p/arts-de-la-scene"),
("Hip-hop", "https://www.arte.tv/fr/p/hip-hop"),
("Metal", "https://www.arte.tv/fr/p/metal"),
("Opéra", "https://www.arte.tv/fr/p/opera"),
("World", "https://www.arte.tv/fr/p/world"),
("Baroque", "https://www.arte.tv/fr/p/musique-baroque/"),
]
EXTRA_PAGES = [
"https://www.arte.tv/fr/arte-concert/agenda/",
"https://www.arte.tv/fr/arte-concert/",
]
CATEGORIES = [name for name, _ in GENRE_PAGES]
_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"Chrome/120.0.0.0 Safari/537.36"
),
"RSC": "1",
}
_PROG_RE = re.compile(r"\b(\d{6}-\d{3}-[A-Z])\b")
def _fetch_url(url: str, headers: dict | None = None) -> str:
req = urllib.request.Request(url, headers=headers or _HEADERS)
with urllib.request.urlopen(req, timeout=15) as r:
return r.read().decode("utf-8", errors="replace")
EMAC_ZONE_API = "https://api.arte.tv/api/emac/v4/fr/web/zones/{zone_id}/content?page={page}"
_EMAC_HEADERS = {"User-Agent": _HEADERS["User-Agent"], "Origin": "https://www.arte.tv"}
_ZONE_RE = re.compile(r"/api/emac/v4/fr/web/zones/([a-f0-9-]+)/content\?page=1")
def _zone_ids_from_page(url: str) -> list[str]:
try:
html = _fetch_url(url)
return _ZONE_RE.findall(html)
except Exception as ex:
logger.warning("Failed to fetch %s: %s", url, ex)
return []
def _fetch_zone_concerts(zone_id: str, category: str) -> list[dict]:
"""Fetch all concerts from a single EMAC zone (paginated)."""
concerts = []
page = 1
while True:
url = EMAC_ZONE_API.format(zone_id=zone_id, page=page)
try:
raw = _fetch_url(url, headers=_EMAC_HEADERS)
data = json.loads(raw)
except Exception as ex:
logger.warning("EMAC zone %s page %d failed: %s", zone_id, page, ex)
break
for item in data.get("data", []):
pid = item.get("programId") or ""
if not pid:
continue
img = item.get("mainImage") or {}
avail = item.get("availability") or {}
concerts.append({
"id": pid,
"title": item.get("title") or "",
"subtitle": item.get("subtitle") or "",
"url": item.get("url") or f"https://www.arte.tv/fr/videos/{pid}/",
"thumbnail": img.get("url") or "",
"duration": item.get("duration"),
"description": item.get("shortDescription") or item.get("teaserText") or "",
"expiry": avail.get("end") or "",
})
pagination = data.get("pagination", {})
if page >= pagination.get("pages", 1):
break
page += 1
logger.info(" %s (zone %s…) → %d concerts", category, zone_id[:8], len(concerts))
return concerts
def _metadata_for_pid(pid: str) -> dict | None:
try:
raw = _fetch_url(
PLAYER_API.format(pid=pid),
headers={"User-Agent": _HEADERS["User-Agent"], "Accept": "application/json"},
)
data = json.loads(raw)
attrs = data["data"]["attributes"]
meta = attrs["metadata"]
url = (meta.get("link") or {}).get("url") or f"https://www.arte.tv/fr/videos/{pid}/"
imgs = meta.get("images") or []
thumbnail = imgs[0]["url"] if imgs else ""
duration_s = (meta.get("duration") or {}).get("seconds")
rights = attrs.get("rights") or {}
return {
"id": pid,
"title": meta.get("title") or "",
"subtitle": meta.get("subtitle") or "",
"url": url,
"thumbnail": thumbnail,
"duration": duration_s,
"description": meta.get("description") or "",
"expiry": rights.get("end") or "",
}
except Exception as ex:
logger.debug("Failed to get metadata for %s: %s", pid, ex)
return None
def _fetch_all_sync() -> list[dict]:
by_id: dict[str, dict] = {}
id_cats: dict[str, list[str]] = {}
# Genre pages: use EMAC zone API (paginated) — gets ALL concerts, not just initial HTML
for name, url in GENRE_PAGES:
zone_ids = _zone_ids_from_page(url)
if not zone_ids:
logger.warning("No zone IDs found for %s (%s)", name, url)
continue
# All zones on a page are identical copies; use the first one
concerts = _fetch_zone_concerts(zone_ids[0], name)
for c in concerts:
pid = c["id"]
id_cats.setdefault(pid, []).append(name)
if pid not in by_id:
by_id[pid] = c
# Extra pages: fall back to regex (no EMAC zones)
for url in EXTRA_PAGES:
try:
extra_ids = set(_PROG_RE.findall(_fetch_url(url)))
except Exception as ex:
logger.warning("Failed to fetch %s: %s", url, ex)
extra_ids = set()
new_ids = extra_ids - set(by_id)
logger.info(" %s%d new IDs", url.split("/fr/")[1], len(new_ids))
if new_ids:
with ThreadPoolExecutor(max_workers=10) as pool:
results = list(pool.map(_metadata_for_pid, sorted(new_ids)))
for meta in results:
if meta and meta.get("title"):
by_id[meta["id"]] = meta
logger.info("Total unique programme IDs: %d", len(by_id))
concerts = list(by_id.values())
for c in concerts:
c["categories"] = id_cats.get(c["id"], [])
# TMDB enrichment (concurrent, results cached in SQLite)
def _enrich(c: dict) -> dict:
t = _tmdb.lookup(c["id"], c.get("title", ""), c.get("subtitle", ""))
if t:
c.update(t)
return c
with ThreadPoolExecutor(max_workers=5) as pool:
concerts = list(pool.map(_enrich, concerts))
concerts.sort(key=lambda c: c.get("expiry") or "", reverse=True)
return concerts
def _resolve_ids(ids: set[str], exclude: set[str] | None = None) -> list[dict]:
to_fetch = ids - (exclude or set())
with ThreadPoolExecutor(max_workers=10) as pool:
results = list(pool.map(_metadata_for_pid, sorted(to_fetch)))
return [c for c in results if c and c.get("title")]
def _search_sync(query: str) -> set[str]:
url = SEARCH_URL.format(q=quote_plus(query))
try:
html = _fetch_url(url)
return set(_PROG_RE.findall(html))
except Exception as ex:
logger.warning("Search failed for %r: %s", query, ex)
return set()
# ── public API ────────────────────────────────────────────────────────────────
async def _ensure_cache() -> list[dict]:
return await get_all_concerts()
async def get_concerts_by_category(category: str) -> list[dict]:
data = await _ensure_cache()
return [c for c in data if category in (c.get("categories") or [])]
async def _do_refresh():
"""Full scrape under lock; updates in-memory + DB cache."""
global _refresh_task
async with _get_fetch_lock():
if _cache["data"] and time.time() - _cache["ts"] < CACHE_TTL:
return
loop = asyncio.get_event_loop()
data = await loop.run_in_executor(None, _fetch_all_sync)
if data:
ts = time.time()
_cache["data"] = data
_cache["ts"] = ts
_save_db_cache(data, ts)
logger.info("Cache refreshed: %d concerts", len(data))
_refresh_task = None
async def get_all_concerts() -> list[dict]:
global _refresh_task
now = time.time()
# In-memory cache hit
if _cache["data"] and now - _cache["ts"] < CACHE_TTL:
return _cache["data"]
# Try DB cache — return immediately even if stale, refresh in background
db_data, db_ts = _load_db_cache()
if db_data:
_cache["data"] = db_data
_cache["ts"] = db_ts
logger.info("Concerts loaded from DB cache (%d concerts)", len(db_data))
if now - db_ts >= CACHE_TTL:
# Stale — serve now, refresh silently in background
if _refresh_task is None or _refresh_task.done():
_refresh_task = asyncio.create_task(_do_refresh())
return _cache["data"]
# No cache at all — must scrape synchronously (first run or cleared DB)
await _do_refresh()
return _cache["data"]
async def fetch_concerts(page: int = 1, search: str = "", page_size: int = 24, category: str = "") -> dict:
all_c = await get_all_concerts()
if category:
all_c = [c for c in all_c if category in (c.get("categories") or [])]
cached_ids = {c["id"] for c in all_c}
if search:
q = search.lower()
local = [
c for c in all_c
if q in (c.get("title") or "").lower()
or q in (c.get("subtitle") or "").lower()
or q in (c.get("description") or "").lower()
]
# Remote search only when no category filter (results have no category info)
if not category:
loop = asyncio.get_event_loop()
remote_ids = await loop.run_in_executor(None, _search_sync, search)
new_ids = remote_ids - cached_ids
if new_ids:
extra = await loop.run_in_executor(None, _resolve_ids, new_ids, None)
local_ids = {c["id"] for c in local}
for c in extra:
if c["id"] not in local_ids:
local.append(c)
filtered = local
else:
filtered = all_c
start = (page - 1) * page_size
return {
"concerts": filtered[start : start + page_size],
"total": len(filtered),
"page": page,
"page_size": page_size,
"pages": max(1, (len(filtered) + page_size - 1) // page_size),
}
def get_versions(pid: str) -> list[dict]:
"""Fetch available stream versions from Arte Player API for a programme ID."""
try:
raw = _fetch_url(
PLAYER_API.format(pid=pid),
headers={"User-Agent": _HEADERS["User-Agent"], "Accept": "application/json"},
)
data = json.loads(raw)
streams = data["data"]["attributes"].get("streams") or []
return streams[0].get("versions") or [] if streams else []
except Exception as ex:
logger.debug("Failed to get versions for %s: %s", pid, ex)
return []
def select_lang_tag(versions: list[dict]) -> str:
"""
Determine UNFR language tag from stream versions.
FR audio → FRENCH, non-FR + FR subs → VOSTFR, otherwise → VO.
"""
if not versions:
return "VO"
if any(v.get("audioLanguage") == "fr" for v in versions):
return "FRENCH"
if any(v.get("subtitleLanguage") == "fr" for v in versions):
return "VOSTFR"
return "VO"
async def invalidate_cache() -> int:
_cache["ts"] = 0
try:
with _db() as conn:
conn.execute("DELETE FROM concerts_cache")
except Exception:
pass
data = await get_all_concerts()
return len(data)