commit 5710c448212cd0bcdc87c13a830227663d7cb7af Author: egregore Date: Mon Feb 2 11:37:48 2026 +0000 Initial commit: Egregore brain service AI logic with Claude API integration, tool execution, and system prompts. Co-Authored-By: Claude Opus 4.5 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..be46f05 --- /dev/null +++ b/.gitignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +*.egg-info/ +.eggs/ +dist/ +build/ + +# Environment +.env +.env.* +*.local + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# Secrets - BE PARANOID +*.pem +*.key +*.crt +*.p12 +credentials* +secrets* +tokens* +*_secret* +*_token* +*.credentials + +# Logs and data +*.log +*.db +*.sqlite +*.backup + +# OS +.DS_Store +Thumbs.db diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..9d02a6f --- /dev/null +++ b/__init__.py @@ -0,0 +1,21 @@ +""" +Egregore Brain - AI reasoning and tool execution + +This module handles: +- Claude API interactions +- System prompt generation with dynamic context +- Tool definition and execution +- Conversation processing +""" + +from .tools import TOOLS, execute_tool +from .prompts import get_system_prompt, SYSTEM_PROMPT_BASE +from .conversation import process_conversation + +__all__ = [ + "TOOLS", + "execute_tool", + "get_system_prompt", + "SYSTEM_PROMPT_BASE", + "process_conversation", +] diff --git a/conversation.py b/conversation.py new file mode 100644 index 0000000..7bf6af5 --- /dev/null +++ b/conversation.py @@ -0,0 +1,175 @@ +""" +Egregore Brain - Conversation processing with tool use loop +""" + +import json +import uuid +from typing import Any + +import anthropic + +from tools import TOOLS, execute_tool +from prompts import get_system_prompt + + +def extract_embedded_tool_calls(text: str) -> tuple[list, str]: + """ + Detect if text contains JSON-encoded tool calls and extract them. + Returns (tool_calls, remaining_text) where tool_calls is a list of parsed + tool_use dicts if found, or empty list if not. + """ + if not text or not text.strip().startswith('['): + return [], text + + try: + parsed = json.loads(text.strip()) + if isinstance(parsed, list) and len(parsed) > 0: + if all(isinstance(b, dict) and b.get('type') in ('tool_use', 'tool_result', 'text') for b in parsed): + tool_calls = [b for b in parsed if b.get('type') == 'tool_use'] + if tool_calls: + return tool_calls, "" + except (json.JSONDecodeError, TypeError): + pass + + return [], text + + +def serialize_content_blocks(blocks) -> list: + """Convert Claude API blocks to JSON-serializable format""" + result = [] + for block in blocks: + if hasattr(block, 'type'): + if block.type == "text": + embedded_tools, remaining = extract_embedded_tool_calls(block.text) + if embedded_tools: + for tool in embedded_tools: + result.append(tool) + elif remaining: + result.append({"type": "text", "content": remaining}) + elif block.type == "tool_use": + result.append({ + "type": "tool_use", + "id": block.id, + "name": block.name, + "input": block.input + }) + elif isinstance(block, dict): + result.append(block) + return result + + +def extract_text_from_blocks(blocks) -> str: + """Extract plain text for notifications etc""" + texts = [] + for block in blocks: + if hasattr(block, 'type') and block.type == "text": + texts.append(block.text) + elif isinstance(block, dict) and block.get("type") == "text": + texts.append(block.get("content", "")) + return "\n".join(texts) + + +async def process_conversation( + client: anthropic.AsyncAnthropic, + model: str, + history: list[dict], + max_iterations: int = 10 +) -> list[dict]: + """ + Process a conversation with tool use loop. + + Args: + client: Async Anthropic client + model: Model ID to use + history: Conversation history in Claude API format + max_iterations: Maximum tool use iterations + + Returns: + List of response blocks (text, tool_use, tool_result) + """ + system_prompt = await get_system_prompt() + all_response_blocks = [] + + for _ in range(max_iterations): + response = await client.messages.create( + model=model, + max_tokens=4096, + system=system_prompt, + messages=history, + tools=TOOLS + ) + + tool_uses = [] + embedded_tool_calls = [] + + for block in response.content: + if block.type == "tool_use": + tool_uses.append(block) + all_response_blocks.append({ + "type": "tool_use", + "id": block.id, + "name": block.name, + "input": block.input + }) + elif block.type == "text": + embedded, remaining = extract_embedded_tool_calls(block.text) + if embedded: + embedded_tool_calls.extend(embedded) + for tool in embedded: + all_response_blocks.append(tool) + elif remaining: + all_response_blocks.append({ + "type": "text", + "content": remaining + }) + + if not tool_uses and not embedded_tool_calls: + break + + # Handle embedded tool calls (model misbehavior - output JSON as text) + if embedded_tool_calls and not tool_uses: + embedded_results = [] + for idx, tool_call in enumerate(embedded_tool_calls): + tool_id = tool_call.get("id", f"embedded_{idx}") + tool_name = tool_call.get("name") + tool_input = tool_call.get("input", {}) + + if tool_name: + result = await execute_tool(tool_name, tool_input) + embedded_results.append({ + "type": "tool_result", + "tool_use_id": tool_id, + "content": result + }) + all_response_blocks.append({ + "type": "tool_result", + "tool_use_id": tool_id, + "tool_name": tool_name, + "content": result + }) + + if embedded_results: + history.append({"role": "assistant", "content": response.content}) + history.append({"role": "user", "content": embedded_results}) + continue + + # Execute tools and add results to history + tool_results = [] + for tool_use in tool_uses: + result = await execute_tool(tool_use.name, tool_use.input) + tool_results.append({ + "type": "tool_result", + "tool_use_id": tool_use.id, + "content": result + }) + all_response_blocks.append({ + "type": "tool_result", + "tool_use_id": tool_use.id, + "tool_name": tool_use.name, + "content": result + }) + + history.append({"role": "assistant", "content": response.content}) + history.append({"role": "user", "content": tool_results}) + + return all_response_blocks diff --git a/main.py b/main.py new file mode 100644 index 0000000..bda4fb8 --- /dev/null +++ b/main.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 +""" +Egregore Brain Service - AI reasoning API + +Provides HTTP API for conversation processing with Claude. +Runs on port 8081. +""" + +import os +from typing import Optional + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from dotenv import load_dotenv +import anthropic + +from tools import TOOLS, execute_tool +from prompts import get_system_prompt +from conversation import process_conversation + +# Load environment +load_dotenv("/home/admin/.env") + +ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY") +if not ANTHROPIC_API_KEY: + raise ValueError("ANTHROPIC_API_KEY not set") + +app = FastAPI(title="Egregore Brain Service", docs_url="/docs") + +# Initialize Anthropic client +client = anthropic.AsyncAnthropic(api_key=ANTHROPIC_API_KEY) + + +# Request models +class ProcessRequest(BaseModel): + model: str = "claude-sonnet-4-20250514" + history: list # Conversation history in Claude API format + max_iterations: int = 10 + + +class ToolRequest(BaseModel): + name: str + input: dict + + +# Endpoints +@app.post("/process") +async def api_process(req: ProcessRequest): + """Process a conversation with tool use loop""" + try: + response_blocks = await process_conversation( + client=client, + model=req.model, + history=req.history, + max_iterations=req.max_iterations + ) + return {"blocks": response_blocks} + except anthropic.APIError as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post("/tool") +async def api_execute_tool(req: ToolRequest): + """Execute a single tool directly""" + result = await execute_tool(req.name, req.input) + return {"result": result} + + +@app.get("/tools") +async def api_get_tools(): + """Get available tool definitions""" + return {"tools": TOOLS} + + +@app.get("/prompt") +async def api_get_prompt(): + """Get current system prompt with context""" + prompt = await get_system_prompt() + return {"prompt": prompt} + + +@app.get("/health") +async def health(): + """Health check endpoint""" + return {"status": "ok", "service": "brain"} + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="127.0.0.1", port=8081) diff --git a/prompts.py b/prompts.py new file mode 100644 index 0000000..394d79f --- /dev/null +++ b/prompts.py @@ -0,0 +1,51 @@ +""" +Egregore Brain - System prompt and context injection +""" + +import subprocess +import aiofiles + + +SYSTEM_PROMPT_BASE = """You are Egregore, a personal AI assistant running on a dedicated VPS. + +Context: +- You're hosted at egregore.leaf.ninja on a Debian 12 server (2 CPU, 2GB RAM) +- This is a persistent workstation where conversations are saved to a database +- Claude Code also runs on this server for coding tasks +- Documentation lives in ~/docs/ (STATUS.md, HISTORY.md, RUNBOOK.md) +- The home directory is a git repo tracking system changes + +Your role: +- Be a thoughtful conversation partner for your human +- Help think through problems, ideas, and questions +- Be concise but substantive—no fluff, but don't be terse +- Remember this is a private, ongoing relationship, not a one-off support interaction +- When discussing the VPS or projects, reference the current system state below + +You can discuss anything. Be genuine and direct.""" + + +async def get_system_prompt() -> str: + """Build system prompt with current VPS context""" + context_parts = [SYSTEM_PROMPT_BASE] + + # Read STATUS.md + try: + async with aiofiles.open("/home/admin/docs/STATUS.md") as f: + status = await f.read() + context_parts.append(f"\n\n## Current System Status\n```\n{status.strip()}\n```") + except Exception: + pass + + # Get recent git commits + try: + result = subprocess.run( + ["git", "-C", "/home/admin", "log", "--oneline", "-5"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0 and result.stdout.strip(): + context_parts.append(f"\n\n## Recent Changes (git log)\n```\n{result.stdout.strip()}\n```") + except Exception: + pass + + return "\n".join(context_parts) diff --git a/tools.py b/tools.py new file mode 100644 index 0000000..780946f --- /dev/null +++ b/tools.py @@ -0,0 +1,526 @@ +""" +Egregore Brain - Tool definitions and execution + +This module contains the tool definitions exposed to Claude and their execution logic. +""" + +import os +import re +import json +import subprocess +import glob as globlib +from typing import Any + +import aiofiles + + +# Tool definitions for Claude +TOOLS = [ + { + "name": "git", + "description": "Interact with the VPS git repository. Use for viewing status, history, diffs, and making commits.", + "input_schema": { + "type": "object", + "properties": { + "command": { + "type": "string", + "enum": ["status", "log", "diff", "commit"], + "description": "Git operation to perform" + }, + "message": { + "type": "string", + "description": "Commit message (required for commit command)" + }, + "files": { + "type": "array", + "items": {"type": "string"}, + "description": "Files to stage (for commit). Omit to commit all staged changes." + } + }, + "required": ["command"] + } + }, + { + "name": "docs", + "description": "Read or update VPS documentation files (STATUS.md, HISTORY.md, RUNBOOK.md)", + "input_schema": { + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": ["read", "append", "update"], + "description": "Action to perform" + }, + "file": { + "type": "string", + "enum": ["STATUS.md", "HISTORY.md", "RUNBOOK.md"], + "description": "Which doc file to operate on" + }, + "content": { + "type": "string", + "description": "Content to append or new content for update" + }, + "section": { + "type": "string", + "description": "Section header to update (for update action)" + } + }, + "required": ["action", "file"] + } + }, + { + "name": "read", + "description": "Read a file from the filesystem. Use this instead of bash cat/head/tail. Returns file contents with line numbers.", + "input_schema": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Absolute path to the file to read" + }, + "offset": { + "type": "integer", + "description": "Line number to start reading from (1-indexed, optional)" + }, + "limit": { + "type": "integer", + "description": "Maximum number of lines to read (optional, default 500)" + } + }, + "required": ["path"] + } + }, + { + "name": "write", + "description": "Write content to a file, creating it if it doesn't exist or overwriting if it does.", + "input_schema": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Absolute path to the file to write" + }, + "content": { + "type": "string", + "description": "Content to write to the file" + } + }, + "required": ["path", "content"] + } + }, + { + "name": "edit", + "description": "Make precise string replacements in a file. Use this instead of bash sed/awk. The old_string must match exactly.", + "input_schema": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Absolute path to the file to edit" + }, + "old_string": { + "type": "string", + "description": "Exact string to find and replace" + }, + "new_string": { + "type": "string", + "description": "String to replace it with" + }, + "replace_all": { + "type": "boolean", + "description": "Replace all occurrences (default: false, replace first only)" + } + }, + "required": ["path", "old_string", "new_string"] + } + }, + { + "name": "glob", + "description": "Find files matching a glob pattern. Use this instead of bash find/ls for file discovery.", + "input_schema": { + "type": "object", + "properties": { + "pattern": { + "type": "string", + "description": "Glob pattern (e.g., '**/*.py', 'docs/*.md')" + }, + "path": { + "type": "string", + "description": "Base directory to search in (default: /home/admin)" + } + }, + "required": ["pattern"] + } + }, + { + "name": "grep", + "description": "Search file contents using regex patterns. Use this instead of bash grep/rg.", + "input_schema": { + "type": "object", + "properties": { + "pattern": { + "type": "string", + "description": "Regex pattern to search for" + }, + "path": { + "type": "string", + "description": "File or directory to search in (default: /home/admin)" + }, + "glob": { + "type": "string", + "description": "Glob pattern to filter files (e.g., '*.py')" + }, + "context": { + "type": "integer", + "description": "Lines of context around matches (default: 0)" + } + }, + "required": ["pattern"] + } + }, + { + "name": "bash", + "description": "Run safe bash commands for system inspection. Limited to read-only and status commands. Prefer read/write/edit/glob/grep tools for file operations.", + "input_schema": { + "type": "object", + "properties": { + "command": { + "type": "string", + "description": "Bash command to execute (must be safe/read-only)" + } + }, + "required": ["command"] + } + } +] + +# Allowlist for bash commands +ALLOWED_BASH_PREFIXES = [ + "df", "free", "uptime", "ps", "systemctl status", "cat", "ls", + "head", "tail", "du", "top -bn1", "hostname", "date", "who", "last", "htop" +] + + +async def execute_tool(name: str, tool_input: dict) -> str: + """Execute a tool and return the result""" + executors = { + "git": execute_git, + "docs": execute_docs, + "read": execute_read, + "write": execute_write, + "edit": execute_edit, + "glob": execute_glob, + "grep": execute_grep, + "bash": execute_bash, + } + + executor = executors.get(name) + if executor: + return await executor(tool_input) + return f"Unknown tool: {name}" + + +async def execute_git(tool_input: dict) -> str: + """Execute git commands""" + cmd = tool_input.get("command") + try: + if cmd == "status": + result = subprocess.run( + ["git", "-C", "/home/admin", "status", "--short"], + capture_output=True, text=True, timeout=10 + ) + return result.stdout.strip() or "Working tree clean" + elif cmd == "log": + result = subprocess.run( + ["git", "-C", "/home/admin", "log", "--oneline", "-10"], + capture_output=True, text=True, timeout=10 + ) + return result.stdout.strip() + elif cmd == "diff": + result = subprocess.run( + ["git", "-C", "/home/admin", "diff", "--stat"], + capture_output=True, text=True, timeout=10 + ) + return result.stdout.strip() or "No changes" + elif cmd == "commit": + message = tool_input.get("message") + if not message: + return "Error: commit message required" + files = tool_input.get("files", []) + if files: + subprocess.run( + ["git", "-C", "/home/admin", "add"] + files, + timeout=10 + ) + result = subprocess.run( + ["git", "-C", "/home/admin", "commit", "-m", message], + capture_output=True, text=True, timeout=10 + ) + output = result.stdout + result.stderr + return output.strip() if output.strip() else "Commit completed" + return "Unknown git command" + except subprocess.TimeoutExpired: + return "Error: command timed out" + except Exception as e: + return f"Error: {str(e)}" + + +async def execute_docs(tool_input: dict) -> str: + """Execute docs operations""" + action = tool_input.get("action") + filename = tool_input.get("file") + + if filename not in ["STATUS.md", "HISTORY.md", "RUNBOOK.md"]: + return "Error: invalid file" + + filepath = f"/home/admin/docs/{filename}" + + try: + if action == "read": + async with aiofiles.open(filepath) as f: + content = await f.read() + return content[:10000] + elif action == "append": + content = tool_input.get("content", "") + if not content: + return "Error: content required for append" + async with aiofiles.open(filepath, "a") as f: + await f.write("\n" + content) + return f"Appended to {filename}" + elif action == "update": + content = tool_input.get("content", "") + if not content: + return "Error: content required for update" + async with aiofiles.open(filepath, "w") as f: + await f.write(content) + return f"Updated {filename}" + return "Unknown action" + except FileNotFoundError: + return f"Error: {filename} not found" + except Exception as e: + return f"Error: {str(e)}" + + +async def execute_read(tool_input: dict) -> str: + """Read a file with optional offset and limit""" + path = tool_input.get("path", "") + offset = tool_input.get("offset", 1) + limit = tool_input.get("limit", 500) + + # Security: restrict to /home/admin + if not path.startswith("/home/admin"): + return "Error: can only read files under /home/admin" + + try: + async with aiofiles.open(path, "r") as f: + lines = await f.readlines() + + start = max(0, offset - 1) + end = start + limit + selected = lines[start:end] + + result = [] + for i, line in enumerate(selected, start=start + 1): + result.append(f"{i:4d}\t{line.rstrip()}") + + output = "\n".join(result) + if len(output) > 50000: + output = output[:50000] + "\n... (truncated)" + return output if output else "(empty file)" + except FileNotFoundError: + return f"Error: file not found: {path}" + except IsADirectoryError: + return f"Error: path is a directory: {path}" + except PermissionError: + return f"Error: permission denied: {path}" + except Exception as e: + return f"Error: {str(e)}" + + +async def execute_write(tool_input: dict) -> str: + """Write content to a file""" + path = tool_input.get("path", "") + content = tool_input.get("content", "") + + if not path.startswith("/home/admin"): + return "Error: can only write files under /home/admin" + + critical = ["/home/admin/.ssh", "/home/admin/.bashrc", "/home/admin/.profile"] + for c in critical: + if path.startswith(c): + return f"Error: cannot write to protected path: {c}" + + try: + parent = os.path.dirname(path) + if parent and not os.path.exists(parent): + os.makedirs(parent, exist_ok=True) + + async with aiofiles.open(path, "w") as f: + await f.write(content) + return f"Wrote {len(content)} bytes to {path}" + except PermissionError: + return f"Error: permission denied: {path}" + except Exception as e: + return f"Error: {str(e)}" + + +async def execute_edit(tool_input: dict) -> str: + """Make string replacements in a file""" + path = tool_input.get("path", "") + old_string = tool_input.get("old_string", "") + new_string = tool_input.get("new_string", "") + replace_all = tool_input.get("replace_all", False) + + if not path.startswith("/home/admin"): + return "Error: can only edit files under /home/admin" + + if not old_string: + return "Error: old_string cannot be empty" + + try: + async with aiofiles.open(path, "r") as f: + content = await f.read() + + if old_string not in content: + return f"Error: old_string not found in file" + + count = content.count(old_string) + + if replace_all: + new_content = content.replace(old_string, new_string) + replaced = count + else: + if count > 1: + return f"Error: old_string appears {count} times. Use replace_all=true or provide more context to make it unique." + new_content = content.replace(old_string, new_string, 1) + replaced = 1 + + async with aiofiles.open(path, "w") as f: + await f.write(new_content) + + return f"Replaced {replaced} occurrence(s) in {path}" + except FileNotFoundError: + return f"Error: file not found: {path}" + except PermissionError: + return f"Error: permission denied: {path}" + except Exception as e: + return f"Error: {str(e)}" + + +async def execute_glob(tool_input: dict) -> str: + """Find files matching a glob pattern""" + pattern = tool_input.get("pattern", "") + base_path = tool_input.get("path", "/home/admin") + + if not base_path.startswith("/home/admin"): + base_path = "/home/admin" + + try: + full_pattern = os.path.join(base_path, pattern) + matches = globlib.glob(full_pattern, recursive=True) + matches.sort(key=lambda x: os.path.getmtime(x) if os.path.exists(x) else 0, reverse=True) + + if len(matches) > 100: + matches = matches[:100] + return "\n".join(matches) + f"\n... (showing 100 of {len(matches)} matches)" + + return "\n".join(matches) if matches else "No matches found" + except Exception as e: + return f"Error: {str(e)}" + + +async def execute_grep(tool_input: dict) -> str: + """Search file contents using regex""" + pattern = tool_input.get("pattern", "") + path = tool_input.get("path", "/home/admin") + file_glob = tool_input.get("glob", None) + context = tool_input.get("context", 0) + + if not path.startswith("/home/admin"): + path = "/home/admin" + + try: + regex = re.compile(pattern) + except re.error as e: + return f"Error: invalid regex: {e}" + + results = [] + files_searched = 0 + max_results = 50 + + try: + if os.path.isfile(path): + files = [path] + else: + if file_glob: + files = globlib.glob(os.path.join(path, "**", file_glob), recursive=True) + else: + files = globlib.glob(os.path.join(path, "**", "*"), recursive=True) + files = [f for f in files if os.path.isfile(f)] + + for filepath in files: + if len(results) >= max_results: + break + + try: + if os.path.getsize(filepath) > 1_000_000: + continue + + async with aiofiles.open(filepath, "r", errors="ignore") as f: + lines = await f.readlines() + + files_searched += 1 + + for i, line in enumerate(lines): + if regex.search(line): + if len(results) >= max_results: + break + + match_lines = [] + start = max(0, i - context) + end = min(len(lines), i + context + 1) + + for j in range(start, end): + prefix = ">" if j == i else " " + match_lines.append(f"{prefix}{j+1:4d}: {lines[j].rstrip()}") + + results.append(f"{filepath}:\n" + "\n".join(match_lines)) + except (PermissionError, IsADirectoryError, UnicodeDecodeError): + continue + + if not results: + return f"No matches found (searched {files_searched} files)" + + output = "\n\n".join(results) + if len(results) >= max_results: + output += f"\n\n... (showing first {max_results} matches)" + + return output + except Exception as e: + return f"Error: {str(e)}" + + +async def execute_bash(tool_input: dict) -> str: + """Execute safe bash commands""" + cmd = tool_input.get("command", "").strip() + + allowed = False + for prefix in ALLOWED_BASH_PREFIXES: + if cmd.startswith(prefix): + allowed = True + break + + if not allowed: + return f"Error: command not allowed. Permitted prefixes: {', '.join(ALLOWED_BASH_PREFIXES)}" + + try: + result = subprocess.run( + cmd, shell=True, capture_output=True, text=True, + timeout=30, cwd="/home/admin" + ) + output = result.stdout + result.stderr + return output[:5000].strip() if output.strip() else "Command completed (no output)" + except subprocess.TimeoutExpired: + return "Error: command timed out (30s limit)" + except Exception as e: + return f"Error: {str(e)}"