create TESTING

This commit is contained in:
2026-06-13 13:46:53 +07:00
parent 1a8bddb037
commit 695a942a5d
15 changed files with 831 additions and 14 deletions

39
tests/fixtures/data/missions.json vendored Normal file
View File

@@ -0,0 +1,39 @@
{
"dashboard": {
"widgets": []
},
"groups": [
"Missions"
],
"missions": [
{
"actions": [
{
"id": "a1",
"kind": "action",
"label": "Wait",
"params": {
"seconds": 3
},
"type": "wait"
}
],
"description": "Fixture mission for automated tests",
"group": "Missions",
"id": "testmission00001",
"name": "Test Wait",
"updated_at": "2026-06-13T00:00:00Z"
}
],
"robots": [
{
"id": "default",
"name": "Robot test",
"online": true,
"serial": "T-001"
}
],
"schedules": [],
"triggers": [],
"version": 1
}

5
tests/fixtures/data/state.json vendored Normal file
View File

@@ -0,0 +1,5 @@
{
"active_layout_id": "",
"layouts": [],
"version": 3
}

2
tests/requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
pytest>=7.0
requests>=2.28

View File

@@ -0,0 +1,145 @@
"""Integration tests for lidar_manager_web REST API."""
from __future__ import annotations
import os
import time
import pytest
import requests
BASE = os.environ.get("TEST_BASE_URL", "http://127.0.0.1:18080")
MISSION_ID = os.environ.get("TEST_MISSION_ID", "testmission00001")
TIMEOUT = 10
@pytest.fixture(scope="module")
def api():
session = requests.Session()
session.headers.update({"Content-Type": "application/json"})
deadline = time.time() + TIMEOUT
while time.time() < deadline:
try:
r = session.get(f"{BASE}/api/health", timeout=1)
if r.status_code == 200 and r.json().get("ok"):
break
except requests.RequestException:
pass
time.sleep(0.2)
else:
pytest.fail(f"Server not ready at {BASE}")
return session
def test_health(api):
r = api.get(f"{BASE}/api/health", timeout=TIMEOUT)
assert r.status_code == 200
assert r.json()["ok"] is True
def test_missions_fixture(api):
r = api.get(f"{BASE}/api/missions", timeout=TIMEOUT)
assert r.status_code == 200
ids = {m["id"] for m in r.json().get("missions", [])}
assert MISSION_ID in ids
def test_mir_v2_enqueue_and_list(api):
api.delete(f"{BASE}/api/mission_queue", timeout=TIMEOUT)
r = api.post(
f"{BASE}/api/v2.0.0/mission_queue",
json={"mission_id": MISSION_ID, "priority": 3, "robot_id": "default"},
timeout=TIMEOUT,
)
assert r.status_code == 201
body = r.json()
assert body["mission_id"] == MISSION_ID
assert body["priority"] == 3
listed = api.get(f"{BASE}/api/v2.0.0/mission_queue", timeout=TIMEOUT)
assert listed.status_code == 200
assert any(item.get("mission_id") == MISSION_ID for item in listed.json())
def test_queue_pause_continue(api):
api.delete(f"{BASE}/api/mission_queue", timeout=TIMEOUT)
api.post(
f"{BASE}/api/mission_queue",
json={"mission_id": MISSION_ID},
timeout=TIMEOUT,
)
deadline = time.time() + 5
while time.time() < deadline:
runner = api.get(f"{BASE}/api/mission_queue", timeout=TIMEOUT).json().get("runner", {})
if runner.get("state") == "running":
break
time.sleep(0.2)
else:
pytest.skip("runner did not enter running state in time")
r = api.post(f"{BASE}/api/mission_queue/pause", timeout=TIMEOUT)
assert r.status_code == 200
assert r.json().get("state") == "paused"
r = api.post(f"{BASE}/api/mission_queue/continue", timeout=TIMEOUT)
assert r.status_code == 200
assert r.json().get("state") != "paused"
def test_modbus_trigger_flow(api):
trig = api.post(
f"{BASE}/api/triggers",
json={"name": "pytest-trigger", "coil_id": 1005, "mission_id": MISSION_ID},
timeout=TIMEOUT,
)
assert trig.status_code == 201
trig_id = trig.json()["id"]
before = len(api.get(f"{BASE}/api/mission_queue", timeout=TIMEOUT).json())
fire = api.post(f"{BASE}/api/modbus/coils/1005/trigger", timeout=TIMEOUT)
assert fire.status_code == 200
after = api.get(f"{BASE}/api/mission_queue", timeout=TIMEOUT).json()
assert len(after) >= before
deleted = api.delete(f"{BASE}/api/triggers/{trig_id}", timeout=TIMEOUT)
assert deleted.status_code == 204
def test_fleet_schedule_asap(api):
r = api.post(
f"{BASE}/api/fleet/schedules",
json={
"name": "pytest-schedule",
"mission_id": MISSION_ID,
"start_mode": "asap",
"priority": 0,
"robot_id": "default",
},
timeout=TIMEOUT,
)
assert r.status_code == 201
sched_id = r.json()["id"]
deleted = api.delete(f"{BASE}/api/fleet/schedules/{sched_id}", timeout=TIMEOUT)
assert deleted.status_code == 204
def test_lidar_crud(api):
created = api.post(
f"{BASE}/api/lidars",
json={"name": "pytest-lidar", "ip": "10.99.0.1", "port": 2112},
timeout=TIMEOUT,
)
assert created.status_code == 201
lidar_id = created.json()["id"]
updated = api.put(
f"{BASE}/api/lidars/{lidar_id}",
json={"name": "pytest-lidar", "ip": "10.99.0.2", "port": 2112},
timeout=TIMEOUT,
)
assert updated.status_code == 200
deleted = api.delete(f"{BASE}/api/lidars/{lidar_id}", timeout=TIMEOUT)
assert deleted.status_code == 204

View File

@@ -0,0 +1,71 @@
#include "mission/mission_enqueue.hpp"
#include "mission/mission_store.hpp"
#include <gtest/gtest.h>
#include <filesystem>
namespace {
std::filesystem::path fixtureMissionsPath()
{
return std::filesystem::path(TEST_FIXTURE_DIR) / "missions.json";
}
lm::MissionStore makeStore()
{
const auto dir = std::filesystem::temp_directory_path() / "lm_test_enqueue";
std::filesystem::create_directories(dir);
const auto path = dir / "missions.json";
std::filesystem::copy_file(fixtureMissionsPath(), path, std::filesystem::copy_options::overwrite_existing);
return lm::MissionStore(path);
}
} // namespace
TEST(MissionEnqueue, NormalizeParametersFromMirArray)
{
const nlohmann::json params = nlohmann::json::array({{{"id", "pos"}, {"value", "A1"}},
{{"key", "speed"}, {"value", 0.5}}});
const nlohmann::json out = lm::MissionEnqueue::normalizeParameters(params);
EXPECT_TRUE(out.is_object());
EXPECT_EQ(out["pos"], "A1");
EXPECT_EQ(out["speed"], 0.5);
}
TEST(MissionEnqueue, NormalizeParametersObjectPassthrough)
{
const nlohmann::json params = {{"x", 1}};
const nlohmann::json out = lm::MissionEnqueue::normalizeParameters(params);
EXPECT_EQ(out, params);
}
TEST(MissionEnqueue, BuildPayloadFromMissionId)
{
lm::MissionStore store = makeStore();
nlohmann::json payload;
std::string err;
const nlohmann::json request = {{"mission_id", "testmission00001"}, {"priority", 3}, {"robot_id", "default"}};
ASSERT_TRUE(lm::MissionEnqueue::buildPayload(store, request, payload, err)) << err;
EXPECT_EQ(payload["mission"]["id"], "testmission00001");
EXPECT_EQ(payload["priority"], 3);
EXPECT_EQ(payload["robot_id"], "default");
}
TEST(MissionEnqueue, BuildPayloadMissingMissionFails)
{
lm::MissionStore store = makeStore();
nlohmann::json payload;
std::string err;
const nlohmann::json request = {{"mission_id", "does_not_exist"}};
EXPECT_FALSE(lm::MissionEnqueue::buildPayload(store, request, payload, err));
EXPECT_FALSE(err.empty());
}
TEST(MissionEnqueue, BuildPayloadRequiresMissionOrId)
{
lm::MissionStore store = makeStore();
nlohmann::json payload;
std::string err;
EXPECT_FALSE(lm::MissionEnqueue::buildPayload(store, nlohmann::json::object(), payload, err));
}

View File

@@ -0,0 +1,96 @@
#include "mission/mission_store.hpp"
#include <gtest/gtest.h>
#include <filesystem>
namespace {
class MissionStoreTest : public ::testing::Test
{
protected:
void SetUp() override
{
dir_ = std::filesystem::temp_directory_path() / "lm_test_store";
std::filesystem::remove_all(dir_);
std::filesystem::create_directories(dir_);
store_path_ = dir_ / "missions.json";
std::filesystem::copy_file(std::filesystem::path(TEST_FIXTURE_DIR) / "missions.json",
store_path_,
std::filesystem::copy_options::overwrite_existing);
store_ = std::make_unique<lm::MissionStore>(store_path_);
}
std::filesystem::path dir_;
std::filesystem::path store_path_;
std::unique_ptr<lm::MissionStore> store_;
};
} // namespace
TEST_F(MissionStoreTest, FindMissionFromFixture)
{
const auto mission = store_->findMission("testmission00001");
ASSERT_TRUE(mission.has_value());
EXPECT_EQ((*mission)["name"], "Test Wait");
}
TEST_F(MissionStoreTest, AddTriggerValidCoil)
{
std::string err;
const auto trigger = store_->addTrigger(
{{"name", "PLC line 1"}, {"coil_id", 1001}, {"mission_id", "testmission00001"}}, err);
ASSERT_TRUE(trigger.has_value()) << err;
EXPECT_EQ((*trigger)["coil_id"], 1001);
}
TEST_F(MissionStoreTest, AddTriggerRejectsInvalidCoil)
{
std::string err;
const auto trigger =
store_->addTrigger({{"name", "bad"}, {"coil_id", 999}, {"mission_id", "testmission00001"}}, err);
EXPECT_FALSE(trigger.has_value());
EXPECT_FALSE(err.empty());
}
TEST_F(MissionStoreTest, AddTriggerRejectsDuplicateCoil)
{
std::string err;
ASSERT_TRUE(store_->addTrigger(
{{"name", "first"}, {"coil_id", 1002}, {"mission_id", "testmission00001"}}, err)
.has_value())
<< err;
const auto dup = store_->addTrigger(
{{"name", "second"}, {"coil_id", 1002}, {"mission_id", "testmission00001"}}, err);
EXPECT_FALSE(dup.has_value());
}
TEST_F(MissionStoreTest, DeleteTrigger)
{
std::string err;
const auto trigger = store_->addTrigger(
{{"name", "tmp"}, {"coil_id", 1003}, {"mission_id", "testmission00001"}}, err);
ASSERT_TRUE(trigger.has_value()) << err;
const std::string id = (*trigger)["id"].get<std::string>();
EXPECT_TRUE(store_->deleteTrigger(id, err)) << err;
EXPECT_FALSE(store_->findTriggerByCoil(1003).has_value());
}
TEST_F(MissionStoreTest, AddScheduleAsap)
{
std::string err;
const auto schedule = store_->addSchedule(
{{"name", "Morning run"}, {"mission_id", "testmission00001"}, {"priority", 5}, {"start_mode", "asap"}},
err);
ASSERT_TRUE(schedule.has_value()) << err;
EXPECT_EQ((*schedule)["priority"], 5);
EXPECT_EQ((*schedule)["start_mode"], "asap");
}
TEST_F(MissionStoreTest, AddScheduleUnknownMissionFails)
{
std::string err;
const auto schedule =
store_->addSchedule({{"name", "bad"}, {"mission_id", "missing"}, {"start_mode", "asap"}}, err);
EXPECT_FALSE(schedule.has_value());
}

View File

@@ -0,0 +1,63 @@
#include "validation/sensor_validator.hpp"
#include <gtest/gtest.h>
TEST(SensorValidator, LidarRequiresNameIpPort)
{
std::string err;
EXPECT_FALSE(lm::SensorValidator::validateLidarPayload(nlohmann::json::object(), err));
EXPECT_FALSE(err.empty());
err.clear();
EXPECT_TRUE(lm::SensorValidator::validateLidarPayload(
{{"name", "front"}, {"ip", "192.168.1.10"}, {"port", 2112}}, err))
<< err;
}
TEST(SensorValidator, LidarPortRange)
{
std::string err;
EXPECT_FALSE(lm::SensorValidator::validateLidarPayload(
{{"name", "front"}, {"ip", "192.168.1.10"}, {"port", 70000}}, err));
}
TEST(SensorValidator, ImuRequiresFrameAndTopic)
{
std::string err;
EXPECT_FALSE(lm::SensorValidator::validateImuPayload({{"name", "imu1"}}, err));
err.clear();
EXPECT_TRUE(lm::SensorValidator::validateImuPayload(
{{"name", "imu1"}, {"frame_id", "imu_link"}, {"topic", "/imu/data"}}, err))
<< err;
}
TEST(SensorValidator, ImuInvalidSource)
{
std::string err;
EXPECT_FALSE(lm::SensorValidator::validateImuPayload(
{{"name", "imu1"},
{"frame_id", "imu_link"},
{"topic", "/imu/data"},
{"source", "invalid_source"}},
err));
}
TEST(SensorValidator, LidarTripletDuplicateDetection)
{
const nlohmann::json state = {
{"lidars",
nlohmann::json::array({{{"id", "l1"}, {"name", "front"}, {"ip", "10.0.0.1"}, {"port", 2112}}})}};
EXPECT_TRUE(lm::SensorValidator::lidarTripletExists(state, "front", "10.0.0.1", 2112));
const std::string exclude = "l1";
EXPECT_FALSE(lm::SensorValidator::lidarTripletExists(state, "front", "10.0.0.1", 2112, &exclude));
}
TEST(SensorValidator, ImuFrameDuplicateDetection)
{
const nlohmann::json state = {
{"imus", nlohmann::json::array({{{"id", "i1"}, {"frame_id", "base_imu"}}})}};
EXPECT_TRUE(lm::SensorValidator::imuFrameExists(state, "base_imu"));
const std::string exclude = "i1";
EXPECT_FALSE(lm::SensorValidator::imuFrameExists(state, "base_imu", &exclude));
}