"""Integration tests for lidar_manager_web REST API."""

from __future__ import annotations

import os
import time

import pytest
import requests

BASE = os.environ.get("TEST_BASE_URL", "http://127.0.0.1:18080")
TIMEOUT = 10
PREFERRED_MISSION_ID = "testmission00001"


def resolve_mission_id(api: requests.Session) -> str:
    """Pick the mission id the tests should run against.

    Resolution order:

    1. the ``TEST_MISSION_ID`` environment variable, when non-blank;
    2. ``PREFERRED_MISSION_ID``, when ``GET /api/missions`` lists it;
    3. the first id listed by ``GET /api/missions``.

    Fails the test run (``pytest.fail``) when the server lists no missions.
    """
    env_id = os.environ.get("TEST_MISSION_ID", "").strip()
    if env_id:
        return env_id

    response = api.get(f"{BASE}/api/missions", timeout=TIMEOUT)
    # Report an HTTP error as such instead of a misleading "no missions" failure.
    response.raise_for_status()
    missions = response.json().get("missions", [])
    ids = [m["id"] for m in missions if isinstance(m, dict) and m.get("id")]
    if PREFERRED_MISSION_ID in ids:
        return PREFERRED_MISSION_ID
    if not ids:
        pytest.fail("no missions available")
    return ids[0]


@pytest.fixture(scope="module")
def api():
    """Yield a module-scoped JSON session once ``/api/health`` reports ready.

    Polls the health endpoint for up to ``TIMEOUT`` seconds and fails the run
    when it never answers 200 with a truthy ``ok`` field. The session is closed
    when the module's tests are done.
    """
    with requests.Session() as session:
        session.headers.update({"Content-Type": "application/json"})
        deadline = time.monotonic() + TIMEOUT
        while time.monotonic() < deadline:
            try:
                r = session.get(f"{BASE}/api/health", timeout=1)
                if r.status_code == 200 and r.json().get("ok"):
                    break
            except (requests.RequestException, ValueError):
                # Deliberate best effort: connection errors and non-JSON bodies
                # just mean "not ready yet", so keep polling until the deadline.
                pass
            time.sleep(0.2)
        else:
            pytest.fail(f"Server not ready at {BASE}")
        yield session


@pytest.fixture(scope="module")
def mission_id(api):
    """Mission id shared by every test in this module."""
    return resolve_mission_id(api)


def clear_queue(api: requests.Session) -> None:
    """Send ``DELETE /api/mission_queue``; the response is not inspected."""
    api.delete(f"{BASE}/api/mission_queue", timeout=TIMEOUT)


def wait_runner_state(api: requests.Session, state: str, timeout: float = 8.0) -> dict:
    """Poll ``GET /api/mission_queue`` until ``runner.state`` equals *state*.

    Returns the ``runner`` object from the matching response. Fails the test
    (``pytest.fail``) when the state is not observed within *timeout* seconds.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        payload = api.get(f"{BASE}/api/mission_queue", timeout=TIMEOUT).json()
        runner = payload.get("runner", {})
        if runner.get("state") == state:
            return runner
        time.sleep(0.15)
    pytest.fail(f"runner did not reach state {state!r} within {timeout}s")


def enqueue_mission(api: requests.Session, mission_id: str) -> None:
    """POST *mission_id* to ``/api/mission_queue`` and require a 201 response."""
    r = api.post(
        f"{BASE}/api/mission_queue",
        json={"mission_id": mission_id},
        timeout=TIMEOUT,
    )
    assert r.status_code == 201


def _queue_entries(api: requests.Session) -> list:
    """Return the queue entries reported by ``GET /api/mission_queue``.

    Other tests in this module read that payload as an object with ``runner``
    and ``queue`` keys, so the entries are taken from ``queue``. A bare list
    payload is returned unchanged.
    """
    payload = api.get(f"{BASE}/api/mission_queue", timeout=TIMEOUT).json()
    if isinstance(payload, dict):
        return payload.get("queue", [])
    return payload


def test_health(api):
    """The health endpoint answers 200 with ``ok`` set to ``True``."""
    r = api.get(f"{BASE}/api/health", timeout=TIMEOUT)
    assert r.status_code == 200
    assert r.json()["ok"] is True


def test_missions_available(api, mission_id):
    """The resolved mission id appears in the ``/api/missions`` listing."""
    r = api.get(f"{BASE}/api/missions", timeout=TIMEOUT)
    assert r.status_code == 200
    ids = {m["id"] for m in r.json().get("missions", [])}
    assert mission_id in ids


def test_mir_v2_enqueue_and_list(api, mission_id):
    """Enqueue through the v2.0.0 endpoint, then find the entry in its listing."""
    clear_queue(api)
    r = api.post(
        f"{BASE}/api/v2.0.0/mission_queue",
        json={"mission_id": mission_id, "priority": 3, "robot_id": "default"},
        timeout=TIMEOUT,
    )
    assert r.status_code == 201
    body = r.json()
    assert body["mission_id"] == mission_id
    assert body["priority"] == 3

    listed = api.get(f"{BASE}/api/v2.0.0/mission_queue", timeout=TIMEOUT)
    assert listed.status_code == 200
    assert any(item.get("mission_id") == mission_id for item in listed.json())


def test_queue_pause_continue(api, mission_id):
    """Pausing a running queue reports ``paused``; continuing leaves that state."""
    clear_queue(api)
    enqueue_mission(api, mission_id)
    wait_runner_state(api, "running", timeout=5)

    r = api.post(f"{BASE}/api/mission_queue/pause", timeout=TIMEOUT)
    assert r.status_code == 200
    assert r.json().get("state") == "paused"

    r = api.post(f"{BASE}/api/mission_queue/continue", timeout=TIMEOUT)
    assert r.status_code == 200
    assert r.json().get("state") != "paused"


def test_queue_cancel_rejects_when_idle(api):
    """Cancel after clearing the queue is rejected with 400 and an ``error`` field.

    NOTE(review): assumes the runner is idle once ``clear_queue`` returns; a
    mission left running by an earlier test could make this flaky -- confirm.
    """
    clear_queue(api)
    r = api.post(f"{BASE}/api/mission_queue/cancel", timeout=TIMEOUT)
    assert r.status_code == 400
    body = r.json()
    assert "error" in body


def test_queue_cancel_stops_running_mission(api, mission_id):
    """Cancelling a running mission returns the runner to idle and logs the cancel."""
    clear_queue(api)
    enqueue_mission(api, mission_id)
    wait_runner_state(api, "running", timeout=5)

    r = api.post(f"{BASE}/api/mission_queue/cancel", timeout=TIMEOUT)
    assert r.status_code == 200
    assert r.json().get("message")

    wait_runner_state(api, "idle", timeout=8)
    data = api.get(f"{BASE}/api/mission_queue", timeout=TIMEOUT).json()
    assert data["runner"]["state"] == "idle"

    cancelled = [item for item in data.get("queue", []) if item.get("status") == "cancelled"]
    assert cancelled, "expected a cancelled queue entry"
    assert cancelled[0].get("mission_id") == mission_id

    # The cancelled entry's log must contain the operator-cancel message.
    log = cancelled[0].get("log") or []
    assert any(
        entry.get("message") == "Mission hủy bởi operator"
        for entry in log
        if isinstance(entry, dict)
    )


def test_queue_cancel_rejects_while_cancelling(api, mission_id):
    """A second cancel issued right after the first one is rejected with 400."""
    clear_queue(api)
    enqueue_mission(api, mission_id)
    wait_runner_state(api, "running", timeout=5)

    first = api.post(f"{BASE}/api/mission_queue/cancel", timeout=TIMEOUT)
    assert first.status_code == 200
    second = api.post(f"{BASE}/api/mission_queue/cancel", timeout=TIMEOUT)
    assert second.status_code == 400

    wait_runner_state(api, "idle", timeout=8)


def test_modbus_trigger_flow(api, mission_id):
    """Create a coil trigger, fire the coil, and check the queue did not shrink.

    The trigger is deleted even when an intermediate assertion fails, so a
    failing run does not leave ``pytest-trigger`` behind on the server.
    """
    trig = api.post(
        f"{BASE}/api/triggers",
        json={"name": "pytest-trigger", "coil_id": 1005, "mission_id": mission_id},
        timeout=TIMEOUT,
    )
    assert trig.status_code == 201
    trig_id = trig.json()["id"]

    try:
        # Count queue entries, not the top-level keys of the response object.
        before = len(_queue_entries(api))
        fire = api.post(f"{BASE}/api/modbus/coils/1005/trigger", timeout=TIMEOUT)
        assert fire.status_code == 200
        after = len(_queue_entries(api))
        assert after >= before
    finally:
        deleted = api.delete(f"{BASE}/api/triggers/{trig_id}", timeout=TIMEOUT)
    assert deleted.status_code == 204


def test_fleet_schedule_asap(api, mission_id):
    """An ``asap`` fleet schedule can be created (201) and deleted (204)."""
    r = api.post(
        f"{BASE}/api/fleet/schedules",
        json={
            "name": "pytest-schedule",
            "mission_id": mission_id,
            "start_mode": "asap",
            "priority": 0,
            "robot_id": "default",
        },
        timeout=TIMEOUT,
    )
    assert r.status_code == 201
    sched_id = r.json()["id"]

    deleted = api.delete(f"{BASE}/api/fleet/schedules/{sched_id}", timeout=TIMEOUT)
    assert deleted.status_code == 204


def test_lidar_crud(api):
    """Create, update and delete a lidar entry, checking each status code.

    The entry is deleted even when the update assertion fails, so a failing
    run does not leave ``pytest-lidar`` behind on the server.
    """
    created = api.post(
        f"{BASE}/api/lidars",
        json={"name": "pytest-lidar", "ip": "10.99.0.1", "port": 2112},
        timeout=TIMEOUT,
    )
    assert created.status_code == 201
    lidar_id = created.json()["id"]

    try:
        updated = api.put(
            f"{BASE}/api/lidars/{lidar_id}",
            json={"name": "pytest-lidar", "ip": "10.99.0.2", "port": 2112},
            timeout=TIMEOUT,
        )
        assert updated.status_code == 200
    finally:
        deleted = api.delete(f"{BASE}/api/lidars/{lidar_id}", timeout=TIMEOUT)
    assert deleted.status_code == 204