Skip to content

Commit ec25eb0

Browse files
committed
Shovel: use message containers
1 parent 1277a4b commit ec25eb0

File tree

8 files changed

+164
-173
lines changed

8 files changed

+164
-173
lines changed

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -270,14 +270,23 @@ new(DeliveryTag, Bin, Settled) when is_binary(Bin) ->
270270
Body = [#'v1_0.data'{content = Bin}],
271271
new(DeliveryTag, Body, Settled);
272272
new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types
273-
#amqp10_msg{
274-
transfer = #'v1_0.transfer'{
275-
delivery_tag = {binary, DeliveryTag},
276-
settled = Settled,
277-
message_format = {uint, ?MESSAGE_FORMAT}},
278-
%% This lib is safe by default.
279-
header = #'v1_0.header'{durable = true},
280-
body = Body}.
273+
case is_amqp10_body(Body) of
274+
true ->
275+
#amqp10_msg{
276+
transfer = #'v1_0.transfer'{
277+
delivery_tag = {binary, DeliveryTag},
278+
settled = Settled,
279+
message_format = {uint, ?MESSAGE_FORMAT}},
280+
%% This lib is safe by default.
281+
header = #'v1_0.header'{durable = true},
282+
body = Body};
283+
false ->
284+
Transfer = #'v1_0.transfer'{
285+
delivery_tag = {binary, DeliveryTag},
286+
settled = Settled,
287+
message_format = {uint, ?MESSAGE_FORMAT}},
288+
from_amqp_records([Transfer | Body])
289+
end.
281290

282291
%% @doc Create a new settled amqp10 message using the specified delivery tag
283292
%% and body.
@@ -462,3 +471,19 @@ uint(B) -> {uint, B}.
462471

463472
has_value(undefined) -> false;
464473
has_value(_) -> true.
474+
475+
is_amqp10_body(#'v1_0.amqp_value'{}) ->
476+
true;
477+
is_amqp10_body(List) when is_list(List) ->
478+
lists:all(fun(#'v1_0.data'{}) ->
479+
true;
480+
(_) ->
481+
false
482+
end, List) orelse
483+
lists:all(fun(#'v1_0.amqp_sequence'{}) ->
484+
true;
485+
(_) ->
486+
false
487+
end, List);
488+
is_amqp10_body(_) ->
489+
false.

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2440,7 +2440,6 @@ incoming_link_transfer(
24402440
validate_message_size(PayloadSize, MaxMessageSize),
24412441
rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize),
24422442
messages_received(Settled),
2443-
24442443
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
24452444
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
24462445
{ok, X, RoutingKeys, Mc1, PermCache} ->

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 21 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
-behaviour(rabbit_shovel_behaviour).
1313

1414
-include_lib("amqp_client/include/amqp_client.hrl").
15+
-include_lib("rabbit/include/mc.hrl").
1516
-include("rabbit_shovel.hrl").
1617

1718
-export([
@@ -33,7 +34,7 @@
3334
ack/3,
3435
nack/3,
3536
status/1,
36-
forward/4
37+
forward/3
3738
]).
3839

3940
%% Function references should not be stored on the metadata store.
@@ -169,8 +170,8 @@ forward_pending(State) ->
169170
case pop_pending(State) of
170171
empty ->
171172
State;
172-
{{Tag, Props, Payload}, S} ->
173-
S2 = do_forward(Tag, Props, Payload, S),
173+
{{Tag, Mc}, S} ->
174+
S2 = do_forward(Tag, Mc, S),
174175
S3 = control_throttle(S2),
175176
case is_blocked(S3) of
176177
true ->
@@ -183,91 +184,50 @@ forward_pending(State) ->
183184
end
184185
end.
185186

186-
forward(IncomingTag, Props, Payload, State) ->
187+
forward(IncomingTag, Mc, State) ->
187188
case is_blocked(State) of
188189
true ->
189190
%% We are blocked by client-side flow-control and/or
190191
%% `connection.blocked` message from the destination
191192
%% broker. Simply cache the forward.
192-
PendingEntry = {IncomingTag, Props, Payload},
193+
PendingEntry = {IncomingTag, Mc},
193194
add_pending(PendingEntry, State);
194195
false ->
195-
State1 = do_forward(IncomingTag, Props, Payload, State),
196+
State1 = do_forward(IncomingTag, Mc, State),
196197
control_throttle(State1)
197198
end.
198199

199-
do_forward(IncomingTag, Props, Payload,
200+
do_forward(IncomingTag, Mc0,
200201
State0 = #{dest := #{props_fun := {M, F, Args},
201202
current := {_, _, DstUri},
202203
fields_fun := {Mf, Ff, Argsf}}}) ->
203204
SrcUri = rabbit_shovel_behaviour:source_uri(State0),
204205
% do publish
205-
Exchange = maps:get(exchange, Props, undefined),
206-
RoutingKey = maps:get(routing_key, Props, undefined),
206+
Exchange = mc:exchange(Mc0),
207+
RoutingKey = case mc:routing_keys(Mc0) of
208+
[RK | _] -> RK;
209+
Any -> Any
210+
end,
207211
Method = #'basic.publish'{exchange = Exchange, routing_key = RoutingKey},
208212
Method1 = apply(Mf, Ff, Argsf ++ [SrcUri, DstUri, Method]),
209-
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, props_from_map(Props)]),
213+
Mc = mc:convert(mc_amqpl, Mc0),
214+
{Props, Payload} = rabbit_basic_common:from_content(mc:protocol_state(Mc)),
215+
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, Props]),
210216
payload = Payload},
211217
publish(IncomingTag, Method1, Msg1, State0).
212218

213-
props_from_map(Map) ->
214-
#'P_basic'{content_type = maps:get(content_type, Map, undefined),
215-
content_encoding = maps:get(content_encoding, Map, undefined),
216-
headers = maps:get(headers, Map, undefined),
217-
delivery_mode = maps:get(delivery_mode, Map, undefined),
218-
priority = maps:get(priority, Map, undefined),
219-
correlation_id = maps:get(correlation_id, Map, undefined),
220-
reply_to = maps:get(reply_to, Map, undefined),
221-
expiration = maps:get(expiration, Map, undefined),
222-
message_id = maps:get(message_id, Map, undefined),
223-
timestamp = maps:get(timestamp, Map, undefined),
224-
type = maps:get(type, Map, undefined),
225-
user_id = maps:get(user_id, Map, undefined),
226-
app_id = maps:get(app_id, Map, undefined),
227-
cluster_id = maps:get(cluster_id, Map, undefined)}.
228-
229-
map_from_props(#'P_basic'{content_type = Content_type,
230-
content_encoding = Content_encoding,
231-
headers = Headers,
232-
delivery_mode = Delivery_mode,
233-
priority = Priority,
234-
correlation_id = Correlation_id,
235-
reply_to = Reply_to,
236-
expiration = Expiration,
237-
message_id = Message_id,
238-
timestamp = Timestamp,
239-
type = Type,
240-
user_id = User_id,
241-
app_id = App_id,
242-
cluster_id = Cluster_id}) ->
243-
lists:foldl(fun({_K, undefined}, Acc) -> Acc;
244-
({K, V}, Acc) -> Acc#{K => V}
245-
end, #{}, [{content_type, Content_type},
246-
{content_encoding, Content_encoding},
247-
{headers, Headers},
248-
{delivery_mode, Delivery_mode},
249-
{priority, Priority},
250-
{correlation_id, Correlation_id},
251-
{reply_to, Reply_to},
252-
{expiration, Expiration},
253-
{message_id, Message_id},
254-
{timestamp, Timestamp},
255-
{type, Type},
256-
{user_id, User_id},
257-
{app_id, App_id},
258-
{cluster_id, Cluster_id}
259-
]).
260-
261219
handle_source(#'basic.consume_ok'{}, State) ->
262220
State;
263221
handle_source({#'basic.deliver'{delivery_tag = Tag,
264222
exchange = Exchange,
265223
routing_key = RoutingKey},
266224
#amqp_msg{props = Props0, payload = Payload}}, State) ->
267-
Props = (map_from_props(Props0))#{exchange => Exchange,
268-
routing_key => RoutingKey},
225+
Content = rabbit_basic_common:build_content(Props0, Payload),
226+
Msg0 = mc:init(mc_amqpl, Content, #{}),
227+
Msg1 = mc:set_annotation(?ANN_ROUTING_KEYS, [RoutingKey], Msg0),
228+
Msg = mc:set_annotation(?ANN_EXCHANGE, Exchange, Msg1),
269229
% forward to destination
270-
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
230+
rabbit_shovel_behaviour:forward(Tag, Msg, State);
271231

272232
handle_source({'EXIT', Conn, Reason},
273233
#{source := #{current := {Conn, _, _}}}) ->

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 28 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
-behaviour(rabbit_shovel_behaviour).
1111

12+
-include_lib("rabbit/include/mc.hrl").
1213
-include("rabbit_shovel.hrl").
1314

1415
-export([
@@ -30,7 +31,7 @@
3031
ack/3,
3132
nack/3,
3233
status/1,
33-
forward/4
34+
forward/3
3435
]).
3536

3637
-import(rabbit_misc, [pget/2, pget/3]).
@@ -184,10 +185,12 @@ dest_endpoint(#{shovel_type := dynamic,
184185

185186
-spec handle_source(Msg :: any(), state()) ->
186187
not_handled | state() | {stop, any()}.
187-
handle_source({amqp10_msg, _LinkRef, Msg}, State) ->
188-
Tag = amqp10_msg:delivery_id(Msg),
189-
Payload = amqp10_msg:body_bin(Msg),
190-
rabbit_shovel_behaviour:forward(Tag, #{}, Payload, State);
188+
handle_source({amqp10_msg, _LinkRef, Msg0}, State) ->
189+
Tag = amqp10_msg:delivery_id(Msg0),
190+
[_ | Rest] = amqp10_msg:to_amqp_records(Msg0),
191+
Bin = iolist_to_binary([amqp10_framing:encode_bin(D) || D <- Rest]),
192+
Msg = mc:init(mc_amqp, Bin, #{}),
193+
rabbit_shovel_behaviour:forward(Tag, Msg, State);
191194
handle_source({amqp10_event, {connection, Conn, opened}},
192195
State = #{source := #{current := #{conn := Conn}}}) ->
193196
State;
@@ -260,8 +263,8 @@ handle_dest({amqp10_event, {link, Link, credited}},
260263
%% we have credit so can begin to forward
261264
State = State0#{dest => Dst#{link_state => credited,
262265
pending => []}},
263-
lists:foldl(fun ({A, B, C}, S) ->
264-
forward(A, B, C, S)
266+
lists:foldl(fun ({A, B}, S) ->
267+
forward(A, B, S)
265268
end, State, lists:reverse(Pend));
266269
handle_dest({amqp10_event, {link, Link, _Evt}},
267270
State= #{dest := #{current := #{link := Link}}}) ->
@@ -315,27 +318,26 @@ status(_) ->
315318
%% Destination not yet connected
316319
ignore.
317320

318-
-spec forward(Tag :: tag(), Props :: #{atom() => any()},
319-
Payload :: binary(), state()) ->
321+
-spec forward(Tag :: tag(), Mc :: mc:state(), state()) ->
320322
state() | {stop, any()}.
321-
forward(_Tag, _Props, _Payload,
323+
forward(_Tag, _Mc,
322324
#{source := #{remaining_unacked := 0}} = State) ->
323325
State;
324-
forward(Tag, Props, Payload,
326+
forward(Tag, Mc,
325327
#{dest := #{current := #{link_state := attached},
326328
pending := Pend0} = Dst} = State) ->
327329
%% simply cache the forward oo
328-
Pend = [{Tag, Props, Payload} | Pend0],
330+
Pend = [{Tag, Mc} | Pend0],
329331
State#{dest => Dst#{pending => {Pend}}};
330-
forward(Tag, Props, Payload,
332+
forward(Tag, Msg0,
331333
#{dest := #{current := #{link := Link},
332334
unacked := Unacked} = Dst,
333335
ack_mode := AckMode} = State) ->
334336
OutTag = rabbit_data_coercion:to_binary(Tag),
335-
Msg0 = new_message(OutTag, Payload, State),
336-
Msg = add_timestamp_header(
337-
State, set_message_properties(
338-
Props, add_forward_headers(State, Msg0))),
337+
Msg1 = mc:protocol_state(mc:convert(mc_amqp, Msg0)),
338+
Records = lists:flatten([amqp10_framing:decode_bin(iolist_to_binary(S)) || S <- Msg1]),
339+
Msg2 = amqp10_msg:new(OutTag, Records, AckMode =/= on_confirm),
340+
Msg = update_amqp10_message(Msg2, mc:exchange(Msg0), mc:routing_keys(Msg0), State),
339341
case send_msg(Link, Msg) of
340342
ok ->
341343
rabbit_shovel_behaviour:decr_remaining_unacked(
@@ -364,73 +366,25 @@ send_msg(Link, Msg) ->
364366
end
365367
end.
366368

367-
new_message(Tag, Payload, #{ack_mode := AckMode,
368-
dest := #{properties := Props,
369-
application_properties := AppProps,
370-
message_annotations := MsgAnns}}) ->
371-
Msg0 = amqp10_msg:new(Tag, Payload, AckMode =/= on_confirm),
369+
update_amqp10_message(Msg0, Exchange, RK, #{dest := #{properties := Props,
370+
application_properties := AppProps0,
371+
message_annotations := MsgAnns}} = State) ->
372372
Msg1 = amqp10_msg:set_properties(Props, Msg0),
373-
Msg = amqp10_msg:set_message_annotations(MsgAnns, Msg1),
374-
amqp10_msg:set_application_properties(AppProps, Msg).
373+
Msg2 = amqp10_msg:set_message_annotations(MsgAnns, Msg1),
374+
AppProps = AppProps0#{<<"exchange">> => Exchange,
375+
<<"routing_key">> => RK},
376+
Msg = amqp10_msg:set_application_properties(AppProps, Msg2),
377+
add_timestamp_header(State, add_forward_headers(State, Msg)).
375378

376379
add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) ->
377380
P =#{creation_time => os:system_time(milli_seconds)},
378381
amqp10_msg:set_properties(P, Msg);
379382
add_timestamp_header(_, Msg) -> Msg.
380383

381384
add_forward_headers(#{dest := #{cached_forward_headers := Props}}, Msg) ->
382-
amqp10_msg:set_application_properties(Props, Msg);
385+
amqp10_msg:set_application_properties(Props, Msg);
383386
add_forward_headers(_, Msg) -> Msg.
384387

385-
set_message_properties(Props, Msg) ->
386-
%% this is effectively special handling properties from amqp 0.9.1
387-
maps:fold(
388-
fun(content_type, Ct, M) ->
389-
amqp10_msg:set_properties(
390-
#{content_type => to_binary(Ct)}, M);
391-
(content_encoding, Ct, M) ->
392-
amqp10_msg:set_properties(
393-
#{content_encoding => to_binary(Ct)}, M);
394-
(delivery_mode, 2, M) ->
395-
amqp10_msg:set_headers(#{durable => true}, M);
396-
(delivery_mode, 1, M) ->
397-
% by default the durable flag is false
398-
M;
399-
(priority, P, M) when is_integer(P) ->
400-
amqp10_msg:set_headers(#{priority => P}, M);
401-
(correlation_id, Ct, M) ->
402-
amqp10_msg:set_properties(#{correlation_id => to_binary(Ct)}, M);
403-
(reply_to, Ct, M) ->
404-
amqp10_msg:set_properties(#{reply_to => to_binary(Ct)}, M);
405-
(message_id, Ct, M) ->
406-
amqp10_msg:set_properties(#{message_id => to_binary(Ct)}, M);
407-
(timestamp, Ct, M) ->
408-
amqp10_msg:set_properties(#{creation_time => Ct}, M);
409-
(user_id, Ct, M) ->
410-
amqp10_msg:set_properties(#{user_id => Ct}, M);
411-
(headers, Headers0, M) when is_list(Headers0) ->
412-
%% AMPQ 0.9.1 are added as applicatin properties
413-
%% TODO: filter headers to make safe
414-
Headers = lists:foldl(
415-
fun ({K, _T, V}, Acc) ->
416-
case is_amqp10_compat(V) of
417-
true ->
418-
Acc#{to_binary(K) => V};
419-
false ->
420-
Acc
421-
end
422-
end, #{}, Headers0),
423-
amqp10_msg:set_application_properties(Headers, M);
424-
(Key, Value, M) ->
425-
case is_amqp10_compat(Value) of
426-
true ->
427-
amqp10_msg:set_application_properties(
428-
#{to_binary(Key) => Value}, M);
429-
false ->
430-
M
431-
end
432-
end, Msg, Props).
433-
434388
gen_unique_name(Pre0, Post0) ->
435389
Pre = to_binary(Pre0),
436390
Post = to_binary(Post0),
@@ -441,8 +395,3 @@ bin_to_hex(Bin) ->
441395
<<<<if N >= 10 -> N -10 + $a;
442396
true -> N + $0 end>>
443397
|| <<N:4>> <= Bin>>.
444-
445-
is_amqp10_compat(T) ->
446-
is_binary(T) orelse
447-
is_number(T) orelse
448-
is_boolean(T).

0 commit comments

Comments
 (0)