From 65493da2105906ebc0a253a5047ff2957515f092 Mon Sep 17 00:00:00 2001 From: HatemMn <19950216+HatemMn@users.noreply.github.com> Date: Fri, 8 Aug 2025 14:07:05 +0200 Subject: [PATCH 01/10] feat: rebased the first working version (finally) style: clippy and clleaning feat: improvements feat: rewrite lost files... feat: many work fix: final fixes fix: final fixes2 fix: fmt fix: delete useless sync traits fix: review fixes fix: first batch of fixes feat: second batch of fixes fix: adt file feat: BYE batcherarc --- Cargo.toml | 3 +- crate/findex/Cargo.toml | 7 +- crate/findex/README.md | 2 + crate/findex/src/adt.rs | 24 ++ crate/findex/src/batcher_findex.rs | 363 ++++++++++++++++++++ crate/findex/src/benches.rs | 6 +- crate/findex/src/error.rs | 46 +++ crate/findex/src/findex.rs | 7 +- crate/findex/src/lib.rs | 6 + crate/findex/src/ovec.rs | 2 +- crate/memories/Cargo.toml | 2 + crate/memories/src/batching_layer/error.rs | 48 +++ crate/memories/src/batching_layer/memory.rs | 257 ++++++++++++++ crate/memories/src/batching_layer/mod.rs | 5 + crate/memories/src/in_memory.rs | 26 ++ crate/memories/src/lib.rs | 19 + 16 files changed, 812 insertions(+), 11 deletions(-) create mode 100644 crate/findex/src/batcher_findex.rs create mode 100644 crate/memories/src/batching_layer/error.rs create mode 100644 crate/memories/src/batching_layer/memory.rs create mode 100644 crate/memories/src/batching_layer/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 08cee221..6b7026e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crate/findex", "crate/memories"] resolver = "2" [workspace.package] -version = "8.0.1" +version = "8.0.2" authors = [ "Bruno Grieder ", "Célia Corsin ", @@ -30,3 +30,4 @@ cosmian_crypto_core = { version = "10.2", default-features = false, features = [ criterion = { version = "0.6" } smol-macros = { version = "0.1" } tokio = { version = "1.46" } +futures = "0.3.31" # Todo: this is a temporary approach diff --git a/crate/findex/Cargo.toml b/crate/findex/Cargo.toml index d3ed614e..b3490a2b 100644 --- a/crate/findex/Cargo.toml +++ b/crate/findex/Cargo.toml @@ -15,14 +15,15 @@ name = "cosmian_findex" path = "src/lib.rs" [features] +batch = ["cosmian_sse_memories/batch"] test-utils = ["agnostic-lite", "criterion"] [dependencies] aes = "0.8" cosmian_crypto_core.workspace = true -cosmian_sse_memories = { path = "../memories", version = "8.0" } +cosmian_sse_memories = { path = "../memories", version = "8.1" } xts-mode = "0.5" - +futures.workspace = true # Optional dependencies for testing and benchmarking. agnostic-lite = { workspace = true, optional = true, features = [ "tokio", @@ -33,7 +34,7 @@ criterion = { workspace = true, optional = true } [dev-dependencies] agnostic-lite = { workspace = true, features = ["tokio"] } -cosmian_sse_memories = { path = "../memories", version = "8.0", features = [ +cosmian_sse_memories = { path = "../memories", version = "8.1", features = [ "redis-mem", "sqlite-mem", "postgres-mem", diff --git a/crate/findex/README.md b/crate/findex/README.md index d22b9a2d..36b8ade7 100644 --- a/crate/findex/README.md +++ b/crate/findex/README.md @@ -2,6 +2,8 @@ This crate provides the core functionality of Findex, defining the abstract data types, cryptographic operations, and encoding algorithms. +Findex also supports batching operations into a singe call to the memory interface, which reduces connection overhead and avoids file descriptor limits on some Linux systems. 
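+
+A minimal usage sketch of the batching API (it mirrors this crate's own tests: `dummy_encode`, `dummy_decode` and the `Bytes` value type come from the `test-utils` feature and stand in for an application-specific encoder and value type; the `batch` feature must be enabled):
+
+```rust
+use cosmian_crypto_core::define_byte_type;
+use cosmian_findex::{FindexBatcher, IndexBatcher, dummy_decode, dummy_encode};
+use cosmian_sse_memories::{ADDRESS_LENGTH, Address, InMemory};
+
+define_byte_type!(Bytes);
+type Value = Bytes<8>;
+const WORD_LENGTH: usize = 16;
+
+#[tokio::main]
+async fn main() {
+    // Any backend implementing `BatchingMemoryADT` works; `InMemory` is the simplest.
+    let memory = InMemory::<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>::default();
+    let index =
+        FindexBatcher::<WORD_LENGTH, Value, _, _>::new(memory, dummy_encode, dummy_decode);
+
+    let cat = "cat".to_string();
+    let values = vec![Value::try_from([1_u8; 8].as_slice()).unwrap()];
+
+    // Operations on all entries are buffered and issued to the backend in aggregated calls.
+    index.batch_insert(vec![(cat.clone(), values)]).await.unwrap();
+
+    let hits = index.batch_search(vec![&cat]).await.unwrap();
+    assert_eq!(hits.len(), 1);
+}
+```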
+ ## Setup Add `cosmian_findex` as dependency to your project : diff --git a/crate/findex/src/adt.rs b/crate/findex/src/adt.rs index ad61b09f..015cb02c 100644 --- a/crate/findex/src/adt.rs +++ b/crate/findex/src/adt.rs @@ -51,6 +51,30 @@ pub trait VectorADT: Send { fn read(&self) -> impl Send + Future, Self::Error>>; } +/// This trait extends the functionality of the standard `IndexADT` by providing methods that operate +/// on multiple keywords or entries simultaneously to cut network's overhead and improve performance. +pub trait IndexBatcher { + type Error: std::error::Error; + + /// Search the index for the values bound to the given keywords. + fn batch_search( + &self, + keywords: Vec<&Keyword>, + ) -> impl Future>, Self::Error>>; + + /// Binds each value to their associated keyword in this index. + fn batch_insert( + &self, + entries: Vec<(Keyword, impl Sync + Send + IntoIterator)>, + ) -> impl Send + Future>; + + /// Removes the given values from the index. + fn batch_delete( + &self, + entries: Vec<(Keyword, impl Sync + Send + IntoIterator)>, + ) -> impl Send + Future>; +} + #[cfg(test)] pub mod tests { diff --git a/crate/findex/src/batcher_findex.rs b/crate/findex/src/batcher_findex.rs new file mode 100644 index 00000000..a9f5edd7 --- /dev/null +++ b/crate/findex/src/batcher_findex.rs @@ -0,0 +1,363 @@ +use crate::adt::IndexBatcher; +use crate::error::BatchFindexError; +use crate::{Decoder, Encoder, Findex, IndexADT}; +use cosmian_sse_memories::{ADDRESS_LENGTH, Address, BatchingMemoryADT, MemoryBatcher}; +use std::sync::atomic::AtomicUsize; +use std::{collections::HashSet, fmt::Debug, hash::Hash, sync::Arc}; + +#[derive(Debug)] +pub struct FindexBatcher< + const WORD_LENGTH: usize, + Value: Send + Hash + Eq, + EncodingError: Send + Debug, + BatcherMemory: Clone + Send + BatchingMemoryADT
, Word = [u8; WORD_LENGTH]>, +> { + memory: BatcherMemory, + encode: Arc>, + decode: Arc>, +} + +impl< + const WORD_LENGTH: usize, + Value: Send + Hash + Eq, + BatcherMemory: Debug + + Send + + Sync + + Clone + + BatchingMemoryADT
, Word = [u8; WORD_LENGTH]>, + EncodingError: Send + Debug + std::error::Error, +> FindexBatcher +{ + pub fn new( + memory: BatcherMemory, + encode: Encoder, + decode: Decoder, + ) -> Self { + Self { + memory, + encode: Arc::new(encode), + decode: Arc::new(decode), + } + } + + // Insert or delete are both an unbounded number of calls to `guarded_write` on the memory layer. + async fn batch_insert_or_delete( + &self, + entries: Vec<(Keyword, impl Send + IntoIterator)>, + is_insert: bool, + ) -> Result<(), BatchFindexError> + where + Keyword: Send + Sync + Hash + Eq, + { + let mut futures = Vec::new(); + let buffered_memory = Arc::new(MemoryBatcher::new_writer( + self.memory.clone(), + AtomicUsize::new(entries.len()), + )); + + for (guard_keyword, bindings) in entries { + let memory_arc = buffered_memory.clone(); // TODO + // Create a temporary Findex instance using the shared batching layer + let findex = Findex::::new( + // This (cheap) Arc clone is necessary because `decrement_capacity` is called + // below and needs to be able to access the Arc. + memory_arc.clone(), + *self.encode, + *self.decode, + ); + + let future = async move { + if is_insert { + findex.insert(guard_keyword, bindings).await + } else { + findex.delete(guard_keyword, bindings).await + }?; + // Once one of the operations succeeds, we should make the buffer smaller. + memory_arc.decrement_capacity().await?; + Ok::<_, BatchFindexError<_>>(()) + }; + + futures.push(future); + } + + // Execute all futures concurrently and collect results. + futures::future::try_join_all(futures).await?; + + Ok(()) + } +} + +impl< + const WORD_LENGTH: usize, + Keyword: Send + Sync + Hash + Eq, + Value: Send + Hash + Eq, + EncodingError: Send + Debug + std::error::Error, + BatcherMemory: Debug + + Send + + Sync + + Clone + + BatchingMemoryADT
, Word = [u8; WORD_LENGTH]>, +> IndexBatcher for FindexBatcher +{ + type Error = BatchFindexError; + + async fn batch_insert( + &self, + entries: Vec<(Keyword, impl Sync + Send + IntoIterator)>, + ) -> Result<(), Self::Error> { + self.batch_insert_or_delete(entries, true).await + } + + async fn batch_delete( + &self, + entries: Vec<(Keyword, impl Sync + Send + IntoIterator)>, + ) -> Result<(), Self::Error> { + self.batch_insert_or_delete(entries, false).await + } + + async fn batch_search( + &self, + keywords: Vec<&Keyword>, + ) -> Result>, Self::Error> { + let mut futures = Vec::new(); + let n = keywords.len(); + + let buffered_memory = Arc::new(MemoryBatcher::new_reader( + // TODO: kill this + self.memory.clone(), + AtomicUsize::new(n), + )); + + for keyword in keywords { + let buffered_memory_clone = buffered_memory.clone(); + // Create a temporary Findex instance using the shared batching layer. + let findex = Findex::::new( + buffered_memory_clone, + *self.encode, + *self.decode, + ); + + let future = async move { findex.search(keyword).await }; + futures.push(future); + } + // Execute all futures concurrently and collect results. + let results = futures::future::join_all(futures).await; + + let mut output = Vec::with_capacity(results.len()); + for result in results { + output.push(result?); + } + + Ok(output) + } +} + +// The underlying tests assume the existence of a `Findex` implementation that is correct +// The testing strategy for each function is to use the `Findex` implementation to perform the same operations +// and compare the results with the `BatcherFindex` implementation. +#[cfg(test)] +mod tests { + use super::*; + use crate::{Findex, IndexADT, dummy_decode, dummy_encode}; + use cosmian_crypto_core::define_byte_type; + use cosmian_sse_memories::{ADDRESS_LENGTH, InMemory}; + use std::collections::HashSet; + + type Value = Bytes<8>; + define_byte_type!(Bytes); + + impl TryFrom for Bytes { + type Error = String; + fn try_from(value: usize) -> Result { + Self::try_from(value.to_be_bytes().as_slice()).map_err(|e| e.to_string()) + } + } + + const WORD_LENGTH: usize = 16; + + #[tokio::test] + async fn test_batch_insert() { + let trivial_memory = InMemory::, [u8; WORD_LENGTH]>::default(); + + let batcher_findex = FindexBatcher::::new( + trivial_memory.clone(), + |op, values| { + dummy_encode::(op, values).map_err(BatchFindexError::< + InMemory, [u8; WORD_LENGTH]>, + >::Encoding) + }, + |words| { + dummy_decode(words).map_err(BatchFindexError::< + InMemory, [u8; WORD_LENGTH]>, + >::Encoding) + }, + ); + + let cat_bindings = vec![ + Value::try_from(1).unwrap(), + Value::try_from(2).unwrap(), + Value::try_from(3).unwrap(), + ]; + let dog_bindings = vec![ + Value::try_from(4).unwrap(), + Value::try_from(5).unwrap(), + Value::try_from(6).unwrap(), + ]; + + // Batch insert multiple entries. + let entries = vec![ + ("cat".to_string(), cat_bindings.clone()), + ("dog".to_string(), dog_bindings.clone()), + ]; + + batcher_findex.batch_insert(entries).await.unwrap(); + + // instantiate a (non batched) Findex to verify the results. 
+ let findex = Findex::new( + trivial_memory.clone(), + dummy_encode::, + dummy_decode, + ); + + let cat_result = findex.search(&"cat".to_string()).await.unwrap(); + assert_eq!(cat_result, cat_bindings.into_iter().collect::>()); + + let dog_result = findex.search(&"dog".to_string()).await.unwrap(); + assert_eq!(dog_result, dog_bindings.into_iter().collect::>()); + } + + #[tokio::test] + async fn test_batch_delete() { + let trivial_memory = InMemory::, [u8; WORD_LENGTH]>::default(); + + // First, populate the memory with initial data using regular Findex. + let findex = Findex::new( + trivial_memory.clone(), + dummy_encode::, + dummy_decode, + ); + + let cat_bindings = vec![ + Value::try_from(1).unwrap(), + Value::try_from(3).unwrap(), + Value::try_from(5).unwrap(), + Value::try_from(7).unwrap(), + ]; + let dog_bindings = vec![ + Value::try_from(0).unwrap(), + Value::try_from(2).unwrap(), + Value::try_from(4).unwrap(), + Value::try_from(6).unwrap(), + ]; + + findex + .insert("cat".to_string(), cat_bindings.clone()) + .await + .unwrap(); + findex + .insert("dog".to_string(), dog_bindings.clone()) + .await + .unwrap(); + + // Create BatcherFindex for deletion operations. + let batcher_findex = FindexBatcher::::new( + trivial_memory.clone(), + |op, values| { + dummy_encode::(op, values).map_err(BatchFindexError::< + InMemory, [u8; WORD_LENGTH]>, + >::Encoding) + }, + |words| { + dummy_decode(words).map_err(BatchFindexError::< + InMemory, [u8; WORD_LENGTH]>, + >::Encoding) + }, + ); + + let delete_entries = vec![ + ( + "cat".to_string(), + vec![Value::try_from(1).unwrap(), Value::try_from(5).unwrap()], + ), + ("dog".to_string(), dog_bindings), // Remove all dog bindings. + ]; + + // Perform batch delete. + batcher_findex.batch_delete(delete_entries).await.unwrap(); + + // Verify deletions were performed using a regular findex instance. + let cat_result = findex.search(&"cat".to_string()).await.unwrap(); + let dog_result = findex.search(&"dog".to_string()).await.unwrap(); + + let expected_cat = vec![ + Value::try_from(3).unwrap(), // 1 and 5 removed, 3 and 7 remain. + Value::try_from(7).unwrap(), + ] + .into_iter() + .collect::>(); + let expected_dog = HashSet::new(); // all of the dog bindings are removed. 
+ + assert_eq!(cat_result, expected_cat); + assert_eq!(dog_result, expected_dog); + } + + #[tokio::test] + async fn test_batch_search() { + let trivial_memory = InMemory::, [u8; WORD_LENGTH]>::default(); + + let findex = Findex::new( + trivial_memory.clone(), + dummy_encode::, + dummy_decode, + ); + let cat_bindings = [ + Value::try_from(1).unwrap(), + Value::try_from(3).unwrap(), + Value::try_from(5).unwrap(), + ]; + let dog_bindings = [ + Value::try_from(0).unwrap(), + Value::try_from(2).unwrap(), + Value::try_from(4).unwrap(), + ]; + findex + .insert("cat".to_string(), cat_bindings.clone()) + .await + .unwrap(); + findex + .insert("dog".to_string(), dog_bindings.clone()) + .await + .unwrap(); + + let batcher_findex = FindexBatcher::::new( + trivial_memory, + |op, values| { + dummy_encode::(op, values) + .map_err(BatchFindexError::< + InMemory, [u8; WORD_LENGTH]>, + >::Encoding) + }, + |words| { + dummy_decode(words).map_err(BatchFindexError::< + InMemory, [u8; WORD_LENGTH]>, + >::Encoding) + }, + ); + + let key1 = "cat".to_string(); + let key2 = "dog".to_string(); + // Perform batch search + let batch_search_results = batcher_findex + .batch_search(vec![&key1, &key2]) + .await + .unwrap(); + + assert_eq!( + batch_search_results, + vec![ + cat_bindings.iter().cloned().collect::>(), + dog_bindings.iter().cloned().collect::>() + ] + ); + } +} diff --git a/crate/findex/src/benches.rs b/crate/findex/src/benches.rs index 742ae2a3..d325605d 100644 --- a/crate/findex/src/benches.rs +++ b/crate/findex/src/benches.rs @@ -3,8 +3,10 @@ //! to be generic and work with any memory back end that implements the //! MemoryADT trait. -use std::{collections::HashSet, fmt::Debug, sync::Arc}; - +use crate::{ + Findex, IndexADT, WORD_LENGTH, dummy_decode, dummy_encode, + encryption_layer::MemoryEncryptionLayer, +}; use agnostic_lite::{JoinHandle, RuntimeLite}; use cosmian_crypto_core::{Secret, reexport::rand_core::CryptoRngCore}; use cosmian_sse_memories::{ADDRESS_LENGTH, Address, MemoryADT}; diff --git a/crate/findex/src/error.rs b/crate/findex/src/error.rs index 8954f877..40956220 100644 --- a/crate/findex/src/error.rs +++ b/crate/findex/src/error.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "batch")] +pub use batch_findex_error::*; + use std::fmt::{Debug, Display}; #[derive(Debug)] @@ -16,3 +19,46 @@ impl Display for Error
{ } impl std::error::Error for Error
{} + +#[cfg(feature = "batch")] +pub mod batch_findex_error { + use super::*; + use cosmian_sse_memories::{BatchingLayerError, MemoryADT}; + + #[derive(Debug)] + pub enum BatchFindexError { + BatchingLayer(BatchingLayerError), + Findex(Error), + Encoding(String), + } + + impl Display for BatchFindexError + where + ::Address: Debug, + { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::BatchingLayer(e) => write!(f, "Batching layer error: {e}"), + Self::Findex(error) => write!(f, "Findex error: {error:?}"), + Self::Encoding(msg) => write!(f, "Encoding error: {msg}"), + } + } + } + + impl From> for BatchFindexError { + fn from(e: Error) -> Self { + Self::Findex(e) + } + } + + impl From> for BatchFindexError { + fn from(e: BatchingLayerError) -> Self { + Self::BatchingLayer(e) + } + } + + impl std::error::Error for BatchFindexError where + ::Address: Debug + { + } +} diff --git a/crate/findex/src/findex.rs b/crate/findex/src/findex.rs index 9b1e9302..19c528e1 100644 --- a/crate/findex/src/findex.rs +++ b/crate/findex/src/findex.rs @@ -1,5 +1,3 @@ -#![allow(clippy::type_complexity)] - use std::{ collections::HashSet, fmt::Debug, @@ -140,8 +138,9 @@ impl< #[cfg(test)] mod tests { - use std::{collections::HashSet, sync::Arc}; - + use crate::{ + Findex, IndexADT, dummy_decode, dummy_encode, encryption_layer::MemoryEncryptionLayer, + }; use cosmian_crypto_core::{CsRng, Secret, define_byte_type, reexport::rand_core::SeedableRng}; use cosmian_sse_memories::{ADDRESS_LENGTH, Address, InMemory}; use smol_macros::Executor; diff --git a/crate/findex/src/lib.rs b/crate/findex/src/lib.rs index d328cc55..39a84f80 100644 --- a/crate/findex/src/lib.rs +++ b/crate/findex/src/lib.rs @@ -17,6 +17,12 @@ mod benches; pub use adt::IndexADT; #[cfg(feature = "test-utils")] pub use benches::*; + +#[cfg(feature = "batch")] +mod batcher_findex; +#[cfg(feature = "batch")] +pub use batcher_findex::FindexBatcher; + pub use encoding::{ Decoder, Encoder, generic_encoding::{generic_decode, generic_encode}, diff --git a/crate/findex/src/ovec.rs b/crate/findex/src/ovec.rs index 902599c1..23eb840a 100644 --- a/crate/findex/src/ovec.rs +++ b/crate/findex/src/ovec.rs @@ -218,8 +218,8 @@ mod tests { use cosmian_sse_memories::{ADDRESS_LENGTH, Address, InMemory}; use crate::{ - MemoryEncryptionLayer, adt::tests::{test_vector_concurrent, test_vector_sequential}, + encryption_layer::MemoryEncryptionLayer, ovec::IVec, }; diff --git a/crate/memories/Cargo.toml b/crate/memories/Cargo.toml index 2a3b2454..2fffc7a1 100644 --- a/crate/memories/Cargo.toml +++ b/crate/memories/Cargo.toml @@ -19,6 +19,7 @@ redis-mem = ["redis"] sqlite-mem = ["async-sqlite"] postgres-mem = ["deadpool-postgres", "tokio", "tokio-postgres"] test-utils = ["tokio"] +batch = [] [dependencies] agnostic-lite = { workspace = true } @@ -33,6 +34,7 @@ tokio = { workspace = true, optional = true } tokio-postgres = { version = "0.7", optional = true, features = [ "array-impls", ] } +futures.workspace = true [dev-dependencies] agnostic-lite = { workspace = true, features = ["tokio", "smol"] } diff --git a/crate/memories/src/batching_layer/error.rs b/crate/memories/src/batching_layer/error.rs new file mode 100644 index 00000000..1bf1609c --- /dev/null +++ b/crate/memories/src/batching_layer/error.rs @@ -0,0 +1,48 @@ +use futures::channel::oneshot::Canceled; +use std::fmt::{Debug, Display}; + +use crate::MemoryADT; + +#[derive(Debug)] +pub enum BatchingLayerError { + Memory(M::Error), // the from will not be implemented due to 
conflicting implementations with Rust's `core` library. Use `map_err` instead of `?`. + Mutex(String), + Channel(String), + BufferOverflow(String), + WrongOperation(String), +} + +impl Display for BatchingLayerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Memory(err) => write!(f, "Memory error: {:?}", err), + Self::Mutex(msg) => write!(f, "Mutex error: {}", msg), + Self::Channel(msg) => { + write!(f, "Channel closed unexpectedly: {}", msg) + } + Self::BufferOverflow(msg) => { + write!(f, "Buffer overflow: {}", msg) + } + Self::WrongOperation(msg) => { + write!(f, "Wrong operation: {}", msg) + } + } + } +} + +impl From for BatchingLayerError { + fn from(_: Canceled) -> Self { + Self::Channel( + "The sender was dropped before sending its results with the `send` function." + .to_string(), + ) + } +} + +impl From> for BatchingLayerError { + fn from(e: std::sync::PoisonError) -> Self { + Self::Mutex(format!("Mutex lock poisoned: {e}")) + } +} + +impl std::error::Error for BatchingLayerError {} diff --git a/crate/memories/src/batching_layer/memory.rs b/crate/memories/src/batching_layer/memory.rs new file mode 100644 index 00000000..6a96bfef --- /dev/null +++ b/crate/memories/src/batching_layer/memory.rs @@ -0,0 +1,257 @@ +// ---------------------------------- BufferedMemory Structure ---------------------------------- +// It takes as inner memory any memory that implements the batcher ADT +// which is basically, having MemoryADT + The function batch_guarded_write + +use futures::channel::oneshot; +use std::fmt::Debug; +use std::ops::Deref; +use std::sync::{ + Arc, Mutex, + atomic::{AtomicUsize, Ordering}, +}; + +use crate::batching_layer::BatchingLayerError; +use crate::{BatchingMemoryADT, MemoryADT}; + +type ReadOperation = ( + Vec<::Address>, + oneshot::Sender::Word>>, ::Error>>, +); + +type WriteOperation = ( + ( + (::Address, Option<::Word>), + Vec<(::Address, ::Word)>, + ), + oneshot::Sender::Word>, ::Error>>, +); + +#[allow(clippy::type_complexity)] // refactoring this type will make the code unnecessarily more difficult to read without any actual benefit +enum PendingOperations +where + M::Address: Clone, +{ + PendingReads(Mutex>>), + PendingWrites(Mutex>>), +} + +impl PendingOperations +where + M::Address: Clone, +{ + // Gets the lock of the buffer and returns its length; hence the name. 
+ pub fn lock_and_get_len(&self) -> Result> { + Ok(match self { + Self::PendingReads(read_ops) => read_ops.lock()?.len(), + Self::PendingWrites(write_ops) => write_ops.lock()?.len(), + }) + } +} + +pub struct MemoryBatcher +where + M::Address: Clone, +{ + inner: M, // the actual memory layer that implements the actual network / memory call + capacity: AtomicUsize, // capacity at which the operation should be executed + buffer: PendingOperations, +} + +impl MemoryBatcher +where + ::Address: Clone, +{ + pub const fn new_reader(inner: M, capacity: AtomicUsize) -> Self { + Self { + inner, + capacity, + buffer: PendingOperations::PendingReads(Mutex::new(Vec::new())), + } + } + + pub const fn new_writer(inner: M, capacity: AtomicUsize) -> Self { + Self { + inner, + capacity, + buffer: PendingOperations::PendingWrites(Mutex::new(Vec::new())), + } + } + + // atomically decrement the buffer size, needed on inserts/deletes + pub async fn decrement_capacity(&self) -> Result<(), BatchingLayerError> { + // `fetch_sub` returns the previous value, so if it was 1, it means the buffer's job is done + let previous = self.capacity.fetch_sub(1, Ordering::SeqCst); + match &self.buffer { + PendingOperations::PendingReads(read_ops) => { + if previous <= read_ops.lock()?.len() { + let _ = self.flush().await; + } + } + PendingOperations::PendingWrites(write_ops) => { + if previous <= write_ops.lock()?.len() { + let _ = self.flush().await; + } + } + } + Ok(()) + } + + async fn flush(&self) -> Result<(), BatchingLayerError> { + if self.buffer.lock_and_get_len()? > self.capacity.load(Ordering::SeqCst) { + return Err(BatchingLayerError::BufferOverflow( + "The buffer vector's length is greater than the capacity, this should not happen." + .to_owned(), + )); + } + // check if the buffer is full + if self.buffer.lock_and_get_len()? == self.capacity.load(Ordering::SeqCst) { + match &self.buffer { + PendingOperations::PendingReads(read_ops) => { + let batches = std::mem::take(&mut *read_ops.lock()?); + + // Build combined address list while tracking which addresses belong to which batch + let all_addresses = batches + .iter() + .flat_map(|(addresses, _)| addresses.iter()) + .cloned() + .collect(); + + let mut aggregated_reads_results = self + .inner + .batch_read(all_addresses) + .await + .map_err(BatchingLayerError::::Memory)?; + + // Distribute results to each batch's sender + for (input_addresses, sender) in batches.into_iter().rev() { + let split_point = aggregated_reads_results.len() - input_addresses.len(); // This is the point where the last batch's results start + let batch_results = aggregated_reads_results.split_off(split_point); // After this call, all_results will be left containing the elements [0, split_point) + sender.send(Ok(batch_results)).map_err(|_| { + // Upon failure, the vector we tried to send is returned in the Err varient, but it's explicitly ignored here to not extract information. + BatchingLayerError::::Channel( + "The receiver end of this read operation was dropped before the `send` function could be called." 
+ .to_owned(), + ) + })?; + } + } + PendingOperations::PendingWrites(write_ops) => { + let batches = std::mem::take(&mut *write_ops.lock()?); + + let (bindings, senders): (Vec<_>, Vec<_>) = batches.into_iter().unzip(); + + let aggregated_writes_results = self + .inner + .batch_guarded_write(bindings) + .await + .map_err(BatchingLayerError::::Memory)?; + + // Distribute results to each batch's sender + for (res, sender) in aggregated_writes_results.into_iter().zip(senders) { + sender.send(Ok(res)).map_err(|_| { + BatchingLayerError::::Channel( + "The receiver end of this write operation was dropped before the `send` function could be called." + .to_owned(), + ) + })?; + } + } + }; + } + Ok(()) + } +} + +impl MemoryADT for MemoryBatcher +where + M::Address: Clone + Send, + M::Word: Send, +{ + type Address = M::Address; + type Word = M::Word; + type Error = BatchingLayerError; + + async fn batch_read( + &self, + addresses: Vec, + ) -> Result>, Self::Error> { + match &self.buffer { + PendingOperations::PendingWrites(_) => Err(BatchingLayerError::WrongOperation( + "`batch_read` is called on a writer MemoryBatcher, make sure to use `new_reader` during initialization.".to_owned() + )), + PendingOperations::PendingReads(read_ops) => { + // Create a channel for this batch. + let (sender, receiver) = oneshot::channel(); + + // Add to pending batches. + { + let mut pending = read_ops.lock()?; + pending.push((addresses, sender)); + + // Determine if we should flush. + } + + // Each thread tries to flush but only one will succeed and empty the buffer. + self.flush().await?; + + // Wait for results. + receiver + .await? + .map_err(|e| BatchingLayerError::::Memory(e)) + } + } + } + + async fn guarded_write( + &self, + guard: (Self::Address, Option), + bindings: Vec<(Self::Address, Self::Word)>, + ) -> Result, Self::Error> { + match &self.buffer { + PendingOperations::PendingReads(_) => Err(BatchingLayerError::WrongOperation( + "`guarded_write` is called on a reader MemoryBatcher, make sure to use `new_writer` during initialization.".to_owned() + )), + PendingOperations::PendingWrites(write_ops) => { + let (sender, receiver) = oneshot::channel(); + + { + let mut pending = write_ops.lock()?; + pending.push(((guard, bindings), sender)); + } + + self.flush().await?; + + receiver + .await? 
+ .map_err(|e| BatchingLayerError::::Memory(e)) + } + } + } +} + +// This simply forwards the BR/GW calls to the inner memory +// when findex instances (below) call the batcher's operations +impl MemoryADT for Arc> +where + M::Address: Send + Clone, + M::Word: Send, +{ + type Address = M::Address; + type Word = M::Word; + type Error = BatchingLayerError; + + async fn batch_read( + &self, + addresses: Vec, + ) -> Result>, Self::Error> { + (**self).batch_read(addresses).await + } + + async fn guarded_write( + &self, + guard: (Self::Address, Option), + bindings: Vec<(Self::Address, Self::Word)>, + ) -> Result, Self::Error> { + (**self).guarded_write(guard, bindings).await + } +} diff --git a/crate/memories/src/batching_layer/mod.rs b/crate/memories/src/batching_layer/mod.rs new file mode 100644 index 00000000..a34f92a3 --- /dev/null +++ b/crate/memories/src/batching_layer/mod.rs @@ -0,0 +1,5 @@ +mod error; +mod memory; + +pub use error::BatchingLayerError; +pub use memory::MemoryBatcher; diff --git a/crate/memories/src/in_memory.rs b/crate/memories/src/in_memory.rs index 101e0e8b..87a9a212 100644 --- a/crate/memories/src/in_memory.rs +++ b/crate/memories/src/in_memory.rs @@ -7,6 +7,8 @@ use std::{ sync::{Arc, Mutex}, }; +#[cfg(feature = "batch")] +use crate::BatchingMemoryADT; use crate::MemoryADT; #[derive(Debug, Clone, PartialEq, Eq)] @@ -74,6 +76,30 @@ impl Memory } } +#[cfg(feature = "batch")] +impl BatchingMemoryADT + for InMemory +{ + async fn batch_guarded_write( + &self, + operations: Vec<((Address, Option), Vec<(Address, Value)>)>, + ) -> Result>, Self::Error> { + let store = &mut *self.inner.lock().expect("poisoned lock"); + let mut res = Vec::with_capacity(operations.len()); + for (guard, bindings) in operations { + let (a, old) = guard; + let cur = store.get(&a).cloned(); + if old == cur { + for (k, v) in bindings { + store.insert(k, v); + } + } + res.push(cur); + } + Ok(res) + } +} + impl IntoIterator for InMemory { diff --git a/crate/memories/src/lib.rs b/crate/memories/src/lib.rs index d2bccf8b..62f3a008 100644 --- a/crate/memories/src/lib.rs +++ b/crate/memories/src/lib.rs @@ -54,3 +54,22 @@ pub trait MemoryADT { bindings: Vec<(Self::Address, Self::Word)>, ) -> impl Send + std::future::Future, Self::Error>>; } + +#[cfg(feature = "batch")] +mod batching_layer; + +#[cfg(feature = "batch")] +pub use batching_layer::{BatchingLayerError, MemoryBatcher}; + +// Super trait for MemoryADT that allows doing write operations in batches. +#[cfg(feature = "batch")] +pub trait BatchingMemoryADT: MemoryADT { + #[allow(clippy::type_complexity)] // Refactoring this type will make the code unnecessarily more difficult to read without any actual benefit. 
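+    /// Applies every guarded write in `write_operations` against the backend
+    /// in a single call. For each operation, the bindings are written only if
+    /// the guard still holds, and the word previously stored at the guard
+    /// address is returned, in the same order as the input operations.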
+ fn batch_guarded_write( + &self, + write_operations: Vec<( + (Self::Address, Option), + Vec<(Self::Address, Self::Word)>, + )>, + ) -> impl Send + std::future::Future>, Self::Error>>; +} From e768ee52fb151b3252df37480a9a69e68083b889 Mon Sep 17 00:00:00 2001 From: HatemMn <19950216+HatemMn@users.noreply.github.com> Date: Mon, 18 Aug 2025 17:03:16 +0200 Subject: [PATCH 02/10] feat: some last rev fixes --- crate/findex/src/batcher_findex.rs | 64 ++++++--------------- crate/findex/src/error.rs | 2 - crate/memories/src/batching_layer/memory.rs | 1 - 3 files changed, 16 insertions(+), 51 deletions(-) diff --git a/crate/findex/src/batcher_findex.rs b/crate/findex/src/batcher_findex.rs index a9f5edd7..ffc10cf2 100644 --- a/crate/findex/src/batcher_findex.rs +++ b/crate/findex/src/batcher_findex.rs @@ -25,7 +25,7 @@ impl< + Sync + Clone + BatchingMemoryADT
, Word = [u8; WORD_LENGTH]>, - EncodingError: Send + Debug + std::error::Error, + EncodingError: Send + Debug, > FindexBatcher { pub fn new( @@ -91,7 +91,7 @@ impl< const WORD_LENGTH: usize, Keyword: Send + Sync + Hash + Eq, Value: Send + Hash + Eq, - EncodingError: Send + Debug + std::error::Error, + EncodingError: Send + Debug, BatcherMemory: Debug + Send + Sync @@ -120,19 +120,16 @@ impl< keywords: Vec<&Keyword>, ) -> Result>, Self::Error> { let mut futures = Vec::new(); - let n = keywords.len(); - let buffered_memory = Arc::new(MemoryBatcher::new_reader( - // TODO: kill this self.memory.clone(), - AtomicUsize::new(n), + AtomicUsize::new(keywords.len()), )); for keyword in keywords { - let buffered_memory_clone = buffered_memory.clone(); + let buffered_memory = buffered_memory.clone(); // Create a temporary Findex instance using the shared batching layer. let findex = Findex::::new( - buffered_memory_clone, + buffered_memory, *self.encode, *self.decode, ); @@ -140,15 +137,11 @@ impl< let future = async move { findex.search(keyword).await }; futures.push(future); } - // Execute all futures concurrently and collect results. - let results = futures::future::join_all(futures).await; - - let mut output = Vec::with_capacity(results.len()); - for result in results { - output.push(result?); - } - Ok(output) + // Execute all futures concurrently and collect results. + futures::future::try_join_all(futures) + .await + .map_err(|e| BatchFindexError::Findex(e)) } } @@ -181,16 +174,8 @@ mod tests { let batcher_findex = FindexBatcher::::new( trivial_memory.clone(), - |op, values| { - dummy_encode::(op, values).map_err(BatchFindexError::< - InMemory, [u8; WORD_LENGTH]>, - >::Encoding) - }, - |words| { - dummy_decode(words).map_err(BatchFindexError::< - InMemory, [u8; WORD_LENGTH]>, - >::Encoding) - }, + dummy_encode, + dummy_decode, ); let cat_bindings = vec![ @@ -262,16 +247,8 @@ mod tests { // Create BatcherFindex for deletion operations. 
let batcher_findex = FindexBatcher::::new( trivial_memory.clone(), - |op, values| { - dummy_encode::(op, values).map_err(BatchFindexError::< - InMemory, [u8; WORD_LENGTH]>, - >::Encoding) - }, - |words| { - dummy_decode(words).map_err(BatchFindexError::< - InMemory, [u8; WORD_LENGTH]>, - >::Encoding) - }, + dummy_encode, + dummy_decode, ); let delete_entries = vec![ @@ -330,18 +307,9 @@ mod tests { .unwrap(); let batcher_findex = FindexBatcher::::new( - trivial_memory, - |op, values| { - dummy_encode::(op, values) - .map_err(BatchFindexError::< - InMemory, [u8; WORD_LENGTH]>, - >::Encoding) - }, - |words| { - dummy_decode(words).map_err(BatchFindexError::< - InMemory, [u8; WORD_LENGTH]>, - >::Encoding) - }, + trivial_memory.clone(), + dummy_encode, + dummy_decode, ); let key1 = "cat".to_string(); diff --git a/crate/findex/src/error.rs b/crate/findex/src/error.rs index 40956220..1f432b02 100644 --- a/crate/findex/src/error.rs +++ b/crate/findex/src/error.rs @@ -29,7 +29,6 @@ pub mod batch_findex_error { pub enum BatchFindexError { BatchingLayer(BatchingLayerError), Findex(Error), - Encoding(String), } impl Display for BatchFindexError @@ -40,7 +39,6 @@ pub mod batch_findex_error { match self { Self::BatchingLayer(e) => write!(f, "Batching layer error: {e}"), Self::Findex(error) => write!(f, "Findex error: {error:?}"), - Self::Encoding(msg) => write!(f, "Encoding error: {msg}"), } } } diff --git a/crate/memories/src/batching_layer/memory.rs b/crate/memories/src/batching_layer/memory.rs index 6a96bfef..787bd1bf 100644 --- a/crate/memories/src/batching_layer/memory.rs +++ b/crate/memories/src/batching_layer/memory.rs @@ -4,7 +4,6 @@ use futures::channel::oneshot; use std::fmt::Debug; -use std::ops::Deref; use std::sync::{ Arc, Mutex, atomic::{AtomicUsize, Ordering}, From 6900efc9ec4f3dac9aaf1a4a567a678a9c59f51c Mon Sep 17 00:00:00 2001 From: HatemMn <19950216+HatemMn@users.noreply.github.com> Date: Mon, 25 Aug 2025 14:02:04 +0200 Subject: [PATCH 03/10] feat: detach buffer feat: i think it's finally working feat: errors fix: refactor tests and some more fixes fix: a lot of formatting fix: more formats and comments fix: more formats and comments2 --- Cargo.toml | 2 +- crate/findex/Cargo.toml | 4 +- crate/findex/src/adt.rs | 6 +- crate/findex/src/batcher_findex.rs | 170 ++++---- crate/findex/src/benches.rs | 6 +- crate/findex/src/error.rs | 33 +- crate/findex/src/findex.rs | 5 +- crate/findex/src/lib.rs | 13 +- crate/findex/src/ovec.rs | 2 +- crate/memories/src/batching_layer/buffer.rs | 95 +++++ crate/memories/src/batching_layer/error.rs | 60 ++- crate/memories/src/batching_layer/memory.rs | 369 ++++++++---------- crate/memories/src/batching_layer/mod.rs | 4 +- .../memories/src/batching_layer/operation.rs | 62 +++ crate/memories/src/lib.rs | 2 +- 15 files changed, 478 insertions(+), 355 deletions(-) create mode 100644 crate/memories/src/batching_layer/buffer.rs create mode 100644 crate/memories/src/batching_layer/operation.rs diff --git a/Cargo.toml b/Cargo.toml index 6b7026e4..fdde75e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,4 +30,4 @@ cosmian_crypto_core = { version = "10.2", default-features = false, features = [ criterion = { version = "0.6" } smol-macros = { version = "0.1" } tokio = { version = "1.46" } -futures = "0.3.31" # Todo: this is a temporary approach +futures = "0.3.31" diff --git a/crate/findex/Cargo.toml b/crate/findex/Cargo.toml index b3490a2b..51f02cff 100644 --- a/crate/findex/Cargo.toml +++ b/crate/findex/Cargo.toml @@ -21,7 +21,7 @@ test-utils = ["agnostic-lite", 
"criterion"] [dependencies] aes = "0.8" cosmian_crypto_core.workspace = true -cosmian_sse_memories = { path = "../memories", version = "8.1" } +cosmian_sse_memories = { path = "../memories", version = "8.0.2" } # Should be above 8.0.2 to have batching features. xts-mode = "0.5" futures.workspace = true # Optional dependencies for testing and benchmarking. @@ -34,7 +34,7 @@ criterion = { workspace = true, optional = true } [dev-dependencies] agnostic-lite = { workspace = true, features = ["tokio"] } -cosmian_sse_memories = { path = "../memories", version = "8.1", features = [ +cosmian_sse_memories = { path = "../memories", version = "8.0.2", features = [ "redis-mem", "sqlite-mem", "postgres-mem", diff --git a/crate/findex/src/adt.rs b/crate/findex/src/adt.rs index 015cb02c..e4746317 100644 --- a/crate/findex/src/adt.rs +++ b/crate/findex/src/adt.rs @@ -51,8 +51,10 @@ pub trait VectorADT: Send { fn read(&self) -> impl Send + Future, Self::Error>>; } -/// This trait extends the functionality of the standard `IndexADT` by providing methods that operate -/// on multiple keywords or entries simultaneously to cut network's overhead and improve performance. +/// This trait extends the functionality of the standard `IndexADT` by +/// providing methods that operate on multiple keywords or entries +/// simultaneously. +#[cfg(feature = "batch")] pub trait IndexBatcher { type Error: std::error::Error; diff --git a/crate/findex/src/batcher_findex.rs b/crate/findex/src/batcher_findex.rs index ffc10cf2..ca531622 100644 --- a/crate/findex/src/batcher_findex.rs +++ b/crate/findex/src/batcher_findex.rs @@ -1,10 +1,9 @@ -use crate::adt::IndexBatcher; -use crate::error::BatchFindexError; -use crate::{Decoder, Encoder, Findex, IndexADT}; -use cosmian_sse_memories::{ADDRESS_LENGTH, Address, BatchingMemoryADT, MemoryBatcher}; -use std::sync::atomic::AtomicUsize; use std::{collections::HashSet, fmt::Debug, hash::Hash, sync::Arc}; +use cosmian_sse_memories::{ADDRESS_LENGTH, Address, BatchingMemoryADT, MemoryBatcher}; + +use crate::{Decoder, Encoder, Findex, IndexADT, adt::IndexBatcher, error::BatchFindexError}; + #[derive(Debug)] pub struct FindexBatcher< const WORD_LENGTH: usize, @@ -13,8 +12,8 @@ pub struct FindexBatcher< BatcherMemory: Clone + Send + BatchingMemoryADT
, Word = [u8; WORD_LENGTH]>, > { memory: BatcherMemory, - encode: Arc>, - decode: Arc>, + encode: Encoder, + decode: Decoder, } impl< @@ -35,12 +34,13 @@ impl< ) -> Self { Self { memory, - encode: Arc::new(encode), - decode: Arc::new(decode), + encode, + decode, } } - // Insert or delete are both an unbounded number of calls to `guarded_write` on the memory layer. + // Both insert and delete operations make an unbounded number of calls to + // `guarded_write` on the memory layer. async fn batch_insert_or_delete( &self, entries: Vec<(Keyword, impl Send + IntoIterator)>, @@ -50,20 +50,15 @@ impl< Keyword: Send + Sync + Hash + Eq, { let mut futures = Vec::new(); - let buffered_memory = Arc::new(MemoryBatcher::new_writer( - self.memory.clone(), - AtomicUsize::new(entries.len()), - )); + let memory = Arc::new(MemoryBatcher::new(self.memory.clone(), entries.len())); for (guard_keyword, bindings) in entries { - let memory_arc = buffered_memory.clone(); // TODO - // Create a temporary Findex instance using the shared batching layer + let memory = memory.clone(); + // Create a temporary Findex instance using the shared batching layer. let findex = Findex::::new( - // This (cheap) Arc clone is necessary because `decrement_capacity` is called - // below and needs to be able to access the Arc. - memory_arc.clone(), - *self.encode, - *self.decode, + memory.clone(), + self.encode, + self.decode, ); let future = async move { @@ -73,7 +68,7 @@ impl< findex.delete(guard_keyword, bindings).await }?; // Once one of the operations succeeds, we should make the buffer smaller. - memory_arc.decrement_capacity().await?; + memory.unsubscribe().await?; Ok::<_, BatchFindexError<_>>(()) }; @@ -120,47 +115,46 @@ impl< keywords: Vec<&Keyword>, ) -> Result>, Self::Error> { let mut futures = Vec::new(); - let buffered_memory = Arc::new(MemoryBatcher::new_reader( - self.memory.clone(), - AtomicUsize::new(keywords.len()), - )); + let memory = Arc::new(MemoryBatcher::new(self.memory.clone(), keywords.len())); for keyword in keywords { - let buffered_memory = buffered_memory.clone(); - // Create a temporary Findex instance using the shared batching layer. + let memory = memory.clone(); let findex = Findex::::new( - buffered_memory, - *self.encode, - *self.decode, + memory, + self.encode, + self.decode, ); let future = async move { findex.search(keyword).await }; futures.push(future); } - // Execute all futures concurrently and collect results. futures::future::try_join_all(futures) .await .map_err(|e| BatchFindexError::Findex(e)) } } -// The underlying tests assume the existence of a `Findex` implementation that is correct -// The testing strategy for each function is to use the `Findex` implementation to perform the same operations -// and compare the results with the `BatcherFindex` implementation. +// The underlying tests assume the existence of a `Findex` implementation that +// is correct The testing strategy for each function is to use the `Findex` +// implementation to perform the same operations and compare the results with +// the `BatcherFindex` implementation. 
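+//
+// For intuition: during a batched call, each per-keyword `Findex` future pushes
+// its read or write request into the shared `MemoryBatcher`; once one request per
+// keyword is buffered, a single aggregated call goes to the inner memory and the
+// results are split back to the waiting futures over their oneshot channels.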
#[cfg(test)] mod tests { - use super::*; - use crate::{Findex, IndexADT, dummy_decode, dummy_encode}; + use std::collections::HashSet; + use cosmian_crypto_core::define_byte_type; use cosmian_sse_memories::{ADDRESS_LENGTH, InMemory}; - use std::collections::HashSet; + + use super::*; + use crate::{Findex, IndexADT, dummy_decode, dummy_encode}; type Value = Bytes<8>; define_byte_type!(Bytes); impl TryFrom for Bytes { type Error = String; + fn try_from(value: usize) -> Result { Self::try_from(value.to_be_bytes().as_slice()).map_err(|e| e.to_string()) } @@ -169,19 +163,15 @@ mod tests { const WORD_LENGTH: usize = 16; #[tokio::test] - async fn test_batch_insert() { + async fn test_batch_insert_and_delete() { let trivial_memory = InMemory::, [u8; WORD_LENGTH]>::default(); - let batcher_findex = FindexBatcher::::new( - trivial_memory.clone(), - dummy_encode, - dummy_decode, - ); - + // Initial data for insertion let cat_bindings = vec![ Value::try_from(1).unwrap(), Value::try_from(2).unwrap(), Value::try_from(3).unwrap(), + Value::try_from(7).unwrap(), ]; let dog_bindings = vec![ Value::try_from(4).unwrap(), @@ -189,52 +179,13 @@ mod tests { Value::try_from(6).unwrap(), ]; - // Batch insert multiple entries. - let entries = vec![ - ("cat".to_string(), cat_bindings.clone()), - ("dog".to_string(), dog_bindings.clone()), - ]; - - batcher_findex.batch_insert(entries).await.unwrap(); - - // instantiate a (non batched) Findex to verify the results. + // Insert using normal findex let findex = Findex::new( trivial_memory.clone(), dummy_encode::, dummy_decode, ); - let cat_result = findex.search(&"cat".to_string()).await.unwrap(); - assert_eq!(cat_result, cat_bindings.into_iter().collect::>()); - - let dog_result = findex.search(&"dog".to_string()).await.unwrap(); - assert_eq!(dog_result, dog_bindings.into_iter().collect::>()); - } - - #[tokio::test] - async fn test_batch_delete() { - let trivial_memory = InMemory::, [u8; WORD_LENGTH]>::default(); - - // First, populate the memory with initial data using regular Findex. - let findex = Findex::new( - trivial_memory.clone(), - dummy_encode::, - dummy_decode, - ); - - let cat_bindings = vec![ - Value::try_from(1).unwrap(), - Value::try_from(3).unwrap(), - Value::try_from(5).unwrap(), - Value::try_from(7).unwrap(), - ]; - let dog_bindings = vec![ - Value::try_from(0).unwrap(), - Value::try_from(2).unwrap(), - Value::try_from(4).unwrap(), - Value::try_from(6).unwrap(), - ]; - findex .insert("cat".to_string(), cat_bindings.clone()) .await @@ -244,38 +195,55 @@ mod tests { .await .unwrap(); - // Create BatcherFindex for deletion operations. - let batcher_findex = FindexBatcher::::new( + // Create a `findex_batcher` instance + let findex_batcher = FindexBatcher::::new( trivial_memory.clone(), dummy_encode, dummy_decode, ); - let delete_entries = vec![ + // Test batch delete + let deletion_entries = vec![ ( "cat".to_string(), - vec![Value::try_from(1).unwrap(), Value::try_from(5).unwrap()], + vec![Value::try_from(1).unwrap(), Value::try_from(3).unwrap()], // Partial deletion ), - ("dog".to_string(), dog_bindings), // Remove all dog bindings. + ("dog".to_string(), dog_bindings), // Complete deletion ]; - // Perform batch delete. - batcher_findex.batch_delete(delete_entries).await.unwrap(); + findex_batcher.batch_delete(deletion_entries).await.unwrap(); - // Verify deletions were performed using a regular findex instance. 
- let cat_result = findex.search(&"cat".to_string()).await.unwrap(); - let dog_result = findex.search(&"dog".to_string()).await.unwrap(); + // Verify deletions using normal findex + let cat_result_after_delete = findex.search(&"cat".to_string()).await.unwrap(); + let dog_result_after_delete = findex.search(&"dog".to_string()).await.unwrap(); let expected_cat = vec![ - Value::try_from(3).unwrap(), // 1 and 5 removed, 3 and 7 remain. + Value::try_from(2).unwrap(), // 1 and 3 removed, 2 and 7 remain Value::try_from(7).unwrap(), ] .into_iter() .collect::>(); - let expected_dog = HashSet::new(); // all of the dog bindings are removed. + let expected_dog = HashSet::new(); // All dog bindings removed + + assert_eq!(cat_result_after_delete, expected_cat); + assert_eq!(dog_result_after_delete, expected_dog); + + // Test batch insert + let insert_entries = vec![( + "dog".to_string(), + vec![Value::try_from(8).unwrap(), Value::try_from(9).unwrap()], + )]; + + findex_batcher.batch_insert(insert_entries).await.unwrap(); + + // Verify insertions using normal findex + let new_dog_results = findex.search(&"dog".to_string()).await.unwrap(); + + let expected_dog = vec![Value::try_from(8).unwrap(), Value::try_from(9).unwrap()] + .into_iter() + .collect::>(); - assert_eq!(cat_result, expected_cat); - assert_eq!(dog_result, expected_dog); + assert_eq!(new_dog_results, expected_dog); } #[tokio::test] @@ -306,7 +274,7 @@ mod tests { .await .unwrap(); - let batcher_findex = FindexBatcher::::new( + let findex_batcher = FindexBatcher::::new( trivial_memory.clone(), dummy_encode, dummy_decode, @@ -315,7 +283,7 @@ mod tests { let key1 = "cat".to_string(); let key2 = "dog".to_string(); // Perform batch search - let batch_search_results = batcher_findex + let batch_search_results = findex_batcher .batch_search(vec![&key1, &key2]) .await .unwrap(); diff --git a/crate/findex/src/benches.rs b/crate/findex/src/benches.rs index d325605d..742ae2a3 100644 --- a/crate/findex/src/benches.rs +++ b/crate/findex/src/benches.rs @@ -3,10 +3,8 @@ //! to be generic and work with any memory back end that implements the //! MemoryADT trait. -use crate::{ - Findex, IndexADT, WORD_LENGTH, dummy_decode, dummy_encode, - encryption_layer::MemoryEncryptionLayer, -}; +use std::{collections::HashSet, fmt::Debug, sync::Arc}; + use agnostic_lite::{JoinHandle, RuntimeLite}; use cosmian_crypto_core::{Secret, reexport::rand_core::CryptoRngCore}; use cosmian_sse_memories::{ADDRESS_LENGTH, Address, MemoryADT}; diff --git a/crate/findex/src/error.rs b/crate/findex/src/error.rs index 1f432b02..46874441 100644 --- a/crate/findex/src/error.rs +++ b/crate/findex/src/error.rs @@ -1,8 +1,8 @@ +use std::fmt::{Debug, Display}; + #[cfg(feature = "batch")] pub use batch_findex_error::*; -use std::fmt::{Debug, Display}; - #[derive(Debug)] pub enum Error
{ Parsing(String), @@ -22,18 +22,23 @@ impl std::error::Error for Error
{} #[cfg(feature = "batch")] pub mod batch_findex_error { + use cosmian_sse_memories::{MemoryADT, MemoryBatcherError}; + use super::*; - use cosmian_sse_memories::{BatchingLayerError, MemoryADT}; #[derive(Debug)] - pub enum BatchFindexError { - BatchingLayer(BatchingLayerError), + pub enum BatchFindexError + where + ::Word: Debug, + { + BatchingLayer(MemoryBatcherError), Findex(Error), } impl Display for BatchFindexError where ::Address: Debug, + ::Word: Debug, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -43,20 +48,28 @@ pub mod batch_findex_error { } } - impl From> for BatchFindexError { + impl From> for BatchFindexError + where + ::Word: Debug, + { fn from(e: Error) -> Self { Self::Findex(e) } } - impl From> for BatchFindexError { - fn from(e: BatchingLayerError) -> Self { + impl From> for BatchFindexError + where + ::Word: Debug, + { + fn from(e: MemoryBatcherError) -> Self { Self::BatchingLayer(e) } } - impl std::error::Error for BatchFindexError where - ::Address: Debug + impl std::error::Error for BatchFindexError + where + ::Address: Debug, + ::Word: Debug, { } } diff --git a/crate/findex/src/findex.rs b/crate/findex/src/findex.rs index 19c528e1..43bf0291 100644 --- a/crate/findex/src/findex.rs +++ b/crate/findex/src/findex.rs @@ -138,9 +138,8 @@ impl< #[cfg(test)] mod tests { - use crate::{ - Findex, IndexADT, dummy_decode, dummy_encode, encryption_layer::MemoryEncryptionLayer, - }; + use std::{collections::HashSet, sync::Arc}; + use cosmian_crypto_core::{CsRng, Secret, define_byte_type, reexport::rand_core::SeedableRng}; use cosmian_sse_memories::{ADDRESS_LENGTH, Address, InMemory}; use smol_macros::Executor; diff --git a/crate/findex/src/lib.rs b/crate/findex/src/lib.rs index 39a84f80..880ba320 100644 --- a/crate/findex/src/lib.rs +++ b/crate/findex/src/lib.rs @@ -17,12 +17,6 @@ mod benches; pub use adt::IndexADT; #[cfg(feature = "test-utils")] pub use benches::*; - -#[cfg(feature = "batch")] -mod batcher_findex; -#[cfg(feature = "batch")] -pub use batcher_findex::FindexBatcher; - pub use encoding::{ Decoder, Encoder, generic_encoding::{generic_decode, generic_encode}, @@ -36,6 +30,13 @@ pub use encryption_layer::{KEY_LENGTH, MemoryEncryptionLayer}; pub use error::Error; pub use findex::{Findex, Op}; +#[cfg(feature = "batch")] +mod batcher_findex; +#[cfg(feature = "batch")] +pub use adt::IndexBatcher; +#[cfg(feature = "batch")] +pub use batcher_findex::FindexBatcher; + #[cfg(feature = "test-utils")] pub mod reexport { // Re-exporting the most commonly used runtime interfaces for convenience. 
diff --git a/crate/findex/src/ovec.rs b/crate/findex/src/ovec.rs index 23eb840a..902599c1 100644 --- a/crate/findex/src/ovec.rs +++ b/crate/findex/src/ovec.rs @@ -218,8 +218,8 @@ mod tests { use cosmian_sse_memories::{ADDRESS_LENGTH, Address, InMemory}; use crate::{ + MemoryEncryptionLayer, adt::tests::{test_vector_concurrent, test_vector_sequential}, - encryption_layer::MemoryEncryptionLayer, ovec::IVec, }; diff --git a/crate/memories/src/batching_layer/buffer.rs b/crate/memories/src/batching_layer/buffer.rs new file mode 100644 index 00000000..2c50b513 --- /dev/null +++ b/crate/memories/src/batching_layer/buffer.rs @@ -0,0 +1,95 @@ +use std::{mem, sync::Mutex}; + +use crate::{ + BatchingMemoryADT, + batching_layer::operation::{Operation, PendingOperations}, +}; + +struct Buffer { + capacity: usize, // the size at which the buffer should be flushed + data: PendingOperations, +} + +impl Buffer { + /// Flushes the buffer if it contains data and returns the flushed + /// operations. Returns None if the buffer is empty. + fn flush_if_not_empty(&mut self) -> Option> { + if !self.data.is_empty() { + Some(mem::take(&mut self.data)) + } else { + None + } + } +} + +pub(crate) struct ThreadSafeBuffer(Mutex>); + +impl ThreadSafeBuffer +where + M: BatchingMemoryADT, +{ + pub(crate) fn new(capacity: usize) -> Self { + Self(Mutex::new(Buffer:: { + capacity, + data: Vec::with_capacity(capacity), + })) + } + + pub(crate) fn shrink_capacity(&self) -> Result>, BufferError> { + let mut buffer = self.0.lock()?; + if buffer.capacity == 0 { + return Err(BufferError::Underflow); + } + buffer.capacity -= 1; + Ok(buffer.flush_if_not_empty()) + } + + pub(crate) fn push( + &self, + item: Operation, + ) -> Result>, BufferError> { + let mut buffer = self.0.lock()?; + // Check if the new item is compatible with the last item, since the buffer is + // thread-safe, this ensures by transitivity that all items in the + // buffer are of the same type. + if let Some(last_item) = buffer.data.last() { + if mem::discriminant(last_item) != mem::discriminant(&item) { + return Err(BufferError::TypeMismatch); + } + } + buffer.data.push(item); + Ok(buffer.flush_if_not_empty()) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum BufferError { + TypeMismatch, // when the type of the new item does not match the type of the last item + Overflow, + Underflow, + Mutex(String), +} + +impl From>>> + for BufferError +{ + fn from(e: std::sync::PoisonError>>) -> Self { + Self::Mutex(format!("Mutex poisoned: {}", e)) + } +} + +impl std::fmt::Display for BufferError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::TypeMismatch => write!( + f, + "Type mismatch: cannot mix read and write operations in the same buffer." 
+ ), + Self::Overflow => write!(f, "Buffer overflow: cannot push below capacity."), + Self::Underflow => write!(f, "Buffer underflow: cannot shrink capacity below zero."), + Self::Mutex(msg) => write!(f, "Mutex error: {}", msg), + } + } +} + +impl std::error::Error for BufferError {} diff --git a/crate/memories/src/batching_layer/error.rs b/crate/memories/src/batching_layer/error.rs index 1bf1609c..124478c5 100644 --- a/crate/memories/src/batching_layer/error.rs +++ b/crate/memories/src/batching_layer/error.rs @@ -1,36 +1,54 @@ -use futures::channel::oneshot::Canceled; use std::fmt::{Debug, Display}; -use crate::MemoryADT; +use futures::channel::oneshot::Canceled; + +use crate::{ + MemoryADT, + batching_layer::{buffer::BufferError, operation::MemoryOutput}, +}; #[derive(Debug)] -pub enum BatchingLayerError { - Memory(M::Error), // the from will not be implemented due to conflicting implementations with Rust's `core` library. Use `map_err` instead of `?`. - Mutex(String), +pub enum MemoryBatcherError +where + M::Word: std::fmt::Debug, +{ + Memory(M::Error), /* the from will not be implemented due to conflicting + * implementations with Rust's `core` library. Use `map_err` instead of + * `?`. */ Channel(String), - BufferOverflow(String), - WrongOperation(String), + InternalBuffering(BufferError), + WrongResultType(MemoryOutput), } -impl Display for BatchingLayerError { +impl Display for MemoryBatcherError +where + M::Word: std::fmt::Debug, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Memory(err) => write!(f, "Memory error: {:?}", err), - Self::Mutex(msg) => write!(f, "Mutex error: {}", msg), Self::Channel(msg) => { write!(f, "Channel closed unexpectedly: {}", msg) } - Self::BufferOverflow(msg) => { - write!(f, "Buffer overflow: {}", msg) - } - Self::WrongOperation(msg) => { - write!(f, "Wrong operation: {}", msg) + Self::InternalBuffering(err) => write!(f, "Internal buffering error: {:?}", err), + Self::WrongResultType(out) => { + write!( + f, + "Wrong result type, expected {:?}", + match out { + MemoryOutput::Read(_) => "Read, got Write.", + MemoryOutput::Write(_) => "Write, got Read.", + } + ) } } } } -impl From for BatchingLayerError { +impl From for MemoryBatcherError +where + M::Word: std::fmt::Debug, +{ fn from(_: Canceled) -> Self { Self::Channel( "The sender was dropped before sending its results with the `send` function." 
@@ -39,10 +57,14 @@ impl From for BatchingLayerError { } } -impl From> for BatchingLayerError { - fn from(e: std::sync::PoisonError) -> Self { - Self::Mutex(format!("Mutex lock poisoned: {e}")) +impl From for MemoryBatcherError +where + M::Word: std::fmt::Debug, +{ + fn from(e: BufferError) -> Self { + Self::InternalBuffering(e) } } -impl std::error::Error for BatchingLayerError {} +impl std::error::Error for MemoryBatcherError where M::Word: std::fmt::Debug +{} diff --git a/crate/memories/src/batching_layer/memory.rs b/crate/memories/src/batching_layer/memory.rs index 787bd1bf..260212c8 100644 --- a/crate/memories/src/batching_layer/memory.rs +++ b/crate/memories/src/batching_layer/memory.rs @@ -1,203 +1,42 @@ -// ---------------------------------- BufferedMemory Structure ---------------------------------- -// It takes as inner memory any memory that implements the batcher ADT -// which is basically, having MemoryADT + The function batch_guarded_write +use std::{fmt::Debug, sync::Arc}; use futures::channel::oneshot; -use std::fmt::Debug; -use std::sync::{ - Arc, Mutex, - atomic::{AtomicUsize, Ordering}, -}; - -use crate::batching_layer::BatchingLayerError; -use crate::{BatchingMemoryADT, MemoryADT}; - -type ReadOperation = ( - Vec<::Address>, - oneshot::Sender::Word>>, ::Error>>, -); - -type WriteOperation = ( - ( - (::Address, Option<::Word>), - Vec<(::Address, ::Word)>, - ), - oneshot::Sender::Word>, ::Error>>, -); -#[allow(clippy::type_complexity)] // refactoring this type will make the code unnecessarily more difficult to read without any actual benefit -enum PendingOperations -where - M::Address: Clone, -{ - PendingReads(Mutex>>), - PendingWrites(Mutex>>), -} - -impl PendingOperations -where - M::Address: Clone, -{ - // Gets the lock of the buffer and returns its length; hence the name. 
- pub fn lock_and_get_len(&self) -> Result> { - Ok(match self { - Self::PendingReads(read_ops) => read_ops.lock()?.len(), - Self::PendingWrites(write_ops) => write_ops.lock()?.len(), - }) - } -} +use crate::{ + BatchingMemoryADT, MemoryADT, + batching_layer::{ + MemoryBatcherError, + buffer::ThreadSafeBuffer, + operation::{ + MemoryInput, MemoryOutput, Operation, OperationResultReceiver, PendingOperations, + }, + }, +}; -pub struct MemoryBatcher -where - M::Address: Clone, -{ - inner: M, // the actual memory layer that implements the actual network / memory call - capacity: AtomicUsize, // capacity at which the operation should be executed - buffer: PendingOperations, -} - -impl MemoryBatcher -where - ::Address: Clone, -{ - pub const fn new_reader(inner: M, capacity: AtomicUsize) -> Self { - Self { - inner, - capacity, - buffer: PendingOperations::PendingReads(Mutex::new(Vec::new())), - } - } - - pub const fn new_writer(inner: M, capacity: AtomicUsize) -> Self { - Self { - inner, - capacity, - buffer: PendingOperations::PendingWrites(Mutex::new(Vec::new())), - } - } - - // atomically decrement the buffer size, needed on inserts/deletes - pub async fn decrement_capacity(&self) -> Result<(), BatchingLayerError> { - // `fetch_sub` returns the previous value, so if it was 1, it means the buffer's job is done - let previous = self.capacity.fetch_sub(1, Ordering::SeqCst); - match &self.buffer { - PendingOperations::PendingReads(read_ops) => { - if previous <= read_ops.lock()?.len() { - let _ = self.flush().await; - } - } - PendingOperations::PendingWrites(write_ops) => { - if previous <= write_ops.lock()?.len() { - let _ = self.flush().await; - } - } - } - Ok(()) - } - - async fn flush(&self) -> Result<(), BatchingLayerError> { - if self.buffer.lock_and_get_len()? > self.capacity.load(Ordering::SeqCst) { - return Err(BatchingLayerError::BufferOverflow( - "The buffer vector's length is greater than the capacity, this should not happen." - .to_owned(), - )); - } - // check if the buffer is full - if self.buffer.lock_and_get_len()? == self.capacity.load(Ordering::SeqCst) { - match &self.buffer { - PendingOperations::PendingReads(read_ops) => { - let batches = std::mem::take(&mut *read_ops.lock()?); - - // Build combined address list while tracking which addresses belong to which batch - let all_addresses = batches - .iter() - .flat_map(|(addresses, _)| addresses.iter()) - .cloned() - .collect(); - - let mut aggregated_reads_results = self - .inner - .batch_read(all_addresses) - .await - .map_err(BatchingLayerError::::Memory)?; - - // Distribute results to each batch's sender - for (input_addresses, sender) in batches.into_iter().rev() { - let split_point = aggregated_reads_results.len() - input_addresses.len(); // This is the point where the last batch's results start - let batch_results = aggregated_reads_results.split_off(split_point); // After this call, all_results will be left containing the elements [0, split_point) - sender.send(Ok(batch_results)).map_err(|_| { - // Upon failure, the vector we tried to send is returned in the Err varient, but it's explicitly ignored here to not extract information. - BatchingLayerError::::Channel( - "The receiver end of this read operation was dropped before the `send` function could be called." 
- .to_owned(), - ) - })?; - } - } - PendingOperations::PendingWrites(write_ops) => { - let batches = std::mem::take(&mut *write_ops.lock()?); - - let (bindings, senders): (Vec<_>, Vec<_>) = batches.into_iter().unzip(); - - let aggregated_writes_results = self - .inner - .batch_guarded_write(bindings) - .await - .map_err(BatchingLayerError::::Memory)?; - - // Distribute results to each batch's sender - for (res, sender) in aggregated_writes_results.into_iter().zip(senders) { - sender.send(Ok(res)).map_err(|_| { - BatchingLayerError::::Channel( - "The receiver end of this write operation was dropped before the `send` function could be called." - .to_owned(), - ) - })?; - } - } - }; - } - Ok(()) - } +pub struct MemoryBatcher { + pub inner: M, // The actual memory that does the R/W operations. + buffer: Arc>, // The buffer that holds the operations to be batched. } impl MemoryADT for MemoryBatcher where - M::Address: Clone + Send, - M::Word: Send, + M::Address: Clone, + M::Word: std::fmt::Debug, { type Address = M::Address; + type Error = MemoryBatcherError; type Word = M::Word; - type Error = BatchingLayerError; async fn batch_read( &self, addresses: Vec, ) -> Result>, Self::Error> { - match &self.buffer { - PendingOperations::PendingWrites(_) => Err(BatchingLayerError::WrongOperation( - "`batch_read` is called on a writer MemoryBatcher, make sure to use `new_reader` during initialization.".to_owned() - )), - PendingOperations::PendingReads(read_ops) => { - // Create a channel for this batch. - let (sender, receiver) = oneshot::channel(); - - // Add to pending batches. - { - let mut pending = read_ops.lock()?; - pending.push((addresses, sender)); + let res = self.apply(MemoryInput::Read(addresses)).await?; - // Determine if we should flush. - } - - // Each thread tries to flush but only one will succeed and empty the buffer. - self.flush().await?; - - // Wait for results. - receiver - .await? - .map_err(|e| BatchingLayerError::::Memory(e)) - } + if let MemoryOutput::Read(words) = res { + Ok(words) + } else { + Err(MemoryBatcherError::WrongResultType(res)) } } @@ -206,38 +45,25 @@ where guard: (Self::Address, Option), bindings: Vec<(Self::Address, Self::Word)>, ) -> Result, Self::Error> { - match &self.buffer { - PendingOperations::PendingReads(_) => Err(BatchingLayerError::WrongOperation( - "`guarded_write` is called on a reader MemoryBatcher, make sure to use `new_writer` during initialization.".to_owned() - )), - PendingOperations::PendingWrites(write_ops) => { - let (sender, receiver) = oneshot::channel(); - - { - let mut pending = write_ops.lock()?; - pending.push(((guard, bindings), sender)); - } - - self.flush().await?; + let res = self.apply(MemoryInput::Write((guard, bindings))).await?; - receiver - .await? - .map_err(|e| BatchingLayerError::::Memory(e)) - } + if let MemoryOutput::Write(word) = res { + Ok(word) + } else { + Err(MemoryBatcherError::WrongResultType(res)) } } } -// This simply forwards the BR/GW calls to the inner memory -// when findex instances (below) call the batcher's operations +// Forward the BR/GW calls on Arcs to their actual implementations. 
impl MemoryADT for Arc> where M::Address: Send + Clone, - M::Word: Send, + M::Word: Send + std::fmt::Debug, { type Address = M::Address; + type Error = MemoryBatcherError; type Word = M::Word; - type Error = BatchingLayerError; async fn batch_read( &self, @@ -254,3 +80,138 @@ where (**self).guarded_write(guard, bindings).await } } + +impl MemoryBatcher +where + M::Address: Clone + Send, + M::Word: Send + std::fmt::Debug, +{ + pub fn new(inner: M, n: usize) -> Self { + if n == 0 { + panic!("Buffer capacity must be greater than zero."); + }; + Self { + inner, + buffer: Arc::new(ThreadSafeBuffer::new(n)), + } + } + + pub async fn unsubscribe(&self) -> Result<(), MemoryBatcherError> { + if let Some(ops) = self.buffer.shrink_capacity()? { + self.manage(ops).await?; + } + Ok(()) + } + + async fn apply(&self, op: MemoryInput) -> Result, MemoryBatcherError> { + let (operation, receiver) = match op { + MemoryInput::Read(addresses) => { + let (sender, receiver) = oneshot::channel(); + ( + Operation::Read((addresses, sender)), + OperationResultReceiver::::Read(receiver), + ) + } + MemoryInput::Write((guard, bindings)) => { + let (sender, receiver) = oneshot::channel(); + ( + Operation::Write(((guard, bindings), sender)), + OperationResultReceiver::::Write(receiver), + ) + } + }; + + if let Some(ops) = self.buffer.push(operation)? { + self.manage(ops).await?; + } + + Ok(match receiver { + OperationResultReceiver::Read(receiver) => { + let result = receiver.await?.map_err(MemoryBatcherError::Memory)?; + MemoryOutput::Read(result) + } + OperationResultReceiver::Write(receiver) => { + let result = receiver.await?.map_err(MemoryBatcherError::Memory)?; + MemoryOutput::Write(result) + } + }) + } + + async fn manage(&self, ops: PendingOperations) -> Result<(), MemoryBatcherError> { + // Assumes the vector is homogeneous, i.e. all operations are of the same type. + // This should be guaranteed by the buffer. + match ops[0] { + Operation::Read(_) => { + // Build combined address list while tracking which addresses belong to which + // batch. + let all_addresses: Vec<_> = ops + .iter() + .flat_map(|op| match op { + Operation::Read((addresses, _)) => addresses.clone(), + _ => unreachable!( + "Expected all operations to be reads, reaching this statement means \ + the buffer has implementation flaws at the push level." + ), + }) + .collect(); + + let mut aggregated_reads_results = self + .inner + .batch_read(all_addresses) + .await + .map_err(MemoryBatcherError::Memory)?; + + // Distribute results to each batch's sender. + for (input_addresses, sender) in ops + .into_iter() + .map(|op| match op { + Operation::Read((addresses, sender)) => (addresses, sender), + _ => unreachable!( + "Expected all operations to be reads, reaching this statement means \ + the buffer has implementation flaws at the push level." + ), + }) + .rev() + { + let split_point = aggregated_reads_results.len() - input_addresses.len(); // This is the point where the last batch's results start. + let batch_results = aggregated_reads_results.split_off(split_point); // After this call, all_results will be left containing the elements [0, split_point). + sender.send(Ok(batch_results)).map_err(|_| { + // Upon failure, the vector we tried to send is returned in the Err variant, + // but it's explicitly ignored here to not extract information. + MemoryBatcherError::::Channel( + "The receiver end of this read operation was dropped before the \ + `send` function could be called." 
+ .to_owned(), + ) + })?; + } + } + Operation::Write(_) => { + let (bindings, senders): (Vec<_>, Vec<_>) = ops + .into_iter() + .map(|op| match op { + Operation::Write((bindings, sender)) => (bindings, sender), + _ => unreachable!(), + }) + .unzip(); + + let aggregated_writes_results = self + .inner + .batch_guarded_write(bindings) + .await + .map_err(MemoryBatcherError::Memory)?; + + for (res, sender) in aggregated_writes_results.into_iter().zip(senders) { + sender.send(Ok(res)).map_err(|_| { + MemoryBatcherError::::Channel( + "The receiver end of this write operation was dropped before the \ + `send` function could be called." + .to_owned(), + ) + })?; + } + } + }; + Ok(()) + } +} diff --git a/crate/memories/src/batching_layer/mod.rs b/crate/memories/src/batching_layer/mod.rs index a34f92a3..30958941 100644 --- a/crate/memories/src/batching_layer/mod.rs +++ b/crate/memories/src/batching_layer/mod.rs @@ -1,5 +1,7 @@ +mod buffer; mod error; mod memory; +mod operation; -pub use error::BatchingLayerError; +pub use error::MemoryBatcherError; pub use memory::MemoryBatcher; diff --git a/crate/memories/src/batching_layer/operation.rs b/crate/memories/src/batching_layer/operation.rs new file mode 100644 index 00000000..2a0134fd --- /dev/null +++ b/crate/memories/src/batching_layer/operation.rs @@ -0,0 +1,62 @@ +//! This module strongly types and defines the variables that are used within +//! the batching layer. It adds clear distinction between : +//! - And `input`(resp `output`) type, which is the type that the memory backend +//! accepts (resp returns) via its MemoryADT implementation. +//! - An operation type, which is a pair of an input and a oneshot channel. An +//! operation is considered *pending* starting the moment it is pushed to the +//! buffer, and to each operation corresponds exactly one result consisting of +//! an output that can be retrieved from the oneshot channel (or otherwise an +//! error). +use futures::channel::oneshot; + +use crate::{BatchingMemoryADT, MemoryADT}; + +pub(crate) type BatchReadInput = Vec<::Address>; +pub(crate) type GuardedWriteInput = ( + (::Address, Option<::Word>), + Vec<(::Address, ::Word)>, +); + +// Notice: to avoid breaking changes, the MemoryADT I/O types are kept here for +// now. If a major release is planned, consider moving them to the MemoryADT +// module. +pub(crate) enum MemoryInput { + Read(BatchReadInput), + Write(GuardedWriteInput), +} + +pub(crate) type BatchReadOutput = Vec::Word>>; +pub(crate) type GuardedWriteOutput = Option<::Word>; + +#[derive(Debug)] +pub enum MemoryOutput +where + M::Word: std::fmt::Debug, +{ + Read(BatchReadOutput), + Write(GuardedWriteOutput), +} + +pub(crate) type ReadOperation = ( + BatchReadInput, + oneshot::Sender, ::Error>>, +); + +pub(crate) type WriteOperation = ( + GuardedWriteInput, + oneshot::Sender, ::Error>>, +); + +pub(crate) enum Operation { + Read(ReadOperation), + Write(WriteOperation), +} + +pub(crate) type PendingOperations = Vec>; + +// Match arms do not support heterogeneous types, this enum is the only way to +// escape a 2 branch `apply` function and the code duplication that would imply. 
+pub(crate) enum OperationResultReceiver { + Read(oneshot::Receiver, ::Error>>), + Write(oneshot::Receiver, ::Error>>), +} diff --git a/crate/memories/src/lib.rs b/crate/memories/src/lib.rs index 62f3a008..3cf194e2 100644 --- a/crate/memories/src/lib.rs +++ b/crate/memories/src/lib.rs @@ -59,7 +59,7 @@ pub trait MemoryADT { mod batching_layer; #[cfg(feature = "batch")] -pub use batching_layer::{BatchingLayerError, MemoryBatcher}; +pub use batching_layer::{MemoryBatcher, MemoryBatcherError}; // Super trait for MemoryADT that allows doing write operations in batches. #[cfg(feature = "batch")] From 8bb88a8e9c86a9573b44d719e263410ac13e1700 Mon Sep 17 00:00:00 2001 From: HatemMn <19950216+HatemMn@users.noreply.github.com> Date: Fri, 7 Nov 2025 18:06:47 +0100 Subject: [PATCH 04/10] fix: WIP on tbz-review/feat/batch_findex --- Cargo.toml | 2 +- crate/findex/Cargo.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fdde75e1..d0aae745 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crate/findex", "crate/memories"] resolver = "2" [workspace.package] -version = "8.0.2" +version = "8.1.0" authors = [ "Bruno Grieder ", "Célia Corsin ", diff --git a/crate/findex/Cargo.toml b/crate/findex/Cargo.toml index 51f02cff..5cc351f6 100644 --- a/crate/findex/Cargo.toml +++ b/crate/findex/Cargo.toml @@ -21,7 +21,7 @@ test-utils = ["agnostic-lite", "criterion"] [dependencies] aes = "0.8" cosmian_crypto_core.workspace = true -cosmian_sse_memories = { path = "../memories", version = "8.0.2" } # Should be above 8.0.2 to have batching features. +cosmian_sse_memories = { path = "../memories", version = "8.1.0" } xts-mode = "0.5" futures.workspace = true # Optional dependencies for testing and benchmarking. @@ -34,7 +34,7 @@ criterion = { workspace = true, optional = true } [dev-dependencies] agnostic-lite = { workspace = true, features = ["tokio"] } -cosmian_sse_memories = { path = "../memories", version = "8.0.2", features = [ +cosmian_sse_memories = { path = "../memories", version = "8.1.0", features = [ "redis-mem", "sqlite-mem", "postgres-mem", From a1b2b25e0c4a7aa09474f4df58e711dc9b090436 Mon Sep 17 00:00:00 2001 From: HatemMn <19950216+HatemMn@users.noreply.github.com> Date: Wed, 12 Nov 2025 10:21:51 +0100 Subject: [PATCH 05/10] fix: wip --- crate/findex/src/adt.rs | 25 ++++++++++++++++--------- crate/findex/src/batcher_findex.rs | 29 +++++++++++++++++++---------- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/crate/findex/src/adt.rs b/crate/findex/src/adt.rs index e4746317..a743b07b 100644 --- a/crate/findex/src/adt.rs +++ b/crate/findex/src/adt.rs @@ -51,9 +51,7 @@ pub trait VectorADT: Send { fn read(&self) -> impl Send + Future, Self::Error>>; } -/// This trait extends the functionality of the standard `IndexADT` by -/// providing methods that operate on multiple keywords or entries -/// simultaneously. +/// This trait provides methods that let an index operate on multiple keywords or entries simultaneously. #[cfg(feature = "batch")] pub trait IndexBatcher { type Error: std::error::Error; @@ -65,16 +63,25 @@ pub trait IndexBatcher { ) -> impl Future>, Self::Error>>; /// Binds each value to their associated keyword in this index. 
- fn batch_insert( + fn batch_insert( &self, - entries: Vec<(Keyword, impl Sync + Send + IntoIterator)>, - ) -> impl Send + Future>; + entries: Entries, + ) -> impl Send + Future> + where + Values: Sync + Send + IntoIterator, + Entries: Send + IntoIterator, + Entries::IntoIter: ExactSizeIterator, + ::IntoIter: Send; /// Removes the given values from the index. - fn batch_delete( + fn batch_delete( &self, - entries: Vec<(Keyword, impl Sync + Send + IntoIterator)>, - ) -> impl Send + Future>; + entries: Entries, + ) -> impl Send + Future> + where + Values: Sync + Send + IntoIterator, + Entries: Send + IntoIterator, + Entries::IntoIter: ExactSizeIterator + Send; } #[cfg(test)] diff --git a/crate/findex/src/batcher_findex.rs b/crate/findex/src/batcher_findex.rs index ca531622..801b3c22 100644 --- a/crate/findex/src/batcher_findex.rs +++ b/crate/findex/src/batcher_findex.rs @@ -41,15 +41,20 @@ impl< // Both insert and delete operations make an unbounded number of calls to // `guarded_write` on the memory layer. - async fn batch_insert_or_delete( + async fn batch_insert_or_delete( &self, - entries: Vec<(Keyword, impl Send + IntoIterator)>, + entries: Entries, is_insert: bool, ) -> Result<(), BatchFindexError> where Keyword: Send + Sync + Hash + Eq, + Bindings: Send + IntoIterator, + Entries: IntoIterator + Send, + Entries::IntoIter: ExactSizeIterator, { let mut futures = Vec::new(); + + let entries = entries.into_iter(); let memory = Arc::new(MemoryBatcher::new(self.memory.clone(), entries.len())); for (guard_keyword, bindings) in entries { @@ -96,17 +101,21 @@ impl< { type Error = BatchFindexError; - async fn batch_insert( - &self, - entries: Vec<(Keyword, impl Sync + Send + IntoIterator)>, - ) -> Result<(), Self::Error> { + async fn batch_insert(&self, entries: Entries) -> Result<(), Self::Error> + where + Bindings: Sync + Send + IntoIterator, + Entries: Send + IntoIterator, + Entries::IntoIter: ExactSizeIterator, + { self.batch_insert_or_delete(entries, true).await } - async fn batch_delete( - &self, - entries: Vec<(Keyword, impl Sync + Send + IntoIterator)>, - ) -> Result<(), Self::Error> { + async fn batch_delete(&self, entries: Entries) -> Result<(), Self::Error> + where + Bindings: Sync + Send + IntoIterator, + Entries: Send + IntoIterator, + Entries::IntoIter: ExactSizeIterator, + { self.batch_insert_or_delete(entries, false).await } From d9b0ad07261f728cc41b29bb7354dfbfc6f169bb Mon Sep 17 00:00:00 2001 From: HatemMn <19950216+HatemMn@users.noreply.github.com> Date: Wed, 12 Nov 2025 15:57:24 +0100 Subject: [PATCH 06/10] fix: WIP advancing --- crate/findex/src/adt.rs | 4 +- crate/findex/src/batcher_findex.rs | 308 ------------------ crate/findex/src/error.rs | 2 + crate/findex/src/lib.rs | 4 +- crate/memories/src/batching_layer/buffer.rs | 33 +- crate/memories/src/batching_layer/memory.rs | 9 +- .../src/databases/postgresql_mem/memory.rs | 4 +- crate/memories/src/lib.rs | 8 +- 8 files changed, 31 insertions(+), 341 deletions(-) delete mode 100644 crate/findex/src/batcher_findex.rs diff --git a/crate/findex/src/adt.rs b/crate/findex/src/adt.rs index a743b07b..5d46fbca 100644 --- a/crate/findex/src/adt.rs +++ b/crate/findex/src/adt.rs @@ -68,7 +68,7 @@ pub trait IndexBatcher { entries: Entries, ) -> impl Send + Future> where - Values: Sync + Send + IntoIterator, + Values: Send + IntoIterator, Entries: Send + IntoIterator, Entries::IntoIter: ExactSizeIterator, ::IntoIter: Send; @@ -79,7 +79,7 @@ pub trait IndexBatcher { entries: Entries, ) -> impl Send + Future> where - Values: Sync + Send + 
IntoIterator, + Values: Send + IntoIterator, Entries: Send + IntoIterator, Entries::IntoIter: ExactSizeIterator + Send; } diff --git a/crate/findex/src/batcher_findex.rs b/crate/findex/src/batcher_findex.rs deleted file mode 100644 index 801b3c22..00000000 --- a/crate/findex/src/batcher_findex.rs +++ /dev/null @@ -1,308 +0,0 @@ -use std::{collections::HashSet, fmt::Debug, hash::Hash, sync::Arc}; - -use cosmian_sse_memories::{ADDRESS_LENGTH, Address, BatchingMemoryADT, MemoryBatcher}; - -use crate::{Decoder, Encoder, Findex, IndexADT, adt::IndexBatcher, error::BatchFindexError}; - -#[derive(Debug)] -pub struct FindexBatcher< - const WORD_LENGTH: usize, - Value: Send + Hash + Eq, - EncodingError: Send + Debug, - BatcherMemory: Clone + Send + BatchingMemoryADT
, Word = [u8; WORD_LENGTH]>, -> { - memory: BatcherMemory, - encode: Encoder, - decode: Decoder, -} - -impl< - const WORD_LENGTH: usize, - Value: Send + Hash + Eq, - BatcherMemory: Debug - + Send - + Sync - + Clone - + BatchingMemoryADT
, Word = [u8; WORD_LENGTH]>, - EncodingError: Send + Debug, -> FindexBatcher -{ - pub fn new( - memory: BatcherMemory, - encode: Encoder, - decode: Decoder, - ) -> Self { - Self { - memory, - encode, - decode, - } - } - - // Both insert and delete operations make an unbounded number of calls to - // `guarded_write` on the memory layer. - async fn batch_insert_or_delete( - &self, - entries: Entries, - is_insert: bool, - ) -> Result<(), BatchFindexError> - where - Keyword: Send + Sync + Hash + Eq, - Bindings: Send + IntoIterator, - Entries: IntoIterator + Send, - Entries::IntoIter: ExactSizeIterator, - { - let mut futures = Vec::new(); - - let entries = entries.into_iter(); - let memory = Arc::new(MemoryBatcher::new(self.memory.clone(), entries.len())); - - for (guard_keyword, bindings) in entries { - let memory = memory.clone(); - // Create a temporary Findex instance using the shared batching layer. - let findex = Findex::::new( - memory.clone(), - self.encode, - self.decode, - ); - - let future = async move { - if is_insert { - findex.insert(guard_keyword, bindings).await - } else { - findex.delete(guard_keyword, bindings).await - }?; - // Once one of the operations succeeds, we should make the buffer smaller. - memory.unsubscribe().await?; - Ok::<_, BatchFindexError<_>>(()) - }; - - futures.push(future); - } - - // Execute all futures concurrently and collect results. - futures::future::try_join_all(futures).await?; - - Ok(()) - } -} - -impl< - const WORD_LENGTH: usize, - Keyword: Send + Sync + Hash + Eq, - Value: Send + Hash + Eq, - EncodingError: Send + Debug, - BatcherMemory: Debug - + Send - + Sync - + Clone - + BatchingMemoryADT
, Word = [u8; WORD_LENGTH]>, -> IndexBatcher for FindexBatcher -{ - type Error = BatchFindexError; - - async fn batch_insert(&self, entries: Entries) -> Result<(), Self::Error> - where - Bindings: Sync + Send + IntoIterator, - Entries: Send + IntoIterator, - Entries::IntoIter: ExactSizeIterator, - { - self.batch_insert_or_delete(entries, true).await - } - - async fn batch_delete(&self, entries: Entries) -> Result<(), Self::Error> - where - Bindings: Sync + Send + IntoIterator, - Entries: Send + IntoIterator, - Entries::IntoIter: ExactSizeIterator, - { - self.batch_insert_or_delete(entries, false).await - } - - async fn batch_search( - &self, - keywords: Vec<&Keyword>, - ) -> Result>, Self::Error> { - let mut futures = Vec::new(); - let memory = Arc::new(MemoryBatcher::new(self.memory.clone(), keywords.len())); - - for keyword in keywords { - let memory = memory.clone(); - let findex = Findex::::new( - memory, - self.encode, - self.decode, - ); - - let future = async move { findex.search(keyword).await }; - futures.push(future); - } - - futures::future::try_join_all(futures) - .await - .map_err(|e| BatchFindexError::Findex(e)) - } -} - -// The underlying tests assume the existence of a `Findex` implementation that -// is correct The testing strategy for each function is to use the `Findex` -// implementation to perform the same operations and compare the results with -// the `BatcherFindex` implementation. -#[cfg(test)] -mod tests { - use std::collections::HashSet; - - use cosmian_crypto_core::define_byte_type; - use cosmian_sse_memories::{ADDRESS_LENGTH, InMemory}; - - use super::*; - use crate::{Findex, IndexADT, dummy_decode, dummy_encode}; - - type Value = Bytes<8>; - define_byte_type!(Bytes); - - impl TryFrom for Bytes { - type Error = String; - - fn try_from(value: usize) -> Result { - Self::try_from(value.to_be_bytes().as_slice()).map_err(|e| e.to_string()) - } - } - - const WORD_LENGTH: usize = 16; - - #[tokio::test] - async fn test_batch_insert_and_delete() { - let trivial_memory = InMemory::, [u8; WORD_LENGTH]>::default(); - - // Initial data for insertion - let cat_bindings = vec![ - Value::try_from(1).unwrap(), - Value::try_from(2).unwrap(), - Value::try_from(3).unwrap(), - Value::try_from(7).unwrap(), - ]; - let dog_bindings = vec![ - Value::try_from(4).unwrap(), - Value::try_from(5).unwrap(), - Value::try_from(6).unwrap(), - ]; - - // Insert using normal findex - let findex = Findex::new( - trivial_memory.clone(), - dummy_encode::, - dummy_decode, - ); - - findex - .insert("cat".to_string(), cat_bindings.clone()) - .await - .unwrap(); - findex - .insert("dog".to_string(), dog_bindings.clone()) - .await - .unwrap(); - - // Create a `findex_batcher` instance - let findex_batcher = FindexBatcher::::new( - trivial_memory.clone(), - dummy_encode, - dummy_decode, - ); - - // Test batch delete - let deletion_entries = vec![ - ( - "cat".to_string(), - vec![Value::try_from(1).unwrap(), Value::try_from(3).unwrap()], // Partial deletion - ), - ("dog".to_string(), dog_bindings), // Complete deletion - ]; - - findex_batcher.batch_delete(deletion_entries).await.unwrap(); - - // Verify deletions using normal findex - let cat_result_after_delete = findex.search(&"cat".to_string()).await.unwrap(); - let dog_result_after_delete = findex.search(&"dog".to_string()).await.unwrap(); - - let expected_cat = vec![ - Value::try_from(2).unwrap(), // 1 and 3 removed, 2 and 7 remain - Value::try_from(7).unwrap(), - ] - .into_iter() - .collect::>(); - let expected_dog = HashSet::new(); // All dog bindings 
removed - - assert_eq!(cat_result_after_delete, expected_cat); - assert_eq!(dog_result_after_delete, expected_dog); - - // Test batch insert - let insert_entries = vec![( - "dog".to_string(), - vec![Value::try_from(8).unwrap(), Value::try_from(9).unwrap()], - )]; - - findex_batcher.batch_insert(insert_entries).await.unwrap(); - - // Verify insertions using normal findex - let new_dog_results = findex.search(&"dog".to_string()).await.unwrap(); - - let expected_dog = vec![Value::try_from(8).unwrap(), Value::try_from(9).unwrap()] - .into_iter() - .collect::>(); - - assert_eq!(new_dog_results, expected_dog); - } - - #[tokio::test] - async fn test_batch_search() { - let trivial_memory = InMemory::, [u8; WORD_LENGTH]>::default(); - - let findex = Findex::new( - trivial_memory.clone(), - dummy_encode::, - dummy_decode, - ); - let cat_bindings = [ - Value::try_from(1).unwrap(), - Value::try_from(3).unwrap(), - Value::try_from(5).unwrap(), - ]; - let dog_bindings = [ - Value::try_from(0).unwrap(), - Value::try_from(2).unwrap(), - Value::try_from(4).unwrap(), - ]; - findex - .insert("cat".to_string(), cat_bindings.clone()) - .await - .unwrap(); - findex - .insert("dog".to_string(), dog_bindings.clone()) - .await - .unwrap(); - - let findex_batcher = FindexBatcher::::new( - trivial_memory.clone(), - dummy_encode, - dummy_decode, - ); - - let key1 = "cat".to_string(); - let key2 = "dog".to_string(); - // Perform batch search - let batch_search_results = findex_batcher - .batch_search(vec![&key1, &key2]) - .await - .unwrap(); - - assert_eq!( - batch_search_results, - vec![ - cat_bindings.iter().cloned().collect::>(), - dog_bindings.iter().cloned().collect::>() - ] - ); - } -} diff --git a/crate/findex/src/error.rs b/crate/findex/src/error.rs index 46874441..123e2c2e 100644 --- a/crate/findex/src/error.rs +++ b/crate/findex/src/error.rs @@ -33,6 +33,7 @@ pub mod batch_findex_error { { BatchingLayer(MemoryBatcherError), Findex(Error), + Other(String), } impl Display for BatchFindexError @@ -44,6 +45,7 @@ pub mod batch_findex_error { match self { Self::BatchingLayer(e) => write!(f, "Batching layer error: {e}"), Self::Findex(error) => write!(f, "Findex error: {error:?}"), + Self::Other(msg) => write!(f, "{msg}"), } } } diff --git a/crate/findex/src/lib.rs b/crate/findex/src/lib.rs index 880ba320..19924690 100644 --- a/crate/findex/src/lib.rs +++ b/crate/findex/src/lib.rs @@ -31,11 +31,11 @@ pub use error::Error; pub use findex::{Findex, Op}; #[cfg(feature = "batch")] -mod batcher_findex; +mod findex_batcher; #[cfg(feature = "batch")] pub use adt::IndexBatcher; #[cfg(feature = "batch")] -pub use batcher_findex::FindexBatcher; +pub use findex_batcher::FindexBatcher; #[cfg(feature = "test-utils")] pub mod reexport { diff --git a/crate/memories/src/batching_layer/buffer.rs b/crate/memories/src/batching_layer/buffer.rs index 2c50b513..c628b85b 100644 --- a/crate/memories/src/batching_layer/buffer.rs +++ b/crate/memories/src/batching_layer/buffer.rs @@ -1,4 +1,8 @@ -use std::{mem, sync::Mutex}; +use std::{ + mem, + num::{NonZero, NonZeroUsize}, + sync::Mutex, +}; use crate::{ BatchingMemoryADT, @@ -6,7 +10,7 @@ use crate::{ }; struct Buffer { - capacity: usize, // the size at which the buffer should be flushed + capacity: NonZeroUsize, data: PendingOperations, } @@ -28,19 +32,20 @@ impl ThreadSafeBuffer where M: BatchingMemoryADT, { - pub(crate) fn new(capacity: usize) -> Self { + pub(crate) fn new(capacity: NonZeroUsize) -> Self { Self(Mutex::new(Buffer:: { capacity, - data: Vec::with_capacity(capacity), + 
data: Vec::with_capacity(capacity.into()), })) } pub(crate) fn shrink_capacity(&self) -> Result>, BufferError> { - let mut buffer = self.0.lock()?; - if buffer.capacity == 0 { - return Err(BufferError::Underflow); + let mut buffer = self.0.lock().expect("poisoned lock"); + if buffer.capacity == NonZero::new(1).unwrap() { + return Ok(buffer.flush_if_not_empty()); } - buffer.capacity -= 1; + buffer.capacity = + NonZero::new(buffer.capacity.get() - 1).expect("buffer capacity should not reach zero"); Ok(buffer.flush_if_not_empty()) } @@ -48,7 +53,7 @@ where &self, item: Operation, ) -> Result>, BufferError> { - let mut buffer = self.0.lock()?; + let mut buffer = self.0.lock().expect("poisoned lock"); // Check if the new item is compatible with the last item, since the buffer is // thread-safe, this ensures by transitivity that all items in the // buffer are of the same type. @@ -67,15 +72,6 @@ pub enum BufferError { TypeMismatch, // when the type of the new item does not match the type of the last item Overflow, Underflow, - Mutex(String), -} - -impl From>>> - for BufferError -{ - fn from(e: std::sync::PoisonError>>) -> Self { - Self::Mutex(format!("Mutex poisoned: {}", e)) - } } impl std::fmt::Display for BufferError { @@ -87,7 +83,6 @@ impl std::fmt::Display for BufferError { ), Self::Overflow => write!(f, "Buffer overflow: cannot push below capacity."), Self::Underflow => write!(f, "Buffer underflow: cannot shrink capacity below zero."), - Self::Mutex(msg) => write!(f, "Mutex error: {}", msg), } } } diff --git a/crate/memories/src/batching_layer/memory.rs b/crate/memories/src/batching_layer/memory.rs index 260212c8..666cd288 100644 --- a/crate/memories/src/batching_layer/memory.rs +++ b/crate/memories/src/batching_layer/memory.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, sync::Arc}; +use std::{fmt::Debug, num::NonZeroUsize, sync::Arc}; use futures::channel::oneshot; @@ -86,13 +86,10 @@ where M::Address: Clone + Send, M::Word: Send + std::fmt::Debug, { - pub fn new(inner: M, n: usize) -> Self { - if n == 0 { - panic!("Buffer capacity must be greater than zero."); - }; + pub fn new(inner: M, capacity: NonZeroUsize) -> Self { Self { inner, - buffer: Arc::new(ThreadSafeBuffer::new(n)), + buffer: Arc::new(ThreadSafeBuffer::new(capacity)), } } diff --git a/crate/memories/src/databases/postgresql_mem/memory.rs b/crate/memories/src/databases/postgresql_mem/memory.rs index 61399d27..b9d8f35a 100644 --- a/crate/memories/src/databases/postgresql_mem/memory.rs +++ b/crate/memories/src/databases/postgresql_mem/memory.rs @@ -205,6 +205,8 @@ impl MemoryADT #[cfg(test)] mod tests { + use std::future::Future; + use deadpool_postgres::Config; use tokio_postgres::NoTls; @@ -235,7 +237,7 @@ mod tests { ) -> Result<(), PostgresMemoryError> where F: FnOnce(PostgresMemory, [u8; 129]>) -> Fut + Send, - Fut: std::future::Future + Send, + Fut: Future + Send, { let test_pool = create_testing_pool(DB_URL).await.unwrap(); let m = PostgresMemory::new_with_pool(test_pool.clone(), table_name.to_string()).await; diff --git a/crate/memories/src/lib.rs b/crate/memories/src/lib.rs index 3cf194e2..d86e19b7 100644 --- a/crate/memories/src/lib.rs +++ b/crate/memories/src/lib.rs @@ -2,6 +2,8 @@ mod address; mod databases; mod in_memory; +use std::future::Future; + pub use address::Address; #[cfg(feature = "postgres-mem")] pub use databases::postgresql_mem::{PostgresMemory, PostgresMemoryError}; @@ -44,7 +46,7 @@ pub trait MemoryADT { fn batch_read( &self, addresses: Vec, - ) -> impl Send + std::future::Future>, Self::Error>>; + ) -> 
impl Send + Future>, Self::Error>>; /// Write the given bindings if the word currently stored at the guard /// address is the guard word, and returns this word. @@ -52,7 +54,7 @@ pub trait MemoryADT { &self, guard: (Self::Address, Option), bindings: Vec<(Self::Address, Self::Word)>, - ) -> impl Send + std::future::Future, Self::Error>>; + ) -> impl Send + Future, Self::Error>>; } #[cfg(feature = "batch")] @@ -71,5 +73,5 @@ pub trait BatchingMemoryADT: MemoryADT { (Self::Address, Option), Vec<(Self::Address, Self::Word)>, )>, - ) -> impl Send + std::future::Future>, Self::Error>>; + ) -> impl Send + Future>, Self::Error>>; } From 90ad992a19c8f754c71ea50e41613da5bc5211b3 Mon Sep 17 00:00:00 2001 From: HatemMn <19950216+HatemMn@users.noreply.github.com> Date: Wed, 12 Nov 2025 18:40:37 +0100 Subject: [PATCH 07/10] fix: last rev fixes --- crate/findex/README.md | 2 +- crate/findex/src/findex_batcher.rs | 317 ++++++++++++++++++ crate/memories/src/batching_layer/buffer.rs | 15 +- crate/memories/src/batching_layer/error.rs | 20 +- crate/memories/src/batching_layer/memory.rs | 99 +++--- crate/memories/src/batching_layer/mod.rs | 2 + .../memories/src/batching_layer/operation.rs | 4 +- crate/memories/src/in_memory.rs | 23 +- crate/memories/src/lib.rs | 8 +- 9 files changed, 392 insertions(+), 98 deletions(-) create mode 100644 crate/findex/src/findex_batcher.rs diff --git a/crate/findex/README.md b/crate/findex/README.md index 36b8ade7..fa9bad90 100644 --- a/crate/findex/README.md +++ b/crate/findex/README.md @@ -2,7 +2,7 @@ This crate provides the core functionality of Findex, defining the abstract data types, cryptographic operations, and encoding algorithms. -Findex also supports batching operations into a singe call to the memory interface, which reduces connection overhead and avoids file descriptor limits on some Linux systems. +Supports batching operations into a singe call to the memory interface, which reduces connection overhead and avoids file descriptor limits on some Linux systems. ## Setup diff --git a/crate/findex/src/findex_batcher.rs b/crate/findex/src/findex_batcher.rs new file mode 100644 index 00000000..fe4e6794 --- /dev/null +++ b/crate/findex/src/findex_batcher.rs @@ -0,0 +1,317 @@ +use std::{collections::HashSet, fmt::Debug, hash::Hash, num::NonZero}; + +use cosmian_sse_memories::{ADDRESS_LENGTH, Address, BatchingMemoryADT, MemoryBatcher}; + +use crate::{Decoder, Encoder, Findex, IndexADT, adt::IndexBatcher, error::BatchFindexError}; + +#[derive(Debug)] +pub struct FindexBatcher< + const WORD_LENGTH: usize, + Value: Send + Hash + Eq, + EncodingError: Send + Debug, + BatcherMemory: Clone + Send + BatchingMemoryADT
, Word = [u8; WORD_LENGTH]>, +> { + memory: BatcherMemory, + encode: Encoder, + decode: Decoder, +} + +impl< + const WORD_LENGTH: usize, + Value: Send + Hash + Eq, + BatcherMemory: Debug + + Send + + Sync + + Clone + + BatchingMemoryADT
, Word = [u8; WORD_LENGTH]>, + EncodingError: Send + Debug, +> FindexBatcher +{ + pub fn new( + memory: BatcherMemory, + encode: Encoder, + decode: Decoder, + ) -> Self { + Self { + memory, + encode, + decode, + } + } + + async fn batch_insert_or_delete( + &self, + entries: Entries, + is_insert: bool, + ) -> Result<(), BatchFindexError> + where + Keyword: Send + Sync + Hash + Eq, + Bindings: Send + IntoIterator, + Entries: IntoIterator + Send, + Entries::IntoIter: ExactSizeIterator, + { + let entries = entries.into_iter(); + let n = entries.len(); + let mut futures = Vec::with_capacity(n); + let memory = MemoryBatcher::new( + self.memory.clone(), + NonZero::new(n) + .ok_or_else(|| BatchFindexError::Other("Batch size can not be zero".to_owned()))?, + ); + + for (guard_keyword, bindings) in entries { + let memory = memory.clone(); + // Create a temporary Findex instance using the shared batching layer. + let findex = Findex::::new( + memory.clone(), + self.encode, + self.decode, + ); + + let future = async move { + if is_insert { + findex.insert(guard_keyword, bindings).await + } else { + findex.delete(guard_keyword, bindings).await + }?; + // Within findex, insert/delete operations may perform a variable number of memory + // writes. This requires explicit `unsubscribe()` calls to adjust the expected buffer size once one of the operations succeeds. + memory.unsubscribe().await?; + Ok::<_, BatchFindexError<_>>(()) + }; + + futures.push(future); + } + + // Execute all futures concurrently and collect results. + futures::future::try_join_all(futures).await?; + + Ok(()) + } +} + +impl< + const WORD_LENGTH: usize, + Keyword: Send + Sync + Hash + Eq, + Value: Send + Hash + Eq, + EncodingError: Send + Debug, + BatchingMemoryLayer: Debug + + Send + + Sync + + Clone + + BatchingMemoryADT
, Word = [u8; WORD_LENGTH]>, +> IndexBatcher + for FindexBatcher +{ + type Error = BatchFindexError; + + async fn batch_insert(&self, entries: Entries) -> Result<(), Self::Error> + where + Bindings: Send + IntoIterator, + Entries: Send + IntoIterator, + Entries::IntoIter: ExactSizeIterator, + { + self.batch_insert_or_delete(entries, true).await + } + + async fn batch_delete(&self, entries: Entries) -> Result<(), Self::Error> + where + Bindings: Send + IntoIterator, + Entries: Send + IntoIterator, + Entries::IntoIter: ExactSizeIterator, + { + self.batch_insert_or_delete(entries, false).await + } + + async fn batch_search( + &self, + keywords: Vec<&Keyword>, + ) -> Result>, Self::Error> { + let n = keywords.len(); + let mut futures = Vec::with_capacity(n); + let memory = MemoryBatcher::new( + self.memory.clone(), + NonZero::new(n) + .ok_or_else(|| BatchFindexError::Other("Batch size can not be zero".to_owned()))?, + ); + + for keyword in keywords { + let memory = memory.clone(); + let findex = Findex::::new( + memory, + self.encode, + self.decode, + ); + // Search operations do not require calling `unsubscribe()` on the memory batcher. + // This is because all Findex search operations perform the same deterministic + // number of memory read operations. Specifically, each (safe) Findex search + // completes after performing exactly two reads. + let future = async move { findex.search(keyword).await }; + futures.push(future); + } + + futures::future::try_join_all(futures) + .await + .map_err(|e| BatchFindexError::Findex(e)) + } +} + +// These tests implement dual testing against the base Findex implementation. +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use cosmian_crypto_core::define_byte_type; + use cosmian_sse_memories::{ADDRESS_LENGTH, InMemory}; + + use super::*; + use crate::{Findex, IndexADT, dummy_decode, dummy_encode}; + + type Value = Bytes<8>; + define_byte_type!(Bytes); + + impl TryFrom for Bytes { + type Error = String; + + fn try_from(value: usize) -> Result { + Self::try_from(value.to_be_bytes().as_slice()).map_err(|e| e.to_string()) + } + } + + const WORD_LENGTH: usize = 16; + + #[tokio::test] + async fn test_batch_insert_and_delete() { + let trivial_memory = InMemory::, [u8; WORD_LENGTH]>::default(); + + // Initial data for insertion + let cat_bindings = vec![ + Value::try_from(1).unwrap(), + Value::try_from(2).unwrap(), + Value::try_from(3).unwrap(), + Value::try_from(7).unwrap(), + ]; + let dog_bindings = vec![ + Value::try_from(4).unwrap(), + Value::try_from(5).unwrap(), + Value::try_from(6).unwrap(), + ]; + + // Insert using normal findex + let findex = Findex::new( + trivial_memory.clone(), + dummy_encode::, + dummy_decode, + ); + + findex + .insert("cat".to_string(), cat_bindings.clone()) + .await + .unwrap(); + findex + .insert("dog".to_string(), dog_bindings.clone()) + .await + .unwrap(); + + // Create a `findex_batcher` instance + let findex_batcher = FindexBatcher::::new( + trivial_memory.clone(), + dummy_encode, + dummy_decode, + ); + + // Test batch delete + let deletion_entries = vec![ + ( + "cat".to_string(), + vec![Value::try_from(1).unwrap(), Value::try_from(3).unwrap()], // Partial deletion + ), + ("dog".to_string(), dog_bindings), // Complete deletion + ]; + + findex_batcher.batch_delete(deletion_entries).await.unwrap(); + + // Verify deletions using normal findex + let cat_result_after_delete = findex.search(&"cat".to_string()).await.unwrap(); + let dog_result_after_delete = findex.search(&"dog".to_string()).await.unwrap(); + + let 
expected_cat = vec![ + Value::try_from(2).unwrap(), // 1 and 3 removed, 2 and 7 remain + Value::try_from(7).unwrap(), + ] + .into_iter() + .collect::>(); + let expected_dog = HashSet::new(); // All dog bindings removed + + assert_eq!(cat_result_after_delete, expected_cat); + assert_eq!(dog_result_after_delete, expected_dog); + + // Test batch insert + let insert_entries = vec![( + "dog".to_string(), + vec![Value::try_from(8).unwrap(), Value::try_from(9).unwrap()], + )]; + + findex_batcher.batch_insert(insert_entries).await.unwrap(); + + // Verify insertions using normal findex + let new_dog_results = findex.search(&"dog".to_string()).await.unwrap(); + + let expected_dog = vec![Value::try_from(8).unwrap(), Value::try_from(9).unwrap()] + .into_iter() + .collect::>(); + + assert_eq!(new_dog_results, expected_dog); + } + + #[tokio::test] + async fn test_batch_search() { + let trivial_memory = InMemory::, [u8; WORD_LENGTH]>::default(); + + let findex = Findex::new( + trivial_memory.clone(), + dummy_encode::, + dummy_decode, + ); + let cat_bindings = [ + Value::try_from(1).unwrap(), + Value::try_from(3).unwrap(), + Value::try_from(5).unwrap(), + ]; + let dog_bindings = [ + Value::try_from(0).unwrap(), + Value::try_from(2).unwrap(), + Value::try_from(4).unwrap(), + ]; + findex + .insert("cat".to_string(), cat_bindings.clone()) + .await + .unwrap(); + findex + .insert("dog".to_string(), dog_bindings.clone()) + .await + .unwrap(); + + let findex_batcher = FindexBatcher::::new( + trivial_memory.clone(), + dummy_encode, + dummy_decode, + ); + + let key1 = "cat".to_string(); + let key2 = "dog".to_string(); + // Perform batch search + let batch_search_results = findex_batcher + .batch_search(vec![&key1, &key2]) + .await + .unwrap(); + + assert_eq!( + batch_search_results, + vec![ + cat_bindings.iter().cloned().collect::>(), + dog_bindings.iter().cloned().collect::>() + ] + ); + } +} diff --git a/crate/memories/src/batching_layer/buffer.rs b/crate/memories/src/batching_layer/buffer.rs index c628b85b..5650a64f 100644 --- a/crate/memories/src/batching_layer/buffer.rs +++ b/crate/memories/src/batching_layer/buffer.rs @@ -1,3 +1,8 @@ +//! Thread-safe buffer for batching memory operations. +//! +//! Operations are accumulated until capacity is reached, then flushed in one call to the inner memory. +//! All operations are synchronized via `Mutex` to ensure thread-safe concurrent access. + use std::{ mem, num::{NonZero, NonZeroUsize}, @@ -54,14 +59,6 @@ where item: Operation, ) -> Result>, BufferError> { let mut buffer = self.0.lock().expect("poisoned lock"); - // Check if the new item is compatible with the last item, since the buffer is - // thread-safe, this ensures by transitivity that all items in the - // buffer are of the same type. 
- if let Some(last_item) = buffer.data.last() { - if mem::discriminant(last_item) != mem::discriminant(&item) { - return Err(BufferError::TypeMismatch); - } - } buffer.data.push(item); Ok(buffer.flush_if_not_empty()) } @@ -69,7 +66,7 @@ where #[derive(Debug, Clone, PartialEq, Eq)] pub enum BufferError { - TypeMismatch, // when the type of the new item does not match the type of the last item + TypeMismatch, Overflow, Underflow, } diff --git a/crate/memories/src/batching_layer/error.rs b/crate/memories/src/batching_layer/error.rs index 124478c5..6bcf82f4 100644 --- a/crate/memories/src/batching_layer/error.rs +++ b/crate/memories/src/batching_layer/error.rs @@ -15,8 +15,8 @@ where Memory(M::Error), /* the from will not be implemented due to conflicting * implementations with Rust's `core` library. Use `map_err` instead of * `?`. */ - Channel(String), - InternalBuffering(BufferError), + ClosedChannel, + Buffering(BufferError), WrongResultType(MemoryOutput), } @@ -27,10 +27,11 @@ where fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Memory(err) => write!(f, "Memory error: {:?}", err), - Self::Channel(msg) => { - write!(f, "Channel closed unexpectedly: {}", msg) - } - Self::InternalBuffering(err) => write!(f, "Internal buffering error: {:?}", err), + Self::ClosedChannel => write!( + f, + "Channel closed unexpectedly, the sender was dropped before sending its results with the `send` function.." + ), + Self::Buffering(err) => write!(f, "Internal buffering error: {:?}", err), Self::WrongResultType(out) => { write!( f, @@ -50,10 +51,7 @@ where M::Word: std::fmt::Debug, { fn from(_: Canceled) -> Self { - Self::Channel( - "The sender was dropped before sending its results with the `send` function." - .to_string(), - ) + Self::ClosedChannel } } @@ -62,7 +60,7 @@ where M::Word: std::fmt::Debug, { fn from(e: BufferError) -> Self { - Self::InternalBuffering(e) + Self::Buffering(e) } } diff --git a/crate/memories/src/batching_layer/memory.rs b/crate/memories/src/batching_layer/memory.rs index 666cd288..c71f2f2d 100644 --- a/crate/memories/src/batching_layer/memory.rs +++ b/crate/memories/src/batching_layer/memory.rs @@ -14,10 +14,19 @@ use crate::{ }; pub struct MemoryBatcher { - pub inner: M, // The actual memory that does the R/W operations. + pub inner: Arc, // The actual memory that does the R/W operations. buffer: Arc>, // The buffer that holds the operations to be batched. } +impl Clone for MemoryBatcher { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + buffer: Arc::clone(&self.buffer), + } + } +} + impl MemoryADT for MemoryBatcher where M::Address: Clone, @@ -55,32 +64,6 @@ where } } -// Forward the BR/GW calls on Arcs to their actual implementations. -impl MemoryADT for Arc> -where - M::Address: Send + Clone, - M::Word: Send + std::fmt::Debug, -{ - type Address = M::Address; - type Error = MemoryBatcherError; - type Word = M::Word; - - async fn batch_read( - &self, - addresses: Vec, - ) -> Result>, Self::Error> { - (**self).batch_read(addresses).await - } - - async fn guarded_write( - &self, - guard: (Self::Address, Option), - bindings: Vec<(Self::Address, Self::Word)>, - ) -> Result, Self::Error> { - (**self).guarded_write(guard, bindings).await - } -} - impl MemoryBatcher where M::Address: Clone + Send, @@ -88,7 +71,7 @@ where { pub fn new(inner: M, capacity: NonZeroUsize) -> Self { Self { - inner, + inner: Arc::new(inner), buffer: Arc::new(ThreadSafeBuffer::new(capacity)), } } @@ -143,16 +126,18 @@ where // batch. 
let all_addresses: Vec<_> = ops .iter() - .flat_map(|op| match op { - Operation::Read((addresses, _)) => addresses.clone(), - _ => unreachable!( - "Expected all operations to be reads, reaching this statement means \ - the buffer has implementation flaws at the push level." - ), + .map(|op| match op { + Operation::Read((addresses, _)) => Ok(addresses.clone()), + _ => Err(MemoryBatcherError::Buffering( + crate::batching_layer::buffer::BufferError::TypeMismatch, + )), }) + .collect::, _>>()? // Short-circuit on first error. + .into_iter() + .flatten() .collect(); - let mut aggregated_reads_results = self + let mut words = self .inner .batch_read(all_addresses) .await @@ -162,34 +147,32 @@ where for (input_addresses, sender) in ops .into_iter() .map(|op| match op { - Operation::Read((addresses, sender)) => (addresses, sender), - _ => unreachable!( - "Expected all operations to be reads, reaching this statement means \ - the buffer has implementation flaws at the push level." - ), + Operation::Read((addresses, sender)) => Ok((addresses, sender)), + _ => Err(MemoryBatcherError::Buffering( + crate::batching_layer::buffer::BufferError::TypeMismatch, + )), }) + .collect::, _>>()? + .into_iter() .rev() { - let split_point = aggregated_reads_results.len() - input_addresses.len(); // This is the point where the last batch's results start. - let batch_results = aggregated_reads_results.split_off(split_point); // After this call, all_results will be left containing the elements [0, split_point). - sender.send(Ok(batch_results)).map_err(|_| { - // Upon failure, the vector we tried to send is returned in the Err variant, - // but it's explicitly ignored here to not extract information. - MemoryBatcherError::::Channel( - "The receiver end of this read operation was dropped before the \ - `send` function could be called." - .to_owned(), - ) - })?; + let batch_results = words.split_off(words.len() - input_addresses.len()); // After this call, all_results will be left containing the elements [0, split_point). + sender + .send(Ok(batch_results)) + .map_err(|_| MemoryBatcherError::::ClosedChannel)?; } } Operation::Write(_) => { let (bindings, senders): (Vec<_>, Vec<_>) = ops .into_iter() .map(|op| match op { - Operation::Write((bindings, sender)) => (bindings, sender), - _ => unreachable!(), + Operation::Write((bindings, sender)) => Ok((bindings, sender)), + _ => Err(MemoryBatcherError::Buffering( + crate::batching_layer::buffer::BufferError::TypeMismatch, + )), }) + .collect::, _>>()? + .into_iter() .unzip(); let aggregated_writes_results = self @@ -199,13 +182,9 @@ where .map_err(MemoryBatcherError::Memory)?; for (res, sender) in aggregated_writes_results.into_iter().zip(senders) { - sender.send(Ok(res)).map_err(|_| { - MemoryBatcherError::::Channel( - "The receiver end of this write operation was dropped before the \ - `send` function could be called." 
- .to_owned(), - ) - })?; + sender + .send(Ok(res)) + .map_err(|_| MemoryBatcherError::::ClosedChannel)?; } } }; diff --git a/crate/memories/src/batching_layer/mod.rs b/crate/memories/src/batching_layer/mod.rs index 30958941..d98e2bf7 100644 --- a/crate/memories/src/batching_layer/mod.rs +++ b/crate/memories/src/batching_layer/mod.rs @@ -5,3 +5,5 @@ mod operation; pub use error::MemoryBatcherError; pub use memory::MemoryBatcher; + +pub use crate::batching_layer::operation::{BatchReadInput, GuardedWriteInput}; diff --git a/crate/memories/src/batching_layer/operation.rs b/crate/memories/src/batching_layer/operation.rs index 2a0134fd..4faff8dd 100644 --- a/crate/memories/src/batching_layer/operation.rs +++ b/crate/memories/src/batching_layer/operation.rs @@ -11,8 +11,8 @@ use futures::channel::oneshot; use crate::{BatchingMemoryADT, MemoryADT}; -pub(crate) type BatchReadInput = Vec<::Address>; -pub(crate) type GuardedWriteInput = ( +pub type BatchReadInput = Vec<::Address>; +pub type GuardedWriteInput = ( (::Address, Option<::Word>), Vec<(::Address, ::Word)>, ); diff --git a/crate/memories/src/in_memory.rs b/crate/memories/src/in_memory.rs index 87a9a212..48c8e713 100644 --- a/crate/memories/src/in_memory.rs +++ b/crate/memories/src/in_memory.rs @@ -85,18 +85,19 @@ impl Batchi operations: Vec<((Address, Option), Vec<(Address, Value)>)>, ) -> Result>, Self::Error> { let store = &mut *self.inner.lock().expect("poisoned lock"); - let mut res = Vec::with_capacity(operations.len()); - for (guard, bindings) in operations { - let (a, old) = guard; - let cur = store.get(&a).cloned(); - if old == cur { - for (k, v) in bindings { - store.insert(k, v); + Ok(operations + .into_iter() + .map(|(guard, bindings)| { + let (a, old) = guard; + let cur = store.get(&a).cloned(); + if old == cur { + for (k, v) in bindings { + store.insert(k, v); + } } - } - res.push(cur); - } - Ok(res) + cur + }) + .collect()) } } diff --git a/crate/memories/src/lib.rs b/crate/memories/src/lib.rs index d86e19b7..e82c0676 100644 --- a/crate/memories/src/lib.rs +++ b/crate/memories/src/lib.rs @@ -63,15 +63,15 @@ mod batching_layer; #[cfg(feature = "batch")] pub use batching_layer::{MemoryBatcher, MemoryBatcherError}; +#[cfg(feature = "batch")] +pub use crate::batching_layer::{BatchReadInput, GuardedWriteInput}; + // Super trait for MemoryADT that allows doing write operations in batches. #[cfg(feature = "batch")] pub trait BatchingMemoryADT: MemoryADT { #[allow(clippy::type_complexity)] // Refactoring this type will make the code unnecessarily more difficult to read without any actual benefit. 
fn batch_guarded_write( &self, - write_operations: Vec<( - (Self::Address, Option), - Vec<(Self::Address, Self::Word)>, - )>, + write_operations: Vec>, ) -> impl Send + Future>, Self::Error>>; } From c823b03f83f443885836f505a6143a43636437b8 Mon Sep 17 00:00:00 2001 From: HatemMn <19950216+HatemMn@users.noreply.github.com> Date: Wed, 12 Nov 2025 18:45:41 +0100 Subject: [PATCH 08/10] fix: fmmt --- crate/findex/src/adt.rs | 3 ++- crate/findex/src/findex_batcher.rs | 14 ++++++++------ crate/memories/src/batching_layer/buffer.rs | 5 +++-- crate/memories/src/batching_layer/error.rs | 3 ++- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/crate/findex/src/adt.rs b/crate/findex/src/adt.rs index 5d46fbca..41e6b5c6 100644 --- a/crate/findex/src/adt.rs +++ b/crate/findex/src/adt.rs @@ -51,7 +51,8 @@ pub trait VectorADT: Send { fn read(&self) -> impl Send + Future, Self::Error>>; } -/// This trait provides methods that let an index operate on multiple keywords or entries simultaneously. +/// This trait provides methods that let an index operate on multiple keywords +/// or entries simultaneously. #[cfg(feature = "batch")] pub trait IndexBatcher { type Error: std::error::Error; diff --git a/crate/findex/src/findex_batcher.rs b/crate/findex/src/findex_batcher.rs index fe4e6794..f5f18937 100644 --- a/crate/findex/src/findex_batcher.rs +++ b/crate/findex/src/findex_batcher.rs @@ -74,8 +74,9 @@ impl< } else { findex.delete(guard_keyword, bindings).await }?; - // Within findex, insert/delete operations may perform a variable number of memory - // writes. This requires explicit `unsubscribe()` calls to adjust the expected buffer size once one of the operations succeeds. + // Within findex, insert/delete operations may perform a variable number of + // memory writes. This requires explicit `unsubscribe()` calls + // to adjust the expected buffer size once one of the operations succeeds. memory.unsubscribe().await?; Ok::<_, BatchFindexError<_>>(()) }; @@ -142,10 +143,11 @@ impl< self.encode, self.decode, ); - // Search operations do not require calling `unsubscribe()` on the memory batcher. - // This is because all Findex search operations perform the same deterministic - // number of memory read operations. Specifically, each (safe) Findex search - // completes after performing exactly two reads. + // Search operations do not require calling `unsubscribe()` on the memory + // batcher. This is because all Findex search operations perform the + // same deterministic number of memory read operations. + // Specifically, each (safe) Findex search completes after + // performing exactly two reads. let future = async move { findex.search(keyword).await }; futures.push(future); } diff --git a/crate/memories/src/batching_layer/buffer.rs b/crate/memories/src/batching_layer/buffer.rs index 5650a64f..b3305a98 100644 --- a/crate/memories/src/batching_layer/buffer.rs +++ b/crate/memories/src/batching_layer/buffer.rs @@ -1,7 +1,8 @@ //! Thread-safe buffer for batching memory operations. //! -//! Operations are accumulated until capacity is reached, then flushed in one call to the inner memory. -//! All operations are synchronized via `Mutex` to ensure thread-safe concurrent access. +//! Operations are accumulated until capacity is reached, then flushed in one +//! call to the inner memory. All operations are synchronized via `Mutex` to +//! ensure thread-safe concurrent access. 
use std::{ mem, diff --git a/crate/memories/src/batching_layer/error.rs b/crate/memories/src/batching_layer/error.rs index 6bcf82f4..26ad6bd3 100644 --- a/crate/memories/src/batching_layer/error.rs +++ b/crate/memories/src/batching_layer/error.rs @@ -29,7 +29,8 @@ where Self::Memory(err) => write!(f, "Memory error: {:?}", err), Self::ClosedChannel => write!( f, - "Channel closed unexpectedly, the sender was dropped before sending its results with the `send` function.." + "Channel closed unexpectedly, the sender was dropped before sending its results \ + with the `send` function.." ), Self::Buffering(err) => write!(f, "Internal buffering error: {:?}", err), Self::WrongResultType(out) => { From 64050b0b6ffa7dcec88e641cf446d7cb1ab37425 Mon Sep 17 00:00:00 2001 From: HatemMn <19950216+HatemMn@users.noreply.github.com> Date: Wed, 12 Nov 2025 18:48:01 +0100 Subject: [PATCH 09/10] fix: UPGRADE ci --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e46d23e3..15c8cd68 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -5,7 +5,7 @@ jobs: cargo-lint: uses: ./.github/workflows/ci.yml with: - toolchain: 1.87.0 + toolchain: 1.88.0 cleanup: needs: - cargo-lint From 94f794578175735dfb511c8179c1a9459a00c8fb Mon Sep 17 00:00:00 2001 From: HatemMn <19950216+HatemMn@users.noreply.github.com> Date: Wed, 12 Nov 2025 18:48:17 +0100 Subject: [PATCH 10/10] fix: UPGRADE ci again --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 15c8cd68..12f887d6 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -5,7 +5,7 @@ jobs: cargo-lint: uses: ./.github/workflows/ci.yml with: - toolchain: 1.88.0 + toolchain: 1.89.0 cleanup: needs: - cargo-lint
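
Usage sketch (not part of the patches above): the snippet below only illustrates how the new batching API introduced by this series is meant to be driven. It is modelled on the tests added in crate/findex/src/findex_batcher.rs; the `dummy_encode`/`dummy_decode` helpers, the `batch`/`test-utils` feature names and the `String` encoding-error parameter are assumptions carried over from those tests, and a real application would supply its own memory backend and encoders.

```rust
// Illustrative sketch only: modelled on the tests in findex_batcher.rs above.
// Assumes the `batch` feature of both crates and the crate's dummy encoders
// (shipped for tests) are available; real code plugs in its own backend and encoders.
use std::collections::HashSet;

use cosmian_crypto_core::define_byte_type;
use cosmian_findex::{FindexBatcher, IndexBatcher, dummy_decode, dummy_encode};
use cosmian_sse_memories::{ADDRESS_LENGTH, Address, InMemory};

define_byte_type!(Bytes);
type Value = Bytes<8>;

const WORD_LENGTH: usize = 16;

#[tokio::main]
async fn main() {
    // Any `BatchingMemoryADT` implementation can back the batcher; the trivial
    // `InMemory` backend keeps this sketch self-contained.
    let memory = InMemory::<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>::default();

    let index = FindexBatcher::<WORD_LENGTH, Value, String, _>::new(
        memory,
        dummy_encode::<WORD_LENGTH, _>,
        dummy_decode,
    );

    // The two insert tasks share one `MemoryBatcher`, so their guarded writes
    // are grouped into `batch_guarded_write` calls instead of one backend
    // round trip per keyword.
    index
        .batch_insert(vec![
            ("cat".to_string(), vec![Value::try_from([1u8; 8].as_slice()).unwrap()]),
            ("dog".to_string(), vec![Value::try_from([2u8; 8].as_slice()).unwrap()]),
        ])
        .await
        .unwrap();

    // Searches are merged the same way: both keyword lookups share each
    // `batch_read` call issued to the backend.
    let results: Vec<HashSet<Value>> = index
        .batch_search(vec![&"cat".to_string(), &"dog".to_string()])
        .await
        .unwrap();
    assert_eq!(results.len(), 2);
}
```

Two design points worth keeping in mind when reading the diffs: the capacity passed to `MemoryBatcher::new` is a `NonZeroUsize` (one slot per concurrent Findex task), and insert/delete tasks call `unsubscribe()` as they complete so that the remaining tasks can still fill a smaller batch, whereas searches skip this because every safe Findex search performs exactly two reads.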