Skip to content

v4: Improved run locking #2173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open

v4: Improved run locking #2173

wants to merge 7 commits into from

Conversation

ericallam
Copy link
Member

No description provided.

Copy link

changeset-bot bot commented Jun 13, 2025

⚠️ No Changeset found

Latest commit: 918cba8

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Copy link
Contributor

coderabbitai bot commented Jun 13, 2025

Walkthrough

The changes introduce a manual lock acquisition and management system in the RunLocker class, replacing the previous implicit retry mechanism with explicit retry logic, exponential backoff with jitter, automatic lock extension, and comprehensive cleanup of lock contexts. A new error class LockAcquisitionTimeoutError was added to signal lock acquisition failures after retries. The enqueueRun method in EnqueueSystem now accepts an optional skipRunLock parameter to conditionally bypass lock acquisition. The trigger method in RunEngine removes the run lock around its critical section and sets skipRunLock to true when enqueuing runs immediately. The RunEngine constructor and related configuration support new run lock parameters—duration, automatic extension threshold, and retry configuration—exposed via environment variables. Numerous tests were added to cover the new locking behavior and configuration. All existing public API signatures remain unchanged, with additions limited to new methods, parameters, error classes, and configuration options.

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
internal-packages/run-engine/src/engine/tests/locking.test.ts (1)

11-17: Consider centralising test-setup to reduce repetition

Each test re-creates a Logger + RunLocker and manually calls runLock.quit() in finally. Extracting this into a beforeEach / afterEach (or helper util) would DRY the suite and guarantee cleanup even if construction throws.

Also applies to: 18-29

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (1)

36-37: Default to explicit false for skipRunLock & invert condition for readability

skipRunLock is optional; when omitted it is undefined, meaning !skipRunLock evaluates to true (locks).
Making the default explicit:

-    skipRunLock,
+    skipRunLock = false,

and using skipRunLock ? routine() : lock(...) would avoid the double-negation and make the intent clearer.

Also applies to: 55-60

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8a5e7f3 and dfdc94e.

📒 Files selected for processing (4)
  • internal-packages/run-engine/src/engine/index.ts (1 hunks)
  • internal-packages/run-engine/src/engine/locking.ts (4 hunks)
  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (2 hunks)
  • internal-packages/run-engine/src/engine/tests/locking.test.ts (12 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
internal-packages/run-engine/src/engine/locking.ts (3)
packages/core/src/logger.ts (2)
  • Logger (19-130)
  • error (66-70)
internal-packages/run-engine/src/run-queue/index.ts (2)
  • name (196-198)
  • T (963-992)
packages/redis-worker/src/worker.ts (1)
  • delay (569-571)
⏰ Context from checks skipped due to timeout of 90000ms (25)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (10, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (9, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 10)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 10)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (2)
internal-packages/run-engine/src/engine/index.ts (1)

489-534: Re-evaluate concurrency guarantees after removing the run-lock

Previously the trigger flow ran inside a 5 s runLock.
With the lock removed and skipRunLock: true passed to enqueueRun, concurrent invocations for the same run (or parent-blocking logic) now rely solely on DB constraints/transactions.

If two workers could call trigger with identical idempotency keys very close together, you may re-introduce races around:

  • creating the associated waitpoint,
  • blocking the parent run,
  • double-enqueueing.

Please confirm that upstream uniqueness constraints (e.g. on taskRun.id / idempotencyKey) and transactional scopes are sufficient, or consider keeping a lightweight lock here.

internal-packages/run-engine/src/engine/locking.ts (1)

343-348: Missing abort-signal when lock skipped

lockIf(false, …) invokes routine() with no arguments, but the callback type allows an optional signal.
Call-sites that rely on signal! will now receive undefined.
Either:

routine(signal ?? (undefined as any))

or document that the signal is only defined when locking occurs.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (3)
internal-packages/run-engine/src/engine/types.ts (1)

49-52: Document numeric units & defaults for new props

duration, automaticExtensionThreshold, and retryConfig lack JSDoc in this public type. Down-stream integrators will guess the unit (ms? seconds?). Please copy the inline docs from LockRetryConfig / RunLocker so that IDE tooltips stay useful.

internal-packages/run-engine/src/engine/locking.ts (1)

421-433: Potential race: release issued while extension still in-flight

#cleanupExtension clears the timeout but does not await the context.extension promise.
If an extension RPC is still pending, calling lock.release() immediately afterwards can result in:

  1. release() succeeds
  2. extension completes ⇒ tries to mutate an already-released lock, throwing ExecutionError

Safer pattern:

this.#cleanupExtension(manualContext);
- const [releaseError] = await tryCatch(lock!.release());
+ if (manualContext.extension) {
+   await manualContext.extension.catch(() => {}); // ignore failures
+ }
+ const [releaseError] = await tryCatch(lock!.release());

Pre-empts spurious errors in noisy logs.

internal-packages/run-engine/src/engine/tests/locking.test.ts (1)

901-914: Tests rely on real timers → high flake risk

Several cases (extension-test, timing-test, etc.) assert on wall-clock Date.now() differences with 100–200 ms tolerances; CI jitter regularly exceeds that.

Use Vitest fake timers (vi.useFakeTimers() / vi.advanceTimersByTime()) or widen the expectations:

- expect(elapsed).toBeGreaterThan(190);
- expect(elapsed).toBeLessThan(1000);
+ expect(elapsed).toBeGreaterThan(150);
+ expect(elapsed).toBeLessThan(1500);

to avoid intermittent failures.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dfdc94e and 5cefaf7.

📒 Files selected for processing (6)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/v3/runEngine.server.ts (1 hunks)
  • internal-packages/run-engine/src/engine/index.ts (2 hunks)
  • internal-packages/run-engine/src/engine/locking.ts (5 hunks)
  • internal-packages/run-engine/src/engine/tests/locking.test.ts (13 hunks)
  • internal-packages/run-engine/src/engine/types.ts (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • apps/webapp/app/env.server.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal-packages/run-engine/src/engine/index.ts
🧰 Additional context used
🧬 Code Graph Analysis (1)
internal-packages/run-engine/src/engine/types.ts (1)
internal-packages/run-engine/src/engine/locking.ts (1)
  • LockRetryConfig (54-67)
⏰ Context from checks skipped due to timeout of 90000ms (25)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (10, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (9, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 10)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: typecheck / typecheck
🔇 Additional comments (2)
internal-packages/run-engine/src/engine/types.ts (1)

16-17: Import path may break under classic TS module resolution

import { LockRetryConfig } from "./locking.js"; assumes ESM resolution with .js-suffixed imports.
If the compiler is invoked with the default moduleResolution: "node" (classic CJS output) the file emitted on disk will be locking.js, but TS will look for locking.js.ts at build time and fail to locate it.

Two safer options:

-import { LockRetryConfig } from "./locking.js";
+import type { LockRetryConfig } from "./locking.js"; // keeps ESM style but types-only

or (works for both commonjs & esnext targets):

-import { LockRetryConfig } from "./locking.js";
+import type { LockRetryConfig } from "./locking";
internal-packages/run-engine/src/engine/locking.ts (1)

442-453: automaticExtensionThreshold > duration silently disables extensions

Early-exit guard checks this.automaticExtensionThreshold > duration - 100, but nothing prevents callers from passing an absurd threshold (e.g. 10 000 ms on a 5 000 ms lock).
Recommend clamping threshold to a sensible fraction of duration (e.g. Math.min(threshold, duration / 2)) and logging a warning.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (1)
internal-packages/run-engine/src/engine/locking.ts (1)

69-76: Unused LockOptions interface – dead code

LockOptions is declared but never referenced. Remove it (or wire the constructor
to accept a single options: LockOptions & { redis: Redis; logger: … } object)
to avoid confusion and stale documentation.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5cefaf7 and f066447.

📒 Files selected for processing (1)
  • internal-packages/run-engine/src/engine/locking.ts (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (25)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (9, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (10, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 10)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 10)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 10)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)

Comment on lines +478 to +488
context.timeout = undefined;

const [error, newLock] = await tryCatch(context.lock.extend(duration));

if (!error && newLock) {
context.lock = newLock;
// Only schedule next extension if we haven't been cleaned up
if (context.timeout !== null) {
scheduleNext();
}
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Race-condition in extension clean-up sentinel

#cleanupExtension sets context.timeout = null to signal shutdown, but
#extendLock immediately overwrites context.timeout = undefined;
before checking the sentinel, so a late in-flight extension will re-schedule
itself after cleanup.

-  context.timeout = undefined;
+  // Keep the sentinel value set by #cleanupExtension so we can
+  // detect that the context was disposed.
+  if (context.timeout !== null) {
+    context.timeout = undefined; // running
+  }

(or move the sentinel check before the assignment).
This prevents timer leaks when quit() is invoked while an extension is pending.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
context.timeout = undefined;
const [error, newLock] = await tryCatch(context.lock.extend(duration));
if (!error && newLock) {
context.lock = newLock;
// Only schedule next extension if we haven't been cleaned up
if (context.timeout !== null) {
scheduleNext();
}
} else {
// Only clear the previous timer if we haven't been cleaned up
- context.timeout = undefined;
+ // Keep the sentinel value set by #cleanupExtension so we can detect disposal
+ if (context.timeout !== null) {
+ context.timeout = undefined; // running
+ }
const [error, newLock] = await tryCatch(context.lock.extend(duration));
if (!error && newLock) {
context.lock = newLock;
// Only schedule next extension if we haven't been cleaned up
if (context.timeout !== null) {
scheduleNext();
}
} else {
// ...
}
🤖 Prompt for AI Agents
In internal-packages/run-engine/src/engine/locking.ts around lines 478 to 488,
the code sets context.timeout to undefined before checking if it is null, which
causes a race condition allowing a late extension to reschedule itself after
cleanup. To fix this, move the sentinel check for context.timeout being null
before assigning undefined, or perform the null check prior to modifying
context.timeout, ensuring that no new timers are scheduled after cleanup has
started.

Comment on lines 100 to 110
// Initialize configuration values
this.defaultDuration = options.defaultDuration ?? 5000;
this.automaticExtensionThreshold = options.automaticExtensionThreshold ?? 500;

constructor(options: { redis: Redis; logger: Logger; tracer: Tracer; meter?: Meter }) {
this.redlock = new Redlock([options.redis], {
driftFactor: 0.01,
retryCount: 10,
retryDelay: 200, // time in ms
retryJitter: 200, // time in ms
automaticExtensionThreshold: 500, // time in ms
retryCount: 0, // Disable Redlock's internal retrying - we handle retries ourselves
retryDelay: 200, // Not used since retryCount = 0
retryJitter: 200, // Not used since retryCount = 0
automaticExtensionThreshold: this.automaticExtensionThreshold,
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Duplicate auto-extension: you enable Redlock’s internal extension and roll your own

automaticExtensionThreshold is forwarded to the Redlock ctor while a full manual
extension loop (#setupAutoExtension#extendLock) is implemented below.
This results in two independent timers extending the same lock, doubling Redis
traffic and creating subtle race conditions when one extension fails.

-      automaticExtensionThreshold: this.automaticExtensionThreshold,
+      // Disable Redlock’s built-in auto-extension – handled manually
+      automaticExtensionThreshold: 0,

(or omit the option entirely).

Test failure scenarios where the manual extender encounters a fault: with the internal
extender still running, the signal may never abort, masking the error.

🤖 Prompt for AI Agents
In internal-packages/run-engine/src/engine/locking.ts around lines 100 to 110,
the Redlock constructor is passed the automaticExtensionThreshold option while
the code also implements a manual auto-extension loop, causing duplicate lock
extensions and potential race conditions. To fix this, remove the
automaticExtensionThreshold option from the Redlock constructor to disable its
internal auto-extension, ensuring only the manual extension logic runs and
avoids redundant Redis traffic and error masking.

Comment on lines +25 to +40
export class LockAcquisitionTimeoutError extends Error {
constructor(
public readonly resources: string[],
public readonly totalWaitTime: number,
public readonly attempts: number,
message?: string
) {
super(
message ||
`Failed to acquire lock on resources [${resources.join(
", "
)}] after ${totalWaitTime}ms and ${attempts} attempts`
);
this.name = "LockAcquisitionTimeoutError";
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix Error subclass – prototype chain & stack not preserved

Extending Error in TS/JS requires manually resetting the prototype (and optionally capturing the stack) or instanceof checks may fail after transpilation.

 export class LockAcquisitionTimeoutError extends Error {
   constructor(
     public readonly resources: string[],
     public readonly totalWaitTime: number,
     public readonly attempts: number,
     message?: string
   ) {
-    super(
+    super(
       message ||
         `Failed to acquire lock on resources [${resources.join(
           ", "
         )}] after ${totalWaitTime}ms and ${attempts} attempts`
     );
     this.name = "LockAcquisitionTimeoutError";
+
+    // Maintain prototype chain – critical when targeting ES5
+    Object.setPrototypeOf(this, new.target.prototype);
+    // Optional but useful for debuggability
+    /* c8 ignore next 3 */
+    if (Error.captureStackTrace) {
+      Error.captureStackTrace(this, this.constructor);
+    }
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export class LockAcquisitionTimeoutError extends Error {
constructor(
public readonly resources: string[],
public readonly totalWaitTime: number,
public readonly attempts: number,
message?: string
) {
super(
message ||
`Failed to acquire lock on resources [${resources.join(
", "
)}] after ${totalWaitTime}ms and ${attempts} attempts`
);
this.name = "LockAcquisitionTimeoutError";
}
}
export class LockAcquisitionTimeoutError extends Error {
constructor(
public readonly resources: string[],
public readonly totalWaitTime: number,
public readonly attempts: number,
message?: string
) {
super(
message ||
`Failed to acquire lock on resources [${resources.join(
", "
)}] after ${totalWaitTime}ms and ${attempts} attempts`
);
this.name = "LockAcquisitionTimeoutError";
// Maintain prototype chain – critical when targeting ES5
Object.setPrototypeOf(this, new.target.prototype);
// Optional but useful for debuggability
/* c8 ignore next 3 */
if (Error.captureStackTrace) {
Error.captureStackTrace(this, this.constructor);
}
}
}
🤖 Prompt for AI Agents
In internal-packages/run-engine/src/engine/locking.ts around lines 25 to 40, the
LockAcquisitionTimeoutError class extends Error but does not reset the
prototype, which can cause instanceof checks to fail and the stack trace to be
incorrect after transpilation. Fix this by adding a line in the constructor to
set Object.setPrototypeOf(this, LockAcquisitionTimeoutError.prototype) and
optionally capture the stack trace using Error.captureStackTrace(this,
this.constructor) if available.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🔭 Outside diff range comments (1)
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (1)

84-90: 🛠️ Refactor suggestion

Multiple run-attempt code paths lack dedicated handling for LockAcquisitionTimeoutError

startRunAttempt, attemptSucceeded, attemptFailed, tryNackAndRequeue, and cancelRun all switched to the 3-arg lock() but still assume the lock will always be acquired.
When contention is high the new locker can raise the timeout error and these methods will bubble an unhandled exception up the RPC boundary.

Minimal fix per method:

try {
  return await this.$.runLock.lock("startRunAttempt", [runId], async () => {  });
} catch (e) {
  if (e instanceof LockAcquisitionTimeoutError) {
    // Visibility for ops + retry behaviour
    this.$.logger.debug("Could not acquire run lock, will retry", { runId, op: "startRunAttempt" });
    throw new ServiceValidationError("Run is locked, please retry", 423);
  }
  throw e;
}

(Adjust status code/message as appropriate.)
Adds resilience without changing core logic.

Also applies to: 440-446, 596-602, 906-912, 983-989

♻️ Duplicate comments (5)
internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts (1)

38-42: Same lock-timeout concern as in ttlSystem – see previous comment for rationale and suggested pattern.

internal-packages/run-engine/src/engine/locking.ts (4)

481-492: lockIf(false, …) leaks the AbortSignal contract

When the condition is false, the routine is invoked without a signal, diverging from code paths that expect one.
Either always supply a dummy AbortSignal or tighten the callback type for the unlocked case.


25-40: ⚠️ Potential issue

Preserve Error prototype & stack

LockAcquisitionTimeoutError still omits the customary Object.setPrototypeOf (and optional captureStackTrace). Without this, instanceof checks can fail once transpiled.

   super( /* … */ );
   this.name = "LockAcquisitionTimeoutError";
+
+  // Fix prototype chain for ES5 targets
+  Object.setPrototypeOf(this, new.target.prototype);
+  /* c8 ignore next 3 */
+  if (Error.captureStackTrace) {
+    Error.captureStackTrace(this, this.constructor);
+  }

95-101: 🛠️ Refactor suggestion

Redundant auto-extension: disable Redlock’s built-in timer

The constructor forwards automaticExtensionThreshold to Redlock while also running a custom extension loop.
This doubles extension traffic and introduces race conditions when either timer fails.

-  automaticExtensionThreshold: this.automaticExtensionThreshold,
+  // Disable Redlock's internal extender – handled manually
+  automaticExtensionThreshold: 0,

431-441: 🛠️ Refactor suggestion

Timer-cleanup race still present

#extendLock sets context.timeout = undefined before checking the sentinel (!== null).
If quit() has already set the sentinel to null, the check passes and a new timer is scheduled, leaking after cleanup.

Move the null‐check before clobbering the value, or guard with a local copy.

🧹 Nitpick comments (4)
internal-packages/run-engine/src/engine/systems/ttlSystem.ts (1)

24-27: Lock now waits for the default timeout – verify that’s acceptable & add graceful fallback

Dropping the explicit 5_000 ms makes the call rely entirely on the new RunLocker defaults.
If expireRun occasionally runs while another long-running operation still owns the lock, it may sit in the retry loop far longer than the original 5 s, or eventually throw LockAcquisitionTimeoutError.

Consider guarding the call so the job simply bails out (and logs) when the lock can’t be obtained, rather than bubbling the error up and turning a routine TTL sweep into an exception:

@@
-    await this.$.runLock.lock("expireRun", [runId], async () => {
+    try {
+      await this.$.runLock.lock("expireRun", [runId], async () => {-    });
+      });
+    } catch (err) {
+      if (err instanceof LockAcquisitionTimeoutError) {
+        this.$.logger.debug("Could not acquire run lock to expire run, skipping", { runId });
+        return;
+      }
+      throw err;
+    }

(import LockAcquisitionTimeoutError from the locking module).
This mirrors the behaviour you already added in dequeueSystem and keeps the TTL worker robust.

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (1)

54-58: Lock acquisition error handling missing for checkpoint paths

createCheckpoint and continueRunExecution now rely on default retry/timeout but don’t catch LockAcquisitionTimeoutError.
If a worker is holding the run lock for an extended period these public API calls will throw 500s instead of returning a clean “try again later”.

Apply the graceful-bail pattern (see ttlSystem comment) or propagate a domain-specific 409/423 response so callers can safely retry.

Also applies to: 268-272

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (1)

548-552: Pending-version helper mirrors previous pattern – same optional timeout-handling advice

The helper currently throws on lock-timeout; if that’s acceptable, nothing to do.
Otherwise consider the graceful fallback pattern discussed earlier.

internal-packages/run-engine/src/engine/tests/locking.test.ts (1)

8-18: Close Redis clients in tests

Each test creates its own Redis connection via createRedisClient but never calls redis.quit() / redis.disconnect().
Over dozens of cases this can leak sockets and slow CI. Add a finally block (or afterEach) to close the client alongside runLock.quit().
Example:

-  const redis = createRedisClient(redisOptions);
+  const redis = createRedisClient(redisOptions);
   const runLock = new RunLocker({ /* … */ });

   try {
     …
   } finally {
     await runLock.quit();
+    await redis.quit();
   }

Also applies to: 32-36

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f066447 and 918cba8.

📒 Files selected for processing (11)
  • internal-packages/run-engine/src/engine/index.ts (3 hunks)
  • internal-packages/run-engine/src/engine/locking.ts (6 hunks)
  • internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (2 hunks)
  • internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (2 hunks)
  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (2 hunks)
  • internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (0 hunks)
  • internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (5 hunks)
  • internal-packages/run-engine/src/engine/systems/ttlSystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (2 hunks)
  • internal-packages/run-engine/src/engine/tests/locking.test.ts (10 hunks)
💤 Files with no reviewable changes (1)
  • internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts
✅ Files skipped from review due to trivial changes (1)
  • internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
  • internal-packages/run-engine/src/engine/index.ts
🧰 Additional context used
🧬 Code Graph Analysis (1)
internal-packages/run-engine/src/engine/tests/locking.test.ts (2)
internal-packages/testcontainers/src/index.ts (1)
  • redisTest (167-167)
internal-packages/run-engine/src/engine/locking.ts (2)
  • RunLocker (69-535)
  • LockAcquisitionTimeoutError (25-40)
⏰ Context from checks skipped due to timeout of 90000ms (25)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (10, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (9, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 10)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 10)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (1)

78-84: 👍 Wrapped in try/catch already – looks good

dequeueFromWorkerQueue is already inside a try that logs and requeues on failure, so removing the explicit timeout is safe here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant