Test lần 1 oke
Some checks failed
Test / test (push) Has been cancelled

This commit is contained in:
2026-06-13 13:49:42 +07:00
parent 695a942a5d
commit d6f22132ce
3 changed files with 83 additions and 52 deletions

View File

@@ -52,7 +52,7 @@ MissionQueue::~MissionQueue()
void MissionQueue::load()
{
std::lock_guard<std::mutex> lock(mu_);
std::lock_guard<std::recursive_mutex> lock(mu_);
queue_ = nlohmann::json::array();
runner_ = nlohmann::json::object();
if (!std::filesystem::exists(queue_path_))
@@ -97,6 +97,12 @@ void MissionQueue::ensureRunnerDefaults()
runner_["paused"] = false;
}
void MissionQueue::saveLocked() const
{
std::lock_guard<std::recursive_mutex> lock(mu_);
saveUnlocked();
}
void MissionQueue::startWorkerIfNeeded()
{
if (worker_.joinable())
@@ -106,13 +112,13 @@ void MissionQueue::startWorkerIfNeeded()
nlohmann::json MissionQueue::list() const
{
std::lock_guard<std::mutex> lock(mu_);
std::lock_guard<std::recursive_mutex> lock(mu_);
return queue_;
}
nlohmann::json MissionQueue::runnerStatus() const
{
std::lock_guard<std::mutex> lock(mu_);
std::lock_guard<std::recursive_mutex> lock(mu_);
return runner_;
}
@@ -147,7 +153,7 @@ std::optional<nlohmann::json> MissionQueue::enqueue(const nlohmann::json& payloa
entry["log"] = nlohmann::json::array();
{
std::lock_guard<std::mutex> lock(mu_);
std::lock_guard<std::recursive_mutex> lock(mu_);
insertByPriorityUnlocked(entry);
saveUnlocked();
}
@@ -181,7 +187,7 @@ void MissionQueue::insertByPriorityUnlocked(nlohmann::json& entry)
bool MissionQueue::removeById(const std::string& id, std::string& err)
{
std::lock_guard<std::mutex> lock(mu_);
std::lock_guard<std::recursive_mutex> lock(mu_);
if (!queue_.is_array())
{
err = "queue unavailable";
@@ -218,7 +224,7 @@ bool MissionQueue::removeById(const std::string& id, std::string& err)
bool MissionQueue::clearAll(std::string& err)
{
(void)err;
std::lock_guard<std::mutex> lock(mu_);
std::lock_guard<std::recursive_mutex> lock(mu_);
nlohmann::json next = nlohmann::json::array();
for (const auto& item : queue_)
{
@@ -241,7 +247,7 @@ bool MissionQueue::reorder(const nlohmann::json& ordered_ids, std::string& err)
return false;
}
std::lock_guard<std::mutex> lock(mu_);
std::lock_guard<std::recursive_mutex> lock(mu_);
if (!queue_.is_array())
{
err = "queue unavailable";
@@ -285,7 +291,7 @@ bool MissionQueue::reorder(const nlohmann::json& ordered_ids, std::string& err)
bool MissionQueue::pause(std::string& err)
{
std::lock_guard<std::mutex> lock(mu_);
std::lock_guard<std::recursive_mutex> lock(mu_);
const std::string state = runner_.value("state", "idle");
if (state != "running")
{
@@ -306,7 +312,7 @@ bool MissionQueue::resume(std::string& err)
(void)err;
paused_ = false;
{
std::lock_guard<std::mutex> lock(mu_);
std::lock_guard<std::recursive_mutex> lock(mu_);
runner_["paused"] = false;
if (runner_.value("state", "") == "paused")
{
@@ -324,46 +330,65 @@ void MissionQueue::workerLoop()
{
while (!stop_)
{
nlohmann::json working;
bool run = false;
{
std::lock_guard<std::mutex> lock(mu_);
processQueueUnlocked();
std::lock_guard<std::recursive_mutex> lock(mu_);
if (!paused_ && queue_.is_array())
{
for (auto& item : queue_)
{
if (!item.is_object())
continue;
if (item.value("status", "") != "pending")
continue;
item["status"] = "executing";
item["started_at"] = IdUtil::nowIso8601();
runner_["current_queue_id"] = item.value("id", "");
runner_["current_action"] = nullptr;
setRunnerState("running", "Đang chạy: " + item.value("mission_name", "Mission"));
saveUnlocked();
working = item;
run = true;
break;
}
}
if (!run && runner_.value("state", "") == "running")
{
setRunnerState("idle", "Queue trống");
saveUnlocked();
}
}
if (run)
{
runMissionActions(working);
{
std::lock_guard<std::recursive_mutex> lock(mu_);
const std::string id = working.value("id", "");
for (auto& item : queue_)
{
if (item.is_object() && item.value("id", "") == id)
{
item = working;
break;
}
}
runner_["current_queue_id"] = nullptr;
runner_["current_action"] = nullptr;
saveUnlocked();
}
wake_ = true;
}
for (int i = 0; i < 20 && !wake_; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
wake_ = false;
}
}
void MissionQueue::processQueueUnlocked()
void MissionQueue::runMissionActions(nlohmann::json& entry)
{
if (!queue_.is_array())
return;
if (paused_)
return;
for (auto& item : queue_)
{
if (!item.is_object())
continue;
if (item.value("status", "") != "pending")
continue;
executeMissionUnlocked(item);
return;
}
if (runner_.value("state", "") == "running")
setRunnerState("idle", "Queue trống");
}
void MissionQueue::executeMissionUnlocked(nlohmann::json& entry)
{
entry["status"] = "executing";
entry["started_at"] = IdUtil::nowIso8601();
runner_["current_queue_id"] = entry.value("id", "");
runner_["current_action"] = nullptr;
setRunnerState("running", "Đang chạy: " + entry.value("mission_name", "Mission"));
saveUnlocked();
try
{
nlohmann::json log = nlohmann::json::array();
@@ -375,19 +400,22 @@ void MissionQueue::executeMissionUnlocked(nlohmann::json& entry)
entry["log"] = log;
entry["status"] = "completed";
entry["finished_at"] = IdUtil::nowIso8601();
setRunnerState("idle", "Hoàn thành: " + entry.value("mission_name", "Mission"));
{
std::lock_guard<std::recursive_mutex> lock(mu_);
setRunnerState("idle", "Hoàn thành: " + entry.value("mission_name", "Mission"));
saveUnlocked();
}
}
catch (...)
{
entry["status"] = "failed";
entry["finished_at"] = IdUtil::nowIso8601();
setRunnerState("error", "Lỗi khi chạy: " + entry.value("mission_name", "Mission"));
{
std::lock_guard<std::recursive_mutex> lock(mu_);
setRunnerState("error", "Lỗi khi chạy: " + entry.value("mission_name", "Mission"));
saveUnlocked();
}
}
runner_["current_queue_id"] = nullptr;
runner_["current_action"] = nullptr;
saveUnlocked();
wake_ = true;
}
void MissionQueue::executeActionsUnlocked(const nlohmann::json& actions,
@@ -414,8 +442,11 @@ void MissionQueue::executeActionsUnlocked(const nlohmann::json& actions,
const auto& params = action.contains("params") && action["params"].is_object() ? action["params"]
: nlohmann::json::object();
runner_["current_action"] = label;
saveUnlocked();
{
std::lock_guard<std::recursive_mutex> lock(mu_);
runner_["current_action"] = label;
saveUnlocked();
}
if (kind == "mission")
{

View File

@@ -32,7 +32,7 @@ public:
private:
std::filesystem::path queue_path_;
mutable std::mutex mu_;
mutable std::recursive_mutex mu_;
nlohmann::json queue_;
nlohmann::json runner_;
@@ -46,12 +46,12 @@ private:
void ensureRunnerDefaults();
void startWorkerIfNeeded();
void workerLoop();
void processQueueUnlocked();
void executeMissionUnlocked(nlohmann::json& entry);
void runMissionActions(nlohmann::json& entry);
void executeActionsUnlocked(const nlohmann::json& actions,
const nlohmann::json& parameters,
nlohmann::json& log,
int loop_depth);
void saveLocked() const;
void sleepMs(int ms);
void setRunnerState(const std::string& state, const std::string& message = "");
void insertByPriorityUnlocked(nlohmann::json& entry);