Skip to content

Commit 3839922

Browse files
authored
Merge pull request #14 from PandaTechAM/development
Inbox consumer breaking changes. Now inbox consumer uses same scope a…
2 parents 64897a1 + 57234af commit 3839922

File tree

7 files changed

+41
-37
lines changed

7 files changed

+41
-37
lines changed

MassTransit.PostgresOutbox.Demo.Consumer/MassTransit.PostgresOutbox.Demo.Consumer.csproj

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@
77
</PropertyGroup>
88

99
<ItemGroup>
10-
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.2">
10+
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.3">
1111
<PrivateAssets>all</PrivateAssets>
1212
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
1313
</PackageReference>
14-
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="9.0.2">
14+
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="9.0.3">
1515
<PrivateAssets>all</PrivateAssets>
1616
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
1717
</PackageReference>
18-
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.2" />
18+
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.3" />
1919
</ItemGroup>
2020

2121
<ItemGroup>

MassTransit.PostgresOutbox.Demo.Consumer/Services/ConsumeService.cs

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
using MassTransit.PostgresOutbox.Abstractions;
22
using MassTransit.PostgresOutbox.Demo.Consumer.Contexts;
33
using MassTransit.PostgresOutbox.Demo.Shared.Events;
4+
using Microsoft.EntityFrameworkCore.Storage;
45

56
namespace MassTransit.PostgresOutbox.Demo.Consumer.Services;
67

7-
public class ConsumeService(ConsumerContext dbContext, IServiceScopeFactory serviceScopeFactory)
8-
: InboxConsumer<ComplexObjectEvent, ConsumerContext>(serviceScopeFactory)
8+
public class ConsumeService(ConsumerContext dbContext, IServiceProvider sp)
9+
: InboxConsumer<ComplexObjectEvent, ConsumerContext>(sp)
910
{
10-
protected override Task Consume(ComplexObjectEvent message)
11+
protected override Task Consume(ComplexObjectEvent message, IDbContextTransaction dbContextTransaction)
1112
{
1213
var original = ComplexObjectEvent.Init();
1314
var match = message.Equals(original);

MassTransit.PostgresOutbox.Demo.Shared/MassTransit.PostgresOutbox.Demo.Shared.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
</PropertyGroup>
88

99
<ItemGroup>
10-
<PackageReference Include="MassTransit.RabbitMQ" Version="8.3.6" />
10+
<PackageReference Include="MassTransit.RabbitMQ" Version="8.4.0" />
1111
<ProjectReference Include="..\src\MassTransit.PostgresOutbox\MassTransit.PostgresOutbox.csproj" />
1212
</ItemGroup>
1313

Readme.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,13 @@ public class YourConsumer : InboxConsumer<YourMessage, PostgresContext>
119119
{
120120
private readonly PostgresContext _context;
121121

122-
public YourConsumer(PostgresContext dbContext, IServiceScopeFactory serviceScopeFactory)
123-
: base(serviceScopeFactory)
122+
public YourConsumer(PostgresContext dbContext, IServiceProvider sp)
123+
: base(sp)
124124
{
125125
_context = dbContext;
126126
}
127127

128-
public override async Task Consume(YourMessage message)
128+
public override async Task Consume(YourMessage message, IDbContextTransaction transaction)
129129
{
130130
// Implement your message processing logic here
131131
}

src/MassTransit.PostgresOutbox/Abstractions/InboxConsumer.cs

+20-17
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using MassTransit.PostgresOutbox.Entities;
55
using MassTransit.PostgresOutbox.Enums;
66
using Microsoft.EntityFrameworkCore;
7+
using Microsoft.EntityFrameworkCore.Storage;
78
using Microsoft.Extensions.DependencyInjection;
89
using Microsoft.Extensions.Logging;
910

@@ -14,23 +15,20 @@ public abstract class InboxConsumer<TMessage, TDbContext> : IConsumer<TMessage>
1415
where TDbContext : DbContext, IInboxDbContext
1516
{
1617
private readonly string _consumerId;
17-
private readonly IServiceScopeFactory _serviceScopeFactory;
18+
private readonly IServiceProvider _sp;
1819

19-
protected InboxConsumer(IServiceScopeFactory serviceScopeFactory)
20+
protected InboxConsumer(IServiceProvider sp)
2021
{
2122
_consumerId = GetType()
2223
.ToString();
23-
_serviceScopeFactory = serviceScopeFactory;
24+
_sp = sp;
2425
}
2526

2627
public async Task Consume(ConsumeContext<TMessage> context)
2728
{
2829
var messageId = context.Headers.Get<Guid>(Constants.OutboxMessageId) ?? context.MessageId;
29-
30-
using var scope = _serviceScopeFactory.CreateScope();
31-
32-
var dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>();
33-
var logger = scope.ServiceProvider.GetRequiredService<ILogger<InboxConsumer<TMessage, TDbContext>>>();
30+
var dbContext = _sp.GetRequiredService<TDbContext>();
31+
var logger = _sp.GetRequiredService<ILogger<InboxConsumer<TMessage, TDbContext>>>();
3432

3533
var exists =
3634
await dbContext.InboxMessages.AnyAsync(x => x.MessageId == messageId && x.ConsumerId == _consumerId);
@@ -57,29 +55,34 @@ public async Task Consume(ConsumeContext<TMessage> context)
5755
.Where(x => x.State == MessageState.New)
5856
.ForUpdate(LockBehavior.SkipLocked)
5957
.FirstOrDefaultAsync();
60-
6158
if (inboxMessage == null)
6259
{
6360
return;
6461
}
6562

6663
try
6764
{
68-
await Consume(context.Message);
65+
await Consume(context.Message, transactionScope);
66+
6967
inboxMessage.State = MessageState.Done;
68+
inboxMessage.UpdatedAt = DateTime.UtcNow;
69+
70+
await dbContext.SaveChangesAsync();
71+
await transactionScope.CommitAsync();
7072
}
7173
catch (Exception ex)
7274
{
73-
logger.LogError(ex, "Exception thrown while consuming message");
74-
throw;
75-
}
76-
finally
77-
{
75+
logger.LogError(ex, "Exception thrown while consuming message {messageId} by {consumerId}",
76+
messageId,
77+
_consumerId);
78+
79+
await transactionScope.RollbackAsync();
80+
7881
inboxMessage.UpdatedAt = DateTime.UtcNow;
7982
await dbContext.SaveChangesAsync();
80-
await transactionScope.CommitAsync();
83+
throw;
8184
}
8285
}
8386

84-
protected abstract Task Consume(TMessage message);
87+
protected abstract Task Consume(TMessage message, IDbContextTransaction transactionScope);
8588
}

src/MassTransit.PostgresOutbox/MassTransit.PostgresOutbox.csproj

+6-6
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
<PackageReadmeFile>Readme.md</PackageReadmeFile>
99
<Authors>Pandatech</Authors>
1010
<Copyright>MIT</Copyright>
11-
<Version>2.0.3</Version>
11+
<Version>3.0.0</Version>
1212
<PackageId>Pandatech.MassTransit.PostgresOutbox</PackageId>
1313
<Title>Pandatech MassTransit PostgreSQL Outbox Extension</Title>
1414
<PackageTags>Pandatech, library, postgres, distributed systems, microservices, modular monolith, messaging, efcore, mass transit, outbox pattern, inbox pattern</PackageTags>
1515
<Description>Pandatech.MassTransit.PostgresOutbox extends MassTransit to offer advanced message handling capabilities for distributed systems. With first-class support for multiple DbContexts, this library integrates seamlessly with Entity Framework Core and PostgreSQL, providing reliable Outbox and Inbox patterns. It ensures consistent message delivery and processing in complex microservices architectures, leveraging PostgreSQL's ForUpdate feature to handle concurrency with ease.</Description>
1616
<RepositoryUrl>https://github.com/PandaTechAM/be-lib-pandatech-masstransit-postgres-outbox</RepositoryUrl>
17-
<PackageReleaseNotes>Nuget updates</PackageReleaseNotes>
17+
<PackageReleaseNotes>Inbox consumer breaking changes. Now inbox consumer uses same scope and works within db transaction to make inbox maximum atomic,</PackageReleaseNotes>
1818
</PropertyGroup>
1919

2020
<ItemGroup>
@@ -23,10 +23,10 @@
2323
</ItemGroup>
2424

2525
<ItemGroup>
26-
<PackageReference Include="MassTransit" Version="8.3.6" />
27-
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.2" />
28-
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.2" />
29-
<PackageReference Include="Pandatech.EFCore.PostgresExtensions" Version="4.0.1" />
26+
<PackageReference Include="MassTransit" Version="8.4.0" />
27+
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.3" />
28+
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.3" />
29+
<PackageReference Include="Pandatech.EFCore.PostgresExtensions" Version="5.1.0" />
3030
</ItemGroup>
3131

3232
</Project>

test/PandaNuGet.Demo/MassTransit.PostgresOutbox.Demo.Publisher.csproj

+4-4
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.2" />
13-
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.2">
12+
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.3" />
13+
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.3">
1414
<PrivateAssets>all</PrivateAssets>
1515
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
1616
</PackageReference>
17-
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="9.0.2">
17+
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="9.0.3">
1818
<PrivateAssets>all</PrivateAssets>
1919
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
2020
</PackageReference>
21-
<PackageReference Include="Swashbuckle.AspNetCore" Version="7.2.0" />
21+
<PackageReference Include="Swashbuckle.AspNetCore" Version="8.1.0" />
2222
</ItemGroup>
2323

2424
<ItemGroup>

0 commit comments

Comments
 (0)