recall/migrate_to_postgres.py

127 lines
4.1 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Migration script: SQLite to PostgreSQL
Migrates data from the old SQLite database to PostgreSQL.
Run once after setting up PostgreSQL.
"""
import asyncio
import json
import os
import sqlite3
from datetime import datetime
import asyncpg
# Paths
SQLITE_DB = "/home/admin/services/db/chat.db"
POSTGRES_URL = os.getenv("DATABASE_URL", "postgresql://egregore:egregore_db_pass@localhost/egregore")
async def migrate():
print(f"Connecting to PostgreSQL...")
pool = await asyncpg.create_pool(POSTGRES_URL)
async with pool.acquire() as conn:
# Create table if not exists
print("Creating messages table...")
await conn.execute("""
CREATE TABLE IF NOT EXISTS messages (
id SERIAL PRIMARY KEY,
role TEXT NOT NULL,
type TEXT NOT NULL DEFAULT 'text',
content TEXT NOT NULL,
group_id TEXT,
metadata JSONB,
priority INTEGER DEFAULT 0,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""")
# Create indexes
await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_role ON messages(role)")
await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_type ON messages(type)")
await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_group_id ON messages(group_id)")
await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_priority ON messages(priority)")
await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp)")
await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_messages_content_search
ON messages USING gin(to_tsvector('english', content))
""")
# Check if SQLite database exists
if not os.path.exists(SQLITE_DB):
print(f"No SQLite database found at {SQLITE_DB}, nothing to migrate.")
await pool.close()
return
# Connect to SQLite
print(f"Reading from SQLite: {SQLITE_DB}")
sqlite_conn = sqlite3.connect(SQLITE_DB)
sqlite_conn.row_factory = sqlite3.Row
cursor = sqlite_conn.cursor()
# Check if messages_v2 table exists
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='messages_v2'")
if not cursor.fetchone():
print("No messages_v2 table in SQLite, nothing to migrate.")
sqlite_conn.close()
await pool.close()
return
# Fetch all messages from SQLite
cursor.execute("""
SELECT id, role, type, content, group_id, metadata, priority, timestamp
FROM messages_v2
ORDER BY id ASC
""")
rows = cursor.fetchall()
print(f"Found {len(rows)} messages to migrate")
if not rows:
print("No messages to migrate.")
sqlite_conn.close()
await pool.close()
return
# Insert into PostgreSQL
async with pool.acquire() as conn:
migrated = 0
for row in rows:
timestamp = datetime.fromisoformat(row['timestamp']) if row['timestamp'] else datetime.utcnow()
await conn.execute(
"""INSERT INTO messages (role, type, content, group_id, metadata, priority, timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7)""",
row['role'],
row['type'],
row['content'],
row['group_id'],
row['metadata'], # Already JSON string, PostgreSQL will handle it
row['priority'],
timestamp
)
migrated += 1
print(f"Migrated {migrated} messages to PostgreSQL")
# Verify
async with pool.acquire() as conn:
count = await conn.fetchval("SELECT COUNT(*) FROM messages")
print(f"PostgreSQL now has {count} messages")
sqlite_conn.close()
await pool.close()
# Backup SQLite database
backup_path = SQLITE_DB + ".backup"
print(f"Backing up SQLite database to {backup_path}")
os.rename(SQLITE_DB, backup_path)
print("Migration complete!")
if __name__ == "__main__":
asyncio.run(migrate())