Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 10 additions & 12 deletions bin/fuel-core/src/cli/run/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ pub enum StorageMethod {
}

impl RpcArgs {
pub fn into_config(self) -> fuel_core_block_aggregator_api::integration::Config {
fuel_core_block_aggregator_api::integration::Config {
pub fn into_config(self) -> fuel_core_block_aggregator_api::service::Config {
fuel_core_block_aggregator_api::service::Config {
addr: net::SocketAddr::new(self.rpc_ip, self.rpc_port),
sync_from: Some(BlockHeight::from(0)),
storage_method: self.storage_method.map(Into::into).unwrap_or_default(),
Expand All @@ -54,17 +54,17 @@ impl RpcArgs {
}
}

impl From<StorageMethod> for fuel_core_block_aggregator_api::integration::StorageMethod {
impl From<StorageMethod> for fuel_core_block_aggregator_api::service::StorageMethod {
fn from(storage_method: StorageMethod) -> Self {
match storage_method {
StorageMethod::Local => {
fuel_core_block_aggregator_api::integration::StorageMethod::Local
fuel_core_block_aggregator_api::service::StorageMethod::Local
}
StorageMethod::S3 {
bucket,
endpoint_url,
requester_pays,
} => fuel_core_block_aggregator_api::integration::StorageMethod::S3 {
} => fuel_core_block_aggregator_api::service::StorageMethod::S3 {
bucket,
endpoint_url,
requester_pays,
Expand All @@ -73,13 +73,11 @@ impl From<StorageMethod> for fuel_core_block_aggregator_api::integration::Storag
bucket,
endpoint_url,
requester_pays,
} => {
fuel_core_block_aggregator_api::integration::StorageMethod::S3NoPublish {
bucket,
endpoint_url,
requester_pays,
}
}
} => fuel_core_block_aggregator_api::service::StorageMethod::S3NoPublish {
bucket,
endpoint_url,
requester_pays,
},
}
}
}
2 changes: 1 addition & 1 deletion crates/fuel-core/src/p2p_test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub struct CustomizeConfig {
max_discovery_peers_connected: Option<u32>,
subscribe_to_transactions: Option<bool>,
#[cfg(feature = "rpc")]
rpc_config: Option<fuel_core_block_aggregator_api::integration::Config>,
rpc_config: Option<fuel_core_block_aggregator_api::service::Config>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't like integration over service :P ?

}

impl CustomizeConfig {
Expand Down
8 changes: 4 additions & 4 deletions crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use fuel_core_p2p::config::{
};

#[cfg(feature = "rpc")]
use fuel_core_block_aggregator_api::integration::StorageMethod;
use fuel_core_block_aggregator_api::service::StorageMethod;
#[cfg(feature = "test-helpers")]
use fuel_core_chain_config::{
ChainConfig,
Expand Down Expand Up @@ -87,7 +87,7 @@ pub struct Config {
pub block_producer: fuel_core_producer::Config,
pub gas_price_config: GasPriceConfig,
#[cfg(feature = "rpc")]
pub rpc_config: Option<fuel_core_block_aggregator_api::integration::Config>,
pub rpc_config: Option<fuel_core_block_aggregator_api::service::Config>,
pub da_compression: DaCompressionMode,
pub block_importer: fuel_core_importer::Config,
#[cfg(feature = "relayer")]
Expand Down Expand Up @@ -130,7 +130,7 @@ impl Config {
#[cfg(feature = "rpc")]
pub fn local_node_with_rpc() -> Self {
let mut config = Self::local_node_with_state_config(StateConfig::local_testnet());
let rpc_config = fuel_core_block_aggregator_api::integration::Config {
let rpc_config = fuel_core_block_aggregator_api::service::Config {
addr: free_local_addr(),
sync_from: Some(BlockHeight::new(0)),
storage_method: StorageMethod::Local,
Expand All @@ -144,7 +144,7 @@ impl Config {
#[cfg(feature = "rpc")]
pub fn local_node_with_rpc_and_storage_method(storage_method: StorageMethod) -> Self {
let mut config = Self::local_node_with_state_config(StateConfig::local_testnet());
let rpc_config = fuel_core_block_aggregator_api::integration::Config {
let rpc_config = fuel_core_block_aggregator_api::service::Config {
addr: free_local_addr(),
sync_from: Some(BlockHeight::new(0)),
storage_method,
Expand Down
9 changes: 4 additions & 5 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,11 @@ mod rpc {
service::adapters::rpc::ReceiptSource,
};
pub use fuel_core_block_aggregator_api::{
api::protobuf_adapter::ProtobufAPI,
blocks::importer_and_db_source::{
ImporterAndDbSource,
serializer_adapter::SerializerAdapter,
},
integration::UninitializedTask,
service::UninitializedTask,
};
pub use fuel_core_services::ServiceRunner;
pub use fuel_core_types::fuel_types::BlockHeight;
Expand Down Expand Up @@ -570,24 +569,24 @@ pub fn init_sub_services(
#[allow(clippy::type_complexity)]
#[cfg(feature = "rpc")]
fn init_rpc_server(
config: &fuel_core_block_aggregator_api::integration::Config,
config: &fuel_core_block_aggregator_api::service::Config,
database: &CombinedDatabase,
importer_adapter: &BlockImporterAdapter,
genesis_height: BlockHeight,
) -> anyhow::Result<
ServiceRunner<
UninitializedTask<
ProtobufAPI,
ImporterAndDbSource<SerializerAdapter, Database<OnChain>, ReceiptSource>,
Database<BlockAggregatorDatabase>,
Database<BlockAggregatorDatabase>,
>,
>,
> {
let receipts = ReceiptSource::new(database.off_chain().clone());
let serializer = SerializerAdapter;
let onchain_db = database.on_chain().clone();
let importer = importer_adapter.events_shared_result();
fuel_core_block_aggregator_api::integration::new_service(
fuel_core_block_aggregator_api::service::new_service(
database.block_aggregation_storage().clone(),
serializer,
onchain_db,
Expand Down
5 changes: 3 additions & 2 deletions crates/services/block_aggregator_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ serde = { workspace = true, features = ["derive"] }
strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
tokio-stream = { workspace = true, features = ["sync"] }
tonic = { workspace = true }
tracing = { workspace = true }

Expand All @@ -42,5 +42,6 @@ aws-smithy-mocks = { workspace = true }
fuel-core-services = { workspace = true, features = ["test-helpers"] }
fuel-core-storage = { workspace = true, features = ["test-helpers"] }
fuel-core-types = { workspace = true, features = ["std", "test-helpers"] }
mockall = { workspace = true }
proptest = { workspace = true }
tokio-stream = { workspace = true }
84 changes: 0 additions & 84 deletions crates/services/block_aggregator_api/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,85 +1 @@
use crate::result::Result;
use fuel_core_types::fuel_types::BlockHeight;
use std::fmt;

pub mod protobuf_adapter;

/// The API for querying the block aggregator service.
pub trait BlockAggregatorApi: Send + Sync {
/// The type of the block range response.
type BlockRangeResponse;
type Block;

/// Awaits the next query to the block aggregator service.
fn await_query(
&mut self,
) -> impl Future<
Output = Result<BlockAggregatorQuery<Self::BlockRangeResponse, Self::Block>>,
> + Send;
}

pub enum BlockAggregatorQuery<BlockRangeResponse, Block> {
GetBlockRange {
first: BlockHeight,
last: BlockHeight,
response: tokio::sync::oneshot::Sender<BlockRangeResponse>,
},
GetCurrentHeight {
response: tokio::sync::oneshot::Sender<Option<BlockHeight>>,
},
// TODO: Do we need a way to unsubscribe or can we just see that the receiver is dropped?
NewBlockSubscription {
response: tokio::sync::mpsc::Sender<(BlockHeight, Block)>,
},
}

impl<T, B> fmt::Debug for BlockAggregatorQuery<T, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
BlockAggregatorQuery::GetBlockRange { first, last, .. } => f
.debug_struct("GetBlockRange")
.field("first", first)
.field("last", last)
.finish(),
BlockAggregatorQuery::GetCurrentHeight { .. } => {
f.debug_struct("GetCurrentHeight").finish()
}
BlockAggregatorQuery::NewBlockSubscription { .. } => {
f.debug_struct("GetNewBlockStream").finish()
}
}
}
}

#[cfg(test)]
impl<T, B> BlockAggregatorQuery<T, B> {
pub fn get_block_range<H: Into<BlockHeight>>(
first: H,
last: H,
) -> (Self, tokio::sync::oneshot::Receiver<T>) {
let (sender, receiver) = tokio::sync::oneshot::channel();
let first: BlockHeight = first.into();
let last: BlockHeight = last.into();
let query = Self::GetBlockRange {
first,
last,
response: sender,
};
(query, receiver)
}

pub fn get_current_height()
-> (Self, tokio::sync::oneshot::Receiver<Option<BlockHeight>>) {
let (sender, receiver) = tokio::sync::oneshot::channel();
let query = Self::GetCurrentHeight { response: sender };
(query, receiver)
}

pub fn new_block_subscription()
-> (Self, tokio::sync::mpsc::Receiver<(BlockHeight, B)>) {
const ARBITRARY_CHANNEL_SIZE: usize = 10;
let (sender, receiver) = tokio::sync::mpsc::channel(ARBITRARY_CHANNEL_SIZE);
let query = Self::NewBlockSubscription { response: sender };
(query, receiver)
}
}
Loading
Loading