Commit 68d6c96

Auto shutdown idle clusters (#672)
* Add autoCleanupTimeout option
* Add idle_timeout
* Update to use dask/distributed#7642
* Rename spec keyword
* Update with changes in distributed
* Nudge CI
* Set idle_timeout longer than resource_timeout
* Rename cluster to avoid collision
* Fix timeout default value
* Reduce logging noise and add warning when autoshutdown can't reach scheduler
1 parent c70f7fe commit 68d6c96
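From the user's side the feature is exposed as the idle_timeout keyword on KubeCluster (see the kubecluster.py diff below). A minimal usage sketch, assuming an operator with this commit is installed; the cluster name and 300-second timeout are illustrative:

from dask_kubernetes.operator import KubeCluster

# If the scheduler reports itself idle for longer than idle_timeout seconds,
# the operator's timer handler deletes the DaskCluster resource.
cluster = KubeCluster(name="demo", idle_timeout=300)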

File tree

6 files changed (+118, -6 lines)


dask_kubernetes/operator/controller/controller.py

+93 -3
@@ -16,7 +16,8 @@
 from dask_kubernetes.common.networking import get_scheduler_address
 from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
 from dask_kubernetes.aiopykube.dask import DaskCluster
-from distributed.core import rpc
+from distributed.core import rpc, clean_exception
+from distributed.protocol.pickle import dumps

 _ANNOTATION_NAMESPACES_TO_IGNORE = (
     "kopf.zalando.org",
@@ -416,7 +417,7 @@ async def retire_workers(
     )

     # Otherwise try gracefully retiring via the RPC
-    logger.info(
+    logger.debug(
         f"Scaling {worker_group_name} failed via the HTTP API, falling back to the Dask RPC"
     )
     # Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways
@@ -435,7 +436,7 @@ async def retire_workers(
         return workers_to_close

     # Finally fall back to last-in-first-out scaling
-    logger.info(
+    logger.debug(
         f"Scaling {worker_group_name} failed via the Dask RPC, falling back to LIFO scaling"
     )
     async with kubernetes.client.api_client.ApiClient() as api_client:
@@ -447,6 +448,75 @@ async def retire_workers(
         return [w["metadata"]["name"] for w in workers.items[:-n_workers]]


+async def check_scheduler_idle(scheduler_service_name, namespace, logger):
+    # Try getting idle time via HTTP API
+    dashboard_address = await get_scheduler_address(
+        scheduler_service_name,
+        namespace,
+        port_name="http-dashboard",
+        allow_external=False,
+    )
+    async with aiohttp.ClientSession() as session:
+        url = f"{dashboard_address}/api/v1/check_idle"
+        async with session.get(url) as resp:
+            if resp.status <= 300:
+                idle_since = (await resp.json())["idle_since"]
+                if idle_since:
+                    logger.debug("Scheduler idle since: %s", idle_since)
+                return idle_since
+            logger.debug(
+                "Received %d response from scheduler API with body %s",
+                resp.status,
+                await resp.text(),
+            )
+
+    # Otherwise try gracefully checking via the RPC
+    logger.debug(
+        f"Checking {scheduler_service_name} idleness failed via the HTTP API, falling back to the Dask RPC"
+    )
+    # Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways
+    with suppress(Exception):
+        comm_address = await get_scheduler_address(
+            scheduler_service_name,
+            namespace,
+            allow_external=False,
+        )
+        async with rpc(comm_address) as scheduler_comm:
+            idle_since = await scheduler_comm.check_idle()
+            if idle_since:
+                logger.debug("Scheduler idle since: %s", idle_since)
+            return idle_since
+
+    # Finally fall back to code injection via the Dask RPC for distributed<=2023.3.1
+    logger.debug(
+        f"Checking {scheduler_service_name} idleness failed via the Dask RPC, falling back to run_on_scheduler"
+    )
+
+    def idle_since(dask_scheduler=None):
+        if not dask_scheduler.idle_timeout:
+            dask_scheduler.idle_timeout = 300
+        dask_scheduler.check_idle()
+        return dask_scheduler.idle_since
+
+    comm_address = await get_scheduler_address(
+        scheduler_service_name,
+        namespace,
+        allow_external=False,
+    )
+    async with rpc(comm_address) as scheduler_comm:
+        response = await scheduler_comm.run_function(
+            function=dumps(idle_since),
+        )
+        if response["status"] == "error":
+            typ, exc, tb = clean_exception(**response)
+            raise exc.with_traceback(tb)
+        else:
+            idle_since = response["result"]
+            if idle_since:
+                logger.debug("Scheduler idle since: %s", idle_since)
+            return idle_since
+
+
 async def get_desired_workers(scheduler_service_name, namespace, logger):
     # Try gracefully retiring via the HTTP API
     dashboard_address = await get_scheduler_address(
@@ -901,3 +971,23 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs):
         logger.debug(
             "Not autoscaling %s with %d workers", spec["cluster"], current_replicas
         )
+
+
+@kopf.timer("daskcluster.kubernetes.dask.org", interval=5.0)
+async def daskcluster_autoshutdown(spec, name, namespace, logger, **kwargs):
+    if spec["idleTimeout"]:
+        try:
+            idle_since = await check_scheduler_idle(
+                scheduler_service_name=f"{name}-scheduler",
+                namespace=namespace,
+                logger=logger,
+            )
+        except Exception as e:
+            logger.warn("Unable to connect to scheduler, skipping autoshutdown check.")
+            return
+        if idle_since and time.time() > idle_since + spec["idleTimeout"]:
+            api = HTTPClient(KubeConfig.from_env())
+            cluster = await DaskCluster.objects(api, namespace=namespace).get_by_name(
+                name
+            )
+            await cluster.delete()
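The first branch of check_scheduler_idle relies on the scheduler's HTTP API (the check_idle endpoint from dask/distributed#7642). A minimal sketch of what that request looks like outside the operator, assuming a reachable dashboard address such as a port-forwarded scheduler; the address and helper name are illustrative:

import asyncio
import aiohttp

async def fetch_idle_since(dashboard_address):
    # Same endpoint the controller polls: returns the timestamp at which the
    # scheduler last became idle, or a falsey value if it is currently busy.
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{dashboard_address}/api/v1/check_idle") as resp:
            resp.raise_for_status()
            return (await resp.json())["idle_since"]

# Illustrative address, e.g. after port-forwarding the scheduler dashboard to localhost:8787.
print(asyncio.run(fetch_idle_since("http://localhost:8787")))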

dask_kubernetes/operator/customresources/templates.yaml

+4 -0
@@ -67,6 +67,10 @@ definitions:
         - scheduler
         - worker
       properties:
+        idleTimeout:
+          type: integer
+          description: Delete cluster if scheduler is idle for longer than this timeout. Set to 0 to never auto cleanup.
+          default: 0
         scheduler:
           $ref: 'python://dask_kubernetes/operator/customresources/templates.yaml#/definitions/dask.k8s.api.v1.DaskScheduler'
         worker:
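For reference, this is roughly what the new field looks like in a cluster resource, shown as the Python dict form that make_cluster_spec produces; the name and timeout are illustrative and the scheduler/worker sections are elided:

cluster_spec = {
    "apiVersion": "kubernetes.dask.org/v1",
    "kind": "DaskCluster",
    "metadata": {"name": "demo"},
    "spec": {
        "idleTimeout": 300,  # seconds of scheduler idleness before deletion; 0 (the default) disables cleanup
        # "scheduler": {...},  # elided
        # "worker": {...},     # elided
    },
}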

dask_kubernetes/operator/deployment/helm/dask-kubernetes-operator/crds/daskcluster.yaml

+3 -0
@@ -30,6 +30,9 @@ spec:
             properties:
               spec:
                 properties:
+                  idleTimeout:
+                    default: 0
+                    type: integer
                   scheduler:
                     properties:
                       metadata:

dask_kubernetes/operator/deployment/helm/dask-kubernetes-operator/crds/daskjob.yaml

+3 -0
@@ -32,6 +32,9 @@ spec:
             properties:
               spec:
                 properties:
+                  idleTimeout:
+                    default: 0
+                    type: integer
                   scheduler:
                     properties:
                       metadata:

dask_kubernetes/operator/kubecluster/kubecluster.py

+13 -1
@@ -102,6 +102,9 @@ class KubeCluster(Cluster):
     shutdown_on_close: bool (optional)
         Whether or not to delete the cluster resource when this object is closed.
         Defaults to ``True`` when creating a cluster and ``False`` when connecting to an existing one.
+    idle_timeout: int (optional)
+        If set Kubernetes will delete the cluster automatically if the scheduler is idle for longer than
+        this timeout in seconds.
     resource_timeout: int (optional)
         Time in seconds to wait for Kubernetes resources to enter their expected state.
         Example: If the ``DaskCluster`` resource that gets created isn't moved into a known ``status.phase``
@@ -170,6 +173,7 @@ def __init__(
         port_forward_cluster_ip=None,
         create_mode=None,
         shutdown_on_close=None,
+        idle_timeout=None,
         resource_timeout=None,
         scheduler_service_type=None,
         custom_cluster_spec=None,
@@ -220,6 +224,9 @@ def __init__(
         self.scheduler_forward_port = dask.config.get(
             "kubernetes.scheduler-forward-port", override_with=scheduler_forward_port
         )
+        self.idle_timeout = dask.config.get(
+            "kubernetes.idle-timeout", override_with=idle_timeout
+        )

         if self._custom_cluster_spec:
             if isinstance(self._custom_cluster_spec, str):
@@ -328,6 +335,7 @@ async def _create_cluster(self):
                 n_workers=self.n_workers,
                 image=self.image,
                 scheduler_service_type=self.scheduler_service_type,
+                idle_timeout=self.idle_timeout,
             )
         else:
             data = self._custom_cluster_spec
@@ -722,7 +730,7 @@ def close(self, timeout=3600):
         """Delete the dask cluster"""
         return self.sync(self._close, timeout=timeout)

-    async def _close(self, timeout=None):
+    async def _close(self, timeout=3600):
         await super()._close()
         if self.shutdown_on_close:
             async with kubernetes.client.api_client.ApiClient() as api_client:
@@ -881,6 +889,7 @@ def make_cluster_spec(
     env=None,
     worker_command="dask-worker",
     scheduler_service_type="ClusterIP",
+    idle_timeout=0,
 ):
     """Generate a ``DaskCluster`` kubernetes resource.

@@ -900,12 +909,15 @@
         Environment variables to set on scheduler and workers
     worker_command: str (optional)
         Worker command to use when starting the workers
+    idle_timeout: int (optional)
+        Timeout to cleanup idle cluster
     """
     return {
         "apiVersion": "kubernetes.dask.org/v1",
         "kind": "DaskCluster",
         "metadata": {"name": name},
         "spec": {
+            "idleTimeout": idle_timeout,
             "worker": make_worker_spec(
                 env=env,
                 resources=resources,
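Putting the two halves together, make_cluster_spec now carries idle_timeout through to spec["idleTimeout"], so clusters built from a custom spec get the same auto-shutdown behaviour as ones created directly. A short sketch; the name, worker count and 600-second timeout are illustrative:

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

spec = make_cluster_spec(name="custom", n_workers=2, idle_timeout=600)
cluster = KubeCluster(custom_cluster_spec=spec)  # deleted by the operator after 10 idle minutes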

dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py

+2 -2
@@ -132,11 +132,11 @@ def test_cluster_without_operator(docker_image):
 def test_cluster_crashloopbackoff(kopf_runner, docker_image):
     with kopf_runner:
         with pytest.raises(SchedulerStartupError, match="Scheduler failed to start"):
-            spec = make_cluster_spec(name="foo", n_workers=1)
+            spec = make_cluster_spec(name="crashloopbackoff", n_workers=1)
             spec["spec"]["scheduler"]["spec"]["containers"][0]["args"][
                 0
             ] = "dask-schmeduler"
-            KubeCluster(custom_cluster_spec=spec, resource_timeout=1)
+            KubeCluster(custom_cluster_spec=spec, resource_timeout=1, idle_timeout=2)


 def test_adapt(kopf_runner, docker_image):
