5
5
* Queue Job Realm Schema defined in ../config/Database
6
6
*
7
7
*/
8
-
9
8
import Database from '../config/Database' ;
10
9
import uuid from 'react-native-uuid' ;
11
10
import Worker from './Worker' ;
12
11
import promiseReflect from 'promise-reflect' ;
13
12
14
-
15
13
export class Queue {
16
-
17
14
/**
18
15
*
19
16
* Set initial class properties.
@@ -82,7 +79,6 @@ export class Queue {
82
79
* @param startQueue - {boolean} - Whether or not to immediately begin prcessing queue. If false queue.start() must be manually called.
83
80
*/
84
81
createJob ( name , payload = { } , options = { } , startQueue = true ) {
85
-
86
82
if ( ! name ) {
87
83
throw new Error ( 'Job name must be supplied.' ) ;
88
84
}
@@ -109,10 +105,9 @@ export class Queue {
109
105
} ) ;
110
106
111
107
// Start queue on job creation if it isn't running by default.
112
- if ( startQueue && this . status == 'inactive' ) {
108
+ if ( startQueue && this . status === 'inactive' ) {
113
109
this . start ( ) ;
114
110
}
115
-
116
111
}
117
112
118
113
/**
@@ -139,9 +134,8 @@ export class Queue {
139
134
* @return {boolean|undefined } - False if queue is already started. Otherwise nothing is returned when queue finishes processing.
140
135
*/
141
136
async start ( lifespan = 0 ) {
142
-
143
137
// If queue is already running, don't fire up concurrent loop.
144
- if ( this . status == 'active' ) {
138
+ if ( this . status === 'active' ) {
145
139
return false ;
146
140
}
147
141
@@ -160,8 +154,7 @@ export class Queue {
160
154
concurrentJobs = await this . getConcurrentJobs ( ) ;
161
155
}
162
156
163
- while ( this . status == 'active' && concurrentJobs . length ) {
164
-
157
+ while ( this . status === 'active' && concurrentJobs . length ) {
165
158
// Loop over jobs and process them concurrently.
166
159
const processingJobs = concurrentJobs . map ( job => {
167
160
return this . processJob ( job ) ;
@@ -179,11 +172,9 @@ export class Queue {
179
172
} else {
180
173
concurrentJobs = await this . getConcurrentJobs ( ) ;
181
174
}
182
-
183
175
}
184
176
185
177
this . status = 'inactive' ;
186
-
187
178
}
188
179
189
180
/**
@@ -206,22 +197,16 @@ export class Queue {
206
197
* @return {promise } - Promise that resolves to a collection of all the jobs in the queue.
207
198
*/
208
199
async getJobs ( sync = false ) {
209
-
210
200
if ( sync ) {
211
-
212
201
let jobs = null ;
213
202
this . realm . write ( ( ) => {
214
-
215
203
jobs = Array . from ( this . realm . objects ( 'Job' ) ) ;
216
-
217
204
} ) ;
218
205
219
206
return jobs ;
220
-
221
207
} else {
222
208
return Array . from ( await this . realm . objects ( 'Job' ) ) ;
223
209
}
224
-
225
210
}
226
211
227
212
/**
@@ -239,11 +224,9 @@ export class Queue {
239
224
* @return {promise } - Promise resolves to an array of job(s) to be processed next by the queue.
240
225
*/
241
226
async getConcurrentJobs ( queueLifespanRemaining = 0 ) {
242
-
243
227
let concurrentJobs = [ ] ;
244
228
245
229
this . realm . write ( ( ) => {
246
-
247
230
// Get next job from queue.
248
231
let nextJob = null ;
249
232
@@ -294,9 +277,7 @@ export class Queue {
294
277
. sorted ( [ [ 'priority' , true ] , [ 'created' , false ] ] ) ) ;
295
278
296
279
concurrentJobs = reselectedJobs . slice ( 0 , concurrency ) ;
297
-
298
280
}
299
-
300
281
} ) ;
301
282
302
283
return concurrentJobs ;
@@ -319,7 +300,6 @@ export class Queue {
319
300
* @param job {object} - Job realm model object
320
301
*/
321
302
async processJob ( job ) {
322
-
323
303
// Data must be cloned off the realm job object for several lifecycle callbacks to work correctly.
324
304
// This is because realm job is deleted before some callbacks are called if job processed successfully.
325
305
// More info: https://github.com/billmalarky/react-native-queue/issues/2#issuecomment-361418965
@@ -331,27 +311,21 @@ export class Queue {
331
311
this . worker . executeJobLifecycleCallback ( 'onStart' , jobName , jobId , jobPayload ) ;
332
312
333
313
try {
334
-
335
314
await this . worker . executeJob ( job ) ;
336
315
337
316
// On successful job completion, remove job
338
317
this . realm . write ( ( ) => {
339
-
340
318
this . realm . delete ( job ) ;
341
-
342
319
} ) ;
343
320
344
321
// Job has processed successfully, fire onSuccess and onComplete job lifecycle callbacks.
345
322
this . worker . executeJobLifecycleCallback ( 'onSuccess' , jobName , jobId , jobPayload ) ;
346
323
this . worker . executeJobLifecycleCallback ( 'onComplete' , jobName , jobId , jobPayload ) ;
347
-
348
324
} catch ( error ) {
349
-
350
325
// Handle job failure logic, including retries.
351
326
let jobData = JSON . parse ( job . data ) ;
352
327
353
328
this . realm . write ( ( ) => {
354
-
355
329
// Increment failed attempts number
356
330
if ( ! jobData . failedAttempts ) {
357
331
jobData . failedAttempts = 1 ;
@@ -375,7 +349,6 @@ export class Queue {
375
349
if ( jobData . failedAttempts >= jobData . attempts ) {
376
350
job . failed = new Date ( ) ;
377
351
}
378
-
379
352
} ) ;
380
353
381
354
// Execute job onFailure lifecycle callback.
@@ -386,9 +359,7 @@ export class Queue {
386
359
this . worker . executeJobLifecycleCallback ( 'onFailed' , jobName , jobId , jobPayload ) ;
387
360
this . worker . executeJobLifecycleCallback ( 'onComplete' , jobName , jobId , jobPayload ) ;
388
361
}
389
-
390
362
}
391
-
392
363
}
393
364
394
365
/**
@@ -401,31 +372,21 @@ export class Queue {
401
372
* @param jobName {string} - Name associated with job (and related job worker).
402
373
*/
403
374
flushQueue ( jobName = null ) {
404
-
405
375
if ( jobName ) {
406
-
407
376
this . realm . write ( ( ) => {
408
-
409
377
let jobs = Array . from ( this . realm . objects ( 'Job' )
410
378
. filtered ( 'name == "' + jobName + '"' ) ) ;
411
379
412
380
if ( jobs . length ) {
413
381
this . realm . delete ( jobs ) ;
414
382
}
415
-
416
383
} ) ;
417
-
418
384
} else {
419
385
this . realm . write ( ( ) => {
420
-
421
386
this . realm . deleteAll ( ) ;
422
-
423
387
} ) ;
424
388
}
425
-
426
389
}
427
-
428
-
429
390
}
430
391
431
392
/**
@@ -435,10 +396,8 @@ export class Queue {
435
396
* @return {Queue } - A queue instance.
436
397
*/
437
398
export default async function queueFactory ( ) {
438
-
439
399
const queue = new Queue ( ) ;
440
400
await queue . init ( ) ;
441
401
442
402
return queue ;
443
-
444
403
}
0 commit comments