diff --git a/Cargo.lock b/Cargo.lock index f5f167dfded..198f0ee942c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4271,6 +4271,8 @@ dependencies = [ "fuel-core-storage", "fuel-core-types 0.47.1", "futures", + "itertools 0.12.1", + "mockall", "num_enum", "proptest", "prost 0.14.1", diff --git a/bin/fuel-core/src/cli/run/rpc.rs b/bin/fuel-core/src/cli/run/rpc.rs index 88f1fc64849..fc3769d5000 100644 --- a/bin/fuel-core/src/cli/run/rpc.rs +++ b/bin/fuel-core/src/cli/run/rpc.rs @@ -44,8 +44,8 @@ pub enum StorageMethod { } impl RpcArgs { - pub fn into_config(self) -> fuel_core_block_aggregator_api::integration::Config { - fuel_core_block_aggregator_api::integration::Config { + pub fn into_config(self) -> fuel_core_block_aggregator_api::service::Config { + fuel_core_block_aggregator_api::service::Config { addr: net::SocketAddr::new(self.rpc_ip, self.rpc_port), sync_from: Some(BlockHeight::from(0)), storage_method: self.storage_method.map(Into::into).unwrap_or_default(), @@ -54,17 +54,17 @@ impl RpcArgs { } } -impl From for fuel_core_block_aggregator_api::integration::StorageMethod { +impl From for fuel_core_block_aggregator_api::service::StorageMethod { fn from(storage_method: StorageMethod) -> Self { match storage_method { StorageMethod::Local => { - fuel_core_block_aggregator_api::integration::StorageMethod::Local + fuel_core_block_aggregator_api::service::StorageMethod::Local } StorageMethod::S3 { bucket, endpoint_url, requester_pays, - } => fuel_core_block_aggregator_api::integration::StorageMethod::S3 { + } => fuel_core_block_aggregator_api::service::StorageMethod::S3 { bucket, endpoint_url, requester_pays, @@ -73,13 +73,11 @@ impl From for fuel_core_block_aggregator_api::integration::Storag bucket, endpoint_url, requester_pays, - } => { - fuel_core_block_aggregator_api::integration::StorageMethod::S3NoPublish { - bucket, - endpoint_url, - requester_pays, - } - } + } => fuel_core_block_aggregator_api::service::StorageMethod::S3NoPublish { + bucket, + endpoint_url, + requester_pays, + }, } } } diff --git a/crates/fuel-core/src/combined_database.rs b/crates/fuel-core/src/combined_database.rs index 464e4d62707..7374eb833f5 100644 --- a/crates/fuel-core/src/combined_database.rs +++ b/crates/fuel-core/src/combined_database.rs @@ -3,8 +3,6 @@ use crate::state::{ historical_rocksdb::StateRewindPolicy, rocks_db::DatabaseConfig, }; -#[cfg(feature = "rpc")] -use anyhow::anyhow; use crate::{ database::{ @@ -24,8 +22,6 @@ use crate::{ #[cfg(feature = "rpc")] use crate::database::database_description::block_aggregator::BlockAggregatorDatabase; -#[cfg(feature = "rpc")] -use fuel_core_block_aggregator_api::db::table::LatestBlock; #[cfg(feature = "test-helpers")] use fuel_core_chain_config::{ StateConfig, @@ -43,11 +39,6 @@ use fuel_core_storage::tables::{ ContractsState, Messages, }; -#[cfg(feature = "rpc")] -use fuel_core_storage::{ - Error as StorageError, - StorageAsRef, -}; use fuel_core_types::{ blockchain::primitives::DaBlockHeight, fuel_types::BlockHeight, @@ -488,31 +479,26 @@ impl CombinedDatabase { let gas_price_chain_height = self.gas_price().latest_height_from_metadata()?; - let gas_price_rolled_back = - is_equal_or_none(gas_price_chain_height, target_block_height); + let gas_price_rolled_back = is_equal_or_less_than_or_none( + gas_price_chain_height, + target_block_height, + ); let compression_db_height = self.compression().latest_height_from_metadata()?; let compression_db_rolled_back = - is_equal_or_none(compression_db_height, target_block_height); + 
is_equal_or_less_than_or_none(compression_db_height, target_block_height); #[cfg(feature = "rpc")] { let block_aggregation_storage_height = self .block_aggregation_storage() - .storage_as_ref::() - .get(&()) - .map_err(|e: StorageError| anyhow!(e))? - .map(|b| b.height()); + .latest_height_from_metadata()?; let block_aggregation_storage_rolled_back = is_equal_or_less_than_or_none( block_aggregation_storage_height, target_block_height, ); - if !block_aggregation_storage_rolled_back { - self.block_aggregation_storage_mut() - .rollback_to(target_block_height)?; - } if on_chain_height == target_block_height && off_chain_height == target_block_height && gas_price_rolled_back @@ -585,6 +571,20 @@ impl CombinedDatabase { { self.compression().rollback_last_block()?; } + + #[cfg(feature = "rpc")] + { + let block_aggregation_storage_height = self + .block_aggregation_storage() + .latest_height_from_metadata()?; + + if let Some(block_aggregation_storage_height) = + block_aggregation_storage_height + && block_aggregation_storage_height > target_block_height + { + self.block_aggregation_storage().rollback_last_block()?; + } + } } if shutdown_listener.is_cancelled() { @@ -722,7 +722,6 @@ fn is_equal_or_none(maybe_left: Option, right: T) -> bool { maybe_left.map(|left| left == right).unwrap_or(true) } -#[cfg(feature = "rpc")] fn is_equal_or_less_than_or_none(maybe_left: Option, right: T) -> bool { maybe_left.map(|left| left <= right).unwrap_or(true) } diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index 6149787a893..7b261f8f55d 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -92,19 +92,6 @@ use crate::{ state::HeightType, }; -#[cfg(feature = "rpc")] -use anyhow::anyhow; -#[cfg(feature = "rpc")] -use fuel_core_block_aggregator_api::db::table::{ - Blocks, - LatestBlock, - Mode, -}; -#[cfg(feature = "rpc")] -use fuel_core_storage::{ - StorageAsRef, - transactional::WriteTransaction, -}; #[cfg(feature = "rocksdb")] use std::path::Path; @@ -462,38 +449,12 @@ impl Modifiable for Database { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { // Does not need to be monotonically increasing because // storage values are modified in parallel from different heights - commit_changes_with_height_update(self, changes, |_iter| Ok(Vec::new())) - } -} - -#[cfg(feature = "rpc")] -impl Database { - pub fn rollback_to(&mut self, block_height: BlockHeight) -> StorageResult<()> { - let mut tx = self.write_transaction(); - let mode = tx - .storage_as_ref::() - .get(&())? 
- .map(|m| m.into_owned()); - let new = match mode { - None => None, - Some(Mode::Local(_)) => Some(Mode::new_local(block_height)), - Some(Mode::S3(_)) => Some(Mode::new_s3(block_height)), - }; - if let Some(Mode::Local(_)) = mode { - let remove_heights = tx - .iter_all_keys::(Some(IterDirection::Reverse)) - .flatten() - .take_while(|height| height > &block_height) - .collect::>(); - for height in remove_heights { - tx.storage_as_mut::().remove(&height)?; - } - } - if let Some(new) = new { - tx.storage_as_mut::().insert(&(), &new)?; - tx.commit().map_err(|e: StorageError| anyhow!(e))?; - } - Ok(()) + commit_changes_with_height_update(self, changes, |iter| { + iter.iter_all_keys::(Some( + IterDirection::Reverse, + )) + .try_collect() + }) } } diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index 71daef2fa21..a1ff20a9b0f 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -93,7 +93,7 @@ pub struct CustomizeConfig { max_discovery_peers_connected: Option, subscribe_to_transactions: Option, #[cfg(feature = "rpc")] - rpc_config: Option, + rpc_config: Option, } impl CustomizeConfig { diff --git a/crates/fuel-core/src/service/adapters/rpc.rs b/crates/fuel-core/src/service/adapters/rpc.rs index 13395b56c46..7e743d4b240 100644 --- a/crates/fuel-core/src/service/adapters/rpc.rs +++ b/crates/fuel-core/src/service/adapters/rpc.rs @@ -6,7 +6,7 @@ use crate::{ fuel_core_graphql_api::storage::transactions::TransactionStatuses, }; use fuel_core_block_aggregator_api::{ - blocks::importer_and_db_source::sync_service::TxReceipts, + blocks::old_block_source::TxReceipts, result::{ Error as RPCError, Result as RPCResult, @@ -32,7 +32,7 @@ impl ReceiptSource { } impl TxReceipts for ReceiptSource { - async fn get_receipts(&self, tx_id: &TxId) -> RPCResult> { + fn get_receipts(&self, tx_id: &TxId) -> RPCResult> { let tx_status = StorageInspect::::get(&self.off_chain, tx_id) .map_err(RPCError::receipt_error)?; diff --git a/crates/fuel-core/src/service/adapters/rpc/tests.rs b/crates/fuel-core/src/service/adapters/rpc/tests.rs index d3065e89eab..aa2d8c8e703 100644 --- a/crates/fuel-core/src/service/adapters/rpc/tests.rs +++ b/crates/fuel-core/src/service/adapters/rpc/tests.rs @@ -39,7 +39,7 @@ async fn get_receipt__gets_the_receipt_for_expected_tx() { let receipt_source = ReceiptSource::new(db); // when - let actual = receipt_source.get_receipts(&tx_id).await.unwrap(); + let actual = receipt_source.get_receipts(&tx_id).unwrap(); // then assert_eq!(actual, expected); diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index 9464d83ce0b..0582e56913c 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -44,7 +44,7 @@ use fuel_core_p2p::config::{ }; #[cfg(feature = "rpc")] -use fuel_core_block_aggregator_api::integration::StorageMethod; +use fuel_core_block_aggregator_api::service::StorageMethod; #[cfg(feature = "test-helpers")] use fuel_core_chain_config::{ ChainConfig, @@ -87,7 +87,7 @@ pub struct Config { pub block_producer: fuel_core_producer::Config, pub gas_price_config: GasPriceConfig, #[cfg(feature = "rpc")] - pub rpc_config: Option, + pub rpc_config: Option, pub da_compression: DaCompressionMode, pub block_importer: fuel_core_importer::Config, #[cfg(feature = "relayer")] @@ -130,7 +130,7 @@ impl Config { #[cfg(feature = "rpc")] pub fn local_node_with_rpc() -> Self { let mut config = 
Self::local_node_with_state_config(StateConfig::local_testnet()); - let rpc_config = fuel_core_block_aggregator_api::integration::Config { + let rpc_config = fuel_core_block_aggregator_api::service::Config { addr: free_local_addr(), sync_from: Some(BlockHeight::new(0)), storage_method: StorageMethod::Local, @@ -144,7 +144,7 @@ impl Config { #[cfg(feature = "rpc")] pub fn local_node_with_rpc_and_storage_method(storage_method: StorageMethod) -> Self { let mut config = Self::local_node_with_state_config(StateConfig::local_testnet()); - let rpc_config = fuel_core_block_aggregator_api::integration::Config { + let rpc_config = fuel_core_block_aggregator_api::service::Config { addr: free_local_addr(), sync_from: Some(BlockHeight::new(0)), storage_method, diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index e3a2df0aff4..84dcc3e9e31 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -93,12 +93,11 @@ mod rpc { service::adapters::rpc::ReceiptSource, }; pub use fuel_core_block_aggregator_api::{ - api::protobuf_adapter::ProtobufAPI, - blocks::importer_and_db_source::{ - ImporterAndDbSource, - serializer_adapter::SerializerAdapter, + blocks::old_block_source::{ + OldBlocksSource, + convertor_adapter::ConvertorAdapter, }, - integration::UninitializedTask, + service::UninitializedTask, }; pub use fuel_core_services::ServiceRunner; pub use fuel_core_types::fuel_types::BlockHeight; @@ -570,24 +569,24 @@ pub fn init_sub_services( #[allow(clippy::type_complexity)] #[cfg(feature = "rpc")] fn init_rpc_server( - config: &fuel_core_block_aggregator_api::integration::Config, + config: &fuel_core_block_aggregator_api::service::Config, database: &CombinedDatabase, importer_adapter: &BlockImporterAdapter, genesis_height: BlockHeight, ) -> anyhow::Result< ServiceRunner< UninitializedTask< - ProtobufAPI, - ImporterAndDbSource, ReceiptSource>, + OldBlocksSource, ReceiptSource>, + Database, Database, >, >, > { let receipts = ReceiptSource::new(database.off_chain().clone()); - let serializer = SerializerAdapter; + let serializer = ConvertorAdapter; let onchain_db = database.on_chain().clone(); let importer = importer_adapter.events_shared_result(); - fuel_core_block_aggregator_api::integration::new_service( + fuel_core_block_aggregator_api::service::new_service( database.block_aggregation_storage().clone(), serializer, onchain_db, diff --git a/crates/services/block_aggregator_api/Cargo.toml b/crates/services/block_aggregator_api/Cargo.toml index 294c41e218f..ab0fb183e49 100644 --- a/crates/services/block_aggregator_api/Cargo.toml +++ b/crates/services/block_aggregator_api/Cargo.toml @@ -24,6 +24,7 @@ fuel-core-services = { workspace = true } fuel-core-storage = { workspace = true, features = ["std"] } fuel-core-types = { workspace = true, features = ["std"] } futures = { workspace = true } +itertools = { workspace = true } num_enum = { workspace = true } prost = { workspace = true, features = ["derive"] } rand = { workspace = true } @@ -31,8 +32,8 @@ serde = { workspace = true, features = ["derive"] } strum = { workspace = true } strum_macros = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true } -tokio-stream = { workspace = true } +tokio = { workspace = true, features = ["sync"] } +tokio-stream = { workspace = true, features = ["sync"] } tonic = { workspace = true } tracing = { workspace = true } @@ -42,5 +43,6 @@ aws-smithy-mocks = { workspace = true } fuel-core-services 
= { workspace = true, features = ["test-helpers"] } fuel-core-storage = { workspace = true, features = ["test-helpers"] } fuel-core-types = { workspace = true, features = ["std", "test-helpers"] } +mockall = { workspace = true } proptest = { workspace = true } tokio-stream = { workspace = true } diff --git a/crates/services/block_aggregator_api/src/api.rs b/crates/services/block_aggregator_api/src/api.rs index 0e07b31b0bb..0f6f6506b9f 100644 --- a/crates/services/block_aggregator_api/src/api.rs +++ b/crates/services/block_aggregator_api/src/api.rs @@ -1,85 +1 @@ -use crate::result::Result; -use fuel_core_types::fuel_types::BlockHeight; -use std::fmt; - pub mod protobuf_adapter; - -/// The API for querying the block aggregator service. -pub trait BlockAggregatorApi: Send + Sync { - /// The type of the block range response. - type BlockRangeResponse; - type Block; - - /// Awaits the next query to the block aggregator service. - fn await_query( - &mut self, - ) -> impl Future< - Output = Result>, - > + Send; -} - -pub enum BlockAggregatorQuery { - GetBlockRange { - first: BlockHeight, - last: BlockHeight, - response: tokio::sync::oneshot::Sender, - }, - GetCurrentHeight { - response: tokio::sync::oneshot::Sender>, - }, - // TODO: Do we need a way to unsubscribe or can we just see that the receiver is dropped? - NewBlockSubscription { - response: tokio::sync::mpsc::Sender<(BlockHeight, Block)>, - }, -} - -impl fmt::Debug for BlockAggregatorQuery { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - BlockAggregatorQuery::GetBlockRange { first, last, .. } => f - .debug_struct("GetBlockRange") - .field("first", first) - .field("last", last) - .finish(), - BlockAggregatorQuery::GetCurrentHeight { .. } => { - f.debug_struct("GetCurrentHeight").finish() - } - BlockAggregatorQuery::NewBlockSubscription { .. 
} => { - f.debug_struct("GetNewBlockStream").finish() - } - } - } -} - -#[cfg(test)] -impl BlockAggregatorQuery { - pub fn get_block_range>( - first: H, - last: H, - ) -> (Self, tokio::sync::oneshot::Receiver) { - let (sender, receiver) = tokio::sync::oneshot::channel(); - let first: BlockHeight = first.into(); - let last: BlockHeight = last.into(); - let query = Self::GetBlockRange { - first, - last, - response: sender, - }; - (query, receiver) - } - - pub fn get_current_height() - -> (Self, tokio::sync::oneshot::Receiver>) { - let (sender, receiver) = tokio::sync::oneshot::channel(); - let query = Self::GetCurrentHeight { response: sender }; - (query, receiver) - } - - pub fn new_block_subscription() - -> (Self, tokio::sync::mpsc::Receiver<(BlockHeight, B)>) { - const ARBITRARY_CHANNEL_SIZE: usize = 10; - let (sender, receiver) = tokio::sync::mpsc::channel(ARBITRARY_CHANNEL_SIZE); - let query = Self::NewBlockSubscription { response: sender }; - (query, receiver) - } -} diff --git a/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs b/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs index e771b662fd4..6da05f8a35c 100644 --- a/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs +++ b/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs @@ -1,8 +1,4 @@ use crate::{ - api::{ - BlockAggregatorApi, - BlockAggregatorQuery, - }, block_range_response::{ BlockRangeResponse, BoxStream, @@ -23,24 +19,24 @@ use crate::{ block_response as proto_block_response, remote_block_response::Location as ProtoRemoteLocation, }, - result::{ - Error, - Result, - }, + result::Result, }; -use anyhow::anyhow; use async_trait::async_trait; use fuel_core_services::{ RunnableService, RunnableTask, - Service, ServiceRunner, StateWatcher, TaskNextAction, - try_or_stop, + stream::Stream, }; +use fuel_core_types::fuel_types::BlockHeight; use futures::StreamExt; -use tokio_stream::wrappers::ReceiverStream; +use std::{ + net::SocketAddr, + ops::Deref, + sync::Arc, +}; use tonic::{ Status, transport::server::Router, @@ -49,34 +45,39 @@ use tonic::{ #[cfg(test)] mod tests; -pub struct Server { - query_sender: - tokio::sync::mpsc::Sender>, +#[cfg_attr(test, mockall::automock)] +pub trait BlocksAggregatorApi: Send + Sync + 'static { + fn get_block_range(&self, first: H, last: H) -> Result + where + H: Into + 'static; + + fn get_current_height(&self) -> Result>; + + fn new_block_subscription( + &self, + ) -> impl Stream)>> + Send + 'static; +} + +pub struct Server { + api: B, } -impl Server { - pub fn new( - query_sender: tokio::sync::mpsc::Sender< - BlockAggregatorQuery, - >, - ) -> Self { - Self { query_sender } +impl Server { + pub fn new(api: B) -> Self { + Self { api } } } #[async_trait] -impl BlockAggregator for Server { +impl BlockAggregator for Server +where + B: BlocksAggregatorApi, +{ async fn get_synced_block_height( &self, - request: tonic::Request, + _: tonic::Request, ) -> Result, tonic::Status> { - tracing::debug!("get_block_height: {:?}", request); - let (response, receiver) = tokio::sync::oneshot::channel(); - let query = BlockAggregatorQuery::GetCurrentHeight { response }; - self.query_sender.send(query).await.map_err(|e| { - tonic::Status::internal(format!("Failed to send query: {}", e)) - })?; - let res = receiver.await; + let res = self.api.get_current_height(); match res { Ok(height) => Ok(tonic::Response::new(ProtoBlockHeightResponse { height: height.map(|inner| *inner), @@ -94,17 +95,7 @@ impl BlockAggregator for Server { request: tonic::Request, ) -> 
Result, tonic::Status> { let req = request.into_inner(); - let (response, receiver) = tokio::sync::oneshot::channel(); - let query = BlockAggregatorQuery::GetBlockRange { - first: req.start.into(), - last: req.end.into(), - response, - }; - self.query_sender - .send(query) - .await - .map_err(|e| Status::internal(format!("Failed to send query: {}", e)))?; - let res = receiver.await; + let res = self.api.get_block_range(req.start, req.end); match res { Ok(block_range_response) => match block_range_response { BlockRangeResponse::Literal(inner) => { @@ -153,55 +144,53 @@ impl BlockAggregator for Server { } } - type NewBlockSubscriptionStream = ReceiverStream>; + type NewBlockSubscriptionStream = BoxStream>; async fn new_block_subscription( &self, - request: tonic::Request, + _: tonic::Request, ) -> Result, tonic::Status> { - const ARB_CHANNEL_SIZE: usize = 100; - tracing::debug!("get_block_range: {:?}", request); - let (response, mut receiver) = tokio::sync::mpsc::channel(ARB_CHANNEL_SIZE); - let query = BlockAggregatorQuery::NewBlockSubscription { response }; - self.query_sender - .send(query) - .await - .map_err(|e| Status::internal(format!("Failed to send query: {}", e)))?; - - let (task_sender, task_receiver) = tokio::sync::mpsc::channel(ARB_CHANNEL_SIZE); - tokio::spawn(async move { - while let Some((height, nb)) = receiver.recv().await { - let response = ProtoBlockResponse { - height: *height, - payload: Some(proto_block_response::Payload::Literal(nb)), - }; - if task_sender.send(Ok(response)).await.is_err() { - break; + let stream = self.api.new_block_subscription().map(|item| { + match item { + Ok((block_height, block)) => { + let response = ProtoBlockResponse { + height: *block_height, + // TODO: Avoid clone + payload: Some(proto_block_response::Payload::Literal( + block.deref().clone(), + )), + }; + Ok(response) } + Err(err) => Err(tonic::Status::internal(format!( + "Failed to receive new block: {}", + err + ))), } }); - Ok(tonic::Response::new(ReceiverStream::new(task_receiver))) + Ok(tonic::Response::new(stream.boxed())) } } -pub struct ProtobufAPI { - _server_service: ServiceRunner, - query_receiver: - tokio::sync::mpsc::Receiver>, +pub struct UninitializedTask { + addr: SocketAddr, + api: B, } -pub struct ServerTask { - addr: std::net::SocketAddr, - query_sender: - tokio::sync::mpsc::Sender>, +pub struct Task { + addr: SocketAddr, router: Option, } + #[async_trait::async_trait] -impl RunnableService for ServerTask { +impl RunnableService for UninitializedTask +where + B: BlocksAggregatorApi, +{ const NAME: &'static str = "ProtobufServerTask"; type SharedData = (); - type Task = Self; + type Task = Task; type TaskParams = (); fn shared_data(&self) -> Self::SharedData {} @@ -211,48 +200,39 @@ impl RunnableService for ServerTask { _state_watcher: &StateWatcher, _params: Self::TaskParams, ) -> anyhow::Result { - self.start_router()?; - Ok(self) - } -} - -impl ServerTask { - fn start_router(&mut self) -> anyhow::Result<()> { - let server = Server::new(self.query_sender.clone()); + let server = Server::new(self.api); let router = tonic::transport::Server::builder() .add_service(ProtoBlockAggregatorServer::new(server)); - self.router = Some(router); - Ok(()) - } - - fn get_router(&mut self) -> anyhow::Result { - self.router - .take() - .ok_or_else(|| anyhow!("Router has not been initialized yet")) + let task = Task { + addr: self.addr, + router: Some(router), + }; + Ok(task) } } -impl RunnableTask for ServerTask { +impl RunnableTask for Task { async fn run(&mut self, watcher: &mut 
StateWatcher) -> TaskNextAction { - let router_res = self.get_router(); - let router = try_or_stop!(router_res, |e| tracing::error!( - "Failed to get router, has not been started: {:?}", - e - )); + let mut watcher_local = watcher.clone(); + let future = self + .router + .take() + .expect("Router is always initialized; qed") + .serve_with_shutdown(self.addr, async move { + let _ = watcher_local.while_started().await; + }); + tokio::select! { - res = router.serve(self.addr) => { - if let Err(e) = res { - tracing::error!("BlockAggregator tonic server error: {}", e); - TaskNextAction::Stop - } else { - tracing::info!("BlockAggregator tonic server stopped"); - TaskNextAction::Stop - } - }, - _ = watcher.while_started() => { - TaskNextAction::Stop - } + res = future => { + if let Err(e) = res { + tracing::error!("BlockAggregator tonic server error: {}", e); + } else { + tracing::info!("BlockAggregator tonic server stopped"); + } + }, + _ = watcher.while_started() => {} } + TaskNextAction::Stop } async fn shutdown(self) -> anyhow::Result<()> { @@ -260,38 +240,11 @@ impl RunnableTask for ServerTask { } } -impl ProtobufAPI { - pub fn new(url: String, buffer_size: usize) -> Result { - let (query_sender, query_receiver) = tokio::sync::mpsc::channel::< - BlockAggregatorQuery, - >(buffer_size); - let addr = url.parse().unwrap(); - let _server_service = ServiceRunner::new(ServerTask { - addr, - query_sender, - router: None, - }); - _server_service.start().map_err(Error::Api)?; - let api = Self { - _server_service, - query_receiver, - }; - Ok(api) - } -} - -impl BlockAggregatorApi for ProtobufAPI { - type BlockRangeResponse = BlockRangeResponse; - type Block = ProtoBlock; +pub type APIService = ServiceRunner>; - async fn await_query( - &mut self, - ) -> Result> { - let query = self - .query_receiver - .recv() - .await - .ok_or_else(|| Error::Api(anyhow::anyhow!("Channel closed")))?; - Ok(query) - } +pub fn new_service(addr: SocketAddr, api: B) -> APIService +where + B: BlocksAggregatorApi, +{ + ServiceRunner::new(UninitializedTask { addr, api }) } diff --git a/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs b/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs index 86c52b650c8..23082455ebe 100644 --- a/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs +++ b/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs @@ -1,32 +1,29 @@ #![allow(non_snake_case)] use crate::{ - api::{ - BlockAggregatorApi, - BlockAggregatorQuery, - protobuf_adapter::ProtobufAPI, + api::protobuf_adapter::{ + MockBlocksAggregatorApi, + new_service, }, block_range_response::{ BlockRangeResponse, RemoteS3Response, }, - blocks::importer_and_db_source::{ - BlockSerializer, - serializer_adapter::SerializerAdapter, + blocks::old_block_source::{ + BlockConvector, + convertor_adapter::ConvertorAdapter, }, protobuf_types::{ Block as ProtoBlock, BlockHeightRequest, BlockRangeRequest, NewBlockSubscriptionRequest, - block_aggregator_client::{ - BlockAggregatorClient as ProtoBlockAggregatorClient, - BlockAggregatorClient, - }, + block_aggregator_client::BlockAggregatorClient as ProtoBlockAggregatorClient, block_response::Payload, }, }; use fuel_core_protobuf::remote_block_response::Location; +use fuel_core_services::Service; use fuel_core_types::{ blockchain::block::Block as FuelBlock, fuel_types::BlockHeight, @@ -35,108 +32,90 @@ use futures::{ StreamExt, TryStreamExt, }; +use std::{ + net::SocketAddr, + sync::Arc, +}; +use tokio::sync::broadcast; -fn 
free_local_addr() -> String { +fn free_local_addr() -> SocketAddr { let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = listener.local_addr().unwrap(); - format!("127.0.0.1:{}", addr.port()) + listener.local_addr().unwrap() } #[tokio::test] async fn await_query__get_current_height__client_receives_expected_value() { - // given - let path = free_local_addr(); - let mut api = ProtobufAPI::new(path.to_string(), 100).unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let socket = free_local_addr(); + let mut api = MockBlocksAggregatorApi::default(); + + // Given + api.expect_get_current_height() + .times(1) + .returning(|| Ok(Some(BlockHeight::new(42)))); + + let service = new_service(socket, api); + service.start_and_await().await.unwrap(); // call get current height endpoint with client - let url = format!("http://{}", path); + let url = format!("http://{}", socket); let mut client = ProtoBlockAggregatorClient::connect(url.to_string()) .await .expect("could not connect to server"); - let handle = tokio::spawn(async move { - tracing::info!("querying with client"); - client - .get_synced_block_height(BlockHeightRequest {}) - .await - .expect("could not get height") - }); - - // when - tracing::info!("awaiting query"); - let query = api.await_query().await.unwrap(); - - // then - // return response through query's channel - if let BlockAggregatorQuery::GetCurrentHeight { response } = query { - response.send(Some(BlockHeight::new(42))).unwrap(); - } else { - panic!("expected GetCurrentHeight query"); - } - let res = handle.await.unwrap(); + + // When + let result = client.get_synced_block_height(BlockHeightRequest {}).await; + + // Then + let value = result.expect("could not get height"); // assert client received expected value - assert_eq!(res.into_inner().height, Some(42)); + assert_eq!(value.into_inner().height, Some(42)); } #[tokio::test] async fn await_query__get_block_range__client_receives_expected_value__literal() { - // given - let path = free_local_addr(); - let mut api = ProtobufAPI::new(path.to_string(), 100).unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let socket = free_local_addr(); + let mut api = MockBlocksAggregatorApi::default(); - // call get current height endpoint with client - let url = format!("http://{}", path); - let mut client = ProtoBlockAggregatorClient::connect(url.to_string()) - .await - .expect("could not connect to server"); - let request = BlockRangeRequest { start: 0, end: 1 }; - let handle = tokio::spawn(async move { - tracing::info!("querying with client"); - client - .get_block_range(request) - .await - .expect("could not get height") - }); - - // when - tracing::info!("awaiting query"); - let query = api.await_query().await.unwrap(); - - // then - let serializer_adapter = SerializerAdapter; + // Given + let convertor_adapter = ConvertorAdapter; let fuel_block_1 = FuelBlock::default(); let mut fuel_block_2 = FuelBlock::default(); let block_height_1 = fuel_block_1.header().height(); let block_height_2 = block_height_1.succ().unwrap(); fuel_block_2.header_mut().set_block_height(block_height_2); - let block1 = serializer_adapter - .serialize_block(&fuel_block_1, &[]) + let block1 = convertor_adapter + .convert_block(&fuel_block_1, &[]) .expect("could not serialize block"); - let block2 = serializer_adapter - .serialize_block(&fuel_block_2, &[]) + let block2 = convertor_adapter + .convert_block(&fuel_block_2, &[]) .expect("could not serialize block"); let list = 
vec![(*block_height_1, block1), (block_height_2, block2)]; - // return response through query's channel - if let BlockAggregatorQuery::GetBlockRange { - first, - last, - response, - } = query - { - assert_eq!(first, BlockHeight::new(0)); - assert_eq!(last, BlockHeight::new(1)); - tracing::info!("correct query received, sending response"); - let stream = tokio_stream::iter(list.clone()).boxed(); - let range = BlockRangeResponse::Literal(stream); - response.send(range).unwrap(); - } else { - panic!("expected GetBlockRange query"); - } - tracing::info!("awaiting query"); - let response = handle.await.unwrap(); - let expected = list; + let expected = list.clone(); + api.expect_get_block_range() + .times(1) + .returning(move |_: u32, _: u32| { + let response = BlockRangeResponse::Literal( + futures::stream::iter(list.clone().into_iter()).boxed(), + ); + Ok(response) + }); + + let service = new_service(socket, api); + service.start_and_await().await.unwrap(); + + // call get current height endpoint with client + let url = format!("http://{}", socket); + let mut client = ProtoBlockAggregatorClient::connect(url.to_string()) + .await + .expect("could not connect to server"); + let request = BlockRangeRequest { start: 0, end: 1 }; + + // When + let result = client.get_block_range(request).await; + + // Then + let response = result.unwrap(); let actual: Vec<(BlockHeight, ProtoBlock)> = response .into_inner() .try_collect::>() @@ -154,32 +133,13 @@ async fn await_query__get_block_range__client_receives_expected_value__literal() assert_eq!(expected, actual); } + #[tokio::test] async fn await_query__get_block_range__client_receives_expected_value__remote() { - // given - let path = free_local_addr(); - let mut api = ProtobufAPI::new(path.to_string(), 100).unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let socket = free_local_addr(); + let mut api = MockBlocksAggregatorApi::default(); - // call get current height endpoint with client - let url = format!("http://{}", path); - let mut client = ProtoBlockAggregatorClient::connect(url.to_string()) - .await - .expect("could not connect to server"); - let request = BlockRangeRequest { start: 0, end: 1 }; - let handle = tokio::spawn(async move { - tracing::info!("querying with client"); - client - .get_block_range(request) - .await - .expect("could not get height") - }); - - // when - tracing::info!("awaiting query"); - let query = api.await_query().await.unwrap(); - - // then + // Given let list: Vec<_> = [(BlockHeight::new(1), "1"), (BlockHeight::new(2), "2")] .iter() .map(|(height, key)| { @@ -194,25 +154,31 @@ async fn await_query__get_block_range__client_receives_expected_value__remote() (*height, res) }) .collect(); - // return response through query's channel - if let BlockAggregatorQuery::GetBlockRange { - first, - last, - response, - } = query - { - assert_eq!(first, BlockHeight::new(0)); - assert_eq!(last, BlockHeight::new(1)); - tracing::info!("correct query received, sending response"); - let stream = tokio_stream::iter(list.clone()).boxed(); - let range = BlockRangeResponse::S3(stream); - response.send(range).unwrap(); - } else { - panic!("expected GetBlockRange query"); - } - tracing::info!("awaiting query"); - let response = handle.await.unwrap(); - let expected = list; + let expected = list.clone(); + api.expect_get_block_range() + .times(1) + .returning(move |_: u32, _: u32| { + let response = BlockRangeResponse::S3( + futures::stream::iter(list.clone().into_iter()).boxed(), + ); + Ok(response) + }); + + let 
service = new_service(socket, api); + service.start_and_await().await.unwrap(); + + // call get current height endpoint with client + let url = format!("http://{}", socket); + let mut client = ProtoBlockAggregatorClient::connect(url.to_string()) + .await + .expect("could not connect to server"); + let request = BlockRangeRequest { start: 0, end: 1 }; + + // When + let result = client.get_block_range(request).await; + + // Then + let response = result.unwrap(); let actual: Vec<(BlockHeight, RemoteS3Response)> = response .into_inner() .try_collect::>() @@ -244,57 +210,57 @@ async fn await_query__get_block_range__client_receives_expected_value__remote() #[tokio::test] async fn await_query__new_block_stream__client_receives_expected_value() { - // given - let path = free_local_addr(); - let mut api = ProtobufAPI::new(path.to_string(), 100).unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let socket = free_local_addr(); + let mut api = MockBlocksAggregatorApi::default(); + + let (sender, receiver) = broadcast::channel(100); + + // Given + let convertor_adapter = ConvertorAdapter; + let fuel_block_1 = FuelBlock::default(); + let mut fuel_block_2 = FuelBlock::default(); + let block_height_1 = fuel_block_1.header().height(); + let block_height_2 = block_height_1.succ().unwrap(); + fuel_block_2.header_mut().set_block_height(block_height_2); + let block1 = convertor_adapter + .convert_block(&fuel_block_1, &[]) + .expect("could not serialize block"); + let block2 = convertor_adapter + .convert_block(&fuel_block_2, &[]) + .expect("could not serialize block"); + let list = vec![(*block_height_1, block1), (block_height_2, block2)]; + + api.expect_new_block_subscription() + .times(1) + .returning(move || { + let stream = + tokio_stream::wrappers::BroadcastStream::new(receiver.resubscribe()) + .map(|result| result.map_err(|err| anyhow::anyhow!(err))); + stream.boxed() + }); + + let service = new_service(socket, api); + service.start_and_await().await.unwrap(); // call get current height endpoint with client - let url = format!("http://{}", path); - let mut client = BlockAggregatorClient::connect(url.to_string()) + let url = format!("http://{}", socket); + let mut client = ProtoBlockAggregatorClient::connect(url.to_string()) .await .expect("could not connect to server"); let request = NewBlockSubscriptionRequest {}; - let handle = tokio::spawn(async move { - tracing::info!("querying with client"); - client - .new_block_subscription(request) - .await - .expect("could not get height") - }); - - // when - tracing::info!("awaiting query"); - let query = api.await_query().await.unwrap(); - - // then - let height1 = BlockHeight::new(0); - let height2 = BlockHeight::new(1); - let serializer_adapter = SerializerAdapter; - let mut fuel_block_1 = FuelBlock::default(); - fuel_block_1.header_mut().set_block_height(height1); - let mut fuel_block_2 = FuelBlock::default(); - fuel_block_2.header_mut().set_block_height(height2); - let block1 = serializer_adapter - .serialize_block(&fuel_block_1, &[]) - .expect("could not serialize block"); - let block2 = serializer_adapter - .serialize_block(&fuel_block_2, &[]) - .expect("could not serialize block"); - let list = vec![(height1, block1), (height2, block2)]; - if let BlockAggregatorQuery::NewBlockSubscription { response } = query { - tracing::info!("correct query received, sending response"); - for (height, block) in list.clone() { - response.send((height, block)).await.unwrap(); - } - } else { - panic!("expected GetBlockRange query"); + + // 
When + let result = client.new_block_subscription(request).await; + // Send blocks + for (height, block) in list.clone() { + sender.send((height, Arc::new(block))).unwrap(); } - tracing::info!("awaiting query"); - let response = handle.await.unwrap(); - let expected = list; - let actual: Vec<(BlockHeight, ProtoBlock)> = response - .into_inner() + + // Then + let stream = result.unwrap().into_inner(); + + let actual: Vec<(BlockHeight, ProtoBlock)> = stream + .take(2) .try_collect::>() .await .unwrap() @@ -308,5 +274,5 @@ async fn await_query__new_block_stream__client_receives_expected_value() { }) .collect(); - assert_eq!(expected, actual); + assert_eq!(list, actual); } diff --git a/crates/services/block_aggregator_api/src/block_aggregator.rs b/crates/services/block_aggregator_api/src/block_aggregator.rs deleted file mode 100644 index 48009d6cfa0..00000000000 --- a/crates/services/block_aggregator_api/src/block_aggregator.rs +++ /dev/null @@ -1,148 +0,0 @@ -use crate::{ - BlockAggregator, - api::{ - BlockAggregatorApi, - BlockAggregatorQuery, - }, - blocks::{ - BlockSource, - BlockSourceEvent, - }, - db::BlockAggregatorDB, -}; -use fuel_core_services::{ - TaskNextAction, - try_or_stop, -}; -use fuel_core_types::fuel_types::BlockHeight; - -impl BlockAggregator -where - Api: BlockAggregatorApi, - DB: BlockAggregatorDB, - Blocks: BlockSource, - ::Block: Clone + std::fmt::Debug, - BlockRangeResponse: Send, -{ - pub fn new(query: Api, database: DB, block_source: Blocks) -> Self { - let new_block_subscriptions = Vec::new(); - Self { - query, - database, - block_source, - new_block_subscriptions, - } - } - - pub fn stop(&self) -> TaskNextAction { - TaskNextAction::Stop - } - - pub async fn handle_query( - &mut self, - res: crate::result::Result< - BlockAggregatorQuery, - >, - ) -> TaskNextAction { - tracing::debug!("Handling query: {res:?}"); - let query = try_or_stop!(res, |e| { - tracing::error!("Error receiving query: {e:?}"); - }); - match query { - BlockAggregatorQuery::GetBlockRange { - first, - last, - response, - } => { - self.handle_get_block_range_query(first, last, response) - .await - } - BlockAggregatorQuery::GetCurrentHeight { response } => { - self.handle_get_current_height_query(response).await - } - BlockAggregatorQuery::NewBlockSubscription { response } => { - self.handle_new_block_subscription(response).await - } - } - } - - async fn handle_get_block_range_query( - &mut self, - first: BlockHeight, - last: BlockHeight, - response: tokio::sync::oneshot::Sender, - ) -> TaskNextAction { - let res = self.database.get_block_range(first, last).await; - let block_stream = try_or_stop!(res, |e| { - tracing::error!("Error getting block range from database: {e:?}"); - }); - let res = response.send(block_stream); - try_or_stop!(res, |_| { - tracing::error!("Error sending block range response"); - }); - TaskNextAction::Continue - } - - async fn handle_get_current_height_query( - &mut self, - response: tokio::sync::oneshot::Sender>, - ) -> TaskNextAction { - let res = self.database.get_current_height().await; - let height = try_or_stop!(res, |e| { - tracing::error!("Error getting current height from database: {e:?}"); - }); - let res = response.send(height); - try_or_stop!(res, |_| { - tracing::error!("Error sending current height response"); - }); - TaskNextAction::Continue - } - - async fn handle_new_block_subscription( - &mut self, - response: tokio::sync::mpsc::Sender<(BlockHeight, Blocks::Block)>, - ) -> TaskNextAction { - self.new_block_subscriptions.push(response); - 
TaskNextAction::Continue - } - - pub async fn handle_block( - &mut self, - res: crate::result::Result::Block>>, - ) -> TaskNextAction - where - ::Block: std::fmt::Debug, - { - tracing::debug!("Handling block: {res:?}"); - let event = try_or_stop!(res, |e| { - tracing::error!("Error receiving block from source: {e:?}"); - }); - match &event { - BlockSourceEvent::NewBlock(height, block) => { - self.new_block_subscriptions.retain_mut(|sub| { - let send_res = sub.try_send((*height, block.clone())); - match send_res { - Ok(_) => true, - Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { - tracing::error!("Error sending new block to subscriber due to full channel: {height:?}"); - true - }, - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - tracing::debug!("Dropping block subscription due to closed channel"); - false - }, - } - }); - } - BlockSourceEvent::OldBlock(_id, _block) => { - // Do nothing - // Only stream new blocks - } - }; - let res = self.database.store_block(event).await; - try_or_stop!(res, |e| { - tracing::error!("Error storing block in database: {e:?}"); - }); - TaskNextAction::Continue - } -} diff --git a/crates/services/block_aggregator_api/src/blocks.rs b/crates/services/block_aggregator_api/src/blocks.rs index cc846995477..08868c0720f 100644 --- a/crates/services/block_aggregator_api/src/blocks.rs +++ b/crates/services/block_aggregator_api/src/blocks.rs @@ -5,34 +5,17 @@ use fuel_core_types::fuel_types::{ }; use std::fmt::Debug; -pub mod importer_and_db_source; +pub mod old_block_source; /// Source from which blocks can be gathered for aggregation -pub trait BlockSource: Send + Sync { +pub trait BlockSource: Send + Sync + 'static { type Block; - /// Asynchronously fetch the next block and its height - fn next_block( - &mut self, - ) -> impl Future>> + Send; - /// Drain any remaining blocks from the source - fn drain(&mut self) -> impl Future> + Send; -} - -#[derive(Clone, Debug, Eq, PartialEq, Hash)] -pub enum BlockSourceEvent { - NewBlock(BlockHeight, B), - OldBlock(BlockHeight, B), -} - -impl BlockSourceEvent { - pub fn into_inner(self) -> (BlockHeight, B) { - match self { - Self::NewBlock(height, block) | Self::OldBlock(height, block) => { - (height, block) - } - } - } + /// Returns an iterator over blocks starting from the given block height + fn blocks_starting_from( + &self, + block_height: BlockHeight, + ) -> impl Iterator> + Send + Sync + 'static; } #[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source.rs deleted file mode 100644 index d6dfbc78fa7..00000000000 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source.rs +++ /dev/null @@ -1,143 +0,0 @@ -use crate::{ - blocks::{ - BlockSource, - BlockSourceEvent, - importer_and_db_source::importer_service::ImporterTask, - }, - result::{ - Error, - Result, - }, -}; -use anyhow::anyhow; -use fuel_core_services::{ - Service, - ServiceRunner, - stream::BoxStream, -}; -use fuel_core_storage::{ - Error as StorageError, - StorageInspect, - tables::FuelBlocks, -}; -use fuel_core_types::{ - blockchain::block::Block as FuelBlock, - fuel_types::BlockHeight, - services::block_importer::SharedImportResult, -}; - -use crate::blocks::importer_and_db_source::sync_service::{ - SyncTask, - TxReceipts, -}; -use fuel_core_storage::tables::Transactions; -use fuel_core_types::fuel_tx::Receipt as FuelReceipt; - -pub mod 
importer_service; -pub mod sync_service; -#[cfg(test)] -mod tests; - -pub mod serializer_adapter; - -pub trait BlockSerializer { - type Block; - fn serialize_block( - &self, - block: &FuelBlock, - receipts: &[FuelReceipt], - ) -> Result; -} - -/// A block source that combines an importer and a database sync task. -/// Old blocks will be synced from a target database and new blocks will be received from -/// the importer -pub struct ImporterAndDbSource -where - Serializer: BlockSerializer + Send + Sync + 'static, - ::Block: Send + Sync + 'static, - DB: Send + Sync + 'static, - DB: StorageInspect, - DB: StorageInspect, - Receipts: TxReceipts, -{ - importer_task: ServiceRunner>, - sync_task: ServiceRunner>, - /// Receive blocks from the importer and sync tasks - receiver: tokio::sync::mpsc::Receiver>, -} - -impl ImporterAndDbSource -where - Serializer: BlockSerializer + Clone + Send + Sync + 'static, - ::Block: Send + Sync + 'static, - DB: StorageInspect + Send + Sync, - DB: StorageInspect + Send + 'static, - Receipts: TxReceipts, -{ - pub fn new( - importer: BoxStream, - serializer: Serializer, - db: DB, - receipts: Receipts, - db_starting_height: BlockHeight, - db_ending_height: BlockHeight, - ) -> Self { - const ARB_CHANNEL_SIZE: usize = 100; - let (block_return, receiver) = tokio::sync::mpsc::channel(ARB_CHANNEL_SIZE); - let importer_task = - ImporterTask::new(importer, serializer.clone(), block_return.clone()); - let importer_runner = ServiceRunner::new(importer_task); - importer_runner.start().unwrap(); - let sync_task = SyncTask::new( - serializer, - block_return, - db, - receipts, - db_starting_height, - db_ending_height, - ); - let sync_runner = ServiceRunner::new(sync_task); - sync_runner.start().unwrap(); - Self { - importer_task: importer_runner, - sync_task: sync_runner, - receiver, - } - } -} - -impl BlockSource - for ImporterAndDbSource -where - Serializer: BlockSerializer + Send + Sync + 'static, - ::Block: Send + Sync + 'static, - DB: Send + Sync + 'static, - DB: StorageInspect, - DB: StorageInspect, - Receipts: TxReceipts, -{ - type Block = Serializer::Block; - - async fn next_block(&mut self) -> Result> { - tracing::debug!("awaiting next block"); - tokio::select! 
{ - block_res = self.receiver.recv() => { - block_res.ok_or(Error::BlockSource(anyhow!("Block source channel closed"))) - } - importer_error = self.importer_task.await_stop() => { - Err(Error::BlockSource(anyhow!("Importer task stopped unexpectedly: {:?}", importer_error))) - } - sync_error = self.sync_task.await_stop() => { - Err(Error::BlockSource(anyhow!("Sync task stopped unexpectedly: {:?}", sync_error))) - } - } - } - - async fn drain(&mut self) -> Result<()> { - self.importer_task.stop(); - self.sync_task.stop(); - self.receiver.close(); - Ok(()) - } -} diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/importer_service.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/importer_service.rs deleted file mode 100644 index 99721b06ad2..00000000000 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/importer_service.rs +++ /dev/null @@ -1,118 +0,0 @@ -use crate::blocks::{ - BlockSourceEvent, - importer_and_db_source::BlockSerializer, -}; -use fuel_core_services::{ - RunnableService, - RunnableTask, - StateWatcher, - TaskNextAction, - stream::BoxStream, - try_or_continue, - try_or_stop, -}; -use fuel_core_types::services::block_importer::SharedImportResult; -use futures::StreamExt; -use tokio::sync::mpsc::Sender; - -pub struct ImporterTask { - importer: BoxStream, - serializer: Serializer, - block_return_sender: Sender>, -} - -impl ImporterTask -where - Serializer: BlockSerializer + Send, - ::Block: Send, -{ - pub fn new( - importer: BoxStream, - serializer: Serializer, - block_return: Sender>, - ) -> Self { - Self { - importer, - serializer, - block_return_sender: block_return, - } - } -} -impl RunnableTask for ImporterTask -where - Serializer: BlockSerializer + Send + Sync, - ::Block: Send, -{ - async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { - tokio::select! 
{ - fuel_block = self.importer.next() => self.process_shared_import_result(fuel_block).await, - _ = watcher.while_started() => { - TaskNextAction::Stop - }, - } - } - - async fn shutdown(self) -> anyhow::Result<()> { - Ok(()) - } -} - -impl ImporterTask -where - Serializer: BlockSerializer + Send + Sync, -{ - async fn process_shared_import_result( - &mut self, - maybe_import_result: Option, - ) -> TaskNextAction { - tracing::debug!("imported block"); - match maybe_import_result { - Some(import_result) => { - let height = import_result.sealed_block.entity.header().height(); - let receipts = import_result - .tx_status - .iter() - .flat_map(|status| status.result.receipts()) - .map(Clone::clone) - .collect::>(); - let res = self - .serializer - .serialize_block(&import_result.sealed_block.entity, &receipts); - let block = try_or_continue!(res); - let event = BlockSourceEvent::NewBlock(*height, block); - let res = self.block_return_sender.send(event).await; - try_or_stop!( - res, - |_e| "failed to send imported block to receiver: {_e:?}" - ); - TaskNextAction::Continue - } - None => { - tracing::debug!("importer returned None, stopping"); - TaskNextAction::Stop - } - } - } -} - -#[async_trait::async_trait] -impl RunnableService for ImporterTask -where - Serializer: BlockSerializer + Send + Sync + 'static, - ::Block: Send + 'static, -{ - const NAME: &'static str = "BlockSourceImporterTask"; - type SharedData = (); - type Task = Self; - type TaskParams = (); - - fn shared_data(&self) -> Self::SharedData {} - - async fn into_task( - self, - _state_watcher: &StateWatcher, - _params: Self::TaskParams, - ) -> anyhow::Result { - Ok(self) - } -} diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/sync_service.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/sync_service.rs deleted file mode 100644 index ef275bbc85d..00000000000 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/sync_service.rs +++ /dev/null @@ -1,205 +0,0 @@ -use crate::{ - blocks::{ - BlockSourceEvent, - importer_and_db_source::BlockSerializer, - }, - result::{ - Error, - Result, - }, -}; -use fuel_core_services::{ - RunnableService, - RunnableTask, - StateWatcher, - TaskNextAction, - try_or_continue, - try_or_stop, -}; -use fuel_core_storage::{ - self, - Error as StorageError, - StorageInspect, - tables::{ - FuelBlocks, - Transactions, - }, -}; -use fuel_core_types::{ - blockchain::block::Block as FuelBlock, - fuel_tx::{ - Receipt, - Transaction, - TxId, - }, - fuel_types::BlockHeight, -}; -use futures::{ - StreamExt, - TryStreamExt, - stream::FuturesOrdered, -}; -use tokio::sync::mpsc::Sender; - -pub struct SyncTask { - serializer: Serializer, - block_return_sender: Sender>, - db: DB, - receipts: Receipts, - next_height: BlockHeight, - // exclusive, does not ask for this block - stop_height: BlockHeight, -} - -pub trait TxReceipts: 'static + Send + Sync { - fn get_receipts( - &self, - tx_id: &TxId, - ) -> impl Future>> + Send; -} - -impl SyncTask -where - Serializer: BlockSerializer + Send, - DB: Send + Sync + 'static, - DB: StorageInspect, - DB: StorageInspect, - Receipts: TxReceipts, -{ - pub fn new( - serializer: Serializer, - block_return: Sender>, - db: DB, - receipts: Receipts, - db_starting_height: BlockHeight, - // does not ask for this block (exclusive) - db_ending_height: BlockHeight, - ) -> Self { - Self { - serializer, - block_return_sender: block_return, - db, - receipts, - next_height: db_starting_height, - stop_height: 
db_ending_height, - } - } - - async fn get_block_and_receipts( - &self, - height: &BlockHeight, - ) -> Result)>> { - let maybe_block = StorageInspect::::get(&self.db, height) - .map_err(Error::block_source_error)?; - if let Some(block) = maybe_block { - let tx_ids = block.transactions(); - let txs = self.get_txs(tx_ids)?; - let receipts = self.get_receipts(tx_ids).await?; - let block = block.into_owned().uncompress(txs); - Ok(Some((block, receipts))) - } else { - Ok(None) - } - } - - fn get_txs(&self, tx_ids: &[TxId]) -> Result> { - let mut txs = Vec::new(); - for tx_id in tx_ids { - match StorageInspect::::get(&self.db, tx_id) - .map_err(Error::block_source_error)? - { - Some(tx) => { - tracing::debug!("found tx id: {:?}", tx_id); - txs.push(tx.into_owned()); - } - None => { - return Ok(vec![]); - } - } - } - Ok(txs) - } - - async fn get_receipts(&self, tx_ids: &[TxId]) -> Result> { - let receipt_futs = tx_ids.iter().map(|tx_id| self.receipts.get_receipts(tx_id)); - FuturesOrdered::from_iter(receipt_futs) - .then(|res| async move { res.map_err(Error::block_source_error) }) - .try_concat() - .await - } -} - -impl RunnableTask - for SyncTask -where - Serializer: BlockSerializer + Send + Sync, - Serializer::Block: Send + Sync + 'static, - DB: Send + Sync + 'static, - DB: StorageInspect, - DB: StorageInspect, - Receipts: TxReceipts, -{ - async fn run(&mut self, _watcher: &mut StateWatcher) -> TaskNextAction { - if self.next_height >= self.stop_height { - tracing::info!( - "reached stop height {}, putting task into hibernation", - self.stop_height - ); - let _ = _watcher.while_started().await; - return TaskNextAction::Stop - } - let next_height = self.next_height; - let res = self.get_block_and_receipts(&next_height).await; - let maybe_block_and_receipts = try_or_stop!(res, |e| { - tracing::error!("error fetching block at height {}: {:?}", next_height, e); - }); - if let Some((block, receipts)) = maybe_block_and_receipts { - tracing::debug!( - "found block at height {:?}, sending to return channel", - next_height - ); - let res = self.serializer.serialize_block(&block, &receipts); - let block = try_or_continue!(res); - let event = - BlockSourceEvent::OldBlock(BlockHeight::from(*next_height), block); - let res = self.block_return_sender.send(event).await; - try_or_continue!(res); - self.next_height = BlockHeight::from((*next_height).saturating_add(1)); - TaskNextAction::Continue - } else { - tracing::error!("no block found at height {:?}, retrying", next_height); - TaskNextAction::Stop - } - } - - async fn shutdown(self) -> anyhow::Result<()> { - Ok(()) - } -} - -#[async_trait::async_trait] -impl RunnableService - for SyncTask -where - Serializer: BlockSerializer + Send + Sync + 'static, - ::Block: Send + Sync + 'static, - DB: Send + Sync + 'static, - DB: StorageInspect, - DB: StorageInspect, - Receipts: TxReceipts, -{ - const NAME: &'static str = "BlockSourceSyncTask"; - type SharedData = (); - type Task = Self; - type TaskParams = (); - - fn shared_data(&self) -> Self::SharedData {} - - async fn into_task( - self, - _state_watcher: &StateWatcher, - _params: Self::TaskParams, - ) -> anyhow::Result { - Ok(self) - } -} diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/tests.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/tests.rs deleted file mode 100644 index 9f2570d546e..00000000000 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/tests.rs +++ /dev/null @@ -1,172 +0,0 @@ -#![allow(non_snake_case)] - -use 
super::*; -use fuel_core_services::stream::{ - IntoBoxStream, - pending, -}; -use fuel_core_storage::{ - StorageAsMut, - column::Column as OnChainColumn, - structured_storage::test::InMemoryStorage, - transactional::{ - IntoTransaction, - StorageTransaction, - WriteTransaction, - }, -}; -use futures::StreamExt; -use std::collections::HashMap; - -use crate::blocks::importer_and_db_source::{ - serializer_adapter::SerializerAdapter, - sync_service::TxReceipts, -}; -use fuel_core_types::{ - blockchain::SealedBlock, - fuel_tx::{ - Transaction, - TxId, - UniqueIdentifier, - }, - fuel_types::ChainId, - services::block_importer::ImportResult, -}; -use std::sync::Arc; - -fn onchain_db() -> StorageTransaction> { - InMemoryStorage::default().into_transaction() -} - -struct MockTxReceiptsSource { - receipts_map: HashMap>, -} - -impl MockTxReceiptsSource { - fn new(receipts: &[(TxId, Vec)]) -> Self { - let receipts_map = receipts.iter().cloned().collect(); - Self { receipts_map } - } -} - -impl TxReceipts for MockTxReceiptsSource { - async fn get_receipts(&self, tx_id: &TxId) -> Result> { - let receipts = self.receipts_map.get(tx_id).cloned().ok_or_else(|| { - Error::BlockSource(anyhow!("no receipts found for a tx with id {}", tx_id)) - })?; - Ok(receipts) - } -} - -#[tokio::test] -async fn next_block__gets_new_block_from_importer() { - // given - let block = SealedBlock::default(); - let height = block.entity.header().height(); - let import_result = Arc::new( - ImportResult { - sealed_block: block.clone(), - tx_status: vec![], - events: vec![], - source: Default::default(), - } - .wrap(), - ); - let blocks: Vec = vec![import_result]; - let block_stream = tokio_stream::iter(blocks).chain(pending()).into_boxed(); - let serializer = SerializerAdapter; - let db = onchain_db(); - let receipt_source = MockTxReceiptsSource::new(&[]); - let db_starting_height = BlockHeight::from(0u32); - // we don't need to sync anything, so we can use the same height for both - let db_ending_height = db_starting_height; - let mut adapter = ImporterAndDbSource::new( - block_stream, - serializer.clone(), - db, - receipt_source, - db_starting_height, - db_ending_height, - ); - - // when - let actual = adapter.next_block().await.unwrap(); - - // then - let serialized = serializer.serialize_block(&block.entity, &[]).unwrap(); - let expected = BlockSourceEvent::NewBlock(*height, serialized); - assert_eq!(expected, actual); -} - -fn arbitrary_block_with_txs(height: BlockHeight) -> FuelBlock { - let mut block = FuelBlock::default(); - block.header_mut().set_block_height(height); - let txs = block.transactions_mut(); - *txs = vec![Transaction::default_test_tx()]; - block -} - -fn arbitrary_receipts() -> Vec { - let one = FuelReceipt::Mint { - sub_id: Default::default(), - contract_id: Default::default(), - val: 100, - pc: 0, - is: 0, - }; - let two = FuelReceipt::Transfer { - id: Default::default(), - to: Default::default(), - amount: 50, - asset_id: Default::default(), - pc: 0, - is: 0, - }; - vec![one, two] -} - -#[tokio::test] -async fn next_block__can_get_block_from_db() { - // given - let chain_id = ChainId::default(); - let height1 = BlockHeight::from(0u32); - let height2 = BlockHeight::from(1u32); - let block = arbitrary_block_with_txs(height1); - let receipts = arbitrary_receipts(); - let height = block.header().height(); - let serializer = SerializerAdapter; - let mut onchain_db = onchain_db(); - let mut tx = onchain_db.write_transaction(); - let compressed_block = block.compress(&chain_id); - tx.storage_as_mut::() - 
.insert(height, &compressed_block) - .unwrap(); - let tx_id = block.transactions()[0].id(&chain_id); - tx.storage_as_mut::() - .insert( - &block.transactions()[0].id(&chain_id), - &block.transactions()[0], - ) - .unwrap(); - tx.commit().unwrap(); - let receipt_source = MockTxReceiptsSource::new(&[(tx_id, receipts.clone())]); - let block_stream = tokio_stream::pending().into_boxed(); - let db_starting_height = *height; - let db_ending_height = height2; - let mut adapter = ImporterAndDbSource::new( - block_stream, - serializer.clone(), - onchain_db, - receipt_source, - db_starting_height, - db_ending_height, - ); - - // when - let actual = adapter.next_block().await.unwrap(); - - // then - let serialized = serializer.serialize_block(&block, &receipts).unwrap(); - let expected = BlockSourceEvent::OldBlock(*height, serialized); - assert_eq!(expected, actual); -} diff --git a/crates/services/block_aggregator_api/src/blocks/old_block_source.rs b/crates/services/block_aggregator_api/src/blocks/old_block_source.rs new file mode 100644 index 00000000000..752583a8c6b --- /dev/null +++ b/crates/services/block_aggregator_api/src/blocks/old_block_source.rs @@ -0,0 +1,190 @@ +use crate::{ + blocks::BlockSource, + result::{ + Error, + Result, + }, +}; +use fuel_core_storage::{ + Error as StorageError, + StorageInspect, + tables::{ + FuelBlocks, + Transactions, + }, +}; +use fuel_core_types::{ + blockchain::block::Block as FuelBlock, + fuel_tx::{ + Receipt as FuelReceipt, + Receipt, + Transaction, + TxId, + }, + fuel_types::BlockHeight, +}; +use std::sync::Arc; + +pub mod convertor_adapter; + +pub trait BlockConvector: Send + Sync + 'static { + type Block; + + fn convert_block( + &self, + block: &FuelBlock, + receipts: &[Vec], + ) -> Result; +} + +pub trait TxReceipts: Send + Sync + 'static { + fn get_receipts(&self, tx_id: &TxId) -> Result>; +} + +pub struct OldBlocksSource { + convertor: Arc, + db: Arc, + receipts: Arc, +} + +impl OldBlocksSource { + pub fn new(convertor: Arc, db: DB, receipts: Receipts) -> Self { + Self { + convertor, + db: Arc::new(db), + receipts: Arc::new(receipts), + } + } +} + +impl OldBlocksSource +where + DB: Send + Sync + 'static, + DB: StorageInspect, + DB: StorageInspect, + Receipts: TxReceipts, + Convertor: BlockConvector, +{ + pub fn blocks_stream_starting( + &self, + block_height: BlockHeight, + ) -> impl Iterator> + Send + Sync + 'static + { + StorageIterator { + convertor: self.convertor.clone(), + db: self.db.clone(), + receipts: self.receipts.clone(), + next_height: Some(block_height), + } + } +} + +impl BlockSource for OldBlocksSource +where + DB: Send + Sync + 'static, + DB: StorageInspect, + DB: StorageInspect, + Receipts: TxReceipts, + Convertor: BlockConvector, +{ + type Block = Convertor::Block; + + fn blocks_starting_from( + &self, + block_height: BlockHeight, + ) -> impl Iterator> + Send + Sync + 'static + { + self.blocks_stream_starting(block_height) + } +} + +pub struct StorageIterator { + convertor: Arc, + db: Arc, + receipts: Arc, + next_height: Option, +} + +impl StorageIterator +where + DB: StorageInspect, + DB: StorageInspect, + Receipts: TxReceipts, + Convertor: BlockConvector, +{ + fn get_block_and_receipts( + &self, + height: &BlockHeight, + ) -> Result>)>> { + let maybe_block = StorageInspect::::get(self.db.as_ref(), height) + .map_err(Error::block_source_error)?; + if let Some(block) = maybe_block { + let tx_ids = block.transactions(); + let txs = self.get_txs(tx_ids)?; + let receipts = self.get_receipts(tx_ids)?; + let block = 
block.into_owned().uncompress(txs); + Ok(Some((block, receipts))) + } else { + Ok(None) + } + } + + fn get_txs(&self, tx_ids: &[TxId]) -> Result> { + let mut txs = Vec::new(); + for tx_id in tx_ids { + match StorageInspect::::get(self.db.as_ref(), tx_id) + .map_err(Error::block_source_error)? + { + Some(tx) => { + tracing::debug!("found tx id: {:?}", tx_id); + txs.push(tx.into_owned()); + } + None => { + return Ok(vec![]); + } + } + } + Ok(txs) + } + + fn get_receipts(&self, tx_ids: &[TxId]) -> Result>> { + use itertools::Itertools; + tx_ids + .iter() + .map(|tx_id| { + self.receipts + .get_receipts(tx_id) + .map_err(|err| Error::DB(anyhow::anyhow!(err))) + }) + .try_collect() + } +} + +impl Iterator for StorageIterator +where + DB: StorageInspect, + DB: StorageInspect, + Receipts: TxReceipts, + Convertor: BlockConvector, +{ + type Item = Result<(BlockHeight, Convertor::Block)>; + + fn next(&mut self) -> Option { + let next_height = self.next_height?; + + let res = self.get_block_and_receipts(&next_height); + match res { + Ok(Some((block, receipts))) => { + let block = match self.convertor.convert_block(&block, &receipts) { + Ok(b) => b, + Err(e) => return Some(Err(e)), + }; + + self.next_height = next_height.succ(); + Some(Ok((next_height, block))) + } + Ok(None) => None, + Err(e) => Some(Err(e)), + } + } +} diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter.rs b/crates/services/block_aggregator_api/src/blocks/old_block_source/convertor_adapter.rs similarity index 60% rename from crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter.rs rename to crates/services/block_aggregator_api/src/blocks/old_block_source/convertor_adapter.rs index e24932de4b3..ef21cfc24aa 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter.rs +++ b/crates/services/block_aggregator_api/src/blocks/old_block_source/convertor_adapter.rs @@ -1,5 +1,5 @@ use crate::{ - blocks::importer_and_db_source::BlockSerializer, + blocks::old_block_source::BlockConvector, protobuf_types::{ Block as ProtoBlock, V1Block as ProtoV1Block, @@ -9,24 +9,26 @@ use crate::{ #[cfg(feature = "fault-proving")] use fuel_core_types::fuel_types::ChainId; +use crate::blocks::old_block_source::convertor_adapter::fuel_to_proto_conversions::{ + proto_header_from_header, + proto_receipt_from_receipt, + proto_tx_from_tx, +}; use fuel_core_types::{ - blockchain::{ - block::Block as FuelBlock, - }, + blockchain::block::Block as FuelBlock, + fuel_tx::Receipt as FuelReceipt, }; -use fuel_core_types::fuel_tx::Receipt as FuelReceipt; -use crate::blocks::importer_and_db_source::serializer_adapter::fuel_to_proto_conversions::{proto_header_from_header, proto_receipt_from_receipt, proto_tx_from_tx}; #[derive(Clone)] -pub struct SerializerAdapter; +pub struct ConvertorAdapter; -impl BlockSerializer for SerializerAdapter { +impl BlockConvector for ConvertorAdapter { type Block = ProtoBlock; - fn serialize_block( + fn convert_block( &self, block: &FuelBlock, - receipts: &[FuelReceipt], + receipts: &[Vec], ) -> crate::result::Result { let proto_header = proto_header_from_header(block.header()); match &block { @@ -38,7 +40,17 @@ impl BlockSerializer for SerializerAdapter { .iter() .map(proto_tx_from_tx) .collect(), - receipts: receipts.iter().map(proto_receipt_from_receipt).collect(), + // TODO: It should be `Vec>`, but we need to update protobuf + // definition first + receipts: receipts + .iter() + .flat_map(|receipts| { + receipts 
+ .iter() + .map(proto_receipt_from_receipt) + .collect::>() + }) + .collect(), }; Ok(ProtoBlock { versioned_block: Some(ProtoVersionedBlock::V1(proto_v1_block)), @@ -58,9 +70,12 @@ pub mod proto_to_fuel_conversions; #[cfg(test)] mod tests { use super::*; - use fuel_core_types::test_helpers::{arb_block, arb_receipts}; + use crate::blocks::old_block_source::convertor_adapter::proto_to_fuel_conversions::fuel_block_from_protobuf; + use fuel_core_types::test_helpers::{ + arb_block, + arb_receipts, + }; use proptest::prelude::*; - use crate::blocks::importer_and_db_source::serializer_adapter::proto_to_fuel_conversions::fuel_block_from_protobuf; proptest! { #![proptest_config(ProptestConfig { @@ -72,10 +87,11 @@ mod tests { receipts in arb_receipts()) { // given - let serializer = SerializerAdapter; + let convertor = ConvertorAdapter; // when - let proto_block = serializer.serialize_block(&block, &receipts).unwrap(); + let receipts = vec![receipts]; + let proto_block = convertor.convert_block(&block, &receipts).unwrap(); // then let (deserialized_block, deserialized_receipts) = fuel_block_from_protobuf(proto_block, &msg_ids, event_inbox_root).unwrap(); diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter/fuel_to_proto_conversions.rs b/crates/services/block_aggregator_api/src/blocks/old_block_source/convertor_adapter/fuel_to_proto_conversions.rs similarity index 99% rename from crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter/fuel_to_proto_conversions.rs rename to crates/services/block_aggregator_api/src/blocks/old_block_source/convertor_adapter/fuel_to_proto_conversions.rs index 66c3a84ce29..d7079808389 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter/fuel_to_proto_conversions.rs +++ b/crates/services/block_aggregator_api/src/blocks/old_block_source/convertor_adapter/fuel_to_proto_conversions.rs @@ -1,7 +1,7 @@ #[cfg(feature = "fault-proving")] use crate::protobuf_types::V2Header as ProtoV2Header; use crate::{ - blocks::importer_and_db_source::serializer_adapter::proto_to_fuel_conversions::bytes32_to_vec, + blocks::old_block_source::convertor_adapter::proto_to_fuel_conversions::bytes32_to_vec, protobuf_types::{ BlobTransaction as ProtoBlobTx, ChangeOutput as ProtoChangeOutput, diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter/proto_to_fuel_conversions.rs b/crates/services/block_aggregator_api/src/blocks/old_block_source/convertor_adapter/proto_to_fuel_conversions.rs similarity index 98% rename from crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter/proto_to_fuel_conversions.rs rename to crates/services/block_aggregator_api/src/blocks/old_block_source/convertor_adapter/proto_to_fuel_conversions.rs index bf715b0e30e..c5361fc6a06 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter/proto_to_fuel_conversions.rs +++ b/crates/services/block_aggregator_api/src/blocks/old_block_source/convertor_adapter/proto_to_fuel_conversions.rs @@ -1,5 +1,5 @@ #[cfg(feature = "fault-proving")] -use crate::blocks::importer_and_db_source::serializer_adapter::ChainId; +use crate::blocks::old_block_source::convertor_adapter::ChainId; use crate::{ protobuf_types::{ Block as ProtoBlock, @@ -501,7 +501,7 @@ pub fn fuel_block_from_protobuf( proto_block: ProtoBlock, msg_ids: &[fuel_core_types::fuel_tx::MessageId], event_inbox_root: Bytes32, -) 
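Note on the hunk above and the decode hunk just below: the TODO comments flag a shape mismatch. Receipts are now grouped per transaction in memory, but the protobuf schema still stores one flat receipt list per block, so encoding flattens the groups and decoding re-wraps the flat list as a single group; the per-transaction grouping does not round-trip yet. A minimal illustration of that lossy round trip, using plain integers in place of receipts:

fn main() {
    // Grouped per transaction in memory...
    let grouped: Vec<Vec<u32>> = vec![vec![1, 2], vec![3]];
    // ...flattened into the single protobuf list on encode...
    let flat: Vec<u32> = grouped.iter().flatten().copied().collect();
    // ...and re-wrapped as one group on decode, losing the original grouping.
    let rewrapped: Vec<Vec<u32>> = vec![flat];
    assert_eq!(rewrapped, vec![vec![1, 2, 3]]);
}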
-> crate::result::Result<(FuelBlock, Vec)> { +) -> crate::result::Result<(FuelBlock, Vec>)> { let versioned_block = proto_block .versioned_block .ok_or_else(|| anyhow::anyhow!("Missing protobuf versioned_block")) @@ -519,11 +519,14 @@ pub fn fuel_block_from_protobuf( .iter() .map(tx_from_proto_tx) .collect::>()?; - let receipts = v1_inner - .receipts - .iter() - .map(receipt_from_proto) - .collect::>()?; + // TODO: It should be `Vec>`, but we need to update protobuf + let receipts = vec![ + v1_inner + .receipts + .iter() + .map(receipt_from_proto) + .collect::>()?, + ]; (partial_header, txs, receipts) } }; diff --git a/crates/services/block_aggregator_api/src/db.rs b/crates/services/block_aggregator_api/src/db.rs index 7e326bdc737..072fd30c743 100644 --- a/crates/services/block_aggregator_api/src/db.rs +++ b/crates/services/block_aggregator_api/src/db.rs @@ -1,7 +1,4 @@ -use crate::{ - blocks::BlockSourceEvent, - result::Result, -}; +use crate::result::Result; use fuel_core_types::fuel_types::BlockHeight; pub mod remote_cache; @@ -10,29 +7,34 @@ pub mod storage_db; pub mod storage_or_remote_db; pub mod table; -/// The definition of the block aggregator database. -pub trait BlockAggregatorDB: Send + Sync { - type Block; +pub trait BlocksProvider: Send + Sync + 'static { + type Block: Send + Sync + 'static; /// The type used to report a range of blocks type BlockRangeResponse; - /// Stores a block with the given ID - fn store_block( - &mut self, - block: BlockSourceEvent, - ) -> impl Future> + Send; - /// Retrieves a range of blocks from the database fn get_block_range( &self, first: BlockHeight, last: BlockHeight, - ) -> impl Future> + Send; + ) -> Result; /// Retrieves the current height of the aggregated blocks If there is a break in the blocks, /// i.e. the blocks are being aggregated out of order, return the height of the last /// contiguous block - fn get_current_height( - &self, - ) -> impl Future>> + Send; + fn get_current_height(&self) -> Result>; +} + +/// The definition of the block aggregator database. 
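The old read/write `BlockAggregatorDB` trait is split in two here: `BlocksProvider` is the synchronous read side, and `BlocksStorage` (below) keeps the async write side. A sketch of how a caller might drive the read side, assuming the crate paths shown in this diff; `serve_range` is an illustrative helper, not part of the PR, and it clamps to the synced tip instead of returning the hard error the provider impls use:

use fuel_core_block_aggregator_api::{
    db::BlocksProvider,
    result::Result,
};
use fuel_core_types::fuel_types::BlockHeight;

// Clamp a range request to the highest contiguous height before delegating.
fn serve_range<P: BlocksProvider>(
    provider: &P,
    first: BlockHeight,
    last: BlockHeight,
) -> Result<P::BlockRangeResponse> {
    // `get_current_height` returns the last contiguous height, or None if
    // nothing has been aggregated yet.
    let tip = provider.get_current_height()?.unwrap_or(BlockHeight::new(0));
    provider.get_block_range(first, last.min(tip))
}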
+pub trait BlocksStorage: Send + Sync { + type Block; + /// The type used to report a range of blocks + type BlockRangeResponse; + + /// Stores a block with the given ID + fn store_block( + &mut self, + block_height: BlockHeight, + block: &Self::Block, + ) -> impl Future> + Send; } diff --git a/crates/services/block_aggregator_api/src/db/remote_cache.rs b/crates/services/block_aggregator_api/src/db/remote_cache.rs index ae0e4b03173..20ad0532e2f 100644 --- a/crates/services/block_aggregator_api/src/db/remote_cache.rs +++ b/crates/services/block_aggregator_api/src/db/remote_cache.rs @@ -1,9 +1,10 @@ use crate::{ block_range_response::BlockRangeResponse, - blocks::BlockSourceEvent, db::{ - BlockAggregatorDB, + BlocksProvider, + BlocksStorage, table::{ + Column, LatestBlock, Mode, }, @@ -22,14 +23,12 @@ use flate2::{ write::GzEncoder, }; use fuel_core_storage::{ - Error as StorageError, StorageAsMut, StorageAsRef, - StorageInspect, - StorageMutate, + kv_store::KeyValueInspect, + structured_storage::AsStructuredStorage, transactional::{ Modifiable, - StorageTransaction, WriteTransaction, }, }; @@ -41,45 +40,54 @@ use std::io::Write; #[cfg(test)] mod tests; -#[allow(unused)] pub struct RemoteCache { // aws configuration aws_bucket: String, - requester_pays: bool, - aws_endpoint: Option, client: Client, publishes_blocks: bool, // track consistency between runs local_persisted: S, - sync_from: BlockHeight, - highest_new_height: Option, - orphaned_new_height: Option, - synced: bool, } impl RemoteCache { - #[allow(clippy::too_many_arguments)] - pub async fn new( + pub fn new( aws_bucket: String, - requester_pays: bool, - aws_endpoint: Option, client: Client, local_persisted: S, - sync_from: BlockHeight, publish: bool, ) -> RemoteCache { RemoteCache { aws_bucket, - requester_pays, - aws_endpoint, client, publishes_blocks: publish, local_persisted, - sync_from, - highest_new_height: None, - orphaned_new_height: None, - synced: false, + } + } +} + +pub struct RemoteBlocksProvider { + // aws configuration + aws_bucket: String, + requester_pays: bool, + aws_endpoint: Option, + + // track consistency between runs + local_persisted: S, +} + +impl RemoteBlocksProvider { + pub fn new( + aws_bucket: String, + requester_pays: bool, + aws_endpoint: Option, + local_persisted: S, + ) -> Self { + RemoteBlocksProvider { + aws_bucket, + requester_pays, + aws_endpoint, + local_persisted, } } @@ -106,22 +114,72 @@ impl RemoteCache { } } -impl BlockAggregatorDB for RemoteCache +impl BlocksProvider for RemoteBlocksProvider +where + S: Send + Sync + 'static, + S: KeyValueInspect, +{ + type Block = ProtoBlock; + type BlockRangeResponse = BlockRangeResponse; + + fn get_block_range( + &self, + first: BlockHeight, + last: BlockHeight, + ) -> crate::result::Result { + let current_height = self.get_current_height()?.unwrap_or(BlockHeight::new(0)); + if last > current_height { + Err(Error::db_error(anyhow!( + "Requested block height {} is greater than current synced height {}", + last, + current_height + ))) + } else { + self.stream_blocks(first, last) + } + } + + fn get_current_height(&self) -> crate::result::Result> { + let height = self + .local_persisted + .as_structured_storage() + .storage_as_ref::() + .get(&()) + .map_err(|e| Error::DB(anyhow!(e)))? 
+ .map(|b| b.height()); + + Ok(height) + } +} + +impl BlocksStorage for RemoteCache where S: Send + Sync, S: Modifiable, - S: StorageInspect, - for<'b> StorageTransaction<&'b mut S>: - StorageMutate, + S: KeyValueInspect, { type Block = ProtoBlock; type BlockRangeResponse = BlockRangeResponse; async fn store_block( &mut self, - block_event: BlockSourceEvent, + height: BlockHeight, + block: &Self::Block, ) -> crate::result::Result<()> { - let (height, block) = block_event.clone().into_inner(); + let current_height = self.get_current_height()?; + + if let Some(current_height) = current_height + && let Some(next_height) = current_height.succ() + && next_height != height + { + return Err(Error::db_error(anyhow!( + "Cannot store block at height {}: current height is {}, expected next height is {}", + height, + current_height, + next_height + ))); + } + let key = block_height_to_key(&height); let mut buf = Vec::new(); block.encode(&mut buf).map_err(Error::db_error)?; @@ -138,106 +196,32 @@ where .content_type("application/grpc-web"); let _ = req.send().await.map_err(Error::db_error)?; } - match block_event { - BlockSourceEvent::NewBlock(new_height, _) => { - tracing::debug!("New block: {:?}", new_height); - self.highest_new_height = Some(new_height); - if self.synced { - tracing::debug!("Updating latest block to {:?}", new_height); - let mut tx = self.local_persisted.write_transaction(); - tx.storage_as_mut::() - .insert(&(), &Mode::new_s3(new_height)) - .map_err(|e| Error::DB(anyhow!(e)))?; - tx.commit().map_err(|e| Error::DB(anyhow!(e)))?; - } else if new_height == self.sync_from - || self.height_is_next_height(new_height)? - { - tracing::debug!("Updating latest block to {:?}", new_height); - self.synced = true; - self.highest_new_height = Some(new_height); - self.orphaned_new_height = None; - let mut tx = self.local_persisted.write_transaction(); - tx.storage_as_mut::() - .insert(&(), &Mode::new_s3(new_height)) - .map_err(|e| Error::DB(anyhow!(e)))?; - tx.commit().map_err(|e| Error::DB(anyhow!(e)))?; - } else if self.orphaned_new_height.is_none() { - tracing::info!("Marking block as orphaned: {:?}", new_height); - self.orphaned_new_height = Some(new_height); - } - } - BlockSourceEvent::OldBlock(height, _) => { - tracing::debug!("Old block: {:?}", height); - let mut tx = self.local_persisted.write_transaction(); - let latest_height = if height.succ() == self.orphaned_new_height { - tracing::debug!("Marking block as synced: {:?}", height); - self.orphaned_new_height = None; - self.synced = true; - self.highest_new_height.unwrap_or(height) - } else { - tracing::debug!("Updating latest block to {:?}", height); - height - }; - tx.storage_as_mut::() - .insert(&(), &Mode::new_s3(latest_height)) - .map_err(|e| Error::DB(anyhow!(e)))?; - tx.commit().map_err(|e| Error::DB(anyhow!(e)))?; - } - } - Ok(()) - } - async fn get_block_range( - &self, - first: BlockHeight, - last: BlockHeight, - ) -> crate::result::Result { - let current_height = self - .get_current_height() - .await? 
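Both storage backends now enforce the same invariant at write time instead of tracking orphaned heights: a block is accepted only when the store is empty or its height is exactly one past the current height. Restated as a standalone predicate (this helper is illustrative and not part of the PR):

use fuel_core_types::fuel_types::BlockHeight;

fn is_next(current: Option<BlockHeight>, candidate: BlockHeight) -> bool {
    match current.and_then(|h| h.succ()) {
        // An empty store (or an exhausted u32 height) accepts any height,
        // matching the `if let` chain above, which only errors when a
        // successor exists and differs from the candidate.
        None => true,
        Some(next) => candidate == next,
    }
}

#[test]
fn is_next__accepts_contiguous_and_rejects_gaps() {
    assert!(is_next(None, BlockHeight::new(5)));
    assert!(is_next(Some(BlockHeight::new(4)), BlockHeight::new(5)));
    assert!(!is_next(Some(BlockHeight::new(4)), BlockHeight::new(6)));
}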
- .unwrap_or(BlockHeight::new(0)); - if last > current_height { - Err(Error::db_error(anyhow!( - "Requested block height {} is greater than current synced height {}", - last, - current_height - ))) - } else { - self.stream_blocks(first, last) - } - } - - async fn get_current_height(&self) -> crate::result::Result> { - tracing::debug!("Getting current height from local cache"); - let height = self - .local_persisted - .storage_as_ref::() - .get(&()) + let mut tx = self.local_persisted.write_transaction(); + tx.storage_as_mut::() + .insert(&(), &Mode::new_s3(height)) .map_err(|e| Error::DB(anyhow!(e)))?; + tx.commit().map_err(|e| Error::DB(anyhow!(e)))?; - Ok(height.map(|b| b.height())) + Ok(()) } } impl RemoteCache where S: Send + Sync, - S: StorageInspect, - for<'b> StorageTransaction<&'b mut S>: - StorageMutate, + S: KeyValueInspect, { - fn height_is_next_height(&self, height: BlockHeight) -> crate::result::Result { - let maybe_latest_height = self + pub fn get_current_height(&self) -> crate::result::Result> { + let height = self .local_persisted + .as_structured_storage() .storage_as_ref::() .get(&()) .map_err(|e| Error::DB(anyhow!(e)))? - .map(|m| m.height()); - if let Some(latest_height) = maybe_latest_height { - Ok(latest_height.succ() == Some(height)) - } else { - Ok(false) - } + .map(|b| b.height()); + + Ok(height) } } diff --git a/crates/services/block_aggregator_api/src/db/remote_cache/tests.rs b/crates/services/block_aggregator_api/src/db/remote_cache/tests.rs index ec444a2bb70..e2eb53d56d1 100644 --- a/crates/services/block_aggregator_api/src/db/remote_cache/tests.rs +++ b/crates/services/block_aggregator_api/src/db/remote_cache/tests.rs @@ -1,9 +1,9 @@ use super::*; use crate::{ block_range_response::RemoteS3Response, - blocks::importer_and_db_source::{ - BlockSerializer, - serializer_adapter::SerializerAdapter, + blocks::old_block_source::{ + BlockConvector, + convertor_adapter::ConvertorAdapter, }, db::table::{ Column, @@ -25,7 +25,6 @@ use fuel_core_storage::{ }; use fuel_core_types::blockchain::block::Block as FuelBlock; use futures::StreamExt; -use std::iter; fn database() -> StorageTransaction> { InMemoryStorage::default().into_transaction() @@ -33,8 +32,8 @@ fn database() -> StorageTransaction> { fn arb_proto_block() -> ProtoBlock { let block = FuelBlock::default(); - let serializer = SerializerAdapter; - serializer.serialize_block(&block, &[]).unwrap() + let convertor = ConvertorAdapter; + convertor.convert_block(&block, &[]).unwrap() } fn put_happy_rule() -> Rule { mock!(Client::put_object) @@ -50,15 +49,12 @@ async fn store_block__happy_path() { let client = mock_client!(aws_sdk_s3, [&put_happy_rule()]); let aws_bucket = "test-bucket".to_string(); let storage = database(); - let sync_from = BlockHeight::new(0); - let mut adapter = - RemoteCache::new(aws_bucket, false, None, client, storage, sync_from, true).await; + let mut adapter = RemoteCache::new(aws_bucket, client, storage, true); let block_height = BlockHeight::new(123); let block = arb_proto_block(); - let block = BlockSourceEvent::OldBlock(block_height, block); // when - let res = adapter.store_block(block).await; + let res = adapter.store_block(block_height, &block).await; // then assert!(res.is_ok()); @@ -67,25 +63,19 @@ async fn store_block__happy_path() { #[tokio::test] async fn get_block_range__happy_path() { // given - let client = mock_client!(aws_sdk_s3, []); - let aws_bucket = "test-bucket".to_string(); - let storage = database(); - let sync_from = BlockHeight::new(0); - let adapter = 
RemoteCache::new( - aws_bucket.clone(), - false, - None, - client, - storage, - sync_from, - true, - ) - .await; let start = BlockHeight::new(999); let end = BlockHeight::new(1003); + let aws_bucket = "test-bucket".to_string(); + let mut storage = database(); + storage + .storage_as_mut::() + .insert(&(), &Mode::new_s3(end)) + .unwrap(); + + let adapter = RemoteBlocksProvider::new(aws_bucket.clone(), false, None, storage); // when - let addresses = adapter.get_block_range(start, end).await.unwrap(); + let addresses = adapter.get_block_range(start, end).unwrap(); // then let actual = match addresses { @@ -115,26 +105,24 @@ async fn get_current_height__returns_highest_continuous_block() { let client = mock_client!(aws_sdk_s3, [&put_happy_rule()]); let aws_bucket = "test-bucket".to_string(); let storage = database(); - let sync_from = BlockHeight::new(0); - let mut adapter = - RemoteCache::new(aws_bucket, false, None, client, storage, sync_from, true).await; + let mut adapter = RemoteCache::new(aws_bucket, client, storage, true); let expected = BlockHeight::new(123); let block = arb_proto_block(); - let block = BlockSourceEvent::OldBlock(expected, block); - adapter.store_block(block).await.unwrap(); + adapter.store_block(expected, &block).await.unwrap(); // when - let actual = adapter.get_current_height().await.unwrap().unwrap(); + let actual = adapter.get_current_height().unwrap().unwrap(); // then assert_eq!(expected, actual); } #[tokio::test] -async fn store_block__does_not_update_the_highest_continuous_block_if_not_contiguous() { - // given +async fn store_block__fails_if_not_contiguous() { let mut storage = database(); + + // Given let mut tx = storage.write_transaction(); let starting_height = BlockHeight::from(1u32); tx.storage_as_mut::() @@ -143,106 +131,14 @@ async fn store_block__does_not_update_the_highest_continuous_block_if_not_contig tx.commit().unwrap(); let client = mock_client!(aws_sdk_s3, [&put_happy_rule()]); let aws_bucket = "test-bucket".to_string(); - let sync_from = BlockHeight::new(0); - let mut adapter = - RemoteCache::new(aws_bucket, false, None, client, storage, sync_from, true).await; + let mut adapter = RemoteCache::new(aws_bucket, client, storage, true); let expected = BlockHeight::new(3); let block = arb_proto_block(); - let block = BlockSourceEvent::NewBlock(expected, block); - adapter.store_block(block).await.unwrap(); - - // when - let expected = starting_height; - let actual = adapter.get_current_height().await.unwrap().unwrap(); - assert_eq!(expected, actual); -} - -#[tokio::test] -async fn store_block__updates_the_highest_continuous_block_if_filling_a_gap() { - let rules: Vec<_> = iter::repeat_with(put_happy_rule).take(10).collect(); - let client = mock_client!(aws_sdk_s3, rules.iter()); - let aws_bucket = "test-bucket".to_string(); - - // given - let db = database(); - let sync_from = BlockHeight::new(0); - let mut adapter = - RemoteCache::new(aws_bucket, false, None, client, db, sync_from, true).await; - - for height in 2..=10u32 { - let height = BlockHeight::from(height); - let block = arb_proto_block(); - let block = BlockSourceEvent::NewBlock(height, block.clone()); - adapter.store_block(block).await.unwrap(); - } - // when - let height = BlockHeight::from(1u32); - let some_block = arb_proto_block(); - let block = BlockSourceEvent::OldBlock(height, some_block.clone()); - adapter.store_block(block).await.unwrap(); - - // then - let expected = BlockHeight::from(10u32); - let actual = adapter.get_current_height().await.unwrap().unwrap(); - 
assert_eq!(expected, actual); - - assert!(adapter.synced) -} -#[tokio::test] -async fn store_block__new_block_updates_the_highest_continuous_block_if_synced() { - let rules: Vec<_> = iter::repeat_with(put_happy_rule).take(10).collect(); - let client = mock_client!(aws_sdk_s3, rules.iter()); - let aws_bucket = "test-bucket".to_string(); - - // given - let db = database(); - let sync_from = BlockHeight::new(0); - let mut adapter = - RemoteCache::new(aws_bucket, false, None, client, db, sync_from, true).await; - - let height = BlockHeight::from(0u32); - let some_block = arb_proto_block(); - let block = BlockSourceEvent::OldBlock(height, some_block.clone()); - adapter.store_block(block).await.unwrap(); - - // when - let height = BlockHeight::from(1u32); - let some_block = arb_proto_block(); - let block = BlockSourceEvent::NewBlock(height, some_block.clone()); - adapter.store_block(block).await.unwrap(); - - // then - let expected = BlockHeight::from(1u32); - let actual = adapter.get_current_height().await.unwrap().unwrap(); - assert_eq!(expected, actual); - - assert!(adapter.synced) -} - -#[tokio::test] -async fn store_block__new_block_comes_first() { - let rules: Vec<_> = iter::repeat_with(put_happy_rule).take(10).collect(); - let client = mock_client!(aws_sdk_s3, rules.iter()); - let aws_bucket = "test-bucket".to_string(); - - // given - let db = database(); - let sync_from = BlockHeight::new(0); - let mut adapter = - RemoteCache::new(aws_bucket, false, None, client, db, sync_from, true).await; - - // when - let height = BlockHeight::from(0u32); - let some_block = arb_proto_block(); - let block = BlockSourceEvent::NewBlock(height, some_block.clone()); - adapter.store_block(block).await.unwrap(); - - // then - let expected = BlockHeight::from(0u32); - let actual = adapter.get_current_height().await.unwrap().unwrap(); - assert_eq!(expected, actual); + // When + let result = adapter.store_block(expected, &block).await; - assert!(adapter.synced); + // Then + result.expect_err("expected error"); } diff --git a/crates/services/block_aggregator_api/src/db/storage_db.rs b/crates/services/block_aggregator_api/src/db/storage_db.rs index 8f57c925eb2..009ef117960 100644 --- a/crates/services/block_aggregator_api/src/db/storage_db.rs +++ b/crates/services/block_aggregator_api/src/db/storage_db.rs @@ -1,8 +1,8 @@ use crate::{ block_range_response::BlockRangeResponse, - blocks::BlockSourceEvent, db::{ - BlockAggregatorDB, + BlocksProvider, + BlocksStorage, table::{ Blocks, Column, @@ -19,17 +19,13 @@ use crate::{ use anyhow::anyhow; use fuel_core_services::stream::Stream; use fuel_core_storage::{ - Error as StorageError, StorageAsMut, StorageAsRef, - StorageInspect, - StorageMutate, kv_store::KeyValueInspect, + structured_storage::AsStructuredStorage, transactional::{ AtomicView, Modifiable, - ReadTransaction, - StorageTransaction, WriteTransaction, }, }; @@ -45,96 +41,76 @@ use std::{ #[cfg(test)] mod tests; +pub struct StorageBlocksProvider { + storage: S, +} + +impl StorageBlocksProvider { + pub fn new(storage: S) -> Self { + Self { storage } + } +} + pub struct StorageDB { - highest_new_height: Option, - orphaned_new_height: Option, - synced: bool, - sync_from: BlockHeight, storage: S, } impl StorageDB { - pub fn new(storage: S, sync_from: BlockHeight) -> Self { - Self { - highest_new_height: None, - orphaned_new_height: None, - synced: false, - sync_from, - storage, - } + pub fn new(storage: S) -> Self { + Self { storage } } } -impl BlockAggregatorDB for StorageDB +impl BlocksStorage for StorageDB 
where - S: Modifiable + std::fmt::Debug, + S: Modifiable + Send + Sync, S: KeyValueInspect, - S: StorageInspect, - for<'b> StorageTransaction<&'b mut S>: StorageMutate, - for<'b> StorageTransaction<&'b mut S>: - StorageMutate, - S: AtomicView, - T: Unpin + Send + Sync + KeyValueInspect + 'static + std::fmt::Debug, - StorageTransaction: StorageInspect, { type Block = ProtoBlock; type BlockRangeResponse = BlockRangeResponse; async fn store_block( &mut self, - block_event: BlockSourceEvent, + height: BlockHeight, + block: &Self::Block, ) -> Result<()> { - let (height, block) = block_event.clone().into_inner(); + let current_height = self.get_current_height()?; + + if let Some(current_height) = current_height + && let Some(next_height) = current_height.succ() + && height != next_height + { + return Err(Error::DB(anyhow!( + "Cannot store block at height {:?}, expected height {:?}", + height, + next_height + ))); + } + let mut tx = self.storage.write_transaction(); tx.storage_as_mut::() - .insert(&height, &block) + .insert(&height, block) + .map_err(|e| Error::DB(anyhow!(e)))?; + tx.storage_as_mut::() + .insert(&(), &Mode::Local(height)) .map_err(|e| Error::DB(anyhow!(e)))?; tx.commit().map_err(|e| Error::DB(anyhow!(e)))?; - match block_event { - BlockSourceEvent::NewBlock(new_height, _) => { - tracing::debug!("New block: {:?}", new_height); - self.highest_new_height = Some(new_height); - if self.synced { - let mut tx = self.storage.write_transaction(); - tx.storage_as_mut::() - .insert(&(), &Mode::Local(new_height)) - .map_err(|e| Error::DB(anyhow!(e)))?; - tx.commit().map_err(|e| Error::DB(anyhow!(e)))?; - } else if new_height == self.sync_from - || self.height_is_next_height(new_height)? - { - let mut tx = self.storage.write_transaction(); - self.synced = true; - self.highest_new_height = Some(new_height); - tx.storage_as_mut::() - .insert(&(), &Mode::Local(new_height)) - .map_err(|e| Error::DB(anyhow!(e)))?; - tx.commit().map_err(|e| Error::DB(anyhow!(e)))?; - } else if self.orphaned_new_height.is_none() { - self.orphaned_new_height = Some(new_height); - } - } - BlockSourceEvent::OldBlock(height, _) => { - tracing::debug!("Old block: {:?}", height); - let latest_height = if height.succ() == self.orphaned_new_height { - self.orphaned_new_height = None; - self.synced = true; - self.highest_new_height.unwrap_or(height) - } else { - height - }; - let mut tx = self.storage.write_transaction(); - tx.storage_as_mut::() - .insert(&(), &Mode::Local(latest_height)) - .map_err(|e| Error::DB(anyhow!(e)))?; - tx.commit().map_err(|e| Error::DB(anyhow!(e)))?; - } - } Ok(()) } +} + +impl BlocksProvider for StorageBlocksProvider +where + S: 'static, + S: KeyValueInspect, + S: AtomicView, + S::LatestView: Unpin + Send + Sync + KeyValueInspect + 'static, +{ + type Block = ProtoBlock; + type BlockRangeResponse = BlockRangeResponse; - async fn get_block_range( + fn get_block_range( &self, first: BlockHeight, last: BlockHeight, @@ -147,39 +123,33 @@ where Ok(BlockRangeResponse::Literal(Box::pin(stream))) } - async fn get_current_height(&self) -> Result> { + fn get_current_height(&self) -> Result> { let height = self .storage + .as_structured_storage() .storage_as_ref::() .get(&()) - .map_err(|e| Error::DB(anyhow!(e)))?; + .map_err(|e| Error::DB(anyhow!(e)))? 
+ .map(|b| b.height()); - Ok(height.map(|b| b.height())) + Ok(height) } } -impl StorageDB +impl StorageDB where - S: Modifiable + std::fmt::Debug, S: KeyValueInspect, - S: StorageInspect, - for<'b> StorageTransaction<&'b mut S>: - StorageMutate, - S: AtomicView, - T: Unpin + Send + Sync + KeyValueInspect + 'static + std::fmt::Debug, { - fn height_is_next_height(&self, height: BlockHeight) -> Result { - let maybe_latest_height = self + pub fn get_current_height(&self) -> Result> { + let height = self .storage + .as_structured_storage() .storage_as_ref::() .get(&()) .map_err(|e| Error::DB(anyhow!(e)))? - .map(|m| m.height()); - if let Some(latest_height) = maybe_latest_height { - Ok(latest_height.succ() == Some(height)) - } else { - Ok(false) - } + .map(|b| b.height()); + + Ok(height) } } pub struct StorageStream { @@ -200,8 +170,8 @@ impl StorageStream { impl Stream for StorageStream where - S: Unpin + ReadTransaction + std::fmt::Debug, - for<'a> StorageTransaction<&'a S>: StorageInspect, + S: Unpin, + S: KeyValueInspect, { type Item = (BlockHeight, ProtoBlock); @@ -215,8 +185,8 @@ where ); let this = self.get_mut(); if let Some(height) = this.next { - let tx = this.inner.read_transaction(); - let next_block = tx + let storage = this.inner.as_structured_storage(); + let next_block = storage .storage_as_ref::() .get(&height) .map_err(|e| Error::DB(anyhow!(e))); diff --git a/crates/services/block_aggregator_api/src/db/storage_db/tests.rs b/crates/services/block_aggregator_api/src/db/storage_db/tests.rs index 0b116b4a246..4dd285d657e 100644 --- a/crates/services/block_aggregator_api/src/db/storage_db/tests.rs +++ b/crates/services/block_aggregator_api/src/db/storage_db/tests.rs @@ -2,9 +2,9 @@ use super::*; use crate::{ - blocks::importer_and_db_source::{ - BlockSerializer, - serializer_adapter::SerializerAdapter, + blocks::old_block_source::{ + BlockConvector, + convertor_adapter::ConvertorAdapter, }, db::table::{ Column, @@ -14,7 +14,10 @@ use crate::{ use fuel_core_storage::{ StorageAsRef, structured_storage::test::InMemoryStorage, - transactional::IntoTransaction, + transactional::{ + IntoTransaction, + StorageTransaction, + }, }; use fuel_core_types::{ blockchain::block::Block as FuelBlock, @@ -28,11 +31,11 @@ fn database() -> StorageTransaction> { } fn proto_block_with_height(height: BlockHeight) -> ProtoBlock { - let serializer_adapter = SerializerAdapter; + let convertor_adapter = ConvertorAdapter; let mut default_block = FuelBlock::::default(); default_block.header_mut().set_block_height(height); - serializer_adapter - .serialize_block(&default_block, &[]) + convertor_adapter + .convert_block(&default_block, &[]) .unwrap() } @@ -40,13 +43,12 @@ fn proto_block_with_height(height: BlockHeight) -> ProtoBlock { async fn store_block__adds_to_storage() { // given let db = database(); - let mut adapter = StorageDB::new(db, BlockHeight::from(0u32)); + let mut adapter = StorageDB::new(db); let height = BlockHeight::from(1u32); let expected = proto_block_with_height(height); - let block = BlockSourceEvent::OldBlock(height, expected.clone()); // when - adapter.store_block(block).await.unwrap(); + adapter.store_block(height, &expected).await.unwrap(); // then let actual = adapter @@ -84,11 +86,11 @@ async fn get_block__can_get_expected_range() { tx.commit().unwrap(); let db = db.commit().unwrap(); let tx = db.into_transaction(); - let adapter = StorageDB::new(tx, BlockHeight::from(0u32)); + let adapter = StorageBlocksProvider::new(tx); // when let BlockRangeResponse::Literal(stream) = - 
adapter.get_block_range(height_2, height_3).await.unwrap() + adapter.get_block_range(height_2, height_3).unwrap() else { panic!("expected literal response") }; @@ -99,26 +101,25 @@ async fn get_block__can_get_expected_range() { } #[tokio::test] -async fn store_block__updates_the_highest_continuous_block_if_contiguous() { +async fn store_block__updates_continuous_block_if_contiguous() { // given let db = database(); - let mut adapter = StorageDB::new(db, BlockHeight::from(0u32)); + let mut adapter = StorageDB::new(db); let height = BlockHeight::from(1u32); let expected = proto_block_with_height(height); - let block = BlockSourceEvent::OldBlock(height, expected.clone()); // when - adapter.store_block(block).await.unwrap(); + adapter.store_block(height, &expected).await.unwrap(); // then let expected = height; - let actual = adapter.get_current_height().await.unwrap().unwrap(); + let actual = adapter.get_current_height().unwrap().unwrap(); assert_eq!(expected, actual); } #[tokio::test] -async fn store_block__does_not_update_the_highest_continuous_block_if_not_contiguous() { - // given +async fn store_block__fails_if_not_contiguous() { + // Given let mut db = database(); let mut tx = db.write_transaction(); let starting_height = BlockHeight::from(1u32); @@ -126,84 +127,13 @@ async fn store_block__does_not_update_the_highest_continuous_block_if_not_contig .insert(&(), &Mode::Local(starting_height)) .unwrap(); tx.commit().unwrap(); - let mut adapter = StorageDB::new(db, BlockHeight::from(0u32)); + let mut adapter = StorageDB::new(db); let height = BlockHeight::from(3u32); let proto = proto_block_with_height(height); - let block = BlockSourceEvent::NewBlock(height, proto.clone()); - - // when - adapter.store_block(block).await.unwrap(); - - // then - let expected = starting_height; - let actual = adapter.get_current_height().await.unwrap().unwrap(); - assert_eq!(expected, actual); -} - -#[tokio::test] -async fn store_block__updates_the_highest_continuous_block_if_filling_a_gap() { - // given - let db = database(); - let mut adapter = StorageDB::new(db, BlockHeight::from(0u32)); - - for height in 2..=10u32 { - let height = BlockHeight::from(height); - let block = proto_block_with_height(height); - let block = BlockSourceEvent::NewBlock(height, block.clone()); - adapter.store_block(block).await.unwrap(); - } - // when - let height = BlockHeight::from(1u32); - let some_block = proto_block_with_height(height); - let block = BlockSourceEvent::OldBlock(height, some_block.clone()); - adapter.store_block(block).await.unwrap(); - - // then - let expected = BlockHeight::from(10u32); - let actual = adapter.get_current_height().await.unwrap().unwrap(); - assert_eq!(expected, actual); -} -#[tokio::test] -async fn store_block__new_block_updates_the_highest_continuous_block_if_synced() { - // given - let db = database(); - let mut adapter = StorageDB::new(db, BlockHeight::from(0u32)); - - let height = BlockHeight::from(0u32); - let some_block = proto_block_with_height(height); - let block = BlockSourceEvent::OldBlock(height, some_block.clone()); - adapter.store_block(block).await.unwrap(); - - // when - let height = BlockHeight::from(1u32); - let some_block = proto_block_with_height(height); - let block = BlockSourceEvent::NewBlock(height, some_block.clone()); - adapter.store_block(block).await.unwrap(); - - // then - let expected = BlockHeight::from(1u32); - let actual = adapter.get_current_height().await.unwrap().unwrap(); - assert_eq!(expected, actual); - - assert!(adapter.synced) -} - -#[tokio::test] -async 
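Since this rendering drops angle-bracketed generics, the new write path in `StorageDB::store_block` (above) is easier to read with the type parameters restored. A best-effort reconstruction, with the turbofish targets `Blocks`, `LatestBlock`, and `Mode` taken from this crate's `db::table` module; exact punctuation may differ from the source:

async fn store_block(&mut self, height: BlockHeight, block: &Self::Block) -> Result<()> {
    // ... contiguity check elided ...
    // Both writes commit atomically, so the LatestBlock marker can never
    // get ahead of the stored blocks.
    let mut tx = self.storage.write_transaction();
    tx.storage_as_mut::<Blocks>()
        .insert(&height, block)
        .map_err(|e| Error::DB(anyhow!(e)))?;
    tx.storage_as_mut::<LatestBlock>()
        .insert(&(), &Mode::Local(height))
        .map_err(|e| Error::DB(anyhow!(e)))?;
    tx.commit().map_err(|e| Error::DB(anyhow!(e)))?;
    Ok(())
}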
fn store_block__new_block_comes_first() { - // given - let db = database(); - let mut adapter = StorageDB::new(db, BlockHeight::from(0u32)); // when - let height = BlockHeight::from(0u32); - let some_block = proto_block_with_height(height); - let block = BlockSourceEvent::NewBlock(height, some_block.clone()); - adapter.store_block(block).await.unwrap(); + let result = adapter.store_block(height, &proto).await; // then - let expected = BlockHeight::from(0u32); - let actual = adapter.get_current_height().await.unwrap().unwrap(); - assert_eq!(expected, actual); - - assert!(adapter.synced); + result.expect_err("expected error"); } diff --git a/crates/services/block_aggregator_api/src/db/storage_or_remote_db.rs b/crates/services/block_aggregator_api/src/db/storage_or_remote_db.rs index b501e801678..5bc150fbaac 100644 --- a/crates/services/block_aggregator_api/src/db/storage_or_remote_db.rs +++ b/crates/services/block_aggregator_api/src/db/storage_or_remote_db.rs @@ -1,36 +1,62 @@ use crate::{ block_range_response::BlockRangeResponse, - blocks::BlockSourceEvent, db::{ - BlockAggregatorDB, - remote_cache::RemoteCache, - storage_db::StorageDB, - table::{ - Blocks, - Column, - LatestBlock, + BlocksProvider, + BlocksStorage, + remote_cache::{ + RemoteBlocksProvider, + RemoteCache, }, + storage_db::{ + StorageBlocksProvider, + StorageDB, + }, + table::Column, }, + protobuf_types::Block as ProtoBlock, result::Result, }; use aws_config::{ BehaviorVersion, default_provider::credentials::DefaultCredentialsChain, }; - use fuel_core_storage::{ - Error as StorageError, - StorageInspect, - StorageMutate, kv_store::KeyValueInspect, transactional::{ AtomicView, Modifiable, - StorageTransaction, }, }; use fuel_core_types::fuel_types::BlockHeight; +/// A union of a storage and a remote cache for the block aggregator. This allows both to be +/// supported in production depending on the configuration +pub enum StorageOrRemoteBlocksProvider { + Remote(RemoteBlocksProvider), + Storage(StorageBlocksProvider), +} + +impl StorageOrRemoteBlocksProvider { + pub fn new_storage(storage: S) -> Self { + StorageOrRemoteBlocksProvider::Storage(StorageBlocksProvider::new(storage)) + } + + pub fn new_s3( + storage: S, + aws_bucket: String, + requester_pays: bool, + aws_endpoint_url: Option, + ) -> Self { + let remote_cache = RemoteBlocksProvider::new( + aws_bucket, + requester_pays, + aws_endpoint_url, + storage, + ); + StorageOrRemoteBlocksProvider::Remote(remote_cache) + } +} + /// A union of a storage and a remote cache for the block aggregator. 
This allows both to be /// supported in production depending on the configuration pub enum StorageOrRemoteDB { @@ -39,17 +65,14 @@ pub enum StorageOrRemoteDB { } impl StorageOrRemoteDB { - pub fn new_storage(storage: S, sync_from: BlockHeight) -> Self { - StorageOrRemoteDB::Storage(StorageDB::new(storage, sync_from)) + pub fn new_storage(storage: S) -> Self { + StorageOrRemoteDB::Storage(StorageDB::new(storage)) } - #[allow(clippy::too_many_arguments)] pub async fn new_s3( storage: S, - aws_bucket: &str, - requester_pays: bool, + aws_bucket: String, aws_endpoint_url: Option, - sync_from: BlockHeight, publish: bool, ) -> Self { let credentials = DefaultCredentialsChain::builder().build().await; @@ -58,80 +81,74 @@ impl StorageOrRemoteDB { .load() .await; let mut config_builder = aws_sdk_s3::config::Builder::from(&sdk_config); - if let Some(endpoint) = &aws_endpoint_url { - config_builder.set_endpoint_url(Some(endpoint.to_string())); + if let Some(endpoint) = aws_endpoint_url { + config_builder.set_endpoint_url(Some(endpoint)); } let config = config_builder.force_path_style(true).build(); let client = aws_sdk_s3::Client::from_conf(config); - let remote_cache = RemoteCache::new( - aws_bucket.to_string(), - requester_pays, - aws_endpoint_url, - client, - storage, - sync_from, - publish, - ) - .await; + let remote_cache = RemoteCache::new(aws_bucket, client, storage, publish); StorageOrRemoteDB::Remote(remote_cache) } } -impl BlockAggregatorDB for StorageOrRemoteDB +impl BlocksStorage for StorageOrRemoteDB where - // Storage Constraints - S: Modifiable + std::fmt::Debug, + S: Modifiable + Send + Sync, S: KeyValueInspect, - S: StorageInspect, - for<'b> StorageTransaction<&'b mut S>: StorageMutate, - for<'b> StorageTransaction<&'b mut S>: - StorageMutate, - S: AtomicView, - T: Unpin + Send + Sync + KeyValueInspect + 'static + std::fmt::Debug, - StorageTransaction: StorageInspect, - // Remote Constraints - S: Send + Sync, - S: Modifiable, - S: StorageInspect, - for<'b> StorageTransaction<&'b mut S>: - StorageMutate, { type Block = crate::protobuf_types::Block; type BlockRangeResponse = BlockRangeResponse; - async fn store_block(&mut self, block: BlockSourceEvent) -> Result<()> { + async fn store_block( + &mut self, + block_height: BlockHeight, + block: &Self::Block, + ) -> Result<()> { match self { - StorageOrRemoteDB::Remote(remote_db) => remote_db.store_block(block).await?, + StorageOrRemoteDB::Remote(remote_db) => { + remote_db.store_block(block_height, block).await? + } StorageOrRemoteDB::Storage(storage_db) => { - storage_db.store_block(block).await? + storage_db.store_block(block_height, block).await? } } Ok(()) } +} - async fn get_block_range( +impl BlocksProvider for StorageOrRemoteBlocksProvider +where + S: 'static, + S: KeyValueInspect, + S: AtomicView, + S::LatestView: Unpin + Send + Sync + KeyValueInspect + 'static, +{ + type Block = ProtoBlock; + type BlockRangeResponse = BlockRangeResponse; + + fn get_block_range( &self, first: BlockHeight, last: BlockHeight, ) -> Result { let range_response = match self { - StorageOrRemoteDB::Remote(remote_db) => { - remote_db.get_block_range(first, last).await? + StorageOrRemoteBlocksProvider::Remote(remote_db) => { + remote_db.get_block_range(first, last)? } - StorageOrRemoteDB::Storage(storage_db) => { - storage_db.get_block_range(first, last).await? + StorageOrRemoteBlocksProvider::Storage(storage_db) => { + storage_db.get_block_range(first, last)? 
} }; Ok(range_response) } - async fn get_current_height(&self) -> Result> { + fn get_current_height(&self) -> Result> { let height = match self { - StorageOrRemoteDB::Remote(remote_db) => { - remote_db.get_current_height().await? + StorageOrRemoteBlocksProvider::Remote(remote_db) => { + remote_db.get_current_height()? } - StorageOrRemoteDB::Storage(storage_db) => { - storage_db.get_current_height().await? + StorageOrRemoteBlocksProvider::Storage(storage_db) => { + storage_db.get_current_height()? } }; Ok(height) diff --git a/crates/services/block_aggregator_api/src/lib.rs b/crates/services/block_aggregator_api/src/lib.rs index 0a231687117..d1fb8635589 100644 --- a/crates/services/block_aggregator_api/src/lib.rs +++ b/crates/services/block_aggregator_api/src/lib.rs @@ -1,339 +1,18 @@ -use crate::{ - api::BlockAggregatorApi, - blocks::BlockSource, - db::BlockAggregatorDB, -}; -use fuel_core_services::{ - RunnableTask, - StateWatcher, - TaskNextAction, -}; use fuel_core_types::fuel_types::BlockHeight; use protobuf_types::Block as ProtoBlock; pub mod api; +pub mod block_range_response; pub mod blocks; pub mod db; -pub mod result; - -pub mod block_range_response; - -pub mod block_aggregator; pub mod protobuf_types; +pub mod result; +pub mod service; +pub mod task; #[cfg(test)] mod tests; -pub mod integration { - use crate::{ - BlockAggregator, - api::{ - BlockAggregatorApi, - protobuf_adapter::ProtobufAPI, - }, - block_range_response::BlockRangeResponse, - blocks::{ - BlockSource, - importer_and_db_source::{ - BlockSerializer, - ImporterAndDbSource, - sync_service::TxReceipts, - }, - }, - db::{ - storage_or_remote_db::StorageOrRemoteDB, - table::{ - Column, - LatestBlock, - Mode, - }, - }, - protobuf_types::Block as ProtoBlock, - }; - use anyhow::bail; - use fuel_core_services::{ - RunnableService, - ServiceRunner, - StateWatcher, - stream::BoxStream, - }; - use fuel_core_storage::{ - Error as StorageError, - StorageAsRef, - StorageInspect, - StorageMutate, - kv_store::KeyValueInspect, - tables::{ - FuelBlocks, - Transactions, - }, - transactional::{ - AtomicView, - HistoricalView, - Modifiable, - StorageTransaction, - }, - }; - use fuel_core_types::{ - fuel_types::BlockHeight, - services::block_importer::SharedImportResult, - }; - use std::{ - fmt::Debug, - net::SocketAddr, - }; - - #[derive(Clone, Debug)] - pub struct Config { - pub addr: SocketAddr, - pub api_buffer_size: usize, - pub sync_from: Option, - pub storage_method: StorageMethod, - } - - #[derive(Clone, Debug, Default)] - pub enum StorageMethod { - // Stores blocks in local DB - #[default] - Local, - // Publishes blocks to S3 bucket - S3 { - bucket: String, - endpoint_url: Option, - requester_pays: bool, - }, - // Assumes another node is publishing blocks to S3 bucket, but relaying details - S3NoPublish { - bucket: String, - endpoint_url: Option, - requester_pays: bool, - }, - } - - pub struct UninitializedTask { - api: API, - block_source: Blocks, - storage: S, - config: Config, - genesis_block_height: BlockHeight, - } - - #[async_trait::async_trait] - impl RunnableService for UninitializedTask - where - Api: BlockAggregatorApi< - Block = ProtoBlock, - BlockRangeResponse = BlockRangeResponse, - >, - Blocks: BlockSource, - // Storage Constraints - S: Modifiable + Debug, - S: KeyValueInspect, - S: StorageInspect, - for<'b> StorageTransaction<&'b mut S>: - StorageMutate, - for<'b> StorageTransaction<&'b mut S>: - StorageMutate, - S: AtomicView, - T: Unpin + Send + Sync + KeyValueInspect + 'static + Debug, - StorageTransaction: - 
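Part of what this deletion buys is simpler trait bounds: the old integration module carried `StorageInspect`/`StorageMutate`/`StorageTransaction` bounds (including the higher-ranked ones being removed here), while the replacement reads go through `as_structured_storage()` and need only a `KeyValueInspect` bound. The shared read path, again with the stripped generics restored as a best guess:

fn get_current_height(&self) -> Result<Option<BlockHeight>> {
    // Read the LatestBlock marker through the structured-storage view.
    let height = self
        .storage
        .as_structured_storage()
        .storage_as_ref::<LatestBlock>()
        .get(&())
        .map_err(|e| Error::DB(anyhow!(e)))?
        .map(|b| b.height());
    Ok(height)
}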
StorageInspect, - // Remote Constraints - S: Send + Sync, - S: Modifiable, - S: StorageInspect, - for<'b> StorageTransaction<&'b mut S>: - StorageMutate, - { - const NAME: &'static str = "BlockAggregatorService"; - type SharedData = (); - type Task = BlockAggregator, Blocks, Blocks::Block>; - type TaskParams = (); - - fn shared_data(&self) -> Self::SharedData {} - - async fn into_task( - self, - _state_watcher: &StateWatcher, - _params: Self::TaskParams, - ) -> anyhow::Result { - let UninitializedTask { - api, - block_source, - storage, - config, - genesis_block_height, - } = self; - let sync_from = config.sync_from.unwrap_or(genesis_block_height); - let db_adapter = match config.storage_method { - StorageMethod::Local => { - let mode = storage.storage_as_ref::().get(&())?; - let maybe_sync_from_height = match mode - .clone() - .map(|c| c.into_owned()) - { - Some(Mode::S3(_)) => { - bail!( - "Database is configured in S3 mode, but Local storage method was requested. If you would like to run in S3 mode, then please use a clean DB" - ); - } - _ => mode.map(|m| m.height()), - }; - let sync_from_height = maybe_sync_from_height.unwrap_or(sync_from); - StorageOrRemoteDB::new_storage(storage, sync_from_height) - } - StorageMethod::S3 { - bucket, - endpoint_url, - requester_pays, - } => { - let mode = storage.storage_as_ref::().get(&())?; - let maybe_sync_from_height = match mode - .clone() - .map(|c| c.into_owned()) - { - Some(Mode::Local(_)) => { - bail!( - "Database is configured in S3 mode, but Local storage method was requested. If you would like to run in S3 mode, then please use a clean DB" - ); - } - _ => mode.map(|m| m.height()), - }; - let sync_from_height = maybe_sync_from_height.unwrap_or(sync_from); - - let publish = true; - - StorageOrRemoteDB::new_s3( - storage, - &bucket, - requester_pays, - endpoint_url.clone(), - sync_from_height, - publish, - ) - .await - } - - StorageMethod::S3NoPublish { - bucket, - endpoint_url, - requester_pays, - } => { - let mode = storage.storage_as_ref::().get(&())?; - let maybe_sync_from_height = match mode - .clone() - .map(|c| c.into_owned()) - { - Some(Mode::Local(_)) => { - bail!( - "Database is configured in S3 mode, but Local storage method was requested. 
If you would like to run in S3 mode, then please use a clean DB" - ); - } - _ => mode.map(|m| m.height()), - }; - let sync_from_height = maybe_sync_from_height.unwrap_or(sync_from); - - let publish = false; - - StorageOrRemoteDB::new_s3( - storage, - &bucket, - requester_pays, - endpoint_url.clone(), - sync_from_height, - publish, - ) - .await - } - }; - Ok(BlockAggregator::new(api, db_adapter, block_source)) - } - } - - #[allow(clippy::type_complexity)] - pub fn new_service( - db: DB, - serializer: S, - onchain_db: OnchainDB, - receipts: Receipts, - importer: BoxStream, - config: Config, - genesis_block_height: BlockHeight, - ) -> anyhow::Result< - ServiceRunner< - UninitializedTask< - ProtobufAPI, - ImporterAndDbSource, - DB, - >, - >, - > - where - S: BlockSerializer + Clone + Send + Sync + 'static, - OnchainDB: Send + Sync, - OnchainDB: StorageInspect, - OnchainDB: StorageInspect, - OnchainDB: HistoricalView, - Receipts: TxReceipts, - // Storage Constraints - DB: Modifiable + Debug, - DB: KeyValueInspect, - DB: StorageInspect, - for<'b> StorageTransaction<&'b mut DB>: - StorageMutate, - for<'b> StorageTransaction<&'b mut DB>: - StorageMutate, - DB: AtomicView, - T: Unpin + Send + Sync + KeyValueInspect + 'static + Debug, - StorageTransaction: - StorageInspect, - // Remote Constraints - DB: Send + Sync, - DB: Modifiable, - DB: StorageInspect, - for<'b> StorageTransaction<&'b mut DB>: - StorageMutate, - { - let addr = config.addr.to_string(); - let api_buffer_size = config.api_buffer_size; - let api = ProtobufAPI::new(addr, api_buffer_size) - .map_err(|e| anyhow::anyhow!("Error creating API: {e}"))?; - let db_ending_height = onchain_db - .latest_height() - .and_then(BlockHeight::succ) - .unwrap_or(BlockHeight::from(0)); - let sync_from_height = config.sync_from.unwrap_or(genesis_block_height); - let block_source = ImporterAndDbSource::new( - importer, - serializer, - onchain_db, - receipts, - sync_from_height, - db_ending_height, - ); - let uninitialized_task = UninitializedTask { - api, - block_source, - storage: db, - config, - genesis_block_height, - }; - let runner = ServiceRunner::new(uninitialized_task); - Ok(runner) - } -} - -// TODO: this doesn't need to limited to the blocks, -// but we can change the name later -/// The Block Aggregator service, which aggregates blocks from a source and stores them in a database -/// Queries can be made to the service to retrieve data from the `DB` -pub struct BlockAggregator { - query: Api, - database: DB, - block_source: Blocks, - new_block_subscriptions: Vec>, -} - pub struct NewBlock { height: BlockHeight, block: ProtoBlock, @@ -348,29 +27,3 @@ impl NewBlock { (self.height, self.block) } } - -impl RunnableTask - for BlockAggregator -where - Api: BlockAggregatorApi, - DB: BlockAggregatorDB, - Blocks: BlockSource, - ::Block: Clone + std::fmt::Debug + Send, - BlockRange: Send, -{ - async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { - tracing::debug!("BlockAggregator running"); - tokio::select! 
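The `run` loop removed here multiplexed three event sources with `tokio::select!`: API queries, incoming blocks, and the service state watcher. The same multiplexing pattern, sketched with plain tokio channels and placeholder string/integer payloads rather than the crate's real types:

use tokio::sync::{mpsc, watch};

async fn run_loop(
    mut queries: mpsc::Receiver<String>,
    mut blocks: mpsc::Receiver<u32>,
    mut shutdown: watch::Receiver<bool>,
) {
    loop {
        tokio::select! {
            // Serve an API query.
            Some(q) = queries.recv() => println!("query: {q}"),
            // Persist the next block from the source.
            Some(b) = blocks.recv() => println!("block: {b}"),
            // Stop when the service state flips.
            _ = shutdown.changed() => break,
        }
    }
}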
{ - query_res = self.query.await_query() => self.handle_query(query_res).await, - block_res = self.block_source.next_block() => self.handle_block(block_res).await, - _ = watcher.while_started() => self.stop(), - } - } - - async fn shutdown(mut self) -> anyhow::Result<()> { - self.block_source.drain().await.map_err(|e| { - anyhow::anyhow!("Error draining block source during shutdown: {e:?}") - })?; - Ok(()) - } -} diff --git a/crates/services/block_aggregator_api/src/service.rs b/crates/services/block_aggregator_api/src/service.rs new file mode 100644 index 00000000000..7de42c5e4d7 --- /dev/null +++ b/crates/services/block_aggregator_api/src/service.rs @@ -0,0 +1,376 @@ +use crate::{ + api::{ + protobuf_adapter, + protobuf_adapter::BlocksAggregatorApi, + }, + block_range_response::BlockRangeResponse, + blocks::{ + BlockSource, + old_block_source::{ + BlockConvector, + OldBlocksSource, + TxReceipts, + }, + }, + db::{ + BlocksProvider, + storage_or_remote_db::{ + StorageOrRemoteBlocksProvider, + StorageOrRemoteDB, + }, + table::{ + Column, + LatestBlock, + Mode, + }, + }, + protobuf_types::Block as ProtoBlock, + result::Result as BlockAggregatorResult, + task::Task, +}; +use anyhow::bail; +use fuel_core_services::{ + RunnableService, + Service, + ServiceRunner, + StateWatcher, + stream::{ + BoxStream, + IntoBoxStream, + }, +}; +use fuel_core_storage::{ + Error as StorageError, + StorageAsRef, + StorageInspect, + kv_store::KeyValueInspect, + structured_storage::AsStructuredStorage, + tables::{ + FuelBlocks, + Transactions, + }, + transactional::{ + AtomicView, + HistoricalView, + Modifiable, + }, +}; +use fuel_core_types::{ + fuel_types::BlockHeight, + services::block_importer::SharedImportResult, +}; +use futures::Stream; +use std::{ + fmt::Debug, + net::SocketAddr, + sync::Arc, +}; +use tokio::sync::broadcast; +use tokio_stream::StreamExt; + +#[derive(Clone, Debug)] +pub struct Config { + pub addr: SocketAddr, + pub api_buffer_size: usize, + pub sync_from: Option, + pub storage_method: StorageMethod, +} + +#[derive(Clone, Debug, Default)] +pub enum StorageMethod { + // Stores blocks in local DB + #[default] + Local, + // Publishes blocks to S3 bucket + S3 { + bucket: String, + endpoint_url: Option, + requester_pays: bool, + }, + // Assumes another node is publishing blocks to S3 bucket, but relaying details + S3NoPublish { + bucket: String, + endpoint_url: Option, + requester_pays: bool, + }, +} + +pub struct SharedState +where + S: BlocksProvider, +{ + pub(crate) storage: Arc, + pub(crate) blocks_broadcast: broadcast::Sender<(BlockHeight, Arc)>, +} + +impl SharedState +where + S: BlocksProvider, +{ + pub fn new(storage: S, channel_size: usize) -> Self { + let (blocks_broadcast, _) = broadcast::channel(channel_size); + SharedState { + storage: Arc::new(storage), + blocks_broadcast, + } + } +} + +impl Clone for SharedState +where + S: BlocksProvider, +{ + fn clone(&self) -> Self { + SharedState { + storage: Arc::clone(&self.storage), + blocks_broadcast: self.blocks_broadcast.clone(), + } + } +} + +impl SharedState +where + S: BlocksProvider, +{ + fn storage(&self) -> &S { + &self.storage + } + + pub fn get_block_range( + &self, + first: H, + last: H, + ) -> BlockAggregatorResult + where + H: Into, + { + self.storage().get_block_range(first.into(), last.into()) + } + + pub fn get_current_height(&self) -> BlockAggregatorResult> { + self.storage().get_current_height() + } + + pub fn new_block_subscription( + &self, + ) -> impl Stream)>> + 'static { + let receiver = 
self.blocks_broadcast.subscribe(); + tokio_stream::wrappers::BroadcastStream::new(receiver) + .map(|result| result.map_err(|e| anyhow::anyhow!("Broadcast error: {:?}", e))) + } +} + +impl BlocksAggregatorApi for SharedState +where + S: BlocksProvider, +{ + fn get_block_range>( + &self, + first: H, + last: H, + ) -> BlockAggregatorResult { + self.get_block_range(first, last) + } + + fn get_current_height(&self) -> BlockAggregatorResult> { + self.get_current_height() + } + + fn new_block_subscription( + &self, + ) -> impl Stream)>> + Send + 'static + { + self.new_block_subscription() + } +} + +pub struct UninitializedTask +where + Blocks: BlockSource, + S2: KeyValueInspect + 'static, + S2: AtomicView, + S2::LatestView: Unpin + Send + Sync + KeyValueInspect + 'static, +{ + api: Box, + block_source: Blocks, + storage: S1, + config: Config, + genesis_block_height: BlockHeight, + shared_state: SharedState>, + importer: BoxStream::Block)>>, +} + +#[async_trait::async_trait] +impl RunnableService for UninitializedTask +where + Blocks: BlockSource, + S1: Send + Sync + Modifiable + Debug + 'static, + S1: KeyValueInspect, + S2: KeyValueInspect + 'static, + S2: AtomicView, + S2::LatestView: Unpin + Send + Sync + KeyValueInspect + 'static, +{ + const NAME: &'static str = "BlockAggregatorService"; + type SharedData = SharedState>; + type Task = Task, StorageOrRemoteBlocksProvider, Blocks>; + type TaskParams = (); + + fn shared_data(&self) -> Self::SharedData { + self.shared_state.clone() + } + + async fn into_task( + self, + _state_watcher: &StateWatcher, + _params: Self::TaskParams, + ) -> anyhow::Result { + let UninitializedTask { + api, + block_source, + storage, + config, + genesis_block_height, + shared_state, + importer, + } = self; + let sync_from = config.sync_from.unwrap_or(genesis_block_height); + + let publish = matches!(config.storage_method, StorageMethod::S3 { .. }); + + let db_adapter = match config.storage_method { + StorageMethod::Local => { + let mode = storage + .as_structured_storage() + .storage_as_ref::() + .get(&())? + .map(|c| c.into_owned()); + if let Some(Mode::S3(_)) = mode { + bail!( + "Database is configured in S3 mode, but Local storage method was requested. If you would like to run in Local mode, then please use a clean DB" + ); + }; + StorageOrRemoteDB::new_storage(storage) + } + StorageMethod::S3 { + bucket, + endpoint_url, + .. + } + | StorageMethod::S3NoPublish { + bucket, + endpoint_url, + .. + } => { + let mode = storage + .as_structured_storage() + .storage_as_ref::() + .get(&())? + .map(|c| c.into_owned()); + if let Some(Mode::Local(_)) = mode { + bail!( + "Database is configured in Local mode, but S3 storage method was requested. 
If you would like to run in S3 mode, then please use a clean DB" + ); + }; + + StorageOrRemoteDB::new_s3(storage, bucket, endpoint_url.clone(), publish) + .await + } + }; + + api.start_and_await().await?; + + Ok(Task::new( + sync_from, + api, + db_adapter, + shared_state, + block_source, + importer, + )) + } +} + +#[allow(clippy::type_complexity)] +pub fn new_service( + db: DB, + convertor: S, + onchain_db: OnchainDB, + receipts: Receipts, + importer: BoxStream, + config: Config, + genesis_block_height: BlockHeight, +) -> anyhow::Result< + ServiceRunner, DB, DB>>, +> +where + S: BlockConvector + Clone + Send + Sync + 'static, + OnchainDB: Send + Sync, + OnchainDB: StorageInspect, + OnchainDB: StorageInspect, + OnchainDB: HistoricalView, + Receipts: TxReceipts, + // Storage Constraints + DB: Modifiable + Debug + Clone + Send + Sync + 'static, + DB: KeyValueInspect, + DB: AtomicView, + T: Unpin + Send + Sync + KeyValueInspect + 'static + Debug, +{ + let db_adapter = match &config.storage_method { + StorageMethod::Local => StorageOrRemoteBlocksProvider::new_storage(db.clone()), + StorageMethod::S3 { + bucket, + endpoint_url, + requester_pays, + } + | StorageMethod::S3NoPublish { + bucket, + endpoint_url, + requester_pays, + } => StorageOrRemoteBlocksProvider::new_s3( + db.clone(), + bucket.clone(), + *requester_pays, + endpoint_url.clone(), + ), + }; + + let convertor = Arc::new(convertor); + let convertor_stream = convertor.clone(); + + let importer = importer + .map(move |res| { + let receipts = res + .tx_status + .iter() + .map(|status| { + // TODO: Avoid cloning of receipts + status.result.receipts().to_vec() + }) + .collect::>(); + let height = *res.sealed_block.entity.header().height(); + let block = + convertor_stream.convert_block(&res.sealed_block.entity, &receipts)?; + + Ok((height, block)) + }) + .into_boxed(); + + let shared_state = SharedState::new(db_adapter, config.api_buffer_size); + + let api = protobuf_adapter::new_service(config.addr, shared_state.clone()); + + let block_source = OldBlocksSource::new(convertor, onchain_db, receipts); + + let uninitialized_task = UninitializedTask { + api: Box::new(api), + block_source, + shared_state, + config, + storage: db, + genesis_block_height, + importer, + }; + + let runner = ServiceRunner::new(uninitialized_task); + Ok(runner) +} diff --git a/crates/services/block_aggregator_api/src/task.rs b/crates/services/block_aggregator_api/src/task.rs new file mode 100644 index 00000000000..23fd08fe74b --- /dev/null +++ b/crates/services/block_aggregator_api/src/task.rs @@ -0,0 +1,205 @@ +use crate::{ + blocks::BlockSource, + db::{ + BlocksProvider, + BlocksStorage, + }, + result::{ + Error, + Result as AggregatorResult, + }, + service::SharedState, +}; +use fuel_core_services::{ + RunnableTask, + Service, + StateWatcher, + TaskNextAction, + stream::{ + BoxStream, + IntoBoxStream, + }, + try_or_stop, +}; +use fuel_core_types::fuel_types::BlockHeight; +use futures::StreamExt; +use std::{ + fmt::Debug, + sync::Arc, +}; + +/// The Block Aggregator service, which aggregates blocks from a source and stores them in a database +/// Queries can be made to the service to retrieve data from the `DB` +pub struct Task +where + S2: BlocksProvider, + Blocks: BlockSource, +{ + pub(crate) sync_from: BlockHeight, + pub(crate) api: Box, + pub(crate) storage: S1, + pub(crate) shared_state: SharedState, + pub(crate) block_source: Blocks, + pub(crate) importer: + BoxStream::Block)>>, + pub(crate) last_seen_importer_height: Option, + /// A joint stream of old 
blocks from the block source and new blocks from the importer + pub(crate) old_and_new_block_stream: + BoxStream::Block>)>>, +} + +impl Task +where + S1: BlocksStorage, + S2: BlocksProvider, + Blocks: BlockSource, + ::Block: Send + Sync + Debug + 'static, +{ + pub fn new( + sync_from: BlockHeight, + api: Box, + storage: S1, + shared_state: SharedState, + block_source: Blocks, + importer: BoxStream< + anyhow::Result<(BlockHeight, ::Block)>, + >, + ) -> Self { + let mut task = Self { + sync_from, + api, + storage, + shared_state, + block_source, + importer, + last_seen_importer_height: None, + old_and_new_block_stream: futures::stream::empty().into_boxed(), + }; + + let _ = task.restart_blocks_stream(); + + task + } + + pub fn restart_blocks_stream(&mut self) -> TaskNextAction { + let importer_height = self.last_seen_importer_height; + + let next_height = try_or_stop!(self.shared_state.get_current_height()) + .and_then(|height| height.succ()) + .unwrap_or(self.sync_from); + + let old_blocks_stream = futures::stream::iter( + self.block_source + .blocks_starting_from(next_height) + .map(|r| r.map(|(block_height, block)| (block_height, Arc::new(block)))), + ) + .take_while(move |result| { + let take = match result { + Ok((block_height, _)) => { + if let Some(importer_height) = importer_height { + *block_height < importer_height + } else { + true + } + } + Err(_) => true, + }; + + async move { take } + }); + + let receiver = self.shared_state.blocks_broadcast.subscribe(); + let life_stream = + tokio_stream::wrappers::BroadcastStream::new(receiver).map(|result| { + result.map_err(|err| { + Error::BlockSource(anyhow::anyhow!("Stream broadcast error: {err:?}")) + }) + }); + + let stream = old_blocks_stream.chain(life_stream).into_boxed(); + self.old_and_new_block_stream = stream; + + TaskNextAction::Continue + } + + pub async fn handle_block( + &mut self, + block_height: BlockHeight, + block: Arc<::Block>, + ) -> TaskNextAction { + let next_height = try_or_stop!(self.shared_state.get_current_height()) + .and_then(|height| height.succ()) + .unwrap_or(self.sync_from); + + if next_height != block_height { + tracing::warn!( + "Received block at height {block_height} but expected height {next_height}. Restarting the blocks stream." + ); + return self.restart_blocks_stream(); + } + + let res = self.storage.store_block(block_height, block.as_ref()).await; + match res { + Ok(_) => TaskNextAction::Continue, + // If we have an error, it means height is not updated in DB, and it will trigger + // restart of the stream in next iteration. + Err(err) => TaskNextAction::ErrorContinue(anyhow::anyhow!(err)), + } + } +} + +impl RunnableTask for Task +where + S1: BlocksStorage, + S2: BlocksProvider, + Blocks: BlockSource, + ::Block: Send + Sync + Debug, +{ + async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { + tokio::select! { + biased; + _ = watcher.while_started() => TaskNextAction::Stop, + + _ = self.api.await_stop() => TaskNextAction::Stop, + + block_res = self.importer.next() => { + match block_res { + Some(res) => { + let (height, block) = try_or_stop!(res); + + // The new block is added to the stream of old and new blocks and will be + // processed later during future iterations. 
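// The send result below is deliberately ignored: `broadcast::Sender::send`
// only errors when no receiver is currently subscribed, and the task's own
// joint stream (created in `restart_blocks_stream`) normally holds one. If a
// block is ever missed, the height check in `handle_block` detects the gap
// and restarts the stream from the last persisted height.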
+ let _ = self + .shared_state + .blocks_broadcast + .send((height, Arc::new(block))); + self.last_seen_importer_height = Some(height); + TaskNextAction::Continue + } + None => { + TaskNextAction::Stop + } + } + }, + event = self.old_and_new_block_stream.next() => { + match event { + Some(Ok((block_height, block))) => { + self.handle_block(block_height, block).await + } + Some(Err(err)) => { + tracing::warn!("Error handling block: {err}, restarting the blocks stream"); + self.restart_blocks_stream() + } + None => { + self.restart_blocks_stream() + } + } + } + } + } + + async fn shutdown(self) -> anyhow::Result<()> { + self.api.stop_and_await().await?; + Ok(()) + } +} diff --git a/crates/services/block_aggregator_api/src/tests.rs b/crates/services/block_aggregator_api/src/tests.rs index dc00e5a0efe..a0e9a13b659 100644 --- a/crates/services/block_aggregator_api/src/tests.rs +++ b/crates/services/block_aggregator_api/src/tests.rs @@ -1,20 +1,30 @@ #![allow(non_snake_case)] -use super::*; use crate::{ - api::BlockAggregatorQuery, blocks::{ BlockBytes, - BlockSourceEvent, + BlockSource, }, - result::{ - Error, - Result, + db::{ + BlocksProvider, + BlocksStorage, }, + result::Result, + service::SharedState, + task::Task, +}; +use fuel_core_services::{ + RunnableTask, + Service, + State, + StateWatcher, + stream::BoxStream, +}; +use fuel_core_types::fuel_types::BlockHeight; +use futures::{ + FutureExt, + StreamExt, }; -use anyhow::anyhow; -use fuel_core_services::stream::BoxStream; -use futures::StreamExt; use rand::{ SeedableRng, prelude::StdRng, @@ -26,36 +36,47 @@ use std::{ Mutex, }, }; -use tokio::{ - sync::mpsc::{ - Receiver, - Sender, - }, - time::error::Elapsed, -}; type BlockRangeResponse = BoxStream; -struct FakeApi { - receiver: Receiver>, -} +struct FakeApi {} -impl FakeApi { - fn new() -> (Self, Sender>) { - let (sender, receiver) = tokio::sync::mpsc::channel(1); - let api = Self { receiver }; - (api, sender) +#[async_trait::async_trait] +impl Service for FakeApi { + fn start(&self) -> anyhow::Result<()> { + Ok(()) + } + + async fn start_and_await(&self) -> anyhow::Result { + Ok(State::Started) + } + + async fn await_start_or_stop(&self) -> anyhow::Result { + futures::future::pending().await + } + + fn stop(&self) -> bool { + false + } + + async fn stop_and_await(&self) -> anyhow::Result { + Ok(State::Stopped) + } + + async fn await_stop(&self) -> anyhow::Result { + futures::future::pending().await + } + + fn state(&self) -> State { + State::Started } -} -impl BlockAggregatorApi for FakeApi { - type BlockRangeResponse = T; - type Block = B; - async fn await_query(&mut self) -> Result> { - Ok(self.receiver.recv().await.unwrap()) + fn state_watcher(&self) -> StateWatcher { + StateWatcher::started() } } +#[derive(Clone)] struct FakeDB { map: Arc>>, } @@ -69,23 +90,23 @@ impl FakeDB { fn add_block(&mut self, height: BlockHeight, block: BlockBytes) { self.map.lock().unwrap().insert(height, block); } - - fn clone_inner(&self) -> Arc>> { - self.map.clone() - } } -impl BlockAggregatorDB for FakeDB { +impl BlocksStorage for FakeDB { type Block = BlockBytes; type BlockRangeResponse = BlockRangeResponse; - async fn store_block(&mut self, block: BlockSourceEvent) -> Result<()> { - let (id, block) = block.into_inner(); - self.map.lock().unwrap().insert(id, block); + async fn store_block(&mut self, id: BlockHeight, block: &BlockBytes) -> Result<()> { + self.map.lock().unwrap().insert(id, block.clone()); Ok(()) } +} + +impl BlocksProvider for FakeDB { + type Block = BlockBytes; + type 
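// `FakeApi` above is a null-object stand-in for the API `Service`: its
// `await_stop` and `await_start_or_stop` return `futures::future::pending()`,
// a future that never resolves, so the corresponding `tokio::select!` arms in
// `Task::run` never fire and the tests exercise only the importer stream and
// the block source.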
BlockRangeResponse = BlockRangeResponse; - async fn get_block_range( + fn get_block_range( &self, first: BlockHeight, last: BlockHeight, @@ -106,7 +127,7 @@ impl BlockAggregatorDB for FakeDB { Ok(Box::pin(futures::stream::iter(blocks))) } - async fn get_current_height(&self) -> Result> { + fn get_current_height(&self) -> Result> { let map = self.map.lock().unwrap(); let max_height = map.keys().max().cloned(); Ok(max_height) @@ -114,85 +135,105 @@ impl BlockAggregatorDB for FakeDB { } struct FakeBlockSource { - blocks: Receiver>, + blocks: Vec<(BlockHeight, BlockBytes)>, } impl FakeBlockSource { - fn new() -> (Self, Sender>) { - let (_sender, receiver) = tokio::sync::mpsc::channel(1); - let _self = Self { blocks: receiver }; - (_self, _sender) + fn new(blocks: Vec<(BlockHeight, BlockBytes)>) -> Self { + Self { blocks } } } impl BlockSource for FakeBlockSource { type Block = BlockBytes; - async fn next_block(&mut self) -> Result> { + fn blocks_starting_from( + &self, + block_height: BlockHeight, + ) -> impl Iterator> + Send + Sync + 'static + { + let start_height: u32 = block_height.into(); self.blocks - .recv() - .await - .ok_or(Error::BlockSource(anyhow!("Channel closed"))) + .clone() + .into_iter() + .filter(move |(height, _)| { + let h: u32 = (*height).into(); + h >= start_height + }) + .map(|(height, block)| Ok((height, block))) } +} - async fn drain(&mut self) -> Result<()> { - todo!() - } +fn importer_stream( + blocks: Vec<(BlockHeight, BlockBytes)>, +) -> BoxStream> { + let stream = futures::stream::iter( + blocks + .into_iter() + .map(|(height, block)| Ok((height, block))), + ) + .chain(futures::stream::once(futures::future::pending())); + Box::pin(stream) } #[tokio::test] async fn run__get_block_range__returns_expected_blocks() { let mut rng = StdRng::seed_from_u64(42); - // given - let (api, sender) = FakeApi::new(); + // Given let mut db = FakeDB::new(); db.add_block(1.into(), BlockBytes::random(&mut rng)); db.add_block(2.into(), BlockBytes::random(&mut rng)); db.add_block(3.into(), BlockBytes::random(&mut rng)); - let (source, _block_sender) = FakeBlockSource::new(); + let shared_state = SharedState::new(db.clone(), 1_000); - let mut srv = BlockAggregator::new(api, db, source); - let mut watcher = StateWatcher::started(); - let (query, response) = BlockAggregatorQuery::get_block_range(2, 3); + // When + let result = shared_state.get_block_range(2, 3); - // when - sender.send(query).await.unwrap(); - let _ = srv.run(&mut watcher).await; - - // then - let stream = response.await.unwrap(); + // Then + let stream = result.unwrap(); let blocks = stream.collect::>().await; // TODO: Check values assert_eq!(blocks.len(), 2); - - // cleanup - drop(_block_sender); } #[tokio::test] async fn run__new_block_gets_added_to_db() { let mut rng = StdRng::seed_from_u64(42); - // given - let (api, _sender) = FakeApi::new(); + + // Given let db = FakeDB::new(); - let db_map = db.clone_inner(); - let (source, source_sender) = FakeBlockSource::new(); - let mut srv = BlockAggregator::new(api, db, source); + let source = FakeBlockSource::new(vec![]); + let sync_from = BlockHeight::from(123u32); let block = BlockBytes::random(&mut rng); - let id = BlockHeight::from(123u32); + let importer = importer_stream(vec![(sync_from, block.clone())]); + let shared_state = SharedState::new(db.clone(), 1_000); + let mut srv = Task::new( + sync_from, + Box::new(FakeApi {}), + db, + shared_state.clone(), + source, + importer, + ); let mut watcher = StateWatcher::started(); - // when - let event = 
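// The `importer_stream` helper above deliberately chains
// `futures::stream::once(futures::future::pending())` after the scripted
// blocks: the stream yields every prepared block and then stays pending
// instead of terminating, because a `None` from the importer makes
// `Task::run` return `TaskNextAction::Stop`.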
BlockSourceEvent::NewBlock(id, block.clone()); - source_sender.send(event).await.unwrap(); + // When + // Import block event + let _ = srv.run(&mut watcher).await; + // Process event let _ = srv.run(&mut watcher).await; - // then - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - let actual = db_map.lock().unwrap().get(&id).unwrap().clone(); + // Then + + let actual = shared_state + .get_block_range(sync_from, sync_from) + .unwrap() + .next() + .await + .unwrap(); assert_eq!(block, actual); } @@ -200,98 +241,87 @@ async fn run__new_block_gets_added_to_db() { async fn run__get_current_height__returns_expected_height() { let mut rng = StdRng::seed_from_u64(42); // given - let (api, sender) = FakeApi::new(); let mut db = FakeDB::new(); let expected_height = BlockHeight::from(3u32); db.add_block(1.into(), BlockBytes::random(&mut rng)); db.add_block(2.into(), BlockBytes::random(&mut rng)); db.add_block(expected_height, BlockBytes::random(&mut rng)); - let (source, _block_sender) = FakeBlockSource::new(); - let mut srv = BlockAggregator::new(api, db, source); - - let mut watcher = StateWatcher::started(); - let (query, response) = BlockAggregatorQuery::get_current_height(); + let shared_state = SharedState::new(db.clone(), 1_000); // when - sender.send(query).await.unwrap(); - let _ = srv.run(&mut watcher).await; + let result = shared_state.get_current_height(); // then - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - let height = response.await.unwrap().unwrap(); + let height = result.unwrap().unwrap(); assert_eq!(expected_height, height); - - // cleanup - drop(_block_sender); } #[tokio::test] async fn run__new_block_subscription__sends_new_block() { let mut rng = StdRng::seed_from_u64(42); - // given - let (api, sender) = FakeApi::new(); let db = FakeDB::new(); - let (source, source_sender) = FakeBlockSource::new(); - let mut srv = BlockAggregator::new(api, db, source); + let source = FakeBlockSource::new(vec![]); - let expected_block = BlockBytes::random(&mut rng); - let expected_height = BlockHeight::from(123u32); + let sync_from = BlockHeight::from(123u32); + let block = BlockBytes::random(&mut rng); + let importer = importer_stream(vec![(sync_from, block.clone())]); + let shared_state = SharedState::new(db.clone(), 1_000); + let mut srv = Task::new( + sync_from, + Box::new(FakeApi {}), + db, + shared_state.clone(), + source, + importer, + ); let mut watcher = StateWatcher::started(); - let (query, response) = BlockAggregatorQuery::new_block_subscription(); - // when - sender.send(query).await.unwrap(); - let _ = srv.run(&mut watcher).await; - let event = BlockSourceEvent::NewBlock(expected_height, expected_block.clone()); - source_sender.send(event).await.unwrap(); - let _ = srv.run(&mut watcher).await; + // Given + let mut subscription = shared_state.new_block_subscription(); - // then - let actual_block = await_response_with_timeout(response).await.unwrap(); - assert_eq!((expected_height, expected_block), actual_block); + // When + let _ = srv.run(&mut watcher).await; - // cleanup - drop(source_sender); + // Then + let actual_block = subscription + .next() + .now_or_never() + .unwrap() + .unwrap() + .unwrap(); + assert_eq!((sync_from, Arc::new(block)), actual_block); } #[tokio::test] async fn run__new_block_subscription__does_not_send_syncing_blocks() { let mut rng = StdRng::seed_from_u64(42); - // given - let (api, sender) = FakeApi::new(); let db = FakeDB::new(); - let (source, source_sender) = FakeBlockSource::new(); - let mut srv = 
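// In the rewritten tests above, `srv.run` is polled twice on purpose: the
// first iteration pulls the block from the importer stream and publishes it
// to the broadcast channel, and the second iteration receives it back on the
// joint old-and-new block stream and persists it through
// `BlocksStorage::store_block`.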
BlockAggregator::new(api, db, source); + let sync_from = BlockHeight::from(123u32); let block = BlockBytes::random(&mut rng); - let height = BlockHeight::from(123u32); - let mut watcher = StateWatcher::started(); - let (query, response) = BlockAggregatorQuery::new_block_subscription(); - // when - sender.send(query).await.unwrap(); - let _ = srv.run(&mut watcher).await; - let event = BlockSourceEvent::OldBlock(height, block); - source_sender.send(event).await.unwrap(); - let _ = srv.run(&mut watcher).await; + let source = FakeBlockSource::new(vec![(sync_from, block.clone())]); + + let importer = importer_stream(vec![]); + let shared_state = SharedState::new(db.clone(), 1_000); + let mut srv = Task::new( + sync_from, + Box::new(FakeApi {}), + db, + shared_state.clone(), + source, + importer, + ); + let mut watcher = StateWatcher::started(); - // then - let res = await_response_with_timeout(response).await; - assert!(res.is_err(), "should have timed out"); + // Given + let mut subscription = shared_state.new_block_subscription(); - // cleanup - drop(source_sender); -} + // When + let _ = srv.run(&mut watcher).await; -async fn await_response_with_timeout(mut response: Receiver) -> Result { - tokio::time::timeout(tokio::time::Duration::from_secs(1), async { - loop { - if let Ok(result) = response.try_recv() { - return result; - } - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - } - }) - .await + // Then + let result = subscription.next().now_or_never(); + assert!(result.is_none()); } diff --git a/crates/storage/src/structured_storage.rs b/crates/storage/src/structured_storage.rs index d166408453a..9ed72c71c9e 100644 --- a/crates/storage/src/structured_storage.rs +++ b/crates/storage/src/structured_storage.rs @@ -110,6 +110,21 @@ impl StructuredStorage { } } +/// The trait that allows wrapping the storage into the structured storage. +pub trait AsStructuredStorage { + /// Wraps the storage into the structured storage. 
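// A hedged usage sketch of the blanket impl below: any `KeyValueInspect` type
// gains typed reads through the structured-storage API without being
// consumed, e.g.
//
//     let mode = db
//         .as_structured_storage()
//         .storage_as_ref::<LatestBlock>()
//         .get(&())?
//         .map(|value| value.into_owned());
//
// which is the pattern `service.rs` uses to inspect the persisted storage
// mode before choosing between the Local and S3 adapters.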
+ fn as_structured_storage(&self) -> StructuredStorage<&Self>; +} + +impl AsStructuredStorage for S +where + S: KeyValueInspect, +{ + fn as_structured_storage(&self) -> StructuredStorage<&Self> { + StructuredStorage::new(self) + } +} + impl AsRef for StructuredStorage { fn as_ref(&self) -> &S { &self.inner diff --git a/tests/tests/dos.rs b/tests/tests/dos.rs index 759187b5a68..ab11cd92fbe 100644 --- a/tests/tests/dos.rs +++ b/tests/tests/dos.rs @@ -1,9 +1,6 @@ #![allow(non_snake_case)] -use std::time::{ - Duration, - Instant, -}; +use std::time::Duration; use fuel_core::service::{ Config, @@ -445,68 +442,6 @@ async fn concurrency_limit_0_allows_unthrottled_queries() { assert_eq!(result.status(), 200); } -#[tokio::test(flavor = "multi_thread", worker_threads = 8)] -async fn concurrency_limit_1_prevents_concurrent_queries() { - // Given - const NUM_OF_BLOCKS: u32 = 40; - let num_samples = 100; - - let mut config = Config::local_node(); - config.graphql_config.max_concurrent_queries = 1; - - let query = FULL_BLOCK_QUERY.to_string(); - let query = query.replace("$NUMBER_OF_BLOCKS", NUM_OF_BLOCKS.to_string().as_str()); - - let node = FuelService::new_node(config).await.unwrap(); - let url = format!("http://{}/v1/graphql", node.bound_address); - let client = FuelClient::new(url.clone()).unwrap(); - client.produce_blocks(NUM_OF_BLOCKS, None).await.unwrap(); - - let (tx, mut rx) = tokio::sync::mpsc::channel(100); - - // When - - // Measure the average request time for sequential queries - let mut avg_request_time = 0; - for _ in 0..num_samples { - let now = Instant::now(); - let _ = send_graph_ql_query(&url, &query).await; - avg_request_time += now.elapsed().as_nanos() / num_samples; - } - - // Measure the average request time for concurrent queries - for _ in 0..num_samples { - let tx = tx.clone(); - let url = url.clone(); - let query = query.clone(); - - tokio::spawn(async move { - let now = Instant::now(); - send_graph_ql_query(&url, &query).await; - let _ = tx.send(now.elapsed().as_nanos()).await; - }); - } - - let mut avg_concurrent_request_time = 0; - for _ in 0..num_samples { - let request_time = rx.recv().await.unwrap(); - avg_concurrent_request_time += request_time / num_samples; - } - - // Then - - // In an idealized model we should see c = s * n / 2 - // where - // c = average concurrent request time - // s = single request time - // n = number of request - // 2 = the first even natural non-zero number ;) - // - // However, since this is inherently flaky we divide by 4 instead of 2 to have some margin, - // while still maintaining our ability to assert a large deviation between the two measurements. 
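// For reference, the idealized model in the removed comment above follows
// from FIFO queueing at a concurrency limit of 1: the i-th of n simultaneous
// requests of duration s finishes after (i + 1) * s, so the mean completion
// time is s * (1 + 2 + ... + n) / n = s * (n + 1) / 2, i.e. roughly s * n / 2
// for large n; dividing by 4 instead of 2 leaves slack for scheduling noise.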
- assert!(avg_concurrent_request_time > avg_request_time * num_samples / 4); -} - #[tokio::test] async fn recursion_in_queries_is_no_allowed__blocks() { let query = r#" diff --git a/tests/tests/rpc.rs b/tests/tests/rpc.rs index 9aea72823e6..5eb5331b360 100644 --- a/tests/tests/rpc.rs +++ b/tests/tests/rpc.rs @@ -8,7 +8,7 @@ use fuel_core::{ }, }; use fuel_core_block_aggregator_api::{ - blocks::importer_and_db_source::serializer_adapter::proto_to_fuel_conversions::fuel_block_from_protobuf, + blocks::old_block_source::convertor_adapter::proto_to_fuel_conversions::fuel_block_from_protobuf, protobuf_types::{ BlockHeightRequest as ProtoBlockHeightRequest, BlockRangeRequest as ProtoBlockRangeRequest, @@ -72,7 +72,7 @@ async fn get_block_range__can_get_serialized_block_from_rpc__literal() { assert!( matches!( - receipts[1], + receipts[0][1], Receipt::ScriptResult { result: ScriptExecutionResult::Success, .. @@ -82,7 +82,7 @@ async fn get_block_range__can_get_serialized_block_from_rpc__literal() { receipts ); assert!( - matches!(receipts[0], Receipt::Return { .. }), + matches!(receipts[0][0], Receipt::Return { .. }), "should have a return receipt, received: {:?}", receipts ); @@ -170,7 +170,7 @@ async fn new_block_subscription__can_get_expect_block() { assert!( matches!( - receipts[1], + receipts[0][1], Receipt::ScriptResult { result: ScriptExecutionResult::Success, .. @@ -180,7 +180,7 @@ async fn new_block_subscription__can_get_expect_block() { receipts ); assert!( - matches!(receipts[0], Receipt::Return { .. }), + matches!(receipts[0][0], Receipt::Return { .. }), "should have a return receipt, received: {:?}", receipts ); diff --git a/tests/tests/rpc_s3.rs b/tests/tests/rpc_s3.rs index 727fcf34c20..52d6b29742e 100644 --- a/tests/tests/rpc_s3.rs +++ b/tests/tests/rpc_s3.rs @@ -14,9 +14,8 @@ use fuel_core::{ }, }; use fuel_core_block_aggregator_api::{ - blocks::importer_and_db_source::serializer_adapter::proto_to_fuel_conversions::fuel_block_from_protobuf, + blocks::old_block_source::convertor_adapter::proto_to_fuel_conversions::fuel_block_from_protobuf, db::remote_cache::block_height_to_key, - integration::StorageMethod, protobuf_types::{ Block as ProtoBlock, BlockHeightRequest as ProtoBlockHeightRequest, @@ -27,6 +26,7 @@ use fuel_core_block_aggregator_api::{ block_response::Payload as ProtoPayload, remote_block_response::Location, }, + service::StorageMethod, }; use fuel_core_client::client::FuelClient; use fuel_core_types::{