Skip to content

Commit 9193b08

Browse files
committed
Merge branch 'remove-db-public-send' into entity-manager
2 parents 264ed84 + 66f8540 commit 9193b08

File tree

5 files changed

+83
-96
lines changed

5 files changed

+83
-96
lines changed

src/api/blobs.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ impl Blobs {
127127
.await
128128
}
129129

130-
pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress {
130+
pub fn add_slice(&self, data: impl AsRef<[u8]>) -> AddProgress<'_> {
131131
let options = ImportBytesRequest {
132132
data: Bytes::copy_from_slice(data.as_ref()),
133133
format: crate::BlobFormat::Raw,
@@ -136,7 +136,7 @@ impl Blobs {
136136
self.add_bytes_impl(options)
137137
}
138138

139-
pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress {
139+
pub fn add_bytes(&self, data: impl Into<bytes::Bytes>) -> AddProgress<'_> {
140140
let options = ImportBytesRequest {
141141
data: data.into(),
142142
format: crate::BlobFormat::Raw,
@@ -145,7 +145,7 @@ impl Blobs {
145145
self.add_bytes_impl(options)
146146
}
147147

148-
pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress {
148+
pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> AddProgress<'_> {
149149
let options = options.into();
150150
let request = ImportBytesRequest {
151151
data: options.data,
@@ -155,7 +155,7 @@ impl Blobs {
155155
self.add_bytes_impl(request)
156156
}
157157

158-
fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress {
158+
fn add_bytes_impl(&self, options: ImportBytesRequest) -> AddProgress<'_> {
159159
trace!("{options:?}");
160160
let this = self.clone();
161161
let stream = Gen::new(|co| async move {
@@ -180,7 +180,7 @@ impl Blobs {
180180
AddProgress::new(self, stream)
181181
}
182182

183-
pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress {
183+
pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> AddProgress<'_> {
184184
let options = options.into();
185185
self.add_path_with_opts_impl(ImportPathRequest {
186186
path: options.path,
@@ -190,7 +190,7 @@ impl Blobs {
190190
})
191191
}
192192

193-
fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress {
193+
fn add_path_with_opts_impl(&self, options: ImportPathRequest) -> AddProgress<'_> {
194194
trace!("{:?}", options);
195195
let client = self.client.clone();
196196
let stream = Gen::new(|co| async move {
@@ -215,7 +215,7 @@ impl Blobs {
215215
AddProgress::new(self, stream)
216216
}
217217

218-
pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress {
218+
pub fn add_path(&self, path: impl AsRef<Path>) -> AddProgress<'_> {
219219
self.add_path_with_opts(AddPathOptions {
220220
path: path.as_ref().to_owned(),
221221
mode: ImportMode::Copy,
@@ -226,7 +226,7 @@ impl Blobs {
226226
pub async fn add_stream(
227227
&self,
228228
data: impl Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
229-
) -> AddProgress {
229+
) -> AddProgress<'_> {
230230
let inner = ImportByteStreamRequest {
231231
format: crate::BlobFormat::Raw,
232232
scope: Scope::default(),
@@ -521,7 +521,7 @@ pub struct Batch<'a> {
521521
}
522522

523523
impl<'a> Batch<'a> {
524-
pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress {
524+
pub fn add_bytes(&self, data: impl Into<Bytes>) -> BatchAddProgress<'_> {
525525
let options = ImportBytesRequest {
526526
data: data.into(),
527527
format: crate::BlobFormat::Raw,
@@ -530,7 +530,7 @@ impl<'a> Batch<'a> {
530530
BatchAddProgress(self.blobs.add_bytes_impl(options))
531531
}
532532

533-
pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress {
533+
pub fn add_bytes_with_opts(&self, options: impl Into<AddBytesOptions>) -> BatchAddProgress<'_> {
534534
let options = options.into();
535535
BatchAddProgress(self.blobs.add_bytes_impl(ImportBytesRequest {
536536
data: options.data,
@@ -539,7 +539,7 @@ impl<'a> Batch<'a> {
539539
}))
540540
}
541541

542-
pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress {
542+
pub fn add_slice(&self, data: impl AsRef<[u8]>) -> BatchAddProgress<'_> {
543543
let options = ImportBytesRequest {
544544
data: Bytes::copy_from_slice(data.as_ref()),
545545
format: crate::BlobFormat::Raw,
@@ -548,7 +548,7 @@ impl<'a> Batch<'a> {
548548
BatchAddProgress(self.blobs.add_bytes_impl(options))
549549
}
550550

551-
pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress {
551+
pub fn add_path_with_opts(&self, options: impl Into<AddPathOptions>) -> BatchAddProgress<'_> {
552552
let options = options.into();
553553
BatchAddProgress(self.blobs.add_path_with_opts_impl(ImportPathRequest {
554554
path: options.path,

src/metrics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use iroh_metrics::{Counter, MetricsGroup};
44

55
/// Enum of metrics for the module
66
#[allow(missing_docs)]
7+
#[allow(dead_code)]
78
#[derive(Debug, Default, MetricsGroup)]
89
#[metrics(name = "iroh-blobs")]
910
pub struct Metrics {

src/store/fs.rs

Lines changed: 6 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -247,20 +247,8 @@ impl HashContext {
247247
}
248248

249249
/// Update the entry state in the database, and wait for completion.
250-
pub async fn update(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
251-
let (tx, rx) = oneshot::channel();
252-
self.db()
253-
.send(
254-
meta::Update {
255-
hash,
256-
state,
257-
tx: Some(tx),
258-
span: tracing::Span::current(),
259-
}
260-
.into(),
261-
)
262-
.await?;
263-
rx.await.map_err(|_e| io::Error::other(""))??;
250+
pub async fn update_await(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
251+
self.db().update_await(hash, state).await?;
264252
Ok(())
265253
}
266254

@@ -270,40 +258,13 @@ impl HashContext {
270258
data_location: DataLocation::Inline(Bytes::new()),
271259
outboard_location: OutboardLocation::NotNeeded,
272260
}));
273-
}
274-
let (tx, rx) = oneshot::channel();
275-
self.db()
276-
.send(
277-
meta::Get {
278-
hash,
279-
tx,
280-
span: tracing::Span::current(),
281-
}
282-
.into(),
283-
)
284-
.await
285-
.ok();
286-
let res = rx.await.map_err(io::Error::other)?;
287-
Ok(res.state?)
261+
};
262+
self.db().get(hash).await
288263
}
289264

290265
/// Update the entry state in the database, and wait for completion.
291266
pub async fn set(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
292-
let (tx, rx) = oneshot::channel();
293-
self.db()
294-
.send(
295-
meta::Set {
296-
hash,
297-
state,
298-
tx,
299-
span: tracing::Span::current(),
300-
}
301-
.into(),
302-
)
303-
.await
304-
.map_err(io::Error::other)?;
305-
rx.await.map_err(|_e| io::Error::other(""))??;
306-
Ok(())
267+
self.db().set(hash, state).await
307268
}
308269

309270
pub async fn get_maybe_create(&self, hash: Hash, create: bool) -> api::Result<BaoFileHandle> {
@@ -893,7 +854,7 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R
893854
data_location,
894855
outboard_location,
895856
};
896-
ctx.update(hash, state).await?;
857+
ctx.update_await(hash, state).await?;
897858
Ok(())
898859
}
899860

src/store/fs/bao_file.rs

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,17 @@ use bytes::{Bytes, BytesMut};
2121
use derive_more::Debug;
2222
use irpc::channel::mpsc;
2323
use tokio::sync::watch;
24-
use tracing::{debug, error, info, trace, Span};
24+
use tracing::{debug, error, info, trace};
2525

2626
use super::{
2727
entry_state::{DataLocation, EntryState, OutboardLocation},
28-
meta::Update,
2928
options::{Options, PathOptions},
3029
BaoFilePart,
3130
};
3231
use crate::{
3332
api::blobs::Bitfield,
3433
store::{
35-
fs::{
36-
meta::{raw_outboard_size, Set},
37-
TaskContext,
38-
},
34+
fs::{meta::raw_outboard_size, TaskContext},
3935
util::{
4036
read_checksummed_and_truncate, write_checksummed, FixedSize, MemOrFile,
4137
PartialMemStorage, SizeInfo, SparseMemFile, DD,
@@ -644,21 +640,7 @@ impl BaoFileHandle {
644640
let size = storage.bitfield.size;
645641
let (storage, entry_state) = storage.into_complete(size, &options)?;
646642
debug!("File was reconstructed as complete");
647-
let (tx, rx) = crate::util::channel::oneshot::channel();
648-
ctx.db
649-
.sender
650-
.send(
651-
Set {
652-
hash,
653-
state: entry_state,
654-
tx,
655-
span: Span::current(),
656-
}
657-
.into(),
658-
)
659-
.await
660-
.map_err(|_| io::Error::other("send update"))?;
661-
rx.await.map_err(|_| io::Error::other("receive update"))??;
643+
ctx.db.set(hash, entry_state).await?;
662644
storage.into()
663645
} else {
664646
storage.into()
@@ -796,19 +778,7 @@ impl BaoFileHandle {
796778
true
797779
});
798780
if let Some(update) = res? {
799-
ctx.db
800-
.sender
801-
.send(
802-
Update {
803-
hash: self.hash,
804-
state: update,
805-
tx: None,
806-
span: Span::current(),
807-
}
808-
.into(),
809-
)
810-
.await
811-
.map_err(|_| io::Error::other("send update"))?;
781+
ctx.db.update(self.hash, update).await?;
812782
}
813783
Ok(())
814784
}

src/store/fs/meta.rs

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ mod proto;
3434
pub use proto::*;
3535
pub(crate) mod tables;
3636
use tables::{ReadOnlyTables, ReadableTables, Tables};
37-
use tracing::{debug, error, info_span, trace};
37+
use tracing::{debug, error, info_span, trace, Span};
3838

3939
use super::{
4040
delete_set::DeleteHandle,
@@ -88,16 +88,70 @@ pub type ActorResult<T> = Result<T, ActorError>;
8888

8989
#[derive(Debug, Clone)]
9090
pub struct Db {
91-
pub sender: tokio::sync::mpsc::Sender<Command>,
91+
sender: tokio::sync::mpsc::Sender<Command>,
9292
}
9393

9494
impl Db {
9595
pub fn new(sender: tokio::sync::mpsc::Sender<Command>) -> Self {
9696
Self { sender }
9797
}
9898

99+
pub async fn update_await(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
100+
let (tx, rx) = oneshot::channel();
101+
self.sender
102+
.send(
103+
Update {
104+
hash,
105+
state,
106+
tx: Some(tx),
107+
span: tracing::Span::current(),
108+
}
109+
.into(),
110+
)
111+
.await
112+
.map_err(|_| io::Error::other("send update"))?;
113+
rx.await
114+
.map_err(|_e| io::Error::other("receive update"))??;
115+
Ok(())
116+
}
117+
118+
/// Update the entry state for a hash, without awaiting completion.
119+
pub async fn update(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
120+
self.sender
121+
.send(
122+
Update {
123+
hash,
124+
state,
125+
tx: None,
126+
span: Span::current(),
127+
}
128+
.into(),
129+
)
130+
.await
131+
.map_err(|_| io::Error::other("send update"))
132+
}
133+
134+
/// Set the entry state and await completion.
135+
pub async fn set(&self, hash: Hash, entry_state: EntryState<Bytes>) -> io::Result<()> {
136+
let (tx, rx) = oneshot::channel();
137+
self.sender
138+
.send(
139+
Set {
140+
hash,
141+
state: entry_state,
142+
tx,
143+
span: Span::current(),
144+
}
145+
.into(),
146+
)
147+
.await
148+
.map_err(|_| io::Error::other("send update"))?;
149+
rx.await.map_err(|_| io::Error::other("receive update"))??;
150+
Ok(())
151+
}
152+
99153
/// Get the entry state for a hash, if any.
100-
pub async fn get(&self, hash: Hash) -> anyhow::Result<Option<EntryState<Bytes>>> {
154+
pub async fn get(&self, hash: Hash) -> io::Result<Option<EntryState<Bytes>>> {
101155
let (tx, rx) = oneshot::channel();
102156
self.sender
103157
.send(
@@ -108,8 +162,9 @@ impl Db {
108162
}
109163
.into(),
110164
)
111-
.await?;
112-
let res = rx.await?;
165+
.await
166+
.map_err(|_| io::Error::other("send get"))?;
167+
let res = rx.await.map_err(|_| io::Error::other("receive get"))?;
113168
Ok(res.state?)
114169
}
115170

0 commit comments

Comments
 (0)