from __future__ import annotations import re from urllib.parse import urlparse APP_ID_RE = re.compile(r"^[a-zA-Z0-9._+-]+$") PACKAGE_NAME_RE = re.compile(r"^[a-zA-Z0-9._+-]+$") SERVICE_NAME_RE = re.compile(r"^[a-zA-Z0-9._@+-]+\.service$") VERSION_RE = re.compile(r"^[a-zA-Z0-9._:+~=-]+$") SHA256_RE = re.compile(r"^[0-9a-fA-F]{64}$") DOCKER_DIGEST_RE = re.compile(r"^sha256:[0-9a-fA-F]{64}$") DOCKER_REF_RE = re.compile(r"^[a-z0-9][a-z0-9._:/@+-]{0,254}$") DOCKER_TAG_RE = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_.-]{0,127}$") CONTAINER_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_.-]{0,127}$") ENV_NAME_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$") PORT_MAPPING_RE = re.compile(r"^[a-zA-Z0-9_.:-]+(?:/(?:tcp|udp|sctp))?$") VOLUME_MAPPING_RE = re.compile(r"^[^\s:]+:[^\s:]+(?::(?:ro|rw))?$") def validate_app_id(value: str) -> str: if not value or not APP_ID_RE.fullmatch(value): raise ValueError("appId contains invalid characters") return value def validate_package_name(value: str) -> str: if not value or not PACKAGE_NAME_RE.fullmatch(value): raise ValueError("packageName contains invalid characters") return value def validate_service_name(value: str | None) -> str | None: if value is None or value == "": return None if not SERVICE_NAME_RE.fullmatch(value): raise ValueError("serviceName must be a systemd .service name") return value def validate_version(value: str) -> str: if not value or not VERSION_RE.fullmatch(value): raise ValueError("version contains invalid characters") return value def validate_sha256(value: str) -> str: if not value or not SHA256_RE.fullmatch(value): raise ValueError("sha256/checksum must be a 64 character hex digest") return value.lower() def validate_url_host(url: str, allowed_hosts: list[str]) -> str: parsed = urlparse(url) if parsed.scheme not in {"http", "https"}: raise ValueError("download URL must use http or https") if not parsed.hostname: raise ValueError("download URL is missing a host") if parsed.hostname not in set(allowed_hosts): raise ValueError(f"download host is not allowed: {parsed.hostname}") return url def validate_docker_image(value: str) -> str: image = value.strip() if not image or not DOCKER_REF_RE.fullmatch(image): raise ValueError("docker image contains invalid characters") if "//" in image or image.endswith(("/", ":", "@")): raise ValueError("docker image reference is malformed") return image def validate_docker_tag(value: str | None) -> str | None: if value is None or value == "": return None tag = value.strip() if not DOCKER_TAG_RE.fullmatch(tag): raise ValueError("docker tag contains invalid characters") return tag def validate_docker_digest(value: str | None) -> str | None: if value is None or value == "": return None digest = value.strip() if not DOCKER_DIGEST_RE.fullmatch(digest): raise ValueError("docker digest must be sha256:<64 hex characters>") return digest.lower() def validate_docker_registry(image: str, allowed_registries: list[str]) -> str: reference = image.split("@", 1)[0] parts = reference.split("/", 1) first_part = parts[0] has_explicit_registry = len(parts) > 1 and ( "." in first_part or ":" in first_part or first_part == "localhost" ) registry = first_part if has_explicit_registry else "docker.io" allowed = {item.strip() for item in allowed_registries if item.strip()} if "*" not in allowed and registry not in allowed: raise ValueError(f"docker registry is not allowed: {registry}") return image def validate_container_name(value: str) -> str: name = value.strip() if not name or not CONTAINER_NAME_RE.fullmatch(name): raise ValueError("containerName contains invalid characters") return name def validate_restart_policy(value: str | None) -> str: policy = (value or "unless-stopped").strip() if policy not in {"no", "always", "unless-stopped", "on-failure"}: raise ValueError("restartPolicy must be one of: no, always, unless-stopped, on-failure") return policy def validate_port_mapping(value: str) -> str: mapping = value.strip() if not mapping or not PORT_MAPPING_RE.fullmatch(mapping): raise ValueError("docker port mapping contains invalid characters") return mapping def validate_volume_mapping(value: str) -> str: mapping = value.strip() if not mapping or not VOLUME_MAPPING_RE.fullmatch(mapping): raise ValueError("docker volume mapping must be hostPath:containerPath[:ro|rw]") return mapping def validate_env_name(value: str) -> str: name = value.strip() if not ENV_NAME_RE.fullmatch(name): raise ValueError("docker env names may contain only letters, numbers, and underscore") return name