""" Requêtes SQL en lecture seule pour le dashboard web. Toutes les fonctions reçoivent une connexion SQLite ouverte (pattern identique à repository.py) et retournent des structures Python simples (dict, list). L'appelant est responsable de l'ouverture et fermeture de la connexion. """ import sqlite3 def get_dashboard_stats(conn: sqlite3.Connection) -> dict: """Statistiques globales pour la page d'accueil. Returns: dict avec les clés : - total_receipts : int - total_spent : float - total_items : int - distinct_products : int - receipts_by_store : dict[str, int] - spent_by_store : dict[str, float] - date_range : dict {min, max} ou {min: None, max: None} """ # Statistiques par enseigne rows = conn.execute( "SELECT store, COUNT(*) AS nb, SUM(total) AS spent FROM receipts GROUP BY store" ).fetchall() receipts_by_store = {row["store"]: row["nb"] for row in rows} spent_by_store = {row["store"]: round(row["spent"], 2) for row in rows} total_receipts = sum(receipts_by_store.values()) total_spent = round(sum(row["spent"] for row in rows), 2) if rows else 0.0 # Statistiques articles item_stats = conn.execute( """ SELECT COUNT(*) AS total_items, COUNT(DISTINCT name_normalized) AS distinct_products FROM items """ ).fetchone() # Plage de dates date_row = conn.execute( "SELECT MIN(date) AS d_min, MAX(date) AS d_max FROM receipts" ).fetchone() return { "total_receipts": total_receipts, "total_spent": total_spent, "total_items": item_stats["total_items"], "distinct_products": item_stats["distinct_products"], "receipts_by_store": receipts_by_store, "spent_by_store": spent_by_store, "date_range": {"min": date_row["d_min"], "max": date_row["d_max"]}, } def get_monthly_spending(conn: sqlite3.Connection) -> list[dict]: """Dépenses mensuelles par enseigne, pour le graphique Chart.js. Returns: Liste de dicts {month: "2026-01", store: "picnic", total: 45.20}, triée par mois puis enseigne. """ rows = conn.execute( """ SELECT substr(date, 1, 7) AS month, store, ROUND(SUM(total), 2) AS total FROM receipts GROUP BY month, store ORDER BY month, store """ ).fetchall() return [{"month": r["month"], "store": r["store"], "total": r["total"]} for r in rows] def get_compare_prices(conn: sqlite3.Connection) -> list[dict]: """Comparaison de prix entre Picnic et Leclerc pour les produits communs. Combine deux sources : - Correspondances exactes (même name_normalized dans les deux enseignes) - Correspondances fuzzy validées dans product_matches (status='validated') Les doublons éventuels (un produit déjà en exact ET en fuzzy) sont éliminés par UNION (qui déduplique) + sélection par nom picnic. Returns: Liste de dicts {name, price_picnic, price_leclerc, diff, diff_pct, match_type}. diff = price_leclerc - price_picnic (positif = Leclerc plus cher) diff_pct = diff / MIN(price_picnic, price_leclerc) * 100 match_type = 'exact' ou 'fuzzy' """ rows = conn.execute( """ WITH avg_by_store AS ( SELECT name_normalized, store, ROUND(AVG(unit_price), 2) AS avg_price FROM price_history WHERE name_normalized IS NOT NULL GROUP BY name_normalized, store ), exact_matches AS ( SELECT a.name_normalized AS name, a.name_normalized AS name_display, a.avg_price AS price_picnic, b.avg_price AS price_leclerc, ROUND(b.avg_price - a.avg_price, 2) AS diff, ROUND( (b.avg_price - a.avg_price) / MIN(a.avg_price, b.avg_price) * 100 , 1) AS diff_pct, 'exact' AS match_type FROM avg_by_store a JOIN avg_by_store b ON a.name_normalized = b.name_normalized AND a.store = 'picnic' AND b.store = 'leclerc' ), fuzzy_matches AS ( SELECT pm.name_picnic AS name, pm.name_picnic || ' ≈ ' || pm.name_leclerc AS name_display, ap_p.avg_price AS price_picnic, ap_l.avg_price AS price_leclerc, ROUND(ap_l.avg_price - ap_p.avg_price, 2) AS diff, ROUND( (ap_l.avg_price - ap_p.avg_price) / MIN(ap_p.avg_price, ap_l.avg_price) * 100 , 1) AS diff_pct, 'fuzzy' AS match_type FROM product_matches pm JOIN avg_by_store ap_p ON ap_p.name_normalized = pm.name_picnic AND ap_p.store = 'picnic' JOIN avg_by_store ap_l ON ap_l.name_normalized = pm.name_leclerc AND ap_l.store = 'leclerc' WHERE pm.status = 'validated' -- Exclure si déjà présent en exact match AND pm.name_picnic NOT IN (SELECT name FROM exact_matches) ) SELECT name, name_display, price_picnic, price_leclerc, diff, diff_pct, match_type FROM ( SELECT name, name_display, price_picnic, price_leclerc, diff, diff_pct, match_type FROM exact_matches UNION ALL SELECT name, name_display, price_picnic, price_leclerc, diff, diff_pct, match_type FROM fuzzy_matches ) ORDER BY ABS(diff) DESC """ ).fetchall() return [ { "name": r["name"], "name_display": r["name_display"], "price_picnic": r["price_picnic"], "price_leclerc": r["price_leclerc"], "diff": r["diff"], "diff_pct": r["diff_pct"], "match_type": r["match_type"], } for r in rows ] def get_product_history(conn: sqlite3.Connection, name: str) -> dict | None: """Historique des prix d'un produit normalisé. Args: conn: Connexion SQLite ouverte. name: Valeur de name_normalized à rechercher (sensible à la casse). Returns: dict {name, min_price, max_price, avg_price, history: list[dict]} ou None si le produit est inconnu. Chaque entrée de history : {date, store, unit_price, quantity, unit}. """ # Statistiques globales stats = conn.execute( """ SELECT name_normalized, ROUND(MIN(unit_price), 2) AS min_price, ROUND(MAX(unit_price), 2) AS max_price, ROUND(AVG(unit_price), 2) AS avg_price FROM price_history WHERE name_normalized = ? """, (name,), ).fetchone() if stats is None or stats["name_normalized"] is None: return None # Historique chronologique rows = conn.execute( """ SELECT date, store, unit_price, quantity, unit FROM price_history WHERE name_normalized = ? ORDER BY date """, (name,), ).fetchall() return { "name": stats["name_normalized"], "min_price": stats["min_price"], "max_price": stats["max_price"], "avg_price": stats["avg_price"], "history": [ { "date": r["date"], "store": r["store"], "unit_price": r["unit_price"], "quantity": r["quantity"], "unit": r["unit"], } for r in rows ], } def get_all_receipts(conn: sqlite3.Connection) -> list[dict]: """Liste tous les tickets avec le nombre d'articles associés. Returns: Liste de dicts {id, store, date, total, delivery_fee, order_id, nb_items}, triée par date décroissante (le plus récent en premier). """ rows = conn.execute( """ SELECT r.id, r.store, r.date, r.total, r.delivery_fee, r.order_id, COUNT(i.id) AS nb_items FROM receipts r LEFT JOIN items i ON i.receipt_id = r.id GROUP BY r.id ORDER BY r.date DESC, r.id DESC """ ).fetchall() return [ { "id": r["id"], "store": r["store"], "date": r["date"], "total": r["total"], "delivery_fee": r["delivery_fee"], "order_id": r["order_id"], "nb_items": r["nb_items"], } for r in rows ] def get_receipt_detail(conn: sqlite3.Connection, receipt_id: int) -> dict | None: """Détail complet d'un ticket et de ses articles. Args: conn: Connexion SQLite ouverte. receipt_id: Id du ticket à récupérer. Returns: dict avec les champs du ticket + items: list[dict], ou None si introuvable. """ receipt = conn.execute( "SELECT id, store, date, total, delivery_fee, order_id FROM receipts WHERE id = ?", (receipt_id,), ).fetchone() if receipt is None: return None items = conn.execute( """ SELECT id, name_raw, name_normalized, category, quantity, unit, unit_price, total_price FROM items WHERE receipt_id = ? ORDER BY id """, (receipt_id,), ).fetchall() return { "id": receipt["id"], "store": receipt["store"], "date": receipt["date"], "total": receipt["total"], "delivery_fee": receipt["delivery_fee"], "order_id": receipt["order_id"], "items": [ { "id": i["id"], "name_raw": i["name_raw"], "name_normalized": i["name_normalized"], "category": i["category"], "quantity": i["quantity"], "unit": i["unit"], "unit_price": i["unit_price"], "total_price": i["total_price"], } for i in items ], } def get_pending_matches(conn: sqlite3.Connection) -> list[dict]: """Paires en attente de validation, avec prix moyens des deux enseignes. Returns: Liste de dicts {id, name_picnic, price_picnic, name_leclerc, price_leclerc, score}. price_picnic / price_leclerc : prix moyen unitaire de ce produit dans la vue price_history (None si aucune occurrence pour ce nom normalisé). """ rows = conn.execute( """ SELECT pm.id, pm.name_picnic, pm.name_leclerc, pm.score, ROUND(AVG(CASE WHEN ph.store='picnic' THEN ph.unit_price END), 2) AS price_picnic, ROUND(AVG(CASE WHEN ph.store='leclerc' THEN ph.unit_price END), 2) AS price_leclerc FROM product_matches pm LEFT JOIN price_history ph ON ph.name_normalized IN (pm.name_picnic, pm.name_leclerc) WHERE pm.status = 'pending' GROUP BY pm.id ORDER BY pm.score DESC """ ).fetchall() return [ { "id": r["id"], "name_picnic": r["name_picnic"], "name_leclerc": r["name_leclerc"], "score": r["score"], "price_picnic": r["price_picnic"], "price_leclerc": r["price_leclerc"], } for r in rows ] def get_validated_matches(conn: sqlite3.Connection) -> list[dict]: """Paires validées pour enrichir get_compare_prices. Returns: Liste de dicts {name_picnic, price_picnic, name_leclerc, price_leclerc, diff, diff_pct}. """ rows = conn.execute( """ WITH avg_prices AS ( SELECT name_normalized, store, ROUND(AVG(unit_price), 2) AS avg_price FROM price_history WHERE name_normalized IS NOT NULL GROUP BY name_normalized, store ) SELECT pm.id, pm.name_picnic, pm.name_leclerc, ap_p.avg_price AS price_picnic, ap_l.avg_price AS price_leclerc, ROUND(ap_l.avg_price - ap_p.avg_price, 2) AS diff, ROUND( (ap_l.avg_price - ap_p.avg_price) / MIN(ap_p.avg_price, ap_l.avg_price) * 100 , 1) AS diff_pct FROM product_matches pm JOIN avg_prices ap_p ON ap_p.name_normalized = pm.name_picnic AND ap_p.store = 'picnic' JOIN avg_prices ap_l ON ap_l.name_normalized = pm.name_leclerc AND ap_l.store = 'leclerc' WHERE pm.status = 'validated' ORDER BY ABS(ap_l.avg_price - ap_p.avg_price) DESC """ ).fetchall() return [ { "name_picnic": r["name_picnic"], "name_leclerc": r["name_leclerc"], "price_picnic": r["price_picnic"], "price_leclerc": r["price_leclerc"], "diff": r["diff"], "diff_pct": r["diff_pct"], } for r in rows ] def get_product_list(conn: sqlite3.Connection) -> list[str]: """Liste tous les noms normalisés distincts (non NULL) pour le sélecteur. Returns: Liste de str triée alphabétiquement. """ rows = conn.execute( """ SELECT DISTINCT name_normalized FROM items WHERE name_normalized IS NOT NULL ORDER BY name_normalized """ ).fetchall() return [r["name_normalized"] for r in rows]