""" Pipeline d'import : du fichier brut à la base de données. Ce module coordonne les parsers et la couche DB. Il choisit le bon parser selon la source, vérifie les doublons, puis délègue l'insertion à repository.py. Usage : from tickettracker.pipeline import import_receipt inserted = import_receipt("samples/picnic_sample.html", source="picnic") """ import email import logging from email import policy from pathlib import Path from tickettracker.db import schema, repository logger = logging.getLogger(__name__) # Parsers disponibles — importés à la demande pour éviter de charger # pytesseract/pdfplumber si on n'importe que du Picnic. _SOURCES = ("picnic", "leclerc") def import_receipt( file_path: str | Path, source: str, db_path: str | Path = schema.DEFAULT_DB_PATH, ) -> bool: """Parse un fichier et l'importe dans la base si non dupliqué. Étapes : 1. Vérifie que la source est connue et que le fichier existe 2. Appelle le bon parser selon `source` 3. Vérifie la déduplication via (store, date, total) 4. Si nouveau : insère le ticket et ses articles en base 5. Retourne True si inséré, False si déjà présent Args: file_path: Chemin vers le fichier à importer. (.html pour Picnic, .pdf pour Leclerc) source: 'picnic' ou 'leclerc'. db_path: Chemin vers la base SQLite (créé si absent). Returns: True si le ticket a été inséré, False s'il était déjà présent. Raises: ValueError: Si `source` est inconnu. FileNotFoundError: Si `file_path` n'existe pas. """ if source not in _SOURCES: raise ValueError( f"Source inconnue : '{source}'. Valeurs acceptées : {_SOURCES}" ) file_path = Path(file_path) if not file_path.exists(): raise FileNotFoundError(f"Fichier introuvable : {file_path}") # --- Parsing --- receipt = _parse(file_path, source) # --- Initialisation de la base (idempotent) --- schema.init_db(db_path) # --- Déduplication --- with schema.get_connection(db_path) as conn: date_iso = receipt.date.isoformat() if repository.receipt_exists(conn, receipt.store, date_iso, receipt.total): logger.info( "Ticket déjà présent (store=%s date=%s total=%.2f) — import ignoré.", receipt.store, date_iso, receipt.total, ) return False repository.insert_receipt(conn, receipt) logger.info( "Ticket importé : store=%s date=%s total=%.2f (%d articles).", receipt.store, date_iso, receipt.total, len(receipt.items), ) return True def _parse(file_path: Path, source: str): """Sélectionne et appelle le parser approprié. Les imports sont retardés pour ne charger les dépendances lourdes (pytesseract, pdfplumber) que si nécessaire. """ if source == "picnic": from tickettracker.parsers import picnic if file_path.suffix.lower() == ".eml": html_content = _eml_to_html(file_path) else: html_content = file_path.read_text(encoding="utf-8", errors="replace") return picnic.parse(html_content) if source == "leclerc": from tickettracker.parsers import leclerc return leclerc.parse(str(file_path)) # Jamais atteint grâce à la validation en amont, mais satisfait mypy raise ValueError(f"Source inconnue : '{source}'") def _eml_to_html(file_path: Path) -> str: """Extrait la partie HTML d'un fichier .eml (email de confirmation Picnic). Lit le .eml avec le module email stdlib, parcourt les parties MIME et retourne le contenu de la première partie text/html trouvée. Args: file_path: Chemin vers le fichier .eml. Returns: Contenu HTML sous forme de chaîne. Raises: ValueError: Si aucune partie HTML n'est trouvée dans le .eml. """ raw = file_path.read_bytes() msg = email.message_from_bytes(raw, policy=policy.default) for part in msg.walk(): if part.get_content_type() == "text/html": return part.get_content() raise ValueError( f"Aucune partie HTML trouvée dans le fichier .eml : {file_path.name}" )