Files
App/tests/test_api_integration.py
HiepLM 9aee5f4100
Some checks failed
Test / test (push) Has been cancelled
update function login
2026-06-16 09:57:55 +07:00

261 lines
8.0 KiB
Python

"""Integration tests for lidar_manager_web REST API."""
from __future__ import annotations
import os
import time
import pytest
import requests
# Base URL of the server under test; override with the TEST_BASE_URL
# environment variable.
BASE = os.environ.get("TEST_BASE_URL", "http://127.0.0.1:18080")
# Timeout in seconds passed to most HTTP requests in this module; the `api`
# fixture also uses it as the deadline while waiting for the health endpoint.
TIMEOUT = 10
# Mission id that resolve_mission_id() returns when the server lists it.
PREFERRED_MISSION_ID = "testmission00001"
def resolve_mission_id(api: requests.Session) -> str:
    """Choose the mission id the tests should operate on.

    Resolution order:
      1. the TEST_MISSION_ID environment variable, when non-empty after
         stripping whitespace (the server is not contacted in that case);
      2. PREFERRED_MISSION_ID, when the server's mission list contains it;
      3. the first id in the server's mission list.

    Entries of the mission list that are not dicts or have no truthy "id"
    are ignored. If no id remains, the test run is failed via pytest.fail.
    """
    override = os.environ.get("TEST_MISSION_ID", "").strip()
    if override:
        return override

    response = api.get(f"{BASE}/api/missions", timeout=TIMEOUT)
    available = []
    for entry in response.json().get("missions", []):
        if isinstance(entry, dict) and entry.get("id"):
            available.append(entry["id"])

    if PREFERRED_MISSION_ID in available:
        return PREFERRED_MISSION_ID
    if available:
        return available[0]
    # pytest.fail raises, so nothing is returned on this path.
    pytest.fail("no missions available")
def login_admin(api: requests.Session) -> None:
    """Log the given session in as "Admin" and assert the login succeeded.

    Posts the admin credentials to the login endpoint, then asserts a 200
    response (showing the response text on failure) whose JSON body reports
    the username "Admin" under the "user" key.
    """
    credentials = {"username": "Admin", "password": "admin"}
    response = api.post(
        f"{BASE}/api/auth/login", json=credentials, timeout=TIMEOUT
    )
    assert response.status_code == 200, response.text
    logged_in_user = response.json().get("user", {})
    assert logged_in_user.get("username") == "Admin"
@pytest.fixture(scope="module")
def api():
    """Yield a module-scoped HTTP session that is logged in as Admin.

    Setup polls the health endpoint every 0.2 s for up to TIMEOUT seconds
    until it answers 200 with a truthy "ok" field, failing the run via
    pytest.fail otherwise, and then logs in through login_admin().

    The session is used as a context manager so it is closed when the module
    finishes, and also when readiness polling or the login fails.
    """
    with requests.Session() as session:
        session.headers.update({"Content-Type": "application/json"})
        # A monotonic clock keeps the deadline immune to wall-clock changes.
        deadline = time.monotonic() + TIMEOUT
        while time.monotonic() < deadline:
            try:
                r = session.get(f"{BASE}/api/health", timeout=1)
                if r.status_code == 200 and r.json().get("ok"):
                    break
            except (requests.RequestException, ValueError):
                # Connection errors, or a body that is not JSON yet, just
                # mean the server is not ready; keep polling.
                pass
            time.sleep(0.2)
        else:
            # The loop ran out of time without a healthy response.
            pytest.fail(f"Server not ready at {BASE}")
        login_admin(session)
        yield session
@pytest.fixture(scope="module")
def mission_id(api):
    """Module-scoped mission id, as chosen by resolve_mission_id()."""
    resolved = resolve_mission_id(api)
    return resolved
def clear_queue(api: requests.Session) -> None:
    """Send a DELETE to the mission queue endpoint.

    The response is not inspected.
    """
    queue_url = f"{BASE}/api/mission_queue"
    api.delete(queue_url, timeout=TIMEOUT)
def wait_runner_state(api: requests.Session, state: str, timeout: float = 8.0) -> dict:
    """Poll the mission queue endpoint until the runner reports ``state``.

    Args:
        api: Session used to query the server.
        state: Value expected under ``runner.state`` in the queue response.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The "runner" object from the first response whose state matched.

    The endpoint is polled at least once, even for a non-positive timeout,
    and then roughly every 0.15 s. If the state is never observed the test
    is failed via pytest.fail, reporting the last state that was seen.
    """
    # A monotonic clock keeps the deadline immune to wall-clock changes.
    deadline = time.monotonic() + timeout
    last_seen = None
    while True:
        runner = api.get(f"{BASE}/api/mission_queue", timeout=TIMEOUT).json().get("runner", {})
        last_seen = runner.get("state")
        if last_seen == state:
            return runner
        if time.monotonic() >= deadline:
            break
        time.sleep(0.15)
    pytest.fail(
        f"runner did not reach state {state!r} within {timeout}s "
        f"(last observed state: {last_seen!r})"
    )
def enqueue_mission(api: requests.Session, mission_id: str) -> None:
    """Post ``mission_id`` to the mission queue and assert a 201 response."""
    payload = {"mission_id": mission_id}
    response = api.post(
        f"{BASE}/api/mission_queue", json=payload, timeout=TIMEOUT
    )
    assert response.status_code == 201
def test_health(api):
    """The health endpoint answers 200 with ``ok`` set to True."""
    response = api.get(f"{BASE}/api/health", timeout=TIMEOUT)
    assert response.status_code == 200
    payload = response.json()
    assert payload["ok"] is True
def test_auth_me(api):
    """The logged-in session is reported as Admin with write access to missions."""
    response = api.get(f"{BASE}/api/auth/me", timeout=TIMEOUT)
    assert response.status_code == 200

    current_user = response.json().get("user", {})
    assert current_user.get("username") == "Admin"

    permissions = current_user.get("permissions", {})
    assert permissions.get("missions") == "write"
def test_auth_unauthorized_without_session():
    """A fresh session that never logged in gets 401 from the missions list."""
    # The context manager closes the throwaway session even when the
    # assertion fails, so it does not leak for the rest of the run.
    with requests.Session() as session:
        r = session.get(f"{BASE}/api/missions", timeout=TIMEOUT)
        assert r.status_code == 401
def test_auth_user_read_only_missions():
    """The "User" account can list missions but gets 403 when creating a trigger."""
    # The context manager closes the throwaway session even when an
    # assertion fails, so it does not leak for the rest of the run.
    with requests.Session() as session:
        login = session.post(
            f"{BASE}/api/auth/login",
            json={"username": "User", "password": "user"},
            timeout=TIMEOUT,
        )
        assert login.status_code == 200

        listed = session.get(f"{BASE}/api/missions", timeout=TIMEOUT)
        assert listed.status_code == 200

        created = session.post(
            f"{BASE}/api/triggers",
            json={
                "name": "deny-trigger",
                "coil_id": 1009,
                # Reuse the module constant rather than repeating its literal.
                "mission_id": PREFERRED_MISSION_ID,
            },
            timeout=TIMEOUT,
        )
        assert created.status_code == 403
def test_missions_available(api, mission_id):
    """The resolved mission id appears in the server's mission list."""
    response = api.get(f"{BASE}/api/missions", timeout=TIMEOUT)
    assert response.status_code == 200

    listed_ids = set()
    for mission in response.json().get("missions", []):
        listed_ids.add(mission["id"])
    assert mission_id in listed_ids
def test_mir_v2_enqueue_and_list(api, mission_id):
    """Enqueue through the v2.0.0 endpoint and find the mission in its listing."""
    # clear_queue() issues the same DELETE on the mission queue endpoint.
    clear_queue(api)

    v2_queue_url = f"{BASE}/api/v2.0.0/mission_queue"
    payload = {"mission_id": mission_id, "priority": 3, "robot_id": "default"}
    created = api.post(v2_queue_url, json=payload, timeout=TIMEOUT)
    assert created.status_code == 201

    entry = created.json()
    assert entry["mission_id"] == mission_id
    assert entry["priority"] == 3

    listing = api.get(v2_queue_url, timeout=TIMEOUT)
    assert listing.status_code == 200
    assert any(item.get("mission_id") == mission_id for item in listing.json())
def test_queue_pause_continue(api, mission_id):
    """Pausing a running queue reports "paused"; continuing leaves that state."""
    clear_queue(api)
    enqueue_mission(api, mission_id)
    wait_runner_state(api, "running", timeout=5)

    paused = api.post(f"{BASE}/api/mission_queue/pause", timeout=TIMEOUT)
    assert paused.status_code == 200
    assert paused.json().get("state") == "paused"

    resumed = api.post(f"{BASE}/api/mission_queue/continue", timeout=TIMEOUT)
    assert resumed.status_code == 200
    assert resumed.json().get("state") != "paused"
def test_queue_cancel_rejects_when_idle(api):
    """Cancelling right after clearing the queue yields 400 with an "error" key."""
    clear_queue(api)
    response = api.post(f"{BASE}/api/mission_queue/cancel", timeout=TIMEOUT)
    assert response.status_code == 400
    payload = response.json()
    assert "error" in payload
def test_queue_cancel_stops_running_mission(api, mission_id):
    """Cancelling a running mission returns the runner to idle and records it.

    After the cancel request succeeds, the queue must contain an entry with
    status "cancelled" for this mission whose log holds the operator-cancel
    message.
    """
    clear_queue(api)
    enqueue_mission(api, mission_id)
    wait_runner_state(api, "running", timeout=5)

    cancel = api.post(f"{BASE}/api/mission_queue/cancel", timeout=TIMEOUT)
    assert cancel.status_code == 200
    assert cancel.json().get("message")

    wait_runner_state(api, "idle", timeout=8)
    snapshot = api.get(f"{BASE}/api/mission_queue", timeout=TIMEOUT).json()
    assert snapshot["runner"]["state"] == "idle"

    cancelled = []
    for item in snapshot.get("queue", []):
        if item.get("status") == "cancelled":
            cancelled.append(item)
    assert cancelled, "expected a cancelled queue entry"

    first_cancelled = cancelled[0]
    assert first_cancelled.get("mission_id") == mission_id

    # Only dict-shaped log entries are considered when looking for the message.
    log_entries = first_cancelled.get("log") or []
    messages = [
        entry.get("message") for entry in log_entries if isinstance(entry, dict)
    ]
    assert "Mission hủy bởi operator" in messages
def test_queue_cancel_rejects_while_cancelling(api, mission_id):
    """A second cancel sent right after a successful one is rejected with 400."""
    clear_queue(api)
    enqueue_mission(api, mission_id)
    wait_runner_state(api, "running", timeout=5)

    cancel_url = f"{BASE}/api/mission_queue/cancel"
    first_attempt = api.post(cancel_url, timeout=TIMEOUT)
    assert first_attempt.status_code == 200

    second_attempt = api.post(cancel_url, timeout=TIMEOUT)
    assert second_attempt.status_code == 400

    # Let the runner settle back to idle before the next test starts.
    wait_runner_state(api, "idle", timeout=8)
def test_modbus_trigger_flow(api, mission_id):
    """Create a trigger on coil 1005, fire the coil, and delete the trigger.

    The queue size is measured from the "queue" list of the mission queue
    response. That response is an object with "runner" and "queue" entries
    (see how the other tests in this module read it), so taking len() of the
    whole body would only count its keys and make the comparison vacuous.

    The trigger is deleted in a ``finally`` block so that a failing assertion
    in the middle of the test does not leave it behind on the server.
    """

    def queue_length() -> int:
        # Number of entries currently listed in the mission queue.
        body = api.get(f"{BASE}/api/mission_queue", timeout=TIMEOUT).json()
        return len(body.get("queue", []))

    trig = api.post(
        f"{BASE}/api/triggers",
        json={"name": "pytest-trigger", "coil_id": 1005, "mission_id": mission_id},
        timeout=TIMEOUT,
    )
    assert trig.status_code == 201
    trig_id = trig.json()["id"]

    try:
        before = queue_length()
        fire = api.post(f"{BASE}/api/modbus/coils/1005/trigger", timeout=TIMEOUT)
        assert fire.status_code == 200
        after = queue_length()
        assert after >= before
    finally:
        deleted = api.delete(f"{BASE}/api/triggers/{trig_id}", timeout=TIMEOUT)
    # Checked outside ``finally`` so it cannot mask an earlier assertion failure.
    assert deleted.status_code == 204
def test_fleet_schedule_asap(api, mission_id):
    """Create a fleet schedule with start_mode "asap", then delete it."""
    schedule = {
        "name": "pytest-schedule",
        "mission_id": mission_id,
        "start_mode": "asap",
        "priority": 0,
        "robot_id": "default",
    }
    created = api.post(
        f"{BASE}/api/fleet/schedules", json=schedule, timeout=TIMEOUT
    )
    assert created.status_code == 201

    schedule_id = created.json()["id"]
    removed = api.delete(
        f"{BASE}/api/fleet/schedules/{schedule_id}", timeout=TIMEOUT
    )
    assert removed.status_code == 204
def test_lidar_crud(api):
    """Create a lidar, update its IP address, then delete it."""
    collection_url = f"{BASE}/api/lidars"

    initial = {"name": "pytest-lidar", "ip": "10.99.0.1", "port": 2112}
    create_response = api.post(collection_url, json=initial, timeout=TIMEOUT)
    assert create_response.status_code == 201
    lidar_id = create_response.json()["id"]

    changed = {"name": "pytest-lidar", "ip": "10.99.0.2", "port": 2112}
    update_response = api.put(
        f"{BASE}/api/lidars/{lidar_id}", json=changed, timeout=TIMEOUT
    )
    assert update_response.status_code == 200

    delete_response = api.delete(f"{BASE}/api/lidars/{lidar_id}", timeout=TIMEOUT)
    assert delete_response.status_code == 204