""" Egregore Schedule Service - Scheduler engine Handles cron parsing, task storage, and execution via reason service. """ import asyncio import json import os from dataclasses import dataclass, asdict from datetime import datetime from typing import Optional import httpx from croniter import croniter REASON_URL = os.getenv("REASON_URL", "http://127.0.0.1:8081") @dataclass class ScheduledTask: id: str name: str cron: str instruction: str enabled: bool created_at: str last_run: Optional[str] run_count: int @dataclass class ExecutionRecord: timestamp: str success: bool duration_ms: int error: Optional[str] = None class Scheduler: """Manages scheduled tasks and executes them via reason service""" def __init__(self, data_dir: str): self.data_dir = data_dir self.tasks_file = os.path.join(data_dir, "tasks.json") self.history_dir = os.path.join(data_dir, "history") self._running = False self._tasks: dict[str, ScheduledTask] = {} # Ensure directories exist os.makedirs(data_dir, exist_ok=True) os.makedirs(self.history_dir, exist_ok=True) # Load existing tasks self._load_tasks() def _load_tasks(self): """Load tasks from disk""" if os.path.exists(self.tasks_file): try: with open(self.tasks_file, "r") as f: data = json.load(f) for task_data in data: task = ScheduledTask(**task_data) self._tasks[task.id] = task except (json.JSONDecodeError, TypeError): pass def _save_tasks(self): """Save tasks to disk""" data = [asdict(task) for task in self._tasks.values()] with open(self.tasks_file, "w") as f: json.dump(data, f, indent=2) def _validate_cron(self, cron_expr: str) -> bool: """Validate a cron expression""" try: croniter(cron_expr) return True except (ValueError, KeyError): return False def list_tasks(self) -> list[ScheduledTask]: """List all scheduled tasks""" return list(self._tasks.values()) def get_task(self, task_id: str) -> Optional[ScheduledTask]: """Get a task by ID""" return self._tasks.get(task_id) def add_task(self, task: ScheduledTask) -> None: """Add a new task""" if not self._validate_cron(task.cron): raise ValueError(f"Invalid cron expression: {task.cron}") self._tasks[task.id] = task self._save_tasks() def update_task(self, task: ScheduledTask) -> None: """Update an existing task""" if not self._validate_cron(task.cron): raise ValueError(f"Invalid cron expression: {task.cron}") self._tasks[task.id] = task self._save_tasks() def delete_task(self, task_id: str) -> bool: """Delete a task""" if task_id in self._tasks: del self._tasks[task_id] self._save_tasks() return True return False def get_next_run(self, task: ScheduledTask) -> Optional[datetime]: """Get the next run time for a task""" try: cron = croniter(task.cron, datetime.utcnow()) return cron.get_next(datetime) except (ValueError, KeyError): return None def get_history(self, task_id: str, limit: int = 20) -> list[dict]: """Get execution history for a task""" history_file = os.path.join(self.history_dir, f"{task_id}.json") if not os.path.exists(history_file): return [] try: with open(history_file, "r") as f: records = json.load(f) return records[-limit:] except (json.JSONDecodeError, TypeError): return [] def _append_history(self, task_id: str, record: ExecutionRecord): """Append an execution record to history""" history_file = os.path.join(self.history_dir, f"{task_id}.json") records = [] if os.path.exists(history_file): try: with open(history_file, "r") as f: records = json.load(f) except (json.JSONDecodeError, TypeError): pass records.append(asdict(record)) # Keep last 100 records records = records[-100:] with open(history_file, "w") as f: json.dump(records, f, indent=2) async def execute_task(self, task: ScheduledTask) -> dict: """Execute a task by calling the reason service""" start_time = datetime.utcnow() # Build conversation history with the scheduled instruction history = [ { "role": "user", "content": f"[Scheduled Task: {task.name}]\n\n{task.instruction}" } ] result = {"success": False, "error": None, "response": None} try: async with httpx.AsyncClient(timeout=300.0) as client: response = await client.post( f"{REASON_URL}/process", json={ "model": "claude-sonnet-4-20250514", "history": history, "max_iterations": 10 } ) response.raise_for_status() data = response.json() result["success"] = True result["response"] = data.get("blocks", []) except httpx.TimeoutException: result["error"] = "Request to reason service timed out" except httpx.HTTPStatusError as e: result["error"] = f"Reason service error: {e.response.status_code}" except Exception as e: result["error"] = str(e) # Calculate duration end_time = datetime.utcnow() duration_ms = int((end_time - start_time).total_seconds() * 1000) # Record execution record = ExecutionRecord( timestamp=start_time.isoformat() + "Z", success=result["success"], duration_ms=duration_ms, error=result["error"] ) self._append_history(task.id, record) # Update task task.last_run = start_time.isoformat() + "Z" task.run_count += 1 self._save_tasks() return result def stop(self): """Stop the scheduler loop""" self._running = False async def run(self): """Main scheduler loop - checks for due tasks every minute""" self._running = True print(f"[schedule] Scheduler started with {len(self._tasks)} tasks") while self._running: now = datetime.utcnow() for task in self._tasks.values(): if not task.enabled: continue try: # Check if task should run now cron = croniter(task.cron, now) prev_time = cron.get_prev(datetime) # If last run was before the previous scheduled time, run now if task.last_run: last_run_dt = datetime.fromisoformat(task.last_run.rstrip("Z")) if last_run_dt < prev_time: # Check we're within 60 seconds of the scheduled time if (now - prev_time).total_seconds() < 60: print(f"[schedule] Executing task: {task.name}") await self.execute_task(task) else: # Never run - check if we're within 60 seconds of schedule if (now - prev_time).total_seconds() < 60: print(f"[schedule] Executing task (first run): {task.name}") await self.execute_task(task) except Exception as e: print(f"[schedule] Error checking task {task.id}: {e}") # Sleep for 30 seconds before next check await asyncio.sleep(30)