from __future__ import annotations

import hashlib
import os
import traceback
from typing import Any

from app.config import settings
from app.core.checksum import verify_sha256
from app.core.command_runner import CommandRunner
from app.core.downloader import Downloader
from app.core.installer import DebInstaller
from app.core.manifest_client import ManifestClient
from app.core.manifest_validator import ManifestValidator
from app.core.service_manager import ServiceManager
from app.models.schemas import InstallRequest, RemoveRequest, UpdateRequest
from app.storage.repository import Repository, utc_now


class TaskRunner:
    """Run install, update and remove tasks and record their progress.

    Every public ``run_*`` method catches all exceptions and records them on
    the task through ``_fail_task`` instead of letting them propagate.
    """

    def __init__(self, repository: Repository) -> None:
        """Store the repository and create the manifest client and validator."""
        self.repository = repository
        self.manifest_client = ManifestClient()
        self.manifest_validator = ManifestValidator()

    def run_install(
        self,
        task_id: str,
        request: InstallRequest | UpdateRequest,
        task_type: str = "install",
    ) -> None:
        """Resolve a manifest for *request*, install it and mark the task done.

        Args:
            task_id: Identifier of the task whose progress is recorded.
            request: Install or update request describing the app to install.
            task_type: Label used only in the "starting ..." step text.
        """
        try:
            self._mark_started(task_id, f"starting {task_type}")
            self._require_root_if_available()

            manifest = self._resolve_manifest(request)
            self.repository.add_log(
                task_id,
                "info",
                f"Installing {manifest['appId']} {manifest['version']}",
            )
            self._install_manifest(task_id, manifest)

            manifest_hash = hashlib.sha256(
                self.repository.export_manifest_hash(manifest).encode("utf-8")
            ).hexdigest()
            self.repository.upsert_installed_app(
                manifest["appId"],
                manifest["appName"],
                manifest["version"],
                manifest_hash,
            )
            self.repository.update_task(
                task_id,
                status="success",
                progress=100,
                current_step="completed",
                finished_at=utc_now(),
            )
            self.repository.add_log(task_id, "info", f"Task {task_id} completed")
        except Exception as error:
            # Task boundary: every failure is recorded on the task itself.
            self._fail_task(task_id, error)

    def run_remove(self, task_id: str, request: RemoveRequest) -> None:
        """Remove the installed components of ``request.app_id``.

        Components are removed in descending ``install_order``. When the
        repository has no recorded components but the request names a
        package, a single ad-hoc ``deb`` component is built from the request.

        Args:
            task_id: Identifier of the task whose progress is recorded.
            request: Remove request (app id, optional package/service name,
                purge flag).
        """
        try:
            if not settings.allow_remove:
                raise ValueError("Remove is disabled on this Agent")
            if request.purge and not settings.allow_purge:
                raise ValueError("Purge is disabled on this Agent")

            self._mark_started(task_id, "starting remove")
            self._require_root_if_available()

            components = self.repository.list_installed_components(request.app_id)
            if not components and request.package_name:
                components = [
                    {
                        "component_id": request.package_name,
                        "type": "deb",
                        "install_order": 10,
                        "package_name": request.package_name,
                        "service_name": request.service_name,
                    }
                ]
            if not components:
                raise ValueError("No installed components found for this app")

            # Validate everything before the first destructive step so an
            # unsupported component cannot leave the app half removed.
            for component in components:
                self._check_removable(component)

            command_runner = CommandRunner(self.repository, task_id)
            installer = DebInstaller(command_runner)
            services = ServiceManager(command_runner)

            ordered = sorted(
                components,
                key=lambda item: item["install_order"],
                reverse=True,
            )
            total = len(ordered)
            for index, component in enumerate(ordered, start=1):
                component_id = component["component_id"]
                self.repository.update_task(
                    task_id,
                    progress=int((index - 1) / total * 80) + 10,
                    current_step=f"removing {component_id}",
                    current_component_id=component_id,
                )

                service_name = component.get("service_name")
                if service_name:
                    self.repository.add_log(
                        task_id, "info", f"Stopping service {service_name}"
                    )
                    services.stop_service(service_name)
                    services.disable_service(service_name)

                package_name = component.get("package_name")
                self.repository.add_log(
                    task_id, "info", f"Removing package {package_name}"
                )
                installer.remove_package(package_name, purge=request.purge)

            self.repository.delete_installed_app(request.app_id)
            self.repository.update_task(
                task_id,
                status="success",
                progress=100,
                current_step="completed",
                finished_at=utc_now(),
            )
            # Mirrors the completion log written by run_install.
            self.repository.add_log(task_id, "info", f"Task {task_id} completed")
        except Exception as error:
            # Task boundary: every failure is recorded on the task itself.
            self._fail_task(task_id, error)

    @staticmethod
    def _check_removable(component: Any) -> None:
        """Raise ``ValueError`` unless *component* is a deb with a package name."""
        if component["type"] != "deb":
            raise ValueError(
                f"Unsupported installed component type: {component['type']}"
            )
        if not component.get("package_name"):
            raise ValueError(
                f"Installed component {component['component_id']} has no package name"
            )

    def _resolve_manifest(
        self, request: InstallRequest | UpdateRequest
    ) -> dict[str, Any]:
        """Return a validated manifest for *request*.

        When the request carries a ``download_url``, a single-component deb
        manifest is built from the request fields; otherwise the manifest is
        fetched from the manifest client by app id and version. Both paths go
        through ``ManifestValidator.validate``.
        """
        if request.download_url:
            digest = request.sha256 or request.checksum
            return self.manifest_validator.validate(
                {
                    "schemaVersion": "1.0",
                    "appId": request.app_id,
                    "appName": request.app_name or request.app_id,
                    "version": request.version,
                    "architecture": "amd64",
                    "components": [
                        {
                            "componentId": request.package_name,
                            "type": "deb",
                            "installOrder": 10,
                            "required": True,
                            "packageName": request.package_name,
                            "version": request.version,
                            "downloadUrl": request.download_url,
                            "sha256": digest,
                            "serviceName": request.service_name,
                        }
                    ],
                }
            )

        payload = self.manifest_client.fetch_manifest(request.app_id, request.version)
        return self.manifest_validator.validate(payload)

    @staticmethod
    def _install_order(component: dict[str, Any]) -> int:
        """Return the component's ``installOrder``, or 10 when missing/None."""
        order = component.get("installOrder")
        return 10 if order is None else order

    def _install_manifest(self, task_id: str, manifest: dict[str, Any]) -> None:
        """Install every component of *manifest* in ascending ``installOrder``.

        A task-component row is created for each component up front. A
        component that fails is marked ``failed`` before the error is
        re-raised to the caller.

        Raises:
            ValueError: If the manifest has no components or a component type
                other than ``deb`` is encountered.
        """
        components = manifest["components"]
        if not components:
            raise ValueError("Manifest has no installable components")

        # Stable sort: components sharing an order keep their manifest order.
        # run_remove walks the same ordering in reverse.
        ordered = sorted(components, key=self._install_order)

        for component in ordered:
            self.repository.create_task_component(
                task_id,
                manifest["appId"],
                component["componentId"],
                component["type"],
                component.get("installOrder", 10),
            )

        total = len(ordered)
        for index, component in enumerate(ordered, start=1):
            component_id = component["componentId"]
            self.repository.update_task(
                task_id,
                progress=int((index - 1) / total * 80) + 10,
                current_step=f"installing {component_id}",
                current_component_id=component_id,
            )
            self.repository.update_task_component(
                task_id,
                component_id,
                status="running",
                progress=5,
                current_step="downloading",
                started_at=utc_now(),
            )
            try:
                if component["type"] != "deb":
                    raise ValueError(
                        f"Unsupported component type in MVP: {component['type']}"
                    )
                self._install_deb_component(task_id, manifest["appId"], component)
            except Exception:
                # Do not leave the component row stuck in "running".
                self.repository.update_task_component(
                    task_id,
                    component_id,
                    status="failed",
                    current_step="failed",
                    finished_at=utc_now(),
                )
                raise
            self.repository.update_task_component(
                task_id,
                component_id,
                status="success",
                progress=100,
                current_step="completed",
                finished_at=utc_now(),
            )

    def _install_deb_component(
        self, task_id: str, app_id: str, component: dict[str, Any]
    ) -> None:
        """Download, verify, install and optionally start one deb component.

        Steps, in order: download ``downloadUrl``, verify ``sha256``, install
        the package, read back the installed version, enable and start
        ``serviceName`` when present, then record the installed component.

        Raises:
            ValueError: If the downloaded file fails checksum verification.
        """
        component_id = component["componentId"]
        downloader = Downloader(self.repository, task_id)
        command_runner = CommandRunner(self.repository, task_id)
        installer = DebInstaller(command_runner)
        services = ServiceManager(command_runner)

        package_path = downloader.download(component["downloadUrl"])

        self.repository.update_task_component(
            task_id, component_id, progress=35, current_step="verifying checksum"
        )
        if not verify_sha256(package_path, component["sha256"]):
            raise ValueError(f"Checksum mismatch for {component_id}")
        self.repository.add_log(
            task_id, "info", f"Checksum verified for {component_id}"
        )

        self.repository.update_task_component(
            task_id, component_id, progress=60, current_step="installing package"
        )
        installer.install_deb(package_path)

        self.repository.update_task_component(
            task_id, component_id, progress=75, current_step="verifying package"
        )
        installed_version = installer.get_package_version(component["packageName"])
        self.repository.add_log(
            task_id,
            "info",
            f"Package {component['packageName']} installed with version {installed_version}",
        )

        service_name = component.get("serviceName")
        if service_name:
            self.repository.update_task_component(
                task_id, component_id, progress=90, current_step="starting service"
            )
            services.enable_service(service_name)
            services.start_service(service_name)

        self.repository.upsert_installed_component(app_id, component)

    def _mark_started(self, task_id: str, step: str) -> None:
        """Set the task to ``running`` at 5% with *step* and log the step."""
        self.repository.update_task(
            task_id,
            status="running",
            progress=5,
            current_step=step,
            started_at=utc_now(),
        )
        self.repository.add_log(task_id, "info", step)

    def _fail_task(self, task_id: str, error: Exception) -> None:
        """Mark the task ``failed`` and log the error message and traceback."""
        # Exceptions raised without arguments stringify to ""; fall back to
        # the class name so the task never carries an empty error message.
        message = str(error) or type(error).__name__
        self.repository.update_task(
            task_id,
            status="failed",
            current_step="failed",
            error_message=message,
            finished_at=utc_now(),
        )
        self.repository.add_log(task_id, "error", message)
        # Format from the exception object so the traceback is correct even
        # when this method is not called from inside an ``except`` block.
        trace = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        self.repository.add_log(task_id, "debug", trace)

    def _require_root_if_available(self) -> None:
        """Raise ``PermissionError`` when the effective uid is known and not 0.

        Platforms where ``os.geteuid`` does not exist skip the check.
        """
        geteuid = getattr(os, "geteuid", None)
        if callable(geteuid) and geteuid() != 0:
            raise PermissionError("Agent must run as root to call apt and systemctl")