All checks were successful
Docker / docker (push) Successful in 1m43s
Bouton dans le header (visible uniquement en filtre 10/10) qui supprime toutes les notes 10/10 en une fois via l'API Trakt bulk. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
232 lines
6.7 KiB
Python
232 lines
6.7 KiB
Python
import os
|
|
import time
|
|
|
|
from dotenv import load_dotenv
|
|
from fastapi import FastAPI, HTTPException, Request
|
|
from fastapi.responses import FileResponse, RedirectResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from pydantic import BaseModel
|
|
from starlette.middleware.sessions import SessionMiddleware
|
|
|
|
from tmdb import TMDBClient
|
|
from trakt import TraktClient
|
|
|
|
load_dotenv()
|
|
|
|
app = FastAPI()
|
|
app.add_middleware(
|
|
SessionMiddleware,
|
|
secret_key=os.getenv("SECRET_KEY", "change-me"),
|
|
max_age=86400 * 30,
|
|
)
|
|
|
|
TRAKT_CLIENT_ID = os.getenv("TRAKT_CLIENT_ID", "")
|
|
TRAKT_CLIENT_SECRET = os.getenv("TRAKT_CLIENT_SECRET", "")
|
|
TRAKT_REDIRECT_URI = os.getenv("TRAKT_REDIRECT_URI", "http://localhost:8000/auth/callback")
|
|
TMDB_API_KEY = os.getenv("TMDB_API_KEY", "")
|
|
|
|
# Simple in-memory cache (per user token)
|
|
_cache: dict = {}
|
|
CACHE_TTL = 300 # 5 minutes
|
|
|
|
|
|
def cache_get(key: str):
|
|
if key in _cache:
|
|
data, ts = _cache[key]
|
|
if time.time() - ts < CACHE_TTL:
|
|
return data
|
|
del _cache[key]
|
|
return None
|
|
|
|
|
|
def cache_set(key: str, data):
|
|
_cache[key] = (data, time.time())
|
|
|
|
|
|
def cache_del(key: str):
|
|
_cache.pop(key, None)
|
|
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
return FileResponse("static/index.html")
|
|
|
|
|
|
@app.get("/api/auth/status")
|
|
async def auth_status(request: Request):
|
|
return {"authenticated": bool(request.session.get("access_token"))}
|
|
|
|
|
|
@app.get("/auth/login")
|
|
async def auth_login():
|
|
url = (
|
|
f"https://trakt.tv/oauth/authorize"
|
|
f"?response_type=code"
|
|
f"&client_id={TRAKT_CLIENT_ID}"
|
|
f"&redirect_uri={TRAKT_REDIRECT_URI}"
|
|
)
|
|
return RedirectResponse(url)
|
|
|
|
|
|
@app.get("/auth/callback")
|
|
async def auth_callback(request: Request, code: str):
|
|
import httpx
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.post(
|
|
"https://api.trakt.tv/oauth/token",
|
|
json={
|
|
"code": code,
|
|
"client_id": TRAKT_CLIENT_ID,
|
|
"client_secret": TRAKT_CLIENT_SECRET,
|
|
"redirect_uri": TRAKT_REDIRECT_URI,
|
|
"grant_type": "authorization_code",
|
|
},
|
|
)
|
|
data = resp.json()
|
|
if "access_token" not in data:
|
|
raise HTTPException(400, f"OAuth failed: {data}")
|
|
request.session["access_token"] = data["access_token"]
|
|
return RedirectResponse("/")
|
|
|
|
|
|
@app.get("/auth/logout")
|
|
async def auth_logout(request: Request):
|
|
request.session.clear()
|
|
return RedirectResponse("/")
|
|
|
|
|
|
@app.get("/api/movies")
|
|
async def get_movies(
|
|
request: Request,
|
|
page: int = 1,
|
|
per_page: int = 20,
|
|
sort: str = "watched_at",
|
|
filter_type: str = "all",
|
|
exclude: str = "", # comma-separated trakt IDs to skip client-side
|
|
):
|
|
token = request.session.get("access_token")
|
|
if not token:
|
|
raise HTTPException(401, "Not authenticated")
|
|
|
|
cache_key = f"movies_{token[:16]}"
|
|
all_movies = cache_get(cache_key)
|
|
|
|
if all_movies is None:
|
|
trakt = TraktClient(TRAKT_CLIENT_ID, token)
|
|
watched, ratings = await trakt.get_watched_and_ratings()
|
|
rating_map = {r["movie"]["ids"]["trakt"]: r["rating"] for r in ratings}
|
|
|
|
all_movies = []
|
|
for w in watched:
|
|
m = w["movie"]
|
|
tid = m["ids"]["trakt"]
|
|
r = rating_map.get(tid)
|
|
if r is None or r == 10:
|
|
all_movies.append({
|
|
"trakt_id": tid,
|
|
"imdb_id": m["ids"].get("imdb"),
|
|
"tmdb_id": m["ids"].get("tmdb"),
|
|
"title": m["title"],
|
|
"year": m.get("year"),
|
|
"watched_at": w.get("last_watched_at"),
|
|
"plays": w.get("plays", 1),
|
|
"current_rating": r,
|
|
})
|
|
cache_set(cache_key, all_movies)
|
|
|
|
# Exclude skipped IDs (sent by client from localStorage)
|
|
excluded = {int(x) for x in exclude.split(",") if x.strip().lstrip("-").isdigit()}
|
|
|
|
# Filter
|
|
movies = [m for m in all_movies if m["trakt_id"] not in excluded]
|
|
if filter_type == "unrated":
|
|
movies = [m for m in movies if m["current_rating"] is None]
|
|
elif filter_type == "10":
|
|
movies = [m for m in movies if m["current_rating"] == 10]
|
|
|
|
# Sort
|
|
if sort == "watched_at":
|
|
movies = sorted(movies, key=lambda x: x["watched_at"] or "", reverse=True)
|
|
elif sort == "title":
|
|
movies = sorted(movies, key=lambda x: x["title"].lower())
|
|
elif sort == "year":
|
|
movies = sorted(movies, key=lambda x: x["year"] or 0, reverse=True)
|
|
|
|
total = len(movies)
|
|
start = (page - 1) * per_page
|
|
page_movies = movies[start: start + per_page]
|
|
|
|
tmdb = TMDBClient(TMDB_API_KEY)
|
|
enriched = await tmdb.enrich_movies(page_movies)
|
|
|
|
return {
|
|
"movies": enriched,
|
|
"total": total,
|
|
"page": page,
|
|
"per_page": per_page,
|
|
"total_pages": max(1, (total + per_page - 1) // per_page),
|
|
}
|
|
|
|
|
|
class RateBody(BaseModel):
|
|
rating: int
|
|
|
|
|
|
@app.post("/api/rate/{trakt_id}")
|
|
async def rate_movie(request: Request, trakt_id: int, body: RateBody):
|
|
token = request.session.get("access_token")
|
|
if not token:
|
|
raise HTTPException(401, "Not authenticated")
|
|
if not (1 <= body.rating <= 10):
|
|
raise HTTPException(400, "Rating must be between 1 and 10")
|
|
|
|
trakt = TraktClient(TRAKT_CLIENT_ID, token)
|
|
await trakt.rate_movie(trakt_id, body.rating)
|
|
cache_del(f"movies_{token[:16]}")
|
|
return {"success": True}
|
|
|
|
|
|
@app.delete("/api/rate/{trakt_id}")
|
|
async def remove_rating(request: Request, trakt_id: int):
|
|
token = request.session.get("access_token")
|
|
if not token:
|
|
raise HTTPException(401, "Not authenticated")
|
|
|
|
trakt = TraktClient(TRAKT_CLIENT_ID, token)
|
|
await trakt.remove_rating(trakt_id)
|
|
cache_del(f"movies_{token[:16]}")
|
|
return {"success": True}
|
|
|
|
|
|
@app.delete("/api/rates/all10")
|
|
async def remove_all_10(request: Request):
|
|
token = request.session.get("access_token")
|
|
if not token:
|
|
raise HTTPException(401, "Not authenticated")
|
|
|
|
trakt = TraktClient(TRAKT_CLIENT_ID, token)
|
|
_, ratings = await trakt.get_watched_and_ratings()
|
|
ids = [{"ids": r["movie"]["ids"]} for r in ratings if r["rating"] == 10]
|
|
|
|
if not ids:
|
|
return {"deleted": 0}
|
|
|
|
import asyncio, httpx
|
|
async with httpx.AsyncClient(timeout=60) as client:
|
|
tasks = [
|
|
client.post(
|
|
f"{TraktClient.BASE_URL}/sync/ratings/remove",
|
|
headers=trakt.headers,
|
|
json={"movies": ids[i:i + 100]},
|
|
)
|
|
for i in range(0, len(ids), 100)
|
|
]
|
|
responses = await asyncio.gather(*tasks)
|
|
|
|
cache_del(f"movies_{token[:16]}")
|
|
return {"deleted": len(ids)}
|
|
|
|
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|