Skip to content

Commit

Permalink
many fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
xlc committed Nov 7, 2024
1 parent 1243845 commit 5cdfc1b
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 140 deletions.
59 changes: 40 additions & 19 deletions Blockchain/Sources/Blockchain/State/InMemoryBackend.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@ private struct KVPair: Comparable, Sendable {
}

public actor InMemoryBackend: StateBackendProtocol {
private var store: SortedArray<KVPair>
private var refCounts: [Data: Int]
// we really should be using Heap or some other Tree based structure here
// but let's keep it simple for now
private var store: SortedArray<KVPair> = .init([])
private var rawValues: [Data32: Data] = [:]
private var refCounts: [Data: Int] = [:]
private var rawValueRefCounts: [Data32: Int] = [:]

public init(store: [Data32: Data] = [:]) {
self.store = .init(store.map { KVPair(key: $0.key.data, value: $0.value) })
refCounts = [:]

for key in store.keys {
refCounts[key.data] = 1
}
}
public init() {}

public func read(key: Data) async throws -> Data? {
let idx = store.insertIndex(KVPair(key: key, value: Data()))
Expand Down Expand Up @@ -51,23 +48,47 @@ public actor InMemoryBackend: StateBackendProtocol {
return resp
}

public func batchRead(keys: [Data]) async throws -> [(key: Data, value: Data?)] {
var resp = [(key: Data, value: Data?)]()
for key in keys {
let value = try await read(key: key)
resp.append((key, value))
public func batchUpdate(_ updates: [StateBackendOperation]) async throws {
for update in updates {
switch update {
case let .write(key, value):
let idx = store.insertIndex(KVPair(key: key, value: value))
let item = store.array[safe: idx]
if let item, item.key == key { // found
// value is not used for ordering so this is safe
store.unsafeArrayAccess[idx].value = value
} else { // not found
store.insert(KVPair(key: key, value: value))
}
case let .writeRawValue(key, value):
rawValues[key] = value
rawValueRefCounts[key, default: 0] += 1
case .refIncrement:
break
case .refDecrement:
break
}
}
return resp
}

public func batchUpdate(_: [StateBackendOperation]) async throws {}
public func readValue(hash: Data32) async throws -> Data? {
rawValues[hash]
}

public func gc() async throws {
public func gc(callback: @Sendable (Data) -> Data32?) async throws {
// check ref counts and remove keys with 0 ref count
for (key, count) in refCounts where count == 0 {
let idx = store.insertIndex(KVPair(key: key, value: Data()))
if store.array[safe: idx]?.key == key {
let item = store.array[safe: idx]
if let item, item.key == key {
store.remove(at: idx)
if let rawValueKey = callback(item.value) {
rawValueRefCounts[rawValueKey, default: 0] -= 1
if rawValueRefCounts[rawValueKey] == 0 {
rawValues.removeValue(forKey: rawValueKey)
rawValueRefCounts.removeValue(forKey: rawValueKey)
}
}
}
}
}
Expand Down
71 changes: 7 additions & 64 deletions Blockchain/Sources/Blockchain/State/State.swift
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,8 @@ public struct State: Sendable {
}
}

public func save() async throws -> State {
let changes = layer.toKV()
let trie = StateTrie(rootHash: backend.rootHash, backend: backend)
try await trie.update(updates: changes)
let newRoot = trie.rootHash
return State(backend: backend.newBackend(rootHash: newRoot), layer: layer)
public func save() async throws {
try await backend.write(layer.toKV())
}
}

Expand All @@ -211,63 +207,6 @@ extension State {
recentHistory.items.last.map(\.headerHash)!
}

private class KVSequence: Sequence {
typealias Element = (key: Data32, value: Data)

let seq: any Sequence<(key: Data32, value: Data)>
let layer: [Data32: Data?]

init(state: State) async throws {
seq = try await state.backend.readAll()
var layer = [Data32: Data?]()
for (key, value) in state.layer.toKV() {
layer[key.encode()] = try value.map { try JamEncoder.encode($0) }
}
self.layer = layer
}

func makeIterator() -> KVSequence.Iterator {
KVSequence.Iterator(iter: seq.makeIterator(), layer: layer)
}

struct Iterator: IteratorProtocol {
typealias Element = (key: Data32, value: Data)

var iter: any IteratorProtocol<KVSequence.Element>
var layerIterator: [Data32: Data?].Iterator?
let layer: [Data32: Data?]

init(iter: any IteratorProtocol<KVSequence.Element>, layer: [Data32: Data?]) {
self.iter = iter
self.layer = layer
}

mutating func next() -> KVSequence.Iterator.Element? {
if layerIterator != nil {
if let (key, value) = layerIterator?.next() {
if let value {
return (key, value)
}
return next() // skip this one
}
return nil
}
if let (key, value) = iter.next() {
if layer.keys.contains(key) {
return next() // skip this one
}
return (key, value)
}
layerIterator = layer.makeIterator()
return next()
}
}
}

public func toKV() async throws -> some Sequence<(key: Data32, value: Data)> {
try await KVSequence(state: self)
}

public func asRef() -> StateRef {
StateRef(self)
}
Expand Down Expand Up @@ -335,7 +274,7 @@ extension State: Dummy {
}
let rootHash = try! stateMerklize(kv: store)

let backend = StateBackend(InMemoryBackend(store: store), config: config, rootHash: rootHash)
let backend = StateBackend(InMemoryBackend(), config: config, rootHash: rootHash)

let layer = StateLayer(changes: kv)

Expand Down Expand Up @@ -489,4 +428,8 @@ public class StateRef: Ref<State>, @unchecked Sendable {
public static func dummy(config: ProtocolConfigRef, block: BlockRef?) -> StateRef {
StateRef(State.dummy(config: config, block: block))
}

public var stateRoot: Data32 {
fatalError("not implemented")
}
}
69 changes: 31 additions & 38 deletions Blockchain/Sources/Blockchain/State/StateBackend.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,66 +10,59 @@ public enum StateBackendError: Error {
public final class StateBackend: Sendable {
private let impl: StateBackendProtocol
private let config: ProtocolConfigRef
public let rootHash: Data32
private let trie: StateTrie

public init(_ impl: StateBackendProtocol, config: ProtocolConfigRef, rootHash: Data32) {
self.impl = impl
self.config = config
self.rootHash = rootHash
trie = StateTrie(rootHash: rootHash, backend: impl)
}

public func read<Key: StateKey>(_ key: Key) async throws -> Key.Value {
let encodedKey = key.encode().data
if let ret = try await impl.read(key: encodedKey) {
public var rootHash: Data32 {
get async {
await trie.rootHash
}
}

public func read<Key: StateKey>(_ key: Key) async throws -> Key.Value? {
let encodedKey = key.encode()
if let ret = try await trie.read(key: encodedKey) {
guard let ret = try JamDecoder.decode(key.decodeType(), from: ret, withConfig: config) as? Key.Value else {
throw StateBackendError.invalidData
}
return ret
}
if Key.optional {
return Key.Value?.none as! Key.Value
return nil
}
throw StateBackendError.missingState
}

public func batchRead(_ keys: [any StateKey]) async throws -> [(key: any StateKey, value: (Codable & Sendable)?)] {
let encodedKeys = keys.map { $0.encode().data }
let result = try await impl.batchRead(keys: encodedKeys)
return try zip(result, keys).map { data, key in
guard let rawValue = data.value else {
return (key: key, value: nil)
}
let value = try JamDecoder.decode(key.decodeType(), from: rawValue, withConfig: config)
return (key: key, value: value)
var ret = [(key: any StateKey, value: (Codable & Sendable)?)]()
ret.reserveCapacity(keys.count)
for key in keys {
try await ret.append((key, read(key)))
}
return ret
}

public func readAll() async throws -> [Data32: Data] {
let all = try await impl.readAll(prefix: Data(), startKey: nil, limit: nil)
var result = [Data32: Data]()
for (key, value) in all {
try result[Data32(key).unwrap()] = value
}
return result
}

public func newBackend(rootHash: Data32) -> StateBackend {
StateBackend(impl: impl, config: config, rootHash: rootHash)
}
}

// MARK: - TrieNode

extension StateBackend {
public func readTrieNode(_ key: Data) async throws -> Data? {
try await impl.read(key: key)
}

public func batchUpdateTrieNodes(_ ops: [StateBackendOperation]) async throws {
try await impl.batchUpdate(ops)
public func write(_ values: any Sequence<(key: any StateKey, value: (Codable & Sendable)?)>) async throws {
try await trie.update(values.map { try (key: $0.key.encode(), value: $0.value.map { try JamEncoder.encode($0) }) })
try await trie.save()
}

public func gc() async throws {
try await impl.gc()
try await impl.gc { data in
guard data.count == 64 else {
// unexpected data size
return nil
}
let isRegularLeaf = data[0] & 0b1100_0000 == 0b1100_0000
if isRegularLeaf {
return Data32(data.suffix(from: 32))!
}
return nil
}
}
}
27 changes: 19 additions & 8 deletions Blockchain/Sources/Blockchain/State/StateBackendProtocol.swift
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
import Foundation
import Utils

public enum StateTrieBackendOperation: Sendable {
public enum StateBackendOperation: Sendable {
case write(key: Data, value: Data)
case writeRawValue(key: Data32, value: Data)
case refIncrement(key: Data)
case refDecrement(key: Data)
}

// key: trie node hash (32 bytes)
// value: trie node data (64 bytes)
public protocol StateTrieBackend: Sendable {
/// key: trie node hash (32 bytes)
/// value: trie node data (64 bytes)
/// ref counting requirements:
/// - write do not increment ref count, only explicit ref increment do
/// - lazy prune is used. e.g. when ref count is reduced to zero, the value will only be removed
/// when gc is performed
/// - raw value have its own ref counting
/// - writeRawValue increment ref count, and write if necessary
/// - raw value ref count is only decremented when connected trie node is removed during gc
public protocol StateBackendProtocol: Sendable {
func read(key: Data) async throws -> Data?
func readAll(prefix: Data, startKey: Data?, limit: UInt32?) async throws -> [(key: Data, value: Data)]
func batchRead(keys: [Data]) async throws -> [(key: Data, value: Data?)]
func batchUpdate(_ ops: [StateTrieBackendOperation]) async throws
func batchUpdate(_ ops: [StateBackendOperation]) async throws

// remove entries with zero ref count
func gc() async throws
// hash is the blake2b256 hash of the value
func readValue(hash: Data32) async throws -> Data?

/// remove entries with zero ref count
/// callback returns a dependent raw value key if the data is regular leaf node
func gc(callback: @Sendable (Data) -> Data32?) async throws
}
Loading

0 comments on commit 5cdfc1b

Please sign in to comment.