Skip to content

SQL Filter Expressions for Streams #14110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions deps/amqp10_client/include/amqp10_client.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-record(filter, {
descriptor :: binary() | non_neg_integer(),
value :: term()
}).
2 changes: 1 addition & 1 deletion deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

-module(amqp10_client).

-include("amqp10_client.hrl").
-include("amqp10_client_internal.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").

-export([open_connection/1,
Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_client/src/amqp10_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

-behaviour(gen_statem).

-include("amqp10_client.hrl").
-include("amqp10_client_internal.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").

Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_client/src/amqp10_client_frame_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

-behaviour(gen_statem).

-include("amqp10_client.hrl").
-include("amqp10_client_internal.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").

-ifdef(TEST).
Expand Down
72 changes: 42 additions & 30 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-behaviour(gen_statem).

-include("amqp10_client.hrl").
-include("amqp10_client_internal.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").

Expand Down Expand Up @@ -86,7 +87,7 @@
-type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}.

% http://www.amqp.org/specification/1.0/filters
-type filter() :: #{binary() => binary() | map() | list(binary())}.
-type filter() :: #{binary() => #filter{} | binary() | map() | list(binary())}.
-type max_message_size() :: undefined | non_neg_integer().
-type footer_opt() :: crc32 | adler32.

Expand Down Expand Up @@ -781,29 +782,39 @@ translate_filters(Filters)
when map_size(Filters) =:= 0 ->
undefined;
translate_filters(Filters) ->
{map,
maps:fold(
fun
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
%% special case conversion
Key = sym(K),
[{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc];
(K, V, Acc) when is_binary(K) ->
%% try treat any filter value generically
Key = sym(K),
Value = filter_value_type(V),
[{Key, {described, Key, Value}} | Acc]
end, [], Filters)}.

filter_value_type(V) when is_binary(V) ->
{map, lists:map(
fun({Name, #filter{descriptor = Desc,
value = V}})
when is_binary(Name) ->
Descriptor = if is_binary(Desc) -> {symbol, Desc};
is_integer(Desc) -> {ulong, Desc}
end,
{{symbol, Name}, {described, Descriptor, V}};
({<<"apache.org:legacy-amqp-headers-binding:map">> = K, V})
when is_map(V) ->
%% special case conversion
Key = sym(K),
Val = translate_legacy_amqp_headers_binding(V),
{Key, {described, Key, Val}};
({K, V})
when is_binary(K) ->
Key = {symbol, K},
Val = filter_value_type(V),
{Key, {described, Key, Val}}
end, maps:to_list(Filters))}.

filter_value_type(V)
when is_binary(V) ->
%% this is clearly not always correct
{utf8, V};
filter_value_type(V)
when is_integer(V) andalso V >= 0 ->
{uint, V};
filter_value_type(VList) when is_list(VList) ->
filter_value_type(VList)
when is_list(VList) ->
{list, [filter_value_type(V) || V <- VList]};
filter_value_type({T, _} = V) when is_atom(T) ->
filter_value_type({T, _} = V)
when is_atom(T) ->
%% looks like an already tagged type, just pass it through
V.

Expand Down Expand Up @@ -1507,16 +1518,17 @@ translate_filters_selector_filter_test() ->
} = translate_filters(#{<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>}).

translate_filters_multiple_filters_test() ->
{map,
[
{{symbol, <<"apache.org:selector-filter:string">>},
{described, {symbol, <<"apache.org:selector-filter:string">>},
{utf8, <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>}}},
{{symbol, <<"apache.org:legacy-amqp-direct-binding:string">>},
{described, {symbol, <<"apache.org:legacy-amqp-direct-binding:string">>}, {utf8,<<"my topic">>}}}
]
} = translate_filters(#{
<<"apache.org:legacy-amqp-direct-binding:string">> => <<"my topic">>,
<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>
}).
{map, Actual} = translate_filters(
#{
<<"apache.org:legacy-amqp-direct-binding:string">> => <<"my topic">>,
<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>
}),
Expected = [{{symbol, <<"apache.org:selector-filter:string">>},
{described, {symbol, <<"apache.org:selector-filter:string">>},
{utf8, <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>}}},
{{symbol, <<"apache.org:legacy-amqp-direct-binding:string">>},
{described, {symbol, <<"apache.org:legacy-amqp-direct-binding:string">>}, {utf8,<<"my topic">>}}}],
ActualSorted = lists:sort(Actual),
ExpectedSorted = lists:sort(Expected),
ExpectedSorted = ActualSorted.
-endif.
2 changes: 1 addition & 1 deletion deps/amqp10_client/test/mock_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
recv_amqp_header_step/1
]).

-include("src/amqp10_client.hrl").
-include("src/amqp10_client_internal.hrl").

start(Port) ->
{ok, LSock} = gen_tcp:listen(Port, [binary, {packet, 0}, {active, false}]),
Expand Down
31 changes: 31 additions & 0 deletions deps/amqp10_common/include/amqp10_filter.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

%% A filter with this name contains a JMS message selector.
%% We use the same name as sent by the Qpid JMS client in
%% https://github.com/apache/qpid-jms/blob/2.7.0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java#L75
-define(FILTER_NAME_JMS, <<"jms-selector">>).

%% A filter with this name contains an SQL expression.
%% In the current version, such a filter must comply with the JMS message selector syntax.
%% However, we use a name other than "jms-selector" in case we want to extend the allowed syntax
%% in the future, for example allowing for some of the extended grammar described in
%% §6 "SQL Filter Expressions" of
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
-define(FILTER_NAME_SQL, <<"sql-filter">>).

%% SQL-based filtering syntax
%% These descriptors are defined in
%% https://www.amqp.org/specification/1.0/filters
-define(DESCRIPTOR_NAME_SELECTOR_FILTER, <<"apache.org:selector-filter:string">>).
-define(DESCRIPTOR_CODE_SELECTOR_FILTER, 16#0000468C00000004).

%% AMQP Filter Expressions Version 1.0 Working Draft 09
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
-define(DESCRIPTOR_NAME_PROPERTIES_FILTER, <<"amqp:properties-filter">>).
-define(DESCRIPTOR_CODE_PROPERTIES_FILTER, 16#173).
-define(DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER, <<"amqp:application-properties-filter">>).
-define(DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER, 16#174).
15 changes: 0 additions & 15 deletions deps/amqp10_common/include/amqp10_filtex.hrl

This file was deleted.

5 changes: 4 additions & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ define ct_master.erl
endef

PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filter_prop amqp_filter_sql amqp_jms_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit

Expand Down Expand Up @@ -363,6 +363,9 @@ ifdef TRACE_SUPERVISOR2
RMQ_ERLC_OPTS += -DTRACE_SUPERVISOR2=true
endif

# https://www.erlang.org/doc/apps/parsetools/leex.html#file/2
export ERL_COMPILER_OPTIONS := deterministic

# --------------------------------------------------------------------
# Documentation.
# --------------------------------------------------------------------
Expand Down
11 changes: 5 additions & 6 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
-type str() :: atom() | string() | binary().
-type internal_ann_key() :: atom().
-type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt-
-type x_ann_value() :: str() | integer() | float() | TaggedValue :: tuple() | [x_ann_value()].
-type x_ann_value() :: str() | number() | TaggedValue :: tuple() | [x_ann_value()].
-type protocol() :: module().
-type annotations() :: #{internal_ann_key() => term(),
x_ann_key() => x_ann_value()}.
Expand Down Expand Up @@ -76,8 +76,7 @@
-type property_value() :: undefined |
string() |
binary() |
integer() |
float() |
number() |
boolean().
-type tagged_value() :: {uuid, binary()} |
{utf8, binary()} |
Expand Down Expand Up @@ -155,9 +154,9 @@ init(Proto, Data, Anns) ->
-spec init(protocol(), term(), annotations(), environment()) -> state().
init(Proto, Data, Anns0, Env) ->
{ProtoData, ProtoAnns} = Proto:init(Data),
Anns1 = case map_size(Env) =:= 0 of
true -> Anns0;
false -> Anns0#{env => Env}
Anns1 = case map_size(Env) of
0 -> Anns0;
_ -> Anns0#{env => Env}
end,
Anns2 = maps:merge(ProtoAnns, Anns1),
Anns = ensure_received_at_timestamp(Anns2),
Expand Down
24 changes: 24 additions & 0 deletions deps/rabbit/src/rabbit_amqp_filter.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

-module(rabbit_amqp_filter).

-export([eval/2]).

-type expression() :: undefined |
{property, rabbit_amqp_filter_prop:parsed_expressions()} |
{jms, rabbit_amqp_filter_jms:parsed_expression()}.

-export_type([expression/0]).

-spec eval(expression(), mc:state()) -> boolean().
eval(undefined, _Mc) ->
%% A receiver without filter wants all messages.
true;
eval({property, Expr}, Mc) ->
rabbit_amqp_filter_prop:eval(Expr, Mc);
eval({jms, Expr}, Mc) ->
rabbit_amqp_filter_jms:eval(Expr, Mc).
Loading
Loading