diff --git a/mini-kvvm/src/lib.rs b/mini-kvvm/src/lib.rs index 42897b65..573a3edd 100644 --- a/mini-kvvm/src/lib.rs +++ b/mini-kvvm/src/lib.rs @@ -2,5 +2,6 @@ pub mod api; pub mod block; pub mod chain; pub mod genesis; +pub mod mempool; pub mod utils; pub mod vm; diff --git a/mini-kvvm/src/mempool/data.rs b/mini-kvvm/src/mempool/data.rs new file mode 100644 index 00000000..4c772aa9 --- /dev/null +++ b/mini-kvvm/src/mempool/data.rs @@ -0,0 +1,83 @@ +use std::{ + collections::{HashMap, VecDeque}, + io::Result, +}; + +use avalanche_types::ids; + +use crate::chain::tx::tx::Transaction; + +/// In memory representation of mempool. +#[derive(Debug)] +pub struct Data { + pub items: VecDeque, + pub lookup: HashMap, +} + +/// Object representing a transaction stored in mempool. +#[derive(Debug, Default, Clone)] +pub struct Entry { + pub id: ids::Id, + pub tx: Option, + pub index: usize, +} + +impl Data { + pub fn new(max_size: usize) -> Self { + Self { + items: VecDeque::with_capacity(max_size), + lookup: HashMap::new(), + } + } + + pub fn len(&self) -> usize { + self.items.len() + } + + pub fn is_empty(&self) -> bool { + self.items.len() == 0 + } + + pub fn swap(&mut self, i: usize, j: usize) { + self.items.swap(i, j); + self.items[i].index = i; + self.items[j].index = j; + } + + pub fn push(&mut self, entry: &Entry) -> Result<()> { + if self.has(&entry.id)? { + return Ok(()); + } + self.items.push_front(entry.to_owned()); + + // insert key only if it does not already exist. + self.lookup.insert(entry.id, entry.to_owned()); + + Ok(()) + } + + pub fn pop(&mut self) -> Result> { + Ok(self.items.pop_front()) + } + + pub fn pop_back(&mut self) -> Result> { + Ok(self.items.pop_back()) + } + + pub fn get(&self, id: &ids::Id) -> Result> { + match self.lookup.get(id) { + Some(v) => Ok(Some(v.to_owned())), + None => Ok(None), + } + } + + pub fn has(&self, id: &ids::Id) -> Result { + match self.get(id) { + Ok(resp) => match resp { + Some(_) => Ok(true), + None => Ok(false), + }, + Err(e) => Err(e), + } + } +} diff --git a/mini-kvvm/src/mempool/mod.rs b/mini-kvvm/src/mempool/mod.rs new file mode 100644 index 00000000..a7abf3fa --- /dev/null +++ b/mini-kvvm/src/mempool/mod.rs @@ -0,0 +1,238 @@ +pub mod data; + +use std::{ + io::Result, + sync::{Arc, RwLock}, +}; + +use avalanche_types::ids; +use tokio::sync::broadcast; + +use crate::chain::tx::tx::Transaction; + +use self::data::{Data, Entry}; + +pub struct Mempool { + data: Arc>, + + /// Channel of length one, which the mempool ensures has an item on + /// it as long as there is an unissued transaction remaining in [txs]. + pending_tx: broadcast::Sender<()>, + + /// Vec of [Tx] that are ready to be gossiped. + new_txs: Vec, +} + +impl Mempool { + pub fn new(max_size: usize) -> Self { + // initialize broadcast channel + let (pending_tx, _rx): (broadcast::Sender<()>, broadcast::Receiver<()>) = + tokio::sync::broadcast::channel(1); + Self { + data: Arc::new(RwLock::new(Data::new(max_size))), + pending_tx, + new_txs: Vec::new(), + } + } + + /// Returns a broadcast receiver for the pending tx channel. + pub fn subscribe_pending(&self) -> broadcast::Receiver<()> { + self.pending_tx.subscribe() + } + + /// Returns Tx from Id if it exists. + pub fn get(&self, id: &ids::Id) -> Result> { + let data = self.data.read().unwrap(); + if let Some(entry) = data.get(id)? { + if let Some(tx) = entry.tx { + return Ok(Some(tx)); + } + } + Ok(None) + } + + /// Adds a Tx Entry to mempool and writes to the pending channel. + pub fn add(&mut self, tx: Transaction) -> Result { + let tx_id = &tx.id; + + let mut data = self.data.write().unwrap(); + if data.has(tx_id)? { + return Ok(false); + } + let old_len = data.len(); + + let entry = &Entry { + id: tx_id.to_owned(), + tx: Some(tx.clone()), + index: old_len, + }; + + // Optimistically add tx to mempool + data.push(entry)?; + + self.new_txs.push(tx); + + self.add_pending(); + + Ok(true) + } + + /// Return + pub fn pop_back(&self) -> Option { + let mut data = self.data.write().unwrap(); + match data.items.pop_back() { + Some(entry) => entry.tx, + None => None, + } + } + + /// Returns len of mempool data. + pub fn len(&self) -> usize { + let data = self.data.read().unwrap(); + data.len() + } + + pub fn is_empty(&self) -> bool { + let data = self.data.read().unwrap(); + data.is_empty() + } + + /// Returns the vec of transactions ready to gossip and replaces it with an empty vec. + pub fn new_txs(&mut self) -> Result> { + let data = self.data.read().unwrap(); + + let mut selected: Vec = Vec::new(); + + // It is possible that a block may have been accepted that contains some + // new transactions before [new_txs] is called. + for tx in self.new_txs.iter() { + if data.has(&tx.id)? { + continue; + } + selected.push(tx.to_owned()) + } + self.new_txs = Vec::new(); + + Ok(selected) + } + + /// Prunes any Ids not included in valid hashes set. + pub fn prune(&self, valid_hashes: ids::Set) { + let mut to_remove: Vec = Vec::with_capacity(valid_hashes.len()); + + let data = self.data.write().unwrap(); + + for entry in data.items.iter() { + if let Some(tx) = &entry.tx { + if !valid_hashes.contains(&tx.id) { + to_remove.push(entry.id); + } + } + } + // drop write lock + drop(data); + + for id in to_remove.iter() { + log::debug!("attempting to prune id: {}", id); + if self.remove(id.to_owned()).is_some() { + log::debug!("id deleted: {}", id); + } else { + log::debug!("failed to delete id: {}: not found", id); + } + } + } + + /// Removes Tx entry from mempool data if it exists. + pub fn remove(&self, id: ids::Id) -> Option { + let mut data = self.data.write().unwrap(); + + // TODO: try to optimize. + // find the position of the entry in vec and remove + match data.items.iter().position(|e| e.id == id) { + Some(index) => { + data.items.remove(index); + } + None => return None, + } + + // remove entry from lookup + match data.lookup.remove(&id) { + Some(entry) => entry.tx, + None => { + // should not happen + log::error!("failed to remove id: {}: mempool is out of balance", id); + None + } + } + } + + fn add_pending(&self) { + self.pending_tx.send(()).unwrap(); + } +} + +#[tokio::test] +async fn test_mempool() { + use crate::chain::tx::{tx::TransactionType, unsigned}; + + // init mempool + let mut mempool = Mempool::new(10); + let mut pending_rx = mempool.subscribe_pending(); + + // create tx_1 + let tx_data_1 = unsigned::TransactionData { + typ: TransactionType::Bucket, + bucket: "foo".to_string(), + key: "".to_string(), + value: vec![], + }; + let resp = tx_data_1.decode(); + assert!(resp.is_ok()); + let utx_1 = resp.unwrap(); + let tx_1 = Transaction::new(utx_1); + + // add tx_1 to mempool + let tx_1_id = tx_1.id; + assert_eq!(mempool.add(tx_1).unwrap(), true); + // drain channel + let resp = pending_rx.recv().await; + assert!(resp.is_ok()); + assert_eq!(mempool.len(), 1); + + // add tx_1 as valid + let mut valid_txs = ids::new_set(2); + valid_txs.insert(tx_1_id); + + // create tx_2 + let tx_data_2 = unsigned::TransactionData { + typ: TransactionType::Bucket, + bucket: "bar".to_string(), + key: "".to_string(), + value: vec![], + }; + let resp = tx_data_2.decode(); + assert!(resp.is_ok()); + let utx_2 = resp.unwrap(); + let mut tx_2 = Transaction::new(utx_2); + tx_2.id = ids::Id::from_slice("sup".as_bytes()); + + // add tx_2 to mempool + assert_eq!(mempool.add(tx_2).unwrap(), true); + assert_eq!(mempool.len(), 2); + + // drain channel + let resp = pending_rx.recv().await; + assert!(resp.is_ok()); + + // prune tx_2 as invalid + mempool.prune(valid_txs); + + // verify one tx entry removed + assert_eq!(mempool.len(), 1); + + // verify tx_1 exists + let resp = mempool.get(&tx_1_id); + assert!(resp.is_ok()); + + assert_eq!(resp.unwrap().unwrap().id, tx_1_id); +} diff --git a/mini-kvvm/src/vm.rs b/mini-kvvm/src/vm.rs index 824e455a..c570a027 100644 --- a/mini-kvvm/src/vm.rs +++ b/mini-kvvm/src/vm.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, VecDeque}, + collections::HashMap, io::{Error, ErrorKind, Result}, sync::Arc, time, @@ -23,11 +23,15 @@ use crate::{ block::{self, state::State}, chain::{self, tx::Transaction, vm::Vm}, genesis::Genesis, + mempool, }; const PUBLIC_API_ENDPOINT: &str = "/public"; const VERSION: &str = env!("CARGO_PKG_VERSION"); +// TODO: make configurable +const MEMPOOL_SIZE: usize = 1024; + pub struct ChainVmInterior { pub ctx: Option, pub bootstrapped: bool, @@ -59,9 +63,9 @@ impl Default for ChainVmInterior { #[derive(Clone)] pub struct ChainVm { pub db: Box, - pub mempool: Arc>>, + pub mempool: Arc>, pub inner: Arc>, - pub verified_blocks: Arc>>, + pub verified_blocks: Arc>>, } impl ChainVm { @@ -69,19 +73,19 @@ impl ChainVm { pub fn new() -> Box { let inner = Arc::new(RwLock::new(ChainVmInterior::default())); let db = rpcchainvm::database::memdb::Database::new(); - let mempool = Arc::new(RwLock::new(VecDeque::new())); + let mempool = mempool::Mempool::new(MEMPOOL_SIZE); let verified_blocks = Arc::new(RwLock::new(HashMap::new())); Box::new(ChainVm { db, inner, - mempool, + mempool: Arc::new(RwLock::new(mempool)), verified_blocks, }) } pub fn new_with_state(db: &Box) -> Self { - let mempool = Arc::new(RwLock::new(VecDeque::new())); + let mempool = mempool::Mempool::new(MEMPOOL_SIZE); let verified_blocks = &Arc::new(RwLock::new(HashMap::new())); let inner = ChainVmInterior { ctx: None, @@ -97,7 +101,7 @@ impl ChainVm { Self { db: db.clone(), inner: Arc::new(RwLock::new(inner)), - mempool, + mempool: Arc::new(RwLock::new(mempool)), verified_blocks: Arc::clone(verified_blocks), } } @@ -112,7 +116,7 @@ impl crate::chain::vm::Vm for ChainVm { return vm.bootstrapped; } - async fn submit(&self, mut txs: Vec) -> Result<()> { + async fn submit(&self, mut txs: Vec) -> Result<()> { let now = Utc::now().timestamp() as u64; // TODO append errors instead of fail @@ -128,8 +132,11 @@ impl crate::chain::vm::Vm for ChainVm { tx.execute(self.db.clone(), dummy_block) .await .map_err(|e| Error::new(ErrorKind::Other, e.to_string()))?; + let mut mempool = self.mempool.write().await; - mempool.push_front(tx.to_owned()); + let _ = mempool + .add(tx.to_owned()) + .map_err(|e| Error::new(ErrorKind::Other, e.to_string()))?; } Ok(()) } @@ -429,7 +436,7 @@ impl rpcchainvm::snowman::block::ChainVm for ChainVm { async fn build_block( &self, ) -> Result> { - let mut mempool = self.mempool.write().await; + let mempool = self.mempool.read().await; if mempool.len() == 0 { return Err(Error::new(ErrorKind::Other, "no pending blocks")); } diff --git a/mini-kvvm/tests/api/mod.rs b/mini-kvvm/tests/api/mod.rs index 0d4553c9..e2d28364 100644 --- a/mini-kvvm/tests/api/mod.rs +++ b/mini-kvvm/tests/api/mod.rs @@ -15,13 +15,23 @@ async fn service_test() { let db = MemDb::new(); let vm = vm::ChainVm::new_with_state(&db); - { - // initialize genesis block - let mut inner = vm.inner.write().await; - let resp = create_genesis_block(&inner.state.clone(), vec![]).await; - assert!(resp.is_ok()); - inner.preferred = resp.unwrap(); - } + // get a broadcast tx pending receiver for new blocks; + let mempool = vm.mempool.read().await; + let mut pending_rx = mempool.subscribe_pending(); + drop(mempool); + // unblock channel + tokio::spawn(async move { + loop { + pending_rx.recv().await.unwrap(); + } + }); + + // initialize genesis block + let mut inner = vm.inner.write().await; + let resp = create_genesis_block(&inner.state.clone(), vec![]).await; + assert!(resp.is_ok()); + inner.preferred = resp.unwrap(); + drop(inner); let service = api::service::Service::new(vm); let mut io = jsonrpc_core::IoHandler::new();