Skip to content

Commit

Permalink
Block data provider refactor (#192)
Browse files Browse the repository at this point in the history
* blockchain data provider refactor

* fix and tests

* fix

* swiftlint is too slow
  • Loading branch information
xlc authored Oct 24, 2024
1 parent 63c6167 commit 361bd57
Show file tree
Hide file tree
Showing 15 changed files with 335 additions and 149 deletions.
10 changes: 5 additions & 5 deletions .githooks/pre-commit
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#!/bin/bash

if which swiftlint >/dev/null; then
swiftlint lint --fix --quiet
else
echo "warning: SwiftLint not installed, download from https://github.com/realm/SwiftLint"
fi
# if which swiftlint >/dev/null; then
# swiftlint lint --fix --quiet
# else
# echo "warning: SwiftLint not installed, download from https://github.com/realm/SwiftLint"
# fi

git diff --diff-filter=d --staged --name-only | grep -e '\(.*\).swift$' | while read line; do
swiftformat "${line}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,55 @@ import Utils

private let logger = Logger(label: "BlockchainDataProvider")

private struct BlockchainStorage: Sendable {
var bestHead: Data32
var bestHeadTimeslot: TimeslotIndex
var finalizedHead: Data32
public struct HeadInfo: Sendable {
public var hash: Data32
public var timeslot: TimeslotIndex
public var number: UInt32
}

public final class BlockchainDataProvider: Sendable {
private let storage: ThreadSafeContainer<BlockchainStorage>
public actor BlockchainDataProvider: Sendable {
public private(set) var bestHead: HeadInfo
public private(set) var finalizedHead: HeadInfo
private let dataProvider: BlockchainDataProviderProtocol

public init(_ dataProvider: BlockchainDataProviderProtocol) async throws {
let heads = try await dataProvider.getHeads()
var bestHead: (HeaderRef, Data32)?
var bestHead = HeadInfo(hash: dataProvider.genesisBlockHash, timeslot: 0, number: 0)
for head in heads {
let header = try await dataProvider.getHeader(hash: head)
if bestHead == nil || header.value.timeslot > bestHead!.0.value.timeslot {
bestHead = (header, head)
if header.value.timeslot > bestHead.timeslot {
let number = try await dataProvider.getBlockNumber(hash: head)
bestHead = HeadInfo(hash: head, timeslot: header.value.timeslot, number: number)
}
}
let finalizedHead = try await dataProvider.getFinalizedHead()

storage = ThreadSafeContainer(.init(
bestHead: bestHead?.1 ?? dataProvider.genesisBlockHash,
bestHeadTimeslot: bestHead?.0.value.timeslot ?? 0,
finalizedHead: finalizedHead
))
self.bestHead = bestHead

self.dataProvider = dataProvider
}
let finalizedHeadHash = try await dataProvider.getFinalizedHead()

public var bestHead: Data32 {
storage.value.bestHead
}
finalizedHead = try await HeadInfo(
hash: finalizedHeadHash,
timeslot: dataProvider.getHeader(hash: finalizedHeadHash).value.timeslot,
number: dataProvider.getBlockNumber(hash: finalizedHeadHash)
)

public var finalizedHead: Data32 {
storage.value.finalizedHead
self.dataProvider = dataProvider
}

public func blockImported(block: BlockRef, state: StateRef) async throws {
try await add(block: block)
try await add(state: state)
try await updateHead(hash: block.hash, parent: block.header.parentHash)
try await dataProvider.updateHead(hash: block.hash, parent: block.header.parentHash)

if block.header.timeslot > storage.value.bestHeadTimeslot {
storage.write { storage in
storage.bestHead = block.hash
storage.bestHeadTimeslot = block.header.timeslot
}
if block.header.timeslot > bestHead.timeslot {
let number = try await dataProvider.getBlockNumber(hash: block.hash)
bestHead = HeadInfo(hash: block.hash, timeslot: block.header.timeslot, number: number)
}

logger.debug("block imported: \(block.hash)")
}
}

// expose BlockchainDataProviderProtocol
extension BlockchainDataProvider {
public func hasBlock(hash: Data32) async throws -> Bool {
try await dataProvider.hasBlock(hash: hash)
Expand All @@ -71,6 +65,10 @@ extension BlockchainDataProvider {
try await dataProvider.isHead(hash: hash)
}

public func getBlockNumber(hash: Data32) async throws -> UInt32 {
try await dataProvider.getBlockNumber(hash: hash)
}

public func getHeader(hash: Data32) async throws -> HeaderRef {
try await dataProvider.getHeader(hash: hash)
}
Expand All @@ -95,31 +93,56 @@ extension BlockchainDataProvider {
try await dataProvider.getBlockHash(byTimeslot: timeslot)
}

public func getBlockHash(byNumber number: UInt32) async throws -> Set<Data32> {
try await dataProvider.getBlockHash(byNumber: number)
}

// add forks of finalized head is not allowed
public func add(block: BlockRef) async throws {
logger.debug("adding block: \(block.hash)")

// require parent exists (i.e. not purged) and block is not fork of any finalized block
guard try await hasBlock(hash: block.header.parentHash), block.header.timeslot > finalizedHead.timeslot else {
throw BlockchainDataProviderError.uncanonical(hash: block.hash)
}

try await dataProvider.add(block: block)
}

/// only allow to add state if the corresponding block is added
public func add(state: StateRef) async throws {
logger.debug("adding state: \(state.value.lastBlockHash)")

// if block exists, that means it passed the canonicalization check
guard try await hasBlock(hash: state.value.lastBlockHash) else {
throw BlockchainDataProviderError.noData(hash: state.value.lastBlockHash)
}

try await dataProvider.add(state: state)
}

/// Also purge fork of all finalized blocks
public func setFinalizedHead(hash: Data32) async throws {
logger.debug("setting finalized head: \(hash)")

try await dataProvider.setFinalizedHead(hash: hash)
storage.write { storage in
storage.finalizedHead = hash
}
}
let oldFinalizedHead = finalizedHead
let number = try await dataProvider.getBlockNumber(hash: hash)

public func updateHead(hash: Data32, parent: Data32) async throws {
logger.debug("updating head: \(hash) with parent: \(parent)")
var hashToCheck = hash
var hashToCheckNumber = number
while hashToCheck != oldFinalizedHead.hash {
let hashes = try await dataProvider.getBlockHash(byNumber: hashToCheckNumber)
for hash in hashes where hash != hashToCheck {
logger.trace("purge block: \(hash)")
try await dataProvider.remove(hash: hash)
}
hashToCheck = try await dataProvider.getHeader(hash: hashToCheck).value.parentHash
hashToCheckNumber -= 1
}

try await dataProvider.updateHead(hash: hash, parent: parent)
let header = try await dataProvider.getHeader(hash: hash)
finalizedHead = HeadInfo(hash: hash, timeslot: header.value.timeslot, number: number)
try await dataProvider.setFinalizedHead(hash: hash)
}

public func remove(hash: Data32) async throws {
Expand All @@ -131,4 +154,8 @@ extension BlockchainDataProvider {
public var genesisBlockHash: Data32 {
dataProvider.genesisBlockHash
}

public func getBestState() async throws -> StateRef {
try await dataProvider.getState(hash: bestHead.hash)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ import Utils

public enum BlockchainDataProviderError: Error, Equatable {
case noData(hash: Data32)
case uncanonical(hash: Data32)
}

public protocol BlockchainDataProviderProtocol: Sendable {
func hasBlock(hash: Data32) async throws -> Bool
func hasState(hash: Data32) async throws -> Bool
func isHead(hash: Data32) async throws -> Bool

func getBlockNumber(hash: Data32) async throws -> UInt32

/// throw BlockchainDataProviderError.noData if not found
func getHeader(hash: Data32) async throws -> HeaderRef

Expand All @@ -24,6 +27,8 @@ public protocol BlockchainDataProviderProtocol: Sendable {

/// return empty set if not found
func getBlockHash(byTimeslot timeslot: TimeslotIndex) async throws -> Set<Data32>
/// return empty set if not found
func getBlockHash(byNumber number: UInt32) async throws -> Set<Data32>

func add(block: BlockRef) async throws
func add(state: StateRef) async throws
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ public actor InMemoryDataProvider: Sendable {
public private(set) var heads: Set<Data32>
public private(set) var finalizedHead: Data32

private var hashByNumber: [UInt32: Set<Data32>] = [:]
private var numberByHash: [Data32: UInt32] = [:]
private var blockByHash: [Data32: BlockRef] = [:]
private var stateByBlockHash: [Data32: StateRef] = [:]
private var hashByTimeslot: [TimeslotIndex: Set<Data32>] = [:]
Expand Down Expand Up @@ -32,6 +34,13 @@ extension InMemoryDataProvider: BlockchainDataProviderProtocol {
heads.contains(hash)
}

public func getBlockNumber(hash: Data32) async throws -> UInt32 {
guard let number = numberByHash[hash] else {
throw BlockchainDataProviderError.noData(hash: hash)
}
return number
}

public func getHeader(hash: Data32) throws -> HeaderRef {
guard let header = blockByHash[hash]?.header.asRef() else {
throw BlockchainDataProviderError.noData(hash: hash)
Expand Down Expand Up @@ -65,6 +74,10 @@ extension InMemoryDataProvider: BlockchainDataProviderProtocol {
hashByTimeslot[timeslot] ?? Set()
}

public func getBlockHash(byNumber number: UInt32) -> Set<Data32> {
hashByNumber[number] ?? Set()
}

public func add(state: StateRef) {
stateByBlockHash[state.value.lastBlockHash] = state
hashByTimeslot[state.value.timeslot, default: Set()].insert(state.value.lastBlockHash)
Expand All @@ -73,6 +86,13 @@ extension InMemoryDataProvider: BlockchainDataProviderProtocol {
public func add(block: BlockRef) {
blockByHash[block.hash] = block
hashByTimeslot[block.header.timeslot, default: Set()].insert(block.hash)
let blockNumber = if let number = numberByHash[block.header.parentHash] {
number + 1
} else {
UInt32(0)
}
numberByHash[block.hash] = blockNumber
hashByNumber[blockNumber, default: Set()].insert(block.hash)
}

public func setFinalizedHead(hash: Data32) {
Expand All @@ -92,9 +112,18 @@ extension InMemoryDataProvider: BlockchainDataProviderProtocol {
public func remove(hash: Data32) {
let timeslot = blockByHash[hash]?.header.timeslot ?? stateByBlockHash[hash]?.value.timeslot
stateByBlockHash.removeValue(forKey: hash)
blockByHash.removeValue(forKey: hash)

if let timeslot {
hashByTimeslot[timeslot]?.remove(hash)
}

let number = numberByHash.removeValue(forKey: hash)

if let number {
hashByNumber[number]?.remove(hash)
}

heads.remove(hash)
}
}
9 changes: 9 additions & 0 deletions Blockchain/Sources/Blockchain/Types/BlockRef.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,12 @@ extension BlockRef: Codable {
try value.encode(to: encoder)
}
}

extension BlockRef {
public static func dummy(config: ProtocolConfigRef, parent: BlockRef) -> BlockRef {
dummy(config: config).mutate {
$0.header.unsigned.parentHash = parent.hash
$0.header.unsigned.timeslot = parent.header.timeslot + 1
}
}
}
14 changes: 14 additions & 0 deletions Blockchain/Sources/Blockchain/Types/StateRef.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,17 @@ extension StateRef: Codable {
try value.encode(to: encoder)
}
}

extension StateRef {
public static func dummy(config: ProtocolConfigRef, block: BlockRef) -> StateRef {
dummy(config: config).mutate {
$0.recentHistory.items.safeAppend(RecentHistory.HistoryItem(
headerHash: block.hash,
mmr: MMR([]),
stateRoot: Data32(),
workReportHashes: try! ConfigLimitedSizeArray(config: config)
))
$0.timeslot = block.header.timeslot + 1
}
}
}
4 changes: 2 additions & 2 deletions Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
) async throws
-> BlockRef
{
let parentHash = dataProvider.bestHead
let parentHash = await dataProvider.bestHead.hash

logger.trace("creating new block for timeslot: \(timeslot) with parent hash: \(parentHash)")

Expand Down Expand Up @@ -175,7 +175,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
tickets.value = []
let timeslot = epoch.epochToTimeslotIndex(config: config)

let bestHead = dataProvider.bestHead
let bestHead = await dataProvider.bestHead.hash
let state = try await dataProvider.getState(hash: bestHead)

// simulate next block to determine the block authors for next epoch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ public final class ExtrinsicPoolService: ServiceBase, @unchecked Sendable {
// Safrole VRF commitments only changes every epoch
// and we should never receive tickets at very beginning and very end of an epoch
// so it is safe to use best head state without worrying about forks or edge cases
let state = try await dataProvider.getState(hash: dataProvider.bestHead)
let state = try await dataProvider.getBestState()
try await storage.update(state: state, config: config)
await storage.add(tickets: tickets.items)
}

private func on(safroleTicketsReceived tickets: RuntimeEvents.SafroleTicketsReceived) async throws {
let state = try await dataProvider.getState(hash: dataProvider.bestHead)
let state = try await dataProvider.getBestState()

try await storage.update(state: state, config: config)
await storage.add(tickets: tickets.items, config: config)
Expand Down
Loading

0 comments on commit 361bd57

Please sign in to comment.