138 lines
4.8 KiB
Python
138 lines
4.8 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
# --- Identifier patterns -------------------------------------------------
# All patterns are anchored and are applied with ``fullmatch`` by the
# validators in this module.
APP_ID_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
PACKAGE_NAME_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
# Same character family plus "@", and a mandatory ".service" suffix.
SERVICE_NAME_RE = re.compile(r"^[a-zA-Z0-9._@+-]+\.service$")
VERSION_RE = re.compile(r"^[a-zA-Z0-9._:+~=-]+$")

# --- Digest patterns -----------------------------------------------------
# Exactly 64 hex digits, either case.
SHA256_RE = re.compile(r"^[0-9a-fA-F]{64}$")
# The same 64 hex digits behind a literal "sha256:" prefix.
DOCKER_DIGEST_RE = re.compile(r"^sha256:[0-9a-fA-F]{64}$")

# --- Docker patterns -----------------------------------------------------
# Lower-case only; one leading alphanumeric plus up to 254 further characters.
DOCKER_REF_RE = re.compile(r"^[a-z0-9][a-z0-9._:/@+-]{0,254}$")
# One leading word character plus up to 127 further characters.
DOCKER_TAG_RE = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_.-]{0,127}$")
CONTAINER_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_.-]{0,127}$")
# Must not start with a digit.
ENV_NAME_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
# Character whitelist with an optional "/tcp", "/udp" or "/sctp" suffix.
PORT_MAPPING_RE = re.compile(r"^[a-zA-Z0-9_.:-]+(?:/(?:tcp|udp|sctp))?$")
# Two colon-separated segments free of whitespace and colons, then an
# optional ":ro" or ":rw" mode.
VOLUME_MAPPING_RE = re.compile(r"^[^\s:]+:[^\s:]+(?::(?:ro|rw))?$")
|
|
|
|
|
|
def validate_app_id(value: str) -> str:
    """Return *value* unchanged when it fully matches ``APP_ID_RE``.

    Raises:
        ValueError: if *value* is empty/falsy or does not match the pattern.
    """
    if value and APP_ID_RE.fullmatch(value):
        return value
    raise ValueError("appId contains invalid characters")
|
|
|
|
|
|
def validate_package_name(value: str) -> str:
    """Return *value* unchanged when it fully matches ``PACKAGE_NAME_RE``.

    Raises:
        ValueError: if *value* is empty/falsy or does not match the pattern.
    """
    if value and PACKAGE_NAME_RE.fullmatch(value):
        return value
    raise ValueError("packageName contains invalid characters")
|
|
|
|
|
|
def validate_service_name(value: str | None) -> str | None:
    """Validate an optional service name against ``SERVICE_NAME_RE``.

    Returns:
        ``None`` when *value* is ``None`` or the empty string; otherwise
        *value* unchanged.

    Raises:
        ValueError: if a non-empty *value* does not match the pattern.
    """
    absent = value is None or value == ""
    if absent:
        return None
    matched = SERVICE_NAME_RE.fullmatch(value)
    if matched is None:
        raise ValueError("serviceName must be a systemd .service name")
    return value
|
|
|
|
|
|
def validate_version(value: str) -> str:
    """Return *value* unchanged when it fully matches ``VERSION_RE``.

    Raises:
        ValueError: if *value* is empty/falsy or does not match the pattern.
    """
    if value and VERSION_RE.fullmatch(value):
        return value
    raise ValueError("version contains invalid characters")
|
|
|
|
|
|
def validate_sha256(value: str) -> str:
    """Validate a checksum against ``SHA256_RE`` and return it lower-cased.

    Raises:
        ValueError: if *value* is empty/falsy or does not match the pattern.
    """
    if value and SHA256_RE.fullmatch(value):
        # Normalise case so callers can compare digests with plain equality.
        return value.lower()
    raise ValueError("sha256/checksum must be a 64 character hex digest")
|
|
|
|
|
|
def validate_url_host(url: str, allowed_hosts: list[str]) -> str:
    """Check that *url* is an http(s) URL whose host is on the allow-list.

    Args:
        url: The download URL to check.
        allowed_hosts: Host names that are acceptable. Entries are compared
            case-insensitively and with surrounding whitespace ignored;
            blank entries are skipped.

    Returns:
        *url* unchanged when it passes every check.

    Raises:
        ValueError: if the scheme is not ``http``/``https``, the URL has no
            host, or the host is not in *allowed_hosts*.
    """
    parsed = urlparse(url)
    if parsed.scheme not in {"http", "https"}:
        raise ValueError("download URL must use http or https")

    host = parsed.hostname
    if not host:
        raise ValueError("download URL is missing a host")

    # ``hostname`` is always lower-cased by urlparse, so the allow-list has to
    # be normalised the same way or a mixed-case/padded entry never matches.
    allowed = {item.strip().lower() for item in allowed_hosts if item.strip()}
    if host not in allowed:
        raise ValueError(f"download host is not allowed: {host}")
    return url
|
|
|
|
|
|
def validate_docker_image(value: str) -> str:
    """Validate a docker image reference and return it stripped of whitespace.

    The stripped value must fully match ``DOCKER_REF_RE``, must not contain
    ``//`` and must not end in ``/``, ``:`` or ``@``.

    Raises:
        ValueError: if any of the checks above fails.
    """
    ref = value.strip()
    if not (ref and DOCKER_REF_RE.fullmatch(ref)):
        raise ValueError("docker image contains invalid characters")

    # ``ref`` is non-empty here, so indexing the last character is safe.
    has_empty_segment = "//" in ref
    has_dangling_separator = ref[-1] in "/:@"
    if has_empty_segment or has_dangling_separator:
        raise ValueError("docker image reference is malformed")
    return ref
|
|
|
|
|
|
def validate_docker_tag(value: str | None) -> str | None:
    """Validate an optional docker tag against ``DOCKER_TAG_RE``.

    Returns:
        ``None`` when *value* is ``None`` or the empty string; otherwise the
        whitespace-stripped tag.

    Raises:
        ValueError: if the stripped tag does not match the pattern.
    """
    if value is not None and value != "":
        candidate = value.strip()
        if DOCKER_TAG_RE.fullmatch(candidate):
            return candidate
        raise ValueError("docker tag contains invalid characters")
    return None
|
|
|
|
|
|
def validate_docker_digest(value: str | None) -> str | None:
    """Validate an optional docker digest against ``DOCKER_DIGEST_RE``.

    Returns:
        ``None`` when *value* is ``None`` or the empty string; otherwise the
        whitespace-stripped digest, lower-cased.

    Raises:
        ValueError: if the stripped digest does not match the pattern.
    """
    if value is not None and value != "":
        candidate = value.strip()
        if DOCKER_DIGEST_RE.fullmatch(candidate):
            return candidate.lower()
        raise ValueError("docker digest must be sha256:<64 hex characters>")
    return None
|
|
|
|
|
|
def validate_docker_registry(image: str, allowed_registries: list[str]) -> str:
    """Check that the registry implied by *image* is on the allow-list.

    The registry is the text before the first ``/`` (ignoring anything from
    the first ``@`` onwards) when that text contains ``.`` or ``:`` or equals
    ``localhost``; otherwise ``docker.io`` is assumed. Allow-list entries are
    stripped of whitespace, blank entries are ignored, and ``*`` permits any
    registry.

    Returns:
        *image* unchanged when the registry is permitted.

    Raises:
        ValueError: if the registry is not permitted.
    """
    name_part, _, _ = image.partition("@")
    head, slash, _ = name_part.partition("/")

    looks_like_host = head == "localhost" or any(mark in head for mark in ".:")
    # Without a "/" the whole reference is a repository name, never a registry.
    registry = head if (slash and looks_like_host) else "docker.io"

    permitted = set()
    for entry in allowed_registries:
        cleaned = entry.strip()
        if cleaned:
            permitted.add(cleaned)

    if "*" in permitted or registry in permitted:
        return image
    raise ValueError(f"docker registry is not allowed: {registry}")
|
|
|
|
|
|
def validate_container_name(value: str) -> str:
    """Validate a container name and return it stripped of whitespace.

    Raises:
        ValueError: if the stripped name is empty or does not fully match
            ``CONTAINER_NAME_RE``.
    """
    cleaned = value.strip()
    if cleaned and CONTAINER_NAME_RE.fullmatch(cleaned):
        return cleaned
    raise ValueError("containerName contains invalid characters")
|
|
|
|
|
|
def validate_restart_policy(value: str | None) -> str:
|
|
policy = (value or "unless-stopped").strip()
|
|
if policy not in {"no", "always", "unless-stopped", "on-failure"}:
|
|
raise ValueError("restartPolicy must be one of: no, always, unless-stopped, on-failure")
|
|
return policy
|
|
|
|
|
|
def validate_port_mapping(value: str) -> str:
    """Validate a docker port mapping and return it stripped of whitespace.

    Raises:
        ValueError: if the stripped mapping is empty or does not fully match
            ``PORT_MAPPING_RE``.
    """
    cleaned = value.strip()
    if cleaned and PORT_MAPPING_RE.fullmatch(cleaned):
        return cleaned
    raise ValueError("docker port mapping contains invalid characters")
|
|
|
|
|
|
def validate_volume_mapping(value: str) -> str:
    """Validate a docker volume mapping and return it stripped of whitespace.

    Raises:
        ValueError: if the stripped mapping is empty or does not fully match
            ``VOLUME_MAPPING_RE``.
    """
    cleaned = value.strip()
    if cleaned and VOLUME_MAPPING_RE.fullmatch(cleaned):
        return cleaned
    raise ValueError("docker volume mapping must be hostPath:containerPath[:ro|rw]")
|
|
|
|
|
|
def validate_env_name(value: str) -> str:
    """Validate an environment variable name and return it stripped.

    Raises:
        ValueError: if the stripped name does not fully match ``ENV_NAME_RE``
            (which also rejects the empty string).
    """
    cleaned = value.strip()
    if ENV_NAME_RE.fullmatch(cleaned) is None:
        raise ValueError("docker env names may contain only letters, numbers, and underscore")
    return cleaned
|