Skip to content

Conversation

@xgreenx
Copy link
Collaborator

@xgreenx xgreenx commented Dec 9, 2025

  • Splitted read storage from write storage
  • Instead of using channels, now we query data directly from the storage adapter
  • Removed cloning of blocks by suing Arc

Instead of using channels, now we query data directly from the storage adapter.
@xgreenx xgreenx self-assigned this Dec 9, 2025
@xgreenx xgreenx requested review from a team, Dentosal, MitchTurner and rymnc as code owners December 9, 2025 20:19
@cursor
Copy link

cursor bot commented Dec 9, 2025

PR Summary

Refactors block aggregator to separate read and write paths, remove channel-based API, and wire a direct storage-backed RPC service with broadcasted new-block streams; updates configs, storage helpers, and tests accordingly.

  • Block Aggregator API (major refactor):
    • Split interfaces into BlocksProvider (read) and BlocksStorage (write).
    • Replaced channel-based BlockAggregatorApi/BlockAggregatorQuery with direct, trait-based access via SharedState and BlocksAggregatorApi.
    • New modules: service.rs (config, wiring, shared state) and task.rs (ingest/store and broadcast new blocks); removed legacy block_aggregator.rs and old API.
    • Protobuf server now serves from storage-backed API; new-block subscription uses broadcast stream; avoids unnecessary block clones via Arc.
  • RPC/Node integration:
    • Moved from integration::{Config,StorageMethod,...} to service::{Config,StorageMethod,...} across CLI, node config, and sub-services.
    • Updated init_rpc_server wiring to new service/task types.
  • Storage:
    • Added AsStructuredStorage helper to wrap KV stores for structured access.
    • Split S3 logic: RemoteCache (write/publish) and RemoteBlocksProvider (read-only); adjusted constructors and key/value access.
  • Tests & deps:
    • Rewrote tests to use new traits/streams and mockall for API; enabled tokio/tokio-stream sync features.

Written by Cursor Bugbot for commit eba8c17. This will update automatically on new commits. Configure here.

@xgreenx xgreenx added the no changelog Skip the CI check of the changelog modification label Dec 9, 2025
subscribe_to_transactions: Option<bool>,
#[cfg(feature = "rpc")]
rpc_config: Option<fuel_core_block_aggregator_api::integration::Config>,
rpc_config: Option<fuel_core_block_aggregator_api::service::Config>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't like integration over service :P ?

query_sender:
tokio::sync::mpsc::Sender<BlockAggregatorQuery<BlockRangeResponse, ProtoBlock>>,
#[cfg_attr(test, mockall::automock)]
pub trait BlocksAggregatorApi: Send + Sync + 'static {
Copy link
Member

@MitchTurner MitchTurner Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In reality, we could just depend on the BlocksProvider trait here and directly on the broadcast::Sender that we have in the SharedStorage. This would parallel what we are doing in the aggregator service already, since it takes a dep on BlocksStorage and broadcast::Sender.

I think that's the right pattern.

tokio::sync::mpsc::Receiver<BlockAggregatorQuery<BlockRangeResponse, ProtoBlock>>,
pub struct UninitializedTask<B> {
addr: SocketAddr,
api: B,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above comment. Maybe rename something like block_aggregator

Comment on lines +203 to +210
let server = Server::new(self.api);
let router = tonic::transport::Server::builder()
.add_service(ProtoBlockAggregatorServer::new(server));
self.router = Some(router);
Ok(())
}

fn get_router(&mut self) -> anyhow::Result<Router> {
self.router
.take()
.ok_or_else(|| anyhow!("Router has not been initialized yet"))
let task = Task {
addr: self.addr,
router: Some(router),
};
Ok(task)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need an UninitializedTask just for this? What's the point of into_task for anyway haha?

impl BlockAggregatorApi for ProtobufAPI {
type BlockRangeResponse = BlockRangeResponse;
type Block = ProtoBlock;
pub type APIService<B> = ServiceRunner<UninitializedTask<B>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I'm not a fan of aliases like this, as it adds misdirection. If I publically saw an APIService, I wouldn't assume that it was in fact the ServiceRunner.

}
}

impl<S> BlocksAggregatorApi for SharedState<S>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be iimplementing a foreign port like this here. See above comment on moving renaming and moving SharedState.


/// The Block Aggregator service, which aggregates blocks from a source and stores them in a database
/// Queries can be made to the service to retrieve data from the `DB`
pub struct Task<S1, S2, Blocks>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why S1 and S2? Can we use S and P? or Storage and PRovider?

Comment on lines +13 to +29
pub trait BlocksProvider: Send + Sync + 'static {
type Block: Send + Sync + 'static;
/// The type used to report a range of blocks
type BlockRangeResponse;

/// Stores a block with the given ID
fn store_block(
&mut self,
block: BlockSourceEvent<Self::Block>,
) -> impl Future<Output = Result<()>> + Send;

/// Retrieves a range of blocks from the database
fn get_block_range(
&self,
first: BlockHeight,
last: BlockHeight,
) -> impl Future<Output = Result<Self::BlockRangeResponse>> + Send;
) -> Result<Self::BlockRangeResponse>;

/// Retrieves the current height of the aggregated blocks If there is a break in the blocks,
/// i.e. the blocks are being aggregated out of order, return the height of the last
/// contiguous block
fn get_current_height(
&self,
) -> impl Future<Output = Result<Option<BlockHeight>>> + Send;
fn get_current_height(&self) -> Result<Option<BlockHeight>>;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can move the the protobuf adapter code and replace BlocksAggregatorAPI port, like I mentioned above.

}
}

impl<S> BlocksProvider for StorageBlocksProvider<S>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we move this port to the protobuf adapter code, this impl should go with it.

Comment on lines +118 to +121
S: 'static,
S: KeyValueInspect<Column = Column>,
S: AtomicView,
S::LatestView: Unpin + Send + Sync + KeyValueInspect<Column = Column> + 'static,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IDK how you remember which traits to use to keep it this clean :P

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

no changelog Skip the CI check of the changelog modification

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants