Skip to content

Commit b7d744f

Browse files
aygalinclukebakken
authored andcommitted
feat: add activity on connection
1 parent 354d4bd commit b7d744f

File tree

2 files changed

+53
-14
lines changed

2 files changed

+53
-14
lines changed

projects/RabbitMQ.Client/Impl/Connection.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,10 @@ internal void TakeOver(Connection other)
226226
internal async ValueTask<IConnection> OpenAsync(CancellationToken cancellationToken)
227227
{
228228
cancellationToken.ThrowIfCancellationRequested();
229-
229+
Activity? _connectionActivity = RabbitMQActivitySource.OpenConnection(_frameHandler);
230230
try
231231
{
232232
RabbitMqClientEventSource.Log.ConnectionOpened();
233-
234233
cancellationToken.ThrowIfCancellationRequested();
235234

236235
// Note: this must happen *after* the frame handler is started
@@ -248,8 +247,9 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken)
248247

249248
return this;
250249
}
251-
catch
250+
catch(Exception ex)
252251
{
252+
_connectionActivity?.ReportException(ex);
253253
try
254254
{
255255
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen");

projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs

+50-11
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,15 @@ public static class RabbitMQActivitySource
3838
private static readonly ActivitySource s_subscriberSource =
3939
new ActivitySource(SubscriberSourceName, AssemblyVersion);
4040

41+
private static readonly ActivitySource s_connectionSource =
42+
new ActivitySource(ConnectionSourceName, AssemblyVersion);
43+
4144
public const string PublisherSourceName = "RabbitMQ.Client.Publisher";
4245
public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber";
46+
public const string ConnectionSourceName = "RabbitMQ.Client.Connection";
4347

44-
public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } = DefaultContextInjector;
48+
public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } =
49+
DefaultContextInjector;
4550

4651
public static Func<IReadOnlyBasicProperties, ActivityContext> ContextExtractor { get; set; } =
4752
DefaultContextExtractor;
@@ -56,6 +61,19 @@ public static class RabbitMQActivitySource
5661
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
5762
};
5863

64+
internal static Activity? OpenConnection(IFrameHandler frameHandler)
65+
{
66+
if (!s_connectionSource.HasListeners())
67+
{
68+
return null;
69+
}
70+
Activity? connectionActivity =
71+
s_connectionSource.StartRabbitMQActivity("rabbitmq connect", ActivityKind.Client);
72+
connectionActivity?
73+
.SetNetworkTags(frameHandler);
74+
return connectionActivity;
75+
}
76+
5977
internal static Activity? Send(string routingKey, string exchange, int bodySize,
6078
ActivityContext linkedContext = default)
6179
{
@@ -66,18 +84,21 @@ public static class RabbitMQActivitySource
6684

6785
Activity? activity = linkedContext == default
6886
? s_publisherSource.StartRabbitMQActivity(
69-
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend,
87+
UseRoutingKeyAsOperationName
88+
? $"{routingKey} {MessagingOperationTypeSend}"
89+
: MessagingOperationTypeSend,
7090
ActivityKind.Producer)
7191
: s_publisherSource.StartLinkedRabbitMQActivity(
72-
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend,
92+
UseRoutingKeyAsOperationName
93+
? $"{routingKey} {MessagingOperationTypeSend}"
94+
: MessagingOperationTypeSend,
7395
ActivityKind.Producer, linkedContext);
7496
if (activity != null && activity.IsAllDataRequested)
7597
{
7698
PopulateMessagingTags(MessagingOperationTypeSend, routingKey, exchange, 0, bodySize, activity);
7799
}
78100

79101
return activity;
80-
81102
}
82103

83104
internal static Activity? ReceiveEmpty(string queue)
@@ -88,7 +109,9 @@ public static class RabbitMQActivitySource
88109
}
89110

90111
Activity? activity = s_subscriberSource.StartRabbitMQActivity(
91-
UseRoutingKeyAsOperationName ? $"{queue} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive,
112+
UseRoutingKeyAsOperationName
113+
? $"{queue} {MessagingOperationTypeReceive}"
114+
: MessagingOperationTypeReceive,
92115
ActivityKind.Consumer);
93116
if (activity != null && activity.IsAllDataRequested)
94117
{
@@ -110,11 +133,14 @@ public static class RabbitMQActivitySource
110133

111134
// Extract the PropagationContext of the upstream parent from the message headers.
112135
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
113-
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive, ActivityKind.Consumer,
136+
UseRoutingKeyAsOperationName
137+
? $"{routingKey} {MessagingOperationTypeReceive}"
138+
: MessagingOperationTypeReceive, ActivityKind.Consumer,
114139
ContextExtractor(readOnlyBasicProperties));
115140
if (activity != null && activity.IsAllDataRequested)
116141
{
117-
PopulateMessagingTags(MessagingOperationTypeReceive, routingKey, exchange, deliveryTag, readOnlyBasicProperties,
142+
PopulateMessagingTags(MessagingOperationTypeReceive, routingKey, exchange, deliveryTag,
143+
readOnlyBasicProperties,
118144
bodySize, activity);
119145
}
120146

@@ -131,7 +157,9 @@ public static class RabbitMQActivitySource
131157

132158
// Extract the PropagationContext of the upstream parent from the message headers.
133159
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
134-
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeProcess}" : MessagingOperationTypeProcess,
160+
UseRoutingKeyAsOperationName
161+
? $"{routingKey} {MessagingOperationTypeProcess}"
162+
: MessagingOperationTypeProcess,
135163
ActivityKind.Consumer, ContextExtractor(basicProperties));
136164
if (activity != null && activity.IsAllDataRequested)
137165
{
@@ -142,10 +170,19 @@ public static class RabbitMQActivitySource
142170
return activity;
143171
}
144172

173+
internal static void ReportException(this Activity? activity, Exception exception)
174+
{
175+
activity?.AddTag("exception.message", exception.Message);
176+
activity?.AddTag("exception.stacktrace", exception.ToString());
177+
activity?.AddTag("exception.type", exception.GetType().FullName);
178+
activity?.SetStatus(ActivityStatusCode.Error);
179+
}
180+
145181
private static Activity? StartRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind,
146182
ActivityContext parentContext = default)
147183
{
148-
return source.CreateActivity(name, kind, parentContext, idFormat: ActivityIdFormat.W3C, tags: CreationTags)?.Start();
184+
return source.CreateActivity(name, kind, parentContext, idFormat: ActivityIdFormat.W3C, tags: CreationTags)
185+
?.Start();
149186
}
150187

151188
private static Activity? StartLinkedRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind,
@@ -273,7 +310,8 @@ private static ActivityContext DefaultContextExtractor(IReadOnlyBasicProperties
273310
return default;
274311
}
275312

276-
DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, DefaultContextGetter, out string? traceParent, out string? traceState);
313+
DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, DefaultContextGetter,
314+
out string? traceParent, out string? traceState);
277315
return ActivityContext.TryParse(traceParent, traceState, out ActivityContext context) ? context : default;
278316
}
279317

@@ -288,7 +326,8 @@ private static void DefaultContextSetter(object? carrier, string name, string va
288326
carrierDictionary[name] = value;
289327
}
290328

291-
private static void DefaultContextGetter(object? carrier, string name, out string? value, out IEnumerable<string>? values)
329+
private static void DefaultContextGetter(object? carrier, string name, out string? value,
330+
out IEnumerable<string>? values)
292331
{
293332
if (carrier is IDictionary<string, object> carrierDict &&
294333
carrierDict.TryGetValue(name, out object? propsVal) && propsVal is byte[] bytes)

0 commit comments

Comments
 (0)