From 7c7d61bad8a807e87f371fe6825f610931dcfa5c Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Thu, 27 Mar 2025 16:08:24 +0100 Subject: [PATCH 1/6] Add incoming message interceptors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit enables users to provide custom message interceptor modules, i.e. modules to process incoming and outgoing messages. The `rabbit_message_interceptor` behaviour defines a `intercept/4` callback, for those modules to implement. Co-authored-by: Péter Gömöri --- deps/rabbit/priv/schema/rabbit.schema | 109 +++++++++++++++--- deps/rabbit/src/rabbit.erl | 3 +- deps/rabbit/src/rabbit_amqp_session.erl | 16 ++- deps/rabbit/src/rabbit_channel.erl | 17 ++- .../rabbit/src/rabbit_message_interceptor.erl | 75 ++++++------ ...abbit_message_interceptor_routing_node.erl | 14 +++ .../rabbit_message_interceptor_timestamp.erl | 26 +++++ deps/rabbit/test/amqp_client_SUITE.erl | 7 +- .../config_schema_SUITE_data/rabbit.snippets | 54 ++++++++- deps/rabbit/test/mc_unit_SUITE.erl | 6 +- .../test/rabbit_message_interceptor_SUITE.erl | 6 +- .../priv/schema/rabbitmq_mqtt.schema | 6 + ...bit_mqtt_message_interceptor_client_id.erl | 17 +++ .../src/rabbit_mqtt_processor.erl | 18 ++- deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl | 5 +- 15 files changed, 304 insertions(+), 75 deletions(-) create mode 100644 deps/rabbit/src/rabbit_message_interceptor_routing_node.erl create mode 100644 deps/rabbit/src/rabbit_message_interceptor_timestamp.erl create mode 100644 deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 1118c7827ab0..664ce02b38df 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2667,23 +2667,102 @@ end}. {mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [ {datatype, {enum, [true, false]}}]}. +% Pseudo-key to include the interceptor in the list of interceptors. +% - If any other configuration is provided for the interceptor this +% configuration is not required. +% - If no other configuration is provided, this one is required so that the +% interceptor gets invoked. +{mapping, "message_interceptors.incoming.$interceptor.enabled", "rabbit.incoming_message_interceptors", [ + {datatype, {enum, [true]}}]}. + +{mapping, "message_interceptors.outgoing.$interceptor.enabled", "rabbit.outgoing_message_interceptors", [ + {datatype, {enum, [true]}}]}. + +{mapping, + "message_interceptors.incoming.set_header_timestamp.overwrite", + "rabbit.incoming_message_interceptors", + [{datatype, {enum, [true, false]}}]}. +{mapping, + "message_interceptors.incoming.rabbit_message_interceptor_routing_node.overwrite", + "rabbit.incoming_message_interceptors", + [{datatype, {enum, [true, false]}}]}. + +{mapping, + "message_interceptors.incoming.set_header_routing_node.overwrite", + "rabbit.incoming_message_interceptors", + [{datatype, {enum, [true, false]}}]}. +{mapping, + "message_interceptors.incoming.rabbit_message_interceptor_timestamp.overwrite", + "rabbit.incoming_message_interceptors", + [{datatype, {enum, [true, false]}}]}. + {translation, "rabbit.incoming_message_interceptors", fun(Conf) -> - case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of - [] -> - cuttlefish:unset(); - L -> - [begin - Interceptor = list_to_atom(Interceptor0), - case lists:member(Interceptor, [set_header_timestamp, - set_header_routing_node]) of - true -> - {Interceptor, Overwrite}; - false -> - cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor])) - end - end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L] - end + case cuttlefish_variable:filter_by_prefix("message_interceptors.incoming", Conf) of + [] -> + cuttlefish:unset(); + L -> + InterceptorsConfig = [ + {Module0, Config, Value} + || {["message_interceptors", "incoming", Module0, Config], Value} <- L + ], + {Result, Order0} = lists:foldl( + fun({Interceptor0, Key0, Value}, {Acc, Order}) -> + Interceptor = list_to_atom(Interceptor0), + Key = list_to_atom(Key0), + MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end, + % This Interceptor -> Module alias exists for + % compatibility reasons + Module = case Interceptor of + set_header_timestamp -> + rabbit_message_interceptor_timestamp; + set_header_routing_node -> + rabbit_message_interceptor_routing_node; + _ -> + Interceptor + end, + NewAcc = maps:update_with(Module, + MapPutFun, + #{Key => Value}, + Acc), + {NewAcc, [Module| Order]} + end, + {#{}, []}, + InterceptorsConfig + ), + Order = lists:uniq(Order0), + [{O, maps:without([enabled], maps:get(O, Result))} || O <- Order] + end + end +}. + +{translation, "rabbit.outgoing_message_interceptors", + fun(Conf) -> + case cuttlefish_variable:filter_by_prefix("message_interceptors.outgoing", Conf) of + [] -> + cuttlefish:unset(); + L -> + InterceptorsConfig = [ + {Module0, Config, Value} + || {["message_interceptors", "outgoing", Module0, Config], Value} <- L + ], + {Result, Order0} = lists:foldl( + fun({Interceptor0, Key0, Value}, {Acc, Order}) -> + Module = list_to_atom(Interceptor0), + Key = list_to_atom(Key0), + MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end, + NewAcc = maps:update_with(Module, + MapPutFun, + #{Key => Value}, + Acc), + {NewAcc, [Module| Order]} + end, + {#{}, []}, + InterceptorsConfig + ), + Order = lists:uniq(Order0), + [{O, maps:without([enabled], maps:get(O, Result))} || O <- Order] + end end }. diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 525b1db835ac..fee70422b0b2 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1656,7 +1656,8 @@ persist_static_configuration() -> [classic_queue_index_v2_segment_entry_count, classic_queue_store_v2_max_cache_size, classic_queue_store_v2_check_crc32, - incoming_message_interceptors + incoming_message_interceptors, + outgoing_message_interceptors ]), %% Disallow the following two cases: diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index d72a9666fe4f..606c23aef211 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -283,7 +283,8 @@ max_handle :: link_handle(), max_incoming_window :: pos_integer(), max_link_credit :: pos_integer(), - max_queue_credit :: pos_integer() + max_queue_credit :: pos_integer(), + msg_interceptor_ctx :: map() }). -record(state, { @@ -474,7 +475,11 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId, max_handle = EffectiveHandleMax, max_incoming_window = MaxIncomingWindow, max_link_credit = MaxLinkCredit, - max_queue_credit = MaxQueueCredit + max_queue_credit = MaxQueueCredit, + msg_interceptor_ctx = #{protocol => ?PROTOCOL, + username => User#user.username, + vhost => Vhost, + conn_name => ConnName} }}}. terminate(_Reason, #state{incoming_links = IncomingLinks, @@ -2411,7 +2416,8 @@ incoming_link_transfer( trace_state = Trace, conn_name = ConnName, channel_num = ChannelNum, - max_link_credit = MaxLinkCredit}}) -> + max_link_credit = MaxLinkCredit, + msg_interceptor_ctx = MsgInterceptorCtx}}) -> {PayloadBin, DeliveryId, Settled} = case MultiTransfer of @@ -2436,7 +2442,9 @@ incoming_link_transfer( Mc0 = mc:init(mc_amqp, PayloadBin, #{}), case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of {ok, X, RoutingKeys, Mc1, PermCache} -> - Mc2 = rabbit_message_interceptor:intercept(Mc1), + Mc2 = rabbit_message_interceptor:intercept(Mc1, + MsgInterceptorCtx, + incoming_message_interceptors), check_user_id(Mc2, User), TopicPermCache = check_write_permitted_on_topics( X, User, RoutingKeys, TopicPermCache0), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 86d71d7af902..c188fd70bbd7 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -110,7 +110,8 @@ authz_context, max_consumers, % taken from rabbit.consumer_max_per_channel %% defines how ofter gc will be executed - writer_gc_threshold + writer_gc_threshold, + msg_interceptor_ctx }). -record(pending_ack, { @@ -509,7 +510,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, consumer_timeout = ConsumerTimeout, authz_context = OptionalVariables, max_consumers = MaxConsumers, - writer_gc_threshold = GCThreshold + writer_gc_threshold = GCThreshold, + msg_interceptor_ctx = #{protocol => amqp091, + username => User#user.username, + vhost => VHost, + conn_name => ConnName} }, limiter = Limiter, tx = none, @@ -813,6 +818,7 @@ get_consumer_timeout() -> _ -> undefined end. + %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. @@ -1167,7 +1173,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, user = #user{username = Username} = User, trace_state = TraceState, authz_context = AuthzContext, - writer_gc_threshold = GCThreshold + writer_gc_threshold = GCThreshold, + msg_interceptor_ctx = MsgInterceptorCtx }, tx = Tx, confirm_enabled = ConfirmEnabled, @@ -1206,7 +1213,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]); {ok, Message0} -> check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext), - Message = rabbit_message_interceptor:intercept(Message0), + Message = rabbit_message_interceptor:intercept(Message0, + MsgInterceptorCtx, + incoming_message_interceptors), check_user_id_header(Message, User), QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}), [deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames], diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_message_interceptor.erl index 0d28fe6ef9af..b218c46955e8 100644 --- a/deps/rabbit/src/rabbit_message_interceptor.erl +++ b/deps/rabbit/src/rabbit_message_interceptor.erl @@ -1,50 +1,49 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. - -%% This module exists since 3.12 replacing plugins rabbitmq-message-timestamp -%% and rabbitmq-routing-node-stamp. Instead of using these plugins, RabbitMQ core can -%% now be configured to add such headers. This enables non-AMQP 0.9.1 protocols (that -%% do not use rabbit_channel) to also add AMQP 0.9.1 headers to incoming messages. -module(rabbit_message_interceptor). --include("mc.hrl"). --export([intercept/1]). +-export([intercept/3, + set_msg_annotation/4]). + +-type protocol() :: amqp091 | amqp10 | mqtt310 | mqtt311 | mqtt50. --define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>). --define(HEADER_ROUTING_NODE, <<"x-routed-by">>). +-type msg_interceptor_ctx() :: #{protocol := protocol(), + vhost := binary(), + username := binary(), + conn_name => binary(), + atom() => term()}. --spec intercept(mc:state()) -> mc:state(). -intercept(Msg) -> - Interceptors = persistent_term:get(incoming_message_interceptors, []), - lists:foldl(fun({InterceptorName, Overwrite}, M) -> - intercept(M, InterceptorName, Overwrite) - end, Msg, Interceptors). +-callback intercept(Msg, MsgInterceptorCtx, Group, Config) -> Msg when + Msg :: mc:state(), + MsgInterceptorCtx :: msg_interceptor_ctx(), + Group :: incoming_message_interceptors | outgoing_message_interceptors, + Config :: #{atom() := term()}. -intercept(Msg, set_header_routing_node, Overwrite) -> - Node = atom_to_binary(node()), - set_annotation(Msg, ?HEADER_ROUTING_NODE, Node, Overwrite); -intercept(Msg0, set_header_timestamp, Overwrite) -> - Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0), - Msg = set_annotation(Msg0, ?HEADER_TIMESTAMP, Ts, Overwrite), - set_timestamp(Msg, Ts, Overwrite). +-spec intercept(Msg, MsgInterceptorCtx, Group) -> Msg when + Msg :: mc:state(), + MsgInterceptorCtx :: map(), + Group :: incoming_message_interceptors | outgoing_message_interceptors. +intercept(Msg, MsgInterceptorCtx, Group) -> + Interceptors = persistent_term:get(Group, []), + lists:foldl(fun({Module, Config}, Msg0) -> + try + Module:intercept(Msg0, + MsgInterceptorCtx, + Group, + Config) + catch + error:undef -> + Msg0 + end + end, Msg , Interceptors). --spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(), boolean()) -> mc:state(). -set_annotation(Msg, Key, Value, Overwrite) -> +-spec set_msg_annotation(mc:state(), + mc:ann_key(), + mc:ann_value(), + boolean() + ) -> mc:state(). +set_msg_annotation(Msg, Key, Value, Overwrite) -> case {mc:x_header(Key, Msg), Overwrite} of {Val, false} when Val =/= undefined -> Msg; _ -> mc:set_annotation(Key, Value, Msg) end. - --spec set_timestamp(mc:state(), pos_integer(), boolean()) -> mc:state(). -set_timestamp(Msg, Timestamp, Overwrite) -> - case {mc:timestamp(Msg), Overwrite} of - {Ts, false} when is_integer(Ts) -> - Msg; - _ -> - mc:set_annotation(?ANN_TIMESTAMP, Timestamp, Msg) - end. diff --git a/deps/rabbit/src/rabbit_message_interceptor_routing_node.erl b/deps/rabbit/src/rabbit_message_interceptor_routing_node.erl new file mode 100644 index 000000000000..1b3f384bf904 --- /dev/null +++ b/deps/rabbit/src/rabbit_message_interceptor_routing_node.erl @@ -0,0 +1,14 @@ +-module(rabbit_message_interceptor_routing_node). +-behaviour(rabbit_message_interceptor). + +-define(HEADER_ROUTING_NODE, <<"x-routed-by">>). + +-export([intercept/4]). + +intercept(Msg, _MsgInterceptorCtx, _Group, Config) -> + Node = atom_to_binary(node()), + Overwrite = maps:get(overwrite, Config, false), + rabbit_message_interceptor:set_msg_annotation(Msg, + ?HEADER_ROUTING_NODE, + Node, + Overwrite). diff --git a/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl b/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl new file mode 100644 index 000000000000..058fd757f5ca --- /dev/null +++ b/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl @@ -0,0 +1,26 @@ +-module(rabbit_message_interceptor_timestamp). +-behaviour(rabbit_message_interceptor). + +-include("mc.hrl"). + +-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>). + +-export([intercept/4]). + +intercept(Msg0, _MsgInterceptorCtx, _Group, Config) -> + Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0), + Overwrite = maps:get(overwrite, Config, false), + Msg = rabbit_message_interceptor:set_msg_annotation( + Msg0, + ?HEADER_TIMESTAMP, + Ts, + Overwrite), + set_msg_timestamp(Msg, Ts, Overwrite). + +set_msg_timestamp(Msg, Timestamp, Overwrite) -> + case {mc:timestamp(Msg), Overwrite} of + {Ts, false} when is_integer(Ts) -> + Msg; + _ -> + mc:set_annotation(?ANN_TIMESTAMP, Timestamp, Msg) + end. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 4b2e5e43623c..db060329f207 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -4380,8 +4380,11 @@ available_messages(QType, Config) -> incoming_message_interceptors(Config) -> Key = ?FUNCTION_NAME, - ok = rpc(Config, persistent_term, put, [Key, [{set_header_routing_node, false}, - {set_header_timestamp, false}]]), + ok = rpc(Config, + persistent_term, + put, + [Key, [{rabbit_message_interceptor_routing_node, #{overwrite => false}}, + {rabbit_message_interceptor_timestamp, #{overwrite => false}}]]), Stream = <<"my stream">>, QQName = <<"my quorum queue">>, {_, Session, LinkPair} = Init = init(Config), diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 5e266656073d..49236c2bdcf3 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1114,7 +1114,9 @@ credential_validator.regexp = ^abc\\d+", {message_interceptors, "message_interceptors.incoming.set_header_timestamp.overwrite = true", [{rabbit, [ - {incoming_message_interceptors, [{set_header_timestamp, true}]} + {incoming_message_interceptors, [ + {rabbit_message_interceptor_timestamp, #{overwrite => true}} + ]} ]}], []}, @@ -1124,8 +1126,54 @@ credential_validator.regexp = ^abc\\d+", message_interceptors.incoming.set_header_timestamp.overwrite = false ", [{rabbit, [ - {incoming_message_interceptors, [{set_header_routing_node, false}, - {set_header_timestamp, false}]} + {incoming_message_interceptors, [ + {rabbit_message_interceptor_routing_node, #{overwrite => false}}, + {rabbit_message_interceptor_timestamp, #{overwrite => false}} + ]} + ]}], + []}, + + % Enable key allows to configure interceptors with empty conf + {message_interceptors, + " + message_interceptors.incoming.set_header_routing_node.enabled = true + ", + [{rabbit, [ + {incoming_message_interceptors, [ + {rabbit_message_interceptor_routing_node, #{}} + ]} + ]}], + []}, + + % An interceptor can be configured twice, with different options, both in + % incoming and outgoing group of interceptors + {message_interceptors, + " + message_interceptors.incoming.rabbit_message_interceptor_routing_node.overwrite = true + message_interceptors.outgoing.rabbit_message_interceptor_routing_node.enabled = true + ", + [{rabbit, [ + {incoming_message_interceptors, [ + {rabbit_message_interceptor_routing_node, #{overwrite => true}} + ]}, + {outgoing_message_interceptors, [ + {rabbit_message_interceptor_routing_node, #{}} + ]} + ]}], + []}, + + % Given a parameter gets configured multiple times, last value prevails + {message_interceptors, + " + message_interceptors.incoming.set_header_routing_node.overwrite = true + message_interceptors.incoming.set_header_routing_node.overwrite = false + message_interceptors.incoming.set_header_routing_node.overwrite = true + message_interceptors.incoming.set_header_routing_node.overwrite = false + ", + [{rabbit, [ + {incoming_message_interceptors, [ + {rabbit_message_interceptor_routing_node, #{overwrite => false}} + ]} ]}], []}, diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 4b5feddb509d..00d73d719d88 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -348,8 +348,10 @@ amqpl_amqp_bin_amqpl(_Config) -> Msg0 = mc:init(mc_amqpl, Content, annotations()), ok = persistent_term:put(incoming_message_interceptors, - [{set_header_timestamp, false}]), - Msg = rabbit_message_interceptor:intercept(Msg0), + [{rabbit_message_interceptor_timestamp, #{overwrite => false}}]), + Msg = rabbit_message_interceptor:intercept(Msg0, + #{}, + incoming_message_interceptors), ?assertEqual(<<"exch">>, mc:exchange(Msg)), ?assertEqual([<<"apple">>], mc:routing_keys(Msg)), diff --git a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl index 1abc39d0b042..37183408e68f 100644 --- a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl +++ b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl @@ -40,9 +40,9 @@ init_per_testcase(Testcase, Config0) -> headers_no_overwrite -> false end, Val = maps:to_list( - maps:from_keys([set_header_timestamp, - set_header_routing_node], - Overwrite)), + maps:from_keys([rabbit_message_interceptor_timestamp, + rabbit_message_interceptor_routing_node], + #{overwrite => Overwrite})), Config = rabbit_ct_helpers:merge_app_env( Config1, {rabbit, [{incoming_message_interceptors, Val}]}), rabbit_ct_helpers:run_steps( diff --git a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema index b69e2b06075c..89f15fed3ea7 100644 --- a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema +++ b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema @@ -303,3 +303,9 @@ end}. {datatype, integer}, {validators, ["non_negative_integer"]} ]}. + +{mapping, "message_interceptor.incoming.rabbit_mqtt_message_interceptor_client_id.annotation_key", + "rabbit.incoming_message_interceptors", + [{datatype, string}, + {default, "x-opt-mqtt-client-id"}] +}. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl new file mode 100644 index 000000000000..eda84589d920 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl @@ -0,0 +1,17 @@ +-module(rabbit_mqtt_message_interceptor_client_id). + +-behaviour(rabbit_message_interceptor). + +-export([intercept/4]). + +intercept(Msg, + #{client_id := ClientId}, + incoming_message_interceptors, + #{annotation_key := AnnotationKey} + ) -> + rabbit_message_interceptor:set_msg_annotation(Msg, + AnnotationKey, + ClientId, + true); +intercept(Msg, _MsgInterceptorCtx, _Group, _Config) -> + Msg. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 7ae0893a13eb..cce8499e7e93 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -5,7 +5,6 @@ %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(rabbit_mqtt_processor). - -feature(maybe_expr, enable). -export([info/2, init/4, process_packet/2, @@ -1635,10 +1634,13 @@ publish_to_queues( conn_name = ConnName, trace_state = TraceState}, auth_state = #auth_state{user = #user{username = Username}}} = State) -> + MsgInterceptorCtx = build_msg_interceptor_ctx(State), Anns = #{?ANN_EXCHANGE => ExchangeNameBin, ?ANN_ROUTING_KEYS => [mqtt_to_amqp(Topic)]}, Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()), - Msg = rabbit_message_interceptor:intercept(Msg0), + Msg = rabbit_message_interceptor:intercept(Msg0, + MsgInterceptorCtx, + incoming_message_interceptors), case rabbit_exchange:lookup(ExchangeName) of {ok, Exchange} -> QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}), @@ -2607,3 +2609,15 @@ mc_env() -> MqttX -> #{mqtt_x => MqttX} end. + +build_msg_interceptor_ctx(#state{cfg = #cfg{client_id = ClientId, + conn_name = ConnName, + vhost = VHost, + proto_ver = ProtoVer + }, + auth_state = #auth_state{user = #user{username = Username}}}) -> + #{protocol => ProtoVer, + username => Username, + vhost => VHost, + conn_name => ConnName, + client_id => ClientId}. diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 6aae9c152d78..1f151651f5a1 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -1779,7 +1779,10 @@ default_queue_type(Config) -> incoming_message_interceptors(Config) -> Key = ?FUNCTION_NAME, - ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]), + ok = rpc(Config, + persistent_term, + put, + [Key, [{rabbit_message_interceptor_timestamp, #{overwrite => false}}]]), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), Payload = ClientId = Topic = atom_to_binary(?FUNCTION_NAME), CQName = <<"my classic queue">>, From c7848fcca56c5638030ea88674ca7db0fb1b6b5c Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 15 Apr 2025 15:40:21 +0000 Subject: [PATCH 2/6] Improve message interceptors 1. Force the config for timestamp and routing node message interceptors to be configured with the overwrite boolean() to avoid defining multiple default values throughout the code. 2. Add type specs 3. Extend existing test case for new MQTT client ID interceptor 4. routing node and timestamp should only set the annotation for incoming_message_interceptors group 5. Fix `rabbitmq.conf`. Prior to this commit there were several issue: a.) Setting the right configuration was too user unfriendly, e.g. the user has to set ``` message_interceptor.incoming.rabbit_mqtt_message_interceptor_client_id.annotation_key = x-opt-mqtt-client-id ``` just to enable the MQTT message interceptor. b.) The code that parses was too difficult to understand c.) MQTT plugin was setting the env for app rabbit, which is an anti-pattern d.) disabling a plugin (e.g. MQTT), left its message interceptors still in place This is now all fixed, the user sets the rabbitmq.conf as follows: ``` message_interceptors.incoming.set_header_timestamp.overwrite = true message_interceptors.incoming.set_header_routing_node.overwrite = false mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true ``` Note that the first two lines use the same format as for RabbitMQ 4.0 for backwards compatiblity. The last line (MQTT) follows a similar pattern. --- deps/rabbit/priv/schema/rabbit.schema | 115 +++--------------- deps/rabbit/src/rabbit.erl | 7 +- deps/rabbit/src/rabbit_amqp_session.erl | 12 +- deps/rabbit/src/rabbit_channel.erl | 14 +-- .../rabbit/src/rabbit_message_interceptor.erl | 88 ++++++++------ ...abbit_message_interceptor_routing_node.erl | 20 ++- .../rabbit_message_interceptor_timestamp.erl | 29 +++-- .../config_schema_SUITE_data/rabbit.snippets | 52 +------- .../priv/schema/rabbitmq_mqtt.schema | 27 +++- deps/rabbitmq_mqtt/src/rabbit_mqtt.erl | 12 +- ...bit_mqtt_message_interceptor_client_id.erl | 25 ++-- .../src/rabbit_mqtt_processor.erl | 30 +++-- .../rabbitmq_mqtt.snippets | 19 ++- deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl | 23 ++-- 14 files changed, 220 insertions(+), 253 deletions(-) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 664ce02b38df..330de62707ed 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2667,103 +2667,26 @@ end}. {mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [ {datatype, {enum, [true, false]}}]}. -% Pseudo-key to include the interceptor in the list of interceptors. -% - If any other configuration is provided for the interceptor this -% configuration is not required. -% - If no other configuration is provided, this one is required so that the -% interceptor gets invoked. -{mapping, "message_interceptors.incoming.$interceptor.enabled", "rabbit.incoming_message_interceptors", [ - {datatype, {enum, [true]}}]}. - -{mapping, "message_interceptors.outgoing.$interceptor.enabled", "rabbit.outgoing_message_interceptors", [ - {datatype, {enum, [true]}}]}. - -{mapping, - "message_interceptors.incoming.set_header_timestamp.overwrite", - "rabbit.incoming_message_interceptors", - [{datatype, {enum, [true, false]}}]}. -{mapping, - "message_interceptors.incoming.rabbit_message_interceptor_routing_node.overwrite", - "rabbit.incoming_message_interceptors", - [{datatype, {enum, [true, false]}}]}. - -{mapping, - "message_interceptors.incoming.set_header_routing_node.overwrite", - "rabbit.incoming_message_interceptors", - [{datatype, {enum, [true, false]}}]}. -{mapping, - "message_interceptors.incoming.rabbit_message_interceptor_timestamp.overwrite", - "rabbit.incoming_message_interceptors", - [{datatype, {enum, [true, false]}}]}. - {translation, "rabbit.incoming_message_interceptors", - fun(Conf) -> - case cuttlefish_variable:filter_by_prefix("message_interceptors.incoming", Conf) of - [] -> - cuttlefish:unset(); - L -> - InterceptorsConfig = [ - {Module0, Config, Value} - || {["message_interceptors", "incoming", Module0, Config], Value} <- L - ], - {Result, Order0} = lists:foldl( - fun({Interceptor0, Key0, Value}, {Acc, Order}) -> - Interceptor = list_to_atom(Interceptor0), - Key = list_to_atom(Key0), - MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end, - % This Interceptor -> Module alias exists for - % compatibility reasons - Module = case Interceptor of - set_header_timestamp -> - rabbit_message_interceptor_timestamp; - set_header_routing_node -> - rabbit_message_interceptor_routing_node; - _ -> - Interceptor - end, - NewAcc = maps:update_with(Module, - MapPutFun, - #{Key => Value}, - Acc), - {NewAcc, [Module| Order]} - end, - {#{}, []}, - InterceptorsConfig - ), - Order = lists:uniq(Order0), - [{O, maps:without([enabled], maps:get(O, Result))} || O <- Order] - end - end -}. - -{translation, "rabbit.outgoing_message_interceptors", - fun(Conf) -> - case cuttlefish_variable:filter_by_prefix("message_interceptors.outgoing", Conf) of - [] -> - cuttlefish:unset(); - L -> - InterceptorsConfig = [ - {Module0, Config, Value} - || {["message_interceptors", "outgoing", Module0, Config], Value} <- L - ], - {Result, Order0} = lists:foldl( - fun({Interceptor0, Key0, Value}, {Acc, Order}) -> - Module = list_to_atom(Interceptor0), - Key = list_to_atom(Key0), - MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end, - NewAcc = maps:update_with(Module, - MapPutFun, - #{Key => Value}, - Acc), - {NewAcc, [Module| Order]} - end, - {#{}, []}, - InterceptorsConfig - ), - Order = lists:uniq(Order0), - [{O, maps:without([enabled], maps:get(O, Result))} || O <- Order] - end - end + fun(Conf) -> + case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of + [] -> + cuttlefish:unset(); + L -> + [begin + Interceptor = list_to_atom(Interceptor0), + Mod = case Interceptor of + set_header_timestamp -> + rabbit_message_interceptor_timestamp; + set_header_routing_node -> + rabbit_message_interceptor_routing_node; + _ -> + cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor])) + end, + {Mod, #{overwrite => Overwrite}} + end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L] + end + end }. {mapping, "stream.replication.port_range.min", "osiris.port_range", [ diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index fee70422b0b2..12b875898d13 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1655,11 +1655,12 @@ persist_static_configuration() -> persist_static_configuration( [classic_queue_index_v2_segment_entry_count, classic_queue_store_v2_max_cache_size, - classic_queue_store_v2_check_crc32, - incoming_message_interceptors, - outgoing_message_interceptors + classic_queue_store_v2_check_crc32 ]), + Interceptors = application:get_env(?MODULE, incoming_message_interceptors, []), + ok = rabbit_message_interceptor:add(Interceptors, incoming_message_interceptors), + %% Disallow the following two cases: %% 1. Negative values %% 2. MoreCreditAfter greater than InitialCredit diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 606c23aef211..78dcd5d2863f 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -284,7 +284,7 @@ max_incoming_window :: pos_integer(), max_link_credit :: pos_integer(), max_queue_credit :: pos_integer(), - msg_interceptor_ctx :: map() + msg_interceptor_ctx :: rabbit_message_interceptor:context() }). -record(state, { @@ -477,9 +477,9 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId, max_link_credit = MaxLinkCredit, max_queue_credit = MaxQueueCredit, msg_interceptor_ctx = #{protocol => ?PROTOCOL, - username => User#user.username, vhost => Vhost, - conn_name => ConnName} + username => User#user.username, + connection_name => ConnName} }}}. terminate(_Reason, #state{incoming_links = IncomingLinks, @@ -2442,12 +2442,12 @@ incoming_link_transfer( Mc0 = mc:init(mc_amqp, PayloadBin, #{}), case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of {ok, X, RoutingKeys, Mc1, PermCache} -> + check_user_id(Mc1, User), + TopicPermCache = check_write_permitted_on_topics( + X, User, RoutingKeys, TopicPermCache0), Mc2 = rabbit_message_interceptor:intercept(Mc1, MsgInterceptorCtx, incoming_message_interceptors), - check_user_id(Mc2, User), - TopicPermCache = check_write_permitted_on_topics( - X, User, RoutingKeys, TopicPermCache0), QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}), rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace), Opts = #{correlation => {HandleInt, DeliveryId}}, diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index c188fd70bbd7..95375d6128c1 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -111,7 +111,7 @@ max_consumers, % taken from rabbit.consumer_max_per_channel %% defines how ofter gc will be executed writer_gc_threshold, - msg_interceptor_ctx + msg_interceptor_ctx :: rabbit_message_interceptor:context() }). -record(pending_ack, { @@ -493,6 +493,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams), {ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold), MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity), + MsgInterceptorCtx = #{protocol => amqp091, + vhost => VHost, + username => User#user.username, + connection_name => ConnName}, State = #ch{cfg = #conf{state = starting, protocol = Protocol, channel = Channel, @@ -511,11 +515,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, authz_context = OptionalVariables, max_consumers = MaxConsumers, writer_gc_threshold = GCThreshold, - msg_interceptor_ctx = #{protocol => amqp091, - username => User#user.username, - vhost => VHost, - conn_name => ConnName} - }, + msg_interceptor_ctx = MsgInterceptorCtx}, limiter = Limiter, tx = none, next_tag = 1, @@ -1213,10 +1213,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]); {ok, Message0} -> check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext), + check_user_id_header(Message0, User), Message = rabbit_message_interceptor:intercept(Message0, MsgInterceptorCtx, incoming_message_interceptors), - check_user_id_header(Message, User), QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}), [deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames], Queues = rabbit_amqqueue:lookup_many(QNames), diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_message_interceptor.erl index b218c46955e8..ffa2ada580ae 100644 --- a/deps/rabbit/src/rabbit_message_interceptor.erl +++ b/deps/rabbit/src/rabbit_message_interceptor.erl @@ -1,49 +1,65 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + -module(rabbit_message_interceptor). +%% client API -export([intercept/3, - set_msg_annotation/4]). + add/2, + remove/2]). +%% helpers for behaviour implementations +-export([set_annotation/4]). +%% same protocol names as output by Prometheus endpoint -type protocol() :: amqp091 | amqp10 | mqtt310 | mqtt311 | mqtt50. +-type context() :: #{protocol := protocol(), + vhost := rabbit_types:vhost(), + username := rabbit_types:username(), + connection_name := binary(), + atom() => term()}. +-type group() :: incoming_message_interceptors | + outgoing_message_interceptors. +-type config() :: #{atom() => term()}. +-type interceptor() :: {module(), config()}. + + +-export_type([context/0]). + +-callback intercept(mc:state(), context(), group(), config()) -> + mc:state(). --type msg_interceptor_ctx() :: #{protocol := protocol(), - vhost := binary(), - username := binary(), - conn_name => binary(), - atom() => term()}. - --callback intercept(Msg, MsgInterceptorCtx, Group, Config) -> Msg when - Msg :: mc:state(), - MsgInterceptorCtx :: msg_interceptor_ctx(), - Group :: incoming_message_interceptors | outgoing_message_interceptors, - Config :: #{atom() := term()}. - --spec intercept(Msg, MsgInterceptorCtx, Group) -> Msg when - Msg :: mc:state(), - MsgInterceptorCtx :: map(), - Group :: incoming_message_interceptors | outgoing_message_interceptors. -intercept(Msg, MsgInterceptorCtx, Group) -> +-spec intercept(mc:state(), context(), group()) -> + mc:state(). +intercept(Msg, Ctx, Group) -> Interceptors = persistent_term:get(Group, []), - lists:foldl(fun({Module, Config}, Msg0) -> - try - Module:intercept(Msg0, - MsgInterceptorCtx, - Group, - Config) - catch - error:undef -> - Msg0 - end - end, Msg , Interceptors). - --spec set_msg_annotation(mc:state(), - mc:ann_key(), - mc:ann_value(), - boolean() - ) -> mc:state(). -set_msg_annotation(Msg, Key, Value, Overwrite) -> + lists:foldl(fun({Mod, Config}, Msg0) -> + Mod:intercept(Msg0, Ctx, Group, Config) + end, Msg, Interceptors). + +-spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(), boolean()) -> + mc:state(). +set_annotation(Msg, Key, Value, Overwrite) -> case {mc:x_header(Key, Msg), Overwrite} of {Val, false} when Val =/= undefined -> Msg; _ -> mc:set_annotation(Key, Value, Msg) end. + +-spec add([interceptor()], group()) -> ok. +add(Interceptors, Group) -> + %% validation + lists:foreach(fun({Mod, #{}}) -> + case erlang:function_exported(Mod, intercept, 4) of + true -> ok; + false -> error(Mod) + end + end, Interceptors), + persistent_term:put(Group, persistent_term:get(Group, []) ++ Interceptors). + +-spec remove([interceptor()], group()) -> ok. +remove(Interceptors, Group) -> + persistent_term:put(Group, persistent_term:get(Group, []) -- Interceptors). diff --git a/deps/rabbit/src/rabbit_message_interceptor_routing_node.erl b/deps/rabbit/src/rabbit_message_interceptor_routing_node.erl index 1b3f384bf904..32434fb972b0 100644 --- a/deps/rabbit/src/rabbit_message_interceptor_routing_node.erl +++ b/deps/rabbit/src/rabbit_message_interceptor_routing_node.erl @@ -1,3 +1,9 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + -module(rabbit_message_interceptor_routing_node). -behaviour(rabbit_message_interceptor). @@ -5,10 +11,12 @@ -export([intercept/4]). -intercept(Msg, _MsgInterceptorCtx, _Group, Config) -> +intercept(Msg, _Ctx, incoming_message_interceptors, Config) -> Node = atom_to_binary(node()), - Overwrite = maps:get(overwrite, Config, false), - rabbit_message_interceptor:set_msg_annotation(Msg, - ?HEADER_ROUTING_NODE, - Node, - Overwrite). + Overwrite = maps:get(overwrite, Config), + rabbit_message_interceptor:set_annotation(Msg, + ?HEADER_ROUTING_NODE, + Node, + Overwrite); +intercept(Msg, _Ctx, _Group, _Config) -> + Msg. diff --git a/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl b/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl index 058fd757f5ca..45f9622c29b2 100644 --- a/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl +++ b/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl @@ -1,3 +1,9 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + -module(rabbit_message_interceptor_timestamp). -behaviour(rabbit_message_interceptor). @@ -7,20 +13,21 @@ -export([intercept/4]). -intercept(Msg0, _MsgInterceptorCtx, _Group, Config) -> +intercept(Msg0, _Ctx, incoming_message_interceptors, Config) -> Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0), - Overwrite = maps:get(overwrite, Config, false), - Msg = rabbit_message_interceptor:set_msg_annotation( - Msg0, - ?HEADER_TIMESTAMP, - Ts, - Overwrite), - set_msg_timestamp(Msg, Ts, Overwrite). + Overwrite = maps:get(overwrite, Config), + Msg = rabbit_message_interceptor:set_annotation(Msg0, + ?HEADER_TIMESTAMP, + Ts, + Overwrite), + set_timestamp(Msg, Ts, Overwrite); +intercept(Msg, _MsgInterceptorCtx, _Group, _Config) -> + Msg. -set_msg_timestamp(Msg, Timestamp, Overwrite) -> +set_timestamp(Msg, Ts, Overwrite) -> case {mc:timestamp(Msg), Overwrite} of - {Ts, false} when is_integer(Ts) -> + {ExistingTs, false} when is_integer(ExistingTs) -> Msg; _ -> - mc:set_annotation(?ANN_TIMESTAMP, Timestamp, Msg) + mc:set_annotation(?ANN_TIMESTAMP, Ts, Msg) end. diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 49236c2bdcf3..d36e31d9178a 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1111,16 +1111,16 @@ credential_validator.regexp = ^abc\\d+", %% Message interceptors %% - {message_interceptors, + {single_message_interceptor, "message_interceptors.incoming.set_header_timestamp.overwrite = true", [{rabbit, [ {incoming_message_interceptors, [ {rabbit_message_interceptor_timestamp, #{overwrite => true}} - ]} + ]} ]}], []}, - {message_interceptors, + {multiple_message_interceptors, " message_interceptors.incoming.set_header_routing_node.overwrite = false message_interceptors.incoming.set_header_timestamp.overwrite = false @@ -1129,51 +1129,7 @@ credential_validator.regexp = ^abc\\d+", {incoming_message_interceptors, [ {rabbit_message_interceptor_routing_node, #{overwrite => false}}, {rabbit_message_interceptor_timestamp, #{overwrite => false}} - ]} - ]}], - []}, - - % Enable key allows to configure interceptors with empty conf - {message_interceptors, - " - message_interceptors.incoming.set_header_routing_node.enabled = true - ", - [{rabbit, [ - {incoming_message_interceptors, [ - {rabbit_message_interceptor_routing_node, #{}} - ]} - ]}], - []}, - - % An interceptor can be configured twice, with different options, both in - % incoming and outgoing group of interceptors - {message_interceptors, - " - message_interceptors.incoming.rabbit_message_interceptor_routing_node.overwrite = true - message_interceptors.outgoing.rabbit_message_interceptor_routing_node.enabled = true - ", - [{rabbit, [ - {incoming_message_interceptors, [ - {rabbit_message_interceptor_routing_node, #{overwrite => true}} - ]}, - {outgoing_message_interceptors, [ - {rabbit_message_interceptor_routing_node, #{}} - ]} - ]}], - []}, - - % Given a parameter gets configured multiple times, last value prevails - {message_interceptors, - " - message_interceptors.incoming.set_header_routing_node.overwrite = true - message_interceptors.incoming.set_header_routing_node.overwrite = false - message_interceptors.incoming.set_header_routing_node.overwrite = true - message_interceptors.incoming.set_header_routing_node.overwrite = false - ", - [{rabbit, [ - {incoming_message_interceptors, [ - {rabbit_message_interceptor_routing_node, #{overwrite => false}} - ]} + ]} ]}], []}, diff --git a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema index 89f15fed3ea7..140b11c67684 100644 --- a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema +++ b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema @@ -304,8 +304,27 @@ end}. {validators, ["non_negative_integer"]} ]}. -{mapping, "message_interceptor.incoming.rabbit_mqtt_message_interceptor_client_id.annotation_key", - "rabbit.incoming_message_interceptors", - [{datatype, string}, - {default, "x-opt-mqtt-client-id"}] +%% +%% Message interceptors +%% +{mapping, "mqtt.message_interceptors.incoming.set_client_id_annotation.enabled", "rabbitmq_mqtt.incoming_message_interceptors", [ + {datatype, {enum, [true, false]}}]}. + +{translation, "rabbitmq_mqtt.incoming_message_interceptors", + fun(Conf) -> + case cuttlefish_variable:filter_by_prefix("mqtt.message_interceptors", Conf) of + [] -> + cuttlefish:unset(); + L -> + [begin + Interceptor = list_to_atom(Interceptor0), + case Interceptor of + set_client_id_annotation -> + {rabbit_mqtt_message_interceptor_client_id, #{}}; + _ -> + cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor])) + end + end || {["mqtt", "message_interceptors", "incoming", Interceptor0, "enabled"], true} <- L] + end + end }. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index 3ea308bb5f5b..4ea3ed0c3704 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -35,7 +35,9 @@ start(normal, []) -> Result. stop(_) -> - rabbit_mqtt_sup:stop_listeners(). + rabbit_mqtt_sup:stop_listeners(), + rabbit_message_interceptor:remove(mqtt_incoming_message_interceptors(), + incoming_message_interceptors). -spec emit_connection_info_all([node()], rabbit_types:info_keys(), reference(), pid()) -> term(). emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> @@ -115,9 +117,15 @@ persist_static_configuration() -> assert_valid_max_packet_size(MaxSizeAuth), {ok, MaxMsgSize} = application:get_env(rabbit, max_message_size), ?assert(MaxSizeAuth =< MaxMsgSize), - ok = persistent_term:put(?PERSISTENT_TERM_MAX_PACKET_SIZE_AUTHENTICATED, MaxSizeAuth). + ok = persistent_term:put(?PERSISTENT_TERM_MAX_PACKET_SIZE_AUTHENTICATED, MaxSizeAuth), + + ok = rabbit_message_interceptor:add(mqtt_incoming_message_interceptors(), + incoming_message_interceptors). assert_valid_max_packet_size(Val) -> ?assert(is_integer(Val) andalso Val > 0 andalso Val =< ?MAX_PACKET_SIZE). + +mqtt_incoming_message_interceptors() -> + application:get_env(?APP_NAME, incoming_message_interceptors, []). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl index eda84589d920..ed442934bea2 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl @@ -1,17 +1,24 @@ --module(rabbit_mqtt_message_interceptor_client_id). +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +-module(rabbit_mqtt_message_interceptor_client_id). -behaviour(rabbit_message_interceptor). -export([intercept/4]). +-define(KEY, <<"x-opt-mqtt-client-id">>). + intercept(Msg, - #{client_id := ClientId}, + #{protocol := Proto, + client_id := ClientId}, incoming_message_interceptors, - #{annotation_key := AnnotationKey} - ) -> - rabbit_message_interceptor:set_msg_annotation(Msg, - AnnotationKey, - ClientId, - true); -intercept(Msg, _MsgInterceptorCtx, _Group, _Config) -> + _Config) + when Proto =:= mqtt50 orelse + Proto =:= mqtt311 orelse + Proto =:= mqtt310 -> + mc:set_annotation(?KEY, ClientId, Msg); +intercept(Msg, _Ctx, _Group, _Config) -> Msg. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index cce8499e7e93..6e43f4631252 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1634,15 +1634,14 @@ publish_to_queues( conn_name = ConnName, trace_state = TraceState}, auth_state = #auth_state{user = #user{username = Username}}} = State) -> - MsgInterceptorCtx = build_msg_interceptor_ctx(State), Anns = #{?ANN_EXCHANGE => ExchangeNameBin, ?ANN_ROUTING_KEYS => [mqtt_to_amqp(Topic)]}, Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()), - Msg = rabbit_message_interceptor:intercept(Msg0, - MsgInterceptorCtx, - incoming_message_interceptors), case rabbit_exchange:lookup(ExchangeName) of {ok, Exchange} -> + Msg = rabbit_message_interceptor:intercept(Msg0, + msg_interceptor_ctx(State), + incoming_message_interceptors), QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}), QNames = drop_local(QNames0, State), rabbit_trace:tap_in(Msg, QNames, ConnName, Username, TraceState), @@ -2539,6 +2538,17 @@ message_redelivered(_, _, _) -> is_success(ReasonCode) -> ReasonCode < ?RC_UNSPECIFIED_ERROR. +msg_interceptor_ctx(#state{cfg = #cfg{client_id = ClientId, + conn_name = ConnName, + vhost = VHost, + proto_ver = ProtoVer}, + auth_state = #auth_state{user = #user{username = Username}}}) -> + #{protocol => ProtoVer, + vhost => VHost, + username => Username, + connection_name => ConnName, + client_id => ClientId}. + -spec format_status(state()) -> map(). format_status( #state{queue_states = QState, @@ -2609,15 +2619,3 @@ mc_env() -> MqttX -> #{mqtt_x => MqttX} end. - -build_msg_interceptor_ctx(#state{cfg = #cfg{client_id = ClientId, - conn_name = ConnName, - vhost = VHost, - proto_ver = ProtoVer - }, - auth_state = #auth_state{user = #user{username = Username}}}) -> - #{protocol => ProtoVer, - username => Username, - vhost => VHost, - conn_name => ConnName, - client_id => ClientId}. diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets index 92c1b2f29c7e..fe68793996d9 100644 --- a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets +++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets @@ -171,5 +171,22 @@ "mqtt.topic_alias_maximum = 0", [{rabbitmq_mqtt,[ {topic_alias_maximum, 0}]}], - [rabbitmq_mqtt]} + [rabbitmq_mqtt]}, + + {message_interceptor_enabled, + "mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true", + [{rabbitmq_mqtt, [ + {incoming_message_interceptors, [ + {rabbit_mqtt_message_interceptor_client_id, #{}} + ]} + ]}], + []}, + + {message_interceptor_disabled, + "mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = false", + [{rabbitmq_mqtt, [ + {incoming_message_interceptors, []} + ]}], + []} + ]. diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 1f151651f5a1..cb551e20916a 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -1779,12 +1779,14 @@ default_queue_type(Config) -> incoming_message_interceptors(Config) -> Key = ?FUNCTION_NAME, - ok = rpc(Config, - persistent_term, - put, - [Key, [{rabbit_message_interceptor_timestamp, #{overwrite => false}}]]), + ok = rpc(Config, persistent_term, put, + [Key, [ + {rabbit_message_interceptor_timestamp, #{overwrite => false}}, + {rabbit_mqtt_message_interceptor_client_id, #{}} + ]]), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), - Payload = ClientId = Topic = atom_to_binary(?FUNCTION_NAME), + Payload = Topic = atom_to_binary(?FUNCTION_NAME), + ClientId = <<"🆔"/utf8>>, CQName = <<"my classic queue">>, Stream = <<"my stream">>, declare_queue(Ch, CQName, [{<<"x-queue-type">>, longstr, <<"classic">>}]), @@ -1801,15 +1803,19 @@ incoming_message_interceptors(Config) -> #amqp_msg{payload = Payload, props = #'P_basic'{ timestamp = Secs, - headers = [{<<"timestamp_in_ms">>, long, Millis} | _] + headers = Headers }} } = amqp_channel:call(Ch, #'basic.get'{queue = CQName}), + {<<"timestamp_in_ms">>, long, Millis} = lists:keyfind(<<"timestamp_in_ms">>, 1, Headers), ?assert(Secs < NowSecs + 4), ?assert(Secs > NowSecs - 4), ?assert(Millis < NowMillis + 4000), ?assert(Millis > NowMillis - 4000), + ?assertEqual({<<"x-opt-mqtt-client-id">>, longstr, ClientId}, + lists:keyfind(<<"x-opt-mqtt-client-id">>, 1, Headers)), + #'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{prefetch_count = 1}), CTag = <<"my ctag">>, #'basic.consume_ok'{} = amqp_channel:subscribe( @@ -1822,9 +1828,10 @@ incoming_message_interceptors(Config) -> receive {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = Payload, props = #'P_basic'{ - headers = [{<<"timestamp_in_ms">>, long, Millis} | _XHeaders] + headers = [{<<"timestamp_in_ms">>, long, Millis} | XHeaders] }}} -> - ok + ?assertEqual({<<"x-opt-mqtt-client-id">>, longstr, ClientId}, + lists:keyfind(<<"x-opt-mqtt-client-id">>, 1, XHeaders)) after ?TIMEOUT -> ct:fail(missing_deliver) end, From bca0e56af3c302c025c1f73348038966b0a246b0 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 17 Apr 2025 10:15:32 +0200 Subject: [PATCH 3/6] Support outgoing message interceptors --- deps/rabbit/Makefile | 2 +- deps/rabbit/ct.test.spec | 2 +- deps/rabbit/priv/schema/rabbit.schema | 53 ++++++++---- deps/rabbit/src/mc_amqpl.erl | 2 +- deps/rabbit/src/rabbit.erl | 4 +- deps/rabbit/src/rabbit_amqp_session.erl | 14 ++-- deps/rabbit/src/rabbit_channel.erl | 45 ++++++----- .../rabbit_message_interceptor_timestamp.erl | 33 -------- ...rceptor.erl => rabbit_msg_interceptor.erl} | 59 ++++++++------ ...> rabbit_msg_interceptor_routing_node.erl} | 15 ++-- .../src/rabbit_msg_interceptor_timestamp.erl | 38 +++++++++ deps/rabbit/test/amqp_client_SUITE.erl | 17 ++-- .../config_schema_SUITE_data/rabbit.snippets | 25 ++++-- deps/rabbit/test/mc_unit_SUITE.erl | 12 +-- ...E.erl => rabbit_msg_interceptor_SUITE.erl} | 80 +++++++++++++------ .../priv/schema/rabbitmq_mqtt.schema | 40 ++++++---- deps/rabbitmq_mqtt/src/rabbit_mqtt.erl | 10 +-- ...rabbit_mqtt_msg_interceptor_client_id.erl} | 12 +-- .../src/rabbit_mqtt_processor.erl | 9 ++- .../rabbitmq_mqtt.snippets | 6 +- deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl | 39 +++++---- .../test/web_mqtt_shared_SUITE.erl | 2 +- 22 files changed, 308 insertions(+), 211 deletions(-) delete mode 100644 deps/rabbit/src/rabbit_message_interceptor_timestamp.erl rename deps/rabbit/src/{rabbit_message_interceptor.erl => rabbit_msg_interceptor.erl} (50%) rename deps/rabbit/src/{rabbit_message_interceptor_routing_node.erl => rabbit_msg_interceptor_routing_node.erl} (50%) create mode 100644 deps/rabbit/src/rabbit_msg_interceptor_timestamp.erl rename deps/rabbit/test/{rabbit_message_interceptor_SUITE.erl => rabbit_msg_interceptor_SUITE.erl} (55%) rename deps/rabbitmq_mqtt/src/{rabbit_mqtt_message_interceptor_client_id.erl => rabbit_mqtt_msg_interceptor_client_id.erl} (67%) diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 8326990d9e11..c57975f0cce9 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -273,7 +273,7 @@ PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_on PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor -PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_message_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue +PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit rabbit_fifo_dlx_integration rabbit_fifo_int PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue diff --git a/deps/rabbit/ct.test.spec b/deps/rabbit/ct.test.spec index 62f63daff854..104f7f40bfda 100644 --- a/deps/rabbit/ct.test.spec +++ b/deps/rabbit/ct.test.spec @@ -115,7 +115,7 @@ , rabbit_fifo_prop_SUITE , rabbit_fifo_v0_SUITE , rabbit_local_random_exchange_SUITE -, rabbit_message_interceptor_SUITE +, rabbit_msg_interceptor_SUITE , rabbit_stream_coordinator_SUITE , rabbit_stream_sac_coordinator_SUITE , rabbitmq_4_0_deprecations_SUITE diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 330de62707ed..ba20e864fdb3 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2664,27 +2664,52 @@ end}. %% %% Message interceptors %% -{mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [ + +{mapping, "message_interceptors.$stage.$name.$key", "rabbit.message_interceptors", [ {datatype, {enum, [true, false]}}]}. -{translation, "rabbit.incoming_message_interceptors", +{translation, "rabbit.message_interceptors", fun(Conf) -> case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of [] -> cuttlefish:unset(); L -> - [begin - Interceptor = list_to_atom(Interceptor0), - Mod = case Interceptor of - set_header_timestamp -> - rabbit_message_interceptor_timestamp; - set_header_routing_node -> - rabbit_message_interceptor_routing_node; - _ -> - cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor])) - end, - {Mod, #{overwrite => Overwrite}} - end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L] + lists:foldr( + fun({["message_interceptors", "incoming", "set_header_routing_node", "overwrite"], Overwrite}, Acc) + when is_boolean(Overwrite) -> + Mod = rabbit_msg_interceptor_routing_node, + Cfg = #{overwrite => Overwrite}, + [{Mod, Cfg} | Acc]; + ({["message_interceptors", "incoming", "set_header_timestamp", "overwrite"], Overwrite}, Acc) + when is_boolean(Overwrite) -> + Mod = rabbit_msg_interceptor_timestamp, + Cfg = #{incoming => true, + overwrite => Overwrite}, + case lists:keytake(Mod, 1, Acc) of + false -> + [{Mod, Cfg} | Acc]; + {value, {Mod, Cfg1}, Acc1} -> + Cfg2 = maps:merge(Cfg1, Cfg), + [{Mod, Cfg2} | Acc1] + end; + ({["message_interceptors", "outgoing", "timestamp", "enabled"], Enabled}, Acc) -> + case Enabled of + true -> + Mod = rabbit_msg_interceptor_timestamp, + Cfg = #{outgoing => true}, + case lists:keytake(Mod, 1, Acc) of + false -> + [{Mod, Cfg} | Acc]; + {value, {Mod, Cfg1}, Acc1} -> + Cfg2 = maps:merge(Cfg1, Cfg), + [{Mod, Cfg2} | Acc1] + end; + false -> + Acc + end; + (Other, _Acc) -> + cuttlefish:invalid(io_lib:format("~p is invalid", [Other])) + end, [], L) end end }. diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index cac190e2cb5e..37602df7fed7 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -462,7 +462,6 @@ protocol_state(#content{properties = #'P_basic'{headers = H00, priority = Priority0, delivery_mode = DeliveryMode0} = B0} = C, Anns) -> - %% Add any x- annotations as headers H0 = case H00 of undefined -> []; _ -> @@ -474,6 +473,7 @@ protocol_state(#content{properties = #'P_basic'{headers = H00, _ -> H0 end, + %% Add any x- annotations as headers Headers1 = maps:fold( fun (<<"x-", _/binary>> = Key, Val, H) when is_integer(Val) -> [{Key, long, Val} | H]; diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 12b875898d13..20bd4765b2a3 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1658,8 +1658,8 @@ persist_static_configuration() -> classic_queue_store_v2_check_crc32 ]), - Interceptors = application:get_env(?MODULE, incoming_message_interceptors, []), - ok = rabbit_message_interceptor:add(Interceptors, incoming_message_interceptors), + Interceptors = application:get_env(?MODULE, message_interceptors, []), + ok = rabbit_msg_interceptor:add(Interceptors), %% Disallow the following two cases: %% 1. Negative values diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 78dcd5d2863f..9f841c22682c 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -284,7 +284,7 @@ max_incoming_window :: pos_integer(), max_link_credit :: pos_integer(), max_queue_credit :: pos_integer(), - msg_interceptor_ctx :: rabbit_message_interceptor:context() + msg_interceptor_ctx :: rabbit_msg_interceptor:context() }). -record(state, { @@ -2164,7 +2164,8 @@ handle_deliver(ConsumerTag, AckRequired, conn_name = ConnName, channel_num = ChannelNum, user = #user{username = Username}, - trace_state = Trace}}) -> + trace_state = Trace, + msg_interceptor_ctx = MsgIcptCtx}}) -> Handle = ctag_to_handle(ConsumerTag), case OutgoingLinks0 of #{Handle := #outgoing_link{queue_type = QType, @@ -2180,7 +2181,8 @@ handle_deliver(ConsumerTag, AckRequired, message_format = ?UINT(?MESSAGE_FORMAT), settled = SendSettled}, Mc1 = mc:convert(mc_amqp, Mc0), - Mc = mc:set_annotation(redelivered, Redelivered, Mc1), + Mc2 = mc:set_annotation(redelivered, Redelivered, Mc1), + Mc = rabbit_msg_interceptor:intercept_outgoing(Mc2, MsgIcptCtx), Sections = mc:protocol_state(Mc), validate_message_size(Sections, MaxMessageSize), Frames = transfer_frames(Transfer, Sections, MaxFrameSize), @@ -2417,7 +2419,7 @@ incoming_link_transfer( conn_name = ConnName, channel_num = ChannelNum, max_link_credit = MaxLinkCredit, - msg_interceptor_ctx = MsgInterceptorCtx}}) -> + msg_interceptor_ctx = MsgIcptCtx}}) -> {PayloadBin, DeliveryId, Settled} = case MultiTransfer of @@ -2445,9 +2447,7 @@ incoming_link_transfer( check_user_id(Mc1, User), TopicPermCache = check_write_permitted_on_topics( X, User, RoutingKeys, TopicPermCache0), - Mc2 = rabbit_message_interceptor:intercept(Mc1, - MsgInterceptorCtx, - incoming_message_interceptors), + Mc2 = rabbit_msg_interceptor:intercept_incoming(Mc1, MsgIcptCtx), QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}), rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace), Opts = #{correlation => {HandleInt, DeliveryId}}, diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 95375d6128c1..1cbaa0a7f12e 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -111,7 +111,7 @@ max_consumers, % taken from rabbit.consumer_max_per_channel %% defines how ofter gc will be executed writer_gc_threshold, - msg_interceptor_ctx :: rabbit_message_interceptor:context() + msg_interceptor_ctx :: rabbit_msg_interceptor:context() }). -record(pending_ack, { @@ -662,13 +662,14 @@ handle_cast({deliver_reply, _K, _Del}, noreply(State); handle_cast({deliver_reply, _K, _Msg}, State = #ch{reply_consumer = none}) -> noreply(State); -handle_cast({deliver_reply, Key, Msg}, - State = #ch{cfg = #conf{writer_pid = WriterPid}, +handle_cast({deliver_reply, Key, Mc}, + State = #ch{cfg = #conf{writer_pid = WriterPid, + msg_interceptor_ctx = MsgIcptCtx}, next_tag = DeliveryTag, reply_consumer = {ConsumerTag, _Suffix, Key}}) -> - Content = mc:protocol_state(mc:convert(mc_amqpl, Msg)), - ExchName = mc:exchange(Msg), - [RoutingKey | _] = mc:routing_keys(Msg), + ExchName = mc:exchange(Mc), + [RoutingKey | _] = mc:routing_keys(Mc), + Content = outgoing_content(Mc, MsgIcptCtx), ok = rabbit_writer:send_command( WriterPid, #'basic.deliver'{consumer_tag = ConsumerTag, @@ -1174,7 +1175,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, trace_state = TraceState, authz_context = AuthzContext, writer_gc_threshold = GCThreshold, - msg_interceptor_ctx = MsgInterceptorCtx + msg_interceptor_ctx = MsgIcptCtx }, tx = Tx, confirm_enabled = ConfirmEnabled, @@ -1214,9 +1215,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {ok, Message0} -> check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext), check_user_id_header(Message0, User), - Message = rabbit_message_interceptor:intercept(Message0, - MsgInterceptorCtx, - incoming_message_interceptors), + Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx), QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}), [deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames], Queues = rabbit_amqqueue:lookup_many(QNames), @@ -2601,15 +2600,15 @@ handle_deliver(CTag, Ack, Msgs, State) when is_list(Msgs) -> end, State, Msgs). handle_deliver0(ConsumerTag, AckRequired, - {QName, QPid, _MsgId, Redelivered, MsgCont0} = Msg, + {QName, QPid, _MsgId, Redelivered, Mc} = Msg, State = #ch{cfg = #conf{writer_pid = WriterPid, - writer_gc_threshold = GCThreshold}, + writer_gc_threshold = GCThreshold, + msg_interceptor_ctx = MsgIcptCtx}, next_tag = DeliveryTag, queue_states = Qs}) -> - Exchange = mc:exchange(MsgCont0), - [RoutingKey | _] = mc:routing_keys(MsgCont0), - MsgCont = mc:convert(mc_amqpl, MsgCont0), - Content = mc:protocol_state(MsgCont), + Exchange = mc:exchange(Mc), + [RoutingKey | _] = mc:routing_keys(Mc), + Content = outgoing_content(Mc, MsgIcptCtx), Deliver = #'basic.deliver'{consumer_tag = ConsumerTag, delivery_tag = DeliveryTag, redelivered = Redelivered, @@ -2630,12 +2629,11 @@ handle_deliver0(ConsumerTag, AckRequired, record_sent(deliver, QueueType, ConsumerTag, AckRequired, Msg, State). handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, - Msg0 = {_QName, _QPid, _MsgId, Redelivered, MsgCont0}, + Msg0 = {_QName, _QPid, _MsgId, Redelivered, Mc}, QueueType, State) -> - Exchange = mc:exchange(MsgCont0), - [RoutingKey | _] = mc:routing_keys(MsgCont0), - MsgCont = mc:convert(mc_amqpl, MsgCont0), - Content = mc:protocol_state(MsgCont), + Exchange = mc:exchange(Mc), + [RoutingKey | _] = mc:routing_keys(Mc), + Content = outgoing_content(Mc, State#ch.cfg#conf.msg_interceptor_ctx), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, @@ -2646,6 +2644,11 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, Content), {noreply, record_sent(get, QueueType, DeliveryTag, not(NoAck), Msg0, State)}. +outgoing_content(Mc, MsgIcptCtx) -> + Mc1 = mc:convert(mc_amqpl, Mc), + Mc2 = rabbit_msg_interceptor:intercept_outgoing(Mc1, MsgIcptCtx), + mc:protocol_state(Mc2). + init_tick_timer(State = #ch{tick_timer = undefined}) -> {ok, Interval} = application:get_env(rabbit, channel_tick_interval), State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}; diff --git a/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl b/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl deleted file mode 100644 index 45f9622c29b2..000000000000 --- a/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl +++ /dev/null @@ -1,33 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. - --module(rabbit_message_interceptor_timestamp). --behaviour(rabbit_message_interceptor). - --include("mc.hrl"). - --define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>). - --export([intercept/4]). - -intercept(Msg0, _Ctx, incoming_message_interceptors, Config) -> - Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0), - Overwrite = maps:get(overwrite, Config), - Msg = rabbit_message_interceptor:set_annotation(Msg0, - ?HEADER_TIMESTAMP, - Ts, - Overwrite), - set_timestamp(Msg, Ts, Overwrite); -intercept(Msg, _MsgInterceptorCtx, _Group, _Config) -> - Msg. - -set_timestamp(Msg, Ts, Overwrite) -> - case {mc:timestamp(Msg), Overwrite} of - {ExistingTs, false} when is_integer(ExistingTs) -> - Msg; - _ -> - mc:set_annotation(?ANN_TIMESTAMP, Ts, Msg) - end. diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_msg_interceptor.erl similarity index 50% rename from deps/rabbit/src/rabbit_message_interceptor.erl rename to deps/rabbit/src/rabbit_msg_interceptor.erl index ffa2ada580ae..89184458f2c8 100644 --- a/deps/rabbit/src/rabbit_message_interceptor.erl +++ b/deps/rabbit/src/rabbit_msg_interceptor.erl @@ -4,12 +4,13 @@ %% %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. --module(rabbit_message_interceptor). +-module(rabbit_msg_interceptor). %% client API --export([intercept/3, - add/2, - remove/2]). +-export([intercept_incoming/2, + intercept_outgoing/2, + add/1, + remove/1]). %% helpers for behaviour implementations -export([set_annotation/4]). @@ -20,37 +21,49 @@ username := rabbit_types:username(), connection_name := binary(), atom() => term()}. --type group() :: incoming_message_interceptors | - outgoing_message_interceptors. -type config() :: #{atom() => term()}. -type interceptor() :: {module(), config()}. +-type interceptors() :: [interceptor()]. +-type stage() :: incoming | outgoing. +-define(KEY, message_interceptors). -export_type([context/0]). --callback intercept(mc:state(), context(), group(), config()) -> +-callback intercept(mc:state(), context(), stage(), config()) -> mc:state(). --spec intercept(mc:state(), context(), group()) -> +-spec intercept_incoming(mc:state(), context()) -> mc:state(). -intercept(Msg, Ctx, Group) -> - Interceptors = persistent_term:get(Group, []), +intercept_incoming(Msg, Ctx) -> + intercept(Msg, Ctx, incoming). + +-spec intercept_outgoing(mc:state(), context()) -> + mc:state(). +intercept_outgoing(Msg, Ctx) -> + intercept(Msg, Ctx, outgoing). + +intercept(Msg, Ctx, Stage) -> + Interceptors = persistent_term:get(?KEY), lists:foldl(fun({Mod, Config}, Msg0) -> - Mod:intercept(Msg0, Ctx, Group, Config) + Mod:intercept(Msg0, Ctx, Stage, Config) end, Msg, Interceptors). --spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(), boolean()) -> +-spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(), + Overwrite :: boolean()) -> mc:state(). -set_annotation(Msg, Key, Value, Overwrite) -> - case {mc:x_header(Key, Msg), Overwrite} of - {Val, false} when Val =/= undefined -> - Msg; +set_annotation(Msg, Key, Value, true) -> + mc:set_annotation(Key, Value, Msg); +set_annotation(Msg, Key, Value, false) -> + case mc:x_header(Key, Msg) of + undefined -> + mc:set_annotation(Key, Value, Msg); _ -> - mc:set_annotation(Key, Value, Msg) + Msg end. --spec add([interceptor()], group()) -> ok. -add(Interceptors, Group) -> +-spec add(interceptors()) -> ok. +add(Interceptors) -> %% validation lists:foreach(fun({Mod, #{}}) -> case erlang:function_exported(Mod, intercept, 4) of @@ -58,8 +71,8 @@ add(Interceptors, Group) -> false -> error(Mod) end end, Interceptors), - persistent_term:put(Group, persistent_term:get(Group, []) ++ Interceptors). + persistent_term:put(?KEY, persistent_term:get(?KEY, []) ++ Interceptors). --spec remove([interceptor()], group()) -> ok. -remove(Interceptors, Group) -> - persistent_term:put(Group, persistent_term:get(Group, []) -- Interceptors). +-spec remove(interceptors()) -> ok. +remove(Interceptors) -> + persistent_term:put(?KEY, persistent_term:get(?KEY, []) -- Interceptors). diff --git a/deps/rabbit/src/rabbit_message_interceptor_routing_node.erl b/deps/rabbit/src/rabbit_msg_interceptor_routing_node.erl similarity index 50% rename from deps/rabbit/src/rabbit_message_interceptor_routing_node.erl rename to deps/rabbit/src/rabbit_msg_interceptor_routing_node.erl index 32434fb972b0..443c6febd125 100644 --- a/deps/rabbit/src/rabbit_message_interceptor_routing_node.erl +++ b/deps/rabbit/src/rabbit_msg_interceptor_routing_node.erl @@ -4,19 +4,16 @@ %% %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. --module(rabbit_message_interceptor_routing_node). --behaviour(rabbit_message_interceptor). +-module(rabbit_msg_interceptor_routing_node). +-behaviour(rabbit_msg_interceptor). --define(HEADER_ROUTING_NODE, <<"x-routed-by">>). +-define(KEY, <<"x-routed-by">>). -export([intercept/4]). -intercept(Msg, _Ctx, incoming_message_interceptors, Config) -> +intercept(Msg, _Ctx, incoming, Config) -> Node = atom_to_binary(node()), Overwrite = maps:get(overwrite, Config), - rabbit_message_interceptor:set_annotation(Msg, - ?HEADER_ROUTING_NODE, - Node, - Overwrite); -intercept(Msg, _Ctx, _Group, _Config) -> + rabbit_msg_interceptor:set_annotation(Msg, ?KEY, Node, Overwrite); +intercept(Msg, _Ctx, _Stage, _Config) -> Msg. diff --git a/deps/rabbit/src/rabbit_msg_interceptor_timestamp.erl b/deps/rabbit/src/rabbit_msg_interceptor_timestamp.erl new file mode 100644 index 000000000000..5d4e7080a2d2 --- /dev/null +++ b/deps/rabbit/src/rabbit_msg_interceptor_timestamp.erl @@ -0,0 +1,38 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +-module(rabbit_msg_interceptor_timestamp). +-behaviour(rabbit_msg_interceptor). + +-include("mc.hrl"). + +%% For backwards compat, we use the key defined in the old plugin +%% https://github.com/rabbitmq/rabbitmq-message-timestamp +-define(KEY_INCOMING, <<"timestamp_in_ms">>). +-define(KEY_OUTGOING, <<"x-opt-rabbitmq-sent-time">>). + +-export([intercept/4]). + +intercept(Msg0, _Ctx, incoming, #{incoming := _True} = Config) -> + Overwrite = maps:get(overwrite, Config), + Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0), + Msg = rabbit_msg_interceptor:set_annotation(Msg0, ?KEY_INCOMING, Ts, Overwrite), + set_timestamp(Msg, Ts, Overwrite); +intercept(Msg, _Ctx, outgoing, #{outgoing := _True}) -> + Ts = os:system_time(millisecond), + mc:set_annotation(?KEY_OUTGOING, Ts, Msg); +intercept(Msg, _MsgInterceptorCtx, _Stage, _Config) -> + Msg. + +set_timestamp(Msg, Ts, true) -> + mc:set_annotation(?ANN_TIMESTAMP, Ts, Msg); +set_timestamp(Msg, Ts, false) -> + case mc:timestamp(Msg) of + undefined -> + mc:set_annotation(?ANN_TIMESTAMP, Ts, Msg); + _ -> + Msg + end. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index db060329f207..27a6f357d027 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -119,7 +119,7 @@ groups() -> available_messages_classic_queue, available_messages_quorum_queue, available_messages_stream, - incoming_message_interceptors, + message_interceptors, trace_classic_queue, trace_stream, user_id, @@ -4378,13 +4378,12 @@ available_messages(QType, Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). -incoming_message_interceptors(Config) -> - Key = ?FUNCTION_NAME, - ok = rpc(Config, - persistent_term, - put, - [Key, [{rabbit_message_interceptor_routing_node, #{overwrite => false}}, - {rabbit_message_interceptor_timestamp, #{overwrite => false}}]]), +message_interceptors(Config) -> + Key = message_interceptors, + ok = rpc(Config, persistent_term, put, + [Key, [{rabbit_msg_interceptor_routing_node, #{overwrite => false}}, + {rabbit_msg_interceptor_timestamp, #{overwrite => false, + incoming => true}}]]), Stream = <<"my stream">>, QQName = <<"my quorum queue">>, {_, Session, LinkPair} = Init = init(Config), @@ -4431,7 +4430,7 @@ incoming_message_interceptors(Config) -> {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QQName), ok = close(Init), - true = rpc(Config, persistent_term, erase, [Key]). + ok = rpc(Config, persistent_term, put, [Key, []]). trace_classic_queue(Config) -> trace(atom_to_binary(?FUNCTION_NAME), <<"classic">>, Config). diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index d36e31d9178a..b908b0786a87 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1111,11 +1111,21 @@ credential_validator.regexp = ^abc\\d+", %% Message interceptors %% - {single_message_interceptor, + {single_incoming_message_interceptor, "message_interceptors.incoming.set_header_timestamp.overwrite = true", [{rabbit, [ - {incoming_message_interceptors, [ - {rabbit_message_interceptor_timestamp, #{overwrite => true}} + {message_interceptors, [ + {rabbit_msg_interceptor_timestamp, #{incoming => true, + overwrite => true}} + ]} + ]}], + []}, + + {single_outgoing_message_interceptor, + "message_interceptors.outgoing.timestamp.enabled = true", + [{rabbit, [ + {message_interceptors, [ + {rabbit_msg_interceptor_timestamp, #{outgoing => true}} ]} ]}], []}, @@ -1124,11 +1134,14 @@ credential_validator.regexp = ^abc\\d+", " message_interceptors.incoming.set_header_routing_node.overwrite = false message_interceptors.incoming.set_header_timestamp.overwrite = false + message_interceptors.outgoing.timestamp.enabled = true ", [{rabbit, [ - {incoming_message_interceptors, [ - {rabbit_message_interceptor_routing_node, #{overwrite => false}}, - {rabbit_message_interceptor_timestamp, #{overwrite => false}} + {message_interceptors, [ + {rabbit_msg_interceptor_routing_node, #{overwrite => false}}, + {rabbit_msg_interceptor_timestamp, #{incoming => true, + overwrite => false, + outgoing => true}} ]} ]}], []}, diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 00d73d719d88..3d9c9954cb78 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -347,11 +347,11 @@ amqpl_amqp_bin_amqpl(_Config) -> payload_fragments_rev = [<<"data">>]}, Msg0 = mc:init(mc_amqpl, Content, annotations()), - ok = persistent_term:put(incoming_message_interceptors, - [{rabbit_message_interceptor_timestamp, #{overwrite => false}}]), - Msg = rabbit_message_interceptor:intercept(Msg0, - #{}, - incoming_message_interceptors), + ok = persistent_term:put( + message_interceptors, + [{rabbit_msg_interceptor_timestamp, #{incoming => true, + overwrite => false}}]), + Msg = rabbit_msg_interceptor:intercept_incoming(Msg0, #{}), ?assertEqual(<<"exch">>, mc:exchange(Msg)), ?assertEqual([<<"apple">>], mc:routing_keys(Msg)), @@ -452,7 +452,7 @@ amqpl_amqp_bin_amqpl(_Config) -> ?assertEqual(RoutingHeaders, maps:remove(<<"timestamp_in_ms">>, RoutingHeaders2)), - true = persistent_term:erase(incoming_message_interceptors). + ok = persistent_term:put(message_interceptors, []). amqpl_cc_amqp_bin_amqpl(_Config) -> Headers = [{<<"CC">>, array, [{longstr, <<"q1">>}, diff --git a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl b/deps/rabbit/test/rabbit_msg_interceptor_SUITE.erl similarity index 55% rename from deps/rabbit/test/rabbit_message_interceptor_SUITE.erl rename to deps/rabbit/test/rabbit_msg_interceptor_SUITE.erl index 37183408e68f..500be0d61383 100644 --- a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl +++ b/deps/rabbit/test/rabbit_msg_interceptor_SUITE.erl @@ -4,7 +4,7 @@ %% %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. --module(rabbit_message_interceptor_SUITE). +-module(rabbit_msg_interceptor_SUITE). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). @@ -15,17 +15,19 @@ all() -> [ - {group, tests} + {group, cluster_size_1} ]. groups() -> [ - {tests, [shuffle], [headers_overwrite, - headers_no_overwrite - ]} + {cluster_size_1, [shuffle], + [incoming_overwrite, + incoming_no_overwrite, + outgoing]} ]. init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), rabbit_ct_helpers:log_environment(), rabbit_ct_helpers:run_setup_steps(Config). @@ -35,16 +37,20 @@ end_per_suite(Config) -> init_per_testcase(Testcase, Config0) -> Config1 = rabbit_ct_helpers:set_config( Config0, [{rmq_nodename_suffix, Testcase}]), - Overwrite = case Testcase of - headers_overwrite -> true; - headers_no_overwrite -> false - end, - Val = maps:to_list( - maps:from_keys([rabbit_message_interceptor_timestamp, - rabbit_message_interceptor_routing_node], - #{overwrite => Overwrite})), + Val = case Testcase of + incoming_overwrite -> + [{rabbit_msg_interceptor_routing_node, #{overwrite => true}}, + {rabbit_msg_interceptor_timestamp, #{incoming => true, + overwrite => true}}]; + incoming_no_overwrite -> + [{rabbit_msg_interceptor_routing_node, #{overwrite => false}}, + {rabbit_msg_interceptor_timestamp, #{incoming => true, + overwrite => false}}]; + outgoing -> + [{rabbit_msg_interceptor_timestamp, #{outgoing => true}}] + end, Config = rabbit_ct_helpers:merge_app_env( - Config1, {rabbit, [{incoming_message_interceptors, Val}]}), + Config1, {rabbit, [{message_interceptors, Val}]}), rabbit_ct_helpers:run_steps( Config, rabbit_ct_broker_helpers:setup_steps() ++ @@ -57,13 +63,13 @@ end_per_testcase(Testcase, Config0) -> rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()). -headers_overwrite(Config) -> - headers(true, Config). +incoming_overwrite(Config) -> + incoming(true, Config). -headers_no_overwrite(Config) -> - headers(false, Config). +incoming_no_overwrite(Config) -> + incoming(false, Config). -headers(Overwrite, Config) -> +incoming(Overwrite, Config) -> Server = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)), Payload = QName = atom_to_binary(?FUNCTION_NAME), Ch = rabbit_ct_client_helpers:open_channel(Config), @@ -80,13 +86,13 @@ headers(Overwrite, Config) -> #amqp_msg{payload = Payload, props = #'P_basic'{ timestamp = Secs, - headers = [{<<"timestamp_in_ms">>, long, Ms}, + headers = [{<<"timestamp_in_ms">>, long, ReceivedMs}, {<<"x-routed-by">>, longstr, Server}] }}} - when Ms < NowMs + 4000 andalso - Ms > NowMs - 4000 andalso - Secs < NowSecs + 4 andalso - Secs > NowSecs - 4, + when ReceivedMs < NowMs + 5000 andalso + ReceivedMs > NowMs - 5000 andalso + Secs < NowSecs + 5 andalso + Secs > NowSecs - 5, amqp_channel:call(Ch, #'basic.get'{queue = QName}))) end, AssertHeaders(), @@ -110,3 +116,29 @@ headers(Overwrite, Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok. + +outgoing(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + {_, Session, LinkPair} = Init = amqp_utils:init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, settled), + {ok, Sender} = amqp10_client:attach_sender_link_sync( + Session, <<"sender">>, Address, settled), + ok = amqp_utils:wait_for_credit(Sender), + + Now = os:system_time(millisecond), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag">>, <<"msg">>, true)), + + {ok, Msg} = amqp10_client:get_msg(Receiver), + #{<<"x-opt-rabbitmq-sent-time">> := Sent} = amqp10_msg:message_annotations(Msg), + ct:pal("client sent message at ~b~nRabbitMQ sent message at ~b", + [Now, Sent]), + ?assert(Sent > Now - 5000), + ?assert(Sent < Now + 5000), + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = amqp_utils:close(Init). diff --git a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema index 140b11c67684..1be98c757edf 100644 --- a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema +++ b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema @@ -307,24 +307,30 @@ end}. %% %% Message interceptors %% -{mapping, "mqtt.message_interceptors.incoming.set_client_id_annotation.enabled", "rabbitmq_mqtt.incoming_message_interceptors", [ - {datatype, {enum, [true, false]}}]}. -{translation, "rabbitmq_mqtt.incoming_message_interceptors", +{mapping, "mqtt.message_interceptors.$stage.$name.$key", "rabbitmq_mqtt.message_interceptors", [ + {datatype, {enum, [true, false]}} +]}. + +{translation, "rabbitmq_mqtt.message_interceptors", fun(Conf) -> - case cuttlefish_variable:filter_by_prefix("mqtt.message_interceptors", Conf) of - [] -> - cuttlefish:unset(); - L -> - [begin - Interceptor = list_to_atom(Interceptor0), - case Interceptor of - set_client_id_annotation -> - {rabbit_mqtt_message_interceptor_client_id, #{}}; - _ -> - cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor])) - end - end || {["mqtt", "message_interceptors", "incoming", Interceptor0, "enabled"], true} <- L] - end + case cuttlefish_variable:filter_by_prefix("mqtt.message_interceptors", Conf) of + [] -> + cuttlefish:unset(); + L -> + lists:foldr( + fun({["mqtt", "message_interceptors", "incoming", "set_client_id_annotation", "enabled"], Enabled}, Acc) -> + case Enabled of + true -> + Mod = rabbit_mqtt_msg_interceptor_client_id, + Cfg = #{}, + [{Mod, Cfg} | Acc]; + false -> + Acc + end; + (Other, _Acc) -> + cuttlefish:invalid(io_lib:format("~p is invalid", [Other])) + end, [], L) + end end }. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index 4ea3ed0c3704..8ecbd85b66ab 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -36,8 +36,7 @@ start(normal, []) -> stop(_) -> rabbit_mqtt_sup:stop_listeners(), - rabbit_message_interceptor:remove(mqtt_incoming_message_interceptors(), - incoming_message_interceptors). + rabbit_msg_interceptor:remove(mqtt_message_interceptors()). -spec emit_connection_info_all([node()], rabbit_types:info_keys(), reference(), pid()) -> term(). emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> @@ -119,13 +118,12 @@ persist_static_configuration() -> ?assert(MaxSizeAuth =< MaxMsgSize), ok = persistent_term:put(?PERSISTENT_TERM_MAX_PACKET_SIZE_AUTHENTICATED, MaxSizeAuth), - ok = rabbit_message_interceptor:add(mqtt_incoming_message_interceptors(), - incoming_message_interceptors). + ok = rabbit_msg_interceptor:add(mqtt_message_interceptors()). assert_valid_max_packet_size(Val) -> ?assert(is_integer(Val) andalso Val > 0 andalso Val =< ?MAX_PACKET_SIZE). -mqtt_incoming_message_interceptors() -> - application:get_env(?APP_NAME, incoming_message_interceptors, []). +mqtt_message_interceptors() -> + application:get_env(?APP_NAME, message_interceptors, []). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_msg_interceptor_client_id.erl similarity index 67% rename from deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl rename to deps/rabbitmq_mqtt/src/rabbit_mqtt_msg_interceptor_client_id.erl index ed442934bea2..e4302c297a9e 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_msg_interceptor_client_id.erl @@ -4,21 +4,17 @@ %% %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. --module(rabbit_mqtt_message_interceptor_client_id). --behaviour(rabbit_message_interceptor). +-module(rabbit_mqtt_msg_interceptor_client_id). +-behaviour(rabbit_msg_interceptor). -export([intercept/4]). -define(KEY, <<"x-opt-mqtt-client-id">>). -intercept(Msg, - #{protocol := Proto, - client_id := ClientId}, - incoming_message_interceptors, - _Config) +intercept(Msg, #{protocol := Proto, client_id := ClientId}, incoming, _Cfg) when Proto =:= mqtt50 orelse Proto =:= mqtt311 orelse Proto =:= mqtt310 -> mc:set_annotation(?KEY, ClientId, Msg); -intercept(Msg, _Ctx, _Group, _Config) -> +intercept(Msg, _Ctx, _Stage, _Config) -> Msg. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 6e43f4631252..d62a12ba5a2d 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1639,9 +1639,8 @@ publish_to_queues( Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()), case rabbit_exchange:lookup(ExchangeName) of {ok, Exchange} -> - Msg = rabbit_message_interceptor:intercept(Msg0, - msg_interceptor_ctx(State), - incoming_message_interceptors), + Ctx = msg_interceptor_ctx(State), + Msg = rabbit_msg_interceptor:intercept_incoming(Msg0, Ctx), QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}), QNames = drop_local(QNames0, State), rabbit_trace:tap_in(Msg, QNames, ConnName, Username, TraceState), @@ -2072,7 +2071,9 @@ deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc} = Delivery, true -> ?QOS_1; false -> ?QOS_0 end, - McMqtt = mc:convert(mc_mqtt, Mc, mc_env()), + McMqtt0 = mc:convert(mc_mqtt, Mc, mc_env()), + MsgIcptCtx = msg_interceptor_ctx(State0), + McMqtt = rabbit_msg_interceptor:intercept_outgoing(McMqtt0, MsgIcptCtx), MqttMsg = #mqtt_msg{qos = PublisherQos} = mc:protocol_state(McMqtt), QoS = effective_qos(PublisherQos, SubscriberQoS), {SettleOp, State1} = maybe_publish_to_client(MqttMsg, Delivery, QoS, State0), diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets index fe68793996d9..a1af02451cd3 100644 --- a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets +++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets @@ -176,8 +176,8 @@ {message_interceptor_enabled, "mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true", [{rabbitmq_mqtt, [ - {incoming_message_interceptors, [ - {rabbit_mqtt_message_interceptor_client_id, #{}} + {message_interceptors, [ + {rabbit_mqtt_msg_interceptor_client_id, #{}} ]} ]}], []}, @@ -185,7 +185,7 @@ {message_interceptor_disabled, "mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = false", [{rabbitmq_mqtt, [ - {incoming_message_interceptors, []} + {message_interceptors, []} ]}], []} diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index cb551e20916a..09bae18c37fe 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -120,7 +120,7 @@ cluster_size_1_tests() -> ,max_packet_size_unauthenticated ,max_packet_size_authenticated ,default_queue_type - ,incoming_message_interceptors + ,message_interceptors ,utf8 ,retained_message_conversion ,bind_exchange_to_exchange @@ -1777,13 +1777,15 @@ default_queue_type(Config) -> ok = emqtt:disconnect(C2), ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost). -incoming_message_interceptors(Config) -> - Key = ?FUNCTION_NAME, +message_interceptors(Config) -> ok = rpc(Config, persistent_term, put, - [Key, [ - {rabbit_message_interceptor_timestamp, #{overwrite => false}}, - {rabbit_mqtt_message_interceptor_client_id, #{}} - ]]), + [message_interceptors, + [ + {rabbit_mqtt_msg_interceptor_client_id, #{}}, + {rabbit_msg_interceptor_timestamp, #{overwrite => false, + incoming => true, + outgoing => true}} + ]]), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), Payload = Topic = atom_to_binary(?FUNCTION_NAME), ClientId = <<"🆔"/utf8>>, @@ -1807,11 +1809,14 @@ incoming_message_interceptors(Config) -> }} } = amqp_channel:call(Ch, #'basic.get'{queue = CQName}), - {<<"timestamp_in_ms">>, long, Millis} = lists:keyfind(<<"timestamp_in_ms">>, 1, Headers), - ?assert(Secs < NowSecs + 4), - ?assert(Secs > NowSecs - 4), - ?assert(Millis < NowMillis + 4000), - ?assert(Millis > NowMillis - 4000), + {_, long, ReceivedTs} = lists:keyfind(<<"timestamp_in_ms">>, 1, Headers), + ?assert(Secs < NowSecs + 9), + ?assert(Secs > NowSecs - 9), + ?assert(ReceivedTs < NowMillis + 9000), + ?assert(ReceivedTs > NowMillis - 9000), + {_, long, SentTs} = lists:keyfind(<<"x-opt-rabbitmq-sent-time">>, 1, Headers), + ?assert(SentTs < NowMillis + 9000), + ?assert(SentTs > NowMillis - 9000), ?assertEqual({<<"x-opt-mqtt-client-id">>, longstr, ClientId}, lists:keyfind(<<"x-opt-mqtt-client-id">>, 1, Headers)), @@ -1828,16 +1833,20 @@ incoming_message_interceptors(Config) -> receive {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = Payload, props = #'P_basic'{ - headers = [{<<"timestamp_in_ms">>, long, Millis} | XHeaders] + headers = [{<<"timestamp_in_ms">>, long, ReceivedTs} | XHeaders] }}} -> ?assertEqual({<<"x-opt-mqtt-client-id">>, longstr, ClientId}, - lists:keyfind(<<"x-opt-mqtt-client-id">>, 1, XHeaders)) + lists:keyfind(<<"x-opt-mqtt-client-id">>, 1, XHeaders)), + + {_, long, SentTs1} = lists:keyfind(<<"x-opt-rabbitmq-sent-time">>, 1, XHeaders), + ?assert(SentTs1 < NowMillis + 9000), + ?assert(SentTs1 > NowMillis - 9000) after ?TIMEOUT -> ct:fail(missing_deliver) end, delete_queue(Ch, Stream), delete_queue(Ch, CQName), - true = rpc(Config, persistent_term, erase, [Key]), + ok = rpc(Config, persistent_term, put, [message_interceptors, []]), ok = emqtt:disconnect(C), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). diff --git a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl index 693345dc4cec..bbe37b56a9c7 100644 --- a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl @@ -79,7 +79,7 @@ trace_large_message(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). max_packet_size_unauthenticated(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). max_packet_size_authenticated(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). default_queue_type(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). -incoming_message_interceptors(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). +message_interceptors(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). utf8(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). retained_message_conversion(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). bind_exchange_to_exchange(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). From f0745592e268d2d1987bfffd3113848154d05430 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 18 Apr 2025 10:38:04 +0200 Subject: [PATCH 4/6] Store message interceptor context in MQTT proc state It's a tradeoff between building the map for each incoming and outgoing message (now that there are also outgoing interceptors) vs increased memory usage for the MQTT proc state. Connecting with MQTT 5.0 and client ID "xxxxxxxx", the number of words are 201 before this commit vs 235 after this commit as determined by: ``` S = sys:get_state(MQTTConnectionPid), erts_debug:size(S). ``` Therefore, this commit requires 34 word * 8 bytes = 272 bytes more per MQTT connection, that is 272 MB more for 1,000,000 MQTT connections. --- deps/rabbit/src/rabbit_channel.erl | 10 +++--- deps/rabbit/src/rabbit_msg_interceptor.erl | 4 +-- .../rabbit_msg_interceptor_routing_node.erl | 6 ++-- .../src/rabbit_msg_interceptor_timestamp.erl | 6 ++-- .../rabbit_mqtt_msg_interceptor_client_id.erl | 2 +- .../src/rabbit_mqtt_processor.erl | 35 +++++++++---------- 6 files changed, 30 insertions(+), 33 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 1cbaa0a7f12e..6eb438d2f7e9 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -493,10 +493,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams), {ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold), MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity), - MsgInterceptorCtx = #{protocol => amqp091, - vhost => VHost, - username => User#user.username, - connection_name => ConnName}, + MsgIcptCtx = #{protocol => amqp091, + vhost => VHost, + username => User#user.username, + connection_name => ConnName}, State = #ch{cfg = #conf{state = starting, protocol = Protocol, channel = Channel, @@ -515,7 +515,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, authz_context = OptionalVariables, max_consumers = MaxConsumers, writer_gc_threshold = GCThreshold, - msg_interceptor_ctx = MsgInterceptorCtx}, + msg_interceptor_ctx = MsgIcptCtx}, limiter = Limiter, tx = none, next_tag = 1, diff --git a/deps/rabbit/src/rabbit_msg_interceptor.erl b/deps/rabbit/src/rabbit_msg_interceptor.erl index 89184458f2c8..3854a838591f 100644 --- a/deps/rabbit/src/rabbit_msg_interceptor.erl +++ b/deps/rabbit/src/rabbit_msg_interceptor.erl @@ -45,8 +45,8 @@ intercept_outgoing(Msg, Ctx) -> intercept(Msg, Ctx, Stage) -> Interceptors = persistent_term:get(?KEY), - lists:foldl(fun({Mod, Config}, Msg0) -> - Mod:intercept(Msg0, Ctx, Stage, Config) + lists:foldl(fun({Mod, Cfg}, Msg0) -> + Mod:intercept(Msg0, Ctx, Stage, Cfg) end, Msg, Interceptors). -spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(), diff --git a/deps/rabbit/src/rabbit_msg_interceptor_routing_node.erl b/deps/rabbit/src/rabbit_msg_interceptor_routing_node.erl index 443c6febd125..d8b4c77c6d09 100644 --- a/deps/rabbit/src/rabbit_msg_interceptor_routing_node.erl +++ b/deps/rabbit/src/rabbit_msg_interceptor_routing_node.erl @@ -11,9 +11,9 @@ -export([intercept/4]). -intercept(Msg, _Ctx, incoming, Config) -> +intercept(Msg, _Ctx, incoming, Cfg) -> Node = atom_to_binary(node()), - Overwrite = maps:get(overwrite, Config), + Overwrite = maps:get(overwrite, Cfg), rabbit_msg_interceptor:set_annotation(Msg, ?KEY, Node, Overwrite); -intercept(Msg, _Ctx, _Stage, _Config) -> +intercept(Msg, _Ctx, _Stage, _Cfg) -> Msg. diff --git a/deps/rabbit/src/rabbit_msg_interceptor_timestamp.erl b/deps/rabbit/src/rabbit_msg_interceptor_timestamp.erl index 5d4e7080a2d2..e07269a3c494 100644 --- a/deps/rabbit/src/rabbit_msg_interceptor_timestamp.erl +++ b/deps/rabbit/src/rabbit_msg_interceptor_timestamp.erl @@ -16,15 +16,15 @@ -export([intercept/4]). -intercept(Msg0, _Ctx, incoming, #{incoming := _True} = Config) -> - Overwrite = maps:get(overwrite, Config), +intercept(Msg0, _Ctx, incoming, #{incoming := _True} = Cfg) -> + Overwrite = maps:get(overwrite, Cfg), Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0), Msg = rabbit_msg_interceptor:set_annotation(Msg0, ?KEY_INCOMING, Ts, Overwrite), set_timestamp(Msg, Ts, Overwrite); intercept(Msg, _Ctx, outgoing, #{outgoing := _True}) -> Ts = os:system_time(millisecond), mc:set_annotation(?KEY_OUTGOING, Ts, Msg); -intercept(Msg, _MsgInterceptorCtx, _Stage, _Config) -> +intercept(Msg, _MsgIcptCtx, _Stage, _Cfg) -> Msg. set_timestamp(Msg, Ts, true) -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_msg_interceptor_client_id.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_msg_interceptor_client_id.erl index e4302c297a9e..00864f03023b 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_msg_interceptor_client_id.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_msg_interceptor_client_id.erl @@ -16,5 +16,5 @@ intercept(Msg, #{protocol := Proto, client_id := ClientId}, incoming, _Cfg) Proto =:= mqtt311 orelse Proto =:= mqtt310 -> mc:set_annotation(?KEY, ClientId, Msg); -intercept(Msg, _Ctx, _Stage, _Config) -> +intercept(Msg, _Ctx, _Stage, _Cfg) -> Msg. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index d62a12ba5a2d..928217aa33d3 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -92,7 +92,8 @@ %% The database stores the MQTT subscription options in the binding arguments for: %% * v1 as Erlang record #mqtt_subscription_opts{} %% * v2 as AMQP 0.9.1 table - binding_args_v2 :: boolean() + binding_args_v2 :: boolean(), + msg_interceptor_ctx :: rabbit_msg_interceptor:context() }). -record(state, @@ -214,9 +215,15 @@ process_connect( %% To simplify logic, we decide at connection establishment time to stick %% with either binding args v1 or v2 for the lifetime of the connection. BindingArgsV2 = rabbit_feature_flags:is_enabled('rabbitmq_4.1.0'), + ProtoVerAtom = proto_integer_to_atom(ProtoVer), + MsgIcptCtx = #{protocol => ProtoVerAtom, + vhost => VHost, + username => Username, + connection_name => ConnName, + client_id => ClientId}, S = #state{ cfg = #cfg{socket = Socket, - proto_ver = proto_integer_to_atom(ProtoVer), + proto_ver = ProtoVerAtom, clean_start = CleanStart, session_expiry_interval_secs = SessionExpiry, ssl_login_name = SslLoginName, @@ -237,7 +244,8 @@ process_connect( will_msg = WillMsg, max_packet_size_outbound = MaxPacketSize, topic_alias_maximum_outbound = TopicAliasMaxOutbound, - binding_args_v2 = BindingArgsV2}, + binding_args_v2 = BindingArgsV2, + msg_interceptor_ctx = MsgIcptCtx}, auth_state = #auth_state{ user = User, authz_ctx = AuthzCtx}}, @@ -1632,15 +1640,15 @@ publish_to_queues( #state{cfg = #cfg{exchange = ExchangeName = #resource{name = ExchangeNameBin}, delivery_flow = Flow, conn_name = ConnName, - trace_state = TraceState}, + trace_state = TraceState, + msg_interceptor_ctx = MsgIcptCtx}, auth_state = #auth_state{user = #user{username = Username}}} = State) -> Anns = #{?ANN_EXCHANGE => ExchangeNameBin, ?ANN_ROUTING_KEYS => [mqtt_to_amqp(Topic)]}, Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()), case rabbit_exchange:lookup(ExchangeName) of {ok, Exchange} -> - Ctx = msg_interceptor_ctx(State), - Msg = rabbit_msg_interceptor:intercept_incoming(Msg0, Ctx), + Msg = rabbit_msg_interceptor:intercept_incoming(Msg0, MsgIcptCtx), QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}), QNames = drop_local(QNames0, State), rabbit_trace:tap_in(Msg, QNames, ConnName, Username, TraceState), @@ -2066,13 +2074,13 @@ deliver_to_client(Msgs, Ack, State) -> end, State, Msgs). deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc} = Delivery, - AckRequired, State0) -> + AckRequired, + #state{cfg = #cfg{msg_interceptor_ctx = MsgIcptCtx}} = State0) -> SubscriberQoS = case AckRequired of true -> ?QOS_1; false -> ?QOS_0 end, McMqtt0 = mc:convert(mc_mqtt, Mc, mc_env()), - MsgIcptCtx = msg_interceptor_ctx(State0), McMqtt = rabbit_msg_interceptor:intercept_outgoing(McMqtt0, MsgIcptCtx), MqttMsg = #mqtt_msg{qos = PublisherQos} = mc:protocol_state(McMqtt), QoS = effective_qos(PublisherQos, SubscriberQoS), @@ -2539,17 +2547,6 @@ message_redelivered(_, _, _) -> is_success(ReasonCode) -> ReasonCode < ?RC_UNSPECIFIED_ERROR. -msg_interceptor_ctx(#state{cfg = #cfg{client_id = ClientId, - conn_name = ConnName, - vhost = VHost, - proto_ver = ProtoVer}, - auth_state = #auth_state{user = #user{username = Username}}}) -> - #{protocol => ProtoVer, - vhost => VHost, - username => Username, - connection_name => ConnName, - client_id => ClientId}. - -spec format_status(state()) -> map(). format_status( #state{queue_states = QState, From d302b85c0a3acdde02cec16babf91fde02f122dc Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 18 Apr 2025 14:59:41 +0200 Subject: [PATCH 5/6] Add 4.2.0 release notes [skip ci] --- release-notes/4.2.0.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 release-notes/4.2.0.md diff --git a/release-notes/4.2.0.md b/release-notes/4.2.0.md new file mode 100644 index 000000000000..f387b5143fab --- /dev/null +++ b/release-notes/4.2.0.md @@ -0,0 +1,20 @@ +## RabbitMQ 4.2.0 + +RabbitMQ 4.2.0 is a new feature release. + + +## Features + +### Incoming and Outgoing Message Interceptors for native protocols + +Incoming and outgoing messages can now be intercepted on the broker. +This works for AMQP 1.0, AMQP 0.9.1, and MQTT. + +What the interceptor does is entirely up to its implementation - it can validate message metadata, add annotations, or perform arbitrary side effects. +Custom interceptors can be developed and integrated via [plugins](./plugins). + +Two new optional built-in interceptors were added to RabbitMQ: +1. Timestamps for outgoing messages +2. Setting client ID of publishing MQTT client + +Detailed information can be found in the [Message Interceptor](https://www.rabbitmq.com/docs/next/message-inteceptor) documentation. From 69c28551e8d38d68a2fa44596c22c97f1f50dc6f Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 22 Apr 2025 10:41:56 +0200 Subject: [PATCH 6/6] Intercept outgoing just before conversion Intercept outgoing message just before conversion to target protocol as this will give most flexibility to 3rd party plugins. --- deps/rabbit/src/rabbit_amqp_session.erl | 6 +++--- deps/rabbit/src/rabbit_channel.erl | 4 ++-- deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 9f841c22682c..caa2024fa1e9 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -2180,9 +2180,9 @@ handle_deliver(ConsumerTag, AckRequired, delivery_tag = {binary, Dtag}, message_format = ?UINT(?MESSAGE_FORMAT), settled = SendSettled}, - Mc1 = mc:convert(mc_amqp, Mc0), - Mc2 = mc:set_annotation(redelivered, Redelivered, Mc1), - Mc = rabbit_msg_interceptor:intercept_outgoing(Mc2, MsgIcptCtx), + Mc1 = rabbit_msg_interceptor:intercept_outgoing(Mc0, MsgIcptCtx), + Mc2 = mc:convert(mc_amqp, Mc1), + Mc = mc:set_annotation(redelivered, Redelivered, Mc2), Sections = mc:protocol_state(Mc), validate_message_size(Sections, MaxMessageSize), Frames = transfer_frames(Transfer, Sections, MaxFrameSize), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 6eb438d2f7e9..38614fc4de72 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2645,8 +2645,8 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, {noreply, record_sent(get, QueueType, DeliveryTag, not(NoAck), Msg0, State)}. outgoing_content(Mc, MsgIcptCtx) -> - Mc1 = mc:convert(mc_amqpl, Mc), - Mc2 = rabbit_msg_interceptor:intercept_outgoing(Mc1, MsgIcptCtx), + Mc1 = rabbit_msg_interceptor:intercept_outgoing(Mc, MsgIcptCtx), + Mc2 = mc:convert(mc_amqpl, Mc1), mc:protocol_state(Mc2). init_tick_timer(State = #ch{tick_timer = undefined}) -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 928217aa33d3..ac22c9044b05 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -2073,15 +2073,15 @@ deliver_to_client(Msgs, Ack, State) -> deliver_one_to_client(Msg, Ack, S) end, State, Msgs). -deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc} = Delivery, +deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc0} = Delivery, AckRequired, #state{cfg = #cfg{msg_interceptor_ctx = MsgIcptCtx}} = State0) -> SubscriberQoS = case AckRequired of true -> ?QOS_1; false -> ?QOS_0 end, - McMqtt0 = mc:convert(mc_mqtt, Mc, mc_env()), - McMqtt = rabbit_msg_interceptor:intercept_outgoing(McMqtt0, MsgIcptCtx), + Mc = rabbit_msg_interceptor:intercept_outgoing(Mc0, MsgIcptCtx), + McMqtt = mc:convert(mc_mqtt, Mc, mc_env()), MqttMsg = #mqtt_msg{qos = PublisherQos} = mc:protocol_state(McMqtt), QoS = effective_qos(PublisherQos, SubscriberQoS), {SettleOp, State1} = maybe_publish_to_client(MqttMsg, Delivery, QoS, State0),