commit 10e9da9b32d0ea3c48d5c6ca17c6c95ecde4712b Author: egregore Date: Mon Feb 2 19:51:16 2026 +0000 Initial relay service setup API gateway that relays authenticated requests to backend services. Features JWT auth, rate limiting, scope-based access control. Co-Authored-By: Claude Opus 4.5 diff --git a/__pycache__/main.cpython-311.pyc b/__pycache__/main.cpython-311.pyc new file mode 100644 index 0000000..73d59e6 Binary files /dev/null and b/__pycache__/main.cpython-311.pyc differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..063c8ad --- /dev/null +++ b/main.py @@ -0,0 +1,314 @@ +#!/usr/bin/env python3 +""" +Egregore Relay Service - API Gateway + +Relays authenticated API requests to backend services. +Runs on port 8085. +""" + +import os +import secrets +from datetime import datetime, timedelta, timezone +from typing import Optional + +from fastapi import FastAPI, HTTPException, Depends, Request +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +from pydantic import BaseModel +from dotenv import load_dotenv +from slowapi import Limiter, _rate_limit_exceeded_handler +from slowapi.util import get_remote_address +from slowapi.errors import RateLimitExceeded +import httpx +import jwt +import bcrypt + +# Load environment +load_dotenv("/home/admin/.env") + +JWT_SECRET = os.getenv("JWT_SECRET", secrets.token_hex(32)) +JWT_EXPIRY = int(os.getenv("JWT_EXPIRY", "3600")) +GATEWAY_PORT = int(os.getenv("GATEWAY_PORT", "8085")) + +# Backend service URLs +REASON_URL = os.getenv("REASON_URL", "http://127.0.0.1:8081") +RECALL_URL = os.getenv("RECALL_URL", "http://127.0.0.1:8082") +CONVERSE_URL = os.getenv("CONVERSE_URL", "http://127.0.0.1:8080") + +# Rate limiting +limiter = Limiter(key_func=get_remote_address) + +app = FastAPI( + title="Egregore Relay Service", + version="1.0.0", + docs_url="/docs" +) +app.state.limiter = limiter +app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) + +# Security +bearer_scheme = HTTPBearer(auto_error=False) + +# HTTP client for backend services +http_client = httpx.AsyncClient(timeout=120.0) + + +# In-memory API key store (replace with DB in production) +# Format: api_key_hash -> {client_id, scopes, rate_limit} +API_CLIENTS = {} + + +# Request/Response models +class TokenRequest(BaseModel): + api_key: str + + +class TokenResponse(BaseModel): + token: str + expires_in: int + token_type: str = "Bearer" + + +class ChatRequest(BaseModel): + message: str + model: str = "claude-sonnet-4-20250514" + max_iterations: int = 10 + + +class ErrorResponse(BaseModel): + error: str + message: str + details: Optional[dict] = None + + +# Helper functions +def verify_api_key(api_key: str) -> Optional[dict]: + """Verify API key and return client info""" + for key_hash, client in API_CLIENTS.items(): + if bcrypt.checkpw(api_key.encode(), key_hash.encode()): + return client + return None + + +def create_jwt(client_id: str, scopes: list) -> str: + """Create a JWT token for the client""" + now = datetime.now(timezone.utc) + payload = { + "sub": client_id, + "iss": "egregore", + "iat": now, + "exp": now + timedelta(seconds=JWT_EXPIRY), + "scope": scopes + } + return jwt.encode(payload, JWT_SECRET, algorithm="HS256") + + +def verify_jwt(token: str) -> Optional[dict]: + """Verify JWT and return payload""" + try: + payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"]) + return payload + except jwt.ExpiredSignatureError: + return None + except jwt.InvalidTokenError: + return None + + +async def get_current_client( + credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme) +) -> dict: + """Dependency to get current authenticated client from JWT""" + if not credentials: + raise HTTPException( + status_code=401, + detail={"error": "invalid_token", "message": "Missing authorization header"} + ) + + payload = verify_jwt(credentials.credentials) + if not payload: + raise HTTPException( + status_code=401, + detail={"error": "invalid_token", "message": "Invalid or expired token"} + ) + + return payload + + +def require_scope(required: str): + """Dependency factory to check for required scope""" + async def check_scope(client: dict = Depends(get_current_client)): + scopes = client.get("scope", []) + if "*" in scopes or required in scopes: + return client + raise HTTPException( + status_code=403, + detail={"error": "insufficient_scope", "message": f"Requires '{required}' scope"} + ) + return check_scope + + +# Public endpoints +@app.get("/health") +async def health(): + """Health check with backend status""" + reason_ok = False + recall_ok = False + + try: + resp = await http_client.get(f"{REASON_URL}/health", timeout=2.0) + reason_ok = resp.status_code == 200 + except: + pass + + try: + resp = await http_client.get(f"{RECALL_URL}/health", timeout=2.0) + recall_ok = resp.status_code == 200 + except: + pass + + return { + "status": "ok" if (reason_ok and recall_ok) else "degraded", + "service": "relay", + "backends": { + "reason": "ok" if reason_ok else "unavailable", + "recall": "ok" if recall_ok else "unavailable" + } + } + + +@app.post("/auth/token", response_model=TokenResponse) +async def get_token(req: TokenRequest): + """Exchange API key for JWT token""" + client = verify_api_key(req.api_key) + if not client: + raise HTTPException( + status_code=401, + detail={"error": "invalid_api_key", "message": "API key not found or revoked"} + ) + + token = create_jwt(client["client_id"], client["scopes"]) + return TokenResponse(token=token, expires_in=JWT_EXPIRY) + + +# Protected endpoints +@app.post("/v1/chat") +@limiter.limit("10/minute") +async def chat( + request: Request, + req: ChatRequest, + client: dict = Depends(require_scope("chat")) +): + """Send a message and get AI response""" + try: + # Get conversation history + history_resp = await http_client.get(f"{RECALL_URL}/messages/history") + history = history_resp.json().get("history", []) + + # Add user message to history + history.append({"role": "user", "content": req.message}) + + # Process with reason service + reason_resp = await http_client.post( + f"{REASON_URL}/process", + json={ + "model": req.model, + "history": history, + "max_iterations": req.max_iterations + } + ) + + if reason_resp.status_code != 200: + raise HTTPException( + status_code=502, + detail={"error": "backend_error", "message": "Reason service error"} + ) + + return reason_resp.json() + + except httpx.RequestError as e: + raise HTTPException( + status_code=503, + detail={"error": "backend_unavailable", "message": str(e)} + ) + + +@app.get("/v1/history") +async def get_history( + limit: int = 50, + before: Optional[int] = None, + client: dict = Depends(require_scope("history")) +): + """Get message history with pagination""" + params = {"limit": min(limit, 100)} + if before: + params["before"] = before + + try: + resp = await http_client.get(f"{RECALL_URL}/messages", params=params) + return resp.json() + except httpx.RequestError as e: + raise HTTPException( + status_code=503, + detail={"error": "backend_unavailable", "message": str(e)} + ) + + +@app.get("/v1/history/search") +async def search_history( + q: str, + limit: int = 20, + client: dict = Depends(require_scope("history")) +): + """Search message history""" + try: + resp = await http_client.get( + f"{RECALL_URL}/messages/search", + params={"q": q, "limit": limit} + ) + return resp.json() + except httpx.RequestError as e: + raise HTTPException( + status_code=503, + detail={"error": "backend_unavailable", "message": str(e)} + ) + + +@app.get("/v1/tools") +async def get_tools(client: dict = Depends(require_scope("tools"))): + """Get available AI tools""" + try: + resp = await http_client.get(f"{REASON_URL}/tools") + return resp.json() + except httpx.RequestError as e: + raise HTTPException( + status_code=503, + detail={"error": "backend_unavailable", "message": str(e)} + ) + + +# Admin endpoint to register API keys (temporary, move to proper admin) +@app.post("/admin/clients") +async def register_client( + client_id: str, + scopes: list = ["chat", "history"], +): + """Register a new API client (temporary admin endpoint)""" + # Generate API key + api_key = f"eg_{secrets.token_hex(20)}" + key_hash = bcrypt.hashpw(api_key.encode(), bcrypt.gensalt()).decode() + + API_CLIENTS[key_hash] = { + "client_id": client_id, + "scopes": scopes, + "rate_limit": 100 + } + + return { + "client_id": client_id, + "api_key": api_key, # Only shown once! + "scopes": scopes + } + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="127.0.0.1", port=GATEWAY_PORT) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5b6177f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +fastapi>=0.100.0 +uvicorn[standard]>=0.23.0 +python-dotenv>=1.0.0 +httpx>=0.24.0 +pyjwt>=2.8.0 +bcrypt>=4.0.0 +slowapi>=0.1.9 +pydantic>=2.0.0