Skip to content

Commit d828e87

Browse files
committed
*: react to new mempool
Signed-off-by: Sam Batschelet <[email protected]>
1 parent 2894da0 commit d828e87

File tree

3 files changed

+67
-19
lines changed

3 files changed

+67
-19
lines changed

mini-kvvm/src/mempool/mod.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,12 @@ use self::data::{Data, Entry};
1414

1515
pub struct Mempool {
1616
data: Arc<RwLock<Data>>,
17+
18+
/// Channel of length one, which the mempool ensures has an item on
19+
/// it as long as there is an unissued transaction remaining in [txs].
1720
pending_tx: broadcast::Sender<()>,
21+
22+
/// Vec of [Tx] that are ready to be gossiped.
1823
new_txs: Vec<Transaction>,
1924
}
2025

@@ -25,8 +30,6 @@ impl Mempool {
2530
tokio::sync::broadcast::channel(1);
2631
Self {
2732
data: Arc::new(RwLock::new(Data::new(max_size))),
28-
/// Channel of length one, which the mempool ensures has an item on
29-
/// it as long as there is an unissued transaction remaining in [txs].
3033
pending_tx,
3134
new_txs: Vec::new(),
3235
}
@@ -74,6 +77,15 @@ impl Mempool {
7477
Ok(true)
7578
}
7679

80+
/// Return
81+
pub fn pop_back(&self) -> Option<Transaction> {
82+
let mut data = self.data.write().unwrap();
83+
match data.items.pop_back() {
84+
Some(entry) => entry.tx,
85+
None => None,
86+
}
87+
}
88+
7789
/// Returns len of mempool data.
7890
pub fn len(&self) -> usize {
7991
let data = self.data.read().unwrap();
@@ -85,6 +97,25 @@ impl Mempool {
8597
data.is_empty()
8698
}
8799

100+
/// Returns the vec of transactions ready to gossip and replaces it with an empty vec.
101+
pub fn new_txs(&mut self) -> Result<Vec<Transaction>> {
102+
let data = self.data.read().unwrap();
103+
104+
let mut selected: Vec<Transaction> = Vec::new();
105+
106+
// It is possible that a block may have been accepted that contains some
107+
// new transactions before [new_txs] is called.
108+
for tx in self.new_txs.iter() {
109+
if data.has(&tx.id)? {
110+
continue;
111+
}
112+
selected.push(tx.to_owned())
113+
}
114+
self.new_txs = Vec::new();
115+
116+
Ok(selected)
117+
}
118+
88119
/// Prunes any Ids not included in valid hashes set.
89120
pub fn prune(&self, valid_hashes: ids::Set) {
90121
let mut to_remove: Vec<ids::Id> = Vec::with_capacity(valid_hashes.len());

mini-kvvm/src/vm.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
collections::{HashMap, VecDeque},
2+
collections::HashMap,
33
io::{Error, ErrorKind, Result},
44
sync::Arc,
55
time,
@@ -23,11 +23,15 @@ use crate::{
2323
block::{self, state::State},
2424
chain::{self, tx::Transaction, vm::Vm},
2525
genesis::Genesis,
26+
mempool,
2627
};
2728

2829
const PUBLIC_API_ENDPOINT: &str = "/public";
2930
const VERSION: &str = env!("CARGO_PKG_VERSION");
3031

32+
// TODO: make configurable
33+
const MEMPOOL_SIZE: usize = 1024;
34+
3135
pub struct ChainVmInterior {
3236
pub ctx: Option<rpcchainvm::context::Context>,
3337
pub bootstrapped: bool,
@@ -59,29 +63,29 @@ impl Default for ChainVmInterior {
5963
#[derive(Clone)]
6064
pub struct ChainVm {
6165
pub db: Box<dyn rpcchainvm::database::Database + Sync + Send>,
62-
pub mempool: Arc<RwLock<VecDeque<chain::tx::tx::Transaction>>>,
66+
pub mempool: Arc<RwLock<mempool::Mempool>>,
6367
pub inner: Arc<RwLock<ChainVmInterior>>,
64-
pub verified_blocks: Arc<RwLock<HashMap<ids::Id, crate::block::Block>>>,
68+
pub verified_blocks: Arc<RwLock<HashMap<ids::Id, block::Block>>>,
6569
}
6670

6771
impl ChainVm {
6872
/// Returns initialized ChainVm Boxed as rpcchainvm::vm::Vm trait.
6973
pub fn new() -> Box<dyn rpcchainvm::vm::Vm + Send + Sync> {
7074
let inner = Arc::new(RwLock::new(ChainVmInterior::default()));
7175
let db = rpcchainvm::database::memdb::Database::new();
72-
let mempool = Arc::new(RwLock::new(VecDeque::new()));
76+
let mempool = mempool::Mempool::new(MEMPOOL_SIZE);
7377
let verified_blocks = Arc::new(RwLock::new(HashMap::new()));
7478

7579
Box::new(ChainVm {
7680
db,
7781
inner,
78-
mempool,
82+
mempool: Arc::new(RwLock::new(mempool)),
7983
verified_blocks,
8084
})
8185
}
8286

8387
pub fn new_with_state(db: &Box<dyn rpcchainvm::database::Database + Sync + Send>) -> Self {
84-
let mempool = Arc::new(RwLock::new(VecDeque::new()));
88+
let mempool = mempool::Mempool::new(MEMPOOL_SIZE);
8589
let verified_blocks = &Arc::new(RwLock::new(HashMap::new()));
8690
let inner = ChainVmInterior {
8791
ctx: None,
@@ -97,7 +101,7 @@ impl ChainVm {
97101
Self {
98102
db: db.clone(),
99103
inner: Arc::new(RwLock::new(inner)),
100-
mempool,
104+
mempool: Arc::new(RwLock::new(mempool)),
101105
verified_blocks: Arc::clone(verified_blocks),
102106
}
103107
}
@@ -112,7 +116,7 @@ impl crate::chain::vm::Vm for ChainVm {
112116
return vm.bootstrapped;
113117
}
114118

115-
async fn submit(&self, mut txs: Vec<crate::chain::tx::tx::Transaction>) -> Result<()> {
119+
async fn submit(&self, mut txs: Vec<chain::tx::tx::Transaction>) -> Result<()> {
116120
let now = Utc::now().timestamp() as u64;
117121

118122
// TODO append errors instead of fail
@@ -128,8 +132,11 @@ impl crate::chain::vm::Vm for ChainVm {
128132
tx.execute(self.db.clone(), dummy_block)
129133
.await
130134
.map_err(|e| Error::new(ErrorKind::Other, e.to_string()))?;
135+
131136
let mut mempool = self.mempool.write().await;
132-
mempool.push_front(tx.to_owned());
137+
let _ = mempool
138+
.add(tx.to_owned())
139+
.map_err(|e| Error::new(ErrorKind::Other, e.to_string()))?;
133140
}
134141
Ok(())
135142
}
@@ -429,7 +436,7 @@ impl rpcchainvm::snowman::block::ChainVm for ChainVm {
429436
async fn build_block(
430437
&self,
431438
) -> Result<Box<dyn rpcchainvm::concensus::snowman::Block + Send + Sync>> {
432-
let mut mempool = self.mempool.write().await;
439+
let mempool = self.mempool.read().await;
433440
if mempool.len() == 0 {
434441
return Err(Error::new(ErrorKind::Other, "no pending blocks"));
435442
}

mini-kvvm/tests/api/mod.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,23 @@ async fn service_test() {
1515
let db = MemDb::new();
1616
let vm = vm::ChainVm::new_with_state(&db);
1717

18-
{
19-
// initialize genesis block
20-
let mut inner = vm.inner.write().await;
21-
let resp = create_genesis_block(&inner.state.clone(), vec![]).await;
22-
assert!(resp.is_ok());
23-
inner.preferred = resp.unwrap();
24-
}
18+
// get a broadcast tx pending receiver for new blocks;
19+
let mempool = vm.mempool.read().await;
20+
let mut pending_rx = mempool.subscribe_pending();
21+
drop(mempool);
22+
// unblock channel
23+
tokio::spawn(async move {
24+
loop {
25+
pending_rx.recv().await.unwrap();
26+
}
27+
});
28+
29+
// initialize genesis block
30+
let mut inner = vm.inner.write().await;
31+
let resp = create_genesis_block(&inner.state.clone(), vec![]).await;
32+
assert!(resp.is_ok());
33+
inner.preferred = resp.unwrap();
34+
drop(inner);
2535

2636
let service = api::service::Service::new(vm);
2737
let mut io = jsonrpc_core::IoHandler::new();

0 commit comments

Comments
 (0)