Initial schedule service implementation

Cron-like daemon that wakes reason service on schedule:
- FastAPI on port 8084
- CRUD API for managing schedules
- Cron expression support via croniter
- Calls POST /process on reason service
- Tracks execution history per task
- 30-second check interval

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
egregore 2026-02-02 12:52:19 +00:00
commit 27d0dce910
4 changed files with 441 additions and 0 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
__pycache__/
*.pyc
.env

1
__init__.py Normal file
View file

@ -0,0 +1 @@
"""Egregore Schedule Service - Cron-like task scheduler"""

187
main.py Normal file
View file

@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""
Egregore Schedule Service - Cron-like task scheduler
Provides HTTP API for managing scheduled tasks that wake the reason service.
Runs on port 8084.
"""
import asyncio
import os
import uuid
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from scheduler import Scheduler, ScheduledTask
# Lifespan for startup/shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
# Start the scheduler background task
scheduler_task = asyncio.create_task(scheduler.run())
yield
# Stop the scheduler
scheduler.stop()
scheduler_task.cancel()
try:
await scheduler_task
except asyncio.CancelledError:
pass
app = FastAPI(title="Egregore Schedule Service", docs_url="/docs", lifespan=lifespan)
# Initialize scheduler
DATA_DIR = "/home/admin/data/schedules"
scheduler = Scheduler(DATA_DIR)
# Request/Response models
class ScheduleCreate(BaseModel):
name: str
cron: str # Cron expression (e.g., "0 9 * * *" for 9am daily)
instruction: str # What to tell reason to do
enabled: bool = True
class ScheduleUpdate(BaseModel):
name: Optional[str] = None
cron: Optional[str] = None
instruction: Optional[str] = None
enabled: Optional[bool] = None
class ScheduleResponse(BaseModel):
id: str
name: str
cron: str
instruction: str
enabled: bool
created_at: str
last_run: Optional[str]
next_run: Optional[str]
run_count: int
# Endpoints
@app.get("/health")
async def health():
"""Health check endpoint"""
return {"status": "ok", "service": "schedule"}
@app.get("/schedules")
async def list_schedules():
"""List all scheduled tasks"""
tasks = scheduler.list_tasks()
return {"schedules": [task_to_response(t) for t in tasks]}
@app.post("/schedules")
async def create_schedule(req: ScheduleCreate):
"""Create a new scheduled task"""
task_id = str(uuid.uuid4())[:8]
task = ScheduledTask(
id=task_id,
name=req.name,
cron=req.cron,
instruction=req.instruction,
enabled=req.enabled,
created_at=datetime.utcnow().isoformat() + "Z",
last_run=None,
run_count=0
)
try:
scheduler.add_task(task)
return task_to_response(task)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/schedules/{task_id}")
async def get_schedule(task_id: str):
"""Get a specific scheduled task"""
task = scheduler.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Schedule not found")
return task_to_response(task)
@app.patch("/schedules/{task_id}")
async def update_schedule(task_id: str, req: ScheduleUpdate):
"""Update a scheduled task"""
task = scheduler.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Schedule not found")
if req.name is not None:
task.name = req.name
if req.cron is not None:
task.cron = req.cron
if req.instruction is not None:
task.instruction = req.instruction
if req.enabled is not None:
task.enabled = req.enabled
try:
scheduler.update_task(task)
return task_to_response(task)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.delete("/schedules/{task_id}")
async def delete_schedule(task_id: str):
"""Delete a scheduled task"""
if not scheduler.delete_task(task_id):
raise HTTPException(status_code=404, detail="Schedule not found")
return {"status": "deleted", "id": task_id}
@app.post("/schedules/{task_id}/run")
async def run_schedule(task_id: str):
"""Manually trigger a scheduled task"""
task = scheduler.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Schedule not found")
result = await scheduler.execute_task(task)
return {"status": "executed", "id": task_id, "result": result}
@app.get("/schedules/{task_id}/history")
async def get_schedule_history(task_id: str, limit: int = 20):
"""Get execution history for a scheduled task"""
task = scheduler.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Schedule not found")
history = scheduler.get_history(task_id, limit)
return {"id": task_id, "history": history}
def task_to_response(task: ScheduledTask) -> dict:
"""Convert a task to API response format"""
next_run = scheduler.get_next_run(task) if task.enabled else None
return {
"id": task.id,
"name": task.name,
"cron": task.cron,
"instruction": task.instruction,
"enabled": task.enabled,
"created_at": task.created_at,
"last_run": task.last_run,
"next_run": next_run.isoformat() + "Z" if next_run else None,
"run_count": task.run_count
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8084)

250
scheduler.py Normal file
View file

@ -0,0 +1,250 @@
"""
Egregore Schedule Service - Scheduler engine
Handles cron parsing, task storage, and execution via reason service.
"""
import asyncio
import json
import os
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional
import httpx
from croniter import croniter
REASON_URL = os.getenv("REASON_URL", "http://127.0.0.1:8081")
@dataclass
class ScheduledTask:
id: str
name: str
cron: str
instruction: str
enabled: bool
created_at: str
last_run: Optional[str]
run_count: int
@dataclass
class ExecutionRecord:
timestamp: str
success: bool
duration_ms: int
error: Optional[str] = None
class Scheduler:
"""Manages scheduled tasks and executes them via reason service"""
def __init__(self, data_dir: str):
self.data_dir = data_dir
self.tasks_file = os.path.join(data_dir, "tasks.json")
self.history_dir = os.path.join(data_dir, "history")
self._running = False
self._tasks: dict[str, ScheduledTask] = {}
# Ensure directories exist
os.makedirs(data_dir, exist_ok=True)
os.makedirs(self.history_dir, exist_ok=True)
# Load existing tasks
self._load_tasks()
def _load_tasks(self):
"""Load tasks from disk"""
if os.path.exists(self.tasks_file):
try:
with open(self.tasks_file, "r") as f:
data = json.load(f)
for task_data in data:
task = ScheduledTask(**task_data)
self._tasks[task.id] = task
except (json.JSONDecodeError, TypeError):
pass
def _save_tasks(self):
"""Save tasks to disk"""
data = [asdict(task) for task in self._tasks.values()]
with open(self.tasks_file, "w") as f:
json.dump(data, f, indent=2)
def _validate_cron(self, cron_expr: str) -> bool:
"""Validate a cron expression"""
try:
croniter(cron_expr)
return True
except (ValueError, KeyError):
return False
def list_tasks(self) -> list[ScheduledTask]:
"""List all scheduled tasks"""
return list(self._tasks.values())
def get_task(self, task_id: str) -> Optional[ScheduledTask]:
"""Get a task by ID"""
return self._tasks.get(task_id)
def add_task(self, task: ScheduledTask) -> None:
"""Add a new task"""
if not self._validate_cron(task.cron):
raise ValueError(f"Invalid cron expression: {task.cron}")
self._tasks[task.id] = task
self._save_tasks()
def update_task(self, task: ScheduledTask) -> None:
"""Update an existing task"""
if not self._validate_cron(task.cron):
raise ValueError(f"Invalid cron expression: {task.cron}")
self._tasks[task.id] = task
self._save_tasks()
def delete_task(self, task_id: str) -> bool:
"""Delete a task"""
if task_id in self._tasks:
del self._tasks[task_id]
self._save_tasks()
return True
return False
def get_next_run(self, task: ScheduledTask) -> Optional[datetime]:
"""Get the next run time for a task"""
try:
cron = croniter(task.cron, datetime.utcnow())
return cron.get_next(datetime)
except (ValueError, KeyError):
return None
def get_history(self, task_id: str, limit: int = 20) -> list[dict]:
"""Get execution history for a task"""
history_file = os.path.join(self.history_dir, f"{task_id}.json")
if not os.path.exists(history_file):
return []
try:
with open(history_file, "r") as f:
records = json.load(f)
return records[-limit:]
except (json.JSONDecodeError, TypeError):
return []
def _append_history(self, task_id: str, record: ExecutionRecord):
"""Append an execution record to history"""
history_file = os.path.join(self.history_dir, f"{task_id}.json")
records = []
if os.path.exists(history_file):
try:
with open(history_file, "r") as f:
records = json.load(f)
except (json.JSONDecodeError, TypeError):
pass
records.append(asdict(record))
# Keep last 100 records
records = records[-100:]
with open(history_file, "w") as f:
json.dump(records, f, indent=2)
async def execute_task(self, task: ScheduledTask) -> dict:
"""Execute a task by calling the reason service"""
start_time = datetime.utcnow()
# Build conversation history with the scheduled instruction
history = [
{
"role": "user",
"content": f"[Scheduled Task: {task.name}]\n\n{task.instruction}"
}
]
result = {"success": False, "error": None, "response": None}
try:
async with httpx.AsyncClient(timeout=300.0) as client:
response = await client.post(
f"{REASON_URL}/process",
json={
"model": "claude-sonnet-4-20250514",
"history": history,
"max_iterations": 10
}
)
response.raise_for_status()
data = response.json()
result["success"] = True
result["response"] = data.get("blocks", [])
except httpx.TimeoutException:
result["error"] = "Request to reason service timed out"
except httpx.HTTPStatusError as e:
result["error"] = f"Reason service error: {e.response.status_code}"
except Exception as e:
result["error"] = str(e)
# Calculate duration
end_time = datetime.utcnow()
duration_ms = int((end_time - start_time).total_seconds() * 1000)
# Record execution
record = ExecutionRecord(
timestamp=start_time.isoformat() + "Z",
success=result["success"],
duration_ms=duration_ms,
error=result["error"]
)
self._append_history(task.id, record)
# Update task
task.last_run = start_time.isoformat() + "Z"
task.run_count += 1
self._save_tasks()
return result
def stop(self):
"""Stop the scheduler loop"""
self._running = False
async def run(self):
"""Main scheduler loop - checks for due tasks every minute"""
self._running = True
print(f"[schedule] Scheduler started with {len(self._tasks)} tasks")
while self._running:
now = datetime.utcnow()
for task in self._tasks.values():
if not task.enabled:
continue
try:
# Check if task should run now
cron = croniter(task.cron, now)
prev_time = cron.get_prev(datetime)
# If last run was before the previous scheduled time, run now
if task.last_run:
last_run_dt = datetime.fromisoformat(task.last_run.rstrip("Z"))
if last_run_dt < prev_time:
# Check we're within 60 seconds of the scheduled time
if (now - prev_time).total_seconds() < 60:
print(f"[schedule] Executing task: {task.name}")
await self.execute_task(task)
else:
# Never run - check if we're within 60 seconds of schedule
if (now - prev_time).total_seconds() < 60:
print(f"[schedule] Executing task (first run): {task.name}")
await self.execute_task(task)
except Exception as e:
print(f"[schedule] Error checking task {task.id}: {e}")
# Sleep for 30 seconds before next check
await asyncio.sleep(30)