Skip to content

Commit a24ba55

Browse files
committed
Store message interceptor context in MQTT proc state
It's a tradeoff between building the map for each incoming and outgoing message (now that there are also outgoing interceptors) vs increased memory usage for the MQTT proc state. Connecting with MQTT 5.0 and client ID "xxxxxxxx", the number of words are 201 before this commit vs 235 after this commit as determined by: ``` S = sys:get_state(MQTTConnectionPid), erts_debug:size(S). ``` Therefore, this commit requires 34 word * 8 bytes = 272 bytes more per MQTT connection, that is 272 MB more for 1,000,000 MQTT connections.
1 parent 21bd300 commit a24ba55

6 files changed

+30
-33
lines changed

deps/rabbit/src/rabbit_channel.erl

+5-5
Original file line numberDiff line numberDiff line change
@@ -493,10 +493,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
493493
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
494494
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
495495
MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity),
496-
MsgInterceptorCtx = #{protocol => amqp091,
497-
vhost => VHost,
498-
username => User#user.username,
499-
connection_name => ConnName},
496+
MsgIcptCtx = #{protocol => amqp091,
497+
vhost => VHost,
498+
username => User#user.username,
499+
connection_name => ConnName},
500500
State = #ch{cfg = #conf{state = starting,
501501
protocol = Protocol,
502502
channel = Channel,
@@ -515,7 +515,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
515515
authz_context = OptionalVariables,
516516
max_consumers = MaxConsumers,
517517
writer_gc_threshold = GCThreshold,
518-
msg_interceptor_ctx = MsgInterceptorCtx},
518+
msg_interceptor_ctx = MsgIcptCtx},
519519
limiter = Limiter,
520520
tx = none,
521521
next_tag = 1,

deps/rabbit/src/rabbit_msg_interceptor.erl

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ intercept_outgoing(Msg, Ctx) ->
4545

4646
intercept(Msg, Ctx, Stage) ->
4747
Interceptors = persistent_term:get(?KEY),
48-
lists:foldl(fun({Mod, Config}, Msg0) ->
49-
Mod:intercept(Msg0, Ctx, Stage, Config)
48+
lists:foldl(fun({Mod, Cfg}, Msg0) ->
49+
Mod:intercept(Msg0, Ctx, Stage, Cfg)
5050
end, Msg, Interceptors).
5151

5252
-spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(),

deps/rabbit/src/rabbit_msg_interceptor_routing_node.erl

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111

1212
-export([intercept/4]).
1313

14-
intercept(Msg, _Ctx, incoming, Config) ->
14+
intercept(Msg, _Ctx, incoming, Cfg) ->
1515
Node = atom_to_binary(node()),
16-
Overwrite = maps:get(overwrite, Config),
16+
Overwrite = maps:get(overwrite, Cfg),
1717
rabbit_msg_interceptor:set_annotation(Msg, ?KEY, Node, Overwrite);
18-
intercept(Msg, _Ctx, _Stage, _Config) ->
18+
intercept(Msg, _Ctx, _Stage, _Cfg) ->
1919
Msg.

deps/rabbit/src/rabbit_msg_interceptor_timestamp.erl

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@
1616

1717
-export([intercept/4]).
1818

19-
intercept(Msg0, _Ctx, incoming, #{incoming := _True} = Config) ->
20-
Overwrite = maps:get(overwrite, Config),
19+
intercept(Msg0, _Ctx, incoming, #{incoming := _True} = Cfg) ->
20+
Overwrite = maps:get(overwrite, Cfg),
2121
Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0),
2222
Msg = rabbit_msg_interceptor:set_annotation(Msg0, ?KEY_INCOMING, Ts, Overwrite),
2323
set_timestamp(Msg, Ts, Overwrite);
2424
intercept(Msg, _Ctx, outgoing, #{outgoing := _True}) ->
2525
Ts = os:system_time(millisecond),
2626
mc:set_annotation(?KEY_OUTGOING, Ts, Msg);
27-
intercept(Msg, _MsgInterceptorCtx, _Stage, _Config) ->
27+
intercept(Msg, _MsgIcptCtx, _Stage, _Cfg) ->
2828
Msg.
2929

3030
set_timestamp(Msg, Ts, true) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_msg_interceptor_client_id.erl

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ intercept(Msg, #{protocol := Proto, client_id := ClientId}, incoming, _Cfg)
1616
Proto =:= mqtt311 orelse
1717
Proto =:= mqtt310 ->
1818
mc:set_annotation(?KEY, ClientId, Msg);
19-
intercept(Msg, _Ctx, _Stage, _Config) ->
19+
intercept(Msg, _Ctx, _Stage, _Cfg) ->
2020
Msg.

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

+16-19
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@
9292
%% The database stores the MQTT subscription options in the binding arguments for:
9393
%% * v1 as Erlang record #mqtt_subscription_opts{}
9494
%% * v2 as AMQP 0.9.1 table
95-
binding_args_v2 :: boolean()
95+
binding_args_v2 :: boolean(),
96+
msg_interceptor_ctx :: rabbit_msg_interceptor:context()
9697
}).
9798

9899
-record(state,
@@ -214,9 +215,15 @@ process_connect(
214215
%% To simplify logic, we decide at connection establishment time to stick
215216
%% with either binding args v1 or v2 for the lifetime of the connection.
216217
BindingArgsV2 = rabbit_feature_flags:is_enabled('rabbitmq_4.1.0'),
218+
ProtoVerAtom = proto_integer_to_atom(ProtoVer),
219+
MsgIcptCtx = #{protocol => ProtoVerAtom,
220+
vhost => VHost,
221+
username => Username,
222+
connection_name => ConnName,
223+
client_id => ClientId},
217224
S = #state{
218225
cfg = #cfg{socket = Socket,
219-
proto_ver = proto_integer_to_atom(ProtoVer),
226+
proto_ver = ProtoVerAtom,
220227
clean_start = CleanStart,
221228
session_expiry_interval_secs = SessionExpiry,
222229
ssl_login_name = SslLoginName,
@@ -237,7 +244,8 @@ process_connect(
237244
will_msg = WillMsg,
238245
max_packet_size_outbound = MaxPacketSize,
239246
topic_alias_maximum_outbound = TopicAliasMaxOutbound,
240-
binding_args_v2 = BindingArgsV2},
247+
binding_args_v2 = BindingArgsV2,
248+
msg_interceptor_ctx = MsgIcptCtx},
241249
auth_state = #auth_state{
242250
user = User,
243251
authz_ctx = AuthzCtx}},
@@ -1632,15 +1640,15 @@ publish_to_queues(
16321640
#state{cfg = #cfg{exchange = ExchangeName = #resource{name = ExchangeNameBin},
16331641
delivery_flow = Flow,
16341642
conn_name = ConnName,
1635-
trace_state = TraceState},
1643+
trace_state = TraceState,
1644+
msg_interceptor_ctx = MsgIcptCtx},
16361645
auth_state = #auth_state{user = #user{username = Username}}} = State) ->
16371646
Anns = #{?ANN_EXCHANGE => ExchangeNameBin,
16381647
?ANN_ROUTING_KEYS => [mqtt_to_amqp(Topic)]},
16391648
Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()),
16401649
case rabbit_exchange:lookup(ExchangeName) of
16411650
{ok, Exchange} ->
1642-
Ctx = msg_interceptor_ctx(State),
1643-
Msg = rabbit_msg_interceptor:intercept_incoming(Msg0, Ctx),
1651+
Msg = rabbit_msg_interceptor:intercept_incoming(Msg0, MsgIcptCtx),
16441652
QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}),
16451653
QNames = drop_local(QNames0, State),
16461654
rabbit_trace:tap_in(Msg, QNames, ConnName, Username, TraceState),
@@ -2066,13 +2074,13 @@ deliver_to_client(Msgs, Ack, State) ->
20662074
end, State, Msgs).
20672075

20682076
deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc} = Delivery,
2069-
AckRequired, State0) ->
2077+
AckRequired,
2078+
#state{cfg = #cfg{msg_interceptor_ctx = MsgIcptCtx}} = State0) ->
20702079
SubscriberQoS = case AckRequired of
20712080
true -> ?QOS_1;
20722081
false -> ?QOS_0
20732082
end,
20742083
McMqtt0 = mc:convert(mc_mqtt, Mc, mc_env()),
2075-
MsgIcptCtx = msg_interceptor_ctx(State0),
20762084
McMqtt = rabbit_msg_interceptor:intercept_outgoing(McMqtt0, MsgIcptCtx),
20772085
MqttMsg = #mqtt_msg{qos = PublisherQos} = mc:protocol_state(McMqtt),
20782086
QoS = effective_qos(PublisherQos, SubscriberQoS),
@@ -2539,17 +2547,6 @@ message_redelivered(_, _, _) ->
25392547
is_success(ReasonCode) ->
25402548
ReasonCode < ?RC_UNSPECIFIED_ERROR.
25412549

2542-
msg_interceptor_ctx(#state{cfg = #cfg{client_id = ClientId,
2543-
conn_name = ConnName,
2544-
vhost = VHost,
2545-
proto_ver = ProtoVer},
2546-
auth_state = #auth_state{user = #user{username = Username}}}) ->
2547-
#{protocol => ProtoVer,
2548-
vhost => VHost,
2549-
username => Username,
2550-
connection_name => ConnName,
2551-
client_id => ClientId}.
2552-
25532550
-spec format_status(state()) -> map().
25542551
format_status(
25552552
#state{queue_states = QState,

0 commit comments

Comments
 (0)