From 27d0dce910d360dbe9df6432ec8ae9f9e5c60caa Mon Sep 17 00:00:00 2001 From: egregore Date: Mon, 2 Feb 2026 12:52:19 +0000 Subject: [PATCH] Initial schedule service implementation Cron-like daemon that wakes reason service on schedule: - FastAPI on port 8084 - CRUD API for managing schedules - Cron expression support via croniter - Calls POST /process on reason service - Tracks execution history per task - 30-second check interval Co-Authored-By: Claude Opus 4.5 --- .gitignore | 3 + __init__.py | 1 + main.py | 187 ++++++++++++++++++++++++++++++++++++++ scheduler.py | 250 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 441 insertions(+) create mode 100644 .gitignore create mode 100644 __init__.py create mode 100644 main.py create mode 100644 scheduler.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b6cf5f0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +*.pyc +.env diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..2eefa46 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +"""Egregore Schedule Service - Cron-like task scheduler""" diff --git a/main.py b/main.py new file mode 100644 index 0000000..9704cec --- /dev/null +++ b/main.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +""" +Egregore Schedule Service - Cron-like task scheduler + +Provides HTTP API for managing scheduled tasks that wake the reason service. +Runs on port 8084. +""" + +import asyncio +import os +import uuid +from contextlib import asynccontextmanager +from datetime import datetime +from typing import Optional + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + +from scheduler import Scheduler, ScheduledTask + + +# Lifespan for startup/shutdown +@asynccontextmanager +async def lifespan(app: FastAPI): + # Start the scheduler background task + scheduler_task = asyncio.create_task(scheduler.run()) + yield + # Stop the scheduler + scheduler.stop() + scheduler_task.cancel() + try: + await scheduler_task + except asyncio.CancelledError: + pass + + +app = FastAPI(title="Egregore Schedule Service", docs_url="/docs", lifespan=lifespan) + +# Initialize scheduler +DATA_DIR = "/home/admin/data/schedules" +scheduler = Scheduler(DATA_DIR) + + +# Request/Response models +class ScheduleCreate(BaseModel): + name: str + cron: str # Cron expression (e.g., "0 9 * * *" for 9am daily) + instruction: str # What to tell reason to do + enabled: bool = True + + +class ScheduleUpdate(BaseModel): + name: Optional[str] = None + cron: Optional[str] = None + instruction: Optional[str] = None + enabled: Optional[bool] = None + + +class ScheduleResponse(BaseModel): + id: str + name: str + cron: str + instruction: str + enabled: bool + created_at: str + last_run: Optional[str] + next_run: Optional[str] + run_count: int + + +# Endpoints +@app.get("/health") +async def health(): + """Health check endpoint""" + return {"status": "ok", "service": "schedule"} + + +@app.get("/schedules") +async def list_schedules(): + """List all scheduled tasks""" + tasks = scheduler.list_tasks() + return {"schedules": [task_to_response(t) for t in tasks]} + + +@app.post("/schedules") +async def create_schedule(req: ScheduleCreate): + """Create a new scheduled task""" + task_id = str(uuid.uuid4())[:8] + task = ScheduledTask( + id=task_id, + name=req.name, + cron=req.cron, + instruction=req.instruction, + enabled=req.enabled, + created_at=datetime.utcnow().isoformat() + "Z", + last_run=None, + run_count=0 + ) + + try: + scheduler.add_task(task) + return task_to_response(task) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@app.get("/schedules/{task_id}") +async def get_schedule(task_id: str): + """Get a specific scheduled task""" + task = scheduler.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail="Schedule not found") + return task_to_response(task) + + +@app.patch("/schedules/{task_id}") +async def update_schedule(task_id: str, req: ScheduleUpdate): + """Update a scheduled task""" + task = scheduler.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail="Schedule not found") + + if req.name is not None: + task.name = req.name + if req.cron is not None: + task.cron = req.cron + if req.instruction is not None: + task.instruction = req.instruction + if req.enabled is not None: + task.enabled = req.enabled + + try: + scheduler.update_task(task) + return task_to_response(task) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@app.delete("/schedules/{task_id}") +async def delete_schedule(task_id: str): + """Delete a scheduled task""" + if not scheduler.delete_task(task_id): + raise HTTPException(status_code=404, detail="Schedule not found") + return {"status": "deleted", "id": task_id} + + +@app.post("/schedules/{task_id}/run") +async def run_schedule(task_id: str): + """Manually trigger a scheduled task""" + task = scheduler.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail="Schedule not found") + + result = await scheduler.execute_task(task) + return {"status": "executed", "id": task_id, "result": result} + + +@app.get("/schedules/{task_id}/history") +async def get_schedule_history(task_id: str, limit: int = 20): + """Get execution history for a scheduled task""" + task = scheduler.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail="Schedule not found") + + history = scheduler.get_history(task_id, limit) + return {"id": task_id, "history": history} + + +def task_to_response(task: ScheduledTask) -> dict: + """Convert a task to API response format""" + next_run = scheduler.get_next_run(task) if task.enabled else None + return { + "id": task.id, + "name": task.name, + "cron": task.cron, + "instruction": task.instruction, + "enabled": task.enabled, + "created_at": task.created_at, + "last_run": task.last_run, + "next_run": next_run.isoformat() + "Z" if next_run else None, + "run_count": task.run_count + } + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="127.0.0.1", port=8084) diff --git a/scheduler.py b/scheduler.py new file mode 100644 index 0000000..b446d2c --- /dev/null +++ b/scheduler.py @@ -0,0 +1,250 @@ +""" +Egregore Schedule Service - Scheduler engine + +Handles cron parsing, task storage, and execution via reason service. +""" + +import asyncio +import json +import os +from dataclasses import dataclass, asdict +from datetime import datetime +from typing import Optional + +import httpx +from croniter import croniter + + +REASON_URL = os.getenv("REASON_URL", "http://127.0.0.1:8081") + + +@dataclass +class ScheduledTask: + id: str + name: str + cron: str + instruction: str + enabled: bool + created_at: str + last_run: Optional[str] + run_count: int + + +@dataclass +class ExecutionRecord: + timestamp: str + success: bool + duration_ms: int + error: Optional[str] = None + + +class Scheduler: + """Manages scheduled tasks and executes them via reason service""" + + def __init__(self, data_dir: str): + self.data_dir = data_dir + self.tasks_file = os.path.join(data_dir, "tasks.json") + self.history_dir = os.path.join(data_dir, "history") + self._running = False + self._tasks: dict[str, ScheduledTask] = {} + + # Ensure directories exist + os.makedirs(data_dir, exist_ok=True) + os.makedirs(self.history_dir, exist_ok=True) + + # Load existing tasks + self._load_tasks() + + def _load_tasks(self): + """Load tasks from disk""" + if os.path.exists(self.tasks_file): + try: + with open(self.tasks_file, "r") as f: + data = json.load(f) + for task_data in data: + task = ScheduledTask(**task_data) + self._tasks[task.id] = task + except (json.JSONDecodeError, TypeError): + pass + + def _save_tasks(self): + """Save tasks to disk""" + data = [asdict(task) for task in self._tasks.values()] + with open(self.tasks_file, "w") as f: + json.dump(data, f, indent=2) + + def _validate_cron(self, cron_expr: str) -> bool: + """Validate a cron expression""" + try: + croniter(cron_expr) + return True + except (ValueError, KeyError): + return False + + def list_tasks(self) -> list[ScheduledTask]: + """List all scheduled tasks""" + return list(self._tasks.values()) + + def get_task(self, task_id: str) -> Optional[ScheduledTask]: + """Get a task by ID""" + return self._tasks.get(task_id) + + def add_task(self, task: ScheduledTask) -> None: + """Add a new task""" + if not self._validate_cron(task.cron): + raise ValueError(f"Invalid cron expression: {task.cron}") + self._tasks[task.id] = task + self._save_tasks() + + def update_task(self, task: ScheduledTask) -> None: + """Update an existing task""" + if not self._validate_cron(task.cron): + raise ValueError(f"Invalid cron expression: {task.cron}") + self._tasks[task.id] = task + self._save_tasks() + + def delete_task(self, task_id: str) -> bool: + """Delete a task""" + if task_id in self._tasks: + del self._tasks[task_id] + self._save_tasks() + return True + return False + + def get_next_run(self, task: ScheduledTask) -> Optional[datetime]: + """Get the next run time for a task""" + try: + cron = croniter(task.cron, datetime.utcnow()) + return cron.get_next(datetime) + except (ValueError, KeyError): + return None + + def get_history(self, task_id: str, limit: int = 20) -> list[dict]: + """Get execution history for a task""" + history_file = os.path.join(self.history_dir, f"{task_id}.json") + if not os.path.exists(history_file): + return [] + + try: + with open(history_file, "r") as f: + records = json.load(f) + return records[-limit:] + except (json.JSONDecodeError, TypeError): + return [] + + def _append_history(self, task_id: str, record: ExecutionRecord): + """Append an execution record to history""" + history_file = os.path.join(self.history_dir, f"{task_id}.json") + + records = [] + if os.path.exists(history_file): + try: + with open(history_file, "r") as f: + records = json.load(f) + except (json.JSONDecodeError, TypeError): + pass + + records.append(asdict(record)) + + # Keep last 100 records + records = records[-100:] + + with open(history_file, "w") as f: + json.dump(records, f, indent=2) + + async def execute_task(self, task: ScheduledTask) -> dict: + """Execute a task by calling the reason service""" + start_time = datetime.utcnow() + + # Build conversation history with the scheduled instruction + history = [ + { + "role": "user", + "content": f"[Scheduled Task: {task.name}]\n\n{task.instruction}" + } + ] + + result = {"success": False, "error": None, "response": None} + + try: + async with httpx.AsyncClient(timeout=300.0) as client: + response = await client.post( + f"{REASON_URL}/process", + json={ + "model": "claude-sonnet-4-20250514", + "history": history, + "max_iterations": 10 + } + ) + response.raise_for_status() + data = response.json() + result["success"] = True + result["response"] = data.get("blocks", []) + + except httpx.TimeoutException: + result["error"] = "Request to reason service timed out" + except httpx.HTTPStatusError as e: + result["error"] = f"Reason service error: {e.response.status_code}" + except Exception as e: + result["error"] = str(e) + + # Calculate duration + end_time = datetime.utcnow() + duration_ms = int((end_time - start_time).total_seconds() * 1000) + + # Record execution + record = ExecutionRecord( + timestamp=start_time.isoformat() + "Z", + success=result["success"], + duration_ms=duration_ms, + error=result["error"] + ) + self._append_history(task.id, record) + + # Update task + task.last_run = start_time.isoformat() + "Z" + task.run_count += 1 + self._save_tasks() + + return result + + def stop(self): + """Stop the scheduler loop""" + self._running = False + + async def run(self): + """Main scheduler loop - checks for due tasks every minute""" + self._running = True + print(f"[schedule] Scheduler started with {len(self._tasks)} tasks") + + while self._running: + now = datetime.utcnow() + + for task in self._tasks.values(): + if not task.enabled: + continue + + try: + # Check if task should run now + cron = croniter(task.cron, now) + prev_time = cron.get_prev(datetime) + + # If last run was before the previous scheduled time, run now + if task.last_run: + last_run_dt = datetime.fromisoformat(task.last_run.rstrip("Z")) + if last_run_dt < prev_time: + # Check we're within 60 seconds of the scheduled time + if (now - prev_time).total_seconds() < 60: + print(f"[schedule] Executing task: {task.name}") + await self.execute_task(task) + else: + # Never run - check if we're within 60 seconds of schedule + if (now - prev_time).total_seconds() < 60: + print(f"[schedule] Executing task (first run): {task.name}") + await self.execute_task(task) + + except Exception as e: + print(f"[schedule] Error checking task {task.id}: {e}") + + # Sleep for 30 seconds before next check + await asyncio.sleep(30)