Clean and Test làn 3
Some checks failed
Test / test (push) Has been cancelled

This commit is contained in:
2026-06-13 14:04:56 +07:00
parent fbc0c11be2
commit c05b1d5f5c
12 changed files with 281 additions and 40 deletions

View File

@@ -9,8 +9,21 @@ import pytest
import requests
BASE = os.environ.get("TEST_BASE_URL", "http://127.0.0.1:18080")
MISSION_ID = os.environ.get("TEST_MISSION_ID", "testmission00001")
TIMEOUT = 10
PREFERRED_MISSION_ID = "testmission00001"
def resolve_mission_id(api: requests.Session) -> str:
env_id = os.environ.get("TEST_MISSION_ID", "").strip()
if env_id:
return env_id
missions = api.get(f"{BASE}/api/missions", timeout=TIMEOUT).json().get("missions", [])
ids = [m["id"] for m in missions if isinstance(m, dict) and m.get("id")]
if PREFERRED_MISSION_ID in ids:
return PREFERRED_MISSION_ID
if not ids:
pytest.fail("no missions available")
return ids[0]
@pytest.fixture(scope="module")
@@ -31,41 +44,46 @@ def api():
return session
@pytest.fixture(scope="module")
def mission_id(api):
return resolve_mission_id(api)
def test_health(api):
r = api.get(f"{BASE}/api/health", timeout=TIMEOUT)
assert r.status_code == 200
assert r.json()["ok"] is True
def test_missions_fixture(api):
def test_missions_available(api, mission_id):
r = api.get(f"{BASE}/api/missions", timeout=TIMEOUT)
assert r.status_code == 200
ids = {m["id"] for m in r.json().get("missions", [])}
assert MISSION_ID in ids
assert mission_id in ids
def test_mir_v2_enqueue_and_list(api):
def test_mir_v2_enqueue_and_list(api, mission_id):
api.delete(f"{BASE}/api/mission_queue", timeout=TIMEOUT)
r = api.post(
f"{BASE}/api/v2.0.0/mission_queue",
json={"mission_id": MISSION_ID, "priority": 3, "robot_id": "default"},
json={"mission_id": mission_id, "priority": 3, "robot_id": "default"},
timeout=TIMEOUT,
)
assert r.status_code == 201
body = r.json()
assert body["mission_id"] == MISSION_ID
assert body["mission_id"] == mission_id
assert body["priority"] == 3
listed = api.get(f"{BASE}/api/v2.0.0/mission_queue", timeout=TIMEOUT)
assert listed.status_code == 200
assert any(item.get("mission_id") == MISSION_ID for item in listed.json())
assert any(item.get("mission_id") == mission_id for item in listed.json())
def test_queue_pause_continue(api):
def test_queue_pause_continue(api, mission_id):
api.delete(f"{BASE}/api/mission_queue", timeout=TIMEOUT)
api.post(
f"{BASE}/api/mission_queue",
json={"mission_id": MISSION_ID},
json={"mission_id": mission_id},
timeout=TIMEOUT,
)
deadline = time.time() + 5
@@ -86,10 +104,10 @@ def test_queue_pause_continue(api):
assert r.json().get("state") != "paused"
def test_modbus_trigger_flow(api):
def test_modbus_trigger_flow(api, mission_id):
trig = api.post(
f"{BASE}/api/triggers",
json={"name": "pytest-trigger", "coil_id": 1005, "mission_id": MISSION_ID},
json={"name": "pytest-trigger", "coil_id": 1005, "mission_id": mission_id},
timeout=TIMEOUT,
)
assert trig.status_code == 201
@@ -106,12 +124,12 @@ def test_modbus_trigger_flow(api):
assert deleted.status_code == 204
def test_fleet_schedule_asap(api):
def test_fleet_schedule_asap(api, mission_id):
r = api.post(
f"{BASE}/api/fleet/schedules",
json={
"name": "pytest-schedule",
"mission_id": MISSION_ID,
"mission_id": mission_id,
"start_mode": "asap",
"priority": 0,
"robot_id": "default",