Skip to content

Commit ab72912

Browse files
committed
vm: add network
Signed-off-by: Sam Batschelet <[email protected]>
1 parent 25ac8d9 commit ab72912

File tree

5 files changed

+151
-11
lines changed

5 files changed

+151
-11
lines changed

mini-kvvm/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ path = "src/bin/mini-kvvm/main.rs"
1515

1616
[dependencies]
1717
avalanche-proto = { version = "0.16.0" }
18-
avalanche-types = { version = "0.0.34" }
18+
avalanche-types = { version = "0.0.38" }
1919
byteorder = "1.4.3"
2020
chan = "0.1.23"
2121
chrono = "0.4.19"

mini-kvvm/src/block/builder.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ use std::{
77
time::{Duration, Instant},
88
};
99

10-
use chan::chan_select;
1110
use avalanche_types::rpcchainvm;
12-
use tokio::sync::RwLock;
11+
use chan::chan_select;
12+
use tokio::sync::{RwLock};
13+
use crossbeam_channel::TryRecvError;
14+
1315

1416
use crate::vm;
1517

@@ -276,17 +278,19 @@ impl Timed {
276278
let regossip = chan::tick(Duration::from_millis(100));
277279
let stop_ch = self.stop.clone();
278280

279-
loop {
280-
281+
while stop_ch.try_recv() == Err(TryRecvError::Empty) {
281282
chan_select! {
282283
gossip.recv() => {
283-
let mut mempool = self.vm.mempool.write().await;
284-
let new_txs = mempool.new_txs();
285-
drop(mempool)
284+
let mempool = &mut self.vm.mempool.write().await;
285+
let new_txs = mempool.new_txs().unwrap();
286+
drop(mempool);
286287

288+
let mut network = self.vm.network.as_ref().unwrap().write().await;
289+
let _ = network.gossip_new_txs(new_txs).await;
287290
},
288291
regossip.recv() => {
289-
292+
let mut network = self.vm.network.as_ref().unwrap().write().await;
293+
let _ = network.regossip_txs().await;
290294
},
291295
}
292296
}

mini-kvvm/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ pub mod block;
33
pub mod chain;
44
pub mod genesis;
55
pub mod mempool;
6+
pub mod network;
67
pub mod utils;
78
pub mod vm;

mini-kvvm/src/network.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use std::{
2+
io::{Error, ErrorKind, Result},
3+
num::NonZeroUsize,
4+
};
5+
6+
use avalanche_types::ids::{self, Id};
7+
use lru::LruCache;
8+
9+
use crate::{
10+
chain::{self, vm::Vm},
11+
vm,
12+
};
13+
14+
const GOSSIPED_TXS_LRU_SIZE: usize = 512;
15+
16+
pub struct Push {
17+
vm: vm::ChainVm,
18+
gossiped_tx: LruCache<Id, ()>,
19+
}
20+
21+
impl Push {
22+
pub fn new(vm: vm::ChainVm) -> Self {
23+
let cache: LruCache<Id, ()> =
24+
LruCache::new(NonZeroUsize::new(GOSSIPED_TXS_LRU_SIZE).unwrap());
25+
Self {
26+
gossiped_tx: cache,
27+
vm,
28+
}
29+
}
30+
31+
pub async fn send_txs(&self, txs: Vec<chain::tx::tx::Transaction>) -> Result<()> {
32+
if txs.is_empty() {
33+
return Ok(());
34+
}
35+
36+
let b = serde_json::to_vec(&txs).map_err(|e| {
37+
Error::new(
38+
ErrorKind::Other,
39+
format!("failed to marshal txs: {}", e.to_string()),
40+
)
41+
})?;
42+
43+
log::debug!("sending app gossip txs: {} size: {}", txs.len(), b.len());
44+
45+
let appsender = self
46+
.vm
47+
.app_sender
48+
.clone()
49+
.expect("appsender should exist after initialize");
50+
appsender.send_app_gossip(b).await.map_err(|e| {
51+
Error::new(
52+
ErrorKind::Other,
53+
format!("gossip txs failed: {}", e.to_string()),
54+
)
55+
})?;
56+
Ok(())
57+
}
58+
59+
pub async fn gossip_new_txs(&mut self, new_txs: Vec<chain::tx::tx::Transaction>) -> Result<()> {
60+
let mut txs: Vec<chain::tx::tx::Transaction> = Vec::with_capacity(new_txs.len());
61+
62+
for tx in new_txs.iter() {
63+
if self.gossiped_tx.contains(&tx.id) {
64+
log::debug!("already gossiped skipping id: {}", tx.id);
65+
continue;
66+
}
67+
68+
self.gossiped_tx.put(tx.id, ());
69+
70+
txs.push(tx.to_owned());
71+
}
72+
73+
Ok(())
74+
}
75+
76+
/// Triggers "AppGossip" on the pending transactions in the mempool.
77+
/// "force" is true to re-gossip whether recently gossiped or not.
78+
pub async fn regossip_txs(&mut self) -> Result<()> {
79+
let mut txs: Vec<chain::tx::tx::Transaction> = Vec::new();
80+
let mempool = self.vm.mempool.read().await;
81+
82+
// Gossip at most the target units of a block at once
83+
while mempool.len() > 0 {
84+
match mempool.pop_back() {
85+
Some(tx) => {
86+
// Note: when regossiping, we force resend even though we may have done it
87+
// recently.
88+
self.gossiped_tx.put(tx.id, ());
89+
txs.push(tx);
90+
}
91+
None => return Ok(()),
92+
}
93+
}
94+
95+
return self.send_txs(txs).await;
96+
}
97+
98+
pub async fn app_gossip(&mut self, node_id: ids::node::Id, message: &[u8]) -> Result<()> {
99+
log::debug!(
100+
"appgossip message handler, sender: {} bytes: {:?}",
101+
node_id,
102+
message
103+
);
104+
105+
let txs: Vec<chain::tx::tx::Transaction> = serde_json::from_slice(&message).unwrap();
106+
107+
// submit incoming gossip
108+
log::debug!(
109+
"appgossip transactions are being submitted txs: {}",
110+
txs.len()
111+
);
112+
113+
self.vm.submit(txs).await.map_err(|e| {
114+
Error::new(
115+
ErrorKind::Other,
116+
format!(
117+
"appgossip failed to submit txs peer_id: {}: {}",
118+
node_id,
119+
e.to_string()
120+
),
121+
)
122+
})?;
123+
124+
Ok(())
125+
}
126+
}

mini-kvvm/src/vm.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323
block::{self, state::State},
2424
chain::{self, tx::Transaction, vm::Vm},
2525
genesis::Genesis,
26-
mempool,
26+
mempool, network,
2727
};
2828

2929
const PUBLIC_API_ENDPOINT: &str = "/public";
@@ -63,7 +63,9 @@ impl Default for ChainVmInterior {
6363
#[derive(Clone)]
6464
pub struct ChainVm {
6565
pub db: Box<dyn rpcchainvm::database::Database + Sync + Send>,
66+
pub app_sender: Option<Box<dyn rpcchainvm::common::appsender::AppSender + Send + Sync>>,
6667
pub mempool: Arc<RwLock<mempool::Mempool>>,
68+
pub network: Option<Arc<RwLock<network::Push>>>,
6769
pub inner: Arc<RwLock<ChainVmInterior>>,
6870
pub verified_blocks: Arc<RwLock<HashMap<ids::Id, block::Block>>>,
6971
}
@@ -81,6 +83,8 @@ impl ChainVm {
8183
inner,
8284
mempool: Arc::new(RwLock::new(mempool)),
8385
verified_blocks,
86+
app_sender: None,
87+
network: None,
8488
})
8589
}
8690

@@ -103,6 +107,8 @@ impl ChainVm {
103107
inner: Arc::new(RwLock::new(inner)),
104108
mempool: Arc::new(RwLock::new(mempool)),
105109
verified_blocks: Arc::clone(verified_blocks),
110+
app_sender: None,
111+
network: None,
106112
}
107113
}
108114
}
@@ -235,7 +241,7 @@ impl rpcchainvm::common::vm::Vm for ChainVm {
235241
_config_bytes: &[u8],
236242
_to_engine: Sender<rpcchainvm::common::message::Message>,
237243
_fxs: &[rpcchainvm::common::vm::Fx],
238-
_app_sender: Box<dyn rpcchainvm::common::appsender::AppSender + Send + Sync>,
244+
app_sender: Box<dyn rpcchainvm::common::appsender::AppSender + Send + Sync>,
239245
) -> Result<()> {
240246
let mut vm = self.inner.write().await;
241247
vm.ctx = ctx;
@@ -247,6 +253,9 @@ impl rpcchainvm::common::vm::Vm for ChainVm {
247253
let current = db_manager.current().await?;
248254
self.db = current.db.clone();
249255

256+
self.app_sender = Some(app_sender);
257+
self.network = Some(Arc::new(RwLock::new(network::Push::new(self.clone()))));
258+
250259
let verified_blocks = self.verified_blocks.clone();
251260

252261
vm.state = State::new(self.db.clone(), verified_blocks);

0 commit comments

Comments
 (0)