Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ use crate::progress::stream_id::StreamId;
use crate::quorum::QuorumSet;
use crate::raft::AppendEntriesRequest;
use crate::raft::ClientWriteResult;
use crate::raft::LogSegment;
use crate::raft::ReadPolicy;
use crate::raft::StreamAppendError;
use crate::raft::VoteRequest;
Expand Down Expand Up @@ -1474,7 +1475,8 @@ where
pub(super) fn handle_append_entries_request(&mut self, req: AppendEntriesRequest<C>, tx: AppendEntriesTx<C>) {
tracing::debug!("{}: req: {}", func_name!(), req);

self.engine.handle_append_entries(&req.vote, req.prev_log_id, req.entries, tx);
let segment = LogSegment::new(req.prev_log_id, req.entries);
self.engine.handle_append_entries(&req.vote, segment, tx);

// Record append entries to external metrics recorder
if let Some(r) = &self.metrics_recorder {
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use crate::errors::InitializeError;
use crate::errors::LinearizableReadError;
use crate::impls::ProgressResponder;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::ClientWriteResult;
use crate::raft::ReadPolicy;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::linearizable_read::Linearizer;
use crate::raft::responder::core_responder::CoreResponder;
use crate::raft::stream_append::StreamAppendResult;
use crate::storage::Snapshot;
use crate::type_config::alias::CommittedLeaderIdOf;
use crate::type_config::alias::OneshotSenderOf;
Expand All @@ -43,7 +43,7 @@ pub(crate) type ResultSender<C, T, E = Infallible> = OneshotSenderOf<C, Result<T
pub(crate) type VoteTx<C> = OneshotSenderOf<C, VoteResponse<C>>;

/// TX for Append Entries Response
pub(crate) type AppendEntriesTx<C> = OneshotSenderOf<C, AppendEntriesResponse<C>>;
pub(crate) type AppendEntriesTx<C> = OneshotSenderOf<C, StreamAppendResult<C>>;

/// TX for Linearizable Read Response
pub(crate) type ClientReadTx<C> = ResultSender<C, Linearizer<C>, LinearizableReadError<C>>;
Expand Down
9 changes: 6 additions & 3 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use crate::engine::replication_progress::TargetProgress;
use crate::errors::InitializeError;
use crate::errors::InstallSnapshotError;
use crate::progress::inflight_id::InflightId;
use crate::raft::AppendEntriesResponse;
use crate::raft::InstallSnapshotResponse;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::message::TransferLeaderRequest;
use crate::raft::stream_append::StreamAppendResult;
use crate::raft_state::IOId;
use crate::raft_state::IOState;
use crate::replication::ReplicationSessionId;
Expand Down Expand Up @@ -421,7 +421,7 @@ pub(crate) enum Respond<C>
where C: RaftTypeConfig
{
Vote(ValueSender<C, VoteResponse<C>>),
AppendEntries(ValueSender<C, AppendEntriesResponse<C>>),
AppendEntries(ValueSender<C, StreamAppendResult<C>>),
ReceiveSnapshotChunk(ValueSender<C, Result<(), InstallSnapshotError>>),
InstallSnapshot(ValueSender<C, Result<InstallSnapshotResponse<C>, InstallSnapshotError>>),
InstallFullSnapshot(ValueSender<C, SnapshotResponse<C>>),
Expand All @@ -434,7 +434,10 @@ where C: RaftTypeConfig
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Respond::Vote(vs) => write!(f, "Vote {}", vs.value()),
Respond::AppendEntries(vs) => write!(f, "AppendEntries {}", vs.value()),
Respond::AppendEntries(vs) => match vs.value() {
Ok(log_id) => write!(f, "AppendEntries Ok({})", log_id.display()),
Err(e) => write!(f, "AppendEntries Err({})", e),
},
Respond::ReceiveSnapshotChunk(vs) => {
write!(
f,
Expand Down
39 changes: 15 additions & 24 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::core::ServerState;
use crate::core::raft_msg::AppendEntriesTx;
use crate::core::sm;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplaySliceExt;
use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::EngineOutput;
Expand All @@ -35,10 +34,11 @@ use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::proposer::LeaderState;
use crate::proposer::leader_state::CandidateState;
use crate::raft::AppendEntriesResponse;
use crate::raft::LogSegment;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::stream_append::StreamAppendResult;
use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::raft_state::RaftState;
Expand Down Expand Up @@ -363,29 +363,19 @@ where C: RaftTypeConfig
///
/// Also clean conflicting entries and update membership state.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_append_entries(
&mut self,
vote: &VoteOf<C>,
prev_log_id: Option<LogIdOf<C>>,
entries: Vec<C::Entry>,
tx: AppendEntriesTx<C>,
) {
pub(crate) fn handle_append_entries(&mut self, vote: &VoteOf<C>, segment: LogSegment<C>, tx: AppendEntriesTx<C>) {
tracing::debug!(
"{}: vote: {}, prev_log_id: {}, entries: {}, my_vote: {}, my_last_log_id: {}",
"{}: vote: {}, segment: {}, my_vote: {}, my_last_log_id: {}",
func_name!(),
vote,
prev_log_id.display(),
entries.display(),
segment,
self.state.vote_ref(),
self.state.last_log_id().display()
);

let res = self.append_entries(vote, prev_log_id, entries);
let is_ok = res.is_ok();
let stream_result: StreamAppendResult<C> = self.append_entries(vote, segment).map_err(Into::into);

let resp: AppendEntriesResponse<C> = res.into();

let condition = if is_ok {
let condition = if stream_result.is_ok() {
Some(Condition::IOFlushed {
io_id: self.state.accepted_log_io().unwrap().clone(),
})
Expand All @@ -395,25 +385,26 @@ where C: RaftTypeConfig

self.output.push_command(Command::Respond {
when: condition,
resp: Respond::new(resp, tx),
resp: Respond::new(stream_result, tx),
});
}

pub(crate) fn append_entries(
&mut self,
vote: &VoteOf<C>,
prev_log_id: Option<LogIdOf<C>>,
entries: Vec<C::Entry>,
) -> Result<(), RejectAppendEntries<C>> {
segment: LogSegment<C>,
) -> Result<Option<LogIdOf<C>>, RejectAppendEntries<C>> {
self.vote_handler().update_vote(vote)?;

// Vote is legal.

let last = segment.last();

let mut fh = self.following_handler();
fh.ensure_log_consecutive(prev_log_id.as_ref())?;
fh.append_entries(prev_log_id, entries);
fh.ensure_log_consecutive(segment.prev_log_id.as_ref())?;
fh.append_entries(segment.prev_log_id, segment.entries);

Ok(())
Ok(last)
}

/// Install a completely received snapshot on a follower.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::engine::Engine;
use crate::engine::LogIdList;
use crate::engine::testing::UTConfig;
use crate::engine::testing::log_id;
use crate::errors::RejectLeadership;
use crate::errors::RejectVote;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;

Expand Down Expand Up @@ -50,7 +50,12 @@ fn test_handle_message_vote_reject_smaller_vote() -> anyhow::Result<()> {

let resp = eng.vote_handler().update_vote(&Vote::new(1, 2));

assert_eq!(Err(RejectLeadership::ByVote(Vote::new_committed(2, 1))), resp);
assert_eq!(
Err(RejectVote {
higher: Vote::new_committed(2, 1),
}),
resp
);

assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref());
assert!(eng.leader.is_some());
Expand Down
10 changes: 6 additions & 4 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::engine::handler::leader_handler::LeaderHandler;
use crate::engine::handler::replication_handler::ReplicationHandler;
use crate::engine::handler::server_state_handler::ServerStateHandler;
use crate::entry::payload::EntryPayload;
use crate::errors::RejectLeadership;
use crate::errors::RejectVote;
use crate::proposer::CandidateState;
use crate::proposer::LeaderState;
use crate::raft_state::IOId;
Expand Down Expand Up @@ -70,7 +70,7 @@ where C: RaftTypeConfig
where
T: Debug + Eq + OptionalSend,
Respond<C>: From<ValueSender<C, T>>,
F: Fn(&RaftState<C>, RejectLeadership<C>) -> T,
F: Fn(&RaftState<C>, RejectVote<C>) -> T,
{
let vote_res = self.update_vote(vote);

Expand Down Expand Up @@ -101,15 +101,17 @@ where C: RaftTypeConfig
/// Note: This method does not check last-log-id. handle-vote-request has to deal with
/// last-log-id itself.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_vote(&mut self, vote: &VoteOf<C>) -> Result<(), RejectLeadership<C>> {
pub(crate) fn update_vote(&mut self, vote: &VoteOf<C>) -> Result<(), RejectVote<C>> {
// Partial ord compare:
// Vote does not have to be total ord.
// `!(a >= b)` does not imply `a < b`.
if vote.as_ref_vote() >= self.state.vote_ref().as_ref_vote() {
// Ok
} else {
tracing::info!("vote {} is rejected by local vote: {}", vote, self.state.vote_ref());
return Err(RejectLeadership::ByVote(self.state.vote_ref().clone()));
return Err(RejectVote {
higher: self.state.vote_ref().clone(),
});
}
tracing::debug!(%vote, "vote is changing to" );

Expand Down
50 changes: 26 additions & 24 deletions openraft/src/engine/tests/append_entries_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use crate::engine::testing::log_id;
use crate::entry::RaftEntry;
use crate::errors::ConflictingLogId;
use crate::errors::RejectAppendEntries;
use crate::errors::RejectLeadership;
use crate::errors::RejectVote;
use crate::raft::LogSegment;
use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::testing::blank_ent;
Expand Down Expand Up @@ -60,12 +61,12 @@ fn eng() -> Engine<UTConfig> {
fn test_append_entries_vote_is_rejected() -> anyhow::Result<()> {
let mut eng = eng();

let res = eng.append_entries(&Vote::new(1, 1), None, Vec::<Entry<UTConfig>>::new());
let res = eng.append_entries(&Vote::new(1, 1), LogSegment::new(None, vec![]));

assert_eq!(
Err(RejectAppendEntries::RejectLeadership(RejectLeadership::ByVote(
Vote::new(2, 1)
))),
Err(RejectAppendEntries::RejectVote(RejectVote {
higher: Vote::new(2, 1),
})),
res
);
assert_eq!(None, eng.state.log_ids.purged());
Expand Down Expand Up @@ -100,11 +101,10 @@ fn test_append_entries_prev_log_id_is_applied() -> anyhow::Result<()> {

let res = eng.append_entries(
&Vote::new_committed(2, 1),
Some(log_id(0, 1, 0)),
Vec::<Entry<UTConfig>>::new(),
LogSegment::new(Some(log_id(0, 1, 0)), vec![]),
);

assert_eq!(Ok(()), res);
assert_eq!(Ok(Some(log_id(0, 1, 0))), res);
assert_eq!(None, eng.state.log_ids.purged());
assert_eq!(
&[
Expand Down Expand Up @@ -148,8 +148,7 @@ fn test_append_entries_prev_log_id_conflict() -> anyhow::Result<()> {

let res = eng.append_entries(
&Vote::new_committed(2, 1),
Some(log_id(2, 1, 2)),
Vec::<Entry<UTConfig>>::new(),
LogSegment::new(Some(log_id(2, 1, 2)), vec![]),
);

assert_eq!(
Expand Down Expand Up @@ -196,12 +195,12 @@ fn test_append_entries_prev_log_id_conflict() -> anyhow::Result<()> {
fn test_append_entries_prev_log_id_is_committed() -> anyhow::Result<()> {
let mut eng = eng();

let res = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id(0, 1, 0)), vec![
blank_ent(1, 1, 1),
blank_ent(2, 1, 2),
]);
let res = eng.append_entries(
&Vote::new_committed(2, 1),
LogSegment::new(Some(log_id(0, 1, 0)), vec![blank_ent(1, 1, 1), blank_ent(2, 1, 2)]),
);

assert_eq!(Ok(()), res);
assert_eq!(Ok(Some(log_id(2, 1, 2))), res);
assert_eq!(None, eng.state.log_ids.purged());
assert_eq!(
&[
Expand Down Expand Up @@ -246,10 +245,10 @@ fn test_append_entries_prev_log_id_not_exists() -> anyhow::Result<()> {
eng.state.vote = Leased::new(UTConfig::<()>::now(), Duration::from_millis(500), Vote::new(1, 2));
eng.output.take_commands();

let res = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id(2, 1, 4)), vec![
blank_ent(2, 1, 5),
blank_ent(2, 1, 6),
]);
let res = eng.append_entries(
&Vote::new_committed(2, 1),
LogSegment::new(Some(log_id(2, 1, 4)), vec![blank_ent(2, 1, 5), blank_ent(2, 1, 6)]),
);

assert_eq!(
Err(RejectAppendEntries::ConflictingLogId(ConflictingLogId {
Expand Down Expand Up @@ -298,12 +297,15 @@ fn test_append_entries_conflict() -> anyhow::Result<()> {
// It is no longer a member, change to learner
let mut eng = eng();

let resp = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id(1, 1, 1)), vec![
blank_ent(1, 1, 2),
Entry::new_membership(log_id(3, 1, 3), m34()),
]);
let resp = eng.append_entries(
&Vote::new_committed(2, 1),
LogSegment::new(Some(log_id(1, 1, 1)), vec![
blank_ent(1, 1, 2),
Entry::new_membership(log_id(3, 1, 3), m34()),
]),
);

assert_eq!(Ok(()), resp);
assert_eq!(Ok(Some(log_id(3, 1, 3))), resp);
assert_eq!(None, eng.state.log_ids.purged());
assert_eq!(
&[
Expand Down
2 changes: 0 additions & 2 deletions openraft/src/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ mod membership_error;
mod node_not_found;
mod operation;
mod reject_append_entries;
mod reject_leadership;
mod reject_vote;
mod replication_closed;
pub(crate) mod replication_error;
Expand All @@ -41,7 +40,6 @@ pub use self::membership_error::MembershipError;
pub use self::node_not_found::NodeNotFound;
pub use self::operation::Operation;
pub(crate) use self::reject_append_entries::RejectAppendEntries;
pub(crate) use self::reject_leadership::RejectLeadership;
pub use self::reject_vote::RejectVote;
pub use self::replication_closed::ReplicationClosed;
pub(crate) use self::replication_error::ReplicationError;
Expand Down
Loading