298 lines
10 KiB
Python
298 lines
10 KiB
Python
|
|
"""
|
||
|
|
Tests pour la couche base de données (schema + repository + pipeline).
|
||
|
|
|
||
|
|
Chaque test reçoit une base SQLite fraîche via le fixture tmp_path de pytest.
|
||
|
|
On utilise des données synthétiques (pas les vrais fichiers sample) pour que
|
||
|
|
ces tests soient rapides et ne dépendent pas de Tesseract ou de fichiers externes.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import sqlite3
|
||
|
|
from datetime import date
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
from tickettracker.models.receipt import Item, Receipt
|
||
|
|
from tickettracker.db import schema, repository
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
@pytest.fixture
def db_path(tmp_path: Path) -> Path:
    """Provide the path of a freshly initialised, per-test SQLite database.

    ``tmp_path`` is the pytest fixture that hands every test its own
    directory, so the database file is never shared between tests.
    ``schema.init_db`` is run on the file before the path is returned.
    """
    database_file = tmp_path.joinpath("test_tickettracker.db")
    schema.init_db(database_file)
    return database_file
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
def db_conn(db_path: Path):
    """Yield an open connection to the test database.

    The connection is obtained from ``schema.get_connection`` and is closed
    once the test has finished, i.e. when execution resumes after the
    ``yield``.
    """
    connection = schema.get_connection(db_path)
    yield connection
    # Teardown: runs after the test that requested the fixture has completed.
    connection.close()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
def sample_receipt() -> Receipt:
    """Synthetic Picnic receipt (two items without category) for insertion tests."""
    milk = Item(
        name="Lait demi-écrémé",
        quantity=2,
        unit="pièce",
        unit_price=1.05,
        total_price=2.10,
        category=None,
    )
    bread = Item(
        name="Pain de campagne",
        quantity=1,
        unit="pièce",
        unit_price=2.40,
        total_price=2.40,
        category=None,
    )
    return Receipt(
        store="picnic",
        date=date(2026, 2, 14),
        total=12.50,
        order_id="TEST-001",
        items=[milk, bread],
    )
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
def sample_receipt_leclerc() -> Receipt:
    """Synthetic Leclerc receipt whose two items each carry a category."""
    cashews = Item(
        name="NOIX CAJOU",
        quantity=1,
        unit="pièce",
        unit_price=5.12,
        total_price=5.12,
        category="EPICERIE SALEE",
    )
    sausages = Item(
        name="SAUCISSE FUMEES",
        quantity=2,
        unit="pièce",
        unit_price=3.48,
        total_price=6.96,
        category="BOUCHERIE LS",
    )
    return Receipt(
        store="leclerc",
        date=date(2025, 11, 8),
        total=20.00,
        order_id="018-0003",
        items=[cashews, sausages],
    )
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
# Schema tests
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
def test_schema_tables_exist(db_conn: sqlite3.Connection):
    """The ``receipts`` and ``items`` tables are present in the test database."""
    query = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    table_names = set()
    for record in db_conn.execute(query):
        table_names.add(record["name"])

    assert "receipts" in table_names
    assert "items" in table_names
|
||
|
|
|
||
|
|
|
||
|
|
def test_schema_view_exists(db_conn: sqlite3.Connection):
    """The ``price_history`` view is present in the test database."""
    query = "SELECT name FROM sqlite_master WHERE type='view'"
    view_names = set()
    for record in db_conn.execute(query):
        view_names.add(record["name"])

    assert "price_history" in view_names
|
||
|
|
|
||
|
|
|
||
|
|
def test_schema_foreign_keys_enabled(db_conn: sqlite3.Connection):
    """Foreign-key enforcement is switched on for the connection."""
    pragma_cursor = db_conn.execute("PRAGMA foreign_keys")
    fk_flag = pragma_cursor.fetchone()[0]
    assert fk_flag == 1
|
||
|
|
|
||
|
|
|
||
|
|
def test_schema_idempotent(db_path: Path):
    """Calling ``init_db`` again on an already initialised database raises nothing."""
    # The db_path fixture has already run init_db once, so these two
    # iterations are the second and third calls on the same file.
    for _ in range(2):
        schema.init_db(db_path)
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
# Insertion tests
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
def test_insert_receipt_row_count(db_conn: sqlite3.Connection, sample_receipt: Receipt):
    """After one insertion, ``receipts`` holds exactly one row."""
    repository.insert_receipt(db_conn, sample_receipt)

    count_cursor = db_conn.execute("SELECT COUNT(*) FROM receipts")
    stored_receipts = count_cursor.fetchone()[0]
    assert stored_receipts == 1
|
||
|
|
|
||
|
|
|
||
|
|
def test_insert_receipt_fields(db_conn: sqlite3.Connection, sample_receipt: Receipt):
    """The columns of the stored receipt match the source ``Receipt``."""
    new_id = repository.insert_receipt(db_conn, sample_receipt)
    lookup = db_conn.execute("SELECT * FROM receipts WHERE id = ?", (new_id,))
    stored = lookup.fetchone()

    # Columns whose value must equal a known constant from the fixture.
    expected_values = {
        "store": "picnic",
        "date": "2026-02-14",
        "total": pytest.approx(12.50),
        "order_id": "TEST-001",
    }
    for column, expected in expected_values.items():
        assert stored[column] == expected

    # Columns that only need to be filled in, whatever their exact content.
    for column in ("raw_json", "created_at"):
        assert stored[column] is not None

    assert stored["delivery_fee"] is None  # not populated for now
|
||
|
|
|
||
|
|
|
||
|
|
def test_insert_receipt_returns_id(db_conn: sqlite3.Connection, sample_receipt: Receipt):
    """``insert_receipt`` returns a positive integer (the id of the inserted row)."""
    returned_id = repository.insert_receipt(db_conn, sample_receipt)

    assert isinstance(returned_id, int)
    assert returned_id > 0
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
# Deduplication tests
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
def test_receipt_not_exists_before_insert(db_conn: sqlite3.Connection, sample_receipt: Receipt):
    """``receipt_exists`` is falsy as long as nothing has been inserted."""
    store = sample_receipt.store
    iso_date = sample_receipt.date.isoformat()
    total = sample_receipt.total

    found = repository.receipt_exists(db_conn, store, iso_date, total)
    assert not found
|
||
|
|
|
||
|
|
|
||
|
|
def test_receipt_exists_after_insert(db_conn: sqlite3.Connection, sample_receipt: Receipt):
    """``receipt_exists`` is truthy once the receipt has been inserted."""
    repository.insert_receipt(db_conn, sample_receipt)

    store = sample_receipt.store
    iso_date = sample_receipt.date.isoformat()
    total = sample_receipt.total

    found = repository.receipt_exists(db_conn, store, iso_date, total)
    assert found
|
||
|
|
|
||
|
|
|
||
|
|
def test_dedup_insert_twice(db_conn: sqlite3.Connection, sample_receipt: Receipt):
    """Importing the same receipt twice leaves exactly one row in the database.

    Note: ``insert_receipt`` does not perform the duplicate check itself
    (that is the pipeline's job). This test mimics the pipeline by calling
    ``receipt_exists`` before every insert.
    """
    iso_date = sample_receipt.date.isoformat()

    # Two import attempts: the first one inserts, the second must be skipped.
    for _attempt in range(2):
        already_stored = repository.receipt_exists(
            db_conn, sample_receipt.store, iso_date, sample_receipt.total
        )
        if already_stored:
            continue
        repository.insert_receipt(db_conn, sample_receipt)

    remaining = db_conn.execute("SELECT COUNT(*) FROM receipts").fetchone()[0]
    assert remaining == 1
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
# Item tests
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
def test_items_stored_count(db_conn: sqlite3.Connection, sample_receipt: Receipt):
    """The number of rows in ``items`` equals ``len(receipt.items)``."""
    new_id = repository.insert_receipt(db_conn, sample_receipt)

    count_cursor = db_conn.execute(
        "SELECT COUNT(*) FROM items WHERE receipt_id = ?", (new_id,)
    )
    stored_items = count_cursor.fetchone()[0]

    expected_items = len(sample_receipt.items)
    assert stored_items == expected_items
|
||
|
|
|
||
|
|
|
||
|
|
def test_items_name_raw_populated(db_conn: sqlite3.Connection, sample_receipt: Receipt):
    """``name_raw`` is filled in; ``name_normalized`` is NULL (Sprint 3).

    The row count is checked first: without it, the per-row assertions
    below would never run if the insert stored no items, and the test
    would pass vacuously.
    """
    receipt_id = repository.insert_receipt(db_conn, sample_receipt)
    rows = db_conn.execute(
        "SELECT name_raw, name_normalized FROM items WHERE receipt_id = ?",
        (receipt_id,),
    ).fetchall()

    # Guard against a vacuous pass: every item of the receipt must be present.
    assert len(rows) == len(sample_receipt.items)

    for row in rows:
        assert row["name_raw"] is not None
        assert row["name_normalized"] is None
|
||
|
|
|
||
|
|
|
||
|
|
def test_items_category_leclerc(db_conn: sqlite3.Connection, sample_receipt_leclerc: Receipt):
    """The Leclerc categories are stored in ``items``."""
    new_id = repository.insert_receipt(db_conn, sample_receipt_leclerc)
    lookup = db_conn.execute(
        "SELECT name_raw, category FROM items WHERE receipt_id = ? ORDER BY id",
        (new_id,),
    )
    stored = lookup.fetchall()

    # Rows are read back ordered by id and compared position by position.
    expected_categories = ("EPICERIE SALEE", "BOUCHERIE LS")
    for position, category in enumerate(expected_categories):
        assert stored[position]["category"] == category
|
||
|
|
|
||
|
|
|
||
|
|
def test_items_fk_constraint(db_conn: sqlite3.Connection):
    """Inserting an item with a non-existent ``receipt_id`` must fail (FK active)."""
    # No receipt has been inserted in this test, so receipt_id 999 has no parent row.
    orphan_insert = """INSERT INTO items
               (receipt_id, name_raw, category, quantity, unit, unit_price, total_price)
               VALUES (999, 'Fantôme', NULL, 1.0, 'pièce', 1.0, 1.0)"""

    # Both the statement and the commit stay inside the context manager, so
    # the error is accepted whichever of the two raises it.
    with pytest.raises(sqlite3.IntegrityError):
        db_conn.execute(orphan_insert)
        db_conn.commit()
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
# Statistics tests
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
def test_get_stats_empty(db_conn: sqlite3.Connection):
    """``get_stats`` on an empty database reports no stores and zero totals."""
    summary = repository.get_stats(db_conn)

    assert summary["receipts_by_store"] == {}
    assert summary["total_spent"] == 0.0
    assert summary["total_items"] == 0
|
||
|
|
|
||
|
|
|
||
|
|
def test_get_stats_after_insert(
    db_conn: sqlite3.Connection,
    sample_receipt: Receipt,
    sample_receipt_leclerc: Receipt,
):
    """``get_stats`` counts correctly after two receipts have been inserted."""
    for receipt in (sample_receipt, sample_receipt_leclerc):
        repository.insert_receipt(db_conn, receipt)

    stats = repository.get_stats(db_conn)

    per_store = stats["receipts_by_store"]
    assert per_store["picnic"] == 1
    assert per_store["leclerc"] == 1

    assert stats["total_spent"] == pytest.approx(12.50 + 20.00)

    expected_item_count = len(sample_receipt.items) + len(sample_receipt_leclerc.items)
    assert stats["total_items"] == expected_item_count

    # Everything is still NULL at Sprint 2, so both counters must agree.
    assert stats["null_normalized"] == stats["total_items"]
|