1+ use std:: collections:: HashMap ;
2+ use std:: sync:: { Arc , Mutex } ;
3+
4+ use chrono:: prelude:: { Utc } ;
5+ use async_trait:: async_trait;
6+ use bytes:: Bytes ;
7+
8+ use api:: error:: VssError ;
9+ use crate :: postgres_store:: { VssDbRecord , LIST_KEY_VERSIONS_MAX_PAGE_SIZE , MAX_PUT_REQUEST_ITEM_COUNT } ;
10+ use api:: kv_store:: { KvStore , GLOBAL_VERSION_KEY } ;
11+ use api:: types:: {
12+ DeleteObjectRequest , DeleteObjectResponse , GetObjectRequest , GetObjectResponse ,
13+ KeyValue , ListKeyVersionsRequest , ListKeyVersionsResponse , PutObjectRequest , PutObjectResponse ,
14+ } ;
15+
16+ /// In-memory backend for VSS, for testing purposes only.
17+ pub struct InMemoryBackendImpl {
18+ store : Arc < Mutex < HashMap < ( String , String , String ) , VssDbRecord > > > ,
19+ }
20+
21+ impl InMemoryBackendImpl {
22+ /// Creates a new in-memory backend.
23+ pub fn new ( ) -> Self {
24+ Self {
25+ store : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
26+ }
27+ }
28+
29+ fn build_vss_record ( & self , user_token : String , store_id : String , kv : KeyValue ) -> VssDbRecord {
30+ let now = Utc :: now ( ) ;
31+ VssDbRecord {
32+ user_token,
33+ store_id,
34+ key : kv. key ,
35+ value : kv. value . to_vec ( ) ,
36+ version : kv. version ,
37+ created_at : now,
38+ last_updated_at : now,
39+ }
40+ }
41+
42+ fn execute_put_object (
43+ & self ,
44+ store : & mut HashMap < ( String , String , String ) , VssDbRecord > ,
45+ record : & VssDbRecord ,
46+ ) -> Result < u64 , VssError > {
47+ let key_tuple = (
48+ record. user_token . clone ( ) ,
49+ record. store_id . clone ( ) ,
50+ record. key . clone ( ) ,
51+ ) ;
52+ let now = Utc :: now ( ) ;
53+
54+ if let Some ( existing) = store. get_mut ( & key_tuple) {
55+ if existing. version >= record. version {
56+ return Err ( VssError :: ConflictError (
57+ "Version conflict on put" . to_string ( ) ,
58+ ) ) ;
59+ }
60+ existing. version = record. version ;
61+ existing. value = record. value . clone ( ) ;
62+ existing. last_updated_at = now;
63+ Ok ( 1 )
64+ } else {
65+ store. insert (
66+ key_tuple,
67+ VssDbRecord {
68+ user_token : record. user_token . clone ( ) ,
69+ store_id : record. store_id . clone ( ) ,
70+ key : record. key . clone ( ) ,
71+ value : record. value . clone ( ) ,
72+ version : record. version ,
73+ created_at : now,
74+ last_updated_at : now,
75+ } ,
76+ ) ;
77+ Ok ( 1 )
78+ }
79+ }
80+
81+ fn execute_delete_object (
82+ & self ,
83+ store : & mut HashMap < ( String , String , String ) , VssDbRecord > ,
84+ record : & VssDbRecord ,
85+ ) -> Result < u64 , VssError > {
86+ let key_tuple = (
87+ record. user_token . clone ( ) ,
88+ record. store_id . clone ( ) ,
89+ record. key . clone ( ) ,
90+ ) ;
91+ if let Some ( existing) = store. get ( & key_tuple) {
92+ if existing. version != record. version {
93+ return Ok ( 0 ) ;
94+ }
95+ store. remove ( & key_tuple) ;
96+ Ok ( 1 )
97+ } else {
98+ Ok ( 0 )
99+ }
100+ }
101+ }
102+
103+ #[ async_trait]
104+ impl KvStore for InMemoryBackendImpl {
105+ async fn get (
106+ & self ,
107+ user_token : String ,
108+ request : GetObjectRequest ,
109+ ) -> Result < GetObjectResponse , VssError > {
110+ let key_tuple = ( user_token. clone ( ) , request. store_id . clone ( ) , request. key . clone ( ) ) ;
111+ let guard = self . store . lock ( ) . unwrap ( ) ;
112+
113+ if let Some ( record) = guard. get ( & key_tuple) {
114+ Ok ( GetObjectResponse {
115+ value : Some ( KeyValue {
116+ key : record. key . clone ( ) ,
117+ value : Bytes :: from ( record. value . clone ( ) ) ,
118+ version : record. version ,
119+ } ) ,
120+ } )
121+ } else if request. key == GLOBAL_VERSION_KEY {
122+ Ok ( GetObjectResponse {
123+ value : Some ( KeyValue {
124+ key : GLOBAL_VERSION_KEY . to_string ( ) ,
125+ value : Bytes :: new ( ) ,
126+ version : 0 ,
127+ } ) ,
128+ } )
129+ } else {
130+ Err ( VssError :: NoSuchKeyError ( "Requested key not found." . to_string ( ) ) )
131+ }
132+ }
133+
134+ async fn put (
135+ & self ,
136+ user_token : String ,
137+ request : PutObjectRequest ,
138+ ) -> Result < PutObjectResponse , VssError > {
139+ if request. transaction_items . len ( ) + request. delete_items . len ( ) > MAX_PUT_REQUEST_ITEM_COUNT {
140+ return Err ( VssError :: InvalidRequestError ( format ! (
141+ "Number of write items per request should be less than equal to {}" ,
142+ MAX_PUT_REQUEST_ITEM_COUNT
143+ ) ) ) ;
144+ }
145+
146+ let mut guard = self . store . lock ( ) . unwrap ( ) ;
147+ let mut temp_store = guard. clone ( ) ;
148+
149+ let vss_put_records: Vec < VssDbRecord > = request
150+ . transaction_items
151+ . into_iter ( )
152+ . map ( |kv| self . build_vss_record ( user_token. clone ( ) , request. store_id . clone ( ) , kv) )
153+ . collect ( ) ;
154+
155+ let vss_delete_records: Vec < VssDbRecord > = request
156+ . delete_items
157+ . into_iter ( )
158+ . map ( |kv| self . build_vss_record ( user_token. clone ( ) , request. store_id . clone ( ) , kv) )
159+ . collect ( ) ;
160+
161+ let mut batch_results = Vec :: new ( ) ;
162+
163+ for vss_record in & vss_put_records {
164+ let num_rows = self . execute_put_object ( & mut temp_store, vss_record) ?;
165+ batch_results. push ( num_rows) ;
166+ }
167+
168+ for vss_record in & vss_delete_records {
169+ let num_rows = self . execute_delete_object ( & mut temp_store, vss_record) ?;
170+ batch_results. push ( num_rows) ;
171+ }
172+
173+ if batch_results. iter ( ) . any ( |& rows| rows == 0 && !vss_delete_records. is_empty ( ) ) {
174+ return Err ( VssError :: ConflictError (
175+ "Transaction could not be completed due to a possible conflict" . to_string ( ) ,
176+ ) ) ;
177+ }
178+
179+ * guard = temp_store;
180+ Ok ( PutObjectResponse { } )
181+ }
182+
183+ async fn delete (
184+ & self ,
185+ user_token : String ,
186+ request : DeleteObjectRequest ,
187+ ) -> Result < DeleteObjectResponse , VssError > {
188+ let key_value = request. key_value . ok_or_else ( || {
189+ VssError :: InvalidRequestError ( "key_value missing in DeleteObjectRequest" . to_string ( ) )
190+ } ) ?;
191+ let vss_record = self . build_vss_record ( user_token, request. store_id , key_value) ;
192+
193+ let mut guard = self . store . lock ( ) . unwrap ( ) ;
194+ let mut temp_store = guard. clone ( ) ;
195+
196+ self . execute_delete_object ( & mut temp_store, & vss_record) ?;
197+
198+ * guard = temp_store;
199+ Ok ( DeleteObjectResponse { } )
200+ }
201+
202+ async fn list_key_versions (
203+ & self ,
204+ user_token : String ,
205+ request : ListKeyVersionsRequest ,
206+ ) -> Result < ListKeyVersionsResponse , VssError > {
207+ let store_id = request. store_id ;
208+ let key_prefix = request. key_prefix . unwrap_or_default ( ) ;
209+ let page_token = request. page_token . unwrap_or_default ( ) ;
210+ let page_size = request. page_size . unwrap_or ( i32:: MAX ) ;
211+ let limit = std:: cmp:: min ( page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE ) as usize ;
212+
213+ let mut global_version = None ;
214+ if page_token. is_empty ( ) {
215+ let get_global_version_request = GetObjectRequest {
216+ store_id : store_id. clone ( ) ,
217+ key : GLOBAL_VERSION_KEY . to_string ( ) ,
218+ } ;
219+ let get_response = self . get ( user_token. clone ( ) , get_global_version_request) . await ?;
220+ global_version = Some ( get_response. value . unwrap ( ) . version ) ;
221+ }
222+
223+ let key_versions: Vec < KeyValue > = {
224+ let guard = self . store . lock ( ) . unwrap ( ) ;
225+ let mut key_versions: Vec < KeyValue > = guard
226+ . iter ( )
227+ . filter ( |( k, _) | {
228+ k. 0 == user_token
229+ && k. 1 == store_id
230+ && k. 2 . starts_with ( & key_prefix)
231+ && k. 2 > page_token
232+ && k. 2 != GLOBAL_VERSION_KEY
233+ } )
234+ . map ( |( _, record) | KeyValue {
235+ key : record. key . clone ( ) ,
236+ value : Bytes :: new ( ) ,
237+ version : record. version ,
238+ } )
239+ . collect ( ) ;
240+
241+ key_versions. sort_by ( |a, b| a. key . cmp ( & b. key ) ) ;
242+ key_versions. into_iter ( ) . take ( limit) . collect ( )
243+ } ;
244+
245+ let next_page_token = if key_versions. len ( ) == limit {
246+ key_versions. last ( ) . map ( |kv| kv. key . clone ( ) )
247+ } else {
248+ None
249+ } ;
250+
251+ Ok ( ListKeyVersionsResponse {
252+ key_versions,
253+ next_page_token,
254+ global_version,
255+ } )
256+ }
257+ }
258+
259+ #[ cfg( test) ]
260+ mod tests {
261+ use super :: * ;
262+ use tokio:: test;
263+ use bytes:: Bytes ;
264+ use api:: kv_store:: INITIAL_RECORD_VERSION ;
265+
266+ #[ test]
267+ async fn test_in_memory_crud ( ) {
268+ let store = InMemoryBackendImpl :: new ( ) ;
269+ let user_token = "test_user" . to_string ( ) ;
270+ let store_id = "test_store" . to_string ( ) ;
271+
272+ // Put
273+ let put_request = PutObjectRequest {
274+ store_id : store_id. clone ( ) ,
275+ transaction_items : vec ! [ KeyValue {
276+ key: "key1" . to_string( ) ,
277+ value: Bytes :: from( "value1" ) ,
278+ version: INITIAL_RECORD_VERSION as i64 ,
279+ } ] ,
280+ delete_items : vec ! [ ] ,
281+ global_version : None ,
282+ } ;
283+ store. put ( user_token. clone ( ) , put_request) . await . unwrap ( ) ;
284+
285+ // Get
286+ let get_request = GetObjectRequest {
287+ store_id : store_id. clone ( ) ,
288+ key : "key1" . to_string ( ) ,
289+ } ;
290+ let response = store. get ( user_token. clone ( ) , get_request) . await . unwrap ( ) ;
291+ let key_value = response. value . unwrap ( ) ;
292+ assert_eq ! ( key_value. value, Bytes :: from( "value1" ) ) ;
293+ let current_version = key_value. version ;
294+
295+ // List
296+ let list_request = ListKeyVersionsRequest {
297+ store_id : store_id. clone ( ) ,
298+ key_prefix : None ,
299+ page_size : Some ( 1 ) ,
300+ page_token : None ,
301+ } ;
302+ let response = store. list_key_versions ( user_token. clone ( ) , list_request) . await . unwrap ( ) ;
303+ assert_eq ! ( response. key_versions. len( ) , 1 ) ;
304+ assert_eq ! ( response. key_versions[ 0 ] . key, "key1" ) ;
305+
306+ // Delete
307+ let delete_request = DeleteObjectRequest {
308+ store_id,
309+ key_value : Some ( KeyValue {
310+ key : "key1" . to_string ( ) ,
311+ value : Bytes :: new ( ) ,
312+ version : current_version,
313+ } ) ,
314+ } ;
315+ store. delete ( user_token, delete_request) . await . unwrap ( ) ;
316+ }
317+ }
0 commit comments