""" Egregore Brain - Tool definitions and execution This module contains the tool definitions exposed to Claude and their execution logic. """ import os import re import json import subprocess import glob as globlib from typing import Any import aiofiles # Tool definitions for Claude TOOLS = [ { "name": "git", "description": "Interact with the VPS git repository. Use for viewing status, history, diffs, and making commits.", "input_schema": { "type": "object", "properties": { "command": { "type": "string", "enum": ["status", "log", "diff", "commit"], "description": "Git operation to perform" }, "message": { "type": "string", "description": "Commit message (required for commit command)" }, "files": { "type": "array", "items": {"type": "string"}, "description": "Files to stage (for commit). Omit to commit all staged changes." } }, "required": ["command"] } }, { "name": "docs", "description": "Read or update VPS documentation files (STATUS.md, HISTORY.md, RUNBOOK.md)", "input_schema": { "type": "object", "properties": { "action": { "type": "string", "enum": ["read", "append", "update"], "description": "Action to perform" }, "file": { "type": "string", "enum": ["STATUS.md", "HISTORY.md", "RUNBOOK.md"], "description": "Which doc file to operate on" }, "content": { "type": "string", "description": "Content to append or new content for update" }, "section": { "type": "string", "description": "Section header to update (for update action)" } }, "required": ["action", "file"] } }, { "name": "read", "description": "Read a file from the filesystem. Use this instead of bash cat/head/tail. Returns file contents with line numbers.", "input_schema": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute path to the file to read" }, "offset": { "type": "integer", "description": "Line number to start reading from (1-indexed, optional)" }, "limit": { "type": "integer", "description": "Maximum number of lines to read (optional, default 500)" } }, "required": ["path"] } }, { "name": "write", "description": "Write content to a file, creating it if it doesn't exist or overwriting if it does.", "input_schema": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute path to the file to write" }, "content": { "type": "string", "description": "Content to write to the file" } }, "required": ["path", "content"] } }, { "name": "edit", "description": "Make precise string replacements in a file. Use this instead of bash sed/awk. The old_string must match exactly.", "input_schema": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute path to the file to edit" }, "old_string": { "type": "string", "description": "Exact string to find and replace" }, "new_string": { "type": "string", "description": "String to replace it with" }, "replace_all": { "type": "boolean", "description": "Replace all occurrences (default: false, replace first only)" } }, "required": ["path", "old_string", "new_string"] } }, { "name": "glob", "description": "Find files matching a glob pattern. Use this instead of bash find/ls for file discovery.", "input_schema": { "type": "object", "properties": { "pattern": { "type": "string", "description": "Glob pattern (e.g., '**/*.py', 'docs/*.md')" }, "path": { "type": "string", "description": "Base directory to search in (default: /home/admin)" } }, "required": ["pattern"] } }, { "name": "grep", "description": "Search file contents using regex patterns. Use this instead of bash grep/rg.", "input_schema": { "type": "object", "properties": { "pattern": { "type": "string", "description": "Regex pattern to search for" }, "path": { "type": "string", "description": "File or directory to search in (default: /home/admin)" }, "glob": { "type": "string", "description": "Glob pattern to filter files (e.g., '*.py')" }, "context": { "type": "integer", "description": "Lines of context around matches (default: 0)" } }, "required": ["pattern"] } }, { "name": "bash", "description": "Run safe bash commands for system inspection. Limited to read-only and status commands. Prefer read/write/edit/glob/grep tools for file operations.", "input_schema": { "type": "object", "properties": { "command": { "type": "string", "description": "Bash command to execute (must be safe/read-only)" } }, "required": ["command"] } } ] # Allowlist for bash commands ALLOWED_BASH_PREFIXES = [ "df", "free", "uptime", "ps", "systemctl status", "cat", "ls", "head", "tail", "du", "top -bn1", "hostname", "date", "who", "last", "htop" ] async def execute_tool(name: str, tool_input: dict) -> str: """Execute a tool and return the result""" executors = { "git": execute_git, "docs": execute_docs, "read": execute_read, "write": execute_write, "edit": execute_edit, "glob": execute_glob, "grep": execute_grep, "bash": execute_bash, } executor = executors.get(name) if executor: return await executor(tool_input) return f"Unknown tool: {name}" async def execute_git(tool_input: dict) -> str: """Execute git commands""" cmd = tool_input.get("command") try: if cmd == "status": result = subprocess.run( ["git", "-C", "/home/admin", "status", "--short"], capture_output=True, text=True, timeout=10 ) return result.stdout.strip() or "Working tree clean" elif cmd == "log": result = subprocess.run( ["git", "-C", "/home/admin", "log", "--oneline", "-10"], capture_output=True, text=True, timeout=10 ) return result.stdout.strip() elif cmd == "diff": result = subprocess.run( ["git", "-C", "/home/admin", "diff", "--stat"], capture_output=True, text=True, timeout=10 ) return result.stdout.strip() or "No changes" elif cmd == "commit": message = tool_input.get("message") if not message: return "Error: commit message required" files = tool_input.get("files", []) if files: subprocess.run( ["git", "-C", "/home/admin", "add"] + files, timeout=10 ) result = subprocess.run( ["git", "-C", "/home/admin", "commit", "-m", message], capture_output=True, text=True, timeout=10 ) output = result.stdout + result.stderr return output.strip() if output.strip() else "Commit completed" return "Unknown git command" except subprocess.TimeoutExpired: return "Error: command timed out" except Exception as e: return f"Error: {str(e)}" async def execute_docs(tool_input: dict) -> str: """Execute docs operations""" action = tool_input.get("action") filename = tool_input.get("file") if filename not in ["STATUS.md", "HISTORY.md", "RUNBOOK.md"]: return "Error: invalid file" filepath = f"/home/admin/docs/{filename}" try: if action == "read": async with aiofiles.open(filepath) as f: content = await f.read() return content[:10000] elif action == "append": content = tool_input.get("content", "") if not content: return "Error: content required for append" async with aiofiles.open(filepath, "a") as f: await f.write("\n" + content) return f"Appended to {filename}" elif action == "update": content = tool_input.get("content", "") if not content: return "Error: content required for update" async with aiofiles.open(filepath, "w") as f: await f.write(content) return f"Updated {filename}" return "Unknown action" except FileNotFoundError: return f"Error: {filename} not found" except Exception as e: return f"Error: {str(e)}" async def execute_read(tool_input: dict) -> str: """Read a file with optional offset and limit""" path = tool_input.get("path", "") offset = tool_input.get("offset", 1) limit = tool_input.get("limit", 500) # Security: restrict to /home/admin if not path.startswith("/home/admin"): return "Error: can only read files under /home/admin" try: async with aiofiles.open(path, "r") as f: lines = await f.readlines() start = max(0, offset - 1) end = start + limit selected = lines[start:end] result = [] for i, line in enumerate(selected, start=start + 1): result.append(f"{i:4d}\t{line.rstrip()}") output = "\n".join(result) if len(output) > 50000: output = output[:50000] + "\n... (truncated)" return output if output else "(empty file)" except FileNotFoundError: return f"Error: file not found: {path}" except IsADirectoryError: return f"Error: path is a directory: {path}" except PermissionError: return f"Error: permission denied: {path}" except Exception as e: return f"Error: {str(e)}" async def execute_write(tool_input: dict) -> str: """Write content to a file""" path = tool_input.get("path", "") content = tool_input.get("content", "") if not path.startswith("/home/admin"): return "Error: can only write files under /home/admin" critical = ["/home/admin/.ssh", "/home/admin/.bashrc", "/home/admin/.profile"] for c in critical: if path.startswith(c): return f"Error: cannot write to protected path: {c}" try: parent = os.path.dirname(path) if parent and not os.path.exists(parent): os.makedirs(parent, exist_ok=True) async with aiofiles.open(path, "w") as f: await f.write(content) return f"Wrote {len(content)} bytes to {path}" except PermissionError: return f"Error: permission denied: {path}" except Exception as e: return f"Error: {str(e)}" async def execute_edit(tool_input: dict) -> str: """Make string replacements in a file""" path = tool_input.get("path", "") old_string = tool_input.get("old_string", "") new_string = tool_input.get("new_string", "") replace_all = tool_input.get("replace_all", False) if not path.startswith("/home/admin"): return "Error: can only edit files under /home/admin" if not old_string: return "Error: old_string cannot be empty" try: async with aiofiles.open(path, "r") as f: content = await f.read() if old_string not in content: return f"Error: old_string not found in file" count = content.count(old_string) if replace_all: new_content = content.replace(old_string, new_string) replaced = count else: if count > 1: return f"Error: old_string appears {count} times. Use replace_all=true or provide more context to make it unique." new_content = content.replace(old_string, new_string, 1) replaced = 1 async with aiofiles.open(path, "w") as f: await f.write(new_content) return f"Replaced {replaced} occurrence(s) in {path}" except FileNotFoundError: return f"Error: file not found: {path}" except PermissionError: return f"Error: permission denied: {path}" except Exception as e: return f"Error: {str(e)}" async def execute_glob(tool_input: dict) -> str: """Find files matching a glob pattern""" pattern = tool_input.get("pattern", "") base_path = tool_input.get("path", "/home/admin") if not base_path.startswith("/home/admin"): base_path = "/home/admin" try: full_pattern = os.path.join(base_path, pattern) matches = globlib.glob(full_pattern, recursive=True) matches.sort(key=lambda x: os.path.getmtime(x) if os.path.exists(x) else 0, reverse=True) if len(matches) > 100: matches = matches[:100] return "\n".join(matches) + f"\n... (showing 100 of {len(matches)} matches)" return "\n".join(matches) if matches else "No matches found" except Exception as e: return f"Error: {str(e)}" async def execute_grep(tool_input: dict) -> str: """Search file contents using regex""" pattern = tool_input.get("pattern", "") path = tool_input.get("path", "/home/admin") file_glob = tool_input.get("glob", None) context = tool_input.get("context", 0) if not path.startswith("/home/admin"): path = "/home/admin" try: regex = re.compile(pattern) except re.error as e: return f"Error: invalid regex: {e}" results = [] files_searched = 0 max_results = 50 try: if os.path.isfile(path): files = [path] else: if file_glob: files = globlib.glob(os.path.join(path, "**", file_glob), recursive=True) else: files = globlib.glob(os.path.join(path, "**", "*"), recursive=True) files = [f for f in files if os.path.isfile(f)] for filepath in files: if len(results) >= max_results: break try: if os.path.getsize(filepath) > 1_000_000: continue async with aiofiles.open(filepath, "r", errors="ignore") as f: lines = await f.readlines() files_searched += 1 for i, line in enumerate(lines): if regex.search(line): if len(results) >= max_results: break match_lines = [] start = max(0, i - context) end = min(len(lines), i + context + 1) for j in range(start, end): prefix = ">" if j == i else " " match_lines.append(f"{prefix}{j+1:4d}: {lines[j].rstrip()}") results.append(f"{filepath}:\n" + "\n".join(match_lines)) except (PermissionError, IsADirectoryError, UnicodeDecodeError): continue if not results: return f"No matches found (searched {files_searched} files)" output = "\n\n".join(results) if len(results) >= max_results: output += f"\n\n... (showing first {max_results} matches)" return output except Exception as e: return f"Error: {str(e)}" async def execute_bash(tool_input: dict) -> str: """Execute safe bash commands""" cmd = tool_input.get("command", "").strip() allowed = False for prefix in ALLOWED_BASH_PREFIXES: if cmd.startswith(prefix): allowed = True break if not allowed: return f"Error: command not allowed. Permitted prefixes: {', '.join(ALLOWED_BASH_PREFIXES)}" try: result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=30, cwd="/home/admin" ) output = result.stdout + result.stderr return output[:5000].strip() if output.strip() else "Command completed (no output)" except subprocess.TimeoutExpired: return "Error: command timed out (30s limit)" except Exception as e: return f"Error: {str(e)}"