diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index ba20e864fdb3..79c09fff67e1 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1241,6 +1241,26 @@ fun(Conf) -> end end}. +%% +%% Per-queue-type disk limits +%% ===================== +%% + +%% TODO: do relative limits make sense - what would they be relative to? The +%% full disk size? +%% {mapping, "quorum_queue_disk_limit.relative", "rabbit.quorum_queue_disk_limit", [ +%% {datatype, float}]}. + +{mapping, "quorum_queue_disk_limit.absolute", "rabbit.quorum_queue_disk_limit", [ + {datatype, [integer, string]}, + {validators, ["is_supported_information_unit"]} +]}. + +{mapping, "stream_queue_disk_limit.absolute", "rabbit.stream_queue_disk_limit", [ + {datatype, [integer, string]}, + {validators, ["is_supported_information_unit"]} +]}. + %% %% Clustering %% ===================== diff --git a/deps/rabbit/src/rabbit_alarm.erl b/deps/rabbit/src/rabbit_alarm.erl index ef5b55dfa9f8..c4096c081aca 100644 --- a/deps/rabbit/src/rabbit_alarm.erl +++ b/deps/rabbit/src/rabbit_alarm.erl @@ -46,7 +46,7 @@ -export_type([alarm/0]). -type local_alarm() :: 'file_descriptor_limit'. --type resource_alarm_source() :: 'disk' | 'memory'. +-type resource_alarm_source() :: 'disk' | 'memory' | {queue_type_disk, atom()}. -type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}. -type alarm() :: local_alarm() | resource_alarm(). -type resource_alert() :: {WasAlarmSetForNode :: boolean(), @@ -138,6 +138,11 @@ format_as_map({resource_limit, memory, Node}) -> <<"resource">> => ?MEMORY_RESOURCE, <<"node">> => Node }; +format_as_map({resource_limit, {queue_type_disk, QType}, Node}) -> + #{ + <<"resource">> => todo, + <<"node">> => Node + }; format_as_map({resource_limit, Limit, Node}) -> #{ <<"resource">> => rabbit_data_coercion:to_binary(Limit), @@ -291,7 +296,7 @@ maybe_alert(UpdateFun, Node, Source, WasAlertAdded, StillHasAlerts = lists:any(fun ({_Node, NodeAlerts}) -> lists:member(Source, NodeAlerts) end, dict:to_list(AN1)), case StillHasAlerts of true -> ok; - false -> rabbit_log:warning("~ts resource limit alarm cleared across the cluster", [Source]) + false -> rabbit_log:warning("~tp resource limit alarm cleared across the cluster", [Source]) end, Alert = {WasAlertAdded, StillHasAlerts, Node}, case node() of @@ -326,9 +331,11 @@ internal_register(Pid, {M, F, A} = AlertMFA, NewAlertees = dict:store(Pid, AlertMFA, Alertees), State#alarms{alertees = NewAlertees}. +%% TODO: handle formatting of resources in these: + handle_set_resource_alarm(Source, Node, State) -> rabbit_log:warning( - "~ts resource limit alarm set on node ~tp.~n~n" + "~tp resource limit alarm set on node ~tp.~n~n" "**********************************************************~n" "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", @@ -347,7 +354,7 @@ handle_set_alarm(Alarm, State) -> {ok, State}. handle_clear_resource_alarm(Source, Node, State) -> - rabbit_log:warning("~ts resource limit alarm cleared on node ~tp", + rabbit_log:warning("~tp resource limit alarm cleared on node ~tp", [Source, Node]), {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}. diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index ed5b58845a59..85490fbd8ef9 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -163,6 +163,7 @@ delivery_flow :: flow | noflow, interceptor_state, queue_states, + queue_types_published :: sets:set(QType :: atom()), tick_timer, publishing_mode = false :: boolean() }). @@ -527,7 +528,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined, - queue_states = rabbit_queue_type:init() + queue_states = rabbit_queue_type:init(), + queue_types_published = sets:new([{version, 2}]) }, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, @@ -2057,9 +2059,10 @@ deliver_to_queues(XName, ok = process_routing_mandatory(Mandatory, RoutedToQueues, Message, XName, State0), MsgSeqNo = maps:get(correlation, Options, undefined), State1 = process_routing_confirm(MsgSeqNo, QueueNames, XName, State0), + State2 = notify_published_queue_types(Qs, State1), %% Actions must be processed after registering confirms as actions may %% contain rejections of publishes - State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}), + State = handle_queue_actions(Actions, State2#ch{queue_states = QueueStates}), case rabbit_event:stats_level(State, #ch.stats_timer) of fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), @@ -2082,6 +2085,27 @@ deliver_to_queues(XName, [rabbit_misc:rs(Resource)]) end. +notify_published_queue_types(Qs, + #ch{cfg = #conf{reader_pid = ReaderPid}, + queue_types_published = QTypes0} = State0) -> + QTypes = lists:foldl( + fun(Q0, Acc) -> + Q = case Q0 of + {Q1, _RouteInfo} -> Q1; + _ -> Q0 + end, + QType = amqqueue:get_type(Q), + case sets:is_element(QType, Acc) of + true -> + Acc; + false -> + ReaderPid ! {channel_published_to_queue_type, + self(), QType}, + sets:add_element(QType, Acc) + end + end, QTypes0, Qs), + State0#ch{queue_types_published = QTypes}. + process_routing_mandatory(_Mandatory = true, _RoutedToQs = [], Msg, diff --git a/deps/rabbit/src/rabbit_disk_usage.erl b/deps/rabbit/src/rabbit_disk_usage.erl new file mode 100644 index 000000000000..53a176a7ef93 --- /dev/null +++ b/deps/rabbit/src/rabbit_disk_usage.erl @@ -0,0 +1,76 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_disk_usage). + +%% Functions for calculating disk usage of a given directory. + +-include_lib("kernel/include/file.hrl"). + +-export([scan/1]). + +%% @doc Calculates the disk usage in bytes of the given directory. +%% +%% On especially large directories this can be an expensive operation since +%% each sub-directory is scanned recursively and each file's metadata must be +%% read. +-spec scan(Dir) -> {ok, Size} | {error, Error} when + Dir :: filename:filename_all(), + Size :: non_neg_integer(), + Error :: not_directory | file:posix() | badarg. +scan(Dir) -> + case file:read_file_info(Dir) of + {ok, #file_info{type = directory, size = S}} -> + {ok, Gatherer} = gatherer:start_link(), + scan_directory(Dir, Gatherer), + Size = sum(Gatherer, S), + gatherer:stop(Gatherer), + {ok, Size}; + {ok, #file_info{}} -> + {error, not_directory}; + {error, _} = Err -> + Err + end. + +scan_directory(Dir, Gatherer) -> + gatherer:fork(Gatherer), + worker_pool:submit_async(fun() -> scan_directory0(Dir, Gatherer) end). + +scan_directory0(Dir, Gatherer) -> + link(Gatherer), + Size = case file:list_dir_all(Dir) of + {ok, Entries} -> + lists:foldl( + fun(Entry, Acc) -> + Path = filename:join(Dir, Entry), + case file:read_file_info(Path) of + {ok, #file_info{type = directory, + size = S}} -> + scan_directory(Path, Gatherer), + Acc + S; + {ok, #file_info{size = S}} -> + Acc + S; + _ -> + Acc + end + end, 0, Entries); + _ -> + 0 + end, + gatherer:in(Gatherer, Size), + gatherer:finish(Gatherer), + unlink(Gatherer), + ok. + +-spec sum(pid(), non_neg_integer()) -> non_neg_integer(). +sum(Gatherer, Size) -> + case gatherer:out(Gatherer) of + empty -> + Size; + {value, S} -> + sum(Gatherer, Size + S) + end. diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index d11b1ec14fa8..c761e7403a5e 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -304,6 +304,16 @@ -callback queue_vm_ets() -> {StatsKeys :: [atom()], ETSNames:: [[atom()]]}. +%% The disk usage limit for the queue type, if any. +-callback disk_limit() -> rabbit_queue_type_disk_monitor:disk_usage_limit_spec() | undefined. +%% Calculate the disk space in bytes of the queue type. +%% This callback is optional but must be implemented if `disk_limit/0' is +%% defined. +-callback disk_footprint() -> {ok, Bytes :: non_neg_integer()} | {error, file:posix()}. + +-optional_callbacks([disk_footprint/0, + disk_limit/0]). + -spec discover(binary() | atom()) -> queue_type(). discover(<<"undefined">>) -> fallback(); diff --git a/deps/rabbit/src/rabbit_queue_type_disk_monitor.erl b/deps/rabbit/src/rabbit_queue_type_disk_monitor.erl new file mode 100644 index 000000000000..7d7f68a54346 --- /dev/null +++ b/deps/rabbit/src/rabbit_queue_type_disk_monitor.erl @@ -0,0 +1,159 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_queue_type_disk_monitor). + +%% A server for alarming on high disk usage per queue type. +%% +%% The server scans periodically and checks each queue type against its limit +%% using the `disk_footprint/0' and `disk_limit/0' callbacks in +%% `rabbit_queue_type'. Typically this callback uses `rabbit_disk_usage:scan/1'. +%% +%% Also see `rabbit_disk_monitoring' which periodically checks the total space +%% taken on the mounted disk containing `rabbit:data_dir/0'. + +-include_lib("kernel/include/logger.hrl"). + +-behaviour(gen_server). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(limit, {queue_type :: queue_type(), + type_module :: module(), + limit :: Bytes :: non_neg_integer()}). + +-record(state, {limits :: [#limit{}], + alarmed = alarmed() :: alarmed(), + timer :: timer:tref() | undefined}). + +-type queue_type() :: atom(). +-type alarmed() :: sets:set(queue_type()). + +-type disk_usage_limit_spec() :: %% A total number of bytes + {absolute, non_neg_integer()} | + %% %% A fraction of the disk's capacity. + %% {relative, float()} | + %% A string which will be parsed and + %% interpreted as an absolute limit. + string(). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + Limits = lists:foldl( + fun({Type, TypeModule}, Acc) -> + case get_limit(Type, TypeModule) of + {ok, Limit} -> + [#limit{queue_type = Type, + type_module = TypeModule, + limit = Limit} | Acc]; + error -> + Acc + end + end, [], rabbit_registry:lookup_all(queue)), + Timer = erlang:send_after(5_000, self(), scan), + {ok, #state{limits = Limits, timer = Timer}}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(scan, #state{alarmed = Alarmed0} = State) -> + Alarmed = lists:foldl(fun scan/2, alarmed(), State#state.limits), + ok = handle_alarmed(Alarmed0, Alarmed), + Timer = erlang:send_after(5_000, self(), scan), + {noreply, State#state{alarmed = Alarmed, timer = Timer}}; +handle_info(Info, State) -> + ?LOG_DEBUG("~tp unhandled msg: ~tp", [?MODULE, Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +alarmed() -> sets:new([{version, 2}]). + +-spec get_limit(atom(), module()) -> {ok, disk_usage_limit_spec()} | error. +get_limit(QType, QTypeModule) -> + try QTypeModule:disk_limit() of + undefined -> + error; + {absolute, Abs} when is_integer(Abs) andalso Abs >= 0 -> + {ok, Abs}; + %% {relative, Rel} when is_float(Rel) andalso Rel >= 0.0 -> + %% TODO: to convert to abs we need to cache the disk capacity for + %% the first `relative' spec we see. + %% Do we even need relative? Should it be proportional to the disk + %% capacity or to the other components? + %% {ok, {relative, Rel}}; + String when is_list(String) -> + case rabbit_resource_monitor_misc:parse_information_unit(String) of + {ok, Bytes} -> + {ok, Bytes}; + {error, parse_error} -> + ?LOG_WARNING("Unable to parse disk limit ~tp for queue " + "type '~ts'", [String, QType]), + error + end + catch + error:undef -> + error + end. + +-spec scan(Limit :: #limit{}, alarmed()) -> alarmed(). +scan(#limit{queue_type = QType, + type_module = QTypeModule, + limit = Limit}, Alarmed) -> + %% NOTE: `disk_footprint/0' is an optional callback but it should always + %% be implemented if the queue type implements `disk_limit/0'. + case QTypeModule:disk_footprint() of + {ok, Bytes} -> + %% TODO: remove this printf debugging... + ?LOG_INFO("Measured queue type '~ts' at ~p bytes (limit ~p)", [QType, Bytes, Limit]), + case Bytes >= Limit of + true -> sets:add_element(QTypeModule, Alarmed); + false -> Alarmed + end; + {error, enoent} -> + Alarmed; + {error, Error} -> + ?LOG_WARNING("Failed to calculate disk usage of queue type '~ts': " + "~tp", [QType, Error]), + Alarmed + end. + +-spec handle_alarmed(Before :: alarmed(), After :: alarmed()) -> ok. +handle_alarmed(NoChange, NoChange) -> + ok; +handle_alarmed(Before, After) -> + Added = sets:subtract(After, Before), + ?LOG_WARNING("Newly alarmed: ~p", [Added]), + ok = sets:fold( + fun(QType, ok) -> + rabbit_alarm:set_alarm({alarm(QType), []}) + end, ok, Added), + Removed = sets:subtract(Before, After), + ?LOG_WARNING("Stopped alarming: ~p", [Removed]), + ok = sets:fold( + fun(QType, ok) -> + rabbit_alarm:clear_alarm(alarm(QType)) + end, ok, Removed), + ok. + +alarm(QType) -> + {resource_limit, {queue_type_disk, QType}, node()}. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 9c0e7fd9ca3e..33fece2720c1 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -67,6 +67,8 @@ -export([notify_decorators/1, notify_decorators/3, spawn_notify_decorators/3]). +-export([disk_footprint/0, + disk_limit/0]). -export([is_enabled/0, is_compatible/3, @@ -2066,6 +2068,19 @@ notify_decorators(QName, F, A) -> is_stateful() -> true. +-spec disk_footprint() -> {ok, Bytes :: non_neg_integer()} | {error, file:posix()}. +disk_footprint() -> + case ra_system:fetch(?RA_SYSTEM) of + #{data_dir := Dir} -> + rabbit_disk_usage:scan(Dir); + _ -> + {ok, 0} + end. + +-spec disk_limit() -> rabbit_queue_type_disk_monitor:disk_usage_limit_spec() | undefined. +disk_limit() -> + application:get_env(rabbit, quorum_queue_disk_limit, undefined). + force_shrink_member_to_current_member(VHost, Name) -> Node = node(), QName = rabbit_misc:r(VHost, queue, Name), diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index e89595e469b3..d2933f7f6d71 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -109,16 +109,19 @@ %% never | timestamp() last_blocked_at, %% a set of the reasons why we are - %% blocked: {resource, memory}, {resource, disk}. + %% blocked: {resource, memory}, {resource, disk}, flow, ... %% More reasons can be added in the future. - blocked_by, + blocked_by :: sets:set(flow | {resource, memory | disk | {queue_type_disk, atom()}}), %% true if received any publishes, false otherwise %% note that this will also be true when connection is %% already blocked should_block, %% true if we had we sent a connection.blocked, %% false otherwise - connection_blocked_message_sent + connection_blocked_message_sent, + %% + published_to_queue_types = #{} :: #{ChPid :: pid() => + sets:set(QType :: atom())} }). -define(CREATION_EVENT_KEYS, @@ -608,6 +611,14 @@ handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), {_, State1} = channel_cleanup(ChPid, State), maybe_close(control_throttle(State1)); +handle_other({channel_published_to_queue_type, ChPid, QType}, + #v1{throttle = Throttle0} = State0) -> + QTypes = maps:update_with( + ChPid, fun(Ts) -> sets:add_element(QType, Ts) end, + sets:from_list([QType], [{version, 2}]), + Throttle0#throttle.published_to_queue_types), + Throttle = Throttle0#throttle{published_to_queue_types = QTypes}, + State0#v1{throttle = Throttle}; handle_other({'EXIT', Parent, normal}, State = #v1{parent = Parent}) -> %% rabbitmq/rabbitmq-server#544 %% The connection port process has exited due to the TCP socket being closed. @@ -1004,14 +1015,22 @@ is_over_node_channel_limit() -> end end. -channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> +channel_cleanup(ChPid, #v1{channel_count = ChannelCount, + throttle = Throttle0} = State) -> case get({ch_pid, ChPid}) of - undefined -> {undefined, State}; - {Channel, MRef} -> credit_flow:peer_down(ChPid), - erase({channel, Channel}), - erase({ch_pid, ChPid}), - erlang:demonitor(MRef, [flush]), - {Channel, State#v1{channel_count = ChannelCount - 1}} + undefined -> + {undefined, State}; + {Channel, MRef} -> + credit_flow:peer_down(ChPid), + erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + %% TODO: reevaluate throttle now that the connection may no longer + %% have a publisher to a queue type which is in disk alarm. + QTypes = maps:remove(ChPid, Throttle0#throttle.published_to_queue_types), + Throttle = Throttle0#throttle{published_to_queue_types = QTypes}, + {Channel, State#v1{channel_count = ChannelCount - 1, + throttle = Throttle}} end. all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. @@ -1730,7 +1749,9 @@ blocked_by_message(#throttle{blocked_by = Reasons}) -> format_blocked_by({resource, memory}) -> "memory"; format_blocked_by({resource, disk}) -> "disk"; -format_blocked_by({resource, disc}) -> "disk". +format_blocked_by({resource, disc}) -> "disk"; +format_blocked_by({resource, {queue_type_disk, QType}}) -> + lists:flatten(io_lib:format("~ts disk", [QType])). update_last_blocked_at(Throttle) -> Throttle#throttle{last_blocked_at = erlang:monotonic_time()}. @@ -1738,22 +1759,45 @@ update_last_blocked_at(Throttle) -> connection_blocked_message_sent( #throttle{connection_blocked_message_sent = BS}) -> BS. -should_send_blocked(Throttle = #throttle{blocked_by = Reasons}) -> +should_send_blocked(Throttle) -> should_block(Throttle) andalso - sets:size(sets:del_element(flow, Reasons)) =/= 0 + do_throttle_reasons_apply(Throttle) andalso not connection_blocked_message_sent(Throttle). -should_send_unblocked(Throttle = #throttle{blocked_by = Reasons}) -> +should_send_unblocked(Throttle) -> connection_blocked_message_sent(Throttle) andalso - sets:size(sets:del_element(flow, Reasons)) == 0. + not do_throttle_reasons_apply(Throttle). + +do_throttle_reasons_apply(#throttle{blocked_by = Reasons} = Throttle) -> + lists:any( + fun ({resource, disk}) -> + true; + ({resource, memory}) -> + true; + ({resource, {queue_type_disk, QType}}) -> + has_published_to_queue_type(QType, Throttle); + (_) -> + %% NOTE: flow reason is ignored. + false + end, sets:to_list(Reasons)). + +has_published_to_queue_type( + QType, #throttle{published_to_queue_types = QTypes}) -> + rabbit_misc:maps_any( + fun(_ChPid, ChQTypes) -> sets:is_element(QType, ChQTypes) end, QTypes). %% Returns true if we have a reason to block %% this connection. -has_reasons_to_block(#throttle{blocked_by = Reasons}) -> - sets:size(Reasons) > 0. +has_reasons_to_block(#throttle{blocked_by = Reasons} = Throttle) -> + lists:any( + fun ({resource, {queue_type_disk, QType}}) -> + has_published_to_queue_type(QType, Throttle); + (_) -> + true + end, sets:to_list(Reasons)). is_blocked_by_flow(#throttle{blocked_by = Reasons}) -> sets:is_element(flow, Reasons). diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index dc240e04eee1..d2c883a6823f 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -40,7 +40,9 @@ format/2, capabilities/0, notify_decorators/1, - is_stateful/0]). + is_stateful/0, + disk_footprint/0, + disk_limit/0]). -export([list_with_minimum_quorum/0]). @@ -1357,6 +1359,19 @@ notify_decorators(Q) when ?is_amqqueue(Q) -> %% Not supported ok. +-spec disk_footprint() -> {ok, Bytes :: non_neg_integer()} | {error, file:posix()}. +disk_footprint() -> + case application:get_env(osiris, data_dir) of + {ok, Dir} -> + rabbit_disk_usage:scan(Dir); + _ -> + {ok, 0} + end. + +-spec disk_limit() -> rabbit_queue_type_disk_monitor:disk_usage_limit_spec() | undefined. +disk_limit() -> + application:get_env(rabbit, stream_queue_disk_limit, undefined). + resend_all(#stream_client{leader = LeaderPid, writer_id = WriterId, correlation = Corrs} = State) -> diff --git a/example.conf b/example.conf new file mode 100644 index 000000000000..18062994fb08 --- /dev/null +++ b/example.conf @@ -0,0 +1,2 @@ +quorum_queue_disk_limit.absolute = 2GiB +stream_queue_disk_limit.absolute = 10GiB