from __future__ import annotations import subprocess from app.config import settings from app.storage.repository import Repository class CommandError(RuntimeError): def __init__(self, command: list[str], returncode: int, stdout: str, stderr: str) -> None: super().__init__(f"Command failed with exit code {returncode}: {' '.join(command)}") self.command = command self.returncode = returncode self.stdout = stdout self.stderr = stderr class CommandRunner: def __init__(self, repository: Repository, task_id: str | None = None) -> None: self.repository = repository self.task_id = task_id def run(self, command: list[str], timeout: int | None = None) -> subprocess.CompletedProcess[str]: if self.task_id: self.repository.add_log(self.task_id, "debug", f"Running command: {' '.join(command)}") try: result = subprocess.run( command, check=False, capture_output=True, text=True, timeout=timeout or settings.command_timeout_seconds, ) except subprocess.TimeoutExpired as error: if self.task_id: self.repository.add_log(self.task_id, "error", f"Command timed out: {' '.join(command)}") raise CommandError(command, 124, error.stdout or "", error.stderr or "") from error if self.task_id: for line in result.stdout.splitlines(): self.repository.add_log(self.task_id, "debug", line) for line in result.stderr.splitlines(): self.repository.add_log(self.task_id, "warning", line) if result.returncode != 0: raise CommandError(command, result.returncode, result.stdout, result.stderr) return result