@@ -514,7 +514,8 @@ apply(#{index := _Idx}, #garbage_collection{}, State) ->
514
514
{State , ok , [{aux , garbage_collection }]};
515
515
apply (Meta , {timeout , expire_msgs }, State ) ->
516
516
checkout (Meta , State , State , []);
517
- apply (#{system_time := Ts } = Meta ,
517
+ apply (#{machine_version := Vsn ,
518
+ system_time := Ts } = Meta ,
518
519
{down , Pid , noconnection },
519
520
#? STATE {consumers = Cons0 ,
520
521
cfg = # cfg {consumer_strategy = single_active },
@@ -524,7 +525,7 @@ apply(#{system_time := Ts} = Meta,
524
525
% % if the pid refers to an active or cancelled consumer,
525
526
% % mark it as suspected and return it to the waiting queue
526
527
{State1 , Effects0 } =
527
- maps :fold (
528
+ rabbit_fifo_maps :fold (
528
529
fun (CKey , ? CONSUMER_PID (P ) = C0 , {S0 , E0 })
529
530
when node (P ) =:= Node ->
530
531
% % the consumer should be returned to waiting
@@ -546,7 +547,7 @@ apply(#{system_time := Ts} = Meta,
546
547
Effs1 };
547
548
(_ , _ , S ) ->
548
549
S
549
- end , {State0 , []}, Cons0 ),
550
+ end , {State0 , []}, Cons0 , Vsn ),
550
551
WaitingConsumers = update_waiting_consumer_status (Node , State1 ,
551
552
suspected_down ),
552
553
@@ -561,7 +562,8 @@ apply(#{system_time := Ts} = Meta,
561
562
end , Enqs0 ),
562
563
Effects = [{monitor , node , Node } | Effects1 ],
563
564
checkout (Meta , State0 , State #? STATE {enqueuers = Enqs }, Effects );
564
- apply (#{system_time := Ts } = Meta ,
565
+ apply (#{machine_version := Vsn ,
566
+ system_time := Ts } = Meta ,
565
567
{down , Pid , noconnection },
566
568
#? STATE {consumers = Cons0 ,
567
569
enqueuers = Enqs0 } = State0 ) ->
@@ -576,7 +578,7 @@ apply(#{system_time := Ts} = Meta,
576
578
Node = node (Pid ),
577
579
578
580
{State , Effects1 } =
579
- maps :fold (
581
+ rabbit_fifo_maps :fold (
580
582
fun (CKey , # consumer {cfg = # consumer_cfg {pid = P },
581
583
status = up } = C0 ,
582
584
{St0 , Eff }) when node (P ) =:= Node ->
@@ -587,7 +589,7 @@ apply(#{system_time := Ts} = Meta,
587
589
{St , Eff1 };
588
590
(_ , _ , {St , Eff }) ->
589
591
{St , Eff }
590
- end , {State0 , []}, Cons0 ),
592
+ end , {State0 , []}, Cons0 , Vsn ),
591
593
Enqs = maps :map (fun (P , E ) when node (P ) =:= Node ->
592
594
E # enqueuer {status = suspected_down };
593
595
(_ , E ) -> E
@@ -603,15 +605,17 @@ apply(#{system_time := Ts} = Meta,
603
605
apply (Meta , {down , Pid , _Info }, State0 ) ->
604
606
{State1 , Effects1 } = activate_next_consumer (handle_down (Meta , Pid , State0 )),
605
607
checkout (Meta , State0 , State1 , Effects1 );
606
- apply (Meta , {nodeup , Node }, #? STATE {consumers = Cons0 ,
607
- enqueuers = Enqs0 ,
608
- service_queue = _SQ0 } = State0 ) ->
608
+ apply (#{machine_version := Vsn } = Meta ,
609
+ {nodeup , Node },
610
+ #? STATE {consumers = Cons0 ,
611
+ enqueuers = Enqs0 ,
612
+ service_queue = _SQ0 } = State0 ) ->
609
613
% % A node we are monitoring has come back.
610
614
% % If we have suspected any processes of being
611
615
% % down we should now re-issue the monitors for them to detect if they're
612
616
% % actually down or not
613
617
Monitors = [{monitor , process , P }
614
- || P <- suspected_pids_for (Node , State0 )],
618
+ || P <- suspected_pids_for (Node , Vsn , State0 )],
615
619
616
620
Enqs1 = maps :map (fun (P , E ) when node (P ) =:= Node ->
617
621
E # enqueuer {status = up };
@@ -620,17 +624,18 @@ apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
620
624
ConsumerUpdateActiveFun = consumer_active_flag_update_function (State0 ),
621
625
% % mark all consumers as up
622
626
{State1 , Effects1 } =
623
- maps :fold (fun (ConsumerKey , ? CONSUMER_PID (P ) = C , {SAcc , EAcc })
624
- when (node (P ) =:= Node ) and
625
- (C # consumer .status =/= cancelled ) ->
626
- EAcc1 = ConsumerUpdateActiveFun (SAcc , ConsumerKey ,
627
- C , true , up , EAcc ),
628
- {update_or_remove_con (Meta , ConsumerKey ,
629
- C # consumer {status = up },
630
- SAcc ), EAcc1 };
631
- (_ , _ , Acc ) ->
632
- Acc
633
- end , {State0 , Monitors }, Cons0 ),
627
+ rabbit_fifo_maps :fold (
628
+ fun (ConsumerKey , ? CONSUMER_PID (P ) = C , {SAcc , EAcc })
629
+ when (node (P ) =:= Node ) and
630
+ (C # consumer .status =/= cancelled ) ->
631
+ EAcc1 = ConsumerUpdateActiveFun (SAcc , ConsumerKey ,
632
+ C , true , up , EAcc ),
633
+ {update_or_remove_con (Meta , ConsumerKey ,
634
+ C # consumer {status = up },
635
+ SAcc ), EAcc1 };
636
+ (_ , _ , Acc ) ->
637
+ Acc
638
+ end , {State0 , Monitors }, Cons0 , Vsn ),
634
639
Waiting = update_waiting_consumer_status (Node , State1 , up ),
635
640
State2 = State1 #? STATE {enqueuers = Enqs1 ,
636
641
waiting_consumers = Waiting },
@@ -708,27 +713,29 @@ convert_v3_to_v4(#{} = _Meta, StateV3) ->
708
713
msg_cache = rabbit_fifo_v3 :get_field (msg_cache , StateV3 ),
709
714
unused_1 = []}.
710
715
711
- purge_node (Meta , Node , State , Effects ) ->
716
+ purge_node (#{ machine_version : = Vsn } = Meta , Node , State , Effects ) ->
712
717
lists :foldl (fun (Pid , {S0 , E0 }) ->
713
718
{S , E } = handle_down (Meta , Pid , S0 ),
714
719
{S , E0 ++ E }
715
720
end , {State , Effects },
716
- all_pids_for (Node , State )).
721
+ all_pids_for (Node , Vsn , State )).
717
722
718
723
% % any downs that are not noconnection
719
- handle_down (Meta , Pid , #? STATE {consumers = Cons0 ,
720
- enqueuers = Enqs0 } = State0 ) ->
724
+ handle_down (#{machine_version := Vsn } = Meta ,
725
+ Pid , #? STATE {consumers = Cons0 ,
726
+ enqueuers = Enqs0 } = State0 ) ->
721
727
% Remove any enqueuer for the down pid
722
728
State1 = State0 #? STATE {enqueuers = maps :remove (Pid , Enqs0 )},
723
729
{Effects1 , State2 } = handle_waiting_consumer_down (Pid , State1 ),
724
730
% return checked out messages to main queue
725
731
% Find the consumers for the down pid
726
- DownConsumers = maps :keys (maps :filter (fun (_CKey , ? CONSUMER_PID (P )) ->
727
- P =:= Pid
728
- end , Cons0 )),
732
+ DownConsumers = maps :filter (fun (_CKey , ? CONSUMER_PID (P )) ->
733
+ P =:= Pid
734
+ end , Cons0 ),
735
+ DownConsumerKeys = rabbit_fifo_maps :keys (DownConsumers , Vsn ),
729
736
lists :foldl (fun (ConsumerKey , {S , E }) ->
730
737
cancel_consumer (Meta , ConsumerKey , S , E , down )
731
- end , {State2 , Effects1 }, DownConsumers ).
738
+ end , {State2 , Effects1 }, DownConsumerKeys ).
732
739
733
740
consumer_active_flag_update_function (
734
741
#? STATE {cfg = # cfg {consumer_strategy = competing }}) ->
@@ -916,14 +923,15 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) ->
916
923
end .
917
924
918
925
-spec version () -> pos_integer ().
919
- version () -> 5 .
926
+ version () -> 6 .
920
927
921
928
which_module (0 ) -> rabbit_fifo_v0 ;
922
929
which_module (1 ) -> rabbit_fifo_v1 ;
923
930
which_module (2 ) -> rabbit_fifo_v3 ;
924
931
which_module (3 ) -> rabbit_fifo_v3 ;
925
932
which_module (4 ) -> ? MODULE ;
926
- which_module (5 ) -> ? MODULE .
933
+ which_module (5 ) -> ? MODULE ;
934
+ which_module (6 ) -> ? MODULE .
927
935
928
936
-define (AUX , aux_v3 ).
929
937
@@ -2692,41 +2700,45 @@ all_nodes(#?STATE{consumers = Cons0,
2692
2700
Acc #{node (P ) => ok }
2693
2701
end , Nodes1 , WaitingConsumers0 )).
2694
2702
2695
- all_pids_for (Node , #? STATE {consumers = Cons0 ,
2696
- enqueuers = Enqs0 ,
2697
- waiting_consumers = WaitingConsumers0 }) ->
2698
- Cons = maps :fold (fun (_ , ? CONSUMER_PID (P ), Acc )
2699
- when node (P ) =:= Node ->
2700
- [P | Acc ];
2701
- (_ , _ , Acc ) -> Acc
2702
- end , [], Cons0 ),
2703
- Enqs = maps :fold (fun (P , _ , Acc )
2704
- when node (P ) =:= Node ->
2705
- [P | Acc ];
2706
- (_ , _ , Acc ) -> Acc
2707
- end , Cons , Enqs0 ),
2703
+ all_pids_for (Node , Vsn , #? STATE {consumers = Cons0 ,
2704
+ enqueuers = Enqs0 ,
2705
+ waiting_consumers = WaitingConsumers0 }) ->
2706
+ Cons = rabbit_fifo_maps :fold (fun (_ , ? CONSUMER_PID (P ), Acc )
2707
+ when node (P ) =:= Node ->
2708
+ [P | Acc ];
2709
+ (_ , _ , Acc ) ->
2710
+ Acc
2711
+ end , [], Cons0 , Vsn ),
2712
+ Enqs = rabbit_fifo_maps :fold (fun (P , _ , Acc )
2713
+ when node (P ) =:= Node ->
2714
+ [P | Acc ];
2715
+ (_ , _ , Acc ) ->
2716
+ Acc
2717
+ end , Cons , Enqs0 , Vsn ),
2708
2718
lists :foldl (fun ({_ , ? CONSUMER_PID (P )}, Acc )
2709
2719
when node (P ) =:= Node ->
2710
2720
[P | Acc ];
2711
2721
(_ , Acc ) -> Acc
2712
2722
end , Enqs , WaitingConsumers0 ).
2713
2723
2714
- suspected_pids_for (Node , #? STATE {consumers = Cons0 ,
2715
- enqueuers = Enqs0 ,
2716
- waiting_consumers = WaitingConsumers0 }) ->
2717
- Cons = maps :fold (fun (_Key ,
2718
- # consumer {cfg = # consumer_cfg {pid = P },
2719
- status = suspected_down },
2720
- Acc )
2721
- when node (P ) =:= Node ->
2722
- [P | Acc ];
2723
- (_ , _ , Acc ) -> Acc
2724
- end , [], Cons0 ),
2725
- Enqs = maps :fold (fun (P , # enqueuer {status = suspected_down }, Acc )
2726
- when node (P ) =:= Node ->
2727
- [P | Acc ];
2728
- (_ , _ , Acc ) -> Acc
2729
- end , Cons , Enqs0 ),
2724
+ suspected_pids_for (Node , Vsn , #? STATE {consumers = Cons0 ,
2725
+ enqueuers = Enqs0 ,
2726
+ waiting_consumers = WaitingConsumers0 }) ->
2727
+ Cons = rabbit_fifo_maps :fold (fun (_Key ,
2728
+ # consumer {cfg = # consumer_cfg {pid = P },
2729
+ status = suspected_down },
2730
+ Acc )
2731
+ when node (P ) =:= Node ->
2732
+ [P | Acc ];
2733
+ (_ , _ , Acc ) ->
2734
+ Acc
2735
+ end , [], Cons0 , Vsn ),
2736
+ Enqs = rabbit_fifo_maps :fold (fun (P , # enqueuer {status = suspected_down }, Acc )
2737
+ when node (P ) =:= Node ->
2738
+ [P | Acc ];
2739
+ (_ , _ , Acc ) ->
2740
+ Acc
2741
+ end , Cons , Enqs0 , Vsn ),
2730
2742
lists :foldl (fun ({_Key ,
2731
2743
# consumer {cfg = # consumer_cfg {pid = P },
2732
2744
status = suspected_down }}, Acc )
@@ -2783,7 +2795,10 @@ convert(Meta, 3, To, State) ->
2783
2795
convert (Meta , 4 , To , convert_v3_to_v4 (Meta , State ));
2784
2796
convert (Meta , 4 , To , State ) ->
2785
2797
% % no conversion needed, this version only includes a logic change
2786
- convert (Meta , 5 , To , State ).
2798
+ convert (Meta , 5 , To , State );
2799
+ convert (Meta , 5 , To , State ) ->
2800
+ % % no conversion needed, this version only includes a logic change
2801
+ convert (Meta , 6 , To , State ).
2787
2802
2788
2803
smallest_raft_index (#? STATE {messages = Messages ,
2789
2804
ra_indexes = Indexes ,
0 commit comments