Skip to content

Commit 41b3908

Browse files
committed
Merge remote-tracking branch 'upstream/main' into shutdown-on-close
2 parents cec9b6a + 68d6c96 commit 41b3908

25 files changed

+380
-193
lines changed

.github/workflows/helmcluster.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
strategy:
2626
fail-fast: false
2727
matrix:
28-
python-version: ["3.8", "3.9", "3.10"]
28+
python-version: ["3.9", "3.10"]
2929
kubernetes-version: ["1.27.0"]
3030
include:
3131
- python-version: "3.10"

.github/workflows/kubecluster.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
strategy:
2626
fail-fast: false
2727
matrix:
28-
python-version: ["3.8", "3.9", "3.10"]
28+
python-version: ["3.9", "3.10"]
2929
kubernetes-version: ["1.27.0"]
3030
include:
3131
- python-version: "3.10"

.github/workflows/lint.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
- uses: actions/checkout@v2
99
- uses: actions/setup-python@v2
1010
with:
11-
python-version: "3.8"
11+
python-version: "3.10"
1212
- name: Install deps
1313
run: ./ci/install-deps.sh
1414
- uses: pre-commit/action@v2.0.0

.github/workflows/operator.yaml

+3-2
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ jobs:
3535

3636
test:
3737
runs-on: ubuntu-latest
38-
timeout-minutes: 45
38+
timeout-minutes: 20
3939
strategy:
4040
fail-fast: false
4141
matrix:
42-
python-version: ["3.8", "3.9", "3.10"]
42+
python-version: ["3.9", "3.10"]
4343
kubernetes-version: ["1.27.0"]
4444
include:
4545
- python-version: "3.10"
@@ -63,6 +63,7 @@ jobs:
6363
env:
6464
KUBERNETES_VERSION: ${{ matrix.kubernetes-version }}
6565
TEST_ISTIO: "true"
66+
TEST_DASK_GATEWAY: "true"
6667
run: pytest dask_kubernetes/common dask_kubernetes/operator dask_kubernetes/aiopykube
6768
- name: Debug k8s resources
6869
if: always()

.github/workflows/release.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ jobs:
1010
- name: Checkout source
1111
uses: actions/checkout@v2
1212

13-
- name: Set up Python 3.8
13+
- name: Set up Python 3.10
1414
uses: actions/setup-python@v1
1515
with:
16-
python-version: 3.8
16+
python-version: "3.10"
1717

1818
- name: Install pypa/build and chartpress
1919
run: python -m pip install build wheel chartpress pyyaml

.readthedocs.yaml

+5-1
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ sphinx:
1111

1212
formats: all
1313

14+
build:
15+
os: ubuntu-22.04
16+
tools:
17+
python: "3.10"
18+
1419
python:
15-
version: 3.8
1620
install:
1721
- method: pip
1822
path: .

README.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ Dask Kubernetes
2121
:target: https://anaconda.org/conda-forge/dask-kubernetes
2222
:alt: Conda Forge
2323

24-
.. image:: https://img.shields.io/badge/python%20support-3.8%7C3.9%7C3.10-blue
24+
.. image:: https://img.shields.io/badge/python%20support-3.9%7C3.10-blue
2525
:target: https://kubernetes.dask.org/en/latest/installing.html#supported-versions
2626
:alt: Python Support
2727

ci/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM ghcr.io/dask/dask:latest
1+
FROM ghcr.io/dask/dask:latest-py3.10
22

33
# Install latest dev builds of Dask and Distributed
44
RUN pip install git+https://github.com/dask/distributed@main

dask_kubernetes/classic/tests/test_async.py

+1
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,7 @@ async def test_start_with_workers(k8s_cluster, pod_spec):
783783

784784

785785
@pytest.mark.asyncio
786+
@pytest.mark.xfail(reason="Flaky in CI and classic is deprecated anyway")
786787
async def test_adapt_delete(cluster, ns):
787788
"""
788789
testing whether KubeCluster.adapt will bring

dask_kubernetes/conftest.py

+46-40
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
import os
55
import subprocess
66
import tempfile
7+
import uuid
78

89
from kopf.testing import KopfRunner
910
from pytest_kind.cluster import KindCluster
1011

11-
from dask_kubernetes.common.utils import check_dependency, get_current_namespace
12+
from dask_kubernetes.common.utils import check_dependency
1213

1314
DIR = pathlib.Path(__file__).parent.absolute()
1415

@@ -60,50 +61,48 @@ def install_istio(k8s_cluster):
6061
)
6162

6263

63-
@pytest.fixture(scope="session")
64+
@pytest.fixture(autouse=True)
6465
def ns(k8s_cluster):
65-
return get_current_namespace()
66-
67-
68-
def run_generate(crd_path, patch_path, temp_path):
69-
subprocess.run(
70-
["k8s-crd-resolver", "-r", "-j", patch_path, crd_path, temp_path],
71-
check=True,
72-
env={**os.environ},
73-
)
66+
ns = "dask-k8s-pytest-" + uuid.uuid4().hex[:10]
67+
k8s_cluster.kubectl("create", "ns", ns)
68+
yield ns
69+
k8s_cluster.kubectl("delete", "ns", ns, "--wait=false", "--ignore-not-found=true")
7470

7571

7672
@pytest.fixture(scope="session", autouse=True)
7773
def install_gateway(k8s_cluster):
78-
check_dependency("helm")
79-
# To ensure the operator can coexist with Gateway
80-
subprocess.run(
81-
[
82-
"helm",
83-
"upgrade",
84-
"dask-gateway",
85-
"dask-gateway",
86-
"--install",
87-
"--repo=https://helm.dask.org",
88-
"--create-namespace",
89-
"--namespace",
90-
"dask-gateway",
91-
],
92-
check=True,
93-
env={**os.environ},
94-
)
95-
yield
96-
subprocess.run(
97-
[
98-
"helm",
99-
"delete",
100-
"--namespace",
101-
"dask-gateway",
102-
"dask-gateway",
103-
],
104-
check=True,
105-
env={**os.environ},
106-
)
74+
if bool(os.environ.get("TEST_DASK_GATEWAY", False)):
75+
check_dependency("helm")
76+
# To ensure the operator can coexist with Gateway
77+
subprocess.run(
78+
[
79+
"helm",
80+
"upgrade",
81+
"dask-gateway",
82+
"dask-gateway",
83+
"--install",
84+
"--repo=https://helm.dask.org",
85+
"--create-namespace",
86+
"--namespace",
87+
"dask-gateway",
88+
],
89+
check=True,
90+
env={**os.environ},
91+
)
92+
yield
93+
subprocess.run(
94+
[
95+
"helm",
96+
"delete",
97+
"--namespace",
98+
"dask-gateway",
99+
"dask-gateway",
100+
],
101+
check=True,
102+
env={**os.environ},
103+
)
104+
else:
105+
yield
107106

108107

109108
@pytest.fixture(scope="session", autouse=True)
@@ -112,6 +111,13 @@ def customresources(k8s_cluster):
112111
temp_dir = tempfile.TemporaryDirectory()
113112
crd_path = os.path.join(DIR, "operator", "customresources")
114113

114+
def run_generate(crd_path, patch_path, temp_path):
115+
subprocess.run(
116+
["k8s-crd-resolver", "-r", "-j", patch_path, crd_path, temp_path],
117+
check=True,
118+
env={**os.environ},
119+
)
120+
115121
for crd in ["daskcluster", "daskworkergroup", "daskjob", "daskautoscaler"]:
116122
run_generate(
117123
os.path.join(crd_path, f"{crd}.yaml"),

dask_kubernetes/operator/controller/controller.py

+93-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
from dask_kubernetes.common.networking import get_scheduler_address
1717
from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
1818
from dask_kubernetes.aiopykube.dask import DaskCluster
19-
from distributed.core import rpc
19+
from distributed.core import rpc, clean_exception
20+
from distributed.protocol.pickle import dumps
2021

2122
_ANNOTATION_NAMESPACES_TO_IGNORE = (
2223
"kopf.zalando.org",
@@ -416,7 +417,7 @@ async def retire_workers(
416417
)
417418

418419
# Otherwise try gracefully retiring via the RPC
419-
logger.info(
420+
logger.debug(
420421
f"Scaling {worker_group_name} failed via the HTTP API, falling back to the Dask RPC"
421422
)
422423
# Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways
@@ -435,7 +436,7 @@ async def retire_workers(
435436
return workers_to_close
436437

437438
# Finally fall back to last-in-first-out scaling
438-
logger.info(
439+
logger.debug(
439440
f"Scaling {worker_group_name} failed via the Dask RPC, falling back to LIFO scaling"
440441
)
441442
async with kubernetes.client.api_client.ApiClient() as api_client:
@@ -447,6 +448,75 @@ async def retire_workers(
447448
return [w["metadata"]["name"] for w in workers.items[:-n_workers]]
448449

449450

451+
async def check_scheduler_idle(scheduler_service_name, namespace, logger):
    """Ask a Dask scheduler since when it has been idle.

    Three strategies are tried in order, each one a fallback for the previous:

    1. ``GET /api/v1/check_idle`` on the scheduler's dashboard HTTP API.
    2. The ``check_idle`` handler over the Dask RPC.
    3. Shipping a small function to the scheduler through the RPC
       ``run_function`` handler (intended for ``distributed<=2023.3.1``).

    Parameters
    ----------
    scheduler_service_name
        Name of the scheduler service, forwarded to ``get_scheduler_address``.
    namespace
        Namespace of the scheduler service, forwarded to
        ``get_scheduler_address``.
    logger
        Logger used for debug output about which strategy was used.

    Returns
    -------
    The ``idle_since`` value reported by the scheduler. A falsy value is
    returned as-is and is not logged.
    NOTE(review): presumably a UNIX timestamp in seconds, since the timer
    handler compares it against ``time.time()`` -- confirm against distributed.

    Raises
    ------
    Exception
        Errors from the HTTP attempt and from the final ``run_function``
        fallback propagate to the caller, including an exception rebuilt
        from an ``"error"`` response of the scheduler. Only the middle RPC
        attempt is suppressed.
    """
    # Try getting idle time via HTTP API
    dashboard_address = await get_scheduler_address(
        scheduler_service_name,
        namespace,
        port_name="http-dashboard",
        allow_external=False,
    )
    async with aiohttp.ClientSession() as session:
        url = f"{dashboard_address}/api/v1/check_idle"
        async with session.get(url) as resp:
            # Only 2xx is a success; 300 is a redirect and carries no
            # ``idle_since`` payload, so it must fall through to the RPC.
            if 200 <= resp.status < 300:
                idle_since = (await resp.json())["idle_since"]
                if idle_since:
                    logger.debug("Scheduler idle since: %s", idle_since)
                return idle_since
            logger.debug(
                "Received %d response from scheduler API with body %s",
                resp.status,
                await resp.text(),
            )

    # Otherwise try gracefully checking via the RPC
    logger.debug(
        "Checking %s idleness failed via the HTTP API, falling back to the Dask RPC",
        scheduler_service_name,
    )
    # Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways
    with suppress(Exception):
        comm_address = await get_scheduler_address(
            scheduler_service_name,
            namespace,
            allow_external=False,
        )
        async with rpc(comm_address) as scheduler_comm:
            idle_since = await scheduler_comm.check_idle()
            if idle_since:
                logger.debug("Scheduler idle since: %s", idle_since)
            return idle_since

    # Finally fall back to code injection via the Dask RPC for distributed<=2023.3.1
    logger.debug(
        "Checking %s idleness failed via the Dask RPC, falling back to run_on_scheduler",
        scheduler_service_name,
    )

    def _idle_since_on_scheduler(dask_scheduler=None):
        # Executed on the scheduler, not in the operator process.
        # Side effect: a scheduler with no idle timeout configured gets one
        # of 300 set before ``check_idle`` is invoked.
        if not dask_scheduler.idle_timeout:
            dask_scheduler.idle_timeout = 300
        dask_scheduler.check_idle()
        return dask_scheduler.idle_since

    comm_address = await get_scheduler_address(
        scheduler_service_name,
        namespace,
        allow_external=False,
    )
    async with rpc(comm_address) as scheduler_comm:
        response = await scheduler_comm.run_function(
            function=dumps(_idle_since_on_scheduler),
        )
        if response["status"] == "error":
            # Re-raise the scheduler-side failure locally with its traceback.
            _, exc, tb = clean_exception(**response)
            raise exc.with_traceback(tb)

        idle_since = response["result"]
        if idle_since:
            logger.debug("Scheduler idle since: %s", idle_since)
        return idle_since
518+
519+
450520
async def get_desired_workers(scheduler_service_name, namespace, logger):
451521
# Try gracefully retiring via the HTTP API
452522
dashboard_address = await get_scheduler_address(
@@ -901,3 +971,23 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs):
901971
logger.debug(
902972
"Not autoscaling %s with %d workers", spec["cluster"], current_replicas
903973
)
974+
975+
976+
@kopf.timer("daskcluster.kubernetes.dask.org", interval=5.0)
async def daskcluster_autoshutdown(spec, name, namespace, logger, **kwargs):
    """Delete a DaskCluster whose scheduler has been idle for too long.

    Registered as a kopf timer on ``daskcluster.kubernetes.dask.org`` with an
    interval of 5.0. On each tick the scheduler ``<name>-scheduler`` is asked
    since when it has been idle; if that moment plus ``spec["idleTimeout"]``
    lies in the past, the DaskCluster resource is deleted.

    Parameters
    ----------
    spec
        Resource spec; only the ``idleTimeout`` key is read. A missing or
        falsy value disables the check.
    name
        Name of the DaskCluster resource.
    namespace
        Namespace of the DaskCluster resource.
    logger
        Logger supplied by kopf for this handler.
    **kwargs
        Remaining kopf handler arguments, unused.
    """
    # ``.get`` rather than indexing: a resource without the field is treated
    # as "no timeout" instead of raising KeyError on every timer tick.
    idle_timeout = spec.get("idleTimeout")
    if not idle_timeout:
        return

    try:
        idle_since = await check_scheduler_idle(
            scheduler_service_name=f"{name}-scheduler",
            namespace=namespace,
            logger=logger,
        )
    except Exception:
        # Best effort: any failure to query the scheduler skips this tick and
        # the check is retried on the next one.
        logger.warning("Unable to connect to scheduler, skipping autoshutdown check.")
        return

    if idle_since and time.time() > idle_since + idle_timeout:
        api = HTTPClient(KubeConfig.from_env())
        cluster = await DaskCluster.objects(api, namespace=namespace).get_by_name(
            name
        )
        await cluster.delete()

dask_kubernetes/operator/controller/tests/resources/failedjob.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ apiVersion: kubernetes.dask.org/v1
22
kind: DaskJob
33
metadata:
44
name: failed-job
5-
namespace: default
65
spec:
76
job:
87
spec:

dask_kubernetes/operator/controller/tests/resources/simplecluster.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ apiVersion: kubernetes.dask.org/v1
22
kind: DaskCluster
33
metadata:
44
name: simple
5-
namespace: default
65
labels:
76
test-label: "label-value"
87
annotations:

dask_kubernetes/operator/controller/tests/resources/simplejob.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ apiVersion: kubernetes.dask.org/v1
22
kind: DaskJob
33
metadata:
44
name: simple-job
5-
namespace: default
65
annotations:
76
test-annotation: "annotation-value"
87
"kopf.zalando.org/foobar": "should-not-be-propagated"

dask_kubernetes/operator/controller/tests/resources/simpleworkergroup.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ apiVersion: kubernetes.dask.org/v1
22
kind: DaskWorkerGroup
33
metadata:
44
name: simple-additional
5-
namespace: default
65
spec:
76
cluster: simple
87
worker:

0 commit comments

Comments
 (0)