Files
TicketTracker/tickettracker/pipeline.py

149 lines
4.9 KiB
Python
Raw Normal View History

"""
Pipeline d'import : du fichier brut à la base de données.
Ce module coordonne les parsers et la couche DB.
Il choisit le bon parser selon la source, vérifie les doublons,
puis délègue l'insertion à repository.py.
Usage :
from tickettracker.pipeline import import_receipt
inserted = import_receipt("samples/picnic_sample.html", source="picnic")
"""
import email
import logging
from email import policy
from pathlib import Path
from tickettracker.db import schema, repository
logger = logging.getLogger(__name__)
# Parsers disponibles — importés à la demande pour éviter de charger
# pytesseract/pdfplumber si on n'importe que du Picnic.
_SOURCES = ("picnic", "leclerc")
def import_receipt(
file_path: str | Path,
source: str,
db_path: str | Path = schema.DEFAULT_DB_PATH,
) -> bool:
"""Parse un fichier et l'importe dans la base si non dupliqué.
Étapes :
1. Vérifie que la source est connue et que le fichier existe
2. Appelle le bon parser selon `source`
3. Vérifie la déduplication via (store, date, total)
4. Si nouveau : insère le ticket et ses articles en base
5. Retourne True si inséré, False si déjà présent
Args:
file_path: Chemin vers le fichier à importer.
(.html pour Picnic, .pdf pour Leclerc)
source: 'picnic' ou 'leclerc'.
db_path: Chemin vers la base SQLite (créé si absent).
Returns:
True si le ticket a été inséré, False s'il était déjà présent.
Raises:
ValueError: Si `source` est inconnu.
FileNotFoundError: Si `file_path` n'existe pas.
"""
if source not in _SOURCES:
raise ValueError(
f"Source inconnue : '{source}'. Valeurs acceptées : {_SOURCES}"
)
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"Fichier introuvable : {file_path}")
# --- Parsing ---
receipt = _parse(file_path, source)
# --- Initialisation de la base (idempotent) ---
schema.init_db(db_path)
# --- Déduplication ---
with schema.get_connection(db_path) as conn:
date_iso = receipt.date.isoformat()
if repository.receipt_exists(conn, receipt.store, date_iso, receipt.total):
logger.info(
"Ticket déjà présent (store=%s date=%s total=%.2f) — import ignoré.",
receipt.store,
date_iso,
receipt.total,
)
return False
repository.insert_receipt(conn, receipt)
logger.info(
"Ticket importé : store=%s date=%s total=%.2f (%d articles).",
receipt.store,
date_iso,
receipt.total,
len(receipt.items),
)
return True
def _parse(file_path: Path, source: str):
"""Sélectionne et appelle le parser approprié.
Les imports sont retardés pour ne charger les dépendances lourdes
(pytesseract, pdfplumber) que si nécessaire.
"""
if source == "picnic":
from tickettracker.parsers import picnic
if file_path.suffix.lower() == ".eml":
html_content = _eml_to_html(file_path)
else:
html_content = file_path.read_text(encoding="utf-8", errors="replace")
return picnic.parse(html_content)
if source == "leclerc":
from tickettracker.parsers import leclerc
return leclerc.parse(str(file_path))
# Jamais atteint grâce à la validation en amont, mais satisfait mypy
raise ValueError(f"Source inconnue : '{source}'")
def _eml_to_html(file_path: Path) -> str:
"""Extrait la partie HTML d'un fichier .eml (email de confirmation Picnic).
Retourne le corps HTML brut, encore encodé en Quoted-Printable (QP),
exactement comme si on lisait un fichier .html sauvegardé depuis le mail.
Le parser Picnic (picnic._decode_and_parse) se charge lui-même du décodage QP.
Pourquoi ne pas utiliser policy.default / get_content() ?
Parce que cette API décode déjà les accents (=C3=A9 é), ce qui empêche
picnic.py de les retrouver via sa propre pipeline QP UTF-8.
Args:
file_path: Chemin vers le fichier .eml.
Returns:
Corps HTML brut (QP-encodé) sous forme de chaîne ASCII.
Raises:
ValueError: Si aucune partie HTML n'est trouvée dans le .eml.
"""
raw = file_path.read_bytes()
# On utilise l'ancienne API (sans policy.default) pour garder le payload brut
msg = email.message_from_bytes(raw)
for part in msg.walk():
if part.get_content_type() == "text/html":
# decode=False → payload brut, encore QP-encodé, en str ASCII
payload = part.get_payload(decode=False)
if isinstance(payload, bytes):
return payload.decode("ascii", errors="replace")
return payload # déjà une str
raise ValueError(
f"Aucune partie HTML trouvée dans le fichier .eml : {file_path.name}"
)