Initial commit: Egregore brain service

AI logic with Claude API integration, tool execution, and system prompts.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
egregore 2026-02-02 11:37:48 +00:00
commit 5710c44821
6 changed files with 906 additions and 0 deletions

526
tools.py Normal file
View file

@ -0,0 +1,526 @@
"""
Egregore Brain - Tool definitions and execution
This module contains the tool definitions exposed to Claude and their execution logic.
"""
import os
import re
import json
import subprocess
import glob as globlib
from typing import Any
import aiofiles
# Tool definitions for Claude
TOOLS = [
{
"name": "git",
"description": "Interact with the VPS git repository. Use for viewing status, history, diffs, and making commits.",
"input_schema": {
"type": "object",
"properties": {
"command": {
"type": "string",
"enum": ["status", "log", "diff", "commit"],
"description": "Git operation to perform"
},
"message": {
"type": "string",
"description": "Commit message (required for commit command)"
},
"files": {
"type": "array",
"items": {"type": "string"},
"description": "Files to stage (for commit). Omit to commit all staged changes."
}
},
"required": ["command"]
}
},
{
"name": "docs",
"description": "Read or update VPS documentation files (STATUS.md, HISTORY.md, RUNBOOK.md)",
"input_schema": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["read", "append", "update"],
"description": "Action to perform"
},
"file": {
"type": "string",
"enum": ["STATUS.md", "HISTORY.md", "RUNBOOK.md"],
"description": "Which doc file to operate on"
},
"content": {
"type": "string",
"description": "Content to append or new content for update"
},
"section": {
"type": "string",
"description": "Section header to update (for update action)"
}
},
"required": ["action", "file"]
}
},
{
"name": "read",
"description": "Read a file from the filesystem. Use this instead of bash cat/head/tail. Returns file contents with line numbers.",
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute path to the file to read"
},
"offset": {
"type": "integer",
"description": "Line number to start reading from (1-indexed, optional)"
},
"limit": {
"type": "integer",
"description": "Maximum number of lines to read (optional, default 500)"
}
},
"required": ["path"]
}
},
{
"name": "write",
"description": "Write content to a file, creating it if it doesn't exist or overwriting if it does.",
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute path to the file to write"
},
"content": {
"type": "string",
"description": "Content to write to the file"
}
},
"required": ["path", "content"]
}
},
{
"name": "edit",
"description": "Make precise string replacements in a file. Use this instead of bash sed/awk. The old_string must match exactly.",
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute path to the file to edit"
},
"old_string": {
"type": "string",
"description": "Exact string to find and replace"
},
"new_string": {
"type": "string",
"description": "String to replace it with"
},
"replace_all": {
"type": "boolean",
"description": "Replace all occurrences (default: false, replace first only)"
}
},
"required": ["path", "old_string", "new_string"]
}
},
{
"name": "glob",
"description": "Find files matching a glob pattern. Use this instead of bash find/ls for file discovery.",
"input_schema": {
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "Glob pattern (e.g., '**/*.py', 'docs/*.md')"
},
"path": {
"type": "string",
"description": "Base directory to search in (default: /home/admin)"
}
},
"required": ["pattern"]
}
},
{
"name": "grep",
"description": "Search file contents using regex patterns. Use this instead of bash grep/rg.",
"input_schema": {
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "Regex pattern to search for"
},
"path": {
"type": "string",
"description": "File or directory to search in (default: /home/admin)"
},
"glob": {
"type": "string",
"description": "Glob pattern to filter files (e.g., '*.py')"
},
"context": {
"type": "integer",
"description": "Lines of context around matches (default: 0)"
}
},
"required": ["pattern"]
}
},
{
"name": "bash",
"description": "Run safe bash commands for system inspection. Limited to read-only and status commands. Prefer read/write/edit/glob/grep tools for file operations.",
"input_schema": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "Bash command to execute (must be safe/read-only)"
}
},
"required": ["command"]
}
}
]
# Allowlist for bash commands
ALLOWED_BASH_PREFIXES = [
"df", "free", "uptime", "ps", "systemctl status", "cat", "ls",
"head", "tail", "du", "top -bn1", "hostname", "date", "who", "last", "htop"
]
async def execute_tool(name: str, tool_input: dict) -> str:
"""Execute a tool and return the result"""
executors = {
"git": execute_git,
"docs": execute_docs,
"read": execute_read,
"write": execute_write,
"edit": execute_edit,
"glob": execute_glob,
"grep": execute_grep,
"bash": execute_bash,
}
executor = executors.get(name)
if executor:
return await executor(tool_input)
return f"Unknown tool: {name}"
async def execute_git(tool_input: dict) -> str:
"""Execute git commands"""
cmd = tool_input.get("command")
try:
if cmd == "status":
result = subprocess.run(
["git", "-C", "/home/admin", "status", "--short"],
capture_output=True, text=True, timeout=10
)
return result.stdout.strip() or "Working tree clean"
elif cmd == "log":
result = subprocess.run(
["git", "-C", "/home/admin", "log", "--oneline", "-10"],
capture_output=True, text=True, timeout=10
)
return result.stdout.strip()
elif cmd == "diff":
result = subprocess.run(
["git", "-C", "/home/admin", "diff", "--stat"],
capture_output=True, text=True, timeout=10
)
return result.stdout.strip() or "No changes"
elif cmd == "commit":
message = tool_input.get("message")
if not message:
return "Error: commit message required"
files = tool_input.get("files", [])
if files:
subprocess.run(
["git", "-C", "/home/admin", "add"] + files,
timeout=10
)
result = subprocess.run(
["git", "-C", "/home/admin", "commit", "-m", message],
capture_output=True, text=True, timeout=10
)
output = result.stdout + result.stderr
return output.strip() if output.strip() else "Commit completed"
return "Unknown git command"
except subprocess.TimeoutExpired:
return "Error: command timed out"
except Exception as e:
return f"Error: {str(e)}"
async def execute_docs(tool_input: dict) -> str:
"""Execute docs operations"""
action = tool_input.get("action")
filename = tool_input.get("file")
if filename not in ["STATUS.md", "HISTORY.md", "RUNBOOK.md"]:
return "Error: invalid file"
filepath = f"/home/admin/docs/{filename}"
try:
if action == "read":
async with aiofiles.open(filepath) as f:
content = await f.read()
return content[:10000]
elif action == "append":
content = tool_input.get("content", "")
if not content:
return "Error: content required for append"
async with aiofiles.open(filepath, "a") as f:
await f.write("\n" + content)
return f"Appended to {filename}"
elif action == "update":
content = tool_input.get("content", "")
if not content:
return "Error: content required for update"
async with aiofiles.open(filepath, "w") as f:
await f.write(content)
return f"Updated {filename}"
return "Unknown action"
except FileNotFoundError:
return f"Error: {filename} not found"
except Exception as e:
return f"Error: {str(e)}"
async def execute_read(tool_input: dict) -> str:
"""Read a file with optional offset and limit"""
path = tool_input.get("path", "")
offset = tool_input.get("offset", 1)
limit = tool_input.get("limit", 500)
# Security: restrict to /home/admin
if not path.startswith("/home/admin"):
return "Error: can only read files under /home/admin"
try:
async with aiofiles.open(path, "r") as f:
lines = await f.readlines()
start = max(0, offset - 1)
end = start + limit
selected = lines[start:end]
result = []
for i, line in enumerate(selected, start=start + 1):
result.append(f"{i:4d}\t{line.rstrip()}")
output = "\n".join(result)
if len(output) > 50000:
output = output[:50000] + "\n... (truncated)"
return output if output else "(empty file)"
except FileNotFoundError:
return f"Error: file not found: {path}"
except IsADirectoryError:
return f"Error: path is a directory: {path}"
except PermissionError:
return f"Error: permission denied: {path}"
except Exception as e:
return f"Error: {str(e)}"
async def execute_write(tool_input: dict) -> str:
"""Write content to a file"""
path = tool_input.get("path", "")
content = tool_input.get("content", "")
if not path.startswith("/home/admin"):
return "Error: can only write files under /home/admin"
critical = ["/home/admin/.ssh", "/home/admin/.bashrc", "/home/admin/.profile"]
for c in critical:
if path.startswith(c):
return f"Error: cannot write to protected path: {c}"
try:
parent = os.path.dirname(path)
if parent and not os.path.exists(parent):
os.makedirs(parent, exist_ok=True)
async with aiofiles.open(path, "w") as f:
await f.write(content)
return f"Wrote {len(content)} bytes to {path}"
except PermissionError:
return f"Error: permission denied: {path}"
except Exception as e:
return f"Error: {str(e)}"
async def execute_edit(tool_input: dict) -> str:
"""Make string replacements in a file"""
path = tool_input.get("path", "")
old_string = tool_input.get("old_string", "")
new_string = tool_input.get("new_string", "")
replace_all = tool_input.get("replace_all", False)
if not path.startswith("/home/admin"):
return "Error: can only edit files under /home/admin"
if not old_string:
return "Error: old_string cannot be empty"
try:
async with aiofiles.open(path, "r") as f:
content = await f.read()
if old_string not in content:
return f"Error: old_string not found in file"
count = content.count(old_string)
if replace_all:
new_content = content.replace(old_string, new_string)
replaced = count
else:
if count > 1:
return f"Error: old_string appears {count} times. Use replace_all=true or provide more context to make it unique."
new_content = content.replace(old_string, new_string, 1)
replaced = 1
async with aiofiles.open(path, "w") as f:
await f.write(new_content)
return f"Replaced {replaced} occurrence(s) in {path}"
except FileNotFoundError:
return f"Error: file not found: {path}"
except PermissionError:
return f"Error: permission denied: {path}"
except Exception as e:
return f"Error: {str(e)}"
async def execute_glob(tool_input: dict) -> str:
"""Find files matching a glob pattern"""
pattern = tool_input.get("pattern", "")
base_path = tool_input.get("path", "/home/admin")
if not base_path.startswith("/home/admin"):
base_path = "/home/admin"
try:
full_pattern = os.path.join(base_path, pattern)
matches = globlib.glob(full_pattern, recursive=True)
matches.sort(key=lambda x: os.path.getmtime(x) if os.path.exists(x) else 0, reverse=True)
if len(matches) > 100:
matches = matches[:100]
return "\n".join(matches) + f"\n... (showing 100 of {len(matches)} matches)"
return "\n".join(matches) if matches else "No matches found"
except Exception as e:
return f"Error: {str(e)}"
async def execute_grep(tool_input: dict) -> str:
"""Search file contents using regex"""
pattern = tool_input.get("pattern", "")
path = tool_input.get("path", "/home/admin")
file_glob = tool_input.get("glob", None)
context = tool_input.get("context", 0)
if not path.startswith("/home/admin"):
path = "/home/admin"
try:
regex = re.compile(pattern)
except re.error as e:
return f"Error: invalid regex: {e}"
results = []
files_searched = 0
max_results = 50
try:
if os.path.isfile(path):
files = [path]
else:
if file_glob:
files = globlib.glob(os.path.join(path, "**", file_glob), recursive=True)
else:
files = globlib.glob(os.path.join(path, "**", "*"), recursive=True)
files = [f for f in files if os.path.isfile(f)]
for filepath in files:
if len(results) >= max_results:
break
try:
if os.path.getsize(filepath) > 1_000_000:
continue
async with aiofiles.open(filepath, "r", errors="ignore") as f:
lines = await f.readlines()
files_searched += 1
for i, line in enumerate(lines):
if regex.search(line):
if len(results) >= max_results:
break
match_lines = []
start = max(0, i - context)
end = min(len(lines), i + context + 1)
for j in range(start, end):
prefix = ">" if j == i else " "
match_lines.append(f"{prefix}{j+1:4d}: {lines[j].rstrip()}")
results.append(f"{filepath}:\n" + "\n".join(match_lines))
except (PermissionError, IsADirectoryError, UnicodeDecodeError):
continue
if not results:
return f"No matches found (searched {files_searched} files)"
output = "\n\n".join(results)
if len(results) >= max_results:
output += f"\n\n... (showing first {max_results} matches)"
return output
except Exception as e:
return f"Error: {str(e)}"
async def execute_bash(tool_input: dict) -> str:
"""Execute safe bash commands"""
cmd = tool_input.get("command", "").strip()
allowed = False
for prefix in ALLOWED_BASH_PREFIXES:
if cmd.startswith(prefix):
allowed = True
break
if not allowed:
return f"Error: command not allowed. Permitted prefixes: {', '.join(ALLOWED_BASH_PREFIXES)}"
try:
result = subprocess.run(
cmd, shell=True, capture_output=True, text=True,
timeout=30, cwd="/home/admin"
)
output = result.stdout + result.stderr
return output[:5000].strip() if output.strip() else "Command completed (no output)"
except subprocess.TimeoutExpired:
return "Error: command timed out (30s limit)"
except Exception as e:
return f"Error: {str(e)}"