Skip to content

Commit 1565eab

Browse files
committed
Improve message interceptors
1. Force the config for timestamp and routing node message interceptors to be configured with the overwrite boolean() to avoid defining multiple default values throughout the code. 2. Add type specs 3. Extend existing test case for new MQTT client ID interceptor 4. routing node and timestamp should only set the annotation for incoming_message_interceptors group 5. Fix `rabbitmq.conf`. Prior to this commit there were several issue: a.) Setting the right configuration was too user unfriendly, e.g. the user has to set ``` message_interceptor.incoming.rabbit_mqtt_message_interceptor_client_id.annotation_key = x-opt-mqtt-client-id ``` just to enable the MQTT message interceptor. b.) The code that parses was too difficult to understand c.) MQTT plugin was setting the env for app rabbit, which is an anti-pattern d.) disabling a plugin (e.g. MQTT), left its message interceptors still in place This is now all fixed, the user sets the rabbitmq.conf as follows: ``` message_interceptors.incoming.set_header_timestamp.overwrite = true message_interceptors.incoming.set_header_routing_node.overwrite = false mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true ``` Note that the first two lines use the same format as for RabbitMQ 4.0 for backwards compatiblity. The last line (MQTT) follows a similar pattern.
1 parent f12bfba commit 1565eab

14 files changed

+200
-247
lines changed

deps/rabbit/priv/schema/rabbit.schema

+19-96
Original file line numberDiff line numberDiff line change
@@ -2667,103 +2667,26 @@ end}.
26672667
{mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [
26682668
{datatype, {enum, [true, false]}}]}.
26692669

2670-
% Pseudo-key to include the interceptor in the list of interceptors.
2671-
% - If any other configuration is provided for the interceptor this
2672-
% configuration is not required.
2673-
% - If no other configuration is provided, this one is required so that the
2674-
% interceptor gets invoked.
2675-
{mapping, "message_interceptors.incoming.$interceptor.enabled", "rabbit.incoming_message_interceptors", [
2676-
{datatype, {enum, [true]}}]}.
2677-
2678-
{mapping, "message_interceptors.outgoing.$interceptor.enabled", "rabbit.outgoing_message_interceptors", [
2679-
{datatype, {enum, [true]}}]}.
2680-
2681-
{mapping,
2682-
"message_interceptors.incoming.set_header_timestamp.overwrite",
2683-
"rabbit.incoming_message_interceptors",
2684-
[{datatype, {enum, [true, false]}}]}.
2685-
{mapping,
2686-
"message_interceptors.incoming.rabbit_message_interceptor_routing_node.overwrite",
2687-
"rabbit.incoming_message_interceptors",
2688-
[{datatype, {enum, [true, false]}}]}.
2689-
2690-
{mapping,
2691-
"message_interceptors.incoming.set_header_routing_node.overwrite",
2692-
"rabbit.incoming_message_interceptors",
2693-
[{datatype, {enum, [true, false]}}]}.
2694-
{mapping,
2695-
"message_interceptors.incoming.rabbit_message_interceptor_timestamp.overwrite",
2696-
"rabbit.incoming_message_interceptors",
2697-
[{datatype, {enum, [true, false]}}]}.
2698-
26992670
{translation, "rabbit.incoming_message_interceptors",
2700-
fun(Conf) ->
2701-
case cuttlefish_variable:filter_by_prefix("message_interceptors.incoming", Conf) of
2702-
[] ->
2703-
cuttlefish:unset();
2704-
L ->
2705-
InterceptorsConfig = [
2706-
{Module0, Config, Value}
2707-
|| {["message_interceptors", "incoming", Module0, Config], Value} <- L
2708-
],
2709-
{Result, Order0} = lists:foldl(
2710-
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
2711-
Interceptor = list_to_atom(Interceptor0),
2712-
Key = list_to_atom(Key0),
2713-
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end,
2714-
% This Interceptor -> Module alias exists for
2715-
% compatibility reasons
2716-
Module = case Interceptor of
2717-
set_header_timestamp ->
2718-
rabbit_message_interceptor_timestamp;
2719-
set_header_routing_node ->
2720-
rabbit_message_interceptor_routing_node;
2721-
_ ->
2722-
Interceptor
2723-
end,
2724-
NewAcc = maps:update_with(Module,
2725-
MapPutFun,
2726-
#{Key => Value},
2727-
Acc),
2728-
{NewAcc, [Module| Order]}
2729-
end,
2730-
{#{}, []},
2731-
InterceptorsConfig
2732-
),
2733-
Order = lists:uniq(Order0),
2734-
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
2735-
end
2736-
end
2737-
}.
2738-
2739-
{translation, "rabbit.outgoing_message_interceptors",
2740-
fun(Conf) ->
2741-
case cuttlefish_variable:filter_by_prefix("message_interceptors.outgoing", Conf) of
2742-
[] ->
2743-
cuttlefish:unset();
2744-
L ->
2745-
InterceptorsConfig = [
2746-
{Module0, Config, Value}
2747-
|| {["message_interceptors", "outgoing", Module0, Config], Value} <- L
2748-
],
2749-
{Result, Order0} = lists:foldl(
2750-
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
2751-
Module = list_to_atom(Interceptor0),
2752-
Key = list_to_atom(Key0),
2753-
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end,
2754-
NewAcc = maps:update_with(Module,
2755-
MapPutFun,
2756-
#{Key => Value},
2757-
Acc),
2758-
{NewAcc, [Module| Order]}
2759-
end,
2760-
{#{}, []},
2761-
InterceptorsConfig
2762-
),
2763-
Order = lists:uniq(Order0),
2764-
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
2765-
end
2766-
end
2671+
fun(Conf) ->
2672+
case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of
2673+
[] ->
2674+
cuttlefish:unset();
2675+
L ->
2676+
[begin
2677+
Interceptor = list_to_atom(Interceptor0),
2678+
Mod = case Interceptor of
2679+
set_header_timestamp ->
2680+
rabbit_message_interceptor_timestamp;
2681+
set_header_routing_node ->
2682+
rabbit_message_interceptor_routing_node;
2683+
_ ->
2684+
cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor]))
2685+
end,
2686+
{Mod, #{overwrite => Overwrite}}
2687+
end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L]
2688+
end
2689+
end
27672690
}.
27682691

27692692
{mapping, "stream.replication.port_range.min", "osiris.port_range", [

deps/rabbit/src/rabbit.erl

+1-2
Original file line numberDiff line numberDiff line change
@@ -1656,8 +1656,7 @@ persist_static_configuration() ->
16561656
[classic_queue_index_v2_segment_entry_count,
16571657
classic_queue_store_v2_max_cache_size,
16581658
classic_queue_store_v2_check_crc32,
1659-
incoming_message_interceptors,
1660-
outgoing_message_interceptors
1659+
incoming_message_interceptors
16611660
]),
16621661

16631662
%% Disallow the following two cases:

deps/rabbit/src/rabbit_amqp_session.erl

+3-3
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@
284284
max_incoming_window :: pos_integer(),
285285
max_link_credit :: pos_integer(),
286286
max_queue_credit :: pos_integer(),
287-
msg_interceptor_ctx :: map()
287+
msg_interceptor_ctx :: rabbit_message_interceptor:context()
288288
}).
289289

290290
-record(state, {
@@ -477,9 +477,9 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId,
477477
max_link_credit = MaxLinkCredit,
478478
max_queue_credit = MaxQueueCredit,
479479
msg_interceptor_ctx = #{protocol => ?PROTOCOL,
480-
username => User#user.username,
481480
vhost => Vhost,
482-
conn_name => ConnName}
481+
username => User#user.username,
482+
connection_name => ConnName}
483483
}}}.
484484

485485
terminate(_Reason, #state{incoming_links = IncomingLinks,

deps/rabbit/src/rabbit_channel.erl

+6-6
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@
111111
max_consumers, % taken from rabbit.consumer_max_per_channel
112112
%% defines how ofter gc will be executed
113113
writer_gc_threshold,
114-
msg_interceptor_ctx
114+
msg_interceptor_ctx :: rabbit_message_interceptor:context()
115115
}).
116116

117117
-record(pending_ack, {
@@ -493,6 +493,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
493493
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
494494
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
495495
MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity),
496+
MsgInterceptorCtx = #{protocol => amqp091,
497+
vhost => VHost,
498+
username => User#user.username,
499+
connection_name => ConnName},
496500
State = #ch{cfg = #conf{state = starting,
497501
protocol = Protocol,
498502
channel = Channel,
@@ -511,11 +515,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
511515
authz_context = OptionalVariables,
512516
max_consumers = MaxConsumers,
513517
writer_gc_threshold = GCThreshold,
514-
msg_interceptor_ctx = #{protocol => amqp091,
515-
username => User#user.username,
516-
vhost => VHost,
517-
conn_name => ConnName}
518-
},
518+
msg_interceptor_ctx = MsgInterceptorCtx},
519519
limiter = Limiter,
520520
tx = none,
521521
next_tag = 1,

deps/rabbit/src/rabbit_message_interceptor.erl

+37-37
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,46 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
17
-module(rabbit_message_interceptor).
28

3-
-export([intercept/3,
4-
set_msg_annotation/4]).
9+
%% client API
10+
-export([intercept/3]).
11+
%% helpers for behaviour implementations
12+
-export([set_annotation/4]).
513

14+
%% same protocol names as output by Prometheus endpoint
615
-type protocol() :: amqp091 | amqp10 | mqtt310 | mqtt311 | mqtt50.
716

8-
-type msg_interceptor_ctx() :: #{protocol := protocol(),
9-
vhost := binary(),
10-
username := binary(),
11-
conn_name => binary(),
12-
atom() => term()}.
13-
14-
-callback intercept(Msg, MsgInterceptorCtx, Group, Config) -> Msg when
15-
Msg :: mc:state(),
16-
MsgInterceptorCtx :: msg_interceptor_ctx(),
17-
Group :: incoming_message_interceptors | outgoing_message_interceptors,
18-
Config :: #{atom() := term()}.
19-
20-
-spec intercept(Msg, MsgInterceptorCtx, Group) -> Msg when
21-
Msg :: mc:state(),
22-
MsgInterceptorCtx :: map(),
23-
Group :: incoming_message_interceptors | outgoing_message_interceptors.
24-
intercept(Msg, MsgInterceptorCtx, Group) ->
17+
-type context() :: #{protocol := protocol(),
18+
vhost := rabbit_types:vhost(),
19+
username := rabbit_types:username(),
20+
connection_name := binary(),
21+
atom() => term()}.
22+
23+
-type group() :: incoming_message_interceptors |
24+
outgoing_message_interceptors.
25+
26+
-type config() :: #{atom() => term()}.
27+
28+
-export_type([context/0]).
29+
30+
-callback intercept(mc:state(), context(), group(), config()) ->
31+
mc:state().
32+
33+
-spec intercept(mc:state(), context(), group()) ->
34+
mc:state().
35+
intercept(Msg, Ctx, Group) ->
2536
Interceptors = persistent_term:get(Group, []),
26-
lists:foldl(fun({Module, Config}, Msg0) ->
27-
try
28-
Module:intercept(Msg0,
29-
MsgInterceptorCtx,
30-
Group,
31-
Config)
32-
catch
33-
error:undef ->
34-
Msg0
35-
end
36-
end, Msg , Interceptors).
37-
38-
-spec set_msg_annotation(mc:state(),
39-
mc:ann_key(),
40-
mc:ann_value(),
41-
boolean()
42-
) -> mc:state().
43-
set_msg_annotation(Msg, Key, Value, Overwrite) ->
37+
lists:foldl(fun({Mod, Config}, Msg0) ->
38+
Mod:intercept(Msg0, Ctx, Group, Config)
39+
end, Msg, Interceptors).
40+
41+
-spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(), boolean()) ->
42+
mc:state().
43+
set_annotation(Msg, Key, Value, Overwrite) ->
4444
case {mc:x_header(Key, Msg), Overwrite} of
4545
{Val, false} when Val =/= undefined ->
4646
Msg;
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
17
-module(rabbit_message_interceptor_routing_node).
28
-behaviour(rabbit_message_interceptor).
39

410
-define(HEADER_ROUTING_NODE, <<"x-routed-by">>).
511

612
-export([intercept/4]).
713

8-
intercept(Msg, _MsgInterceptorCtx, _Group, Config) ->
14+
intercept(Msg, _Ctx, incoming_message_interceptors, Config) ->
915
Node = atom_to_binary(node()),
10-
Overwrite = maps:get(overwrite, Config, false),
11-
rabbit_message_interceptor:set_msg_annotation(Msg,
12-
?HEADER_ROUTING_NODE,
13-
Node,
14-
Overwrite).
16+
Overwrite = maps:get(overwrite, Config),
17+
rabbit_message_interceptor:set_annotation(Msg,
18+
?HEADER_ROUTING_NODE,
19+
Node,
20+
Overwrite);
21+
intercept(Msg, _Ctx, _Group, _Config) ->
22+
Msg.
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
17
-module(rabbit_message_interceptor_timestamp).
28
-behaviour(rabbit_message_interceptor).
39

@@ -7,20 +13,21 @@
713

814
-export([intercept/4]).
915

10-
intercept(Msg0, _MsgInterceptorCtx, _Group, Config) ->
16+
intercept(Msg0, _Ctx, incoming_message_interceptors, Config) ->
1117
Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0),
12-
Overwrite = maps:get(overwrite, Config, false),
13-
Msg = rabbit_message_interceptor:set_msg_annotation(
14-
Msg0,
15-
?HEADER_TIMESTAMP,
16-
Ts,
17-
Overwrite),
18-
set_msg_timestamp(Msg, Ts, Overwrite).
18+
Overwrite = maps:get(overwrite, Config),
19+
Msg = rabbit_message_interceptor:set_annotation(Msg0,
20+
?HEADER_TIMESTAMP,
21+
Ts,
22+
Overwrite),
23+
set_timestamp(Msg, Ts, Overwrite);
24+
intercept(Msg, _MsgInterceptorCtx, _Group, _Config) ->
25+
Msg.
1926

20-
set_msg_timestamp(Msg, Timestamp, Overwrite) ->
27+
set_timestamp(Msg, Ts, Overwrite) ->
2128
case {mc:timestamp(Msg), Overwrite} of
22-
{Ts, false} when is_integer(Ts) ->
29+
{ExistingTs, false} when is_integer(ExistingTs) ->
2330
Msg;
2431
_ ->
25-
mc:set_annotation(?ANN_TIMESTAMP, Timestamp, Msg)
32+
mc:set_annotation(?ANN_TIMESTAMP, Ts, Msg)
2633
end.

0 commit comments

Comments
 (0)