recall/main.py

117 lines
2.6 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Egregore DB Service - Message storage API
Provides HTTP API for message storage and retrieval.
Runs on port 8082.
"""
import os
from typing import Optional
from fastapi import FastAPI, Query
from pydantic import BaseModel
from messages import (
init_db,
save_message,
save_response_blocks,
get_messages,
get_conversation_history,
search_messages,
close_pool,
)
# ASGI application object; every route and lifecycle hook in this module is
# registered on it. The interactive API docs are served at /docs.
app = FastAPI(title="Egregore DB Service", docs_url="/docs")
@app.on_event("startup")
async def startup() -> None:
    """Await init_db() when the application starts."""
    await init_db()
@app.on_event("shutdown")
async def shutdown() -> None:
    """Await close_pool() when the application shuts down."""
    await close_pool()
# Request models
class SaveMessageRequest(BaseModel):
    # Request body for POST /messages. Each field is forwarded unchanged to
    # save_message() as the keyword argument of the same name.
    # (Kept as comments rather than a docstring so the generated schema is
    # not given a new description.)
    role: str
    content: str
    msg_type: str = "text"  # "text" is used when the client omits the field
    group_id: Optional[str] = None
    metadata: Optional[dict] = None
    priority: Optional[int] = None
class SaveBlocksRequest(BaseModel):
    # Request body for POST /messages/blocks; both fields are required and are
    # passed positionally to save_response_blocks().
    blocks: list  # element type is not constrained by this model
    group_id: str
# Endpoints
@app.post("/messages")
async def api_save_message(req: SaveMessageRequest):
    """Save a single message"""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # route's description in the generated API docs.
    #
    # Every request field is forwarded to save_message() under the same
    # keyword; whatever it returns is sent back to the client under "id".
    return {
        "id": await save_message(
            role=req.role,
            content=req.content,
            msg_type=req.msg_type,
            group_id=req.group_id,
            metadata=req.metadata,
            priority=req.priority,
        )
    }
@app.post("/messages/blocks")
async def api_save_blocks(req: SaveBlocksRequest):
    """Save multiple response blocks"""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # route's description in the generated API docs.
    blocks, group_id = req.blocks, req.group_id
    # The result of save_response_blocks() is returned as-is under "messages".
    return {"messages": await save_response_blocks(blocks, group_id)}
@app.get("/messages")
async def api_get_messages(
    limit: int = 50,
    before: Optional[int] = None,
    msg_type: Optional[str] = Query(None, alias="type"),
):
    """Get messages with pagination"""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # route's description in the generated API docs.
    #
    # Query parameters:
    #   limit  - requested page size; clamped to the range 0..100.
    #   before - forwarded to get_messages() as before_id.
    #   type   - query-string alias for msg_type; forwarded unchanged.
    #
    # Returns {"messages": ..., "has_more": ...}, the two values produced by
    # get_messages().
    #
    # Clamp on both sides: with only the upper bound, a negative limit would
    # reach get_messages() unchecked. How the storage layer treats a negative
    # limit is not visible from this module, so it is never let through.
    page_size = max(0, min(limit, 100))
    messages, has_more = await get_messages(
        limit=page_size,
        before_id=before,
        msg_type=msg_type,
    )
    return {"messages": messages, "has_more": has_more}
@app.get("/messages/history")
async def api_get_history(limit: int = 100):
    """Get conversation history in Claude API format"""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # route's description in the generated API docs.
    #
    # limit is passed straight through to get_conversation_history(); no cap
    # is applied in this handler.
    return {"history": await get_conversation_history(limit=limit)}
@app.get("/messages/search")
async def api_search(
    q: str,
    limit: int = 20,
    msg_type: Optional[str] = Query(None, alias="type"),
):
    """Search messages by content"""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # route's description in the generated API docs.
    #
    # q is required; limit and the optional type filter (exposed as ?type=)
    # are forwarded to search_messages() unchanged.
    matches = await search_messages(q, limit=limit, msg_type=msg_type)
    response = {"results": matches}
    return response
@app.get("/health")
async def health():
    """Health check endpoint"""
    # Fixed payload; this handler makes no call into the storage layer.
    status_payload = {"status": "ok", "service": "db"}
    return status_payload
if __name__ == "__main__":
    # Direct execution only: uvicorn is imported here rather than at module
    # top, so importing this module elsewhere does not pull it in.
    import uvicorn
    # Serve on the loopback interface, port 8082 (the port named in the
    # module docstring).
    uvicorn.run(app, host="127.0.0.1", port=8082)