Files

354 lines
12 KiB
Python
Raw Permalink Normal View History

import sqlite3
from datetime import datetime
import os
import re
import discord
from discord.ext import commands
import io
import pdfplumber
# Path of the SQLite database file passed to sqlite3.connect(); it is relative,
# so it resolves against the process's current working directory.
DB_PATH = "grocery_receipts.db"
def init_db():
    """Initialize the database with required tables.

    Opens the SQLite database at ``DB_PATH`` and creates the ``stores``,
    ``products``, ``receipts`` and ``receipt_items`` tables. Every statement
    uses ``CREATE TABLE IF NOT EXISTS``, so tables that already exist are
    left untouched.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` commits on success and rolls back if a statement
        # raises; it does not close the connection, hence the ``finally``.
        with conn:
            # Create stores table
            conn.execute('''
                CREATE TABLE IF NOT EXISTS stores (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL UNIQUE
                )
            ''')
            # Create products table
            conn.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL UNIQUE
                )
            ''')
            # Create receipts table
            conn.execute('''
                CREATE TABLE IF NOT EXISTS receipts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    store_id INTEGER NOT NULL,
                    date DATE NOT NULL,
                    total REAL NOT NULL,
                    FOREIGN KEY (store_id) REFERENCES stores(id)
                )
            ''')
            # Create receipt_items table (linking products to receipts with prices)
            conn.execute('''
                CREATE TABLE IF NOT EXISTS receipt_items (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    receipt_id INTEGER NOT NULL,
                    product_id INTEGER NOT NULL,
                    quantity REAL NOT NULL,
                    price REAL NOT NULL,
                    FOREIGN KEY (receipt_id) REFERENCES receipts(id),
                    FOREIGN KEY (product_id) REFERENCES products(id)
                )
            ''')
    finally:
        # Release the connection even when schema creation fails.
        conn.close()
def extract_text_from_pdf(pdf_path):
    """Extract text from a PDF file using pdfplumber.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The text of every page that yielded any text, joined with newlines.
        An empty string if no page produced text or if reading the PDF
        failed (the error is printed rather than raised).
    """
    try:
        with pdfplumber.open(pdf_path) as pdf:
            page_texts = [page.extract_text() for page in pdf.pages]
    except Exception as e:
        # Deliberately best-effort: callers treat "" as "nothing extracted".
        print(f"Error extracting text from PDF: {e}")
        return ""
    # Join pages with a newline so the last line of one page is not fused
    # with the first line of the next (which would corrupt line-based
    # parsing). Pages without text are dropped so the result stays "" when
    # nothing at all was extracted.
    return "\n".join(page_text for page_text in page_texts if page_text)
# Year-first dates such as 2023-10-15 or 2023/10/15 (not embedded in a longer
# digit run).
_YEAR_FIRST_DATE_RE = re.compile(
    r'(?<!\d)(\d{4})[/-](\d{1,2})[/-](\d{1,2})(?!\d)')
# Any other numeric date such as 15/10/2023 or 10-15-23.
_GENERIC_DATE_RE = re.compile(r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}')
# "<name> <quantity> [x|*] <price>". A separator (x, * or whitespace) is
# required between the two numbers so that one number such as "2.99" can
# never be split into a quantity of 2.9 and a price of 9.
_QTY_PRICE_RE = re.compile(
    r'(.+?)\s+(\d+(?:\.\d+)?)(?:\s*[x*]\s*|\s+)(\d+(?:\.\d+)?)',
    re.IGNORECASE)
# "<name> <price>" with the price as the last token on the line.
_NAME_PRICE_RE = re.compile(r'(.+?)\s+(-?\d+(?:\.\d+)?)$')
_STORE_KEYWORDS = ('supermarket', 'store', 'grocery', 'market', 'shop',
                   'saint', 'sainte')
_SKIP_WORDS = ('total', 'subtotal', 'payment', 'change', 'receipt', 'store')


def _find_receipt_date(text):
    """Return the first date found in *text*, or today's date if none."""
    generic = _GENERIC_DATE_RE.search(text)
    year_first = _YEAR_FIRST_DATE_RE.search(text)
    # The generic pattern also matches the tail of a year-first date
    # ("23-10-15" inside "2023-10-15"), so prefer the year-first match
    # whenever it starts at or before the generic one.
    if year_first and (generic is None
                       or year_first.start() <= generic.start()):
        year, month, day = (int(part) for part in year_first.groups())
        try:
            datetime(year, month, day)  # validates the calendar date
        except ValueError:
            pass
        else:
            return f"{year:04d}-{month:02d}-{day:02d}"
    if generic:
        # Day/month order is ambiguous here, so the text is returned as found.
        return generic.group()
    return datetime.now().strftime('%Y-%m-%d')


def parse_receipt_text(text):
    """
    Parse receipt text to extract store name, date, and items.
    This is a basic, heuristic parser that can be improved with more
    sophisticated logic.

    Args:
        text: Plain text of the receipt, one receipt line per text line.

    Returns:
        Tuple of (store_name, date, items_list):
        - store_name: the first of the first five lines containing a store
          keyword, or "Unknown Store".
        - date: a year-first date normalized to YYYY-MM-DD; any other
          numeric date exactly as written; today's date if none is found.
        - items_list: list of (product_name, quantity, price) tuples. Lines
          with a single trailing number are taken as quantity 1.0.
    """
    date = _find_receipt_date(text)
    lines = text.split('\n')

    # Try to extract store name (first lines containing common store keywords)
    store_name = "Unknown Store"
    for line in lines[:5]:
        lowered = line.lower()
        if any(keyword in lowered for keyword in _STORE_KEYWORDS):
            store_name = line.strip()
            break

    items = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        # Skip lines that are likely headers/footers (totals, payment, ...)
        lowered = line.lower()
        if any(skip_word in lowered for skip_word in _SKIP_WORDS):
            continue
        match = _QTY_PRICE_RE.match(line)
        if match:
            items.append((match.group(1).strip(),
                          float(match.group(2)),
                          float(match.group(3))))
            continue
        # Per-line fallback: "<name> <price>" means quantity 1. Doing this
        # per line keeps single-price lines on receipts that also contain
        # explicit "qty x price" lines.
        match = _NAME_PRICE_RE.match(line)
        if match:
            items.append((match.group(1).strip(), 1.0,
                          float(match.group(2))))
    return store_name, date, items
def add_receipt(store_name, date, items):
    """
    Add a receipt to the database.

    The store and any products that do not exist yet are created. The
    receipt total is the sum of quantity * price over all items. All rows
    are written in a single transaction, so a failure part-way through
    leaves the database unchanged.

    Args:
        store_name: Name of the store
        date: Date of the receipt (YYYY-MM-DD format)
        items: List of tuples (product_name, quantity, price_per_unit)
    """
    items = list(items)
    total = sum(quantity * price for _, quantity, price in items)
    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` commits once at the end and rolls everything back if
        # any statement raises.
        with conn:
            cursor = conn.cursor()
            # Get or create store (``name`` is UNIQUE, so OR IGNORE is a
            # no-op when the store already exists)
            cursor.execute("INSERT OR IGNORE INTO stores (name) VALUES (?)",
                           (store_name,))
            cursor.execute("SELECT id FROM stores WHERE name = ?",
                           (store_name,))
            store_id = cursor.fetchone()[0]
            # Insert receipt
            cursor.execute(
                "INSERT INTO receipts (store_id, date, total) VALUES (?, ?, ?)",
                (store_id, date, total))
            receipt_id = cursor.lastrowid
            # Insert items
            for product_name, quantity, price in items:
                # Get or create product
                cursor.execute(
                    "INSERT OR IGNORE INTO products (name) VALUES (?)",
                    (product_name,))
                cursor.execute("SELECT id FROM products WHERE name = ?",
                               (product_name,))
                product_id = cursor.fetchone()[0]
                cursor.execute(
                    "INSERT INTO receipt_items (receipt_id, product_id, quantity, price) VALUES (?, ?, ?, ?)",
                    (receipt_id, product_id, quantity, price))
    finally:
        # The connection context manager does not close the connection.
        conn.close()
def get_product_prices(product_name):
    """
    Get all prices for a specific product across all receipts.

    The product name must match the stored name exactly. Rows are ordered
    by the receipt's stored date value, ascending.

    Args:
        product_name: Name of the product
    Returns:
        List of tuples (date, store_name, quantity, price_per_unit)
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.execute('''
            SELECT r.date, s.name, ri.quantity, ri.price
            FROM receipt_items ri
            JOIN receipts r ON ri.receipt_id = r.id
            JOIN stores s ON r.store_id = s.id
            JOIN products p ON ri.product_id = p.id
            WHERE p.name = ?
            ORDER BY r.date
        ''', (product_name,))
        return cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
def list_receipts():
    """
    List all receipts in the database, newest stored date first.

    Returns:
        List of tuples (receipt_id, store_name, date, total)
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.execute('''
            SELECT r.id, s.name, r.date, r.total
            FROM receipts r
            JOIN stores s ON r.store_id = s.id
            ORDER BY r.date DESC
        ''')
        return cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
# Initialize the database when the module is imported, so the tables exist
# before any event handler or command below runs a query.
init_db()
# Discord bot setup
intents = discord.Intents.default()
# Opt in to message content; presumably needed so '!' prefix commands and
# uploads can be read from messages — confirm against the discord.py docs.
intents.message_content = True
# NOTE(review): nothing in this file visibly uses member data — confirm this
# intent is actually required.
intents.members = True
# Commands are invoked with a leading '!' (e.g. "!prices Milk").
bot = commands.Bot(command_prefix='!', intents=intents)
@bot.event
async def on_ready():
    """Print a one-line notice once the bot reports it is ready."""
    banner = f'{bot.user} has connected to Discord!'
    print(banner)
@bot.event
async def on_message(message):
    """Run prefix commands and import any PDF receipts attached to a message.

    Each attachment whose filename ends in ``.pdf`` is saved under the
    ``receipts`` folder, its text is extracted and parsed, and the resulting
    items are stored with ``add_receipt``. The outcome of each attachment is
    reported in the channel the message came from.
    """
    # Ignore messages from the bot itself
    if message.author == bot.user:
        return
    # Process commands
    await bot.process_commands(message)
    # Handle receipt uploads
    for attachment in message.attachments:
        if not attachment.filename.lower().endswith('.pdf'):
            continue
        # Download, save and parse inside one try so that a failed download
        # or disk write is reported in the channel instead of escaping the
        # handler.
        try:
            pdf_bytes = await attachment.read()
            os.makedirs('receipts', exist_ok=True)
            # The filename is user-supplied: drop any directory components,
            # and prefix the attachment id so two uploads with the same name
            # (e.g. "receipt.pdf") do not overwrite each other.
            base_name = os.path.basename(
                attachment.filename.replace('\\', '/'))
            file_path = os.path.join(
                'receipts', f"{attachment.id}_{base_name}")
            with open(file_path, 'wb') as f:
                f.write(pdf_bytes)
            # Extract text and parse the receipt
            text = extract_text_from_pdf(file_path)
            if not text:
                await message.channel.send(f"Could not extract text from '{attachment.filename}'. Is it a text-based PDF?")
                continue
            store_name, date, items = parse_receipt_text(text)
            if items:
                add_receipt(store_name, date, items)
                await message.channel.send(f"Receipt '{attachment.filename}' processed! Found {len(items)} items.")
            else:
                await message.channel.send(f"Receipt '{attachment.filename}' saved but couldn't parse items. Please check the format.")
        except Exception as e:
            await message.channel.send(f"Error processing receipt: {str(e)}")
@bot.command(name='add_receipt')
async def add_receipt_command(ctx, store_name: str, date: str, *, items: str):
    """
    Add a receipt manually.
    Usage: !add_receipt StoreName 2023-10-15 "Product1,2,1.50;Product2,1,2.00"
    """
    try:
        # Stored dates are compared as text, so insist on a real calendar
        # date and normalize it to zero-padded YYYY-MM-DD.
        try:
            date = datetime.strptime(date, '%Y-%m-%d').date().isoformat()
        except ValueError:
            raise ValueError(
                f"Invalid date '{date}': expected YYYY-MM-DD") from None
        # ``items`` is a keyword-only "consume rest" argument, so it arrives
        # verbatim, including the surrounding quotes shown in the usage line.
        raw_items = items.strip()
        if (len(raw_items) >= 2 and raw_items[0] == raw_items[-1]
                and raw_items[0] in ('"', "'")):
            raw_items = raw_items[1:-1]
        # Parse items: format is "Product1,quantity,price;Product2,quantity,price"
        item_list = []
        for item_str in raw_items.split(';'):
            item_str = item_str.strip()
            if not item_str:
                continue  # tolerate a trailing or doubled ';'
            parts = [part.strip() for part in item_str.split(',')]
            # Reject malformed entries instead of silently dropping them,
            # which would store a receipt with missing items and a wrong total.
            if len(parts) != 3 or not parts[0]:
                raise ValueError(
                    f"Invalid item '{item_str}': expected Name,quantity,price")
            item_list.append((parts[0], float(parts[1]), float(parts[2])))
        if not item_list:
            raise ValueError("No items provided")
        add_receipt(store_name, date, item_list)
        await ctx.send(f"Receipt added for {store_name} on {date} with {len(item_list)} items!")
    except Exception as e:
        await ctx.send(f"Error adding receipt: {str(e)}")
@bot.command(name='prices')
async def prices_command(ctx, *, product_name: str):
    """
    Get price history for a product.
    Usage: !prices ProductName
    """
    try:
        prices = get_product_prices(product_name)
        if not prices:
            await ctx.send(f"No price history found for '{product_name}'.")
            return
        lines = [f"Price history for '{product_name}':"]
        lines.extend(
            f"- {date} at {store}: ${price:.2f} per unit (qty: {qty})"
            for date, store, qty, price in prices
        )
        # Discord rejects messages longer than 2000 characters, so a long
        # history is sent as several messages split on line boundaries.
        max_len = 2000
        chunk = ""
        for line in lines:
            line = line[:max_len]  # a single over-long line is truncated
            if chunk and len(chunk) + 1 + len(line) > max_len:
                await ctx.send(chunk)
                chunk = line
            else:
                chunk = f"{chunk}\n{line}" if chunk else line
        await ctx.send(chunk)
    except Exception as e:
        await ctx.send(f"Error retrieving prices: {str(e)}")
@bot.command(name='receipts')
async def receipts_command(ctx):
    """
    List all receipts.
    Usage: !receipts
    """
    try:
        receipts = list_receipts()
        if not receipts:
            await ctx.send("No receipts found in the database.")
            return
        lines = ["All receipts:"]
        lines.extend(
            f"- ID: {receipt_id}, Store: {store}, Date: {date}, Total: ${total:.2f}"
            for receipt_id, store, date, total in receipts
        )
        # Discord rejects messages longer than 2000 characters, so a long
        # listing is sent as several messages split on line boundaries.
        max_len = 2000
        chunk = ""
        for line in lines:
            line = line[:max_len]  # a single over-long line is truncated
            if chunk and len(chunk) + 1 + len(line) > max_len:
                await ctx.send(chunk)
                chunk = line
            else:
                chunk = f"{chunk}\n{line}" if chunk else line
        await ctx.send(chunk)
    except Exception as e:
        await ctx.send(f"Error listing receipts: {str(e)}")
# Run the bot
if __name__ == "__main__":
    # The token is read from the DISCORD_BOT_TOKEN environment variable so it
    # never has to be written into the source file.
    DISCORD_TOKEN = os.getenv('DISCORD_BOT_TOKEN')
    if not DISCORD_TOKEN:
        print("Error: DISCORD_BOT_TOKEN environment variable not set")
        # ``exit()`` is a helper injected by the ``site`` module and is not
        # guaranteed to exist; raising SystemExit always works.
        raise SystemExit(1)
    bot.run(DISCORD_TOKEN)