#include "mission/mission_scheduler.hpp"

#include "util/id_util.hpp"

#include <chrono>
#include <exception>
#include <string>
#include <thread>
#include <utility>

namespace lm {

namespace {

// The worker sleeps in short slices so that a stop request is noticed quickly;
// one full poll interval is kPollSlices * kPollSlice (about two seconds).
constexpr int kPollSlices = 20;
constexpr std::chrono::milliseconds kPollSlice{100};

/// Returns true when `schedule` has a non-null value stored under `key`.
bool hasNonNull(const nlohmann::json& schedule, const char* key) {
    const auto it = schedule.find(key);
    return it != schedule.end() && !it->is_null();
}

/// Decides whether the background worker should queue `schedule` at time `now`.
///
/// A schedule is due when it is a JSON object, is enabled (default: true), has
/// not been queued before (no non-null "last_queued_at"), and either
///   - its "start_mode" is "asap" (the default), or
///   - its "start_mode" is "scheduled" and its "start_at" string is <= `now`.
/// Any other start mode is never due.
///
/// May throw a nlohmann::json exception when "enabled" or "start_mode" holds a
/// value of an unexpected type; the caller is expected to handle that.
bool isDue(const nlohmann::json& schedule, const std::string& now) {
    if (!schedule.is_object() || !schedule.value("enabled", true)) return false;
    if (hasNonNull(schedule, "last_queued_at")) return false;

    const std::string mode = schedule.value("start_mode", "asap");
    if (mode == "asap") return true;
    if (mode != "scheduled") return false;

    // A missing, null, or non-string "start_at" means the schedule cannot be
    // evaluated; treat it as not due instead of throwing from get<>().
    const auto start_at = schedule.find("start_at");
    if (start_at == schedule.end() || !start_at->is_string()) return false;

    // NOTE(review): lexicographic comparison of timestamps. This is only a
    // correct time ordering if "start_at" uses exactly the same format and
    // time zone as IdUtil::nowIso8601() -- confirm against the producers.
    return start_at->get<std::string>() <= now;
}

}  // namespace

/// Stores the collaborators and starts the background worker thread.
///
/// @param store       Schedule storage; held by reference, so it must outlive
///                    this scheduler.
/// @param enqueue_fn  Callback used to hand a mission request to the queue.
MissionScheduler::MissionScheduler(MissionStore& store, EnqueueFn enqueue_fn)
    : store_(store), enqueue_fn_(std::move(enqueue_fn)) {
    worker_ = std::thread([this] { workerLoop(); });
}

/// Requests the worker to stop and waits for it to finish.
MissionScheduler::~MissionScheduler() {
    // NOTE(review): stop_ is written here and read by the worker thread; it is
    // presumably declared std::atomic<bool> in the header -- confirm.
    stop_ = true;
    if (worker_.joinable()) worker_.join();
}

/// Builds a mission request from `schedule` and passes it to the enqueue
/// callback. On success the schedule is stamped with "last_queued_at".
///
/// @param schedule  Schedule record; missing fields fall back to defaults
///                  (empty mission_id, priority 0, robot "default", name
///                  "schedule").
/// @param err       Filled in by the enqueue callback on failure.
/// @return true when the callback accepted the request.
bool MissionScheduler::queueSchedule(const nlohmann::json& schedule, std::string& err) {
    const nlohmann::json request = {
        {"mission_id", schedule.value("mission_id", "")},
        {"priority", schedule.value("priority", 0)},
        {"robot_id", schedule.value("robot_id", "default")},
        {"source", "fleet:" + schedule.value("name", "schedule")}};

    if (!enqueue_fn_(request, err)) return false;

    markQueued(schedule.value("id", ""));
    return true;
}

/// Records the current time as "last_queued_at" on the schedule with this id.
/// Does nothing for an empty id.
void MissionScheduler::markQueued(const std::string& id) {
    if (id.empty()) return;

    const nlohmann::json patch = {{"last_queued_at", IdUtil::nowIso8601()}};
    std::string err;
    // NOTE(review): the outcome of the update is not inspected. If the store
    // fails to persist the stamp, the worker will not see "last_queued_at" on
    // its next pass and will queue the same schedule again.
    store_.updateSchedule(id, patch, err);
}

/// Queues the schedule identified by `id` immediately, regardless of its
/// start mode.
///
/// @param id   Schedule identifier to look up in the store.
/// @param err  Set to "schedule not found" or "schedule is disabled", or to
///             the enqueue callback's error, on failure.
/// @return true when the schedule was found, enabled, and accepted.
bool MissionScheduler::runScheduleNow(const std::string& id, std::string& err) {
    const auto schedule = store_.findSchedule(id);
    if (!schedule) {
        err = "schedule not found";
        return false;
    }
    if (!schedule->value("enabled", true)) {
        err = "schedule is disabled";
        return false;
    }
    return queueSchedule(*schedule, err);
}

/// Background loop: until stopped, lists all schedules, queues those that are
/// due, then sleeps for one poll interval.
void MissionScheduler::workerLoop() {
    while (!stop_) {
        const auto schedules = store_.listSchedules();
        const std::string now = IdUtil::nowIso8601();

        for (const auto& schedule : schedules) {
            // An exception leaving a thread's entry function terminates the
            // process, so a single malformed schedule (e.g. a field with the
            // wrong JSON type) must not propagate out of this loop.
            try {
                if (!isDue(schedule, now)) continue;
                // A failed enqueue leaves "last_queued_at" unset, so the
                // schedule is retried on the next pass; the message is unused.
                std::string err;
                queueSchedule(schedule, err);
            } catch (const std::exception&) {
                // Skip this schedule for the current pass and keep polling.
            }
        }

        for (int i = 0; i < kPollSlices && !stop_; ++i) {
            std::this_thread::sleep_for(kPollSlice);
        }
    }
}

}  // namespace lm