diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index e82f76c86..ffe38befb 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -87,6 +87,7 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels): "dask.org/component": "scheduler", } ) + return { "apiVersion": "v1", "kind": "Service", @@ -229,6 +230,7 @@ async def startup(settings: kopf.OperatorSettings, **kwargs): settings.watching.server_timeout = 120 settings.watching.client_timeout = 150 settings.watching.connect_timeout = 5 + settings.admission.server = kopf.WebhookServer() # defaults to localhost:9443 # The default timeout is 300s which is usually to long # https://kopf.readthedocs.io/en/latest/configuration/#networking-timeouts @@ -242,6 +244,21 @@ def get_current_timestamp(**kwargs): return datetime.utcnow().isoformat() +@kopf.on.validate("daskcluster.kubernetes.dask.org") +async def daskcluster_validate_nodeport( + spec, settings: kopf.OperatorSettings, warnings, **kwargs +): + """Ensure that `nodePort` defined in DaskCluster resource is + within a valid range""" + + # Check if NodePort is out of range + for port in spec.get("ports", []): + if port.get("nodePort", None) and ( + port["nodePort"] < 30000 or port["nodePort"] > 32767 + ): + raise kopf.AdmissionError("nodePort must be between 30000 and 32767.") + + @kopf.on.create("daskcluster.kubernetes.dask.org") async def daskcluster_create(name, namespace, logger, patch, **kwargs): """When DaskCluster resource is created set the status.phase. diff --git a/dask_kubernetes/operator/controller/tests/test_controller.py b/dask_kubernetes/operator/controller/tests/test_controller.py index f0b1c676c..083fc5b78 100644 --- a/dask_kubernetes/operator/controller/tests/test_controller.py +++ b/dask_kubernetes/operator/controller/tests/test_controller.py @@ -9,6 +9,7 @@ import yaml from dask.distributed import Client +from dask_kubernetes.operator import KubeCluster, make_cluster_spec from dask_kubernetes.operator.controller import ( KUBERNETES_DATETIME_FORMAT, get_job_runner_pod_name, @@ -433,3 +434,29 @@ async def test_failed_job(k8s_cluster, kopf_runner, gen_job): assert "A DaskJob has been created" in runner.stdout assert "Job failed, deleting Dask cluster." in runner.stdout + + +def custom_nodeport_spec(port, name="foo", scheduler_service_type="NodePort"): + try: + port = int(port) + except ValueError: + raise ValueError(f"{port} is not a valid integer") + + spec = make_cluster_spec(name, scheduler_service_type) + spec["spec"]["scheduler"]["service"]["ports"][0]["nodePort"] = port + return spec + + +def test_nodeport_valid(kopf_runner): + with kopf_runner: + spec = custom_nodeport_spec("30007") + with KubeCluster(custom_cluster_spec=spec, n_workers=1) as cluster: + with Client(cluster) as client: + assert client.submit(lambda x: x + 1, 10).result() == 11 + + +def test_nodeport_out_of_range(kopf_runner): + with kopf_runner: + spec = custom_nodeport_spec("38967") + with pytest.raises(ValueError, match="NodePort out of range"): + KubeCluster(custom_cluster_spec=spec, n_workers=1)