Skip to content

Commit 0b9557d

Browse files
authored
Merge pull request #58 from ava-labs/mvp
vm: implement remaining core methods
2 parents 31d5959 + f1f8b2b commit 0b9557d

File tree

5 files changed

+136
-68
lines changed

5 files changed

+136
-68
lines changed

mini-kvvm/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,20 @@ path = "src/bin/mini-kvvm/main.rs"
1515

1616
[dependencies]
1717
avalanche-proto = { version = "0.16.0" }
18-
avalanche-types = { version = "0.0.32" }
18+
avalanche-types = { version = "0.0.34" }
1919
byteorder = "1.4.3"
2020
chrono = "0.4.19"
2121
derivative = "2.2.0"
2222
dyn-clone = "1.0.9"
23-
ethereum-types = { version = "0.13.1" }
23+
ethereum-types = { version = "0.14.0" }
2424
clap = { version = "3.1.17", features = ["cargo", "derive"] }
2525
env_logger = "0.9.0"
2626
hex = "0.4.3"
2727
jsonrpc-core = "18.0.0"
2828
jsonrpc-core-client = { version = "18.0.0" }
2929
jsonrpc-derive = "18.0"
3030
log = "0.4.17"
31-
lru = "0.7.8"
31+
lru = "0.8.0"
3232
prost = "0.11.0"
3333
semver = "1.0.13"
3434
serde = { version = "1.0.144", features = ["derive"] }

mini-kvvm/src/bin/mini-kvvm/main.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use avalanche_types::rpcchainvm;
44
use clap::{crate_version, Arg, Command};
55
use log::info;
66
use mini_kvvm::{genesis, vm};
7+
use tokio::sync::broadcast::{Receiver, Sender};
78

89
pub const APP_NAME: &str = "mini-kvvm-rs";
910

@@ -42,11 +43,14 @@ async fn main() -> Result<()> {
4243
return Ok(());
4344
}
4445

46+
// Initialize broadcast stop channel used to terminate gRPC servers during shutdown.
47+
let (stop_ch_tx, stop_ch_rx): (Sender<()>, Receiver<()>) = tokio::sync::broadcast::channel(1);
48+
4549
info!("starting mini-kvvm-rs");
46-
let vm = vm::ChainVm::new();
47-
let vm_server = avalanche_types::rpcchainvm::vm::server::Server::new(vm);
50+
let vm_server =
51+
avalanche_types::rpcchainvm::vm::server::Server::new(vm::ChainVm::new(), stop_ch_tx);
4852

49-
rpcchainvm::plugin::serve(vm_server)
53+
rpcchainvm::plugin::serve(vm_server, stop_ch_rx)
5054
.await
5155
.expect("failed to start gRPC server");
5256

mini-kvvm/src/block/state.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{
22
collections::HashMap,
33
io::{Error, ErrorKind, Result},
4+
num::NonZeroUsize,
45
sync::Arc,
56
};
67

@@ -78,7 +79,8 @@ impl State {
7879
db: Box<dyn rpcchainvm::database::Database + Send + Sync>,
7980
verified_blocks: Arc<RwLock<HashMap<ids::Id, Block>>>,
8081
) -> Self {
81-
let cache: LruCache<ids::Id, Block> = LruCache::new(BLOCKS_LRU_SIZE);
82+
let cache: LruCache<ids::Id, Block> =
83+
LruCache::new(NonZeroUsize::new(BLOCKS_LRU_SIZE).unwrap());
8284
return Self {
8385
inner: InnerState {
8486
db: Arc::new(RwLock::new(db)),
@@ -252,6 +254,26 @@ impl State {
252254
Ok(())
253255
}
254256

257+
/// Attempts to parse a byte array into a block. If the source is empty
258+
/// bytes will be marshalled from a default block.
259+
pub async fn parse_block(&self, mut source: Vec<u8>, status: Status) -> Result<Block> {
260+
let mut block = Block::default();
261+
262+
if source.is_empty() {
263+
source = serde_json::to_vec(&block)?;
264+
}
265+
block.bytes = source.to_vec();
266+
block.id = ids::Id::from_slice_with_sha256(&Sha3_256::digest(source));
267+
block.st = status;
268+
block.state = self.clone();
269+
270+
for tx in block.txs.iter_mut() {
271+
tx.init().await?;
272+
}
273+
274+
Ok(block.to_owned())
275+
}
276+
255277
/// Checks if the last accepted block key exists and returns true if has a value.
256278
pub async fn has_last_accepted(&self) -> Result<bool> {
257279
let db = self.inner.db.read().await;

mini-kvvm/src/vm.rs

Lines changed: 102 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
};
77

88
use avalanche_types::{
9-
choices::status,
9+
choices::status::{self, Status},
1010
ids,
1111
rpcchainvm::{
1212
self,
@@ -18,12 +18,16 @@ use semver::Version;
1818
use tokio::sync::{mpsc::Sender, RwLock};
1919

2020
use crate::{
21+
api,
2122
block::Block as StatefulBlock,
2223
block::{self, state::State},
2324
chain::{self, tx::Transaction},
2425
genesis::Genesis,
2526
};
2627

28+
const PUBLIC_API_ENDPOINT: &str = "/public";
29+
const VERSION: &str = env!("CARGO_PKG_VERSION");
30+
2731
pub struct ChainVmInterior {
2832
pub ctx: Option<rpcchainvm::context::Context>,
2933
pub bootstrapped: bool,
@@ -176,27 +180,20 @@ impl rpcchainvm::common::apphandler::AppHandler for ChainVm {
176180
#[tonic::async_trait]
177181
impl rpcchainvm::common::vm::Connector for ChainVm {
178182
async fn connected(&self, _id: &ids::node::Id) -> Result<()> {
179-
Err(Error::new(
180-
ErrorKind::Unsupported,
181-
"connected not implemented",
182-
))
183+
// no-op
184+
Ok(())
183185
}
184186

185187
async fn disconnected(&self, _id: &ids::node::Id) -> Result<()> {
186-
Err(Error::new(
187-
ErrorKind::Unsupported,
188-
"disconnected not implemented",
189-
))
188+
// no-op
189+
Ok(())
190190
}
191191
}
192192

193193
#[tonic::async_trait]
194194
impl rpcchainvm::health::Checkable for ChainVm {
195195
async fn health_check(&self) -> Result<Vec<u8>> {
196-
Err(Error::new(
197-
ErrorKind::Unsupported,
198-
"health check not implemented",
199-
))
196+
Ok("200".as_bytes().to_vec())
200197
}
201198
}
202199

@@ -217,6 +214,10 @@ impl rpcchainvm::common::vm::Vm for ChainVm {
217214
let mut vm = self.inner.write().await;
218215
vm.ctx = ctx;
219216

217+
let version =
218+
Version::parse(VERSION).map_err(|e| Error::new(ErrorKind::Other, e.to_string()))?;
219+
vm.version = version;
220+
220221
let current = db_manager.current().await?;
221222
self.db = current.db.clone();
222223

@@ -282,44 +283,70 @@ impl rpcchainvm::common::vm::Vm for ChainVm {
282283

283284
/// Called when the node is shutting down.
284285
async fn shutdown(&self) -> Result<()> {
285-
Err(Error::new(
286-
ErrorKind::Unsupported,
287-
"shutdown not implemented",
288-
))
286+
// grpc servers are shutdown via broadcast channel
287+
// if additional shutdown is required we can extend.
288+
Ok(())
289289
}
290290

291-
/// Communicates to Vm the next state it starts.
292-
async fn set_state(&self, _snow_state: rpcchainvm::snow::State) -> Result<()> {
293-
Err(Error::new(
294-
ErrorKind::Unsupported,
295-
"set_state not implemented",
296-
))
291+
/// Communicates to Vm the next state phase.
292+
async fn set_state(&self, snow_state: rpcchainvm::snow::State) -> Result<()> {
293+
let mut vm = self.inner.write().await;
294+
match snow_state.try_into() {
295+
// Initializing is called by chains manager when it is creating the chain.
296+
Ok(rpcchainvm::snow::State::Initializing) => {
297+
log::debug!("set_state: initializing");
298+
vm.bootstrapped = false;
299+
Ok(())
300+
}
301+
Ok(rpcchainvm::snow::State::StateSyncing) => {
302+
log::debug!("set_state: state syncing");
303+
Err(Error::new(ErrorKind::Other, "state sync is not supported"))
304+
}
305+
// Bootstrapping is called by the bootstrapper to signal bootstrapping has started.
306+
Ok(rpcchainvm::snow::State::Bootstrapping) => {
307+
log::debug!("set_state: bootstrapping");
308+
vm.bootstrapped = false;
309+
Ok(())
310+
}
311+
// NormalOp os called when consensus has started signalling bootstrap phase is complete.
312+
Ok(rpcchainvm::snow::State::NormalOp) => {
313+
log::debug!("set_state: normal op");
314+
vm.bootstrapped = true;
315+
Ok(())
316+
}
317+
Err(_) => Err(Error::new(ErrorKind::Other, "unknown state")),
318+
}
297319
}
298320

299321
/// Returns the version of the VM this node is running.
300322
async fn version(&self) -> Result<String> {
301-
Err(Error::new(
302-
ErrorKind::Unsupported,
303-
"version not implemented",
304-
))
323+
Ok(String::from(VERSION))
305324
}
306325

307-
/// Creates the HTTP handlers for custom Vm network calls.
326+
/// Creates the HTTP handlers for custom Vm network calls
327+
/// for "ext/vm/[vmId]"
308328
async fn create_static_handlers(
309329
&self,
310330
) -> std::io::Result<
311-
std::collections::HashMap<
312-
String,
313-
avalanche_types::rpcchainvm::common::http_handler::HttpHandler,
314-
>,
331+
std::collections::HashMap<String, rpcchainvm::common::http_handler::HttpHandler>,
315332
> {
316-
Err(Error::new(
317-
ErrorKind::Unsupported,
318-
"create_static_handlers not implemented",
319-
))
333+
log::debug!("create_static_handlers called");
334+
335+
// Initialize the jsonrpc public service and handler
336+
let service = api::service::Service::new(self.clone());
337+
let mut handler = jsonrpc_core::IoHandler::new();
338+
handler.extend_with(api::Service::to_delegate(service));
339+
340+
let http_handler = rpcchainvm::common::http_handler::HttpHandler::new_from_u8(0, handler)
341+
.map_err(|_| Error::from(ErrorKind::InvalidData))?;
342+
343+
let mut handlers = HashMap::new();
344+
handlers.insert(String::from(PUBLIC_API_ENDPOINT), http_handler);
345+
Ok(handlers)
320346
}
321347

322-
/// Creates the HTTP handlers for custom chain network calls.
348+
/// Creates the HTTP handlers for custom chain network calls
349+
/// for "ext/vm/[chainId]"
323350
async fn create_handlers(
324351
&self,
325352
) -> std::io::Result<
@@ -328,10 +355,7 @@ impl rpcchainvm::common::vm::Vm for ChainVm {
328355
avalanche_types::rpcchainvm::common::http_handler::HttpHandler,
329356
>,
330357
> {
331-
Err(Error::new(
332-
ErrorKind::Unsupported,
333-
"create_handlers not implemented",
334-
))
358+
Ok(HashMap::new())
335359
}
336360
}
337361

@@ -340,12 +364,16 @@ impl rpcchainvm::snowman::block::Getter for ChainVm {
340364
/// Attempt to load a block.
341365
async fn get_block(
342366
&self,
343-
_id: ids::Id,
367+
id: ids::Id,
344368
) -> Result<Box<dyn rpcchainvm::concensus::snowman::Block + Send + Sync>> {
345-
Err(Error::new(
346-
ErrorKind::Unsupported,
347-
"get_block not implemented",
348-
))
369+
let mut vm = self.inner.write().await;
370+
371+
let block =
372+
vm.state.get_block(id).await.map_err(|e| {
373+
Error::new(ErrorKind::Other, format!("failed to get block: {:?}", e))
374+
})?;
375+
376+
Ok(Box::new(block))
349377
}
350378
}
351379

@@ -354,12 +382,25 @@ impl rpcchainvm::snowman::block::Parser for ChainVm {
354382
/// Attempt to create a block from a stream of bytes.
355383
async fn parse_block(
356384
&self,
357-
_bytes: &[u8],
385+
bytes: &[u8],
358386
) -> Result<Box<dyn rpcchainvm::concensus::snowman::Block + Send + Sync>> {
359-
Err(Error::new(
360-
ErrorKind::Unsupported,
361-
"parse_block not implemented",
362-
))
387+
let mut vm = self.inner.write().await;
388+
389+
let new_block = vm
390+
.state
391+
.parse_block(bytes.to_vec(), Status::Processing)
392+
.await
393+
.map_err(|e| Error::new(ErrorKind::Other, format!("failed to parse block: {:?}", e)))?;
394+
395+
log::debug!("parsed block id: {}", new_block.id);
396+
397+
match vm.state.get_block(new_block.id).await {
398+
Ok(old_block) => {
399+
log::debug!("returning previously parsed block id: {}", old_block.id);
400+
return Ok(Box::new(old_block));
401+
}
402+
Err(_) => return Ok(Box::new(new_block)),
403+
};
363404
}
364405
}
365406

@@ -429,19 +470,20 @@ impl rpcchainvm::snowman::block::ChainVm for ChainVm {
429470
}
430471

431472
/// Notify the Vm of the currently preferred block.
432-
async fn set_preference(&self, _id: ids::Id) -> Result<()> {
433-
Err(Error::new(
434-
ErrorKind::Unsupported,
435-
"set_preference not implemented",
436-
))
473+
async fn set_preference(&self, id: ids::Id) -> Result<()> {
474+
let mut vm = self.inner.write().await;
475+
vm.preferred_block_id = Some(id);
476+
477+
Ok(())
437478
}
438479

439480
// Returns the Id of the last accepted block.
440481
async fn last_accepted(&self) -> Result<ids::Id> {
441-
Err(Error::new(
442-
ErrorKind::Unsupported,
443-
"last_accepted not implemented",
444-
))
482+
let vm = self.inner.write().await;
483+
let state = vm.state.clone();
484+
let last_accepted_id = state.get_last_accepted().await?;
485+
486+
Ok(last_accepted_id)
445487
}
446488

447489
/// Attempts to issue a transaction into consensus.

scripts/tests.unused.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ fi
1010
# https://github.com/est31/cargo-udeps
1111
cargo install cargo-udeps --locked
1212

13-
cargo +nightly udeps || true
13+
cargo +nightly udeps
1414

1515
echo "ALL SUCCESS!"

0 commit comments

Comments
 (0)