Skip to content

Commit e56e805

Browse files
authored
AWS Kinesis provider for event hubs (danielgerlag#246)
1 parent 4580fce commit e56e805

15 files changed

+463
-13
lines changed

src/providers/WorkflowCore.Providers.AWS/Services/IDynamoDbProvisioner.cs renamed to src/providers/WorkflowCore.Providers.AWS/Interface/IDynamoDbProvisioner.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
using System.Threading.Tasks;
22

3-
namespace WorkflowCore.Providers.AWS.Services
3+
namespace WorkflowCore.Providers.AWS.Interface
44
{
55
public interface IDynamoDbProvisioner
66
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading.Tasks;
5+
using Amazon.Kinesis.Model;
6+
7+
namespace WorkflowCore.Providers.AWS.Interface
8+
{
9+
public interface IKinesisStreamConsumer
10+
{
11+
Task Subscribe(string appName, string stream, Action<Record> action);
12+
}
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading.Tasks;
5+
6+
namespace WorkflowCore.Providers.AWS.Interface
7+
{
8+
public interface IKinesisTracker
9+
{
10+
Task<string> GetNextShardIterator(string app, string stream, string shard);
11+
Task IncrementShardIterator(string app, string stream, string shard, string iterator);
12+
}
13+
}

src/providers/WorkflowCore.Providers.AWS/README.md

+17-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* Provides persistence for [Workflow Core](../../README.md) using DynamoDB.
44
* Provides Queueing support on [Workflow Core](../../README.md) using AWS Simple Queue Service.
55
* Provides Distributed locking support on [Workflow Core](../../README.md) using DynamoDB.
6+
* Provides event hub support on [Workflow Core](../../README.md) backed by AWS Kinesis.
67

78
This makes it possible to have a cluster of nodes processing your workflows.
89

@@ -14,7 +15,7 @@ Install the NuGet package "WorkflowCore.Providers.AWS"
1415
PM> Install-Package WorkflowCore.Providers.AWS
1516
```
1617

17-
## Usage
18+
## Usage (Persistence, Queueing and distributed locking)
1819

1920
Use the `IServiceCollection` extension methods when building your service provider
2021
* .UseAwsDynamoPersistence
@@ -31,4 +32,18 @@ services.AddWorkflow(cfg =>
3132
```
3233

3334
If any AWS resources do not exists, they will be automatcially created. By default, all DynamoDB tables and indexes will be provisioned with a throughput of 1, you can modify these values from the AWS console.
34-
You may also specify a prefix for the dynamo table names.
35+
You may also specify a prefix for the dynamo table names.
36+
37+
38+
## Usage (Kinesis)
39+
40+
Use the the `.UseAwsKinesis` extension method on `IServiceCollection` when building your service provider
41+
42+
```C#
43+
services.AddWorkflow(cfg =>
44+
{
45+
cfg.UseAwsKinesis(new EnvironmentVariablesAWSCredentials(), RegionEndpoint.USWest2, "app-name", "stream-name");
46+
});
47+
```
48+
The Kinesis provider will also create a DynamoDB table to track the postion in each shard of the Kinesis stream.
49+
A shard position will be tracked for each app name that you connect with.

src/providers/WorkflowCore.Providers.AWS/ServiceCollectionExtensions.cs

+11
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
using System;
2+
using Amazon;
23
using Amazon.DynamoDBv2;
34
using Amazon.Runtime;
45
using Amazon.SQS;
56
using Microsoft.Extensions.Logging;
7+
using WorkflowCore.Interface;
68
using WorkflowCore.Models;
9+
using WorkflowCore.Providers.AWS.Interface;
710
using WorkflowCore.Providers.AWS.Services;
811

912
namespace Microsoft.Extensions.DependencyInjection
@@ -28,5 +31,13 @@ public static WorkflowOptions UseAwsDynamoPersistence(this WorkflowOptions optio
2831
options.UsePersistence(sp => new DynamoPersistenceProvider(credentials, config, sp.GetService<IDynamoDbProvisioner>(), tablePrefix, sp.GetService<ILoggerFactory>()));
2932
return options;
3033
}
34+
35+
public static WorkflowOptions UseAwsKinesis(this WorkflowOptions options, AWSCredentials credentials, RegionEndpoint region, string appName, string streamName)
36+
{
37+
options.Services.AddTransient<IKinesisTracker>(sp => new KinesisTracker(credentials, region, "workflowcore_kinesis", sp.GetService<ILoggerFactory>()));
38+
options.Services.AddTransient<IKinesisStreamConsumer>(sp => new KinesisStreamConsumer(credentials, region, sp.GetService<IKinesisTracker>(), sp.GetService<IDistributedLockProvider>(), sp.GetService<ILoggerFactory>()));
39+
options.UseEventHub(sp => new KinesisProvider(credentials, region, appName, streamName, sp.GetService<IKinesisStreamConsumer>(), sp.GetService<ILoggerFactory>()));
40+
return options;
41+
}
3142
}
3243
}

src/providers/WorkflowCore.Providers.AWS/Services/DynamoDbProvisioner.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
using System.Collections.Generic;
77
using System.Text;
88
using System.Threading.Tasks;
9-
using WorkflowCore.Interface;
9+
using WorkflowCore.Providers.AWS.Interface;
1010

1111
namespace WorkflowCore.Providers.AWS.Services
1212
{

src/providers/WorkflowCore.Providers.AWS/Services/DynamoLockProvider.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ private async void SendHeartbeat()
145145
{
146146
try
147147
{
148-
foreach (var item in _localLocks)
148+
foreach (var item in _localLocks.ToArray())
149149
{
150150
var req = new PutItemRequest
151151
{

src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
using System.Collections.Generic;
77
using System.Text;
88
using System.Threading.Tasks;
9-
using Amazon.Util;
9+
using WorkflowCore.Providers.AWS.Interface;
1010
using WorkflowCore.Interface;
1111
using WorkflowCore.Models;
1212

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
using Amazon;
7+
using Amazon.Kinesis;
8+
using Amazon.Kinesis.Model;
9+
using Amazon.Runtime;
10+
using Microsoft.Extensions.Logging;
11+
using Newtonsoft.Json;
12+
using WorkflowCore.Interface;
13+
using WorkflowCore.Models.LifeCycleEvents;
14+
using WorkflowCore.Providers.AWS.Interface;
15+
16+
namespace WorkflowCore.Providers.AWS.Services
17+
{
18+
public class KinesisProvider : ILifeCycleEventHub
19+
{
20+
private readonly ILogger _logger;
21+
private Queue<Action<LifeCycleEvent>> _deferredSubscribers = new Queue<Action<LifeCycleEvent>>();
22+
private readonly string _streamName;
23+
private readonly string _appName;
24+
private readonly JsonSerializer _serializer;
25+
private readonly IKinesisStreamConsumer _consumer;
26+
private readonly AmazonKinesisClient _client;
27+
private readonly int _defaultShardCount = 1;
28+
private bool _started = false;
29+
30+
public KinesisProvider(AWSCredentials credentials, RegionEndpoint region, string appName, string streamName, IKinesisStreamConsumer consumer, ILoggerFactory logFactory)
31+
{
32+
_logger = logFactory.CreateLogger(GetType());
33+
_appName = appName;
34+
_streamName = streamName;
35+
_consumer = consumer;
36+
_serializer = new JsonSerializer();
37+
_serializer.TypeNameHandling = TypeNameHandling.All;
38+
_client = new AmazonKinesisClient(credentials, region);
39+
}
40+
41+
public async Task PublishNotification(LifeCycleEvent evt)
42+
{
43+
using (var stream = new MemoryStream())
44+
{
45+
var writer = new StreamWriter(stream);
46+
_serializer.Serialize(writer, evt);
47+
writer.Flush();
48+
49+
var response = await _client.PutRecordAsync(new PutRecordRequest()
50+
{
51+
StreamName = _streamName,
52+
PartitionKey = evt.WorkflowInstanceId,
53+
Data = stream
54+
});
55+
56+
//if (response.HttpStatusCode != System.Net.HttpStatusCode.OK)
57+
//{
58+
// _logger.LogWarning($"Failed to send event to Kinesis {response.HttpStatusCode}");
59+
//}
60+
}
61+
}
62+
63+
public void Subscribe(Action<LifeCycleEvent> action)
64+
{
65+
if (_started)
66+
{
67+
_consumer.Subscribe(_appName, _streamName, record => Consume(record, action));
68+
}
69+
else
70+
{
71+
_deferredSubscribers.Enqueue(action);
72+
}
73+
}
74+
75+
public async Task Start()
76+
{
77+
await EnsureStream();
78+
_started = true;
79+
while (_deferredSubscribers.Count > 0)
80+
{
81+
var action = _deferredSubscribers.Dequeue();
82+
await _consumer.Subscribe(_appName, _streamName, record => Consume(record, action));
83+
}
84+
}
85+
86+
public Task Stop()
87+
{
88+
_started = false;
89+
return Task.CompletedTask;
90+
}
91+
92+
private async Task EnsureStream()
93+
{
94+
try
95+
{
96+
await _client.DescribeStreamSummaryAsync(new DescribeStreamSummaryRequest()
97+
{
98+
StreamName = _streamName
99+
});
100+
}
101+
catch (ResourceNotFoundException)
102+
{
103+
await CreateStream();
104+
}
105+
}
106+
107+
private async Task<string> CreateStream()
108+
{
109+
await _client.CreateStreamAsync(new CreateStreamRequest()
110+
{
111+
StreamName = _streamName,
112+
ShardCount = _defaultShardCount
113+
});
114+
115+
var i = 0;
116+
while (i < 20)
117+
{
118+
i++;
119+
await Task.Delay(3000);
120+
var poll = await _client.DescribeStreamSummaryAsync(new DescribeStreamSummaryRequest()
121+
{
122+
StreamName = _streamName
123+
});
124+
125+
if (poll.StreamDescriptionSummary.StreamStatus == StreamStatus.ACTIVE)
126+
return poll.StreamDescriptionSummary.StreamARN;
127+
}
128+
129+
throw new TimeoutException();
130+
}
131+
132+
private void Consume(Record record, Action<LifeCycleEvent> action)
133+
{
134+
using (var strm = new StreamReader(record.Data))
135+
{
136+
var evt = _serializer.Deserialize(new JsonTextReader(strm));
137+
action(evt as LifeCycleEvent);
138+
}
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)