Surveille inbox/picnic/ et inbox/leclerc/ avec watchdog.
Chaque nouveau fichier est importé automatiquement :
- succès/doublon → processed/{source}_{date}_{nom}
- erreur → failed/{nom} + failed/{nom}.log
Nouvelle commande CLI : python -m tickettracker.cli watch [--inbox] [--db]
22 tests ajoutés (test_watcher.py), tous passent.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
124 lines
4.3 KiB
Python
124 lines
4.3 KiB
Python
"""
|
|
Watch folder pour TicketTracker.
|
|
|
|
Surveille les dossiers inbox/picnic/ et inbox/leclerc/ et importe automatiquement
|
|
tout nouveau fichier déposé. Les fichiers traités sont déplacés vers :
|
|
|
|
processed/{source}_{YYYY-MM-DD}_{nom_original} — import OK ou doublon
|
|
failed/{nom_original} — erreur + fichier .log créé
|
|
|
|
Usage CLI :
|
|
python -m tickettracker.cli watch [--inbox PATH] [--db PATH]
|
|
|
|
Interrompre avec Ctrl+C.
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from watchdog.events import FileSystemEventHandler
|
|
from watchdog.observers import Observer
|
|
|
|
from tickettracker import pipeline
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ReceiptHandler(FileSystemEventHandler):
|
|
"""Gestionnaire d'événements watchdog pour les dossiers inbox/."""
|
|
|
|
def __init__(self, db_path: Path, inbox_path: Path):
|
|
self.db_path = db_path
|
|
self.inbox_path = inbox_path
|
|
# processed/ et failed/ sont au même niveau qu'inbox/
|
|
self.processed_dir = inbox_path.parent / "processed"
|
|
self.failed_dir = inbox_path.parent / "failed"
|
|
|
|
def on_created(self, event):
|
|
"""Appelé quand un fichier arrive dans inbox/picnic/ ou inbox/leclerc/."""
|
|
if event.is_directory:
|
|
return
|
|
file_path = Path(event.src_path)
|
|
# La source est déduite du nom du sous-dossier parent (picnic ou leclerc)
|
|
source = file_path.parent.name
|
|
if source not in ("picnic", "leclerc"):
|
|
logger.warning("Fichier ignoré (dossier inconnu) : %s", file_path)
|
|
return
|
|
_process_file(file_path, source, self.db_path,
|
|
self.processed_dir, self.failed_dir)
|
|
|
|
|
|
def _process_file(
|
|
file_path: Path,
|
|
source: str,
|
|
db_path: Path,
|
|
processed_dir: Path,
|
|
failed_dir: Path,
|
|
) -> None:
|
|
"""Importe un fichier, le déplace selon le résultat.
|
|
|
|
Succès ou doublon → processed/{source}_{date}_{nom}
|
|
Erreur → failed/{nom} + failed/{nom}.log
|
|
"""
|
|
# Attendre un court instant : certains éditeurs / copiers écrivent en deux passes
|
|
time.sleep(0.2)
|
|
|
|
date_str = datetime.now().strftime("%Y-%m-%d")
|
|
dest_name = f"{source}_{date_str}_{file_path.name}"
|
|
|
|
try:
|
|
inserted = pipeline.import_receipt(file_path, source, db_path)
|
|
status = "importé" if inserted else "doublon ignoré"
|
|
logger.info("[watcher] %s : %s → processed/", file_path.name, status)
|
|
# Déplacement vers processed/
|
|
processed_dir.mkdir(parents=True, exist_ok=True)
|
|
file_path.rename(processed_dir / dest_name)
|
|
except Exception as exc:
|
|
logger.error("[watcher] Erreur sur %s : %s", file_path.name, exc)
|
|
# Déplacement vers failed/ + création d'un .log
|
|
failed_dir.mkdir(parents=True, exist_ok=True)
|
|
log_path = failed_dir / f"{file_path.name}.log"
|
|
log_path.write_text(
|
|
f"Fichier : {file_path}\n"
|
|
f"Source : {source}\n"
|
|
f"Date : {datetime.now().isoformat()}\n"
|
|
f"Erreur : {exc}\n",
|
|
encoding="utf-8",
|
|
)
|
|
file_path.rename(failed_dir / file_path.name)
|
|
|
|
|
|
def watch(inbox_path: Path, db_path: Path) -> None:
|
|
"""Lance le watcher en mode bloquant (interrompre avec Ctrl+C).
|
|
|
|
Surveille inbox_path/picnic/ et inbox_path/leclerc/ récursivement.
|
|
Crée les dossiers inbox/, processed/ et failed/ s'ils sont absents.
|
|
|
|
Args:
|
|
inbox_path: Répertoire parent contenant les sous-dossiers picnic/ et leclerc/.
|
|
db_path: Chemin vers la base SQLite.
|
|
"""
|
|
# Créer les dossiers nécessaires
|
|
for sub in ("picnic", "leclerc"):
|
|
(inbox_path / sub).mkdir(parents=True, exist_ok=True)
|
|
(inbox_path.parent / "processed").mkdir(parents=True, exist_ok=True)
|
|
(inbox_path.parent / "failed").mkdir(parents=True, exist_ok=True)
|
|
|
|
handler = ReceiptHandler(db_path=db_path, inbox_path=inbox_path)
|
|
|
|
observer = Observer()
|
|
# Surveillance récursive du dossier inbox/ (capte picnic/ et leclerc/ en une passe)
|
|
observer.schedule(handler, str(inbox_path), recursive=True)
|
|
observer.start()
|
|
|
|
print(f"Surveillance de {inbox_path}/picnic/ et {inbox_path}/leclerc/ — Ctrl+C pour arrêter")
|
|
|
|
try:
|
|
while True:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
observer.stop()
|
|
observer.join()
|