Files
arte-dl/arte_api.py
T

272 lines
8.4 KiB
Python
Raw Normal View History

import re
import sqlite3
import time
import logging
import asyncio
import urllib.request
import json
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import quote_plus
import tmdb as _tmdb
# Logger for this module.
logger: logging.Logger = logging.getLogger(__name__)

# Maximum age, in seconds, of a cached concert list (six hours).
CACHE_TTL: int = 6 * 60 * 60

# SQLite database file; a relative path, so it resolves against the working directory.
DB_PATH: str = "arte_dl.db"

# In-process cache: "data" holds the concert list, "ts" the timestamp it is valid from.
_cache: dict = dict(data=[], ts=0)
def _db():
    """Open a new SQLite connection to ``DB_PATH``.

    Returns:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects, so
        columns can be read by name. The caller owns the connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def _init_concerts_cache_table():
    """Create the single-row ``concerts_cache`` table if it does not exist.

    The ``CHECK (id = 1)`` constraint restricts the table to one row, which
    stores the JSON-encoded concert list and its timestamp.
    """
    conn = _db()
    try:
        # ``with conn`` only commits (or rolls back); it does not close the
        # connection, hence the explicit close below.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS concerts_cache (
                    id INTEGER PRIMARY KEY CHECK (id = 1),
                    data TEXT NOT NULL,
                    ts REAL NOT NULL
                )
            """)
    finally:
        conn.close()


# Make sure the cache table exists as soon as the module is imported.
_init_concerts_cache_table()
def _load_db_cache() -> tuple[list, float]:
    """Read the persisted concert list from SQLite.

    Returns:
        ``(concerts, timestamp)`` from the cache row, or ``([], 0.0)`` when
        the row is missing or anything goes wrong while reading or decoding it.
        Failures are logged as warnings and never raised.
    """
    try:
        conn = _db()
        try:
            row = conn.execute("SELECT data, ts FROM concerts_cache WHERE id=1").fetchone()
        finally:
            # Close explicitly: a sqlite3 connection is not closed by ``with``.
            conn.close()
        if row:
            return json.loads(row["data"]), row["ts"]
    except Exception as e:
        # Best-effort: an unreadable cache just means falling back to a refetch.
        logger.warning("Failed to load concerts cache: %s", e)
    return [], 0.0
def _save_db_cache(data: list, ts: float) -> None:
    """Persist the concert list to the single cache row in SQLite.

    Args:
        data: Concert list; it is stored JSON-encoded.
        ts: Timestamp to store alongside the data.

    Failures are logged as warnings and never raised.
    """
    try:
        conn = _db()
        try:
            # ``with conn`` commits on success and rolls back on error.
            with conn:
                conn.execute(
                    "INSERT OR REPLACE INTO concerts_cache (id, data, ts) VALUES (1, ?, ?)",
                    (json.dumps(data), ts),
                )
        finally:
            # The context manager does not close the connection; do it here.
            conn.close()
    except Exception as e:
        logger.warning("Failed to save concerts cache: %s", e)
# Player configuration endpoint; ``{pid}`` is the programme ID.
PLAYER_API = "https://api.arte.tv/api/player/v2/config/fr/{pid}"

# Site search page; ``{q}`` is the already URL-encoded query.
SEARCH_URL = "https://www.arte.tv/fr/search/?q={q}"

# (category name, listing page URL) pairs scanned for programme IDs.
GENRE_PAGES = [
    ("Pop & Rock", "https://www.arte.tv/fr/p/pop-rock/"),
    ("Classique", "https://www.arte.tv/fr/p/classique/"),
    ("Electro", "https://www.arte.tv/fr/p/musiques-electroniques/"),
    ("Jazz", "https://www.arte.tv/fr/p/jazz"),
    ("Arts de la scène", "https://www.arte.tv/fr/p/arts-de-la-scene"),
    ("Hip-hop", "https://www.arte.tv/fr/p/hip-hop"),
    ("Metal", "https://www.arte.tv/fr/p/metal"),
    ("Opéra", "https://www.arte.tv/fr/p/opera"),
    ("World", "https://www.arte.tv/fr/p/world"),
    ("Baroque", "https://www.arte.tv/fr/p/musique-baroque/"),
]

# Additional pages scanned for programme IDs; IDs found only here get no category.
EXTRA_PAGES = [
    "https://www.arte.tv/fr/arte-concert/agenda/",
    "https://www.arte.tv/fr/arte-concert/",
]

# Category names, in the same order as GENRE_PAGES.
CATEGORIES = [name for name, _ in GENRE_PAGES]
# Default headers for page requests: a desktop-browser User-Agent plus "RSC: 1".
_HEADERS: dict[str, str] = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "RSC": "1",
}

# Programme ID pattern: six digits, three digits and one capital letter,
# joined by hyphens and delimited by word boundaries (e.g. "123456-000-A").
_PROG_RE: re.Pattern = re.compile(r"\b(\d{6}-\d{3}-[A-Z])\b")
def _fetch_url(url: str, headers: dict | None = None) -> str:
    """Download ``url`` and return the response body as text.

    Args:
        url: Address to request.
        headers: Request headers; ``_HEADERS`` is used when this is ``None``
            or empty.

    Returns:
        The body decoded as UTF-8, with undecodable bytes replaced.

    Raises:
        Any error raised by ``urllib.request.urlopen``; the request uses a
        15-second timeout.
    """
    request_headers = headers if headers else _HEADERS
    request = urllib.request.Request(url, headers=request_headers)
    with urllib.request.urlopen(request, timeout=15) as response:
        body = response.read()
    return body.decode("utf-8", errors="replace")
def _prog_ids_from_page(url: str) -> set[str]:
    """Collect the distinct programme IDs that appear in the page at ``url``.

    Returns:
        The set of IDs matched by ``_PROG_RE``; an empty set when the page
        cannot be fetched (the failure is logged as a warning).
    """
    try:
        matches = _PROG_RE.findall(_fetch_url(url))
        return {*matches}
    except Exception as err:
        logger.warning("Failed to fetch %s: %s", url, err)
        return set()
def _metadata_for_pid(pid: str) -> dict | None:
    """Fetch the player configuration for ``pid`` and flatten it to a concert dict.

    Returns:
        A dict with the keys ``id``, ``title``, ``subtitle``, ``url``,
        ``thumbnail``, ``duration``, ``description`` and ``expiry``, or
        ``None`` when the request or the parsing fails (logged at debug level).
    """
    try:
        payload = json.loads(
            _fetch_url(
                PLAYER_API.format(pid=pid),
                headers={"User-Agent": _HEADERS["User-Agent"], "Accept": "application/json"},
            )
        )
        attributes = payload["data"]["attributes"]
        metadata = attributes["metadata"]

        # Each optional section may be absent or null; normalise to an empty container.
        link = metadata.get("link") or {}
        images = metadata.get("images") or []
        duration = metadata.get("duration") or {}
        rights = attributes.get("rights") or {}

        return {
            "id": pid,
            "title": metadata.get("title") or "",
            "subtitle": metadata.get("subtitle") or "",
            # Fall back to the generic video URL when no link is provided.
            "url": link.get("url") or f"https://www.arte.tv/fr/videos/{pid}/",
            "thumbnail": images[0]["url"] if images else "",
            "duration": duration.get("seconds"),
            "description": metadata.get("description") or "",
            "expiry": rights.get("end") or "",
        }
    except Exception as err:
        logger.debug("Failed to get metadata for %s: %s", pid, err)
        return None
def _fetch_all_sync() -> list[dict]:
    """Scrape all listing pages and build the complete concert list.

    Blocking (network I/O and thread pools); async callers run it in an executor.

    Returns:
        Concert dicts as produced by ``_resolve_ids``, each given a
        ``categories`` list and any fields returned by the TMDB lookup,
        sorted by ``expiry`` in descending order.
    """
    # Map every programme ID to the genre pages it was found on.
    id_cats: dict[str, list[str]] = {}
    for name, url in GENRE_PAGES:
        ids = _prog_ids_from_page(url)
        logger.info(" %s: %d IDs", name, len(ids))
        for pid in ids:
            id_cats.setdefault(pid, []).append(name)

    # Extra pages only contribute IDs; they carry no category.
    all_ids: set[str] = set(id_cats)
    for url in EXTRA_PAGES:
        ids = _prog_ids_from_page(url)
        logger.info(" %s: %d IDs", url.partition("/fr/")[2], len(ids))
        all_ids |= ids
    logger.info("Total unique programme IDs: %d", len(all_ids))

    concerts = _resolve_ids(all_ids)
    for c in concerts:
        c["categories"] = id_cats.get(c["id"], [])

    # TMDB enrichment, run concurrently. (The original note says results are
    # cached in SQLite; that lives in the tmdb module and is not visible here.)
    def _enrich(c: dict) -> dict:
        try:
            t = _tmdb.lookup(c["id"], c.get("title", ""), c.get("subtitle", ""))
        except Exception as ex:
            # Enrichment is optional: one failed lookup must not abort the refresh.
            logger.warning("TMDB lookup failed for %s: %s", c["id"], ex)
            return c
        if t:
            c.update(t)
        return c

    with ThreadPoolExecutor(max_workers=5) as pool:
        concerts = list(pool.map(_enrich, concerts))

    # Latest expiry first; concerts without an expiry sort last.
    concerts.sort(key=lambda c: c.get("expiry") or "", reverse=True)
    return concerts
def _resolve_ids(ids: set[str], exclude: set[str] | None = None) -> list[dict]:
    """Look up metadata for a set of programme IDs using a thread pool.

    Args:
        ids: Programme IDs to resolve.
        exclude: IDs to leave out; ``None`` or empty excludes nothing.

    Returns:
        Metadata dicts in sorted-ID order, omitting IDs whose lookup failed
        or whose title is empty.
    """
    skipped = exclude if exclude else set()
    pending = sorted(ids - skipped)
    with ThreadPoolExecutor(max_workers=10) as executor:
        fetched = list(executor.map(_metadata_for_pid, pending))
    return [entry for entry in fetched if entry and entry.get("title")]
def _search_sync(query: str) -> set[str]:
    """Run ``query`` against the site search page and collect programme IDs.

    Returns:
        The distinct IDs found in the result page; an empty set when the
        request fails (logged as a warning).
    """
    search_url = SEARCH_URL.format(q=quote_plus(query))
    try:
        matches = _PROG_RE.findall(_fetch_url(search_url))
    except Exception as err:
        logger.warning("Search failed for %r: %s", query, err)
        return set()
    return set(matches)
# ── public API ────────────────────────────────────────────────────────────────
async def _ensure_cache() -> list[dict]:
    """Return the full concert list by delegating to ``get_all_concerts``."""
    concerts = await get_all_concerts()
    return concerts
async def get_concerts_by_category(category: str) -> list[dict]:
    """Return the concerts whose ``categories`` list contains ``category``.

    Concerts with a missing or empty ``categories`` value never match.
    """
    matching = []
    for concert in await _ensure_cache():
        if category in (concert.get("categories") or []):
            matching.append(concert)
    return matching
async def get_all_concerts() -> list[dict]:
    """Return the full concert list, using the freshest available source.

    Lookup order: the in-memory cache, then the SQLite cache, then a full
    network refresh run in the default executor. Either cache is used only
    while it is younger than ``CACHE_TTL``.

    Returns:
        The cached concert list. When a refresh yields nothing, whatever the
        in-memory cache already holds (possibly stale or empty) is returned.
    """
    now = time.time()
    if _cache["data"] and now - _cache["ts"] < CACHE_TTL:
        return _cache["data"]

    # Try the SQLite cache before hitting the network.
    db_data, db_ts = _load_db_cache()
    if db_data and now - db_ts < CACHE_TTL:
        logger.info("Concerts cache loaded from DB (%d concerts)", len(db_data))
        _cache["data"] = db_data
        _cache["ts"] = db_ts
        return _cache["data"]

    # get_running_loop() is the documented way to obtain the loop inside a coroutine.
    loop = asyncio.get_running_loop()
    data = await loop.run_in_executor(None, _fetch_all_sync)
    if data:
        _cache["data"] = data
        # The timestamp marks when the refresh started, not when it finished.
        _cache["ts"] = now
        _save_db_cache(data, now)
    return _cache["data"]
async def fetch_concerts(page: int = 1, search: str = "", page_size: int = 24, category: str = "") -> dict:
    """Return one page of concerts, optionally filtered by category and search text.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        search: Case-insensitive substring matched against title, subtitle
            and description. Without a category filter, the site search is
            also queried and unknown programmes are appended.
        page_size: Items per page; values below 1 are treated as 1.
        category: When set, only concerts listed under this category are kept.

    Returns:
        A dict with ``concerts`` (the page slice), ``total``, ``page``,
        ``page_size`` and ``pages`` (at least 1).
    """
    # Guard against negative slice offsets and division by zero.
    page = max(1, page)
    page_size = max(1, page_size)

    all_c = await get_all_concerts()
    if category:
        all_c = [c for c in all_c if category in (c.get("categories") or [])]

    if search:
        q = search.lower()
        filtered = [
            c for c in all_c
            if q in (c.get("title") or "").lower()
            or q in (c.get("subtitle") or "").lower()
            or q in (c.get("description") or "").lower()
        ]
        # Remote search only when no category filter (results have no category info).
        if not category:
            loop = asyncio.get_running_loop()
            remote_ids = await loop.run_in_executor(None, _search_sync, search)
            cached_ids = {c["id"] for c in all_c}
            new_ids = remote_ids - cached_ids
            if new_ids:
                # new_ids excludes every cached ID, so none of these can
                # duplicate a local match.
                filtered.extend(await loop.run_in_executor(None, _resolve_ids, new_ids, None))
    else:
        filtered = all_c

    start = (page - 1) * page_size
    return {
        "concerts": filtered[start : start + page_size],
        "total": len(filtered),
        "page": page,
        "page_size": page_size,
        "pages": max(1, (len(filtered) + page_size - 1) // page_size),
    }
async def invalidate_cache() -> int:
    """Expire both caches and reload the concert list.

    The in-memory timestamp is reset and the SQLite cache row is deleted, so
    the following ``get_all_concerts`` call goes to the network. The
    in-memory data itself is kept, so it is still returned if that refresh
    yields nothing.

    Returns:
        The number of concerts in the list after the reload.
    """
    _cache["ts"] = 0
    try:
        conn = _db()
        try:
            # ``with conn`` commits the delete; it does not close the connection.
            with conn:
                conn.execute("DELETE FROM concerts_cache")
        finally:
            conn.close()
    except Exception as e:
        # Best-effort: a failed delete must not block the reload.
        logger.warning("Failed to clear concerts cache: %s", e)
    data = await get_all_concerts()
    return len(data)