import sqlite3 from datetime import datetime import os import re import discord from discord.ext import commands import io import pdfplumber DB_PATH = "grocery_receipts.db" def init_db(): """Initialize the database with required tables.""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # Create stores table cursor.execute(''' CREATE TABLE IF NOT EXISTS stores ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE ) ''') # Create products table cursor.execute(''' CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE ) ''') # Create receipts table cursor.execute(''' CREATE TABLE IF NOT EXISTS receipts ( id INTEGER PRIMARY KEY AUTOINCREMENT, store_id INTEGER NOT NULL, date DATE NOT NULL, total REAL NOT NULL, FOREIGN KEY (store_id) REFERENCES stores(id) ) ''') # Create receipt_items table (linking products to receipts with prices) cursor.execute(''' CREATE TABLE IF NOT EXISTS receipt_items ( id INTEGER PRIMARY KEY AUTOINCREMENT, receipt_id INTEGER NOT NULL, product_id INTEGER NOT NULL, quantity REAL NOT NULL, price REAL NOT NULL, FOREIGN KEY (receipt_id) REFERENCES receipts(id), FOREIGN KEY (product_id) REFERENCES products(id) ) ''') conn.commit() conn.close() def extract_text_from_pdf(pdf_path): """Extract text from a PDF file using pdfplumber.""" text = "" try: with pdfplumber.open(pdf_path) as pdf: for page in pdf.pages: text += page.extract_text() or "" except Exception as e: print(f"Error extracting text from PDF: {e}") return "" return text def parse_receipt_text(text): """ Parse receipt text to extract store name, date, and items. This is a basic parser that can be improved with more sophisticated logic. Returns: Tuple of (store_name, date, items_list) """ # Extract date (looking for common date patterns) date_pattern = r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}' date_match = re.search(date_pattern, text) date = date_match.group() if date_match else datetime.now().strftime('%Y-%m-%d') # Try to extract store name (first line or lines containing common store keywords) lines = text.split('\n') store_name = "Unknown Store" for line in lines[:5]: # Check first few lines if any(keyword in line.lower() for keyword in ['supermarket', 'store', 'grocery', 'market', 'shop', 'saint', 'sainte']): store_name = line.strip() break # Extract items (lines with price patterns) items = [] # Look for lines that have product names followed by prices item_pattern = r'^(.+?)\s+(\d+\.?\d*)\s*(x|\*)?\s*(\d+\.?\d*)\s*$' for line in lines: line = line.strip() # Skip empty lines and lines that are likely headers/footers if not line or any(skip_word in line.lower() for skip_word in ['total', 'subtotal', 'payment', 'change', 'receipt', 'store']): continue # Try to match item patterns match = re.match(r'(.+?)\s+(\d+\.?\d*)\s*x?\s*(\d+\.?\d*)', line, re.IGNORECASE) if match: product_name = match.group(1).strip() try: quantity = float(match.group(2)) price = float(match.group(3)) items.append((product_name, quantity, price)) except ValueError: continue # If no items found with the pattern, try simpler parsing if not items: for line in lines: line = line.strip() # Look for lines with prices (containing decimal points) price_match = re.search(r'(\d+\.?\d*)\s*$', line) if price_match and len(line.split()) > 1: # Extract product name and price parts = line.rsplit(' ', 1) if len(parts) == 2: try: product_name = parts[0].strip() price = float(parts[1]) # Assume quantity 1 if not specified items.append((product_name, 1.0, price)) except ValueError: continue return store_name, date, items def add_receipt(store_name, date, items): """ Add a receipt to the database. Args: store_name: Name of the store date: Date of the receipt (YYYY-MM-DD format) items: List of tuples (product_name, quantity, price_per_unit) """ conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # Get or create store cursor.execute("SELECT id FROM stores WHERE name = ?", (store_name,)) store = cursor.fetchone() if not store: cursor.execute("INSERT INTO stores (name) VALUES (?)", (store_name,)) conn.commit() cursor.execute("SELECT id FROM stores WHERE name = ?", (store_name,)) store = cursor.fetchone() store_id = store[0] # Insert receipt cursor.execute("INSERT INTO receipts (store_id, date, total) VALUES (?, ?, ?)", (store_id, date, sum(item[1] * item[2] for item in items))) receipt_id = cursor.lastrowid # Insert items for product_name, quantity, price in items: # Get or create product cursor.execute("SELECT id FROM products WHERE name = ?", (product_name,)) product = cursor.fetchone() if not product: cursor.execute("INSERT INTO products (name) VALUES (?)", (product_name,)) conn.commit() cursor.execute("SELECT id FROM products WHERE name = ?", (product_name,)) product = cursor.fetchone() product_id = product[0] cursor.execute("INSERT INTO receipt_items (receipt_id, product_id, quantity, price) VALUES (?, ?, ?, ?)", (receipt_id, product_id, quantity, price)) conn.commit() conn.close() def get_product_prices(product_name): """ Get all prices for a specific product across all receipts. Args: product_name: Name of the product Returns: List of tuples (date, store_name, quantity, price_per_unit) """ conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' SELECT r.date, s.name, ri.quantity, ri.price FROM receipt_items ri JOIN receipts r ON ri.receipt_id = r.id JOIN stores s ON r.store_id = s.id JOIN products p ON ri.product_id = p.id WHERE p.name = ? ORDER BY r.date ''', (product_name,)) results = cursor.fetchall() conn.close() return results def list_receipts(): """ List all receipts in the database. Returns: List of tuples (receipt_id, store_name, date, total) """ conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' SELECT r.id, s.name, r.date, r.total FROM receipts r JOIN stores s ON r.store_id = s.id ORDER BY r.date DESC ''') results = cursor.fetchall() conn.close() return results # Initialize the database when the module is imported init_db() # Discord bot setup intents = discord.Intents.default() intents.message_content = True intents.members = True bot = commands.Bot(command_prefix='!', intents=intents) @bot.event async def on_ready(): print(f'{bot.user} has connected to Discord!') @bot.event async def on_message(message): # Ignore messages from the bot itself if message.author == bot.user: return # Process commands await bot.process_commands(message) # Handle receipt uploads if message.attachments: for attachment in message.attachments: if attachment.filename.lower().endswith('.pdf'): # Download the PDF pdf_bytes = await attachment.read() # Save the PDF to a receipts folder os.makedirs('receipts', exist_ok=True) file_path = os.path.join('receipts', attachment.filename) with open(file_path, 'wb') as f: f.write(pdf_bytes) # Extract text and parse the receipt try: text = extract_text_from_pdf(file_path) if text: store_name, date, items = parse_receipt_text(text) if items: add_receipt(store_name, date, items) await message.channel.send(f"Receipt '{attachment.filename}' processed! Found {len(items)} items.") else: await message.channel.send(f"Receipt '{attachment.filename}' saved but couldn't parse items. Please check the format.") else: await message.channel.send(f"Could not extract text from '{attachment.filename}'. Is it a text-based PDF?") except Exception as e: await message.channel.send(f"Error processing receipt: {str(e)}") @bot.command(name='add_receipt') async def add_receipt_command(ctx, store_name: str, date: str, *, items: str): """ Add a receipt manually. Usage: !add_receipt StoreName 2023-10-15 "Product1,2,1.50;Product2,1,2.00" """ try: # Parse items: format is "Product1,quantity,price;Product2,quantity,price" item_list = [] for item_str in items.split(';'): parts = item_str.strip().split(',') if len(parts) == 3: product_name = parts[0].strip() quantity = float(parts[1].strip()) price = float(parts[2].strip()) item_list.append((product_name, quantity, price)) add_receipt(store_name, date, item_list) await ctx.send(f"Receipt added for {store_name} on {date} with {len(item_list)} items!") except Exception as e: await ctx.send(f"Error adding receipt: {str(e)}") @bot.command(name='prices') async def prices_command(ctx, *, product_name: str): """ Get price history for a product. Usage: !prices ProductName """ try: prices = get_product_prices(product_name) if not prices: await ctx.send(f"No price history found for '{product_name}'.") return message = f"Price history for '{product_name}':\n" for date, store, qty, price in prices: message += f"- {date} at {store}: ${price:.2f} per unit (qty: {qty})\n" await ctx.send(message) except Exception as e: await ctx.send(f"Error retrieving prices: {str(e)}") @bot.command(name='receipts') async def receipts_command(ctx): """ List all receipts. Usage: !receipts """ try: receipts = list_receipts() if not receipts: await ctx.send("No receipts found in the database.") return message = "All receipts:\n" for receipt_id, store, date, total in receipts: message += f"- ID: {receipt_id}, Store: {store}, Date: {date}, Total: ${total:.2f}\n" await ctx.send(message) except Exception as e: await ctx.send(f"Error listing receipts: {str(e)}") # Run the bot if __name__ == "__main__": # You'll need to set your Discord bot token as an environment variable # or replace the token parameter with your actual token (not recommended for production) DISCORD_TOKEN = os.getenv('DISCORD_BOT_TOKEN') if not DISCORD_TOKEN: print("Error: DISCORD_BOT_TOKEN environment variable not set") exit(1) bot.run(DISCORD_TOKEN)