diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 2f841c8f804..d61fa46170a 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -514,7 +514,8 @@ apply(#{index := _Idx}, #garbage_collection{}, State) -> {State, ok, [{aux, garbage_collection}]}; apply(Meta, {timeout, expire_msgs}, State) -> checkout(Meta, State, State, []); -apply(#{system_time := Ts} = Meta, +apply(#{machine_version := Vsn, + system_time := Ts} = Meta, {down, Pid, noconnection}, #?STATE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, @@ -524,7 +525,7 @@ apply(#{system_time := Ts} = Meta, %% if the pid refers to an active or cancelled consumer, %% mark it as suspected and return it to the waiting queue {State1, Effects0} = - maps:fold( + rabbit_fifo_maps:fold( fun(CKey, ?CONSUMER_PID(P) = C0, {S0, E0}) when node(P) =:= Node -> %% the consumer should be returned to waiting @@ -546,7 +547,7 @@ apply(#{system_time := Ts} = Meta, Effs1}; (_, _, S) -> S - end, {State0, []}, Cons0), + end, {State0, []}, Cons0, Vsn), WaitingConsumers = update_waiting_consumer_status(Node, State1, suspected_down), @@ -561,7 +562,8 @@ apply(#{system_time := Ts} = Meta, end, Enqs0), Effects = [{monitor, node, Node} | Effects1], checkout(Meta, State0, State#?STATE{enqueuers = Enqs}, Effects); -apply(#{system_time := Ts} = Meta, +apply(#{machine_version := Vsn, + system_time := Ts} = Meta, {down, Pid, noconnection}, #?STATE{consumers = Cons0, enqueuers = Enqs0} = State0) -> @@ -576,7 +578,7 @@ apply(#{system_time := Ts} = Meta, Node = node(Pid), {State, Effects1} = - maps:fold( + rabbit_fifo_maps:fold( fun(CKey, #consumer{cfg = #consumer_cfg{pid = P}, status = up} = C0, {St0, Eff}) when node(P) =:= Node -> @@ -587,7 +589,7 @@ apply(#{system_time := Ts} = Meta, {St, Eff1}; (_, _, {St, Eff}) -> {St, Eff} - end, {State0, []}, Cons0), + end, {State0, []}, Cons0, Vsn), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{status = suspected_down}; (_, E) -> E @@ -603,15 +605,17 @@ apply(#{system_time := Ts} = Meta, apply(Meta, {down, Pid, _Info}, State0) -> {State1, Effects1} = activate_next_consumer(handle_down(Meta, Pid, State0)), checkout(Meta, State0, State1, Effects1); -apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0, - enqueuers = Enqs0, - service_queue = _SQ0} = State0) -> +apply(#{machine_version := Vsn} = Meta, + {nodeup, Node}, + #?STATE{consumers = Cons0, + enqueuers = Enqs0, + service_queue = _SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're %% actually down or not Monitors = [{monitor, process, P} - || P <- suspected_pids_for(Node, State0)], + || P <- suspected_pids_for(Node, Vsn, State0)], Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{status = up}; @@ -620,17 +624,18 @@ apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0, ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), %% mark all consumers as up {State1, Effects1} = - maps:fold(fun(ConsumerKey, ?CONSUMER_PID(P) = C, {SAcc, EAcc}) - when (node(P) =:= Node) and - (C#consumer.status =/= cancelled) -> - EAcc1 = ConsumerUpdateActiveFun(SAcc, ConsumerKey, - C, true, up, EAcc), - {update_or_remove_con(Meta, ConsumerKey, - C#consumer{status = up}, - SAcc), EAcc1}; - (_, _, Acc) -> - Acc - end, {State0, Monitors}, Cons0), + rabbit_fifo_maps:fold( + fun(ConsumerKey, ?CONSUMER_PID(P) = C, {SAcc, EAcc}) + when (node(P) =:= Node) and + (C#consumer.status =/= cancelled) -> + EAcc1 = ConsumerUpdateActiveFun(SAcc, ConsumerKey, + C, true, up, EAcc), + {update_or_remove_con(Meta, ConsumerKey, + C#consumer{status = up}, + SAcc), EAcc1}; + (_, _, Acc) -> + Acc + end, {State0, Monitors}, Cons0, Vsn), Waiting = update_waiting_consumer_status(Node, State1, up), State2 = State1#?STATE{enqueuers = Enqs1, waiting_consumers = Waiting}, @@ -708,27 +713,29 @@ convert_v3_to_v4(#{} = _Meta, StateV3) -> msg_cache = rabbit_fifo_v3:get_field(msg_cache, StateV3), unused_1 = []}. -purge_node(Meta, Node, State, Effects) -> +purge_node(#{machine_version := Vsn} = Meta, Node, State, Effects) -> lists:foldl(fun(Pid, {S0, E0}) -> {S, E} = handle_down(Meta, Pid, S0), {S, E0 ++ E} end, {State, Effects}, - all_pids_for(Node, State)). + all_pids_for(Node, Vsn, State)). %% any downs that are not noconnection -handle_down(Meta, Pid, #?STATE{consumers = Cons0, - enqueuers = Enqs0} = State0) -> +handle_down(#{machine_version := Vsn} = Meta, + Pid, #?STATE{consumers = Cons0, + enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the down pid State1 = State0#?STATE{enqueuers = maps:remove(Pid, Enqs0)}, {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), % return checked out messages to main queue % Find the consumers for the down pid - DownConsumers = maps:keys(maps:filter(fun(_CKey, ?CONSUMER_PID(P)) -> - P =:= Pid - end, Cons0)), + DownConsumers = maps:filter(fun(_CKey, ?CONSUMER_PID(P)) -> + P =:= Pid + end, Cons0), + DownConsumerKeys = rabbit_fifo_maps:keys(DownConsumers, Vsn), lists:foldl(fun(ConsumerKey, {S, E}) -> cancel_consumer(Meta, ConsumerKey, S, E, down) - end, {State2, Effects1}, DownConsumers). + end, {State2, Effects1}, DownConsumerKeys). consumer_active_flag_update_function( #?STATE{cfg = #cfg{consumer_strategy = competing}}) -> @@ -916,14 +923,15 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) -> end. -spec version() -> pos_integer(). -version() -> 5. +version() -> 6. which_module(0) -> rabbit_fifo_v0; which_module(1) -> rabbit_fifo_v1; which_module(2) -> rabbit_fifo_v3; which_module(3) -> rabbit_fifo_v3; which_module(4) -> ?MODULE; -which_module(5) -> ?MODULE. +which_module(5) -> ?MODULE; +which_module(6) -> ?MODULE. -define(AUX, aux_v3). @@ -2692,41 +2700,45 @@ all_nodes(#?STATE{consumers = Cons0, Acc#{node(P) => ok} end, Nodes1, WaitingConsumers0)). -all_pids_for(Node, #?STATE{consumers = Cons0, - enqueuers = Enqs0, - waiting_consumers = WaitingConsumers0}) -> - Cons = maps:fold(fun(_, ?CONSUMER_PID(P), Acc) - when node(P) =:= Node -> - [P | Acc]; - (_, _, Acc) -> Acc - end, [], Cons0), - Enqs = maps:fold(fun(P, _, Acc) - when node(P) =:= Node -> - [P | Acc]; - (_, _, Acc) -> Acc - end, Cons, Enqs0), +all_pids_for(Node, Vsn, #?STATE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Cons = rabbit_fifo_maps:fold(fun(_, ?CONSUMER_PID(P), Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> + Acc + end, [], Cons0, Vsn), + Enqs = rabbit_fifo_maps:fold(fun(P, _, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> + Acc + end, Cons, Enqs0, Vsn), lists:foldl(fun({_, ?CONSUMER_PID(P)}, Acc) when node(P) =:= Node -> [P | Acc]; (_, Acc) -> Acc end, Enqs, WaitingConsumers0). -suspected_pids_for(Node, #?STATE{consumers = Cons0, - enqueuers = Enqs0, - waiting_consumers = WaitingConsumers0}) -> - Cons = maps:fold(fun(_Key, - #consumer{cfg = #consumer_cfg{pid = P}, - status = suspected_down}, - Acc) - when node(P) =:= Node -> - [P | Acc]; - (_, _, Acc) -> Acc - end, [], Cons0), - Enqs = maps:fold(fun(P, #enqueuer{status = suspected_down}, Acc) - when node(P) =:= Node -> - [P | Acc]; - (_, _, Acc) -> Acc - end, Cons, Enqs0), +suspected_pids_for(Node, Vsn, #?STATE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Cons = rabbit_fifo_maps:fold(fun(_Key, + #consumer{cfg = #consumer_cfg{pid = P}, + status = suspected_down}, + Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> + Acc + end, [], Cons0, Vsn), + Enqs = rabbit_fifo_maps:fold(fun(P, #enqueuer{status = suspected_down}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> + Acc + end, Cons, Enqs0, Vsn), lists:foldl(fun({_Key, #consumer{cfg = #consumer_cfg{pid = P}, status = suspected_down}}, Acc) @@ -2783,7 +2795,10 @@ convert(Meta, 3, To, State) -> convert(Meta, 4, To, convert_v3_to_v4(Meta, State)); convert(Meta, 4, To, State) -> %% no conversion needed, this version only includes a logic change - convert(Meta, 5, To, State). + convert(Meta, 5, To, State); +convert(Meta, 5, To, State) -> + %% no conversion needed, this version only includes a logic change + convert(Meta, 6, To, State). smallest_raft_index(#?STATE{messages = Messages, ra_indexes = Indexes, diff --git a/deps/rabbit/src/rabbit_fifo_index.erl b/deps/rabbit/src/rabbit_fifo_index.erl index 852724c35a2..559a1b17102 100644 --- a/deps/rabbit/src/rabbit_fifo_index.erl +++ b/deps/rabbit/src/rabbit_fifo_index.erl @@ -1,3 +1,9 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + -module(rabbit_fifo_index). -export([ diff --git a/deps/rabbit/src/rabbit_fifo_maps.erl b/deps/rabbit/src/rabbit_fifo_maps.erl new file mode 100644 index 00000000000..ccaac64c71c --- /dev/null +++ b/deps/rabbit/src/rabbit_fifo_maps.erl @@ -0,0 +1,41 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +%% Deterministic map operations. +-module(rabbit_fifo_maps). + +-export([keys/2, + fold/4]). + +-spec keys(Map, ra_machine:version()) -> Keys when + Map :: #{Key => _}, + Keys :: [Key]. +keys(Map, Vsn) -> + Keys = maps:keys(Map), + case is_deterministic(Vsn) of + true -> + lists:sort(Keys); + false -> + Keys + end. + +-spec fold(Fun, Init, Map, ra_machine:version()) -> Acc when + Fun :: fun((Key, Value, AccIn) -> AccOut), + Init :: term(), + Acc :: AccOut, + AccIn :: Init | AccOut, + Map :: #{Key => Value}. +fold(Fun, Init, Map, Vsn) -> + Iterable = case is_deterministic(Vsn) of + true -> + maps:iterator(Map, ordered); + false -> + Map + end, + maps:fold(Fun, Init, Iterable). + +is_deterministic(Vsn) when is_integer(Vsn) -> + Vsn > 5. diff --git a/deps/rabbit/test/queue_utils.erl b/deps/rabbit/test/queue_utils.erl index df060f58590..b68895e17dd 100644 --- a/deps/rabbit/test/queue_utils.erl +++ b/deps/rabbit/test/queue_utils.erl @@ -160,10 +160,6 @@ filter_queues(Expected, Got) -> lists:member(hd(G), Keys) end, Got). -ra_machines_use_same_version(Config) -> - Nodenames = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - ra_machines_use_same_version(rabbit_fifo, Config, Nodenames). - ra_machines_use_same_version(MachineModule, Config, Nodenames) when length(Nodenames) >= 1 -> [MachineAVersion | OtherMachinesVersions] = diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index f784d2c44ba..8937ff074ca 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -298,6 +298,24 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ Config2, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()); +init_per_testcase(T, Config) + when T =:= leader_locator_balanced orelse + T =:= leader_locator_policy -> + Vsn0 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_fifo, version, []), + Vsn1 = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_fifo, version, []), + case Vsn0 =:= Vsn1 of + true -> + Config1 = rabbit_ct_helpers:testcase_started(Config, T), + Q = rabbit_data_coercion:to_binary(T), + Config2 = rabbit_ct_helpers:set_config( + Config1, [{queue_name, Q}, + {alt_queue_name, <>}, + {alt_2_queue_name, <>}]), + rabbit_ct_helpers:run_steps(Config2, + rabbit_ct_client_helpers:setup_steps()); + false -> + {skip, "machine versions must be the same for desired leader location to work"} + end; init_per_testcase(Testcase, Config) -> ClusterSize = ?config(rmq_nodes_count, Config), IsMixed = rabbit_ct_helpers:is_mixed_versions(), diff --git a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl index 31d38424936..37a2c8048c6 100644 --- a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl @@ -3,9 +3,6 @@ -compile(nowarn_export_all). -compile(export_all). --export([ - ]). - -include_lib("proper/include/proper.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -87,7 +84,8 @@ all_tests() -> dlx_07, dlx_08, dlx_09, - single_active_ordering_02 + single_active_ordering_02, + different_nodes ]. groups() -> @@ -1095,6 +1093,39 @@ single_active_ordering_03(_Config) -> false end. +%% Test that running the state machine commands on different Erlang nodes +%% end up in exactly the same state. +different_nodes(Config) -> + Config1 = rabbit_ct_helpers:run_setup_steps( + Config, + rabbit_ct_broker_helpers:setup_steps()), + + Size = 400, + run_proper( + fun () -> + ?FORALL({Length, Bytes, DeliveryLimit, SingleActive}, + frequency([{5, {undefined, undefined, undefined, false}}, + {5, {oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]), + oneof([range(1, 3), undefined]), + oneof([true, false]) + }}]), + begin + Conf = config(?FUNCTION_NAME, + Length, + Bytes, + SingleActive, + DeliveryLimit), + ?FORALL(O, ?LET(Ops, log_gen_different_nodes(Size), expand(Ops, Conf)), + collect({log_size, length(O)}, + different_nodes_prop(Config1, Conf, O))) + end) + end, [], Size), + + rabbit_ct_helpers:run_teardown_steps( + Config1, + rabbit_ct_broker_helpers:teardown_steps()). + max_length(_Config) -> %% tests that max length is never transgressed Size = 1000, @@ -1454,6 +1485,19 @@ single_active_prop(Conf0, Commands, ValidateOrder) -> false end. +different_nodes_prop(Config, Conf0, Commands) -> + Conf = Conf0#{release_cursor_interval => 100}, + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + InitState = test_init(Conf), + Fun = fun(_) -> true end, + Vsn = 6, + + {State0, _Effs0} = run_log(InitState, Entries, Fun, Vsn), + {State1, _Effs1} = rabbit_ct_broker_helpers:rpc(Config, ?MODULE, run_log, + [InitState, Entries, Fun, Vsn]), + State0 =:= State1. + messages_total_prop(Conf0, Commands) -> Conf = Conf0#{release_cursor_interval => 100}, Indexes = lists:seq(1, length(Commands)), @@ -1797,6 +1841,29 @@ log_gen_without_checkout_cancel(Size) -> {1, purge} ]))))). +log_gen_different_nodes(Size) -> + Nodes = [node(), + fakenode@fake, + fakenode@fake2 + ], + ?LET(EPids, vector(4, pid_gen(Nodes)), + ?LET(CPids, vector(4, pid_gen(Nodes)), + resize(Size, + list( + frequency( + [{10, enqueue_gen(oneof(EPids))}, + {20, {input_event, + frequency([{10, settle}, + {2, return}, + {2, discard}, + {2, requeue}])}}, + {8, checkout_gen(oneof(CPids))}, + {2, checkout_cancel_gen(oneof(CPids))}, + {6, down_gen(oneof(EPids ++ CPids))}, + {6, nodeup_gen(Nodes)}, + {1, purge} + ]))))). + monotonic_gen() -> ?LET(_, integer(), erlang:unique_integer([positive, monotonic])).