Skip to content

Commit 25d57e3

Browse files
committed
Optimized InMemoryBackendImpl
1 parent 066b6f7 commit 25d57e3

File tree

9 files changed

+339
-1132
lines changed

9 files changed

+339
-1132
lines changed

rust/Cargo.lock

Lines changed: 25 additions & 796 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ cargo build --release
2525
cargo run -- server/vss-server-config.toml
2626
```
2727

28-
**Note:** For testing puropose you can edit vss-server-config.toml to use `store_type` as in-memory instead of postgresql `store_type = "memory"`
28+
**Note:** For testing purposes you can edit `vss-server-config.toml` to use `store_type` as in-memory instead of PostgreSQL: `store_type = "in_memory"`
2929
4. VSS endpoint should be reachable at `http://localhost:8080/vss`.
3030

3131
### Configuration

rust/impls/src/in_memory_store.rs

Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
use std::collections::HashMap;
2+
use std::sync::{Arc, Mutex};
3+
4+
use async_trait::async_trait;
5+
use bytes::Bytes;
6+
use chrono::prelude::Utc;
7+
8+
use crate::postgres_store::{
9+
VssDbRecord, LIST_KEY_VERSIONS_MAX_PAGE_SIZE, MAX_PUT_REQUEST_ITEM_COUNT,
10+
};
11+
use api::error::VssError;
12+
use api::kv_store::{KvStore, GLOBAL_VERSION_KEY};
13+
use api::types::{
14+
DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse, KeyValue,
15+
ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse,
16+
};
17+
18+
/// In-memory backend for VSS, for testing purposes only.
19+
pub struct InMemoryBackendImpl {
20+
store: Arc<Mutex<HashMap<String, VssDbRecord>>>,
21+
}
22+
23+
impl InMemoryBackendImpl {
24+
/// Creates a new in-memory backend.
25+
pub fn new() -> Self {
26+
Self { store: Arc::new(Mutex::new(HashMap::new())) }
27+
}
28+
29+
fn build_vss_record(&self, user_token: String, store_id: String, kv: KeyValue) -> VssDbRecord {
30+
let now = Utc::now();
31+
VssDbRecord {
32+
user_token,
33+
store_id,
34+
key: kv.key,
35+
value: kv.value.to_vec(),
36+
version: kv.version,
37+
created_at: now,
38+
last_updated_at: now,
39+
}
40+
}
41+
42+
fn build_key(user_token: &str, store_id: &str, key: &str) -> String {
43+
format!("{}#{}#{}", user_token, store_id, key)
44+
}
45+
}
46+
47+
fn execute_put_object(
48+
store: &mut HashMap<String, VssDbRecord>, record: VssDbRecord,
49+
) -> Result<(), VssError> {
50+
let key = format!("{}#{}#{}", record.user_token, record.store_id, record.key);
51+
let now = Utc::now();
52+
53+
if let Some(existing) = store.get_mut(&key) {
54+
if existing.version >= record.version {
55+
return Err(VssError::ConflictError(format!(
56+
"Version conflict on put for key {}",
57+
record.key
58+
)));
59+
}
60+
existing.version = record.version;
61+
existing.value = record.value;
62+
existing.last_updated_at = now;
63+
} else {
64+
store.insert(key, record);
65+
}
66+
Ok(())
67+
}
68+
69+
fn execute_delete_object(
70+
store: &mut HashMap<String, VssDbRecord>, record: VssDbRecord,
71+
) -> Result<(), VssError> {
72+
let key = format!("{}#{}#{}", record.user_token, record.store_id, record.key);
73+
if let Some(existing) = store.get(&key) {
74+
if existing.version != record.version {
75+
return Ok(());
76+
}
77+
store.remove(&key);
78+
}
79+
Ok(())
80+
}
81+
82+
#[async_trait]
83+
impl KvStore for InMemoryBackendImpl {
84+
async fn get(
85+
&self, user_token: String, request: GetObjectRequest,
86+
) -> Result<GetObjectResponse, VssError> {
87+
let key = Self::build_key(&user_token, &request.store_id, &request.key);
88+
let guard = self.store.lock().unwrap();
89+
90+
if let Some(record) = guard.get(&key) {
91+
Ok(GetObjectResponse {
92+
value: Some(KeyValue {
93+
key: record.key.clone(),
94+
value: Bytes::from(record.value.clone()),
95+
version: record.version,
96+
}),
97+
})
98+
} else if request.key == GLOBAL_VERSION_KEY {
99+
Ok(GetObjectResponse {
100+
value: Some(KeyValue {
101+
key: GLOBAL_VERSION_KEY.to_string(),
102+
value: Bytes::new(),
103+
version: 0,
104+
}),
105+
})
106+
} else {
107+
Err(VssError::NoSuchKeyError("Requested key not found.".to_string()))
108+
}
109+
}
110+
111+
async fn put(
112+
&self, user_token: String, request: PutObjectRequest,
113+
) -> Result<PutObjectResponse, VssError> {
114+
if request.transaction_items.len() + request.delete_items.len() > MAX_PUT_REQUEST_ITEM_COUNT
115+
{
116+
return Err(VssError::InvalidRequestError(format!(
117+
"Number of write items per request should be less than equal to {}",
118+
MAX_PUT_REQUEST_ITEM_COUNT
119+
)));
120+
}
121+
122+
let mut guard = self.store.lock().unwrap();
123+
124+
// Check for conflicts first
125+
for kv in &request.transaction_items {
126+
let key = Self::build_key(&user_token, &request.store_id, &kv.key);
127+
if let Some(existing) = guard.get(&key) {
128+
if existing.version >= kv.version {
129+
return Err(VssError::ConflictError(format!(
130+
"Version conflict on put for key {}",
131+
kv.key
132+
)));
133+
}
134+
}
135+
}
136+
for kv in &request.delete_items {
137+
let key = Self::build_key(&user_token, &request.store_id, &kv.key);
138+
if let Some(existing) = guard.get(&key) {
139+
if existing.version != kv.version {
140+
return Ok(PutObjectResponse {});
141+
}
142+
}
143+
}
144+
145+
let vss_put_records: Vec<VssDbRecord> = request
146+
.transaction_items
147+
.into_iter()
148+
.map(|kv| self.build_vss_record(user_token.clone(), request.store_id.clone(), kv))
149+
.collect();
150+
151+
let vss_delete_records: Vec<VssDbRecord> = request
152+
.delete_items
153+
.into_iter()
154+
.map(|kv| self.build_vss_record(user_token.clone(), request.store_id.clone(), kv))
155+
.collect();
156+
157+
for vss_record in vss_put_records {
158+
execute_put_object(&mut guard, vss_record)?;
159+
}
160+
for vss_record in vss_delete_records {
161+
execute_delete_object(&mut guard, vss_record)?;
162+
}
163+
164+
Ok(PutObjectResponse {})
165+
}
166+
167+
async fn delete(
168+
&self, user_token: String, request: DeleteObjectRequest,
169+
) -> Result<DeleteObjectResponse, VssError> {
170+
let key_value = request.key_value.ok_or_else(|| {
171+
VssError::InvalidRequestError("key_value missing in DeleteObjectRequest".to_string())
172+
})?;
173+
let vss_record =
174+
self.build_vss_record(user_token.clone(), request.store_id.clone(), key_value);
175+
176+
let mut guard = self.store.lock().unwrap();
177+
178+
// Check for conflict
179+
let key = Self::build_key(&user_token, &request.store_id, &vss_record.key);
180+
if let Some(existing) = guard.get(&key) {
181+
if existing.version != vss_record.version {
182+
return Ok(DeleteObjectResponse {});
183+
}
184+
}
185+
186+
execute_delete_object(&mut guard, vss_record)?;
187+
Ok(DeleteObjectResponse {})
188+
}
189+
190+
async fn list_key_versions(
191+
&self, user_token: String, request: ListKeyVersionsRequest,
192+
) -> Result<ListKeyVersionsResponse, VssError> {
193+
let store_id = request.store_id;
194+
let key_prefix = request.key_prefix.unwrap_or_default();
195+
let page_token = request.page_token.unwrap_or_default();
196+
let page_size = request.page_size.unwrap_or(i32::MAX);
197+
let limit = std::cmp::min(page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE) as usize;
198+
199+
let mut global_version = None;
200+
if page_token.is_empty() {
201+
let get_global_version_request = GetObjectRequest {
202+
store_id: store_id.clone(),
203+
key: GLOBAL_VERSION_KEY.to_string(),
204+
};
205+
let get_response = self.get(user_token.clone(), get_global_version_request).await?;
206+
global_version = Some(get_response.value.unwrap().version);
207+
}
208+
209+
let key_versions: Vec<KeyValue> = {
210+
let guard = self.store.lock().unwrap();
211+
let mut key_versions: Vec<KeyValue> = guard
212+
.iter()
213+
.filter(|(k, _)| {
214+
let parts: Vec<&str> = k.split('#').collect();
215+
parts[0] == user_token
216+
&& parts[1] == store_id
217+
&& parts[2].starts_with(&key_prefix)
218+
&& parts[2] > page_token.as_str()
219+
&& parts[2] != GLOBAL_VERSION_KEY
220+
})
221+
.map(|(_, record)| KeyValue {
222+
key: record.key.clone(),
223+
value: Bytes::new(),
224+
version: record.version,
225+
})
226+
.collect();
227+
228+
key_versions.sort_by(|a, b| a.key.cmp(&b.key));
229+
key_versions.into_iter().take(limit).collect()
230+
};
231+
232+
let next_page_token = if key_versions.len() == limit {
233+
key_versions.last().map(|kv| kv.key.clone())
234+
} else {
235+
None
236+
};
237+
238+
Ok(ListKeyVersionsResponse { key_versions, next_page_token, global_version })
239+
}
240+
}
241+
242+
#[cfg(test)]
243+
mod tests {
244+
use super::*;
245+
use api::kv_store::INITIAL_RECORD_VERSION;
246+
use bytes::Bytes;
247+
use tokio::test;
248+
249+
#[test]
250+
async fn test_in_memory_crud() {
251+
let store = InMemoryBackendImpl::new();
252+
let user_token = "test_user".to_string();
253+
let store_id = "test_store".to_string();
254+
255+
// Put
256+
let put_request = PutObjectRequest {
257+
store_id: store_id.clone(),
258+
transaction_items: vec![KeyValue {
259+
key: "key1".to_string(),
260+
value: Bytes::from("value1"),
261+
version: INITIAL_RECORD_VERSION as i64,
262+
}],
263+
delete_items: vec![],
264+
global_version: None,
265+
};
266+
store.put(user_token.clone(), put_request).await.unwrap();
267+
268+
// Get
269+
let get_request = GetObjectRequest { store_id: store_id.clone(), key: "key1".to_string() };
270+
let response = store.get(user_token.clone(), get_request).await.unwrap();
271+
let key_value = response.value.unwrap();
272+
assert_eq!(key_value.value, Bytes::from("value1"));
273+
let current_version = key_value.version;
274+
275+
// List
276+
let list_request = ListKeyVersionsRequest {
277+
store_id: store_id.clone(),
278+
key_prefix: None,
279+
page_size: Some(1),
280+
page_token: None,
281+
};
282+
let response = store.list_key_versions(user_token.clone(), list_request).await.unwrap();
283+
assert_eq!(response.key_versions.len(), 1);
284+
assert_eq!(response.key_versions[0].key, "key1");
285+
286+
// Delete
287+
let delete_request = DeleteObjectRequest {
288+
store_id,
289+
key_value: Some(KeyValue {
290+
key: "key1".to_string(),
291+
value: Bytes::new(),
292+
version: current_version,
293+
}),
294+
};
295+
store.delete(user_token, delete_request).await.unwrap();
296+
}
297+
}

rust/impls/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@
1111
#![deny(rustdoc::private_intra_doc_links)]
1212
#![deny(missing_docs)]
1313

14+
/// Contains in-memory backend implementation for VSS, for testing purposes only.
15+
pub mod in_memory_store;
1416
mod migrations;
1517
/// Contains [PostgreSQL](https://www.postgresql.org/) based backend implementation for VSS.
1618
pub mod postgres_store;
17-
/// Contains in-memory backend implementation for VSS, for testing purposes only.
18-
pub mod memory_store;
1919

2020
#[macro_use]
2121
extern crate api;

0 commit comments

Comments
 (0)