diff --git a/mini-kvvm/Cargo.toml b/mini-kvvm/Cargo.toml index 2f4d126a..8f526bba 100644 --- a/mini-kvvm/Cargo.toml +++ b/mini-kvvm/Cargo.toml @@ -15,9 +15,11 @@ path = "src/bin/mini-kvvm/main.rs" [dependencies] avalanche-proto = { version = "0.16.0" } -avalanche-types = { version = "0.0.34" } +avalanche-types = { version = "0.0.38" } byteorder = "1.4.3" +chan = "0.1.23" chrono = "0.4.19" +crossbeam-channel = "0.5.6" derivative = "2.2.0" dyn-clone = "1.0.9" ethereum-types = { version = "0.14.0" } diff --git a/mini-kvvm/src/block/builder.rs b/mini-kvvm/src/block/builder.rs new file mode 100644 index 00000000..3bd31218 --- /dev/null +++ b/mini-kvvm/src/block/builder.rs @@ -0,0 +1,303 @@ +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread, + time::{Duration, Instant}, +}; + +use avalanche_types::rpcchainvm; +use chan::chan_select; +use crossbeam_channel::TryRecvError; +use tokio::sync::RwLock; + +use crate::vm; + +// TODO: make configurable +const GOSSIP_INTERVAL: Duration = Duration::from_secs(1); +const REGOSSIP_INTERVAL: Duration = Duration::from_secs(30); + +pub trait Builder { + fn build(&self); + fn gossip(&self); + fn handle_generate_block(&self); +} + +pub struct Timed { + pub vm: vm::ChainVm, + + /// status signals the phase of block building the Vm is currently in. + /// [DontBuild] indicates there's no need to build a block. + /// [MayBuild] indicates the Vm should proceed to build a block. + /// [Building] indicates the Vm has sent a request to the engine to build a block. + pub status: Arc>, + + pub build_block_timer: Timer, + + // default to 500ms + pub build_interval: Duration, + + pub stop: crossbeam_channel::Receiver<()>, + pub builder_stop: crossbeam_channel::Receiver<()>, + pub done_gossip: crossbeam_channel::Receiver<()>, + pub done_build: crossbeam_channel::Receiver<()>, +} + +#[derive(PartialEq)] +pub enum Status { + /// Indicates there's no need to build a block. 
+    DontBuild,
+
+    /// Indicates the Vm should proceed to build a block.
+    MayBuild,
+
+    /// Indicates the Vm has sent a request to the engine to build a block.
+    Building,
+}
+
+pub struct Timer {
+    /// Timeout Tx channel is used to reset ticker threads.
+    timeout_tx: crossbeam_channel::Sender<()>,
+
+    /// Timeout Rx channel listens.
+    timeout_rx: crossbeam_channel::Receiver<()>,
+
+    /// New timer creation stops when true.
+    finished: Arc<AtomicBool>,
+
+    /// Notifies the timer to invoke the callback.
+    should_execute: Arc<AtomicBool>,
+
+    /// Duration for timer tick event.
+    duration: Arc<RwLock<Duration>>,
+}
+
+/// Directs the engine when to build blocks and gossip transactions.
+impl Timed {
+    /// Sets the initial timeout on the two stage timer if the process
+    /// has not already begun from an earlier notification. If [buildStatus] is anything
+    /// other than [DontBuild], then the attempt has already begun and this notification
+    /// can be safely skipped.
+    async fn signal_txs_ready(&mut self) {
+        // NOTE(review): the original compared with `==`, which skipped the build
+        // attempt exactly when the status said a build was still needed; the doc
+        // comment above requires skipping when status is anything OTHER than DontBuild.
+        if *self.status.read().await != Status::DontBuild {
+            return;
+        }
+
+        self.mark_building().await
+    }
+
+    /// Signal the avalanchego engine to build a block from pending transactions
+    async fn mark_building(&mut self) {
+        let vm = self.vm.inner.read().await;
+        if let Some(engine) = &vm.to_engine {
+            if let Err(_) = engine
+                .send(rpcchainvm::common::message::Message::PendingTxs)
+                .await
+            {
+                log::warn!("dropping message to consensus engine");
+            };
+        }
+        // release lock
+        drop(vm);
+
+        // NOTE(review): the original returned early right after notifying the
+        // engine, so `status` was never advanced to Building in the normal
+        // (engine present) path; always record that a build request is in flight.
+        let mut status = self.status.write().await;
+        *status = Status::Building;
+    }
+
+    /// Should be called immediately after [build_block].
+ // [HandleGenerateBlock] invocation could lead to quiescence, building a block with + // some delay, or attempting to build another block immediately + pub async fn handle_generate_block(&mut self) { + let mut status = self.status.write().await; + + if self.need_to_build().await { + *status = Status::MayBuild; + self.dispatch_timer_duration(self.build_interval).await; + } else { + *status = Status::DontBuild; + } + } + + // Returns true if there are outstanding transactions to be issued + // into a block. + async fn need_to_build(&self) -> bool { + let mempool = self.vm.mempool.read().await; + return mempool.len() > 0; + } + + /// Parses the block current status and + pub async fn build_block_parse_status(&mut self) { + let mut mark_building = false; + match &*self.status.read().await { + Status::DontBuild => { + // no op + } + Status::MayBuild => mark_building = true, + Status::Building => { + // If the status has already been set to building, there is no need + // to send an additional request to the consensus engine until the call + // to BuildBlock resets the block status. + } + } + + if mark_building { + self.mark_building().await; + } + } + + /// Defines the duration until we check block status. + pub async fn dispatch_timer_duration(&self, duration: Duration) { + let mut timer = self.build_block_timer.duration.write().await; + *timer = duration; + self.build_block_timer + .should_execute + .store(true, Ordering::Relaxed); + self.dispatch_reset(); + } + + /// Cancel the currently dispatch timer scheduled event. + pub fn dispatch_cancel(&self) { + self.build_block_timer + .should_execute + .store(false, Ordering::Relaxed); + self.dispatch_reset(); + } + + /// Stops execution of the dispatch timer. + pub fn dispatch_stop(&self) { + self.build_block_timer + .finished + .store(true, Ordering::Relaxed); + self.dispatch_reset(); + } + + /// Calls the timeout channel which will result in a new timer event. 
+    pub fn dispatch_reset(&self) {
+        let _ = self.build_block_timer.timeout_tx.send(());
+    }
+
+    /// Manages a dispatch timer lifecycle.
+    pub async fn dispatch_timer(&mut self) {
+        let (tx, ticker_rx): (
+            crossbeam_channel::Sender<()>,
+            crossbeam_channel::Receiver<()>,
+        ) = crossbeam_channel::bounded(1);
+        let cleared = Arc::new(AtomicBool::new(false));
+        let reset = Arc::new(AtomicBool::new(false));
+        let mut ticker_duration = Duration::from_secs(0);
+
+        while !self.build_block_timer.finished.load(Ordering::Relaxed) {
+            // cleared is true after tick
+            if cleared.load(Ordering::Relaxed)
+                && self
+                    .build_block_timer
+                    .should_execute
+                    .load(Ordering::Relaxed)
+            {
+                self.build_block_parse_status().await;
+            }
+
+            // start a new ticker thread which sends a single tick signal.
+            // NOTE(review): the original used `tokio::spawn(async move { thread::sleep(..) })`,
+            // which parks a tokio worker thread for the whole tick duration; a plain OS
+            // thread keeps the blocking sleep off the async runtime. The `.unwrap()` on
+            // the send is also dropped: the receiver may legitimately be gone on shutdown.
+            let ticker_tx = tx.clone();
+            if reset.load(Ordering::Relaxed) {
+                thread::spawn(move || {
+                    let time = Instant::now();
+                    thread::sleep(ticker_duration);
+                    let _ = ticker_tx.send(());
+                    log::debug!("Tick duration: {:?}", time.elapsed());
+                });
+            }
+
+            reset.store(false, Ordering::Relaxed);
+            cleared.store(false, Ordering::Relaxed);
+
+            let timeout_ch = self.build_block_timer.timeout_rx.clone();
+
+            loop {
+                // select will block until one of the channels is received
+                crossbeam_channel::select! {
+                    recv(timeout_ch) -> _ => {
+                        // reset timer duration
+                        if self.build_block_timer.should_execute.load(Ordering::Relaxed) {
+                            let duration = self.build_block_timer.duration.read().await;
+                            ticker_duration = *duration;
+                        }
+                        reset.store(true, Ordering::Relaxed);
+                        log::debug!("timeout");
+                        break
+                    }
+
+                    // ticker
+                    recv(ticker_rx) -> _ => {
+                        cleared.store(true, Ordering::Relaxed);
+                        log::debug!("tick");
+                        break
+                    }
+                }
+            }
+        }
+    }
+
+    /// Ensures that new transactions passed to mempool are
+    /// considered for the next block.
+ pub async fn build(&mut self) { + log::debug!("starting build loops"); + + self.signal_txs_ready().await; + let mempool = self.vm.mempool.read().await; + let mempool_pending_ch = mempool.subscribe_pending(); + drop(mempool); + + let stop_ch = self.stop.clone(); + let builder_stop_ch = self.builder_stop.clone(); + + loop { + // select will block until a signal is received + crossbeam_channel::select! { + recv(mempool_pending_ch) -> _ => { + log::debug!("mempool pending called\n"); + self.signal_txs_ready().await; + log::debug!("pending tx received from mempool\n"); + } + + recv(builder_stop_ch) -> _ => { + log::debug!("builder stop called\n"); + break + } + + recv(stop_ch) -> _ => { + log::debug!("stop called\n"); + break + } + } + } + } + + pub async fn gossip(&self) { + log::debug!("starting gossip loops"); + + let gossip = chan::tick(GOSSIP_INTERVAL); + let regossip = chan::tick(REGOSSIP_INTERVAL); + let stop_ch = self.stop.clone(); + + while stop_ch.try_recv() == Err(TryRecvError::Empty) { + chan_select! 
{ + gossip.recv() => { + let mempool = &mut self.vm.mempool.write().await; + let new_txs = mempool.new_txs().unwrap(); + drop(mempool); + + let mut network = self.vm.network.as_ref().unwrap().write().await; + let _ = network.gossip_new_txs(new_txs).await; + }, + regossip.recv() => { + let mut network = self.vm.network.as_ref().unwrap().write().await; + let _ = network.regossip_txs().await; + }, + } + } + } +} diff --git a/mini-kvvm/src/block/mod.rs b/mini-kvvm/src/block/mod.rs index 85ceb2e5..b4cb455c 100644 --- a/mini-kvvm/src/block/mod.rs +++ b/mini-kvvm/src/block/mod.rs @@ -1,3 +1,4 @@ +pub mod builder; pub mod state; use std::io::{Error, ErrorKind, Result}; diff --git a/mini-kvvm/src/lib.rs b/mini-kvvm/src/lib.rs index 573a3edd..ced715a0 100644 --- a/mini-kvvm/src/lib.rs +++ b/mini-kvvm/src/lib.rs @@ -3,5 +3,6 @@ pub mod block; pub mod chain; pub mod genesis; pub mod mempool; +pub mod network; pub mod utils; pub mod vm; diff --git a/mini-kvvm/src/mempool/mod.rs b/mini-kvvm/src/mempool/mod.rs index a7abf3fa..6e1abbd2 100644 --- a/mini-kvvm/src/mempool/mod.rs +++ b/mini-kvvm/src/mempool/mod.rs @@ -6,7 +6,6 @@ use std::{ }; use avalanche_types::ids; -use tokio::sync::broadcast; use crate::chain::tx::tx::Transaction; @@ -17,7 +16,8 @@ pub struct Mempool { /// Channel of length one, which the mempool ensures has an item on /// it as long as there is an unissued transaction remaining in [txs]. - pending_tx: broadcast::Sender<()>, + pending_tx: crossbeam_channel::Sender<()>, + pending_rx: crossbeam_channel::Receiver<()>, /// Vec of [Tx] that are ready to be gossiped. 
new_txs: Vec, @@ -26,18 +26,21 @@ pub struct Mempool { impl Mempool { pub fn new(max_size: usize) -> Self { // initialize broadcast channel - let (pending_tx, _rx): (broadcast::Sender<()>, broadcast::Receiver<()>) = - tokio::sync::broadcast::channel(1); + let (pending_tx, pending_rx): ( + crossbeam_channel::Sender<()>, + crossbeam_channel::Receiver<()>, + ) = crossbeam_channel::bounded(1); Self { data: Arc::new(RwLock::new(Data::new(max_size))), pending_tx, + pending_rx, new_txs: Vec::new(), } } /// Returns a broadcast receiver for the pending tx channel. - pub fn subscribe_pending(&self) -> broadcast::Receiver<()> { - self.pending_tx.subscribe() + pub fn subscribe_pending(&self) -> crossbeam_channel::Receiver<()> { + self.pending_rx.clone() } /// Returns Tx from Id if it exists. @@ -177,7 +180,7 @@ async fn test_mempool() { // init mempool let mut mempool = Mempool::new(10); - let mut pending_rx = mempool.subscribe_pending(); + let pending_rx = mempool.subscribe_pending(); // create tx_1 let tx_data_1 = unsigned::TransactionData { @@ -195,7 +198,7 @@ async fn test_mempool() { let tx_1_id = tx_1.id; assert_eq!(mempool.add(tx_1).unwrap(), true); // drain channel - let resp = pending_rx.recv().await; + let resp = pending_rx.recv(); assert!(resp.is_ok()); assert_eq!(mempool.len(), 1); @@ -221,7 +224,7 @@ async fn test_mempool() { assert_eq!(mempool.len(), 2); // drain channel - let resp = pending_rx.recv().await; + let resp = pending_rx.recv(); assert!(resp.is_ok()); // prune tx_2 as invalid diff --git a/mini-kvvm/src/network.rs b/mini-kvvm/src/network.rs new file mode 100644 index 00000000..e12c7e60 --- /dev/null +++ b/mini-kvvm/src/network.rs @@ -0,0 +1,126 @@ +use std::{ + io::{Error, ErrorKind, Result}, + num::NonZeroUsize, +}; + +use avalanche_types::ids::{self, Id}; +use lru::LruCache; + +use crate::{ + chain::{self, vm::Vm}, + vm, +}; + +const GOSSIPED_TXS_LRU_SIZE: usize = 512; + +pub struct Push { + vm: vm::ChainVm, + gossiped_tx: LruCache, +} + +impl Push { 
+    pub fn new(vm: vm::ChainVm) -> Self {
+        let cache: LruCache<Id, ()> =
+            LruCache::new(NonZeroUsize::new(GOSSIPED_TXS_LRU_SIZE).unwrap());
+        Self {
+            gossiped_tx: cache,
+            vm,
+        }
+    }
+
+    pub async fn send_txs(&self, txs: Vec<chain::tx::tx::Transaction>) -> Result<()> {
+        if txs.is_empty() {
+            return Ok(());
+        }
+
+        let b = serde_json::to_vec(&txs).map_err(|e| {
+            Error::new(
+                ErrorKind::Other,
+                format!("failed to marshal txs: {}", e.to_string()),
+            )
+        })?;
+
+        log::debug!("sending app gossip txs: {} size: {}", txs.len(), b.len());
+
+        let appsender = self
+            .vm
+            .app_sender
+            .clone()
+            .expect("appsender should exist after initialize");
+        appsender.send_app_gossip(b).await.map_err(|e| {
+            Error::new(
+                ErrorKind::Other,
+                format!("gossip txs failed: {}", e.to_string()),
+            )
+        })?;
+        Ok(())
+    }
+
+    pub async fn gossip_new_txs(&mut self, new_txs: Vec<chain::tx::tx::Transaction>) -> Result<()> {
+        let mut txs: Vec<chain::tx::tx::Transaction> = Vec::with_capacity(new_txs.len());
+
+        for tx in new_txs.iter() {
+            if self.gossiped_tx.contains(&tx.id) {
+                log::debug!("already gossiped skipping id: {}", tx.id);
+                continue;
+            }
+
+            self.gossiped_tx.put(tx.id, ());
+
+            txs.push(tx.to_owned());
+        }
+
+        // NOTE(review): the original returned Ok(()) here, silently discarding the
+        // collected `txs` while still marking them as gossiped in the LRU — the
+        // gossip was a no-op. The batch must actually be sent.
+        self.send_txs(txs).await
+    }
+
+    /// Triggers "AppGossip" on the pending transactions in the mempool.
+    /// "force" is true to re-gossip whether recently gossiped or not.
+    pub async fn regossip_txs(&mut self) -> Result<()> {
+        let mut txs: Vec<chain::tx::tx::Transaction> = Vec::new();
+        let mempool = self.vm.mempool.read().await;
+
+        // Gossip at most the target units of a block at once
+        while mempool.len() > 0 {
+            match mempool.pop_back() {
+                Some(tx) => {
+                    // Note: when regossiping, we force resend even though we may have done it
+                    // recently.
+ self.gossiped_tx.put(tx.id, ()); + txs.push(tx); + } + None => return Ok(()), + } + } + + return self.send_txs(txs).await; + } + + pub async fn app_gossip(&mut self, node_id: ids::node::Id, message: &[u8]) -> Result<()> { + log::debug!( + "appgossip message handler, sender: {} bytes: {:?}", + node_id, + message + ); + + let txs: Vec = serde_json::from_slice(&message).unwrap(); + + // submit incoming gossip + log::debug!( + "appgossip transactions are being submitted txs: {}", + txs.len() + ); + + self.vm.submit(txs).await.map_err(|e| { + Error::new( + ErrorKind::Other, + format!( + "appgossip failed to submit txs peer_id: {}: {}", + node_id, + e.to_string() + ), + ) + })?; + + Ok(()) + } +} diff --git a/mini-kvvm/src/vm.rs b/mini-kvvm/src/vm.rs index c570a027..9e55c116 100644 --- a/mini-kvvm/src/vm.rs +++ b/mini-kvvm/src/vm.rs @@ -23,7 +23,7 @@ use crate::{ block::{self, state::State}, chain::{self, tx::Transaction, vm::Vm}, genesis::Genesis, - mempool, + mempool, network, }; const PUBLIC_API_ENDPOINT: &str = "/public"; @@ -63,7 +63,9 @@ impl Default for ChainVmInterior { #[derive(Clone)] pub struct ChainVm { pub db: Box, + pub app_sender: Option>, pub mempool: Arc>, + pub network: Option>>, pub inner: Arc>, pub verified_blocks: Arc>>, } @@ -81,6 +83,8 @@ impl ChainVm { inner, mempool: Arc::new(RwLock::new(mempool)), verified_blocks, + app_sender: None, + network: None, }) } @@ -103,6 +107,8 @@ impl ChainVm { inner: Arc::new(RwLock::new(inner)), mempool: Arc::new(RwLock::new(mempool)), verified_blocks: Arc::clone(verified_blocks), + app_sender: None, + network: None, } } } @@ -235,7 +241,7 @@ impl rpcchainvm::common::vm::Vm for ChainVm { _config_bytes: &[u8], _to_engine: Sender, _fxs: &[rpcchainvm::common::vm::Fx], - _app_sender: Box, + app_sender: Box, ) -> Result<()> { let mut vm = self.inner.write().await; vm.ctx = ctx; @@ -247,6 +253,9 @@ impl rpcchainvm::common::vm::Vm for ChainVm { let current = db_manager.current().await?; self.db = current.db.clone(); + 
self.app_sender = Some(app_sender); + self.network = Some(Arc::new(RwLock::new(network::Push::new(self.clone())))); + let verified_blocks = self.verified_blocks.clone(); vm.state = State::new(self.db.clone(), verified_blocks); diff --git a/mini-kvvm/tests/api/mod.rs b/mini-kvvm/tests/api/mod.rs index e2d28364..270a8a43 100644 --- a/mini-kvvm/tests/api/mod.rs +++ b/mini-kvvm/tests/api/mod.rs @@ -1,3 +1,5 @@ +use std::thread; + use avalanche_types::rpcchainvm::database::memdb::Database as MemDb; use jsonrpc_core::futures::{self, FutureExt}; use jsonrpc_core_client::transports::local; @@ -17,12 +19,12 @@ async fn service_test() { // get a broadcast tx pending receiver for new blocks; let mempool = vm.mempool.read().await; - let mut pending_rx = mempool.subscribe_pending(); + let pending_rx = mempool.subscribe_pending(); drop(mempool); // unblock channel - tokio::spawn(async move { - loop { - pending_rx.recv().await.unwrap(); + thread::spawn(move || loop { + crossbeam_channel::select! { + recv(pending_rx) -> _ => {} } });