Skip to content

Commit d70e029

Browse files
committed
Add a way to build a rpc handler that still has the S type parameter
1 parent b5be9ab commit d70e029

File tree

2 files changed

+36
-12
lines changed

2 files changed

+36
-12
lines changed

src/net_protocol.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
Stats,
2626
},
2727
provider::EventSender,
28-
rpc::{client::blobs::MemClient, RpcHandler},
28+
rpc::{client::blobs::MemClient, MemRpcHandler, RpcHandler},
2929
store::{GcConfig, Store},
3030
util::{
3131
local_pool::{self, LocalPoolHandle},
@@ -63,7 +63,7 @@ pub(crate) struct BlobsInner<S> {
6363
endpoint: Endpoint,
6464
gc_state: Arc<std::sync::Mutex<GcState>>,
6565
#[cfg(feature = "rpc")]
66-
pub(crate) rpc_handler: Arc<OnceLock<crate::rpc::RpcHandler>>,
66+
pub(crate) rpc_handler: Arc<OnceLock<crate::rpc::MemRpcHandler>>,
6767
}
6868

6969
/// Name used for logging when new node addresses are added from gossip.
@@ -137,15 +137,26 @@ impl<S: crate::store::Store> Builder<S> {
137137
/// Build the Blobs protocol handler.
138138
/// You need to provide a local pool handle and an endpoint.
139139
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Arc<Blobs> {
140+
let inner = self.build_inner(rt, endpoint);
141+
Arc::new(Blobs { inner })
142+
}
143+
144+
pub fn build_rpc_handler(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> RpcHandler<S> {
145+
let inner = self.build_inner(rt, endpoint);
146+
RpcHandler::from_blobs(inner)
147+
}
148+
149+
/// Build the Blobs protocol handler.
150+
/// You need to provide a local pool handle and an endpoint.
151+
fn build_inner(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Arc<BlobsInner<S>> {
140152
let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone());
141-
let inner = Arc::new(BlobsInner::new(
153+
Arc::new(BlobsInner::new(
142154
self.store,
143155
rt.clone(),
144156
self.events.unwrap_or_default(),
145157
downloader,
146158
endpoint.clone(),
147-
));
148-
Arc::new(Blobs { inner })
159+
))
149160
}
150161
}
151162

@@ -394,6 +405,10 @@ pub struct Blobs {
394405
}
395406

396407
impl Blobs {
408+
pub(crate) fn from_inner<S: Store>(inner: Arc<BlobsInner<S>>) -> Self {
409+
Self { inner }
410+
}
411+
397412
pub fn client(&self) -> MemClient {
398413
self.inner.clone().client()
399414
}
@@ -466,7 +481,7 @@ impl<S: crate::store::Store> DynBlobs for BlobsInner<S> {
466481
fn client(self: Arc<Self>) -> MemClient {
467482
let client = self
468483
.rpc_handler
469-
.get_or_init(|| RpcHandler::new(&self))
484+
.get_or_init(|| MemRpcHandler::new(&self))
470485
.client
471486
.clone();
472487
MemClient::new(client)

src/rpc.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::{
4343
export::ExportProgress,
4444
format::collection::Collection,
4545
get::db::DownloadProgress,
46-
net_protocol::{BlobDownloadRequest, BlobsInner},
46+
net_protocol::{BlobDownloadRequest, Blobs, BlobsInner},
4747
provider::{AddProgress, BatchAddPathProgress},
4848
store::{ConsistencyCheckProgress, ImportProgress, MapEntry, ValidateProgress},
4949
util::{
@@ -71,7 +71,7 @@ impl<D: crate::store::Store> BlobsInner<D> {
7171
where
7272
C: ChannelTypes<RpcService>,
7373
{
74-
Handler {
74+
RpcHandler {
7575
blobs: self.clone(),
7676
}
7777
.handle_rpc_request(msg, chan)
@@ -81,11 +81,11 @@ impl<D: crate::store::Store> BlobsInner<D> {
8181

8282
/// RPC handler for the blobs protocol
8383
#[derive(Debug, Clone)]
84-
pub struct Handler<S> {
84+
pub struct RpcHandler<S> {
8585
blobs: Arc<BlobsInner<S>>,
8686
}
8787

88-
impl<D: crate::store::Store> Handler<D> {
88+
impl<D: crate::store::Store> RpcHandler<D> {
8989
fn store(&self) -> &D {
9090
&self.blobs.store
9191
}
@@ -94,6 +94,15 @@ impl<D: crate::store::Store> Handler<D> {
9494
&self.blobs.rt
9595
}
9696

97+
pub(crate) fn from_blobs(blobs: Arc<BlobsInner<D>>) -> Self {
98+
Self { blobs }
99+
}
100+
101+
/// Get the blobs ProtocolHandler
102+
pub fn blobs(&self) -> Arc<Blobs> {
103+
Arc::new(Blobs::from_inner(self.blobs.clone()))
104+
}
105+
97106
/// Handle an RPC request
98107
pub async fn handle_rpc_request<C>(
99108
self,
@@ -900,14 +909,14 @@ impl<D: crate::store::Store> Handler<D> {
900909
}
901910

902911
#[derive(Debug)]
903-
pub(crate) struct RpcHandler {
912+
pub(crate) struct MemRpcHandler {
904913
/// Client to hand out
905914
pub(crate) client: RpcClient<RpcService, MemConnector>,
906915
/// Handler task
907916
handler: AbortOnDropHandle<()>,
908917
}
909918

910-
impl RpcHandler {
919+
impl MemRpcHandler {
911920
pub fn new<D: crate::store::Store>(blobs: &Arc<BlobsInner<D>>) -> Self {
912921
let blobs = blobs.clone();
913922
let (listener, connector) = quic_rpc::transport::flume::channel(1);

0 commit comments

Comments
 (0)