Commit 637ea1c

Small fixes to get things working on Kubeflow (#509)
* Release 2022.5.1
* Small fixes to get things working on Kubeflow
* Scheduler listen on all addresses
* Switch scheduler probes to httpGet to avoid TCP handshake errors cluttering logs
* Rename comm and dashboard ports to tcp-comm and http-dashboard
* Wait for scheduler to be Ready instead of Running
* Remove stray print
* Wait for scheduler comm and get dashboard port more intelligently
* Fix stray print
* Ditch loop passing due to upstream changes
1 parent 0ec15f4 commit 637ea1c
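For orientation, a minimal sketch of how the experimental KubeCluster touched by this commit is typically driven. The cluster name "demo" and worker count are illustrative, and it assumes a reachable Kubernetes cluster with a valid kubeconfig.

from dask.distributed import Client
from dask_kubernetes.experimental import KubeCluster

# "demo" is an arbitrary example name. After this commit the namespace
# defaults to the active kubeconfig context rather than "default", and the
# dashboard port is looked up from the scheduler Service's "http-dashboard"
# port instead of being forwarded with kubectl port-forward.
cluster = KubeCluster(name="demo", n_workers=2)
client = Client(cluster)
print(cluster.dashboard_link)

client.close()
cluster.close()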

10 files changed: +162 additions, -108 deletions

10 files changed

+162
-108
lines changed

dask_kubernetes/classic/tests/test_sync.py

Lines changed: 22 additions & 27 deletions
@@ -5,7 +5,7 @@
 import dask
 import pytest
 from dask.distributed import Client, wait
-from distributed.utils_test import loop, captured_logger  # noqa: F401
+from distributed.utils_test import captured_logger
 from dask.utils import tmpfile
 
 from dask_kubernetes import KubeCluster, make_pod_spec
@@ -75,17 +75,17 @@ def test_ipython_display(cluster):
     sleep(0.5)
 
 
-def test_env(pod_spec, loop):
-    with KubeCluster(pod_spec, env={"ABC": "DEF"}, loop=loop) as cluster:
+def test_env(pod_spec):
+    with KubeCluster(pod_spec, env={"ABC": "DEF"}) as cluster:
         cluster.scale(1)
-        with Client(cluster, loop=loop) as client:
+        with Client(cluster) as client:
             while not cluster.scheduler_info["workers"]:
                 sleep(0.1)
             env = client.run(lambda: dict(os.environ))
             assert all(v["ABC"] == "DEF" for v in env.values())
 
 
-def dont_test_pod_template_yaml(docker_image, loop):
+def dont_test_pod_template_yaml(docker_image):
     test_yaml = {
         "kind": "Pod",
         "metadata": {"labels": {"app": "dask", "component": "dask-worker"}},
@@ -109,9 +109,9 @@ def dont_test_pod_template_yaml(docker_image, loop):
     with tmpfile(extension="yaml") as fn:
         with open(fn, mode="w") as f:
             yaml.dump(test_yaml, f)
-        with KubeCluster(f.name, loop=loop) as cluster:
+        with KubeCluster(f.name) as cluster:
             cluster.scale(2)
-            with Client(cluster, loop=loop) as client:
+            with Client(cluster) as client:
                 future = client.submit(lambda x: x + 1, 10)
                 result = future.result(timeout=10)
                 assert result == 11
@@ -128,7 +128,7 @@ def dont_test_pod_template_yaml(docker_image, loop):
                 assert all(client.has_what().values())
 
 
-def test_pod_template_yaml_expand_env_vars(docker_image, loop):
+def test_pod_template_yaml_expand_env_vars(docker_image):
     try:
         os.environ["FOO_IMAGE"] = docker_image
 
@@ -155,13 +155,13 @@ def test_pod_template_yaml_expand_env_vars(docker_image, loop):
         with tmpfile(extension="yaml") as fn:
             with open(fn, mode="w") as f:
                 yaml.dump(test_yaml, f)
-            with KubeCluster(f.name, loop=loop) as cluster:
+            with KubeCluster(f.name) as cluster:
                 assert cluster.pod_template.spec.containers[0].image == docker_image
     finally:
         del os.environ["FOO_IMAGE"]
 
 
-def test_pod_template_dict(docker_image, loop):
+def test_pod_template_dict(docker_image):
     spec = {
         "metadata": {},
         "restartPolicy": "Never",
@@ -185,9 +185,9 @@ def test_pod_template_dict(docker_image, loop):
         },
     }
 
-    with KubeCluster(spec, loop=loop) as cluster:
+    with KubeCluster(spec) as cluster:
         cluster.scale(2)
-        with Client(cluster, loop=loop) as client:
+        with Client(cluster) as client:
             future = client.submit(lambda x: x + 1, 10)
             result = future.result()
             assert result == 11
@@ -202,7 +202,7 @@ def test_pod_template_dict(docker_image, loop):
             assert all(client.has_what().values())
 
 
-def test_pod_template_minimal_dict(docker_image, loop):
+def test_pod_template_minimal_dict(docker_image):
     spec = {
         "spec": {
             "containers": [
@@ -224,9 +224,9 @@ def test_pod_template_minimal_dict(docker_image, loop):
         }
     }
 
-    with KubeCluster(spec, loop=loop) as cluster:
+    with KubeCluster(spec) as cluster:
         cluster.adapt()
-        with Client(cluster, loop=loop) as client:
+        with Client(cluster) as client:
             future = client.submit(lambda x: x + 1, 10)
             result = future.result()
             assert result == 11
@@ -264,9 +264,9 @@ def test_bad_args():
         KubeCluster({"kind": "Pod"})
 
 
-def test_constructor_parameters(pod_spec, loop):
+def test_constructor_parameters(pod_spec):
     env = {"FOO": "BAR", "A": 1}
-    with KubeCluster(pod_spec, name="myname", loop=loop, env=env) as cluster:
+    with KubeCluster(pod_spec, name="myname", env=env) as cluster:
         pod = cluster.pod_template
 
         var = [v for v in pod.spec.containers[0].env if v.name == "FOO"]
@@ -380,15 +380,14 @@ def test_maximum(cluster):
         assert "scale beyond maximum number of workers" in result.lower()
 
 
-def test_extra_pod_config(docker_image, loop):
+def test_extra_pod_config(docker_image):
     """
     Test that our pod config merging process works fine
     """
     with KubeCluster(
         make_pod_spec(
             docker_image, extra_pod_config={"automountServiceAccountToken": False}
         ),
-        loop=loop,
         n_workers=0,
     ) as cluster:
 
@@ -397,7 +396,7 @@ def test_extra_pod_config(docker_image, loop):
         assert pod.spec.automount_service_account_token is False
 
 
-def test_extra_container_config(docker_image, loop):
+def test_extra_container_config(docker_image):
     """
     Test that our container config merging process works fine
     """
@@ -409,7 +408,6 @@ def test_extra_container_config(docker_image, loop):
                 "securityContext": {"runAsUser": 0},
             },
         ),
-        loop=loop,
         n_workers=0,
     ) as cluster:
 
@@ -419,15 +417,14 @@ def test_extra_container_config(docker_image, loop):
         assert pod.spec.containers[0].security_context == {"runAsUser": 0}
 
 
-def test_container_resources_config(docker_image, loop):
+def test_container_resources_config(docker_image):
     """
     Test container resource requests / limits being set properly
    """
     with KubeCluster(
         make_pod_spec(
             docker_image, memory_request="0.5G", memory_limit="1G", cpu_limit="1"
         ),
-        loop=loop,
         n_workers=0,
     ) as cluster:
 
@@ -439,7 +436,7 @@ def test_container_resources_config(docker_image, loop):
         assert "cpu" not in pod.spec.containers[0].resources.requests
 
 
-def test_extra_container_config_merge(docker_image, loop):
+def test_extra_container_config_merge(docker_image):
     """
     Test that our container config merging process works recursively fine
     """
@@ -452,7 +449,6 @@ def test_extra_container_config_merge(docker_image, loop):
                 "args": ["last-item"],
             },
         ),
-        loop=loop,
         n_workers=0,
     ) as cluster:
 
@@ -464,7 +460,7 @@ def test_extra_container_config_merge(docker_image, loop):
         assert pod.spec.containers[0].args[-1] == "last-item"
 
 
-def test_worker_args(docker_image, loop):
+def test_worker_args(docker_image):
     """
     Test that dask-worker arguments are added to the container args
    """
@@ -474,7 +470,6 @@ def test_worker_args(docker_image, loop):
             memory_limit="5000M",
             resources="FOO=1 BAR=2",
         ),
-        loop=loop,
        n_workers=0,
     ) as cluster:
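The recurring change above is dropping the distributed `loop` fixture and letting `KubeCluster` and `Client` manage their own event loops. A before/after recap as a runnable sketch; the image name is a placeholder.

from time import sleep

from dask.distributed import Client
from dask_kubernetes import KubeCluster, make_pod_spec

pod_spec = make_pod_spec(image="ghcr.io/dask/dask:latest")  # placeholder image

# Before: the distributed `loop` fixture was threaded through every call:
#     with KubeCluster(pod_spec, env={"ABC": "DEF"}, loop=loop) as cluster:
#         with Client(cluster, loop=loop) as client:
#             ...
# After: no event-loop plumbing; each object runs its own loop.
with KubeCluster(pod_spec, env={"ABC": "DEF"}) as cluster:
    cluster.scale(1)
    with Client(cluster) as client:
        while not cluster.scheduler_info["workers"]:
            sleep(0.1)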

dask_kubernetes/common/networking.py

Lines changed: 22 additions & 4 deletions
@@ -6,6 +6,9 @@
 from weakref import finalize
 
 import kubernetes_asyncio as kubernetes
+from tornado.iostream import StreamClosedError
+
+from distributed.core import rpc
 
 from .utils import check_dependency
 
@@ -15,7 +18,7 @@ async def get_external_address_for_scheduler_service(
     service,
     port_forward_cluster_ip=None,
     service_name_resolution_retries=20,
-    port_name="comm",
+    port_name="tcp-comm",
 ):
     """Take a service object and return the scheduler address."""
     [port] = [
@@ -108,7 +111,7 @@ async def port_forward_dashboard(service_name, namespace):
     return port
 
 
-async def get_scheduler_address(service_name, namespace, port_name="comm"):
+async def get_scheduler_address(service_name, namespace, port_name="tcp-comm"):
     async with kubernetes.client.api_client.ApiClient() as api_client:
         api = kubernetes.client.CoreV1Api(api_client)
         service = await api.read_namespaced_service(service_name, namespace)
@@ -132,6 +135,21 @@ async def wait_for_scheduler(cluster_name, namespace):
             label_selector=f"dask.org/cluster-name={cluster_name},dask.org/component=scheduler",
             timeout_seconds=60,
         ):
-            if event["object"].status.phase == "Running":
-                watch.stop()
+            if event["object"].status.conditions:
+                conditions = {
+                    c.type: c.status for c in event["object"].status.conditions
+                }
+                if "Ready" in conditions and conditions["Ready"] == "True":
+                    watch.stop()
+            await asyncio.sleep(0.1)
+
+
+async def wait_for_scheduler_comm(address):
+    while True:
+        try:
+            async with rpc(address) as scheduler_comm:
+                await scheduler_comm.versions()
+        except (StreamClosedError, OSError):
             await asyncio.sleep(0.1)
+            continue
+        break
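Taken together, the new helpers give the connection sequence the experimental KubeCluster uses below: resolve the comm address from the Service's tcp-comm port, block until the scheduler actually answers an RPC, then read the dashboard port from the same Service's http-dashboard port. A sketch under the assumption that the Kubernetes client is already configured; the service name and namespace are placeholders.

from distributed.core import rpc

from dask_kubernetes.common.networking import (
    get_scheduler_address,
    wait_for_scheduler_comm,
)


async def connect(service_name="mycluster-cluster-service", namespace="default"):
    # Comm address comes from the Service's "tcp-comm" port (the new default).
    scheduler_address = await get_scheduler_address(service_name, namespace)

    # Wait until the scheduler answers versions() over RPC, not merely until
    # its pod reports Ready, so the first real connection never hits a
    # half-started scheduler.
    await wait_for_scheduler_comm(scheduler_address)
    scheduler_comm = rpc(scheduler_address)

    # The dashboard port is read off the same Service by its new port name.
    dashboard_address = await get_scheduler_address(
        service_name, namespace, port_name="http-dashboard"
    )
    return scheduler_comm, dashboard_address.split(":")[-1]


# asyncio.run(connect())  # needs a live cluster and a configured kube client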

dask_kubernetes/experimental/kubecluster.py

Lines changed: 29 additions & 21 deletions
@@ -20,15 +20,16 @@
 )
 
 from dask_kubernetes.common.auth import ClusterAuth
+from dask_kubernetes.common.utils import namespace_default
 from dask_kubernetes.operator import (
     build_cluster_spec,
     wait_for_service,
 )
 
 from dask_kubernetes.common.networking import (
     get_scheduler_address,
-    port_forward_dashboard,
     wait_for_scheduler,
+    wait_for_scheduler_comm,
 )
 
 
@@ -121,7 +122,7 @@ class KubeCluster(Cluster):
     def __init__(
         self,
         name,
-        namespace="default",
+        namespace=None,
         image="ghcr.io/dask/dask:latest",
         n_workers=3,
         resources={},
@@ -133,8 +134,7 @@ def __init__(
         **kwargs,
     ):
         self.name = name
-        # TODO: Set namespace to None and get default namespace from user's context
-        self.namespace = namespace
+        self.namespace = namespace or namespace_default()
         self.image = image
         self.n_workers = n_workers
         self.resources = resources
@@ -208,10 +208,15 @@ async def _create_cluster(self):
             ) from e
         await wait_for_scheduler(cluster_name, self.namespace)
         await wait_for_service(core_api, f"{cluster_name}-service", self.namespace)
-        self.scheduler_comm = rpc(await self._get_scheduler_address())
-        self.forwarded_dashboard_port = await port_forward_dashboard(
-            f"{self.name}-cluster-service", self.namespace
+        scheduler_address = await self._get_scheduler_address()
+        await wait_for_scheduler_comm(scheduler_address)
+        self.scheduler_comm = rpc(scheduler_address)
+        dashboard_address = await get_scheduler_address(
+            f"{self.name}-cluster-service",
+            self.namespace,
+            port_name="http-dashboard",
         )
+        self.forwarded_dashboard_port = dashboard_address.split(":")[-1]
 
     async def _connect_cluster(self):
         if self.shutdown_on_close is None:
@@ -230,10 +235,15 @@ async def _connect_cluster(self):
         service_name = f'{cluster_spec["metadata"]["name"]}-service'
         await wait_for_scheduler(self.cluster_name, self.namespace)
         await wait_for_service(core_api, service_name, self.namespace)
-        self.scheduler_comm = rpc(await self._get_scheduler_address())
-        self.forwarded_dashboard_port = await port_forward_dashboard(
-            f"{self.name}-cluster-service", self.namespace
+        scheduler_address = await self._get_scheduler_address()
+        await wait_for_scheduler_comm(scheduler_address)
+        self.scheduler_comm = rpc(scheduler_address)
+        dashboard_address = await get_scheduler_address(
+            service_name,
+            self.namespace,
+            port_name="http-dashboard",
        )
+        self.forwarded_dashboard_port = dashboard_address.split(":")[-1]
 
     async def _get_cluster(self):
         async with kubernetes.client.api_client.ApiClient() as api_client:
@@ -465,30 +475,28 @@ def _build_scheduler_spec(self, cluster_name):
                     {
                         "name": "scheduler",
                         "image": self.image,
-                        "args": [
-                            "dask-scheduler",
-                        ],
+                        "args": ["dask-scheduler", "--host", "0.0.0.0"],
                         "env": env,
                         "resources": self.resources,
                         "ports": [
                             {
-                                "name": "comm",
+                                "name": "tcp-comm",
                                 "containerPort": 8786,
                                 "protocol": "TCP",
                             },
                             {
-                                "name": "dashboard",
+                                "name": "http-dashboard",
                                 "containerPort": 8787,
                                 "protocol": "TCP",
                             },
                         ],
                         "readinessProbe": {
-                            "tcpSocket": {"port": "comm"},
+                            "httpGet": {"port": "http-dashboard", "path": "/health"},
                             "initialDelaySeconds": 5,
                             "periodSeconds": 10,
                         },
                         "livenessProbe": {
-                            "tcpSocket": {"port": "comm"},
+                            "httpGet": {"port": "http-dashboard", "path": "/health"},
                             "initialDelaySeconds": 15,
                             "periodSeconds": 20,
                         },
@@ -503,16 +511,16 @@ def _build_scheduler_spec(self, cluster_name):
                 },
                 "ports": [
                     {
-                        "name": "comm",
+                        "name": "tcp-comm",
                         "protocol": "TCP",
                         "port": 8786,
-                        "targetPort": "comm",
+                        "targetPort": "tcp-comm",
                    },
                     {
-                        "name": "dashboard",
+                        "name": "http-dashboard",
                         "protocol": "TCP",
                         "port": 8787,
-                        "targetPort": "dashboard",
+                        "targetPort": "http-dashboard",
                     },
                 ],
             },
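On the probe change: the scheduler's dashboard HTTP server exposes a /health route, so probing it with httpGet returns a clean 200, whereas the old tcpSocket probe against the comm port opened and dropped a raw TCP connection every period, which the scheduler logged as failed handshakes. A hand-rolled equivalent of the new check, assuming the dashboard port is reachable (for example via kubectl port-forward):

import urllib.request


def scheduler_is_healthy(host="localhost", dashboard_port=8787):
    # Mirrors the httpGet readiness/liveness probe: GET /health on the
    # dashboard port and treat a 200 response as healthy.
    try:
        with urllib.request.urlopen(
            f"http://{host}:{dashboard_port}/health", timeout=5
        ) as response:
            return response.status == 200
    except OSError:
        return False


print(scheduler_is_healthy())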

dask_kubernetes/kubernetes.yaml

Lines changed: 2 additions & 2 deletions
@@ -30,11 +30,11 @@ kubernetes:
       dask.org/cluster-name: ""  # Cluster name will be added automatically
       dask.org/component: scheduler
     ports:
-      - name: comm
+      - name: tcp-comm
         protocol: TCP
         port: 8786
         targetPort: 8786
-      - name: dashboard
+      - name: http-dashboard
         protocol: TCP
         port: 8787
         targetPort: 8787
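The classic-cluster config mirrors the rename in the operator spec above because the networking helpers resolve ports by name, so a rename has to land in the container spec, the Service spec, and this configuration file together. A minimal sketch of that lookup idea (mirroring, not reproducing, the helper in dask_kubernetes/common/networking.py), where service is a kubernetes V1Service object:

def port_from_service(service, port_name="tcp-comm"):
    # Pick the single Service port whose name matches the requested one;
    # raises if zero or several ports carry that name.
    [port] = [p.port for p in service.spec.ports if p.name == port_name]
    return port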
