From e4c22c1f46fe641eab54f1a26eff3807eb56824d Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Wed, 26 Apr 2023 00:20:53 -0700
Subject: [PATCH 01/15] create new pr, initial changes

---
 .../operator/kubecluster/kubecluster.py            | 13 +++++++++++++
 .../kubecluster/tests/test_kubecluster.py          | 19 +++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py
index de8c8a07a..43381d827 100644
--- a/dask_kubernetes/operator/kubecluster/kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/kubecluster.py
@@ -230,6 +230,19 @@ def __init__(
         if isinstance(self.worker_command, str):
             self.worker_command = self.worker_command.split(" ")
 
+        if self.n_workers is not None and not isinstance(self.n_workers, int):
+            raise TypeError(f"n_workers must be an integer, got {type(self.n_workers)}")
+
+        try:
+            # Validate input resources
+            assert "limits" in resources
+            assert isinstance(resources["limits"], dict)
+            assert "CPU" in resources["limits"]
+            assert isinstance(resources["limits"]["CPU"], str)
+        except AssertionError:
+            print("Invalid resource limits format")
+            raise
+
         name = name.format(
             user=getpass.getuser(), uuid=str(uuid.uuid4())[:10], **os.environ
         )
diff --git a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
index 8ba788e59..ab2be9996 100644
--- a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
@@ -153,3 +153,22 @@ def test_custom_spec(kopf_runner, docker_image):
     with KubeCluster(custom_cluster_spec=spec, n_workers=1) as cluster:
         with Client(cluster) as client:
             assert client.submit(lambda x: x + 1, 10).result() == 11
+
+
+def test_typo_resource_limits(kopf_runner):
+    with kopf_runner:
+        with pytest.raises(AssertionError):
+            KubeCluster(
+                name="foo",
+                resources={
+                    "limit": {  # <-- Typo, should be `limits`
+                        "CPU": "1",
+                    },
+                },
+            )
+
+
+def test_for_integer_n_workers(kopf_runner):
+    with kopf_runner:
+        with pytest.raises(TypeError, match="n_workers must be an integer"):
+            KubeCluster(name="foo", n_workers="1")

From ab5d686383a4ac90c8a7243ad8ad74bcbaf1a519 Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Wed, 26 Apr 2023 00:55:12 -0700
Subject: [PATCH 02/15] switch tests order

---
 .../operator/kubecluster/tests/test_kubecluster.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
index ab2be9996..aae7d4b73 100644
--- a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
@@ -155,6 +155,12 @@ def test_custom_spec(kopf_runner, docker_image):
             assert client.submit(lambda x: x + 1, 10).result() == 11
 
 
+def test_for_integer_n_workers(kopf_runner):
+    with kopf_runner:
+        with pytest.raises(TypeError, match="n_workers must be an integer"):
+            KubeCluster(name="foo", n_workers="1")
+
+
 def test_typo_resource_limits(kopf_runner):
     with kopf_runner:
         with pytest.raises(AssertionError):
@@ -166,9 +172,3 @@ def test_typo_resource_limits(kopf_runner):
                     },
                 },
             )
-
-
-def test_for_integer_n_workers(kopf_runner):
-    with kopf_runner:
-        with pytest.raises(TypeError, match="n_workers must be an integer"):
-            KubeCluster(name="foo", n_workers="1")

From 91238d9cf142fd2bed452cc6a2434e6bb46955c1 Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Thu, 27 Apr 2023 09:34:41 -0700
Subject: [PATCH 03/15] add checks for both 'limits' and 'requests'

---
 .../operator/kubecluster/kubecluster.py            | 18 +++++++++++-------
 .../kubecluster/tests/test_kubecluster.py          |  4 ++--
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py
index 43381d827..ee7dc0600 100644
--- a/dask_kubernetes/operator/kubecluster/kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/kubecluster.py
@@ -234,13 +234,17 @@ def __init__(
             raise TypeError(f"n_workers must be an integer, got {type(self.n_workers)}")
 
         try:
-            # Validate input resources
-            assert "limits" in resources
-            assert isinstance(resources["limits"], dict)
-            assert "CPU" in resources["limits"]
-            assert isinstance(resources["limits"]["CPU"], str)
-        except AssertionError:
-            print("Invalid resource limits format")
+            # Validate `resources` param is a dictionary whose
+            # keys must either be 'limits' or 'requests'
+            assert isinstance(resources, dict)
+
+            for field in ("limits", "requests"):
+                if field in resources:
+                    assert isinstance(field, dict)
+                else:
+                    print("fields must either be 'limits' or 'requests'")
+        except ValueError:
+            print("Invalid resources field format")
             raise
 
         name = name.format(
diff --git a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
index aae7d4b73..3a5668092 100644
--- a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
@@ -155,7 +155,7 @@ def test_custom_spec(kopf_runner, docker_image):
             assert client.submit(lambda x: x + 1, 10).result() == 11
 
 
-def test_for_integer_n_workers(kopf_runner):
+def test_for_noninteger_n_workers(kopf_runner):
     with kopf_runner:
         with pytest.raises(TypeError, match="n_workers must be an integer"):
             KubeCluster(name="foo", n_workers="1")
@@ -163,7 +163,7 @@ def test_typo_resource_limits(kopf_runner):
 
 def test_typo_resource_limits(kopf_runner):
     with kopf_runner:
-        with pytest.raises(AssertionError):
+        with pytest.raises(ValueError):
             KubeCluster(
                 name="foo",
                 resources={

From d2568a649c2cf143c5c7630319092b370baf2e1a Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Thu, 27 Apr 2023 20:41:06 -0700
Subject: [PATCH 04/15] check if fields are dict types

---
 .../operator/kubecluster/kubecluster.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py
index ee7dc0600..3a02582ba 100644
--- a/dask_kubernetes/operator/kubecluster/kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/kubecluster.py
@@ -236,15 +236,15 @@ def __init__(
         try:
             # Validate `resources` param is a dictionary whose
             # keys must either be 'limits' or 'requests'
-            assert isinstance(resources, dict)
+            assert isinstance(self.resources, dict)
 
             for field in ("limits", "requests"):
-                if field in resources:
-                    assert isinstance(field, dict)
-                else:
-                    print("fields must either be 'limits' or 'requests'")
-        except ValueError:
-            print("Invalid resources field format")
+                if field in self.resources:
+                    assert isinstance(self.resources[field], dict)
+                elif field not in self.resources:
+                    raise ValueError(f"Unknown field '{field}' in resources")
+        except TypeError:
+            print("Invalid resources type")
             raise
 
         name = name.format(

From 1a1cdd91a38272a6a4d829b544b10b27ed527dec Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Thu, 27 Apr 2023 22:11:03 -0700
Subject: [PATCH 05/15] edit the loop that checks dictionary fields

---
 dask_kubernetes/operator/kubecluster/kubecluster.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py
index 3a02582ba..0603de2c4 100644
--- a/dask_kubernetes/operator/kubecluster/kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/kubecluster.py
@@ -238,13 +238,13 @@ def __init__(
             # keys must either be 'limits' or 'requests'
             assert isinstance(self.resources, dict)
 
-            for field in ("limits", "requests"):
-                if field in self.resources:
+            for field in self.resources:
+                if field in ("limits", "requests"):
                     assert isinstance(self.resources[field], dict)
-                elif field not in self.resources:
+                else:
                     raise ValueError(f"Unknown field '{field}' in resources")
         except TypeError:
-            print("Invalid resources type")
+            print(f"resources must be dict, got '{type(resources)}' type")
             raise

From 5a2d7a77423ef56acb11b5b2e433dd4fdac32552 Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Fri, 28 Apr 2023 01:11:34 -0700
Subject: [PATCH 06/15] fix failing tests

---
 dask_kubernetes/operator/kubecluster/kubecluster.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py
index 0603de2c4..2e469dcbf 100644
--- a/dask_kubernetes/operator/kubecluster/kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/kubecluster.py
@@ -244,7 +244,7 @@ def __init__(
                 else:
                     raise ValueError(f"Unknown field '{field}' in resources")
         except TypeError:
-            print(f"resources must be dict, got '{type(resources)}' type")
+            print(f"invalid '{type(resources)}' for resources type")
            raise
 
         name = name.format(

From 046d6ec5bacd6aba69b38f36aa54a81ceeb35ced Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Fri, 28 Apr 2023 13:22:11 -0700
Subject: [PATCH 07/15] explicitly set error message from exception

---
 dask_kubernetes/operator/kubecluster/kubecluster.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py
index 2e469dcbf..0c7b7b02b 100644
--- a/dask_kubernetes/operator/kubecluster/kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/kubecluster.py
@@ -243,9 +243,8 @@ def __init__(
                     assert isinstance(self.resources[field], dict)
                 else:
                     raise ValueError(f"Unknown field '{field}' in resources")
-        except TypeError:
-            print(f"invalid '{type(resources)}' for resources type")
-            raise
+        except TypeError as e:
+            raise TypeError(f"invalid '{type(resources)}' for resources type") from e
 
         name = name.format(
             user=getpass.getuser(), uuid=str(uuid.uuid4())[:10], **os.environ

From 61e0c0be5db0278611d6c8e6b41a2ad13f6267f8 Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Tue, 2 May 2023 13:24:40 -0700
Subject: [PATCH 08/15] add validation check for node port value

---
 dask_kubernetes/operator/controller/controller.py  | 9 +++++++++
 .../operator/controller/tests/test_controller.py   | 4 ++++
 2 files changed, 13 insertions(+)

diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py
index e82f76c86..cddc4a091 100644
--- a/dask_kubernetes/operator/controller/controller.py
+++ b/dask_kubernetes/operator/controller/controller.py
@@ -87,6 +87,15 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels):
             "dask.org/component": "scheduler",
         }
     )
+
+    if spec.get("type") == "NodePort":
+        try:
+            for port in spec["ports"]:
+                node_port = port.get("nodePort")
+                assert node_port in range(30000, 32768)
+        except ValueError as e:
+            raise ValueError("Invalid nodePort value") from e
+
     return {
         "apiVersion": "v1",
         "kind": "Service",
diff --git a/dask_kubernetes/operator/controller/tests/test_controller.py b/dask_kubernetes/operator/controller/tests/test_controller.py
index f0b1c676c..7b121afc0 100644
--- a/dask_kubernetes/operator/controller/tests/test_controller.py
+++ b/dask_kubernetes/operator/controller/tests/test_controller.py
@@ -433,3 +433,7 @@ async def test_failed_job(k8s_cluster, kopf_runner, gen_job):
 
     assert "A DaskJob has been created" in runner.stdout
     assert "Job failed, deleting Dask cluster." in runner.stdout
+
+
+# def test_node_port_out_of_range():
+#     with kopf_runner as runner:

From 67788eb11f9e1045c9ec84c1398e632d382d396f Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Tue, 2 May 2023 22:53:22 -0700
Subject: [PATCH 09/15] check all ports in spec

---
 .../operator/controller/controller.py              | 16 +++++++++-------
 .../operator/controller/tests/test_controller.py   |  4 ----
 2 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py
index cddc4a091..e0a62707c 100644
--- a/dask_kubernetes/operator/controller/controller.py
+++ b/dask_kubernetes/operator/controller/controller.py
@@ -88,13 +88,15 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels):
         }
     )
 
-    if spec.get("type") == "NodePort":
-        try:
-            for port in spec["ports"]:
-                node_port = port.get("nodePort")
-                assert node_port in range(30000, 32768)
-        except ValueError as e:
-            raise ValueError("Invalid nodePort value") from e
+    # Check if NodePort is out of range
+    try:
+        for port in spec.get("ports", []):
+            if port.get("nodePort", None) and (
+                port["nodePort"] < 30000 or port["nodePort"] > 32767
+            ):
+                raise ValueError("NodePort out of range")
+    except ValueError as e:
+        raise ValueError("Invalid nodePort value") from e
 
     return {
         "apiVersion": "v1",
         "kind": "Service",
diff --git a/dask_kubernetes/operator/controller/tests/test_controller.py b/dask_kubernetes/operator/controller/tests/test_controller.py
index 7b121afc0..f0b1c676c 100644
--- a/dask_kubernetes/operator/controller/tests/test_controller.py
+++ b/dask_kubernetes/operator/controller/tests/test_controller.py
@@ -433,7 +433,3 @@ async def test_failed_job(k8s_cluster, kopf_runner, gen_job):
 
     assert "A DaskJob has been created" in runner.stdout
     assert "Job failed, deleting Dask cluster." in runner.stdout
-
-
-# def test_node_port_out_of_range():
-#     with kopf_runner as runner:

From 719347a7145ca79aa2afeddd56161f9ff91c9f47 Mon Sep 17 00:00:00 2001
From: Sheilah Kirui <71867292+skirui-source@users.noreply.github.com>
Date: Wed, 3 May 2023 09:19:43 -0700
Subject: [PATCH 10/15] Update dask_kubernetes/operator/controller/controller.py

Co-authored-by: Jacob Tomlinson
---
 dask_kubernetes/operator/controller/controller.py | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py
index e0a62707c..d296b642a 100644
--- a/dask_kubernetes/operator/controller/controller.py
+++ b/dask_kubernetes/operator/controller/controller.py
@@ -89,14 +89,11 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels):
     )
 
     # Check if NodePort is out of range
-    try:
-        for port in spec.get("ports", []):
-            if port.get("nodePort", None) and (
-                port["nodePort"] < 30000 or port["nodePort"] > 32767
-            ):
-                raise ValueError("NodePort out of range")
-    except ValueError as e:
-        raise ValueError("Invalid nodePort value") from e
+    for port in spec.get("ports", []):
+        if port.get("nodePort", None) and (
+            port["nodePort"] < 30000 or port["nodePort"] > 32767
+        ):
+            raise ValueError("NodePort out of range")
 
     return {
         "apiVersion": "v1",
         "kind": "Service",

From 4911a17976e4cb519d14ee4876094ad3cfe71412 Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Wed, 3 May 2023 12:18:14 -0700
Subject: [PATCH 11/15] add test - nodeport out of range

---
 .../operator/controller/tests/test_controller.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/dask_kubernetes/operator/controller/tests/test_controller.py b/dask_kubernetes/operator/controller/tests/test_controller.py
index f0b1c676c..5763420c2 100644
--- a/dask_kubernetes/operator/controller/tests/test_controller.py
+++ b/dask_kubernetes/operator/controller/tests/test_controller.py
@@ -9,6 +9,7 @@
 import yaml
 from dask.distributed import Client
 
+from dask_kubernetes.operator import KubeCluster, make_cluster_spec
 from dask_kubernetes.operator.controller import (
     KUBERNETES_DATETIME_FORMAT,
     get_job_runner_pod_name,
@@ -433,3 +434,13 @@ async def test_failed_job(k8s_cluster, kopf_runner, gen_job):
 
     assert "A DaskJob has been created" in runner.stdout
     assert "Job failed, deleting Dask cluster." in runner.stdout
+
+
+def test_node_port_out_of_range(kopf_runner):
+
+    spec = make_cluster_spec(name="foo", scheduler_service_type="NodePort")
+    spec["spec"]["scheduler"]["service"]["ports"][0]["nodePort"] = "38967"
+
+    with kopf_runner:
+        with pytest.raises(ValueError):
+            KubeCluster(custom_cluster_spec=spec, n_workers=1)

From b62caa91e04b0bf04dad34b991d622b33c3ce8f5 Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Wed, 3 May 2023 12:53:05 -0700
Subject: [PATCH 12/15] add test for valid nodeport case

---
 .../controller/tests/test_controller.py | 24 +++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/dask_kubernetes/operator/controller/tests/test_controller.py b/dask_kubernetes/operator/controller/tests/test_controller.py
index 5763420c2..1adccfedc 100644
--- a/dask_kubernetes/operator/controller/tests/test_controller.py
+++ b/dask_kubernetes/operator/controller/tests/test_controller.py
@@ -436,11 +436,27 @@ async def test_failed_job(k8s_cluster, kopf_runner, gen_job):
     assert "Job failed, deleting Dask cluster." in runner.stdout
 
 
-def test_node_port_out_of_range(kopf_runner):
-
-    spec = make_cluster_spec(name="foo", scheduler_service_type="NodePort")
-    spec["spec"]["scheduler"]["service"]["ports"][0]["nodePort"] = "38967"
-
+def custom_nodeport_spec(port, name="foo", scheduler_service_type="NodePort"):
+    try:
+        port = int(port)
+    except ValueError as e:
+        raise ValueError(f"'{port}' is not a valid integer") from e
+
+    spec = make_cluster_spec(name, scheduler_service_type)
+    spec["spec"]["scheduler"]["service"]["ports"][0]["nodePort"] = port
+    return spec
+
+
+def test_nodeport_valid(kopf_runner):
     with kopf_runner:
-        with pytest.raises(ValueError):
+        spec = custom_nodeport_spec("30007")
+        with KubeCluster(custom_cluster_spec=spec, n_workers=1) as cluster:
+            with Client(cluster) as client:
+                assert client.submit(lambda x: x + 1, 10).result() == 11
+
+
+def test_nodeport_out_of_range(kopf_runner):
+    with kopf_runner:
+        spec = custom_nodeport_spec("38967")
+        with pytest.raises(ValueError, match="NodePort out of range"):
             KubeCluster(custom_cluster_spec=spec, n_workers=1)

From 6ba10428345d4045d4d1063a7eea3fd5a985ef9e Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Wed, 3 May 2023 12:54:43 -0700
Subject: [PATCH 13/15] minor fix, exception handling

---
 dask_kubernetes/operator/controller/tests/test_controller.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dask_kubernetes/operator/controller/tests/test_controller.py b/dask_kubernetes/operator/controller/tests/test_controller.py
index 1adccfedc..083fc5b78 100644
--- a/dask_kubernetes/operator/controller/tests/test_controller.py
+++ b/dask_kubernetes/operator/controller/tests/test_controller.py
@@ -439,8 +439,8 @@ async def test_failed_job(k8s_cluster, kopf_runner, gen_job):
 def custom_nodeport_spec(port, name="foo", scheduler_service_type="NodePort"):
     try:
         port = int(port)
-    except ValueError as e:
-        raise ValueError(f"'{port}' is not a valid integer") from e
+    except ValueError:
+        raise ValueError(f"{port} is not a valid integer")
 
     spec = make_cluster_spec(name, scheduler_service_type)
     spec["spec"]["scheduler"]["service"]["ports"][0]["nodePort"] = port
     return spec

From f42cdade949ea92a6337c11f71a6719ab3da1f56 Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Thu, 4 May 2023 01:20:45 -0700
Subject: [PATCH 14/15] add validation hook at resource creation time

---
 .../operator/controller/controller.py | 20 ++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py
index d296b642a..8d1ca2563 100644
--- a/dask_kubernetes/operator/controller/controller.py
+++ b/dask_kubernetes/operator/controller/controller.py
@@ -88,13 +88,6 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels):
         }
     )
 
-    # Check if NodePort is out of range
-    for port in spec.get("ports", []):
-        if port.get("nodePort", None) and (
-            port["nodePort"] < 30000 or port["nodePort"] > 32767
-        ):
-            raise ValueError("NodePort out of range")
-
     return {
         "apiVersion": "v1",
         "kind": "Service",
@@ -250,6 +243,19 @@ def get_current_timestamp(**kwargs):
     return datetime.utcnow().isoformat()
 
 
+@kopf.on.validate("daskcluster.kubernetes.dask.org")
+async def daskcluster_validate_nodeport(spec, warnings, **kwargs):
+    """Ensure that `nodePort` defined in DaskCluster resource is
+    within a valid range"""
+
+    # Check if NodePort is out of range
+    for port in spec.get("ports", []):
+        if port.get("nodePort", None) and (
+            port["nodePort"] < 30000 or port["nodePort"] > 32767
+        ):
+            raise kopf.AdmissionError("nodePort must be between 30000 and 32767.")
+
+
 @kopf.on.create("daskcluster.kubernetes.dask.org")
 async def daskcluster_create(name, namespace, logger, patch, **kwargs):
     """When DaskCluster resource is created set the status.phase.

From 06e515c347b141e59d7b76d4c3d4bbf24f4b5244 Mon Sep 17 00:00:00 2001
From: Sheilah Kirui
Date: Thu, 4 May 2023 05:37:51 -0700
Subject: [PATCH 15/15] configure admission handler tunnel/server

---
 dask_kubernetes/operator/controller/controller.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py
index 8d1ca2563..ffe38befb 100644
--- a/dask_kubernetes/operator/controller/controller.py
+++ b/dask_kubernetes/operator/controller/controller.py
@@ -230,6 +230,7 @@ async def startup(settings: kopf.OperatorSettings, **kwargs):
     settings.watching.server_timeout = 120
     settings.watching.client_timeout = 150
     settings.watching.connect_timeout = 5
+    settings.admission.server = kopf.WebhookServer()  # defaults to localhost:9443
 
     # The default timeout is 300s which is usually to long
     # https://kopf.readthedocs.io/en/latest/configuration/#networking-timeouts
@@ -244,7 +245,9 @@ def get_current_timestamp(**kwargs):
 
 
 @kopf.on.validate("daskcluster.kubernetes.dask.org")
-async def daskcluster_validate_nodeport(spec, warnings, **kwargs):
+async def daskcluster_validate_nodeport(
+    spec, settings: kopf.OperatorSettings, warnings, **kwargs
+):
     """Ensure that `nodePort` defined in DaskCluster resource is
     within a valid range"""
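
A quick illustration of the rule the last two patches enforce (not part of the patch series itself): the admission handler rejects any nodePort outside Kubernetes' default NodePort range of 30000-32767. The sketch below reproduces that check as a plain function against two hypothetical service specs; in the patches above the same logic runs inside the kopf operator and raises kopf.AdmissionError rather than ValueError.

def check_node_ports(service_spec):
    # Reject any nodePort outside the default Kubernetes NodePort range.
    for port in service_spec.get("ports", []):
        node_port = port.get("nodePort")
        if node_port and not 30000 <= node_port <= 32767:
            raise ValueError(f"nodePort {node_port} out of range (30000-32767)")


check_node_ports({"type": "NodePort", "ports": [{"port": 8786, "nodePort": 30007}]})  # accepted
try:
    check_node_ports({"type": "NodePort", "ports": [{"port": 8786, "nodePort": 38967}]})
except ValueError as err:
    print(err)  # nodePort 38967 out of range (30000-32767)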