Skip to content

Commit 08ccd96

Browse files
authored
lighthouse: add commit_failures support (#183)
1 parent 3bd67a8 commit 08ccd96

File tree

6 files changed

+178
-18
lines changed

6 files changed

+178
-18
lines changed

proto/torchft.proto

+3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ message QuorumMember {
4242
int64 step = 4;
4343
uint64 world_size = 5;
4444
bool shrink_only = 6;
45+
int64 commit_failures = 8;
4546
// User passing in data stored as JSON string.
4647
string data = 7;
4748
}
@@ -77,6 +78,7 @@ message ManagerQuorumRequest {
7778
string checkpoint_metadata = 3;
7879
bool shrink_only = 4;
7980
bool init_sync = 5;
81+
int64 commit_failures = 6;
8082
}
8183

8284
message ManagerQuorumResponse {
@@ -93,6 +95,7 @@ message ManagerQuorumResponse {
9395
int64 replica_rank = 9;
9496
int64 replica_world_size = 10;
9597
bool heal = 11;
98+
int64 commit_failures = 12;
9699
}
97100

98101
message CheckpointMetadataRequest {

src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ impl ManagerClient {
177177
checkpoint_metadata: String,
178178
shrink_only: bool,
179179
init_sync: bool,
180+
commit_failures: i64,
180181
timeout: Duration,
181182
) -> Result<QuorumResult, StatusError> {
182183
py.allow_threads(move || {
@@ -186,6 +187,7 @@ impl ManagerClient {
186187
checkpoint_metadata: checkpoint_metadata,
187188
shrink_only: shrink_only,
188189
init_sync: init_sync,
190+
commit_failures: commit_failures,
189191
});
190192

191193
// This timeout is processed on the server side so we also enable
@@ -547,6 +549,7 @@ impl LighthouseClient {
547549
world_size: world_size,
548550
shrink_only: shrink_only,
549551
data: data_string,
552+
commit_failures: 0,
550553
}),
551554
});
552555

src/lighthouse.rs

+103-1
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,12 @@ impl Lighthouse {
288288
if quorum_met.is_some() {
289289
let participants = quorum_met.unwrap();
290290

291+
let commit_failure_replica_ids: Vec<String> = participants
292+
.iter()
293+
.filter(|p| p.commit_failures > 0)
294+
.map(|p| p.replica_id.clone())
295+
.collect();
296+
291297
// only increment quorum ID if something about the quorum
292298
// changed (members/addresses/etc)
293299
if state.prev_quorum.is_none()
@@ -301,6 +307,13 @@ impl Lighthouse {
301307
"Detected quorum change, bumping quorum_id to {}",
302308
state.quorum_id
303309
);
310+
} else if commit_failure_replica_ids.len() > 0 {
311+
state.quorum_id += 1;
312+
info!(
313+
"Detected commit failures in [{}], bumping quorum_id to {}",
314+
commit_failure_replica_ids.join(", "),
315+
state.quorum_id
316+
);
304317
}
305318

306319
let quorum = Quorum {
@@ -639,6 +652,7 @@ mod tests {
639652
world_size: 1,
640653
shrink_only: false,
641654
data: String::new(),
655+
commit_failures: 0,
642656
},
643657
},
644658
);
@@ -656,6 +670,7 @@ mod tests {
656670
world_size: 1,
657671
shrink_only: false,
658672
data: String::new(),
673+
commit_failures: 0,
659674
},
660675
},
661676
);
@@ -712,6 +727,7 @@ mod tests {
712727
world_size: 1,
713728
shrink_only: false,
714729
data: String::new(),
730+
commit_failures: 0,
715731
},
716732
},
717733
);
@@ -751,6 +767,7 @@ mod tests {
751767
world_size: 1,
752768
shrink_only: false,
753769
data: String::new(),
770+
commit_failures: 0,
754771
},
755772
},
756773
);
@@ -798,6 +815,7 @@ mod tests {
798815
world_size: 1,
799816
shrink_only: false,
800817
data: String::new(),
818+
commit_failures: 0,
801819
},
802820
},
803821
);
@@ -819,6 +837,7 @@ mod tests {
819837
world_size: 1,
820838
shrink_only: false,
821839
data: String::new(),
840+
commit_failures: 0,
822841
}],
823842
created: Some(SystemTime::now().into()),
824843
});
@@ -838,6 +857,7 @@ mod tests {
838857
world_size: 1,
839858
shrink_only: false,
840859
data: String::new(),
860+
commit_failures: 0,
841861
},
842862
},
843863
);
@@ -882,6 +902,7 @@ mod tests {
882902
world_size: 1,
883903
shrink_only: false,
884904
data: String::new(),
905+
commit_failures: 0,
885906
},
886907
QuorumMember {
887908
replica_id: "b".to_string(),
@@ -891,6 +912,7 @@ mod tests {
891912
world_size: 1,
892913
shrink_only: false,
893914
data: String::new(),
915+
commit_failures: 0,
894916
},
895917
],
896918
created: Some(SystemTime::now().into()),
@@ -908,6 +930,7 @@ mod tests {
908930
world_size: 1,
909931
shrink_only: true,
910932
data: String::new(),
933+
commit_failures: 0,
911934
},
912935
},
913936
);
@@ -926,6 +949,7 @@ mod tests {
926949
world_size: 1,
927950
shrink_only: true,
928951
data: String::new(),
952+
commit_failures: 0,
929953
},
930954
},
931955
);
@@ -975,6 +999,7 @@ mod tests {
975999
world_size: 1,
9761000
shrink_only: false,
9771001
data: String::new(),
1002+
commit_failures: 0,
9781003
}),
9791004
});
9801005

@@ -1021,6 +1046,7 @@ mod tests {
10211046
world_size: 1,
10221047
shrink_only: false,
10231048
data: String::new(),
1049+
commit_failures: 0,
10241050
},
10251051
},
10261052
);
@@ -1047,6 +1073,7 @@ mod tests {
10471073
world_size: 1,
10481074
shrink_only: false,
10491075
data: String::new(),
1076+
commit_failures: 0,
10501077
}];
10511078
let b = vec![QuorumMember {
10521079
replica_id: "1".to_string(),
@@ -1056,6 +1083,7 @@ mod tests {
10561083
world_size: 1,
10571084
shrink_only: false,
10581085
data: String::new(),
1086+
commit_failures: 0,
10591087
}];
10601088

10611089
// replica_id is the same
@@ -1069,12 +1097,13 @@ mod tests {
10691097
world_size: 1,
10701098
shrink_only: false,
10711099
data: String::new(),
1100+
commit_failures: 0,
10721101
}];
10731102
// replica_id changed
10741103
assert!(quorum_changed(&a, &c));
10751104
}
1076-
#[tokio::test]
10771105

1106+
#[tokio::test]
10781107
async fn test_lighthouse_join_during_shrink() -> Result<()> {
10791108
fn create_member(id: &str, addr_num: &str, step: i64, shrink_only: bool) -> QuorumMember {
10801109
QuorumMember {
@@ -1085,6 +1114,7 @@ mod tests {
10851114
world_size: 1,
10861115
shrink_only,
10871116
data: String::new(),
1117+
commit_failures: 0,
10881118
}
10891119
}
10901120

@@ -1179,4 +1209,76 @@ mod tests {
11791209
lighthouse_task.abort();
11801210
Ok(())
11811211
}
1212+
1213+
#[tokio::test]
1214+
async fn test_lighthouse_commit_failures() -> Result<()> {
1215+
fn create_member(id: &str, commit_failures: i64) -> QuorumMember {
1216+
QuorumMember {
1217+
replica_id: id.to_string(),
1218+
address: format!("addr{}", id),
1219+
store_address: format!("store{}", id),
1220+
step: 10,
1221+
world_size: 1,
1222+
shrink_only: false,
1223+
data: String::new(),
1224+
commit_failures,
1225+
}
1226+
}
1227+
1228+
fn create_request(member: &QuorumMember) -> tonic::Request<LighthouseQuorumRequest> {
1229+
tonic::Request::new(LighthouseQuorumRequest {
1230+
requester: Some(member.clone()),
1231+
})
1232+
}
1233+
1234+
let opt = LighthouseOpt {
1235+
min_replicas: 2,
1236+
bind: "[::]:0".to_string(),
1237+
join_timeout_ms: 1000,
1238+
quorum_tick_ms: 10,
1239+
heartbeat_timeout_ms: 5000,
1240+
};
1241+
1242+
// Start the lighthouse service
1243+
let lighthouse = Lighthouse::new(opt).await?;
1244+
let lighthouse_task = tokio::spawn(lighthouse.clone().run());
1245+
1246+
// Create client to interact with lighthouse
1247+
let mut client = lighthouse_client_new(lighthouse.address()).await?;
1248+
1249+
// First two quorums should be stable
1250+
for _i in 0..2 {
1251+
let first_request = create_request(&create_member("replica0", 0));
1252+
let second_request = create_request(&create_member("replica1", 0));
1253+
1254+
tokio::spawn({
1255+
let mut client = client.clone();
1256+
async move { client.quorum(first_request).await }
1257+
});
1258+
let first_response = client.quorum(second_request).await?;
1259+
let first_quorum = first_response.into_inner().quorum.unwrap();
1260+
assert_eq!(first_quorum.quorum_id, 1);
1261+
assert_eq!(first_quorum.participants.len(), 2);
1262+
assert_eq!(first_quorum.participants[0].commit_failures, 0);
1263+
assert_eq!(first_quorum.participants[1].commit_failures, 0);
1264+
}
1265+
1266+
// commit_failures should increment quorum_id
1267+
let first_request = create_request(&create_member("replica0", 0));
1268+
let second_request = create_request(&create_member("replica1", 2));
1269+
1270+
tokio::spawn({
1271+
let mut client = client.clone();
1272+
async move { client.quorum(first_request).await }
1273+
});
1274+
let first_response = client.quorum(second_request).await?;
1275+
let first_quorum = first_response.into_inner().quorum.unwrap();
1276+
assert_eq!(first_quorum.quorum_id, 2);
1277+
assert_eq!(first_quorum.participants.len(), 2);
1278+
assert_eq!(first_quorum.participants[0].commit_failures, 0);
1279+
assert_eq!(first_quorum.participants[1].commit_failures, 2);
1280+
1281+
lighthouse_task.abort();
1282+
Ok(())
1283+
}
11821284
}

0 commit comments

Comments
 (0)