Files
InstallerRobot/agent/app/storage/repository.py
2026-05-22 16:47:51 +07:00

256 lines
9.4 KiB
Python

from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any
from app.storage.database import get_connection
def utc_now() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
def row_to_dict(row: Any) -> dict[str, Any] | None:
if row is None:
return None
return dict(row)
class Repository:
def create_task(self, task_id: str, task_type: str, app_id: str, app_name: str | None) -> None:
now = utc_now()
with get_connection() as connection:
connection.execute(
"""
INSERT INTO tasks (id, type, app_id, app_name, status, progress, current_step, created_at)
VALUES (?, ?, ?, ?, 'queued', 0, 'queued', ?)
""",
(task_id, task_type, app_id, app_name, now),
)
connection.execute(
"INSERT INTO task_logs (task_id, timestamp, level, message) VALUES (?, ?, 'info', ?)",
(task_id, now, f"Task {task_id} queued"),
)
def update_task(
self,
task_id: str,
*,
status: str | None = None,
progress: int | None = None,
current_step: str | None = None,
current_component_id: str | None = None,
error_message: str | None = None,
started_at: str | None = None,
finished_at: str | None = None,
) -> None:
fields: list[str] = []
values: list[Any] = []
for key, value in {
"status": status,
"progress": progress,
"current_step": current_step,
"current_component_id": current_component_id,
"error_message": error_message,
"started_at": started_at,
"finished_at": finished_at,
}.items():
if value is not None:
fields.append(f"{key} = ?")
values.append(value)
if not fields:
return
values.append(task_id)
with get_connection() as connection:
connection.execute(f"UPDATE tasks SET {', '.join(fields)} WHERE id = ?", values)
def add_log(self, task_id: str, level: str, message: str) -> None:
with get_connection() as connection:
connection.execute(
"INSERT INTO task_logs (task_id, timestamp, level, message) VALUES (?, ?, ?, ?)",
(task_id, utc_now(), level, message),
)
def get_task(self, task_id: str) -> dict[str, Any] | None:
with get_connection() as connection:
row = connection.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
return row_to_dict(row)
def get_task_logs(self, task_id: str) -> list[dict[str, Any]]:
with get_connection() as connection:
rows = connection.execute(
"""
SELECT timestamp, level, message
FROM task_logs
WHERE task_id = ?
ORDER BY id ASC
""",
(task_id,),
).fetchall()
return [dict(row) for row in rows]
def create_task_component(
self,
task_id: str,
app_id: str,
component_id: str,
component_type: str,
install_order: int,
) -> None:
with get_connection() as connection:
connection.execute(
"""
INSERT INTO task_components (
task_id, app_id, component_id, type, install_order, status, progress, current_step
)
VALUES (?, ?, ?, ?, ?, 'queued', 0, 'queued')
""",
(task_id, app_id, component_id, component_type, install_order),
)
def update_task_component(
self,
task_id: str,
component_id: str,
*,
status: str | None = None,
progress: int | None = None,
current_step: str | None = None,
error_message: str | None = None,
started_at: str | None = None,
finished_at: str | None = None,
) -> None:
fields: list[str] = []
values: list[Any] = []
for key, value in {
"status": status,
"progress": progress,
"current_step": current_step,
"error_message": error_message,
"started_at": started_at,
"finished_at": finished_at,
}.items():
if value is not None:
fields.append(f"{key} = ?")
values.append(value)
if not fields:
return
values.extend([task_id, component_id])
with get_connection() as connection:
connection.execute(
f"UPDATE task_components SET {', '.join(fields)} WHERE task_id = ? AND component_id = ?",
values,
)
def get_task_components(self, task_id: str) -> list[dict[str, Any]]:
with get_connection() as connection:
rows = connection.execute(
"""
SELECT component_id, type, status, progress, current_step, error_message, started_at, finished_at
FROM task_components
WHERE task_id = ?
ORDER BY install_order ASC, id ASC
""",
(task_id,),
).fetchall()
return [dict(row) for row in rows]
def list_installed_apps(self) -> list[dict[str, Any]]:
with get_connection() as connection:
rows = connection.execute(
"""
SELECT app_id, app_name, version, manifest_hash, status, installed_at, updated_at
FROM installed_apps
ORDER BY app_name ASC
"""
).fetchall()
return [dict(row) for row in rows]
def upsert_installed_app(
self,
app_id: str,
app_name: str,
version: str,
manifest_hash: str | None,
status: str = "installed",
) -> None:
now = utc_now()
with get_connection() as connection:
connection.execute(
"""
INSERT INTO installed_apps (app_id, app_name, version, manifest_hash, status, installed_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(app_id) DO UPDATE SET
app_name = excluded.app_name,
version = excluded.version,
manifest_hash = excluded.manifest_hash,
status = excluded.status,
updated_at = excluded.updated_at
""",
(app_id, app_name, version, manifest_hash, status, now, now),
)
def delete_installed_app(self, app_id: str) -> None:
with get_connection() as connection:
connection.execute("DELETE FROM installed_components WHERE app_id = ?", (app_id,))
connection.execute("DELETE FROM installed_apps WHERE app_id = ?", (app_id,))
def upsert_installed_component(self, app_id: str, component: dict[str, Any]) -> None:
now = utc_now()
with get_connection() as connection:
connection.execute(
"""
INSERT INTO installed_components (
app_id, component_id, type, install_order, status, package_name, package_version,
service_name, docker_image, docker_digest, container_name, compose_project_name,
installed_at, updated_at
)
VALUES (?, ?, ?, ?, 'installed', ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(app_id, component_id) DO UPDATE SET
type = excluded.type,
install_order = excluded.install_order,
status = excluded.status,
package_name = excluded.package_name,
package_version = excluded.package_version,
service_name = excluded.service_name,
docker_image = excluded.docker_image,
docker_digest = excluded.docker_digest,
container_name = excluded.container_name,
compose_project_name = excluded.compose_project_name,
updated_at = excluded.updated_at
""",
(
app_id,
component["componentId"],
component["type"],
component.get("installOrder", 10),
component.get("packageName"),
component.get("version"),
component.get("serviceName"),
component.get("image"),
component.get("digest"),
component.get("containerName"),
component.get("projectName"),
now,
now,
),
)
def list_installed_components(self, app_id: str) -> list[dict[str, Any]]:
with get_connection() as connection:
rows = connection.execute(
"""
SELECT *
FROM installed_components
WHERE app_id = ?
ORDER BY install_order ASC, id ASC
""",
(app_id,),
).fetchall()
return [dict(row) for row in rows]
def export_manifest_hash(self, manifest: dict[str, Any]) -> str:
return json.dumps(manifest, sort_keys=True, separators=(",", ":"))