from __future__ import annotations import os import subprocess from collections.abc import Mapping from app.config import settings from app.storage.repository import Repository def _tail_output(label: str, output: str, line_limit: int = 6) -> str: lines = [line.strip() for line in output.splitlines() if line.strip()] if not lines: return "" return f"{label}: {' | '.join(lines[-line_limit:])}" def _command_output_summary(stdout: str, stderr: str) -> str: parts = [ part for part in ( _tail_output("stderr", stderr), _tail_output("stdout", stdout), ) if part ] summary = " ; ".join(parts) return summary[:1600] class CommandError(RuntimeError): def __init__(self, command: list[str], returncode: int, stdout: str, stderr: str) -> None: message = f"Command failed with exit code {returncode}: {' '.join(command)}" output_summary = _command_output_summary(stdout, stderr) if output_summary: message = f"{message}. Last output: {output_summary}" super().__init__(message) self.command = command self.returncode = returncode self.stdout = stdout self.stderr = stderr class CommandRunner: def __init__(self, repository: Repository, task_id: str | None = None) -> None: self.repository = repository self.task_id = task_id def run( self, command: list[str], timeout: int | None = None, env: Mapping[str, str] | None = None, ) -> subprocess.CompletedProcess[str]: if self.task_id: self.repository.add_log(self.task_id, "debug", f"Running command: {' '.join(command)}") command_env = os.environ.copy() if env: command_env.update(env) try: result = subprocess.run( command, check=False, capture_output=True, env=command_env, stdin=subprocess.DEVNULL, text=True, timeout=timeout or settings.command_timeout_seconds, ) except subprocess.TimeoutExpired as error: if self.task_id: self.repository.add_log(self.task_id, "error", f"Command timed out: {' '.join(command)}") raise CommandError(command, 124, error.stdout or "", error.stderr or "") from error if self.task_id: for line in result.stdout.splitlines(): self.repository.add_log(self.task_id, "debug", line) for line in result.stderr.splitlines(): self.repository.add_log(self.task_id, "warning", line) if result.returncode != 0: raise CommandError(command, result.returncode, result.stdout, result.stderr) return result