excuting misstion from queue
This commit is contained in:
446
src/mission/mission_queue.cpp
Normal file
446
src/mission/mission_queue.cpp
Normal file
@@ -0,0 +1,446 @@
|
||||
#include "mission/mission_queue.hpp"
|
||||
|
||||
#include "util/file_util.hpp"
|
||||
#include "util/id_util.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <stdexcept>
|
||||
#include <thread>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace lm {
|
||||
|
||||
namespace {
|
||||
|
||||
std::string paramValue(const std::string& action_id,
|
||||
const nlohmann::json& params,
|
||||
const std::string& key,
|
||||
const nlohmann::json& parameters)
|
||||
{
|
||||
const std::string lookup = action_id + ":" + key;
|
||||
if (parameters.is_object() && parameters.contains(lookup))
|
||||
return parameters[lookup].get<std::string>();
|
||||
if (params.contains(key) && params[key].is_string())
|
||||
return params[key].get<std::string>();
|
||||
return "";
|
||||
}
|
||||
|
||||
double paramNumber(const nlohmann::json& params, const std::string& key, double fallback)
|
||||
{
|
||||
if (params.contains(key) && params[key].is_number())
|
||||
return params[key].get<double>();
|
||||
return fallback;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
MissionQueue::MissionQueue(std::filesystem::path queue_path) : queue_path_(std::move(queue_path))
|
||||
{
|
||||
load();
|
||||
ensureRunnerDefaults();
|
||||
startWorkerIfNeeded();
|
||||
}
|
||||
|
||||
MissionQueue::~MissionQueue()
|
||||
{
|
||||
stop_ = true;
|
||||
wake_ = true;
|
||||
if (worker_.joinable())
|
||||
worker_.join();
|
||||
}
|
||||
|
||||
void MissionQueue::load()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
queue_ = nlohmann::json::array();
|
||||
runner_ = nlohmann::json::object();
|
||||
if (!std::filesystem::exists(queue_path_))
|
||||
return;
|
||||
try
|
||||
{
|
||||
const auto parsed = nlohmann::json::parse(FileUtil::readBinary(queue_path_));
|
||||
if (parsed.is_object())
|
||||
{
|
||||
if (parsed.contains("queue") && parsed["queue"].is_array())
|
||||
queue_ = parsed["queue"];
|
||||
if (parsed.contains("runner") && parsed["runner"].is_object())
|
||||
runner_ = parsed["runner"];
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
queue_ = nlohmann::json::array();
|
||||
}
|
||||
ensureRunnerDefaults();
|
||||
}
|
||||
|
||||
void MissionQueue::saveUnlocked() const
|
||||
{
|
||||
const nlohmann::json payload = {{"queue", queue_}, {"runner", runner_}};
|
||||
FileUtil::writeBinaryAtomic(queue_path_, payload.dump(2));
|
||||
}
|
||||
|
||||
void MissionQueue::ensureRunnerDefaults()
|
||||
{
|
||||
if (!runner_.is_object())
|
||||
runner_ = nlohmann::json::object();
|
||||
if (!runner_.contains("state"))
|
||||
runner_["state"] = "idle";
|
||||
if (!runner_.contains("message"))
|
||||
runner_["message"] = "";
|
||||
if (!runner_.contains("current_queue_id"))
|
||||
runner_["current_queue_id"] = nullptr;
|
||||
if (!runner_.contains("current_action"))
|
||||
runner_["current_action"] = nullptr;
|
||||
}
|
||||
|
||||
void MissionQueue::startWorkerIfNeeded()
|
||||
{
|
||||
if (worker_.joinable())
|
||||
return;
|
||||
worker_ = std::thread([this] { workerLoop(); });
|
||||
}
|
||||
|
||||
nlohmann::json MissionQueue::list() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
return queue_;
|
||||
}
|
||||
|
||||
nlohmann::json MissionQueue::runnerStatus() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
return runner_;
|
||||
}
|
||||
|
||||
std::optional<nlohmann::json> MissionQueue::enqueue(const nlohmann::json& payload, std::string& err)
|
||||
{
|
||||
if (!payload.is_object())
|
||||
{
|
||||
err = "payload must be an object";
|
||||
return std::nullopt;
|
||||
}
|
||||
if (!payload.contains("mission") || !payload["mission"].is_object())
|
||||
{
|
||||
err = "mission is required";
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
nlohmann::json entry = nlohmann::json::object();
|
||||
entry["id"] = IdUtil::newId();
|
||||
entry["mission_id"] = payload["mission"].value("id", "");
|
||||
entry["mission_name"] = payload["mission"].value("name", "Mission");
|
||||
entry["mission_group"] = payload["mission"].value("group", "Missions");
|
||||
entry["mission"] = payload["mission"];
|
||||
entry["parameters"] = payload.contains("parameters") && payload["parameters"].is_object() ? payload["parameters"]
|
||||
: nlohmann::json::object();
|
||||
entry["status"] = "pending";
|
||||
entry["created_at"] = IdUtil::nowIso8601();
|
||||
entry["started_at"] = nullptr;
|
||||
entry["finished_at"] = nullptr;
|
||||
entry["log"] = nlohmann::json::array();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
queue_.push_back(entry);
|
||||
saveUnlocked();
|
||||
}
|
||||
|
||||
wake_ = true;
|
||||
return entry;
|
||||
}
|
||||
|
||||
bool MissionQueue::removeById(const std::string& id, std::string& err)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
if (!queue_.is_array())
|
||||
{
|
||||
err = "queue unavailable";
|
||||
return false;
|
||||
}
|
||||
const auto before = queue_.size();
|
||||
nlohmann::json next = nlohmann::json::array();
|
||||
for (const auto& item : queue_)
|
||||
{
|
||||
if (!item.is_object())
|
||||
continue;
|
||||
if (item.value("id", "") == id)
|
||||
{
|
||||
const std::string status = item.value("status", "");
|
||||
if (status == "executing")
|
||||
{
|
||||
err = "cannot remove executing mission";
|
||||
return false;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
next.push_back(item);
|
||||
}
|
||||
if (next.size() == before)
|
||||
{
|
||||
err = "queue item not found";
|
||||
return false;
|
||||
}
|
||||
queue_ = std::move(next);
|
||||
saveUnlocked();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool MissionQueue::clearAll(std::string& err)
|
||||
{
|
||||
(void)err;
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
nlohmann::json next = nlohmann::json::array();
|
||||
for (const auto& item : queue_)
|
||||
{
|
||||
if (!item.is_object())
|
||||
continue;
|
||||
if (item.value("status", "") == "executing")
|
||||
next.push_back(item);
|
||||
}
|
||||
queue_ = std::move(next);
|
||||
setRunnerState(next.empty() ? "idle" : "running", next.empty() ? "" : "Đang thực thi mission");
|
||||
saveUnlocked();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool MissionQueue::reorder(const nlohmann::json& ordered_ids, std::string& err)
|
||||
{
|
||||
if (!ordered_ids.is_array())
|
||||
{
|
||||
err = "ordered_ids must be an array";
|
||||
return false;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
if (!queue_.is_array())
|
||||
{
|
||||
err = "queue unavailable";
|
||||
return false;
|
||||
}
|
||||
|
||||
nlohmann::json by_id = nlohmann::json::object();
|
||||
for (auto& item : queue_)
|
||||
{
|
||||
if (item.is_object())
|
||||
by_id[item.value("id", "")] = item;
|
||||
}
|
||||
|
||||
nlohmann::json next = nlohmann::json::array();
|
||||
std::unordered_set<std::string> seen;
|
||||
for (const auto& id_json : ordered_ids)
|
||||
{
|
||||
if (!id_json.is_string())
|
||||
continue;
|
||||
const std::string id = id_json.get<std::string>();
|
||||
if (!by_id.contains(id))
|
||||
continue;
|
||||
next.push_back(by_id[id]);
|
||||
seen.insert(id);
|
||||
}
|
||||
|
||||
for (const auto& item : queue_)
|
||||
{
|
||||
if (!item.is_object())
|
||||
continue;
|
||||
const std::string id = item.value("id", "");
|
||||
if (seen.count(id))
|
||||
continue;
|
||||
next.push_back(item);
|
||||
}
|
||||
|
||||
queue_ = std::move(next);
|
||||
saveUnlocked();
|
||||
return true;
|
||||
}
|
||||
|
||||
void MissionQueue::workerLoop()
|
||||
{
|
||||
while (!stop_)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mu_);
|
||||
processQueueUnlocked();
|
||||
}
|
||||
for (int i = 0; i < 20 && !wake_; ++i)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
wake_ = false;
|
||||
}
|
||||
}
|
||||
|
||||
void MissionQueue::processQueueUnlocked()
|
||||
{
|
||||
if (!queue_.is_array())
|
||||
return;
|
||||
|
||||
for (auto& item : queue_)
|
||||
{
|
||||
if (!item.is_object())
|
||||
continue;
|
||||
if (item.value("status", "") != "pending")
|
||||
continue;
|
||||
executeMissionUnlocked(item);
|
||||
return;
|
||||
}
|
||||
|
||||
if (runner_.value("state", "") == "running")
|
||||
setRunnerState("idle", "Queue trống");
|
||||
}
|
||||
|
||||
void MissionQueue::executeMissionUnlocked(nlohmann::json& entry)
|
||||
{
|
||||
entry["status"] = "executing";
|
||||
entry["started_at"] = IdUtil::nowIso8601();
|
||||
runner_["current_queue_id"] = entry.value("id", "");
|
||||
runner_["current_action"] = nullptr;
|
||||
setRunnerState("running", "Đang chạy: " + entry.value("mission_name", "Mission"));
|
||||
saveUnlocked();
|
||||
|
||||
try
|
||||
{
|
||||
nlohmann::json log = nlohmann::json::array();
|
||||
const auto& mission = entry["mission"];
|
||||
const auto& parameters = entry["parameters"];
|
||||
const auto& actions =
|
||||
mission.contains("actions") && mission["actions"].is_array() ? mission["actions"] : nlohmann::json::array();
|
||||
executeActionsUnlocked(actions, parameters, log, 0);
|
||||
entry["log"] = log;
|
||||
entry["status"] = "completed";
|
||||
entry["finished_at"] = IdUtil::nowIso8601();
|
||||
setRunnerState("idle", "Hoàn thành: " + entry.value("mission_name", "Mission"));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
entry["status"] = "failed";
|
||||
entry["finished_at"] = IdUtil::nowIso8601();
|
||||
setRunnerState("error", "Lỗi khi chạy: " + entry.value("mission_name", "Mission"));
|
||||
}
|
||||
|
||||
runner_["current_queue_id"] = nullptr;
|
||||
runner_["current_action"] = nullptr;
|
||||
saveUnlocked();
|
||||
wake_ = true;
|
||||
}
|
||||
|
||||
void MissionQueue::executeActionsUnlocked(const nlohmann::json& actions,
|
||||
const nlohmann::json& parameters,
|
||||
nlohmann::json& log,
|
||||
int loop_depth)
|
||||
{
|
||||
if (loop_depth > 8)
|
||||
throw std::runtime_error("loop depth exceeded");
|
||||
|
||||
for (const auto& action : actions)
|
||||
{
|
||||
if (!action.is_object())
|
||||
continue;
|
||||
|
||||
const std::string action_id = action.value("id", "");
|
||||
const std::string kind = action.value("kind", "action");
|
||||
const std::string type = action.value("type", "");
|
||||
const std::string label = action.value("label", type);
|
||||
const auto& params = action.contains("params") && action["params"].is_object() ? action["params"]
|
||||
: nlohmann::json::object();
|
||||
|
||||
runner_["current_action"] = label;
|
||||
saveUnlocked();
|
||||
|
||||
if (kind == "mission")
|
||||
{
|
||||
const std::string ref_id = action.value("refId", "");
|
||||
nlohmann::json ref_mission = nlohmann::json::object();
|
||||
// Nested mission snapshot should be resolved by frontend before queueing.
|
||||
(void)ref_id;
|
||||
log.push_back({{"ts", IdUtil::nowIso8601()}, {"level", "info"}, {"message", "Sub-mission: " + label}});
|
||||
if (action.contains("resolved_mission") && action["resolved_mission"].is_object())
|
||||
{
|
||||
const auto& nested_actions = action["resolved_mission"]["actions"];
|
||||
executeActionsUnlocked(nested_actions, parameters, log, loop_depth);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type == "loop")
|
||||
{
|
||||
const std::string mode = params.value("mode", "count");
|
||||
const int count = static_cast<int>(paramNumber(params, "count", 1));
|
||||
const auto& children =
|
||||
action.contains("children") && action["children"].is_array() ? action["children"] : nlohmann::json::array();
|
||||
const int iterations = mode == "endless" ? 1 : std::max(1, count);
|
||||
for (int i = 0; i < iterations; ++i)
|
||||
{
|
||||
log.push_back({{"ts", IdUtil::nowIso8601()},
|
||||
{"level", "info"},
|
||||
{"message", "Loop " + std::to_string(i + 1) + "/" + std::to_string(iterations)}});
|
||||
executeActionsUnlocked(children, parameters, log, loop_depth + 1);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type == "wait")
|
||||
{
|
||||
const int ms = static_cast<int>(paramNumber(params, "seconds", 1) * 1000);
|
||||
log.push_back({{"ts", IdUtil::nowIso8601()}, {"level", "info"}, {"message", "Wait " + std::to_string(ms) + "ms"}});
|
||||
sleepMs(ms);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type == "move_to_position" || type == "adjust_localization" || type == "pick_cart" || type == "drop_cart")
|
||||
{
|
||||
const std::string pos = paramValue(action_id, params, "position", parameters);
|
||||
log.push_back({{"ts", IdUtil::nowIso8601()},
|
||||
{"level", "info"},
|
||||
{"message", label + " → " + (pos.empty() ? "?" : pos)}});
|
||||
sleepMs(1200);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type == "move_to_marker")
|
||||
{
|
||||
const std::string marker = paramValue(action_id, params, "marker", parameters);
|
||||
log.push_back({{"ts", IdUtil::nowIso8601()},
|
||||
{"level", "info"},
|
||||
{"message", label + " → " + (marker.empty() ? "?" : marker)}});
|
||||
sleepMs(1200);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type == "user_log")
|
||||
{
|
||||
const std::string message = params.value("message", "Mission step");
|
||||
log.push_back({{"ts", IdUtil::nowIso8601()}, {"level", "user"}, {"message", message}});
|
||||
sleepMs(200);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type == "pause")
|
||||
{
|
||||
log.push_back({{"ts", IdUtil::nowIso8601()}, {"level", "warn"}, {"message", "Pause (simulated)"}});
|
||||
sleepMs(500);
|
||||
continue;
|
||||
}
|
||||
|
||||
log.push_back(
|
||||
{{"ts", IdUtil::nowIso8601()}, {"level", "info"}, {"message", label + " (" + type + ") simulated"}});
|
||||
sleepMs(400);
|
||||
}
|
||||
}
|
||||
|
||||
void MissionQueue::sleepMs(int ms)
|
||||
{
|
||||
if (ms <= 0)
|
||||
return;
|
||||
const int step = 100;
|
||||
for (int elapsed = 0; elapsed < ms && !stop_; elapsed += step)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(std::min(step, ms - elapsed)));
|
||||
}
|
||||
|
||||
void MissionQueue::setRunnerState(const std::string& state, const std::string& message)
|
||||
{
|
||||
runner_["state"] = state;
|
||||
runner_["message"] = message;
|
||||
runner_["updated_at"] = IdUtil::nowIso8601();
|
||||
}
|
||||
|
||||
} // namespace lm
|
||||
Reference in New Issue
Block a user