Skip to content

Commit

Permalink
DataAvailability (#252)
Browse files Browse the repository at this point in the history
* DataStore

* connect

* two da

* work package bundle

* fix tests

* fix
  • Loading branch information
xlc authored Dec 18, 2024
1 parent 25e6b5a commit 1a3e704
Show file tree
Hide file tree
Showing 13 changed files with 266 additions and 63 deletions.
46 changes: 46 additions & 0 deletions Blockchain/Sources/Blockchain/DataStore/DataStore.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import Foundation

public protocol DataStoreProtocol: Sendable {
func read(path: URL) async throws -> Data?
func write(path: URL, value: Data) async throws
func delete(path: URL) async throws
}

public final class DataStore: Sendable {
private let impl: DataStoreProtocol
private let basePath: URL

public init(_ impl: DataStoreProtocol, basePath: URL) {
self.impl = impl
self.basePath = basePath
}

// partitioning files so that we won't have too many files in a single directory
private func getPath(path: String, name: String) -> URL {
var ret = basePath
ret.append(component: path)
var name = name[...]
if let first = name.first {
ret.append(component: String(first), directoryHint: .isDirectory)
name = name.dropFirst()
}
if let second = name.first {
ret.append(component: String(second), directoryHint: .isDirectory)
name = name.dropFirst()
}
ret.append(component: String(name), directoryHint: .notDirectory)
return ret
}

public func read(path: String, name: String) async throws -> Data? {
try await impl.read(path: getPath(path: path, name: name))
}

public func write(path: String, name: String, value: Data) async throws {
try await impl.write(path: getPath(path: path, name: name), value: value)
}

public func delete(path: String, name: String) async throws {
try await impl.delete(path: getPath(path: path, name: name))
}
}
23 changes: 23 additions & 0 deletions Blockchain/Sources/Blockchain/DataStore/FilesystemDataStore.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import Foundation

public actor FilesystemDataStore: DataStoreProtocol {
private let fileManager: FileManager

public init(fileManager: FileManager = .default) {
self.fileManager = fileManager
}

public func read(path: URL) async throws -> Data? {
try Data(contentsOf: path)
}

public func write(path: URL, value: Data) async throws {
let base = path.deletingLastPathComponent()
try fileManager.createDirectory(at: base, withIntermediateDirectories: true)
try value.write(to: path)
}

public func delete(path: URL) async throws {
try fileManager.removeItem(at: path)
}
}
19 changes: 19 additions & 0 deletions Blockchain/Sources/Blockchain/DataStore/InMemoryDataStore.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import Foundation

public actor InMemoryDataStore: DataStoreProtocol {
private var store: [URL: Data] = [:]

public init() {}

public func read(path: URL) async throws -> Data? {
store[path]
}

public func write(path: URL, value: Data) async throws {
store[path] = value
}

public func delete(path: URL) async throws {
store[path] = nil
}
}
10 changes: 10 additions & 0 deletions Blockchain/Sources/Blockchain/Types/WorkPackageBundle.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import Foundation
import Utils

// All the necessary data to audit a work package. Stored in audits DA
public struct WorkPackageBundle: Sendable, Equatable, Codable {
public var workPackage: WorkPackage
public var extrinsic: [Data]
public var importSegments: [[Data]]
public var justifications: [[Data]]
}
54 changes: 54 additions & 0 deletions Blockchain/Sources/Blockchain/Validator/DataAvailability.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import Foundation
import TracingUtils
import Utils

enum DataAvailabilityStore: String, Sendable {
case imports
case audits
}

public final class DataAvailability: ServiceBase2, @unchecked Sendable {
private let dataProvider: BlockchainDataProvider
private let dataStore: DataStore

public init(
config: ProtocolConfigRef,
eventBus: EventBus,
scheduler: Scheduler,
dataProvider: BlockchainDataProvider,
dataStore: DataStore
) async {
self.dataProvider = dataProvider
self.dataStore = dataStore

super.init(id: "DataAvailability", config: config, eventBus: eventBus, scheduler: scheduler)

scheduleForNextEpoch("BlockAuthor.scheduleForNextEpoch") { [weak self] epoch in
await self?.purge(epoch: epoch)
}
}

public func purge(epoch _: EpochIndex) async {
// TODO: purge data
// GP 14.3.1
// Guarantors are required to erasure-code and distribute two data sets: one blob, the auditable work-package containing
// the encoded work-package, extrinsic data and self-justifying imported segments which is placed in the short-term Audit
// da store and a second set of exported-segments data together with the Paged-Proofs metadata. Items in the first store
// are short-lived; assurers are expected to keep them only until finality of the block in which the availability of the work-
// result’s work-package is assured. Items in the second, meanwhile, are long-lived and expected to be kept for a minimum
// of 28 days (672 complete epochs) following the reporting of the work-report.
}

public func fetchSegment(root _: Data32, index _: UInt16) async throws -> Data? {
// TODO: fetch segment
nil
}

public func exportSegments(data _: [Data]) async throws {
// TODO: export segments
}

public func distributeWorkpackageBundle(bundle _: WorkPackageBundle) async throws {
// TODO: distribute workpackage bundle to audits DA
}
}
12 changes: 11 additions & 1 deletion Blockchain/Sources/Blockchain/Validator/ValidatorService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ public final class ValidatorService: Sendable {
private let safrole: SafroleService
private let extrinsicPool: ExtrinsicPoolService
private let blockAuthor: BlockAuthor
private let dataAvailability: DataAvailability

public init(
blockchain: Blockchain,
keystore: KeyStore,
eventBus: EventBus,
scheduler: Scheduler,
dataProvider: BlockchainDataProvider
dataProvider: BlockchainDataProvider,
dataStore: DataStore
) async {
self.blockchain = blockchain
self.keystore = keystore
Expand All @@ -38,6 +40,14 @@ public final class ValidatorService: Sendable {
scheduler: scheduler,
extrinsicPool: extrinsicPool
)

dataAvailability = await DataAvailability(
config: blockchain.config,
eventBus: eventBus,
scheduler: scheduler,
dataProvider: dataProvider,
dataStore: dataStore
)
}

public func onSyncCompleted() async {
Expand Down
3 changes: 3 additions & 0 deletions Blockchain/Tests/BlockchainTests/BlockchainServices.swift
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import Blockchain
import Foundation
import Utils

class BlockchainServices {
let config: ProtocolConfigRef
let timeProvider: MockTimeProvider
let dataProvider: BlockchainDataProvider
let dataStore: DataStore
let eventBus: EventBus
let scheduler: MockScheduler
let keystore: DevKeyStore
Expand All @@ -30,6 +32,7 @@ class BlockchainServices {
self.genesisBlock = genesisBlock
self.genesisState = genesisState
dataProvider = try! await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock))
dataStore = DataStore(InMemoryDataStore(), basePath: URL(fileURLWithPath: "/tmp/boka-test-data"))

storeMiddleware = StoreMiddleware()
eventBus = EventBus(eventMiddleware: .serial(Middleware(storeMiddleware), .noError), handlerMiddleware: .noError)
Expand Down
3 changes: 2 additions & 1 deletion Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ struct ValidatorServiceTests {
keystore: services.keystore,
eventBus: services.eventBus,
scheduler: services.scheduler,
dataProvider: services.dataProvider
dataProvider: services.dataProvider,
dataStore: services.dataStore
)
await validatorService.onSyncCompleted()
return (services, validatorService)
Expand Down
11 changes: 9 additions & 2 deletions Boka/Sources/Boka.swift
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ struct Boka: AsyncParsableCommand {
return .rocksDB(path: path)
} ?? .inMemory

let dataStore: DataStoreKind = basePath.map {
var path = URL(fileURLWithPath: $0)
path.append(path: "da")
return .filesystem(path: path)
} ?? .inMemory

logger.info("Peers: \(peers)")

if validator {
Expand Down Expand Up @@ -165,13 +171,14 @@ struct Boka: AsyncParsableCommand {
handlerMiddleware: .tracing(prefix: "Handler")
)

let config = Node.Config(
let config = Config(
rpc: rpcConfig,
network: networkConfig,
peers: peers,
local: local,
name: name,
database: database
database: database,
dataStore: dataStore
)

let node: Node = if validator {
Expand Down
85 changes: 85 additions & 0 deletions Node/Sources/Node/Config.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import Blockchain
import Database
import Foundation
import Networking
import RPC
import TracingUtils
import Utils

public typealias RPCConfig = Server.Config
public typealias NetworkConfig = Network.Config

private let logger = Logger(label: "config")

public enum Database {
case inMemory
case rocksDB(path: URL)

public func open(chainspec: ChainSpec) async throws -> BlockchainDataProvider {
switch self {
case let .rocksDB(path):
logger.info("Using RocksDB backend at \(path.absoluteString)")
let backend = try await RocksDBBackend(
path: path,
config: chainspec.getConfig(),
genesisBlock: chainspec.getBlock(),
genesisStateData: chainspec.getState()
)
return try await BlockchainDataProvider(backend)
case .inMemory:
logger.info("Using in-memory backend")
let genesisBlock = try chainspec.getBlock()
let genesisStateData = try chainspec.getState()
let backend = try StateBackend(InMemoryBackend(), config: chainspec.getConfig(), rootHash: Data32())
try await backend.writeRaw(Array(genesisStateData))
let genesisState = try await State(backend: backend)
let genesisStateRef = StateRef(genesisState)
return try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisStateRef, genesisBlock: genesisBlock))
}
}
}

public enum DataStoreKind {
case inMemory
case filesystem(path: URL)

func create() -> DataStore {
switch self {
case let .filesystem(path):
logger.info("Using filesystem data store at \(path.absoluteString)")
let dataStore = FilesystemDataStore()
return DataStore(dataStore, basePath: path)
case .inMemory:
logger.info("Using in-memory data store")
return DataStore(InMemoryDataStore(), basePath: URL(filePath: "/tmp/boka"))
}
}
}

public struct Config {
public var rpc: RPCConfig?
public var network: NetworkConfig
public var peers: [NetAddr]
public var local: Bool
public var name: String?
public var database: Database
public var dataStore: DataStoreKind

public init(
rpc: RPCConfig?,
network: NetworkConfig,
peers: [NetAddr] = [],
local: Bool = false,
name: String? = nil,
database: Database = .inMemory,
dataStore: DataStoreKind = .inMemory
) {
self.rpc = rpc
self.network = network
self.peers = peers
self.local = local
self.name = name
self.database = database
self.dataStore = dataStore
}
}
56 changes: 0 additions & 56 deletions Node/Sources/Node/Node.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,63 +8,7 @@ import Utils

private let logger = Logger(label: "node")

public typealias RPCConfig = Server.Config
public typealias NetworkConfig = Network.Config

public enum Database {
case inMemory
case rocksDB(path: URL)

public func open(chainspec: ChainSpec) async throws -> BlockchainDataProvider {
switch self {
case let .rocksDB(path):
logger.info("Using RocksDB backend at \(path.absoluteString)")
let backend = try await RocksDBBackend(
path: path,
config: chainspec.getConfig(),
genesisBlock: chainspec.getBlock(),
genesisStateData: chainspec.getState()
)
return try await BlockchainDataProvider(backend)
case .inMemory:
logger.info("Using in-memory backend")
let genesisBlock = try chainspec.getBlock()
let genesisStateData = try chainspec.getState()
let backend = try StateBackend(InMemoryBackend(), config: chainspec.getConfig(), rootHash: Data32())
try await backend.writeRaw(Array(genesisStateData))
let genesisState = try await State(backend: backend)
let genesisStateRef = StateRef(genesisState)
return try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisStateRef, genesisBlock: genesisBlock))
}
}
}

public class Node {
public struct Config {
public var rpc: RPCConfig?
public var network: NetworkConfig
public var peers: [NetAddr]
public var local: Bool
public var name: String?
public var database: Database

public init(
rpc: RPCConfig?,
network: NetworkConfig,
peers: [NetAddr] = [],
local: Bool = false,
name: String? = nil,
database: Database = .inMemory
) {
self.rpc = rpc
self.network = network
self.peers = peers
self.local = local
self.name = name
self.database = database
}
}

public let config: Config
public let blockchain: Blockchain
public let rpcServer: Server?
Expand Down
Loading

0 comments on commit 1a3e704

Please sign in to comment.