feat: migration Windows → Ubuntu, stabilisation suite de tests

- Ajout venv Python (.venv) avec pip bootstrap (python3-venv absent)
- Correction OCR Linux : marqueur TTC/TVA tolère la confusion T↔I
  (Tesseract 5.3.4 Linux lit parfois "TIc" au lieu de "TTC")
- test_leclerc.py : skipif si Tesseract absent, xfail pour test de somme
  (précision OCR variable entre plateformes, solution LLM vision prévue)
- Résultat : 77 passent, 1 xfail, 0 échec (vs 78 sur Windows)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-24 18:53:41 +01:00
parent bb62bd6eb6
commit 1e5fc97bb7
24 changed files with 3181 additions and 0 deletions

View File

@@ -0,0 +1 @@
# Package principal TicketTracker

222
tickettracker/cli.py Normal file
View File

@@ -0,0 +1,222 @@
"""
Point d'entrée CLI pour TicketTracker.
Utilisation :
python -m tickettracker.cli import fichier.html --source picnic
python -m tickettracker.cli import fichier.pdf --source leclerc [--db /chemin/db]
python -m tickettracker.cli stats
python -m tickettracker.cli stats --db /chemin/db
python -m tickettracker.cli normalize [--dry-run] [--batch-size N] [--db /chemin/db]
"""
import argparse
import logging
import sys
from pathlib import Path
from tickettracker.db.schema import DEFAULT_DB_PATH
from tickettracker import pipeline
# Affiche les messages INFO dans le terminal (utile pour voir les doublons skippés)
logging.basicConfig(level=logging.INFO, format="%(message)s")
def build_parser() -> argparse.ArgumentParser:
"""Construit le parseur d'arguments CLI.
Structure :
tickettracker.cli
├── import <file> --source {picnic,leclerc} [--db PATH]
├── stats [--db PATH]
└── normalize [--dry-run] [--batch-size N] [--db PATH]
"""
parser = argparse.ArgumentParser(
prog="python -m tickettracker.cli",
description="TicketTracker — import et analyse de tickets de courses",
)
subparsers = parser.add_subparsers(dest="command", required=True)
# --- Sous-commande : import ---
import_parser = subparsers.add_parser(
"import",
help="Parse et importe un ticket dans la base SQLite",
)
import_parser.add_argument(
"file",
type=Path,
help="Chemin vers le fichier à importer (.html pour Picnic, .pdf pour Leclerc)",
)
import_parser.add_argument(
"--source",
required=True,
choices=["picnic", "leclerc"],
help="Format du fichier",
)
import_parser.add_argument(
"--db",
type=Path,
default=DEFAULT_DB_PATH,
metavar="PATH",
help=f"Chemin vers la base SQLite (défaut : {DEFAULT_DB_PATH})",
)
# --- Sous-commande : stats ---
stats_parser = subparsers.add_parser(
"stats",
help="Affiche un résumé de la base de données",
)
stats_parser.add_argument(
"--db",
type=Path,
default=DEFAULT_DB_PATH,
metavar="PATH",
help=f"Chemin vers la base SQLite (défaut : {DEFAULT_DB_PATH})",
)
# --- Sous-commande : normalize ---
from tickettracker import config as _cfg
normalize_parser = subparsers.add_parser(
"normalize",
help="Normalise les noms de produits via le LLM",
)
normalize_parser.add_argument(
"--db",
type=Path,
default=DEFAULT_DB_PATH,
metavar="PATH",
help=f"Chemin vers la base SQLite (défaut : {DEFAULT_DB_PATH})",
)
normalize_parser.add_argument(
"--dry-run",
action="store_true",
help="Calcule les normalisations sans écrire en base",
)
normalize_parser.add_argument(
"--batch-size",
type=int,
default=_cfg.LLM_BATCH_SIZE,
metavar="N",
help=f"Articles par appel LLM (défaut : {_cfg.LLM_BATCH_SIZE})",
)
return parser
def cmd_import(args: argparse.Namespace) -> int:
"""Exécute la sous-commande 'import'.
Returns:
0 si succès (ticket inséré ou déjà présent), 1 si erreur.
"""
try:
inserted = pipeline.import_receipt(args.file, args.source, args.db)
if inserted:
print(f"OK Ticket importé depuis {args.file}")
else:
print(f"[skip] Ticket déjà présent en base — import ignoré.")
return 0
except (FileNotFoundError, ValueError) as e:
print(f"Erreur : {e}", file=sys.stderr)
return 1
except Exception as e:
print(f"Erreur inattendue : {e}", file=sys.stderr)
return 1
def cmd_stats(args: argparse.Namespace) -> int:
"""Exécute la sous-commande 'stats'.
Returns:
0 si succès, 1 si la base est absente ou vide.
"""
from tickettracker.db import schema, repository
if not Path(args.db).exists():
print(f"Base de données absente : {args.db}", file=sys.stderr)
print("Importez d'abord un ticket avec la commande 'import'.", file=sys.stderr)
return 1
with schema.get_connection(args.db) as conn:
stats = repository.get_stats(conn)
total_receipts = sum(stats["receipts_by_store"].values())
if total_receipts == 0:
print("Aucun ticket en base.")
return 0
print("--- TicketTracker : résumé ---")
print("Tickets par enseigne :")
for store, nb in sorted(stats["receipts_by_store"].items()):
print(f" {store:<10}: {nb} ticket(s)")
print(f"Total dépensé : {stats['total_spent']:.2f}")
print(f"Nombre d'articles : {stats['total_items']} lignes")
normalized = stats["distinct_normalized"]
null_count = stats["null_normalized"]
total_items = stats["total_items"]
print(f"Noms normalisés : {normalized} distincts / {total_items} articles")
if null_count > 0:
print(f" ({null_count} articles sans nom normalisé)")
print(" Lancez : python -m tickettracker.cli normalize")
return 0
def cmd_normalize(args: argparse.Namespace) -> int:
"""Exécute la sous-commande 'normalize'.
Normalise les articles dont name_normalized est NULL en appelant
le LLM par batchs. Avec --dry-run, affiche sans écrire en base.
Returns:
0 si succès ou dry-run, 1 si erreur (LLM injoignable, clé manquante…).
"""
from tickettracker import config
from tickettracker.llm.client import LLMError, LLMUnavailable
from tickettracker.llm import normalizer
# Vérification préalable de la clé API
if not config.LLM_API_KEY:
print(
"Erreur : clé API LLM manquante.\n"
"Définissez la variable d'environnement TICKETTRACKER_LLM_API_KEY.",
file=sys.stderr,
)
return 1
if not Path(args.db).exists():
print(f"Base de données absente : {args.db}", file=sys.stderr)
print("Importez d'abord un ticket avec la commande 'import'.", file=sys.stderr)
return 1
try:
nb_ok, nb_err = normalizer.normalize_all_in_db(
db_path=args.db,
batch_size=args.batch_size,
dry_run=args.dry_run,
)
return 0 if nb_err == 0 else 1
except LLMUnavailable as e:
print(f"LLM injoignable : {e}", file=sys.stderr)
return 1
except LLMError as e:
print(f"Erreur LLM : {e}", file=sys.stderr)
return 1
except Exception as e:
print(f"Erreur inattendue : {e}", file=sys.stderr)
return 1
def main() -> None:
"""Point d'entrée principal."""
parser = build_parser()
args = parser.parse_args()
if args.command == "import":
sys.exit(cmd_import(args))
elif args.command == "stats":
sys.exit(cmd_stats(args))
elif args.command == "normalize":
sys.exit(cmd_normalize(args))
if __name__ == "__main__":
main()

37
tickettracker/config.py Normal file
View File

@@ -0,0 +1,37 @@
"""
Configuration de TicketTracker.
Toutes les valeurs sensibles (clé API) sont lues depuis des variables
d'environnement et ne doivent jamais être codées en dur.
Variables d'environnement disponibles :
TICKETTRACKER_LLM_URL URL de l'endpoint OpenAI-compatible
TICKETTRACKER_LLM_MODEL Nom du modèle LLM
TICKETTRACKER_LLM_API_KEY Clé API (obligatoire pour Mammouth)
TICKETTRACKER_LLM_TIMEOUT Timeout en secondes (défaut : 60)
TICKETTRACKER_LLM_BATCH_SIZE Taille des batchs de normalisation (défaut : 20)
"""
import os
# ---------------------------------------------------------------------------
# LLM
# ---------------------------------------------------------------------------
# URL de l'endpoint compatible OpenAI (Mammouth)
LLM_URL: str = os.environ.get(
"TICKETTRACKER_LLM_URL",
"https://api.mammouth.ai/v1/chat/completions",
)
# Modèle à utiliser
LLM_MODEL: str = os.environ.get("TICKETTRACKER_LLM_MODEL", "mistral-small-3.2-24b-instruct")
# Clé API — jamais de valeur par défaut sensible ici
LLM_API_KEY: str = os.environ.get("TICKETTRACKER_LLM_API_KEY", "")
# Timeout par appel en secondes (le modèle local peut être lent)
LLM_TIMEOUT: int = int(os.environ.get("TICKETTRACKER_LLM_TIMEOUT", "60"))
# Nombre d'articles traités par appel LLM
LLM_BATCH_SIZE: int = int(os.environ.get("TICKETTRACKER_LLM_BATCH_SIZE", "20"))

View File

@@ -0,0 +1 @@
# Couche base de données SQLite

View File

@@ -0,0 +1,177 @@
"""
Fonctions de lecture/écriture dans la base SQLite.
Ce module est la seule couche qui manipule les données.
Toutes les fonctions reçoivent une connexion ouverte — elles ne
gèrent pas les connexions elles-mêmes (séparation des responsabilités).
"""
import sqlite3
from datetime import datetime, timezone
from typing import Optional
from tickettracker.models.receipt import Receipt
def receipt_exists(conn: sqlite3.Connection, store: str, date: str, total: float) -> bool:
"""Vérifie si un ticket identique existe déjà en base.
La déduplication repose sur le triplet (store, date, total).
Suffisant pour éviter les doubles imports accidentels d'un même fichier.
Args:
conn: Connexion SQLite ouverte.
store: Enseigne ('picnic' ou 'leclerc').
date: Date ISO 8601 (ex: '2026-02-14').
total: Montant total payé.
Returns:
True si un ticket avec ces valeurs existe déjà.
"""
row = conn.execute(
"SELECT COUNT(*) FROM receipts WHERE store = ? AND date = ? AND total = ?",
(store, date, total),
).fetchone()
return row[0] > 0
def insert_receipt(conn: sqlite3.Connection, receipt: Receipt) -> int:
"""Insère un ticket et tous ses articles dans la base.
Utilise une transaction implicite : si l'insertion des articles échoue,
le ticket est aussi annulé (atomicité garantie par le context manager).
Args:
conn: Connexion SQLite ouverte.
receipt: Ticket normalisé à insérer.
Returns:
L'id (INTEGER) du ticket inséré dans la table receipts.
Raises:
sqlite3.IntegrityError: En cas de violation de contrainte FK.
"""
created_at = datetime.now(timezone.utc).isoformat()
with conn:
cur = conn.execute(
"""
INSERT INTO receipts (store, date, total, delivery_fee, order_id, raw_json, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
receipt.store,
receipt.date.isoformat(),
receipt.total,
receipt.delivery_fee,
receipt.order_id,
receipt.to_json(),
created_at,
),
)
receipt_id = cur.lastrowid
# Insertion de tous les articles en une seule passe
conn.executemany(
"""
INSERT INTO items
(receipt_id, name_raw, name_normalized, category,
quantity, unit, unit_price, total_price)
VALUES (?, ?, NULL, ?, ?, ?, ?, ?)
""",
[
(
receipt_id,
item.name,
item.category,
item.quantity,
item.unit,
item.unit_price,
item.total_price,
)
for item in receipt.items
],
)
return receipt_id
def get_stats(conn: sqlite3.Connection) -> dict:
"""Calcule les statistiques globales pour la commande CLI 'stats'.
Returns:
Dictionnaire avec :
- receipts_by_store : dict[str, int] — nb tickets par enseigne
- total_spent : float — somme de tous les totaux
- total_items : int — nb total de lignes dans items
- distinct_normalized : int — nb de name_normalized distincts (non NULL)
- null_normalized : int — nb d'articles sans name_normalized
"""
# Tickets par enseigne + total dépensé
rows = conn.execute(
"SELECT store, COUNT(*) AS nb, SUM(total) AS spent FROM receipts GROUP BY store"
).fetchall()
receipts_by_store = {row["store"]: row["nb"] for row in rows}
total_spent = sum(row["spent"] for row in rows) if rows else 0.0
# Statistiques articles
item_stats = conn.execute(
"""
SELECT
COUNT(*) AS total_items,
COUNT(DISTINCT name_normalized) AS distinct_normalized,
SUM(CASE WHEN name_normalized IS NULL THEN 1 ELSE 0 END) AS null_normalized
FROM items
"""
).fetchone()
return {
"receipts_by_store": receipts_by_store,
"total_spent": total_spent,
"total_items": item_stats["total_items"],
"distinct_normalized": item_stats["distinct_normalized"],
"null_normalized": item_stats["null_normalized"],
}
def fetch_unnormalized(
conn: sqlite3.Connection,
limit: Optional[int] = None,
) -> list[sqlite3.Row]:
"""Retourne les articles dont name_normalized est NULL.
Chaque Row expose les clés : id, name_raw, receipt_id.
Trié par id pour un traitement reproductible.
Args:
conn: Connexion SQLite ouverte.
limit: Si fourni, retourne au maximum `limit` articles.
Returns:
Liste de sqlite3.Row.
"""
sql = "SELECT id, name_raw, receipt_id FROM items WHERE name_normalized IS NULL ORDER BY id"
if limit is not None:
sql += f" LIMIT {int(limit)}"
return conn.execute(sql).fetchall()
def update_normalized(
conn: sqlite3.Connection,
item_id: int,
name_normalized: str,
) -> None:
"""Met à jour le nom normalisé d'un article.
N'utilise pas de transaction ici : c'est l'appelant (normalizer.py)
qui gère la transaction globale pour pouvoir faire un commit groupé.
Args:
conn: Connexion SQLite ouverte.
item_id: Id de l'article à mettre à jour.
name_normalized: Valeur à écrire dans name_normalized.
"""
conn.execute(
"UPDATE items SET name_normalized = ? WHERE id = ?",
(name_normalized, item_id),
)

127
tickettracker/db/schema.py Normal file
View File

@@ -0,0 +1,127 @@
"""
Schéma SQLite pour TicketTracker.
Ce module gère uniquement le DDL (création des tables, vues et index).
Il ne contient pas de logique métier.
Tables :
receipts — un ticket de courses par ligne
items — articles, liés à leur ticket par FK
Vue :
price_history — jointure items × receipts pour comparer les prix dans le temps
"""
import sqlite3
from pathlib import Path
# Chemin par défaut : data/tickettracker.db à la racine du projet
DEFAULT_DB_PATH = Path(__file__).parent.parent.parent / "data" / "tickettracker.db"
# ---------------------------------------------------------------------------
# Instructions DDL (CREATE TABLE / INDEX / VIEW)
# ---------------------------------------------------------------------------
_SQL_CREATE_RECEIPTS = """
CREATE TABLE IF NOT EXISTS receipts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
store TEXT NOT NULL,
date TEXT NOT NULL, -- format ISO 8601 : "2026-02-14"
total REAL NOT NULL,
delivery_fee REAL, -- NULL pour Leclerc (magasin physique)
order_id TEXT, -- NULL si non disponible
raw_json TEXT NOT NULL, -- résultat de receipt.to_json() pour debug
created_at TEXT NOT NULL -- datetime UTC ISO au moment de l'insertion
);
"""
_SQL_CREATE_RECEIPTS_IDX = """
CREATE INDEX IF NOT EXISTS idx_receipts_dedup
ON receipts (store, date, total);
"""
_SQL_CREATE_ITEMS = """
CREATE TABLE IF NOT EXISTS items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
receipt_id INTEGER NOT NULL REFERENCES receipts(id),
name_raw TEXT NOT NULL, -- nom tel que sorti du parser
name_normalized TEXT, -- NULL jusqu'au Sprint 3 (normalisation LLM)
category TEXT, -- NULL pour Picnic (pas de catégories dans le mail)
quantity REAL NOT NULL,
unit TEXT NOT NULL,
unit_price REAL NOT NULL,
total_price REAL NOT NULL
);
"""
_SQL_CREATE_ITEMS_IDX = """
CREATE INDEX IF NOT EXISTS idx_items_receipt_id
ON items (receipt_id);
"""
_SQL_CREATE_ITEMS_NORM_IDX = """
CREATE INDEX IF NOT EXISTS idx_items_name_normalized
ON items (name_normalized);
"""
_SQL_CREATE_PRICE_HISTORY = """
CREATE VIEW IF NOT EXISTS price_history AS
SELECT
i.name_normalized,
r.store,
r.date,
i.unit_price,
i.total_price,
i.quantity,
i.unit,
i.category
FROM items i
JOIN receipts r ON i.receipt_id = r.id;
"""
# ---------------------------------------------------------------------------
# Fonctions publiques
# ---------------------------------------------------------------------------
def get_connection(db_path: str | Path = DEFAULT_DB_PATH) -> sqlite3.Connection:
"""Ouvre une connexion SQLite avec les pragmas requis.
Active les clés étrangères (désactivées par défaut dans SQLite —
le pragma doit être réappliqué à chaque nouvelle connexion).
Configure row_factory = sqlite3.Row pour accéder aux colonnes par nom.
Args:
db_path: Chemin vers le fichier .db (créé automatiquement si absent).
Returns:
Connexion sqlite3 configurée.
"""
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
return conn
def init_db(db_path: str | Path = DEFAULT_DB_PATH) -> None:
"""Crée les tables, index et vues s'ils n'existent pas encore.
Idempotent : peut être appelé plusieurs fois sans erreur grâce aux
clauses CREATE TABLE IF NOT EXISTS / CREATE INDEX IF NOT EXISTS.
Crée le dossier parent (data/) s'il n'existe pas.
Args:
db_path: Chemin vers le fichier .db.
Raises:
PermissionError: Si le système de fichiers refuse la création du dossier.
"""
db_path = Path(db_path)
db_path.parent.mkdir(parents=True, exist_ok=True)
with get_connection(db_path) as conn:
conn.execute(_SQL_CREATE_RECEIPTS)
conn.execute(_SQL_CREATE_RECEIPTS_IDX)
conn.execute(_SQL_CREATE_ITEMS)
conn.execute(_SQL_CREATE_ITEMS_IDX)
conn.execute(_SQL_CREATE_ITEMS_NORM_IDX)
conn.execute(_SQL_CREATE_PRICE_HISTORY)

View File

@@ -0,0 +1 @@
# Module LLM — normalisation des noms de produits

View File

@@ -0,0 +1,99 @@
"""
Client HTTP bas niveau pour l'API LLM compatible OpenAI.
Ce module ne contient qu'une seule fonction publique : call_llm().
Il ne connaît pas la logique de normalisation — c'est le rôle de normalizer.py.
Exceptions levées :
LLMUnavailable — serveur injoignable (timeout, connexion refusée)
LLMError — réponse HTTP ≥ 400 ou format de réponse inattendu
"""
import logging
import requests
from tickettracker import config
logger = logging.getLogger(__name__)
class LLMUnavailable(Exception):
"""Le serveur LLM est injoignable (réseau, timeout)."""
class LLMError(Exception):
"""L'API LLM a retourné une erreur (HTTP ≥ 400 ou réponse malformée)."""
def call_llm(
messages: list[dict],
*,
model: str | None = None,
timeout: int | None = None,
) -> str:
"""Appelle l'API LLM et retourne le texte brut de la réponse.
Args:
messages: Liste de messages au format OpenAI
[{"role": "system", "content": "..."}, {"role": "user", "content": "..."}]
model: Nom du modèle (défaut : config.LLM_MODEL)
timeout: Timeout en secondes (défaut : config.LLM_TIMEOUT)
Returns:
Le texte du premier choix de la réponse.
Raises:
LLMUnavailable: Si le serveur est injoignable.
LLMError: Si l'API retourne une erreur ou une réponse inattendue.
"""
_model = model or config.LLM_MODEL
_timeout = timeout if timeout is not None else config.LLM_TIMEOUT
if not config.LLM_API_KEY:
raise LLMError(
"Clé API LLM manquante. "
"Définissez la variable d'environnement TICKETTRACKER_LLM_API_KEY."
)
headers = {
"Authorization": f"Bearer {config.LLM_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": _model,
"messages": messages,
"temperature": 0.1, # faible variabilité : on veut un format stable
}
logger.debug("Appel LLM %s (model=%s, timeout=%ds)", config.LLM_URL, _model, _timeout)
try:
response = requests.post(
config.LLM_URL,
json=payload,
headers=headers,
timeout=_timeout,
)
except requests.exceptions.Timeout:
raise LLMUnavailable(
f"Timeout après {_timeout}s lors de l'appel au LLM ({config.LLM_URL})."
)
except requests.exceptions.ConnectionError as e:
raise LLMUnavailable(
f"Impossible de joindre le serveur LLM ({config.LLM_URL}) : {e}"
)
if not response.ok:
raise LLMError(
f"Erreur API LLM : HTTP {response.status_code}{response.text[:200]}"
)
try:
data = response.json()
return data["choices"][0]["message"]["content"]
except (KeyError, IndexError, ValueError) as e:
raise LLMError(
f"Réponse LLM inattendue (impossible d'extraire le contenu) : {e}\n"
f"Réponse brute : {response.text[:300]}"
) from e

View File

@@ -0,0 +1,279 @@
"""
Normalisation des noms de produits via LLM.
Ce module orchestre les appels au LLM pour transformer les noms bruts
(OCR Leclerc, HTML Picnic) en noms normalisés au format :
"Nom du produit | Marque | Quantité"
Exemples :
"NOIX CAJOU""Noix de cajou | MDD | -"
"COCA COLA CHERRY 1.25L""Coca-Cola Cherry | Coca-Cola | 1,25L"
"PQ LOTUS CONFORT X6""Papier toilette confort | Lotus | x6"
"""
import logging
import re
from pathlib import Path
from typing import Union
from tickettracker import config
from tickettracker.llm.client import LLMError, LLMUnavailable, call_llm
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# System prompt calibré pour Mistral
# ---------------------------------------------------------------------------
_SYSTEM_PROMPT = """\
Tu es un assistant de normalisation de noms de produits alimentaires et ménagers.
Pour chaque nom de produit, réponds au format strict :
Nom du produit | Marque | Quantité
Règles :
- Nom : en français, lisible, sans abréviation, avec accents et majuscules correctes
- Marque : nom exact de la marque, ou "MDD" si marque de distributeur ou inconnue
- Quantité : format court (50cl, 1L, 200g, x6, 1kg) ou "-" si absente du nom brut
Réponds UNIQUEMENT avec les lignes numérotées demandées. Aucun commentaire, aucune explication.\
"""
# ---------------------------------------------------------------------------
# Parsing de la réponse LLM
# ---------------------------------------------------------------------------
# Accepte : "1. Nom | Marque | Qté" ou "1) Nom | Marque | Qté"
_LINE_RE = re.compile(
r"^\d+[.)]\s*" # numéro suivi de . ou )
r"(?P<nom>.+?)" # nom du produit
r"\s*\|\s*"
r"(?P<marque>.+?)" # marque
r"\s*\|\s*"
r"(?P<qte>.+?)" # quantité
r"\s*$",
re.UNICODE,
)
def _parse_normalized_line(line: str) -> str | None:
"""Extrait le nom normalisé d'une ligne numérotée.
Args:
line: Ligne de réponse LLM, ex: "1. Crème fraîche | MDD | 50cl"
Returns:
"Crème fraîche | MDD | 50cl" si la ligne est valide, None sinon.
"""
m = _LINE_RE.match(line.strip())
if not m:
return None
nom = m.group("nom").strip()
marque = m.group("marque").strip()
qte = m.group("qte").strip()
# Valide que les trois champs ne sont pas vides
if not nom or not marque or not qte:
return None
return f"{nom} | {marque} | {qte}"
def _parse_batch_response(response_text: str, expected_count: int) -> list[str | None]:
"""Transforme la réponse brute du LLM en liste de noms normalisés.
Si le nombre de lignes valides ne correspond pas à expected_count,
retourne une liste de None pour déclencher le fallback.
Args:
response_text: Texte brut retourné par le LLM.
expected_count: Nombre d'items attendus.
Returns:
Liste de longueur expected_count — chaque élément est le nom normalisé
ou None si la ligne correspondante est invalide.
"""
# Garde uniquement les lignes non vides
lines = [l for l in response_text.splitlines() if l.strip()]
# Extrait les lignes numérotées (ignore le bruit éventuel)
parsed = [_parse_normalized_line(l) for l in lines if _LINE_RE.match(l.strip())]
if len(parsed) != expected_count:
logger.warning(
"Batch LLM : attendu %d lignes, reçu %d — fallback unitaire.",
expected_count,
len(parsed),
)
return [None] * expected_count
return parsed
# ---------------------------------------------------------------------------
# Fonctions publiques
# ---------------------------------------------------------------------------
def normalize_product_name(raw_name: str) -> str | None:
"""Normalise un seul nom de produit via le LLM.
Args:
raw_name: Nom brut issu du parser (OCR ou HTML).
Returns:
Nom normalisé "Nom | Marque | Quantité", ou None si le LLM
échoue ou retourne une réponse non parsable.
"""
try:
response = call_llm([
{"role": "system", "content": _SYSTEM_PROMPT},
{
"role": "user",
"content": f"Normalise ce nom de produit :\n1. {raw_name}",
},
])
except (LLMError, LLMUnavailable) as e:
logger.warning("Normalisation unitaire échouée pour %r : %s", raw_name, e)
return None
lines = [l for l in response.splitlines() if l.strip()]
for line in lines:
result = _parse_normalized_line(line)
if result:
return result
logger.warning(
"Réponse LLM non parsable pour %r : %r", raw_name, response[:100]
)
return None
def normalize_batch(raw_names: list[str]) -> list[str | None]:
"""Normalise une liste de noms en un seul appel LLM.
Envoie tous les noms dans un seul prompt numéroté.
Si la réponse ne contient pas exactement len(raw_names) lignes,
retourne une liste de None (le fallback unitaire sera utilisé).
Args:
raw_names: Liste de noms bruts.
Returns:
Liste de même longueur : nom normalisé ou None.
Raises:
LLMUnavailable: Si le serveur est injoignable (propagé pour que
normalize_all_in_db puisse distinguer erreur réseau vs parsing).
"""
if not raw_names:
return []
numbered = "\n".join(f"{i + 1}. {name}" for i, name in enumerate(raw_names))
user_content = (
f"Normalise ces {len(raw_names)} noms de produits :\n"
f"{numbered}\n\n"
f"Retourne exactement {len(raw_names)} lignes numérotées."
)
try:
response = call_llm([
{"role": "system", "content": _SYSTEM_PROMPT},
{"role": "user", "content": user_content},
])
except LLMUnavailable:
raise # propagé : erreur réseau, pas la peine de retenter
except LLMError as e:
logger.warning("Batch LLM échoué : %s — fallback unitaire.", e)
return [None] * len(raw_names)
return _parse_batch_response(response, len(raw_names))
def normalize_all_in_db(
db_path: Union[str, Path],
batch_size: int = config.LLM_BATCH_SIZE,
dry_run: bool = False,
) -> tuple[int, int]:
"""Normalise tous les articles dont name_normalized est NULL.
Algorithme :
1. Récupère les items NULL depuis la DB
2. Traite par batch de batch_size
3. Pour chaque batch : appel LLM groupé
- Si le batch réussit → UPDATE en base (sauf dry_run)
- Si le batch échoue (mauvais count) → fallback un par un
4. Affiche la progression
Args:
db_path: Chemin vers la base SQLite.
batch_size: Nombre d'articles par appel LLM.
dry_run: Si True, calcule mais n'écrit pas en base.
Returns:
(nb_normalisés, nb_erreurs) — erreurs = items restés NULL.
"""
from tickettracker.db import schema, repository
schema.init_db(db_path)
conn = schema.get_connection(db_path)
try:
items = repository.fetch_unnormalized(conn)
total = len(items)
if total == 0:
print("Aucun article à normaliser.")
return (0, 0)
mode = "[DRY-RUN] " if dry_run else ""
print(f"{mode}{total} article(s) à normaliser (batchs de {batch_size})...")
nb_ok = 0
nb_err = 0
done = 0
for start in range(0, total, batch_size):
batch = items[start: start + batch_size]
raw_names = [row["name_raw"] for row in batch]
# --- Tentative batch ---
try:
results = normalize_batch(raw_names)
except LLMUnavailable as e:
logger.error("LLM injoignable : %s", e)
print(
f"\nErreur réseau — arrêt. {nb_ok} articles normalisés, "
f"{total - done} restants."
)
return (nb_ok, total - done)
# Si normalize_batch a retourné que des None (batch échoué),
# tente le fallback un par un
if all(r is None for r in results):
logger.debug("Fallback unitaire pour le batch %d%d.", start, start + len(batch))
results = [normalize_product_name(name) for name in raw_names]
# --- Mise à jour ou affichage ---
for item, normalized in zip(batch, results):
done += 1
if normalized:
print(
f"[{done}/{total}] {item['name_raw']!r} "
f"{normalized}"
)
if not dry_run:
repository.update_normalized(conn, item["id"], normalized)
nb_ok += 1
else:
logger.warning(
"[%d/%d] Impossible de normaliser %r — item ignoré.",
done, total, item["name_raw"],
)
nb_err += 1
# Commit final (toutes les mises à jour sont dans la même transaction implicite)
if not dry_run:
conn.commit()
print(
f"\n{mode}Terminé : {nb_ok} normalisé(s), {nb_err} erreur(s)."
)
return (nb_ok, nb_err)
finally:
conn.close()

View File

@@ -0,0 +1 @@
# Modèles de données communs

View File

@@ -0,0 +1,72 @@
"""
Modèle de données commun pour les tickets de courses.
Toutes les enseignes (Picnic, Leclerc, etc.) produisent
une instance de Receipt après parsing. C'est le format
JSON normalisé en sortie.
"""
import json
from dataclasses import dataclass, field, asdict
from datetime import date
from typing import Optional
@dataclass
class Item:
"""Un article sur le ticket de courses."""
name: str
"""Nom du produit tel qu'il apparaît sur le ticket."""
quantity: float
"""Quantité achetée (ex: 2.0, 0.5)."""
unit: str
"""Unité de mesure : 'pièce', 'kg', 'L', 'g', etc."""
unit_price: float
"""Prix unitaire en euros."""
total_price: float
"""Prix total pour cet article (quantity × unit_price)."""
category: Optional[str] = None
"""Catégorie du produit, si disponible (ex: 'Fruits & Légumes')."""
@dataclass
class Receipt:
"""Ticket de courses normalisé, toutes enseignes confondues."""
store: str
"""Nom de l'enseigne : 'picnic' ou 'leclerc'."""
date: date
"""Date de la commande ou de l'achat."""
total: float
"""Montant total payé en euros."""
items: list[Item] = field(default_factory=list)
"""Liste des articles achetés."""
currency: str = "EUR"
"""Devise (EUR par défaut)."""
order_id: Optional[str] = None
"""Identifiant de commande, si disponible."""
delivery_fee: Optional[float] = None
"""Frais de livraison en euros, si applicable (None pour Leclerc)."""
def to_dict(self) -> dict:
"""Convertit le ticket en dictionnaire JSON-sérialisable."""
d = asdict(self)
# La date n'est pas JSON-sérialisable nativement, on la convertit en string ISO
d["date"] = self.date.isoformat()
return d
def to_json(self) -> str:
"""Sérialise le ticket en JSON formaté."""
return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

View File

@@ -0,0 +1 @@
# Parsers de tickets de courses

View File

@@ -0,0 +1,403 @@
"""
Parser pour les tickets de caisse PDF Leclerc.
Les PDFs Leclerc sont des scans d'images (pas de couche texte sélectionnable).
Le parser extrait l'image JPEG embarquée et applique Tesseract OCR pour
récupérer le texte, puis l'analyse ligne par ligne.
Prérequis système :
- Tesseract OCR 5.x installé :
Windows : https://github.com/UB-Mannheim/tesseract/wiki
Linux : apt install tesseract-ocr tesseract-ocr-fra
- Modèle français (fra.traineddata) dans le dossier tessdata.
Si les droits manquent pour écrire dans le dossier système,
placer les fichiers eng.traineddata et fra.traineddata dans
un dossier local et définir TESSDATA_PREFIX=<chemin_du_dossier>.
- pip install pytesseract pillow pdfplumber
Structure du ticket Leclerc :
>> CATEGORIE → titre de catégorie (gras sur ticket)
NOM PRODUIT PRIX TVA → article standard (1 ligne)
* NOM PRODUIT PRIX TVA → idem, avec marque promotion
NOM PRODUIT → article multi-unités (pas de prix ici)
QTY X PRIX_UNIT€ TOTAL TVA → ligne de prix (suite du précédent)
Total NN articles TOTAL_TTC → total avant remises
Bon reduction MONTANT → bon de réduction (peut se répéter)
CB MONTANT_FINAL → montant payé par carte
Dépendances Python : pdfplumber, pytesseract, Pillow
"""
import io
import os
import re
from datetime import date
from typing import Optional
import pdfplumber
import pytesseract
from PIL import Image
from tickettracker.models.receipt import Item, Receipt
# ---------------------------------------------------------------------------
# Configuration Tesseract
# ---------------------------------------------------------------------------
# Chemins Tesseract standards selon l'OS
_TESSERACT_PATHS = [
r"C:/Program Files/Tesseract-OCR/tesseract.exe",
r"C:/Program Files (x86)/Tesseract-OCR/tesseract.exe",
"/usr/bin/tesseract",
"/usr/local/bin/tesseract",
]
# Correspondance noms de mois français → numéro
_MOIS_FR = {
"janvier": 1, "février": 2, "mars": 3, "avril": 4,
"mai": 5, "juin": 6, "juillet": 7, "août": 8,
"septembre": 9, "octobre": 10, "novembre": 11, "décembre": 12,
}
def _configure_tesseract() -> None:
"""Détecte et configure le binaire Tesseract et le dossier tessdata.
Priorité :
1. Variable d'environnement TESSERACT_CMD (chemin vers le binaire)
2. Chemins standards Windows/Linux
Pour tessdata :
1. Variable TESSDATA_PREFIX déjà définie dans l'environnement
2. Dossier tessdata/ à côté de ce fichier (usage dev avec droits limités)
"""
# Binaire
cmd = os.environ.get("TESSERACT_CMD")
if not cmd:
for p in _TESSERACT_PATHS:
if os.path.isfile(p):
cmd = p
break
if cmd:
pytesseract.pytesseract.tesseract_cmd = cmd
# Tessdata (uniquement si pas déjà configuré)
if not os.environ.get("TESSDATA_PREFIX"):
# Cherche tessdata/ dans le répertoire du projet (2 niveaux au-dessus)
here = os.path.dirname(os.path.abspath(__file__))
local_tessdata = os.path.join(here, "..", "..", "tessdata")
if os.path.isdir(local_tessdata) and os.path.isfile(
os.path.join(local_tessdata, "fra.traineddata")
):
os.environ["TESSDATA_PREFIX"] = os.path.abspath(local_tessdata)
# ---------------------------------------------------------------------------
# Point d'entrée public
# ---------------------------------------------------------------------------
def parse(pdf_path: str) -> Receipt:
"""Parse un PDF de ticket Leclerc et retourne un ticket normalisé.
Args:
pdf_path: Chemin vers le fichier PDF du ticket Leclerc.
Le PDF doit être un scan image (type Leclerc classique).
Returns:
Receipt: Ticket de courses normalisé avec tous les articles et
leurs catégories.
Raises:
ValueError: Si la date ou le total sont introuvables.
RuntimeError: Si Tesseract n'est pas installé ou pas configuré.
"""
_configure_tesseract()
text = _extract_text_from_pdf(pdf_path)
return _parse_text(text)
# ---------------------------------------------------------------------------
# Extraction image + OCR
# ---------------------------------------------------------------------------
def _extract_text_from_pdf(pdf_path: str) -> str:
"""Extrait l'image JPEG du PDF et retourne le texte OCR.
Les tickets Leclerc sont une unique image haute résolution (1650×10386)
découpée sur plusieurs pages PDF. L'image est identique dans le flux de
chaque page ; on l'extrait une seule fois depuis la page 1.
"""
with pdfplumber.open(pdf_path) as pdf:
if not pdf.pages or not pdf.pages[0].images:
raise ValueError(
f"Le PDF '{pdf_path}' ne contient pas d'image en page 1. "
"Le format Leclerc attendu est un scan image (JPEG embarqué)."
)
img_obj = pdf.pages[0].images[0]
raw_jpeg = img_obj["stream"].get_rawdata()
img = Image.open(io.BytesIO(raw_jpeg))
# Amélioration légère de la lisibilité avant OCR :
# - Conversion en niveaux de gris (le ticket est noir sur blanc)
# - Resize au 2/3 : accélère le traitement sans perte significative
# (1650px → 1100px, le texte reste lisible à ~40px de hauteur)
img_gray = img.convert("L").resize(
(img.width * 2 // 3, img.height * 2 // 3),
Image.LANCZOS,
)
try:
text = pytesseract.image_to_string(
img_gray,
lang="fra+eng",
config="--psm 6 --oem 3", # bloc de texte uniforme, LSTM
)
except pytesseract.TesseractError as e:
raise RuntimeError(
f"Erreur Tesseract lors de l'OCR : {e}\n"
"Vérifiez que Tesseract est installé et que le modèle 'fra' est disponible.\n"
"Voir README ou commentaires dans tickettracker/parsers/leclerc.py."
) from e
return text
# ---------------------------------------------------------------------------
# Parsing du texte OCR
# ---------------------------------------------------------------------------
# Regex pour une ligne article standard :
# NOM PRODUIT PRIX CODE_TVA
#
# Problèmes OCR observés sur le ticket Leclerc Clichy :
# - Un seul espace entre nom et prix (pas d'alignement de colonnes garanti)
# - Artefacts de séparateur de colonne dans le nom : ' | ' ou ' — '
# - Prix avec 3 décimales (ex: "10.460" au lieu de "10.40")
# - Code TVA avec 0 devant : "01" au lieu de "1"
# - Préfixe "* " pour les articles en promotion
#
# Stratégie : le code TVA (1 ou 2 chiffres max) est TOUJOURS le dernier token ;
# le prix (N.NN ou N,NN) est toujours l'avant-dernier. On greedy-matche .+ pour
# le nom et le moteur de regex backtrackera jusqu'à trouver le bon découpage.
_ITEM_RE = re.compile(
r"^(?:\* )?" # préfixe optionnel * (non capturé)
r"(?P<name>.+)" # nom du produit (greedy → backtrack)
r"\s+"
r"(?P<price>\d{1,3}[.,]\d{2,3})" # prix : 2 ou 3 décimales (OCR)
r"\s+"
r"(?P<tva>0?\d{1,2})" # code TVA : 1 ou 4, parfois "01"
r"\s*$"
)
# Regex pour la ligne de prix des articles en multi-unités :
# 2 X 3.48€ 6.96 1
_MULTI_RE = re.compile(
r"^\s*(?P<qty>\d+)\s*[Xx]\s*(?P<unit_price>\d+[.,]\d+)€?"
r"\s+(?P<total>\d+[.,]\d{2})\s+(?P<tva>\d{1,2})\s*$"
)
# Regex pour la ligne de total général
_TOTAL_RE = re.compile(r"Total\s+\d+\s+articles\s+(?P<total>\d+[.,]\d{2})")
# Regex pour les remises
_REDUCTION_RE = re.compile(r"Bon\s+r[ée]duction\s+(?P<amount>\d+[.,]\d{2})", re.IGNORECASE)
# Regex pour le paiement CB (montant final payé)
_CB_RE = re.compile(r"^[Cc][Bb]\s+(?P<amount>\d+[.,]\d{2})\s*$")
# Regex pour la date dans la ligne "Caisse XXX DD mois YYYY HH:MM"
_DATE_RE = re.compile(
r"Caisse\s+\S+\s+(\d{1,2})\s+(\w+)\s+(\d{4})",
re.IGNORECASE,
)
# Regex pour l'identifiant de caisse (ordre "Caisse 018-0003 ...")
_CAISSE_RE = re.compile(r"Caisse\s+(\S+)", re.IGNORECASE)
def _parse_price(s: str) -> float:
"""Convertit une chaîne de prix (virgule ou point) en float."""
return float(s.replace(",", "."))
def _parse_text(text: str) -> Receipt:
"""Analyse le texte OCR d'un ticket Leclerc.
Retourne un Receipt avec tous les articles, catégories, date et total.
Note OCR : Tesseract peut commettre des erreurs sur des caractères
ambigus (0 ↔ 6, | ↔ 1, etc.). Le total du ticket est retourné tel qu'OCR
l'a lu ; il peut différer légèrement si des caractères sont mal reconnus.
"""
lines = text.splitlines()
delivery_date = _extract_date(lines)
order_id = _extract_caisse_id(lines)
items = _extract_items(lines)
total = _extract_total(lines)
return Receipt(
store="leclerc",
date=delivery_date,
total=total,
items=items,
order_id=order_id,
)
def _extract_date(lines: list[str]) -> date:
"""Extrait la date depuis la ligne 'Caisse XXX DD mois AAAA HH:MM'."""
for line in lines:
m = _DATE_RE.search(line)
if m:
day = int(m.group(1))
month_str = m.group(2).lower().strip()
year = int(m.group(3))
month = _MOIS_FR.get(month_str)
if month:
return date(year, month, day)
raise ValueError(
"Date introuvable dans le ticket Leclerc. "
"Attendu : 'Caisse XXX DD mois YYYY' dans l'OCR."
)
def _extract_caisse_id(lines: list[str]) -> Optional[str]:
"""Extrait le numéro de caisse (ex: '018-0003')."""
for line in lines:
m = _CAISSE_RE.search(line)
if m:
return m.group(1)
return None
def _extract_total(lines: list[str]) -> float:
"""Extrait le montant final payé.
Préférence au montant CB (après remises). Sinon, le 'Total N articles'.
"""
cb_total: Optional[float] = None
subtotal: Optional[float] = None
for line in lines:
m_cb = _CB_RE.match(line.strip())
if m_cb:
cb_total = _parse_price(m_cb.group("amount"))
m_tot = _TOTAL_RE.search(line)
if m_tot:
subtotal = _parse_price(m_tot.group("total"))
total = cb_total if cb_total is not None else subtotal
if total is None:
raise ValueError(
"Montant total introuvable dans le ticket Leclerc. "
"La structure du ticket a peut-être changé."
)
return total
def _extract_items(lines: list[str]) -> list[Item]:
"""Extrait tous les articles du texte OCR ligne par ligne.
Gère :
- Articles standard (1 ligne : nom + prix + code TVA)
- Articles en multi-unités (nom sur une ligne, QTY × PRIX sur la suivante)
- Changements de catégorie (lignes débutant par >>)
- Arrêt à la ligne 'Total N articles'
"""
items: list[Item] = []
current_category: Optional[str] = None
pending_name: Optional[str] = None # nom en attente de la ligne de prix
# On cherche d'abord la ligne "TTC TVA" qui marque le début des articles
in_items = False
for line in lines:
raw = line.strip()
# Début de la section articles (insensible à la casse : "TTc TvA" possible)
# Note OCR : Tesseract Linux lit parfois "TIc" au lieu de "TTC"
# (confusion T↔I en 2e position, fréquente avec Tesseract 5.x)
if not in_items:
if re.search(r"T[TI]C\s+TVA", raw, re.IGNORECASE):
in_items = True
continue
# Fin de la section articles
if _TOTAL_RE.search(raw):
break
# Ligne vide → ignore
if not raw:
continue
# Changement de catégorie (>> CATEGORIE)
if raw.startswith(">>"):
category_raw = raw.lstrip(">").strip()
current_category = re.sub(r"[|_—]+", " ", category_raw).strip()
pending_name = None
continue
# Pré-nettoyage des artefacts OCR dans la ligne avant les regex :
# '|' et '—' sont des séparateurs de colonnes que Tesseract voit parfois
# comme des caractères dans le texte.
clean = re.sub(r"\s*[|—]\s*", " ", raw).strip()
# Ligne de prix pour un article multi-unités (2 X 3.48€ 6.96 1)
m_multi = _MULTI_RE.match(clean)
if m_multi and pending_name is not None:
qty = float(m_multi.group("qty"))
unit_price = _parse_price(m_multi.group("unit_price"))
total_price = _parse_price(m_multi.group("total"))
items.append(Item(
name=_clean_name(pending_name),
quantity=qty,
unit="pièce",
unit_price=unit_price,
total_price=total_price,
category=current_category,
))
pending_name = None
continue
# Ligne article standard (nom prix tva) — regex greedy depuis la droite
m_item = _ITEM_RE.match(clean)
if m_item:
if pending_name is not None:
pending_name = None
name = _clean_name(m_item.group("name"))
# Tronque à 2 décimales pour corriger les prix OCR comme "10.460"
price = round(_parse_price(m_item.group("price")), 2)
items.append(Item(
name=name,
quantity=1.0,
unit="pièce",
unit_price=price,
total_price=price,
category=current_category,
))
continue
# Ligne sans prix reconnaissable → début d'un article multi-unités
if len(clean) > 4 and not re.match(r"^[\d\s.,|*_-]+$", clean):
pending_name = clean.lstrip("* ").strip()
return items
def _clean_name(name: str) -> str:
"""Nettoie le nom d'un article des artefacts OCR courants.
Artefacts observés sur le ticket Leclerc Clichy :
- ' |' ou '| ' en fin de nom (artefact de séparateur de colonne)
- ' _' ou '_ ' (bruit d'image)
- espaces multiples
"""
# Retire les artefacts de colonnes en fin de chaîne
name = re.sub(r"[\s|_—]+$", "", name)
# Normalise les espaces internes
name = re.sub(r"\s{2,}", " ", name)
return name.strip()

View File

@@ -0,0 +1,514 @@
"""
Parser pour les mails de confirmation de commande Picnic.
Les mails Picnic arrivent en HTML sur remora@dilain.com
après un forward du mail de confirmation de livraison.
Le corps du mail est encodé en Quoted-Printable (QP), ce parser
le décode automatiquement avant d'analyser le HTML.
Structure HTML Picnic identifiée :
- Date de livraison dans le texte d'intro "livraison du JJ MOIS AAAA"
- Numéro de commande dans "Commande : XXX-XXX-XXXX"
- Articles : lignes HTML repérées par les images produit
(domaine storefront-prod.fr.picnicinternational.com)
Structure standard (7 colonnes directes) :
col 0 : quantité dans un badge borduré
col 2 : image produit (alt = nom du produit)
col 4 : nom (font-size 15px) + format (font-size 12px, vert #234314)
col 6 : prix splitté euros (font-size 26px) / centimes (17px)
Structure corrompue (< 7 colonnes) : fallback via balises <strong>
- Total dans une ligne labelisée <strong>Total</strong>
Corruption QP connue dans ce mail :
- Sauts de ligne doux (=\r\n) au milieu de séquences UTF-8 multi-octets
- Balises corrompues : "<= /td>", "<t d>", "78 <= /strong>", etc.
- Valeur d'attribut style qui déborde en contenu texte du td
Dépendances : beautifulsoup4, lxml
"""
import quopri
import re
from datetime import date
from typing import Optional
from bs4 import BeautifulSoup
from tickettracker.models.receipt import Item, Receipt
# Correspondance noms de mois français → numéro
_MOIS_FR = {
"janvier": 1, "février": 2, "mars": 3, "avril": 4,
"mai": 5, "juin": 6, "juillet": 7, "août": 8,
"septembre": 9, "octobre": 10, "novembre": 11, "décembre": 12,
}
def parse(html_content: str) -> Receipt:
"""Parse un mail HTML Picnic et retourne un ticket normalisé.
Args:
html_content: Contenu HTML du mail de confirmation Picnic,
potentiellement encodé en Quoted-Printable (format brut d'email).
Returns:
Receipt: Ticket de courses normalisé avec tous les articles.
Raises:
ValueError: Si la date ou le total sont introuvables dans le HTML.
"""
soup = _decode_and_parse(html_content)
full_text = soup.get_text(" ", strip=True)
delivery_date = _extract_date(full_text)
order_id = _extract_order_id(full_text)
items = _extract_items(soup)
total = _extract_total(full_text)
return Receipt(
store="picnic",
date=delivery_date,
total=total,
items=items,
order_id=order_id,
)
# ---------------------------------------------------------------------------
# Décodage
# ---------------------------------------------------------------------------
def _decode_and_parse(html_content: str) -> BeautifulSoup:
"""Décode le Quoted-Printable et retourne un objet BeautifulSoup.
Problème connu avec les mails Picnic : l'encodeur QP insère des sauts de
ligne doux (=\\r\\n) au milieu de séquences UTF-8 multi-octets, avec une
indentation HTML sur la ligne suivante. Par exemple :
f=C3=
=A9vrier 2026
devient après décodage QP naïf : f\\xc3⎵⎵⎵⎵\\xa9vrier (séquence cassée).
Solution : on supprime d'abord les sauts doux ET leur indentation éventuelle
(=\\r?\\n\\s*) avant de passer au décodeur QP standard, ce qui reconstitue
correctement =C3=A9 → é.
"""
raw_bytes = html_content.encode("ascii", errors="replace")
# Supprime les sauts de ligne doux QP et leur indentation HTML éventuelle
raw_clean = re.sub(rb"=\r?\n\s*", b"", raw_bytes)
decoded_bytes = quopri.decodestring(raw_clean)
decoded_html = decoded_bytes.decode("utf-8", errors="replace")
# Corrige les artefacts résiduels du double-encodage QP dans les attributs HTML.
#
# Certains encodeurs QP encodent aussi les '=' de la syntaxe HTML (attribut="valeur"),
# créant des séquences comme alt==3D"..." dans l'email brut. Notre décodeur QP
# résout le dernier =3D mais laisse le premier '=' → "alt=3D\"...\"" dans le HTML.
# lxml interprète alors "alt=3D" comme valeur non-quotée et perd le vrai contenu.
#
# De même, un saut QP tombe parfois au milieu du nom d'attribut "src" →
# "sr=c=3D\"...\"" → après décodage QP → "sr=c=\"...\"".
# lxml crée un attribut "sr" au lieu de "src".
#
# Corrections :
# "\w+=3D\"" → "\w+=\"" (ex: alt=3D" → alt=", src=3D" → src=")
# "sr=c=\"" → "src=\"" (reconstruction du nom d'attribut corrompu)
decoded_html = re.sub(r'(\w+=)3D"', r'\1"', decoded_html)
decoded_html = re.sub(r'\bsr=c="', 'src="', decoded_html)
return BeautifulSoup(decoded_html, "lxml")
# ---------------------------------------------------------------------------
# Date
# ---------------------------------------------------------------------------
def _extract_date(text: str) -> date:
"""Extrait la date de livraison depuis le texte du mail.
Le mail contient une phrase du type :
"Voici le reçu de votre livraison du samedi 14 février 2026."
"""
m = re.search(
r"livraison du\s+\w+\s+(\d{1,2})\s+(\w+)\s+(\d{4})",
text,
re.IGNORECASE,
)
if not m:
raise ValueError(
"Date de livraison introuvable dans le mail Picnic. "
"Attendu : 'livraison du <jour_semaine> <JJ> <mois> <AAAA>'"
)
day = int(m.group(1))
month_str = m.group(2).lower().strip()
year = int(m.group(3))
month = _MOIS_FR.get(month_str)
if month is None:
raise ValueError(
f"Mois '{month_str}' non reconnu. "
f"Mois attendus : {', '.join(_MOIS_FR)}"
)
return date(year, month, day)
# ---------------------------------------------------------------------------
# Numéro de commande
# ---------------------------------------------------------------------------
def _extract_order_id(text: str) -> Optional[str]:
"""Extrait le numéro de commande Picnic (optionnel)."""
m = re.search(r"Commande\s*:\s*([\d\-]+)", text)
return m.group(1) if m else None
# ---------------------------------------------------------------------------
# Articles
# ---------------------------------------------------------------------------
def _extract_items(soup: BeautifulSoup) -> list[Item]:
"""Extrait la liste des articles depuis le HTML du mail.
Pour chaque image produit, tente d'abord la lecture en ligne 7 colonnes
(structure standard), puis utilise un fallback basé sur les balises
<strong> pour les lignes dont le HTML est corrompu par l'encodage QP.
"""
items = []
seen_rows: set[int] = set() # Évite de traiter le même conteneur deux fois
# La corruption QP insère des '=' dans les URLs de src (ex: "picnici=nternational.com").
# On les neutralise avant le test pour trouver toutes les images produit.
product_imgs = soup.find_all(
"img",
src=lambda s: s and "picnicinternational.com" in s.replace("=", ""),
)
for img in product_imgs:
item = _try_7col(img, seen_rows)
if item is None:
item = _try_fallback(img, seen_rows)
if item is not None:
items.append(item)
return items
def _try_7col(img, seen_rows: set[int]) -> Optional[Item]:
"""Tente d'extraire un article depuis une ligne à 7 colonnes directes.
Structure attendue :
td[0] : badge quantité | td[2] : image | td[4] : nom+unité | td[6] : prix
"""
# Remonte jusqu'à trouver un <tr> avec exactement 7 <td> directs
tds = []
node = img.parent
while node:
if node.name == "tr":
candidate = node.find_all("td", recursive=False)
if len(candidate) == 7:
tds = candidate
break
node = node.parent
if len(tds) != 7:
return None
row_id = id(tds[0])
if row_id in seen_rows:
return None
seen_rows.add(row_id)
# --- Quantité (colonne 0) ---
qty_text = tds[0].get_text(strip=True)
try:
quantity = float(qty_text)
except ValueError:
return None
# --- Nom et format/unité (colonne 4) ---
# Approche positionnelle : 1er td feuille non vide = nom, 2e = unité.
# On utilise uniquement les tds feuilles (sans td enfant) pour éviter que
# le td parent récapitulatif ne double le texte.
inner_texts = [
" ".join(inner_td.get_text().split())
for inner_td in tds[4].find_all("td")
if not inner_td.find("td") and inner_td.get_text(strip=True)
]
name = inner_texts[0] if inner_texts else img.get("alt", "Inconnu").strip()
unit = inner_texts[1] if len(inner_texts) > 1 else "pièce"
name = _clean_artifact(name)
unit = _clean_unit(unit)
# --- Prix total de la ligne (colonne 6) ---
# Les balises <strong> sont plus robustes que get_text() pour éviter la
# contamination par les valeurs CSS corrompues (ex: "#234314" dans du texte).
total_price = _parse_price_from_cell(tds[6])
if total_price is None:
return None
unit_price = round(total_price / quantity, 4) if quantity > 0 else total_price
return Item(
name=name,
quantity=quantity,
unit=unit,
unit_price=unit_price,
total_price=total_price,
)
def _try_fallback(img, seen_rows: set[int]) -> Optional[Item]:
"""Fallback pour les articles dont la ligne HTML est corrompue (< 7 td directs).
Deux tentatives successives :
1. Tr avec >= 3 td directs ET >= 3 strongs chiffres (quantité + prix en tête)
→ utilisé pour les items dont le badge qty est dans un <strong>
2. Tr avec >= 1 td ET exactement 2 strongs chiffres (badge qty absent/corrompu)
→ utilisé quand seul le prix est dans des <strong> (ex: Jardin Bio, Alfapac)
"""
# --- Tentative 1 : badge qty dans un <strong> (cas standard) ---
item_tr = None
node = img.parent
while node and node.name != "body":
if node.name == "tr":
tds = node.find_all("td", recursive=False)
if len(tds) >= 3 and len(_get_digit_strongs(node)) >= 3:
item_tr = node
break
node = node.parent
if item_tr is not None:
row_id = id(item_tr)
if row_id not in seen_rows:
seen_rows.add(row_id)
digit_strongs = _get_digit_strongs(item_tr)
try:
quantity = float(digit_strongs[0])
except (ValueError, IndexError):
return None
# L'article est le PREMIER dans le tr → ses euros/centimes sont à [1] et [2].
# (Contrairement à _parse_price_from_cell qui utilise [-2][-1] car dans une
# cellule dédiée, le dernier prix affiché est le prix réel après réduction.)
try:
total_price = float(
f"{digit_strongs[1]}.{digit_strongs[2].zfill(2)}"
)
except (ValueError, IndexError):
return None
name, unit = _extract_name_unit_fallback(img, item_tr)
unit_price = round(total_price / quantity, 4) if quantity > 0 else total_price
return Item(
name=name, quantity=quantity, unit=unit,
unit_price=unit_price, total_price=total_price,
)
# --- Tentative 2 : badge qty corrompu, seul le prix est dans des <strong> ---
# Ex: Jardin Bio, Alfapac dont le badge qty n'est pas dans un <strong>.
item_tr = None
node = img.parent
while node and node.name != "body":
if node.name == "tr":
ds = _get_digit_strongs(node)
if len(ds) == 2:
item_tr = node
break
node = node.parent
if item_tr is None:
return None
row_id = id(item_tr)
if row_id in seen_rows:
return None
seen_rows.add(row_id)
digit_strongs = _get_digit_strongs(item_tr)
# Seulement 2 strongs → ce sont euros et centimes du prix (pas de badge qty strong)
try:
total_price = float(f"{digit_strongs[0]}.{digit_strongs[1].zfill(2)}")
except (ValueError, IndexError):
return None
# Quantité : extraire le premier chiffre du texte brut du tr
text = item_tr.get_text(strip=True)
qty_match = re.match(r"(\d+)", text)
quantity = float(qty_match.group(1)) if qty_match else 1.0
name, unit = _extract_name_unit_fallback(img, item_tr)
unit_price = round(total_price / quantity, 4) if quantity > 0 else total_price
return Item(
name=name, quantity=quantity, unit=unit,
unit_price=unit_price, total_price=total_price,
)
def _extract_name_unit_fallback(img, item_tr) -> tuple[str, str]:
"""Extrait nom et unité pour un article en structure corrompue.
Cherche d'abord dans le td ancêtre de l'image au sein de item_tr
(cas 3-col : tout est collapsé dans td[0] à cause du QP), puis dans
les td suivants (cas 5-col ou 6-col : nom+unité dans un td frère).
"""
name = img.get("alt", "Inconnu").strip()
unit = "pièce"
# Trouve le td direct de item_tr qui contient l'image
img_container_td = None
node = img.parent
while node and node is not item_tr:
if node.name == "td" and node.parent is item_tr:
img_container_td = node
break
node = node.parent
if img_container_td is None:
return _clean_artifact(name), unit
def _significant_texts(td):
"""Textes des tds feuilles significatifs (longueur > 3, non numérique).
N'utilise que les tds feuilles (sans td enfant) pour éviter que
les tds parents récapitulatifs ne doublent le nom/unité.
Exclut les artefacts QP du type '= td>' qui commencent par '=' ou '<'.
"""
return [
" ".join(t.get_text().split())
for t in td.find_all("td")
if not t.find("td") # td feuille uniquement
and len(t.get_text(strip=True)) > 3
and not t.get_text(strip=True).isdigit()
and not t.get_text(strip=True).startswith(("=", "<")) # exclut artefacts
]
# Cas A : tout est dans le td de l'image (structure 3-col collapsed)
inner = _significant_texts(img_container_td)
if inner:
name = inner[0]
unit = inner[1] if len(inner) > 1 else "pièce"
else:
# Cas B : nom+unité dans un td frère après l'image (5-col ou 6-col)
sibling = img_container_td.next_sibling
while sibling:
if hasattr(sibling, "find_all"):
inner = _significant_texts(sibling)
if inner:
name = inner[0]
unit = inner[1] if len(inner) > 1 else "pièce"
break
sibling = sibling.next_sibling
return _clean_artifact(name), _clean_unit(unit)
# ---------------------------------------------------------------------------
# Helpers prix et nettoyage
# ---------------------------------------------------------------------------
def _get_digit_strongs(node) -> list[str]:
"""Retourne les valeurs des <strong> ne contenant que des chiffres.
Prend le premier token whitespace de chaque <strong> pour ignorer les
artefacts QP du type "78 <= /strong>" → premier token "78" (digit).
"""
result = []
for s in node.find_all("strong"):
text = s.get_text(strip=True)
first_token = text.split()[0] if text.split() else ""
if first_token.isdigit():
result.append(first_token)
return result
def _parse_price_from_cell(td) -> Optional[float]:
"""Extrait le prix depuis une cellule de prix Picnic.
Picnic split les euros (grande police) et les centimes (petite police)
dans des éléments <strong> séparés. L'approche via <strong> est préférée
à get_text() pour éviter la contamination par des valeurs CSS corrompues.
On utilise les DEUX DERNIERS chiffres strongs car un article soldé affiche
deux prix : barré (original) puis réel. Ex: '3 05 . 2 74 .' → prix=€2.74.
"""
digit_strongs = _get_digit_strongs(td)
if len(digit_strongs) >= 2:
euros = digit_strongs[-2] # avant-dernier = euros du prix réel
centimes = digit_strongs[-1].zfill(2) # dernier = centimes
return float(f"{euros}.{centimes}")
# Fallback : extraction textuelle brute
return _parse_price_from_text(td.get_text())
def _parse_price_from_text(raw_text: str) -> Optional[float]:
"""Parse un montant Picnic depuis le texte brut d'une cellule de prix (fallback).
Stratégie : on extrait tous les chiffres consécutifs, et on interprète
les 2 derniers comme les centimes, le reste comme les euros.
Exemples :
'358.''358' → 3€58 = 3.58
'065.''065' → 0€65 = 0.65
"""
digits = re.sub(r"[^0-9]", "", raw_text)
if len(digits) < 3:
return None
return float(f"{digits[:-2]}.{digits[-2:]}")
def _clean_artifact(text: str) -> str:
"""Supprime les artefacts HTML/QP du texte.
Trois types d'artefacts observés dans ce mail Picnic :
- '<= /td>' : scission sur '<=' → partie avant
- ' = tr>' : scission sur ' = <lettre>' avec espace avant
- 'Soda zéro= td>' : scission sur '= <lettre>' sans espace avant
"""
text = text.split("<=")[0]
text = re.split(r" = [a-z/]", text)[0]
text = re.split(r"= [a-z]", text)[0] # ex: "zéro= td>" → "zéro"
return text.strip()
def _clean_unit(unit: str) -> str:
"""Nettoie l'unité ; retourne 'pièce' si le contenu ressemble à du CSS.
La corruption QP peut faire déborder des valeurs d'attribut style
dans le contenu texte d'un td (ex: '; color: #234314; padding: ...').
"""
if unit.startswith(";") or "font-" in unit or "color:" in unit:
return "pièce"
return _clean_artifact(unit)
# ---------------------------------------------------------------------------
# Total
# ---------------------------------------------------------------------------
def _extract_total(full_text: str) -> float:
"""Extrait le montant total payé depuis le texte décodé du mail.
Picnic affiche le prix splitté : les euros (grand) et les centimes (petit)
sont dans des éléments HTML séparés, ce qui donne dans le texte brut :
"Total Payé avec Paypal 95 10 ."
On cherche "Total" (majuscule, pour ne pas capturer "Sous-total")
puis les deux premiers groupes de chiffres qui suivent.
Note : on utilise le texte plutôt que le DOM car lxml redécoupe les
tables imbriquées de prix Picnic (euros/centimes dans des <td> sœurs),
rendant la navigation par arbre peu fiable.
"""
# "Total" majuscule pour exclure "Sous-total" (lowercase 't' en français)
m = re.search(r"Total[^0-9]{0,60}?(\d+)\s+(\d+)\s*\.", full_text)
if not m:
raise ValueError(
"Montant 'Total' introuvable dans le texte du mail Picnic. "
"La structure HTML a peut-être changé."
)
euros = m.group(1)
centimes = m.group(2).zfill(2)
return float(f"{euros}.{centimes}")

106
tickettracker/pipeline.py Normal file
View File

@@ -0,0 +1,106 @@
"""
Pipeline d'import : du fichier brut à la base de données.
Ce module coordonne les parsers et la couche DB.
Il choisit le bon parser selon la source, vérifie les doublons,
puis délègue l'insertion à repository.py.
Usage :
from tickettracker.pipeline import import_receipt
inserted = import_receipt("samples/picnic_sample.html", source="picnic")
"""
import logging
from pathlib import Path
from tickettracker.db import schema, repository
logger = logging.getLogger(__name__)
# Parsers disponibles — importés à la demande pour éviter de charger
# pytesseract/pdfplumber si on n'importe que du Picnic.
_SOURCES = ("picnic", "leclerc")
def import_receipt(
file_path: str | Path,
source: str,
db_path: str | Path = schema.DEFAULT_DB_PATH,
) -> bool:
"""Parse un fichier et l'importe dans la base si non dupliqué.
Étapes :
1. Vérifie que la source est connue et que le fichier existe
2. Appelle le bon parser selon `source`
3. Vérifie la déduplication via (store, date, total)
4. Si nouveau : insère le ticket et ses articles en base
5. Retourne True si inséré, False si déjà présent
Args:
file_path: Chemin vers le fichier à importer.
(.html pour Picnic, .pdf pour Leclerc)
source: 'picnic' ou 'leclerc'.
db_path: Chemin vers la base SQLite (créé si absent).
Returns:
True si le ticket a été inséré, False s'il était déjà présent.
Raises:
ValueError: Si `source` est inconnu.
FileNotFoundError: Si `file_path` n'existe pas.
"""
if source not in _SOURCES:
raise ValueError(
f"Source inconnue : '{source}'. Valeurs acceptées : {_SOURCES}"
)
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"Fichier introuvable : {file_path}")
# --- Parsing ---
receipt = _parse(file_path, source)
# --- Initialisation de la base (idempotent) ---
schema.init_db(db_path)
# --- Déduplication ---
with schema.get_connection(db_path) as conn:
date_iso = receipt.date.isoformat()
if repository.receipt_exists(conn, receipt.store, date_iso, receipt.total):
logger.info(
"Ticket déjà présent (store=%s date=%s total=%.2f) — import ignoré.",
receipt.store,
date_iso,
receipt.total,
)
return False
repository.insert_receipt(conn, receipt)
logger.info(
"Ticket importé : store=%s date=%s total=%.2f (%d articles).",
receipt.store,
date_iso,
receipt.total,
len(receipt.items),
)
return True
def _parse(file_path: Path, source: str):
"""Sélectionne et appelle le parser approprié.
Les imports sont retardés pour ne charger les dépendances lourdes
(pytesseract, pdfplumber) que si nécessaire.
"""
if source == "picnic":
from tickettracker.parsers import picnic
html_content = file_path.read_text(encoding="utf-8", errors="replace")
return picnic.parse(html_content)
if source == "leclerc":
from tickettracker.parsers import leclerc
return leclerc.parse(str(file_path))
# Jamais atteint grâce à la validation en amont, mais satisfait mypy
raise ValueError(f"Source inconnue : '{source}'")