Skip to content

Commit 0d26866

Browse files
committed
feat: add activity on connection
1 parent 9ecad93 commit 0d26866

File tree

2 files changed

+30
-5
lines changed

2 files changed

+30
-5
lines changed

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,10 @@ internal void TakeOver(Connection other)
228228
internal async ValueTask<IConnection> OpenAsync(CancellationToken cancellationToken)
229229
{
230230
cancellationToken.ThrowIfCancellationRequested();
231-
231+
Activity? _connectionActivity = RabbitMQActivitySource.OpenConnection(_frameHandler);
232232
try
233233
{
234234
RabbitMqClientEventSource.Log.ConnectionOpened();
235-
236235
cancellationToken.ThrowIfCancellationRequested();
237236

238237
// Note: this must happen *after* the frame handler is started
@@ -250,8 +249,9 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken)
250249

251250
return this;
252251
}
253-
catch
252+
catch(Exception ex)
254253
{
254+
_connectionActivity?.ReportException(ex);
255255
try
256256
{
257257
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen");

projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,15 @@ public static class RabbitMQActivitySource
4343
private static readonly ActivitySource s_subscriberSource =
4444
new ActivitySource(SubscriberSourceName, AssemblyVersion);
4545

46+
private static readonly ActivitySource s_connectionSource =
47+
new ActivitySource(ConnectionSourceName, AssemblyVersion);
48+
4649
public const string PublisherSourceName = "RabbitMQ.Client.Publisher";
4750
public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber";
51+
public const string ConnectionSourceName = "RabbitMQ.Client.Connection";
4852

49-
public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } = DefaultContextInjector;
53+
public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } =
54+
DefaultContextInjector;
5055

5156
public static Func<IReadOnlyBasicProperties, ActivityContext> ContextExtractor { get; set; } =
5257
DefaultContextExtractor;
@@ -61,6 +66,19 @@ public static class RabbitMQActivitySource
6166
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
6267
};
6368

69+
internal static Activity? OpenConnection(IFrameHandler frameHandler)
70+
{
71+
if (!s_connectionSource.HasListeners())
72+
{
73+
return null;
74+
}
75+
Activity? connectionActivity =
76+
s_connectionSource.StartRabbitMQActivity("rabbitmq connect", ActivityKind.Client);
77+
connectionActivity?
78+
.SetNetworkTags(frameHandler);
79+
return connectionActivity;
80+
}
81+
6482
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
6583
ActivityContext linkedContext = default)
6684
{
@@ -82,7 +100,6 @@ public static class RabbitMQActivitySource
82100
}
83101

84102
return activity;
85-
86103
}
87104

88105
internal static Activity? BasicGetEmpty(string queue)
@@ -148,6 +165,14 @@ public static class RabbitMQActivitySource
148165
return activity;
149166
}
150167

168+
internal static void ReportException(this Activity? activity, Exception exception)
169+
{
170+
activity?.AddTag("exception.message", exception.Message);
171+
activity?.AddTag("exception.stacktrace", exception.ToString());
172+
activity?.AddTag("exception.type", exception.GetType().FullName);
173+
activity?.SetStatus(ActivityStatusCode.Error);
174+
}
175+
151176
private static Activity? StartRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind,
152177
ActivityContext parentContext = default)
153178
{

0 commit comments

Comments
 (0)