diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index f243a66f7..9ce1d8283 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -85,6 +85,7 @@ use crate::progress::stream_id::StreamId; use crate::quorum::QuorumSet; use crate::raft::AppendEntriesRequest; use crate::raft::ClientWriteResult; +use crate::raft::LogSegment; use crate::raft::ReadPolicy; use crate::raft::StreamAppendError; use crate::raft::VoteRequest; @@ -1474,7 +1475,8 @@ where pub(super) fn handle_append_entries_request(&mut self, req: AppendEntriesRequest, tx: AppendEntriesTx) { tracing::debug!("{}: req: {}", func_name!(), req); - self.engine.handle_append_entries(&req.vote, req.prev_log_id, req.entries, tx); + let segment = LogSegment::new(req.prev_log_id, req.entries); + self.engine.handle_append_entries(&req.vote, segment, tx); // Record append entries to external metrics recorder if let Some(r) = &self.metrics_recorder { diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index a4d033e13..b37cec7a0 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -16,7 +16,6 @@ use crate::errors::InitializeError; use crate::errors::LinearizableReadError; use crate::impls::ProgressResponder; use crate::raft::AppendEntriesRequest; -use crate::raft::AppendEntriesResponse; use crate::raft::ClientWriteResult; use crate::raft::ReadPolicy; use crate::raft::SnapshotResponse; @@ -24,6 +23,7 @@ use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::raft::linearizable_read::Linearizer; use crate::raft::responder::core_responder::CoreResponder; +use crate::raft::stream_append::StreamAppendResult; use crate::storage::Snapshot; use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::OneshotSenderOf; @@ -43,7 +43,7 @@ pub(crate) type ResultSender = OneshotSenderOf = OneshotSenderOf>; /// TX for Append Entries Response -pub(crate) type AppendEntriesTx = OneshotSenderOf>; +pub(crate) type AppendEntriesTx = OneshotSenderOf>; /// TX for Linearizable Read Response pub(crate) type ClientReadTx = ResultSender, LinearizableReadError>; diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 13416f3f2..613c52318 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -15,12 +15,12 @@ use crate::engine::replication_progress::TargetProgress; use crate::errors::InitializeError; use crate::errors::InstallSnapshotError; use crate::progress::inflight_id::InflightId; -use crate::raft::AppendEntriesResponse; use crate::raft::InstallSnapshotResponse; use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::raft::message::TransferLeaderRequest; +use crate::raft::stream_append::StreamAppendResult; use crate::raft_state::IOId; use crate::raft_state::IOState; use crate::replication::ReplicationSessionId; @@ -421,7 +421,7 @@ pub(crate) enum Respond where C: RaftTypeConfig { Vote(ValueSender>), - AppendEntries(ValueSender>), + AppendEntries(ValueSender>), ReceiveSnapshotChunk(ValueSender>), InstallSnapshot(ValueSender, InstallSnapshotError>>), InstallFullSnapshot(ValueSender>), @@ -434,7 +434,10 @@ where C: RaftTypeConfig fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Respond::Vote(vs) => write!(f, "Vote {}", vs.value()), - Respond::AppendEntries(vs) => write!(f, "AppendEntries {}", vs.value()), + Respond::AppendEntries(vs) => match vs.value() { + Ok(log_id) => write!(f, "AppendEntries Ok({})", log_id.display()), + Err(e) => write!(f, "AppendEntries Err({})", e), + }, Respond::ReceiveSnapshotChunk(vs) => { write!( f, diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 114f07723..82dfff7bf 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -9,7 +9,6 @@ use crate::core::ServerState; use crate::core::raft_msg::AppendEntriesTx; use crate::core::sm; use crate::display_ext::DisplayOptionExt; -use crate::display_ext::DisplaySliceExt; use crate::engine::Command; use crate::engine::Condition; use crate::engine::EngineOutput; @@ -35,10 +34,11 @@ use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; use crate::proposer::LeaderState; use crate::proposer::leader_state::CandidateState; -use crate::raft::AppendEntriesResponse; +use crate::raft::LogSegment; use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; +use crate::raft::stream_append::StreamAppendResult; use crate::raft_state::IOId; use crate::raft_state::LogStateReader; use crate::raft_state::RaftState; @@ -363,29 +363,19 @@ where C: RaftTypeConfig /// /// Also clean conflicting entries and update membership state. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn handle_append_entries( - &mut self, - vote: &VoteOf, - prev_log_id: Option>, - entries: Vec, - tx: AppendEntriesTx, - ) { + pub(crate) fn handle_append_entries(&mut self, vote: &VoteOf, segment: LogSegment, tx: AppendEntriesTx) { tracing::debug!( - "{}: vote: {}, prev_log_id: {}, entries: {}, my_vote: {}, my_last_log_id: {}", + "{}: vote: {}, segment: {}, my_vote: {}, my_last_log_id: {}", func_name!(), vote, - prev_log_id.display(), - entries.display(), + segment, self.state.vote_ref(), self.state.last_log_id().display() ); - let res = self.append_entries(vote, prev_log_id, entries); - let is_ok = res.is_ok(); + let stream_result: StreamAppendResult = self.append_entries(vote, segment).map_err(Into::into); - let resp: AppendEntriesResponse = res.into(); - - let condition = if is_ok { + let condition = if stream_result.is_ok() { Some(Condition::IOFlushed { io_id: self.state.accepted_log_io().unwrap().clone(), }) @@ -395,25 +385,26 @@ where C: RaftTypeConfig self.output.push_command(Command::Respond { when: condition, - resp: Respond::new(resp, tx), + resp: Respond::new(stream_result, tx), }); } pub(crate) fn append_entries( &mut self, vote: &VoteOf, - prev_log_id: Option>, - entries: Vec, - ) -> Result<(), RejectAppendEntries> { + segment: LogSegment, + ) -> Result>, RejectAppendEntries> { self.vote_handler().update_vote(vote)?; // Vote is legal. + let last = segment.last(); + let mut fh = self.following_handler(); - fh.ensure_log_consecutive(prev_log_id.as_ref())?; - fh.append_entries(prev_log_id, entries); + fh.ensure_log_consecutive(segment.prev_log_id.as_ref())?; + fh.append_entries(segment.prev_log_id, segment.entries); - Ok(()) + Ok(last) } /// Install a completely received snapshot on a follower. diff --git a/openraft/src/engine/handler/vote_handler/handle_message_vote_test.rs b/openraft/src/engine/handler/vote_handler/handle_message_vote_test.rs index 74f3c2c96..230b88928 100644 --- a/openraft/src/engine/handler/vote_handler/handle_message_vote_test.rs +++ b/openraft/src/engine/handler/vote_handler/handle_message_vote_test.rs @@ -15,7 +15,7 @@ use crate::engine::Engine; use crate::engine::LogIdList; use crate::engine::testing::UTConfig; use crate::engine::testing::log_id; -use crate::errors::RejectLeadership; +use crate::errors::RejectVote; use crate::type_config::TypeConfigExt; use crate::utime::Leased; @@ -50,7 +50,12 @@ fn test_handle_message_vote_reject_smaller_vote() -> anyhow::Result<()> { let resp = eng.vote_handler().update_vote(&Vote::new(1, 2)); - assert_eq!(Err(RejectLeadership::ByVote(Vote::new_committed(2, 1))), resp); + assert_eq!( + Err(RejectVote { + higher: Vote::new_committed(2, 1), + }), + resp + ); assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref()); assert!(eng.leader.is_some()); diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 09245722b..063c8151b 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -16,7 +16,7 @@ use crate::engine::handler::leader_handler::LeaderHandler; use crate::engine::handler::replication_handler::ReplicationHandler; use crate::engine::handler::server_state_handler::ServerStateHandler; use crate::entry::payload::EntryPayload; -use crate::errors::RejectLeadership; +use crate::errors::RejectVote; use crate::proposer::CandidateState; use crate::proposer::LeaderState; use crate::raft_state::IOId; @@ -70,7 +70,7 @@ where C: RaftTypeConfig where T: Debug + Eq + OptionalSend, Respond: From>, - F: Fn(&RaftState, RejectLeadership) -> T, + F: Fn(&RaftState, RejectVote) -> T, { let vote_res = self.update_vote(vote); @@ -101,7 +101,7 @@ where C: RaftTypeConfig /// Note: This method does not check last-log-id. handle-vote-request has to deal with /// last-log-id itself. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_vote(&mut self, vote: &VoteOf) -> Result<(), RejectLeadership> { + pub(crate) fn update_vote(&mut self, vote: &VoteOf) -> Result<(), RejectVote> { // Partial ord compare: // Vote does not have to be total ord. // `!(a >= b)` does not imply `a < b`. @@ -109,7 +109,9 @@ where C: RaftTypeConfig // Ok } else { tracing::info!("vote {} is rejected by local vote: {}", vote, self.state.vote_ref()); - return Err(RejectLeadership::ByVote(self.state.vote_ref().clone())); + return Err(RejectVote { + higher: self.state.vote_ref().clone(), + }); } tracing::debug!(%vote, "vote is changing to" ); diff --git a/openraft/src/engine/tests/append_entries_test.rs b/openraft/src/engine/tests/append_entries_test.rs index 34f3d22ce..a84d84b7c 100644 --- a/openraft/src/engine/tests/append_entries_test.rs +++ b/openraft/src/engine/tests/append_entries_test.rs @@ -18,7 +18,8 @@ use crate::engine::testing::log_id; use crate::entry::RaftEntry; use crate::errors::ConflictingLogId; use crate::errors::RejectAppendEntries; -use crate::errors::RejectLeadership; +use crate::errors::RejectVote; +use crate::raft::LogSegment; use crate::raft_state::IOId; use crate::raft_state::LogStateReader; use crate::testing::blank_ent; @@ -60,12 +61,12 @@ fn eng() -> Engine { fn test_append_entries_vote_is_rejected() -> anyhow::Result<()> { let mut eng = eng(); - let res = eng.append_entries(&Vote::new(1, 1), None, Vec::>::new()); + let res = eng.append_entries(&Vote::new(1, 1), LogSegment::new(None, vec![])); assert_eq!( - Err(RejectAppendEntries::RejectLeadership(RejectLeadership::ByVote( - Vote::new(2, 1) - ))), + Err(RejectAppendEntries::RejectVote(RejectVote { + higher: Vote::new(2, 1), + })), res ); assert_eq!(None, eng.state.log_ids.purged()); @@ -100,11 +101,10 @@ fn test_append_entries_prev_log_id_is_applied() -> anyhow::Result<()> { let res = eng.append_entries( &Vote::new_committed(2, 1), - Some(log_id(0, 1, 0)), - Vec::>::new(), + LogSegment::new(Some(log_id(0, 1, 0)), vec![]), ); - assert_eq!(Ok(()), res); + assert_eq!(Ok(Some(log_id(0, 1, 0))), res); assert_eq!(None, eng.state.log_ids.purged()); assert_eq!( &[ @@ -148,8 +148,7 @@ fn test_append_entries_prev_log_id_conflict() -> anyhow::Result<()> { let res = eng.append_entries( &Vote::new_committed(2, 1), - Some(log_id(2, 1, 2)), - Vec::>::new(), + LogSegment::new(Some(log_id(2, 1, 2)), vec![]), ); assert_eq!( @@ -196,12 +195,12 @@ fn test_append_entries_prev_log_id_conflict() -> anyhow::Result<()> { fn test_append_entries_prev_log_id_is_committed() -> anyhow::Result<()> { let mut eng = eng(); - let res = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id(0, 1, 0)), vec![ - blank_ent(1, 1, 1), - blank_ent(2, 1, 2), - ]); + let res = eng.append_entries( + &Vote::new_committed(2, 1), + LogSegment::new(Some(log_id(0, 1, 0)), vec![blank_ent(1, 1, 1), blank_ent(2, 1, 2)]), + ); - assert_eq!(Ok(()), res); + assert_eq!(Ok(Some(log_id(2, 1, 2))), res); assert_eq!(None, eng.state.log_ids.purged()); assert_eq!( &[ @@ -246,10 +245,10 @@ fn test_append_entries_prev_log_id_not_exists() -> anyhow::Result<()> { eng.state.vote = Leased::new(UTConfig::<()>::now(), Duration::from_millis(500), Vote::new(1, 2)); eng.output.take_commands(); - let res = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id(2, 1, 4)), vec![ - blank_ent(2, 1, 5), - blank_ent(2, 1, 6), - ]); + let res = eng.append_entries( + &Vote::new_committed(2, 1), + LogSegment::new(Some(log_id(2, 1, 4)), vec![blank_ent(2, 1, 5), blank_ent(2, 1, 6)]), + ); assert_eq!( Err(RejectAppendEntries::ConflictingLogId(ConflictingLogId { @@ -298,12 +297,15 @@ fn test_append_entries_conflict() -> anyhow::Result<()> { // It is no longer a member, change to learner let mut eng = eng(); - let resp = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id(1, 1, 1)), vec![ - blank_ent(1, 1, 2), - Entry::new_membership(log_id(3, 1, 3), m34()), - ]); + let resp = eng.append_entries( + &Vote::new_committed(2, 1), + LogSegment::new(Some(log_id(1, 1, 1)), vec![ + blank_ent(1, 1, 2), + Entry::new_membership(log_id(3, 1, 3), m34()), + ]), + ); - assert_eq!(Ok(()), resp); + assert_eq!(Ok(Some(log_id(3, 1, 3))), resp); assert_eq!(None, eng.state.log_ids.purged()); assert_eq!( &[ diff --git a/openraft/src/errors/mod.rs b/openraft/src/errors/mod.rs index 6fa90ad2a..4e5dc24c5 100644 --- a/openraft/src/errors/mod.rs +++ b/openraft/src/errors/mod.rs @@ -14,7 +14,6 @@ mod membership_error; mod node_not_found; mod operation; mod reject_append_entries; -mod reject_leadership; mod reject_vote; mod replication_closed; pub(crate) mod replication_error; @@ -41,7 +40,6 @@ pub use self::membership_error::MembershipError; pub use self::node_not_found::NodeNotFound; pub use self::operation::Operation; pub(crate) use self::reject_append_entries::RejectAppendEntries; -pub(crate) use self::reject_leadership::RejectLeadership; pub use self::reject_vote::RejectVote; pub use self::replication_closed::ReplicationClosed; pub(crate) use self::replication_error::ReplicationError; diff --git a/openraft/src/errors/reject_append_entries.rs b/openraft/src/errors/reject_append_entries.rs index dbf1f74a7..f087fb27c 100644 --- a/openraft/src/errors/reject_append_entries.rs +++ b/openraft/src/errors/reject_append_entries.rs @@ -2,49 +2,41 @@ use peel_off::Peel; use crate::RaftTypeConfig; use crate::errors::ConflictingLogId; -use crate::errors::RejectLeadership; -use crate::raft::AppendEntriesResponse; +use crate::errors::RejectVote; +use crate::raft::StreamAppendError; #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] pub(crate) enum RejectAppendEntries { #[error("reject AppendEntries by a greater vote: {0}")] - RejectLeadership(RejectLeadership), + RejectVote(RejectVote), #[error("reject AppendEntries due to conflicting log id: {0}")] ConflictingLogId(ConflictingLogId), } -impl From> for RejectAppendEntries { - fn from(r: RejectLeadership) -> Self { - RejectAppendEntries::RejectLeadership(r) +impl From> for RejectAppendEntries { + fn from(r: RejectVote) -> Self { + RejectAppendEntries::RejectVote(r) } } -impl From>> for AppendEntriesResponse { - fn from(r: Result<(), RejectAppendEntries>) -> Self { - match r { - Ok(_) => AppendEntriesResponse::Success, - Err(e) => match e { - RejectAppendEntries::RejectLeadership(r) => match r { - RejectLeadership::ByVote(v) => AppendEntriesResponse::HigherVote(v), - RejectLeadership::ByLastLogId(_) => { - unreachable!("the leader should always has a greater last log id") - } - }, - RejectAppendEntries::ConflictingLogId(_) => AppendEntriesResponse::Conflict, - }, +impl From> for StreamAppendError { + fn from(e: RejectAppendEntries) -> Self { + match e { + RejectAppendEntries::RejectVote(r) => StreamAppendError::HigherVote(r.higher), + RejectAppendEntries::ConflictingLogId(c) => StreamAppendError::Conflict(c.expect), } } } -/// Peel off `RejectVoteRequest`, leaving `ConflictingLogId` as the residual. +/// Peel off `RejectVote`, leaving `ConflictingLogId` as the residual. impl Peel for RejectAppendEntries { - type Peeled = RejectLeadership; + type Peeled = RejectVote; type Residual = ConflictingLogId; - fn peel(self) -> Result, RejectLeadership> { + fn peel(self) -> Result, RejectVote> { match self { - RejectAppendEntries::RejectLeadership(e) => Err(e), + RejectAppendEntries::RejectVote(e) => Err(e), RejectAppendEntries::ConflictingLogId(e) => Ok(e), } } diff --git a/openraft/src/raft/api/protocol.rs b/openraft/src/raft/api/protocol.rs index 831691088..c8edc41bf 100644 --- a/openraft/src/raft/api/protocol.rs +++ b/openraft/src/raft/api/protocol.rs @@ -93,7 +93,8 @@ where C: RaftTypeConfig tracing::debug!("Raft::append_entries: rpc: {}", rpc); let (tx, rx) = C::oneshot(); - self.inner.call_core(RaftMsg::AppendEntries { rpc, tx }, rx).await + let stream_result: StreamAppendResult = self.inner.call_core(RaftMsg::AppendEntries { rpc, tx }, rx).await?; + Ok(AppendEntriesResponse::from(stream_result)) } #[since(version = "0.10.0")] diff --git a/openraft/src/raft/message/append_entries_response.rs b/openraft/src/raft/message/append_entries_response.rs index 55addb16b..81fcbcabe 100644 --- a/openraft/src/raft/message/append_entries_response.rs +++ b/openraft/src/raft/message/append_entries_response.rs @@ -94,6 +94,16 @@ where C: RaftTypeConfig } } +impl From> for AppendEntriesResponse { + fn from(r: StreamAppendResult) -> Self { + match r { + Ok(_) => AppendEntriesResponse::Success, + Err(StreamAppendError::Conflict(_)) => AppendEntriesResponse::Conflict, + Err(StreamAppendError::HigherVote(v)) => AppendEntriesResponse::HigherVote(v), + } + } +} + impl fmt::Display for AppendEntriesResponse where C: RaftTypeConfig { diff --git a/openraft/src/raft/message/log_segment.rs b/openraft/src/raft/message/log_segment.rs new file mode 100644 index 000000000..974fc6817 --- /dev/null +++ b/openraft/src/raft/message/log_segment.rs @@ -0,0 +1,75 @@ +use std::fmt; + +use crate::RaftTypeConfig; +use crate::display_ext::DisplayOptionExt; +use crate::display_ext::DisplaySliceExt; +use crate::entry::RaftEntry; +use crate::type_config::alias::LogIdOf; + +/// A contiguous segment of log entries, anchored by the entry immediately preceding it. +/// +/// - `prev_log_id`: the log id of the entry just before this segment; `None` means the segment +/// starts at the very beginning of the log. +/// - `entries`: the entries in this segment, each consecutive to the previous. +pub struct LogSegment { + pub prev_log_id: Option>, + pub entries: Vec, +} + +impl LogSegment { + pub fn new(prev_log_id: Option>, entries: Vec) -> Self { + Self { prev_log_id, entries } + } + + /// Returns the log id of the last entry in this segment, or `prev_log_id` if empty. + pub fn last(&self) -> Option> { + self.entries.last().map(|e| e.log_id()).or_else(|| self.prev_log_id.clone()) + } +} + +impl fmt::Display for LogSegment +where C::Entry: fmt::Display +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "prev:{}, entries:{}", + self.prev_log_id.display(), + self.entries.display() + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::engine::testing::UTConfig; + use crate::engine::testing::log_id; + use crate::testing::blank_ent; + + type Seg = LogSegment; + + #[test] + fn test_last_empty_no_prev() { + let seg = Seg::new(None, vec![]); + assert_eq!(None, seg.last()); + } + + #[test] + fn test_last_empty_with_prev() { + let seg = Seg::new(Some(log_id(1, 1, 5)), vec![]); + assert_eq!(Some(log_id(1, 1, 5)), seg.last()); + } + + #[test] + fn test_last_with_entries() { + let seg = Seg::new(Some(log_id(1, 1, 5)), vec![blank_ent(2, 1, 6), blank_ent(2, 1, 7)]); + assert_eq!(Some(log_id(2, 1, 7)), seg.last()); + } + + #[test] + fn test_display() { + let seg = Seg::new(Some(log_id(1, 1, 5)), vec![blank_ent(2, 1, 6)]); + assert_eq!("prev:T1-N1.5, entries:[T2-N1.6:blank]", seg.to_string()); + } +} diff --git a/openraft/src/raft/message/mod.rs b/openraft/src/raft/message/mod.rs index 0dc81ca49..11763c374 100644 --- a/openraft/src/raft/message/mod.rs +++ b/openraft/src/raft/message/mod.rs @@ -6,6 +6,7 @@ mod append_entries_request; mod append_entries_response; mod install_snapshot; +mod log_segment; mod stream_append_error; mod transfer_leader; mod vote; @@ -21,6 +22,7 @@ pub use client_write::ClientWriteResult; pub use install_snapshot::InstallSnapshotRequest; pub use install_snapshot::InstallSnapshotResponse; pub use install_snapshot::SnapshotResponse; +pub use log_segment::LogSegment; pub use stream_append_error::StreamAppendError; pub use transfer_leader::TransferLeaderRequest; pub use vote::VoteRequest; diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 5ca341445..3c96e4d44 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -17,7 +17,7 @@ pub(crate) mod message; mod raft_inner; pub mod responder; mod runtime_config_handle; -mod stream_append; +pub(crate) mod stream_append; pub mod trigger; mod watch_handle; @@ -46,6 +46,7 @@ pub use message::ClientWriteResponse; pub use message::ClientWriteResult; pub use message::InstallSnapshotRequest; pub use message::InstallSnapshotResponse; +pub use message::LogSegment; pub use message::SnapshotResponse; pub use message::StreamAppendError; pub use message::TransferLeaderRequest; diff --git a/openraft/src/raft/stream_append.rs b/openraft/src/raft/stream_append.rs index 5bb6076bb..da83d0131 100644 --- a/openraft/src/raft/stream_append.rs +++ b/openraft/src/raft/stream_append.rs @@ -9,9 +9,7 @@ use crate::AsyncRuntime; use crate::OptionalSend; use crate::RaftTypeConfig; use crate::core::raft_msg::RaftMsg; -use crate::log_id_range::LogIdRange; use crate::raft::AppendEntriesRequest; -use crate::raft::AppendEntriesResponse; use crate::raft::StreamAppendError; use crate::raft::raft_inner::RaftInner; use crate::type_config::alias::LogIdOf; @@ -26,8 +24,7 @@ pub type StreamAppendResult = Result>, StreamAppendError const PIPELINE_BUFFER_SIZE: usize = 64; struct Pending { - response_rx: OneshotReceiverOf>, - log_id_range: LogIdRange, + response_rx: OneshotReceiverOf>, } /// Create a pipelined stream that processes AppendEntries requests. @@ -37,7 +34,7 @@ struct Pending { /// /// On error (Conflict or HigherVote), the stream terminates immediately. /// The background task exits when it fails to send to the dropped channel. -pub(crate) fn stream_append( +pub(in crate::raft) fn stream_append( inner: Arc>, input: S, ) -> impl Stream> + OptionalSend + 'static @@ -47,23 +44,17 @@ where { let (tx, rx) = C::mpsc::>(PIPELINE_BUFFER_SIZE); - let inner_clone = inner.clone(); - let _join_handle = C::AsyncRuntime::spawn(async move { futures_util::pin_mut!(input); while let Some(req) = input.next().await { - let log_id_range = req.log_id_range(); let (resp_tx, resp_rx) = C::oneshot(); - if inner_clone.send_msg(RaftMsg::AppendEntries { rpc: req, tx: resp_tx }).await.is_err() { + if inner.send_msg(RaftMsg::AppendEntries { rpc: req, tx: resp_tx }).await.is_err() { break; } - let pending = Pending { - response_rx: resp_rx, - log_id_range, - }; + let pending = Pending { response_rx: resp_rx }; if MpscSender::send(&tx, pending).await.is_err() { break; @@ -71,18 +62,22 @@ where } }); - futures_util::stream::unfold(Some((rx, inner)), |state| async move { - let (mut rx, inner) = state?; + futures_util::stream::unfold(Some(rx), |state| async move { + let mut rx = state?; let p: Pending = MpscReceiver::recv(&mut rx).await?; - let resp = inner.recv_msg(p.response_rx).await.ok()?; - let range = p.log_id_range; - let result = resp.into_stream_result(range.prev, range.last); + let result: StreamAppendResult = match p.response_rx.await { + Ok(r) => r, + Err(e) => { + tracing::error!("stream_append: failed to receive response from RaftCore: {}", e); + return None; + } + }; if result.is_err() { return Some((result, None)); } - Some((result, Some((rx, inner)))) + Some((result, Some(rx))) }) }