Files
TicketTracker/tickettracker/web/queries.py
laurent 30e4b3e144 feat: dashboard web FastAPI Sprint 4
Ajout d'un dashboard lecture seule par-dessus la DB SQLite existante.

Fichiers créés :
  - tickettracker/web/queries.py   : 7 fonctions SQL (stats, compare, historique...)
  - tickettracker/web/api.py       : router /api/* JSON (FastAPI)
  - tickettracker/web/app.py       : routes HTML + Jinja2 + point d'entrée uvicorn
  - tickettracker/web/templates/   : base.html, index.html, compare.html, product.html, receipt.html
  - tickettracker/web/static/style.css : personnalisations Pico CSS
  - tests/test_web.py              : 19 tests (96 passent, 1 xfail OCR)

Fichiers modifiés :
  - requirements.txt : +fastapi, uvicorn[standard], jinja2, python-multipart, httpx
  - config.py        : +DB_PATH (lu depuis TICKETTRACKER_DB_PATH)

Lancement : python -m tickettracker.web.app

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 20:04:55 +01:00

298 lines
8.9 KiB
Python

"""
Requêtes SQL en lecture seule pour le dashboard web.
Toutes les fonctions reçoivent une connexion SQLite ouverte (pattern identique
à repository.py) et retournent des structures Python simples (dict, list).
L'appelant est responsable de l'ouverture et fermeture de la connexion.
"""
import sqlite3
def get_dashboard_stats(conn: sqlite3.Connection) -> dict:
"""Statistiques globales pour la page d'accueil.
Returns:
dict avec les clés :
- total_receipts : int
- total_spent : float
- total_items : int
- distinct_products : int
- receipts_by_store : dict[str, int]
- spent_by_store : dict[str, float]
- date_range : dict {min, max} ou {min: None, max: None}
"""
# Statistiques par enseigne
rows = conn.execute(
"SELECT store, COUNT(*) AS nb, SUM(total) AS spent FROM receipts GROUP BY store"
).fetchall()
receipts_by_store = {row["store"]: row["nb"] for row in rows}
spent_by_store = {row["store"]: round(row["spent"], 2) for row in rows}
total_receipts = sum(receipts_by_store.values())
total_spent = round(sum(row["spent"] for row in rows), 2) if rows else 0.0
# Statistiques articles
item_stats = conn.execute(
"""
SELECT
COUNT(*) AS total_items,
COUNT(DISTINCT name_normalized) AS distinct_products
FROM items
"""
).fetchone()
# Plage de dates
date_row = conn.execute(
"SELECT MIN(date) AS d_min, MAX(date) AS d_max FROM receipts"
).fetchone()
return {
"total_receipts": total_receipts,
"total_spent": total_spent,
"total_items": item_stats["total_items"],
"distinct_products": item_stats["distinct_products"],
"receipts_by_store": receipts_by_store,
"spent_by_store": spent_by_store,
"date_range": {"min": date_row["d_min"], "max": date_row["d_max"]},
}
def get_monthly_spending(conn: sqlite3.Connection) -> list[dict]:
"""Dépenses mensuelles par enseigne, pour le graphique Chart.js.
Returns:
Liste de dicts {month: "2026-01", store: "picnic", total: 45.20},
triée par mois puis enseigne.
"""
rows = conn.execute(
"""
SELECT
substr(date, 1, 7) AS month,
store,
ROUND(SUM(total), 2) AS total
FROM receipts
GROUP BY month, store
ORDER BY month, store
"""
).fetchall()
return [{"month": r["month"], "store": r["store"], "total": r["total"]} for r in rows]
def get_compare_prices(conn: sqlite3.Connection) -> list[dict]:
"""Comparaison de prix entre Picnic et Leclerc pour les produits communs.
Utilise la vue price_history. Ne retourne que les produits présents
dans les deux enseignes. Trié par écart décroissant (le plus cher en premier).
Returns:
Liste de dicts {name, price_picnic, price_leclerc, diff, diff_pct}.
diff = price_leclerc - price_picnic (positif = Leclerc plus cher)
diff_pct = diff / MIN(price_picnic, price_leclerc) * 100
"""
rows = conn.execute(
"""
WITH avg_by_store AS (
SELECT
name_normalized,
store,
ROUND(AVG(unit_price), 2) AS avg_price
FROM price_history
WHERE name_normalized IS NOT NULL
GROUP BY name_normalized, store
)
SELECT
a.name_normalized AS name,
a.avg_price AS price_picnic,
b.avg_price AS price_leclerc,
ROUND(b.avg_price - a.avg_price, 2) AS diff,
ROUND(
(b.avg_price - a.avg_price)
/ MIN(a.avg_price, b.avg_price) * 100
, 1) AS diff_pct
FROM avg_by_store a
JOIN avg_by_store b
ON a.name_normalized = b.name_normalized
AND a.store = 'picnic'
AND b.store = 'leclerc'
ORDER BY ABS(b.avg_price - a.avg_price) DESC
"""
).fetchall()
return [
{
"name": r["name"],
"price_picnic": r["price_picnic"],
"price_leclerc": r["price_leclerc"],
"diff": r["diff"],
"diff_pct": r["diff_pct"],
}
for r in rows
]
def get_product_history(conn: sqlite3.Connection, name: str) -> dict | None:
"""Historique des prix d'un produit normalisé.
Args:
conn: Connexion SQLite ouverte.
name: Valeur de name_normalized à rechercher (sensible à la casse).
Returns:
dict {name, min_price, max_price, avg_price, history: list[dict]}
ou None si le produit est inconnu.
Chaque entrée de history : {date, store, unit_price, quantity, unit}.
"""
# Statistiques globales
stats = conn.execute(
"""
SELECT
name_normalized,
ROUND(MIN(unit_price), 2) AS min_price,
ROUND(MAX(unit_price), 2) AS max_price,
ROUND(AVG(unit_price), 2) AS avg_price
FROM price_history
WHERE name_normalized = ?
""",
(name,),
).fetchone()
if stats is None or stats["name_normalized"] is None:
return None
# Historique chronologique
rows = conn.execute(
"""
SELECT date, store, unit_price, quantity, unit
FROM price_history
WHERE name_normalized = ?
ORDER BY date
""",
(name,),
).fetchall()
return {
"name": stats["name_normalized"],
"min_price": stats["min_price"],
"max_price": stats["max_price"],
"avg_price": stats["avg_price"],
"history": [
{
"date": r["date"],
"store": r["store"],
"unit_price": r["unit_price"],
"quantity": r["quantity"],
"unit": r["unit"],
}
for r in rows
],
}
def get_all_receipts(conn: sqlite3.Connection) -> list[dict]:
"""Liste tous les tickets avec le nombre d'articles associés.
Returns:
Liste de dicts {id, store, date, total, delivery_fee, order_id, nb_items},
triée par date décroissante (le plus récent en premier).
"""
rows = conn.execute(
"""
SELECT
r.id,
r.store,
r.date,
r.total,
r.delivery_fee,
r.order_id,
COUNT(i.id) AS nb_items
FROM receipts r
LEFT JOIN items i ON i.receipt_id = r.id
GROUP BY r.id
ORDER BY r.date DESC, r.id DESC
"""
).fetchall()
return [
{
"id": r["id"],
"store": r["store"],
"date": r["date"],
"total": r["total"],
"delivery_fee": r["delivery_fee"],
"order_id": r["order_id"],
"nb_items": r["nb_items"],
}
for r in rows
]
def get_receipt_detail(conn: sqlite3.Connection, receipt_id: int) -> dict | None:
"""Détail complet d'un ticket et de ses articles.
Args:
conn: Connexion SQLite ouverte.
receipt_id: Id du ticket à récupérer.
Returns:
dict avec les champs du ticket + items: list[dict], ou None si introuvable.
"""
receipt = conn.execute(
"SELECT id, store, date, total, delivery_fee, order_id FROM receipts WHERE id = ?",
(receipt_id,),
).fetchone()
if receipt is None:
return None
items = conn.execute(
"""
SELECT id, name_raw, name_normalized, category, quantity, unit, unit_price, total_price
FROM items
WHERE receipt_id = ?
ORDER BY id
""",
(receipt_id,),
).fetchall()
return {
"id": receipt["id"],
"store": receipt["store"],
"date": receipt["date"],
"total": receipt["total"],
"delivery_fee": receipt["delivery_fee"],
"order_id": receipt["order_id"],
"items": [
{
"id": i["id"],
"name_raw": i["name_raw"],
"name_normalized": i["name_normalized"],
"category": i["category"],
"quantity": i["quantity"],
"unit": i["unit"],
"unit_price": i["unit_price"],
"total_price": i["total_price"],
}
for i in items
],
}
def get_product_list(conn: sqlite3.Connection) -> list[str]:
"""Liste tous les noms normalisés distincts (non NULL) pour le sélecteur.
Returns:
Liste de str triée alphabétiquement.
"""
rows = conn.execute(
"""
SELECT DISTINCT name_normalized
FROM items
WHERE name_normalized IS NOT NULL
ORDER BY name_normalized
"""
).fetchall()
return [r["name_normalized"] for r in rows]