Skip to content

Commit b5be9ab

Browse files
committed
properly handle shutting down the rpc task
1 parent e2ccc85 commit b5be9ab

File tree

4 files changed

+180
-164
lines changed

4 files changed

+180
-164
lines changed

src/net_protocol.rs

Lines changed: 119 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ use crate::{
2525
Stats,
2626
},
2727
provider::EventSender,
28-
store::GcConfig,
28+
rpc::{client::blobs::MemClient, RpcHandler},
29+
store::{GcConfig, Store},
2930
util::{
3031
local_pool::{self, LocalPoolHandle},
3132
progress::{AsyncChannelProgressSender, ProgressSender},
@@ -53,8 +54,8 @@ impl Default for GcState {
5354
}
5455

5556
#[derive(Debug)]
56-
pub struct Blobs<S> {
57-
rt: LocalPoolHandle,
57+
pub(crate) struct BlobsInner<S> {
58+
pub(crate) rt: LocalPoolHandle,
5859
pub(crate) store: S,
5960
events: EventSender,
6061
downloader: Downloader,
@@ -135,36 +136,33 @@ impl<S: crate::store::Store> Builder<S> {
135136

136137
/// Build the Blobs protocol handler.
137138
/// You need to provide a local pool handle and an endpoint.
138-
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Arc<Blobs<S>> {
139+
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Arc<Blobs> {
139140
let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone());
140-
Arc::new(Blobs::new(
141+
let inner = Arc::new(BlobsInner::new(
141142
self.store,
142143
rt.clone(),
143144
self.events.unwrap_or_default(),
144145
downloader,
145146
endpoint.clone(),
146-
))
147+
));
148+
Arc::new(Blobs { inner })
147149
}
148150
}
149151

150-
impl<S> Blobs<S> {
152+
impl Blobs {
151153
/// Create a new Blobs protocol handler builder, given a store.
152-
pub fn builder(store: S) -> Builder<S> {
154+
pub fn builder<S>(store: S) -> Builder<S> {
153155
Builder {
154156
store,
155157
events: None,
156158
}
157159
}
158-
}
159160

160-
impl Blobs<crate::store::mem::Store> {
161161
/// Create a new memory-backed Blobs protocol handler.
162162
pub fn memory() -> Builder<crate::store::mem::Store> {
163163
Self::builder(crate::store::mem::Store::new())
164164
}
165-
}
166165

167-
impl Blobs<crate::store::fs::Store> {
168166
/// Load a persistent Blobs protocol handler from a path.
169167
pub async fn persistent(
170168
path: impl AsRef<std::path::Path>,
@@ -173,8 +171,8 @@ impl Blobs<crate::store::fs::Store> {
173171
}
174172
}
175173

176-
impl<S: crate::store::Store> Blobs<S> {
177-
pub fn new(
174+
impl<S: crate::store::Store> BlobsInner<S> {
175+
fn new(
178176
store: S,
179177
rt: LocalPoolHandle,
180178
events: EventSender,
@@ -194,14 +192,6 @@ impl<S: crate::store::Store> Blobs<S> {
194192
}
195193
}
196194

197-
pub fn rt(&self) -> &LocalPoolHandle {
198-
&self.rt
199-
}
200-
201-
pub fn downloader(&self) -> &Downloader {
202-
&self.downloader
203-
}
204-
205195
pub fn endpoint(&self) -> &Endpoint {
206196
&self.endpoint
207197
}
@@ -386,66 +376,67 @@ impl<S: crate::store::Store> Blobs<S> {
386376
}
387377
}
388378

389-
// trait BlobsInner: Debug + Send + Sync + 'static {
390-
// fn shutdown(self: Arc<Self>) -> BoxedFuture<()>;
391-
// fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>>;
392-
// fn client(self: Arc<Self>) -> MemClient;
393-
// fn local_pool_handle(&self) -> &LocalPoolHandle;
394-
// fn downloader(&self) -> &Downloader;
395-
// }
396-
397-
// #[derive(Debug)]
398-
// struct Blobs2 {
399-
// inner: Arc<dyn BlobsInner>,
400-
// }
401-
402-
// impl Blobs2 {
403-
// fn client(&self) -> MemClient {
404-
// self.inner.clone().client()
405-
// }
406-
407-
// fn local_pool_handle(&self) -> &LocalPoolHandle {
408-
// self.inner.local_pool_handle()
409-
// }
410-
411-
// fn downloader(&self) -> &Downloader {
412-
// self.inner.downloader()
413-
// }
414-
// }
415-
416-
// impl<S: crate::store::Store> BlobsInner for Blobs<S> {
417-
// fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
418-
// ProtocolHandler::shutdown(self)
419-
// }
420-
421-
// fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
422-
// ProtocolHandler::accept(self, conn)
423-
// }
424-
425-
// fn client(self: Arc<Self>) -> MemClient {
426-
// Blobs::client(self)
427-
// }
428-
429-
// fn local_pool_handle(&self) -> &LocalPoolHandle {
430-
// self.rt()
431-
// }
432-
433-
// fn downloader(&self) -> &Downloader {
434-
// self.downloader()
435-
// }
436-
// }
437-
438-
// impl ProtocolHandler for Blobs2 {
439-
// fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
440-
// self.inner.clone().accept(conn)
441-
// }
442-
443-
// fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
444-
// self.inner.clone().shutdown()
445-
// }
446-
// }
447-
448-
impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
379+
trait DynBlobs: Debug + Send + Sync + 'static {
380+
fn shutdown(self: Arc<Self>) -> BoxedFuture<()>;
381+
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>>;
382+
fn client(self: Arc<Self>) -> MemClient;
383+
fn local_pool_handle(&self) -> &LocalPoolHandle;
384+
fn downloader(&self) -> &Downloader;
385+
fn endpoint(&self) -> &Endpoint;
386+
fn start_gc(&self, config: GcConfig) -> Result<()>;
387+
fn add_protected(&self, cb: ProtectCb) -> Result<()>;
388+
fn stop_rpc_task(&self);
389+
}
390+
391+
#[derive(Debug)]
392+
pub struct Blobs {
393+
inner: Arc<dyn DynBlobs>,
394+
}
395+
396+
impl Blobs {
397+
pub fn client(&self) -> MemClient {
398+
self.inner.clone().client()
399+
}
400+
401+
pub fn local_pool_handle(&self) -> &LocalPoolHandle {
402+
self.inner.local_pool_handle()
403+
}
404+
405+
pub fn downloader(&self) -> &Downloader {
406+
self.inner.downloader()
407+
}
408+
409+
pub fn endpoint(&self) -> &Endpoint {
410+
self.inner.endpoint()
411+
}
412+
413+
pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
414+
self.inner.add_protected(cb)
415+
}
416+
417+
pub fn start_gc(&self, config: GcConfig) -> Result<()> {
418+
self.inner.start_gc(config)
419+
}
420+
421+
pub fn new<S: Store>(
422+
store: S,
423+
rt: LocalPoolHandle,
424+
events: EventSender,
425+
downloader: Downloader,
426+
endpoint: Endpoint,
427+
) -> Self {
428+
let inner = Arc::new(BlobsInner::new(store, rt, events, downloader, endpoint));
429+
Self { inner }
430+
}
431+
}
432+
433+
impl Drop for Blobs {
434+
fn drop(&mut self) {
435+
self.inner.stop_rpc_task();
436+
}
437+
}
438+
439+
impl<S: crate::store::Store> DynBlobs for BlobsInner<S> {
449440
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
450441
Box::pin(async move {
451442
crate::provider::handle_connection(
@@ -461,9 +452,55 @@ impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
461452

462453
fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
463454
Box::pin(async move {
455+
self.stop_rpc_task();
464456
self.store.shutdown().await;
465457
})
466458
}
459+
460+
fn stop_rpc_task(&self) {
461+
if let Some(rpc_handler) = self.rpc_handler.get() {
462+
rpc_handler.shutdown();
463+
}
464+
}
465+
466+
fn client(self: Arc<Self>) -> MemClient {
467+
let client = self
468+
.rpc_handler
469+
.get_or_init(|| RpcHandler::new(&self))
470+
.client
471+
.clone();
472+
MemClient::new(client)
473+
}
474+
475+
fn local_pool_handle(&self) -> &LocalPoolHandle {
476+
&self.rt
477+
}
478+
479+
fn downloader(&self) -> &Downloader {
480+
&self.downloader
481+
}
482+
483+
fn start_gc(&self, config: GcConfig) -> Result<()> {
484+
self.start_gc(config)
485+
}
486+
487+
fn add_protected(&self, cb: ProtectCb) -> Result<()> {
488+
self.add_protected(cb)
489+
}
490+
491+
fn endpoint(&self) -> &Endpoint {
492+
&self.endpoint
493+
}
494+
}
495+
496+
impl ProtocolHandler for Blobs {
497+
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
498+
self.inner.clone().accept(conn)
499+
}
500+
501+
fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
502+
self.inner.clone().shutdown()
503+
}
467504
}
468505

469506
/// A request to the node to download and share the data specified by the hash.

0 commit comments

Comments
 (0)