Files

423 lines
14 KiB
Python
Raw Permalink Normal View History

"""
Requêtes SQL en lecture seule pour le dashboard web.
Toutes les fonctions reçoivent une connexion SQLite ouverte (pattern identique
à repository.py) et retournent des structures Python simples (dict, list).
L'appelant est responsable de l'ouverture et fermeture de la connexion.
"""
import sqlite3
def get_dashboard_stats(conn: sqlite3.Connection) -> dict:
"""Statistiques globales pour la page d'accueil.
Returns:
dict avec les clés :
- total_receipts : int
- total_spent : float
- total_items : int
- distinct_products : int
- receipts_by_store : dict[str, int]
- spent_by_store : dict[str, float]
- date_range : dict {min, max} ou {min: None, max: None}
"""
# Statistiques par enseigne
rows = conn.execute(
"SELECT store, COUNT(*) AS nb, SUM(total) AS spent FROM receipts GROUP BY store"
).fetchall()
receipts_by_store = {row["store"]: row["nb"] for row in rows}
spent_by_store = {row["store"]: round(row["spent"], 2) for row in rows}
total_receipts = sum(receipts_by_store.values())
total_spent = round(sum(row["spent"] for row in rows), 2) if rows else 0.0
# Statistiques articles
item_stats = conn.execute(
"""
SELECT
COUNT(*) AS total_items,
COUNT(DISTINCT name_normalized) AS distinct_products
FROM items
"""
).fetchone()
# Plage de dates
date_row = conn.execute(
"SELECT MIN(date) AS d_min, MAX(date) AS d_max FROM receipts"
).fetchone()
return {
"total_receipts": total_receipts,
"total_spent": total_spent,
"total_items": item_stats["total_items"],
"distinct_products": item_stats["distinct_products"],
"receipts_by_store": receipts_by_store,
"spent_by_store": spent_by_store,
"date_range": {"min": date_row["d_min"], "max": date_row["d_max"]},
}
def get_monthly_spending(conn: sqlite3.Connection) -> list[dict]:
"""Dépenses mensuelles par enseigne, pour le graphique Chart.js.
Returns:
Liste de dicts {month: "2026-01", store: "picnic", total: 45.20},
triée par mois puis enseigne.
"""
rows = conn.execute(
"""
SELECT
substr(date, 1, 7) AS month,
store,
ROUND(SUM(total), 2) AS total
FROM receipts
GROUP BY month, store
ORDER BY month, store
"""
).fetchall()
return [{"month": r["month"], "store": r["store"], "total": r["total"]} for r in rows]
def get_compare_prices(conn: sqlite3.Connection) -> list[dict]:
"""Comparaison de prix entre Picnic et Leclerc pour les produits communs.
Combine deux sources :
- Correspondances exactes (même name_normalized dans les deux enseignes)
- Correspondances fuzzy validées dans product_matches (status='validated')
Les doublons éventuels (un produit déjà en exact ET en fuzzy) sont éliminés
par UNION (qui déduplique) + sélection par nom picnic.
Returns:
Liste de dicts {name, price_picnic, price_leclerc, diff, diff_pct, match_type}.
diff = price_leclerc - price_picnic (positif = Leclerc plus cher)
diff_pct = diff / MIN(price_picnic, price_leclerc) * 100
match_type = 'exact' ou 'fuzzy'
"""
rows = conn.execute(
"""
WITH avg_by_store AS (
SELECT
name_normalized,
store,
ROUND(AVG(unit_price), 2) AS avg_price
FROM price_history
WHERE name_normalized IS NOT NULL
GROUP BY name_normalized, store
),
exact_matches AS (
SELECT
a.name_normalized AS name,
a.name_normalized AS name_display,
a.avg_price AS price_picnic,
b.avg_price AS price_leclerc,
ROUND(b.avg_price - a.avg_price, 2) AS diff,
ROUND(
(b.avg_price - a.avg_price)
/ MIN(a.avg_price, b.avg_price) * 100
, 1) AS diff_pct,
'exact' AS match_type
FROM avg_by_store a
JOIN avg_by_store b
ON a.name_normalized = b.name_normalized
AND a.store = 'picnic'
AND b.store = 'leclerc'
),
fuzzy_matches AS (
SELECT
pm.name_picnic AS name,
pm.name_picnic || '' || pm.name_leclerc AS name_display,
ap_p.avg_price AS price_picnic,
ap_l.avg_price AS price_leclerc,
ROUND(ap_l.avg_price - ap_p.avg_price, 2) AS diff,
ROUND(
(ap_l.avg_price - ap_p.avg_price)
/ MIN(ap_p.avg_price, ap_l.avg_price) * 100
, 1) AS diff_pct,
'fuzzy' AS match_type
FROM product_matches pm
JOIN avg_by_store ap_p
ON ap_p.name_normalized = pm.name_picnic AND ap_p.store = 'picnic'
JOIN avg_by_store ap_l
ON ap_l.name_normalized = pm.name_leclerc AND ap_l.store = 'leclerc'
WHERE pm.status = 'validated'
-- Exclure si déjà présent en exact match
AND pm.name_picnic NOT IN (SELECT name FROM exact_matches)
)
SELECT name, name_display, price_picnic, price_leclerc, diff, diff_pct, match_type
FROM (
SELECT name, name_display, price_picnic, price_leclerc, diff, diff_pct, match_type
FROM exact_matches
UNION ALL
SELECT name, name_display, price_picnic, price_leclerc, diff, diff_pct, match_type
FROM fuzzy_matches
)
ORDER BY ABS(diff) DESC
"""
).fetchall()
return [
{
"name": r["name"],
"name_display": r["name_display"],
"price_picnic": r["price_picnic"],
"price_leclerc": r["price_leclerc"],
"diff": r["diff"],
"diff_pct": r["diff_pct"],
"match_type": r["match_type"],
}
for r in rows
]
def get_product_history(conn: sqlite3.Connection, name: str) -> dict | None:
"""Historique des prix d'un produit normalisé.
Args:
conn: Connexion SQLite ouverte.
name: Valeur de name_normalized à rechercher (sensible à la casse).
Returns:
dict {name, min_price, max_price, avg_price, history: list[dict]}
ou None si le produit est inconnu.
Chaque entrée de history : {date, store, unit_price, quantity, unit}.
"""
# Statistiques globales
stats = conn.execute(
"""
SELECT
name_normalized,
ROUND(MIN(unit_price), 2) AS min_price,
ROUND(MAX(unit_price), 2) AS max_price,
ROUND(AVG(unit_price), 2) AS avg_price
FROM price_history
WHERE name_normalized = ?
""",
(name,),
).fetchone()
if stats is None or stats["name_normalized"] is None:
return None
# Historique chronologique
rows = conn.execute(
"""
SELECT date, store, unit_price, quantity, unit
FROM price_history
WHERE name_normalized = ?
ORDER BY date
""",
(name,),
).fetchall()
return {
"name": stats["name_normalized"],
"min_price": stats["min_price"],
"max_price": stats["max_price"],
"avg_price": stats["avg_price"],
"history": [
{
"date": r["date"],
"store": r["store"],
"unit_price": r["unit_price"],
"quantity": r["quantity"],
"unit": r["unit"],
}
for r in rows
],
}
def get_all_receipts(conn: sqlite3.Connection) -> list[dict]:
"""Liste tous les tickets avec le nombre d'articles associés.
Returns:
Liste de dicts {id, store, date, total, delivery_fee, order_id, nb_items},
triée par date décroissante (le plus récent en premier).
"""
rows = conn.execute(
"""
SELECT
r.id,
r.store,
r.date,
r.total,
r.delivery_fee,
r.order_id,
COUNT(i.id) AS nb_items
FROM receipts r
LEFT JOIN items i ON i.receipt_id = r.id
GROUP BY r.id
ORDER BY r.date DESC, r.id DESC
"""
).fetchall()
return [
{
"id": r["id"],
"store": r["store"],
"date": r["date"],
"total": r["total"],
"delivery_fee": r["delivery_fee"],
"order_id": r["order_id"],
"nb_items": r["nb_items"],
}
for r in rows
]
def get_receipt_detail(conn: sqlite3.Connection, receipt_id: int) -> dict | None:
"""Détail complet d'un ticket et de ses articles.
Args:
conn: Connexion SQLite ouverte.
receipt_id: Id du ticket à récupérer.
Returns:
dict avec les champs du ticket + items: list[dict], ou None si introuvable.
"""
receipt = conn.execute(
"SELECT id, store, date, total, delivery_fee, order_id FROM receipts WHERE id = ?",
(receipt_id,),
).fetchone()
if receipt is None:
return None
items = conn.execute(
"""
SELECT id, name_raw, name_normalized, category, quantity, unit, unit_price, total_price
FROM items
WHERE receipt_id = ?
ORDER BY id
""",
(receipt_id,),
).fetchall()
return {
"id": receipt["id"],
"store": receipt["store"],
"date": receipt["date"],
"total": receipt["total"],
"delivery_fee": receipt["delivery_fee"],
"order_id": receipt["order_id"],
"items": [
{
"id": i["id"],
"name_raw": i["name_raw"],
"name_normalized": i["name_normalized"],
"category": i["category"],
"quantity": i["quantity"],
"unit": i["unit"],
"unit_price": i["unit_price"],
"total_price": i["total_price"],
}
for i in items
],
}
def get_pending_matches(conn: sqlite3.Connection) -> list[dict]:
"""Paires en attente de validation, avec prix moyens des deux enseignes.
Returns:
Liste de dicts {id, name_picnic, price_picnic, name_leclerc, price_leclerc, score}.
price_picnic / price_leclerc : prix moyen unitaire de ce produit dans la vue
price_history (None si aucune occurrence pour ce nom normalisé).
"""
rows = conn.execute(
"""
SELECT
pm.id,
pm.name_picnic,
pm.name_leclerc,
pm.score,
ROUND(AVG(CASE WHEN ph.store='picnic' THEN ph.unit_price END), 2) AS price_picnic,
ROUND(AVG(CASE WHEN ph.store='leclerc' THEN ph.unit_price END), 2) AS price_leclerc
FROM product_matches pm
LEFT JOIN price_history ph
ON ph.name_normalized IN (pm.name_picnic, pm.name_leclerc)
WHERE pm.status = 'pending'
GROUP BY pm.id
ORDER BY pm.score DESC
"""
).fetchall()
return [
{
"id": r["id"],
"name_picnic": r["name_picnic"],
"name_leclerc": r["name_leclerc"],
"score": r["score"],
"price_picnic": r["price_picnic"],
"price_leclerc": r["price_leclerc"],
}
for r in rows
]
def get_validated_matches(conn: sqlite3.Connection) -> list[dict]:
"""Paires validées pour enrichir get_compare_prices.
Returns:
Liste de dicts {name_picnic, price_picnic, name_leclerc, price_leclerc, diff, diff_pct}.
"""
rows = conn.execute(
"""
WITH avg_prices AS (
SELECT name_normalized, store, ROUND(AVG(unit_price), 2) AS avg_price
FROM price_history
WHERE name_normalized IS NOT NULL
GROUP BY name_normalized, store
)
SELECT
pm.id,
pm.name_picnic,
pm.name_leclerc,
ap_p.avg_price AS price_picnic,
ap_l.avg_price AS price_leclerc,
ROUND(ap_l.avg_price - ap_p.avg_price, 2) AS diff,
ROUND(
(ap_l.avg_price - ap_p.avg_price)
/ MIN(ap_p.avg_price, ap_l.avg_price) * 100
, 1) AS diff_pct
FROM product_matches pm
JOIN avg_prices ap_p ON ap_p.name_normalized = pm.name_picnic AND ap_p.store = 'picnic'
JOIN avg_prices ap_l ON ap_l.name_normalized = pm.name_leclerc AND ap_l.store = 'leclerc'
WHERE pm.status = 'validated'
ORDER BY ABS(ap_l.avg_price - ap_p.avg_price) DESC
"""
).fetchall()
return [
{
"name_picnic": r["name_picnic"],
"name_leclerc": r["name_leclerc"],
"price_picnic": r["price_picnic"],
"price_leclerc": r["price_leclerc"],
"diff": r["diff"],
"diff_pct": r["diff_pct"],
}
for r in rows
]
def get_product_list(conn: sqlite3.Connection) -> list[str]:
"""Liste tous les noms normalisés distincts (non NULL) pour le sélecteur.
Returns:
Liste de str triée alphabétiquement.
"""
rows = conn.execute(
"""
SELECT DISTINCT name_normalized
FROM items
WHERE name_normalized IS NOT NULL
ORDER BY name_normalized
"""
).fetchall()
return [r["name_normalized"] for r in rows]