Skip to content

Commit 18f7766

Browse files
authored
Runlock telemetry (#1974)
* Added error throwing tests to runlock * Added some more run lock tests * Added customConditions so the test can use the Logger * Fix for JSON error * Added a span to the run lock with a name
1 parent aa10279 commit 18f7766

12 files changed

+604
-370
lines changed

internal-packages/run-engine/src/engine/index.ts

+7-3
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,11 @@ export class RunEngine {
9292
},
9393
}
9494
);
95-
this.runLock = new RunLocker({ redis: this.runLockRedis });
95+
this.runLock = new RunLocker({
96+
redis: this.runLockRedis,
97+
logger: this.logger,
98+
tracer: trace.getTracer("RunLocker"),
99+
});
96100

97101
const keys = new RunQueueFullKeyProducer();
98102

@@ -491,7 +495,7 @@ export class RunEngine {
491495

492496
span.setAttribute("runId", taskRun.id);
493497

494-
await this.runLock.lock([taskRun.id], 5000, async (signal) => {
498+
await this.runLock.lock("trigger", [taskRun.id], 5000, async (signal) => {
495499
//create associated waitpoint (this completes when the run completes)
496500
const associatedWaitpoint = await this.waitpointSystem.createRunAssociatedWaitpoint(
497501
prisma,
@@ -1162,7 +1166,7 @@ export class RunEngine {
11621166
tx?: PrismaClientOrTransaction;
11631167
}) {
11641168
const prisma = tx ?? this.prisma;
1165-
return await this.runLock.lock([runId], 5_000, async () => {
1169+
return await this.runLock.lock("handleStalledSnapshot", [runId], 5_000, async () => {
11661170
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
11671171
if (latestSnapshot.id !== snapshotId) {
11681172
this.logger.log(

internal-packages/run-engine/src/engine/locking.ts

+41-12
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ const { default: Redlock } = require("redlock");
33
import { AsyncLocalStorage } from "async_hooks";
44
import { Redis } from "@internal/redis";
55
import * as redlock from "redlock";
6+
import { tryCatch } from "@trigger.dev/core";
7+
import { Logger } from "@trigger.dev/core/logger";
8+
import { startSpan, Tracer } from "@internal/tracing";
69

710
interface LockContext {
811
resources: string;
@@ -12,8 +15,10 @@ interface LockContext {
1215
export class RunLocker {
1316
private redlock: InstanceType<typeof redlock.default>;
1417
private asyncLocalStorage: AsyncLocalStorage<LockContext>;
18+
private logger: Logger;
19+
private tracer: Tracer;
1520

16-
constructor(options: { redis: Redis }) {
21+
constructor(options: { redis: Redis; logger: Logger; tracer: Tracer }) {
1722
this.redlock = new Redlock([options.redis], {
1823
driftFactor: 0.01,
1924
retryCount: 10,
@@ -22,30 +27,54 @@ export class RunLocker {
2227
automaticExtensionThreshold: 500, // time in ms
2328
});
2429
this.asyncLocalStorage = new AsyncLocalStorage<LockContext>();
30+
this.logger = options.logger;
31+
this.tracer = options.tracer;
2532
}
2633

2734
/** Locks resources using RedLock. It won't lock again if we're already inside a lock with the same resources. */
2835
async lock<T>(
36+
name: string,
2937
resources: string[],
3038
duration: number,
3139
routine: (signal: redlock.RedlockAbortSignal) => Promise<T>
3240
): Promise<T> {
3341
const currentContext = this.asyncLocalStorage.getStore();
3442
const joinedResources = resources.sort().join(",");
3543

36-
if (currentContext && currentContext.resources === joinedResources) {
37-
// We're already inside a lock with the same resources, just run the routine
38-
return routine(currentContext.signal);
39-
}
44+
return startSpan(
45+
this.tracer,
46+
"RunLocker.lock",
47+
async (span) => {
48+
if (currentContext && currentContext.resources === joinedResources) {
49+
span.setAttribute("nested", true);
50+
// We're already inside a lock with the same resources, just run the routine
51+
return routine(currentContext.signal);
52+
}
4053

41-
// Different resources or not in a lock, proceed with new lock
42-
return this.redlock.using(resources, duration, async (signal) => {
43-
const newContext: LockContext = { resources: joinedResources, signal };
54+
span.setAttribute("nested", false);
4455

45-
return this.asyncLocalStorage.run(newContext, async () => {
46-
return routine(signal);
47-
});
48-
});
56+
// Different resources or not in a lock, proceed with new lock
57+
const [error, result] = await tryCatch(
58+
this.redlock.using(resources, duration, async (signal) => {
59+
const newContext: LockContext = { resources: joinedResources, signal };
60+
61+
return this.asyncLocalStorage.run(newContext, async () => {
62+
return routine(signal);
63+
});
64+
})
65+
);
66+
67+
if (error) {
68+
this.logger.error("[RunLocker] Error locking resources", { error, resources, duration });
69+
throw error;
70+
}
71+
72+
return result;
73+
},
74+
{
75+
attributes: { name, resources, timeout: duration },
76+
}
77+
);
4978
}
5079

5180
isInsideLock(): boolean {

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ export class CheckpointSystem {
5353
}): Promise<CreateCheckpointResult> {
5454
const prisma = tx ?? this.$.prisma;
5555

56-
return await this.$.runLock.lock([runId], 5_000, async () => {
56+
return await this.$.runLock.lock("createCheckpoint", [runId], 5_000, async () => {
5757
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
5858

5959
const isValidSnapshot =
@@ -238,7 +238,7 @@ export class CheckpointSystem {
238238
}): Promise<ExecutionResult> {
239239
const prisma = tx ?? this.$.prisma;
240240

241-
return await this.$.runLock.lock([runId], 5_000, async () => {
241+
return await this.$.runLock.lock("continueRunExecution", [runId], 5_000, async () => {
242242
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
243243

244244
if (snapshot.id !== snapshotId) {

internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ export class DelayedRunSystem {
3737
this.$.tracer,
3838
"rescheduleDelayedRun",
3939
async () => {
40-
return await this.$.runLock.lock([runId], 5_000, async () => {
40+
return await this.$.runLock.lock("rescheduleDelayedRun", [runId], 5_000, async () => {
4141
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
4242

4343
//if the run isn't just created then we can't reschedule it

0 commit comments

Comments
 (0)