Skip to content

Commit 94dae26

Browse files
committed
vm: add mempool
Signed-off-by: Sam Batschelet <[email protected]>
1 parent 63ad673 commit 94dae26

File tree

3 files changed

+296
-0
lines changed

3 files changed

+296
-0
lines changed

mini-kvvm/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ pub mod api;
22
pub mod block;
33
pub mod chain;
44
pub mod genesis;
5+
pub mod mempool;
56
pub mod utils;
67
pub mod vm;

mini-kvvm/src/mempool/data.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use std::{
2+
collections::{HashMap, VecDeque},
3+
io::Result,
4+
};
5+
6+
use avalanche_types::ids;
7+
8+
use crate::chain::tx::tx::Transaction;
9+
10+
#[derive(Debug)]
11+
/// Memory representation of mempool
12+
pub struct Data {
13+
pub items: VecDeque<Entry>,
14+
pub lookup: HashMap<ids::Id, Entry>,
15+
}
16+
17+
#[derive(Debug, Default, Clone)]
18+
pub struct Entry {
19+
pub id: ids::Id,
20+
pub tx: Option<Transaction>,
21+
pub index: usize,
22+
}
23+
24+
impl Data {
25+
pub fn new(max_size: usize) -> Self {
26+
Self {
27+
items: VecDeque::with_capacity(max_size),
28+
lookup: HashMap::new(),
29+
}
30+
}
31+
32+
pub fn len(&self) -> usize {
33+
self.items.len()
34+
}
35+
36+
37+
pub fn is_empty(&self) -> bool {
38+
self.items.len() == 0
39+
}
40+
41+
pub fn less(&self, _i: usize, _j: usize) -> bool {
42+
// return self.items[i].price > self.items[j].price;
43+
false
44+
}
45+
46+
pub fn swap(&mut self, i: usize, j: usize) {
47+
self.items.swap(i, j);
48+
self.items[i].index = i;
49+
self.items[j].index = j;
50+
}
51+
52+
pub fn push(&mut self, entry: &Entry) -> Result<()> {
53+
if self.has(&entry.id)? {
54+
return Ok(());
55+
}
56+
self.items.push_front(entry.to_owned());
57+
58+
// insert key only if it does not already exist.
59+
self.lookup.insert(entry.id, entry.to_owned());
60+
61+
Ok(())
62+
}
63+
64+
pub fn pop(&mut self) -> Result<Option<Entry>> {
65+
Ok(self.items.pop_front())
66+
}
67+
68+
pub fn pop_back(&mut self) -> Result<Option<Entry>> {
69+
Ok(self.items.pop_back())
70+
}
71+
72+
pub fn get(&self, id: &ids::Id) -> Result<Option<Entry>> {
73+
match self.lookup.get(id){
74+
Some(v) => Ok(Some(v.to_owned())),
75+
None => Ok(None),
76+
}
77+
}
78+
79+
pub fn has(&self, id: &ids::Id) -> Result<bool> {
80+
match self.get(id) {
81+
Ok(resp) => match resp {
82+
Some(_) => Ok(true),
83+
None => Ok(false),
84+
},
85+
Err(e) => Err(e),
86+
}
87+
}
88+
}

mini-kvvm/src/mempool/mod.rs

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
pub mod data;
2+
3+
use std::{
4+
io::Result,
5+
sync::{Arc, RwLock},
6+
};
7+
8+
use avalanche_types::ids;
9+
use tokio::sync::broadcast;
10+
11+
use crate::chain::tx::tx::Transaction;
12+
13+
use self::data::{Data, Entry};
14+
15+
pub struct Mempool {
16+
data: Arc<RwLock<Data>>,
17+
pending_tx: broadcast::Sender<()>,
18+
new_txs: Vec<Transaction>,
19+
}
20+
21+
impl Mempool {
22+
pub fn new(max_size: usize) -> Self {
23+
// initialize broadcast channel
24+
let (pending_tx, _rx): (broadcast::Sender<()>, broadcast::Receiver<()>) =
25+
tokio::sync::broadcast::channel(1);
26+
Self {
27+
data: Arc::new(RwLock::new(Data::new(max_size))),
28+
/// Channel of length one, which the mempool ensures has an item on
29+
/// it as long as there is an unissued transaction remaining in [txs].
30+
pending_tx,
31+
new_txs: Vec::new(),
32+
}
33+
}
34+
35+
/// Returns a broadcast receiver for the pending tx channel.
36+
pub fn subscribe_pending(&self) -> broadcast::Receiver<()> {
37+
self.pending_tx.subscribe()
38+
}
39+
40+
/// Returns Tx from Id if it exists.
41+
pub fn get(&self, id: &ids::Id) -> Result<Option<Transaction>> {
42+
let data = self.data.read().unwrap();
43+
if let Some(entry) = data.get(id)? {
44+
if let Some(tx) = entry.tx {
45+
return Ok(Some(tx));
46+
}
47+
}
48+
Ok(None)
49+
}
50+
51+
/// Adds a Tx Entry to mempool and writes to the pending channel.
52+
pub fn add(&mut self, tx: Transaction) -> Result<bool> {
53+
let tx_id = &tx.id;
54+
55+
let mut data = self.data.write().unwrap();
56+
if data.has(tx_id)? {
57+
return Ok(false);
58+
}
59+
let old_len = data.len();
60+
61+
let entry = &Entry {
62+
id: tx_id.to_owned(),
63+
tx: Some(tx.clone()),
64+
index: old_len,
65+
};
66+
67+
// Optimistically add tx to mempool
68+
data.push(entry)?;
69+
70+
self.new_txs.push(tx);
71+
72+
self.add_pending();
73+
74+
Ok(true)
75+
}
76+
77+
/// Returns len of mempool data.
78+
pub fn len(&self) -> usize {
79+
let data = self.data.read().unwrap();
80+
data.len()
81+
}
82+
83+
pub fn is_empty(&self) -> bool {
84+
let data = self.data.read().unwrap();
85+
data.is_empty()
86+
}
87+
88+
/// Prunes any Ids not included in valid hashes set.
89+
pub fn prune(&self, valid_hashes: ids::Set) {
90+
let mut to_remove: Vec<ids::Id> = Vec::with_capacity(valid_hashes.len());
91+
92+
let data = self.data.write().unwrap();
93+
94+
for entry in data.items.iter() {
95+
if let Some(tx) = &entry.tx {
96+
if !valid_hashes.contains(&tx.id) {
97+
to_remove.push(entry.id);
98+
}
99+
}
100+
}
101+
// drop write lock
102+
drop(data);
103+
104+
for id in to_remove.iter() {
105+
log::debug!("attempting to prune id: {}", id);
106+
if self.remove(id.to_owned()).is_some() {
107+
log::debug!("id deleted: {}", id);
108+
} else {
109+
log::debug!("failed to delete id: {}: not found", id);
110+
}
111+
}
112+
}
113+
114+
/// Removes Tx entry from mempool data if it exists.
115+
pub fn remove(&self, id: ids::Id) -> Option<Transaction> {
116+
let mut data = self.data.write().unwrap();
117+
118+
// TODO: try to optimize.
119+
// find the position of the entry in vec and remove
120+
match data.items.iter().position(|e| e.id == id) {
121+
Some(index) => {
122+
data.items.remove(index);
123+
}
124+
None => return None,
125+
}
126+
127+
// remove entry from lookup
128+
match data.lookup.remove(&id) {
129+
Some(entry) => entry.tx,
130+
None => {
131+
// should not happen
132+
log::error!("failed to remove id: {}: mempool is out of balance", id);
133+
None
134+
}
135+
}
136+
}
137+
138+
fn add_pending(&self) {
139+
self.pending_tx.send(()).unwrap();
140+
}
141+
}
142+
143+
#[tokio::test]
144+
async fn test_mempool() {
145+
use crate::chain::tx::{tx::TransactionType, unsigned};
146+
147+
// init mempool
148+
let mut mempool = Mempool::new(10);
149+
let mut pending_rx = mempool.subscribe_pending();
150+
151+
// create tx_1
152+
let tx_data_1 = unsigned::TransactionData {
153+
typ: TransactionType::Bucket,
154+
bucket: "foo".to_string(),
155+
key: "".to_string(),
156+
value: vec![],
157+
};
158+
let resp = tx_data_1.decode();
159+
assert!(resp.is_ok());
160+
let utx_1 = resp.unwrap();
161+
let tx_1 = Transaction::new(utx_1);
162+
163+
// add tx_1 to mempool
164+
let tx_1_id = tx_1.id;
165+
assert_eq!(mempool.add(tx_1).unwrap(), true);
166+
// drain channel
167+
let resp = pending_rx.recv().await;
168+
assert!(resp.is_ok());
169+
assert_eq!(mempool.len(), 1);
170+
171+
// add tx_1 as valid
172+
let mut valid_txs = ids::new_set(2);
173+
valid_txs.insert(tx_1_id);
174+
175+
// create tx_2
176+
let tx_data_2 = unsigned::TransactionData {
177+
typ: TransactionType::Bucket,
178+
bucket: "bar".to_string(),
179+
key: "".to_string(),
180+
value: vec![],
181+
};
182+
let resp = tx_data_2.decode();
183+
assert!(resp.is_ok());
184+
let utx_2 = resp.unwrap();
185+
let mut tx_2 = Transaction::new(utx_2);
186+
tx_2.id = ids::Id::from_slice("sup".as_bytes());
187+
188+
// add tx_2 to mempool
189+
assert_eq!(mempool.add(tx_2).unwrap(), true);
190+
assert_eq!(mempool.len(), 2);
191+
192+
// drain channel
193+
let resp = pending_rx.recv().await;
194+
assert!(resp.is_ok());
195+
196+
// prune tx_2 as invalid
197+
mempool.prune(valid_txs);
198+
199+
// verify one tx entry removed
200+
assert_eq!(mempool.len(), 1);
201+
202+
// verify tx_1 exists
203+
let resp = mempool.get(&tx_1_id);
204+
assert!(resp.is_ok());
205+
206+
assert_eq!(resp.unwrap().unwrap().id, tx_1_id);
207+
}

0 commit comments

Comments
 (0)