Skip to content
This repository was archived by the owner on Apr 20, 2020. It is now read-only.

Commit 6b71a3d

Browse files
authored
fix #66 handle none exisiting paths in Doc on index and query (#67)
* fix #66 handle none exisiting paths in Doc on index and query * return JSON result on query * Async index thread
1 parent a3199ea commit 6b71a3d

File tree

8 files changed

+261
-215
lines changed

8 files changed

+261
-215
lines changed

Cargo.lock

Lines changed: 101 additions & 159 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,16 @@ edition = "2018"
88
crate-type = ["cdylib"]
99

1010
[dependencies]
11-
bson = "0.13"
11+
bson = "0.14"
1212
serde_json = "1.0"
1313
libc = "0.2"
1414
jsonpath_lib = { git="https://github.com/gkorland/jsonpath.git", branch="reaplce_with_ownership_parser" }
1515

1616
[dependencies.redismodule]
1717
git = "https://github.com/redislabsmodules/redismodule-rs.git"
18-
branch = "master"
18+
branch = "safe_context_rm_call"
1919
features = ["experimental-api"]
2020

2121
[dependencies.redisearch_api]
2222
git = "https://github.com/RediSearch/redisearch-api-rs.git"
23-
branch = "master"
23+
branch = "safe_context_rm_call"

src/backward.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl From<u64> for NodeType {
3434
}
3535
}
3636

37-
pub unsafe fn json_rdb_load(rdb: *mut raw::RedisModuleIO) -> Value {
37+
pub fn json_rdb_load(rdb: *mut raw::RedisModuleIO) -> Value {
3838
let node_type = raw::load_unsigned(rdb).into();
3939
match node_type {
4040
NodeType::Null => Value::Null,

src/commands/index.rs

Lines changed: 106 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use serde_json::Value;
1+
use std::thread;
22

3-
use redismodule::{Context, RedisError, RedisResult, RedisValue};
4-
use redismodule::{NextArg, REDIS_OK};
3+
use serde_json::{Map, Value};
4+
5+
use redismodule::{Context, NextArg, RedisError, RedisResult, RedisValue, REDIS_OK};
56

67
use redisearch_api::{Document, FieldType};
78

@@ -75,13 +76,11 @@ pub fn add_document(key: &str, index_name: &str, doc: &RedisJSON) -> RedisResult
7576

7677
let map = schema_map::as_ref();
7778

78-
map.get(index_name)
79-
.ok_or("ERR no such index".into())
80-
.and_then(|schema| {
81-
let rsdoc = create_document(key, schema, doc)?;
82-
schema.index.add_document(&rsdoc)?;
83-
REDIS_OK
84-
})
79+
if let Some(schema) = map.get(index_name) {
80+
let rsdoc = create_document(key, schema, doc)?;
81+
schema.index.add_document(&rsdoc)?;
82+
}
83+
REDIS_OK
8584
}
8685

8786
fn create_document(key: &str, schema: &Schema, doc: &RedisJSON) -> Result<Document, Error> {
@@ -91,13 +90,14 @@ fn create_document(key: &str, schema: &Schema, doc: &RedisJSON) -> Result<Docume
9190
let rsdoc = Document::create(key, score);
9291

9392
for (field_name, path) in fields {
94-
let value = doc.get_doc(&path)?;
95-
96-
match value {
97-
Value::String(v) => rsdoc.add_field(field_name, &v, FieldType::FULLTEXT),
98-
Value::Number(v) => rsdoc.add_field(field_name, &v.to_string(), FieldType::NUMERIC),
99-
Value::Bool(v) => rsdoc.add_field(field_name, &v.to_string(), FieldType::TAG),
100-
_ => {}
93+
let results = doc.get_values(path)?;
94+
if let Some(value) = results.first() {
95+
match value {
96+
Value::String(v) => rsdoc.add_field(field_name, &v, FieldType::FULLTEXT),
97+
Value::Number(v) => rsdoc.add_field(field_name, &v.to_string(), FieldType::NUMERIC),
98+
Value::Bool(v) => rsdoc.add_field(field_name, &v.to_string(), FieldType::TAG),
99+
_ => {}
100+
}
101101
}
102102
}
103103

@@ -120,14 +120,78 @@ where
120120
match subcommand.to_uppercase().as_str() {
121121
"ADD" => {
122122
let path = args.next_string()?;
123-
add_field(&index_name, &field_name, &path)
123+
add_field(&index_name, &field_name, &path)?;
124+
125+
// TODO handle another "ADD" calls in prallel a running call
126+
thread::spawn(move || {
127+
let schema = if let Some(stored_schema) = schema_map::as_ref().get(&index_name) {
128+
stored_schema
129+
} else {
130+
return; // TODO handle this case
131+
};
132+
133+
let ctx = Context::get_thread_safe_context();
134+
let mut cursor: u64 = 0;
135+
loop {
136+
ctx.lock();
137+
let res = scan_and_index(&ctx, &schema, cursor);
138+
ctx.unlock();
139+
140+
match res {
141+
Ok(c) => cursor = c,
142+
Err(e) => {
143+
eprintln!("Err on index {:?}", e); // TODO hadnle this better
144+
return;
145+
}
146+
}
147+
if cursor == 0 {
148+
break;
149+
}
150+
}
151+
});
152+
153+
REDIS_OK
124154
}
125155
//"DEL" => {}
126156
//"INFO" => {}
127157
_ => Err("ERR unknown subcommand - try `JSON.INDEX HELP`".into()),
128158
}
129159
}
130160

161+
fn scan_and_index(ctx: &Context, schema: &Schema, cursor: u64) -> Result<u64, RedisError> {
162+
let values = ctx.call("scan", &[&cursor.to_string()]);
163+
match values {
164+
Ok(RedisValue::Array(arr)) => match (arr.get(0), arr.get(1)) {
165+
(Some(RedisValue::SimpleString(next_cursor)), Some(RedisValue::Array(keys))) => {
166+
let cursor = next_cursor.parse().unwrap();
167+
let res = keys.iter().try_for_each(|k| {
168+
if let RedisValue::SimpleString(key) = k {
169+
ctx.open_key(&key)
170+
.get_value::<RedisJSON>(&REDIS_JSON_TYPE)
171+
.and_then(|doc| {
172+
if let Some(data) = doc {
173+
if let Some(index) = &data.index {
174+
if schema.name == *index {
175+
add_document(key, index, data)?;
176+
}
177+
}
178+
Ok(())
179+
} else {
180+
Err("Error on get value from key".into())
181+
}
182+
})
183+
} else {
184+
Err("Error on parsing reply from scan".into())
185+
}
186+
});
187+
res.map(|_| cursor)
188+
}
189+
_ => Err("Error on parsing reply from scan".into()),
190+
},
191+
_ => Err("Error on parsing reply from scan".into()),
192+
}
193+
}
194+
131195
// JSON.QGET <index> <query> <path>
132196
pub fn qget<I>(ctx: &Context, args: I) -> RedisResult
133197
where
@@ -145,18 +209,29 @@ where
145209
.ok_or("ERR no such index".into())
146210
.map(|schema| &schema.index)
147211
.and_then(|index| {
148-
let results: Result<Vec<_>, RedisError> = index
149-
.search(&query)?
150-
.map(|key| {
151-
let key = ctx.open_key_writable(&key);
152-
let value = match key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)? {
153-
Some(doc) => doc.to_string(&path, Format::JSON)?.into(),
154-
None => RedisValue::None,
155-
};
156-
Ok(value)
157-
})
158-
.collect();
159-
160-
Ok(results?.into())
212+
let result: Value =
213+
index
214+
.search(&query)?
215+
.try_fold(Value::Object(Map::new()), |mut acc, key| {
216+
ctx.open_key(&key)
217+
.get_value::<RedisJSON>(&REDIS_JSON_TYPE)
218+
.and_then(|doc| {
219+
doc.map_or(Ok(Vec::new()), |data| {
220+
data.get_values(&path)
221+
.map_err(|e| e.into()) // Convert Error to RedisError
222+
.map(|values| {
223+
values.into_iter().map(|val| val.clone()).collect()
224+
})
225+
})
226+
})
227+
.map(|r| {
228+
acc.as_object_mut()
229+
.unwrap()
230+
.insert(key.to_string(), Value::Array(r));
231+
acc
232+
})
233+
})?;
234+
235+
Ok(RedisJSON::serialize(&result, Format::JSON)?.into())
161236
})
162237
}

src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ fn json_set(ctx: &Context, args: Vec<String>) -> RedisResult {
132132
}
133133
(None, SetOptions::AlreadyExists) => Ok(RedisValue::None),
134134
(None, _) => {
135-
let doc = RedisJSON::from_str(&value, format)?;
135+
let doc = RedisJSON::from_str(&value, &index, format)?;
136136
if path == "$" {
137137
redis_key.set_value(&REDIS_JSON_TYPE, doc)?;
138138

@@ -692,7 +692,7 @@ fn json_resp(ctx: &Context, args: Vec<String>) -> RedisResult {
692692

693693
let key = ctx.open_key(&key);
694694
match key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)? {
695-
Some(doc) => Ok(resp_serialize(doc.get_doc(&path)?)),
695+
Some(doc) => Ok(resp_serialize(doc.get_first(&path)?)),
696696
None => Ok(RedisValue::None),
697697
}
698698
}

src/redisjson.rs

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ impl Format {
4141
#[derive(Debug)]
4242
pub struct RedisJSON {
4343
data: Value,
44+
pub index: Option<String>,
4445
}
4546

4647
impl RedisJSON {
@@ -62,10 +63,14 @@ impl RedisJSON {
6263
}
6364
}
6465

65-
pub fn from_str(data: &str, format: Format) -> Result<Self, Error> {
66+
pub fn from_str(data: &str, index: &Option<String>, format: Format) -> Result<Self, Error> {
6667
let value = RedisJSON::parse_str(data, format)?;
67-
Ok(Self { data: value })
68+
Ok(Self {
69+
data: value,
70+
index: index.clone(),
71+
})
6872
}
73+
6974
fn add_value(&mut self, path: &str, value: Value) -> Result<bool, Error> {
7075
if NodeVisitorImpl::check(path)? {
7176
let mut splits = path.rsplitn(2, '.');
@@ -154,7 +159,7 @@ impl RedisJSON {
154159
}
155160

156161
pub fn to_string(&self, path: &str, format: Format) -> Result<String, Error> {
157-
let results = self.get_doc(path)?;
162+
let results = self.get_first(path)?;
158163
Self::serialize(results, format)
159164
}
160165

@@ -193,35 +198,35 @@ impl RedisJSON {
193198
}
194199

195200
pub fn str_len(&self, path: &str) -> Result<usize, Error> {
196-
self.get_doc(path)?
201+
self.get_first(path)?
197202
.as_str()
198203
.ok_or_else(|| "ERR wrong type of path value".into())
199204
.map(|s| s.len())
200205
}
201206

202207
pub fn arr_len(&self, path: &str) -> Result<usize, Error> {
203-
self.get_doc(path)?
208+
self.get_first(path)?
204209
.as_array()
205210
.ok_or_else(|| "ERR wrong type of path value".into())
206211
.map(|arr| arr.len())
207212
}
208213

209214
pub fn obj_len(&self, path: &str) -> Result<usize, Error> {
210-
self.get_doc(path)?
215+
self.get_first(path)?
211216
.as_object()
212217
.ok_or_else(|| "ERR wrong type of path value".into())
213218
.map(|obj| obj.len())
214219
}
215220

216221
pub fn obj_keys<'a>(&'a self, path: &'a str) -> Result<Vec<&'a String>, Error> {
217-
self.get_doc(path)?
222+
self.get_first(path)?
218223
.as_object()
219224
.ok_or_else(|| "ERR wrong type of path value".into())
220225
.map(|obj| obj.keys().collect())
221226
}
222227

223228
pub fn arr_index(&self, path: &str, scalar: &str, start: i64, end: i64) -> Result<i64, Error> {
224-
if let Value::Array(arr) = self.get_doc(path)? {
229+
if let Value::Array(arr) = self.get_first(path)? {
225230
// end=-1/0 means INFINITY to support backward with RedisJSON
226231
if arr.is_empty() || end < -1 {
227232
return Ok(-1);
@@ -252,7 +257,7 @@ impl RedisJSON {
252257
}
253258

254259
pub fn get_type(&self, path: &str) -> Result<String, Error> {
255-
let s = RedisJSON::value_name(self.get_doc(path)?);
260+
let s = RedisJSON::value_name(self.get_first(path)?);
256261
Ok(s.to_string())
257262
}
258263

@@ -322,7 +327,7 @@ impl RedisJSON {
322327

323328
pub fn get_memory<'a>(&'a self, path: &'a str) -> Result<usize, Error> {
324329
// TODO add better calculation, handle wrappers, internals and length
325-
let res = match self.get_doc(path)? {
330+
let res = match self.get_first(path)? {
326331
Value::Null => 0,
327332
Value::Bool(v) => mem::size_of_val(v),
328333
Value::Number(v) => mem::size_of_val(v),
@@ -333,26 +338,39 @@ impl RedisJSON {
333338
Ok(res.into())
334339
}
335340

336-
// TODO: Rename this to 'get_value', since 'doc' is overloaded.
337-
pub fn get_doc<'a>(&'a self, path: &'a str) -> Result<&'a Value, Error> {
338-
let results = jsonpath_lib::select(&self.data, path)?;
341+
pub fn get_first<'a>(&'a self, path: &'a str) -> Result<&'a Value, Error> {
342+
let results = self.get_values(path)?;
339343
match results.first() {
340344
Some(s) => Ok(s),
341345
None => Err("ERR path does not exist".into()),
342346
}
343347
}
348+
349+
pub fn get_values<'a>(&'a self, path: &'a str) -> Result<Vec<&'a Value>, Error> {
350+
let results = jsonpath_lib::select(&self.data, path)?;
351+
Ok(results)
352+
}
344353
}
345354

346355
pub mod type_methods {
347356
use super::*;
348357

349358
#[allow(non_snake_case, unused)]
350-
pub unsafe extern "C" fn rdb_load(rdb: *mut raw::RedisModuleIO, encver: c_int) -> *mut c_void {
359+
pub extern "C" fn rdb_load(rdb: *mut raw::RedisModuleIO, encver: c_int) -> *mut c_void {
351360
let json = match encver {
352361
0 => RedisJSON {
353362
data: backward::json_rdb_load(rdb),
363+
index: None, // TODO handle load from rdb
354364
},
355-
2 => RedisJSON::from_str(&raw::load_string(rdb), Format::JSON).unwrap(),
365+
2 => {
366+
let data = raw::load_string(rdb);
367+
let schema = if raw::load_unsigned(rdb) > 0 {
368+
Some(raw::load_string(rdb))
369+
} else {
370+
None
371+
};
372+
RedisJSON::from_str(&data, &schema, Format::JSON).unwrap()
373+
}
356374
_ => panic!("Can't load old RedisJSON RDB"),
357375
};
358376
Box::into_raw(Box::new(json)) as *mut c_void
@@ -367,5 +385,11 @@ pub mod type_methods {
367385
pub unsafe extern "C" fn rdb_save(rdb: *mut raw::RedisModuleIO, value: *mut c_void) {
368386
let json = &*(value as *mut RedisJSON);
369387
raw::save_string(rdb, &json.data.to_string());
388+
if let Some(index) = &json.index {
389+
raw::save_unsigned(rdb, 1);
390+
raw::save_string(rdb, &index);
391+
} else {
392+
raw::save_unsigned(rdb, 0);
393+
}
370394
}
371395
}

0 commit comments

Comments
 (0)