Skip to content

Commit 7842e9d

Browse files
authored
Warm start and restore improvements (#1793)
* handle warm start service failure on supervisor side
* export zodfetch wrapper
* add warm start client
* rename to keepalive
* add missing warm start header
* make heartbeat and snapshot poll interval configurable
* create warm start client in constructor
* add warm start run debug log
* re-enable checkpoints and improve error messages
* reduce run pod container name cardinality
* move runner id generator into core
* fix runner id import
* log when no checkpoint client and we try to restore
* move run controller constructor to the top
* fix import
* support env overrides after restore
1 parent 5896165 commit 7842e9d

File tree

17 files changed

+599
-419
lines changed

17 files changed

+599
-419
lines changed

apps/supervisor/src/index.ts

+38-21
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ class ManagedSupervisor {
8787
});
8888

8989
if (env.TRIGGER_CHECKPOINT_URL) {
90+
this.logger.log("[ManagedWorker] 🥶 Checkpoints enabled", {
91+
checkpointUrl: env.TRIGGER_CHECKPOINT_URL,
92+
});
93+
9094
this.checkpointClient = new CheckpointClient({
9195
apiUrl: new URL(env.TRIGGER_CHECKPOINT_URL),
9296
workerClient: this.workerSession.httpClient,
@@ -126,8 +130,13 @@ class ManagedSupervisor {
126130
if (message.checkpoint) {
127131
this.logger.log("[ManagedWorker] Restoring run", { runId: message.run.id });
128132

133+
if (!this.checkpointClient) {
134+
this.logger.error("[ManagedWorker] No checkpoint client", { runId: message.run.id });
135+
return;
136+
}
137+
129138
try {
130-
const didRestore = await this.checkpointClient?.restoreRun({
139+
const didRestore = await this.checkpointClient.restoreRun({
131140
runFriendlyId: message.run.friendlyId,
132141
snapshotFriendlyId: message.snapshot.friendlyId,
133142
checkpoint: message.checkpoint,
@@ -214,33 +223,41 @@ class ManagedSupervisor {
214223

215224
const warmStartUrlWithPath = new URL("/warm-start", this.warmStartUrl);
216225

217-
const res = await fetch(warmStartUrlWithPath.href, {
218-
method: "POST",
219-
headers: {
220-
"Content-Type": "application/json",
221-
},
222-
body: JSON.stringify({ dequeuedMessage }),
223-
});
224-
225-
if (!res.ok) {
226-
this.logger.error("[ManagedWorker] Warm start failed", {
227-
runId: dequeuedMessage.run.id,
226+
try {
227+
const res = await fetch(warmStartUrlWithPath.href, {
228+
method: "POST",
229+
headers: {
230+
"Content-Type": "application/json",
231+
},
232+
body: JSON.stringify({ dequeuedMessage }),
228233
});
229-
return false;
230-
}
231234

232-
const data = await res.json();
233-
const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data);
235+
if (!res.ok) {
236+
this.logger.error("[ManagedWorker] Warm start failed", {
237+
runId: dequeuedMessage.run.id,
238+
});
239+
return false;
240+
}
241+
242+
const data = await res.json();
243+
const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data);
234244

235-
if (!parsedData.success) {
236-
this.logger.error("[ManagedWorker] Warm start response invalid", {
245+
if (!parsedData.success) {
246+
this.logger.error("[ManagedWorker] Warm start response invalid", {
247+
runId: dequeuedMessage.run.id,
248+
data,
249+
});
250+
return false;
251+
}
252+
253+
return parsedData.data.didWarmStart;
254+
} catch (error) {
255+
this.logger.error("[ManagedWorker] Warm start error", {
237256
runId: dequeuedMessage.run.id,
238-
data,
257+
error,
239258
});
240259
return false;
241260
}
242-
243-
return parsedData.data.didWarmStart;
244261
}
245262

246263
async start() {

apps/supervisor/src/util.ts

-24
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,6 @@
1-
import { customAlphabet } from "nanoid";
2-
31
export function getDockerHostDomain() {
42
const isMacOs = process.platform === "darwin";
53
const isWindows = process.platform === "win32";
64

75
return isMacOs || isWindows ? "host.docker.internal" : "localhost";
86
}
9-
10-
export class IdGenerator {
11-
private alphabet: string;
12-
private length: number;
13-
private prefix: string;
14-
15-
constructor({ alphabet, length, prefix }: { alphabet: string; length: number; prefix: string }) {
16-
this.alphabet = alphabet;
17-
this.length = length;
18-
this.prefix = prefix;
19-
}
20-
21-
generate(): string {
22-
return `${this.prefix}${customAlphabet(this.alphabet, this.length)()}`;
23-
}
24-
}
25-
26-
export const RunnerId = new IdGenerator({
27-
alphabet: "123456789abcdefghijkmnopqrstuvwxyz",
28-
length: 20,
29-
prefix: "runner_",
30-
});

apps/supervisor/src/workloadManager/docker.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
2+
import { RunnerId } from "@trigger.dev/core/v3/isomorphic";
23
import {
34
type WorkloadManager,
45
type WorkloadManagerCreateOptions,
56
type WorkloadManagerOptions,
67
} from "./types.js";
78
import { x } from "tinyexec";
89
import { env } from "../env.js";
9-
import { getDockerHostDomain, RunnerId } from "../util.js";
10+
import { getDockerHostDomain } from "../util.js";
1011

1112
export class DockerWorkloadManager implements WorkloadManager {
1213
private readonly logger = new SimpleStructuredLogger("docker-workload-provider");

apps/supervisor/src/workloadManager/kubernetes.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
type WorkloadManagerCreateOptions,
55
type WorkloadManagerOptions,
66
} from "./types.js";
7-
import { RunnerId } from "../util.js";
7+
import { RunnerId } from "@trigger.dev/core/v3/isomorphic";
88
import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3";
99
import { env } from "../env.js";
1010
import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js";
@@ -48,15 +48,14 @@ export class KubernetesWorkloadManager implements WorkloadManager {
4848
app: "task-run",
4949
"app.kubernetes.io/part-of": "trigger-worker",
5050
"app.kubernetes.io/component": "create",
51-
run: opts.runId,
5251
},
5352
},
5453
spec: {
5554
...this.#defaultPodSpec,
5655
terminationGracePeriodSeconds: 60 * 60,
5756
containers: [
5857
{
59-
name: runnerId,
58+
name: "run-controller",
6059
image: opts.image,
6160
ports: [
6261
{

apps/supervisor/src/workloadServer/index.ts

+10-11
Original file line numberDiff line numberDiff line change
@@ -200,30 +200,29 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
200200
handler: async ({ reply, params, req }) => {
201201
console.debug("Suspend request", { params, headers: req.headers });
202202

203-
const runnerId = this.runnerIdFromRequest(req);
204-
205-
if (!runnerId) {
206-
console.error("Invalid headers for suspend request", {
207-
...params,
208-
headers: req.headers,
209-
});
203+
if (!this.checkpointClient) {
210204
reply.json(
211205
{
212206
ok: false,
213-
error: "Invalid headers",
207+
error: "Checkpoints disabled",
214208
} satisfies WorkloadSuspendRunResponseBody,
215209
false,
216210
400
217211
);
218212
return;
219213
}
220214

221-
if (!this.checkpointClient) {
222-
console.error("Checkpoint client unavailable - suspending impossible", { params });
215+
const runnerId = this.runnerIdFromRequest(req);
216+
217+
if (!runnerId) {
218+
console.error("Invalid headers for suspend request", {
219+
...params,
220+
headers: req.headers,
221+
});
223222
reply.json(
224223
{
225224
ok: false,
226-
error: "Suspends are not enabled",
225+
error: "Invalid headers",
227226
} satisfies WorkloadSuspendRunResponseBody,
228227
false,
229228
400

packages/cli-v3/src/apiClient.ts

+1-50
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,15 @@ import {
3131
DevDequeueRequestBody,
3232
DevDequeueResponseBody,
3333
PromoteDeploymentResponseBody,
34-
ListRunResponse,
3534
} from "@trigger.dev/core/v3";
36-
import { zodfetch, zodfetchSSE, ApiError } from "@trigger.dev/core/v3/zodfetch";
35+
import { ApiResult, wrapZodFetch, zodfetchSSE } from "@trigger.dev/core/v3/zodfetch";
3736
import { logger } from "./utilities/logger.js";
3837
import {
3938
WorkloadDebugLogRequestBody,
4039
WorkloadHeartbeatRequestBody,
4140
WorkloadHeartbeatResponseBody,
4241
WorkloadRunAttemptCompleteRequestBody,
4342
WorkloadRunAttemptCompleteResponseBody,
44-
WorkloadRunAttemptStartRequestBody,
4543
WorkloadRunAttemptStartResponseBody,
4644
WorkloadRunLatestSnapshotResponseBody,
4745
} from "@trigger.dev/core/v3/workers";
@@ -644,50 +642,3 @@ export class CliApiClient {
644642
);
645643
}
646644
}
647-
648-
type ApiResult<TSuccessResult> =
649-
| { success: true; data: TSuccessResult }
650-
| {
651-
success: false;
652-
error: string;
653-
};
654-
655-
async function wrapZodFetch<T extends z.ZodTypeAny>(
656-
schema: T,
657-
url: string,
658-
requestInit?: RequestInit
659-
): Promise<ApiResult<z.infer<T>>> {
660-
try {
661-
const response = await zodfetch(schema, url, requestInit, {
662-
retry: {
663-
minTimeoutInMs: 500,
664-
maxTimeoutInMs: 5000,
665-
maxAttempts: 5,
666-
factor: 2,
667-
randomize: false,
668-
},
669-
});
670-
671-
return {
672-
success: true,
673-
data: response,
674-
};
675-
} catch (error) {
676-
if (error instanceof ApiError) {
677-
return {
678-
success: false,
679-
error: error.message,
680-
};
681-
} else if (error instanceof Error) {
682-
return {
683-
success: false,
684-
error: error.message,
685-
};
686-
} else {
687-
return {
688-
success: false,
689-
error: String(error),
690-
};
691-
}
692-
}
693-
}

0 commit comments

Comments
 (0)