diff --git a/src/storage/txn/commands/check_secondary_locks.rs b/src/storage/txn/commands/check_secondary_locks.rs
index c52ec1a4acd..6c3d472cc42 100644
--- a/src/storage/txn/commands/check_secondary_locks.rs
+++ b/src/storage/txn/commands/check_secondary_locks.rs
@@ -165,10 +165,12 @@ pub mod tests {
     use crate::storage::lock_manager::DummyLockManager;
     use crate::storage::mvcc::tests::*;
     use crate::storage::txn::commands::WriteCommand;
+    use crate::storage::txn::scheduler::DEFAULT_EXECUTION_DURATION_LIMIT;
     use crate::storage::txn::tests::*;
     use crate::storage::Engine;
     use concurrency_manager::ConcurrencyManager;
     use kvproto::kvrpcpb::Context;
+    use tikv_util::deadline::Deadline;
 
     pub fn must_success<E: Engine>(
         engine: &E,
@@ -184,6 +186,7 @@ pub mod tests {
             ctx: ctx.clone(),
             keys: vec![Key::from_raw(key)],
             start_ts: lock_ts,
+            deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
         };
         let result = command
             .process_write(
@@ -219,6 +222,7 @@ pub mod tests {
             ctx: Default::default(),
             keys: vec![key],
             start_ts: ts,
+            deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
         };
         let result = command
             .process_write(
diff --git a/src/storage/txn/commands/check_txn_status.rs b/src/storage/txn/commands/check_txn_status.rs
index c2974323509..564b16b8645 100644
--- a/src/storage/txn/commands/check_txn_status.rs
+++ b/src/storage/txn/commands/check_txn_status.rs
@@ -142,10 +142,12 @@ pub mod tests {
     use crate::storage::lock_manager::DummyLockManager;
     use crate::storage::mvcc::tests::*;
     use crate::storage::txn::commands::{pessimistic_rollback, WriteCommand, WriteContext};
+    use crate::storage::txn::scheduler::DEFAULT_EXECUTION_DURATION_LIMIT;
     use crate::storage::txn::tests::*;
     use crate::storage::{types::TxnStatus, ProcessResult, TestEngineBuilder};
     use concurrency_manager::ConcurrencyManager;
     use kvproto::kvrpcpb::Context;
+    use tikv_util::deadline::Deadline;
     use txn_types::Key;
     use txn_types::WriteType;
 
@@ -174,6 +176,7 @@ pub mod tests {
             rollback_if_not_exist,
             force_sync_commit,
             resolving_pessimistic_lock,
+            deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
         };
         let result = command
             .process_write(
@@ -219,6 +222,7 @@ pub mod tests {
             rollback_if_not_exist,
             force_sync_commit,
             resolving_pessimistic_lock,
+            deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
         };
         assert!(
             command
diff --git a/src/storage/txn/commands/macros.rs b/src/storage/txn/commands/macros.rs
index ae2bbd5335f..a0a923308fe 100644
--- a/src/storage/txn/commands/macros.rs
+++ b/src/storage/txn/commands/macros.rs
@@ -8,6 +8,9 @@ macro_rules! ctx {
         fn get_ctx_mut(&mut self) -> &mut crate::storage::Context {
             &mut self.ctx
         }
+        fn deadline(&self) -> ::tikv_util::deadline::Deadline {
+            self.deadline
+        }
     };
 }
 
@@ -34,6 +37,7 @@ macro_rules! command {
         $(#[$outer_doc])*
         pub struct $cmd {
             pub ctx: crate::storage::Context,
+            pub deadline: ::tikv_util::deadline::Deadline,
             $($(#[$inner_doc])* pub $arg: $arg_ty,)*
         }
 
@@ -43,8 +47,15 @@ macro_rules! command {
                 $($arg: $arg_ty,)*
                 ctx: crate::storage::Context,
             ) -> TypedCommand<$cmd_ty> {
+                let execution_duration_limit = if ctx.max_execution_duration_ms == 0 {
+                    crate::storage::txn::scheduler::DEFAULT_EXECUTION_DURATION_LIMIT
+                } else {
+                    ::std::time::Duration::from_millis(ctx.max_execution_duration_ms)
+                };
+                let deadline = ::tikv_util::deadline::Deadline::from_now(execution_duration_limit);
                 Command::$cmd($cmd {
                     ctx,
+                    deadline,
                     $($arg,)*
                 }).into()
             }
diff --git a/src/storage/txn/commands/mod.rs b/src/storage/txn/commands/mod.rs
index ff37db3fc8b..b4451d8e666 100644
--- a/src/storage/txn/commands/mod.rs
+++ b/src/storage/txn/commands/mod.rs
@@ -37,6 +37,7 @@ pub use resolve_lock::ResolveLock;
 pub use resolve_lock_lite::ResolveLockLite;
 pub use resolve_lock_readphase::ResolveLockReadPhase;
 pub use rollback::Rollback;
+use tikv_util::deadline::Deadline;
 pub use txn_heart_beat::TxnHeartBeat;
 
 pub use resolve_lock::RESOLVE_LOCK_BATCH_SIZE;
@@ -465,6 +466,8 @@ pub trait CommandExt: Display {
     fn get_ctx_mut(&mut self) -> &mut Context;
 
+    fn deadline(&self) -> Deadline;
+
     fn incr_cmd_metric(&self);
 
     fn ts(&self) -> TimeStamp {
         TimeStamp::zero()
@@ -660,6 +663,10 @@ impl Command {
     pub fn ctx_mut(&mut self) -> &mut Context {
         self.command_ext_mut().get_ctx_mut()
     }
+
+    pub fn deadline(&self) -> Deadline {
+        self.command_ext().deadline()
+    }
 }
 
 impl Display for Command {
diff --git a/src/storage/txn/commands/pessimistic_rollback.rs b/src/storage/txn/commands/pessimistic_rollback.rs
index a2735f2b5c2..cf43be1b0e7 100644
--- a/src/storage/txn/commands/pessimistic_rollback.rs
+++ b/src/storage/txn/commands/pessimistic_rollback.rs
@@ -100,10 +100,12 @@ pub mod tests {
     use crate::storage::lock_manager::DummyLockManager;
     use crate::storage::mvcc::tests::*;
     use crate::storage::txn::commands::{WriteCommand, WriteContext};
+    use crate::storage::txn::scheduler::DEFAULT_EXECUTION_DURATION_LIMIT;
     use crate::storage::txn::tests::*;
     use crate::storage::TestEngineBuilder;
     use concurrency_manager::ConcurrencyManager;
     use kvproto::kvrpcpb::Context;
+    use tikv_util::deadline::Deadline;
     use txn_types::Key;
 
     pub fn must_success<E: Engine>(
@@ -122,6 +124,7 @@ pub mod tests {
             keys: vec![Key::from_raw(key)],
             start_ts,
             for_update_ts,
+            deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
         };
         let lock_mgr = DummyLockManager;
         let write_context = WriteContext {
diff --git a/src/storage/txn/commands/resolve_lock.rs b/src/storage/txn/commands/resolve_lock.rs
index 18e50533405..6a229f46c74 100644
--- a/src/storage/txn/commands/resolve_lock.rs
+++ b/src/storage/txn/commands/resolve_lock.rs
@@ -131,8 +131,14 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for ResolveLock {
         let pr = if scan_key.is_none() {
             ProcessResult::Res
         } else {
+            let next_cmd = ResolveLockReadPhase {
+                ctx: ctx.clone(),
+                deadline: self.deadline,
+                txn_status,
+                scan_key,
+            };
             ProcessResult::NextCommand {
-                cmd: ResolveLockReadPhase::new(txn_status, scan_key.take(), ctx.clone()).into(),
+                cmd: Command::ResolveLockReadPhase(next_cmd),
             }
         };
         let mut write_data = WriteData::from_modifies(txn.into_modifies());
diff --git a/src/storage/txn/commands/resolve_lock_readphase.rs b/src/storage/txn/commands/resolve_lock_readphase.rs
index 9bb4b6812c5..1a657687c34 100644
--- a/src/storage/txn/commands/resolve_lock_readphase.rs
+++ b/src/storage/txn/commands/resolve_lock_readphase.rs
@@ -62,8 +62,15 @@ impl<S: Snapshot> ReadCommand<S> for ResolveLockReadPhase {
             // All locks are scanned
             None
         };
+        let next_cmd = ResolveLock {
+            ctx,
+            deadline: self.deadline,
+            txn_status,
+            scan_key: next_scan_key,
+            key_locks: kv_pairs,
+        };
         Ok(ProcessResult::NextCommand {
-            cmd: ResolveLock::new(txn_status, next_scan_key, kv_pairs, ctx).into(),
+            cmd: Command::ResolveLock(next_cmd),
         })
     }
 }
diff --git a/src/storage/txn/commands/txn_heart_beat.rs b/src/storage/txn/commands/txn_heart_beat.rs
index fa0a281658c..ef4f31358ec 100644
--- a/src/storage/txn/commands/txn_heart_beat.rs
+++ b/src/storage/txn/commands/txn_heart_beat.rs
@@ -99,10 +99,12 @@ pub mod tests {
     use crate::storage::lock_manager::DummyLockManager;
     use crate::storage::mvcc::tests::*;
     use crate::storage::txn::commands::WriteCommand;
+    use crate::storage::txn::scheduler::DEFAULT_EXECUTION_DURATION_LIMIT;
     use crate::storage::txn::tests::*;
     use crate::storage::Engine;
     use concurrency_manager::ConcurrencyManager;
     use kvproto::kvrpcpb::Context;
+    use tikv_util::deadline::Deadline;
 
     pub fn must_success<E: Engine>(
         engine: &E,
@@ -120,6 +122,7 @@ pub mod tests {
             primary_key: Key::from_raw(primary_key),
             start_ts,
             advise_ttl,
+            deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
         };
         let result = command
             .process_write(
@@ -159,6 +162,7 @@ pub mod tests {
             primary_key: Key::from_raw(primary_key),
             start_ts,
             advise_ttl,
+            deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
         };
         assert!(
             command
diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs
index 3bc7e3818c0..df32d168ceb 100644
--- a/src/storage/txn/scheduler.rs
+++ b/src/storage/txn/scheduler.rs
@@ -34,9 +34,7 @@ use futures::{select, FutureExt as _};
 use kvproto::kvrpcpb::{CommandPri, DiskFullOpt, ExtraOp};
 use kvproto::pdpb::QueryKind;
 use resource_metering::{cpu::FutureExt, ResourceMeteringTag};
-use tikv_util::{
-    callback::must_call, deadline::Deadline, time::Instant, timer::GLOBAL_TIMER_HANDLE,
-};
+use tikv_util::{callback::must_call, time::Instant, timer::GLOBAL_TIMER_HANDLE};
 use txn_types::TimeStamp;
 
 use crate::server::lock_manager::waiter_manager;
@@ -67,31 +65,22 @@ const TASKS_SLOTS_NUM: usize = 1 << 12; // 4096 slots.
 
 // The default limit is set to be very large. Then, requests without `max_exectuion_duration`
 // will not be aborted unexpectedly.
-const DEFAULT_EXECUTION_DURATION_LIMIT: Duration = Duration::from_secs(24 * 60 * 60);
+pub const DEFAULT_EXECUTION_DURATION_LIMIT: Duration = Duration::from_secs(24 * 60 * 60);
 
 /// Task is a running command.
 pub(super) struct Task {
     pub(super) cid: u64,
     pub(super) cmd: Command,
     pub(super) extra_op: ExtraOp,
-    pub(super) deadline: Deadline,
 }
 
 impl Task {
     /// Creates a task for a running command.
     pub(super) fn new(cid: u64, cmd: Command) -> Task {
-        let max_execution_duration_ms = cmd.ctx().max_execution_duration_ms;
-        let execution_duration_limit = if max_execution_duration_ms == 0 {
-            DEFAULT_EXECUTION_DURATION_LIMIT
-        } else {
-            Duration::from_millis(max_execution_duration_ms)
-        };
-        let deadline = Deadline::from_now(execution_duration_limit);
         Task {
             cid,
             cmd,
             extra_op: ExtraOp::Noop,
-            deadline,
         }
     }
 }
@@ -272,7 +261,7 @@ impl<L: LockManager> SchedulerInner<L> {
         let tctx = task_slot.get_mut(&cid).unwrap();
         // Check deadline early during acquiring latches to avoid expired requests blocking
         // other requests.
-        if let Err(e) = tctx.task.as_ref().unwrap().deadline.check() {
+        if let Err(e) = tctx.task.as_ref().unwrap().cmd.deadline().check() {
             // `acquire_lock_on_wakeup` is called when another command releases its locks and wakes up
             // command `cid`. This command inserted its lock before and now the lock is at the
             // front of the queue. The actual acquired count is one more than the `owned_count`
@@ -395,7 +384,7 @@ impl<E: Engine, L: LockManager> Scheduler<E, L> {
         let tctx = task_slot
             .entry(cid)
             .or_insert_with(|| self.inner.new_task_context(Task::new(cid, cmd), callback));
-        let deadline = tctx.task.as_ref().unwrap().deadline;
+        let deadline = tctx.task.as_ref().unwrap().cmd.deadline();
         if self.inner.latches.acquire(&mut tctx.lock, cid) {
             fail_point!("txn_scheduler_acquire_success");
             tctx.on_schedule();
@@ -780,7 +769,7 @@ impl<E: Engine, L: LockManager> Scheduler<E, L> {
             .load(Ordering::Relaxed);
         let pipelined = pipelined_pessimistic_lock && task.cmd.can_be_pipelined();
-        let deadline = task.deadline;
+        let deadline = task.cmd.deadline();
         let write_result = {
             let context = WriteContext {
                 lock_mgr: &self.inner.lock_mgr,
                 concurrency_manager: self.inner.concurrency_manager.clone(),
@@ -1007,7 +996,7 @@ impl<E: Engine, L: LockManager> Scheduler<E, L> {
     /// the task with a `DeadlineExceeded` error.
     #[inline]
     fn check_task_deadline_exceeded(&self, task: &Task) -> bool {
-        if let Err(e) = task.deadline.check() {
+        if let Err(e) = task.cmd.deadline().check() {
             self.finish_with_err(task.cid, e);
             true
         } else {
diff --git a/tests/failpoints/cases/test_storage.rs b/tests/failpoints/cases/test_storage.rs
index 048aa8a0416..0d4cc5a990f 100644
--- a/tests/failpoints/cases/test_storage.rs
+++ b/tests/failpoints/cases/test_storage.rs
@@ -1142,6 +1142,77 @@ fn test_before_propose_deadline() {
     ));
 }
 
+#[test]
+fn test_resolve_lock_deadline() {
+    let mut cluster = new_server_cluster(0, 1);
+    cluster.run();
+
+    let engine = cluster.sim.read().unwrap().storages[&1].clone();
+    let storage = TestStorageBuilder::<_, DummyLockManager>::from_engine_and_lock_mgr(
+        engine,
+        DummyLockManager {},
+    )
+    .build()
+    .unwrap();
+
+    let mut ctx = Context::default();
+    ctx.set_region_id(1);
+    ctx.set_region_epoch(cluster.get_region_epoch(1));
+    ctx.set_peer(cluster.leader_of_region(1).unwrap());
+
+    // One resolve lock batch is 256 keys. So we need to prewrite more than that.
+    let mutations = (1i32..300)
+        .map(|i| {
+            let data = i.to_le_bytes();
+            Mutation::Put((Key::from_raw(&data), data.to_vec()))
+        })
+        .collect();
+    let cmd = commands::Prewrite::new(
+        mutations,
+        1i32.to_le_bytes().to_vec(),
+        10.into(),
+        1,
+        false,
+        299,
+        15.into(),
+        20.into(),
+        None,
+        false,
+        ctx.clone(),
+    );
+    let (tx, rx) = channel();
+    storage
+        .sched_txn_command(
+            cmd,
+            Box::new(move |res: storage::Result<_>| {
+                tx.send(res).unwrap();
+            }),
+        )
+        .unwrap();
+    assert!(rx.recv().unwrap().is_ok());
+
+    // Resolve lock, this needs two rounds, two process_read and two process_write.
+    // So it needs more than 400ms. It will exceed the deadline.
+    ctx.max_execution_duration_ms = 300;
+    fail::cfg("txn_before_process_read", "sleep(100)").unwrap();
+    fail::cfg("txn_before_process_write", "sleep(100)").unwrap();
+    let (tx, rx) = channel();
+    let mut txn_status = HashMap::default();
+    txn_status.insert(TimeStamp::new(10), TimeStamp::new(0));
+    storage
+        .sched_txn_command(
+            commands::ResolveLockReadPhase::new(txn_status, None, ctx),
+            Box::new(move |res: storage::Result<_>| {
+                tx.send(res).unwrap();
+            }),
+        )
+        .unwrap();
+    assert!(matches!(
+        rx.recv().unwrap(),
+        Err(StorageError(box StorageErrorInner::DeadlineExceeded))
+    ));
+}
+
 /// Checks if concurrent transaction works correctly during shutdown.
 ///
 /// During shutdown, all pending writes will fail with error so its latch will be released.
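
Reviewer note: the behavioral core of this patch is that a command's deadline is now computed once, by the `command!` macro at construction time, and carried unchanged through `NextCommand` chains (`ResolveLockReadPhase` → `ResolveLock` → …), instead of being recomputed in `Task::new` each time a phase is rescheduled. A multi-round resolve-lock therefore consumes a single time budget across all of its read and write phases, which is exactly what the new `test_resolve_lock_deadline` failpoint test exercises. Below is a minimal, self-contained sketch of the selection logic the macro inlines; the simplified `Deadline` type and the `deadline_from_ctx` helper are illustrative stand-ins only (the real type is `tikv_util::deadline::Deadline`).

```rust
use std::time::{Duration, Instant};

// Mirrors the scheduler constant: requests without `max_execution_duration`
// get a limit so large they are effectively never aborted.
const DEFAULT_EXECUTION_DURATION_LIMIT: Duration = Duration::from_secs(24 * 60 * 60);

// Simplified stand-in for `tikv_util::deadline::Deadline`.
#[derive(Clone, Copy)]
struct Deadline(Instant);

impl Deadline {
    fn from_now(limit: Duration) -> Self {
        Deadline(Instant::now() + limit)
    }

    // Returns Err once the deadline has passed.
    fn check(&self) -> Result<(), ()> {
        if Instant::now() > self.0 { Err(()) } else { Ok(()) }
    }
}

// Hypothetical helper mirroring what the `command!` macro now generates in
// every command's `new`: a zero `max_execution_duration_ms` means the client
// set no limit, so the large default applies.
fn deadline_from_ctx(max_execution_duration_ms: u64) -> Deadline {
    let limit = if max_execution_duration_ms == 0 {
        DEFAULT_EXECUTION_DURATION_LIMIT
    } else {
        Duration::from_millis(max_execution_duration_ms)
    };
    Deadline::from_now(limit)
}

fn main() {
    // As in the new test: a 300 ms budget that every subsequent phase of a
    // chained command must fit into, because the deadline travels with the
    // command instead of being reset for each `Task`.
    let deadline = deadline_from_ctx(300);
    assert!(deadline.check().is_ok());
}
```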