Nouvelle table product_matches (status: pending/validated/rejected).
Matching via RapidFuzz token_sort_ratio, seuil configurable (défaut 85%).
Workflow :
1. python -m tickettracker.cli match [--threshold 85]
→ calcule et stocke les paires candidates
2. http://localhost:8000/matches
→ l'utilisateur valide ou rejette chaque paire
3. La comparaison de prix est enrichie avec les paires validées
Nouvelles dépendances : rapidfuzz, watchdog (requirements.txt).
10 tests ajoutés (test_matcher.py), tous passent.
Suite complète : 129 passent, 1 xfail, 0 échec.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
423 lines
14 KiB
Python
423 lines
14 KiB
Python
"""
|
|
Requêtes SQL en lecture seule pour le dashboard web.
|
|
|
|
Toutes les fonctions reçoivent une connexion SQLite ouverte (pattern identique
|
|
à repository.py) et retournent des structures Python simples (dict, list).
|
|
L'appelant est responsable de l'ouverture et fermeture de la connexion.
|
|
"""
|
|
|
|
import sqlite3
|
|
|
|
|
|
def get_dashboard_stats(conn: sqlite3.Connection) -> dict:
    """Return the global statistics shown on the dashboard home page.

    Args:
        conn: Open SQLite connection. Rows are read by column name, so the
            connection needs a name-addressable row factory such as
            ``sqlite3.Row``.

    Returns:
        dict with the keys:
            - total_receipts: int
            - total_spent: float
            - total_items: int
            - distinct_products: int
            - receipts_by_store: dict[str, int]
            - spent_by_store: dict[str, float]
            - date_range: dict {min, max}; both values are None when the
              receipts table is empty.
    """
    # Per-store statistics.
    store_rows = conn.execute(
        "SELECT store, COUNT(*) AS nb, SUM(total) AS spent FROM receipts GROUP BY store"
    ).fetchall()

    receipts_by_store = {}
    raw_spent_by_store = {}
    for row in store_rows:
        receipts_by_store[row["store"]] = row["nb"]
        # SUM() yields NULL when every total of a store is NULL; count that
        # store as 0.0 instead of crashing in round()/sum().
        raw_spent_by_store[row["store"]] = float(row["spent"] or 0.0)

    spent_by_store = {
        store: round(spent, 2) for store, spent in raw_spent_by_store.items()
    }
    total_receipts = sum(receipts_by_store.values())
    # Sum the unrounded per-store amounts so rounding happens only once;
    # the 0.0 start value keeps the result a float even without receipts.
    total_spent = round(sum(raw_spent_by_store.values(), 0.0), 2)

    # Item statistics.
    item_stats = conn.execute(
        """
        SELECT
            COUNT(*) AS total_items,
            COUNT(DISTINCT name_normalized) AS distinct_products
        FROM items
        """
    ).fetchone()

    # Date range covered by the receipts.
    date_row = conn.execute(
        "SELECT MIN(date) AS d_min, MAX(date) AS d_max FROM receipts"
    ).fetchone()

    return {
        "total_receipts": total_receipts,
        "total_spent": total_spent,
        "total_items": item_stats["total_items"],
        "distinct_products": item_stats["distinct_products"],
        "receipts_by_store": receipts_by_store,
        "spent_by_store": spent_by_store,
        "date_range": {"min": date_row["d_min"], "max": date_row["d_max"]},
    }
|
|
|
|
|
|
def get_monthly_spending(conn: sqlite3.Connection) -> list[dict]:
    """Return the monthly spending per store, as consumed by the Chart.js graph.

    Args:
        conn: Open SQLite connection whose rows can be read by column name.

    Returns:
        List of dicts {month: "2026-01", store: "picnic", total: 45.20},
        sorted by month, then by store.
    """
    query = """
        SELECT
            substr(date, 1, 7) AS month,
            store,
            ROUND(SUM(total), 2) AS total
        FROM receipts
        GROUP BY month, store
        ORDER BY month, store
    """
    columns = ("month", "store", "total")

    spending = []
    for record in conn.execute(query).fetchall():
        # Copy each row into a plain dict so callers get simple structures.
        spending.append({column: record[column] for column in columns})
    return spending
|
|
|
|
|
|
def get_compare_prices(conn: sqlite3.Connection) -> list[dict]:
    """Compare Picnic and Leclerc prices for the products both stores share.

    Two sources are combined:
        - exact matches (same name_normalized in both stores);
        - fuzzy matches validated in product_matches (status='validated').

    A validated fuzzy pair whose Picnic name already has an exact match is
    left out by the ``NOT IN`` filter of the query; the two result sets are
    then concatenated with ``UNION ALL`` (no further deduplication).

    Args:
        conn: Open SQLite connection whose rows can be read by column name.

    Returns:
        List of dicts {name, name_display, price_picnic, price_leclerc, diff,
        diff_pct, match_type}, sorted by decreasing absolute diff, ties
        broken by name.
            name         = Picnic-side normalized name
            name_display = name for exact matches,
                           "<picnic name> ≈ <leclerc name>" for fuzzy ones
            diff         = price_leclerc - price_picnic
                           (positive = Leclerc is more expensive)
            diff_pct     = diff / MIN(price_picnic, price_leclerc) * 100
            match_type   = 'exact' or 'fuzzy'
    """
    rows = conn.execute(
        """
        WITH avg_by_store AS (
            SELECT
                name_normalized,
                store,
                ROUND(AVG(unit_price), 2) AS avg_price
            FROM price_history
            WHERE name_normalized IS NOT NULL
            GROUP BY name_normalized, store
        ),
        exact_matches AS (
            SELECT
                a.name_normalized AS name,
                a.name_normalized AS name_display,
                a.avg_price AS price_picnic,
                b.avg_price AS price_leclerc,
                ROUND(b.avg_price - a.avg_price, 2) AS diff,
                ROUND(
                    (b.avg_price - a.avg_price)
                    / MIN(a.avg_price, b.avg_price) * 100
                , 1) AS diff_pct,
                'exact' AS match_type
            FROM avg_by_store a
            JOIN avg_by_store b
                ON a.name_normalized = b.name_normalized
                AND a.store = 'picnic'
                AND b.store = 'leclerc'
        ),
        fuzzy_matches AS (
            SELECT
                pm.name_picnic AS name,
                pm.name_picnic || ' ≈ ' || pm.name_leclerc AS name_display,
                ap_p.avg_price AS price_picnic,
                ap_l.avg_price AS price_leclerc,
                ROUND(ap_l.avg_price - ap_p.avg_price, 2) AS diff,
                ROUND(
                    (ap_l.avg_price - ap_p.avg_price)
                    / MIN(ap_p.avg_price, ap_l.avg_price) * 100
                , 1) AS diff_pct,
                'fuzzy' AS match_type
            FROM product_matches pm
            JOIN avg_by_store ap_p
                ON ap_p.name_normalized = pm.name_picnic AND ap_p.store = 'picnic'
            JOIN avg_by_store ap_l
                ON ap_l.name_normalized = pm.name_leclerc AND ap_l.store = 'leclerc'
            WHERE pm.status = 'validated'
              -- Exclure si déjà présent en exact match
              AND pm.name_picnic NOT IN (SELECT name FROM exact_matches)
        )
        SELECT name, name_display, price_picnic, price_leclerc, diff, diff_pct, match_type
        FROM (
            SELECT name, name_display, price_picnic, price_leclerc, diff, diff_pct, match_type
            FROM exact_matches
            UNION ALL
            SELECT name, name_display, price_picnic, price_leclerc, diff, diff_pct, match_type
            FROM fuzzy_matches
        )
        ORDER BY ABS(diff) DESC, name, name_display
        """
    ).fetchall()

    # The secondary sort keys above make the order of equal diffs
    # deterministic; without them SQLite may return ties in any order.
    columns = (
        "name",
        "name_display",
        "price_picnic",
        "price_leclerc",
        "diff",
        "diff_pct",
        "match_type",
    )
    return [{column: row[column] for column in columns} for row in rows]
|
|
|
|
|
|
def get_product_history(conn: sqlite3.Connection, name: str) -> dict | None:
|
|
"""Historique des prix d'un produit normalisé.
|
|
|
|
Args:
|
|
conn: Connexion SQLite ouverte.
|
|
name: Valeur de name_normalized à rechercher (sensible à la casse).
|
|
|
|
Returns:
|
|
dict {name, min_price, max_price, avg_price, history: list[dict]}
|
|
ou None si le produit est inconnu.
|
|
Chaque entrée de history : {date, store, unit_price, quantity, unit}.
|
|
"""
|
|
# Statistiques globales
|
|
stats = conn.execute(
|
|
"""
|
|
SELECT
|
|
name_normalized,
|
|
ROUND(MIN(unit_price), 2) AS min_price,
|
|
ROUND(MAX(unit_price), 2) AS max_price,
|
|
ROUND(AVG(unit_price), 2) AS avg_price
|
|
FROM price_history
|
|
WHERE name_normalized = ?
|
|
""",
|
|
(name,),
|
|
).fetchone()
|
|
|
|
if stats is None or stats["name_normalized"] is None:
|
|
return None
|
|
|
|
# Historique chronologique
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT date, store, unit_price, quantity, unit
|
|
FROM price_history
|
|
WHERE name_normalized = ?
|
|
ORDER BY date
|
|
""",
|
|
(name,),
|
|
).fetchall()
|
|
|
|
return {
|
|
"name": stats["name_normalized"],
|
|
"min_price": stats["min_price"],
|
|
"max_price": stats["max_price"],
|
|
"avg_price": stats["avg_price"],
|
|
"history": [
|
|
{
|
|
"date": r["date"],
|
|
"store": r["store"],
|
|
"unit_price": r["unit_price"],
|
|
"quantity": r["quantity"],
|
|
"unit": r["unit"],
|
|
}
|
|
for r in rows
|
|
],
|
|
}
|
|
|
|
|
|
def get_all_receipts(conn: sqlite3.Connection) -> list[dict]:
    """List every receipt together with the number of items attached to it.

    Args:
        conn: Open SQLite connection whose rows can be read by column name.

    Returns:
        List of dicts {id, store, date, total, delivery_fee, order_id, nb_items},
        sorted by decreasing date (most recent first), then decreasing id.
    """
    query = """
        SELECT
            r.id,
            r.store,
            r.date,
            r.total,
            r.delivery_fee,
            r.order_id,
            COUNT(i.id) AS nb_items
        FROM receipts r
        LEFT JOIN items i ON i.receipt_id = r.id
        GROUP BY r.id
        ORDER BY r.date DESC, r.id DESC
    """
    columns = ("id", "store", "date", "total", "delivery_fee", "order_id", "nb_items")

    receipts = []
    for record in conn.execute(query).fetchall():
        # The LEFT JOIN keeps receipts without items; COUNT(i.id) is 0 for them.
        receipts.append({column: record[column] for column in columns})
    return receipts
|
|
|
|
|
|
def get_receipt_detail(conn: sqlite3.Connection, receipt_id: int) -> dict | None:
|
|
"""Détail complet d'un ticket et de ses articles.
|
|
|
|
Args:
|
|
conn: Connexion SQLite ouverte.
|
|
receipt_id: Id du ticket à récupérer.
|
|
|
|
Returns:
|
|
dict avec les champs du ticket + items: list[dict], ou None si introuvable.
|
|
"""
|
|
receipt = conn.execute(
|
|
"SELECT id, store, date, total, delivery_fee, order_id FROM receipts WHERE id = ?",
|
|
(receipt_id,),
|
|
).fetchone()
|
|
|
|
if receipt is None:
|
|
return None
|
|
|
|
items = conn.execute(
|
|
"""
|
|
SELECT id, name_raw, name_normalized, category, quantity, unit, unit_price, total_price
|
|
FROM items
|
|
WHERE receipt_id = ?
|
|
ORDER BY id
|
|
""",
|
|
(receipt_id,),
|
|
).fetchall()
|
|
|
|
return {
|
|
"id": receipt["id"],
|
|
"store": receipt["store"],
|
|
"date": receipt["date"],
|
|
"total": receipt["total"],
|
|
"delivery_fee": receipt["delivery_fee"],
|
|
"order_id": receipt["order_id"],
|
|
"items": [
|
|
{
|
|
"id": i["id"],
|
|
"name_raw": i["name_raw"],
|
|
"name_normalized": i["name_normalized"],
|
|
"category": i["category"],
|
|
"quantity": i["quantity"],
|
|
"unit": i["unit"],
|
|
"unit_price": i["unit_price"],
|
|
"total_price": i["total_price"],
|
|
}
|
|
for i in items
|
|
],
|
|
}
|
|
|
|
|
|
def get_pending_matches(conn: sqlite3.Connection) -> list[dict]:
    """Return the pairs awaiting validation with each store's average price.

    Args:
        conn: Open SQLite connection whose rows can be read by column name.

    Returns:
        List of dicts {id, name_picnic, price_picnic, name_leclerc,
        price_leclerc, score}, sorted by decreasing score, ties broken by id.
        price_picnic is the average unit price in price_history of
        name_picnic at store 'picnic'; price_leclerc is the average of
        name_leclerc at store 'leclerc'. Either is None when price_history
        has no such row.
    """
    rows = conn.execute(
        """
        SELECT
            pm.id,
            pm.name_picnic,
            pm.name_leclerc,
            pm.score,
            ROUND(AVG(CASE WHEN ph.store = 'picnic' THEN ph.unit_price END), 2) AS price_picnic,
            ROUND(AVG(CASE WHEN ph.store = 'leclerc' THEN ph.unit_price END), 2) AS price_leclerc
        FROM product_matches pm
        LEFT JOIN price_history ph
            ON (ph.store = 'picnic' AND ph.name_normalized = pm.name_picnic)
            OR (ph.store = 'leclerc' AND ph.name_normalized = pm.name_leclerc)
        WHERE pm.status = 'pending'
        GROUP BY pm.id
        ORDER BY pm.score DESC, pm.id
        """
    ).fetchall()

    # The join pairs each name with its own store. Joining on the name alone
    # would blend, for example, the Leclerc prices of name_picnic into
    # price_leclerc whenever that name is also sold at Leclerc.
    columns = (
        "id",
        "name_picnic",
        "name_leclerc",
        "score",
        "price_picnic",
        "price_leclerc",
    )
    return [{column: row[column] for column in columns} for row in rows]
|
|
|
|
|
|
def get_validated_matches(conn: sqlite3.Connection) -> list[dict]:
    """Return the validated pairs with the average price in each store.

    Only pairs that have a price on both sides are returned (inner joins on
    the per-store averages), sorted by decreasing absolute price difference.

    Args:
        conn: Open SQLite connection whose rows can be read by column name.

    Returns:
        List of dicts {name_picnic, price_picnic, name_leclerc, price_leclerc,
        diff, diff_pct}, where diff = price_leclerc - price_picnic and
        diff_pct = diff / MIN(price_picnic, price_leclerc) * 100.
    """
    query = """
        WITH avg_prices AS (
            SELECT name_normalized, store, ROUND(AVG(unit_price), 2) AS avg_price
            FROM price_history
            WHERE name_normalized IS NOT NULL
            GROUP BY name_normalized, store
        )
        SELECT
            pm.id,
            pm.name_picnic,
            pm.name_leclerc,
            ap_p.avg_price AS price_picnic,
            ap_l.avg_price AS price_leclerc,
            ROUND(ap_l.avg_price - ap_p.avg_price, 2) AS diff,
            ROUND(
                (ap_l.avg_price - ap_p.avg_price)
                / MIN(ap_p.avg_price, ap_l.avg_price) * 100
            , 1) AS diff_pct
        FROM product_matches pm
        JOIN avg_prices ap_p ON ap_p.name_normalized = pm.name_picnic AND ap_p.store = 'picnic'
        JOIN avg_prices ap_l ON ap_l.name_normalized = pm.name_leclerc AND ap_l.store = 'leclerc'
        WHERE pm.status = 'validated'
        ORDER BY ABS(ap_l.avg_price - ap_p.avg_price) DESC
    """
    # pm.id is selected by the query but is not part of the returned dicts.
    exposed_columns = (
        "name_picnic",
        "name_leclerc",
        "price_picnic",
        "price_leclerc",
        "diff",
        "diff_pct",
    )

    matches = []
    for record in conn.execute(query).fetchall():
        matches.append({column: record[column] for column in exposed_columns})
    return matches
|
|
|
|
|
|
def get_product_list(conn: sqlite3.Connection) -> list[str]:
    """List every distinct, non-NULL normalized product name for the selector.

    Args:
        conn: Open SQLite connection whose rows can be read by column name.

    Returns:
        List of str sorted by the query's ORDER BY on name_normalized.
    """
    query = """
        SELECT DISTINCT name_normalized
        FROM items
        WHERE name_normalized IS NOT NULL
        ORDER BY name_normalized
    """
    names = []
    for record in conn.execute(query).fetchall():
        names.append(record["name_normalized"])
    return names
|