From d1bbfdba574de1e6faffec867db533fa8c845b7d Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 18 Dec 2024 11:38:33 +0800 Subject: [PATCH] update workpackagepool --- .../RuntimeProtocols/Guaranteeing.swift | 8 +- .../RuntimeProtocols/RuntimeEvents.swift | 10 ++ .../Validator/GuaranteeingService.swift | 87 +++++++++++++- .../Blockchain/Validator/SafroleService.swift | 35 ++++++ .../Blockchain/Validator/ServiceBase2.swift | 15 +++ .../Validator/WorkPackagePoolService.swift | 111 ++++++++++++++++++ .../WorkPackagePoolServiceTests.swift | 60 ++++++++++ 7 files changed, 320 insertions(+), 6 deletions(-) create mode 100644 Blockchain/Sources/Blockchain/Validator/WorkPackagePoolService.swift create mode 100644 Blockchain/Tests/BlockchainTests/WorkPackagePoolServiceTests.swift diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/Guaranteeing.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/Guaranteeing.swift index 3f0bb05f..a3cbe5f3 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/Guaranteeing.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/Guaranteeing.swift @@ -63,11 +63,11 @@ extension Guaranteeing { } } - public static func toCoreAssignment(_ source: [UInt32], n: UInt32, max: UInt32) -> [CoreIndex] { + public func toCoreAssignment(_ source: [UInt32], n: UInt32, max: UInt32) -> [CoreIndex] { source.map { CoreIndex(($0 + n) % max) } } - public static func getCoreAssignment(config: ProtocolConfigRef, randomness: Data32, timeslot: TimeslotIndex) -> [CoreIndex] { + public func getCoreAssignment(config: ProtocolConfigRef, randomness: Data32, timeslot: TimeslotIndex) -> [CoreIndex] { var source = Array(repeating: UInt32(0), count: config.value.totalNumberOfValidators) for i in 0 ..< config.value.totalNumberOfValidators { source[i] = UInt32(config.value.totalNumberOfCores * i / config.value.totalNumberOfValidators) @@ -94,14 +94,14 @@ extension Guaranteeing { > { let coreAssignmentRotationPeriod = UInt32(config.value.coreAssignmentRotationPeriod) - let currentCoreAssignment = Self.getCoreAssignment(config: config, randomness: entropyPool.t2, timeslot: timeslot) + let currentCoreAssignment = getCoreAssignment(config: config, randomness: entropyPool.t2, timeslot: timeslot) let currentCoreKeys = withoutOffenders(keys: currentValidators.map(\.ed25519)) let isEpochChanging = (timeslot % UInt32(config.value.epochLength)) < coreAssignmentRotationPeriod let previousRandomness = isEpochChanging ? entropyPool.t3 : entropyPool.t2 let previousValidators = isEpochChanging ? previousValidators : currentValidators - let previousCoreAssignment = Self.getCoreAssignment( + let previousCoreAssignment = getCoreAssignment( config: config, randomness: previousRandomness, timeslot: timeslot - coreAssignmentRotationPeriod diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift index 2d4719ae..7ea03bea 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift @@ -31,4 +31,14 @@ public enum RuntimeEvents { public struct BlockAuthored: Event { public let block: BlockRef } + + // New WorkPackagesGenerated by Guaranteeing Service + public struct WorkPackagesGenerated: Event { + public let items: [WorkPackageAndOutput] + } + + // New GuaranteeGenerated by Guaranteeing Service + public struct GuaranteeGenerated: Event { + public let items: [WorkPackageAndOutput] + } } diff --git a/Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift b/Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift index b893bd6f..34306a61 100644 --- a/Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift +++ b/Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift @@ -1,4 +1,5 @@ import Foundation +import Synchronization import TracingUtils import Utils @@ -13,6 +14,8 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable { private let keystore: KeyStore private let runtime: Runtime private let extrinsicPool: ExtrinsicPoolService + private let workPackagePool: WorkPackagePoolService + private let guarantees: ThreadSafeContainer<[RuntimeEvents.GuaranteeGenerated]> = .init([]) public init( config: ProtocolConfigRef, @@ -27,9 +30,89 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable { self.keystore = keystore self.runtime = runtime self.extrinsicPool = extrinsicPool + workPackagePool = await WorkPackagePoolService(config: config, dataProvider: dataProvider, eventBus: eventBus) + super.init(id: "GuaranteeingService", config: config, eventBus: eventBus, scheduler: scheduler) - super.init(id: "BlockAuthor", config: config, eventBus: eventBus, scheduler: scheduler) + await subscribe(RuntimeEvents.GuaranteeGenerated.self, id: "GuaranteeingService.GuaranteeGenerated") { [weak self] event in + try await self?.onGuaranteeGenerated(event: event) + } } - public func on(genesis _: StateRef) async {} + public func on(genesis _: StateRef) async { + let nowTimeslot = timeProvider.getTime().timeToTimeslot(config: config) + let epoch = nowTimeslot.timeslotToEpochIndex(config: config) + await onGuaranteeingEpoch(epoch: epoch) + } + + public func onSyncCompleted() async { + scheduleForNextEpoch("GuaranteeingService.scheduleForNextEpoch") { [weak self] epoch in + await self?.onGuaranteeingEpoch(epoch: epoch) + } + } + + public func scheduleGuaranteeTasks(epoch _: EpochIndex) async throws { + // let timeslot = epoch.epochToTimeslotIndex(config: config) + let state = try await dataProvider.getState(hash: dataProvider.bestHead.hash) + // The most recent block’s τimeslot. + let timeslot = state.value.timeslot + let currentCoreAssignment = state.value.getCoreAssignment( + config: config, + randomness: state.value.entropyPool.t2, + timeslot: timeslot + ) + let coreCount = currentCoreAssignment.count + for coreIndex in 0 ..< coreCount { + let core = currentCoreAssignment[coreIndex] + // workpackagepoolService workpackagepoolService same as ExtrinsicPoolService + let workPackage = await workPackagePool.getWorkPackage(for: core) +// let workPackage = WorkPackage.dummy(config: config) + // validateWorkPackage & mock wp + let validateWP = try validateWorkPackage(workPackage) + if validateWP { + let workReport = try await createWorkReport(for: workPackage, core: core) + // sign work report + // eventbus + } else { + logger.error("WorkPackage validation failed") + } + + // + } + } + + private func createWorkReport(for _: WorkPackage, core _: CoreIndex) async throws -> WorkReport { + // TODO: + // RefineInvocation ouput + // outdata -> workreport struct + WorkReport.dummy(config: config) + } + + private func validateWorkPackage(_: WorkPackage) throws -> Bool { + // Add validate func + true + } + + private func onGuaranteeingEpoch(epoch: EpochIndex) async { + logger.debug("Processing guarantees for epoch \(epoch)") + await withSpan("GuaranteeingService.onBeforeEpoch", logger: logger) { _ in + guarantees.value = [] + let timeslot = epoch.epochToTimeslotIndex(config: config) + + let bestHead = await dataProvider.bestHead + let bestHeadEpoch = bestHead.timeslot.timeslotToEpochIndex(config: config) + if bestHeadEpoch >= epoch { + logger.error("Attempting to process guarantees for epoch \(epoch) but best head epoch is \(bestHeadEpoch)") + return + } + + let state = try await dataProvider.getState(hash: bestHead.hash) + let coreAuthorizationPool = state.value.coreAuthorizationPool + + // await scheduleGuaranteeTasks() + } + } + + private func onGuaranteeGenerated(event: RuntimeEvents.GuaranteeGenerated) async throws { + guarantees.write { $0.append(event) } + } } diff --git a/Blockchain/Sources/Blockchain/Validator/SafroleService.swift b/Blockchain/Sources/Blockchain/Validator/SafroleService.swift index b6879b07..3aa434ed 100644 --- a/Blockchain/Sources/Blockchain/Validator/SafroleService.swift +++ b/Blockchain/Sources/Blockchain/Validator/SafroleService.swift @@ -15,6 +15,19 @@ public struct TicketItemAndOutput: Comparable, Sendable, Codable { } } +public struct WorkPackageAndOutput: Comparable, Sendable, Codable { + public let guarantee: ExtrinsicGuarantees.GuaranteeItem + public let output: Data32 + + public static func < (lhs: WorkPackageAndOutput, rhs: WorkPackageAndOutput) -> Bool { + lhs.output < rhs.output + } + + public static func == (lhs: WorkPackageAndOutput, rhs: WorkPackageAndOutput) -> Bool { + lhs.output == rhs.output && lhs.guarantee == rhs.guarantee + } +} + public final class SafroleService: ServiceBase, @unchecked Sendable { private let keystore: KeyStore private let ringContext: Bandersnatch.RingContext @@ -118,4 +131,26 @@ public final class SafroleService: ServiceBase, @unchecked Sendable { return tickets } + + public static func generateWorkPackages( + core _: CoreIndex, + validators: [ValidatorKey], + entropy: Data32, + ringContext: Bandersnatch.RingContext, + secret: Bandersnatch.SecretKey, + idx: UInt32 + ) throws -> [WorkPackageAndOutput] { + let pubkeys = try validators.map { + try Bandersnatch.PublicKey(data: $0.bandersnatch) + } + + let prover = Bandersnatch.Prover(sercret: secret, ring: pubkeys, proverIdx: UInt(idx), ctx: ringContext) + + var vrfInputData = SigningContext.entropyInputData(entropy: entropy) + + var wps: [WorkPackageAndOutput] = [] + // TODO: generateWorkPackages + + return wps + } } diff --git a/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift b/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift index 2fb0cdb9..26d83649 100644 --- a/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift +++ b/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift @@ -85,4 +85,19 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable { } } } + + @discardableResult + public func scheduleForGuaranteeing(_ id: UniqueId, timeslot: TimeslotIndex, + task: @escaping @Sendable () async -> Void) -> Cancellable + { + let scheduleTime = config.scheduleTimeForGuaranteeing(timeslot: timeslot) + let now = timeProvider.getTimeInterval() + var delay = scheduleTime - now + if delay < 0 { + logger.info("\(id): late guaranteeing for timeslot \(timeslot), expectedDelay \(delay)") + delay = 0 + } + logger.info("\(id): scheduling guaranteeing for timeslot \(timeslot) in \(delay)") + return schedule(id: id, delay: delay, task: task) + } } diff --git a/Blockchain/Sources/Blockchain/Validator/WorkPackagePoolService.swift b/Blockchain/Sources/Blockchain/Validator/WorkPackagePoolService.swift new file mode 100644 index 00000000..61d6a182 --- /dev/null +++ b/Blockchain/Sources/Blockchain/Validator/WorkPackagePoolService.swift @@ -0,0 +1,111 @@ +import TracingUtils +import Utils + +private actor WorkPackageStorage { + var logger: Logger! + var pendingWorkPackages: SortedUniqueArray = .init() + var epoch: EpochIndex = 0 + // add core + // var core: CoreIndex = 0 + let ringContext: Bandersnatch.RingContext + var verifier: Bandersnatch.Verifier! + var entropy: Data32 = .init() + + init(ringContext: Bandersnatch.RingContext) { + self.ringContext = ringContext + } + + func setLogger(_ logger: Logger) { + self.logger = logger + } + + func update(state: StateRef, config: ProtocolConfigRef) throws { + // change epoch to core? + let newEpoch = state.value.timeslot.timeslotToEpochIndex(config: config) + if epoch != newEpoch { + logger.info("Updating verifier for epoch \(newEpoch)") + // TODO: change to core verifier + let commitment = try Bandersnatch.RingCommitment(data: state.value.safroleState.ticketsVerifier) + verifier = Bandersnatch.Verifier(ctx: ringContext, commitment: commitment) + epoch = newEpoch + entropy = state.value.entropyPool.t3 + pendingWorkPackages.removeAll() + } + } + + func add(packages: [WorkPackageAndOutput], config: ProtocolConfigRef) { + for package in packages { + guard validatePackage(package, config: config) else { + logger.warning("Invalid work package: \(package)") + continue + } + pendingWorkPackages.insert(package) + } + } + + private func validatePackage(_: WorkPackageAndOutput, config _: ProtocolConfigRef) -> Bool { + // TODO: add validate logic + true + } + + func removeWorkPackages(workPackages: [WorkPackageAndOutput]) { + pendingWorkPackages.remove { guarantee in + workPackages.contains { $0.guarantee == guarantee.guarantee } + } + } + + func getWorkPackage(for _: CoreIndex) -> SortedUniqueArray { + // return pendingWorkPackages.filter { $0.guarantee.workReport.coreIndex == core } + pendingWorkPackages + } +} + +public final class WorkPackagePoolService: ServiceBase, @unchecked Sendable { + private var storage: WorkPackageStorage + private let dataProvider: BlockchainDataProvider + + public init( + config: ProtocolConfigRef, + dataProvider: BlockchainDataProvider, + eventBus: EventBus + ) async { + self.dataProvider = dataProvider + + let ringContext = try! Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators)) + storage = WorkPackageStorage(ringContext: ringContext) + + super.init(id: "WorkPackagePoolService", config: config, eventBus: eventBus) + await storage.setLogger(logger) + + await subscribe(RuntimeEvents.WorkPackagesGenerated.self, id: "WorkPackagePool.WorkPackagesGenerated") { [weak self] event in + try await self?.on(workPackagesGenerated: event) + } + + await subscribe(RuntimeEvents.BlockFinalized.self, id: "WorkPackagePool.BlockFinalized") { [weak self] event in + try await self?.on(blockFinalized: event) + } + } + + private func on(workPackagesGenerated event: RuntimeEvents.WorkPackagesGenerated) async throws { + let state = try await dataProvider.getBestState() + try await storage.update(state: state, config: config) + await storage.add(packages: event.items, config: config) + } + + private func on(blockFinalized event: RuntimeEvents.BlockFinalized) async throws { + let block = try await dataProvider.getBlock(hash: event.hash) +// await storage.removeWorkPackages(workPackages: block.extrinsic.reports.guarantees.) + } + + public func update(state: StateRef, config: ProtocolConfigRef) async throws { + try await storage.update(state: state, config: config) + } + + public func addWorkPackages(packages: [WorkPackageAndOutput]) async throws { + await storage.add(packages: packages, config: config) + } + + public func getWorkPackage(for core: CoreIndex) async -> SortedUniqueArray { + await storage.getWorkPackage(for: core) + } +} diff --git a/Blockchain/Tests/BlockchainTests/WorkPackagePoolServiceTests.swift b/Blockchain/Tests/BlockchainTests/WorkPackagePoolServiceTests.swift new file mode 100644 index 00000000..f8ddceac --- /dev/null +++ b/Blockchain/Tests/BlockchainTests/WorkPackagePoolServiceTests.swift @@ -0,0 +1,60 @@ +import Foundation +import Testing +import TracingUtils +import Utils + +@testable import Blockchain + +struct WorkPackagePoolServiceTests { + let config: ProtocolConfigRef + let timeProvider: MockTimeProvider + let dataProvider: BlockchainDataProvider + let eventBus: EventBus + let keystore: KeyStore + let storeMiddleware: StoreMiddleware + let workPackagecPoolService: WorkPackagePoolService + + let ringContext: Bandersnatch.RingContext + + init() async throws { + config = ProtocolConfigRef.dev.mutate { config in + config.ticketEntriesPerValidator = 4 + } + timeProvider = MockTimeProvider(time: 1000) + + let (genesisState, genesisBlock) = try State.devGenesis(config: config) + dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock)) + + storeMiddleware = StoreMiddleware() + eventBus = EventBus(eventMiddleware: .serial(Middleware(storeMiddleware), .noError), handlerMiddleware: .noError) + + keystore = try await DevKeyStore(devKeysCount: config.value.totalNumberOfValidators) + + workPackagecPoolService = await WorkPackagePoolService(config: config, dataProvider: dataProvider, eventBus: eventBus) + try await workPackagecPoolService.addWorkPackages(packages: []) + ringContext = try Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators)) + + // setupTestLogger() + } + + @Test + func testAddPendingWorkPackage() async throws { + let state = try await dataProvider.getBestState() + + var allWorkPackages = SortedUniqueArray() + + for (i, validatorKey) in state.value.nextValidators.enumerated() { + let secretKey = try await keystore.get(Bandersnatch.self, publicKey: Bandersnatch.PublicKey(data: validatorKey.bandersnatch))! + // generate work package + // eventBus.publish + // Wait for the event to be processed + await storeMiddleware.wait() + } + } + + @Test + func testAddAndInvalidWorkPackage() async throws { + let state = try await dataProvider.getBestState() + let validatorKey = state.value.currentValidators[0] + } +}