PostgreSQL message storage API with asyncpg connection pooling and full-text search. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
126 lines
4.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Migration script: SQLite to PostgreSQL
|
|
|
|
Migrates data from the old SQLite database to PostgreSQL.
|
|
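Run once after setting up PostgreSQL. On success the SQLite file is moved aside under a `.backup` name, so running the script again finds no SQLite database and imports nothing.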
|
|
"""
|
|
|
|
import asyncio
import json
import os
import sqlite3
from datetime import datetime, timezone

import asyncpg
|
|
|
|
# Paths / connection settings. Both can be overridden through environment
# variables; the defaults are the original hard-coded values.
SQLITE_DB = os.getenv("SQLITE_DB_PATH", "/home/admin/services/db/chat.db")

# NOTE(review): the fallback DSN embeds a plaintext password in source control;
# prefer always setting DATABASE_URL and removing the credential from here.
POSTGRES_URL = os.getenv("DATABASE_URL", "postgresql://egregore:egregore_db_pass@localhost/egregore")
|
|
|
|
|
|
async def _create_schema(conn):
    """Create the PostgreSQL ``messages`` table and its indexes if they are missing.

    Every statement uses ``IF NOT EXISTS``, so calling this repeatedly is safe.
    """
    print("Creating messages table...")
    await conn.execute("""
        CREATE TABLE IF NOT EXISTS messages (
            id SERIAL PRIMARY KEY,
            role TEXT NOT NULL,
            type TEXT NOT NULL DEFAULT 'text',
            content TEXT NOT NULL,
            group_id TEXT,
            metadata JSONB,
            priority INTEGER DEFAULT 0,
            timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
    """)

    await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_role ON messages(role)")
    await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_type ON messages(type)")
    await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_group_id ON messages(group_id)")
    await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_priority ON messages(priority)")
    await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp)")
    # Expression index for full-text search: queries must use the same
    # to_tsvector('english', content) expression for the planner to use it.
    await conn.execute("""
        CREATE INDEX IF NOT EXISTS idx_messages_content_search
        ON messages USING gin(to_tsvector('english', content))
    """)


def _read_sqlite_rows(path):
    """Return all ``messages_v2`` rows from the SQLite file at *path*, ordered by id.

    Returns ``None`` (after printing the reason) when the file or the
    ``messages_v2`` table does not exist. The SQLite connection is always
    closed before returning, including on errors.
    """
    # Check first: sqlite3.connect() would otherwise create an empty database.
    if not os.path.exists(path):
        print(f"No SQLite database found at {path}, nothing to migrate.")
        return None

    print(f"Reading from SQLite: {path}")
    sqlite_conn = sqlite3.connect(path)
    try:
        sqlite_conn.row_factory = sqlite3.Row
        cursor = sqlite_conn.cursor()

        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='messages_v2'")
        if not cursor.fetchone():
            print("No messages_v2 table in SQLite, nothing to migrate.")
            return None

        cursor.execute("""
            SELECT id, role, type, content, group_id, metadata, priority, timestamp
            FROM messages_v2
            ORDER BY id ASC
        """)
        return cursor.fetchall()
    finally:
        sqlite_conn.close()


def _parse_timestamp(value):
    """Convert a SQLite ``timestamp`` value into a timezone-aware UTC datetime.

    A missing/empty value falls back to the current UTC time. Values must be
    ISO-8601 strings; a trailing ``Z`` is accepted on every Python version.
    Naive values are treated as UTC, matching the UTC fallback
    (NOTE(review): confirm this matches how the SQLite rows were written).
    An explicit tzinfo matters because asyncpg converts a naive datetime bound
    to a TIMESTAMPTZ parameter using the host's local timezone.
    """
    if not value:
        return datetime.now(timezone.utc)
    if value.endswith("Z"):
        # datetime.fromisoformat() only understands "Z" from Python 3.11 on.
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _backup_sqlite(path):
    """Move the SQLite file at *path* aside and return its new path.

    Uses ``<path>.backup``. If that name is already taken, a UTC timestamp is
    appended so an older backup is never overwritten (``os.rename`` silently
    replaces an existing target on POSIX).
    """
    backup_path = path + ".backup"
    if os.path.exists(backup_path):
        stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        backup_path = f"{backup_path}.{stamp}"
    print(f"Backing up SQLite database to {backup_path}")
    os.rename(path, backup_path)
    return backup_path


async def migrate():
    """Copy all rows of the SQLite ``messages_v2`` table into PostgreSQL ``messages``.

    Flow:
      1. Ensure the PostgreSQL ``messages`` table and its indexes exist.
      2. Read every ``messages_v2`` row from ``SQLITE_DB``, ordered by id.
      3. Insert them in a single transaction. A failure part-way through
         leaves no partially migrated rows, so the run can simply be retried.
      4. Print the PostgreSQL row count, then move the SQLite file aside under
         a ``.backup`` name so a later run finds nothing to migrate.

    Returns early, leaving the SQLite file in place, when the SQLite file, its
    ``messages_v2`` table, or its rows are missing. The connection pool is
    closed on every exit path, including errors.

    SQLite ``id`` values are not copied; PostgreSQL assigns fresh SERIAL ids
    in the original id order. NOTE(review): rows are appended unconditionally,
    so running against a ``messages`` table that already holds them
    duplicates them.
    """
    print("Connecting to PostgreSQL...")
    pool = await asyncpg.create_pool(POSTGRES_URL)
    try:
        async with pool.acquire() as conn:
            await _create_schema(conn)

        rows = _read_sqlite_rows(SQLITE_DB)
        if rows is None:
            return

        print(f"Found {len(rows)} messages to migrate")
        if not rows:
            print("No messages to migrate.")
            return

        # Convert everything up front so a malformed timestamp aborts the run
        # before anything has been written to PostgreSQL.
        records = [
            (
                row["role"],
                row["type"],
                row["content"],
                row["group_id"],
                row["metadata"],  # JSON text; asyncpg's default jsonb codec accepts str
                row["priority"],
                _parse_timestamp(row["timestamp"]),
            )
            for row in rows
        ]

        async with pool.acquire() as conn:
            async with conn.transaction():
                await conn.executemany(
                    """INSERT INTO messages (role, type, content, group_id, metadata, priority, timestamp)
                       VALUES ($1, $2, $3, $4, $5, $6, $7)""",
                    records,
                )
        print(f"Migrated {len(records)} messages to PostgreSQL")

        # Verify. The count also includes any rows that were already present.
        async with pool.acquire() as conn:
            count = await conn.fetchval("SELECT COUNT(*) FROM messages")
        print(f"PostgreSQL now has {count} messages")
    finally:
        await pool.close()

    # Only reached on the success path; every early return above skips it.
    _backup_sqlite(SQLITE_DB)

    print("Migration complete!")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the one-shot migration coroutine on a new event loop.
    asyncio.run(migrate())
|