Skip to content

Commit 1c99cf0

Browse files
author
Misha Savelyev
committed
Use SELECT ... FOR UPDATE correctly in a transaction
1 parent 67547bf commit 1c99cf0

File tree

2 files changed

+37
-32
lines changed

2 files changed

+37
-32
lines changed

services/apps/integration_data_worker/src/jobs/processOldData.ts

+31-31
Original file line numberDiff line numberDiff line change
@@ -14,45 +14,45 @@ export const processOldDataJob = async (
1414
): Promise<void> => {
1515
const store = new DbStore(log, dbConn)
1616
const repo = new IntegrationDataRepository(store, log)
17-
const service = new IntegrationDataService(
18-
redis,
19-
streamWorkerEmitter,
20-
dataSinkWorkerEmitter,
21-
store,
22-
log,
23-
)
24-
25-
const loadNextBatch = async (): Promise<string[]> => {
26-
return await repo.transactionally(async (txRepo) => {
27-
const dataIds = await txRepo.getOldDataToProcess(5)
28-
await txRepo.touchUpdatedAt(dataIds)
29-
return dataIds
30-
})
31-
}
32-
33-
// load 5 oldest apiData and try process them
34-
let dataToProcess = await loadNextBatch()
3517

3618
let successCount = 0
3719
let errorCount = 0
3820

39-
while (dataToProcess.length > 0) {
40-
for (const dataId of dataToProcess) {
41-
try {
42-
const result = await service.processData(dataId)
43-
if (result) {
44-
successCount++
45-
} else {
21+
while (true) {
22+
const processedSomething = await repo.transactionally(async (txRepo) => {
23+
const dataIds = await txRepo.getOldDataToProcess(5)
24+
await txRepo.touchUpdatedAt(dataIds)
25+
26+
const txService = new IntegrationDataService(
27+
redis,
28+
streamWorkerEmitter,
29+
dataSinkWorkerEmitter,
30+
store,
31+
log,
32+
txRepo,
33+
)
34+
35+
for (const dataId of dataIds) {
36+
try {
37+
const result = await txService.processData(dataId)
38+
if (result) {
39+
successCount++
40+
} else {
41+
errorCount++
42+
}
43+
} catch (err) {
44+
log.error(err, 'Failed to process data!')
4645
errorCount++
4746
}
48-
} catch (err) {
49-
log.error(err, 'Failed to process data!')
50-
errorCount++
5147
}
52-
}
5348

54-
log.info(`Processed ${successCount} old data successfully and ${errorCount} with errors.`)
49+
log.info(`Processed ${successCount} old data successfully and ${errorCount} with errors.`)
5550

56-
dataToProcess = await loadNextBatch()
51+
return dataIds.length > 0
52+
})
53+
54+
if (!processedSomething) {
55+
break
56+
}
5757
}
5858
}

services/apps/integration_data_worker/src/service/integrationDataService.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@ export default class IntegrationDataService extends LoggerBase {
1818
private readonly dataSinkWorkerEmitter: DataSinkWorkerEmitter,
1919
store: DbStore,
2020
parentLog: Logger,
21+
repo?: IntegrationDataRepository,
2122
) {
2223
super(parentLog)
2324

24-
this.repo = new IntegrationDataRepository(store, this.log)
25+
if (repo) {
26+
this.repo = repo
27+
} else {
28+
this.repo = new IntegrationDataRepository(store, this.log)
29+
}
2530
}
2631

2732
private async triggerRunError(

0 commit comments

Comments
 (0)