Skip to content

WIP: Add per-queue-type disk limits #14086

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,26 @@ fun(Conf) ->
end
end}.

%%
%% Per-queue-type disk limits
%% =====================
%%

%% TODO: do relative limits make sense - what would they be relative to? The
%% full disk size?
%% {mapping, "quorum_queue_disk_limit.relative", "rabbit.quorum_queue_disk_limit", [
%% {datatype, float}]}.

{mapping, "quorum_queue_disk_limit.absolute", "rabbit.quorum_queue_disk_limit", [
{datatype, [integer, string]},
{validators, ["is_supported_information_unit"]}
]}.

{mapping, "stream_queue_disk_limit.absolute", "rabbit.stream_queue_disk_limit", [
{datatype, [integer, string]},
{validators, ["is_supported_information_unit"]}
]}.

%%
%% Clustering
%% =====================
Expand Down
15 changes: 11 additions & 4 deletions deps/rabbit/src/rabbit_alarm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

-export_type([alarm/0]).
-type local_alarm() :: 'file_descriptor_limit'.
-type resource_alarm_source() :: 'disk' | 'memory'.
-type resource_alarm_source() :: 'disk' | 'memory' | {queue_type_disk, atom()}.
-type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}.
-type alarm() :: local_alarm() | resource_alarm().
-type resource_alert() :: {WasAlarmSetForNode :: boolean(),
Expand Down Expand Up @@ -138,6 +138,11 @@ format_as_map({resource_limit, memory, Node}) ->
<<"resource">> => ?MEMORY_RESOURCE,
<<"node">> => Node
};
format_as_map({resource_limit, {queue_type_disk, QType}, Node}) ->
#{
<<"resource">> => todo,
<<"node">> => Node
};
format_as_map({resource_limit, Limit, Node}) ->
#{
<<"resource">> => rabbit_data_coercion:to_binary(Limit),
Expand Down Expand Up @@ -291,7 +296,7 @@ maybe_alert(UpdateFun, Node, Source, WasAlertAdded,
StillHasAlerts = lists:any(fun ({_Node, NodeAlerts}) -> lists:member(Source, NodeAlerts) end, dict:to_list(AN1)),
case StillHasAlerts of
true -> ok;
false -> rabbit_log:warning("~ts resource limit alarm cleared across the cluster", [Source])
false -> rabbit_log:warning("~tp resource limit alarm cleared across the cluster", [Source])
end,
Alert = {WasAlertAdded, StillHasAlerts, Node},
case node() of
Expand Down Expand Up @@ -326,9 +331,11 @@ internal_register(Pid, {M, F, A} = AlertMFA,
NewAlertees = dict:store(Pid, AlertMFA, Alertees),
State#alarms{alertees = NewAlertees}.

%% TODO: handle formatting of resources in these:

handle_set_resource_alarm(Source, Node, State) ->
rabbit_log:warning(
"~ts resource limit alarm set on node ~tp.~n~n"
"~tp resource limit alarm set on node ~tp.~n~n"
"**********************************************************~n"
"*** Publishers will be blocked until this alarm clears ***~n"
"**********************************************************~n",
Expand All @@ -347,7 +354,7 @@ handle_set_alarm(Alarm, State) ->
{ok, State}.

handle_clear_resource_alarm(Source, Node, State) ->
rabbit_log:warning("~ts resource limit alarm cleared on node ~tp",
rabbit_log:warning("~tp resource limit alarm cleared on node ~tp",
[Source, Node]),
{ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}.

Expand Down
28 changes: 26 additions & 2 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
delivery_flow :: flow | noflow,
interceptor_state,
queue_states,
queue_types_published :: sets:set(QType :: atom()),
tick_timer,
publishing_mode = false :: boolean()
}).
Expand Down Expand Up @@ -527,7 +528,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
queue_states = rabbit_queue_type:init()
queue_states = rabbit_queue_type:init(),
queue_types_published = sets:new([{version, 2}])
},
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
Expand Down Expand Up @@ -2057,9 +2059,10 @@ deliver_to_queues(XName,
ok = process_routing_mandatory(Mandatory, RoutedToQueues, Message, XName, State0),
MsgSeqNo = maps:get(correlation, Options, undefined),
State1 = process_routing_confirm(MsgSeqNo, QueueNames, XName, State0),
State2 = notify_published_queue_types(Qs, State1),
%% Actions must be processed after registering confirms as actions may
%% contain rejections of publishes
State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}),
State = handle_queue_actions(Actions, State2#ch{queue_states = QueueStates}),
case rabbit_event:stats_level(State, #ch.stats_timer) of
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
Expand All @@ -2082,6 +2085,27 @@ deliver_to_queues(XName,
[rabbit_misc:rs(Resource)])
end.

notify_published_queue_types(Qs,
#ch{cfg = #conf{reader_pid = ReaderPid},
queue_types_published = QTypes0} = State0) ->
QTypes = lists:foldl(
fun(Q0, Acc) ->
Q = case Q0 of
{Q1, _RouteInfo} -> Q1;
_ -> Q0
end,
QType = amqqueue:get_type(Q),
case sets:is_element(QType, Acc) of
true ->
Acc;
false ->
ReaderPid ! {channel_published_to_queue_type,
self(), QType},
sets:add_element(QType, Acc)
end
end, QTypes0, Qs),
State0#ch{queue_types_published = QTypes}.

process_routing_mandatory(_Mandatory = true,
_RoutedToQs = [],
Msg,
Expand Down
76 changes: 76 additions & 0 deletions deps/rabbit/src/rabbit_disk_usage.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_disk_usage).

%% Functions for calculating disk usage of a given directory.

-include_lib("kernel/include/file.hrl").

-export([scan/1]).

%% @doc Calculates the disk usage in bytes of the given directory.
%%
%% On especially large directories this can be an expensive operation since
%% each sub-directory is scanned recursively and each file's metadata must be
%% read.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be better for queue types to increment/decrement an internal counter (that may be stored in persistent_term).

-spec scan(Dir) -> {ok, Size} | {error, Error} when
Dir :: filename:filename_all(),
Size :: non_neg_integer(),
Error :: not_directory | file:posix() | badarg.
scan(Dir) ->
case file:read_file_info(Dir) of
{ok, #file_info{type = directory, size = S}} ->
{ok, Gatherer} = gatherer:start_link(),
scan_directory(Dir, Gatherer),
Size = sum(Gatherer, S),
gatherer:stop(Gatherer),
{ok, Size};
{ok, #file_info{}} ->
{error, not_directory};
{error, _} = Err ->
Err
end.

scan_directory(Dir, Gatherer) ->
gatherer:fork(Gatherer),
worker_pool:submit_async(fun() -> scan_directory0(Dir, Gatherer) end).

scan_directory0(Dir, Gatherer) ->
link(Gatherer),
Size = case file:list_dir_all(Dir) of
{ok, Entries} ->
lists:foldl(
fun(Entry, Acc) ->
Path = filename:join(Dir, Entry),
case file:read_file_info(Path) of
{ok, #file_info{type = directory,
size = S}} ->
scan_directory(Path, Gatherer),
Acc + S;
{ok, #file_info{size = S}} ->
Acc + S;
_ ->
Acc
end
end, 0, Entries);
_ ->
0
end,
gatherer:in(Gatherer, Size),
gatherer:finish(Gatherer),
unlink(Gatherer),
ok.

-spec sum(pid(), non_neg_integer()) -> non_neg_integer().
sum(Gatherer, Size) ->
case gatherer:out(Gatherer) of
empty ->
Size;
{value, S} ->
sum(Gatherer, Size + S)
end.
10 changes: 10 additions & 0 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,16 @@

-callback queue_vm_ets() -> {StatsKeys :: [atom()], ETSNames:: [[atom()]]}.

%% The disk usage limit for the queue type, if any.
-callback disk_limit() -> rabbit_queue_type_disk_monitor:disk_usage_limit_spec() | undefined.
%% Calculate the disk space in bytes of the queue type.
%% This callback is optional but must be implemented if `disk_limit/0' is
%% defined.
-callback disk_footprint() -> {ok, Bytes :: non_neg_integer()} | {error, file:posix()}.

-optional_callbacks([disk_footprint/0,
disk_limit/0]).

-spec discover(binary() | atom()) -> queue_type().
discover(<<"undefined">>) ->
fallback();
Expand Down
159 changes: 159 additions & 0 deletions deps/rabbit/src/rabbit_queue_type_disk_monitor.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_queue_type_disk_monitor).

%% A server for alarming on high disk usage per queue type.
%%
%% The server scans periodically and checks each queue type against its limit
%% using the `disk_footprint/0' and `disk_limit/0' callbacks in
%% `rabbit_queue_type'. Typically this callback uses `rabbit_disk_usage:scan/1'.
%%
%% Also see `rabbit_disk_monitoring' which periodically checks the total space
%% taken on the mounted disk containing `rabbit:data_dir/0'.

-include_lib("kernel/include/logger.hrl").

-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-record(limit, {queue_type :: queue_type(),
type_module :: module(),
limit :: Bytes :: non_neg_integer()}).

-record(state, {limits :: [#limit{}],
alarmed = alarmed() :: alarmed(),
timer :: timer:tref() | undefined}).

-type queue_type() :: atom().
-type alarmed() :: sets:set(queue_type()).

-type disk_usage_limit_spec() :: %% A total number of bytes
{absolute, non_neg_integer()} |
%% %% A fraction of the disk's capacity.
%% {relative, float()} |
%% A string which will be parsed and
%% interpreted as an absolute limit.
string().

%%----------------------------------------------------------------------------

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
Limits = lists:foldl(
fun({Type, TypeModule}, Acc) ->
case get_limit(Type, TypeModule) of
{ok, Limit} ->
[#limit{queue_type = Type,
type_module = TypeModule,
limit = Limit} | Acc];
error ->
Acc
end
end, [], rabbit_registry:lookup_all(queue)),
Timer = erlang:send_after(5_000, self(), scan),
{ok, #state{limits = Limits, timer = Timer}}.

handle_call(_Request, _From, State) ->
{noreply, State}.

handle_cast(_Request, State) ->
{noreply, State}.

handle_info(scan, #state{alarmed = Alarmed0} = State) ->
Alarmed = lists:foldl(fun scan/2, alarmed(), State#state.limits),
ok = handle_alarmed(Alarmed0, Alarmed),
Timer = erlang:send_after(5_000, self(), scan),
{noreply, State#state{alarmed = Alarmed, timer = Timer}};
handle_info(Info, State) ->
?LOG_DEBUG("~tp unhandled msg: ~tp", [?MODULE, Info]),
{noreply, State}.

terminate(_Reason, _State) ->
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%----------------------------------------------------------------------------

alarmed() -> sets:new([{version, 2}]).

-spec get_limit(atom(), module()) -> {ok, disk_usage_limit_spec()} | error.
get_limit(QType, QTypeModule) ->
try QTypeModule:disk_limit() of
undefined ->
error;
{absolute, Abs} when is_integer(Abs) andalso Abs >= 0 ->
{ok, Abs};
%% {relative, Rel} when is_float(Rel) andalso Rel >= 0.0 ->
%% TODO: to convert to abs we need to cache the disk capacity for
%% the first `relative' spec we see.
%% Do we even need relative? Should it be proportional to the disk
%% capacity or to the other components?
%% {ok, {relative, Rel}};
String when is_list(String) ->
case rabbit_resource_monitor_misc:parse_information_unit(String) of
{ok, Bytes} ->
{ok, Bytes};
{error, parse_error} ->
?LOG_WARNING("Unable to parse disk limit ~tp for queue "
"type '~ts'", [String, QType]),
error
end
catch
error:undef ->
error
end.

-spec scan(Limit :: #limit{}, alarmed()) -> alarmed().
scan(#limit{queue_type = QType,
type_module = QTypeModule,
limit = Limit}, Alarmed) ->
%% NOTE: `disk_footprint/0' is an optional callback but it should always
%% be implemented if the queue type implements `disk_limit/0'.
case QTypeModule:disk_footprint() of
{ok, Bytes} ->
%% TODO: remove this printf debugging...
?LOG_INFO("Measured queue type '~ts' at ~p bytes (limit ~p)", [QType, Bytes, Limit]),
case Bytes >= Limit of
true -> sets:add_element(QTypeModule, Alarmed);
false -> Alarmed
end;
{error, enoent} ->
Alarmed;
{error, Error} ->
?LOG_WARNING("Failed to calculate disk usage of queue type '~ts': "
"~tp", [QType, Error]),
Alarmed
end.

-spec handle_alarmed(Before :: alarmed(), After :: alarmed()) -> ok.
handle_alarmed(NoChange, NoChange) ->
ok;
handle_alarmed(Before, After) ->
Added = sets:subtract(After, Before),
?LOG_WARNING("Newly alarmed: ~p", [Added]),
ok = sets:fold(
fun(QType, ok) ->
rabbit_alarm:set_alarm({alarm(QType), []})
end, ok, Added),
Removed = sets:subtract(Before, After),
?LOG_WARNING("Stopped alarming: ~p", [Removed]),
ok = sets:fold(
fun(QType, ok) ->
rabbit_alarm:clear_alarm(alarm(QType))
end, ok, Removed),
ok.

alarm(QType) ->
{resource_limit, {queue_type_disk, QType}, node()}.
Loading
Loading