Skip to content

Commit b8ac219

Browse files
authored
Merge pull request #2399 from firebase/next
Release firestore-bigquery-export
2 parents 247c93b + f29b52b commit b8ac219

File tree

5 files changed

+61
-75
lines changed

5 files changed

+61
-75
lines changed

firestore-bigquery-export/CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## Version 0.2.3
2+
3+
fix: pass full document resource name to bigquery
4+
15
## Version 0.2.2
26

37
fix: remove default value on DATABASE_REGION

firestore-bigquery-export/extension.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
name: firestore-bigquery-export
16-
version: 0.2.2
16+
version: 0.2.3
1717
specVersion: v1beta
1818

1919
displayName: Stream Firestore to BigQuery

firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Object {
2626
"maxDispatchesPerSecond": 10,
2727
"maxEnqueueAttempts": 3,
2828
"maxStaleness": undefined,
29+
"projectId": undefined,
2930
"refreshIntervalMinutes": undefined,
3031
"tableId": "my_table",
3132
"timePartitioning": null,

firestore-bigquery-export/functions/src/config.ts

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ export function clustering(clusters: string | undefined) {
3434

3535
export default {
3636
bqProjectId: process.env.BIGQUERY_PROJECT_ID,
37+
projectId: process.env.PROJECT_ID,
3738
databaseId: process.env.DATABASE || "(default)",
3839
databaseRegion: process.env.DATABASE_REGION,
3940
collectionPath: process.env.COLLECTION_PATH,

firestore-bigquery-export/functions/src/index.ts

+54-74
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,8 @@ import {
3333
import * as logs from "./logs";
3434
import * as events from "./events";
3535
import { getChangeType, getDocumentId } from "./util";
36-
import { DocumentSnapshot } from "firebase-admin/firestore";
3736

38-
// Configuration for the Firestore Event History Tracker.
37+
// Configuration for the Firestore Event History Tracker
3938
const eventTrackerConfig = {
4039
firestoreInstanceId: config.databaseId,
4140
tableId: config.tableId,
@@ -67,27 +66,27 @@ const eventTrackerConfig = {
6766
logLevel: config.logLevel,
6867
};
6968

70-
// Initialize the Firestore Event History Tracker with the given configuration.
71-
const eventTracker: FirestoreBigQueryEventHistoryTracker =
72-
new FirestoreBigQueryEventHistoryTracker(eventTrackerConfig);
69+
const eventTracker = new FirestoreBigQueryEventHistoryTracker(
70+
eventTrackerConfig
71+
);
7372

74-
// Initialize logging.
7573
logs.logger.setLogLevel(config.logLevel);
7674
logs.init();
7775

78-
/** Initialize Firebase Admin SDK if not already initialized */
7976
if (admin.apps.length === 0) {
8077
admin.initializeApp();
8178
}
8279

83-
// Setup the event channel for EventArc.
8480
events.setupEventChannel();
8581

86-
// Define a type for task data to ensure consistency
82+
/**
83+
* Task data structure for BigQuery synchronization
84+
*/
8785
interface SyncBigQueryTaskData {
8886
timestamp: string;
8987
eventId: string;
90-
documentPath: string;
88+
relativePath: string;
89+
fullResourceName: string;
9190
changeType: ChangeType;
9291
documentId: string;
9392
params: Record<string, any> | null;
@@ -96,39 +95,38 @@ interface SyncBigQueryTaskData {
9695
}
9796

9897
/**
99-
* Cloud Function to handle enqueued tasks to synchronize Firestore changes to BigQuery.
98+
* Handles enqueued tasks for syncing Firestore changes to BigQuery
10099
*/
101100
export const syncBigQuery = functions.tasks
102101
.taskQueue()
103102
.onDispatch(async (taskData: SyncBigQueryTaskData, ctx) => {
104-
const documentName = taskData.documentPath;
103+
const fullResourceName = taskData.fullResourceName;
105104
const eventId = taskData.eventId;
106105
const operation = taskData.changeType;
107106

108107
logs.logEventAction(
109108
"Firestore event received by onDispatch trigger",
110-
documentName,
109+
fullResourceName,
111110
eventId,
112111
operation
113112
);
114113

115114
try {
116-
// Use the shared function to write the event to BigQuery
117115
await recordEventToBigQuery(
118116
taskData.changeType,
119117
taskData.documentId,
118+
taskData.fullResourceName,
120119
taskData.data,
121120
taskData.oldData,
122121
taskData
123122
);
124123

125-
// Record a success event in EventArc, if configured
126124
await events.recordSuccessEvent({
127125
subject: taskData.documentId,
128126
data: {
129127
timestamp: taskData.timestamp,
130128
operation: taskData.changeType,
131-
documentName: taskData.documentPath,
129+
documentName: taskData.fullResourceName,
132130
documentId: taskData.documentId,
133131
pathParams: taskData.params,
134132
eventId: taskData.eventId,
@@ -137,13 +135,11 @@ export const syncBigQuery = functions.tasks
137135
},
138136
});
139137

140-
// Log completion of the task.
141138
logs.complete();
142139
} catch (err) {
143-
// Log error and throw it to handle in the calling function.
144140
logs.logFailedEventAction(
145141
"Failed to write event to BigQuery from onDispatch handler",
146-
documentName,
142+
fullResourceName,
147143
eventId,
148144
operation,
149145
err as Error
@@ -153,35 +149,34 @@ export const syncBigQuery = functions.tasks
153149
}
154150
});
155151

152+
/**
153+
* Main Cloud Function that triggers on Firestore document changes
154+
* and sends the data to BigQuery
155+
*/
156156
export const fsexportbigquery = onDocumentWritten(
157157
`${config.collectionPath}/{documentId}`,
158158
async (event) => {
159159
const { data, ...context } = event;
160-
161-
// Start logging the function execution.
162160
logs.start();
163161

164-
// Determine the type of change (CREATE, UPDATE, DELETE) from the new event data.
165162
const changeType = getChangeType(data);
166163
const documentId = getDocumentId(data);
167-
168-
// Check if the document is newly created or deleted.
169164
const isCreated = changeType === ChangeType.CREATE;
170165
const isDeleted = changeType === ChangeType.DELETE;
171166

172-
// Get the new and old data from the snapshot.
173167
const newData = isDeleted ? undefined : data.after.data();
174168
const oldData =
175169
isCreated || config.excludeOldData ? undefined : data.before.data();
176170

177-
// check this is the full doc name
178-
const documentName = context.document;
171+
const relativeName = context.document;
172+
const projectId = config.projectId;
173+
const fullResourceName = `projects/${projectId}/databases/${config.databaseId}/documents/${relativeName}`;
179174
const eventId = context.id;
180175
const operation = changeType;
181176

182177
logs.logEventAction(
183178
"Firestore event received by onDocumentWritten trigger",
184-
documentName,
179+
fullResourceName,
185180
eventId,
186181
operation
187182
);
@@ -190,13 +185,12 @@ export const fsexportbigquery = onDocumentWritten(
190185
let serializedOldData: any;
191186

192187
try {
193-
// Serialize the data before processing.
194188
serializedData = eventTracker.serializeData(newData);
195189
serializedOldData = eventTracker.serializeData(oldData);
196190
} catch (err) {
197191
logs.logFailedEventAction(
198192
"Failed to serialize data",
199-
documentName,
193+
fullResourceName,
200194
eventId,
201195
operation,
202196
err as Error
@@ -205,7 +199,6 @@ export const fsexportbigquery = onDocumentWritten(
205199
}
206200

207201
try {
208-
// Record the start event in EventArc, if configured.
209202
await events.recordStartEvent({
210203
documentId,
211204
changeType,
@@ -219,16 +212,17 @@ export const fsexportbigquery = onDocumentWritten(
219212
}
220213

221214
try {
222-
// Write the change event to BigQuery.
223215
await recordEventToBigQuery(
224216
changeType,
225217
documentId,
218+
fullResourceName,
226219
serializedData,
227220
serializedOldData,
228221
{
229222
timestamp: context.time,
230223
eventId: context.id,
231-
documentPath: context.document,
224+
relativePath: context.document,
225+
fullResourceName,
232226
changeType,
233227
documentId,
234228
params: config.wildcardIds ? context.params : null,
@@ -238,11 +232,12 @@ export const fsexportbigquery = onDocumentWritten(
238232
);
239233
} catch (err) {
240234
logs.failedToWriteToBigQueryImmediately(err as Error);
241-
// Handle enqueue errors with retries and backup to GCS.
235+
242236
await attemptToEnqueue(err, {
243237
timestamp: context.time,
244238
eventId: context.id,
245-
documentPath: context.document,
239+
relativePath: context.document,
240+
fullResourceName: fullResourceName,
246241
changeType,
247242
documentId,
248243
params: config.wildcardIds ? context.params : null,
@@ -251,49 +246,49 @@ export const fsexportbigquery = onDocumentWritten(
251246
});
252247
}
253248

254-
// Log the successful completion of the function.
255249
logs.complete();
256250
}
257251
);
258252

259253
/**
260-
* Record the event to the Firestore Event History Tracker and BigQuery.
254+
* Records a Firestore document change event to BigQuery
261255
*
262-
* @param changeType - The type of change (CREATE, UPDATE, DELETE).
263-
* @param documentId - The ID of the Firestore document.
264-
* @param serializedData - The serialized new data of the document.
265-
* @param serializedOldData - The serialized old data of the document.
266-
* @param taskData - The task data containing event information.
256+
* @param changeType - The type of change (CREATE, UPDATE, DELETE)
257+
* @param documentId - The ID of the Firestore document
258+
* @param fullResourceName - Fully-qualified Firestore document path
259+
* @param serializedData - The serialized new data
260+
* @param serializedOldData - The serialized old data
261+
* @param taskData - Task metadata containing event information
267262
*/
268263
async function recordEventToBigQuery(
269264
changeType: ChangeType,
270265
documentId: string,
266+
fullResourceName: string,
271267
serializedData: any,
272268
serializedOldData: any,
273269
taskData: SyncBigQueryTaskData
274270
) {
275271
const event: FirestoreDocumentChangeEvent = {
276-
timestamp: taskData.timestamp, // Cloud Firestore commit timestamp
277-
operation: changeType, // The type of operation performed
278-
documentName: taskData.documentPath, // The document name
279-
documentId, // The document ID
272+
timestamp: taskData.timestamp,
273+
operation: changeType,
274+
documentName: fullResourceName,
275+
documentId,
280276
pathParams: taskData.params as
281277
| FirestoreDocumentChangeEvent["pathParams"]
282-
| null, // Path parameters, if any
283-
eventId: taskData.eventId, // The event ID from Firestore
284-
data: serializedData, // Serialized new data
285-
oldData: serializedOldData, // Serialized old data
278+
| null,
279+
eventId: taskData.eventId,
280+
data: serializedData,
281+
oldData: serializedOldData,
286282
};
287283

288-
// Record the event in the Firestore Event History Tracker and BigQuery.
289284
await eventTracker.record([event]);
290285
}
291286

292287
/**
293-
* Handle errors when enqueueing tasks to sync BigQuery.
288+
* Handles task enqueueing with retry logic when BigQuery sync fails
294289
*
295-
* @param err - The error object.
296-
* @param taskData - The task data to be enqueued.
290+
* @param err - The error that occurred
291+
* @param taskData - The task data to enqueue
297292
*/
298293
async function attemptToEnqueue(_err: Error, taskData: SyncBigQueryTaskData) {
299294
try {
@@ -303,36 +298,31 @@ async function attemptToEnqueue(_err: Error, taskData: SyncBigQueryTaskData) {
303298
);
304299

305300
let attempts = 0;
306-
const jitter = Math.random() * 100; // Adding jitter to avoid collision
307-
308-
// Exponential backoff formula with a maximum of 5 + jitter seconds
301+
const jitter = Math.random() * 100;
309302
const backoff = (attempt: number) =>
310303
Math.min(Math.pow(2, attempt) * 100, 5000) + jitter;
311304

312305
while (attempts < config.maxEnqueueAttempts) {
313306
if (attempts > 0) {
314-
// Wait before retrying to enqueue the task.
315307
await new Promise((resolve) => setTimeout(resolve, backoff(attempts)));
316308
}
317309

318310
attempts++;
319311
try {
320312
await queue.enqueue(taskData);
321-
break; // Break the loop if enqueuing is successful.
313+
break;
322314
} catch (enqueueErr) {
323-
// Throw the error if max attempts are reached.
324315
if (attempts === config.maxEnqueueAttempts) {
325316
throw enqueueErr;
326317
}
327318
}
328319
}
329320
} catch (enqueueErr) {
330-
// Record the error event.
331321
await events.recordErrorEvent(enqueueErr as Error);
332322

333323
logs.logFailedEventAction(
334324
"Failed to enqueue event to Cloud Tasks from onWrite handler",
335-
taskData.documentPath,
325+
taskData.fullResourceName,
336326
taskData.eventId,
337327
taskData.changeType,
338328
enqueueErr as Error
@@ -341,37 +331,27 @@ async function attemptToEnqueue(_err: Error, taskData: SyncBigQueryTaskData) {
341331
}
342332

343333
/**
344-
* Cloud Function to set up BigQuery sync by initializing the event tracker.
334+
* Sets up BigQuery synchronization by initializing the event tracker
345335
*/
346336
export const setupBigQuerySync = functions.tasks
347337
.taskQueue()
348338
.onDispatch(async () => {
349-
/** Setup runtime environment */
350339
const runtime = getExtensions().runtime();
351-
352-
// Initialize the BigQuery sync.
353340
await eventTracker.initialize();
354-
355-
// Update the processing state.
356341
await runtime.setProcessingState(
357342
"PROCESSING_COMPLETE",
358343
"Sync setup completed"
359344
);
360345
});
361346

362347
/**
363-
* Cloud Function to initialize BigQuery sync.
348+
* Initializes BigQuery synchronization
364349
*/
365350
export const initBigQuerySync = functions.tasks
366351
.taskQueue()
367352
.onDispatch(async () => {
368-
/** Setup runtime environment */
369353
const runtime = getExtensions().runtime();
370-
371-
// Initialize the BigQuery sync.
372354
await eventTracker.initialize();
373-
374-
// Update the processing state.
375355
await runtime.setProcessingState(
376356
"PROCESSING_COMPLETE",
377357
"Sync setup completed"

0 commit comments

Comments (0)