Skip to content

Commit bb606a1

Browse files
authored
Don’t allowed canceled delayed runs to be put into the queue (#1981)
1 parent e9c6cc7 commit bb606a1

File tree

5 files changed

+132
-1
lines changed

5 files changed

+132
-1
lines changed

internal-packages/run-engine/src/engine/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ export class RunEngine {
303303
executionSnapshotSystem: this.executionSnapshotSystem,
304304
batchSystem: this.batchSystem,
305305
waitpointSystem: this.waitpointSystem,
306+
delayedRunSystem: this.delayedRunSystem,
306307
machines: this.options.machines,
307308
});
308309

internal-packages/run-engine/src/engine/statuses.ts

+5
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ export function isFinishedOrPendingFinished(status: TaskRunExecutionStatus): boo
3636
return finishedStatuses.includes(status);
3737
}
3838

39+
export function isInitialState(status: TaskRunExecutionStatus): boolean {
40+
const startedStatuses: TaskRunExecutionStatus[] = ["RUN_CREATED"];
41+
return startedStatuses.includes(status);
42+
}
43+
3944
export function isFinalRunStatus(status: TaskRunStatus): boolean {
4045
const finalStatuses: TaskRunStatus[] = [
4146
"CANCELED",

internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts

+4
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,8 @@ export class DelayedRunSystem {
131131
availableAt: delayUntil,
132132
});
133133
}
134+
135+
async preventDelayedRunFromBeingEnqueued({ runId }: { runId: string }) {
136+
await this.$.worker.ack(`enqueueDelayedRun:${runId}`);
137+
}
134138
}

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

+11-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import { runStatusFromError, ServiceValidationError } from "../errors.js";
2222
import { sendNotificationToWorker } from "../eventBus.js";
2323
import { getMachinePreset } from "../machinePresets.js";
2424
import { retryOutcomeFromCompletion } from "../retrying.js";
25-
import { isExecuting } from "../statuses.js";
25+
import { isExecuting, isInitialState } from "../statuses.js";
2626
import { RunEngineOptions } from "../types.js";
2727
import { BatchSystem } from "./batchSystem.js";
2828
import {
@@ -32,12 +32,14 @@ import {
3232
} from "./executionSnapshotSystem.js";
3333
import { SystemResources } from "./systems.js";
3434
import { WaitpointSystem } from "./waitpointSystem.js";
35+
import { DelayedRunSystem } from "./delayedRunSystem.js";
3536

3637
export type RunAttemptSystemOptions = {
3738
resources: SystemResources;
3839
executionSnapshotSystem: ExecutionSnapshotSystem;
3940
batchSystem: BatchSystem;
4041
waitpointSystem: WaitpointSystem;
42+
delayedRunSystem: DelayedRunSystem;
4143
retryWarmStartThresholdMs?: number;
4244
machines: RunEngineOptions["machines"];
4345
};
@@ -47,12 +49,14 @@ export class RunAttemptSystem {
4749
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
4850
private readonly batchSystem: BatchSystem;
4951
private readonly waitpointSystem: WaitpointSystem;
52+
private readonly delayedRunSystem: DelayedRunSystem;
5053

5154
constructor(private readonly options: RunAttemptSystemOptions) {
5255
this.$ = options.resources;
5356
this.executionSnapshotSystem = options.executionSnapshotSystem;
5457
this.batchSystem = options.batchSystem;
5558
this.waitpointSystem = options.waitpointSystem;
59+
this.delayedRunSystem = options.delayedRunSystem;
5660
}
5761

5862
public async startRunAttempt({
@@ -968,6 +972,7 @@ export class RunAttemptSystem {
968972
completedAt: true,
969973
taskEventStore: true,
970974
parentTaskRunId: true,
975+
delayUntil: true,
971976
runtimeEnvironment: {
972977
select: {
973978
organizationId: true,
@@ -986,6 +991,11 @@ export class RunAttemptSystem {
986991
},
987992
});
988993

994+
//if the run is delayed and hasn't started yet, we need to prevent it being added to the queue in future
995+
if (isInitialState(latestSnapshot.executionStatus) && run.delayUntil) {
996+
await this.delayedRunSystem.preventDelayedRunFromBeingEnqueued({ runId });
997+
}
998+
989999
//remove it from the queue and release concurrency
9901000
await this.$.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId);
9911001

internal-packages/run-engine/src/engine/tests/delays.test.ts

+111
Original file line numberDiff line numberDiff line change
@@ -290,4 +290,115 @@ describe("RunEngine delays", () => {
290290
engine.quit();
291291
}
292292
});
293+
294+
containerTest("Cancelling a delayed run", async ({ prisma, redisOptions }) => {
295+
//create environment
296+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
297+
298+
const engine = new RunEngine({
299+
prisma,
300+
worker: {
301+
redis: redisOptions,
302+
workers: 1,
303+
tasksPerWorker: 10,
304+
pollIntervalMs: 100,
305+
},
306+
queue: {
307+
redis: redisOptions,
308+
},
309+
runLock: {
310+
redis: redisOptions,
311+
},
312+
machines: {
313+
defaultMachine: "small-1x",
314+
machines: {
315+
"small-1x": {
316+
name: "small-1x" as const,
317+
cpu: 0.5,
318+
memory: 0.5,
319+
centsPerMs: 0.0001,
320+
},
321+
},
322+
baseCostInCents: 0.0001,
323+
},
324+
tracer: trace.getTracer("test", "0.0.0"),
325+
});
326+
327+
try {
328+
const taskIdentifier = "test-task";
329+
330+
//create background worker
331+
const backgroundWorker = await setupBackgroundWorker(
332+
engine,
333+
authenticatedEnvironment,
334+
taskIdentifier
335+
);
336+
337+
//trigger the run with a 1 second delay
338+
const run = await engine.trigger(
339+
{
340+
number: 1,
341+
friendlyId: "run_1234",
342+
environment: authenticatedEnvironment,
343+
taskIdentifier,
344+
payload: "{}",
345+
payloadType: "application/json",
346+
context: {},
347+
traceContext: {},
348+
traceId: "t12345",
349+
spanId: "s12345",
350+
masterQueue: "main",
351+
queue: "task/test-task",
352+
isTest: false,
353+
tags: [],
354+
delayUntil: new Date(Date.now() + 1000),
355+
},
356+
prisma
357+
);
358+
359+
//verify it's created but not queued
360+
const executionData = await engine.getRunExecutionData({ runId: run.id });
361+
assertNonNullable(executionData);
362+
expect(executionData.snapshot.executionStatus).toBe("RUN_CREATED");
363+
expect(run.status).toBe("DELAYED");
364+
365+
//cancel the run
366+
await engine.cancelRun({
367+
runId: run.id,
368+
reason: "Cancelled by test",
369+
});
370+
371+
//verify it's cancelled
372+
const executionData2 = await engine.getRunExecutionData({ runId: run.id });
373+
assertNonNullable(executionData2);
374+
expect(executionData2.snapshot.executionStatus).toBe("FINISHED");
375+
expect(executionData2.run.status).toBe("CANCELED");
376+
377+
//wait past the original delay time
378+
await setTimeout(1500);
379+
380+
//verify the run is still cancelled
381+
const executionData3 = await engine.getRunExecutionData({ runId: run.id });
382+
assertNonNullable(executionData3);
383+
expect(executionData3.snapshot.executionStatus).toBe("FINISHED");
384+
expect(executionData3.run.status).toBe("CANCELED");
385+
386+
//attempt to dequeue - should get nothing
387+
const dequeued = await engine.dequeueFromMasterQueue({
388+
consumerId: "test_12345",
389+
masterQueue: run.masterQueue,
390+
maxRunCount: 10,
391+
});
392+
393+
expect(dequeued.length).toBe(0);
394+
395+
//verify final state is still cancelled
396+
const executionData4 = await engine.getRunExecutionData({ runId: run.id });
397+
assertNonNullable(executionData4);
398+
expect(executionData4.snapshot.executionStatus).toBe("FINISHED");
399+
expect(executionData4.run.status).toBe("CANCELED");
400+
} finally {
401+
engine.quit();
402+
}
403+
});
293404
});

0 commit comments

Comments
 (0)