Commit eb39298

runs replication leader lock expiration fix (#2050)
* runs replication leader lock expiration fix
* Allow configuring the container image --max-old-space-size using NODE_MAX_OLD_SPACE_SIZE
* Ability to configure the ClickHouse keep-alive settings
* Add some logging because we might not be able to do telemetry
1 parent 0661ee5 commit eb39298

File tree

8 files changed: 318 additions & 73 deletions


apps/webapp/app/env.server.ts

Lines changed: 3 additions & 1 deletion
@@ -766,9 +766,11 @@ const EnvironmentSchema = z.object({
   RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
   RUN_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
   RUN_REPLICATION_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
-  RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT: z.coerce.number().int().default(240),
+  RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
   RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
   RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
+  RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("1"),
+  RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().default(9_000),
 });
 
 export type Environment = z.infer<typeof EnvironmentSchema>;
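
The rename above changes how a replacement instance decides when to give up acquiring the leader lock. A minimal sketch of the intent, assuming the retry budget is now derived from the lock timeout plus this additional time (the formula is a reading of the rename, not code from this commit):

// Old behaviour: a fixed retry count, decoupled from the lock timeout.
const oldRetryBudgetMs = 240 * 500; // RETRY_COUNT * RETRY_INTERVAL_MS = 120_000 ms

// Assumed new behaviour: retry for as long as the previous leader's lock could
// still be alive, plus a configurable cushion.
const leaderLockTimeoutMs = 30_000; // service-level default shown later in this commit
const additionalTimeMs = 10_000;    // RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS
const retryIntervalMs = 500;        // RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS

const retryBudgetMs = leaderLockTimeoutMs + additionalTimeMs;  // 40_000 ms
const retryCount = Math.ceil(retryBudgetMs / retryIntervalMs); // 80 attempts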

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 6 additions & 1 deletion
@@ -23,6 +23,11 @@ function initializeRunsReplicationInstance() {
   const clickhouse = new ClickHouse({
     url: env.RUN_REPLICATION_CLICKHOUSE_URL,
     name: "runs-replication",
+    keepAlive: {
+      enabled: env.RUN_REPLICATION_KEEP_ALIVE_ENABLED === "1",
+      idleSocketTtl: env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
+    },
+    logLevel: env.RUN_REPLICATION_LOG_LEVEL,
   });
 
   const service = new RunsReplicationService({
@@ -45,7 +50,7 @@ function initializeRunsReplicationInstance() {
     flushBatchSize: env.RUN_REPLICATION_FLUSH_BATCH_SIZE,
     leaderLockTimeoutMs: env.RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS,
     leaderLockExtendIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS,
-    leaderLockRetryCount: env.RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT,
+    leaderLockAcquireAdditionalTimeMs: env.RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS,
     leaderLockRetryIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS,
     ackIntervalSeconds: env.RUN_REPLICATION_ACK_INTERVAL_SECONDS,
     logLevel: env.RUN_REPLICATION_LOG_LEVEL,
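
The keepAlive block above maps the two new env vars onto the ClickHouse client. The idle-socket TTL matters because the ClickHouse server closes idle HTTP connections on its own after its keep_alive_timeout; keeping the client-side TTL slightly lower avoids writing into a socket the server has already closed. A minimal sketch of that relationship, assuming a 10-second server keep_alive_timeout (an assumption about the deployment, not part of this commit):

// The 9_000 ms default leaves a margin below the assumed 10_000 ms server timeout,
// so idle sockets are retired client-side before the server drops them.
const serverKeepAliveTimeoutMs = 10_000; // assumed ClickHouse server setting
const idleSocketTtlMs = Number(process.env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS ?? 9_000);

if (idleSocketTtlMs >= serverKeepAliveTimeoutMs) {
  console.warn("RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS should stay below the server keep_alive_timeout");
}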

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 59 additions & 30 deletions
@@ -43,7 +43,7 @@ export type RunsReplicationServiceOptions = {
   flushBatchSize?: number;
   leaderLockTimeoutMs?: number;
   leaderLockExtendIntervalMs?: number;
-  leaderLockRetryCount?: number;
+  leaderLockAcquireAdditionalTimeMs?: number;
   leaderLockRetryIntervalMs?: number;
   ackIntervalSeconds?: number;
   acknowledgeTimeoutMs?: number;
@@ -102,11 +102,11 @@ export class RunsReplicationService {
       redisOptions: options.redisOptions,
       autoAcknowledge: false,
       publicationActions: ["insert", "update", "delete"],
-      logger: new Logger("LogicalReplicationClient", options.logLevel ?? "info"),
+      logger: options.logger ?? new Logger("LogicalReplicationClient", options.logLevel ?? "info"),
       leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 30_000,
       leaderLockExtendIntervalMs: options.leaderLockExtendIntervalMs ?? 10_000,
       ackIntervalSeconds: options.ackIntervalSeconds ?? 10,
-      leaderLockRetryCount: options.leaderLockRetryCount ?? 240,
+      leaderLockAcquireAdditionalTimeMs: options.leaderLockAcquireAdditionalTimeMs ?? 10_000,
       leaderLockRetryIntervalMs: options.leaderLockRetryIntervalMs ?? 500,
       tracer: options.tracer,
     });
@@ -330,10 +330,6 @@
       return;
     }
 
-    this.logger.debug("Handling transaction", {
-      transaction,
-    });
-
     const lsnToUInt64Start = process.hrtime.bigint();
 
     // If there are events, we need to handle them
@@ -349,20 +345,32 @@
       }))
     );
 
-    const currentSpan = this._tracer.startSpan("handle_transaction", {
-      attributes: {
-        "transaction.xid": transaction.xid,
-        "transaction.replication_lag_ms": transaction.replicationLagMs,
-        "transaction.events": transaction.events.length,
-        "transaction.commit_end_lsn": transaction.commitEndLsn,
-        "transaction.parse_duration_ms": this._currentParseDurationMs ?? undefined,
-        "transaction.lsn_to_uint64_ms": lsnToUInt64DurationMs,
-        "transaction.version": _version.toString(),
+    this._tracer
+      .startSpan("handle_transaction", {
+        attributes: {
+          "transaction.xid": transaction.xid,
+          "transaction.replication_lag_ms": transaction.replicationLagMs,
+          "transaction.events": transaction.events.length,
+          "transaction.commit_end_lsn": transaction.commitEndLsn,
+          "transaction.parse_duration_ms": this._currentParseDurationMs ?? undefined,
+          "transaction.lsn_to_uint64_ms": lsnToUInt64DurationMs,
+          "transaction.version": _version.toString(),
+        },
+        startTime: transaction.beginStartTimestamp,
+      })
+      .end();
+
+    this.logger.debug("handle_transaction", {
+      transaction: {
+        xid: transaction.xid,
+        commitLsn: transaction.commitLsn,
+        commitEndLsn: transaction.commitEndLsn,
+        events: transaction.events.length,
+        parseDurationMs: this._currentParseDurationMs,
+        lsnToUInt64DurationMs,
+        version: _version.toString(),
       },
-      startTime: transaction.beginStartTimestamp,
     });
-
-    currentSpan.end();
   }
 
   async #acknowledgeLatestTransaction() {
@@ -387,7 +395,7 @@
     this._lastAcknowledgedAt = now;
     this._lastAcknowledgedLsn = this._latestCommitEndLsn;
 
-    this.logger.debug("Acknowledging transaction", {
+    this.logger.debug("acknowledge_latest_transaction", {
       commitEndLsn: this._latestCommitEndLsn,
       lastAcknowledgedAt: this._lastAcknowledgedAt,
     });
@@ -747,7 +755,7 @@ export class ConcurrentFlushScheduler<T> {
     const callback = this.config.callback;
 
     const promise = this.concurrencyLimiter(async () => {
-      await startSpan(this._tracer, "flushNextBatch", async (span) => {
+      return await startSpan(this._tracer, "flushNextBatch", async (span) => {
         const batchId = nanoid();
 
         span.setAttribute("batch_id", batchId);
@@ -756,26 +764,47 @@
         span.setAttribute("concurrency_pending_count", this.concurrencyLimiter.pendingCount);
         span.setAttribute("concurrency_concurrency", this.concurrencyLimiter.concurrency);
 
+        this.logger.debug("flush_next_batch", {
+          batchId,
+          batchSize: batch.length,
+          concurrencyActiveCount: this.concurrencyLimiter.activeCount,
+          concurrencyPendingCount: this.concurrencyLimiter.pendingCount,
+          concurrencyConcurrency: this.concurrencyLimiter.concurrency,
+        });
+
+        const start = performance.now();
+
         await callback(batchId, batch);
+
+        const end = performance.now();
+
+        const duration = end - start;
+
+        return {
+          batchId,
+          duration,
+        };
       });
     });
 
-    const [error] = await tryCatch(promise);
+    const [error, result] = await tryCatch(promise);
 
     if (error) {
-      this.logger.error("Error flushing batch", {
+      this.logger.error("flush_batch_error", {
         error,
       });
 
       this.failedBatchCount++;
+    } else {
+      this.logger.debug("flush_batch_complete", {
+        totalBatches: 1,
+        successfulBatches: 1,
+        failedBatches: 0,
+        totalFailedBatches: this.failedBatchCount,
+        duration: result?.duration,
+        batchId: result?.batchId,
+      });
     }
-
-    this.logger.debug("Batch flush complete", {
-      totalBatches: 1,
-      successfulBatches: 1,
-      failedBatches: 0,
-      totalFailedBatches: this.failedBatchCount,
-    });
   }
 }
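
The flush path now threads the batch result back through tryCatch so the success log can report the batch id and duration. The repository already provides tryCatch; the sketch below is an illustrative equivalent of its shape, not the committed implementation:

// Resolve a promise to an [error, result] tuple so callers can branch without try/catch.
async function tryCatch<T>(promise: Promise<T>): Promise<[Error, undefined] | [null, T]> {
  try {
    const result = await promise;
    return [null, result];
  } catch (error) {
    return [error instanceof Error ? error : new Error(String(error)), undefined];
  }
}

// Usage mirrors the scheduler code above: `result` is only defined when `error` is null.
// const [error, result] = await tryCatch(flushPromise);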

apps/webapp/test/runsReplicationService.test.ts

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,6 +1030,135 @@ describe("RunsReplicationService", () => {
10301030
}
10311031
);
10321032

1033+
containerTest(
1034+
"should handover leadership to a second service, and the second service should be able to extend the leader lock",
1035+
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
1036+
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
1037+
1038+
const clickhouse = new ClickHouse({
1039+
url: clickhouseContainer.getConnectionUrl(),
1040+
name: "runs-replication-shutdown-handover",
1041+
});
1042+
1043+
// Service A
1044+
const runsReplicationServiceA = new RunsReplicationService({
1045+
clickhouse,
1046+
pgConnectionUrl: postgresContainer.getConnectionUri(),
1047+
serviceName: "runs-replication-shutdown-handover",
1048+
slotName: "task_runs_to_clickhouse_v1",
1049+
publicationName: "task_runs_to_clickhouse_v1_publication",
1050+
redisOptions,
1051+
maxFlushConcurrency: 1,
1052+
flushIntervalMs: 100,
1053+
flushBatchSize: 1,
1054+
leaderLockTimeoutMs: 5000,
1055+
leaderLockExtendIntervalMs: 1000,
1056+
leaderLockAcquireAdditionalTimeMs: 10_000,
1057+
ackIntervalSeconds: 5,
1058+
logger: new Logger("runs-replication-shutdown-handover-a", "debug"),
1059+
});
1060+
1061+
await runsReplicationServiceA.start();
1062+
1063+
// Service A
1064+
const runsReplicationServiceB = new RunsReplicationService({
1065+
clickhouse,
1066+
pgConnectionUrl: postgresContainer.getConnectionUri(),
1067+
serviceName: "runs-replication-shutdown-handover",
1068+
slotName: "task_runs_to_clickhouse_v1",
1069+
publicationName: "task_runs_to_clickhouse_v1_publication",
1070+
redisOptions,
1071+
maxFlushConcurrency: 1,
1072+
flushIntervalMs: 100,
1073+
flushBatchSize: 1,
1074+
leaderLockTimeoutMs: 5000,
1075+
leaderLockExtendIntervalMs: 1000,
1076+
leaderLockAcquireAdditionalTimeMs: 10_000,
1077+
ackIntervalSeconds: 5,
1078+
logger: new Logger("runs-replication-shutdown-handover-b", "debug"),
1079+
});
1080+
1081+
// Now we need to initiate starting the second service, and after 6 seconds, we need to shutdown the first service
1082+
await Promise.all([
1083+
setTimeout(6000).then(() => runsReplicationServiceA.stop()),
1084+
runsReplicationServiceB.start(),
1085+
]);
1086+
1087+
const organization = await prisma.organization.create({
1088+
data: {
1089+
title: "test",
1090+
slug: "test",
1091+
},
1092+
});
1093+
1094+
const project = await prisma.project.create({
1095+
data: {
1096+
name: "test",
1097+
slug: "test",
1098+
organizationId: organization.id,
1099+
externalRef: "test",
1100+
},
1101+
});
1102+
1103+
const runtimeEnvironment = await prisma.runtimeEnvironment.create({
1104+
data: {
1105+
slug: "test",
1106+
type: "DEVELOPMENT",
1107+
projectId: project.id,
1108+
organizationId: organization.id,
1109+
apiKey: "test",
1110+
pkApiKey: "test",
1111+
shortcode: "test",
1112+
},
1113+
});
1114+
1115+
// Now we insert a row into the table
1116+
const taskRun = await prisma.taskRun.create({
1117+
data: {
1118+
friendlyId: "run_1234",
1119+
taskIdentifier: "my-task",
1120+
payload: JSON.stringify({ foo: "bar" }),
1121+
traceId: "1234",
1122+
spanId: "1234",
1123+
queue: "test",
1124+
runtimeEnvironmentId: runtimeEnvironment.id,
1125+
projectId: project.id,
1126+
organizationId: organization.id,
1127+
environmentType: "DEVELOPMENT",
1128+
engine: "V2",
1129+
},
1130+
});
1131+
1132+
await setTimeout(10_000);
1133+
1134+
// Check that the row was replicated to clickhouse
1135+
const queryRuns = clickhouse.reader.query({
1136+
name: "runs-replication",
1137+
query: "SELECT * FROM trigger_dev.task_runs_v2",
1138+
schema: z.any(),
1139+
});
1140+
1141+
const [queryError, result] = await queryRuns({});
1142+
1143+
expect(queryError).toBeNull();
1144+
expect(result?.length).toBe(1);
1145+
expect(result?.[0]).toEqual(
1146+
expect.objectContaining({
1147+
run_id: taskRun.id,
1148+
friendly_id: taskRun.friendlyId,
1149+
task_identifier: taskRun.taskIdentifier,
1150+
environment_id: runtimeEnvironment.id,
1151+
project_id: project.id,
1152+
organization_id: organization.id,
1153+
environment_type: "DEVELOPMENT",
1154+
engine: "V2",
1155+
})
1156+
);
1157+
1158+
await runsReplicationServiceB.stop();
1159+
}
1160+
);
1161+
10331162
containerTest(
10341163
"should replicate all 1,000 TaskRuns inserted in bulk to ClickHouse",
10351164
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {

docker/scripts/entrypoint.sh

Lines changed: 9 additions & 2 deletions
@@ -27,6 +27,13 @@ cp internal-packages/database/prisma/schema.prisma apps/webapp/prisma/
 cp node_modules/@prisma/engines/*.node apps/webapp/prisma/
 
 cd /triggerdotdev/apps/webapp
-# exec dumb-init pnpm run start:local
-NODE_PATH='/triggerdotdev/node_modules/.pnpm/node_modules' exec dumb-init node --max-old-space-size=8192 ./build/server.js
+
+
+# Decide how much old-space memory Node should get.
+# Use $NODE_MAX_OLD_SPACE_SIZE if it's set; otherwise fall back to 8192.
+MAX_OLD_SPACE_SIZE="${NODE_MAX_OLD_SPACE_SIZE:-8192}"
+
+echo "Setting max old space size to ${MAX_OLD_SPACE_SIZE}"
+
+NODE_PATH='/triggerdotdev/node_modules/.pnpm/node_modules' exec dumb-init node --max-old-space-size=${MAX_OLD_SPACE_SIZE} ./build/server.js

internal-packages/clickhouse/src/client/client.ts

Lines changed: 12 additions & 3 deletions
@@ -15,14 +15,22 @@ import type {
   ClickhouseWriter,
 } from "./types.js";
 import { generateErrorMessage } from "zod-error";
-import { Logger } from "@trigger.dev/core/logger";
+import { Logger, type LogLevel } from "@trigger.dev/core/logger";
+import type { Agent as HttpAgent } from "http";
+import type { Agent as HttpsAgent } from "https";
 
 export type ClickhouseConfig = {
   name: string;
   url: string;
   tracer?: Tracer;
+  keepAlive?: {
+    enabled?: boolean;
+    idleSocketTtl?: number;
+  };
+  httpAgent?: HttpAgent | HttpsAgent;
   clickhouseSettings?: ClickHouseSettings;
   logger?: Logger;
+  logLevel?: LogLevel;
 };
 
 export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
@@ -33,11 +41,12 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
 
   constructor(config: ClickhouseConfig) {
     this.name = config.name;
-    this.logger = config.logger ?? new Logger("ClickhouseClient", "debug");
+    this.logger = config.logger ?? new Logger("ClickhouseClient", config.logLevel ?? "info");
 
     this.client = createClient({
       url: config.url,
-
+      keep_alive: config.keepAlive,
+      http_agent: config.httpAgent,
       clickhouse_settings: {
         ...config.clickhouseSettings,
         output_format_json_quote_64bit_integers: 0,