@@ -288,6 +288,12 @@ impl Lighthouse {
288288 if quorum_met. is_some ( ) {
289289 let participants = quorum_met. unwrap ( ) ;
290290
291+ let commit_failure_replica_ids: Vec < String > = participants
292+ . iter ( )
293+ . filter ( |p| p. commit_failures > 0 )
294+ . map ( |p| p. replica_id . clone ( ) )
295+ . collect ( ) ;
296+
291297 // only increment quorum ID if something about the quorum
292298 // changed (members/addresses/etc)
293299 if state. prev_quorum . is_none ( )
@@ -301,6 +307,13 @@ impl Lighthouse {
301307 "Detected quorum change, bumping quorum_id to {}" ,
302308 state. quorum_id
303309 ) ;
310+ } else if commit_failure_replica_ids. len ( ) > 0 {
311+ state. quorum_id += 1 ;
312+ info ! (
313+ "Detected commit failures in [{}], bumping quorum_id to {}" ,
314+ commit_failure_replica_ids. join( ", " ) ,
315+ state. quorum_id
316+ ) ;
304317 }
305318
306319 let quorum = Quorum {
@@ -639,6 +652,7 @@ mod tests {
639652 world_size : 1 ,
640653 shrink_only : false ,
641654 data : String :: new ( ) ,
655+ commit_failures : 0 ,
642656 } ,
643657 } ,
644658 ) ;
@@ -656,6 +670,7 @@ mod tests {
656670 world_size : 1 ,
657671 shrink_only : false ,
658672 data : String :: new ( ) ,
673+ commit_failures : 0 ,
659674 } ,
660675 } ,
661676 ) ;
@@ -712,6 +727,7 @@ mod tests {
712727 world_size : 1 ,
713728 shrink_only : false ,
714729 data : String :: new ( ) ,
730+ commit_failures : 0 ,
715731 } ,
716732 } ,
717733 ) ;
@@ -751,6 +767,7 @@ mod tests {
751767 world_size : 1 ,
752768 shrink_only : false ,
753769 data : String :: new ( ) ,
770+ commit_failures : 0 ,
754771 } ,
755772 } ,
756773 ) ;
@@ -798,6 +815,7 @@ mod tests {
798815 world_size : 1 ,
799816 shrink_only : false ,
800817 data : String :: new ( ) ,
818+ commit_failures : 0 ,
801819 } ,
802820 } ,
803821 ) ;
@@ -819,6 +837,7 @@ mod tests {
819837 world_size: 1 ,
820838 shrink_only: false ,
821839 data: String :: new( ) ,
840+ commit_failures: 0 ,
822841 } ] ,
823842 created : Some ( SystemTime :: now ( ) . into ( ) ) ,
824843 } ) ;
@@ -838,6 +857,7 @@ mod tests {
838857 world_size : 1 ,
839858 shrink_only : false ,
840859 data : String :: new ( ) ,
860+ commit_failures : 0 ,
841861 } ,
842862 } ,
843863 ) ;
@@ -882,6 +902,7 @@ mod tests {
882902 world_size: 1 ,
883903 shrink_only: false ,
884904 data: String :: new( ) ,
905+ commit_failures: 0 ,
885906 } ,
886907 QuorumMember {
887908 replica_id: "b" . to_string( ) ,
@@ -891,6 +912,7 @@ mod tests {
891912 world_size: 1 ,
892913 shrink_only: false ,
893914 data: String :: new( ) ,
915+ commit_failures: 0 ,
894916 } ,
895917 ] ,
896918 created : Some ( SystemTime :: now ( ) . into ( ) ) ,
@@ -908,6 +930,7 @@ mod tests {
908930 world_size : 1 ,
909931 shrink_only : true ,
910932 data : String :: new ( ) ,
933+ commit_failures : 0 ,
911934 } ,
912935 } ,
913936 ) ;
@@ -926,6 +949,7 @@ mod tests {
926949 world_size : 1 ,
927950 shrink_only : true ,
928951 data : String :: new ( ) ,
952+ commit_failures : 0 ,
929953 } ,
930954 } ,
931955 ) ;
@@ -975,6 +999,7 @@ mod tests {
975999 world_size : 1 ,
9761000 shrink_only : false ,
9771001 data : String :: new ( ) ,
1002+ commit_failures : 0 ,
9781003 } ) ,
9791004 } ) ;
9801005
@@ -1021,6 +1046,7 @@ mod tests {
10211046 world_size : 1 ,
10221047 shrink_only : false ,
10231048 data : String :: new ( ) ,
1049+ commit_failures : 0 ,
10241050 } ,
10251051 } ,
10261052 ) ;
@@ -1047,6 +1073,7 @@ mod tests {
10471073 world_size: 1 ,
10481074 shrink_only: false ,
10491075 data: String :: new( ) ,
1076+ commit_failures: 0 ,
10501077 } ] ;
10511078 let b = vec ! [ QuorumMember {
10521079 replica_id: "1" . to_string( ) ,
@@ -1056,6 +1083,7 @@ mod tests {
10561083 world_size: 1 ,
10571084 shrink_only: false ,
10581085 data: String :: new( ) ,
1086+ commit_failures: 0 ,
10591087 } ] ;
10601088
10611089 // replica_id is the same
@@ -1069,12 +1097,13 @@ mod tests {
10691097 world_size: 1 ,
10701098 shrink_only: false ,
10711099 data: String :: new( ) ,
1100+ commit_failures: 0 ,
10721101 } ] ;
10731102 // replica_id changed
10741103 assert ! ( quorum_changed( & a, & c) ) ;
10751104 }
1076- #[ tokio:: test]
10771105
1106+ #[ tokio:: test]
10781107 async fn test_lighthouse_join_during_shrink ( ) -> Result < ( ) > {
10791108 fn create_member ( id : & str , addr_num : & str , step : i64 , shrink_only : bool ) -> QuorumMember {
10801109 QuorumMember {
@@ -1085,6 +1114,7 @@ mod tests {
10851114 world_size : 1 ,
10861115 shrink_only,
10871116 data : String :: new ( ) ,
1117+ commit_failures : 0 ,
10881118 }
10891119 }
10901120
@@ -1179,4 +1209,76 @@ mod tests {
11791209 lighthouse_task. abort ( ) ;
11801210 Ok ( ( ) )
11811211 }
1212+
1213+ #[ tokio:: test]
1214+ async fn test_lighthouse_commit_failures ( ) -> Result < ( ) > {
1215+ fn create_member ( id : & str , commit_failures : i64 ) -> QuorumMember {
1216+ QuorumMember {
1217+ replica_id : id. to_string ( ) ,
1218+ address : format ! ( "addr{}" , id) ,
1219+ store_address : format ! ( "store{}" , id) ,
1220+ step : 10 ,
1221+ world_size : 1 ,
1222+ shrink_only : false ,
1223+ data : String :: new ( ) ,
1224+ commit_failures,
1225+ }
1226+ }
1227+
1228+ fn create_request ( member : & QuorumMember ) -> tonic:: Request < LighthouseQuorumRequest > {
1229+ tonic:: Request :: new ( LighthouseQuorumRequest {
1230+ requester : Some ( member. clone ( ) ) ,
1231+ } )
1232+ }
1233+
1234+ let opt = LighthouseOpt {
1235+ min_replicas : 2 ,
1236+ bind : "[::]:0" . to_string ( ) ,
1237+ join_timeout_ms : 1000 ,
1238+ quorum_tick_ms : 10 ,
1239+ heartbeat_timeout_ms : 5000 ,
1240+ } ;
1241+
1242+ // Start the lighthouse service
1243+ let lighthouse = Lighthouse :: new ( opt) . await ?;
1244+ let lighthouse_task = tokio:: spawn ( lighthouse. clone ( ) . run ( ) ) ;
1245+
1246+ // Create client to interact with lighthouse
1247+ let mut client = lighthouse_client_new ( lighthouse. address ( ) ) . await ?;
1248+
1249+ // First two quorums should be stable
1250+ for _i in 0 ..2 {
1251+ let first_request = create_request ( & create_member ( "replica0" , 0 ) ) ;
1252+ let second_request = create_request ( & create_member ( "replica1" , 0 ) ) ;
1253+
1254+ tokio:: spawn ( {
1255+ let mut client = client. clone ( ) ;
1256+ async move { client. quorum ( first_request) . await }
1257+ } ) ;
1258+ let first_response = client. quorum ( second_request) . await ?;
1259+ let first_quorum = first_response. into_inner ( ) . quorum . unwrap ( ) ;
1260+ assert_eq ! ( first_quorum. quorum_id, 1 ) ;
1261+ assert_eq ! ( first_quorum. participants. len( ) , 2 ) ;
1262+ assert_eq ! ( first_quorum. participants[ 0 ] . commit_failures, 0 ) ;
1263+ assert_eq ! ( first_quorum. participants[ 1 ] . commit_failures, 0 ) ;
1264+ }
1265+
1266+ // commit_failures should increment quorum_id
1267+ let first_request = create_request ( & create_member ( "replica0" , 0 ) ) ;
1268+ let second_request = create_request ( & create_member ( "replica1" , 2 ) ) ;
1269+
1270+ tokio:: spawn ( {
1271+ let mut client = client. clone ( ) ;
1272+ async move { client. quorum ( first_request) . await }
1273+ } ) ;
1274+ let first_response = client. quorum ( second_request) . await ?;
1275+ let first_quorum = first_response. into_inner ( ) . quorum . unwrap ( ) ;
1276+ assert_eq ! ( first_quorum. quorum_id, 2 ) ;
1277+ assert_eq ! ( first_quorum. participants. len( ) , 2 ) ;
1278+ assert_eq ! ( first_quorum. participants[ 0 ] . commit_failures, 0 ) ;
1279+ assert_eq ! ( first_quorum. participants[ 1 ] . commit_failures, 2 ) ;
1280+
1281+ lighthouse_task. abort ( ) ;
1282+ Ok ( ( ) )
1283+ }
11821284}
0 commit comments