Skip to content

Commit b35cd81

Browse files
Replace aiopykube with kr8s in controller (#721)
* Add kr8s Dask resource classes * Replace aiopykube with kr8s in handle_scheduler_service_status * Add kr8s dependency * Revert everything except the import and class definitions * Replace iopykube * Bump kr8s to 0.5.1 and make DaskCluster scalable
1 parent 68d6c96 commit b35cd81

File tree

2 files changed

+44
-10
lines changed

2 files changed

+44
-10
lines changed

dask_kubernetes/operator/controller/controller.py

+43-10
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@
1111
import kubernetes_asyncio as kubernetes
1212
from importlib_metadata import entry_points
1313
from kubernetes_asyncio.client import ApiException
14+
from kr8s.asyncio.objects import APIObject
1415

1516
from dask_kubernetes.common.auth import ClusterAuth
1617
from dask_kubernetes.common.networking import get_scheduler_address
17-
from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
18-
from dask_kubernetes.aiopykube.dask import DaskCluster
1918
from distributed.core import rpc, clean_exception
2019
from distributed.protocol.pickle import dumps
2120

@@ -40,6 +39,45 @@ class SchedulerCommError(Exception):
4039
"""Raised when unable to communicate with a scheduler."""
4140

4241

42+
class DaskCluster(APIObject):
43+
version = "kubernetes.dask.org/v1"
44+
endpoint = "daskclusters"
45+
kind = "DaskCluster"
46+
plural = "daskclusters"
47+
singular = "daskcluster"
48+
namespaced = True
49+
scalable = True
50+
scalable_spec = "worker.replicas"
51+
52+
53+
class DaskWorkerGroup(APIObject):
54+
version = "kubernetes.dask.org/v1"
55+
endpoint = "daskworkergroups"
56+
kind = "DaskWorkerGroup"
57+
plural = "daskworkergroups"
58+
singular = "daskworkergroup"
59+
namespaced = True
60+
scalable = True
61+
62+
63+
class DaskAutoscaler(APIObject):
64+
version = "kubernetes.dask.org/v1"
65+
endpoint = "daskautoscalers"
66+
kind = "DaskAutoscaler"
67+
plural = "daskautoscalers"
68+
singular = "daskautoscaler"
69+
namespaced = True
70+
71+
72+
class DaskJob(APIObject):
73+
version = "kubernetes.dask.org/v1"
74+
endpoint = "daskjobs"
75+
kind = "DaskJob"
76+
plural = "daskjobs"
77+
singular = "daskjob"
78+
namespaced = True
79+
80+
4381
def _get_annotations(meta):
4482
return {
4583
annotation_key: annotation_value
@@ -347,10 +385,8 @@ async def handle_scheduler_service_status(
347385
# Otherwise mark it as Running
348386
else:
349387
phase = "Running"
350-
351-
api = HTTPClient(KubeConfig.from_env())
352-
cluster = await DaskCluster.objects(api, namespace=namespace).get_by_name(
353-
labels["dask.org/cluster-name"]
388+
cluster = await DaskCluster.get(
389+
labels["dask.org/cluster-name"], namespace=namespace
354390
)
355391
await cluster.patch({"status": {"phase": phase}})
356392

@@ -986,8 +1022,5 @@ async def daskcluster_autoshutdown(spec, name, namespace, logger, **kwargs):
9861022
logger.warn("Unable to connect to scheduler, skipping autoshutdown check.")
9871023
return
9881024
if idle_since and time.time() > idle_since + spec["idleTimeout"]:
989-
api = HTTPClient(KubeConfig.from_env())
990-
cluster = await DaskCluster.objects(api, namespace=namespace).get_by_name(
991-
name
992-
)
1025+
cluster = await DaskCluster.get(name, namespace=namespace)
9931026
await cluster.delete()

requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ kubernetes-asyncio>=12.0.1
55
kopf>=1.35.3
66
pykube-ng>=22.9.0
77
rich>=12.5.1
8+
kr8s==0.5.1

0 commit comments

Comments
 (0)