Skip to content

Commit 485f71c

Browse files
authored
Improve schedule engine performance (#2191)
* Improve schedule engine performance
* Convert an info log to a debug log
1 parent ec2175f commit 485f71c

File tree

10 files changed

+83
-19
lines changed

10 files changed

+83
-19
lines changed

apps/webapp/app/env.server.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,9 @@ const EnvironmentSchema = z.object({
618618
LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
619619
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
620620
LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
621+
LEGACY_RUN_ENGINE_WORKER_LOG_LEVEL: z
622+
.enum(["log", "error", "warn", "info", "debug"])
623+
.default("info"),
621624

622625
LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z
623626
.string()
@@ -661,6 +664,7 @@ const EnvironmentSchema = z.object({
661664
COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
662665
COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
663666
COMMON_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
667+
COMMON_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
664668

665669
COMMON_WORKER_REDIS_HOST: z
666670
.string()
@@ -699,6 +703,7 @@ const EnvironmentSchema = z.object({
699703
ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(100),
700704
ALERTS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
701705
ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
706+
ALERTS_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
702707

703708
ALERTS_WORKER_REDIS_HOST: z
704709
.string()
@@ -732,8 +737,8 @@ const EnvironmentSchema = z.object({
732737

733738
SCHEDULE_ENGINE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
734739
SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
735-
SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(1),
736-
SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
740+
SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
741+
SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
737742
SCHEDULE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
738743
SCHEDULE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
739744
SCHEDULE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),

apps/webapp/app/v3/alertsWorker.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ function initializeWorker() {
6161
pollIntervalMs: env.ALERTS_WORKER_POLL_INTERVAL,
6262
immediatePollIntervalMs: env.ALERTS_WORKER_IMMEDIATE_POLL_INTERVAL,
6363
shutdownTimeoutMs: env.ALERTS_WORKER_SHUTDOWN_TIMEOUT_MS,
64-
logger: new Logger("AlertsWorker", "debug"),
64+
logger: new Logger("AlertsWorker", env.ALERTS_WORKER_LOG_LEVEL),
6565
jobs: {
6666
"v3.deliverAlert": async ({ payload }) => {
6767
const service = new DeliverAlertService();

apps/webapp/app/v3/commonWorker.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ function initializeWorker() {
196196
pollIntervalMs: env.COMMON_WORKER_POLL_INTERVAL,
197197
immediatePollIntervalMs: env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL,
198198
shutdownTimeoutMs: env.COMMON_WORKER_SHUTDOWN_TIMEOUT_MS,
199-
logger: new Logger("CommonWorker", "debug"),
199+
logger: new Logger("CommonWorker", env.COMMON_WORKER_LOG_LEVEL),
200200
jobs: {
201201
scheduleEmail: async ({ payload }) => {
202202
await sendEmail(payload);

apps/webapp/app/v3/legacyRunEngineWorker.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ function initializeWorker() {
6868
pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL,
6969
immediatePollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL,
7070
shutdownTimeoutMs: env.LEGACY_RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS,
71-
logger: new Logger("LegacyRunEngineWorker", "debug"),
71+
logger: new Logger("LegacyRunEngineWorker", env.LEGACY_RUN_ENGINE_WORKER_LOG_LEVEL),
7272
jobs: {
7373
runHeartbeat: async ({ payload }) => {
7474
const service = new TaskRunHeartbeatFailedService();

apps/webapp/app/v3/scheduleEngine.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ function createScheduleEngine() {
6161
},
6262
worker: {
6363
concurrency: env.SCHEDULE_WORKER_CONCURRENCY_LIMIT,
64+
workers: env.SCHEDULE_WORKER_CONCURRENCY_WORKERS,
65+
tasksPerWorker: env.SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER,
6466
pollIntervalMs: env.SCHEDULE_WORKER_POLL_INTERVAL,
6567
shutdownTimeoutMs: env.SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS,
6668
disabled: env.SCHEDULE_WORKER_ENABLED === "0",

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ export class RunEngine {
152152
pollIntervalMs: options.worker.pollIntervalMs,
153153
immediatePollIntervalMs: options.worker.immediatePollIntervalMs,
154154
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
155-
logger: new Logger("RunEngineWorker", "debug"),
155+
logger: new Logger("RunEngineWorker", options.logLevel ?? "info"),
156156
jobs: {
157157
finishWaitpoint: async ({ payload }) => {
158158
await this.waitpointSystem.completeWaitpoint({

internal-packages/schedule-engine/src/engine/distributedScheduling.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,27 @@
55
*/
66
export function calculateDistributedExecutionTime(
77
exactScheduleTime: Date,
8-
distributionWindowSeconds: number = 30
8+
distributionWindowSeconds: number = 30,
9+
instanceId?: string
910
): Date {
10-
// Use the ISO string of the exact schedule time as the seed for consistency
11-
const seed = exactScheduleTime.toISOString();
11+
// Create seed by combining ISO timestamp with optional instanceId
12+
// This ensures different instances get different distributions even with same schedule time
13+
const timeSeed = exactScheduleTime.toISOString();
14+
const seed = instanceId ? `${timeSeed}:${instanceId}` : timeSeed;
15+
16+
// Use a better hash function (FNV-1a variant) for more uniform distribution
17+
let hash = 2166136261; // FNV offset basis (32-bit)
1218

13-
// Create a simple hash from the seed string
14-
let hash = 0;
1519
for (let i = 0; i < seed.length; i++) {
16-
const char = seed.charCodeAt(i);
17-
hash = (hash << 5) - hash + char;
18-
hash = hash & hash; // Convert to 32-bit integer
20+
hash ^= seed.charCodeAt(i);
21+
hash *= 16777619; // FNV prime (32-bit)
22+
// Keep it as 32-bit unsigned integer
23+
hash = hash >>> 0;
1924
}
2025

21-
// Convert hash to a value between 0 and 1
22-
const normalized = Math.abs(hash) / Math.pow(2, 31);
26+
// Convert hash to a value between 0 and 1 using better normalization
27+
// Use the full 32-bit range for better distribution
28+
const normalized = hash / 0xffffffff;
2329

2430
// Calculate offset in milliseconds (0 to distributionWindowSeconds * 1000)
2531
const offsetMs = Math.floor(normalized * distributionWindowSeconds * 1000);

internal-packages/schedule-engine/src/engine/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ export class ScheduleEngine {
9292
catalog: scheduleWorkerCatalog,
9393
concurrency: {
9494
limit: options.worker.concurrency,
95+
workers: options.worker.workers,
96+
tasksPerWorker: options.worker.tasksPerWorker,
9597
},
9698
pollIntervalMs: options.worker.pollIntervalMs,
9799
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
@@ -590,7 +592,8 @@ export class ScheduleEngine {
590592

591593
const distributedExecutionTime = calculateDistributedExecutionTime(
592594
exactScheduleTime,
593-
this.distributionWindowSeconds
595+
this.distributionWindowSeconds,
596+
instanceId
594597
);
595598

596599
const distributionOffsetMs = exactScheduleTime.getTime() - distributedExecutionTime.getTime();

internal-packages/schedule-engine/src/engine/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ export interface ScheduleEngineOptions {
3535
redis: RedisOptions;
3636
worker: {
3737
concurrency: number;
38+
workers?: number;
39+
tasksPerWorker?: number;
3840
pollIntervalMs?: number;
3941
shutdownTimeoutMs?: number;
4042
disabled?: boolean;

packages/redis-worker/src/worker.ts

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ class Worker<TCatalog extends WorkerCatalog> {
206206

207207
// Launch a number of "worker loops" on the main thread.
208208
for (let i = 0; i < workers; i++) {
209-
this.workerLoops.push(this.runWorkerLoop(`worker-${nanoid(12)}`, tasksPerWorker));
209+
this.workerLoops.push(this.runWorkerLoop(`worker-${nanoid(12)}`, tasksPerWorker, i, workers));
210210
}
211211

212212
this.setupShutdownHandlers();
@@ -390,14 +390,43 @@ class Worker<TCatalog extends WorkerCatalog> {
390390
* The main loop that each worker runs. It repeatedly polls for items,
391391
* processes them, and then waits before the next iteration.
392392
*/
393-
private async runWorkerLoop(workerId: string, taskCount: number): Promise<void> {
393+
private async runWorkerLoop(
394+
workerId: string,
395+
taskCount: number,
396+
workerIndex: number,
397+
totalWorkers: number
398+
): Promise<void> {
394399
const pollIntervalMs = this.options.pollIntervalMs ?? 1000;
395400
const immediatePollIntervalMs = this.options.immediatePollIntervalMs ?? 100;
396401

402+
// Calculate the delay between starting each worker loop so that they don't all start at the same time.
403+
const delayBetweenWorkers = this.options.pollIntervalMs ?? 1000;
404+
const delay = delayBetweenWorkers * (totalWorkers - workerIndex);
405+
await Worker.delay(delay);
406+
407+
this.logger.info("Starting worker loop", {
408+
workerIndex,
409+
totalWorkers,
410+
delay,
411+
workerId,
412+
taskCount,
413+
pollIntervalMs,
414+
immediatePollIntervalMs,
415+
concurrencyOptions: this.concurrency,
416+
});
417+
397418
while (!this.isShuttingDown) {
398419
// Check overall load. If at capacity, wait a bit before trying to dequeue more.
399420
if (this.limiter.activeCount + this.limiter.pendingCount >= this.concurrency.limit) {
421+
this.logger.debug("Worker at capacity, waiting", {
422+
workerId,
423+
concurrencyOptions: this.concurrency,
424+
activeCount: this.limiter.activeCount,
425+
pendingCount: this.limiter.pendingCount,
426+
});
427+
400428
await Worker.delay(pollIntervalMs);
429+
401430
continue;
402431
}
403432

@@ -412,10 +441,25 @@ class Worker<TCatalog extends WorkerCatalog> {
412441
);
413442

414443
if (items.length === 0) {
444+
this.logger.debug("No items to dequeue", {
445+
workerId,
446+
concurrencyOptions: this.concurrency,
447+
activeCount: this.limiter.activeCount,
448+
pendingCount: this.limiter.pendingCount,
449+
});
450+
415451
await Worker.delay(pollIntervalMs);
416452
continue;
417453
}
418454

455+
this.logger.debug("Dequeued items", {
456+
workerId,
457+
itemCount: items.length,
458+
concurrencyOptions: this.concurrency,
459+
activeCount: this.limiter.activeCount,
460+
pendingCount: this.limiter.pendingCount,
461+
});
462+
419463
// Schedule each item using the limiter.
420464
for (const item of items) {
421465
this.limiter(() => this.processItem(item as AnyQueueItem, items.length, workerId)).catch(
@@ -433,6 +477,8 @@ class Worker<TCatalog extends WorkerCatalog> {
433477
// Wait briefly before immediately polling again since we processed items
434478
await Worker.delay(immediatePollIntervalMs);
435479
}
480+
481+
this.logger.info("Worker loop finished", { workerId });
436482
}
437483

438484
/**

0 commit comments

Comments (0)