Compare commits
3 Commits
be4d4a7076
...
1d8f139c7c
| Author | SHA1 | Date | |
|---|---|---|---|
| 1d8f139c7c | |||
| 93333afffa | |||
| 8af474c928 |
107
tests/test_eml.py
Normal file
107
tests/test_eml.py
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
"""
|
||||||
|
Tests de l'extraction HTML depuis les fichiers .eml (pipeline._eml_to_html).
|
||||||
|
|
||||||
|
Stratégie : on construit des .eml synthétiques en mémoire (tmp_path)
|
||||||
|
sans dépendre d'un vrai mail Picnic.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from tickettracker.pipeline import _eml_to_html
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers pour construire des .eml de test
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _make_eml(tmp_path: Path, html: str, add_text_part: bool = True) -> Path:
    """Write a synthetic multipart/alternative .eml file with an HTML part.

    Args:
        tmp_path: Directory in which ``ticket.eml`` is created.
        html: Text placed verbatim as the body of the ``text/html`` part.
        add_text_part: When true, a ``text/plain`` part is emitted before the
            HTML part; when false the message holds the HTML part only.

    Returns:
        Path of the written ``ticket.eml`` file.
    """
    boundary = "BOUNDARY123"

    # Top-level headers, the blank line ending them, then the first delimiter.
    lines = [
        "MIME-Version: 1.0",
        f'Content-Type: multipart/alternative; boundary="{boundary}"',
        "From: picnic@picnic.app",
        "Subject: Votre commande Picnic",
        "",
        f"--{boundary}",
    ]

    if add_text_part:
        lines += [
            "Content-Type: text/plain; charset=utf-8",
            "",
            "Version texte de l'email.",
            "",
            f"--{boundary}",
        ]

    # HTML part followed by the closing delimiter ("--boundary--").
    lines += [
        "Content-Type: text/html; charset=utf-8",
        "",
        html,
        "",
        f"--{boundary}--",
    ]

    p = tmp_path / "ticket.eml"
    p.write_text("\n".join(lines), encoding="utf-8")
    return p
|
||||||
|
|
||||||
|
|
||||||
|
def _make_eml_no_html(tmp_path: Path) -> Path:
    """Write a synthetic .eml file whose only part is ``text/plain``.

    Args:
        tmp_path: Directory in which ``no_html.eml`` is created.

    Returns:
        Path of the written ``no_html.eml`` file.
    """
    boundary = "BOUNDARY456"
    content = "\n".join([
        "MIME-Version: 1.0",
        f'Content-Type: multipart/alternative; boundary="{boundary}"',
        "",
        f"--{boundary}",
        "Content-Type: text/plain; charset=utf-8",
        "",
        "Texte seul, pas de HTML.",
        "",
        f"--{boundary}--",
    ])
    p = tmp_path / "no_html.eml"
    p.write_text(content, encoding="utf-8")
    return p
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Tests
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_eml_to_html_retourne_le_contenu_html(tmp_path):
    """_eml_to_html returns the HTML body of a multipart .eml file."""
    html = "<html><body><p>Commande Picnic</p></body></html>"
    eml = _make_eml(tmp_path, html)
    result = _eml_to_html(eml)
    assert "Commande Picnic" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_eml_to_html_contient_les_balises(tmp_path):
    """The returned HTML still contains its markup, not just the text."""
    html = "<html><body><h1>Titre</h1></body></html>"
    eml = _make_eml(tmp_path, html)
    result = _eml_to_html(eml)
    # Check the tag and the text separately: an `or` between the two would
    # let the test pass even if the markup had been stripped.
    assert "<h1>" in result
    assert "Titre" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_eml_to_html_retourne_str(tmp_path):
    """_eml_to_html returns a ``str``."""
    eml = _make_eml(tmp_path, "<html><body>test</body></html>")
    result = _eml_to_html(eml)
    assert isinstance(result, str)
|
||||||
|
|
||||||
|
|
||||||
|
def test_eml_to_html_sans_partie_texte(tmp_path):
    """Extraction also works when the .eml has an HTML part only."""
    html = "<html><body><p>HTML only</p></body></html>"
    eml = _make_eml(tmp_path, html, add_text_part=False)
    result = _eml_to_html(eml)
    assert "HTML only" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_eml_to_html_leve_valueerror_si_pas_de_html(tmp_path):
    """ValueError is raised when the .eml contains no HTML part."""
    eml = _make_eml_no_html(tmp_path)
    with pytest.raises(ValueError, match="Aucune partie HTML"):
        _eml_to_html(eml)
|
||||||
@@ -51,12 +51,17 @@ def find_fuzzy_matches(
|
|||||||
]
|
]
|
||||||
|
|
||||||
# Produit cartésien filtré par seuil
|
# Produit cartésien filtré par seuil
|
||||||
|
# On compare uniquement le nom (avant le premier " | ") pour éviter que
|
||||||
|
# les différences de marque/quantité ("| MDD | 1kg" vs "| - | -") ne
|
||||||
|
# pénalisent artificiellement le score.
|
||||||
matches = []
|
matches = []
|
||||||
for p in picnic_names:
|
for p in picnic_names:
|
||||||
|
p_name = p.split(" | ")[0].strip()
|
||||||
for lec in leclerc_names:
|
for lec in leclerc_names:
|
||||||
if p == lec:
|
if p == lec:
|
||||||
continue # exact match déjà géré par get_compare_prices
|
continue # exact match déjà géré par get_compare_prices
|
||||||
score = fuzz.token_sort_ratio(p, lec)
|
lec_name = lec.split(" | ")[0].strip()
|
||||||
|
score = fuzz.token_sort_ratio(p_name, lec_name)
|
||||||
if score >= threshold:
|
if score >= threshold:
|
||||||
matches.append({"name_picnic": p, "name_leclerc": lec, "score": score})
|
matches.append({"name_picnic": p, "name_leclerc": lec, "score": score})
|
||||||
|
|
||||||
|
|||||||
@@ -140,7 +140,7 @@ def fetch_unnormalized(
|
|||||||
) -> list[sqlite3.Row]:
|
) -> list[sqlite3.Row]:
|
||||||
"""Retourne les articles dont name_normalized est NULL.
|
"""Retourne les articles dont name_normalized est NULL.
|
||||||
|
|
||||||
Chaque Row expose les clés : id, name_raw, receipt_id.
|
Chaque Row expose les clés : id, name_raw, unit, receipt_id.
|
||||||
Trié par id pour un traitement reproductible.
|
Trié par id pour un traitement reproductible.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -150,7 +150,7 @@ def fetch_unnormalized(
|
|||||||
Returns:
|
Returns:
|
||||||
Liste de sqlite3.Row.
|
Liste de sqlite3.Row.
|
||||||
"""
|
"""
|
||||||
sql = "SELECT id, name_raw, receipt_id FROM items WHERE name_normalized IS NULL ORDER BY id"
|
sql = "SELECT id, name_raw, unit, receipt_id FROM items WHERE name_normalized IS NULL ORDER BY id"
|
||||||
if limit is not None:
|
if limit is not None:
|
||||||
sql += f" LIMIT {int(limit)}"
|
sql += f" LIMIT {int(limit)}"
|
||||||
return conn.execute(sql).fetchall()
|
return conn.execute(sql).fetchall()
|
||||||
|
|||||||
@@ -229,7 +229,13 @@ def normalize_all_in_db(
|
|||||||
|
|
||||||
for start in range(0, total, batch_size):
|
for start in range(0, total, batch_size):
|
||||||
batch = items[start: start + batch_size]
|
batch = items[start: start + batch_size]
|
||||||
raw_names = [row["name_raw"] for row in batch]
|
# On inclut l'unité/poids (ex: "250 g", "20 sachets") dans le nom
|
||||||
|
# envoyé au LLM pour qu'il puisse le placer dans le champ format.
|
||||||
|
# Pour les articles sans unité (Leclerc OCR), unit est None ou "".
|
||||||
|
raw_names = [
|
||||||
|
f"{row['name_raw']} {row['unit']}".strip() if row["unit"] else row["name_raw"]
|
||||||
|
for row in batch
|
||||||
|
]
|
||||||
|
|
||||||
# --- Tentative batch ---
|
# --- Tentative batch ---
|
||||||
try:
|
try:
|
||||||
@@ -246,7 +252,7 @@ def normalize_all_in_db(
|
|||||||
# tente le fallback un par un
|
# tente le fallback un par un
|
||||||
if all(r is None for r in results):
|
if all(r is None for r in results):
|
||||||
logger.debug("Fallback unitaire pour le batch %d–%d.", start, start + len(batch))
|
logger.debug("Fallback unitaire pour le batch %d–%d.", start, start + len(batch))
|
||||||
results = [normalize_product_name(name) for name in raw_names]
|
results = [normalize_product_name(name) for name in raw_names] # raw_names contient déjà l'unité
|
||||||
|
|
||||||
# --- Mise à jour ou affichage ---
|
# --- Mise à jour ou affichage ---
|
||||||
for item, normalized in zip(batch, results):
|
for item, normalized in zip(batch, results):
|
||||||
|
|||||||
@@ -10,7 +10,9 @@ Usage :
|
|||||||
inserted = import_receipt("samples/picnic_sample.html", source="picnic")
|
inserted = import_receipt("samples/picnic_sample.html", source="picnic")
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import email
|
||||||
import logging
|
import logging
|
||||||
|
from email import policy
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from tickettracker.db import schema, repository
|
from tickettracker.db import schema, repository
|
||||||
@@ -95,7 +97,10 @@ def _parse(file_path: Path, source: str):
|
|||||||
"""
|
"""
|
||||||
if source == "picnic":
|
if source == "picnic":
|
||||||
from tickettracker.parsers import picnic
|
from tickettracker.parsers import picnic
|
||||||
html_content = file_path.read_text(encoding="utf-8", errors="replace")
|
if file_path.suffix.lower() == ".eml":
|
||||||
|
html_content = _eml_to_html(file_path)
|
||||||
|
else:
|
||||||
|
html_content = file_path.read_text(encoding="utf-8", errors="replace")
|
||||||
return picnic.parse(html_content)
|
return picnic.parse(html_content)
|
||||||
|
|
||||||
if source == "leclerc":
|
if source == "leclerc":
|
||||||
@@ -104,3 +109,40 @@ def _parse(file_path: Path, source: str):
|
|||||||
|
|
||||||
# Jamais atteint grâce à la validation en amont, mais satisfait mypy
|
# Jamais atteint grâce à la validation en amont, mais satisfait mypy
|
||||||
raise ValueError(f"Source inconnue : '{source}'")
|
raise ValueError(f"Source inconnue : '{source}'")
|
||||||
|
|
||||||
|
|
||||||
|
def _eml_to_html(file_path: Path) -> str:
    """Extract the HTML part of a .eml file (Picnic confirmation email).

    The body is returned undecoded: its transfer encoding (Quoted-Printable
    for the mails this was written for) is left in place, exactly as if a
    .html file saved from the mail had been read. Per the original author's
    note, the Picnic parser (picnic._decode_and_parse) does the QP decoding
    itself.

    Why not use policy.default / get_content()?
    Because that API already decodes the accents (=C3=A9 -> é), which
    prevents picnic.py from recovering them through its own QP -> UTF-8
    pipeline.

    Args:
        file_path: Path to the .eml file.

    Returns:
        Raw (still transfer-encoded) body of the first ``text/html`` part
        found while walking the message.

    Raises:
        ValueError: If no HTML part is found in the .eml file.
    """
    raw = file_path.read_bytes()
    # Legacy API (no policy.default) so that the payload stays untouched.
    msg = email.message_from_bytes(raw)

    for part in msg.walk():
        if part.get_content_type() != "text/html":
            continue
        # decode=False -> raw payload, transfer encoding left as-is.
        payload = part.get_payload(decode=False)
        if isinstance(payload, bytes):
            # Defensive: normalise a bytes payload to text.
            return payload.decode("ascii", errors="replace")
        return payload  # already a str

    raise ValueError(
        f"Aucune partie HTML trouvée dans le fichier .eml : {file_path.name}"
    )
|
||||||
|
|||||||
Reference in New Issue
Block a user