Files
suivi-courses/app.py
2026-02-06 17:54:51 +01:00

354 lines
12 KiB
Python

import sqlite3
from datetime import datetime
import os
import re
import discord
from discord.ext import commands
import io
import pdfplumber
# SQLite database file opened by the data-access functions in this module.
# The path is relative, so sqlite3 resolves it against the current working
# directory of the process.
DB_PATH = "grocery_receipts.db"
def init_db(db_path=None):
    """Create the SQLite schema used to store receipts, if it is missing.

    Four tables are created with ``CREATE TABLE IF NOT EXISTS`` so the call
    is safe to repeat: ``stores``, ``products``, ``receipts`` and
    ``receipt_items`` (one row per product line of a receipt).

    Args:
        db_path: Database file to initialise. When omitted, the module-level
            ``DB_PATH`` is used.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
    """
    path = DB_PATH if db_path is None else db_path

    schema_statements = (
        # Stores: one row per distinct store name.
        '''
        CREATE TABLE IF NOT EXISTS stores (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE
        )
        ''',
        # Products: one row per distinct product name.
        '''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE
        )
        ''',
        # Receipts: one row per receipt, linked to its store.
        '''
        CREATE TABLE IF NOT EXISTS receipts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            store_id INTEGER NOT NULL,
            date DATE NOT NULL,
            total REAL NOT NULL,
            FOREIGN KEY (store_id) REFERENCES stores(id)
        )
        ''',
        # Receipt items: links products to receipts with quantity and price.
        '''
        CREATE TABLE IF NOT EXISTS receipt_items (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            receipt_id INTEGER NOT NULL,
            product_id INTEGER NOT NULL,
            quantity REAL NOT NULL,
            price REAL NOT NULL,
            FOREIGN KEY (receipt_id) REFERENCES receipts(id),
            FOREIGN KEY (product_id) REFERENCES products(id)
        )
        ''',
    )

    conn = sqlite3.connect(path)
    try:
        # "with conn" commits on success and rolls back if a statement fails.
        with conn:
            for statement in schema_statements:
                conn.execute(statement)
    finally:
        # Always release the file handle, even when a statement raised.
        conn.close()
def extract_text_from_pdf(pdf_path):
    """Extract the text of every page of a PDF file using pdfplumber.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The text of all non-empty pages joined with newlines, or an empty
        string when no text could be extracted or the file could not be read
        (the error is printed to stdout in that case).
    """
    try:
        with pdfplumber.open(pdf_path) as pdf:
            page_texts = [page.extract_text() or "" for page in pdf.pages]
    except Exception as e:
        # Deliberate best-effort boundary: any failure to open or parse the
        # PDF is reported and turned into "no text" for the caller.
        print(f"Error extracting text from PDF: {e}")
        return ""
    # Join pages with a newline so the last line of one page is not fused
    # with the first line of the next (callers split the text on '\n').
    return "\n".join(page_text for page_text in page_texts if page_text)
# Dates as printed on receipts: ISO (2023-10-15) or D/M/Y-style with '/' or
# '-' separators. The digit look-arounds stop the short form from matching
# inside a longer number (e.g. "23-10-15" inside "2023-10-15").
_RECEIPT_DATE_RE = re.compile(
    r'(?<!\d)(?:\d{4}-\d{1,2}-\d{1,2}|\d{1,2}[/-]\d{1,2}[/-]\d{2,4})(?!\d)'
)
# Tried in order after '-' has been replaced by '/'. Day-first is preferred
# for ambiguous dates; month-first is only a fallback.
# NOTE(review): day-first preference is an assumption - confirm for your receipts.
_RECEIPT_DATE_FORMATS = ('%Y/%m/%d', '%d/%m/%Y', '%d/%m/%y', '%m/%d/%Y', '%m/%d/%y')

# A number with an optional decimal part written with '.' or ','.
_RECEIPT_NUMBER = r'\d+(?:[.,]\d+)?'
# "<name> <qty> x <price>", "<name> <qty>*<price>" or "<name> <qty> <price>".
# A separator between the two numbers is mandatory so that a single price
# such as "2.50" can never be split into quantity "2.5" and price "0".
_RECEIPT_QTY_PRICE_RE = re.compile(
    rf'^(.+?)\s+({_RECEIPT_NUMBER})(?:\s*[x*]\s*|\s+)(-?{_RECEIPT_NUMBER})$',
    re.IGNORECASE,
)
# "<name> <price>": quantity is assumed to be 1.
_RECEIPT_PRICE_ONLY_RE = re.compile(rf'^(.+?)\s+(-?{_RECEIPT_NUMBER})$')

_RECEIPT_STORE_KEYWORDS = ('supermarket', 'store', 'grocery', 'market', 'shop', 'saint', 'sainte')
_RECEIPT_SKIP_WORDS = ('total', 'subtotal', 'payment', 'change', 'receipt', 'store')


def _receipt_number(token):
    """Convert a matched number token to float, accepting ',' as decimal mark."""
    return float(token.replace(',', '.'))


def _extract_receipt_date(text):
    """Return the first parseable date in *text* as YYYY-MM-DD, else today."""
    for match in _RECEIPT_DATE_RE.finditer(text):
        candidate = match.group().replace('-', '/')
        for fmt in _RECEIPT_DATE_FORMATS:
            try:
                return datetime.strptime(candidate, fmt).strftime('%Y-%m-%d')
            except ValueError:
                continue
    return datetime.now().strftime('%Y-%m-%d')


def _parse_receipt_item(line):
    """Parse one stripped line into (name, quantity, price), or None."""
    if not line:
        return None
    lowered = line.lower()
    # Header/footer lines (totals, payment details...) are never items.
    if any(skip_word in lowered for skip_word in _RECEIPT_SKIP_WORDS):
        return None
    match = _RECEIPT_QTY_PRICE_RE.match(line)
    if match:
        return (
            match.group(1).strip(),
            _receipt_number(match.group(2)),
            _receipt_number(match.group(3)),
        )
    match = _RECEIPT_PRICE_ONLY_RE.match(line)
    if match:
        # No explicit quantity on the line: assume a single unit.
        return (match.group(1).strip(), 1.0, _receipt_number(match.group(2)))
    return None


def parse_receipt_text(text):
    """
    Parse receipt text to extract store name, date, and items.

    This is a heuristic parser:

    * the date is the first recognisable date in the text, normalised to
      YYYY-MM-DD (today's date when none is found);
    * the store name is the first of the first five lines containing a
      store-like keyword, otherwise "Unknown Store";
    * every other line is tried as "<name> <qty> x <price>" (also '*' or
      plain whitespace as separator) and then as "<name> <price>" with an
      implied quantity of 1. Lines containing words such as "total" or
      "payment" are ignored.

    Args:
        text: Raw text of the receipt, lines separated by '\\n'.

    Returns:
        Tuple of (store_name, date, items_list) where items_list contains
        (product_name, quantity, price) tuples with float quantity and price.
    """
    lines = text.split('\n')
    date = _extract_receipt_date(text)

    store_name = "Unknown Store"
    for line in lines[:5]:  # The store name is expected near the top.
        lowered = line.lower()
        if any(keyword in lowered for keyword in _RECEIPT_STORE_KEYWORDS):
            store_name = line.strip()
            break

    items = []
    for raw_line in lines:
        item = _parse_receipt_item(raw_line.strip())
        if item is not None:
            items.append(item)

    return store_name, date, items
# Fixed SQL per lookup table; table names are never taken from user input.
_NAME_LOOKUP_SQL = {
    "stores": (
        "SELECT id FROM stores WHERE name = ?",
        "INSERT INTO stores (name) VALUES (?)",
    ),
    "products": (
        "SELECT id FROM products WHERE name = ?",
        "INSERT INTO products (name) VALUES (?)",
    ),
}


def _get_or_create_row_id(cursor, table, name):
    """Return the id of the row called *name* in *table*, inserting it if absent."""
    select_sql, insert_sql = _NAME_LOOKUP_SQL[table]
    cursor.execute(select_sql, (name,))
    row = cursor.fetchone()
    if row is not None:
        return row[0]
    cursor.execute(insert_sql, (name,))
    return cursor.lastrowid


def add_receipt(store_name, date, items, db_path=None):
    """
    Add a receipt to the database.

    The store, the receipt, any new products and all receipt items are
    written in a single transaction: if anything fails, nothing is stored.

    Args:
        store_name: Name of the store (created if not yet known).
        date: Date of the receipt (YYYY-MM-DD format).
        items: Iterable of tuples (product_name, quantity, price_per_unit).
        db_path: Database file to write to. When omitted, the module-level
            ``DB_PATH`` is used.

    Returns:
        The id of the newly inserted receipt row.

    Raises:
        sqlite3.Error: If the database rejects one of the statements.
    """
    path = DB_PATH if db_path is None else db_path
    # Materialise once: the items are needed for the total and for the rows.
    items = list(items)
    total = sum(quantity * price for _, quantity, price in items)

    conn = sqlite3.connect(path)
    try:
        # Commits when the block succeeds, rolls back when it raises.
        with conn:
            cursor = conn.cursor()
            store_id = _get_or_create_row_id(cursor, "stores", store_name)
            cursor.execute(
                "INSERT INTO receipts (store_id, date, total) VALUES (?, ?, ?)",
                (store_id, date, total),
            )
            receipt_id = cursor.lastrowid
            for product_name, quantity, price in items:
                product_id = _get_or_create_row_id(cursor, "products", product_name)
                cursor.execute(
                    "INSERT INTO receipt_items (receipt_id, product_id, quantity, price) VALUES (?, ?, ?, ?)",
                    (receipt_id, product_id, quantity, price),
                )
    finally:
        conn.close()
    return receipt_id
def get_product_prices(product_name, db_path=None):
    """
    Get all prices for a specific product across all receipts.

    Args:
        product_name: Exact name of the product as stored in ``products``.
        db_path: Database file to read from. When omitted, the module-level
            ``DB_PATH`` is used.

    Returns:
        List of tuples (date, store_name, quantity, price_per_unit), ordered
        by receipt date. Empty when the product is unknown.

    Raises:
        sqlite3.Error: If the query cannot be executed.
    """
    path = DB_PATH if db_path is None else db_path
    conn = sqlite3.connect(path)
    try:
        cursor = conn.execute('''
            SELECT r.date, s.name, ri.quantity, ri.price
            FROM receipt_items ri
            JOIN receipts r ON ri.receipt_id = r.id
            JOIN stores s ON r.store_id = s.id
            JOIN products p ON ri.product_id = p.id
            WHERE p.name = ?
            ORDER BY r.date
        ''', (product_name,))
        return cursor.fetchall()
    finally:
        # Close the connection even when the query raised.
        conn.close()
def list_receipts(db_path=None):
    """
    List all receipts in the database.

    Args:
        db_path: Database file to read from. When omitted, the module-level
            ``DB_PATH`` is used.

    Returns:
        List of tuples (receipt_id, store_name, date, total), most recent
        date first.

    Raises:
        sqlite3.Error: If the query cannot be executed.
    """
    path = DB_PATH if db_path is None else db_path
    conn = sqlite3.connect(path)
    try:
        cursor = conn.execute('''
            SELECT r.id, s.name, r.date, r.total
            FROM receipts r
            JOIN stores s ON r.store_id = s.id
            ORDER BY r.date DESC
        ''')
        return cursor.fetchall()
    finally:
        # Close the connection even when the query raised.
        conn.close()
# Initialize the database when the module is imported.
# This is an import-time side effect: the schema is created before any event
# handler or command below can touch the database.
init_db()
# Discord bot setup: start from the default intents and enable two more
# before the Bot object is constructed with them.
intents = discord.Intents.default()
# The handlers below read message attachments and '!'-prefixed commands.
# NOTE(review): presumably this intent must also be enabled for the bot in
# the Discord developer portal - confirm.
intents.message_content = True
# NOTE(review): nothing visible in this file uses member data; confirm this
# intent is actually needed.
intents.members = True
# Commands are invoked with a '!' prefix, e.g. "!receipts".
bot = commands.Bot(command_prefix='!', intents=intents)
@bot.event
async def on_ready():
    """Print a one-line notice to stdout naming the connected bot user."""
    connected_as = bot.user
    print(f'{connected_as} has connected to Discord!')
def _save_and_import_receipt(file_path, pdf_bytes):
    """Write a receipt PDF to disk, parse it and store its items.

    Runs synchronously (file, PDF and SQLite work) and is meant to be called
    from a worker thread.

    Returns:
        None when no text could be extracted, otherwise the number of items
        found (0 means the file was saved but nothing was stored).
    """
    with open(file_path, 'wb') as f:
        f.write(pdf_bytes)
    text = extract_text_from_pdf(file_path)
    if not text:
        return None
    store_name, date, items = parse_receipt_text(text)
    if items:
        add_receipt(store_name, date, items)
    return len(items)


@bot.event
async def on_message(message):
    """Dispatch commands and import any PDF receipt attached to a message.

    Messages written by the bot itself are ignored. Every attachment whose
    name ends in ".pdf" is saved under ``receipts/``, parsed and stored, and
    the outcome is reported in the channel the message came from.
    """
    # Local import: asyncio is only needed here to reach the running loop.
    import asyncio

    # Ignore messages from the bot itself
    if message.author == bot.user:
        return
    # Process commands
    await bot.process_commands(message)

    # Handle receipt uploads
    pdf_attachments = [
        attachment for attachment in message.attachments
        if attachment.filename.lower().endswith('.pdf')
    ]
    if not pdf_attachments:
        return

    loop = asyncio.get_running_loop()
    os.makedirs('receipts', exist_ok=True)
    for attachment in pdf_attachments:
        try:
            pdf_bytes = await attachment.read()
            # The filename comes from the uploader: keep only its last path
            # component so it cannot point outside the receipts folder, and
            # prefix the attachment id so same-named uploads do not overwrite
            # each other.
            safe_name = os.path.basename(attachment.filename.replace('\\', '/')) or 'receipt.pdf'
            file_path = os.path.join('receipts', f"{attachment.id}_{safe_name}")
            # File, PDF and SQLite work is blocking; run it in the default
            # executor so the event loop stays responsive meanwhile.
            item_count = await loop.run_in_executor(
                None, _save_and_import_receipt, file_path, pdf_bytes
            )
            if item_count is None:
                await message.channel.send(f"Could not extract text from '{attachment.filename}'. Is it a text-based PDF?")
            elif item_count:
                await message.channel.send(f"Receipt '{attachment.filename}' processed! Found {item_count} items.")
            else:
                await message.channel.send(f"Receipt '{attachment.filename}' saved but couldn't parse items. Please check the format.")
        except Exception as e:
            # Top-level boundary for one attachment: report and carry on with
            # the next one instead of aborting the whole handler.
            await message.channel.send(f"Error processing receipt: {str(e)}")
def _parse_manual_items(items):
    """Parse "Product1,quantity,price;Product2,quantity,price" into tuples.

    Surrounding quotes are removed first: a keyword-only "rest of the
    message" argument is delivered with its quotes intact, so the quoted
    form shown in the command usage would otherwise fail to parse.

    Raises:
        ValueError: If an entry is malformed or no item is given.
    """
    cleaned = items.strip().strip('"\'')
    parsed = []
    for entry in cleaned.split(';'):
        entry = entry.strip()
        if not entry:
            continue  # Tolerate a trailing or doubled ';'.
        parts = [part.strip() for part in entry.split(',')]
        if len(parts) != 3 or not parts[0]:
            raise ValueError(f"invalid item '{entry}' (expected name,quantity,price)")
        product_name, quantity, price = parts
        parsed.append((product_name, float(quantity), float(price)))
    if not parsed:
        raise ValueError("no items given")
    return parsed


@bot.command(name='add_receipt')
async def add_receipt_command(ctx, store_name: str, date: str, *, items: str):
    """
    Add a receipt manually.
    Usage: !add_receipt StoreName 2023-10-15 "Product1,2,1.50;Product2,1,2.00"
    """
    # The docstring above is shown to users by the bot's help command, so it
    # is kept short; implementation notes live in comments.
    try:
        # Reject dates that are not YYYY-MM-DD and store them zero-padded so
        # that date ordering in the database stays correct.
        date = datetime.strptime(date, '%Y-%m-%d').strftime('%Y-%m-%d')
        # Malformed entries raise instead of being dropped silently, so a
        # typo cannot produce a receipt with missing items.
        item_list = _parse_manual_items(items)
        add_receipt(store_name, date, item_list)
        await ctx.send(f"Receipt added for {store_name} on {date} with {len(item_list)} items!")
    except Exception as e:
        # Command boundary: report any parsing or database error to the user.
        await ctx.send(f"Error adding receipt: {str(e)}")
@bot.command(name='prices')
async def prices_command(ctx, *, product_name: str):
    """
    Get price history for a product.
    Usage: !prices ProductName
    """
    # Discord rejects messages longer than 2000 characters, so a long history
    # is sent as several messages instead of one oversized one.
    message_limit = 2000
    try:
        prices = get_product_prices(product_name)
        if not prices:
            await ctx.send(f"No price history found for '{product_name}'.")
            return
        chunk = f"Price history for '{product_name}':\n"
        for date, store, qty, price in prices:
            line = f"- {date} at {store}: ${price:.2f} per unit (qty: {qty})\n"
            # A single line can never be allowed to exceed the limit itself.
            line = line[:message_limit]
            if chunk and len(chunk) + len(line) > message_limit:
                await ctx.send(chunk)
                chunk = ""
            chunk += line
        if chunk:
            await ctx.send(chunk)
    except Exception as e:
        # Command boundary: report any database or send error to the user.
        await ctx.send(f"Error retrieving prices: {str(e)}")
@bot.command(name='receipts')
async def receipts_command(ctx):
    """
    List all receipts.
    Usage: !receipts
    """
    # Discord rejects messages longer than 2000 characters; the receipt list
    # grows without bound, so it is sent as several messages when needed.
    message_limit = 2000
    try:
        receipts = list_receipts()
        if not receipts:
            await ctx.send("No receipts found in the database.")
            return
        chunk = "All receipts:\n"
        for receipt_id, store, date, total in receipts:
            line = f"- ID: {receipt_id}, Store: {store}, Date: {date}, Total: ${total:.2f}\n"
            # A single line can never be allowed to exceed the limit itself.
            line = line[:message_limit]
            if chunk and len(chunk) + len(line) > message_limit:
                await ctx.send(chunk)
                chunk = ""
            chunk += line
        if chunk:
            await ctx.send(chunk)
    except Exception as e:
        # Command boundary: report any database or send error to the user.
        await ctx.send(f"Error listing receipts: {str(e)}")
# Run the bot
if __name__ == "__main__":
    # The Discord bot token is read from the DISCORD_BOT_TOKEN environment
    # variable so that it never has to be written into the source file.
    DISCORD_TOKEN = os.getenv('DISCORD_BOT_TOKEN')
    if not DISCORD_TOKEN:
        # SystemExit with a string prints it to stderr and exits with status
        # 1; unlike the site-provided exit() helper it is always available.
        raise SystemExit("Error: DISCORD_BOT_TOKEN environment variable not set")
    bot.run(DISCORD_TOKEN)