Skip to content

Commit 3b434d0

Browse files
committed
Optimized InMemoryBackendImpl
1 parent 066b6f7 commit 3b434d0

File tree

9 files changed

+409
-1132
lines changed

9 files changed

+409
-1132
lines changed

rust/Cargo.lock

Lines changed: 25 additions & 796 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ cargo build --release
2525
cargo run -- server/vss-server-config.toml
2626
```
2727

28-
**Note:** For testing puropose you can edit vss-server-config.toml to use `store_type` as in-memory instead of postgresql `store_type = "memory"`
28+
**Note:** For testing purposes you can edit `vss-server-config.toml` to use `store_type` as in-memory instead of PostgreSQL: `store_type = "in_memory"`
2929
4. VSS endpoint should be reachable at `http://localhost:8080/vss`.
3030

3131
### Configuration

rust/impls/src/in_memory_store.rs

Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
use std::collections::HashMap;
2+
use std::sync::{Arc, Mutex};
3+
4+
use async_trait::async_trait;
5+
use bytes::Bytes;
6+
use chrono::prelude::Utc;
7+
8+
use crate::postgres_store::{
9+
VssDbRecord, LIST_KEY_VERSIONS_MAX_PAGE_SIZE, MAX_PUT_REQUEST_ITEM_COUNT,
10+
};
11+
use api::error::VssError;
12+
use api::kv_store::{KvStore, GLOBAL_VERSION_KEY};
13+
use api::types::{
14+
DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse, KeyValue,
15+
ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse,
16+
};
17+
18+
/// In-memory backend for VSS, for testing purposes only.
19+
pub struct InMemoryBackendImpl {
20+
store: Arc<Mutex<HashMap<String, VssDbRecord>>>,
21+
}
22+
23+
impl InMemoryBackendImpl {
24+
/// Creates a new in-memory backend.
25+
pub fn new() -> Self {
26+
Self { store: Arc::new(Mutex::new(HashMap::new())) }
27+
}
28+
29+
fn build_vss_record(&self, user_token: String, store_id: String, kv: KeyValue) -> VssDbRecord {
30+
let now = Utc::now();
31+
VssDbRecord {
32+
user_token,
33+
store_id,
34+
key: kv.key,
35+
value: kv.value.to_vec(),
36+
version: kv.version,
37+
created_at: now,
38+
last_updated_at: now,
39+
}
40+
}
41+
42+
fn build_key(user_token: &str, store_id: &str, key: &str) -> String {
43+
format!("{}#{}#{}", user_token, store_id, key)
44+
}
45+
46+
fn get_current_global_version(
47+
&self, guard: &HashMap<String, VssDbRecord>, user_token: &str, store_id: &str,
48+
) -> i64 {
49+
let global_key = Self::build_key(user_token, store_id, GLOBAL_VERSION_KEY);
50+
guard.get(&global_key).map(|r| r.version).unwrap_or(0)
51+
}
52+
53+
fn set_global_version(
54+
&self, guard: &mut HashMap<String, VssDbRecord>, user_token: String, store_id: String,
55+
new_version: i64,
56+
) {
57+
let global_key = Self::build_key(&user_token, &store_id, GLOBAL_VERSION_KEY);
58+
let now = Utc::now();
59+
60+
let entry = guard.entry(global_key);
61+
match entry {
62+
std::collections::hash_map::Entry::Occupied(mut occ) => {
63+
let rec = occ.get_mut();
64+
rec.version = new_version;
65+
rec.last_updated_at = now;
66+
},
67+
std::collections::hash_map::Entry::Vacant(vac) => {
68+
let record = VssDbRecord {
69+
user_token,
70+
store_id,
71+
key: GLOBAL_VERSION_KEY.to_string(),
72+
value: vec![],
73+
version: new_version,
74+
created_at: now,
75+
last_updated_at: now,
76+
};
77+
vac.insert(record);
78+
},
79+
}
80+
}
81+
}
82+
83+
fn execute_put_object(
84+
store: &mut HashMap<String, VssDbRecord>, record: VssDbRecord,
85+
) -> Result<(), VssError> {
86+
let key = format!("{}#{}#{}", record.user_token, record.store_id, record.key);
87+
let now = Utc::now();
88+
89+
let entry = store.entry(key);
90+
match entry {
91+
std::collections::hash_map::Entry::Occupied(mut occ) => {
92+
let existing = occ.get_mut();
93+
if existing.version >= record.version {
94+
return Err(VssError::ConflictError(format!(
95+
"Version conflict on put for key {}",
96+
record.key
97+
)));
98+
}
99+
existing.version = record.version;
100+
existing.value = record.value;
101+
existing.last_updated_at = now;
102+
},
103+
std::collections::hash_map::Entry::Vacant(vac) => {
104+
vac.insert(record);
105+
},
106+
}
107+
Ok(())
108+
}
109+
110+
fn execute_delete_object(
111+
store: &mut HashMap<String, VssDbRecord>, record: VssDbRecord,
112+
) -> Result<(), VssError> {
113+
let key = format!("{}#{}#{}", record.user_token, record.store_id, record.key);
114+
if let Some(existing) = store.get(&key) {
115+
if existing.version != record.version {
116+
return Ok(());
117+
}
118+
store.remove(&key);
119+
}
120+
Ok(())
121+
}
122+
123+
#[async_trait]
124+
impl KvStore for InMemoryBackendImpl {
125+
async fn get(
126+
&self, user_token: String, request: GetObjectRequest,
127+
) -> Result<GetObjectResponse, VssError> {
128+
let key = Self::build_key(&user_token, &request.store_id, &request.key);
129+
let guard = self.store.lock().unwrap();
130+
131+
if let Some(record) = guard.get(&key) {
132+
Ok(GetObjectResponse {
133+
value: Some(KeyValue {
134+
key: record.key.clone(),
135+
value: Bytes::from(record.value.clone()),
136+
version: record.version,
137+
}),
138+
})
139+
} else if request.key == GLOBAL_VERSION_KEY {
140+
let current_global =
141+
self.get_current_global_version(&guard, &user_token, &request.store_id);
142+
Ok(GetObjectResponse {
143+
value: Some(KeyValue {
144+
key: GLOBAL_VERSION_KEY.to_string(),
145+
value: Bytes::new(),
146+
version: current_global,
147+
}),
148+
})
149+
} else {
150+
Err(VssError::NoSuchKeyError("Requested key not found.".to_string()))
151+
}
152+
}
153+
154+
async fn put(
155+
&self, user_token: String, request: PutObjectRequest,
156+
) -> Result<PutObjectResponse, VssError> {
157+
if request.transaction_items.len() + request.delete_items.len() > MAX_PUT_REQUEST_ITEM_COUNT
158+
{
159+
return Err(VssError::InvalidRequestError(format!(
160+
"Number of write items per request should be less than equal to {}",
161+
MAX_PUT_REQUEST_ITEM_COUNT
162+
)));
163+
}
164+
165+
let store_id = request.store_id.clone();
166+
let mut guard = self.store.lock().unwrap();
167+
168+
// Handling global_version precondition
169+
let current_global = self.get_current_global_version(&guard, &user_token, &store_id);
170+
if let Some(expected_global) = request.global_version {
171+
if current_global != expected_global {
172+
return Err(VssError::ConflictError(format!(
173+
"Global version conflict: expected {}, current {}",
174+
expected_global, current_global
175+
)));
176+
}
177+
}
178+
179+
// Check for conflicts on puts
180+
for kv in &request.transaction_items {
181+
let key = Self::build_key(&user_token, &store_id, &kv.key);
182+
if let Some(existing) = guard.get(&key) {
183+
if existing.version >= kv.version {
184+
return Err(VssError::ConflictError(format!(
185+
"Version conflict on put for key {}",
186+
kv.key
187+
)));
188+
}
189+
}
190+
}
191+
192+
// Apply updates
193+
let vss_put_records: Vec<VssDbRecord> = request
194+
.transaction_items
195+
.into_iter()
196+
.map(|kv| self.build_vss_record(user_token.clone(), store_id.clone(), kv))
197+
.collect();
198+
199+
let vss_delete_records: Vec<VssDbRecord> = request
200+
.delete_items
201+
.into_iter()
202+
.map(|kv| self.build_vss_record(user_token.clone(), store_id.clone(), kv))
203+
.collect();
204+
205+
let mut mutated = false;
206+
for vss_record in vss_put_records {
207+
execute_put_object(&mut guard, vss_record)?;
208+
mutated = true;
209+
}
210+
for vss_record in vss_delete_records {
211+
// execute_delete_object is no-op on version mismatch, but call anyway
212+
execute_delete_object(&mut guard, vss_record)?;
213+
mutated = true;
214+
}
215+
216+
if mutated || request.global_version.is_some() {
217+
let new_global = current_global + 1;
218+
self.set_global_version(&mut guard, user_token.clone(), store_id.clone(), new_global);
219+
}
220+
221+
Ok(PutObjectResponse {})
222+
}
223+
224+
async fn delete(
225+
&self, user_token: String, request: DeleteObjectRequest,
226+
) -> Result<DeleteObjectResponse, VssError> {
227+
let key_value = request.key_value.ok_or_else(|| {
228+
VssError::InvalidRequestError("key_value missing in DeleteObjectRequest".to_string())
229+
})?;
230+
let store_id = request.store_id.clone();
231+
let mut guard = self.store.lock().unwrap();
232+
233+
let current_global = self.get_current_global_version(&guard, &user_token, &store_id);
234+
235+
// Check for version conflict
236+
let key = Self::build_key(&user_token, &store_id, &key_value.key);
237+
let mutated = if let Some(existing) = guard.get(&key) {
238+
if existing.version != key_value.version {
239+
false
240+
} else {
241+
true
242+
}
243+
} else {
244+
false
245+
};
246+
247+
if mutated {
248+
let vss_record = self.build_vss_record(user_token.clone(), store_id.clone(), key_value);
249+
execute_delete_object(&mut guard, vss_record)?;
250+
let new_global = current_global + 1;
251+
self.set_global_version(&mut guard, user_token.clone(), store_id.clone(), new_global);
252+
}
253+
254+
Ok(DeleteObjectResponse {})
255+
}
256+
257+
async fn list_key_versions(
258+
&self, user_token: String, request: ListKeyVersionsRequest,
259+
) -> Result<ListKeyVersionsResponse, VssError> {
260+
let store_id = request.store_id;
261+
let key_prefix = request.key_prefix.unwrap_or_default();
262+
let page_token = request.page_token.unwrap_or_default();
263+
let page_size = request.page_size.unwrap_or(i32::MAX);
264+
let limit = std::cmp::min(page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE) as usize;
265+
266+
let mut global_version = None;
267+
if page_token.is_empty() {
268+
let get_global_version_request = GetObjectRequest {
269+
store_id: store_id.clone(),
270+
key: GLOBAL_VERSION_KEY.to_string(),
271+
};
272+
let get_response = self.get(user_token.clone(), get_global_version_request).await?;
273+
global_version = Some(get_response.value.unwrap().version);
274+
}
275+
276+
let key_versions: Vec<KeyValue> = {
277+
let guard = self.store.lock().unwrap();
278+
let mut key_versions: Vec<KeyValue> = guard
279+
.iter()
280+
.filter(|(k, _)| {
281+
let parts: Vec<&str> = k.split('#').collect();
282+
if parts.len() < 3 {
283+
return false;
284+
}
285+
parts[0] == user_token.as_str()
286+
&& parts[1] == store_id.as_str()
287+
&& parts[2].starts_with(&key_prefix)
288+
&& parts[2] > page_token.as_str()
289+
&& parts[2] != GLOBAL_VERSION_KEY
290+
})
291+
.map(|(_, record)| KeyValue {
292+
key: record.key.clone(),
293+
value: Bytes::new(),
294+
version: record.version,
295+
})
296+
.collect();
297+
298+
key_versions.sort_by(|a, b| a.key.cmp(&b.key));
299+
key_versions.into_iter().take(limit).collect()
300+
};
301+
302+
let next_page_token = if key_versions.len() == limit {
303+
key_versions.last().map(|kv| kv.key.clone())
304+
} else {
305+
None
306+
};
307+
308+
Ok(ListKeyVersionsResponse { key_versions, next_page_token, global_version })
309+
}
310+
}
311+
312+
#[cfg(test)]
313+
mod tests {
314+
use super::*;
315+
use api::kv_store::INITIAL_RECORD_VERSION;
316+
use bytes::Bytes;
317+
use tokio::test;
318+
319+
#[test]
320+
async fn test_in_memory_crud() {
321+
let store = InMemoryBackendImpl::new();
322+
let user_token = "test_user".to_string();
323+
let store_id = "test_store".to_string();
324+
325+
// Put
326+
let put_request = PutObjectRequest {
327+
store_id: store_id.clone(),
328+
transaction_items: vec![KeyValue {
329+
key: "key1".to_string(),
330+
value: Bytes::from("value1"),
331+
version: INITIAL_RECORD_VERSION as i64,
332+
}],
333+
delete_items: vec![],
334+
global_version: None,
335+
};
336+
store.put(user_token.clone(), put_request).await.unwrap();
337+
338+
// Get
339+
let get_request = GetObjectRequest { store_id: store_id.clone(), key: "key1".to_string() };
340+
let response = store.get(user_token.clone(), get_request).await.unwrap();
341+
let key_value = response.value.unwrap();
342+
assert_eq!(key_value.value, Bytes::from("value1"));
343+
let current_version = key_value.version;
344+
345+
// List
346+
let list_request = ListKeyVersionsRequest {
347+
store_id: store_id.clone(),
348+
key_prefix: None,
349+
page_size: Some(1),
350+
page_token: None,
351+
};
352+
let response = store.list_key_versions(user_token.clone(), list_request).await.unwrap();
353+
assert_eq!(response.key_versions.len(), 1);
354+
assert_eq!(response.key_versions[0].key, "key1");
355+
356+
// Delete
357+
let delete_request = DeleteObjectRequest {
358+
store_id: store_id.clone(),
359+
key_value: Some(KeyValue {
360+
key: "key1".to_string(),
361+
value: Bytes::new(),
362+
version: current_version,
363+
}),
364+
};
365+
store.delete(user_token.clone(), delete_request).await.unwrap();
366+
}
367+
}

rust/impls/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@
1111
#![deny(rustdoc::private_intra_doc_links)]
1212
#![deny(missing_docs)]
1313

14+
/// Contains in-memory backend implementation for VSS, for testing purposes only.
15+
pub mod in_memory_store;
1416
mod migrations;
1517
/// Contains [PostgreSQL](https://www.postgresql.org/) based backend implementation for VSS.
1618
pub mod postgres_store;
17-
/// Contains in-memory backend implementation for VSS, for testing purposes only.
18-
pub mod memory_store;
1919

2020
#[macro_use]
2121
extern crate api;

0 commit comments

Comments
 (0)