diff --git a/.changeset/wicked-ads-walk.md b/.changeset/wicked-ads-walk.md new file mode 100644 index 0000000000..c9190c709f --- /dev/null +++ b/.changeset/wicked-ads-walk.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/react-hooks": patch +"@trigger.dev/core": patch +--- + +Fixes an issue with realtime when re-subscribing to a run, that would temporarily display stale data and the changes. Now when re-subscribing to a run only the latest changes will be vended diff --git a/packages/core/src/v3/apiClient/stream.ts b/packages/core/src/v3/apiClient/stream.ts index 0e155fb33c..428fad8e94 100644 --- a/packages/core/src/v3/apiClient/stream.ts +++ b/packages/core/src/v3/apiClient/stream.ts @@ -114,6 +114,8 @@ class ReadableShapeStream = Row> { }, }); + let updatedKeys = new Set(); + // Create the transformed stream that processes messages and emits complete rows this.#changeStream = createAsyncIterableStream(source, { transform: (messages, controller) => { @@ -122,7 +124,7 @@ class ReadableShapeStream = Row> { } try { - const updatedKeys = new Set(); + let isUpToDate = false; for (const message of messages) { if (isChangeMessage(message)) { @@ -147,18 +149,23 @@ class ReadableShapeStream = Row> { if (message.headers.control === "must-refetch") { this.#currentState.clear(); this.#error = false; + } else if (message.headers.control === "up-to-date") { + isUpToDate = true; } } } // Now enqueue only one updated row per key, after all messages have been processed. - if (!this.#isStreamClosed) { + // If the stream is not up to date, we don't want to enqueue any rows. + if (!this.#isStreamClosed && isUpToDate) { for (const key of updatedKeys) { const finalRow = this.#currentState.get(key); if (finalRow) { controller.enqueue(finalRow); } } + + updatedKeys.clear(); } } catch (error) { console.error("Error processing stream messages:", error); diff --git a/references/hello-world/src/trigger/realtime.ts b/references/hello-world/src/trigger/realtime.ts index 951ec6735f..5a4d571c05 100644 --- a/references/hello-world/src/trigger/realtime.ts +++ b/references/hello-world/src/trigger/realtime.ts @@ -1,5 +1,6 @@ import { logger, runs, task } from "@trigger.dev/sdk"; import { helloWorldTask } from "./example.js"; +import { setTimeout } from "timers/promises"; export const realtimeByTagsTask = task({ id: "realtime-by-tags", @@ -32,3 +33,29 @@ export const realtimeByTagsTask = task({ }; }, }); + +export const realtimeUpToDateTask = task({ + id: "realtime-up-to-date", + run: async ({ runId }: { runId?: string }) => { + if (!runId) { + const handle = await helloWorldTask.trigger( + { hello: "world" }, + { + tags: ["hello-world", "realtime"], + } + ); + + runId = handle.id; + } + + logger.info("runId", { runId }); + + for await (const run of runs.subscribeToRun(runId, { stopOnCompletion: true })) { + logger.info("run", { run }); + } + + return { + message: "Hello, world!", + }; + }, +});