diff --git a/src/mission/mission_queue.cpp b/src/mission/mission_queue.cpp index 740a763..3d09591 100644 --- a/src/mission/mission_queue.cpp +++ b/src/mission/mission_queue.cpp @@ -52,7 +52,7 @@ MissionQueue::~MissionQueue() void MissionQueue::load() { - std::lock_guard lock(mu_); + std::lock_guard lock(mu_); queue_ = nlohmann::json::array(); runner_ = nlohmann::json::object(); if (!std::filesystem::exists(queue_path_)) @@ -97,6 +97,12 @@ void MissionQueue::ensureRunnerDefaults() runner_["paused"] = false; } +void MissionQueue::saveLocked() const +{ + std::lock_guard lock(mu_); + saveUnlocked(); +} + void MissionQueue::startWorkerIfNeeded() { if (worker_.joinable()) @@ -106,13 +112,13 @@ void MissionQueue::startWorkerIfNeeded() nlohmann::json MissionQueue::list() const { - std::lock_guard lock(mu_); + std::lock_guard lock(mu_); return queue_; } nlohmann::json MissionQueue::runnerStatus() const { - std::lock_guard lock(mu_); + std::lock_guard lock(mu_); return runner_; } @@ -147,7 +153,7 @@ std::optional MissionQueue::enqueue(const nlohmann::json& payloa entry["log"] = nlohmann::json::array(); { - std::lock_guard lock(mu_); + std::lock_guard lock(mu_); insertByPriorityUnlocked(entry); saveUnlocked(); } @@ -181,7 +187,7 @@ void MissionQueue::insertByPriorityUnlocked(nlohmann::json& entry) bool MissionQueue::removeById(const std::string& id, std::string& err) { - std::lock_guard lock(mu_); + std::lock_guard lock(mu_); if (!queue_.is_array()) { err = "queue unavailable"; @@ -218,7 +224,7 @@ bool MissionQueue::removeById(const std::string& id, std::string& err) bool MissionQueue::clearAll(std::string& err) { (void)err; - std::lock_guard lock(mu_); + std::lock_guard lock(mu_); nlohmann::json next = nlohmann::json::array(); for (const auto& item : queue_) { @@ -241,7 +247,7 @@ bool MissionQueue::reorder(const nlohmann::json& ordered_ids, std::string& err) return false; } - std::lock_guard lock(mu_); + std::lock_guard lock(mu_); if (!queue_.is_array()) { err = "queue unavailable"; @@ -285,7 +291,7 @@ bool MissionQueue::reorder(const nlohmann::json& ordered_ids, std::string& err) bool MissionQueue::pause(std::string& err) { - std::lock_guard lock(mu_); + std::lock_guard lock(mu_); const std::string state = runner_.value("state", "idle"); if (state != "running") { @@ -306,7 +312,7 @@ bool MissionQueue::resume(std::string& err) (void)err; paused_ = false; { - std::lock_guard lock(mu_); + std::lock_guard lock(mu_); runner_["paused"] = false; if (runner_.value("state", "") == "paused") { @@ -324,46 +330,65 @@ void MissionQueue::workerLoop() { while (!stop_) { + nlohmann::json working; + bool run = false; { - std::lock_guard lock(mu_); - processQueueUnlocked(); + std::lock_guard lock(mu_); + if (!paused_ && queue_.is_array()) + { + for (auto& item : queue_) + { + if (!item.is_object()) + continue; + if (item.value("status", "") != "pending") + continue; + item["status"] = "executing"; + item["started_at"] = IdUtil::nowIso8601(); + runner_["current_queue_id"] = item.value("id", ""); + runner_["current_action"] = nullptr; + setRunnerState("running", "Đang chạy: " + item.value("mission_name", "Mission")); + saveUnlocked(); + working = item; + run = true; + break; + } + } + if (!run && runner_.value("state", "") == "running") + { + setRunnerState("idle", "Queue trống"); + saveUnlocked(); + } } + + if (run) + { + runMissionActions(working); + { + std::lock_guard lock(mu_); + const std::string id = working.value("id", ""); + for (auto& item : queue_) + { + if (item.is_object() && item.value("id", "") == id) + { + item = working; + break; + } + } + runner_["current_queue_id"] = nullptr; + runner_["current_action"] = nullptr; + saveUnlocked(); + } + wake_ = true; + } + for (int i = 0; i < 20 && !wake_; ++i) std::this_thread::sleep_for(std::chrono::milliseconds(100)); wake_ = false; } } -void MissionQueue::processQueueUnlocked() +void MissionQueue::runMissionActions(nlohmann::json& entry) { - if (!queue_.is_array()) - return; - if (paused_) - return; - - for (auto& item : queue_) - { - if (!item.is_object()) - continue; - if (item.value("status", "") != "pending") - continue; - executeMissionUnlocked(item); - return; - } - - if (runner_.value("state", "") == "running") - setRunnerState("idle", "Queue trống"); -} - -void MissionQueue::executeMissionUnlocked(nlohmann::json& entry) -{ - entry["status"] = "executing"; - entry["started_at"] = IdUtil::nowIso8601(); - runner_["current_queue_id"] = entry.value("id", ""); - runner_["current_action"] = nullptr; - setRunnerState("running", "Đang chạy: " + entry.value("mission_name", "Mission")); - saveUnlocked(); - try { nlohmann::json log = nlohmann::json::array(); @@ -375,19 +400,22 @@ void MissionQueue::executeMissionUnlocked(nlohmann::json& entry) entry["log"] = log; entry["status"] = "completed"; entry["finished_at"] = IdUtil::nowIso8601(); - setRunnerState("idle", "Hoàn thành: " + entry.value("mission_name", "Mission")); + { + std::lock_guard lock(mu_); + setRunnerState("idle", "Hoàn thành: " + entry.value("mission_name", "Mission")); + saveUnlocked(); + } } catch (...) { entry["status"] = "failed"; entry["finished_at"] = IdUtil::nowIso8601(); - setRunnerState("error", "Lỗi khi chạy: " + entry.value("mission_name", "Mission")); + { + std::lock_guard lock(mu_); + setRunnerState("error", "Lỗi khi chạy: " + entry.value("mission_name", "Mission")); + saveUnlocked(); + } } - - runner_["current_queue_id"] = nullptr; - runner_["current_action"] = nullptr; - saveUnlocked(); - wake_ = true; } void MissionQueue::executeActionsUnlocked(const nlohmann::json& actions, @@ -414,8 +442,11 @@ void MissionQueue::executeActionsUnlocked(const nlohmann::json& actions, const auto& params = action.contains("params") && action["params"].is_object() ? action["params"] : nlohmann::json::object(); - runner_["current_action"] = label; - saveUnlocked(); + { + std::lock_guard lock(mu_); + runner_["current_action"] = label; + saveUnlocked(); + } if (kind == "mission") { diff --git a/src/mission/mission_queue.hpp b/src/mission/mission_queue.hpp index e419949..79c74ba 100644 --- a/src/mission/mission_queue.hpp +++ b/src/mission/mission_queue.hpp @@ -32,7 +32,7 @@ public: private: std::filesystem::path queue_path_; - mutable std::mutex mu_; + mutable std::recursive_mutex mu_; nlohmann::json queue_; nlohmann::json runner_; @@ -46,12 +46,12 @@ private: void ensureRunnerDefaults(); void startWorkerIfNeeded(); void workerLoop(); - void processQueueUnlocked(); - void executeMissionUnlocked(nlohmann::json& entry); + void runMissionActions(nlohmann::json& entry); void executeActionsUnlocked(const nlohmann::json& actions, const nlohmann::json& parameters, nlohmann::json& log, int loop_depth); + void saveLocked() const; void sleepMs(int ms); void setRunnerState(const std::string& state, const std::string& message = ""); void insertByPriorityUnlocked(nlohmann::json& entry); diff --git a/tests/__pycache__/test_api_integration.cpython-38-pytest-8.3.5.pyc b/tests/__pycache__/test_api_integration.cpython-38-pytest-8.3.5.pyc new file mode 100644 index 0000000..03f025d Binary files /dev/null and b/tests/__pycache__/test_api_integration.cpython-38-pytest-8.3.5.pyc differ