Port schedule service from Python to Rust + Rouille

- Background scheduler thread (30s interval)
- Cron expression parsing with cron crate
- File-based JSON storage (compatible with existing data)
- HTTP API: CRUD, manual run, history
- ~3MB binary vs ~50MB Python

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
egregore 2026-02-02 20:29:59 +00:00
parent 6499e17390
commit 8873c4e377
3 changed files with 1999 additions and 0 deletions

1373
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

17
Cargo.toml Normal file
View file

@ -0,0 +1,17 @@
[package]
name = "schedule"
version = "0.1.0"
edition = "2021"
[dependencies]
rouille = "3.6"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] }
cron = "0.12"
ureq = { version = "2.9", features = ["json"] }
uuid = { version = "1.0", features = ["v4"] }
[profile.release]
strip = true
lto = true

609
src/main.rs Normal file
View file

@ -0,0 +1,609 @@
//! Egregore Schedule Service
//!
//! Cron-like task scheduler that wakes the reason service.
//! Runs on port 8084.
use chrono::{DateTime, Duration, Utc};
use cron::Schedule;
use rouille::{router, Request, Response};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration as StdDuration;
use uuid::Uuid;
const DATA_DIR: &str = "/home/admin/data/schedules";
const BIND_ADDR: &str = "127.0.0.1:8084";
const REASON_URL: &str = "http://127.0.0.1:8081";
const DEFAULT_MODEL: &str = "claude-sonnet-4-20250514";
// ============================================================================
// Data Structures
// ============================================================================
#[derive(Serialize, Deserialize, Clone, Debug)]
struct ScheduledTask {
id: String,
name: String,
cron: String,
instruction: String,
enabled: bool,
created_at: String,
last_run: Option<String>,
run_count: u32,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct ExecutionRecord {
timestamp: String,
success: bool,
duration_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
}
#[derive(Deserialize)]
struct CreateSchedule {
name: String,
cron: String,
instruction: String,
#[serde(default = "default_enabled")]
enabled: bool,
}
fn default_enabled() -> bool {
true
}
#[derive(Deserialize, Default)]
struct UpdateSchedule {
name: Option<String>,
cron: Option<String>,
instruction: Option<String>,
enabled: Option<bool>,
}
// Shared application state
struct AppState {
tasks: Mutex<HashMap<String, ScheduledTask>>,
}
// ============================================================================
// File Operations
// ============================================================================
fn tasks_file() -> PathBuf {
PathBuf::from(DATA_DIR).join("tasks.json")
}
fn history_dir() -> PathBuf {
PathBuf::from(DATA_DIR).join("history")
}
fn load_tasks() -> HashMap<String, ScheduledTask> {
let path = tasks_file();
if !path.exists() {
return HashMap::new();
}
match fs::read_to_string(&path) {
Ok(content) => {
let tasks: Vec<ScheduledTask> = serde_json::from_str(&content).unwrap_or_default();
tasks.into_iter().map(|t| (t.id.clone(), t)).collect()
}
Err(_) => HashMap::new(),
}
}
fn save_tasks(tasks: &HashMap<String, ScheduledTask>) {
let tasks_vec: Vec<&ScheduledTask> = tasks.values().collect();
if let Ok(json) = serde_json::to_string_pretty(&tasks_vec) {
let _ = fs::write(tasks_file(), json);
}
}
fn load_history(task_id: &str) -> Vec<ExecutionRecord> {
let path = history_dir().join(format!("{}.json", task_id));
if !path.exists() {
return vec![];
}
match fs::read_to_string(&path) {
Ok(content) => serde_json::from_str(&content).unwrap_or_default(),
Err(_) => vec![],
}
}
fn append_history(task_id: &str, record: ExecutionRecord) {
let path = history_dir().join(format!("{}.json", task_id));
let mut records = load_history(task_id);
records.push(record);
// Keep last 100 records
if records.len() > 100 {
let skip_count = records.len() - 100;
records = records.into_iter().skip(skip_count).collect();
}
if let Ok(json) = serde_json::to_string_pretty(&records) {
let _ = fs::write(path, json);
}
}
// ============================================================================
// Cron Utilities
// ============================================================================
/// Convert 5-field cron to 6-field (with seconds) for the cron crate
fn to_cron_schedule(cron_expr: &str) -> Result<Schedule, String> {
// The cron crate expects 6 or 7 fields (seconds, minutes, hours, day, month, day-of-week, [year])
// Standard cron has 5 fields, so prepend "0 " for seconds
let full_expr = format!("0 {}", cron_expr);
Schedule::from_str(&full_expr).map_err(|e| format!("Invalid cron expression: {}", e))
}
fn validate_cron(cron_expr: &str) -> Result<(), String> {
to_cron_schedule(cron_expr).map(|_| ())
}
fn get_next_run(task: &ScheduledTask) -> Option<DateTime<Utc>> {
to_cron_schedule(&task.cron)
.ok()?
.upcoming(Utc)
.next()
}
fn should_run_now(task: &ScheduledTask, now: DateTime<Utc>) -> bool {
if let Ok(schedule) = to_cron_schedule(&task.cron) {
// Look back 2 minutes to find the most recent scheduled time
let lookback = now - Duration::minutes(2);
if let Some(prev_scheduled) = schedule.after(&lookback).next() {
// Only consider if the scheduled time is in the past (or now)
if prev_scheduled <= now {
// Check if we're within 60 seconds of the scheduled time
let since_scheduled = (now - prev_scheduled).num_seconds();
if since_scheduled >= 0 && since_scheduled < 60 {
// Check if we already ran after this scheduled time
if let Some(last_run) = &task.last_run {
if let Ok(last) = DateTime::parse_from_rfc3339(last_run) {
return last.with_timezone(&Utc) < prev_scheduled;
}
}
// Never run before, so run now
return true;
}
}
}
}
false
}
// ============================================================================
// Task Execution
// ============================================================================
fn execute_task(task: &ScheduledTask) -> (bool, u64, Option<String>, Option<Value>) {
let start = std::time::Instant::now();
let history = vec![json!({
"role": "user",
"content": format!("[Scheduled Task: {}]\n\n{}", task.name, task.instruction)
})];
let payload = json!({
"model": DEFAULT_MODEL,
"history": history,
"max_iterations": 10
});
let result = ureq::post(&format!("{}/process", REASON_URL))
.timeout(StdDuration::from_secs(300))
.send_json(&payload);
let duration_ms = start.elapsed().as_millis() as u64;
match result {
Ok(response) => {
let body: Value = response.into_json().unwrap_or(json!({}));
(true, duration_ms, None, Some(body))
}
Err(ureq::Error::Status(code, resp)) => {
let msg = resp.into_string().unwrap_or_default();
(
false,
duration_ms,
Some(format!("HTTP {}: {}", code, msg)),
None,
)
}
Err(e) => (false, duration_ms, Some(e.to_string()), None),
}
}
// ============================================================================
// Background Scheduler
// ============================================================================
fn run_scheduler(state: Arc<AppState>) {
println!("[schedule] Scheduler thread started");
loop {
let now = Utc::now();
// Get tasks snapshot
let tasks: Vec<ScheduledTask> = {
let guard = state.tasks.lock().unwrap();
guard.values().cloned().collect()
};
for task in tasks {
if !task.enabled {
continue;
}
if should_run_now(&task, now) {
println!("[schedule] Executing task: {} ({})", task.name, task.id);
let (success, duration_ms, error, _response) = execute_task(&task);
// Update task
{
let mut guard = state.tasks.lock().unwrap();
if let Some(t) = guard.get_mut(&task.id) {
t.last_run = Some(Utc::now().to_rfc3339());
t.run_count += 1;
save_tasks(&guard);
}
}
// Record execution
let record = ExecutionRecord {
timestamp: now.to_rfc3339(),
success,
duration_ms,
error: error.clone(),
};
append_history(&task.id, record);
if success {
println!("[schedule] Task {} completed in {}ms", task.id, duration_ms);
} else {
println!(
"[schedule] Task {} failed: {}",
task.id,
error.unwrap_or_default()
);
}
}
}
// Sleep 30 seconds
thread::sleep(StdDuration::from_secs(30));
}
}
// ============================================================================
// HTTP Handlers
// ============================================================================
fn json_response<T: Serialize>(data: T) -> Response {
Response::json(&data)
}
fn error_response(status: u16, message: &str) -> Response {
Response::json(&json!({ "error": message })).with_status_code(status)
}
fn health() -> Response {
json_response(json!({
"status": "ok",
"service": "schedule"
}))
}
fn list_schedules(state: &AppState) -> Response {
let guard = state.tasks.lock().unwrap();
let mut tasks: Vec<Value> = guard
.values()
.map(|t| {
let next_run = if t.enabled {
get_next_run(t).map(|dt| dt.to_rfc3339())
} else {
None
};
json!({
"id": t.id,
"name": t.name,
"cron": t.cron,
"instruction": t.instruction,
"enabled": t.enabled,
"created_at": t.created_at,
"last_run": t.last_run,
"next_run": next_run,
"run_count": t.run_count
})
})
.collect();
// Sort by name
tasks.sort_by(|a, b| {
let na = a.get("name").and_then(|v| v.as_str()).unwrap_or("");
let nb = b.get("name").and_then(|v| v.as_str()).unwrap_or("");
na.cmp(nb)
});
json_response(json!({ "schedules": tasks }))
}
fn create_schedule(request: &Request, state: &AppState) -> Response {
let body: CreateSchedule = match rouille::input::json_input(request) {
Ok(b) => b,
Err(e) => return error_response(400, &format!("Invalid JSON: {}", e)),
};
// Validate cron expression
if let Err(e) = validate_cron(&body.cron) {
return error_response(400, &e);
}
let task_id = Uuid::new_v4().to_string()[..8].to_string();
let now = Utc::now().to_rfc3339();
let task = ScheduledTask {
id: task_id.clone(),
name: body.name,
cron: body.cron,
instruction: body.instruction,
enabled: body.enabled,
created_at: now,
last_run: None,
run_count: 0,
};
{
let mut guard = state.tasks.lock().unwrap();
guard.insert(task_id.clone(), task.clone());
save_tasks(&guard);
}
let next_run = if task.enabled {
get_next_run(&task).map(|dt| dt.to_rfc3339())
} else {
None
};
Response::json(&json!({
"id": task.id,
"name": task.name,
"cron": task.cron,
"instruction": task.instruction,
"enabled": task.enabled,
"created_at": task.created_at,
"last_run": task.last_run,
"next_run": next_run,
"run_count": task.run_count
}))
.with_status_code(201)
}
fn get_schedule(id: &str, state: &AppState) -> Response {
let guard = state.tasks.lock().unwrap();
match guard.get(id) {
Some(task) => {
let next_run = if task.enabled {
get_next_run(task).map(|dt| dt.to_rfc3339())
} else {
None
};
json_response(json!({
"id": task.id,
"name": task.name,
"cron": task.cron,
"instruction": task.instruction,
"enabled": task.enabled,
"created_at": task.created_at,
"last_run": task.last_run,
"next_run": next_run,
"run_count": task.run_count
}))
}
None => error_response(404, "Schedule not found"),
}
}
fn update_schedule(id: &str, request: &Request, state: &AppState) -> Response {
let body: UpdateSchedule = match rouille::input::json_input(request) {
Ok(b) => b,
Err(e) => return error_response(400, &format!("Invalid JSON: {}", e)),
};
// Validate cron if provided
if let Some(ref cron) = body.cron {
if let Err(e) = validate_cron(cron) {
return error_response(400, &e);
}
}
let mut guard = state.tasks.lock().unwrap();
match guard.get_mut(id) {
Some(task) => {
if let Some(name) = body.name {
task.name = name;
}
if let Some(cron) = body.cron {
task.cron = cron;
}
if let Some(instruction) = body.instruction {
task.instruction = instruction;
}
if let Some(enabled) = body.enabled {
task.enabled = enabled;
}
// Clone task data before releasing mutable borrow
let task_clone = task.clone();
let next_run = if task_clone.enabled {
get_next_run(&task_clone).map(|dt| dt.to_rfc3339())
} else {
None
};
save_tasks(&guard);
json_response(json!({
"id": task_clone.id,
"name": task_clone.name,
"cron": task_clone.cron,
"instruction": task_clone.instruction,
"enabled": task_clone.enabled,
"created_at": task_clone.created_at,
"last_run": task_clone.last_run,
"next_run": next_run,
"run_count": task_clone.run_count
}))
}
None => error_response(404, "Schedule not found"),
}
}
fn delete_schedule(id: &str, state: &AppState) -> Response {
let mut guard = state.tasks.lock().unwrap();
if guard.remove(id).is_some() {
save_tasks(&guard);
json_response(json!({ "status": "deleted", "id": id }))
} else {
error_response(404, "Schedule not found")
}
}
fn run_schedule(id: &str, state: &AppState) -> Response {
let task = {
let guard = state.tasks.lock().unwrap();
guard.get(id).cloned()
};
match task {
Some(task) => {
println!("[schedule] Manual execution: {} ({})", task.name, task.id);
let (success, duration_ms, error, response) = execute_task(&task);
// Update task
{
let mut guard = state.tasks.lock().unwrap();
if let Some(t) = guard.get_mut(id) {
t.last_run = Some(Utc::now().to_rfc3339());
t.run_count += 1;
save_tasks(&guard);
}
}
// Record execution
let record = ExecutionRecord {
timestamp: Utc::now().to_rfc3339(),
success,
duration_ms,
error: error.clone(),
};
append_history(id, record);
json_response(json!({
"status": if success { "executed" } else { "failed" },
"id": id,
"success": success,
"duration_ms": duration_ms,
"error": error,
"result": response
}))
}
None => error_response(404, "Schedule not found"),
}
}
fn get_history(id: &str, request: &Request, state: &AppState) -> Response {
// Check task exists
{
let guard = state.tasks.lock().unwrap();
if !guard.contains_key(id) {
return error_response(404, "Schedule not found");
}
}
let limit: usize = request
.get_param("limit")
.and_then(|s| s.parse().ok())
.unwrap_or(20);
let history = load_history(id);
let history: Vec<&ExecutionRecord> = history.iter().rev().take(limit).collect();
json_response(json!({
"id": id,
"history": history
}))
}
// ============================================================================
// Router
// ============================================================================
fn handle_request(request: &Request, state: &AppState) -> Response {
// Handle CORS preflight
if request.method() == "OPTIONS" {
return Response::empty_204();
}
router!(request,
(GET) ["/health"] => { health() },
(GET) ["/schedules"] => { list_schedules(state) },
(POST) ["/schedules"] => { create_schedule(request, state) },
(GET) ["/schedules/{id}", id: String] => { get_schedule(&id, state) },
(PATCH) ["/schedules/{id}", id: String] => { update_schedule(&id, request, state) },
(DELETE) ["/schedules/{id}", id: String] => { delete_schedule(&id, state) },
(POST) ["/schedules/{id}/run", id: String] => { run_schedule(&id, state) },
(GET) ["/schedules/{id}/history", id: String] => { get_history(&id, request, state) },
_ => Response::empty_404()
)
}
fn main() {
println!("Egregore Schedule Service starting on {}", BIND_ADDR);
println!("Data directory: {}", DATA_DIR);
// Ensure directories exist
let _ = fs::create_dir_all(DATA_DIR);
let _ = fs::create_dir_all(history_dir());
// Load tasks
let tasks = load_tasks();
println!("[schedule] Loaded {} tasks", tasks.len());
let state = Arc::new(AppState {
tasks: Mutex::new(tasks),
});
// Spawn background scheduler thread
let scheduler_state = Arc::clone(&state);
thread::spawn(move || {
run_scheduler(scheduler_state);
});
// Start HTTP server
let http_state = Arc::clone(&state);
rouille::start_server(BIND_ADDR, move |request| {
let response = handle_request(request, &http_state);
// Add CORS headers
response
.with_additional_header("Access-Control-Allow-Origin", "*")
.with_additional_header(
"Access-Control-Allow-Methods",
"GET, POST, PATCH, DELETE, OPTIONS",
)
.with_additional_header("Access-Control-Allow-Headers", "Content-Type")
});
}