Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 10 additions & 12 deletions bin/fuel-core/src/cli/run/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ pub enum StorageMethod {
}

impl RpcArgs {
pub fn into_config(self) -> fuel_core_block_aggregator_api::integration::Config {
fuel_core_block_aggregator_api::integration::Config {
pub fn into_config(self) -> fuel_core_block_aggregator_api::service::Config {
fuel_core_block_aggregator_api::service::Config {
addr: net::SocketAddr::new(self.rpc_ip, self.rpc_port),
sync_from: Some(BlockHeight::from(0)),
storage_method: self.storage_method.map(Into::into).unwrap_or_default(),
Expand All @@ -54,17 +54,17 @@ impl RpcArgs {
}
}

impl From<StorageMethod> for fuel_core_block_aggregator_api::integration::StorageMethod {
impl From<StorageMethod> for fuel_core_block_aggregator_api::service::StorageMethod {
fn from(storage_method: StorageMethod) -> Self {
match storage_method {
StorageMethod::Local => {
fuel_core_block_aggregator_api::integration::StorageMethod::Local
fuel_core_block_aggregator_api::service::StorageMethod::Local
}
StorageMethod::S3 {
bucket,
endpoint_url,
requester_pays,
} => fuel_core_block_aggregator_api::integration::StorageMethod::S3 {
} => fuel_core_block_aggregator_api::service::StorageMethod::S3 {
bucket,
endpoint_url,
requester_pays,
Expand All @@ -73,13 +73,11 @@ impl From<StorageMethod> for fuel_core_block_aggregator_api::integration::Storag
bucket,
endpoint_url,
requester_pays,
} => {
fuel_core_block_aggregator_api::integration::StorageMethod::S3NoPublish {
bucket,
endpoint_url,
requester_pays,
}
}
} => fuel_core_block_aggregator_api::service::StorageMethod::S3NoPublish {
bucket,
endpoint_url,
requester_pays,
},
}
}
}
41 changes: 20 additions & 21 deletions crates/fuel-core/src/combined_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use crate::state::{
historical_rocksdb::StateRewindPolicy,
rocks_db::DatabaseConfig,
};
#[cfg(feature = "rpc")]
use anyhow::anyhow;

use crate::{
database::{
Expand All @@ -24,8 +22,6 @@ use crate::{

#[cfg(feature = "rpc")]
use crate::database::database_description::block_aggregator::BlockAggregatorDatabase;
#[cfg(feature = "rpc")]
use fuel_core_block_aggregator_api::db::table::LatestBlock;
#[cfg(feature = "test-helpers")]
use fuel_core_chain_config::{
StateConfig,
Expand All @@ -43,11 +39,6 @@ use fuel_core_storage::tables::{
ContractsState,
Messages,
};
#[cfg(feature = "rpc")]
use fuel_core_storage::{
Error as StorageError,
StorageAsRef,
};
use fuel_core_types::{
blockchain::primitives::DaBlockHeight,
fuel_types::BlockHeight,
Expand Down Expand Up @@ -488,31 +479,26 @@ impl CombinedDatabase {

let gas_price_chain_height =
self.gas_price().latest_height_from_metadata()?;
let gas_price_rolled_back =
is_equal_or_none(gas_price_chain_height, target_block_height);
let gas_price_rolled_back = is_equal_or_less_than_or_none(
gas_price_chain_height,
target_block_height,
);

let compression_db_height =
self.compression().latest_height_from_metadata()?;
let compression_db_rolled_back =
is_equal_or_none(compression_db_height, target_block_height);
is_equal_or_less_than_or_none(compression_db_height, target_block_height);

#[cfg(feature = "rpc")]
{
let block_aggregation_storage_height = self
.block_aggregation_storage()
.storage_as_ref::<LatestBlock>()
.get(&())
.map_err(|e: StorageError| anyhow!(e))?
.map(|b| b.height());
.latest_height_from_metadata()?;
let block_aggregation_storage_rolled_back = is_equal_or_less_than_or_none(
block_aggregation_storage_height,
target_block_height,
);

if !block_aggregation_storage_rolled_back {
self.block_aggregation_storage_mut()
.rollback_to(target_block_height)?;
}
if on_chain_height == target_block_height
&& off_chain_height == target_block_height
&& gas_price_rolled_back
Expand Down Expand Up @@ -585,6 +571,20 @@ impl CombinedDatabase {
{
self.compression().rollback_last_block()?;
}

#[cfg(feature = "rpc")]
{
let block_aggregation_storage_height = self
.block_aggregation_storage()
.latest_height_from_metadata()?;

if let Some(block_aggregation_storage_height) =
block_aggregation_storage_height
&& block_aggregation_storage_height > target_block_height
{
self.block_aggregation_storage().rollback_last_block()?;
}
}
}

if shutdown_listener.is_cancelled() {
Expand Down Expand Up @@ -722,7 +722,6 @@ fn is_equal_or_none<T: PartialEq>(maybe_left: Option<T>, right: T) -> bool {
maybe_left.map(|left| left == right).unwrap_or(true)
}

#[cfg(feature = "rpc")]
fn is_equal_or_less_than_or_none<T: PartialOrd>(maybe_left: Option<T>, right: T) -> bool {
maybe_left.map(|left| left <= right).unwrap_or(true)
}
Expand Down
51 changes: 6 additions & 45 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,6 @@ use crate::{
state::HeightType,
};

#[cfg(feature = "rpc")]
use anyhow::anyhow;
#[cfg(feature = "rpc")]
use fuel_core_block_aggregator_api::db::table::{
Blocks,
LatestBlock,
Mode,
};
#[cfg(feature = "rpc")]
use fuel_core_storage::{
StorageAsRef,
transactional::WriteTransaction,
};
#[cfg(feature = "rocksdb")]
use std::path::Path;

Expand Down Expand Up @@ -462,38 +449,12 @@ impl Modifiable for Database<BlockAggregatorDatabase> {
fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> {
// Does not need to be monotonically increasing because
// storage values are modified in parallel from different heights
commit_changes_with_height_update(self, changes, |_iter| Ok(Vec::new()))
}
}

#[cfg(feature = "rpc")]
impl Database<BlockAggregatorDatabase> {
pub fn rollback_to(&mut self, block_height: BlockHeight) -> StorageResult<()> {
let mut tx = self.write_transaction();
let mode = tx
.storage_as_ref::<LatestBlock>()
.get(&())?
.map(|m| m.into_owned());
let new = match mode {
None => None,
Some(Mode::Local(_)) => Some(Mode::new_local(block_height)),
Some(Mode::S3(_)) => Some(Mode::new_s3(block_height)),
};
if let Some(Mode::Local(_)) = mode {
let remove_heights = tx
.iter_all_keys::<Blocks>(Some(IterDirection::Reverse))
.flatten()
.take_while(|height| height > &block_height)
.collect::<Vec<_>>();
for height in remove_heights {
tx.storage_as_mut::<Blocks>().remove(&height)?;
}
}
if let Some(new) = new {
tx.storage_as_mut::<LatestBlock>().insert(&(), &new)?;
tx.commit().map_err(|e: StorageError| anyhow!(e))?;
}
Ok(())
commit_changes_with_height_update(self, changes, |iter| {
iter.iter_all_keys::<fuel_core_block_aggregator_api::db::table::Blocks>(Some(
IterDirection::Reverse,
))
.try_collect()
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/p2p_test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub struct CustomizeConfig {
max_discovery_peers_connected: Option<u32>,
subscribe_to_transactions: Option<bool>,
#[cfg(feature = "rpc")]
rpc_config: Option<fuel_core_block_aggregator_api::integration::Config>,
rpc_config: Option<fuel_core_block_aggregator_api::service::Config>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't like integration over service :P ?

}

impl CustomizeConfig {
Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-core/src/service/adapters/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
fuel_core_graphql_api::storage::transactions::TransactionStatuses,
};
use fuel_core_block_aggregator_api::{
blocks::importer_and_db_source::sync_service::TxReceipts,
blocks::old_block_source::TxReceipts,
result::{
Error as RPCError,
Result as RPCResult,
Expand All @@ -32,7 +32,7 @@ impl ReceiptSource {
}

impl TxReceipts for ReceiptSource {
async fn get_receipts(&self, tx_id: &TxId) -> RPCResult<Vec<Receipt>> {
fn get_receipts(&self, tx_id: &TxId) -> RPCResult<Vec<Receipt>> {
let tx_status =
StorageInspect::<TransactionStatuses>::get(&self.off_chain, tx_id)
.map_err(RPCError::receipt_error)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/service/adapters/rpc/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn get_receipt__gets_the_receipt_for_expected_tx() {
let receipt_source = ReceiptSource::new(db);

// when
let actual = receipt_source.get_receipts(&tx_id).await.unwrap();
let actual = receipt_source.get_receipts(&tx_id).unwrap();

// then
assert_eq!(actual, expected);
Expand Down
8 changes: 4 additions & 4 deletions crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use fuel_core_p2p::config::{
};

#[cfg(feature = "rpc")]
use fuel_core_block_aggregator_api::integration::StorageMethod;
use fuel_core_block_aggregator_api::service::StorageMethod;
#[cfg(feature = "test-helpers")]
use fuel_core_chain_config::{
ChainConfig,
Expand Down Expand Up @@ -87,7 +87,7 @@ pub struct Config {
pub block_producer: fuel_core_producer::Config,
pub gas_price_config: GasPriceConfig,
#[cfg(feature = "rpc")]
pub rpc_config: Option<fuel_core_block_aggregator_api::integration::Config>,
pub rpc_config: Option<fuel_core_block_aggregator_api::service::Config>,
pub da_compression: DaCompressionMode,
pub block_importer: fuel_core_importer::Config,
#[cfg(feature = "relayer")]
Expand Down Expand Up @@ -130,7 +130,7 @@ impl Config {
#[cfg(feature = "rpc")]
pub fn local_node_with_rpc() -> Self {
let mut config = Self::local_node_with_state_config(StateConfig::local_testnet());
let rpc_config = fuel_core_block_aggregator_api::integration::Config {
let rpc_config = fuel_core_block_aggregator_api::service::Config {
addr: free_local_addr(),
sync_from: Some(BlockHeight::new(0)),
storage_method: StorageMethod::Local,
Expand All @@ -144,7 +144,7 @@ impl Config {
#[cfg(feature = "rpc")]
pub fn local_node_with_rpc_and_storage_method(storage_method: StorageMethod) -> Self {
let mut config = Self::local_node_with_state_config(StateConfig::local_testnet());
let rpc_config = fuel_core_block_aggregator_api::integration::Config {
let rpc_config = fuel_core_block_aggregator_api::service::Config {
addr: free_local_addr(),
sync_from: Some(BlockHeight::new(0)),
storage_method,
Expand Down
19 changes: 9 additions & 10 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,11 @@ mod rpc {
service::adapters::rpc::ReceiptSource,
};
pub use fuel_core_block_aggregator_api::{
api::protobuf_adapter::ProtobufAPI,
blocks::importer_and_db_source::{
ImporterAndDbSource,
serializer_adapter::SerializerAdapter,
blocks::old_block_source::{
OldBlocksSource,
convertor_adapter::ConvertorAdapter,
},
integration::UninitializedTask,
service::UninitializedTask,
};
pub use fuel_core_services::ServiceRunner;
pub use fuel_core_types::fuel_types::BlockHeight;
Expand Down Expand Up @@ -570,24 +569,24 @@ pub fn init_sub_services(
#[allow(clippy::type_complexity)]
#[cfg(feature = "rpc")]
fn init_rpc_server(
config: &fuel_core_block_aggregator_api::integration::Config,
config: &fuel_core_block_aggregator_api::service::Config,
database: &CombinedDatabase,
importer_adapter: &BlockImporterAdapter,
genesis_height: BlockHeight,
) -> anyhow::Result<
ServiceRunner<
UninitializedTask<
ProtobufAPI,
ImporterAndDbSource<SerializerAdapter, Database<OnChain>, ReceiptSource>,
OldBlocksSource<ConvertorAdapter, Database<OnChain>, ReceiptSource>,
Database<BlockAggregatorDatabase>,
Database<BlockAggregatorDatabase>,
>,
>,
> {
let receipts = ReceiptSource::new(database.off_chain().clone());
let serializer = SerializerAdapter;
let serializer = ConvertorAdapter;
let onchain_db = database.on_chain().clone();
let importer = importer_adapter.events_shared_result();
fuel_core_block_aggregator_api::integration::new_service(
fuel_core_block_aggregator_api::service::new_service(
database.block_aggregation_storage().clone(),
serializer,
onchain_db,
Expand Down
6 changes: 4 additions & 2 deletions crates/services/block_aggregator_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ fuel-core-services = { workspace = true }
fuel-core-storage = { workspace = true, features = ["std"] }
fuel-core-types = { workspace = true, features = ["std"] }
futures = { workspace = true }
itertools = { workspace = true }
num_enum = { workspace = true }
prost = { workspace = true, features = ["derive"] }
rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
tokio-stream = { workspace = true, features = ["sync"] }
tonic = { workspace = true }
tracing = { workspace = true }

Expand All @@ -42,5 +43,6 @@ aws-smithy-mocks = { workspace = true }
fuel-core-services = { workspace = true, features = ["test-helpers"] }
fuel-core-storage = { workspace = true, features = ["test-helpers"] }
fuel-core-types = { workspace = true, features = ["std", "test-helpers"] }
mockall = { workspace = true }
proptest = { workspace = true }
tokio-stream = { workspace = true }
Loading
Loading