Skip to content

Commit 06a68be

Browse files
authored
Pure state sync refactoring (part-1) (#6249)
This pure refactoring of state sync is preparing for #4. As the rough plan in #4 (comment), there will be two PRs for the state sync refactoring. This first PR focuses on isolating the function `process_state_key_values()` as the central point for storing received state data in memory. This function will later be adapted to forward the state data directly to the DB layer for persistent sync. A follow-up PR will handle the encapsulation of `StateSyncMetadata` to support this persistent storage. Although there are many commits in this PR, each commit is small and intentionally incremental to facilitate a smoother review, please review them commit by commit. Each commit should represent an equivalent rewrite of the existing logic, with one exception bb447b2, which has a slight deviation from the original but is correct IMHO. Please give this commit special attention during the review.
1 parent 95be9c1 commit 06a68be

File tree

2 files changed

+88
-87
lines changed

2 files changed

+88
-87
lines changed

prdoc/pr_6249.prdoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
title: Pure state sync refactoring (part-1)
2+
3+
doc:
4+
- audience: Node Dev
5+
description: |
6+
The pure refactoring of state sync is preparing for https://github.com/paritytech/polkadot-sdk/issues/4. This is the first part, focusing on isolating the function `process_state_key_values()` as the central point for storing received state data in memory. This function will later be adapted to forward the state data directly to the DB layer to resolve the OOM issue and support persistent state sync.
7+
8+
crates:
9+
- name: sc-network-sync
10+
bump: none

substrate/client/network/sync/src/strategy/state_sync.rs

Lines changed: 78 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
//! State sync support.
2020
2121
use crate::{
22-
schema::v1::{StateEntry, StateRequest, StateResponse},
22+
schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse},
2323
LOG_TARGET,
2424
};
2525
use codec::{Decode, Encode};
2626
use log::debug;
27-
use sc_client_api::{CompactProof, ProofProvider};
27+
use sc_client_api::{CompactProof, KeyValueStates, ProofProvider};
2828
use sc_consensus::ImportedState;
2929
use smallvec::SmallVec;
3030
use sp_core::storage::well_known_keys;
@@ -132,6 +132,80 @@ where
132132
skip_proof,
133133
}
134134
}
135+
136+
fn process_state_key_values(
137+
&mut self,
138+
state_root: Vec<u8>,
139+
key_values: impl IntoIterator<Item = (Vec<u8>, Vec<u8>)>,
140+
) {
141+
let is_top = state_root.is_empty();
142+
143+
let entry = self.state.entry(state_root).or_default();
144+
145+
if entry.0.len() > 0 && entry.1.len() > 1 {
146+
// Already imported child_trie with same root.
147+
// Warning this will not work with parallel download.
148+
return;
149+
}
150+
151+
let mut child_storage_roots = Vec::new();
152+
153+
for (key, value) in key_values {
154+
// Skip all child key root (will be recalculated on import)
155+
if is_top && well_known_keys::is_child_storage_key(key.as_slice()) {
156+
child_storage_roots.push((value, key));
157+
} else {
158+
self.imported_bytes += key.len() as u64;
159+
entry.0.push((key, value));
160+
}
161+
}
162+
163+
for (root, storage_key) in child_storage_roots {
164+
self.state.entry(root).or_default().1.push(storage_key);
165+
}
166+
}
167+
168+
fn process_state_verified(&mut self, values: KeyValueStates) {
169+
for values in values.0 {
170+
self.process_state_key_values(values.state_root, values.key_values);
171+
}
172+
}
173+
174+
fn process_state_unverified(&mut self, response: StateResponse) -> bool {
175+
let mut complete = true;
176+
// if the trie is a child trie and one of its parent trie is empty,
177+
// the parent cursor stays valid.
178+
// Empty parent trie content only happens when all the response content
179+
// is part of a single child trie.
180+
if self.last_key.len() == 2 && response.entries[0].entries.is_empty() {
181+
// Do not remove the parent trie position.
182+
self.last_key.pop();
183+
} else {
184+
self.last_key.clear();
185+
}
186+
for state in response.entries {
187+
debug!(
188+
target: LOG_TARGET,
189+
"Importing state from {:?} to {:?}",
190+
state.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
191+
state.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
192+
);
193+
194+
if !state.complete {
195+
if let Some(e) = state.entries.last() {
196+
self.last_key.push(e.key.clone());
197+
}
198+
complete = false;
199+
}
200+
201+
let KeyValueStateEntry { state_root, entries, complete: _ } = state;
202+
self.process_state_key_values(
203+
state_root,
204+
entries.into_iter().map(|StateEntry { key, value }| (key, value)),
205+
);
206+
}
207+
complete
208+
}
135209
}
136210

137211
impl<B, Client> StateSyncProvider<B> for StateSync<B, Client>
@@ -181,94 +255,11 @@ where
181255
debug!(target: LOG_TARGET, "Error updating key cursor, depth: {}", completed);
182256
};
183257

184-
for values in values.0 {
185-
let key_values = if values.state_root.is_empty() {
186-
// Read child trie roots.
187-
values
188-
.key_values
189-
.into_iter()
190-
.filter(|key_value| {
191-
if well_known_keys::is_child_storage_key(key_value.0.as_slice()) {
192-
self.state
193-
.entry(key_value.1.clone())
194-
.or_default()
195-
.1
196-
.push(key_value.0.clone());
197-
false
198-
} else {
199-
true
200-
}
201-
})
202-
.collect()
203-
} else {
204-
values.key_values
205-
};
206-
let entry = self.state.entry(values.state_root).or_default();
207-
if entry.0.len() > 0 && entry.1.len() > 1 {
208-
// Already imported child_trie with same root.
209-
// Warning this will not work with parallel download.
210-
} else if entry.0.is_empty() {
211-
for (key, _value) in key_values.iter() {
212-
self.imported_bytes += key.len() as u64;
213-
}
214-
215-
entry.0 = key_values;
216-
} else {
217-
for (key, value) in key_values {
218-
self.imported_bytes += key.len() as u64;
219-
entry.0.push((key, value))
220-
}
221-
}
222-
}
258+
self.process_state_verified(values);
223259
self.imported_bytes += proof_size;
224260
complete
225261
} else {
226-
let mut complete = true;
227-
// if the trie is a child trie and one of its parent trie is empty,
228-
// the parent cursor stays valid.
229-
// Empty parent trie content only happens when all the response content
230-
// is part of a single child trie.
231-
if self.last_key.len() == 2 && response.entries[0].entries.is_empty() {
232-
// Do not remove the parent trie position.
233-
self.last_key.pop();
234-
} else {
235-
self.last_key.clear();
236-
}
237-
for state in response.entries {
238-
debug!(
239-
target: LOG_TARGET,
240-
"Importing state from {:?} to {:?}",
241-
state.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
242-
state.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
243-
);
244-
245-
if !state.complete {
246-
if let Some(e) = state.entries.last() {
247-
self.last_key.push(e.key.clone());
248-
}
249-
complete = false;
250-
}
251-
let is_top = state.state_root.is_empty();
252-
let entry = self.state.entry(state.state_root).or_default();
253-
if entry.0.len() > 0 && entry.1.len() > 1 {
254-
// Already imported child trie with same root.
255-
} else {
256-
let mut child_roots = Vec::new();
257-
for StateEntry { key, value } in state.entries {
258-
// Skip all child key root (will be recalculated on import).
259-
if is_top && well_known_keys::is_child_storage_key(key.as_slice()) {
260-
child_roots.push((value, key));
261-
} else {
262-
self.imported_bytes += key.len() as u64;
263-
entry.0.push((key, value))
264-
}
265-
}
266-
for (root, storage_key) in child_roots {
267-
self.state.entry(root).or_default().1.push(storage_key);
268-
}
269-
}
270-
}
271-
complete
262+
self.process_state_unverified(response)
272263
};
273264
if complete {
274265
self.complete = true;

0 commit comments

Comments
 (0)