261 lines
8.0 KiB
Python
261 lines
8.0 KiB
Python
"""Integration tests for lidar_manager_web REST API."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
|
|
import pytest
|
|
import requests
|
|
|
|
# Timeout, in seconds, passed to the HTTP requests issued by this module.
TIMEOUT = 10
# Mission id preferred by resolve_mission_id() when the server lists it.
PREFERRED_MISSION_ID = "testmission00001"
# Root URL of the server under test; the TEST_BASE_URL environment variable overrides it.
BASE = os.environ.get("TEST_BASE_URL", "http://127.0.0.1:18080")
|
|
|
|
|
|
def resolve_mission_id(api: requests.Session) -> str:
    """Pick the mission id that mission-dependent tests should use.

    Resolution order:
      1. ``TEST_MISSION_ID`` from the environment, when set to a non-blank value.
      2. ``PREFERRED_MISSION_ID``, when ``GET /api/missions`` lists it.
      3. The first mission id returned by the server.

    Calls ``pytest.fail`` when the server lists no mission with a truthy id.
    """
    override = os.environ.get("TEST_MISSION_ID", "").strip()
    if override:
        return override

    response = api.get(f"{BASE}/api/missions", timeout=TIMEOUT)
    available = []
    for mission in response.json().get("missions", []):
        # Ignore entries that are not objects or that carry no truthy id.
        if isinstance(mission, dict) and mission.get("id"):
            available.append(mission["id"])

    if not available:
        pytest.fail("no missions available")
    if PREFERRED_MISSION_ID in available:
        return PREFERRED_MISSION_ID
    return available[0]
|
|
|
|
|
|
def login_admin(api: requests.Session) -> None:
    """Log the given session in as the ``Admin`` user.

    Asserts that the login answers HTTP 200 (the response text is attached to
    the failure) and that the returned ``user`` object names ``Admin``.
    """
    credentials = {"username": "Admin", "password": "admin"}
    response = api.post(f"{BASE}/api/auth/login", json=credentials, timeout=TIMEOUT)
    assert response.status_code == 200, response.text
    logged_in_user = response.json().get("user", {})
    assert logged_in_user.get("username") == "Admin"
|
|
|
|
|
|
@pytest.fixture(scope="module")
def api():
    """Yield a module-scoped ``requests.Session`` logged in as ``Admin``.

    Polls ``GET /api/health`` until it answers 200 with a truthy ``ok`` field,
    for at most ``TIMEOUT`` seconds, and fails the run via ``pytest.fail`` if
    that never happens. The session is closed once the module's tests are done
    (or immediately if readiness or login fails).
    """
    with requests.Session() as session:
        session.headers.update({"Content-Type": "application/json"})
        # Monotonic clock so system clock adjustments cannot stretch or cut the wait.
        deadline = time.monotonic() + TIMEOUT
        while time.monotonic() < deadline:
            try:
                r = session.get(f"{BASE}/api/health", timeout=1)
                if r.status_code == 200 and r.json().get("ok"):
                    break
            except (requests.RequestException, ValueError):
                # Connection refused/timeout, or a body that is not JSON yet:
                # treat both as "not ready" and keep polling.
                pass
            time.sleep(0.2)
        else:
            # Loop ended without `break`: the deadline passed with no healthy answer.
            pytest.fail(f"Server not ready at {BASE}")
        login_admin(session)
        yield session
|
|
|
|
|
|
@pytest.fixture(scope="module")
def mission_id(api):
    """Module-scoped mission id, resolved once through ``resolve_mission_id``."""
    resolved = resolve_mission_id(api)
    return resolved
|
|
|
|
|
|
def clear_queue(api: requests.Session) -> None:
    """Send ``DELETE /api/mission_queue``; the response is not inspected."""
    queue_url = f"{BASE}/api/mission_queue"
    api.delete(queue_url, timeout=TIMEOUT)
|
|
|
|
|
|
def wait_runner_state(api: requests.Session, state: str, timeout: float = 8.0) -> dict:
    """Poll ``GET /api/mission_queue`` until the runner reports ``state``.

    Args:
        api: Session used to issue the polling requests.
        state: Value the response's ``runner.state`` field must equal.
        timeout: Maximum time to keep polling, in seconds.

    Returns:
        The ``runner`` object from the first response whose state matched.

    Calls ``pytest.fail`` when the state is not observed before the deadline.
    """
    # Monotonic clock: a wall-clock adjustment must not shorten or extend the wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        runner = api.get(f"{BASE}/api/mission_queue", timeout=TIMEOUT).json().get("runner", {})
        if runner.get("state") == state:
            return runner
        time.sleep(0.15)
    pytest.fail(f"runner did not reach state {state!r} within {timeout}s")
|
|
|
|
|
|
def enqueue_mission(api: requests.Session, mission_id: str) -> None:
    """Append ``mission_id`` to the queue via ``POST /api/mission_queue``.

    Asserts the server answers HTTP 201; the response text is attached to the
    assertion so a rejection shows the server's explanation.
    """
    r = api.post(
        f"{BASE}/api/mission_queue",
        json={"mission_id": mission_id},
        timeout=TIMEOUT,
    )
    assert r.status_code == 201, r.text
|
|
|
|
|
|
def test_health(api):
    """The health endpoint answers 200 with ``ok`` set to ``True``."""
    response = api.get(f"{BASE}/api/health", timeout=TIMEOUT)
    assert response.status_code == 200
    payload = response.json()
    assert payload["ok"] is True
|
|
|
|
|
|
def test_auth_me(api):
    """``/api/auth/me`` reports the ``Admin`` user with write access to missions."""
    response = api.get(f"{BASE}/api/auth/me", timeout=TIMEOUT)
    assert response.status_code == 200

    current_user = response.json().get("user", {})
    assert current_user.get("username") == "Admin"

    permissions = current_user.get("permissions", {})
    assert permissions.get("missions") == "write"
|
|
|
|
|
|
def test_auth_unauthorized_without_session():
    """A session that never logged in gets 401 from the missions listing."""
    # Context manager closes the throwaway session even when the assert fails.
    with requests.Session() as session:
        r = session.get(f"{BASE}/api/missions", timeout=TIMEOUT)
        assert r.status_code == 401
|
|
|
|
|
|
def test_auth_user_read_only_missions():
    """The ``User`` account can list missions but is refused (403) when creating a trigger."""
    # Context manager closes the dedicated session even when an assert fails.
    with requests.Session() as session:
        login = session.post(
            f"{BASE}/api/auth/login",
            json={"username": "User", "password": "user"},
            timeout=TIMEOUT,
        )
        assert login.status_code == 200

        listed = session.get(f"{BASE}/api/missions", timeout=TIMEOUT)
        assert listed.status_code == 200

        created = session.post(
            f"{BASE}/api/triggers",
            # Same value as before, taken from the shared constant instead of a duplicate literal.
            json={"name": "deny-trigger", "coil_id": 1009, "mission_id": PREFERRED_MISSION_ID},
            timeout=TIMEOUT,
        )
        assert created.status_code == 403
|
|
|
|
|
|
def test_missions_available(api, mission_id):
    """The resolved mission id appears in the ``/api/missions`` listing."""
    response = api.get(f"{BASE}/api/missions", timeout=TIMEOUT)
    assert response.status_code == 200

    listed_ids = set()
    for mission in response.json().get("missions", []):
        listed_ids.add(mission["id"])
    assert mission_id in listed_ids
|
|
|
|
|
|
def test_mir_v2_enqueue_and_list(api, mission_id):
    """Enqueue through the ``/api/v2.0.0`` route, then find the mission in its listing."""
    clear_queue(api)

    v2_queue_url = f"{BASE}/api/v2.0.0/mission_queue"
    request_body = {"mission_id": mission_id, "priority": 3, "robot_id": "default"}
    created = api.post(v2_queue_url, json=request_body, timeout=TIMEOUT)
    assert created.status_code == 201

    entry = created.json()
    assert entry["mission_id"] == mission_id
    assert entry["priority"] == 3

    listing = api.get(v2_queue_url, timeout=TIMEOUT)
    assert listing.status_code == 200
    queued_mission_ids = [item.get("mission_id") for item in listing.json()]
    assert mission_id in queued_mission_ids
|
|
|
|
|
|
def test_queue_pause_continue(api, mission_id):
    """Pausing a running queue reports ``paused``; continuing leaves that state."""
    clear_queue(api)
    enqueue_mission(api, mission_id)
    wait_runner_state(api, "running", timeout=5)

    paused = api.post(f"{BASE}/api/mission_queue/pause", timeout=TIMEOUT)
    assert paused.status_code == 200
    assert paused.json().get("state") == "paused"

    resumed = api.post(f"{BASE}/api/mission_queue/continue", timeout=TIMEOUT)
    assert resumed.status_code == 200
    state_after_continue = resumed.json().get("state")
    assert state_after_continue != "paused"
|
|
|
|
|
|
def test_queue_cancel_rejects_when_idle(api):
    """Cancelling right after clearing the queue is answered with 400 and an ``error`` field."""
    clear_queue(api)

    response = api.post(f"{BASE}/api/mission_queue/cancel", timeout=TIMEOUT)
    assert response.status_code == 400

    payload = response.json()
    assert "error" in payload
|
|
|
|
|
|
def test_queue_cancel_stops_running_mission(api, mission_id):
    """Cancelling a running mission returns the runner to idle and marks the entry cancelled."""
    clear_queue(api)
    enqueue_mission(api, mission_id)
    wait_runner_state(api, "running", timeout=5)

    cancel = api.post(f"{BASE}/api/mission_queue/cancel", timeout=TIMEOUT)
    assert cancel.status_code == 200
    assert cancel.json().get("message")

    wait_runner_state(api, "idle", timeout=8)
    snapshot = api.get(f"{BASE}/api/mission_queue", timeout=TIMEOUT).json()
    assert snapshot["runner"]["state"] == "idle"

    cancelled = []
    for item in snapshot.get("queue", []):
        if item.get("status") == "cancelled":
            cancelled.append(item)
    assert cancelled, "expected a cancelled queue entry"

    first_cancelled = cancelled[0]
    assert first_cancelled.get("mission_id") == mission_id

    # Only dict-shaped log entries are considered; the expected text is
    # Vietnamese for "Mission cancelled by operator".
    log_entries = first_cancelled.get("log") or []
    messages = [entry.get("message") for entry in log_entries if isinstance(entry, dict)]
    assert "Mission hủy bởi operator" in messages
|
|
|
|
|
|
def test_queue_cancel_rejects_while_cancelling(api, mission_id):
    """A second cancel sent right after the first one is rejected with 400."""
    clear_queue(api)
    enqueue_mission(api, mission_id)
    wait_runner_state(api, "running", timeout=5)

    cancel_url = f"{BASE}/api/mission_queue/cancel"
    accepted = api.post(cancel_url, timeout=TIMEOUT)
    assert accepted.status_code == 200

    rejected = api.post(cancel_url, timeout=TIMEOUT)
    assert rejected.status_code == 400

    # Wait for the runner to report idle again before the test ends.
    wait_runner_state(api, "idle", timeout=8)
|
|
|
|
|
|
def test_modbus_trigger_flow(api, mission_id):
    """Create a coil trigger, fire the coil, and check the queue did not shrink.

    The trigger is deleted in a ``finally`` clause so a failed assertion does
    not leave ``pytest-trigger`` registered on the server.
    """
    coil_id = 1005
    trig = api.post(
        f"{BASE}/api/triggers",
        json={"name": "pytest-trigger", "coil_id": coil_id, "mission_id": mission_id},
        timeout=TIMEOUT,
    )
    assert trig.status_code == 201, trig.text
    trig_id = trig.json()["id"]

    def queue_length() -> int:
        # /api/mission_queue returns an object with "runner" and "queue" keys;
        # len() of the object itself would only count those top-level keys.
        body = api.get(f"{BASE}/api/mission_queue", timeout=TIMEOUT).json()
        return len(body.get("queue", []))

    try:
        before = queue_length()
        fire = api.post(f"{BASE}/api/modbus/coils/{coil_id}/trigger", timeout=TIMEOUT)
        assert fire.status_code == 200
        assert queue_length() >= before
    finally:
        deleted = api.delete(f"{BASE}/api/triggers/{trig_id}", timeout=TIMEOUT)
    # Checked outside `finally` so it cannot mask an earlier assertion failure.
    assert deleted.status_code == 204
|
|
|
|
|
|
def test_fleet_schedule_asap(api, mission_id):
    """Create an ``asap`` fleet schedule (201) and delete it again (204)."""
    schedule = {
        "name": "pytest-schedule",
        "mission_id": mission_id,
        "start_mode": "asap",
        "priority": 0,
        "robot_id": "default",
    }
    created = api.post(f"{BASE}/api/fleet/schedules", json=schedule, timeout=TIMEOUT)
    assert created.status_code == 201

    sched_id = created.json()["id"]
    removed = api.delete(f"{BASE}/api/fleet/schedules/{sched_id}", timeout=TIMEOUT)
    assert removed.status_code == 204
|
|
|
|
|
|
def test_lidar_crud(api):
    """Create a lidar (201), update its IP (200), and delete it (204).

    The delete runs in a ``finally`` clause so a failed update assertion does
    not leave ``pytest-lidar`` behind on the server.
    """
    created = api.post(
        f"{BASE}/api/lidars",
        json={"name": "pytest-lidar", "ip": "10.99.0.1", "port": 2112},
        timeout=TIMEOUT,
    )
    assert created.status_code == 201, created.text
    lidar_id = created.json()["id"]

    try:
        updated = api.put(
            f"{BASE}/api/lidars/{lidar_id}",
            json={"name": "pytest-lidar", "ip": "10.99.0.2", "port": 2112},
            timeout=TIMEOUT,
        )
        assert updated.status_code == 200
    finally:
        deleted = api.delete(f"{BASE}/api/lidars/{lidar_id}", timeout=TIMEOUT)
    # Checked outside `finally` so it cannot mask an earlier assertion failure.
    assert deleted.status_code == 204
|