# NOTE(review): this chunk is a whitespace-mangled `git format-patch` stream.
# Reconstructed below. Non-Python payload (commit metadata, .gitignore) is
# preserved as labeled comments; the Python payload is restored verbatim with
# formatting and documentation repaired.
#
# Commit 291d66405157db0d743ceb50cf1a52924f89022b — egregore,
# Mon, 2 Feb 2026 11:37:55 +0000
# Subject: [PATCH] Initial commit: Egregore db service
#   PostgreSQL message storage API with asyncpg connection pooling and
#   full-text search.
#
# --- file: .gitignore --------------------------------------------------------
# # Python
# __pycache__/
# *.py[cod]
# *$py.class
# *.so
# .Python
# *.egg-info/
# .eggs/
# dist/
# build/
#
# # Environment
# .env
# .env.*
# *.local
#
# # IDE
# .vscode/
# .idea/
# *.swp
# *.swo
#
# # Secrets - BE PARANOID
# *.pem
# *.key
# *.crt
# *.p12
# credentials*
# secrets*
# tokens*
# *_secret*
# *_token*
# *.credentials
#
# # Logs and data
# *.log
# *.db
# *.sqlite
# *.sqlite3
# *.backup
# chat.db*
#
# # OS
# .DS_Store
# Thumbs.db

# --- file: __init__.py -------------------------------------------------------
"""
Egregore Database - Message storage and retrieval

This module handles all database operations for the chat system.
Backed by PostgreSQL via asyncpg (messages.py); a legacy SQLite
compatibility shim (set_db_path) remains for old callers.
"""
# (The original docstring said "Currently SQLite, designed for easy migration
# to PostgreSQL later", which contradicts the asyncpg implementation in
# messages.py — corrected above.)

from .messages import (
    init_db,
    save_message,
    save_response_blocks,
    get_messages,
    get_conversation_history,
    search_messages,
    MESSAGE_PRIORITIES,
    get_priority_for_type,
)

__all__ = [
    "init_db",
    "save_message",
    "save_response_blocks",
    "get_messages",
    "get_conversation_history",
    "search_messages",
    "MESSAGE_PRIORITIES",
    "get_priority_for_type",
]

# --- file: main.py -----------------------------------------------------------
#!/usr/bin/env python3
"""
Egregore DB Service - Message storage API

Provides HTTP API for message storage and retrieval.
Runs on port 8082.
"""

import os  # kept from original (unused here, but other code may rely on it)
from typing import Optional

from fastapi import FastAPI, Query
from pydantic import BaseModel

# NOTE(review): main.py imports `messages` absolutely while __init__.py uses
# a relative import — works only when this directory is on sys.path directly.
from messages import (
    init_db,
    save_message,
    save_response_blocks,
    get_messages,
    get_conversation_history,
    search_messages,
    close_pool,
)

app = FastAPI(title="Egregore DB Service", docs_url="/docs")


# NOTE(review): @app.on_event is deprecated in current FastAPI in favour of
# lifespan handlers; it still works, so it is kept to avoid restructuring.
@app.on_event("startup")
async def startup():
    """Create the messages table and indexes before serving requests."""
    await init_db()


@app.on_event("shutdown")
async def shutdown():
    """Release the asyncpg connection pool on shutdown."""
    await close_pool()


# Request models
class SaveMessageRequest(BaseModel):
    role: str
    content: str
    msg_type: str = "text"
    group_id: Optional[str] = None
    metadata: Optional[dict] = None
    # Explicit override; when None the DB layer derives priority from
    # msg_type/content (see messages.get_priority_for_type).
    priority: Optional[int] = None


class SaveBlocksRequest(BaseModel):
    blocks: list
    group_id: str


# Endpoints
@app.post("/messages")
async def api_save_message(req: SaveMessageRequest):
    """Save a single message"""
    msg_id = await save_message(
        role=req.role,
        content=req.content,
        msg_type=req.msg_type,
        group_id=req.group_id,
        metadata=req.metadata,
        priority=req.priority,
    )
    return {"id": msg_id}


@app.post("/messages/blocks")
async def api_save_blocks(req: SaveBlocksRequest):
    """Save multiple response blocks"""
    saved = await save_response_blocks(req.blocks, req.group_id)
    return {"messages": saved}
# --- file: main.py (continued) -----------------------------------------------

@app.get("/messages")
async def api_get_messages(
    limit: int = 50,
    before: Optional[int] = None,
    msg_type: Optional[str] = Query(None, alias="type"),
):
    """Get messages with pagination"""
    # Clamp to [1, 100]: the original only capped the upper bound, so a zero
    # or negative `limit` reached the DB layer and produced a bad SQL LIMIT.
    messages, has_more = await get_messages(
        limit=max(1, min(limit, 100)),
        before_id=before,
        msg_type=msg_type,
    )
    return {"messages": messages, "has_more": has_more}


@app.get("/messages/history")
async def api_get_history(limit: int = 100):
    """Get conversation history in Claude API format"""
    history = await get_conversation_history(limit=limit)
    return {"history": history}


@app.get("/messages/search")
async def api_search(
    q: str,
    limit: int = 20,
    msg_type: Optional[str] = Query(None, alias="type"),
):
    """Search messages by content"""
    # search_messages applies its own hard cap of 50 results.
    results = await search_messages(q, limit=limit, msg_type=msg_type)
    return {"results": results}


@app.get("/health")
async def health():
    """Health check endpoint"""
    return {"status": "ok", "service": "db"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8082)

# --- file: messages.py -------------------------------------------------------
"""
Egregore Database - Message storage operations (PostgreSQL)
"""

import json
import os
import uuid
from datetime import datetime
from typing import Optional

import asyncpg

# Database connection URL - can be overridden via environment.
# NOTE(review): the fallback embeds a password in source; acceptable for a
# local single-box deployment, but it should come from the environment (or a
# secrets store) anywhere shared.
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://egregore:egregore_db_pass@localhost/egregore")

# Module-wide connection pool, created lazily by get_pool().
_pool: Optional[asyncpg.Pool] = None


def set_db_url(url: str):
    """Set the database URL (call before init_db)"""
    global DATABASE_URL
    DATABASE_URL = url


async def get_pool() -> asyncpg.Pool:
    """Get or create the connection pool"""
    global _pool
    if _pool is None:
        _pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10)
    return _pool


async def close_pool():
    """Close the connection pool (safe to call when no pool exists)."""
    global _pool
    if _pool:
        await _pool.close()
        _pool = None
# Message type -> notification priority (higher = more urgent).
MESSAGE_PRIORITIES = {
    "text": 2,         # Regular messages - notify
    "tool_use": 0,     # Tool invocation - no notify
    "tool_result": 0,  # Tool output - no notify
    "question": 3,     # Questions to user - urgent notify
    "mode_change": 1,  # State transitions - silent
    "thinking": 0,     # Reasoning process - no notify
    "error": 2,        # Error messages - notify
}


def get_priority_for_type(msg_type: str, content: str = "") -> int:
    """Get priority for a message type, with question detection.

    Unknown types fall back to 0 (silent). A plain "text" message whose
    content ends with "?" is promoted to the "question" priority.
    """
    base_priority = MESSAGE_PRIORITIES.get(msg_type, 0)
    if msg_type == "text" and content.strip().endswith("?"):
        return MESSAGE_PRIORITIES["question"]
    return base_priority


async def init_db():
    """Initialize PostgreSQL database with messages table and indexes.

    Idempotent: uses IF NOT EXISTS throughout, so it is safe on every start.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS messages (
                id SERIAL PRIMARY KEY,
                role TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'text',
                content TEXT NOT NULL,
                group_id TEXT,
                metadata JSONB,
                priority INTEGER DEFAULT 0,
                timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
            )
        """)

        # Create indexes for efficient querying
        await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_role ON messages(role)")
        await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_type ON messages(type)")
        await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_group_id ON messages(group_id)")
        await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_priority ON messages(priority)")
        await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp)")

        # Full-text search index on content
        await conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_messages_content_search
            ON messages USING gin(to_tsvector('english', content))
        """)


async def save_message(
    role: str,
    content: str,
    msg_type: str = "text",
    group_id: Optional[str] = None,
    metadata: Optional[dict] = None,
    priority: Optional[int] = None,
) -> int:
    """Save a single message row and return its id.

    When `priority` is None it is derived from msg_type/content.
    """
    # Derive the priority *before* acquiring a connection — no reason to hold
    # a pooled connection while doing pure computation (original computed it
    # inside the acquire block).
    if priority is None:
        priority = get_priority_for_type(msg_type, content)

    pool = await get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """INSERT INTO messages (role, type, content, group_id, metadata, priority)
               VALUES ($1, $2, $3, $4, $5, $6)
               RETURNING id""",
            role, msg_type, content, group_id,
            json.dumps(metadata) if metadata else None, priority,
        )
    return row['id']
async def save_response_blocks(blocks: list, group_id: str) -> list:
    """
    Save each response block as a separate row.
    Returns list of saved message dicts with IDs for frontend.
    """
    # Local import: the module header only imports `datetime` itself.
    from datetime import timezone

    saved_messages = []
    pool = await get_pool()
    async with pool.acquire() as conn:
        # One shared timestamp so every block of a single response sorts
        # together. The original used datetime.utcnow(), which is deprecated
        # and yields a *naive* value for a TIMESTAMPTZ column; use an explicit
        # UTC-aware stamp instead (isoformat() now carries a +00:00 offset).
        timestamp = datetime.now(timezone.utc)

        for block in blocks:
            block_type = block.get("type", "text")
            content = ""
            metadata = None
            priority = MESSAGE_PRIORITIES.get(block_type, 0)

            if block_type == "text":
                content = block.get("content", "")
                # Same question-promotion rule as get_priority_for_type.
                if content.strip().endswith("?"):
                    priority = MESSAGE_PRIORITIES["question"]
            elif block_type == "tool_use":
                # Tool input is stored as the row content; name/id go to metadata.
                content = json.dumps(block.get("input", {}))
                metadata = {"tool_name": block.get("name"), "tool_id": block.get("id")}
            elif block_type == "tool_result":
                content = block.get("content", "")
                metadata = {"tool_name": block.get("tool_name"), "tool_use_id": block.get("tool_use_id")}
            else:
                content = block.get("content", "")

            row = await conn.fetchrow(
                """INSERT INTO messages (role, type, content, group_id, metadata, priority, timestamp)
                   VALUES ($1, $2, $3, $4, $5, $6, $7)
                   RETURNING id""",
                "assistant", block_type, content, group_id,
                json.dumps(metadata) if metadata else None, priority, timestamp,
            )
            msg_id = row['id']

            saved_messages.append({
                "id": msg_id,
                "role": "assistant",
                "type": block_type,
                "content": content,
                "group_id": group_id,
                "metadata": metadata,
                "priority": priority,
                "timestamp": timestamp.isoformat(),
            })

    return saved_messages
async def get_messages(
    limit: int = 50,
    before_id: Optional[int] = None,
    msg_type: Optional[str] = None,
) -> tuple[list[dict], bool]:
    """Get messages with pagination. Returns (messages, has_more).

    Rows are fetched newest-first for the LIMIT, then reversed so the caller
    receives them in chronological order. `before_id` is an exclusive cursor.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        params = []
        where_clauses = []

        # `is not None` rather than truthiness: a cursor of 0 must not be
        # silently dropped (original used `if before_id:`).
        if before_id is not None:
            params.append(before_id)
            where_clauses.append(f"id < ${len(params)}")
        if msg_type:
            params.append(msg_type)
            where_clauses.append(f"type = ${len(params)}")

        where_sql = " AND ".join(where_clauses) if where_clauses else "TRUE"
        params.append(limit + 1)  # fetch one extra row to detect has_more

        rows = await conn.fetch(
            f"""SELECT id, role, type, content, group_id, metadata, priority, timestamp
                FROM messages
                WHERE {where_sql}
                ORDER BY id DESC
                LIMIT ${len(params)}""",
            *params,
        )

    has_more = len(rows) > limit
    rows = rows[:limit]

    messages = []
    for row in rows:
        metadata = None
        if row['metadata']:
            try:
                metadata = json.loads(row['metadata']) if isinstance(row['metadata'], str) else row['metadata']
            except (TypeError, ValueError):
                # Corrupt metadata is non-fatal; keep the message, drop the
                # metadata. (Original used a bare `except:` — too broad.)
                metadata = None
        messages.append({
            "id": row['id'],
            "role": row['role'],
            "type": row['type'],
            "content": row['content'],
            "group_id": row['group_id'],
            "metadata": metadata,
            "priority": row['priority'],
            "timestamp": row['timestamp'].isoformat() if row['timestamp'] else None,
        })

    return list(reversed(messages)), has_more


async def get_conversation_history(limit: int = 100) -> list[dict]:
    """
    Reconstruct Claude API message format from individual rows.
    Groups assistant messages by group_id to build proper content arrays;
    tool_result rows are emitted as user messages, per the API shape.
    """
    messages, _ = await get_messages(limit)

    api_messages = []
    current_group = None            # group_id of the assistant turn being built
    current_assistant_content = []  # accumulated content blocks for that turn

    for msg in messages:
        if msg["role"] == "user":
            # A user message terminates any in-progress assistant turn.
            if current_assistant_content:
                api_messages.append({
                    "role": "assistant",
                    "content": current_assistant_content
                })
                current_assistant_content = []
                current_group = None

            api_messages.append({
                "role": "user",
                "content": msg["content"]
            })
        elif msg["role"] == "assistant":
            # A new group_id starts a new assistant turn; flush the old one.
            if msg["group_id"] != current_group:
                if current_assistant_content:
                    api_messages.append({
                        "role": "assistant",
                        "content": current_assistant_content
                    })
                    current_assistant_content = []
                current_group = msg["group_id"]

            if msg["type"] == "text":
                current_assistant_content.append({
                    "type": "text",
                    "text": msg["content"]
                })
            elif msg["type"] == "tool_use":
                tool_input = {}
                try:
                    tool_input = json.loads(msg["content"])
                except (TypeError, ValueError):
                    # Keep the {}-input fallback, but stop swallowing
                    # unrelated errors (original bare `except:`).
                    pass
                metadata = msg.get("metadata") or {}
                current_assistant_content.append({
                    "type": "tool_use",
                    # An id is required; synthesize one if it was lost.
                    "id": metadata.get("tool_id", str(uuid.uuid4())),
                    "name": metadata.get("tool_name", "unknown"),
                    "input": tool_input
                })
            elif msg["type"] == "tool_result":
                # tool_result must appear in a *user* message, so flush the
                # assistant turn first.
                if current_assistant_content:
                    api_messages.append({
                        "role": "assistant",
                        "content": current_assistant_content
                    })
                    current_assistant_content = []
                metadata = msg.get("metadata") or {}
                api_messages.append({
                    "role": "user",
                    "content": [{
                        "type": "tool_result",
                        "tool_use_id": metadata.get("tool_use_id", ""),
                        "content": msg["content"]
                    }]
                })

    # Flush any trailing assistant turn.
    if current_assistant_content:
        api_messages.append({
            "role": "assistant",
            "content": current_assistant_content
        })

    return api_messages
async def search_messages(
    query: str,
    limit: int = 20,
    msg_type: Optional[str] = None,
) -> list[dict]:
    """Search messages using PostgreSQL full-text search.

    Matches either the FTS index or a plain ILIKE substring; results are
    newest-first with a ts_headline() snippet (**-delimited highlights).
    """
    # Queries shorter than 2 chars match far too much; bail out early.
    if len(query) < 2:
        return []

    pool = await get_pool()
    async with pool.acquire() as conn:
        params = [query]
        type_filter = ""

        if msg_type:
            params.append(msg_type)
            type_filter = f"AND type = ${len(params)}"

        params.append(min(limit, 50))  # hard cap on result count

        # Use PostgreSQL full-text search with fallback to ILIKE.
        # NOTE(review): '%' || $1 || '%' lets user-supplied % / _ act as
        # wildcards in the ILIKE arm; parameterization still prevents SQL
        # injection, so this only affects match breadth.
        rows = await conn.fetch(
            f"""SELECT id, role, type, content, group_id, metadata, priority, timestamp,
                   ts_headline('english', content, plainto_tsquery('english', $1),
                       'StartSel=**, StopSel=**, MaxWords=50, MinWords=20') as snippet
                FROM messages
                WHERE (to_tsvector('english', content) @@ plainto_tsquery('english', $1)
                       OR content ILIKE '%' || $1 || '%')
                      {type_filter}
                ORDER BY id DESC
                LIMIT ${len(params)}""",
            *params,
        )

    results = []
    for row in rows:
        metadata = None
        if row['metadata']:
            try:
                metadata = json.loads(row['metadata']) if isinstance(row['metadata'], str) else row['metadata']
            except (TypeError, ValueError):
                # Corrupt metadata is non-fatal (original bare `except:`).
                metadata = None

        # Use ts_headline snippet, fallback to a manual content prefix.
        snippet = row['snippet'] if row['snippet'] else row['content'][:100]

        results.append({
            "id": row['id'],
            "role": row['role'],
            "type": row['type'],
            "content": row['content'],
            "group_id": row['group_id'],
            "metadata": metadata,
            "priority": row['priority'],
            "timestamp": row['timestamp'].isoformat() if row['timestamp'] else None,
            "snippet": snippet,
        })

    return results
# Legacy compatibility - for migration
def set_db_path(path: str):
    """Legacy no-op kept so SQLite-era callers don't break.

    The PostgreSQL backend is configured via DATABASE_URL / set_db_url, so
    the path argument is deliberately ignored.
    """
    pass


# --- file: migrate_to_postgres.py --------------------------------------------
#!/usr/bin/env python3
"""
Migration script: SQLite to PostgreSQL

Migrates data from the old SQLite database to PostgreSQL.
Run once after setting up PostgreSQL.
"""
# --- file: migrate_to_postgres.py (continued) --------------------------------

import asyncio
import json  # kept from original import block (not referenced below)
import os
import sqlite3
from datetime import datetime

import asyncpg

# Paths
SQLITE_DB = "/home/admin/services/db/chat.db"
POSTGRES_URL = os.getenv("DATABASE_URL", "postgresql://egregore:egregore_db_pass@localhost/egregore")


async def migrate():
    """One-shot copy of messages_v2 rows from SQLite into PostgreSQL.

    Creates the target schema (idempotent), streams all rows across inside a
    single transaction, verifies the count, then renames the SQLite file to
    a .backup so the script cannot be accidentally re-run against it.
    """
    # (Original used an f-string with no placeholders here.)
    print("Connecting to PostgreSQL...")
    pool = await asyncpg.create_pool(POSTGRES_URL)

    async with pool.acquire() as conn:
        # Create table if not exists — same schema as messages.init_db().
        print("Creating messages table...")
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS messages (
                id SERIAL PRIMARY KEY,
                role TEXT NOT NULL,
                type TEXT NOT NULL DEFAULT 'text',
                content TEXT NOT NULL,
                group_id TEXT,
                metadata JSONB,
                priority INTEGER DEFAULT 0,
                timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
            )
        """)

        # Create indexes
        await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_role ON messages(role)")
        await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_type ON messages(type)")
        await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_group_id ON messages(group_id)")
        await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_priority ON messages(priority)")
        await conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp)")
        await conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_messages_content_search
            ON messages USING gin(to_tsvector('english', content))
        """)

    # Check if SQLite database exists
    if not os.path.exists(SQLITE_DB):
        print(f"No SQLite database found at {SQLITE_DB}, nothing to migrate.")
        await pool.close()
        return

    # Connect to SQLite
    print(f"Reading from SQLite: {SQLITE_DB}")
    sqlite_conn = sqlite3.connect(SQLITE_DB)
    sqlite_conn.row_factory = sqlite3.Row
    cursor = sqlite_conn.cursor()

    # Check if messages_v2 table exists
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='messages_v2'")
    if not cursor.fetchone():
        print("No messages_v2 table in SQLite, nothing to migrate.")
        sqlite_conn.close()
        await pool.close()
        return

    # Fetch all messages from SQLite
    cursor.execute("""
        SELECT id, role, type, content, group_id, metadata, priority, timestamp
        FROM messages_v2
        ORDER BY id ASC
    """)
    rows = cursor.fetchall()
    print(f"Found {len(rows)} messages to migrate")

    if not rows:
        print("No messages to migrate.")
        sqlite_conn.close()
        await pool.close()
        return

    # Insert into PostgreSQL
    async with pool.acquire() as conn:
        migrated = 0
        # Copy inside one transaction so a mid-run failure leaves PostgreSQL
        # empty rather than half-populated (the original inserted with
        # per-statement autocommit).
        async with conn.transaction():
            for row in rows:
                # NOTE(review): assumes SQLite stored ISO-8601 timestamp
                # strings; fromisoformat raises on any other format — confirm
                # against the old writer before running.
                timestamp = datetime.fromisoformat(row['timestamp']) if row['timestamp'] else datetime.utcnow()

                await conn.execute(
                    """INSERT INTO messages (role, type, content, group_id, metadata, priority, timestamp)
                       VALUES ($1, $2, $3, $4, $5, $6, $7)""",
                    row['role'],
                    row['type'],
                    row['content'],
                    row['group_id'],
                    row['metadata'],  # Already a JSON string; asyncpg's jsonb codec accepts str
                    row['priority'],
                    timestamp,
                )
                migrated += 1

    print(f"Migrated {migrated} messages to PostgreSQL")

    # Verify
    async with pool.acquire() as conn:
        count = await conn.fetchval("SELECT COUNT(*) FROM messages")
        print(f"PostgreSQL now has {count} messages")

    sqlite_conn.close()
    await pool.close()

    # Backup SQLite database.
    # NOTE(review): os.rename fails if a previous .backup already exists —
    # acceptable for a run-once script, but worth knowing.
    backup_path = SQLITE_DB + ".backup"
    print(f"Backing up SQLite database to {backup_path}")
    os.rename(SQLITE_DB, backup_path)

    print("Migration complete!")


if __name__ == "__main__":
    asyncio.run(migrate())