Skip to content

Commit 33631d5

Browse files
authored
vm: add mempool (#65)
* vm: add mempool * react to new mempool Signed-off-by: Sam Batschelet <sam.batschelet@avalabs.org>
1 parent 63ad673 commit 33631d5

5 files changed

Lines changed: 356 additions & 17 deletions

File tree

mini-kvvm/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ pub mod api;
22
pub mod block;
33
pub mod chain;
44
pub mod genesis;
5+
pub mod mempool;
56
pub mod utils;
67
pub mod vm;

mini-kvvm/src/mempool/data.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use std::{
2+
collections::{HashMap, VecDeque},
3+
io::Result,
4+
};
5+
6+
use avalanche_types::ids;
7+
8+
use crate::chain::tx::tx::Transaction;
9+
10+
/// In memory representation of mempool.
11+
#[derive(Debug)]
12+
pub struct Data {
13+
pub items: VecDeque<Entry>,
14+
pub lookup: HashMap<ids::Id, Entry>,
15+
}
16+
17+
/// Object representing a transaction stored in mempool.
18+
#[derive(Debug, Default, Clone)]
19+
pub struct Entry {
20+
pub id: ids::Id,
21+
pub tx: Option<Transaction>,
22+
pub index: usize,
23+
}
24+
25+
impl Data {
26+
pub fn new(max_size: usize) -> Self {
27+
Self {
28+
items: VecDeque::with_capacity(max_size),
29+
lookup: HashMap::new(),
30+
}
31+
}
32+
33+
pub fn len(&self) -> usize {
34+
self.items.len()
35+
}
36+
37+
pub fn is_empty(&self) -> bool {
38+
self.items.len() == 0
39+
}
40+
41+
pub fn swap(&mut self, i: usize, j: usize) {
42+
self.items.swap(i, j);
43+
self.items[i].index = i;
44+
self.items[j].index = j;
45+
}
46+
47+
pub fn push(&mut self, entry: &Entry) -> Result<()> {
48+
if self.has(&entry.id)? {
49+
return Ok(());
50+
}
51+
self.items.push_front(entry.to_owned());
52+
53+
// insert key only if it does not already exist.
54+
self.lookup.insert(entry.id, entry.to_owned());
55+
56+
Ok(())
57+
}
58+
59+
pub fn pop(&mut self) -> Result<Option<Entry>> {
60+
Ok(self.items.pop_front())
61+
}
62+
63+
pub fn pop_back(&mut self) -> Result<Option<Entry>> {
64+
Ok(self.items.pop_back())
65+
}
66+
67+
pub fn get(&self, id: &ids::Id) -> Result<Option<Entry>> {
68+
match self.lookup.get(id) {
69+
Some(v) => Ok(Some(v.to_owned())),
70+
None => Ok(None),
71+
}
72+
}
73+
74+
pub fn has(&self, id: &ids::Id) -> Result<bool> {
75+
match self.get(id) {
76+
Ok(resp) => match resp {
77+
Some(_) => Ok(true),
78+
None => Ok(false),
79+
},
80+
Err(e) => Err(e),
81+
}
82+
}
83+
}

mini-kvvm/src/mempool/mod.rs

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
pub mod data;
2+
3+
use std::{
4+
io::Result,
5+
sync::{Arc, RwLock},
6+
};
7+
8+
use avalanche_types::ids;
9+
use tokio::sync::broadcast;
10+
11+
use crate::chain::tx::tx::Transaction;
12+
13+
use self::data::{Data, Entry};
14+
15+
pub struct Mempool {
16+
data: Arc<RwLock<Data>>,
17+
18+
/// Channel of length one, which the mempool ensures has an item on
19+
/// it as long as there is an unissued transaction remaining in [txs].
20+
pending_tx: broadcast::Sender<()>,
21+
22+
/// Vec of [Tx] that are ready to be gossiped.
23+
new_txs: Vec<Transaction>,
24+
}
25+
26+
impl Mempool {
27+
pub fn new(max_size: usize) -> Self {
28+
// initialize broadcast channel
29+
let (pending_tx, _rx): (broadcast::Sender<()>, broadcast::Receiver<()>) =
30+
tokio::sync::broadcast::channel(1);
31+
Self {
32+
data: Arc::new(RwLock::new(Data::new(max_size))),
33+
pending_tx,
34+
new_txs: Vec::new(),
35+
}
36+
}
37+
38+
/// Returns a broadcast receiver for the pending tx channel.
39+
pub fn subscribe_pending(&self) -> broadcast::Receiver<()> {
40+
self.pending_tx.subscribe()
41+
}
42+
43+
/// Returns Tx from Id if it exists.
44+
pub fn get(&self, id: &ids::Id) -> Result<Option<Transaction>> {
45+
let data = self.data.read().unwrap();
46+
if let Some(entry) = data.get(id)? {
47+
if let Some(tx) = entry.tx {
48+
return Ok(Some(tx));
49+
}
50+
}
51+
Ok(None)
52+
}
53+
54+
/// Adds a Tx Entry to mempool and writes to the pending channel.
55+
pub fn add(&mut self, tx: Transaction) -> Result<bool> {
56+
let tx_id = &tx.id;
57+
58+
let mut data = self.data.write().unwrap();
59+
if data.has(tx_id)? {
60+
return Ok(false);
61+
}
62+
let old_len = data.len();
63+
64+
let entry = &Entry {
65+
id: tx_id.to_owned(),
66+
tx: Some(tx.clone()),
67+
index: old_len,
68+
};
69+
70+
// Optimistically add tx to mempool
71+
data.push(entry)?;
72+
73+
self.new_txs.push(tx);
74+
75+
self.add_pending();
76+
77+
Ok(true)
78+
}
79+
80+
/// Return
81+
pub fn pop_back(&self) -> Option<Transaction> {
82+
let mut data = self.data.write().unwrap();
83+
match data.items.pop_back() {
84+
Some(entry) => entry.tx,
85+
None => None,
86+
}
87+
}
88+
89+
/// Returns len of mempool data.
90+
pub fn len(&self) -> usize {
91+
let data = self.data.read().unwrap();
92+
data.len()
93+
}
94+
95+
pub fn is_empty(&self) -> bool {
96+
let data = self.data.read().unwrap();
97+
data.is_empty()
98+
}
99+
100+
/// Returns the vec of transactions ready to gossip and replaces it with an empty vec.
101+
pub fn new_txs(&mut self) -> Result<Vec<Transaction>> {
102+
let data = self.data.read().unwrap();
103+
104+
let mut selected: Vec<Transaction> = Vec::new();
105+
106+
// It is possible that a block may have been accepted that contains some
107+
// new transactions before [new_txs] is called.
108+
for tx in self.new_txs.iter() {
109+
if data.has(&tx.id)? {
110+
continue;
111+
}
112+
selected.push(tx.to_owned())
113+
}
114+
self.new_txs = Vec::new();
115+
116+
Ok(selected)
117+
}
118+
119+
/// Prunes any Ids not included in valid hashes set.
120+
pub fn prune(&self, valid_hashes: ids::Set) {
121+
let mut to_remove: Vec<ids::Id> = Vec::with_capacity(valid_hashes.len());
122+
123+
let data = self.data.write().unwrap();
124+
125+
for entry in data.items.iter() {
126+
if let Some(tx) = &entry.tx {
127+
if !valid_hashes.contains(&tx.id) {
128+
to_remove.push(entry.id);
129+
}
130+
}
131+
}
132+
// drop write lock
133+
drop(data);
134+
135+
for id in to_remove.iter() {
136+
log::debug!("attempting to prune id: {}", id);
137+
if self.remove(id.to_owned()).is_some() {
138+
log::debug!("id deleted: {}", id);
139+
} else {
140+
log::debug!("failed to delete id: {}: not found", id);
141+
}
142+
}
143+
}
144+
145+
/// Removes Tx entry from mempool data if it exists.
146+
pub fn remove(&self, id: ids::Id) -> Option<Transaction> {
147+
let mut data = self.data.write().unwrap();
148+
149+
// TODO: try to optimize.
150+
// find the position of the entry in vec and remove
151+
match data.items.iter().position(|e| e.id == id) {
152+
Some(index) => {
153+
data.items.remove(index);
154+
}
155+
None => return None,
156+
}
157+
158+
// remove entry from lookup
159+
match data.lookup.remove(&id) {
160+
Some(entry) => entry.tx,
161+
None => {
162+
// should not happen
163+
log::error!("failed to remove id: {}: mempool is out of balance", id);
164+
None
165+
}
166+
}
167+
}
168+
169+
fn add_pending(&self) {
170+
self.pending_tx.send(()).unwrap();
171+
}
172+
}
173+
174+
#[tokio::test]
175+
async fn test_mempool() {
176+
use crate::chain::tx::{tx::TransactionType, unsigned};
177+
178+
// init mempool
179+
let mut mempool = Mempool::new(10);
180+
let mut pending_rx = mempool.subscribe_pending();
181+
182+
// create tx_1
183+
let tx_data_1 = unsigned::TransactionData {
184+
typ: TransactionType::Bucket,
185+
bucket: "foo".to_string(),
186+
key: "".to_string(),
187+
value: vec![],
188+
};
189+
let resp = tx_data_1.decode();
190+
assert!(resp.is_ok());
191+
let utx_1 = resp.unwrap();
192+
let tx_1 = Transaction::new(utx_1);
193+
194+
// add tx_1 to mempool
195+
let tx_1_id = tx_1.id;
196+
assert_eq!(mempool.add(tx_1).unwrap(), true);
197+
// drain channel
198+
let resp = pending_rx.recv().await;
199+
assert!(resp.is_ok());
200+
assert_eq!(mempool.len(), 1);
201+
202+
// add tx_1 as valid
203+
let mut valid_txs = ids::new_set(2);
204+
valid_txs.insert(tx_1_id);
205+
206+
// create tx_2
207+
let tx_data_2 = unsigned::TransactionData {
208+
typ: TransactionType::Bucket,
209+
bucket: "bar".to_string(),
210+
key: "".to_string(),
211+
value: vec![],
212+
};
213+
let resp = tx_data_2.decode();
214+
assert!(resp.is_ok());
215+
let utx_2 = resp.unwrap();
216+
let mut tx_2 = Transaction::new(utx_2);
217+
tx_2.id = ids::Id::from_slice("sup".as_bytes());
218+
219+
// add tx_2 to mempool
220+
assert_eq!(mempool.add(tx_2).unwrap(), true);
221+
assert_eq!(mempool.len(), 2);
222+
223+
// drain channel
224+
let resp = pending_rx.recv().await;
225+
assert!(resp.is_ok());
226+
227+
// prune tx_2 as invalid
228+
mempool.prune(valid_txs);
229+
230+
// verify one tx entry removed
231+
assert_eq!(mempool.len(), 1);
232+
233+
// verify tx_1 exists
234+
let resp = mempool.get(&tx_1_id);
235+
assert!(resp.is_ok());
236+
237+
assert_eq!(resp.unwrap().unwrap().id, tx_1_id);
238+
}

0 commit comments

Comments
 (0)