From 3e3e2456ec88244aef932d79321a1fdc66fe9001 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 30 May 2022 14:26:21 +0100 Subject: [PATCH 01/10] Release 2022.5.1 From 55277914be85af3c4e46d1b4670624b4e8af7912 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 30 May 2022 16:47:23 +0100 Subject: [PATCH 02/10] Small fixes to get things working on Kubeflow --- dask_kubernetes/experimental/kubecluster.py | 6 +++--- .../deployment/manifests/operator.yaml | 16 ++++++++++++---- doc/source/operator_installation.rst | 19 +++++++++++++++++-- doc/source/operator_resources.rst | 8 +++++--- 4 files changed, 37 insertions(+), 12 deletions(-) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index 9bc463cd1..8848fa31b 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -20,6 +20,7 @@ ) from dask_kubernetes.common.auth import ClusterAuth +from dask_kubernetes.common.utils import namespace_default from dask_kubernetes.operator import ( build_cluster_spec, wait_for_service, @@ -121,7 +122,7 @@ class KubeCluster(Cluster): def __init__( self, name, - namespace="default", + namespace=None, image="ghcr.io/dask/dask:latest", n_workers=3, resources={}, @@ -133,8 +134,7 @@ def __init__( **kwargs, ): self.name = name - # TODO: Set namespace to None and get default namespace from user's context - self.namespace = namespace + self.namespace = namespace or namespace_default() self.image = image self.n_workers = n_workers self.resources = resources diff --git a/dask_kubernetes/operator/deployment/manifests/operator.yaml b/dask_kubernetes/operator/deployment/manifests/operator.yaml index 1c54a0ede..73cf5c92b 100644 --- a/dask_kubernetes/operator/deployment/manifests/operator.yaml +++ b/dask_kubernetes/operator/deployment/manifests/operator.yaml @@ -1,7 +1,15 @@ +apiVersion: v1 +kind: Namespace +metadata: + labels: + kubernetes.io/metadata.name: dask-operator + name: dask-operator + name: dask-operator +--- apiVersion: apps/v1 kind: Deployment metadata: - namespace: "kube-system" + namespace: dask-operator name: dask-kubernetes-operator spec: replicas: 1 @@ -24,7 +32,7 @@ spec: apiVersion: v1 kind: ServiceAccount metadata: - namespace: "kube-system" + namespace: dask-operator name: dask-kubernetes-operator --- apiVersion: rbac.authorization.k8s.io/v1 @@ -61,7 +69,7 @@ rules: - apiGroups: [""] resources: [pods] verbs: [create, delete, get, watch, list] - + - apiGroups: [""] resources: [services] verbs: [create, delete, get, watch, list] @@ -77,4 +85,4 @@ roleRef: subjects: - kind: ServiceAccount name: dask-kubernetes-operator - namespace: "kube-system" \ No newline at end of file + namespace: dask-operator diff --git a/doc/source/operator_installation.rst b/doc/source/operator_installation.rst index d07df9296..a9b5fcadd 100644 --- a/doc/source/operator_installation.rst +++ b/doc/source/operator_installation.rst @@ -34,8 +34,8 @@ This will create the appropriate roles, service accounts and a deployment for th .. code-block:: console $ kubectl get pods -A -l application=dask-kubernetes-operator - NAMESPACE NAME READY STATUS RESTARTS AGE - kube-system dask-kubernetes-operator-775b8bbbd5-zdrf7 1/1 Running 0 74s + NAMESPACE NAME READY STATUS RESTARTS AGE + dask-operator dask-kubernetes-operator-775b8bbbd5-zdrf7 1/1 Running 0 74s Installing with Helm @@ -55,3 +55,18 @@ This will install the custom resource definitions, service account, roles, and t .. 
warning:: Please note that `Helm does not support updating or deleting CRDs. `_ If updates are made to the CRD templates in future releases (to support future k8s releases, for example) you may have to manually update the CRDs.
+
+Kubeflow
+--------
+
+In order to use the Dask Operator with `Kubeflow `_ you need to perform some extra installation steps.
+
+User permissions
+^^^^^^^^^^^^^^^^
+
+Kubeflow doesn't know anything about our Dask custom resource definitions, so we need to update the ``kubeflow-kubernetes-edit`` cluster role. This role
+allows users with cluster edit permissions to create pods, jobs and other resources, and we need to add the Dask custom resources to that list.
+
+.. code-block:: console
+
+   $ kubectl patch clusterrole kubeflow-kubernetes-edit --patch '{"rules": [{"apiGroups": ["kubernetes.dask.org"],"resources": ["*"],"verbs": ["*"]}]}'
diff --git a/doc/source/operator_resources.rst b/doc/source/operator_resources.rst
index b8785c9ad..e6cb9ddb3 100644
--- a/doc/source/operator_resources.rst
+++ b/doc/source/operator_resources.rst
@@ -256,18 +256,20 @@ Let's create an example called ``highmemworkers.yaml`` with the following config
       imagePullPolicy: "IfNotPresent"
       resources:
         requests:
-          memory: "2Gi"
+          memory: "32Gi"
         limits:
           memory: "32Gi"
       args:
         - dask-worker
         - --name
         - $(DASK_WORKER_NAME)
+        - --resources
+        - MEMORY=32e9
 
 The main thing we need to ensure is that the ``cluster`` option matches the name of the cluster we created earlier. This will cause the workers to join that cluster.
 
-See the Configuration Reference :ref:`config`. Now apply ``highmemworkers.yaml``
+See the :ref:`config`. Now apply ``highmemworkers.yaml``
 
 .. code-block:: console
 
@@ -414,7 +416,7 @@ Let's create an example called ``job.yaml`` with the following configuration:
           targetPort: "dashboard"
 
-Editing this file will change the default configuration of you Dask job. See the Configuration Reference :ref:`config`. Now apply ``job.yaml``
+Editing this file will change the default configuration of your Dask job. See the :ref:`config`. Now apply ``job.yaml``
 
.. 
code-block:: console From 1e76ea630c84d423ded37ac81ecbf8a7dcaf0496 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 30 May 2022 16:58:15 +0100 Subject: [PATCH 03/10] Scheduler listen on all addresses --- dask_kubernetes/experimental/kubecluster.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index 8848fa31b..cdb4d68f4 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -465,9 +465,7 @@ def _build_scheduler_spec(self, cluster_name): { "name": "scheduler", "image": self.image, - "args": [ - "dask-scheduler", - ], + "args": ["dask-scheduler", "--host", "0.0.0.0"], "env": env, "resources": self.resources, "ports": [ From 2f15573f8525494acf3b7540aa8898b7f4106856 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 30 May 2022 17:11:02 +0100 Subject: [PATCH 04/10] Switch scheduler probes to httpGet to avoid TCP handshake errors cluttering logs --- dask_kubernetes/experimental/kubecluster.py | 4 +-- .../tests/resources/simplecluster.yaml | 10 ++++--- .../operator/tests/resources/simplejob.yaml | 10 ++++--- doc/source/operator_resources.rst | 28 +++++++++++-------- 4 files changed, 30 insertions(+), 22 deletions(-) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index cdb4d68f4..0b86e3c7c 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -481,12 +481,12 @@ def _build_scheduler_spec(self, cluster_name): }, ], "readinessProbe": { - "tcpSocket": {"port": "comm"}, + "httpGet": {"port": "dashboard", "path": "/health"}, "initialDelaySeconds": 5, "periodSeconds": 10, }, "livenessProbe": { - "tcpSocket": {"port": "comm"}, + "httpGet": {"port": "dashboard", "path": "/health"}, "initialDelaySeconds": 15, "periodSeconds": 20, }, diff --git a/dask_kubernetes/operator/tests/resources/simplecluster.yaml b/dask_kubernetes/operator/tests/resources/simplecluster.yaml index 674f51174..1841ad5d4 100644 --- a/dask_kubernetes/operator/tests/resources/simplecluster.yaml +++ b/dask_kubernetes/operator/tests/resources/simplecluster.yaml @@ -34,13 +34,15 @@ spec: containerPort: 8787 protocol: TCP readinessProbe: - tcpSocket: - port: comm + httpGet: + port: dashboard + path: /health initialDelaySeconds: 5 periodSeconds: 10 livenessProbe: - tcpSocket: - port: comm + httpGet: + port: dashboard + path: /health initialDelaySeconds: 15 periodSeconds: 20 env: diff --git a/dask_kubernetes/operator/tests/resources/simplejob.yaml b/dask_kubernetes/operator/tests/resources/simplejob.yaml index ee0636ec2..ffc225fb8 100644 --- a/dask_kubernetes/operator/tests/resources/simplejob.yaml +++ b/dask_kubernetes/operator/tests/resources/simplejob.yaml @@ -47,13 +47,15 @@ spec: containerPort: 8787 protocol: TCP readinessProbe: - tcpSocket: - port: comm + httpGet: + port: dashboard + path: /health initialDelaySeconds: 5 periodSeconds: 10 livenessProbe: - tcpSocket: - port: comm + httpGet: + port: dashboard + path: /health initialDelaySeconds: 15 periodSeconds: 20 env: diff --git a/doc/source/operator_resources.rst b/doc/source/operator_resources.rst index e6cb9ddb3..d413405ca 100644 --- a/doc/source/operator_resources.rst +++ b/doc/source/operator_resources.rst @@ -83,15 +83,17 @@ Let's create an example called ``cluster.yaml`` with the following configuration containerPort: 8787 protocol: TCP readinessProbe: - tcpSocket: - port: comm - 
initialDelaySeconds: 5 - periodSeconds: 10 + httpGet: + port: dashboard + path: /health + initialDelaySeconds: 5 + periodSeconds: 10 livenessProbe: - tcpSocket: - port: comm - initialDelaySeconds: 15 - periodSeconds: 20 + httpGet: + port: dashboard + path: /health + initialDelaySeconds: 15 + periodSeconds: 20 service: type: NodePort selector: @@ -388,13 +390,15 @@ Let's create an example called ``job.yaml`` with the following configuration: containerPort: 8787 protocol: TCP readinessProbe: - tcpSocket: - port: comm + httpGet: + port: dashboard + path: /health initialDelaySeconds: 5 periodSeconds: 10 livenessProbe: - tcpSocket: - port: comm + httpGet: + port: dashboard + path: /health initialDelaySeconds: 15 periodSeconds: 20 env: From 7056cfab2902701a2e402a36a1de407bfcb5effb Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 30 May 2022 17:33:28 +0100 Subject: [PATCH 05/10] Rename comm and dashboard ports to tcp-comm and http-dashboard --- dask_kubernetes/common/networking.py | 4 +-- dask_kubernetes/experimental/kubecluster.py | 16 +++++----- dask_kubernetes/kubernetes.yaml | 4 +-- dask_kubernetes/operator/operator.py | 2 +- .../tests/resources/simplecluster.yaml | 16 +++++----- .../operator/tests/resources/simplejob.yaml | 16 +++++----- doc/source/operator_resources.rst | 32 +++++++++---------- 7 files changed, 45 insertions(+), 45 deletions(-) diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py index 9008856c4..f1810ad51 100644 --- a/dask_kubernetes/common/networking.py +++ b/dask_kubernetes/common/networking.py @@ -15,7 +15,7 @@ async def get_external_address_for_scheduler_service( service, port_forward_cluster_ip=None, service_name_resolution_retries=20, - port_name="comm", + port_name="tcp-comm", ): """Take a service object and return the scheduler address.""" [port] = [ @@ -108,7 +108,7 @@ async def port_forward_dashboard(service_name, namespace): return port -async def get_scheduler_address(service_name, namespace, port_name="comm"): +async def get_scheduler_address(service_name, namespace, port_name="tcp-comm"): async with kubernetes.client.api_client.ApiClient() as api_client: api = kubernetes.client.CoreV1Api(api_client) service = await api.read_namespaced_service(service_name, namespace) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index 0b86e3c7c..671e5dc5f 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -470,23 +470,23 @@ def _build_scheduler_spec(self, cluster_name): "resources": self.resources, "ports": [ { - "name": "comm", + "name": "tcp-comm", "containerPort": 8786, "protocol": "TCP", }, { - "name": "dashboard", + "name": "http-dashboard", "containerPort": 8787, "protocol": "TCP", }, ], "readinessProbe": { - "httpGet": {"port": "dashboard", "path": "/health"}, + "httpGet": {"port": "http-dashboard", "path": "/health"}, "initialDelaySeconds": 5, "periodSeconds": 10, }, "livenessProbe": { - "httpGet": {"port": "dashboard", "path": "/health"}, + "httpGet": {"port": "http-dashboard", "path": "/health"}, "initialDelaySeconds": 15, "periodSeconds": 20, }, @@ -501,16 +501,16 @@ def _build_scheduler_spec(self, cluster_name): }, "ports": [ { - "name": "comm", + "name": "tcp-comm", "protocol": "TCP", "port": 8786, - "targetPort": "comm", + "targetPort": "tcp-comm", }, { - "name": "dashboard", + "name": "http-dashboard", "protocol": "TCP", "port": 8787, - "targetPort": "dashboard", + "targetPort": "http-dashboard", }, ], 
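                    # NOTE: port names carry a protocol prefix ("tcp-", "http-")
                    # following the Kubernetes/Istio port-naming convention, so
                    # the service mesh that Kubeflow deploys can detect each
                    # port's protocol and route it correctly.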
}, diff --git a/dask_kubernetes/kubernetes.yaml b/dask_kubernetes/kubernetes.yaml index 4ab27f9ec..b0bc374ac 100644 --- a/dask_kubernetes/kubernetes.yaml +++ b/dask_kubernetes/kubernetes.yaml @@ -30,11 +30,11 @@ kubernetes: dask.org/cluster-name: "" # Cluster name will be added automatically dask.org/component: scheduler ports: - - name: comm + - name: tcp-comm protocol: TCP port: 8786 targetPort: 8786 - - name: dashboard + - name: http-dashboard protocol: TCP port: 8787 targetPort: 8787 diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py index e53cb18a7..826b67829 100644 --- a/dask_kubernetes/operator/operator.py +++ b/dask_kubernetes/operator/operator.py @@ -234,7 +234,7 @@ async def retire_workers( dashboard_address = await get_scheduler_address( scheduler_service_name, namespace, - port_name="dashboard", + port_name="http-dashboard", ) async with aiohttp.ClientSession() as session: url = f"{dashboard_address}/api/v1/retire_workers" diff --git a/dask_kubernetes/operator/tests/resources/simplecluster.yaml b/dask_kubernetes/operator/tests/resources/simplecluster.yaml index 1841ad5d4..ee52c0eab 100644 --- a/dask_kubernetes/operator/tests/resources/simplecluster.yaml +++ b/dask_kubernetes/operator/tests/resources/simplecluster.yaml @@ -27,21 +27,21 @@ spec: args: - dask-scheduler ports: - - name: comm + - name: tcp-comm containerPort: 8786 protocol: TCP - - name: dashboard + - name: http-dashboard containerPort: 8787 protocol: TCP readinessProbe: httpGet: - port: dashboard + port: http-dashboard path: /health initialDelaySeconds: 5 periodSeconds: 10 livenessProbe: httpGet: - port: dashboard + port: http-dashboard path: /health initialDelaySeconds: 15 periodSeconds: 20 @@ -54,11 +54,11 @@ spec: dask.org/cluster-name: simple-cluster dask.org/component: scheduler ports: - - name: comm + - name: tcp-comm protocol: TCP port: 8786 - targetPort: "comm" - - name: dashboard + targetPort: "tcp-comm" + - name: http-dashboard protocol: TCP port: 8787 - targetPort: "dashboard" + targetPort: "http-dashboard" diff --git a/dask_kubernetes/operator/tests/resources/simplejob.yaml b/dask_kubernetes/operator/tests/resources/simplejob.yaml index ffc225fb8..e931df292 100644 --- a/dask_kubernetes/operator/tests/resources/simplejob.yaml +++ b/dask_kubernetes/operator/tests/resources/simplejob.yaml @@ -40,21 +40,21 @@ spec: args: - dask-scheduler ports: - - name: comm + - name: tcp-comm containerPort: 8786 protocol: TCP - - name: dashboard + - name: http-dashboard containerPort: 8787 protocol: TCP readinessProbe: httpGet: - port: dashboard + port: http-dashboard path: /health initialDelaySeconds: 5 periodSeconds: 10 livenessProbe: httpGet: - port: dashboard + port: http-dashboard path: /health initialDelaySeconds: 15 periodSeconds: 20 @@ -67,11 +67,11 @@ spec: dask.org/cluster-name: simple-job-cluster dask.org/component: scheduler ports: - - name: comm + - name: tcp-comm protocol: TCP port: 8786 - targetPort: "comm" - - name: dashboard + targetPort: "tcp-comm" + - name: http-dashboard protocol: TCP port: 8787 - targetPort: "dashboard" + targetPort: "http-dashboard" diff --git a/doc/source/operator_resources.rst b/doc/source/operator_resources.rst index d413405ca..adf3ab064 100644 --- a/doc/source/operator_resources.rst +++ b/doc/source/operator_resources.rst @@ -76,21 +76,21 @@ Let's create an example called ``cluster.yaml`` with the following configuration args: - dask-scheduler ports: - - name: comm + - name: tcp-comm containerPort: 8786 protocol: TCP - - name: dashboard + - 
name: http-dashboard
           containerPort: 8787
           protocol: TCP
         readinessProbe:
           httpGet:
-            port: dashboard
+            port: http-dashboard
             path: /health
           initialDelaySeconds: 5
           periodSeconds: 10
         livenessProbe:
           httpGet:
-            port: dashboard
+            port: http-dashboard
             path: /health
           initialDelaySeconds: 15
           periodSeconds: 20
@@ -100,14 +100,14 @@ Let's create an example called ``cluster.yaml`` with the following configuration
         dask.org/cluster-name: simple-cluster
         dask.org/component: scheduler
       ports:
-      - name: comm
+      - name: tcp-comm
         protocol: TCP
         port: 8786
-        targetPort: "comm"
+        targetPort: "tcp-comm"
-      - name: dashboard
+      - name: http-dashboard
         protocol: TCP
         port: 8787
-        targetPort: "dashboard"
+        targetPort: "http-dashboard"
 
 Editing this file will change the default configuration of your Dask cluster. See the Configuration Reference :ref:`config`. Now apply ``cluster.yaml``
 
@@ -383,21 +383,21 @@ Let's create an example called ``job.yaml`` with the following configuration:
       args:
         - dask-scheduler
       ports:
-        - name: comm
+        - name: tcp-comm
           containerPort: 8786
           protocol: TCP
-        - name: dashboard
+        - name: http-dashboard
           containerPort: 8787
           protocol: TCP
         readinessProbe:
           httpGet:
-            port: dashboard
+            port: http-dashboard
             path: /health
           initialDelaySeconds: 5
           periodSeconds: 10
         livenessProbe:
           httpGet:
-            port: dashboard
+            port: http-dashboard
             path: /health
           initialDelaySeconds: 15
           periodSeconds: 20
@@ -410,14 +410,14 @@ Let's create an example called ``job.yaml`` with the following configuration:
         dask.org/cluster-name: simple-job-cluster
         dask.org/component: scheduler
       ports:
-      - name: comm
+      - name: tcp-comm
         protocol: TCP
         port: 8786
-        targetPort: "comm"
+        targetPort: "tcp-comm"
-      - name: dashboard
+      - name: http-dashboard
         protocol: TCP
         port: 8787
-        targetPort: "dashboard"
+        targetPort: "http-dashboard"
 
 Editing this file will change the default configuration of your Dask job. See the :ref:`config`. 
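Both probes above use an HTTP ``GET`` against the scheduler dashboard's ``/health`` endpoint rather than a raw TCP check on the comm port, which keeps failed-handshake noise out of the scheduler logs. Once the job is running you can query that endpoint yourself. A minimal sketch, assuming the scheduler ``Service`` (shown here as ``simple-job-cluster-service`` for illustration; check ``kubectl get svc`` for the real name) has been port-forwarded to ``localhost:8787``:

.. code-block:: python

    from urllib.request import urlopen

    # Assumes a port-forward is already running, e.g.:
    #   kubectl port-forward service/simple-job-cluster-service 8787:8787
    with urlopen("http://localhost:8787/health", timeout=5) as response:
        # distributed's health handler answers 200 with the body "ok"
        print(response.status, response.read().decode())
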
Now apply ``job.yaml`` From 86e67f389a744e055f3ca5fdd0f3def8af3d55d4 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 30 May 2022 18:16:26 +0100 Subject: [PATCH 06/10] Wait for scheduler to be Ready instead of Running --- dask_kubernetes/common/networking.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py index f1810ad51..5e64d1abb 100644 --- a/dask_kubernetes/common/networking.py +++ b/dask_kubernetes/common/networking.py @@ -132,6 +132,11 @@ async def wait_for_scheduler(cluster_name, namespace): label_selector=f"dask.org/cluster-name={cluster_name},dask.org/component=scheduler", timeout_seconds=60, ): - if event["object"].status.phase == "Running": - watch.stop() + if event["object"].status.conditions: + conditions = { + c.type: c.status for c in event["object"].status.conditions + } + if "Ready" in conditions and conditions["Ready"] == "True": + print(event["object"].status) + watch.stop() await asyncio.sleep(0.1) From a228e1c3d8a7d21f39260762545b05e747f5f25e Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 30 May 2022 18:18:07 +0100 Subject: [PATCH 07/10] Remove stray print --- dask_kubernetes/common/networking.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py index 5e64d1abb..cdc5a76de 100644 --- a/dask_kubernetes/common/networking.py +++ b/dask_kubernetes/common/networking.py @@ -137,6 +137,5 @@ async def wait_for_scheduler(cluster_name, namespace): c.type: c.status for c in event["object"].status.conditions } if "Ready" in conditions and conditions["Ready"] == "True": - print(event["object"].status) watch.stop() await asyncio.sleep(0.1) From ec1bd8fd2f79baae7ed150cf746ab64eafbffa75 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 31 May 2022 10:01:21 +0100 Subject: [PATCH 08/10] Wait for scheduler comm and get dashboard port more intelligently --- dask_kubernetes/common/networking.py | 14 ++++++++++++ dask_kubernetes/experimental/kubecluster.py | 24 +++++++++++++++------ 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py index cdc5a76de..21dedd795 100644 --- a/dask_kubernetes/common/networking.py +++ b/dask_kubernetes/common/networking.py @@ -6,6 +6,9 @@ from weakref import finalize import kubernetes_asyncio as kubernetes +from tornado.iostream import StreamClosedError + +from distributed.core import rpc from .utils import check_dependency @@ -139,3 +142,14 @@ async def wait_for_scheduler(cluster_name, namespace): if "Ready" in conditions and conditions["Ready"] == "True": watch.stop() await asyncio.sleep(0.1) + + +async def wait_for_scheduler_comm(address): + while True: + try: + async with rpc(address) as scheduler_comm: + print(await scheduler_comm.versions()) + except (StreamClosedError, OSError): + await asyncio.sleep(0.1) + continue + break diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index 671e5dc5f..fa28c2700 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -28,8 +28,8 @@ from dask_kubernetes.common.networking import ( get_scheduler_address, - port_forward_dashboard, wait_for_scheduler, + wait_for_scheduler_comm, ) @@ -208,10 +208,15 @@ async def _create_cluster(self): ) from e await wait_for_scheduler(cluster_name, self.namespace) await wait_for_service(core_api, 
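            # The operator exposes the scheduler through a Service named
            # "<cluster name>-service"; wait for it to exist before we
            # construct an rpc connection to the scheduler below.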
f"{cluster_name}-service", self.namespace) - self.scheduler_comm = rpc(await self._get_scheduler_address()) - self.forwarded_dashboard_port = await port_forward_dashboard( - f"{self.name}-cluster-service", self.namespace + scheduler_address = await self._get_scheduler_address() + await wait_for_scheduler_comm(scheduler_address) + self.scheduler_comm = rpc(scheduler_address) + dashboard_address = await get_scheduler_address( + f"{self.name}-cluster-service", + self.namespace, + port_name="http-dashboard", ) + self.forwarded_dashboard_port = dashboard_address.split(":")[-1] async def _connect_cluster(self): if self.shutdown_on_close is None: @@ -230,10 +235,15 @@ async def _connect_cluster(self): service_name = f'{cluster_spec["metadata"]["name"]}-service' await wait_for_scheduler(self.cluster_name, self.namespace) await wait_for_service(core_api, service_name, self.namespace) - self.scheduler_comm = rpc(await self._get_scheduler_address()) - self.forwarded_dashboard_port = await port_forward_dashboard( - f"{self.name}-cluster-service", self.namespace + scheduler_address = await self._get_scheduler_address() + await wait_for_scheduler_comm(scheduler_address) + self.scheduler_comm = rpc(scheduler_address) + dashboard_address = await get_scheduler_address( + service_name, + self.namespace, + port_name="http-dashboard", ) + self.forwarded_dashboard_port = dashboard_address.split(":")[-1] async def _get_cluster(self): async with kubernetes.client.api_client.ApiClient() as api_client: From 10a8fc1ffd884fe6e8d20724eabe4d021a59d12c Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 31 May 2022 10:02:51 +0100 Subject: [PATCH 09/10] Fix stray print --- dask_kubernetes/common/networking.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py index 21dedd795..a38738725 100644 --- a/dask_kubernetes/common/networking.py +++ b/dask_kubernetes/common/networking.py @@ -148,7 +148,7 @@ async def wait_for_scheduler_comm(address): while True: try: async with rpc(address) as scheduler_comm: - print(await scheduler_comm.versions()) + await scheduler_comm.versions() except (StreamClosedError, OSError): await asyncio.sleep(0.1) continue From bc98597cdc0a8afa4e6b665e1c848bd363a546e9 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 31 May 2022 11:24:46 +0100 Subject: [PATCH 10/10] Ditch loop passing due to upstream changes --- dask_kubernetes/classic/tests/test_sync.py | 49 ++++++++++------------ 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/dask_kubernetes/classic/tests/test_sync.py b/dask_kubernetes/classic/tests/test_sync.py index 5356cb102..49feb5b95 100644 --- a/dask_kubernetes/classic/tests/test_sync.py +++ b/dask_kubernetes/classic/tests/test_sync.py @@ -5,7 +5,7 @@ import dask import pytest from dask.distributed import Client, wait -from distributed.utils_test import loop, captured_logger # noqa: F401 +from distributed.utils_test import captured_logger from dask.utils import tmpfile from dask_kubernetes import KubeCluster, make_pod_spec @@ -75,17 +75,17 @@ def test_ipython_display(cluster): sleep(0.5) -def test_env(pod_spec, loop): - with KubeCluster(pod_spec, env={"ABC": "DEF"}, loop=loop) as cluster: +def test_env(pod_spec): + with KubeCluster(pod_spec, env={"ABC": "DEF"}) as cluster: cluster.scale(1) - with Client(cluster, loop=loop) as client: + with Client(cluster) as client: while not cluster.scheduler_info["workers"]: sleep(0.1) env = client.run(lambda: dict(os.environ)) assert 
all(v["ABC"] == "DEF" for v in env.values()) -def dont_test_pod_template_yaml(docker_image, loop): +def dont_test_pod_template_yaml(docker_image): test_yaml = { "kind": "Pod", "metadata": {"labels": {"app": "dask", "component": "dask-worker"}}, @@ -109,9 +109,9 @@ def dont_test_pod_template_yaml(docker_image, loop): with tmpfile(extension="yaml") as fn: with open(fn, mode="w") as f: yaml.dump(test_yaml, f) - with KubeCluster(f.name, loop=loop) as cluster: + with KubeCluster(f.name) as cluster: cluster.scale(2) - with Client(cluster, loop=loop) as client: + with Client(cluster) as client: future = client.submit(lambda x: x + 1, 10) result = future.result(timeout=10) assert result == 11 @@ -128,7 +128,7 @@ def dont_test_pod_template_yaml(docker_image, loop): assert all(client.has_what().values()) -def test_pod_template_yaml_expand_env_vars(docker_image, loop): +def test_pod_template_yaml_expand_env_vars(docker_image): try: os.environ["FOO_IMAGE"] = docker_image @@ -155,13 +155,13 @@ def test_pod_template_yaml_expand_env_vars(docker_image, loop): with tmpfile(extension="yaml") as fn: with open(fn, mode="w") as f: yaml.dump(test_yaml, f) - with KubeCluster(f.name, loop=loop) as cluster: + with KubeCluster(f.name) as cluster: assert cluster.pod_template.spec.containers[0].image == docker_image finally: del os.environ["FOO_IMAGE"] -def test_pod_template_dict(docker_image, loop): +def test_pod_template_dict(docker_image): spec = { "metadata": {}, "restartPolicy": "Never", @@ -185,9 +185,9 @@ def test_pod_template_dict(docker_image, loop): }, } - with KubeCluster(spec, loop=loop) as cluster: + with KubeCluster(spec) as cluster: cluster.scale(2) - with Client(cluster, loop=loop) as client: + with Client(cluster) as client: future = client.submit(lambda x: x + 1, 10) result = future.result() assert result == 11 @@ -202,7 +202,7 @@ def test_pod_template_dict(docker_image, loop): assert all(client.has_what().values()) -def test_pod_template_minimal_dict(docker_image, loop): +def test_pod_template_minimal_dict(docker_image): spec = { "spec": { "containers": [ @@ -224,9 +224,9 @@ def test_pod_template_minimal_dict(docker_image, loop): } } - with KubeCluster(spec, loop=loop) as cluster: + with KubeCluster(spec) as cluster: cluster.adapt() - with Client(cluster, loop=loop) as client: + with Client(cluster) as client: future = client.submit(lambda x: x + 1, 10) result = future.result() assert result == 11 @@ -264,9 +264,9 @@ def test_bad_args(): KubeCluster({"kind": "Pod"}) -def test_constructor_parameters(pod_spec, loop): +def test_constructor_parameters(pod_spec): env = {"FOO": "BAR", "A": 1} - with KubeCluster(pod_spec, name="myname", loop=loop, env=env) as cluster: + with KubeCluster(pod_spec, name="myname", env=env) as cluster: pod = cluster.pod_template var = [v for v in pod.spec.containers[0].env if v.name == "FOO"] @@ -380,7 +380,7 @@ def test_maximum(cluster): assert "scale beyond maximum number of workers" in result.lower() -def test_extra_pod_config(docker_image, loop): +def test_extra_pod_config(docker_image): """ Test that our pod config merging process works fine """ @@ -388,7 +388,6 @@ def test_extra_pod_config(docker_image, loop): make_pod_spec( docker_image, extra_pod_config={"automountServiceAccountToken": False} ), - loop=loop, n_workers=0, ) as cluster: @@ -397,7 +396,7 @@ def test_extra_pod_config(docker_image, loop): assert pod.spec.automount_service_account_token is False -def test_extra_container_config(docker_image, loop): +def test_extra_container_config(docker_image): """ Test 
that our container config merging process works fine """ @@ -409,7 +408,6 @@ def test_extra_container_config(docker_image, loop): "securityContext": {"runAsUser": 0}, }, ), - loop=loop, n_workers=0, ) as cluster: @@ -419,7 +417,7 @@ def test_extra_container_config(docker_image, loop): assert pod.spec.containers[0].security_context == {"runAsUser": 0} -def test_container_resources_config(docker_image, loop): +def test_container_resources_config(docker_image): """ Test container resource requests / limits being set properly """ @@ -427,7 +425,6 @@ def test_container_resources_config(docker_image, loop): make_pod_spec( docker_image, memory_request="0.5G", memory_limit="1G", cpu_limit="1" ), - loop=loop, n_workers=0, ) as cluster: @@ -439,7 +436,7 @@ def test_container_resources_config(docker_image, loop): assert "cpu" not in pod.spec.containers[0].resources.requests -def test_extra_container_config_merge(docker_image, loop): +def test_extra_container_config_merge(docker_image): """ Test that our container config merging process works recursively fine """ @@ -452,7 +449,6 @@ def test_extra_container_config_merge(docker_image, loop): "args": ["last-item"], }, ), - loop=loop, n_workers=0, ) as cluster: @@ -464,7 +460,7 @@ def test_extra_container_config_merge(docker_image, loop): assert pod.spec.containers[0].args[-1] == "last-item" -def test_worker_args(docker_image, loop): +def test_worker_args(docker_image): """ Test that dask-worker arguments are added to the container args """ @@ -474,7 +470,6 @@ def test_worker_args(docker_image, loop): memory_limit="5000M", resources="FOO=1 BAR=2", ), - loop=loop, n_workers=0, ) as cluster:
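
Taken together, these patches make the experimental ``KubeCluster`` usable from a Kubeflow notebook: the namespace is inferred from the caller's Kubernetes context, the scheduler listens on all interfaces, and the protocol-prefixed port names keep Istio happy. A minimal end-to-end sketch, assuming the operator manifest is applied and (on Kubeflow) the ``kubeflow-kubernetes-edit`` role has been patched as shown earlier:

.. code-block:: python

    from dask.distributed import Client
    from dask_kubernetes.experimental import KubeCluster

    # namespace now defaults to the active context's namespace when omitted
    cluster = KubeCluster(name="demo", n_workers=2)

    with Client(cluster) as client:
        # round-trip a trivial task through the operator-managed cluster
        assert client.submit(lambda x: x + 1, 10).result() == 11

    cluster.close()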