Skip to content

Commit e70fab6

Browse files
authored
Add snapshot and restore to indexer (#188)
1 parent 48f8333 commit e70fab6

File tree

8 files changed

+111
-37
lines changed

8 files changed

+111
-37
lines changed

indexer/src/adaptor.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{where_query::WhereQuery, IndexerChange};
22
use schema::{self, record::RecordRoot, Schema};
3+
use serde::{Deserialize, Serialize};
34
use std::{pin::Pin, time::SystemTime};
45

56
pub type Result<T> = std::result::Result<T, Error>;
@@ -21,6 +22,12 @@ pub enum Error {
2122
CollectionCollectionRecordNotFound { id: String },
2223
}
2324

25+
#[derive(Debug, Clone, Serialize, Deserialize)]
26+
pub struct SnapshotValue {
27+
pub key: Box<[u8]>,
28+
pub value: Box<[u8]>,
29+
}
30+
2431
/// The Store trait
2532
#[async_trait::async_trait]
2633
pub trait IndexerAdaptor: Send + Sync {
@@ -51,4 +58,13 @@ pub trait IndexerAdaptor: Send + Sync {
5158
async fn set_system_key(&self, key: &str, data: &RecordRoot) -> Result<()>;
5259

5360
async fn get_system_key(&self, key: &str) -> Result<Option<RecordRoot>>;
61+
62+
async fn snapshot(
63+
&self,
64+
chunk_size: usize,
65+
) -> Pin<Box<dyn futures::Stream<Item = Result<Vec<SnapshotValue>>> + '_ + Send>>;
66+
67+
async fn restore(&self, chunk: Vec<SnapshotValue>) -> Result<()>;
68+
69+
async fn reset(&self) -> Result<()>;
5470
}

indexer/src/lib.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
// TODO: we should export schema from here, so that indexer builders
44
// are using the correct schema
5-
use crate::adaptor::IndexerAdaptor;
5+
use crate::adaptor::{IndexerAdaptor, SnapshotValue};
66
use crate::list_query::ListQuery;
77
use crate::where_query::WhereQuery;
88
use futures::stream::{FuturesUnordered, StreamExt};
@@ -75,6 +75,25 @@ impl<A: IndexerAdaptor> Indexer<A> {
7575
Self { adaptor }
7676
}
7777

78+
pub async fn snapshot(
79+
&self,
80+
chunk_size: usize,
81+
) -> Pin<Box<dyn futures::Stream<Item = Result<Vec<SnapshotValue>>> + '_ + Send>> {
82+
self.adaptor
83+
.snapshot(chunk_size)
84+
.await
85+
.map(|s| s.map_err(Error::from))
86+
.boxed()
87+
}
88+
89+
pub async fn restore(&self, chunk: Vec<SnapshotValue>) -> Result<()> {
90+
Ok(self.adaptor.restore(chunk).await?)
91+
}
92+
93+
pub async fn reset(&self) -> Result<()> {
94+
Ok(self.adaptor.reset().await?)
95+
}
96+
7897
pub async fn commit(&self, height: usize, changes: Vec<IndexerChange>) -> Result<()> {
7998
Ok(self.adaptor.commit(height, changes).await?)
8099
}

indexer/src/memory.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use crate::adaptor::{Error, IndexerAdaptor, Result};
1+
use crate::adaptor::{Error, Result, SnapshotValue};
22
use crate::where_query::{WhereInequality, WhereNode, WhereQuery};
3+
use crate::IndexerAdaptor;
34
use crate::IndexerChange;
45
use schema::{
56
field_path::FieldPath,
@@ -417,6 +418,21 @@ impl IndexerAdaptor for MemoryStore {
417418

418419
Ok(state.system_data.get(key).cloned())
419420
}
421+
422+
async fn snapshot(
423+
&self,
424+
_: usize,
425+
) -> Pin<Box<dyn futures::Stream<Item = Result<Vec<SnapshotValue>>> + '_ + Send>> {
426+
todo!()
427+
}
428+
429+
async fn restore(&self, _: Vec<SnapshotValue>) -> Result<()> {
430+
todo!()
431+
}
432+
433+
async fn reset(&self) -> Result<()> {
434+
todo!()
435+
}
420436
}
421437

422438
#[cfg(test)]

indexer_rocksdb/src/adaptor.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ use crate::keys;
22
use crate::result_stream::convert_stream;
33
use crate::{
44
key_range::{self, key_range, KeyRange},
5-
proto,
5+
proto, snapshot,
66
store::{self, Store},
77
};
88
use async_recursion::async_recursion;
99
use futures::{StreamExt, TryStreamExt};
1010
use indexer::{
11-
adaptor::{self, IndexerAdaptor},
11+
adaptor::{self, IndexerAdaptor, SnapshotValue},
1212
where_query::WhereQuery,
1313
IndexerChange,
1414
};
@@ -92,6 +92,14 @@ impl RocksDBAdaptor {
9292
}
9393
}
9494

95+
pub fn snapshot(&self, chunk_size: usize) -> snapshot::SnapshotIterator {
96+
self.store.snapshot(chunk_size)
97+
}
98+
99+
pub fn restore(&self, data: snapshot::SnapshotChunk) -> Result<()> {
100+
Ok(self.store.restore(data)?)
101+
}
102+
95103
pub async fn _get(&self, collection_id: &str, record_id: &str) -> Result<Option<RecordRoot>> {
96104
let key = keys::Key::new_data(collection_id.to_string(), record_id.to_string())?;
97105

@@ -613,6 +621,28 @@ impl IndexerAdaptor for RocksDBAdaptor {
613621
async fn get_system_key(&self, key: &str) -> adaptor::Result<Option<RecordRoot>> {
614622
Ok(self._get_system_record(key).await?)
615623
}
624+
625+
async fn snapshot(
626+
&self,
627+
chunk_size: usize,
628+
) -> Pin<Box<dyn futures::Stream<Item = adaptor::Result<Vec<SnapshotValue>>> + '_ + Send>> {
629+
let res = futures::stream::iter(self.store.snapshot(chunk_size));
630+
let stream = Box::pin(res.map(|s| {
631+
s.map_err(store::StoreError::from)
632+
.map_err(Error::from)
633+
.map_err(adaptor::Error::from)
634+
}));
635+
636+
stream.boxed()
637+
}
638+
639+
async fn restore(&self, chunk: Vec<SnapshotValue>) -> adaptor::Result<()> {
640+
Ok(self.store.restore(chunk).map_err(Error::from)?)
641+
}
642+
643+
async fn reset(&self) -> adaptor::Result<()> {
644+
Ok(self.store.reset().map_err(Error::from)?)
645+
}
616646
}
617647

618648
impl From<Error> for adaptor::Error {

indexer_rocksdb/src/snapshot.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
use indexer::adaptor::SnapshotValue;
12
use rocksdb::{IteratorMode, DB};
2-
use serde::{Deserialize, Serialize};
33

44
pub type Result<T> = std::result::Result<T, Error>;
55

@@ -12,16 +12,9 @@ pub enum Error {
1212
BincodeError(#[from] bincode::Error),
1313
}
1414

15-
#[derive(Debug, Clone, Serialize, Deserialize)]
16-
pub struct SnapshotValue {
17-
pub(crate) key: Box<[u8]>,
18-
pub(crate) value: Box<[u8]>,
19-
}
20-
2115
pub type SnapshotChunk = Vec<SnapshotValue>;
2216

2317
pub struct SnapshotIterator<'a> {
24-
// db: &'a DB,
2518
chunk_size: usize,
2619
iter: rocksdb::DBIteratorWithThreadMode<'a, rocksdb::DB>,
2720
}

polybase/src/db.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ use crate::mempool::Mempool;
33
use crate::txn::{self, CallTxn};
44
use futures_util::{future, StreamExt};
55
use gateway::Gateway;
6-
use indexer::{adaptor::IndexerAdaptor, IndexerChange};
6+
use indexer::{
7+
adaptor::{IndexerAdaptor, SnapshotValue},
8+
IndexerChange,
9+
};
710
use indexer::{auth_user::AuthUser, list_query::ListQuery, Indexer};
8-
use indexer_rocksdb::snapshot::{SnapshotChunk, SnapshotIterator};
911
use parking_lot::Mutex;
1012
use schema::{
1113
self, methods,
@@ -15,9 +17,9 @@ use schema::{
1517
},
1618
Schema,
1719
};
18-
use serde::{Deserialize, Serialize};
1920
use solid::proposal::{self};
2021
use std::cmp::min;
22+
use std::pin::Pin;
2123
use std::time::{Duration, SystemTime};
2224
use tokio::sync::mpsc;
2325
use tokio::sync::Mutex as AsyncMutex;
@@ -108,11 +110,6 @@ pub enum UserError {
108110
UnauthorizedCall,
109111
}
110112

111-
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
112-
pub struct DbSnapshot {
113-
pub index: Vec<u8>,
114-
}
115-
116113
pub enum DbWaitResult<T> {
117114
Updated(T),
118115
NotModified,
@@ -652,22 +649,25 @@ impl<A: IndexerAdaptor> Db<A> {
652649
}
653650

654651
/// Reset all data in the database
655-
pub fn reset(&self) -> Result<()> {
656-
todo!()
657-
//Ok(self.indexer.reset()?)
652+
pub async fn reset(&self) -> Result<()> {
653+
Ok(self.indexer.reset().await?)
658654
}
659655

660656
/// Create a snapshot iterator, that can be used to iterate over the
661657
/// entire database in chunks
662-
pub fn snapshot_iter(&self, _chunk_size: usize) -> SnapshotIterator {
663-
todo!()
664-
//self.indexer.snapshot(chunk_size)
658+
pub async fn snapshot_iter(
659+
&self,
660+
chunk_size: usize,
661+
) -> Pin<Box<dyn futures::Stream<Item = Result<Vec<SnapshotValue>>> + '_ + Send>> {
662+
self.indexer
663+
.snapshot(chunk_size)
664+
.await
665+
.map(|s| s.map_err(Error::from))
666+
.boxed()
665667
}
666668

667-
pub fn restore_chunk(&self, _chunk: SnapshotChunk) -> Result<()> {
668-
todo!()
669-
//self.indexer.restore(chunk)?;
670-
//Ok(())
669+
pub async fn restore_chunk(&self, chunk: Vec<SnapshotValue>) -> Result<()> {
670+
Ok(self.indexer.restore(chunk).await?)
671671
}
672672

673673
#[tracing::instrument(skip(self))]

polybase/src/main.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ async fn main() -> Result<()> {
290290
let mut last_commit = Instant::now();
291291

292292
while !shutdown.load(Ordering::Relaxed) {
293-
let network = Arc::clone(&network);
293+
let network: Arc<Network> = Arc::clone(&network);
294294

295295
tokio::select! {
296296
// Db only produces CallTxn events, that should be propogated
@@ -378,7 +378,7 @@ async fn main() -> Result<()> {
378378

379379
// Reset the database
380380
#[allow(clippy::expect_used)]
381-
db.reset().expect("Failed to reset database");
381+
db.reset().await.expect("Failed to reset database");
382382

383383
info!("Db reset ready for snapshot, sending accept");
384384

@@ -401,8 +401,8 @@ async fn main() -> Result<()> {
401401
// and this snapshot may take a while to complete
402402
tokio::spawn(async move {
403403
// 100MB chunks
404-
let snapshot_iter = db.snapshot_iter(config.snapshot_chunk_size);
405-
for chunk in snapshot_iter {
404+
let mut snapshot_iter = db.snapshot_iter(config.snapshot_chunk_size).await;
405+
while let Some(chunk) = snapshot_iter.next().await {
406406
let peer_id = from_peer_id.clone();
407407
match chunk {
408408
Ok(chunk) => {
@@ -450,7 +450,7 @@ async fn main() -> Result<()> {
450450
if let Some(chunk) = chunk {
451451
// We should panic if we are unable to restore
452452
#[allow(clippy::unwrap_used)]
453-
db.restore_chunk(chunk).unwrap();
453+
db.restore_chunk(chunk).await.unwrap();
454454
} else {
455455
// We are finished, reset solid with the new proposal state from the snapshot
456456
#[allow(clippy::unwrap_used)]

polybase/src/network/events.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use indexer_rocksdb::snapshot::SnapshotChunk;
1+
use indexer::adaptor::SnapshotValue;
22
use serde::{Deserialize, Serialize};
33
use solid::proposal::ProposalAccept;
44
use solid::proposal::ProposalManifest;
@@ -35,7 +35,7 @@ pub enum NetworkEvent {
3535
/// A chunk of data for us to load into the database.
3636
SnapshotChunk {
3737
id: usize,
38-
chunk: Option<SnapshotChunk>,
38+
chunk: Option<Vec<SnapshotValue>>,
3939
},
4040

4141
/// A transaction sent to another peer, which we should add to our Mempool

0 commit comments

Comments
 (0)