storage: put deadline in txn commands (tikv#10834)
* storage: put deadline in txn commands

Signed-off-by: Yilin Chen <[email protected]>

* add tests

Signed-off-by: Yilin Chen <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
sticnarf and ti-chi-bot authored Sep 9, 2021
1 parent 1e45323 commit f9f669b
Showing 10 changed files with 125 additions and 19 deletions.
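
The gist of the change: every transaction command now carries its own deadline, computed when the command is built and exposed through a new `CommandExt::deadline()` method, and the scheduler checks that deadline through the command instead of keeping a separate copy on `Task`. Below is a minimal sketch of that shape using simplified placeholder types; the real pieces are `tikv_util::deadline::Deadline`, the `command!` macro, and the scheduler code in the hunks that follow.

```rust
use std::time::{Duration, Instant};

// Simplified stand-in for tikv_util::deadline::Deadline.
#[derive(Clone, Copy)]
struct Deadline(Instant);

impl Deadline {
    fn from_now(limit: Duration) -> Self {
        Deadline(Instant::now() + limit)
    }
    fn check(&self) -> Result<(), &'static str> {
        if Instant::now() > self.0 {
            Err("DeadlineExceeded")
        } else {
            Ok(())
        }
    }
}

// Mirrors the new requirement on CommandExt: every command reports the
// deadline it was constructed with.
trait CommandExt {
    fn deadline(&self) -> Deadline;
}

// Placeholder command; the real structs gain a `deadline` field via the
// `command!` macro in src/storage/txn/commands/macros.rs.
struct CheckTxnStatus {
    deadline: Deadline,
    // ...other command arguments...
}

impl CommandExt for CheckTxnStatus {
    fn deadline(&self) -> Deadline {
        self.deadline
    }
}

// What the scheduler now does before acquiring latches and before each
// process_read/process_write step: ask the command for its deadline.
fn check_before_processing(cmd: &dyn CommandExt) -> Result<(), &'static str> {
    cmd.deadline().check()
}

fn main() {
    let cmd = CheckTxnStatus {
        deadline: Deadline::from_now(Duration::from_millis(300)),
    };
    assert!(check_before_processing(&cmd).is_ok());
}
```
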
4 changes: 4 additions & 0 deletions src/storage/txn/commands/check_secondary_locks.rs
@@ -165,10 +165,12 @@ pub mod tests {
use crate::storage::lock_manager::DummyLockManager;
use crate::storage::mvcc::tests::*;
use crate::storage::txn::commands::WriteCommand;
use crate::storage::txn::scheduler::DEFAULT_EXECUTION_DURATION_LIMIT;
use crate::storage::txn::tests::*;
use crate::storage::Engine;
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::Context;
use tikv_util::deadline::Deadline;

pub fn must_success<E: Engine>(
engine: &E,
@@ -184,6 +186,7 @@ pub mod tests {
ctx: ctx.clone(),
keys: vec![Key::from_raw(key)],
start_ts: lock_ts,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
let result = command
.process_write(
@@ -219,6 +222,7 @@ pub mod tests {
ctx: Default::default(),
keys: vec![key],
start_ts: ts,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
let result = command
.process_write(
4 changes: 4 additions & 0 deletions src/storage/txn/commands/check_txn_status.rs
@@ -142,10 +142,12 @@ pub mod tests {
use crate::storage::lock_manager::DummyLockManager;
use crate::storage::mvcc::tests::*;
use crate::storage::txn::commands::{pessimistic_rollback, WriteCommand, WriteContext};
use crate::storage::txn::scheduler::DEFAULT_EXECUTION_DURATION_LIMIT;
use crate::storage::txn::tests::*;
use crate::storage::{types::TxnStatus, ProcessResult, TestEngineBuilder};
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::Context;
use tikv_util::deadline::Deadline;
use txn_types::Key;
use txn_types::WriteType;

@@ -174,6 +176,7 @@ pub mod tests {
rollback_if_not_exist,
force_sync_commit,
resolving_pessimistic_lock,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
let result = command
.process_write(
@@ -219,6 +222,7 @@ pub mod tests {
rollback_if_not_exist,
force_sync_commit,
resolving_pessimistic_lock,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
assert!(
command
11 changes: 11 additions & 0 deletions src/storage/txn/commands/macros.rs
@@ -8,6 +8,9 @@ macro_rules! ctx {
fn get_ctx_mut(&mut self) -> &mut crate::storage::Context {
&mut self.ctx
}
fn deadline(&self) -> ::tikv_util::deadline::Deadline {
self.deadline
}
};
}

@@ -34,6 +37,7 @@ macro_rules! command {
$(#[$outer_doc])*
pub struct $cmd {
pub ctx: crate::storage::Context,
pub deadline: ::tikv_util::deadline::Deadline,
$($(#[$inner_doc])* pub $arg: $arg_ty,)*
}

@@ -43,8 +47,15 @@
$($arg: $arg_ty,)*
ctx: crate::storage::Context,
) -> TypedCommand<$cmd_ty> {
let execution_duration_limit = if ctx.max_execution_duration_ms == 0 {
crate::storage::txn::scheduler::DEFAULT_EXECUTION_DURATION_LIMIT
} else {
::std::time::Duration::from_millis(ctx.max_execution_duration_ms)
};
let deadline = ::tikv_util::deadline::Deadline::from_now(execution_duration_limit);
Command::$cmd($cmd {
ctx,
deadline,
$($arg,)*
}).into()
}
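
Pulled out of the macro for readability, the constructor's duration-to-deadline logic amounts to the helper below. This is an illustrative sketch that assumes the `tikv_util` crate is available as a dependency; `deadline_from_ctx` is not a function in the codebase, only the inlined macro expansion shown above is.

```rust
use std::time::Duration;
use tikv_util::deadline::Deadline;

// Matches DEFAULT_EXECUTION_DURATION_LIMIT in src/storage/txn/scheduler.rs.
const DEFAULT_EXECUTION_DURATION_LIMIT: Duration = Duration::from_secs(24 * 60 * 60);

/// A `max_execution_duration_ms` of 0 means the client did not set a limit,
/// so the generous 24-hour default applies instead of an immediate timeout.
fn deadline_from_ctx(max_execution_duration_ms: u64) -> Deadline {
    let limit = if max_execution_duration_ms == 0 {
        DEFAULT_EXECUTION_DURATION_LIMIT
    } else {
        Duration::from_millis(max_execution_duration_ms)
    };
    Deadline::from_now(limit)
}
```

Because the deadline is fixed when the command is created, time spent waiting for latches or sitting in the scheduler queue counts against the same budget as execution itself.
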
7 changes: 7 additions & 0 deletions src/storage/txn/commands/mod.rs
@@ -37,6 +37,7 @@ pub use resolve_lock::ResolveLock;
pub use resolve_lock_lite::ResolveLockLite;
pub use resolve_lock_readphase::ResolveLockReadPhase;
pub use rollback::Rollback;
use tikv_util::deadline::Deadline;
pub use txn_heart_beat::TxnHeartBeat;

pub use resolve_lock::RESOLVE_LOCK_BATCH_SIZE;
@@ -465,6 +466,8 @@ pub trait CommandExt: Display {

fn get_ctx_mut(&mut self) -> &mut Context;

fn deadline(&self) -> Deadline;

fn incr_cmd_metric(&self);

fn ts(&self) -> TimeStamp {
@@ -660,6 +663,10 @@ impl Command {
pub fn ctx_mut(&mut self) -> &mut Context {
self.command_ext_mut().get_ctx_mut()
}

pub fn deadline(&self) -> Deadline {
self.command_ext().deadline()
}
}

impl Display for Command {
3 changes: 3 additions & 0 deletions src/storage/txn/commands/pessimistic_rollback.rs
@@ -100,10 +100,12 @@ pub mod tests {
use crate::storage::lock_manager::DummyLockManager;
use crate::storage::mvcc::tests::*;
use crate::storage::txn::commands::{WriteCommand, WriteContext};
use crate::storage::txn::scheduler::DEFAULT_EXECUTION_DURATION_LIMIT;
use crate::storage::txn::tests::*;
use crate::storage::TestEngineBuilder;
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::Context;
use tikv_util::deadline::Deadline;
use txn_types::Key;

pub fn must_success<E: Engine>(
@@ -122,6 +124,7 @@
keys: vec![Key::from_raw(key)],
start_ts,
for_update_ts,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
let lock_mgr = DummyLockManager;
let write_context = WriteContext {
8 changes: 7 additions & 1 deletion src/storage/txn/commands/resolve_lock.rs
@@ -131,8 +131,14 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for ResolveLock {
let pr = if scan_key.is_none() {
ProcessResult::Res
} else {
let next_cmd = ResolveLockReadPhase {
ctx: ctx.clone(),
deadline: self.deadline,
txn_status,
scan_key,
};
ProcessResult::NextCommand {
cmd: ResolveLockReadPhase::new(txn_status, scan_key.take(), ctx.clone()).into(),
cmd: Command::ResolveLockReadPhase(next_cmd),
}
};
let mut write_data = WriteData::from_modifies(txn.into_modifies());
9 changes: 8 additions & 1 deletion src/storage/txn/commands/resolve_lock_readphase.rs
@@ -62,8 +62,15 @@ impl<S: Snapshot> ReadCommand<S> for ResolveLockReadPhase {
// All locks are scanned
None
};
let next_cmd = ResolveLock {
ctx,
deadline: self.deadline,
txn_status,
scan_key: next_scan_key,
key_locks: kv_pairs,
};
Ok(ProcessResult::NextCommand {
cmd: ResolveLock::new(txn_status, next_scan_key, kv_pairs, ctx).into(),
cmd: Command::ResolveLock(next_cmd),
})
}
}
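
In both resolve-lock files the next-phase command is now built as a struct literal that copies `self.deadline`, rather than through the macro-generated `new`, which would derive a fresh deadline from the context and effectively restart the clock at every read/write round. A toy sketch of that hand-off, with placeholder types standing in for `ResolveLockReadPhase` and `ResolveLock` and a plain `Instant` standing in for `Deadline`:

```rust
use std::time::{Duration, Instant};

// Toy stand-ins for the two phases; `scan_key` marks whether more batches remain.
struct ReadPhase {
    deadline: Instant,
    scan_key: Option<Vec<u8>>,
}
struct WritePhase {
    deadline: Instant,
    scan_key: Option<Vec<u8>>,
}

impl ReadPhase {
    // The read phase passes its own deadline to the write phase it spawns...
    fn next(self) -> WritePhase {
        WritePhase {
            deadline: self.deadline,
            scan_key: self.scan_key,
        }
    }
}

impl WritePhase {
    // ...and the write phase hands the same deadline back when another batch
    // remains, so a multi-round resolve-lock stays bounded by the budget of
    // the original request instead of getting a new one each round.
    fn next(self) -> Option<ReadPhase> {
        let deadline = self.deadline;
        self.scan_key.map(|key| ReadPhase {
            deadline,
            scan_key: Some(key),
        })
    }
}

fn main() {
    let first = ReadPhase {
        deadline: Instant::now() + Duration::from_millis(300),
        scan_key: Some(b"k1".to_vec()),
    };
    // Before running each phase, the scheduler checks whether the deadline has
    // already passed; test_resolve_lock_deadline below exercises exactly that.
    let write = first.next();
    assert!(Instant::now() < write.deadline);
}
```
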
4 changes: 4 additions & 0 deletions src/storage/txn/commands/txn_heart_beat.rs
@@ -99,10 +99,12 @@ pub mod tests {
use crate::storage::lock_manager::DummyLockManager;
use crate::storage::mvcc::tests::*;
use crate::storage::txn::commands::WriteCommand;
use crate::storage::txn::scheduler::DEFAULT_EXECUTION_DURATION_LIMIT;
use crate::storage::txn::tests::*;
use crate::storage::Engine;
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::Context;
use tikv_util::deadline::Deadline;

pub fn must_success<E: Engine>(
engine: &E,
@@ -120,6 +122,7 @@
primary_key: Key::from_raw(primary_key),
start_ts,
advise_ttl,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
let result = command
.process_write(
@@ -159,6 +162,7 @@
primary_key: Key::from_raw(primary_key),
start_ts,
advise_ttl,
deadline: Deadline::from_now(DEFAULT_EXECUTION_DURATION_LIMIT),
};
assert!(
command
23 changes: 6 additions & 17 deletions src/storage/txn/scheduler.rs
@@ -34,9 +34,7 @@ use futures::{select, FutureExt as _};
use kvproto::kvrpcpb::{CommandPri, DiskFullOpt, ExtraOp};
use kvproto::pdpb::QueryKind;
use resource_metering::{cpu::FutureExt, ResourceMeteringTag};
use tikv_util::{
callback::must_call, deadline::Deadline, time::Instant, timer::GLOBAL_TIMER_HANDLE,
};
use tikv_util::{callback::must_call, time::Instant, timer::GLOBAL_TIMER_HANDLE};
use txn_types::TimeStamp;

use crate::server::lock_manager::waiter_manager;
@@ -67,31 +65,22 @@ const TASKS_SLOTS_NUM: usize = 1 << 12; // 4096 slots.

// The default limit is set to be very large. Then, requests without `max_execution_duration`
// will not be aborted unexpectedly.
const DEFAULT_EXECUTION_DURATION_LIMIT: Duration = Duration::from_secs(24 * 60 * 60);
pub const DEFAULT_EXECUTION_DURATION_LIMIT: Duration = Duration::from_secs(24 * 60 * 60);

/// Task is a running command.
pub(super) struct Task {
pub(super) cid: u64,
pub(super) cmd: Command,
pub(super) extra_op: ExtraOp,
pub(super) deadline: Deadline,
}

impl Task {
/// Creates a task for a running command.
pub(super) fn new(cid: u64, cmd: Command) -> Task {
let max_execution_duration_ms = cmd.ctx().max_execution_duration_ms;
let execution_duration_limit = if max_execution_duration_ms == 0 {
DEFAULT_EXECUTION_DURATION_LIMIT
} else {
Duration::from_millis(max_execution_duration_ms)
};
let deadline = Deadline::from_now(execution_duration_limit);
Task {
cid,
cmd,
extra_op: ExtraOp::Noop,
deadline,
}
}
}
@@ -272,7 +261,7 @@ impl<L: LockManager> SchedulerInner<L> {
let tctx = task_slot.get_mut(&cid).unwrap();
// Check deadline early during acquiring latches to avoid expired requests blocking
// other requests.
if let Err(e) = tctx.task.as_ref().unwrap().deadline.check() {
if let Err(e) = tctx.task.as_ref().unwrap().cmd.deadline().check() {
// `acquire_lock_on_wakeup` is called when another command releases its locks and wakes up
// command `cid`. This command inserted its lock before and now the lock is at the
// front of the queue. The actual acquired count is one more than the `owned_count`
@@ -395,7 +384,7 @@ impl<E: Engine, L: LockManager> Scheduler<E, L> {
let tctx = task_slot
.entry(cid)
.or_insert_with(|| self.inner.new_task_context(Task::new(cid, cmd), callback));
let deadline = tctx.task.as_ref().unwrap().deadline;
let deadline = tctx.task.as_ref().unwrap().cmd.deadline();
if self.inner.latches.acquire(&mut tctx.lock, cid) {
fail_point!("txn_scheduler_acquire_success");
tctx.on_schedule();
@@ -780,7 +769,7 @@ impl<E: Engine, L: LockManager> Scheduler<E, L> {
.load(Ordering::Relaxed);
let pipelined = pipelined_pessimistic_lock && task.cmd.can_be_pipelined();

let deadline = task.deadline;
let deadline = task.cmd.deadline();
let write_result = {
let context = WriteContext {
lock_mgr: &self.inner.lock_mgr,
@@ -1007,7 +996,7 @@ impl<E: Engine, L: LockManager> Scheduler<E, L> {
/// the task with a `DeadlineExceeded` error.
#[inline]
fn check_task_deadline_exceeded(&self, task: &Task) -> bool {
if let Err(e) = task.deadline.check() {
if let Err(e) = task.cmd.deadline().check() {
self.finish_with_err(task.cid, e);
true
} else {
71 changes: 71 additions & 0 deletions tests/failpoints/cases/test_storage.rs
@@ -1142,6 +1142,77 @@ fn test_before_propose_deadline() {
));
}

#[test]
fn test_resolve_lock_deadline() {
let mut cluster = new_server_cluster(0, 1);
cluster.run();

let engine = cluster.sim.read().unwrap().storages[&1].clone();
let storage = TestStorageBuilder::<_, DummyLockManager>::from_engine_and_lock_mgr(
engine,
DummyLockManager {},
)
.build()
.unwrap();

let mut ctx = Context::default();
ctx.set_region_id(1);
ctx.set_region_epoch(cluster.get_region_epoch(1));
ctx.set_peer(cluster.leader_of_region(1).unwrap());

// One resolve lock batch is 256 keys. So we need to prewrite more than that.
let mutations = (1i32..300)
.map(|i| {
let data = i.to_le_bytes();
Mutation::Put((Key::from_raw(&data), data.to_vec()))
})
.collect();
let cmd = commands::Prewrite::new(
mutations,
1i32.to_le_bytes().to_vec(),
10.into(),
1,
false,
299,
15.into(),
20.into(),
None,
false,
ctx.clone(),
);
let (tx, rx) = channel();
storage
.sched_txn_command(
cmd,
Box::new(move |res: storage::Result<_>| {
tx.send(res).unwrap();
}),
)
.unwrap();
assert!(rx.recv().unwrap().is_ok());

// Resolve lock, this needs two rounds, two process_read and two process_write.
// So it needs more than 400ms. It will exceed the deadline.
ctx.max_execution_duration_ms = 300;
fail::cfg("txn_before_process_read", "sleep(100)").unwrap();
fail::cfg("txn_before_process_write", "sleep(100)").unwrap();
let (tx, rx) = channel();
let mut txn_status = HashMap::default();
txn_status.insert(TimeStamp::new(10), TimeStamp::new(0));
storage
.sched_txn_command(
commands::ResolveLockReadPhase::new(txn_status, None, ctx),
Box::new(move |res: storage::Result<_>| {
tx.send(res).unwrap();
}),
)
.unwrap();
assert!(matches!(
rx.recv().unwrap(),
Err(StorageError(box StorageErrorInner::DeadlineExceeded))
));
}

/// Checks if concurrent transaction works correctly during shutdown.
///
/// During shutdown, all pending writes will fail with error so its latch will be released.
