9
9
namespace MassTransit . PostgresOutbox . Abstractions ;
10
10
11
11
public abstract class InboxConsumer < TMessage , TDbContext > : IConsumer < TMessage >
12
- where TMessage : class
13
- where TDbContext : DbContext , IInboxDbContext
12
+ where TMessage : class
13
+ where TDbContext : DbContext , IInboxDbContext
14
14
{
15
- private readonly string _consumerId ;
16
- private readonly IServiceScopeFactory _serviceScopeFactory ;
15
+ private readonly string _consumerId ;
16
+ private readonly IServiceScopeFactory _serviceScopeFactory ;
17
17
18
- protected InboxConsumer ( IServiceScopeFactory serviceScopeFactory )
19
- {
20
- _consumerId = GetType ( ) . ToString ( ) ;
21
- _serviceScopeFactory = serviceScopeFactory ;
22
- }
18
+ protected InboxConsumer ( IServiceScopeFactory serviceScopeFactory )
19
+ {
20
+ _consumerId = GetType ( ) . ToString ( ) ;
21
+ _serviceScopeFactory = serviceScopeFactory ;
22
+ }
23
23
24
- public async Task Consume ( ConsumeContext < TMessage > context )
25
- {
26
- using var scope = _serviceScopeFactory . CreateScope ( ) ;
27
- var messageId = context . Headers . Get < Guid > ( Constants . OutboxMessageId ) ;
24
+ public async Task Consume ( ConsumeContext < TMessage > context )
25
+ {
26
+ var messageId = context . Headers . Get < Guid > ( Constants . OutboxMessageId ) ;
28
27
29
- var dbContext = scope . ServiceProvider . GetRequiredService < TDbContext > ( ) ;
30
- var logger = scope . ServiceProvider . GetRequiredService < ILogger < InboxConsumer < TMessage , TDbContext > > > ( ) ;
28
+ if ( messageId is null )
29
+ {
30
+ await Consume ( context . Message ) ;
31
+ return ;
32
+ }
31
33
32
- var exists = await dbContext . InboxMessages . AnyAsync ( x => x . MessageId == messageId && x . ConsumerId == _consumerId ) ;
34
+ using var scope = _serviceScopeFactory . CreateScope ( ) ;
33
35
34
- if ( ! exists )
35
- {
36
- dbContext . InboxMessages . Add ( new InboxMessage
37
- {
38
- MessageId = messageId ! . Value ,
39
- CreatedAt = DateTime . UtcNow ,
40
- State = MessageState . New ,
41
- ConsumerId = _consumerId ,
42
- } ) ;
36
+ var dbContext = scope . ServiceProvider . GetRequiredService < TDbContext > ( ) ;
37
+ var logger = scope . ServiceProvider . GetRequiredService < ILogger < InboxConsumer < TMessage , TDbContext > > > ( ) ;
43
38
44
- await dbContext . SaveChangesAsync ( ) ;
45
- }
39
+ var exists =
40
+ await dbContext . InboxMessages . AnyAsync ( x => x . MessageId == messageId && x . ConsumerId == _consumerId ) ;
46
41
47
- using var transactionScope = await dbContext . Database . BeginTransactionAsync ( System . Data . IsolationLevel . ReadCommitted ) ;
42
+ if ( ! exists )
43
+ {
44
+ dbContext . InboxMessages . Add ( new InboxMessage
45
+ {
46
+ MessageId = messageId . Value ,
47
+ CreatedAt = DateTime . UtcNow ,
48
+ State = MessageState . New ,
49
+ ConsumerId = _consumerId ,
50
+ } ) ;
48
51
49
- var inboxMessage = await dbContext . InboxMessages
50
- . Where ( x => x . MessageId == messageId )
51
- . Where ( x => x . ConsumerId == _consumerId )
52
- . Where ( x => x . State == MessageState . New )
53
- . ForUpdate ( LockBehavior . SkipLocked )
54
- . FirstOrDefaultAsync ( ) ;
52
+ await dbContext . SaveChangesAsync ( ) ;
53
+ }
55
54
56
- if ( inboxMessage == null )
57
- {
58
- return ;
59
- }
55
+ await using var transactionScope =
56
+ await dbContext . Database . BeginTransactionAsync ( System . Data . IsolationLevel . ReadCommitted ) ;
60
57
61
- try
62
- {
63
- await Consume ( context . Message ) ;
64
- inboxMessage . State = MessageState . Done ;
65
- }
66
- catch ( Exception ex )
67
- {
68
- logger . LogError ( ex , "Exception thrown while consuming message" ) ;
69
- throw ;
70
- }
71
- finally
72
- {
73
- inboxMessage ! . UpdatedAt = DateTime . UtcNow ;
74
- await dbContext . SaveChangesAsync ( ) ;
75
- await transactionScope . CommitAsync ( ) ;
76
- }
77
- }
58
+ var inboxMessage = await dbContext . InboxMessages
59
+ . Where ( x => x . MessageId == messageId )
60
+ . Where ( x => x . ConsumerId == _consumerId )
61
+ . Where ( x => x . State == MessageState . New )
62
+ . ForUpdate ( LockBehavior . SkipLocked )
63
+ . FirstOrDefaultAsync ( ) ;
78
64
79
- public abstract Task Consume ( TMessage message ) ;
65
+ if ( inboxMessage == null )
66
+ {
67
+ return ;
68
+ }
69
+
70
+ try
71
+ {
72
+ await Consume ( context . Message ) ;
73
+ inboxMessage . State = MessageState . Done ;
74
+ }
75
+ catch ( Exception ex )
76
+ {
77
+ logger . LogError ( ex , "Exception thrown while consuming message" ) ;
78
+ throw ;
79
+ }
80
+ finally
81
+ {
82
+ inboxMessage . UpdatedAt = DateTime . UtcNow ;
83
+ await dbContext . SaveChangesAsync ( ) ;
84
+ await transactionScope . CommitAsync ( ) ;
85
+ }
86
+ }
87
+
88
+ protected abstract Task Consume ( TMessage message ) ;
80
89
}
0 commit comments