"""Download package files into the local cache directory."""

from __future__ import annotations

import re
from pathlib import Path
from urllib.parse import unquote, urlparse

import httpx

from app.config import settings
from app.storage.repository import Repository
from app.utils.validators import validate_url_host

# Matches every run of characters that is not allowed in a cached file name.
SAFE_FILE_RE = re.compile(r"[^a-zA-Z0-9._+-]+")

# Name used when the URL does not yield a usable file name.
_DEFAULT_FILE_NAME = "package.deb"


def _safe_file_name(url: str) -> str:
    """Derive a cache file name from the path component of *url*.

    The last path segment is percent-decoded, every run of characters outside
    ``[a-zA-Z0-9._+-]`` is replaced with ``-`` and leading/trailing ``-`` are
    stripped. ``"package.deb"`` is returned when nothing usable remains,
    including when the result consists only of dots.
    """
    name = Path(unquote(urlparse(url).path)).name
    cleaned = SAFE_FILE_RE.sub("-", name).strip("-")
    # "." and ".." (e.g. from a path ending in "/.." or "%2e%2e") would make
    # the destination resolve to the cache directory or its parent instead of
    # a file inside it, so dot-only names are treated as unusable.
    if not cleaned.strip("."):
        return _DEFAULT_FILE_NAME
    return cleaned


class Downloader:
    """Fetch a URL into ``settings.cache_dir`` and log progress for one task."""

    def __init__(self, repository: Repository, task_id: str) -> None:
        """Store the repository used for logging and the task the logs belong to."""
        self.repository = repository
        self.task_id = task_id

    def download(self, url: str) -> Path:
        """Download *url* into the cache directory and return the file path.

        The URL is checked with ``validate_url_host`` against
        ``settings.allowed_download_hosts`` before the request is made, and
        the response is checked by ``_validate_response`` before any byte is
        written.

        Raises:
            httpx.HTTPStatusError: if the response has an error status.
            ValueError: if the response looks like an HTML or plain-text page.
            Exception: whatever ``validate_url_host`` raises for a rejected
                URL (defined outside this module).
        """
        validate_url_host(url, settings.allowed_download_hosts)
        settings.cache_dir.mkdir(parents=True, exist_ok=True)
        destination = settings.cache_dir / _safe_file_name(url)
        # Stream into a sibling file first so that a failed or interrupted
        # transfer never leaves a truncated file at the final cache path and
        # never destroys a previously downloaded copy.
        partial = destination.with_name(f"{destination.name}.part")
        self.repository.add_log(self.task_id, "info", f"Downloading {url}")
        try:
            # NOTE(review): redirects are followed before the final URL is
            # validated, so intermediate hosts are contacted unchecked.
            with httpx.stream("GET", url, follow_redirects=True, timeout=120) as response:
                response.raise_for_status()
                self._validate_response(url, response)
                with partial.open("wb") as handle:
                    for chunk in response.iter_bytes():
                        handle.write(chunk)
            # Same directory, so the rename does not cross file systems.
            partial.replace(destination)
        finally:
            # No-op after a successful replace; removes the leftover otherwise.
            partial.unlink(missing_ok=True)
        self.repository.add_log(self.task_id, "info", f"Downloaded to {destination}")
        return destination

    def _validate_response(self, requested_url: str, response: httpx.Response) -> None:
        """Reject responses from a disallowed final host or with a page-like type.

        The URL the response was finally served from (after redirects) is
        validated with ``validate_url_host``; a ``Content-Type`` of
        ``text/html`` or ``text/plain`` (parameters ignored, case-insensitive)
        raises ``ValueError``.
        """
        final_url = str(response.url)
        validate_url_host(final_url, settings.allowed_download_hosts)
        # Keep only the media type: drop parameters such as "; charset=utf-8".
        content_type = response.headers.get("content-type", "").split(";", 1)[0].strip().lower()
        if content_type in {"text/html", "text/plain"}:
            raise ValueError(
                "download did not return a package file "
                f"(requested {requested_url}, final {final_url}, content-type {content_type})"
            )