diff --git a/.claude/.fuse_hidden0114a7b7000557be b/.claude/.fuse_hidden0114a7b7000557be deleted file mode 100644 index c8e5fd1..0000000 --- a/.claude/.fuse_hidden0114a7b7000557be +++ /dev/null @@ -1,24 +0,0 @@ -{ - "permissions": { - "allow": [ - "Bash(.venv/Scripts/pytest tests/ -v)", - "Bash(git add:*)", - "Bash(git commit:*)", - "Bash(.venv/Scripts/python:*)", - "Bash(cat:*)", - "Bash(python:*)", - "Bash(.venv/Scripts/python.exe:*)", - "Bash(PYTHONIOENCODING=utf-8 .venv/Scripts/python.exe:*)", - "Bash(PYTHONIOENCODING=utf-8 python:*)", - "Bash(tesseract:*)", - "Bash(winget install:*)", - "Bash(curl:*)", - "Bash(TESSDATA=\"/c/Program Files/Tesseract-OCR/tessdata\")", - "Bash(TESSDATA_PREFIX=/c/code/TicketTracker/tessdata python:*)", - "Bash(ls:*)", - "Bash(.venv/Scripts/pip install:*)", - "Bash(Marque)", - "Bash(Quantité\":*)" - ] - } -} diff --git a/tests/test_watcher.py b/tests/test_watcher.py new file mode 100644 index 0000000..d61a550 --- /dev/null +++ b/tests/test_watcher.py @@ -0,0 +1,241 @@ +""" +Tests du watch folder (tickettracker/watcher.py). + +Stratégie : + - Utilise tmp_path pour les dossiers inbox/processed/failed + - Mocke pipeline.import_receipt pour contrôler le résultat sans parser de vrais fichiers + - Teste _process_file directement (évite la dépendance à watchdog / inotify) +""" + +from pathlib import Path +from unittest.mock import patch + +import pytest + +from tickettracker.watcher import _process_file, ReceiptHandler + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def dirs(tmp_path): + """Structure de dossiers inbox/picnic, inbox/leclerc, processed, failed.""" + inbox = tmp_path / "inbox" + (inbox / "picnic").mkdir(parents=True) + (inbox / "leclerc").mkdir(parents=True) + processed = tmp_path / "processed" + processed.mkdir() + failed = tmp_path / "failed" + failed.mkdir() + return { + "inbox": inbox, + "processed": processed, + "failed": failed, + "tmp_path": tmp_path, + } + + +@pytest.fixture +def sample_file(dirs): + """Crée un faux fichier HTML Picnic dans inbox/picnic/.""" + f = dirs["inbox"] / "picnic" / "ticket_picnic.html" + f.write_text("Picnic", encoding="utf-8") + return f + + +@pytest.fixture +def sample_leclerc(dirs): + """Crée un faux fichier PDF Leclerc dans inbox/leclerc/.""" + f = dirs["inbox"] / "leclerc" / "ticket_leclerc.pdf" + f.write_bytes(b"%PDF-1.4 fake") + return f + + +# --------------------------------------------------------------------------- +# Tests _process_file — import réussi +# --------------------------------------------------------------------------- + +def test_process_file_success_moves_to_processed(dirs, sample_file): + """Import réussi : le fichier est déplacé dans processed/.""" + with patch("tickettracker.watcher.pipeline.import_receipt", return_value=True): + _process_file( + sample_file, "picnic", + dirs["tmp_path"] / "test.db", + dirs["processed"], dirs["failed"], + ) + # Le fichier original ne doit plus être dans inbox/ + assert not sample_file.exists() + # Un fichier doit être présent dans processed/ + processed_files = list(dirs["processed"].iterdir()) + assert len(processed_files) == 1 + + +def test_process_file_success_naming_convention(dirs, sample_file): + """Le fichier déplacé suit le pattern {source}_{date}_{nom_original}.""" + with patch("tickettracker.watcher.pipeline.import_receipt", return_value=True): + _process_file( + sample_file, "picnic", + dirs["tmp_path"] / "test.db", + dirs["processed"], dirs["failed"], + ) + processed_files = list(dirs["processed"].iterdir()) + name = processed_files[0].name + assert name.startswith("picnic_") + assert name.endswith("ticket_picnic.html") + + +def test_process_file_duplicate_moves_to_processed(dirs, sample_file): + """Doublon (import_receipt retourne False) : fichier dans processed/ quand même.""" + with patch("tickettracker.watcher.pipeline.import_receipt", return_value=False): + _process_file( + sample_file, "picnic", + dirs["tmp_path"] / "test.db", + dirs["processed"], dirs["failed"], + ) + assert not sample_file.exists() + processed_files = list(dirs["processed"].iterdir()) + assert len(processed_files) == 1 + + +def test_process_file_error_moves_to_failed(dirs, sample_file): + """Erreur pendant l'import : le fichier est déplacé dans failed/.""" + with patch( + "tickettracker.watcher.pipeline.import_receipt", + side_effect=ValueError("format invalide"), + ): + _process_file( + sample_file, "picnic", + dirs["tmp_path"] / "test.db", + dirs["processed"], dirs["failed"], + ) + assert not sample_file.exists() + failed_files = [f for f in dirs["failed"].iterdir() if not f.name.endswith(".log")] + assert len(failed_files) == 1 + + +def test_process_file_error_creates_log(dirs, sample_file): + """Erreur : un fichier .log est créé dans failed/ avec le message d'erreur.""" + with patch( + "tickettracker.watcher.pipeline.import_receipt", + side_effect=ValueError("format invalide"), + ): + _process_file( + sample_file, "picnic", + dirs["tmp_path"] / "test.db", + dirs["processed"], dirs["failed"], + ) + log_files = [f for f in dirs["failed"].iterdir() if f.name.endswith(".log")] + assert len(log_files) == 1 + log_content = log_files[0].read_text(encoding="utf-8") + assert "format invalide" in log_content + + +def test_process_file_nothing_in_failed_on_success(dirs, sample_file): + """Import réussi : aucun fichier dans failed/.""" + with patch("tickettracker.watcher.pipeline.import_receipt", return_value=True): + _process_file( + sample_file, "picnic", + dirs["tmp_path"] / "test.db", + dirs["processed"], dirs["failed"], + ) + assert list(dirs["failed"].iterdir()) == [] + + +def test_process_file_leclerc_source(dirs, sample_leclerc): + """Source leclerc : le fichier déplacé commence par 'leclerc_'.""" + with patch("tickettracker.watcher.pipeline.import_receipt", return_value=True): + _process_file( + sample_leclerc, "leclerc", + dirs["tmp_path"] / "test.db", + dirs["processed"], dirs["failed"], + ) + processed_files = list(dirs["processed"].iterdir()) + assert processed_files[0].name.startswith("leclerc_") + + +# --------------------------------------------------------------------------- +# Tests ReceiptHandler +# --------------------------------------------------------------------------- + +def test_handler_detects_source_from_parent_folder(dirs): + """ReceiptHandler détecte la source depuis le nom du sous-dossier.""" + handler = ReceiptHandler( + db_path=dirs["tmp_path"] / "test.db", + inbox_path=dirs["inbox"], + ) + # Le sous-dossier parent du fichier donne la source + assert handler.processed_dir == dirs["processed"] + assert handler.failed_dir == dirs["failed"] + + +def test_handler_ignores_unknown_subfolder(dirs): + """Un fichier dans un sous-dossier inconnu (ni picnic ni leclerc) est ignoré.""" + unknown_dir = dirs["inbox"] / "autre" + unknown_dir.mkdir() + f = unknown_dir / "fichier.txt" + f.write_text("test") + + handler = ReceiptHandler( + db_path=dirs["tmp_path"] / "test.db", + inbox_path=dirs["inbox"], + ) + + # Simuler un événement de création de fichier + class FakeEvent: + is_directory = False + src_path = str(f) + + with patch("tickettracker.watcher._process_file") as mock_process: + handler.on_created(FakeEvent()) + mock_process.assert_not_called() + + +def test_handler_ignores_directory_events(dirs): + """Un événement de création de répertoire est ignoré.""" + handler = ReceiptHandler( + db_path=dirs["tmp_path"] / "test.db", + inbox_path=dirs["inbox"], + ) + + class FakeEvent: + is_directory = True + src_path = str(dirs["inbox"] / "picnic" / "subdir") + + with patch("tickettracker.watcher._process_file") as mock_process: + handler.on_created(FakeEvent()) + mock_process.assert_not_called() + + +# --------------------------------------------------------------------------- +# Tests création des dossiers +# --------------------------------------------------------------------------- + +def test_process_file_creates_processed_dir_if_missing(dirs, sample_file): + """_process_file crée processed/ s'il est absent.""" + dirs["processed"].rmdir() # supprime le dossier + assert not dirs["processed"].exists() + with patch("tickettracker.watcher.pipeline.import_receipt", return_value=True): + _process_file( + sample_file, "picnic", + dirs["tmp_path"] / "test.db", + dirs["processed"], dirs["failed"], + ) + assert dirs["processed"].exists() + + +def test_process_file_creates_failed_dir_if_missing(dirs, sample_file): + """_process_file crée failed/ s'il est absent.""" + dirs["failed"].rmdir() + assert not dirs["failed"].exists() + with patch( + "tickettracker.watcher.pipeline.import_receipt", + side_effect=RuntimeError("boom"), + ): + _process_file( + sample_file, "picnic", + dirs["tmp_path"] / "test.db", + dirs["processed"], dirs["failed"], + ) + assert dirs["failed"].exists() diff --git a/tickettracker/watcher.py b/tickettracker/watcher.py new file mode 100644 index 0000000..06bab39 --- /dev/null +++ b/tickettracker/watcher.py @@ -0,0 +1,123 @@ +""" +Watch folder pour TicketTracker. + +Surveille les dossiers inbox/picnic/ et inbox/leclerc/ et importe automatiquement +tout nouveau fichier déposé. Les fichiers traités sont déplacés vers : + + processed/{source}_{YYYY-MM-DD}_{nom_original} — import OK ou doublon + failed/{nom_original} — erreur + fichier .log créé + +Usage CLI : + python -m tickettracker.cli watch [--inbox PATH] [--db PATH] + +Interrompre avec Ctrl+C. +""" + +import logging +import time +from datetime import datetime +from pathlib import Path + +from watchdog.events import FileSystemEventHandler +from watchdog.observers import Observer + +from tickettracker import pipeline + +logger = logging.getLogger(__name__) + + +class ReceiptHandler(FileSystemEventHandler): + """Gestionnaire d'événements watchdog pour les dossiers inbox/.""" + + def __init__(self, db_path: Path, inbox_path: Path): + self.db_path = db_path + self.inbox_path = inbox_path + # processed/ et failed/ sont au même niveau qu'inbox/ + self.processed_dir = inbox_path.parent / "processed" + self.failed_dir = inbox_path.parent / "failed" + + def on_created(self, event): + """Appelé quand un fichier arrive dans inbox/picnic/ ou inbox/leclerc/.""" + if event.is_directory: + return + file_path = Path(event.src_path) + # La source est déduite du nom du sous-dossier parent (picnic ou leclerc) + source = file_path.parent.name + if source not in ("picnic", "leclerc"): + logger.warning("Fichier ignoré (dossier inconnu) : %s", file_path) + return + _process_file(file_path, source, self.db_path, + self.processed_dir, self.failed_dir) + + +def _process_file( + file_path: Path, + source: str, + db_path: Path, + processed_dir: Path, + failed_dir: Path, +) -> None: + """Importe un fichier, le déplace selon le résultat. + + Succès ou doublon → processed/{source}_{date}_{nom} + Erreur → failed/{nom} + failed/{nom}.log + """ + # Attendre un court instant : certains éditeurs / copiers écrivent en deux passes + time.sleep(0.2) + + date_str = datetime.now().strftime("%Y-%m-%d") + dest_name = f"{source}_{date_str}_{file_path.name}" + + try: + inserted = pipeline.import_receipt(file_path, source, db_path) + status = "importé" if inserted else "doublon ignoré" + logger.info("[watcher] %s : %s → processed/", file_path.name, status) + # Déplacement vers processed/ + processed_dir.mkdir(parents=True, exist_ok=True) + file_path.rename(processed_dir / dest_name) + except Exception as exc: + logger.error("[watcher] Erreur sur %s : %s", file_path.name, exc) + # Déplacement vers failed/ + création d'un .log + failed_dir.mkdir(parents=True, exist_ok=True) + log_path = failed_dir / f"{file_path.name}.log" + log_path.write_text( + f"Fichier : {file_path}\n" + f"Source : {source}\n" + f"Date : {datetime.now().isoformat()}\n" + f"Erreur : {exc}\n", + encoding="utf-8", + ) + file_path.rename(failed_dir / file_path.name) + + +def watch(inbox_path: Path, db_path: Path) -> None: + """Lance le watcher en mode bloquant (interrompre avec Ctrl+C). + + Surveille inbox_path/picnic/ et inbox_path/leclerc/ récursivement. + Crée les dossiers inbox/, processed/ et failed/ s'ils sont absents. + + Args: + inbox_path: Répertoire parent contenant les sous-dossiers picnic/ et leclerc/. + db_path: Chemin vers la base SQLite. + """ + # Créer les dossiers nécessaires + for sub in ("picnic", "leclerc"): + (inbox_path / sub).mkdir(parents=True, exist_ok=True) + (inbox_path.parent / "processed").mkdir(parents=True, exist_ok=True) + (inbox_path.parent / "failed").mkdir(parents=True, exist_ok=True) + + handler = ReceiptHandler(db_path=db_path, inbox_path=inbox_path) + + observer = Observer() + # Surveillance récursive du dossier inbox/ (capte picnic/ et leclerc/ en une passe) + observer.schedule(handler, str(inbox_path), recursive=True) + observer.start() + + print(f"Surveillance de {inbox_path}/picnic/ et {inbox_path}/leclerc/ — Ctrl+C pour arrêter") + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + observer.stop() + observer.join()