Skip to content

Commit

Permalink
*: upgrade kvproto (tikv#10041)
Browse files Browse the repository at this point in the history
* update kvproto

Signed-off-by: linning <[email protected]>

* make ci happy

Signed-off-by: linning <[email protected]>

* address comment

Signed-off-by: linning <[email protected]>
  • Loading branch information
NingLin-P authored Apr 19, 2021
1 parent 69aba17 commit c6cbec4
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions components/cdc/src/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl Delegate {
"conn_id" => ?downstream.get_conn_id(),
"req_id" => downstream.req_id,
"err" => ?e);
let err = Error::Request(e.into());
let err = Error::request(e.into());
let change_data_error = self.error_event(err);
downstream.sink_event(change_data_error);
return false;
Expand Down Expand Up @@ -495,7 +495,7 @@ impl Delegate {
} else {
let err_header = response.mut_header().take_error();
self.mark_failed();
return Err(Error::Request(err_header));
return Err(Error::request(err_header));
}
}
Ok(())
Expand Down Expand Up @@ -811,7 +811,7 @@ impl Delegate {
_ => return Ok(()),
};
self.mark_failed();
Err(Error::Request(store_err.into()))
Err(Error::request(store_err.into()))
}
}

Expand Down Expand Up @@ -976,21 +976,21 @@ mod tests {

let mut err_header = ErrorHeader::default();
err_header.set_not_leader(Default::default());
delegate.stop(Error::Request(err_header));
delegate.stop(Error::request(err_header));
let err = receive_error();
assert!(err.has_not_leader());
// Enable is disabled by any error.
assert!(!enabled.load(Ordering::SeqCst));

let mut err_header = ErrorHeader::default();
err_header.set_region_not_found(Default::default());
delegate.stop(Error::Request(err_header));
delegate.stop(Error::request(err_header));
let err = receive_error();
assert!(err.has_region_not_found());

let mut err_header = ErrorHeader::default();
err_header.set_epoch_not_match(Default::default());
delegate.stop(Error::Request(err_header));
delegate.stop(Error::request(err_header));
let err = receive_error();
assert!(err.has_epoch_not_match());

Expand Down
14 changes: 7 additions & 7 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {
},
) {
warn!("cdc send capture change cmd failed"; "region_id" => region_id, "error" => ?e);
deregister_downstream(Error::Request(e.into()));
deregister_downstream(Error::request(e.into()));
return;
}

Expand Down Expand Up @@ -873,7 +873,7 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {
let deregister = Deregister::Region {
observe_id,
region_id,
err: Error::Request(e.into()),
err: Error::request(e.into()),
};
if let Err(e) = scheduler_clone.schedule(Task::Deregister(deregister)) {
error!("schedule cdc task failed"; "error" => ?e);
Expand Down Expand Up @@ -1113,7 +1113,7 @@ impl Initializer {
let deregister = Deregister::Region {
region_id: self.region_id,
observe_id: self.observe_id,
err: Error::Request(err),
err: Error::request(err),
};
if let Err(e) = self.sched.schedule(Task::Deregister(deregister)) {
error!("schedule cdc task failed"; "error" => ?e);
Expand Down Expand Up @@ -2175,7 +2175,7 @@ mod tests {
region_id: 1,
downstream_id,
conn_id,
err: Some(Error::Request(err_header.clone())),
err: Some(Error::request(err_header.clone())),
};
ep.run(Task::Deregister(deregister));
'outer: loop {
Expand Down Expand Up @@ -2208,7 +2208,7 @@ mod tests {
region_id: 1,
downstream_id,
conn_id,
err: Some(Error::Request(err_header.clone())),
err: Some(Error::request(err_header.clone())),
};
ep.run(Task::Deregister(deregister));
assert!(harness.recv_timeout(Duration::from_millis(200)).is_err());
Expand All @@ -2218,7 +2218,7 @@ mod tests {
region_id: 1,
downstream_id: new_downstream_id,
conn_id,
err: Some(Error::Request(err_header.clone())),
err: Some(Error::request(err_header.clone())),
};
ep.run(Task::Deregister(deregister));
'outer1: loop {
Expand Down Expand Up @@ -2250,7 +2250,7 @@ mod tests {
region_id: 1,
// A stale ObserveID (different from the actual one).
observe_id: ObserveID::new(),
err: Error::Request(err_header),
err: Error::request(err_header),
};
ep.run(Task::Deregister(deregister));
match harness.recv_timeout(Duration::from_millis(500)) {
Expand Down
10 changes: 8 additions & 2 deletions components/cdc/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub enum Error {
#[error("Mvcc error {0}")]
Mvcc(#[from] MvccError),
#[error("Request error {0:?}")]
Request(ErrorHeader),
Request(Box<ErrorHeader>),
#[error("Engine traits error {0}")]
EngineTraits(#[from] EngineTraitsError),
#[error("Incremental scan timed out {0:?}")]
Expand All @@ -37,6 +37,12 @@ pub enum Error {
GetRealTimeStartFailed,
}

impl Error {
pub fn request(err: ErrorHeader) -> Error {
Error::Request(Box::new(err))
}
}

macro_rules! impl_from {
($($inner:ty => $container:ident,)+) => {
$(
Expand Down Expand Up @@ -66,7 +72,7 @@ impl Error {
| Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(
box MvccErrorInner::Engine(EngineError(box EngineErrorInner::Request(e))),
))))
| Error::Request(e) => e,
| Error::Request(box e) => e,
other => {
let mut e = ErrorHeader::default();
e.set_message(format!("{:?}", other));
Expand Down
4 changes: 2 additions & 2 deletions components/cdc/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl RoleObserver for CdcObserver {
let deregister = Deregister::Region {
region_id,
observe_id,
err: CdcError::Request(store_err.into()),
err: CdcError::request(store_err.into()),
};
if let Err(e) = self.sched.schedule(Task::Deregister(deregister)) {
error!("schedule cdc task failed"; "error" => ?e);
Expand All @@ -215,7 +215,7 @@ impl RegionChangeObserver for CdcObserver {
let deregister = Deregister::Region {
region_id,
observe_id,
err: CdcError::Request(store_err.into()),
err: CdcError::request(store_err.into()),
};
if let Err(e) = self.sched.schedule(Task::Deregister(deregister)) {
error!("schedule cdc task failed"; "error" => ?e);
Expand Down
1 change: 1 addition & 0 deletions components/external_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub fn url_of_backend(backend: &StorageBackend) -> url::Url {
}
u.set_path(gcs.get_prefix());
}
Some(Backend::CloudDynamic(_)) => unimplemented!(),
None => {}
}
u
Expand Down
8 changes: 6 additions & 2 deletions components/resolved_ts/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub enum Error {
#[error("Mvcc error {0}")]
Mvcc(#[from] MvccError),
#[error("Request error {0:?}")]
Request(ErrorHeader),
Request(Box<ErrorHeader>),
#[error("Engine traits error {0}")]
EngineTraits(#[from] EngineTraitsError),
#[error("Txn types error {0}")]
Expand All @@ -33,6 +33,10 @@ pub enum Error {
}

impl Error {
pub fn request(err: ErrorHeader) -> Error {
Error::Request(Box::new(err))
}

pub fn extract_error_header(self) -> ErrorHeader {
match self {
Error::Engine(EngineError(box EngineErrorInner::Request(e)))
Expand All @@ -42,7 +46,7 @@ impl Error {
| Error::Txn(TxnError(box TxnErrorInner::Mvcc(MvccError(
box MvccErrorInner::Engine(EngineError(box EngineErrorInner::Request(e))),
))))
| Error::Request(e) => e,
| Error::Request(box e) => e,
other => {
let mut e = ErrorHeader::default();
e.set_message(format!("{:?}", other));
Expand Down
2 changes: 1 addition & 1 deletion components/resolved_ts/src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> ScannerPool<T, E> {
)?;
let mut resp = box_try!(fut.await);
if resp.response.get_header().has_error() {
return Err(Error::Request(resp.response.take_header().take_error()));
return Err(Error::request(resp.response.take_header().take_error()));
}
Ok(resp.snapshot.unwrap())
}
Expand Down
9 changes: 9 additions & 0 deletions src/server/service/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,15 @@ impl<T: RaftStoreRouter<RocksEngine> + 'static, E: Engine, L: LockManager> Tikv

ctx.spawn(task);
}

fn get_store_safe_ts(
&mut self,
_: RpcContext<'_>,
_: StoreSafeTsRequest,
_: UnarySink<StoreSafeTsResponse>,
) {
unimplemented!()
}
}

fn response_batch_commands_request<F>(
Expand Down
2 changes: 2 additions & 0 deletions tests/integrations/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ trait MockKvService {
unary_call!(read_index, ReadIndexRequest, ReadIndexResponse);
bstream_call!(batch_commands, BatchCommandsRequest, BatchCommandsResponse);
unary_call!(check_leader, CheckLeaderRequest, CheckLeaderResponse);
unary_call!(get_store_safe_ts, StoreSafeTsRequest, StoreSafeTsResponse);
}

impl<T: MockKvService + Clone + Send + 'static> Tikv for MockKv<T> {
Expand Down Expand Up @@ -365,6 +366,7 @@ impl<T: MockKvService + Clone + Send + 'static> Tikv for MockKv<T> {
unary_call_dispatch!(read_index, ReadIndexRequest, ReadIndexResponse);
bstream_call_dispatch!(batch_commands, BatchCommandsRequest, BatchCommandsResponse);
unary_call_dispatch!(check_leader, CheckLeaderRequest, CheckLeaderResponse);
unary_call_dispatch!(get_store_safe_ts, StoreSafeTsRequest, StoreSafeTsResponse);
}

fn mock_kv_service<T>(kv: MockKv<T>, ip: &str, port: u16) -> Result<Server>
Expand Down

0 comments on commit c6cbec4

Please sign in to comment.