diff --git a/.github/workflows/test-and-release.yml b/.github/workflows/test-and-release.yml index ca3e9704..8217e15c 100644 --- a/.github/workflows/test-and-release.yml +++ b/.github/workflows/test-and-release.yml @@ -135,7 +135,7 @@ jobs: with: cache-on-failure: true - name: Run e2e tests - run: scripts/tests.e2e.sh 1.7.10 + run: scripts/tests.e2e.sh 1.7.13 release: name: Release ${{ matrix.job.target }} (${{ matrix.job.os }}) @@ -232,7 +232,7 @@ jobs: run: | if [ "$PLATFORM_NAME" == "linux" ]; then - ./target/${TARGET}/release/mini-kvvm + ./target/${TARGET}/release/mini-kvvm --version cp ./target/${TARGET}/release/mini-kvvm mini-kvvm.${TARGET} echo "::set-output name=file_name_mini-kvvm::mini-kvvm.${TARGET}" tar -czvf mini-kvvm_${TARGET}.tar.gz -C ./target/${TARGET}/release mini-kvvm diff --git a/mini-kvvm/Cargo.toml b/mini-kvvm/Cargo.toml index 54c54d11..21b9b595 100644 --- a/mini-kvvm/Cargo.toml +++ b/mini-kvvm/Cargo.toml @@ -2,7 +2,7 @@ name = "mini-kvvm" version = "0.0.0" edition = "2021" -rust-version = "1.60" +rust-version = "1.61" publish = false description = "Mini key-value store VM for Avalanche in Rust" license = "BSD-3-Clause" @@ -14,15 +14,28 @@ name = "mini-kvvm" path = "src/bin/mini-kvvm/main.rs" [dependencies] -avalanche-proto = { path = "../crates/avalanche-proto" } -clap = { version = "3.1.18", features = ["cargo", "derive"] } +async-trait = "0.1.53" +avalanche-proto = { version = "0.15.1" } +avalanche-types = "0.0.27" #NOTE avalanche-types must be updated from branch ava-types-plugin-logic for this to work +avalanche-utils = { version="0.0.3", features = ["rfc3339"] } +bytes = "1.1.0" +chrono = "0.4.19" +clap = { version = "3.1.8", features = ["cargo", "derive"] } env_logger = "0.9.0" -log = "0.4.17" -prost = "0.10.3" -serde = { version = "1.0.137", features = ["derive"] } -serde_json = "1.0.81" -serde_yaml = "0.8.24" -tokio = { version = "1.18.2", features = ["fs", "rt-multi-thread"] } -tokio-stream = { version = "0.1.8", features = ["net"] } +hmac-sha256 = "1.1" +jsonrpc-core = "18.0" +jsonrpc-derive = "18.0" +log = "0.4.16" +num-traits = "0.2.15" +num-derive = "0.3" +prost = "0.10.0" +semver = "1.0.9" +serde = { version = "1.0.136", features = ["derive"] } +serde_json = "1.0.79" +serde_yaml = "0.8.23" +time = { version = "0.3.9", features = ["formatting", "parsing"]} +tokio = { version = "1.19.1", features = ["fs", "rt-multi-thread"] } +tokio-stream = { version = "0.1.9", features = ["net"] } tonic = { version = "0.7", features = ["compression"] } tonic-health = "0.6" +tonic-reflection = "0.4.0" diff --git a/mini-kvvm/src/bin/mini-kvvm/main.rs b/mini-kvvm/src/bin/mini-kvvm/main.rs index a3cc5c6d..5426ce2d 100644 --- a/mini-kvvm/src/bin/mini-kvvm/main.rs +++ b/mini-kvvm/src/bin/mini-kvvm/main.rs @@ -1,13 +1,14 @@ -use std::io; +use std::{io::Result, sync::Arc}; use clap::{crate_version, Arg, Command}; use log::info; - -use mini_kvvm::genesis; +use mini_kvvm::{genesis, kvvm}; +use tokio::sync::RwLock; pub const APP_NAME: &str = "mini-kvvm-rs"; -fn main() { +#[tokio::main] +async fn main() -> Result<()> { let matches = Command::new(APP_NAME) .version(crate_version!()) .about("Mini key-value VM for Avalanche in Rust") @@ -38,11 +39,18 @@ fn main() { let msg = sub_matches.value_of("WELCOME_MESSAGE").unwrap_or(""); let p = sub_matches.value_of("GENESIS_FILE_PATH").unwrap_or(""); execute_genesis(author, msg, p).unwrap(); - return; + return Ok(()); } info!("starting mini-kvvm-rs"); - // TODO + let mini_kvvm = kvvm::ChainVm::new(Arc::new(RwLock::new(kvvm::ChainVmInterior::default()))); + let rpcchain = avalanche_types::rpcchainvm::vm::server::Server::new(mini_kvvm); + + avalanche_types::rpcchainvm::plugin::serve(rpcchain) + .await + .expect("failed to start server"); + + Ok(()) } pub fn command_genesis() -> Command<'static> { @@ -79,12 +87,10 @@ pub fn command_genesis() -> Command<'static> { ) } -pub fn execute_genesis(author: &str, msg: &str, p: &str) -> io::Result<()> { +pub fn execute_genesis(author: &str, msg: &str, p: &str) -> Result<()> { let g = genesis::Genesis { author: String::from(author), welcome_message: String::from(msg), }; - g.sync(p)?; - - Ok(()) + g.sync(p) } diff --git a/mini-kvvm/src/block.rs b/mini-kvvm/src/block.rs new file mode 100644 index 00000000..5a0a1bad --- /dev/null +++ b/mini-kvvm/src/block.rs @@ -0,0 +1,253 @@ +use std::{ + cmp::Ordering, + io::{Error, ErrorKind, Result}, +}; + +use avalanche_types::{ + choices::status::Status, + ids::{must_deserialize_id, Id}, + rpcchainvm, +}; +use avalanche_utils::rfc3339; +use chrono::{DateTime, NaiveDateTime, Utc}; +use hmac_sha256::Hash; +use serde::{Deserialize, Serialize}; + +use crate::kvvm::ChainVm; + +pub const DATA_LEN: usize = 32; + +impl Block { + pub fn new( + parent: Id, + height: u64, + data: Vec, + timestamp: DateTime, + status: Status, + ) -> Self { + Self { + parent, + height, + timestamp, + data, + status, + id: Id::empty(), + bytes: Vec::default(), + vm: None, + } + } +} + +pub trait MiniKvvmBlock: rpcchainvm::concensus::snowman::Block + Serialize { + fn data(&self) -> &[u8]; + fn initialize(&mut self, vm: ChainVm) -> Result; + fn set_status(&mut self, status: Status); +} + +// TODO remove +// Default is only used as a placeholder for unimplemented block logic +impl Default for Block { + fn default() -> Self { + Self { + id: Id::empty(), + parent: Id::empty(), + timestamp: DateTime::::from_utc(NaiveDateTime::from_timestamp(0, 0), Utc), + bytes: Vec::default(), + height: 0, + status: Status::Unknown("".to_string()), + data: Vec::default(), + vm: None, + } + } +} + +/// snow/consensus/snowman/Block +/// ref. https://pkg.go.dev/github.com/ava-labs/avalanchego/snow/consensus/snowman#Block +#[derive(Serialize, Clone, Deserialize)] +pub struct Block { + #[serde(deserialize_with = "must_deserialize_id")] + pub parent: Id, + pub status: Status, + height: u64, + #[serde(with = "rfc3339::serde_format")] + timestamp: DateTime, + data: Vec, + + // generated not serialized + #[serde(skip)] + id: Id, + // generated not serialized + #[serde(skip)] + bytes: Vec, + #[serde(skip)] + vm: Option, +} + +#[tonic::async_trait] +impl rpcchainvm::concensus::snowman::Decidable for Block { + /// id returns the ID of this block + async fn id(&self) -> Id { + self.id + } + + /// status returns the status of this block + async fn status(&self) -> Status { + self.status.clone() + } + + /// Accepts this element. + async fn accept(&mut self) -> Result<()> { + let vm = self.vm.clone(); + let vm = vm.ok_or(Error::new(ErrorKind::Other, "no vm associated with block"))?; + let mut inner = vm.inner.write().await; + + self.status = Status::Accepted; + + // add newly accepted block to state + inner + .state + .put_block(self.clone(), vm.clone()) + .await + .map_err(|e| Error::new(ErrorKind::Other, format!("failed to put block: {:?}", e)))?; + + // set last accepted block to this block id + inner + .state + .set_last_accepted_block_id(&self.id) + .await + .map_err(|e| Error::new(ErrorKind::Other, format!("failed to put block: {:?}", e)))?; + + // remove from verified blocks + inner.verified_blocks.remove(&self.id); + Ok(()) + } + + /// Rejects this element. + async fn reject(&mut self) -> Result<()> { + let vm = self.vm.clone(); + let vm = vm.ok_or(Error::new(ErrorKind::Other, "no vm associated with block"))?; + let mut inner = vm.inner.write().await; + + self.status = Status::Rejected; + + // add newly rejected block to state + inner + .state + .put_block(self.clone(), vm.clone()) + .await + .map_err(|e| Error::new(ErrorKind::Other, format!("failed to put block: {:?}", e)))?; + + // remove from verified, as it is rejected + inner.verified_blocks.remove(&self.id); + Ok(()) + } +} + +#[tonic::async_trait] +impl rpcchainvm::concensus::snowman::Block for Block { + /// bytes returns the binary representation of this block + async fn bytes(&self) -> &[u8] { + &self.bytes + } + + /// height returns this block's height. The genesis block has height 0. + async fn height(&self) -> u64 { + self.height + } + + async fn timestamp(&self) -> u64 { + self.timestamp.timestamp() as u64 + } + + async fn parent(&self) -> Id { + self.parent + } + + /// verify ensures that the state of the block is expected. + async fn verify(&self) -> Result<()> { + let vm = self + .vm + .clone() + .ok_or(Error::new(ErrorKind::Other, "no reference to vm"))?; + + let vm = vm.inner.read().await; + + match vm.state.get_block(self.parent).await? { + Some(parent_block) => { + // Ensure block height comes right after its parent's height + if parent_block.height().await + 1 != self.height { + return Err(Error::new( + ErrorKind::InvalidData, + "failed to verify block invalid height", + )); + } + // Ensure block timestamp is after its parent's timestamp. + if self.timestamp().await.cmp(&parent_block.timestamp().await) == Ordering::Less { + return Err(Error::new( + ErrorKind::InvalidData, + format!( + "block timestamp: {} is after parents: {}", + self.timestamp().await, + parent_block.timestamp().await + ), + )); + } + Ok(()) + } + None => Err(Error::new( + ErrorKind::NotFound, + "failed to verify block parent not found", + )), + } + } +} + +impl MiniKvvmBlock for Block { + /// data returns the block payload. + fn data(&self) -> &[u8] { + &self.data + } + + fn set_status(&mut self, status: Status) { + self.status = status; + } + + /// initialize populates the generated fields (id, bytes) of the the block and + /// returns the generated id. + fn initialize(&mut self, vm: ChainVm) -> Result { + if self.id.is_empty() { + match serde_json::to_vec(&self) { + // Populate generated fields + Ok(block_bytes) => { + let block_data = block_bytes.as_slice(); + let block_id = to_block_id(&block_data); + self.id = block_id; + self.bytes = block_bytes; + self.vm = Some(vm); + return Ok(self.id); + } + Err(error) => { + return Err(Error::new(ErrorKind::NotFound, error)); + } + } + } + Ok(self.id) + } +} + +fn to_block_id(bytes: &[u8]) -> Id { + new_id(Hash::hash(bytes)) +} + +fn new_id(bytes: [u8; DATA_LEN]) -> Id { + Id::from_slice(&bytes) +} + +#[tokio::test] +async fn test_serialization_round_trip() { + use rpcchainvm::concensus::snowman::Block as _; //Bring the block trait into scope for [.parent()] + let block = Block::default(); + let writer = serde_json::to_vec(&block).unwrap(); + let value: Block = serde_json::from_slice(&writer).unwrap(); + assert_eq!(block.parent().await, value.parent().await); +} diff --git a/mini-kvvm/src/genesis.rs b/mini-kvvm/src/genesis.rs index b3a95ef9..aac6fb9d 100644 --- a/mini-kvvm/src/genesis.rs +++ b/mini-kvvm/src/genesis.rs @@ -44,6 +44,22 @@ impl Genesis { Ok(resp) } + pub fn verify(&self) -> Result<(), Error> { + if self.author.is_empty() { + return Err(Error::new( + ErrorKind::InvalidData, + format!("invalid author"), + )); + } + if self.welcome_message.is_empty() { + return Err(Error::new( + ErrorKind::InvalidData, + format!("invalid welcome_message"), + )); + } + Ok(()) + } + pub fn sync(&self, file_path: &str) -> io::Result<()> { info!("syncing genesis to '{}'", file_path); let path = Path::new(file_path); diff --git a/mini-kvvm/src/kvvm.rs b/mini-kvvm/src/kvvm.rs index 205ef498..59cdc6c1 100644 --- a/mini-kvvm/src/kvvm.rs +++ b/mini-kvvm/src/kvvm.rs @@ -1,30 +1,506 @@ #![allow(dead_code)] -#![allow(unused_imports)] -use std::sync::{Arc, Mutex}; +use std::{ + collections::HashMap, + convert::TryInto, + io::{Error, ErrorKind, Result}, + sync::Arc, + time, +}; -use avalanche_proto::vm::vm_server::Vm; +use avalanche_types::{ + choices::status::Status, + ids, + rpcchainvm::{ + self, + concensus::snowman::Block as BlockTrait, + database::manager::{DatabaseManager, Manager}, + }, +}; +use chrono::{DateTime, NaiveDateTime, Utc}; +use semver::Version; +use tokio::sync::{mpsc::Sender, RwLock}; -#[derive(Debug)] -pub struct Handler { - db: Db, +use crate::block::{Block, MiniKvvmBlock}; +use crate::genesis::Genesis; +use crate::state::State; + +pub struct ChainVmInterior { + pub ctx: Option, + pub bootstrapped: bool, + pub version: Version, + pub genesis: Genesis, + pub db_manager: Box, + pub state: State, + pub preferred: ids::Id, + pub mempool: Vec>, + pub verified_blocks: HashMap>, + pub last_accepted: Block, + pub to_engine: Option>, + preferred_block_id: Option, +} + +impl ChainVmInterior { + pub fn new( + ctx: Option, + bootstrapped: bool, + version: Version, + genesis: Genesis, + db_manager: Box, + state: State, + preferred: ids::Id, + mempool: Vec>, + verified_blocks: HashMap>, + last_accepted: Block, + to_engine: Option>, + preferred_block_id: Option, + ) -> Self { + Self { + ctx, + bootstrapped, + version, + genesis, + db_manager, + state, + preferred, + mempool, + verified_blocks, + last_accepted, + to_engine, + preferred_block_id, + } + } +} + +impl Default for ChainVmInterior { + fn default() -> Self { + Self { + ctx: None, + bootstrapped: false, + version: Version::new(0, 0, 1), + genesis: Genesis::default(), + db_manager: DatabaseManager::new_from_databases(Vec::new()), + state: State::new(None), + preferred: ids::Id::empty(), + mempool: Vec::new(), + verified_blocks: HashMap::new(), + last_accepted: Block::default(), + to_engine: None, + preferred_block_id: None, + } + } +} + +// Wrapper around ChainVmInterior, allowing for easier access with [Arc>] access +#[derive(Clone)] +pub struct ChainVm { + pub inner: Arc>, +} + +impl ChainVm { + pub fn new(inner: Arc>) -> Box { + Box::new(ChainVm { inner }) + } +} + +impl Default for ChainVm { + fn default() -> Self { + ChainVm { + inner: Arc::new(RwLock::new(ChainVmInterior::default())), + } + } +} + +// This VM doesn't (currently) have any app-specific messages +#[tonic::async_trait] +impl rpcchainvm::common::apphandler::AppHandler for ChainVm { + async fn app_request( + &self, + _node_id: &ids::node::Id, + _request_id: u32, + _deadline: time::Instant, + _request: &[u8], + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "app request not implemented", + )) + } + + async fn app_request_failed(&self, _node_id: &ids::node::Id, _request_id: u32) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "app request failed not implemented", + )) + } + + async fn app_response( + &self, + _node_id: &ids::node::Id, + _request_id: u32, + _response: &[u8], + ) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "app response not implemented", + )) + } + + async fn app_gossip(&self, _node_id: &ids::node::Id, _msg: &[u8]) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "app gossip not implemented", + )) + } +} + +// This VM doesn't implement Connector these methods are noop. +#[tonic::async_trait] +impl rpcchainvm::common::vm::Connector for ChainVm { + async fn connected(&self, _id: &ids::node::Id) -> Result<()> { + log::info!("connected called"); + Ok(()) + } + async fn disconnected(&self, _id: &ids::node::Id) -> Result<()> { + log::info!("disconnected called"); + Ok(()) + } +} + +#[tonic::async_trait] +impl rpcchainvm::health::Checkable for ChainVm { + async fn health_check(&self) -> Result> { + Ok(Vec::new()) + } +} + +#[tonic::async_trait] +impl rpcchainvm::common::vm::Vm for ChainVm { + async fn initialize( + &self, + ctx: Option, + db_manager: Box, + genesis_bytes: &[u8], + _upgrade_bytes: &[u8], + _config_bytes: &[u8], + to_engine: Sender, + _fxs: &[rpcchainvm::common::vm::Fx], + _app_sender: Box, + ) -> Result<()> { + let mut vm = self.inner.write().await; + + vm.ctx = ctx; + vm.db_manager = db_manager; + vm.to_engine = Some(to_engine); + + let current_db = vm.db_manager.current().await.map_err(|e| { + Error::new( + ErrorKind::Other, + format!("failed to verify genesis: {:?}", e), + ) + })?; + + let state = State::new(Some(current_db.clone())); + vm.state = state; + + let genesis = Genesis::default(); //NOTE changed to have default genesis for testing + vm.genesis = genesis; + vm.genesis.verify().map_err(|e| { + Error::new( + ErrorKind::Other, + format!("failed to verify genesis: {:?}", e), + ) + })?; + + // Check if last accepted block exists + if vm.state.has_last_accepted_block().await? { + let maybe_last_accepted_block_id = vm.state.get_last_accepted_block_id().await?; + if maybe_last_accepted_block_id.is_none() { + return Err(Error::new(ErrorKind::Other, "invalid block no id found")); + } + let last_accepted_block_id = maybe_last_accepted_block_id.unwrap(); + + let maybe_last_accepted_block = vm.state.get_block(last_accepted_block_id).await?; + if maybe_last_accepted_block.is_none() { + return Err(Error::new(ErrorKind::Other, "invalid block no id found")); + } + let last_accepted_block = maybe_last_accepted_block.unwrap(); + + vm.preferred = last_accepted_block_id; + vm.last_accepted = last_accepted_block; + + log::info!( + "initialized from last accepted block {}", + last_accepted_block_id + ); + } else { + let genesis_block_vec = genesis_bytes.to_vec(); + let genesis_block_bytes = genesis_block_vec.try_into().unwrap(); + + let mut genesis_block = Block::new( + ids::Id::empty(), + 0, + genesis_block_bytes, + DateTime::::from_utc(NaiveDateTime::from_timestamp(0, 0), Utc), + Status::Processing, + ); + + let genesis_block_id = genesis_block.initialize(self.clone())?; + + let accepted_block_id = vm + .state + .accept_block(genesis_block, self.clone()) + .await + .map_err(|e| { + Error::new(ErrorKind::Other, format!("failed to accept block: {:?}", e)) + })?; + // Remove accepted block now that it is accepted + vm.verified_blocks.remove(&accepted_block_id); + + log::info!("initialized from genesis block: {:?}", genesis_block_id) + } + + Ok(()) + } + + async fn shutdown(&self) -> Result<()> { + Ok(()) + } + + async fn set_state(&self, snow_state: rpcchainvm::state::State) -> Result<()> { + let mut vm = self.inner.write().await; + match snow_state.try_into() { + // Initializing is called by chains manager when it is creating the chain. + Ok(rpcchainvm::state::State::Initializing) => { + log::debug!("set_state: initializing"); + vm.bootstrapped = false; + Ok(()) + } + Ok(rpcchainvm::state::State::StateSyncing) => { + log::debug!("set_state: state syncing"); + Err(Error::new(ErrorKind::Other, "state sync is not supported")) + } + // Bootstrapping is called by the bootstrapper to signal bootstrapping has started. + Ok(rpcchainvm::state::State::Bootstrapping) => { + log::debug!("set_state: bootstrapping"); + vm.bootstrapped = false; + Ok(()) + } + // NormalOp os called when consensus has started signalling bootstrap phase is complete + Ok(rpcchainvm::state::State::NormalOp) => { + log::debug!("set_state: normal op"); + vm.bootstrapped = true; + Ok(()) + } + Err(_) => Err(Error::new(ErrorKind::Other, "failed to accept block")), + } + } + + /// Returns this VM's version + async fn version(&self) -> Result { + let vm = self.inner.read().await; + Ok(vm.version.to_string()) + } + + async fn create_static_handlers( + &self, + ) -> std::io::Result< + std::collections::HashMap< + String, + avalanche_types::rpcchainvm::common::http_handler::HttpHandler, + >, + > { + use super::static_service::{ + StaticService, StaticServiceImpl, STATICSERVICE_PUBLICENDPOINT, + }; + log::debug!("create_static_handlers called"); + + // make a new jsonrpc service with this vm as a reference + let mut io = jsonrpc_core::IoHandler::new(); + let service = StaticServiceImpl { vm: self.clone() }; + + // Allow [io] to handle methods defined in service.rs + io.extend_with(service.to_delegate()); + let http_handler = rpcchainvm::common::http_handler::HttpHandler::new_from_u8(0, io) + .map_err(|_| std::io::Error::from(std::io::ErrorKind::InvalidData))?; + + let mut handlers = std::collections::HashMap::new(); + handlers.insert(String::from(STATICSERVICE_PUBLICENDPOINT), http_handler); + Ok(handlers) + } + + async fn create_handlers( + &self, + ) -> std::io::Result< + std::collections::HashMap< + String, + avalanche_types::rpcchainvm::common::http_handler::HttpHandler, + >, + > { + use super::service::{Service, ServiceImpl, SERVICE_PUBLICENDPOINT}; + log::debug!("create_handlers called"); + + // make a new jsonrpc service with this vm as a reference + let mut io = jsonrpc_core::IoHandler::new(); + let service = ServiceImpl { vm: self.clone() }; + + // Allow [io] to handle methods defined in service.rs + io.extend_with(service.to_delegate()); + let http_handler = rpcchainvm::common::http_handler::HttpHandler::new_from_u8(0, io) + .map_err(|_| std::io::Error::from(std::io::ErrorKind::InvalidData))?; + + let mut handlers = std::collections::HashMap::new(); + handlers.insert(String::from(SERVICE_PUBLICENDPOINT), http_handler); + Ok(handlers) + } } -#[derive(Debug, Clone)] -struct Db { - shared: Arc, +#[tonic::async_trait] +impl rpcchainvm::snowman::block::Getter for ChainVm { + async fn get_block( + &self, + id: ids::Id, + ) -> Result> { + let vm = self.inner.write().await; + log::debug!("kvvm get_block called"); + + let current_db = vm.db_manager.current().await.map_err(|e| { + Error::new( + ErrorKind::Other, + format!("failed to get current db: {:?}", e), + ) + })?; + + let state = crate::state::State::new(Some(current_db.clone())); + + match state.get_block(id).await? { + Some(mut block) => { + let block_id = block.initialize(self.clone())?; + + log::debug!("found old block id: {}", block_id.to_string()); + + Ok(Box::new(block)) + } + None => Err(Error::new( + ErrorKind::NotFound, + format!("failed to get block id: {}", id), + )), + } + } } -#[derive(Debug)] -struct Shared { - state: Mutex, +#[tonic::async_trait] +impl rpcchainvm::snowman::block::Parser for ChainVm { + async fn parse_block( + &self, + bytes: &[u8], + ) -> Result> { + let vm = self.inner.write().await; + log::debug!( + "kvvm parse_block called: {}", + String::from_utf8_lossy(&bytes) + ); + + let mut new_block: Block = serde_json::from_slice(bytes.as_ref())?; + new_block.status = Status::Processing; + + let current_db = vm.db_manager.current().await.map_err(|e| { + Error::new( + ErrorKind::Other, + format!("failed to get current db: {:?}", e), + ) + })?; + + let state = crate::state::State::new(Some(current_db.clone())); + + let new_block_id = new_block + .initialize(self.clone()) + .map_err(|e| Error::new(ErrorKind::Other, format!("failed to init block: {:?}", e)))?; + + match state.get_block(new_block_id).await? { + Some(mut old_block) => { + let old_block_id = old_block.initialize(self.clone())?; + log::debug!("parsed old block id: {}", old_block_id.to_string()); + Ok(Box::new(old_block)) + } + None => { + log::debug!("parsed new block id: {}", new_block_id); + Ok(Box::new(new_block)) + } + } + } } -#[derive(Debug)] -struct State { - bootstrapped: bool, +#[tonic::async_trait] +impl rpcchainvm::snowman::block::ChainVm for ChainVm { + async fn build_block(&self) -> Result> { + log::debug!("build_block called"); + use avalanche_types::rpcchainvm::snowman::block::Getter; + let mut vm = self.inner.write().await; + + // Pop next block from mempool error if empty + let block_value = vm + .mempool + .pop() + .ok_or_else(|| Error::new(ErrorKind::Other, "there is no block to propose"))?; + + // Get Preferred Block + let preferred_block = self.get_block(vm.preferred).await?; + + let mut new_block = Block::new( + vm.preferred, + preferred_block.height().await + 1, + block_value, + chrono::offset::Utc::now(), + Status::Processing, + ); + + let new_block_id = new_block.initialize(self.clone())?; + + new_block.verify().await.map_err(|e| { + Error::new(ErrorKind::Other, format!("failed to verify block: {:?}", e)) + })?; + + // Add block as verified + vm.verified_blocks.insert(new_block_id, preferred_block); + log::debug!("block verified {:?}", new_block_id); + + Ok(Box::new(new_block)) + } + + async fn set_preference(&self, id: ids::Id) -> Result<()> { + log::info!("setting preferred block id..."); + let mut vm = self.inner.write().await; + vm.preferred_block_id = Some(id); + Ok(()) + } + + async fn last_accepted(&self) -> Result { + let vm = self.inner.write().await; + let current_db = vm.db_manager.current().await.map_err(|e| { + Error::new( + ErrorKind::Other, + format!("failed to get current db: {:?}", e), + ) + })?; + + let state = crate::state::State::new(Some(current_db.clone())); + + match state.get_last_accepted_block_id().await? { + Some(last_accepted_block_id) => Ok(last_accepted_block_id), + None => Err(Error::new( + ErrorKind::NotFound, + "failed to get last accepted block", + )), + } + } + + async fn issue_tx(&self) -> Result> { + Err(Error::new( + ErrorKind::Unsupported, + "issue tx not implemented", + )) + } } -// impl Vm for Handler { -// TODO -// } +impl rpcchainvm::vm::Vm for ChainVm {} diff --git a/mini-kvvm/src/lib.rs b/mini-kvvm/src/lib.rs index 024feeea..a8bae1f0 100644 --- a/mini-kvvm/src/lib.rs +++ b/mini-kvvm/src/lib.rs @@ -1,3 +1,6 @@ +pub mod block; pub mod genesis; pub mod kvvm; -pub mod plugin; +pub mod service; +pub mod state; +pub mod static_service; diff --git a/mini-kvvm/src/plugin.rs b/mini-kvvm/src/plugin.rs deleted file mode 100644 index 7983b13e..00000000 --- a/mini-kvvm/src/plugin.rs +++ /dev/null @@ -1,78 +0,0 @@ -use std::io::{self, Error, ErrorKind}; - -use avalanche_proto::vm::vm_server::{Vm, VmServer}; -use log::info; -use tokio::net::TcpListener; -use tokio_stream::wrappers::TcpListenerStream; -use tonic::transport::{server::NamedService, Server}; -use tonic_health::server::health_reporter; - -/// ref. https://github.com/ava-labs/avalanchego/blob/v1.7.10/vms/rpcchainvm/vm.go -pub const PROTOCOL_VERSION: u8 = 12; -pub const MAGIC_COOKIE_KEY: &str = "VM_PLUGIN"; -pub const MAGIC_COOKIE_VALUE: &str = "dynamic"; - -/// ref. https://github.com/ava-labs/avalanchego/blob/v1.7.10/vms/rpcchainvm/vm.go -#[derive(Debug)] -pub struct HandshakeConfig { - pub protocol_version: u8, - pub magic_cookie_key: &'static str, - pub magic_cookie_value: &'static str, -} - -impl Default for HandshakeConfig { - fn default() -> Self { - Self::default() - } -} - -impl HandshakeConfig { - pub fn default() -> Self { - Self { - protocol_version: PROTOCOL_VERSION, - magic_cookie_key: MAGIC_COOKIE_KEY, - magic_cookie_value: MAGIC_COOKIE_VALUE, - } - } -} - -struct Plugin; - -impl NamedService for Plugin { - const NAME: &'static str = "plugin"; -} - -pub async fn serve(vm: V, handshake_config: &HandshakeConfig) -> io::Result<()> -where - V: Vm, -{ - // "go-plugin requires the gRPC Health Checking Service to be registered on your server" - // ref. https://github.com/hashicorp/go-plugin/blob/master/docs/guide-plugin-write-non-go.md - // ref. https://github.com/hyperium/tonic/blob/v0.7.1/examples/src/health/server.rs - let (mut health_reporter, health_svc) = health_reporter(); - health_reporter.set_serving::().await; - - // TODO: Add support for abstract unix sockets once supported by tonic. - // ref. https://github.com/hyperium/tonic/issues/966 - // avalanchego currently only supports plugins listening on IP address. - let listener = TcpListener::bind("127.0.0.1:0").await?; - let addr = listener.local_addr()?; - info!("plugin listening on address {:?}", addr); - - // ref. https://github.com/hashicorp/go-plugin/blob/master/docs/guide-plugin-write-non-go.md#4-output-handshake-information - let handshake_msg = format!("1|{}|tcp|{}|grpc|", handshake_config.protocol_version, addr); - info!("handshake message: {}", handshake_msg); - println!("{}", handshake_msg); - - Server::builder() - .add_service(health_svc) - .add_service(VmServer::new(vm)) - .serve_with_incoming(TcpListenerStream::new(listener)) - .await - .map_err(|e| { - Error::new( - ErrorKind::Other, - format!("failed serve_with_incoming '{}'", e), - ) - }) -} diff --git a/mini-kvvm/src/service.rs b/mini-kvvm/src/service.rs new file mode 100644 index 00000000..f4e03947 --- /dev/null +++ b/mini-kvvm/src/service.rs @@ -0,0 +1,158 @@ +use crate::{block::Block, kvvm::ChainVm}; + +use avalanche_types::{ + choices::status::Status, + ids::Id, + rpcchainvm::snowman::block::{ChainVm as ChainVmTrait, Getter, Parser}, +}; +use chrono::{DateTime, NaiveDateTime, Utc}; +use jsonrpc_core::{BoxFuture, Error as JsonRPCError, ErrorCode as JRPCErrorCode, Result}; +use jsonrpc_derive::rpc; +use serde::{Deserialize, Serialize}; +use std::str::FromStr; + +pub const SERVICE_PUBLICENDPOINT: &str = "/kvvm-rs"; //used for this service's endpoint + +#[derive(Serialize)] +pub struct BuildBlockResponse { + pub block: Vec, +} + +#[derive(Deserialize)] +pub struct GetBlockArgs { + pub id: String, +} + +#[derive(Serialize)] +pub struct GetBlockResponse { + pub block: Vec, +} + +#[derive(Serialize)] +pub struct LastAcceptedResponse { + pub id: Id, +} + +#[derive(Deserialize)] +pub struct ParseBlockArgs { + pub bytes: Vec, +} + +#[derive(Serialize)] +pub struct ParseBlockResponse { + pub block: Vec, +} + +#[derive(Deserialize)] +pub struct AddBlockArgs { + pub bytes: Vec, +} + +#[derive(Serialize)] +pub struct AddBlockResponse { + pub id: Id, +} + +#[rpc(server)] +pub trait Service { + #[rpc(name = "build_block")] + fn build_block(&self) -> BoxFuture>; + + #[rpc(name = "get_block")] + fn get_block(&self, params: GetBlockArgs) -> BoxFuture>; + + #[rpc(name = "last_accepted")] + fn last_accepted(&self) -> BoxFuture>; + + #[rpc(name = "parse_block")] + fn parse_block(&self, params: ParseBlockArgs) -> BoxFuture>; + + #[rpc(name = "add_block")] + fn add_block(&self, params: AddBlockArgs) -> BoxFuture>; +} +/// Implementation of handlers +pub struct ServiceImpl { + pub vm: ChainVm, +} + +// TODO: Edit to pass error messages through jsonrpc error +fn create_jsonrpc_error(e: std::io::Error) -> JsonRPCError { + let mut error = JsonRPCError::new(JRPCErrorCode::InternalError); + error.message = format!("{}", e); + error +} + +impl Service for ServiceImpl { + fn build_block(&self) -> BoxFuture> { + log::info!("build block method called"); + let vm = self.vm.clone(); + + Box::pin(async move { + let result = vm.build_block().await.map_err(create_jsonrpc_error)?; + let bytes = result.bytes().await.to_vec(); + Ok(BuildBlockResponse { block: bytes }) + }) + } + + fn add_block(&self, params: AddBlockArgs) -> BoxFuture> { + use crate::block::MiniKvvmBlock; + log::info!("add block method called"); + let vm = self.vm.clone(); + + let mut block = Block::new( + Id::empty(), + 0, + params.bytes, + DateTime::::from_utc(NaiveDateTime::from_timestamp(0, 0), Utc), + Status::Processing, + ); + + Box::pin(async move { + let block_id = block.initialize(vm.clone()).map_err(create_jsonrpc_error)?; + let mut inner = vm.inner.write().await; + let accepted_block_id = inner + .state + .accept_block(block, vm.clone()) + .await + .map_err(create_jsonrpc_error)?; + inner.verified_blocks.remove(&accepted_block_id); + Ok(AddBlockResponse { id: block_id }) + }) + } + + fn get_block(&self, params: GetBlockArgs) -> BoxFuture> { + log::info!("get block method called"); + let vm = self.vm.clone(); + + Box::pin(async move { + let id = Id::from_str(params.id.as_str()).map_err(create_jsonrpc_error)?; + let result = vm.get_block(id).await.map_err(create_jsonrpc_error)?; + let bytes = result.bytes().await.to_vec(); + Ok(GetBlockResponse { block: bytes }) + }) + } + + fn last_accepted(&self) -> BoxFuture> { + log::info!("last accepted method called"); + let vm = self.vm.clone(); + + Box::pin(async move { + let result = vm.last_accepted().await.map_err(create_jsonrpc_error)?; + Ok(LastAcceptedResponse { id: result }) + }) + } + + fn parse_block(&self, params: ParseBlockArgs) -> BoxFuture> { + log::info!("parse block method called"); + let vm = self.vm.clone(); + + Box::pin(async move { + let result = vm + .parse_block(params.bytes.as_ref()) + .await + .map_err(create_jsonrpc_error)?; + let bytes = result.bytes().await.to_vec(); + Ok(ParseBlockResponse { block: bytes }) + }) + } +} diff --git a/mini-kvvm/src/state.rs b/mini-kvvm/src/state.rs new file mode 100644 index 00000000..1e933622 --- /dev/null +++ b/mini-kvvm/src/state.rs @@ -0,0 +1,185 @@ +use std::io::{Error, ErrorKind, Result}; + +use avalanche_types::{ + choices::status::Status, + ids::Id, + rpcchainvm::{self, database}, +}; +pub use bytes::*; + +use crate::block::{Block, MiniKvvmBlock}; +use crate::kvvm::ChainVm; + +const LAST_ACCEPTED_BLOCK_ID_KEY: &[u8] = b"last_accepted"; +const STATE_INITIALIZED_KEY: &[u8] = b"state_initialized"; +const STATE_INITIALIZED_VALUE: &[u8] = b"state_has_infact_been_initialized"; +const SINGLETON_STATE_PREFIX: &[u8] = b"singleton"; + +pub const BLOCK_DATA_LEN: usize = 32; +pub const BLOCK_STATE_PREFIX: &[u8] = b"blockStatePrefix"; + +pub struct State { + client: Option, + last_accepted_block_id_key: Vec, + state_initialized_key: Vec, +} + +impl State { + pub fn new(client: Option) -> Self { + Self { + client, + last_accepted_block_id_key: Self::prefix( + BLOCK_STATE_PREFIX, + LAST_ACCEPTED_BLOCK_ID_KEY, + ), + state_initialized_key: Self::prefix(SINGLETON_STATE_PREFIX, STATE_INITIALIZED_KEY), + } + } + pub fn prefix(prefix: &[u8], data: &[u8]) -> Vec { + let mut result = Vec::with_capacity(prefix.len() + data.len()); + result.extend_from_slice(prefix); + result.extend_from_slice(data); + + result + } + + pub async fn get(&self, key: Vec) -> Result>> { + let client = self.client.clone().ok_or(Error::new( + ErrorKind::Other, + "no database associated with this client", + ))?; + + let client = client.inner.write().await; + let resp = client.get(key.as_slice()).await; + + log::info!("state get response: {:?}", resp); + + let err = match &resp { + Ok(_) => database::DatabaseError::None as u32, + Err(e) => rpcchainvm::database::rpcdb::error_to_error_code(&e.to_string()).unwrap(), + }; + let err = num_traits::FromPrimitive::from_u32(err); + + match err { + Some(database::DatabaseError::Closed) => Err(Error::new( + ErrorKind::Other, + format!("failed to get: {:?}", err), + )), + Some(database::DatabaseError::NotFound) => Ok(None), + _ => { + let resp = resp.map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e)))?; //this should never panic, but this handles errors in case it should happen + Ok(Some(resp)) + } + } + } + + pub async fn put(&mut self, key: Vec, value: Vec) -> Result<()> { + let client = self.client.clone().ok_or(Error::new( + ErrorKind::Other, + "no database associated with this client", + ))?; + let mut client = client.inner.write().await; + let resp = client.put(key.as_slice(), value.as_slice()).await; + + let err = match &resp { + Ok(_) => database::DatabaseError::None as u32, + Err(e) => rpcchainvm::database::rpcdb::error_to_error_code(&e.to_string()).unwrap(), + }; + + let err = num_traits::FromPrimitive::from_u32(err); + + match err { + Some(database::DatabaseError::None) => Ok(()), + Some(database::DatabaseError::Closed) => Err(Error::new( + ErrorKind::Other, + format!("failed to put: {:?}", err), + )), + Some(database::DatabaseError::NotFound) => Err(Error::new( + ErrorKind::NotFound, + format!("failed to put: {:?}", err), + )), + _ => { + resp.map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e)))?; //this should never panic, but this handles errors in case it should happen + Ok(()) + } + } + } + + // Dupe of kvvm this should be removed or moved to block? + pub async fn get_block(&self, id: Id) -> Result> { + log::debug!("state get_block called"); + let key = Self::prefix(BLOCK_STATE_PREFIX, id.as_ref()); + log::debug!("state get_block key {:?}", key); + let value = match self.get(key).await { + Ok(Some(v)) => v, + _ => return Ok(None), + }; + + let block = serde_json::from_slice(&value).map_err(|e| { + Error::new( + ErrorKind::Other, + format!("failed deserialize block: {:?}", e), + ) + })?; + //log::info!("state get_block value: {:?}", block); //TODO implement debug block trait + + Ok(block) + } + + pub async fn put_block(&mut self, mut block: impl MiniKvvmBlock, vm: ChainVm) -> Result<()> { + let value = serde_json::to_vec(&block)?; + let key = Self::prefix(BLOCK_STATE_PREFIX, block.initialize(vm)?.as_ref()); + self.put(key, value).await + } + + pub async fn has_last_accepted_block(&self) -> Result { + let last = self.get_last_accepted_block_id().await?; + if last.is_some() { + return Ok(true); + } + Ok(false) + } + + pub async fn get_last_accepted_block_id(&self) -> Result> { + match self.get(self.last_accepted_block_id_key.clone()).await? { + Some(block_id_bytes) => Ok(Some(Id::from_slice(&block_id_bytes))), + None => Ok(None), + } + } + + pub async fn set_last_accepted_block_id(&mut self, id: &Id) -> Result<()> { + self.put( + self.last_accepted_block_id_key.clone(), + Vec::from(id.as_ref()), + ) + .await + } + + pub async fn is_state_initialized(&mut self) -> Result { + let state = self.get(self.state_initialized_key.clone()).await?; + Ok(match state { + Some(state_initialized_bytes) => !state_initialized_bytes.is_empty(), + None => false, + }) + } + + pub async fn set_state_initialized(&mut self) -> Result<()> { + self.put( + self.state_initialized_key.clone(), + Vec::from(STATE_INITIALIZED_VALUE), + ) + .await + } + + pub async fn accept_block(&mut self, mut block: impl MiniKvvmBlock, vm: ChainVm) -> Result { + block.set_status(Status::Accepted); + let block_id = block + .initialize(vm.clone()) + .map_err(|e| Error::new(ErrorKind::Other, format!("failed to init block: {:?}", e)))?; + log::info!("accepting block with id: {}", block_id); + self.put_block(block, vm).await?; + self.set_last_accepted_block_id(&block_id).await?; + + Ok(block_id) + } +} diff --git a/mini-kvvm/src/static_service.rs b/mini-kvvm/src/static_service.rs new file mode 100644 index 00000000..452af58e --- /dev/null +++ b/mini-kvvm/src/static_service.rs @@ -0,0 +1,44 @@ +use crate::kvvm::ChainVm; + +use avalanche_types::rpcchainvm::common::vm::Vm; +use jsonrpc_core::{BoxFuture, Error as JsonRPCError, ErrorCode as JRPCErrorCode, Result}; +use jsonrpc_derive::rpc; +use serde::Deserialize; + +pub const STATICSERVICE_PUBLICENDPOINT: &str = "/static-kvvm-rs"; //used for this service's endpoint + +#[derive(Deserialize)] +pub struct SetStateArgs { + pub id: u32, +} + +#[rpc(server)] +pub trait StaticService { + #[rpc(name = "set_state")] + fn set_state(&self, params: SetStateArgs) -> BoxFuture>; +} +/// Implementation of handlers +pub struct StaticServiceImpl { + pub vm: ChainVm, +} + +// TODO: Edit to pass error messages through jsonrpc error +fn create_jsonrpc_error(_: std::io::Error) -> JsonRPCError { + JsonRPCError::new(JRPCErrorCode::InternalError) +} + +impl StaticService for StaticServiceImpl { + fn set_state(&self, params: SetStateArgs) -> BoxFuture> { + log::info!("set state method called"); + let vm = self.vm.clone(); + + Box::pin(async move { + let state = avalanche_types::rpcchainvm::state::State::try_from(params.id) + .map_err(|_| JsonRPCError::new(JRPCErrorCode::InternalError))?; + + vm.set_state(state).await.map_err(create_jsonrpc_error)?; + + Ok(()) + }) + } +} diff --git a/scripts/mkvvm-test.sh b/scripts/mkvvm-test.sh new file mode 100755 index 00000000..fe73bdd7 --- /dev/null +++ b/scripts/mkvvm-test.sh @@ -0,0 +1,31 @@ +PORT=:8080 +GRPC_PORT=:8081 + +#hash of minikvvm +MINI_KVVM_HASH=qBnAKUQ2mxiB1JdqsPPU7Ufuj1XmPLpnPTRvZEpkYZBmK6UjE + +kill -9 $(lsof -t -i$PORT) + +# build new version of mini-kvvm +rm ${AVALANCHEGO_PLUGIN_PATH}/${MINI_KVVM_HASH} +cargo build \ +--release \ +--bin mini-kvvm +cp target/release/mini-kvvm ${AVALANCHEGO_PLUGIN_PATH}/${MINI_KVVM_HASH} + +#Start the network runner +avalanche-network-runner server \ +--log-level debug \ +--port=$PORT \ +--grpc-gateway-port=$GRPC_PORT & +NETWORK_RUNNER_PID=${!} +sleep 5 + +#Make a new instance of mini-kvvm +avalanche-network-runner control start \ +--log-level all \ +--endpoint="0.0.0.0:8080" \ +--number-of-nodes=5 \ +--avalanchego-path ${AVALANCHEGO_EXEC_PATH} \ +--plugin-dir ${AVALANCHEGO_PLUGIN_PATH} \ +--blockchain-specs '[{"vm_name":"minikvvm","genesis":"/tmp/mini-kvvm.genesis.json"}]' \ diff --git a/scripts/tests.e2e.sh b/scripts/tests.e2e.sh index a456aa81..a1f273d3 100755 --- a/scripts/tests.e2e.sh +++ b/scripts/tests.e2e.sh @@ -7,50 +7,11 @@ if ! [[ "$0" =~ scripts/tests.e2e.sh ]]; then exit 255 fi -AVALANCHEGO_VERSION=$1 -if [[ -z "${AVALANCHEGO_VERSION}" ]]; then - echo "Missing avalanchego version argument!" - echo "Usage: ${0} [AVALANCHEGO_VERSION]" >> /dev/stderr - exit 255 -fi - -echo "Running with:" -echo AVALANCHEGO_VERSION: ${AVALANCHEGO_VERSION} - -############################ -# download avalanchego -# https://github.com/ava-labs/avalanchego/releases -GOARCH=$(go env GOARCH) -GOOS=$(go env GOOS) -DOWNLOAD_URL=https://github.com/ava-labs/avalanchego/releases/download/v${AVALANCHEGO_VERSION}/avalanchego-linux-${GOARCH}-v${AVALANCHEGO_VERSION}.tar.gz -DOWNLOAD_PATH=/tmp/avalanchego.tar.gz -if [[ ${GOOS} == "darwin" ]]; then - DOWNLOAD_URL=https://github.com/ava-labs/avalanchego/releases/download/v${AVALANCHEGO_VERSION}/avalanchego-macos-v${AVALANCHEGO_VERSION}.zip - DOWNLOAD_PATH=/tmp/avalanchego.zip -fi - -rm -rf /tmp/avalanchego-v${AVALANCHEGO_VERSION} -rm -f ${DOWNLOAD_PATH} - -echo "downloading avalanchego ${AVALANCHEGO_VERSION} at ${DOWNLOAD_URL}" -curl -L ${DOWNLOAD_URL} -o ${DOWNLOAD_PATH} - -echo "extracting downloaded avalanchego" -if [[ ${GOOS} == "linux" ]]; then - tar xzvf ${DOWNLOAD_PATH} -C /tmp -elif [[ ${GOOS} == "darwin" ]]; then - unzip ${DOWNLOAD_PATH} -d /tmp/avalanchego-build - mv /tmp/avalanchego-build/build /tmp/avalanchego-v${AVALANCHEGO_VERSION} -fi -find /tmp/avalanchego-v${AVALANCHEGO_VERSION} - -AVALANCHEGO_PATH=/tmp/avalanchego-v${AVALANCHEGO_VERSION}/avalanchego -AVALANCHEGO_PLUGIN_DIR=/tmp/avalanchego-v${AVALANCHEGO_VERSION}/plugins - ################################# # download avalanche-network-runner # https://github.com/ava-labs/avalanche-network-runner # TODO: use "go install -v github.com/ava-labs/avalanche-network-runner/cmd/avalanche-network-runner@v${NETWORK_RUNNER_VERSION}" +GOOS=$(go env GOOS) # ensures that the compatible network runner version is downloaded for this machine NETWORK_RUNNER_VERSION=1.1.0 DOWNLOAD_PATH=/tmp/avalanche-network-runner.tar.gz DOWNLOAD_URL=https://github.com/ava-labs/avalanche-network-runner/releases/download/v${NETWORK_RUNNER_VERSION}/avalanche-network-runner_${NETWORK_RUNNER_VERSION}_linux_amd64.tar.gz @@ -77,12 +38,12 @@ server \ --port=":12342" \ --grpc-gateway-port=":12343" & NETWORK_RUNNER_PID=${!} +sleep 5 # sleep to ensure that network runner initializes before e2e tests begin ################################# # do not run in parallel, to run in sequence echo "running e2e tests" NETWORK_RUNNER_GRPC_ENDPOINT=http://127.0.0.1:12342 \ -NETWORK_RUNNER_AVALANCHEGO_PATH=${AVALANCHEGO_PATH} \ RUST_LOG=debug \ cargo test --all-features --package e2e -- --show-output --nocapture @@ -91,4 +52,4 @@ cargo test --all-features --package e2e -- --show-output --nocapture # just in case tests are aborted, manually terminate them again echo "network-runner RPC server was running on NETWORK_RUNNER_PID ${NETWORK_RUNNER_PID} as test mode; terminating the process..." pkill -P ${NETWORK_RUNNER_PID} || true -kill -2 ${NETWORK_RUNNER_PID} +kill -2 ${NETWORK_RUNNER_PID} \ No newline at end of file diff --git a/tests/e2e/Cargo.toml b/tests/e2e/Cargo.toml index e9f4725f..1fa1c94a 100644 --- a/tests/e2e/Cargo.toml +++ b/tests/e2e/Cargo.toml @@ -2,7 +2,7 @@ name = "e2e" version = "0.0.0" edition = "2021" -rust-version = "1.60" +rust-version = "1.61" publish = false description = "E2E tests" license = "BSD-3-Clause" @@ -11,7 +11,8 @@ homepage = "https://avax.network" [dependencies] [dev-dependencies] -avalanche-network-runner-sdk = { git = "https://github.com/ava-labs/avalanche-network-runner-sdk-rs", rev = "696d1b8" } +avalanche-installer = "0.0.4" +avalanche-network-runner-sdk = { version = "0.0.1" } env_logger = "0.9.0" log = "0.4.17" serde_json = "1.0.81" diff --git a/tests/e2e/src/lib.rs b/tests/e2e/src/lib.rs index 6bff6ab1..b313e6dc 100644 --- a/tests/e2e/src/lib.rs +++ b/tests/e2e/src/lib.rs @@ -1,6 +1,8 @@ #[cfg(test)] mod tests; +use std::collections::HashMap; + pub fn get_network_runner_grpc_endpoint() -> (String, bool) { match std::env::var("NETWORK_RUNNER_GRPC_ENDPOINT") { Ok(s) => (s, true), @@ -14,3 +16,34 @@ pub fn get_network_runner_avalanchego_path() -> (String, bool) { _ => (String::new(), false), } } + +pub fn get_network_runner_whitelisted_subnets() -> (String, bool) { + match std::env::var("NETWORK_RUNNER_WHITELISTED_SUBNETS") { + Ok(s) => (s, true), + _ => (String::new(), false), + } +} + +pub fn get_network_runner_plugin_dir_path() -> (String, bool) { + match std::env::var("NETWORK_RUNNER_PLUGIN_DIR_PATH") { + Ok(s) => (s, true), + _ => (String::new(), false), + } +} + +pub fn get_custom_vms() -> (HashMap, bool) { + match std::env::var("NETWORK_RUNNER_CUSTOM_VM") { + Ok(s) => (hash_from_str(&s), true), + _ => (HashMap::new(), false), + } +} + +/// hash_from_str takes a comma delimited key value pair and converts it to a hashmap. +/// example: key=value,key=value. +fn hash_from_str(input: &str) -> HashMap { + input + .split(',') + .map(|s| s.split_at(s.find("=").expect("invalid format expect key=value"))) + .map(|(key, val)| (key.to_string(), val[1..].to_string())) + .collect() +} diff --git a/tests/e2e/src/tests/mod.rs b/tests/e2e/src/tests/mod.rs index 9047f3a3..f2f5d7ea 100644 --- a/tests/e2e/src/tests/mod.rs +++ b/tests/e2e/src/tests/mod.rs @@ -1,4 +1,5 @@ use std::{ + process::Command, thread, time::{Duration, Instant}, }; @@ -7,6 +8,8 @@ use log::{info, warn}; use avalanche_network_runner_sdk::{Client, GlobalConfig, StartRequest}; +const RELEASE: &str = "v1.7.16"; + #[tokio::test] async fn e2e() { let _ = env_logger::builder() @@ -23,8 +26,16 @@ async fn e2e() { let resp = cli.ping().await.expect("failed ping"); info!("network-runner is running (ping response {:?})", resp); - let (exec_path, is_set) = get_network_runner_avalanchego_path(); - assert!(is_set); + // Download avalanchego + let (exec_path, _plugin_path) = + avalanche_installer::avalanchego::download(None, None, Some(String::from(RELEASE))) + .await + .expect("failed to download avalanchego"); + + info!( + "running avalanchego version {}", + get_avalanchego_version(&exec_path) + ); let global_config = GlobalConfig { log_level: String::from("info"), @@ -108,15 +119,16 @@ async fn e2e() { info!("successfully stopped network"); } -fn get_network_runner_grpc_endpoint() -> (String, bool) { - match std::env::var("NETWORK_RUNNER_GRPC_ENDPOINT") { - Ok(s) => (s, true), - _ => (String::new(), false), - } +fn get_avalanchego_version(exec_path: &String) -> String { + let output = Command::new(exec_path) + .arg("--version") + .output() + .expect("failed to get avalanchego version"); + format!("{}", String::from_utf8(output.stdout).unwrap()) } -fn get_network_runner_avalanchego_path() -> (String, bool) { - match std::env::var("NETWORK_RUNNER_AVALANCHEGO_PATH") { +fn get_network_runner_grpc_endpoint() -> (String, bool) { + match std::env::var("NETWORK_RUNNER_GRPC_ENDPOINT") { Ok(s) => (s, true), _ => (String::new(), false), }