Skip to content

Commit 4fe1d49

Browse files
authored
Add v4 pod lifecycle handlers (#1819)
* patch k8s client to allow field selector on informers
* add pod cleaner and tests
* add failed pod handler and tests
* remove supervisor test script for now
1 parent 5f48b9c commit 4fe1d49

12 files changed

+1487
-6
lines changed

apps/supervisor/package.json

+4-2
Original file line number | Diff line number | Diff line change
@@ -6,15 +6,17 @@
66
"type": "module",
77
"scripts": {
88
"build": "tsc",
9-
"dev": "tsx --experimental-sqlite --require dotenv/config --watch src/index.ts || (echo '!! Remember to run: nvm use'; exit 1)",
10-
"start": "node --experimental-sqlite dist/index.js",
9+
"dev": "tsx --require dotenv/config --watch src/index.ts || (echo '!! Remember to run: nvm use'; exit 1)",
10+
"start": "node dist/index.js",
11+
"test:watch": "vitest",
1112
"typecheck": "tsc --noEmit"
1213
},
1314
"dependencies": {
1415
"@kubernetes/client-node": "^1.0.0",
1516
"@trigger.dev/core": "workspace:*",
1617
"dockerode": "^4.0.3",
1718
"nanoid": "^5.0.9",
19+
"prom-client": "^15.1.0",
1820
"socket.io": "4.7.4",
1921
"std-env": "^3.8.0",
2022
"tinyexec": "^0.3.1",

apps/supervisor/src/clients/kubernetes.ts

+13
Original file line number | Diff line number | Diff line change
@@ -1,15 +1,28 @@
11
import * as k8s from "@kubernetes/client-node";
2+
import { Informer } from "@kubernetes/client-node";
3+
import { ListPromise } from "@kubernetes/client-node";
4+
import { KubernetesObject } from "@kubernetes/client-node";
25
import { assertExhaustive } from "@trigger.dev/core/utils";
36

47
export const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";
58

69
export function createK8sApi() {
710
const kubeConfig = getKubeConfig();
811

12+
function makeInformer<T extends KubernetesObject>(
13+
path: string,
14+
listPromiseFn: ListPromise<T>,
15+
labelSelector?: string,
16+
fieldSelector?: string
17+
): Informer<T> {
18+
return k8s.makeInformer(kubeConfig, path, listPromiseFn, labelSelector, fieldSelector);
19+
}
20+
921
const api = {
1022
core: kubeConfig.makeApiClient(k8s.CoreV1Api),
1123
batch: kubeConfig.makeApiClient(k8s.BatchV1Api),
1224
apps: kubeConfig.makeApiClient(k8s.AppsV1Api),
25+
makeInformer,
1326
};
1427

1528
return api;

apps/supervisor/src/env.ts

+12
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,18 @@ const Env = z.object({
5151
KUBERNETES_NAMESPACE: z.string().default("default"),
5252
EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"),
5353
EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),
54+
55+
// Metrics
56+
METRICS_COLLECT_DEFAULTS: BoolEnv.default(true),
57+
58+
// Pod cleaner
59+
POD_CLEANER_ENABLED: BoolEnv.default(true),
60+
POD_CLEANER_INTERVAL_MS: z.coerce.number().int().default(10000),
61+
POD_CLEANER_BATCH_SIZE: z.coerce.number().int().default(500),
62+
63+
// Failed pod handler
64+
FAILED_POD_HANDLER_ENABLED: BoolEnv.default(true),
65+
FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS: z.coerce.number().int().default(1000),
5466
});
5567

5668
export const env = Env.parse(stdEnv);

apps/supervisor/src/index.ts

+43-1
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,15 @@ import {
1818
CheckpointClient,
1919
isKubernetesEnvironment,
2020
} from "@trigger.dev/core/v3/serverOnly";
21-
import { createK8sApi, RUNTIME_ENV } from "./clients/kubernetes.js";
21+
import { createK8sApi } from "./clients/kubernetes.js";
22+
import { collectDefaultMetrics } from "prom-client";
23+
import { register } from "./metrics.js";
24+
import { PodCleaner } from "./services/podCleaner.js";
25+
import { FailedPodHandler } from "./services/failedPodHandler.js";
26+
27+
if (env.METRICS_COLLECT_DEFAULTS) {
28+
collectDefaultMetrics({ register });
29+
}
2230

2331
class ManagedSupervisor {
2432
private readonly workerSession: SupervisorSession;
@@ -29,6 +37,9 @@ class ManagedSupervisor {
2937
private readonly resourceMonitor: ResourceMonitor;
3038
private readonly checkpointClient?: CheckpointClient;
3139

40+
private readonly podCleaner?: PodCleaner;
41+
private readonly failedPodHandler?: FailedPodHandler;
42+
3243
private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED);
3344
private readonly warmStartUrl = env.TRIGGER_WARM_START_URL;
3445

@@ -37,6 +48,21 @@ class ManagedSupervisor {
3748
const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN;
3849
const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL;
3950

51+
if (env.POD_CLEANER_ENABLED) {
52+
this.podCleaner = new PodCleaner({
53+
namespace: env.KUBERNETES_NAMESPACE,
54+
batchSize: env.POD_CLEANER_BATCH_SIZE,
55+
intervalMs: env.POD_CLEANER_INTERVAL_MS,
56+
});
57+
}
58+
59+
if (env.FAILED_POD_HANDLER_ENABLED) {
60+
this.failedPodHandler = new FailedPodHandler({
61+
namespace: env.KUBERNETES_NAMESPACE,
62+
reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS,
63+
});
64+
}
65+
4066
if (this.warmStartUrl) {
4167
this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", {
4268
warmStartUrl: this.warmStartUrl,
@@ -273,6 +299,14 @@ class ManagedSupervisor {
273299
async start() {
274300
this.logger.log("[ManagedWorker] Starting up");
275301

302+
if (this.podCleaner) {
303+
await this.podCleaner.start();
304+
}
305+
306+
if (this.failedPodHandler) {
307+
await this.failedPodHandler.start();
308+
}
309+
276310
if (env.TRIGGER_WORKLOAD_API_ENABLED) {
277311
this.logger.log("[ManagedWorker] Workload API enabled", {
278312
protocol: env.TRIGGER_WORKLOAD_API_PROTOCOL,
@@ -292,6 +326,14 @@ class ManagedSupervisor {
292326
async stop() {
293327
this.logger.log("[ManagedWorker] Shutting down");
294328
await this.httpServer.stop();
329+
330+
if (this.podCleaner) {
331+
await this.podCleaner.stop();
332+
}
333+
334+
if (this.failedPodHandler) {
335+
await this.failedPodHandler.stop();
336+
}
295337
}
296338
}
297339

apps/supervisor/src/metrics.ts

+3
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,3 @@
1+
import { Registry } from "prom-client";
2+
3+
export const register = new Registry();

0 commit comments

Comments
 (0)