diff --git a/Cargo.toml b/Cargo.toml index b9a0861..bc140f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ repository = "https://github.com/tailscale/tailscale-rs" # if the toolchain is too old, unconditionally failing the compile, while our policy is just that we # only actively support the last two stables -- older compilers should still work as long as they # can still compile the project. This field is the de facto MSRV (the oldest compiler that works). -rust-version = "1.91.0" +rust-version = "1.92.0" [workspace.dependencies] aead = { version = "0.5", features = ["std"] } diff --git a/ts_kv_store/src/index.rs b/ts_kv_store/src/index.rs index 4e860a4..9900702 100644 --- a/ts_kv_store/src/index.rs +++ b/ts_kv_store/src/index.rs @@ -1,8 +1,11 @@ -use std::{borrow::Borrow, hash::Hash}; +use std::{ + borrow::Borrow, + hash::Hash, + sync::{RwLockReadGuard, RwLockWriteGuard}, +}; use crate::{ - KvStore, Owner, RefReadGuard, RefWriteGuard, RefWriteGuardMut, Result, RoTransaction, - Transaction, + KvStore, Owner, Result, RoTransaction, Transaction, operations::{Base, BaseKey, BaseValue, IndexValue, IndexedOps, IndexedOpsMut, Ops, OpsMut}, schema::{IndexDesc, TableDesc}, storage::Storage, @@ -27,7 +30,7 @@ impl<'idx, D: IndexDesc> Ops for &'idx KvTableIndex<'_, D> { type ReadLock = std::sync::RwLockReadGuard<'idx, Storage>; fn read_lock(self) -> Self::ReadLock { - self.store.storage.read().unwrap() + self.store.get_read_lock() } } @@ -35,7 +38,7 @@ impl<'idx, D: IndexDesc> OpsMut for &'idx KvTableIndex<'_, D> { type WriteLock = std::sync::RwLockWriteGuard<'idx, Storage>; fn write_lock(self) -> Self::WriteLock { - self.store.storage.write().unwrap() + self.store.get_write_lock() } } @@ -50,7 +53,7 @@ impl<'store, D: IndexDesc> KvTableIndex<'store, D> { /// Initialize the base table by setting its owner (indexes don't have an owner). 
/// /// Calling this function is optional, a table can be used without initialization in which case, - /// its owner is set to the owner specifed in the first write. + /// its owner is set to the owner specified in the first write. /// /// Returns an error (containing the current owner of the table) if the table has already been /// initialized. In this case, the table will be in a consistent state and can be used as normal. @@ -104,7 +107,7 @@ impl<'store, D: IndexDesc> KvTableIndex<'store, D> { /// Insert a value into the table using the base table's key. /// /// Returns the previous value if there is one, or `None` if there is no value for the specified key. - pub fn insert(&self, key: BaseKey, value: BaseValue) -> Option> + pub fn insert(&self, key: BaseKey, value: BaseValue) where <::BaseTable as TableDesc>::Key: Clone, IndexValue: Eq + Hash, @@ -120,6 +123,7 @@ impl<'store, D: IndexDesc> KvTableIndex<'store, D> { D::Key: Borrow, Q: ?Sized + Hash + Eq, BaseKey: Clone, + BaseValue: Clone, IndexValue: Eq + Hash, { <&Self as IndexedOpsMut<_>>::with_mut(self, key, f, self.owner) @@ -128,11 +132,11 @@ impl<'store, D: IndexDesc> KvTableIndex<'store, D> { /// Remove a row from the table. /// /// Returns the previous value if there is one, or `None` if there is no value for the specified key. - pub fn remove(&self, key: &Q) -> Option> + pub fn remove(&self, key: &Q) where D::Key: Borrow, - Q: ?Sized + Hash + Eq, - IndexValue: Eq + Hash, + Q: ?Sized + Hash + Eq + ToOwned, + IndexValue: Eq + Hash + ToOwned>, { <&Self as IndexedOpsMut<_>>::remove(self, key, self.owner) } @@ -170,7 +174,8 @@ impl<'store, D: IndexDesc> KvTableIndex<'store, D> { /// Iterate all the key/value pairs in a table. Values are mutable. 
pub fn for_each_mut(&self, f: impl FnMut(&D::Key, &mut BaseValue)) where - IndexValue: Eq + Hash, + IndexValue: Eq + Hash + Clone, + BaseValue: Clone, { <&Self as IndexedOpsMut<_>>::for_each_mut(self, f, self.owner) } @@ -184,26 +189,26 @@ impl<'store, D: IndexDesc> KvTableIndex<'store, D> { /// SAFETY: `D` and `B` must describe different tables (this is enforced by the macros, but possible /// to violate if building a schema by hand). pub struct KvTableTransactionalIndex<'guard, 'txn, D: IndexDesc> { - pub(crate) txn: &'txn mut Transaction<'guard, Storage>, + pub(crate) txn: &'txn mut Transaction<'guard, D::Storage>, } impl<'guard, 'txn, 'a, D: IndexDesc> Ops for &'a KvTableTransactionalIndex<'guard, 'txn, D> { - type ReadLock = RefWriteGuard<'a, 'guard, Storage>; + type ReadLock = &'a RwLockWriteGuard<'guard, Storage>; fn read_lock(self) -> Self::ReadLock { - RefWriteGuard(&self.txn.guard) + &self.txn.guard } } impl<'guard, 'txn, 'a, D: IndexDesc> OpsMut for &'a mut KvTableTransactionalIndex<'guard, 'txn, D> { - type WriteLock = RefWriteGuardMut<'a, 'guard, Storage>; + type WriteLock = &'a mut RwLockWriteGuard<'guard, Storage>; fn write_lock(self) -> Self::WriteLock { - RefWriteGuardMut(&mut self.txn.guard) + &mut self.txn.guard } } @@ -223,7 +228,7 @@ impl<'guard, 'txn, D: IndexDesc> KvTableTransactionalIndex<'guard, 'txn, D> { /// Initialize the base table by setting its owner (indexes don't have an owner). /// /// Calling this function is optional, a table can be used without initialization in which case, - /// its owner is set to the owner specifed in the first write. + /// its owner is set to the owner specified in the first write. /// /// Returns an error (containing the current owner of the table) if the table has already been /// initialized. In this case, the table will be in a consistent state and can be used as normal. 
@@ -277,7 +282,7 @@ impl<'guard, 'txn, D: IndexDesc> KvTableTransactionalIndex<'guard, 'txn, D> { /// Insert a value into the table using the base table's key. /// /// Returns the previous value if there is one, or `None` if there is no value for the specified key. - pub fn insert(&mut self, key: BaseKey, value: BaseValue) -> Option> + pub fn insert(&mut self, key: BaseKey, value: BaseValue) where BaseKey: Clone, IndexValue: Eq + Hash, @@ -293,6 +298,7 @@ impl<'guard, 'txn, D: IndexDesc> KvTableTransactionalIndex<'guard, 'txn, D> { D::Key: Borrow, Q: ?Sized + Hash + Eq, BaseKey: Clone, + BaseValue: Clone, IndexValue: Eq + Hash, { <&mut Self as IndexedOpsMut<_>>::with_mut::(self, key, f, self.txn.owner) @@ -301,11 +307,11 @@ impl<'guard, 'txn, D: IndexDesc> KvTableTransactionalIndex<'guard, 'txn, D> { /// Remove a row from the table. /// /// Returns the previous value if there is one, or `None` if there is no value for the specified key. - pub fn remove(&mut self, key: &Q) -> Option> + pub fn remove(&mut self, key: &Q) where D::Key: Borrow, - Q: ?Sized + Hash + Eq, - IndexValue: Eq + Hash, + Q: ?Sized + Hash + Eq + ToOwned, + IndexValue: Eq + Hash + ToOwned>, { <&mut Self as IndexedOpsMut<_>>::remove::(self, key, self.txn.owner) } @@ -342,7 +348,8 @@ impl<'guard, 'txn, D: IndexDesc> KvTableTransactionalIndex<'guard, 'txn, D> { /// Iterate all the key/value pairs in a table. Values are mutable. pub fn for_each_mut(&mut self, f: impl FnMut(&D::Key, &mut BaseValue)) where - IndexValue: Eq + Hash, + IndexValue: Eq + Hash + Clone, + BaseValue: Clone, { <&mut Self as IndexedOpsMut<_>>::for_each_mut(self, f, self.txn.owner) } @@ -356,16 +363,16 @@ impl<'guard, 'txn, D: IndexDesc> KvTableTransactionalIndex<'guard, 'txn, D> { /// SAFETY: `D` and `B` must describe different tables (this is enforced by the macros, but possible /// to violate if building a schema by hand). 
pub struct KvTableRoTransactionalIndex<'guard, 'txn, D: IndexDesc> { - pub(crate) txn: &'txn RoTransaction<'guard, Storage>, + pub(crate) txn: &'txn RoTransaction<'guard, D::Storage>, } impl<'guard, 'txn, 'a, D: IndexDesc> Ops for &'a KvTableRoTransactionalIndex<'guard, 'txn, D> { - type ReadLock = RefReadGuard<'a, 'guard, Storage>; + type ReadLock = &'a RwLockReadGuard<'guard, Storage>; fn read_lock(self) -> Self::ReadLock { - RefReadGuard(&self.txn.guard) + &self.txn.guard } } @@ -580,29 +587,6 @@ mod test { assert_eq!(len, Some(5)); } - #[test] - fn index_insert_returns_none_on_first() { - let store = KvStore::new(); - assert!( - store - .table_by::(OWNER) - .insert(1, row("Alice")) - .is_none() - ); - } - - #[test] - fn index_insert_returns_previous_value_when_overwriting_base_key() { - let store = KvStore::new(); - store - .table_by::(OWNER) - .insert(1, row("Alice")); - let old = store - .table_by::(OWNER) - .insert(1, row("Alice")); - assert_eq!(old, Some(row("Alice"))); - } - #[test] fn index_insert_is_visible_via_base_table() { let store = KvStore::new(); @@ -625,25 +609,6 @@ mod test { ); } - #[test] - fn index_remove_returns_none_when_absent() { - let store = KvStore::new(); - assert!( - store - .table_by::(OWNER) - .remove("Alice") - .is_none() - ); - } - - #[test] - fn index_remove_returns_value() { - let store = KvStore::new(); - store.table::(OWNER).insert(1, row("Alice")); - let value = store.table_by::(OWNER).remove("Alice"); - assert_eq!(value, Some(row("Alice"))); - } - #[test] fn index_remove_makes_base_absent() { let store = KvStore::new(); @@ -1274,8 +1239,6 @@ mod test_transactional_index { #[cfg(debug_assertions)] const OTHER: &str = "other"; - // Group A: KvTableTransactionalIndex - index maintenance - #[test] fn txn_index_get_returns_none_when_absent() { let store = KvStore::new(); @@ -1292,6 +1255,21 @@ mod test_transactional_index { txn.table_by::().get("Alice"), Some(row("Alice")) ); + txn.commit().unwrap(); + assert_eq!( + 
store.table_by::(OWNER).get("Alice"), + Some(row("Alice")) + ); + + let mut txn = store.begin_transaction(OWNER); + txn.table_by::().remove("Alice"); + txn.commit().unwrap(); + assert!( + store + .table_by::(OWNER) + .get("Alice") + .is_none() + ); } #[test] @@ -1363,6 +1341,8 @@ mod test_transactional_index { txn.table_by::().insert(1, row("Alice")); txn.table_by::().remove("Alice"); assert!(txn.table::().get(&1).is_none()); + txn.commit().unwrap(); + assert!(store.table::(OWNER).get(&1).is_none()); } #[test] @@ -1393,8 +1373,6 @@ mod test_transactional_index { ); } - // Group B: KvTableTransactional (base table) - index maintenance - #[test] fn txn_base_insert_updates_index() { let store = KvStore::new(); @@ -1460,8 +1438,6 @@ mod test_transactional_index { ); } - // Group C: KvTableRoTransactionalIndex - reads via index in RO transaction - #[test] fn ro_txn_index_get_returns_none_when_absent() { let store = KvStore::new(); @@ -1525,8 +1501,6 @@ mod test_transactional_index { assert_eq!(names, vec!["Alice".to_owned(), "Bob".to_owned()]); } - // Group D: Ownership enforcement - #[test] #[cfg(debug_assertions)] #[should_panic(expected = "Ownership violation")] @@ -1556,4 +1530,97 @@ mod test_transactional_index { let mut txn = store.begin_transaction(OTHER); txn.table_by::().for_each_mut(|_, _| {}); } + + #[test] + fn txn_index_insert_rolled_back_on_drop() { + let store = KvStore::new(); + { + let mut txn = store.begin_transaction(OWNER); + txn.table_by::().insert(1, row("Alice")); + } + assert!( + store + .table_by::(OWNER) + .get("Alice") + .is_none() + ); + assert!(store.table::(OWNER).get(&1).is_none()); + } + + #[test] + fn txn_index_base_mutate_indexed_field_rolled_back() { + let store = KvStore::new(); + store.table::(OWNER).insert(1, row("Alice")); + { + let mut txn = store.begin_transaction(OWNER); + txn.table_by::() + .with_mut("Alice", |v| v.name = "Bob".to_owned()); + } + assert_eq!( + store.table_by::(OWNER).get("Alice"), + Some(row("Alice")) + ); + 
assert!( + store + .table_by::(OWNER) + .get("Bob") + .is_none() + ); + assert_eq!(store.table::(OWNER).get(&1), Some(row("Alice"))); + } + + #[test] + fn txn_index_remove_rolled_back_on_drop() { + let store = KvStore::new(); + store.table::(OWNER).insert(1, row("Alice")); + { + let mut txn = store.begin_transaction(OWNER); + txn.table_by::().remove("Alice"); + } + assert_eq!( + store.table_by::(OWNER).get("Alice"), + Some(row("Alice")) + ); + assert_eq!(store.table::(OWNER).get(&1), Some(row("Alice"))); + } + + #[test] + fn txn_index_clear_rolled_back_on_drop() { + let store = KvStore::new(); + store.table::(OWNER).insert(1, row("Alice")); + store.table::(OWNER).insert(2, row("Bob")); + { + let mut txn = store.begin_transaction(OWNER); + txn.table_by::().clear(); + } + assert_eq!( + store.table_by::(OWNER).get("Alice"), + Some(row("Alice")) + ); + assert_eq!( + store.table_by::(OWNER).get("Bob"), + Some(row("Bob")) + ); + } + + #[test] + fn raw_base_then_txn_index_insert_commit_both_visible() { + let store = KvStore::new(); + store.table::(OWNER).insert(1, row("Alice")); + + let mut txn = store.begin_transaction(OWNER); + txn.table_by::().insert(2, row("Bob")); + txn.commit().unwrap(); + + assert_eq!( + store.table_by::(OWNER).get("Alice"), + Some(row("Alice")) + ); + assert_eq!( + store.table_by::(OWNER).get("Bob"), + Some(row("Bob")) + ); + assert_eq!(store.table::(OWNER).get(&1), Some(row("Alice"))); + assert_eq!(store.table::(OWNER).get(&2), Some(row("Bob"))); + } } diff --git a/ts_kv_store/src/iter.rs b/ts_kv_store/src/iter.rs index 678940c..56dd300 100644 --- a/ts_kv_store/src/iter.rs +++ b/ts_kv_store/src/iter.rs @@ -1,10 +1,12 @@ //! 
Iterate over a table -use std::{hash::Hash, marker::PhantomData, ops::Deref}; +use std::{hash::Hash, marker::PhantomData}; use crate::{ + operations::StorageGuard, schema::{IndexDesc, TableDesc}, - storage::{Storage, Table}, + storage::{Table, TableIterator as InnerIterator}, + transactions::TxnId, }; /// Phantom type to iterate over keys. @@ -20,9 +22,6 @@ pub struct KeysAndValues; type Indexes = Table<::BaseTable, <::BaseTable as TableDesc>::Indexes>; -type TableIter<'guard, D> = - std::collections::hash_map::Iter<'guard, ::Key, ::Value>; - /// An iterator for a single table (described by the generic parameter `D`) in the KV store. /// /// This is basically just a wrapper for an iterator over the `HashMap` representing the table. @@ -34,15 +33,15 @@ pub struct TableIterator<'guard, Guard, D: TableDesc, Kind> { /// /// Invariants: /// - `inner.is_some()` once `new` has completed. - inner: Option>, - _kind: PhantomData, + inner: Option>, + _kind: PhantomData<(D, Kind)>, } impl<'guard, Guard, D: TableDesc, Kind> TableIterator<'guard, Guard, D, Kind> { /// Create an iterator over the table described by `D`. 
pub(crate) fn new(guard: Guard) -> Self where - Guard: Deref> + 'guard, + Guard: StorageGuard + 'guard, D: 'guard, { let mut result = TableIterator { @@ -50,7 +49,10 @@ impl<'guard, Guard, D: TableDesc, Kind> TableIterator<'guard, Guard, D, Kind> { inner: None, _kind: PhantomData, }; - result.inner = Some(inner_iter::(&result.guard)); + result.inner = Some(inner_iter::( + &result.guard, + result.guard.storage().txn_id(), + )); result } @@ -59,7 +61,9 @@ impl<'guard, Guard, D: TableDesc, Kind> TableIterator<'guard, Guard, D, Kind> { } } -impl<'guard, Guard, D: TableDesc> Iterator for TableIterator<'guard, Guard, D, KeysAndValues> { +impl<'guard, Guard: StorageGuard, D: TableDesc> Iterator + for TableIterator<'guard, Guard, D, KeysAndValues> +{ type Item = (&'guard D::Key, &'guard D::Value); fn next(&mut self) -> Option { @@ -77,7 +81,9 @@ impl<'guard, Guard, D: TableDesc> Iterator for TableIterator<'guard, Guard, D, K } } -impl<'guard, Guard, D: TableDesc> Iterator for TableIterator<'guard, Guard, D, Values> { +impl<'guard, Guard: StorageGuard, D: TableDesc> Iterator + for TableIterator<'guard, Guard, D, Values> +{ type Item = &'guard D::Value; fn next(&mut self) -> Option { @@ -86,7 +92,7 @@ impl<'guard, Guard, D: TableDesc> Iterator for TableIterator<'guard, Guard, D, V } } -impl Drop for TableIterator<'_, Guard, D, Kind> { +impl<'guard, Guard, D: TableDesc, Kind> Drop for TableIterator<'guard, Guard, D, Kind> { fn drop(&mut self) { // Ensure that `self.inner` is dropped before `self.guard`. self.inner = None; @@ -102,7 +108,7 @@ pub struct IndexIterator<'guard, Guard, D: IndexDesc, Kind> { /// /// Invariants: /// - `inner.is_some()` once `new` has completed. - inner: Option<(&'guard Indexes, TableIter<'guard, D>)>, + inner: Option<(&'guard Indexes, InnerIterator<'guard, D>)>, _kind: PhantomData, } @@ -110,7 +116,7 @@ impl<'guard, Guard, D: IndexDesc, Kind> IndexIterator<'guard, Guard, D, Kind> { /// Create an iterator over the table described by `D` (the index). 
pub(crate) fn new(guard: Guard) -> Self where - Guard: Deref> + 'guard, + Guard: StorageGuard + 'guard, D: 'guard, { let mut result = IndexIterator { @@ -119,21 +125,22 @@ impl<'guard, Guard, D: IndexDesc, Kind> IndexIterator<'guard, Guard, D, Kind> { _kind: PhantomData, }; - let base_table = ::get_table(&result.guard.tables); + let base_table = ::get_table(&result.guard.storage().tables); result.inner = Some(( // SAFETY: for the same reasoning as `inner_iter`, we're extending the lifetime of this // internal reference to 'guard. This is safe for the same reasons, i.e. we ensure that - // guard is dropped first. + // guard is dropped last. unsafe { &*(base_table as *const _) }, - inner_iter::(&result.guard), + inner_iter::(&result.guard, result.guard.storage().txn_id()), )); result } } -impl<'guard, Guard, D: IndexDesc> Iterator for IndexIterator<'guard, Guard, D, KeysAndValues> +impl<'guard, Guard: StorageGuard, D: IndexDesc> Iterator + for IndexIterator<'guard, Guard, D, KeysAndValues> where D::Value: Hash + Eq, { @@ -144,7 +151,7 @@ where let (base, iter) = self.inner.as_mut().unwrap(); let (k, bk) = iter.next()?; - Some((k, base.data.get(bk)?)) + Some((k, base.get(bk, self.guard.storage().txn_id())?)) } } @@ -158,7 +165,8 @@ impl<'guard, Guard, D: IndexDesc> Iterator for IndexIterator<'guard, Guard, D, K } } -impl<'guard, Guard, D: IndexDesc> Iterator for IndexIterator<'guard, Guard, D, Values> +impl<'guard, Guard: StorageGuard, D: IndexDesc> Iterator + for IndexIterator<'guard, Guard, D, Values> where D::Value: Hash + Eq, { @@ -169,11 +177,11 @@ where let (base, iter) = self.inner.as_mut().unwrap(); let (_, bk) = iter.next()?; - base.data.get(bk) + base.get(bk, self.guard.storage().txn_id()) } } -impl Drop for IndexIterator<'_, Guard, D, Kind> { +impl<'guard, Guard, D: IndexDesc, Kind> Drop for IndexIterator<'guard, Guard, D, Kind> { fn drop(&mut self) { // Ensure that `self.inner` is dropped before `self.guard`. 
self.inner = None; @@ -181,14 +189,12 @@ impl Drop for IndexIterator<'_, Guard, D, Kind> { } // Create an iterator over a table's data to use as a base for these iterators. -fn inner_iter<'guard, D, Guard>( - guard: &Guard, -) -> std::collections::hash_map::Iter<'guard, D::Key, D::Value> +fn inner_iter<'guard, D, Guard>(guard: &Guard, txn_id: TxnId) -> InnerIterator<'guard, D> where D: TableDesc + 'guard, - Guard: Deref> + 'guard, + Guard: StorageGuard + 'guard, { - let tables: *const _ = &guard.tables; + let tables: *const _ = &guard.storage().tables; // SAFETY: here we're extending the lifetime of the reference to the KV storage to `'guard`. // We can't use a raw pointer because we won't be able to use that as input to create an // iterator. To ensure safety we must ensure that `self.guard` outlives `self.inner`. We can @@ -197,5 +203,5 @@ where // `result` is moved, `&result.guard.tables` will point at the same address which is guaranteed // to outlive `'guard`. let tables = unsafe { &*tables }; - D::get_table(tables).data.iter() + D::get_table(tables).iter(txn_id) } diff --git a/ts_kv_store/src/lib.rs b/ts_kv_store/src/lib.rs index a7d0569..45d32ba 100644 --- a/ts_kv_store/src/lib.rs +++ b/ts_kv_store/src/lib.rs @@ -39,7 +39,7 @@ //! kind of data, but they have roughly the same operations available. //! //! The store is strongly typed. Both singletons and tables must be statically declared and both keys -//! and values have types from these declarations. Macros for these declations are in the [`schema`] +//! and values have types from these declarations. Macros for these declarations are in the [`schema`] //! module. They expand into an empty type for each singleton or table, and trait impls for these //! types. The types are then used as type parameters for all operations. //! @@ -51,11 +51,12 @@ //! into transactions which are atomic and serializable. Both singleton and tabular data can be part //! of a transaction. 
The `Table` types used for accessing tabular data do not add any transactional //! elements. That is, operations on a `Table` created from the main store are not part of a transaction, -//! and operations on a `Table` created from a transaction are only part of that transacton. +//! and operations on a `Table` created from a transaction are only part of that transaction. //! -//! Transactions may be read/write or read-only. Transactions don't need to be explicitly committed, -//! the effect is 'commit on drop' and currently operations are committed as they are called (with -//! no rollback). +//! Transactions may be read/write or read-only. Transactions can be committed or rolled-back, if a +//! transaction is dropped without being committed, then it is rolled-back. The system should handle +//! a panic or early return at any stage of a transaction (they are always atomic and leave the store +//! internally consistent; i.e., transactions are ACID). //! //! All data is owned. An owner is a simple token, its up to the user of the library to decide how //! to use these tokens and what rules to follow. Every KV pair has a single owner and can only be @@ -104,7 +105,7 @@ //! for documentation. //! //! The implementation of storage for tabular data is one HashMap per table, and otherwise -//! straighforward. For singleton data, we use a single HashMap which maps `TypeId` to `(Owner, SinValue)`. +//! straightforward. For singleton data, we use a single HashMap which maps `TypeId` to `(Owner, SinValue)`. //! Where the `TypeId` id the id of the type used to describe the singleton. Values can be //! stored in different ways (inline, via an `Arc`, etc.) each of which is a variant of `SinValue`. //! The store transparently converts keys and values to their declared types. @@ -114,21 +115,49 @@ //! types, and transactional index types). These are implemented on traits in the `operations` module. //! 
For ease of use and documentation, the functions are implemented on each concrete type and //! delegated to the trait implementations. I.e., the traits and impls are an implementation detail. +//! +//! Transactions have an internal id (`TxnId`). Since we use a global lock to ensure serializability, +//! transactions are only used to ensure atomicity. There can only ever be one (mutating) transaction +//! in progress at a time. The store tracks the most recently committed transaction id, and optionally +//! a current transaction id. Raw (non-transactional) operations use the most recently committed +//! transaction id as their 'transaction' id (but there is no separate commit step for these operations). +//! +//! The KV store uses a simplified version of MVCC to implement transactions. Only two versions are +//! required for each key, one will be the currently committed version and one will either be empty, +//! the version belonging to the currently in-progress transaction, or the version belonging to a +//! partially rolled-back or abandoned transaction. So, every value in the store is stored internally as a +//! `VersionedValue`, which has these two slots and a version and value in each. Deletes are stored +//! outside of the main storage in a per-table delete mask. +//! +//! Singletons work slightly differently: they still use a `VersionedValue`, but use a tombstone value +//! for deletes (`SinValue::None`) rather than a delete mask. +//! +//! To commit a transaction, the delete masks are applied to their corresponding tables, the committed +//! transaction id in the store is set to the committed transaction's, and the pending transaction +//! id is cleared. Applying delete masks is implemented such that it will not panic or otherwise fail +//! once application is started. That and the global lock ensures atomicity of commits. +//! +//! To rollback a transaction, all transaction state (i.e., versions belonging to the transaction and +//! 
delete masks) is deleted, then the store's pending transaction id is cleared. If a transaction +//! is abandoned without a proper rollback, then accessing the store and finding a pending transaction +//! id will trigger garbage collection. A panic which poisons the global lock can be resolved by +//! rolling back any pending transaction and un-poisoning. +//! +//! There is no way to time-out a transaction. So if a transaction takes the global lock and +//! never gives it up (e.g., by panicking without calling destructors, or leaking the transaction +//! handle), then the KV store cannot be recovered. -use std::{ - ops::{Deref, DerefMut}, - sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, -}; +use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; mod index; mod iter; mod operations; mod raw; pub mod schema; -mod singleton; #[doc(hidden)] pub mod storage; -mod transactions; +#[doc(hidden)] +pub mod transactions; #[doc(inline)] pub use index::KvTableIndex; @@ -152,6 +181,44 @@ impl KvStore { pub fn new_with_storage(storage: RwLock>) -> Self { KvStore { storage } } + + /// Might (theoretically) panic, see the note on [`clear_lock_poison`]. + fn get_read_lock(&self) -> RwLockReadGuard<'_, storage::Storage> { + self.clear_lock_poison(); + + let lock = self.storage.read().unwrap(); + if lock.current_txn().is_none() { + return lock; + } + + let mut lock = self.storage.write().unwrap(); + lock.clear_transaction(); + RwLockWriteGuard::downgrade(lock) + } + + /// Might (theoretically) panic, see the note on [`clear_lock_poison`]. + fn get_write_lock(&self) -> RwLockWriteGuard<'_, storage::Storage> { + self.clear_lock_poison(); + let mut lock = self.storage.write().unwrap(); + lock.clear_transaction(); + lock + } + + /// Clear poison on the internal lock and recover the store. 
+ /// + /// In theory, after calling this function, another thread could get the lock, panic, and poison + /// it again, so calling `clear_lock_poison` immediately followed by `get_read_lock` or `get_write_lock` + /// could panic. However, there is no easy way to avoid this because of the poisoning API. + /// Hopefully, the chance of that happening is small. + fn clear_lock_poison(&self) { + if self.storage.is_poisoned() { + self.storage.clear_poison(); + let mut lock = self.storage.write().unwrap(); + // The store should always be in a consistent state, so all we need to do is clear + // the current transaction (if there is one) and we're good to go. + lock.clear_transaction(); + } + } } impl<'store, TableStorage: schema::GeneratedStorage> operations::Ops @@ -160,7 +227,7 @@ impl<'store, TableStorage: schema::GeneratedStorage> operations::Ops>; fn read_lock(self) -> Self::ReadLock { - self.storage.read().unwrap() + self.get_read_lock() } } @@ -170,7 +237,7 @@ impl<'store, TableStorage: schema::GeneratedStorage> operations::OpsMut>; fn write_lock(self) -> Self::WriteLock { - self.storage.write().unwrap() + self.get_write_lock() } } @@ -182,52 +249,11 @@ pub type Owner = &'static str; // TODO derive(Error) #[derive(Debug, Clone)] pub enum Error { - /// A table was expected to not be initialized, but was by the specifed `Owner`. + /// A table was expected to not be initialized, but was by the specified `Owner`. AlreadyInit(Owner), + /// An inconsistency caused a transaction to fail. It has not been committed and can be re-tried. + TransactionFailed, } /// `Result` alias for a KvStore [`Error`]. pub type Result = std::result::Result; - -/// Helper type for using a reference to a [`RwLockWriteGuard`] as a generic argument -/// with a `Deref` bound. Required because checking trait bounds does not take into account -/// transitivity of `Deref`. 
-struct RefWriteGuard<'a, 'inner, T>(&'a RwLockWriteGuard<'inner, T>); - -impl<'a, 'inner, T> Deref for RefWriteGuard<'a, 'inner, T> { - type Target = T; - - fn deref(&self) -> &Self::Target { - self.0.deref() - } -} - -/// Helper type for using a mut reference to a [`RwLockWriteGuard`] as a generic argument -/// with `Deref` and `DerefMut` bounds. Required because checking trait bounds does not take into -/// account transitivity of `Deref`. -struct RefWriteGuardMut<'a, 'inner, T>(&'a mut RwLockWriteGuard<'inner, T>); - -impl<'a, 'inner, T> Deref for RefWriteGuardMut<'a, 'inner, T> { - type Target = T; - - fn deref(&self) -> &Self::Target { - self.0.deref() - } -} -impl<'a, 'inner, T> DerefMut for RefWriteGuardMut<'a, 'inner, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - self.0.deref_mut() - } -} -/// Helper type for using a reference to a [`RwLockReadGuard`] as a generic argument -/// with a `Deref` bound. Required because checking trait bounds does not take into account -/// transitivity of `Deref`. 
-struct RefReadGuard<'a, 'inner, T>(&'a RwLockReadGuard<'inner, T>); - -impl<'a, 'inner, T> Deref for RefReadGuard<'a, 'inner, T> { - type Target = T; - - fn deref(&self) -> &Self::Target { - self.0.deref() - } -} diff --git a/ts_kv_store/src/operations.rs b/ts_kv_store/src/operations.rs index e8b6a63..d4eed91 100644 --- a/ts_kv_store/src/operations.rs +++ b/ts_kv_store/src/operations.rs @@ -12,23 +12,69 @@ use std::{ any::TypeId, borrow::Borrow, hash::Hash, - ops::{Deref, DerefMut}, - sync::Arc, + sync::{Arc, RwLockReadGuard, RwLockWriteGuard}, }; use crate::{ IndexIterator, Owner, Result, TableIterator, iter, - schema::{self, IndexDesc, IndexStorage, TableDesc}, - singleton::{OptSingletonValue, assert_owner}, - storage::{SinValue, Storage}, + schema::{self, IndexDesc, TableDesc}, + storage::Storage, }; pub(crate) trait Ops: Sized { - type ReadLock: Deref>; + type ReadLock: StorageGuard; fn read_lock(self) -> Self::ReadLock; } +pub(crate) trait StorageGuardMut { + fn storage(&mut self) -> &mut Storage; +} + +pub(crate) trait StorageGuard { + fn storage(&self) -> &Storage; +} + +impl<'store, TableStorage: schema::GeneratedStorage> StorageGuard + for RwLockReadGuard<'store, Storage> +{ + fn storage(&self) -> &Storage { + self + } +} + +impl<'store, TableStorage: schema::GeneratedStorage> StorageGuardMut + for RwLockWriteGuard<'store, Storage> +{ + fn storage(&mut self) -> &mut Storage { + self + } +} + +impl<'a, 'inner, TableStorage: schema::GeneratedStorage> StorageGuard + for &'a RwLockReadGuard<'inner, Storage> +{ + fn storage(&self) -> &Storage { + self + } +} + +impl<'a, 'inner, TableStorage: schema::GeneratedStorage> StorageGuard + for &'a RwLockWriteGuard<'inner, Storage> +{ + fn storage(&self) -> &Storage { + self + } +} + +impl<'a, 'inner, TableStorage: schema::GeneratedStorage> StorageGuardMut + for &'a mut RwLockWriteGuard<'inner, Storage> +{ + fn storage(&mut self) -> &mut Storage { + self + } +} + pub(crate) trait SingletonOps: Ops { @@ -37,18 +83,18 @@ 
pub(crate) trait SingletonOps: D::Value: Clone, { let storage = self.read_lock(); + let storage = storage.storage(); let key = TypeId::of::(); - storage - .get_singleton_value(&key) - .map_singleton_value(|v| D::Value::clone(D::from_value_ref(v))) + let txn_id = storage.txn_id(); + storage.get_singleton_value::(key, txn_id).cloned() } fn get_arc(self, _owner: Owner) -> Option> { let storage = self.read_lock(); + let storage = storage.storage(); let key = TypeId::of::(); - storage - .get_singleton_value(&key) - .map_singleton_value(|v| D::from_value_arc(v)) + let txn_id = storage.txn_id(); + storage.get_singleton_arc::(key, txn_id) } fn with( @@ -57,12 +103,14 @@ pub(crate) trait SingletonOps: _owner: Owner, ) -> Option { let storage = self.read_lock(); + let storage = storage.storage(); let key = TypeId::of::(); - storage - .get_singleton_value(&key) - .map_singleton_value(|v| f(D::from_value_ref(v))) + let txn_id = storage.txn_id(); + let value = storage.get_singleton_value::(key, txn_id)?; + Some(f(value)) } } + pub(crate) trait TabularOps: Ops { @@ -70,14 +118,16 @@ pub(crate) trait TabularOps: fn len(self) -> usize { let storage = self.read_lock(); + let storage = storage.storage(); let table = Self::TableDesc::get_table(&storage.tables); - table.data.len() + table.len(storage.txn_id()) } fn is_empty(self) -> bool { let storage = self.read_lock(); + let storage = storage.storage(); let table = Self::TableDesc::get_table(&storage.tables); - table.data.is_empty() + table.is_empty(storage.txn_id()) } fn get(self, key: &Q, _owner: Owner) -> Option<::Value> @@ -87,8 +137,9 @@ pub(crate) trait TabularOps: Q: ?Sized + Hash + Eq, { let storage = self.read_lock(); + let storage = storage.storage(); let table = Self::TableDesc::get_table(&storage.tables); - table.data.get(key).cloned() + table.get(key, storage.txn_id()).cloned() } fn with( @@ -102,9 +153,9 @@ pub(crate) trait TabularOps: Q: ?Sized + Hash + Eq, { let storage = self.read_lock(); + let storage = 
storage.storage(); let table = Self::TableDesc::get_table(&storage.tables); - let value = table.data.get(key)?; - + let value = table.get(key, storage.txn_id())?; Some(f(value)) } @@ -163,14 +214,16 @@ pub(crate) trait IndexedOps: fn len(self) -> usize { let storage = self.read_lock(); + let storage = storage.storage(); let base = Base::::get_table(&storage.tables); - base.data.len() + base.len(storage.txn_id()) } fn is_empty(self) -> bool { let storage = self.read_lock(); + let storage = storage.storage(); let base = Base::::get_table(&storage.tables); - base.data.is_empty() + base.is_empty(storage.txn_id()) } fn get(self, key: &Q, _owner: Owner) -> Option> @@ -181,10 +234,11 @@ pub(crate) trait IndexedOps: Q: ?Sized + Hash + Eq, { let storage = self.read_lock(); + let storage = storage.storage(); let base = Base::::get_table(&storage.tables); let index = ::get_table(&storage.tables); - let base_key = index.data.get(key)?; - base.data.get(base_key).cloned() + let base_key = index.get(key, storage.txn_id())?; + base.get(base_key, storage.txn_id()).cloned() } fn with( @@ -199,10 +253,11 @@ pub(crate) trait IndexedOps: Q: ?Sized + Hash + Eq, { let storage = self.read_lock(); + let storage = storage.storage(); let base = Base::::get_table(&storage.tables); let index = ::get_table(&storage.tables); - let base_key = index.data.get(key)?; - let value = base.data.get(base_key)?; + let base_key = index.get(key, storage.txn_id())?; + let value = base.get(base_key, storage.txn_id())?; Some(f(value)) } @@ -253,7 +308,7 @@ pub(crate) trait IndexedOps: } pub(crate) trait OpsMut: Sized { - type WriteLock: DerefMut>; + type WriteLock: StorageGuardMut; fn write_lock(self) -> Self::WriteLock; } @@ -261,47 +316,24 @@ pub(crate) trait OpsMut: Sized { pub(crate) trait SingletonOpsMut: OpsMut { - fn insert(self, value: D::ArgValue, owner: Owner) -> Option { + fn insert(self, value: D::ArgValue, owner: Owner) { let mut storage = self.write_lock(); + let storage = storage.storage(); let 
key = TypeId::of::(); - assert_owner(owner, &key, &*storage); - - storage - .insert_singleton(key, owner, D::to_value(value)) - .map_singleton_value(|v| D::from_value(v)) - } + assert_owner(owner, key, storage); - fn with_mut( - self, - f: impl FnOnce(&mut D::Value) -> T, - owner: Owner, - ) -> Option { - let mut storage = self.write_lock(); - let key = TypeId::of::(); - assert_owner(owner, &key, &*storage); - storage - .get_singleton_value_mut(&key) - .map_singleton_value(|v| f(D::from_value_mut(v))) + let txn_id = storage.txn_id(); + storage.insert_singleton::(key, owner, value, txn_id); } - fn remove(self, owner: Owner) -> Option { + fn remove(self, owner: Owner) { let mut storage = self.write_lock(); + let storage = storage.storage(); let key = TypeId::of::(); - assert_owner(owner, &key, &*storage); + assert_owner(owner, key, storage); - storage - .remove_singleton(&key) - .map_singleton_value(|v| D::from_value(v)) - } - - fn clear(self, owner: Owner) -> Option { - let mut storage = self.write_lock(); - let key = TypeId::of::(); - assert_owner(owner, &key, &*storage); - - storage - .insert_singleton(key, owner, SinValue::None) - .map_singleton_value(|v| D::from_value(v)) + let txn_id = storage.txn_id(); + storage.remove_singleton(key, txn_id); } } @@ -313,16 +345,19 @@ pub(crate) trait TabularOpsMut: fn init(self, owner: Owner) -> Result<()> { let mut storage = self.write_lock(); - let table = Self::TableDesc::get_table_mut(&mut storage.tables); + let table = Self::TableDesc::get_table_mut(&mut storage.storage().tables); table.try_set_owner(owner) } fn clear(self, owner: Owner) { let mut storage = self.write_lock(); + let storage = storage.storage(); + let txn_id = storage.txn_id(); + let max_committed_id = storage.max_committed_id(); + let table = Self::TableDesc::get_table_mut(&mut storage.tables); table.assert_or_set_owner(owner); - table.indexes.clear(); - table.data.clear(); + table.clear(txn_id, max_committed_id); } fn insert( @@ -330,18 +365,17 @@ 
pub(crate) trait TabularOpsMut: key: ::Key, value: ::Value, owner: Owner, - ) -> Option<::Value> - where + ) where ::Key: Clone, { let mut storage = self.write_lock(); + let storage = storage.storage(); + let txn_id = storage.txn_id(); + let max_committed_id = storage.max_committed_id(); let table = Self::TableDesc::get_table_mut(&mut storage.tables); table.assert_or_set_owner(owner); - if let Some(old_value) = table.data.get(&key) { - table.indexes.on_remove(old_value); - } - table.indexes.on_insert(&key, &value); - table.data.insert(key, value) + + table.insert(key, value, txn_id, max_committed_id); } // TODO if `f` panics then the indexes will be left in an inconsistent state. @@ -354,51 +388,48 @@ pub(crate) trait TabularOpsMut: where ::Key: Borrow, Q: ?Sized + Hash + Eq + ToOwned::Key>, + ::Value: Clone, { let mut storage = self.write_lock(); + let storage = storage.storage(); + let txn_id = storage.txn_id(); + let max_committed_id = storage.max_committed_id(); let table = Self::TableDesc::get_table_mut(&mut storage.tables); table.assert_owner(owner); - let value = table.data.get_mut(key)?; - table.indexes.on_remove(value); - let result = f(value); - table.indexes.on_insert(key, value); - - Some(result) + table.with_mut(key, f, txn_id, max_committed_id) } - fn remove(self, key: &Q, owner: Owner) -> Option<::Value> + fn remove(self, key: &Q, owner: Owner) where ::Key: Borrow, - Q: ?Sized + Hash + Eq, + Q: ?Sized + Hash + Eq + ToOwned::Key>, { let mut storage = self.write_lock(); + let storage = storage.storage(); + let txn_id = storage.txn_id(); + let max_committed_id = storage.max_committed_id(); let table = Self::TableDesc::get_table_mut(&mut storage.tables); table.assert_owner(owner); - let value = table.data.remove(key.borrow())?; - table.indexes.on_remove(&value); - Some(value) + table.remove(key, txn_id, max_committed_id); } // TODO if `f` panics then the indexes will be left in an inconsistent state. 
fn for_each_mut( self, - mut f: impl FnMut( - &::Key, - &mut ::Value, - ), + f: impl FnMut(&::Key, &mut ::Value), owner: Owner, - ) { + ) where + ::Value: Clone, + { let mut storage = self.write_lock(); + let storage = storage.storage(); + let txn_id = storage.txn_id(); + let max_committed_id = storage.max_committed_id(); let table = Self::TableDesc::get_table_mut(&mut storage.tables); table.assert_owner(owner); - for (k, v) in &mut table.data { - f(k, v); - } - - table.indexes.clear(); - table.indexes.build(table.data.iter()); + table.for_each_mut(f, txn_id, max_committed_id); } } @@ -409,6 +440,7 @@ pub(crate) trait IndexedOpsMut: fn init(self, owner: Owner) -> Result<()> { let mut storage = self.write_lock(); + let storage = storage.storage(); let base = Base::::get_table_mut(&mut storage.tables); base.try_set_owner(owner) } @@ -418,31 +450,28 @@ pub(crate) trait IndexedOpsMut: IndexValue: Hash + Eq, { let mut storage = self.write_lock(); + let storage = storage.storage(); + let txn_id = storage.txn_id(); + let max_committed_id = storage.max_committed_id(); + let base = Base::::get_table_mut(&mut storage.tables); base.assert_or_set_owner(owner); - base.indexes.clear(); - base.data.clear(); + base.clear(txn_id, max_committed_id); } - fn insert( - self, - key: BaseKey, - value: BaseValue, - owner: Owner, - ) -> Option> + fn insert(self, key: BaseKey, value: BaseValue, owner: Owner) where BaseKey: Clone, IndexValue: Hash + Eq, { let mut storage = self.write_lock(); + let storage = storage.storage(); + let txn_id = storage.txn_id(); + let max_committed_id = storage.max_committed_id(); let base = Base::::get_table_mut(&mut storage.tables); base.assert_or_set_owner(owner); - if let Some(old_value) = base.data.get(&key) { - base.indexes.on_remove(old_value); - } - base.indexes.on_insert(&key, &value); - base.data.insert(key, value) + base.insert(key, value, txn_id, max_committed_id); } // TODO if `f` panics then the indexes will be left in an inconsistent state. 
@@ -457,43 +486,43 @@ pub(crate) trait IndexedOpsMut: Q: ?Sized + Hash + Eq, BaseKey: Clone, IndexValue: Hash + Eq, + BaseValue: Clone, { let mut storage = self.write_lock(); - let index = ::get_table(&storage.tables); - let base_key: *const BaseKey = index.data.get(key)? as *const _; - // SAFETY: `B` and `D` are different tables, so `get_table[_mut]` will return pointers to - // different `Table` objects. `base_key` is a pointer into `index` and cannot be a pointer - // into `base`. - let base_key = unsafe { &*base_key }; - let base = Base::::get_table_mut(&mut storage.tables); + let storage = storage.storage(); + let txn_id = storage.txn_id(); + let max_committed_id = storage.max_committed_id(); + let (base, index) = schema::get_two_tables_mut::<_, Base, Self::IndexDesc>( + &mut storage.tables, + ); base.assert_owner(owner); - let value = base.data.get_mut(base_key)?; - // TODO we could be more efficient and only update indexes if the foreign key changes - base.indexes.on_remove(value); - let result = f(value); - base.indexes.on_insert(base_key, value); - Some(result) + let base_key = index.get(key, txn_id)?; + base.with_mut(base_key, f, txn_id, max_committed_id) } - fn remove(self, key: &Q, owner: Owner) -> Option> + fn remove(self, key: &Q, owner: Owner) where IndexKey: Borrow, - Q: ?Sized + Hash + Eq, - IndexValue: Hash + Eq, + Q: ?Sized + Hash + Eq + ToOwned>, + IndexValue: Hash + Eq + ToOwned>, { let mut storage = self.write_lock(); + let storage = storage.storage(); + let txn_id = storage.txn_id(); + let max_committed_id = storage.max_committed_id(); + let (base, index) = schema::get_two_tables_mut::<_, Base, Self::IndexDesc>( + &mut storage.tables, + ); // First check ownership, then do the operation. 
- let base = Base::::get_table_mut(&mut storage.tables); base.assert_owner(owner); - let index = ::get_table_mut(&mut storage.tables); - let base_key = index.data.remove(key)?; - let base = Base::::get_table_mut(&mut storage.tables); - let value = base.data.remove(base_key.borrow())?; - base.indexes.on_remove(&value); - Some(value) + let Some(base_key) = index.get(key, txn_id) else { + return; + }; + base.remove(base_key, txn_id, max_committed_id); + index.remove(key, txn_id, max_committed_id); } // TODO if `f` panics then the indexes will be left in an inconsistent state. @@ -503,27 +532,40 @@ pub(crate) trait IndexedOpsMut: owner: Owner, ) where IndexValue: Hash + Eq, + BaseKey: Clone, + BaseValue: Clone, { let mut storage = self.write_lock(); - let tables: *mut _ = &mut storage.tables; - // SAFETY: `B` and `D` are different tables, so `get_table[_mut]` will return pointers to - // different `Table` objects. Although the compiler treats `base` and `index` as having - // a reference to `storage.tables`, they do not, so the references here are transient and - // all mutable references are unique. 
- let base = - <::BaseTable>::get_table_mut(unsafe { &mut *tables }); + let storage = storage.storage(); + let txn_id = storage.txn_id(); + let max_committed_id = storage.max_committed_id(); + + let (base, index) = schema::get_two_tables_mut::<_, Base, Self::IndexDesc>( + &mut storage.tables, + ); base.assert_owner(owner); - let index = ::get_table(&storage.tables); - for (k, base_key) in &index.data { - let Some(v) = base.data.get_mut(base_key) else { - continue; - }; - f(k, v); + for (k, base_key) in index.iter(txn_id) { + base.with_mut( + base_key, + |value| { + f(k, value); + }, + txn_id, + max_committed_id, + ); } + } +} - // TODO we could be more efficient and only update indexes if the foreign key changes - base.indexes.clear(); - base.indexes.build(base.data.iter()); +#[allow(unused_variables)] +#[track_caller] +fn assert_owner(owner: Owner, key: TypeId, storage: &Storage) { + #[cfg(debug_assertions)] + if let Some(prev_owner) = storage.get_singleton_owner(key) { + assert_eq!( + prev_owner, owner, + "Ownership violation: expected {prev_owner}, found {owner}" + ); } } diff --git a/ts_kv_store/src/raw.rs b/ts_kv_store/src/raw.rs index 14d5652..b990838 100644 --- a/ts_kv_store/src/raw.rs +++ b/ts_kv_store/src/raw.rs @@ -20,7 +20,7 @@ impl<'a, D: schema::TableDesc> Ops for &'a KvTable<'_, D> { type ReadLock = std::sync::RwLockReadGuard<'a, Storage>; fn read_lock(self) -> Self::ReadLock { - self.store.storage.read().unwrap() + self.store.get_read_lock() } } @@ -32,7 +32,7 @@ impl<'a, D: schema::TableDesc> OpsMut for &'a KvTable<'_, D> { type WriteLock = std::sync::RwLockWriteGuard<'a, Storage>; fn write_lock(self) -> Self::WriteLock { - self.store.storage.write().unwrap() + self.store.get_write_lock() } } @@ -104,40 +104,16 @@ impl KvStore { /// /// Returns the previous value if there is one, or `None` if there is no value for the specified key. // Question: do we need separate insert/update/upsert methods? 
- pub fn insert( - &self, - owner: Owner, - value: D::ArgValue, - ) -> Option { + pub fn insert(&self, owner: Owner, value: D::ArgValue) { <&Self as SingletonOpsMut<_>>::insert::(self, value, owner) } - /// Get mutable access to a value in the store. - /// - /// Returns `None` (and does not call `f`) if there is no value for the specified key. - pub fn with_mut( - &self, - owner: Owner, - f: impl FnOnce(&mut D::Value) -> T, - ) -> Option { - <&Self as SingletonOpsMut<_>>::with_mut::(self, f, owner) - } - /// Remove a single value from the store. /// /// Returns the previous value if there is one, or `None` if there is no value for the specified key. - pub fn remove(&self, owner: Owner) -> Option { + pub fn remove(&self, owner: Owner) { <&Self as SingletonOpsMut<_>>::remove::(self, owner) } - - /// Remove a single value from the store while preserving ownership of the key/value. - /// - /// Can also be used to initialize a key/value with a key but without a value. - /// - /// Returns the previous value if there is one, or `None` if there is no value for the specified key. - pub fn clear(&self, owner: Owner) -> Option { - <&Self as SingletonOpsMut<_>>::clear::(self, owner) - } } /// Abstracts a table of key/values pairs in the store. @@ -153,7 +129,7 @@ impl KvTable<'_, D> { /// Initialize a table by setting its owner. /// /// Calling this function is optional, a table can be used without initialization in which case, - /// its owner is set to the owner specifed in the first write. + /// its owner is set to the owner specified in the first write. /// /// Returns an error (containing the current owner of the table) if the table has already been /// initialized. In this case, the table will be in a consistent state and can be used as normal. @@ -202,7 +178,7 @@ impl KvTable<'_, D> { /// Insert a value into the table. /// /// Returns the previous value if there is one, or `None` if there is no value for the specified key. 
- pub fn insert(&self, key: D::Key, value: D::Value) -> Option + pub fn insert(&self, key: D::Key, value: D::Value) where D::Key: Clone, { @@ -216,6 +192,7 @@ impl KvTable<'_, D> { where D::Key: Borrow, Q: ?Sized + Hash + Eq + ToOwned, + D::Value: Clone, { <&Self as TabularOpsMut<_>>::with_mut(self, key, f, self.owner) } @@ -223,10 +200,10 @@ impl KvTable<'_, D> { /// Remove a row from the table. /// /// Returns the previous value if there is one, or `None` if there is no value for the specified key. - pub fn remove(&self, key: &Q) -> Option + pub fn remove(&self, key: &Q) where D::Key: Borrow, - Q: ?Sized + Hash + Eq, + Q: ?Sized + Hash + Eq + ToOwned, { <&Self as TabularOpsMut<_>>::remove(self, key, self.owner) } @@ -247,7 +224,10 @@ impl KvTable<'_, D> { } /// Iterate all the key/value pairs in a table. Values are mutable. - pub fn for_each_mut(&self, f: impl FnMut(&D::Key, &mut D::Value)) { + pub fn for_each_mut(&self, f: impl FnMut(&D::Key, &mut D::Value)) + where + D::Value: Clone, + { <&Self as TabularOpsMut<_>>::for_each_mut(self, f, self.owner) } } @@ -259,7 +239,6 @@ mod test { use crate::{Error, singleton, tables}; singleton!(Count(u64)); - singleton!(Name(String as Box)); singleton!(Shared(String as Arc)); singleton!(Label(u64 as Ref)); @@ -281,14 +260,6 @@ mod test { assert_eq!(store.get::(OWNER), Some(42)); } - #[test] - fn get_returns_none_after_clear() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - store.clear::(OWNER); - assert!(store.get::(OWNER).is_none()); - } - #[test] fn get_returns_none_after_remove() { let store = KvStore::new(); @@ -346,89 +317,6 @@ mod test { assert_eq!(store.with::(OWNER, |v| v * 2), Some(10)); } - #[test] - fn insert_returns_none_on_first_insert() { - let store = KvStore::new(); - assert!(store.insert::(OWNER, 1).is_none()); - } - - #[test] - fn insert_returns_previous_value() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - assert_eq!(store.insert::(OWNER, 2), Some(1)); - } - - #[test] - fn 
insert_over_tombstone_returns_none() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - store.clear::(OWNER); - assert!(store.insert::(OWNER, 2).is_none()); - } - - #[test] - fn insert_over_removed_returns_none() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - store.remove::(OWNER); - assert!(store.insert::(OWNER, 2).is_none()); - } - - #[test] - fn mutate_returns_none_and_does_not_call_f_when_absent() { - let store = KvStore::new(); - let mut called = false; - let result = store.with_mut::(OWNER, |_| { - called = true; - }); - assert!(result.is_none()); - assert!(!called); - } - - #[test] - fn mutate_modifies_value_in_place() { - let store = KvStore::new(); - store.insert::(OWNER, 10); - assert_eq!( - store.with_mut::(OWNER, |v| { - *v += 5; - *v - }), - Some(15) - ); - assert_eq!(store.get::(OWNER), Some(15)); - } - - #[test] - fn mutate_on_tombstone_returns_none() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - store.clear::(OWNER); - assert!(store.with_mut::(OWNER, |_| {}).is_none()); - } - - #[test] - fn mutate_box_singleton() { - let store = KvStore::new(); - store.insert::(OWNER, "hello".to_owned()); - store.with_mut::(OWNER, |s| s.push_str(" world")); - assert_eq!(store.get::(OWNER), Some("hello world".to_owned())); - } - - #[test] - fn remove_returns_none_when_absent() { - let store = KvStore::new(); - assert!(store.remove::(OWNER).is_none()); - } - - #[test] - fn remove_returns_previous_value() { - let store = KvStore::new(); - store.insert::(OWNER, 7); - assert_eq!(store.remove::(OWNER), Some(7)); - } - #[test] fn remove_makes_entry_absent() { let store = KvStore::new(); @@ -437,45 +325,6 @@ mod test { assert!(store.get::(OWNER).is_none()); } - #[test] - fn remove_allows_reinsert_by_other_owner() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - store.remove::(OWNER); - // Entry is fully gone — any owner can insert - store.insert::(OTHER, 2); - assert_eq!(store.get::(OTHER), Some(2)); - } - - #[test] - 
fn clear_returns_none_when_absent() { - let store = KvStore::new(); - assert!(store.clear::(OWNER).is_none()); - } - - #[test] - fn clear_returns_previous_value() { - let store = KvStore::new(); - store.insert::(OWNER, 3); - assert_eq!(store.clear::(OWNER), Some(3)); - } - - #[test] - fn clear_makes_get_return_none() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - store.clear::(OWNER); - assert!(store.get::(OWNER).is_none()); - } - - #[test] - fn double_clear_returns_none() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - store.clear::(OWNER); - assert!(store.clear::(OWNER).is_none()); - } - #[test] #[cfg(debug_assertions)] #[should_panic(expected = "Ownership violation")] @@ -485,15 +334,6 @@ mod test { store.insert::(OTHER, 2); } - #[test] - #[cfg(debug_assertions)] - #[should_panic(expected = "Ownership violation")] - fn singleton_mutate_wrong_owner_panics() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - store.with_mut::(OTHER, |_| {}); - } - #[test] #[cfg(debug_assertions)] #[should_panic(expected = "Ownership violation")] @@ -503,26 +343,6 @@ mod test { store.remove::(OTHER); } - #[test] - #[cfg(debug_assertions)] - #[should_panic(expected = "Ownership violation")] - fn singleton_clear_wrong_owner_panics() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - store.clear::(OTHER); - } - - #[test] - #[cfg(debug_assertions)] - #[should_panic(expected = "Ownership violation")] - fn singleton_clear_blocks_other_owner() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - store.clear::(OWNER); - // Tombstone preserves OWNER — OTHER cannot insert - store.insert::(OTHER, 2); - } - #[test] fn table_init_succeeds_on_fresh_table() { let store = KvStore::new(); @@ -558,27 +378,6 @@ mod test { assert!(store.table::(OWNER).get("missing").is_none()); } - #[test] - fn table_insert_returns_none_on_first() { - let store = KvStore::new(); - assert!( - store - .table::(OWNER) - .insert("k", "v".to_owned()) - .is_none() - 
); - } - - #[test] - fn table_insert_returns_previous_value() { - let store = KvStore::new(); - store.table::(OWNER).insert("k", "v1".to_owned()); - assert_eq!( - store.table::(OWNER).insert("k", "v2".to_owned()), - Some("v1".to_owned()), - ); - } - #[test] fn table_get_returns_value_after_insert() { let store = KvStore::new(); @@ -634,27 +433,11 @@ mod test { ); } - #[test] - fn table_remove_returns_none_when_absent() { - let store = KvStore::new(); - assert!(store.table::(OWNER).remove("missing").is_none()); - } - - #[test] - fn table_remove_returns_previous_value() { - let store = KvStore::new(); - store.table::(OWNER).insert("k", "v".to_owned()); - assert_eq!( - store.table::(OWNER).remove("k"), - Some("v".to_owned()) - ); - } - #[test] fn table_remove_makes_get_return_none() { let store = KvStore::new(); store.table::(OWNER).insert("k", "v".to_owned()); - store.table::(OWNER).remove("k"); + store.table::(OWNER).remove(&"k"); assert!(store.table::(OWNER).get("k").is_none()); } @@ -707,7 +490,7 @@ mod test { let table = store.table::(OWNER); table.insert("a", "alpha".to_owned()); table.insert("b", "beta".to_owned()); - table.remove("a"); + table.remove(&"a"); assert_eq!(table.len(), 1); } @@ -857,7 +640,7 @@ mod test { fn table_remove_wrong_owner_panics() { let store = KvStore::new(); store.table::(OWNER).init().unwrap(); - store.table::(OTHER).remove("k"); + store.table::(OTHER).remove(&"k"); } #[test] @@ -868,4 +651,14 @@ mod test { store.table::(OWNER).init().unwrap(); store.table::(OTHER).clear(); } + + // `remove` no longer returns the removed value: it is used in statement position and its only + // observable effect is that the row becomes absent. 
+ #[test] + fn table_remove_used_in_statement_position() { + let store = KvStore::new(); + store.table::(OWNER).insert("k", "v".to_owned()); + store.table::(OWNER).remove(&"k"); + assert_eq!(store.table::(OWNER).get("k"), None); + } } diff --git a/ts_kv_store/src/schema.rs b/ts_kv_store/src/schema.rs index f0e85c7..3fb9baf 100644 --- a/ts_kv_store/src/schema.rs +++ b/ts_kv_store/src/schema.rs @@ -1,8 +1,15 @@ //! Traits and macros for defining the KvStore schema. -use std::{any::Any, hash::Hash, sync::Arc}; +use std::{ + any::{Any, TypeId}, + hash::Hash, + sync::Arc, +}; -use crate::storage::{SinValue, Table}; +use crate::{ + storage::{SinValue, Table}, + transactions::TxnId, +}; /// A singleton key/value. /// @@ -58,9 +65,9 @@ pub trait MutSingleton: Singleton { /// Describes tabular key/values in the store. /// /// Prefer to use the macros in this module rather than this trait directly. -pub trait TableDesc: Sized { +pub trait TableDesc: Sized + 'static { /// The type of the key. - type Key: Hash + Eq; + type Key: Hash + Eq + Clone; /// The type of the value. type Value: Any + Send + Sync; /// The storage for the table. @@ -74,6 +81,27 @@ pub trait TableDesc: Sized { fn get_table_mut(storage: &mut Self::Storage) -> &mut Table; } +/// Similar to `TableDesc::get_table_mut`, but allows for getting two different tables at one time. +/// +/// SAFETY: A and B must represent distinct tables. +#[allow(clippy::type_complexity)] +pub(crate) fn get_two_tables_mut< + Storage: GeneratedStorage, + A: TableDesc + Any, + B: TableDesc + Any, +>( + storage: &mut Storage, +) -> (&mut Table, &mut Table) { + debug_assert_ne!(TypeId::of::(), TypeId::of::()); + + // SAFETY: `A` and `B` are different tables, so `get_table_mut` will return pointers to + // different `Table` objects. + let storage = storage as *mut _; + let a = A::get_table_mut(unsafe { &mut *storage }); + let b = B::get_table_mut(unsafe { &mut *storage }); + (a, b) +} + /// Describes a table used as an index. 
pub trait IndexDesc: TableDesc { /// The table which is indexed. @@ -83,35 +111,58 @@ pub trait IndexDesc: TableDesc { /// Operations on an index. pub trait IndexStorage: Default { /// Clear the whole index. - fn clear(&mut self); + fn clear( + &mut self, + txn_id: crate::transactions::TxnId, + max_committed_id: crate::transactions::TxnId, + ); + /// An item has been inserted into the index. - fn on_insert(&mut self, key: &Q, value: &V) - where + fn on_insert( + &mut self, + key: &Q, + value: &V, + txn_id: crate::transactions::TxnId, + max_committed_id: crate::transactions::TxnId, + ) where K: std::borrow::Borrow, Q: ?Sized + std::hash::Hash + Eq + std::borrow::ToOwned; + /// An item has been removed from the index. - fn on_remove(&mut self, value: &V); - /// Build the index from the base table. - fn build<'a>(&mut self, kvs: impl Iterator) - where - K: 'a, - V: 'a; + fn on_remove( + &mut self, + value: &V, + txn_id: crate::transactions::TxnId, + max_committed_id: crate::transactions::TxnId, + ); } impl IndexStorage for () { - fn clear(&mut self) {} - fn on_insert(&mut self, _key: &Q, _value: &V) - where + fn clear( + &mut self, + _txn_id: crate::transactions::TxnId, + _max_committed_id: crate::transactions::TxnId, + ) { + } + + fn on_insert( + &mut self, + _key: &Q, + _value: &V, + _id: crate::transactions::TxnId, + _max_committed_id: crate::transactions::TxnId, + ) where K: std::borrow::Borrow, Q: ?Sized + std::hash::Hash + Eq, { } - fn on_remove(&mut self, _value: &V) {} - fn build<'a>(&mut self, _kvs: impl Iterator) - where - K: 'a, - V: 'a, - { + + fn on_remove( + &mut self, + _value: &V, + _txn_id: crate::transactions::TxnId, + _max_committed_id: crate::transactions::TxnId, + ) { } } @@ -120,7 +171,16 @@ impl IndexStorage for () { /// This should be considered a sealed trait and not implemented except by the macros in this module. /// Unfortunately it has to be public because of macro visibility hygiene. 
#[doc(hidden)] -pub trait GeneratedStorage: Default {} +pub trait GeneratedStorage: Default { + /// Commit a transaction by applying all tables' transaction state to their permanent data. + /// + /// This operation must be atomic. I.e., it will only fail without any tables committed, and if it + /// succeeds, then all masks have committed. + fn commit_txn(&mut self, txn_id: TxnId) -> crate::Result<()>; + + /// Delete any uncommitted per-transaction state associated with `txn_id` held in tables. + fn gc_txn(&mut self, txn_id: TxnId); +} /// Macro to declare a singleton key/value in the store. /// @@ -218,13 +278,13 @@ macro_rules! init_helper { $crate::storage::SinValue::U64($value) }; (Box, $value:ident) => { - $crate::storage::SinValue::Box(Box::new($value) as Box) + $crate::storage::SinValue::Box(std::boxed::Box::new($value) as Box) }; (Arc, $value:ident) => { - $crate::storage::SinValue::Arc($value.clone() as Arc) + $crate::storage::SinValue::Arc($value.clone() as std::sync::Arc) }; (Ref, $value:ident) => { - $crate::storage::SinValue::Ref($value as &'static (dyn Any + Send + Sync)) + $crate::storage::SinValue::Ref($value as &'static (dyn std::any::Any + Send + Sync)) }; } @@ -355,7 +415,7 @@ macro_rules! tables { } $( - impl $crate::schema::TableDesc for index::$name::$field { + impl $crate::schema::TableDesc for index::$name::$field where $field_ty: Clone { type Key = $field_ty; type Value = $key_ty; type Storage = TableStorage; @@ -381,7 +441,28 @@ macro_rules! 
tables { pub struct TableStorage { $($name: $crate::storage::Table<$name, index::$name::Indexes>),* } - impl $crate::schema::GeneratedStorage for TableStorage {} + + impl $crate::schema::GeneratedStorage for TableStorage { + fn commit_txn(&mut self, _txn_id: $crate::transactions::TxnId) -> $crate::Result<()> { + $( + self.$name.check_txn_consistency(_txn_id)?; + $(self.$name.indexes.$field.check_txn_consistency(_txn_id)?;)* + )* + $( + self.$name.commit_txn(_txn_id); + $(self.$name.indexes.$field.commit_txn(_txn_id);)* + )* + + Ok(()) + } + + fn gc_txn(&mut self, _txn_id: $crate::transactions::TxnId) { + $( + self.$name.gc_txn(_txn_id); + $(self.$name.indexes.$field.gc_txn(_txn_id);)* + )* + } + } pub mod index { $( @@ -412,45 +493,31 @@ macro_rules! tables { } impl $crate::schema::IndexStorage<$key_ty, $value_ty> for index::$name::Indexes { - fn clear(&mut self) { + fn clear(&mut self, _txn_id: $crate::transactions::TxnId, _max_committed_id: $crate::transactions::TxnId) { $( - self.$field.data.clear(); + self.$field.clear(_txn_id, _max_committed_id); )* } - fn on_insert(&mut self, _key: &Q, _value: &$value_ty) + fn on_insert(&mut self, _key: &Q, _value: &$value_ty, _txn_id: $crate::transactions::TxnId, _max_committed_id: $crate::transactions::TxnId) where $key_ty: std::borrow::Borrow, Q: ?Sized + std::hash::Hash + Eq + std::borrow::ToOwned { $({ for value in index::$name::Indexes::$field(_value) { - self.$field.data.insert(value, _key.to_owned()); + self.$field.insert(value, _key.to_owned(), _txn_id, _max_committed_id); } })* } - fn on_remove(&mut self, _value: &$value_ty) { + fn on_remove(&mut self, _value: &$value_ty, _txn_id: $crate::transactions::TxnId, _max_committed_id: $crate::transactions::TxnId) { $({ for value in index::$name::Indexes::$field(_value) { - self.$field.data.remove(&value); + self.$field.remove(&value, _txn_id, _max_committed_id); } })* } - - fn build<'a>(&mut self, kvs: impl Iterator) - where - $key_ty: 'a, - $value_ty: 'a, - { - for (_k, 
_v) in kvs { - $({ - for value in index::$name::Indexes::$field(_v) { - self.$field.data.insert(value, _k.to_owned()); - } - })* - } - } } )* diff --git a/ts_kv_store/src/singleton.rs b/ts_kv_store/src/singleton.rs deleted file mode 100644 index 119c90e..0000000 --- a/ts_kv_store/src/singleton.rs +++ /dev/null @@ -1,60 +0,0 @@ -//! Helpers for working with singleton KVs. - -use std::any::TypeId; - -use crate::{ - Owner, schema, - storage::{SinValue, Storage}, -}; - -/// Helper trait to handle `SinValue::None` -pub trait OptSingletonValue { - type Value; - - fn map_singleton_value(self, f: impl FnOnce(Self::Value) -> T) -> Option; -} - -impl<'a> OptSingletonValue for Option<&'a SinValue> { - type Value = &'a SinValue; - - fn map_singleton_value(self, f: impl FnOnce(Self::Value) -> T) -> Option { - match self? { - SinValue::None => None, - v => Some(f(v)), - } - } -} - -impl<'a> OptSingletonValue for Option<&'a mut SinValue> { - type Value = &'a mut SinValue; - - fn map_singleton_value(self, f: impl FnOnce(Self::Value) -> T) -> Option { - match self? { - SinValue::None => None, - v => Some(f(v)), - } - } -} - -impl OptSingletonValue for Option<(Owner, SinValue)> { - type Value = SinValue; - - fn map_singleton_value(self, f: impl FnOnce(SinValue) -> T) -> Option { - match self? 
{ - (_, SinValue::None) => None, - (_, v) => Some(f(v)), - } - } -} - -#[allow(unused_variables)] -#[track_caller] -pub fn assert_owner(owner: Owner, key: &TypeId, storage: &Storage) { - #[cfg(debug_assertions)] - if let Some(prev_owner) = storage.get_singleton_owner(key) { - assert_eq!( - prev_owner, owner, - "Ownership violation: expected {prev_owner}, found {owner}" - ); - } -} diff --git a/ts_kv_store/src/storage.rs b/ts_kv_store/src/storage.rs index 970a0b8..03ced15 100644 --- a/ts_kv_store/src/storage.rs +++ b/ts_kv_store/src/storage.rs @@ -1,20 +1,39 @@ use std::{ any::{Any, TypeId}, - collections::HashMap, + borrow::Borrow, + collections::{HashMap, HashSet}, + hash::Hash, sync::Arc, }; -use crate::{Error, Owner, Result, schema}; +use crate::{ + Error, Owner, Result, + schema::{self, IndexStorage}, + transactions::TxnId, +}; /// Where we store the data. #[doc(hidden)] pub struct Storage { /// The key is the TypeId of the generated marker type for the KV data. See [`SinValue`] for /// how values are represented in the store. - pub(crate) singletons: HashMap, + pub(crate) singletons: HashMap)>, /// Storage for tabular data. The concrete type will be macro-generated, see the [`crate::schema`] /// module. pub(crate) tables: TableStorage, + + /// The id of the most-recently committed transaction. + pub(crate) committed: TxnId, + /// `None` if there is no transaction in progress. `Some` if there is a transaction in progress + /// or a transaction has been aborted without proper rollback. The `Vec` tracks the keys of + /// singleton key-values modified during this transaction. It may be an over-approximation, but + /// not an under-approximation. `pending_txn` must not be cleared until a transaction has been + /// fully committed or fully rolled-back. + /// + /// `self.tables` may only contain un-committed state if `pending_txn.is_some()`. + pending_txn: Option<(TxnId, Vec)>, + /// Counter for creating new transaction ids. 
+ next_txn: TxnId, } impl Storage { @@ -24,36 +43,150 @@ impl Storage { Storage { singletons: HashMap::new(), tables: TableStorage::default(), + committed: TxnId::FIRST, + pending_txn: None, + next_txn: TxnId::FIRST.next(), + } + } + + /// Returns the transaction id of the current transaction, or the last committed transaction if there + /// is no current transaction. + /// + /// Note that calling this without first ensuring that any aborted transaction has been cleaned + /// may cause this method to be inaccurate. + pub(crate) fn txn_id(&self) -> TxnId { + self.current_txn().unwrap_or(self.committed) + } + + pub(crate) fn current_txn(&self) -> Option { + self.pending_txn.as_ref().map(|(id, _)| *id) + } + + pub(crate) fn max_committed_id(&self) -> TxnId { + self.committed + } + + fn record_mutation(&mut self, key: TypeId, txn_id: TxnId) { + if self.committed != txn_id + && let Some((id, modified)) = &mut self.pending_txn + { + assert_eq!(*id, txn_id); + modified.push(key); } } - pub(crate) fn insert_singleton( + pub(crate) fn insert_singleton( &mut self, key: TypeId, owner: Owner, - value: SinValue, - ) -> Option<(Owner, SinValue)> { - self.singletons.insert(key, (owner, value)) + value: D::ArgValue, + txn_id: TxnId, + ) { + self.record_mutation(key, txn_id); + match self.singletons.get_mut(&key) { + Some((_, v)) => { + v.set(D::to_value(value), txn_id); + } + None => { + self.singletons.insert( + key, + (owner, VersionedValue::new(D::to_value(value), txn_id)), + ); + } + } } - pub(crate) fn remove_singleton(&mut self, key: &TypeId) -> Option<(Owner, SinValue)> { - self.singletons.remove(key) + pub(crate) fn remove_singleton(&mut self, key: TypeId, txn_id: TxnId) { + self.record_mutation(key, txn_id); + if let Some((_, value)) = self.singletons.get_mut(&key) { + // TODO ownership is wrong, but we're getting rid of dynamic ownership + value.set(SinValue::None, txn_id); + } } /// Retrieve a singleton value from the store using the given type-key. 
- pub(crate) fn get_singleton_value(&self, key: &TypeId) -> Option<&SinValue> { - self.singletons.get(key).map(|(_, v)| v) + pub(crate) fn get_singleton_value( + &self, + key: TypeId, + txn_id: TxnId, + ) -> Option<&D::Value> { + let value = self.singletons.get(&key).map(|(_, v)| v); + map_singleton_value(value, txn_id, |v| D::from_value_ref(v)) } /// Retrieve a singleton value from the store using the given type-key. - pub(crate) fn get_singleton_value_mut(&mut self, key: &TypeId) -> Option<&mut SinValue> { - self.singletons.get_mut(key).map(|&mut (_, ref mut v)| v) + pub(crate) fn get_singleton_arc( + &self, + key: TypeId, + txn_id: TxnId, + ) -> Option> { + let value = self.singletons.get(&key).map(|(_, v)| v); + map_singleton_value(value, txn_id, |v| D::from_value_arc(v)) } /// Retrieve the owner of a singleton KV pair using the given type-key. #[cfg(debug_assertions)] - pub(crate) fn get_singleton_owner(&self, key: &TypeId) -> Option { - self.singletons.get(key).map(|(o, _)| *o) + pub(crate) fn get_singleton_owner(&self, key: TypeId) -> Option { + self.singletons.get(&key).map(|(o, _)| *o) + } + + /// Begin a new transaction. Returns the transaction's unique id. + pub(crate) fn begin_transaction(&mut self) -> TxnId { + let new_txn_id = self.next_txn; + self.pending_txn = Some((new_txn_id, Vec::new())); + self.next_txn = new_txn_id.next(); + new_txn_id + } + + /// Commit a transaction. Returns an error if committing fails, usually because the store's + /// current transaction does not match the `txn_id` (which should be impossible with only safe + /// code). + pub(crate) fn commit_transaction(&mut self, txn_id: TxnId) -> Result<()> { + let Some((pending_txn, _)) = self.pending_txn else { + return Err(Error::TransactionFailed); + }; + if pending_txn != txn_id { + return Err(Error::TransactionFailed); + } + + self.tables.commit_txn(txn_id)?; + self.committed = pending_txn; + self.pending_txn = None; + Ok(()) + } + + /// Rollback a transaction. 
Never returns an error. If `txn_id` does not match the store's current + /// transaction, then nothing happens (the `txn_id` transaction must already have been committed + /// or rolled back). + pub(crate) fn rollback_transaction(&mut self, txn_id: TxnId) { + let Some((pending_txn, _)) = self.pending_txn else { + return; + }; + if pending_txn != txn_id { + return; + } + + self.clear_transaction(); + } + + /// Garbage-collects the remnants of any in-progress transaction, leaving the store ready to begin + /// another transaction or perform raw operations. + pub(crate) fn clear_transaction(&mut self) { + if let Some((id, modified)) = &self.pending_txn { + for k in modified { + let Some((_, v)) = self.singletons.get_mut(k) else { + continue; + }; + v.gc_txn(*id); + + if v.is_empty() { + self.singletons.remove(k); + } + } + self.tables.gc_txn(*id); + // Must do this last so it is only cleared if we've successfully GCed the store. + self.pending_txn = None; + } } } @@ -61,13 +194,13 @@ impl Storage { #[doc(hidden)] #[derive(Default)] pub enum SinValue { - /// Tombstone value. Used where we want to specify an owner but don't have value yet. + /// Tombstone value. #[default] None, // TODO add other special cases /// A single, inline `u64`. U64(u64), - /// A boxed value (i.e., a pointer is stored in the store). + /// A boxed value. Box(Box), /// A shared reference in the store. Arc(Arc), @@ -75,14 +208,354 @@ pub enum SinValue { Ref(&'static (dyn Any + Send + Sync)), } +fn map_singleton_value<'a, T>( + value: Option<&'a VersionedValue>, + id: TxnId, + f: impl FnOnce(&'a SinValue) -> T, +) -> Option { + match value?.get(id)? { + SinValue::None => None, + v => Some(f(v)), + } +} + +/// An MVCC value with only two versions (versioned by [`TxnId`]). 
+#[doc(hidden)] +#[derive(Debug)] +pub struct VersionedValue { + slot_a: Option<(TxnId, T)>, + slot_b: Option<(TxnId, T)>, +} + +impl Default for VersionedValue { + fn default() -> Self { + VersionedValue { + slot_a: None, + slot_b: None, + } + } +} + +impl VersionedValue { + pub(crate) fn new(t: T, id: TxnId) -> Self { + VersionedValue { + slot_a: Some((id, t)), + slot_b: None, + } + } + + fn gc_txn(&mut self, txn_id: TxnId) { + if let Some((id, _)) = self.slot_a + && id == txn_id + { + self.slot_a = None; + } + if let Some((id, _)) = self.slot_b + && id == txn_id + { + self.slot_b = None; + } + } + + /// True if neither slot holds a value. + fn is_empty(&self) -> bool { + self.slot_a.is_none() && self.slot_b.is_none() + } + + /// If there is a value visible to `txn_id` in one slot, clone it into the other slot and return + /// a mutable reference to it. + fn internal_clone(&mut self, txn_id: TxnId) -> Option<&mut T> + where + T: Clone, + { + if self.slot_a.is_none() && self.slot_b.is_none() { + return None; + } + + // The unpleasantness with `unwrap`s is to work around lifetime issues. + if let Some((aid, a)) = &mut self.slot_a { + if txn_id == *aid { + Some(&mut self.slot_a.as_mut().unwrap().1) + } else if let Some((bid, b)) = &mut self.slot_b { + // Use the current transaction's value or the older of the other two (committed) values. 
+ if txn_id == *bid { + Some(&mut self.slot_b.as_mut().unwrap().1) + } else if aid > bid { + debug_assert!(txn_id > *aid && txn_id > *bid); + self.slot_b = Some((txn_id, a.clone())); + Some(&mut self.slot_b.as_mut().unwrap().1) + } else { + debug_assert!(txn_id > *aid && txn_id > *bid); + self.slot_a = Some((txn_id, b.clone())); + Some(&mut self.slot_a.as_mut().unwrap().1) + } + } else { + self.slot_b = Some((txn_id, a.clone())); + Some(&mut self.slot_b.as_mut().unwrap().1) + } + } else if let Some((bid, b)) = &mut self.slot_b { + if txn_id == *bid { + Some(b) + } else { + self.slot_a = Some((txn_id, b.clone())); + Some(&mut self.slot_a.as_mut().unwrap().1) + } + } else { + unreachable!(); + } + } + + pub(crate) fn get(&self, id: TxnId) -> Option<&T> { + // This could be expressed more simply with a match, but that doesn't work for `get_mut` because + // of mutable borrows. Since the functions do the same thing, I use the more complex code + // here too. + if let Some((aid, a)) = &self.slot_a + && id >= *aid + { + if let Some((bid, b)) = &self.slot_b + && id >= *bid + { + debug_assert_ne!(aid, bid); + if aid > bid { Some(a) } else { Some(b) } + } else { + Some(a) + } + } else if let Some((bid, b)) = &self.slot_b + && id >= *bid + { + Some(b) + } else { + None + } + } + + pub(crate) fn get_mut(&mut self, id: TxnId) -> Option<&mut T> { + if let Some((aid, a)) = &mut self.slot_a + && id >= *aid + { + if let Some((bid, b)) = &mut self.slot_b + && id >= *bid + { + debug_assert_ne!(aid, bid); + if aid > bid { Some(a) } else { Some(b) } + } else { + Some(a) + } + } else if let Some((bid, b)) = &mut self.slot_b + && id >= *bid + { + Some(b) + } else { + None + } + } + + pub(crate) fn take(&mut self, id: TxnId) -> Option { + match (&mut self.slot_a, &mut self.slot_b) { + (Some((aid, a)), Some((bid, b))) if id >= *aid && id >= *bid => { + debug_assert_ne!(aid, bid); + if aid > bid { + self.slot_a.take().map(|(_, v)| v) + } else { + self.slot_b.take().map(|(_, v)| v) + } + } 
+ (Some((vid, v)), _) if id >= *vid => self.slot_a.take().map(|(_, v)| v), + (_, Some((vid, v))) if id >= *vid => self.slot_b.take().map(|(_, v)| v), + _ => None, + } + } + + pub(crate) fn set(&mut self, value: T, id: TxnId) { + match (&mut self.slot_a, &mut self.slot_b) { + (Some((vid, v)), _) | (_, Some((vid, v))) if id == *vid => { + // Overwrite a value from the current transaction. + *v = value; + } + (Some((aid, a)), Some((bid, b))) => { + // Overwrite the older of two committed values. + if aid > bid { + *b = value; + *bid = id; + } else { + *a = value; + *aid = id; + } + } + (None, _) => { + // Write into an empty slot (the other must be committed or also empty). + self.slot_a = Some((id, value)); + } + (_, None) => { + // Write into an empty slot (the other must be committed). + self.slot_b = Some((id, value)); + } + } + } +} + +/// Tracks deletes in a transaction without modifying the permanent storage (to allow rollback). +#[derive(Default, Debug)] +enum DeleteMask { + /// No delete mask, storage should be accessed directly. + #[default] + None, + + /// The whole table has been deleted, the second field contains new key-value pairs. + /// + /// The `VersionedValue` will always have a single value with the same transaction id as the first + /// field. We use this layout so that the delete mask can be committed with a single pointer swap + /// and so that it can be iterated with the same type of iterator as the main storage. + All(TxnId, HashMap>), + + /// Some rows in the table have been deleted, tracked in the second field. 
+ Some(TxnId, HashSet), +} + +impl DeleteMask { + fn check_txn_id(&self, txn_id: TxnId) -> bool { + match self { + DeleteMask::None => true, + DeleteMask::All(self_id, _) | DeleteMask::Some(self_id, _) => *self_id == txn_id, + } + } + + fn clear(&mut self, txn_id: TxnId) { + debug_assert!(self.check_txn_id(txn_id)); + *self = Self::All(txn_id, HashMap::new()); + } + + fn remove(&mut self, k: &Q, txn_id: TxnId, data: &HashMap>) -> Option + where + K: Borrow, + Q: ?Sized + Hash + Eq + ToOwned, + { + debug_assert!(self.check_txn_id(txn_id)); + + // Only record a delete for a key that is actually present (and visible to this transaction). + let present = data.get(k).and_then(|v| v.get(txn_id)).is_some(); + + match self { + DeleteMask::None => { + if present { + let mut removed: HashSet = HashSet::new(); + removed.insert(k.to_owned()); + *self = Self::Some(txn_id, removed); + } + None + } + DeleteMask::All(_, present_rows) => { + present_rows.remove(k).and_then(|mut v| v.take(txn_id)) + } + DeleteMask::Some(_, removed) => { + if present { + removed.insert(k.to_owned()); + } + None + } + } + } + + fn get(&self, k: &Q, txn_id: TxnId) -> MaskStatus<&V> + where + K: Borrow, + Q: ?Sized + Hash + Eq, + { + debug_assert!(self.check_txn_id(txn_id)); + match self { + DeleteMask::All(self_id, present) if *self_id == txn_id => match present.get(k) { + Some(v) => match v.get(txn_id) { + Some(v) => MaskStatus::Overwritten(v), + None => MaskStatus::Unknown, + }, + None => MaskStatus::Removed, + }, + DeleteMask::Some(self_id, removed) if *self_id == txn_id => { + if removed.contains(k) { + MaskStatus::Removed + } else { + MaskStatus::Unknown + } + } + _ => MaskStatus::Unknown, + } + } + + fn get_mut(&mut self, k: &Q, txn_id: TxnId) -> MaskStatus<&mut V> + where + K: Borrow, + Q: ?Sized + Hash + Eq, + { + debug_assert!(self.check_txn_id(txn_id)); + match self { + DeleteMask::None => MaskStatus::Unknown, + DeleteMask::All(self_id, present) if *self_id == txn_id => match 
present.get_mut(k) { + Some(v) => match v.get_mut(txn_id) { + Some(v) => MaskStatus::Overwritten(v), + None => MaskStatus::Unknown, + }, + None => MaskStatus::Removed, + }, + DeleteMask::Some(self_id, removed) if *self_id == txn_id => { + if removed.contains(k) { + MaskStatus::Removed + } else { + MaskStatus::Unknown + } + } + _ => MaskStatus::Unknown, + } + } + + fn insert(&mut self, k: K, v: V, txn_id: TxnId) -> MaskInsertResult { + debug_assert!(self.check_txn_id(txn_id)); + match self { + DeleteMask::None => MaskInsertResult::NotWritten(k, v), + DeleteMask::All(_, present) => MaskInsertResult::Written( + present + .insert(k, VersionedValue::new(v, txn_id)) + .and_then(|mut v| v.take(txn_id)), + ), + DeleteMask::Some(_, removed) => { + removed.remove(&k); + MaskInsertResult::NotWritten(k, v) + } + } + } +} + +/// The status of a row of a table in the delete mask. +enum MaskStatus { + /// No presence in the delete mask. + Unknown, + /// The key has been deleted. + Removed, + /// The key has been deleted, then a new value written. + Overwritten(T), +} + +/// The result of attempting to write into a delete mask. +enum MaskInsertResult { + /// The field is the key and value which was intended to be written. + NotWritten(K, V), + /// The field is the previous value in the delete mask, if there is one. + Written(Option), +} + /// Tabular data in the KV store, there will be one of these for each logical table in the storage /// implementing `TableStorage` in [`Storage`]. #[doc(hidden)] -pub struct Table { +pub struct Table { /// Owner of the table. pub(crate) owner: Option, /// KV data. - pub data: HashMap, + data: HashMap>, + /// A mask of deleted rows in the table. Should be checked before reading from `data`. + delete_mask: DeleteMask, + /// A list of keys modified (includes inserts, but not deletes) by the given transaction. + /// + /// Can be an over-approximation, but not an under-approximation. 
+ modified: Option>, /// All indexes of this table (empty if there are no indexes or this table itself is an index). pub indexes: I, } @@ -92,12 +565,14 @@ impl Default for Table { Self { owner: None, data: HashMap::new(), + delete_mask: DeleteMask::None, + modified: None, indexes: I::default(), } } } -impl Table { +impl> Table { pub(crate) fn try_set_owner(&mut self, owner: Owner) -> Result<()> { match &self.owner { Some(owner) => Err(Error::AlreadyInit(owner)), @@ -108,6 +583,258 @@ impl Table { } } + pub fn gc_txn(&mut self, txn_id: TxnId) { + self.delete_mask = DeleteMask::None; + + let Some(modified) = &self.modified else { + return; + }; + + assert_eq!( + modified.txn_id, txn_id, + "Found mismatched modified set to GC" + ); + + for k in &modified.keys { + if let Some(value) = self.data.get_mut(k) { + value.gc_txn(txn_id); + + // We don't need to do this, but I think we may as well free up the space. + if value.is_empty() { + self.data.remove(k); + } + } + } + + self.modified = None; + } + + /// Check if this table's transaction state is consistent for commit. + /// + /// Must be called before calling `commit_txn`. In theory, because of the global lock, + /// inconsistency should never happen. But if we were to allow transactions to be timed-out (or multiple + /// mutating transactions), or in the presence of unsafe code, then inconsistency could happen. + pub fn check_txn_consistency(&self, txn_id: TxnId) -> Result<()> { + if let Some(modified) = &self.modified + && modified.txn_id != txn_id + { + return Err(Error::TransactionFailed); + } + + match &self.delete_mask { + DeleteMask::All(dm_id, data) if *dm_id == txn_id => Ok(()), + DeleteMask::Some(dm_id, removed) if *dm_id == txn_id => Ok(()), + DeleteMask::None => Ok(()), + _ => Err(Error::TransactionFailed), + } + } + + /// Apply this table's transaction state to its storage. + /// + /// Precondition: `self.check_txn_consistency` returns `Ok`. (Otherwise, commit may not be atomic). 
+ /// + /// Panics if `self.check_txn_consistency` would return an error. + pub fn commit_txn(&mut self, txn_id: TxnId) { + if let Some(modified) = &self.modified { + assert_eq!(modified.txn_id, txn_id); + self.modified = None; + } + + match std::mem::take(&mut self.delete_mask) { + DeleteMask::All(dm_id, data) if dm_id == txn_id => { + self.data = data; + } + DeleteMask::Some(dm_id, removed) if dm_id == txn_id => { + removed.iter().for_each(|k| { + self.data.remove(k); + }); + } + DeleteMask::None => {} + _ => unreachable!(), + } + } + + pub(crate) fn len(&self, txn_id: TxnId) -> usize { + match &self.delete_mask { + DeleteMask::None => self.iter_data(txn_id).count(), + DeleteMask::All(_, pending) => pending.len(), + // Note that this only works because we take care in `DeleteMask::remove` only to track + // removals where the value is already in the main data. + DeleteMask::Some(_, removed) => self.iter_data(txn_id).count() - removed.len(), + } + } + + pub(crate) fn is_empty(&self, txn_id: TxnId) -> bool { + match &self.delete_mask { + DeleteMask::None => self.iter_data(txn_id).count() == 0, + DeleteMask::All(_, pending) => pending.is_empty(), + DeleteMask::Some(..) if self.data.is_empty() => true, + // Note that this only works because we take care in `DeleteMask::remove` only to track + // removals where the value is already in the main data. + DeleteMask::Some(_, removed) => self.iter_data(txn_id).count() - removed.len() == 0, + } + } + + pub(crate) fn iter<'a>(&'a self, txn_id: TxnId) -> TableIterator<'a, D> { + let data = match &self.delete_mask { + DeleteMask::None | DeleteMask::Some(..) 
=> &self.data, + DeleteMask::All(_, pending) => pending, + }; + + TableIterator:: { + data: data.iter(), + delete_mask: &self.delete_mask, + txn_id, + } + } + + pub(crate) fn get(&self, key: &Q, txn_id: TxnId) -> Option<&D::Value> + where + D::Key: Borrow, + Q: ?Sized + Hash + Eq, + { + get_from_table::(&self.delete_mask, &self.data, key, txn_id) + } + + pub(crate) fn with_mut( + &mut self, + key: &Q, + f: impl FnOnce(&mut D::Value) -> T, + txn_id: TxnId, + max_committed_id: TxnId, + ) -> Option + where + D::Key: Borrow, + Q: ?Sized + Hash + Eq + ToOwned, + D::Value: Clone, + { + let value = match self.delete_mask.get_mut(key, txn_id) { + MaskStatus::Unknown => self.data.get_mut(key)?.internal_clone(txn_id)?, + MaskStatus::Removed => return None, + MaskStatus::Overwritten(v) => v, + }; + + // TODO we could be more efficient and only update indexes if the foreign key changes + self.indexes.on_remove(value, txn_id, max_committed_id); + record_mutation(&mut self.modified, key, txn_id, max_committed_id); + let result = f(value); + self.indexes.on_insert(key, value, txn_id, max_committed_id); + Some(result) + } + + fn iter_data(&self, txn_id: TxnId) -> impl Iterator { + self.data + .iter() + .filter_map(move |(k, v)| Some((k, v.get(txn_id)?))) + } + + fn iter_data_mut( + data: &mut HashMap>, + txn_id: TxnId, + ) -> impl Iterator + where + D::Value: Clone, + { + data.iter_mut() + .filter_map(move |(k, v)| Some((k, v.internal_clone(txn_id)?))) + } + + pub(crate) fn for_each_mut( + &mut self, + mut f: impl FnMut(&D::Key, &mut D::Value), + txn_id: TxnId, + max_committed_id: TxnId, + ) where + D::Value: Clone, + { + // TODO could be more efficient by only updating if value is changed + + match &mut self.delete_mask { + DeleteMask::None => Self::iter_data_mut(&mut self.data, txn_id).for_each(|(k, v)| { + self.indexes.on_remove(v, txn_id, max_committed_id); + record_mutation(&mut self.modified, k, txn_id, max_committed_id); + f(k, v); + self.indexes.on_insert(k, v, txn_id, 
max_committed_id); + }), + DeleteMask::All(_, pending) => pending.iter_mut().for_each(|(k, v)| { + if let Some(v) = v.get_mut(txn_id) { + self.indexes.on_remove(v, txn_id, max_committed_id); + record_mutation(&mut self.modified, k, txn_id, max_committed_id); + f(k, v); + self.indexes.on_insert(k, v, txn_id, max_committed_id); + } + }), + DeleteMask::Some(_, removed) => { + Self::iter_data_mut(&mut self.data, txn_id).for_each(|(k, v)| { + if removed.contains(k) { + return; + } + self.indexes.on_remove(v, txn_id, max_committed_id); + record_mutation(&mut self.modified, k, txn_id, max_committed_id); + f(k, v); + self.indexes.on_insert(k, v, txn_id, max_committed_id); + }) + } + } + } + + pub fn insert(&mut self, key: D::Key, value: D::Value, txn_id: TxnId, max_committed_id: TxnId) { + if let Some(old_value) = + get_from_table::(&self.delete_mask, &self.data, &key, txn_id) + { + self.indexes.on_remove(old_value, txn_id, max_committed_id); + } + self.indexes + .on_insert(&key, &value, txn_id, max_committed_id); + + match self.delete_mask.insert(key, value, txn_id) { + MaskInsertResult::NotWritten(key, value) => { + record_mutation(&mut self.modified, &key, txn_id, max_committed_id); + let entry = self.data.entry(key).or_default(); + entry.set(value, txn_id); + } + MaskInsertResult::Written(_) => {} + } + } + + pub fn remove(&mut self, key: &Q, txn_id: TxnId, max_committed_id: TxnId) + where + D::Key: Borrow, + Q: ?Sized + Hash + Eq + ToOwned, + { + if txn_id == max_committed_id { + if let Some(value) = self.data.remove(key).and_then(|mut v| v.take(txn_id)) { + self.indexes.on_remove(&value, txn_id, max_committed_id); + } + } else if txn_id > max_committed_id { + let dm_result = self.delete_mask.remove(key, txn_id, &self.data); + if let Some(value) = dm_result + .as_ref() + .or_else(|| self.data.get(key).and_then(|v| v.get(txn_id))) + { + self.indexes.on_remove(value, txn_id, max_committed_id); + } + } else { + unreachable!( + "current transaction id less than committed 
id: {txn_id:?} < {max_committed_id:?}" + ); + } + } + + pub fn clear(&mut self, txn_id: TxnId, max_committed_id: TxnId) { + if txn_id == max_committed_id { + self.data.clear(); + } else if txn_id > max_committed_id { + self.delete_mask.clear(txn_id); + } else { + unreachable!( + "current transaction id less than committed id: {txn_id:?} < {max_committed_id:?}" + ); + } + + self.indexes.clear(txn_id, max_committed_id); + } + pub(crate) fn assert_or_set_owner(&mut self, owner: Owner) { match &self.owner { Some(prev_owner) => debug_assert_eq!( @@ -129,3 +856,333 @@ impl Table { } } } + +// Helper function for `get`, since calling `self.get` can cause lifetime issues. +fn get_from_table<'a, D: schema::TableDesc, Q>( + delete_mask: &'a DeleteMask, + data: &'a HashMap>, + key: &Q, + txn_id: TxnId, +) -> Option<&'a D::Value> +where + D::Key: Borrow, + Q: ?Sized + Hash + Eq, +{ + Some(match delete_mask.get(key, txn_id) { + MaskStatus::Unknown => data.get(key)?.get(txn_id)?, + MaskStatus::Removed => return None, + MaskStatus::Overwritten(v) => v, + }) +} + +struct TxnMutations { + txn_id: TxnId, + keys: Vec, +} + +fn record_mutation( + modified: &mut Option>, + key: &Q, + txn_id: TxnId, + max_committed_id: TxnId, +) where + K: Borrow, + Q: ?Sized + Hash + Eq + ToOwned, +{ + if txn_id != max_committed_id { + let key = key.to_owned(); + match modified { + Some(modified) => { + assert_eq!(modified.txn_id, txn_id); + modified.keys.push(key); + } + None => { + *modified = Some(TxnMutations { + txn_id, + keys: vec![key], + }); + } + } + } +} + +/// Iterate the key-value pairs in a table. +/// +/// Takes into account the state of the table as visible to the transaction with id `self.txn_id`. 
+pub struct TableIterator<'a, D: schema::TableDesc> { + data: std::collections::hash_map::Iter<'a, D::Key, VersionedValue>, + delete_mask: &'a DeleteMask, + txn_id: TxnId, +} + +impl<'a, D: schema::TableDesc> Iterator for TableIterator<'a, D> { + type Item = (&'a D::Key, &'a D::Value); + + fn next(&mut self) -> Option { + match &self.delete_mask { + DeleteMask::None | DeleteMask::All(..) => { + for (k, v) in &mut self.data { + if let Some(v) = v.get(self.txn_id) { + return Some((k, v)); + } + } + None + } + DeleteMask::Some(_, removed) => { + for (k, v) in &mut self.data { + if !removed.contains(k) + && let Some(v) = v.get(self.txn_id) + { + return Some((k, v)); + } + } + None + } + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::transactions::TxnId; + + #[test] + fn get_returns_none_when_empty() { + let v: VersionedValue = VersionedValue::default(); + assert!(v.get(TxnId::new(2)).is_none()); + } + + #[test] + fn get_returns_value_from_slot_a() { + let v = VersionedValue { + slot_a: Some((TxnId::new(2), 42u32)), + slot_b: None, + }; + assert_eq!(v.get(TxnId::new(2)), Some(&42)); + } + + #[test] + fn get_returns_value_from_slot_b() { + let v = VersionedValue { + slot_a: None, + slot_b: Some((TxnId::new(2), 42u32)), + }; + assert_eq!(v.get(TxnId::new(2)), Some(&42)); + } + + #[test] + fn get_returns_none_when_id_less_than_slot_id() { + let v = VersionedValue { + slot_a: Some((TxnId::new(3), 42u32)), + slot_b: None, + }; + assert!(v.get(TxnId::new(2)).is_none()); + } + + #[test] + fn get_returns_most_recent_when_both_slots_visible() { + let v = VersionedValue { + slot_a: Some((TxnId::new(3), 10u32)), + slot_b: Some((TxnId::new(2), 20u32)), + }; + assert_eq!(v.get(TxnId::new(5)), Some(&10)); + } + + #[test] + fn get_returns_value_when_id_exceeds_slot_id() { + let v = VersionedValue { + slot_a: Some((TxnId::new(2), 42u32)), + slot_b: None, + }; + assert_eq!(v.get(TxnId::new(5)), Some(&42)); + } + + #[test] + fn 
get_returns_visible_value_when_one_slot_is_not_visible() { + let v = VersionedValue { + slot_a: Some((TxnId::new(5), 10u32)), + slot_b: Some((TxnId::new(2), 20u32)), + }; + assert_eq!(v.get(TxnId::new(3)), Some(&20)); + } + + // take + + #[test] + fn take_returns_none_when_empty() { + let mut v: VersionedValue = VersionedValue::default(); + assert!(v.take(TxnId::new(2)).is_none()); + } + + #[test] + fn take_returns_and_clears_slot_a() { + let mut v = VersionedValue { + slot_a: Some((TxnId::new(2), 42u32)), + slot_b: None, + }; + assert_eq!(v.take(TxnId::new(2)), Some(42)); + assert!(v.slot_a.is_none()); + } + + #[test] + fn take_returns_and_clears_slot_b() { + let mut v = VersionedValue { + slot_a: None, + slot_b: Some((TxnId::new(2), 42u32)), + }; + assert_eq!(v.take(TxnId::new(2)), Some(42)); + assert!(v.slot_b.is_none()); + } + + #[test] + fn take_returns_most_recent_and_clears_its_slot() { + let mut v = VersionedValue { + slot_a: Some((TxnId::new(3), 10u32)), + slot_b: Some((TxnId::new(2), 20u32)), + }; + assert_eq!(v.take(TxnId::new(5)), Some(10)); + assert!(v.slot_a.is_none()); + assert!(v.slot_b.is_some()); + } + + #[test] + fn take_returns_none_when_id_less_than_slot_id() { + let mut v = VersionedValue { + slot_a: Some((TxnId::new(3), 42u32)), + slot_b: None, + }; + assert!(v.take(TxnId::new(2)).is_none()); + assert!(v.slot_a.is_some()); + } + + #[test] + fn take_returns_visible_value_when_one_slot_is_not_visible() { + let mut v = VersionedValue { + slot_a: Some((TxnId::new(5), 10u32)), + slot_b: Some((TxnId::new(2), 20u32)), + }; + assert_eq!(v.take(TxnId::new(3)), Some(20)); + assert!(v.slot_b.is_none()); + assert!(v.slot_a.is_some()); + } + + // set + + #[test] + fn set_into_empty_writes_to_slot_a() { + let mut v: VersionedValue = VersionedValue::default(); + v.set(42, TxnId::new(2)); + assert_eq!(v.slot_a, Some((TxnId::new(2), 42))); + assert!(v.slot_b.is_none()); + } + + #[test] + fn set_overwrites_same_txn_id_in_slot_a() { + let mut v = VersionedValue 
{ + slot_a: Some((TxnId::new(2), 1u32)), + slot_b: None, + }; + v.set(99, TxnId::new(2)); + assert_eq!(v.slot_a, Some((TxnId::new(2), 99))); + } + + #[test] + fn set_overwrites_same_txn_id_in_slot_b() { + let mut v = VersionedValue { + slot_a: None, + slot_b: Some((TxnId::new(2), 1u32)), + }; + v.set(99, TxnId::new(2)); + assert_eq!(v.slot_b, Some((TxnId::new(2), 99))); + } + + #[test] + fn set_overwrites_older_slot_when_both_valid() { + let mut v = VersionedValue { + slot_a: Some((TxnId::new(3), 10u32)), + slot_b: Some((TxnId::new(2), 20u32)), + }; + v.set(99, TxnId::new(4)); + assert_eq!(v.slot_a, Some((TxnId::new(3), 10))); + assert_eq!(v.slot_b, Some((TxnId::new(4), 99))); + } + + #[test] + fn set_writes_to_empty_slot_a_when_slot_b_populated() { + let mut v = VersionedValue { + slot_a: None, + slot_b: Some((TxnId::new(2), 20u32)), + }; + v.set(99, TxnId::new(3)); + assert_eq!(v.slot_a, Some((TxnId::new(3), 99))); + assert_eq!(v.slot_b, Some((TxnId::new(2), 20))); + } + + #[test] + fn set_writes_to_slot_b_when_slot_a_populated() { + let mut v = VersionedValue { + slot_a: Some((TxnId::new(2), 10u32)), + slot_b: None, + }; + v.set(99, TxnId::new(3)); + assert_eq!(v.slot_a, Some((TxnId::new(2), 10))); + assert_eq!(v.slot_b, Some((TxnId::new(3), 99))); + } +} + +#[cfg(test)] +mod txn_test { + // The `tables!` macro generates a `KvStore` wrapper, which these storage-level tests + // (operating on `Storage` directly) do not use. + #![allow(dead_code)] + + use std::any::TypeId; + + use crate::{Error, singleton, storage::Storage, tables}; + + singleton!(Count(u64)); + tables!(); + + #[test] + fn commit_with_mismatched_id_fails() { + let mut storage = Storage::::new(); + let id = storage.begin_transaction(); + // A commit must target the in-progress transaction. 
+ assert!(matches!( + storage.commit_transaction(id.next()), + Err(Error::TransactionFailed) + )); + } + + #[test] + fn commit_then_recommit_fails() { + let mut storage = Storage::::new(); + let id = storage.begin_transaction(); + assert!(storage.commit_transaction(id).is_ok()); + // No transaction is pending after a successful commit. + assert!(matches!( + storage.commit_transaction(id), + Err(Error::TransactionFailed) + )); + } + + #[test] + fn clear_transaction_rolls_back_and_frees_singleton_entry() { + let mut storage = Storage::::new(); + let id = storage.begin_transaction(); + let key = TypeId::of::(); + storage.insert_singleton::(key, "owner", 42, id); + // Visible to the in-progress transaction. + assert_eq!(storage.get_singleton_value::(key, id), Some(&42)); + + // Simulate cleanup of an abandoned transaction. + storage.clear_transaction(); + + assert!(storage.current_txn().is_none()); + let now = storage.txn_id(); + assert!(storage.get_singleton_value::(key, now).is_none()); + // The emptied entry was freed, not left as a dead `VersionedValue`. + assert!(!storage.singletons.contains_key(&key)); + } +} diff --git a/ts_kv_store/src/transactions.rs b/ts_kv_store/src/transactions.rs index a9f9578..a03729f 100644 --- a/ts_kv_store/src/transactions.rs +++ b/ts_kv_store/src/transactions.rs @@ -1,55 +1,90 @@ //! KvStore transactional API. +//! +//! Most of the implementation of transactions is in the [`storage`](crate::storage) module. 
use std::{ borrow::Borrow, + cell::UnsafeCell, hash::Hash, - sync::{Arc, RwLockReadGuard, RwLockWriteGuard, TryLockError}, + num::NonZeroU64, + sync::{Arc, RwLockReadGuard, RwLockWriteGuard}, }; use crate::{ - KvStore, Owner, RefReadGuard, RefWriteGuard, RefWriteGuardMut, Result, + KvStore, Owner, Result, index::{KvTableRoTransactionalIndex, KvTableTransactionalIndex}, operations::{Ops, OpsMut, SingletonOps, SingletonOpsMut, TabularOps, TabularOpsMut}, schema::{self, TableDesc}, storage::Storage, }; +/// Uniquely identifies a transaction. +/// +/// Guaranteed to be non-zero so that `size_of::<Option<TxnId>>() == size_of::<TxnId>()`. +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct TxnId(NonZeroU64); + +impl TxnId { + pub(crate) const FIRST: Self = TxnId(NonZeroU64::new(1).unwrap()); + + pub(crate) fn next(self) -> Self { + TxnId(self.0.checked_add(1).unwrap()) + } + + #[cfg(test)] + pub(crate) fn new(n: u64) -> Self { + TxnId(NonZeroU64::new(n).unwrap()) + } +} + impl KvStore { /// Start a transaction. /// /// Blocks until the store's global lock is available for write access. - pub fn begin_transaction(&self, owner: Owner) -> Transaction<'_, Storage> { + pub fn begin_transaction(&self, owner: Owner) -> Transaction<'_, TableStorage> { + let mut guard = self.get_write_lock(); + let id = guard.begin_transaction(); + Transaction { - guard: self.storage.write().unwrap(), + guard, owner, + id, + _not_send_or_sync: UnsafeCell::new(()), } } /// Start a transaction. /// /// Returns `None` if the store's global lock is unavailable for write access. 
- pub fn try_begin_transaction( - &self, - owner: Owner, - ) -> Option>> { - let guard = match self.storage.try_write() { + pub fn try_begin_transaction(&self, owner: Owner) -> Option> { + self.clear_lock_poison(); + + let mut guard = match self.storage.try_write() { Ok(g) => g, - Err(TryLockError::WouldBlock) => return None, - Err(TryLockError::Poisoned(_)) => panic!(), + _ => return None, }; - Some(Transaction { guard, owner }) + // Garbage-collect any abandoned transaction before starting a new one (matches the blocking + // `begin_transaction`, which goes through `get_write_lock`). + guard.clear_transaction(); + let id = guard.begin_transaction(); + + Some(Transaction { + guard, + owner, + id, + _not_send_or_sync: UnsafeCell::new(()), + }) } /// Start a read-only transaction (i.e., only supports non-mutating access to the store, but /// all reads are guaranteed to be atomic). /// /// Blocks until the store's global lock is available for read access. - pub fn begin_ro_transaction(&self, owner: Owner) -> RoTransaction<'_, Storage> { - RoTransaction { - guard: self.storage.read().unwrap(), - owner, - } + pub fn begin_ro_transaction(&self, owner: Owner) -> RoTransaction<'_, TableStorage> { + let guard = self.get_read_lock(); + + RoTransaction { guard, owner } } /// Start a read-only transaction (i.e., only supports non-mutating access to the store, but @@ -59,13 +94,25 @@ impl KvStore { pub fn try_begin_ro_transaction( &self, owner: Owner, - ) -> Option>> { - let guard = match self.storage.try_read() { - Ok(g) => g, - Err(TryLockError::WouldBlock) => return None, - Err(TryLockError::Poisoned(_)) => panic!(), + ) -> Option> { + self.clear_lock_poison(); + + if let Ok(guard) = self.storage.try_read() { + return Some(RoTransaction { guard, owner }); + } + + // Garbage-collect the abandoned transaction before reading. 
+ let Ok(mut guard) = self.storage.try_write() else { + return None; }; + + guard.clear_transaction(); + let guard = RwLockWriteGuard::downgrade(guard); + Some(RoTransaction { guard, owner }) + // We only try once, rather than loop because if the user is calling `try_begin_...`, they + // presumably don't want to wait and we'd only need to retry if someone else grabbed the + // lock before we could. } // TODO single-table transactions? @@ -79,50 +126,60 @@ impl KvStore { /// ([`Transaction::commit`] can be used for this). /// /// A transaction must not be kept alive over an `await` point. This can lead to deadlock. -pub struct Transaction<'guard, Storage> { - pub(crate) guard: RwLockWriteGuard<'guard, Storage>, +pub struct Transaction<'guard, TableStorage: schema::GeneratedStorage> { + pub(crate) guard: RwLockWriteGuard<'guard, Storage>, pub(crate) owner: Owner, + id: TxnId, + // The guard already makes the transaction `!Send`; this marker also makes it `!Sync`, so it + // isn't accidentally held over an await point (at least with a parallel async runtime), etc. 
+ _not_send_or_sync: UnsafeCell<()>, +} + +impl<'guard, TableStorage: schema::GeneratedStorage> Drop for Transaction<'guard, TableStorage> { + fn drop(&mut self) { + self.guard.rollback_transaction(self.id); + } } impl<'guard, 'a, TableStorage: schema::GeneratedStorage> Ops - for &'a Transaction<'guard, Storage> + for &'a Transaction<'guard, TableStorage> { - type ReadLock = RefWriteGuard<'a, 'guard, Storage>; + type ReadLock = &'a RwLockWriteGuard<'guard, Storage>; fn read_lock(self) -> Self::ReadLock { - RefWriteGuard(&self.guard) + &self.guard } } impl SingletonOps - for &Transaction<'_, Storage> + for &Transaction<'_, TableStorage> { } impl<'guard, 'a, TableStorage: schema::GeneratedStorage> OpsMut - for &'a mut Transaction<'guard, Storage> + for &'a mut Transaction<'guard, TableStorage> { - type WriteLock = RefWriteGuardMut<'a, 'guard, Storage>; + type WriteLock = &'a mut RwLockWriteGuard<'guard, Storage>; fn write_lock(self) -> Self::WriteLock { - RefWriteGuardMut(&mut self.guard) + &mut self.guard } } impl SingletonOpsMut - for &mut Transaction<'_, Storage> + for &mut Transaction<'_, TableStorage> { } -impl<'guard, TableStorage: schema::GeneratedStorage> Transaction<'guard, Storage> { +impl<'guard, TableStorage: schema::GeneratedStorage> Transaction<'guard, TableStorage> { /// Commit this transaction. /// /// This simply moves and drops the `Transaction` object. It is optional to call and currently /// always succeeds. You can use this method to release the transaction's lock on the store /// without needing an explicit scope. - pub fn commit(self) -> Result<()> { + pub fn commit(mut self) -> Result<()> { + self.guard.commit_transaction(self.id) // drop `self` to release the lock. - Ok(()) } /// Operate on tables of key/values in the store. @@ -183,59 +240,40 @@ impl<'guard, TableStorage: schema::GeneratedStorage> Transaction<'guard, Storage /// /// Returns the previous value if there is one, or `None` if there is no value for the specified key. 
// Question: do we need separate insert/update/upsert methods? - pub fn insert(&mut self, value: D::ArgValue) -> Option { + pub fn insert(&mut self, value: D::ArgValue) { <&mut Self as SingletonOpsMut<_>>::insert::(self, value, self.owner) } - /// Get mutable access to a value in the store. - /// - /// Returns `None` (and does not call `f`) if there is no value for the specified key. - pub fn with_mut( - &mut self, - f: impl FnOnce(&mut D::Value) -> T, - ) -> Option { - <&mut Self as SingletonOpsMut<_>>::with_mut::(self, f, self.owner) - } - /// Remove a single value from the store. /// /// Returns the previous value if there is one, or `None` if there is no value for the specified key. - pub fn remove(&mut self) -> Option { + pub fn remove(&mut self) { <&mut Self as SingletonOpsMut<_>>::remove::(self, self.owner) } - - /// Remove a single value from the store while preserving ownership of the key/value. - /// - /// Can also be used to initialize a key/value with a key but without a value. - /// - /// Returns the previous value if there is one, or `None` if there is no value for the specified key. - pub fn clear(&mut self) -> Option { - <&mut Self as SingletonOpsMut<_>>::clear::(self, self.owner) - } } /// Abstracts a table of key/values pairs in the store accessed as part of a transaction. 
pub struct KvTableTransactional<'guard, 'txn, D: TableDesc> { - txn: &'txn mut Transaction<'guard, Storage>, + txn: &'txn mut Transaction<'guard, D::Storage>, } impl<'guard, 'txn, 'table, D: TableDesc> Ops for &'table KvTableTransactional<'guard, 'txn, D> { - type ReadLock = RefWriteGuard<'table, 'guard, Storage>; + type ReadLock = &'table RwLockWriteGuard<'guard, Storage>; fn read_lock(self) -> Self::ReadLock { - RefWriteGuard(&self.txn.guard) + &self.txn.guard } } impl<'guard, 'txn, 'table, D: TableDesc> OpsMut for &'table mut KvTableTransactional<'guard, 'txn, D> { - type WriteLock = RefWriteGuardMut<'table, 'guard, Storage>; + type WriteLock = &'table mut RwLockWriteGuard<'guard, Storage>; fn write_lock(self) -> Self::WriteLock { - RefWriteGuardMut(&mut self.txn.guard) + &mut self.txn.guard } } @@ -251,7 +289,7 @@ impl<'guard, D: TableDesc> KvTableTransactional<'guard, '_, D> { /// Initialize a table by setting its owner. /// /// Calling this function is optional, a table can be used without initialization in which case, - /// its owner is set to the owner specifed in the first write. + /// its owner is set to the owner specified in the first write. /// /// Returns an error (containing the current owner of the table) if the table has already been /// initialized. In this case, the table will be in a consistent state and can be used as normal. @@ -300,7 +338,7 @@ impl<'guard, D: TableDesc> KvTableTransactional<'guard, '_, D> { /// Insert a value into the table. /// /// Returns the previous value if there is one, or `None` if there is no value for the specified key. 
- pub fn insert(&mut self, key: D::Key, value: D::Value) -> Option + pub fn insert(&mut self, key: D::Key, value: D::Value) where D::Key: Clone, { @@ -314,6 +352,7 @@ impl<'guard, D: TableDesc> KvTableTransactional<'guard, '_, D> { where D::Key: Borrow, Q: ?Sized + Hash + Eq + ToOwned, + D::Value: Clone, { <&mut Self as TabularOpsMut<_>>::with_mut(self, key, f, self.txn.owner) } @@ -321,10 +360,10 @@ impl<'guard, D: TableDesc> KvTableTransactional<'guard, '_, D> { /// Remove a row from the table. /// /// Returns the previous value if there is one, or `None` if there is no value for the specified key. - pub fn remove(&mut self, key: &Q) -> Option + pub fn remove(&mut self, key: &Q) where D::Key: Borrow, - Q: ?Sized + Hash + Eq, + Q: ?Sized + Hash + Eq + ToOwned, { <&mut Self as TabularOpsMut<_>>::remove(self, key, self.txn.owner) } @@ -345,7 +384,10 @@ impl<'guard, D: TableDesc> KvTableTransactional<'guard, '_, D> { } /// Iterate all the key/value pairs in a table. Values are mutable. - pub fn for_each_mut(&mut self, f: impl FnMut(&D::Key, &mut D::Value)) { + pub fn for_each_mut(&mut self, f: impl FnMut(&D::Key, &mut D::Value)) + where + D::Value: Clone, + { <&mut Self as TabularOpsMut<_>>::for_each_mut(self, f, self.txn.owner) } } @@ -358,33 +400,31 @@ impl<'guard, D: TableDesc> KvTableTransactional<'guard, '_, D> { /// as possible([`RoTransaction::commit`] can be used for this). /// /// A transaction must not be kept alive over an `await` point. This can lead to deadlock. 
-pub struct RoTransaction<'guard, Storage> { - pub(crate) guard: RwLockReadGuard<'guard, Storage>, +pub struct RoTransaction<'guard, TableStorage: schema::GeneratedStorage> { + pub(crate) guard: RwLockReadGuard<'guard, Storage>, pub(crate) owner: Owner, } impl<'guard, 'txn, TableStorage: schema::GeneratedStorage> Ops - for &'txn RoTransaction<'guard, Storage> + for &'txn RoTransaction<'guard, TableStorage> { - type ReadLock = RefReadGuard<'txn, 'guard, Storage>; + type ReadLock = &'txn RwLockReadGuard<'guard, Storage>; fn read_lock(self) -> Self::ReadLock { - RefReadGuard(&self.guard) + &self.guard } } impl<'guard, TableStorage: schema::GeneratedStorage> SingletonOps - for &RoTransaction<'guard, Storage> + for &RoTransaction<'guard, TableStorage> { } -impl<'guard, 'txn, 'table, D: TableDesc> Ops - for &'table KvTableRoTransactional<'guard, 'txn, D> -{ - type ReadLock = RefReadGuard<'table, 'guard, Storage>; +impl<'guard, 'txn, D: TableDesc> Ops for &'_ KvTableRoTransactional<'guard, 'txn, D> { + type ReadLock = &'txn RwLockReadGuard<'guard, Storage>; fn read_lock(self) -> Self::ReadLock { - RefReadGuard(&self.txn.guard) + &self.txn.guard } } @@ -392,7 +432,7 @@ impl<'guard, D: TableDesc> TabularOps for &KvTableRoTransactional<'g type TableDesc = D; } -impl<'guard, TableStorage: schema::GeneratedStorage> RoTransaction<'guard, Storage> { +impl<'guard, TableStorage: schema::GeneratedStorage> RoTransaction<'guard, TableStorage> { /// Commit this transaction. /// /// This simply moves and drops the `RoTransaction` object. It is optional to call and currently @@ -460,7 +500,7 @@ impl<'guard, TableStorage: schema::GeneratedStorage> RoTransaction<'guard, Stora /// Abstracts a table of key/values pairs in the store as part of a read-only transaction. 
pub struct KvTableRoTransactional<'guard, 'txn, D: TableDesc> { - txn: &'txn RoTransaction<'guard, Storage>, + txn: &'txn RoTransaction<'guard, D::Storage>, } impl<'guard, D: TableDesc> KvTableRoTransactional<'guard, '_, D> { @@ -565,6 +605,53 @@ mod test { assert!(store.try_begin_ro_transaction(OWNER).is_some()); } + #[test] + fn try_begin_transaction_after_commit_sees_committed() { + let store = KvStore::new(); + let mut txn = store.begin_transaction(OWNER); + txn.insert::(7); + txn.commit().unwrap(); + + let txn = store.try_begin_transaction(OWNER).unwrap(); + assert_eq!(txn.get::(), Some(7)); + } + + #[test] + fn try_begin_transaction_after_rollback_sees_pre_txn() { + let store = KvStore::new(); + store.insert::(OWNER, 1); + { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(2); + // dropped without commit -> rolled back + } + let txn = store.try_begin_transaction(OWNER).unwrap(); + assert_eq!(txn.get::(), Some(1)); + } + + #[test] + fn try_begin_ro_transaction_after_commit_sees_committed() { + let store = KvStore::new(); + let mut txn = store.begin_transaction(OWNER); + txn.insert::(7); + txn.commit().unwrap(); + + let txn = store.try_begin_ro_transaction(OWNER).unwrap(); + assert_eq!(txn.get::(), Some(7)); + } + + #[test] + fn try_begin_ro_transaction_after_rollback_sees_pre_txn() { + let store = KvStore::new(); + store.insert::(OWNER, 1); + { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(2); + } + let txn = store.try_begin_ro_transaction(OWNER).unwrap(); + assert_eq!(txn.get::(), Some(1)); + } + #[test] fn txn_get_returns_none_when_absent() { let store = KvStore::new(); @@ -588,15 +675,6 @@ mod test { assert_eq!(txn.get::(), Some(5)); } - #[test] - fn txn_get_returns_none_after_clear_in_same_txn() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - txn.insert::(1); - txn.clear::(); - assert!(txn.get::().is_none()); - } - #[test] fn txn_get_arc_returns_none_when_absent() { let store = 
KvStore::new(); @@ -643,72 +721,6 @@ mod test { assert_eq!(txn.with::(|v| v * 2), Some(10)); } - #[test] - fn txn_insert_returns_none_on_first() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - assert!(txn.insert::(1).is_none()); - } - - #[test] - fn txn_insert_returns_previous_value() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - txn.insert::(1); - assert_eq!(txn.insert::(2), Some(1)); - } - - #[test] - fn txn_insert_over_tombstone_returns_none() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - txn.insert::(1); - txn.clear::(); - assert!(txn.insert::(2).is_none()); - } - - #[test] - fn txn_mutate_returns_none_and_does_not_call_f_when_absent() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - let mut called = false; - let result = txn.with_mut::(|_| { - called = true; - }); - assert!(result.is_none()); - assert!(!called); - } - - #[test] - fn txn_mutate_modifies_value_in_place() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - txn.insert::(10); - assert_eq!( - txn.with_mut::(|v| { - *v += 5; - *v - }), - Some(15) - ); - assert_eq!(txn.get::(), Some(15)); - } - - #[test] - fn txn_remove_returns_none_when_absent() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - assert!(txn.remove::().is_none()); - } - - #[test] - fn txn_remove_returns_previous_value() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - txn.insert::(7); - assert_eq!(txn.remove::(), Some(7)); - } - #[test] fn txn_remove_makes_get_return_none() { let store = KvStore::new(); @@ -718,39 +730,6 @@ mod test { assert!(txn.get::().is_none()); } - #[test] - fn txn_clear_returns_none_when_absent() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - assert!(txn.clear::().is_none()); - } - - #[test] - fn txn_clear_returns_previous_value() { - let store = 
KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - txn.insert::(3); - assert_eq!(txn.clear::(), Some(3)); - } - - #[test] - fn txn_clear_makes_get_return_none() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - txn.insert::(1); - txn.clear::(); - assert!(txn.get::().is_none()); - } - - #[test] - fn txn_double_clear_returns_none() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - txn.insert::(1); - txn.clear::(); - assert!(txn.clear::().is_none()); - } - #[test] fn txn_writes_visible_after_drop() { let store = KvStore::new(); @@ -771,17 +750,6 @@ mod test { assert_eq!(store.table::(OWNER).get("k"), Some("v".to_owned())); } - #[test] - fn txn_mutate_visible_after_drop() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - let mut txn = store.begin_transaction(OWNER); - txn.with_mut::(|v| *v = 100); - txn.commit().unwrap(); - - assert_eq!(store.get::(OWNER), Some(100)); - } - #[test] #[cfg(debug_assertions)] #[should_panic(expected = "Ownership violation")] @@ -792,16 +760,6 @@ mod test { txn.insert::(5); } - #[test] - #[cfg(debug_assertions)] - #[should_panic(expected = "Ownership violation")] - fn txn_mutate_wrong_owner_panics() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - let mut txn = store.begin_transaction(OTHER); - txn.with_mut::(|_| {}); - } - #[test] #[cfg(debug_assertions)] #[should_panic(expected = "Ownership violation")] @@ -812,16 +770,6 @@ mod test { txn.remove::(); } - #[test] - #[cfg(debug_assertions)] - #[should_panic(expected = "Ownership violation")] - fn txn_clear_wrong_owner_panics() { - let store = KvStore::new(); - store.insert::(OWNER, 1); - let mut txn = store.begin_transaction(OTHER); - txn.clear::(); - } - #[test] fn txn_table_init_succeeds() { let store = KvStore::new(); @@ -848,23 +796,6 @@ mod test { assert!(table.get("missing").is_none()); } - #[test] - fn txn_table_insert_returns_none_on_first() { - let store = KvStore::new(); - let 
mut txn = store.begin_transaction(OWNER); - let mut table = txn.table::(); - assert!(table.insert("k", "v".to_owned()).is_none()); - } - - #[test] - fn txn_table_insert_returns_previous() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - let mut table = txn.table::(); - table.insert("k", "v1".to_owned()); - assert_eq!(table.insert("k", "v2".to_owned()), Some("v1".to_owned())); - } - #[test] fn txn_table_get_returns_value_after_insert() { let store = KvStore::new(); @@ -915,30 +846,13 @@ mod test { assert_eq!(table.get("k"), Some("hello!".to_owned())); } - #[test] - fn txn_table_remove_returns_none_when_absent() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - let mut table = txn.table::(); - assert!(table.remove("missing").is_none()); - } - - #[test] - fn txn_table_remove_returns_previous_value() { - let store = KvStore::new(); - let mut txn = store.begin_transaction(OWNER); - let mut table = txn.table::(); - table.insert("k", "v".to_owned()); - assert_eq!(table.remove("k"), Some("v".to_owned())); - } - #[test] fn txn_table_remove_makes_get_return_none() { let store = KvStore::new(); let mut txn = store.begin_transaction(OWNER); let mut table = txn.table::(); table.insert("k", "v".to_owned()); - table.remove("k"); + table.remove(&"k"); assert!(table.get("k").is_none()); } @@ -1111,7 +1025,7 @@ mod test { let store = KvStore::new(); store.table::(OWNER).init().unwrap(); let mut txn = store.begin_transaction(OTHER); - txn.table::().remove("k"); + txn.table::().remove(&"k"); } #[test] @@ -1329,4 +1243,616 @@ mod test { values.sort(); assert_eq!(values, vec!["alpha", "beta"]); } + + #[test] + fn commit_returns_ok() { + let store = KvStore::new(); + let mut txn = store.begin_transaction(OWNER); + txn.insert::(1); + assert!(txn.commit().is_ok()); + } + + #[test] + fn txn_table_remove_then_commit_row_absent() { + let store = KvStore::new(); + store.table::(OWNER).insert("k", "v".to_owned()); + + let mut txn 
= store.begin_transaction(OWNER); + txn.table::().remove(&"k"); + txn.commit().unwrap(); + + assert_eq!(store.table::(OWNER).get("k"), None); + } + + #[test] + fn txn_table_clear_then_commit_empty() { + let store = KvStore::new(); + store.table::(OWNER).insert("a", "1".to_owned()); + store.table::(OWNER).insert("b", "2".to_owned()); + + let mut txn = store.begin_transaction(OWNER); + txn.table::().clear(); + txn.commit().unwrap(); + + assert!(store.table::(OWNER).is_empty()); + } + + #[test] + fn txn_singleton_remove_then_commit_absent() { + let store = KvStore::new(); + store.insert::(OWNER, 7); + + let mut txn = store.begin_transaction(OWNER); + txn.remove::(); + txn.commit().unwrap(); + + assert_eq!(store.get::(OWNER), None); + } + + #[test] + fn txn_singleton_insert_rolled_back_on_drop() { + let store = KvStore::new(); + { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(42); + // dropped here without commit + } + assert_eq!(store.get::(OWNER), None); + } + + #[test] + fn txn_singleton_insert_over_existing_rolled_back_on_drop() { + let store = KvStore::new(); + store.insert::(OWNER, 1); + { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(2); + } + assert_eq!(store.get::(OWNER), Some(1)); + } + + #[test] + fn txn_singleton_remove_rolled_back_on_drop() { + let store = KvStore::new(); + store.insert::(OWNER, 7); + { + let mut txn = store.begin_transaction(OWNER); + txn.remove::(); + } + assert_eq!(store.get::(OWNER), Some(7)); + } + + #[test] + fn txn_table_insert_rolled_back_on_drop() { + let store = KvStore::new(); + { + let mut txn = store.begin_transaction(OWNER); + txn.table::().insert("k", "v".to_owned()); + } + assert_eq!(store.table::(OWNER).get("k"), None); + } + + #[test] + fn txn_table_insert_over_existing_rolled_back_on_drop() { + let store = KvStore::new(); + store.table::(OWNER).insert("k", "orig".to_owned()); + { + let mut txn = store.begin_transaction(OWNER); + txn.table::().insert("k", "new".to_owned()); + } + 
assert_eq!( + store.table::(OWNER).get("k"), + Some("orig".to_owned()) + ); + } + + #[test] + fn txn_table_mutate_rolled_back_on_drop() { + let store = KvStore::new(); + store.table::(OWNER).insert("k", "orig".to_owned()); + { + let mut txn = store.begin_transaction(OWNER); + txn.table::() + .with_mut(&"k", |v| *v = "new".to_owned()); + } + assert_eq!( + store.table::(OWNER).get("k"), + Some("orig".to_owned()) + ); + } + + #[test] + fn txn_table_remove_rolled_back_on_drop() { + let store = KvStore::new(); + store.table::(OWNER).insert("k", "v".to_owned()); + { + let mut txn = store.begin_transaction(OWNER); + txn.table::().remove(&"k"); + } + assert_eq!(store.table::(OWNER).get("k"), Some("v".to_owned())); + } + + #[test] + fn txn_table_clear_rolled_back_on_drop() { + let store = KvStore::new(); + store.table::(OWNER).insert("a", "1".to_owned()); + store.table::(OWNER).insert("b", "2".to_owned()); + { + let mut txn = store.begin_transaction(OWNER); + txn.table::().clear(); + } + assert_eq!(store.table::(OWNER).len(), 2); + } + + #[test] + fn txn_multiple_writes_all_visible_after_commit() { + let store = KvStore::new(); + let mut txn = store.begin_transaction(OWNER); + txn.insert::(1); + txn.table::().insert("k", "v".to_owned()); + txn.table::().insert(9u32, 99u64); + txn.commit().unwrap(); + + assert_eq!(store.get::(OWNER), Some(1)); + assert_eq!(store.table::(OWNER).get("k"), Some("v".to_owned())); + assert_eq!(store.table::(OWNER).get(&9u32), Some(99)); + } + + #[test] + fn txn_multiple_writes_none_visible_after_rollback() { + let store = KvStore::new(); + { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(1); + txn.table::().insert("k", "v".to_owned()); + txn.table::().insert(9u32, 99u64); + } + assert_eq!(store.get::(OWNER), None); + assert_eq!(store.table::(OWNER).get("k"), None); + assert_eq!(store.table::(OWNER).get(&9u32), None); + } + + #[test] + fn txn_mixed_insert_and_remove_commit_consistent() { + let store = KvStore::new(); + 
store.table::(OWNER).insert("keep", "old".to_owned()); + store.table::(OWNER).insert("drop", "bye".to_owned()); + + let mut txn = store.begin_transaction(OWNER); + txn.table::().insert("keep", "new".to_owned()); + txn.table::().remove(&"drop"); + txn.table::().insert("add", "hi".to_owned()); + txn.commit().unwrap(); + + let table = store.table::(OWNER); + assert_eq!(table.get("keep"), Some("new".to_owned())); + assert_eq!(table.get("drop"), None); + assert_eq!(table.get("add"), Some("hi".to_owned())); + } + + #[test] + fn store_usable_after_rollback() { + let store = KvStore::new(); + { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(5); + } + assert_eq!(store.get::(OWNER), None); + + // A subsequent raw operation round-trips normally. + store.insert::(OWNER, 9); + assert_eq!(store.get::(OWNER), Some(9)); + } + + #[test] + fn try_begin_transaction_succeeds_after_rollback() { + let store = KvStore::new(); + { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(1); + } + // The dropped transaction released the write lock. + assert!(store.try_begin_transaction(OWNER).is_some()); + } + + #[test] + fn new_txn_after_rollback_sees_committed_value() { + let store = KvStore::new(); + store.insert::(OWNER, 1); + { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(2); + } + let txn2 = store.begin_transaction(OWNER); + assert_eq!(txn2.get::(), Some(1)); + } + + #[test] + fn panic_in_transaction_rolls_back() { + let store = KvStore::new(); + store.insert::(OWNER, 1); + + std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(99); + panic!(); + })) + .unwrap_err(); + + // The panicked transaction must have rolled back to the pre-transaction value. 
+ assert_eq!(store.get::(OWNER), Some(1)); + } + + #[test] + fn store_recovers_after_panicked_transaction() { + let store = KvStore::new(); + + std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(5); + panic!(); + })) + .unwrap_err(); + + // The store recovers from the poisoned lock and remains usable. + store.insert::(OWNER, 7); + assert_eq!(store.get::(OWNER), Some(7)); + + let mut txn = store.begin_transaction(OWNER); + txn.insert::(8); + txn.commit().unwrap(); + assert_eq!(store.get::(OWNER), Some(8)); + } + + #[test] + fn early_return_in_transaction_rolls_back() { + let store = KvStore::new(); + store.insert::(OWNER, 1); + + // Returns early (without committing) when `fail` is set, otherwise commits. + let run = |fail: bool| -> std::result::Result<(), ()> { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(50); + if fail { + return Err(()); + } + txn.commit().unwrap(); + Ok(()) + }; + + assert!(run(true).is_err()); + assert_eq!(store.get::(OWNER), Some(1)); + + assert!(run(false).is_ok()); + assert_eq!(store.get::(OWNER), Some(50)); + } + + #[test] + fn singleton_three_commits_same_key_latest_wins() { + let store = KvStore::new(); + + for v in [10u64, 20, 30] { + let mut txn = store.begin_transaction(OWNER); + // Each transaction sees the value committed by the previous one. 
+ txn.insert::(v); + txn.commit().unwrap(); + assert_eq!(store.get::(OWNER), Some(v)); + } + assert_eq!(store.get::(OWNER), Some(30)); + } + + #[test] + fn table_three_commits_same_key_latest_wins() { + let store = KvStore::new(); + + for v in ["one", "two", "three"] { + let mut txn = store.begin_transaction(OWNER); + txn.table::().insert("k", v.to_owned()); + txn.commit().unwrap(); + assert_eq!(store.table::(OWNER).get("k"), Some(v.to_owned())); + } + assert_eq!( + store.table::(OWNER).get("k"), + Some("three".to_owned()) + ); + } + + #[test] + fn singleton_many_commits_same_key() { + let store = KvStore::new(); + for v in 0..10u64 { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(v); + txn.commit().unwrap(); + } + assert_eq!(store.get::(OWNER), Some(9)); + } + + // A new transaction begun after a rollback, with two committed versions of the key already + // live in the two slots, must still observe the last committed value. + #[test] + fn singleton_rollback_after_two_commits_keeps_committed() { + let store = KvStore::new(); + // First committed version (raw write). + store.insert::(OWNER, 1); + // Second committed version (fills the second slot). + let mut txn = store.begin_transaction(OWNER); + txn.insert::(2); + txn.commit().unwrap(); + // Third write overwrites a slot, then rolls back. + { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(3); + } + assert_eq!(store.get::(OWNER), Some(2)); + // And a fresh transaction also sees the committed value, not the rolled-back one. 
+ let txn = store.begin_transaction(OWNER); + assert_eq!(txn.get::(), Some(2)); + } + + #[test] + fn table_rollback_after_two_commits_keeps_committed() { + let store = KvStore::new(); + store.table::(OWNER).insert("k", "v1".to_owned()); + let mut txn = store.begin_transaction(OWNER); + txn.table::().insert("k", "v2".to_owned()); + txn.commit().unwrap(); + { + let mut txn = store.begin_transaction(OWNER); + txn.table::().insert("k", "v3".to_owned()); + } + assert_eq!(store.table::(OWNER).get("k"), Some("v2".to_owned())); + let mut txn = store.begin_transaction(OWNER); + assert_eq!(txn.table::().get("k"), Some("v2".to_owned())); + } + + #[test] + fn raw_insert_then_txn_overwrite_commit_visible_raw() { + let store = KvStore::new(); + store.insert::(OWNER, 1); + let mut txn = store.begin_transaction(OWNER); + txn.insert::(2); + txn.commit().unwrap(); + assert_eq!(store.get::(OWNER), Some(2)); + } + + #[test] + fn raw_insert_then_txn_overwrite_rollback_keeps_raw() { + let store = KvStore::new(); + store.insert::(OWNER, 1); + { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(2); + } + assert_eq!(store.get::(OWNER), Some(1)); + } + + #[test] + fn txn_commit_then_raw_overwrite_then_txn_read() { + let store = KvStore::new(); + let mut txn = store.begin_transaction(OWNER); + txn.insert::(1); + txn.commit().unwrap(); + + // A raw write at the committed id overwrites the committed value... + store.insert::(OWNER, 2); + assert_eq!(store.get::(OWNER), Some(2)); + + // ...and the next transaction observes it. 
+ let txn = store.begin_transaction(OWNER); + assert_eq!(txn.get::(), Some(2)); + } + + #[test] + fn raw_table_remove_then_txn_reinsert_commit() { + let store = KvStore::new(); + store.table::(OWNER).insert("k", "v".to_owned()); + store.table::(OWNER).remove(&"k"); + assert_eq!(store.table::(OWNER).get("k"), None); + + let mut txn = store.begin_transaction(OWNER); + txn.table::().insert("k", "again".to_owned()); + txn.commit().unwrap(); + assert_eq!( + store.table::(OWNER).get("k"), + Some("again".to_owned()) + ); + } + + #[test] + fn interleave_raw_and_txn_different_keys() { + let store = KvStore::new(); + store.table::(OWNER).insert("a", "A".to_owned()); + + let mut txn = store.begin_transaction(OWNER); + txn.table::().insert("b", "B".to_owned()); + txn.commit().unwrap(); + assert_eq!(store.table::(OWNER).get("a"), Some("A".to_owned())); + assert_eq!(store.table::(OWNER).get("b"), Some("B".to_owned())); + + // A rolled-back transaction touching both keys leaves them as they were. + { + let mut txn = store.begin_transaction(OWNER); + txn.table::().remove(&"a"); + txn.table::().insert("c", "C".to_owned()); + } + assert_eq!(store.table::(OWNER).get("a"), Some("A".to_owned())); + assert_eq!(store.table::(OWNER).get("c"), None); + } + + #[test] + fn txn_len_unchanged_after_removing_absent_key() { + let store = KvStore::new(); + store.table::(OWNER).insert("a", "1".to_owned()); + store.table::(OWNER).insert("b", "2".to_owned()); + + let mut txn = store.begin_transaction(OWNER); + txn.table::().remove(&"zzz"); + assert_eq!(txn.table::().len(), 2); + } + + #[test] + fn txn_is_empty_false_after_removing_absent_key() { + let store = KvStore::new(); + store.table::(OWNER).insert("a", "1".to_owned()); + + let mut txn = store.begin_transaction(OWNER); + txn.table::().remove(&"zzz"); + assert!(!txn.table::().is_empty()); + } + + #[test] + fn txn_len_after_removing_several_absent_keys() { + let store = KvStore::new(); + store.table::(OWNER).insert("a", "1".to_owned()); + + let 
mut txn = store.begin_transaction(OWNER); + txn.table::().remove(&"x"); + txn.table::().remove(&"y"); + txn.table::().remove(&"z"); + assert_eq!(txn.table::().len(), 1); + } + + #[test] + fn txn_len_reflects_mixed_insert_remove() { + let store = KvStore::new(); + store.table::(OWNER).insert("a", "1".to_owned()); + store.table::(OWNER).insert("b", "2".to_owned()); + store.table::(OWNER).insert("c", "3".to_owned()); + + let mut txn = store.begin_transaction(OWNER); + txn.table::().remove(&"b"); + txn.table::().insert("d", "4".to_owned()); + assert_eq!(txn.table::().len(), 3); + } + + #[test] + fn txn_iter_reflects_committed_plus_pending_minus_removed() { + let store = KvStore::new(); + store.table::(OWNER).insert("a", "1".to_owned()); + store.table::(OWNER).insert("b", "2".to_owned()); + store.table::(OWNER).insert("c", "3".to_owned()); + + let mut txn = store.begin_transaction(OWNER); + txn.table::().remove(&"b"); + txn.table::().insert("d", "4".to_owned()); + + let table = txn.table::(); + let mut keys: Vec<_> = table.keys().copied().collect(); + keys.sort(); + assert_eq!(keys, vec!["a", "c", "d"]); + } + + #[test] + fn txn_clear_then_insert_len_and_iter() { + let store = KvStore::new(); + store.table::(OWNER).insert("a", "1".to_owned()); + store.table::(OWNER).insert("b", "2".to_owned()); + + let mut txn = store.begin_transaction(OWNER); + txn.table::().clear(); + txn.table::().insert("x", "9".to_owned()); + + let table = txn.table::(); + assert_eq!(table.len(), 1); + let keys: Vec<_> = table.keys().copied().collect(); + assert_eq!(keys, vec!["x"]); + } + + #[test] + fn txn_is_empty_after_clear() { + let store = KvStore::new(); + store.table::(OWNER).insert("a", "1".to_owned()); + store.table::(OWNER).insert("b", "2".to_owned()); + + let mut txn = store.begin_transaction(OWNER); + txn.table::().clear(); + assert!(txn.table::().is_empty()); + assert_eq!(txn.table::().len(), 0); + } + + #[test] + fn txn_mutate_committed_value_commit_persists() { + let store = 
KvStore::new(); + store.table::(OWNER).insert("k", "orig".to_owned()); + + let mut txn = store.begin_transaction(OWNER); + txn.table::().with_mut(&"k", |v| v.push('!')); + txn.commit().unwrap(); + + assert_eq!( + store.table::(OWNER).get("k"), + Some("orig!".to_owned()) + ); + } + + #[test] + fn txn_for_each_mut_committed_values_commit_persists() { + let store = KvStore::new(); + store.table::(OWNER).insert("a", "x".to_owned()); + store.table::(OWNER).insert("b", "y".to_owned()); + + let mut txn = store.begin_transaction(OWNER); + txn.table::().for_each_mut(|_, v| v.push('!')); + txn.commit().unwrap(); + + assert_eq!(store.table::(OWNER).get("a"), Some("x!".to_owned())); + assert_eq!(store.table::(OWNER).get("b"), Some("y!".to_owned())); + } + + #[test] + fn ro_txn_after_commit_sees_committed() { + let store = KvStore::new(); + let mut txn = store.begin_transaction(OWNER); + txn.insert::(7); + txn.commit().unwrap(); + + let txn = store.begin_ro_transaction(OWNER); + assert_eq!(txn.get::(), Some(7)); + } + + #[test] + fn ro_txn_after_rollback_sees_pre_txn() { + let store = KvStore::new(); + store.insert::(OWNER, 1); + { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(2); + } + let txn = store.begin_ro_transaction(OWNER); + assert_eq!(txn.get::(), Some(1)); + } + + #[test] + fn ro_txn_after_multiple_commits_sees_latest() { + let store = KvStore::new(); + for v in [10u64, 20, 30] { + let mut txn = store.begin_transaction(OWNER); + txn.insert::(v); + txn.commit().unwrap(); + } + let txn = store.begin_ro_transaction(OWNER); + assert_eq!(txn.get::(), Some(30)); + } + + #[test] + fn try_begin_transaction_none_while_rw_txn_active() { + let store = KvStore::new(); + let _txn = store.begin_transaction(OWNER); + assert!(store.try_begin_transaction(OWNER).is_none()); + } + + #[test] + fn try_begin_ro_transaction_none_while_rw_txn_active() { + let store = KvStore::new(); + let _txn = store.begin_transaction(OWNER); + 
assert!(store.try_begin_ro_transaction(OWNER).is_none()); + } + + #[test] + fn try_begin_transaction_none_while_ro_txn_active() { + let store = KvStore::new(); + let _txn = store.begin_ro_transaction(OWNER); + assert!(store.try_begin_transaction(OWNER).is_none()); + } }