Skip to content

Commit

Permalink
update workpackagepool
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Dec 18, 2024
1 parent 52468e7 commit d1bbfdb
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ extension Guaranteeing {
}
}

public static func toCoreAssignment(_ source: [UInt32], n: UInt32, max: UInt32) -> [CoreIndex] {
public func toCoreAssignment(_ source: [UInt32], n: UInt32, max: UInt32) -> [CoreIndex] {
source.map { CoreIndex(($0 + n) % max) }
}

public static func getCoreAssignment(config: ProtocolConfigRef, randomness: Data32, timeslot: TimeslotIndex) -> [CoreIndex] {
public func getCoreAssignment(config: ProtocolConfigRef, randomness: Data32, timeslot: TimeslotIndex) -> [CoreIndex] {
var source = Array(repeating: UInt32(0), count: config.value.totalNumberOfValidators)
for i in 0 ..< config.value.totalNumberOfValidators {
source[i] = UInt32(config.value.totalNumberOfCores * i / config.value.totalNumberOfValidators)
Expand All @@ -94,14 +94,14 @@ extension Guaranteeing {
> {
let coreAssignmentRotationPeriod = UInt32(config.value.coreAssignmentRotationPeriod)

let currentCoreAssignment = Self.getCoreAssignment(config: config, randomness: entropyPool.t2, timeslot: timeslot)
let currentCoreAssignment = getCoreAssignment(config: config, randomness: entropyPool.t2, timeslot: timeslot)
let currentCoreKeys = withoutOffenders(keys: currentValidators.map(\.ed25519))

let isEpochChanging = (timeslot % UInt32(config.value.epochLength)) < coreAssignmentRotationPeriod
let previousRandomness = isEpochChanging ? entropyPool.t3 : entropyPool.t2
let previousValidators = isEpochChanging ? previousValidators : currentValidators

let previousCoreAssignment = Self.getCoreAssignment(
let previousCoreAssignment = getCoreAssignment(
config: config,
randomness: previousRandomness,
timeslot: timeslot - coreAssignmentRotationPeriod
Expand Down
10 changes: 10 additions & 0 deletions Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,14 @@ public enum RuntimeEvents {
public struct BlockAuthored: Event {
public let block: BlockRef
}

// New WorkPackagesGenerated by Guaranteeing Service
public struct WorkPackagesGenerated: Event {
public let items: [WorkPackageAndOutput]
}

// New GuaranteeGenerated by Guaranteeing Service
public struct GuaranteeGenerated: Event {
public let items: [WorkPackageAndOutput]
}
}
87 changes: 85 additions & 2 deletions Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Foundation
import Synchronization
import TracingUtils
import Utils

Expand All @@ -13,6 +14,8 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable {
private let keystore: KeyStore
private let runtime: Runtime
private let extrinsicPool: ExtrinsicPoolService
private let workPackagePool: WorkPackagePoolService
private let guarantees: ThreadSafeContainer<[RuntimeEvents.GuaranteeGenerated]> = .init([])

public init(
config: ProtocolConfigRef,
Expand All @@ -27,9 +30,89 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable {
self.keystore = keystore
self.runtime = runtime
self.extrinsicPool = extrinsicPool
workPackagePool = await WorkPackagePoolService(config: config, dataProvider: dataProvider, eventBus: eventBus)
super.init(id: "GuaranteeingService", config: config, eventBus: eventBus, scheduler: scheduler)

super.init(id: "BlockAuthor", config: config, eventBus: eventBus, scheduler: scheduler)
await subscribe(RuntimeEvents.GuaranteeGenerated.self, id: "GuaranteeingService.GuaranteeGenerated") { [weak self] event in
try await self?.onGuaranteeGenerated(event: event)
}
}

public func on(genesis _: StateRef) async {}
public func on(genesis _: StateRef) async {
let nowTimeslot = timeProvider.getTime().timeToTimeslot(config: config)
let epoch = nowTimeslot.timeslotToEpochIndex(config: config)
await onGuaranteeingEpoch(epoch: epoch)
}

public func onSyncCompleted() async {
scheduleForNextEpoch("GuaranteeingService.scheduleForNextEpoch") { [weak self] epoch in
await self?.onGuaranteeingEpoch(epoch: epoch)
}
}

public func scheduleGuaranteeTasks(epoch _: EpochIndex) async throws {
// let timeslot = epoch.epochToTimeslotIndex(config: config)
let state = try await dataProvider.getState(hash: dataProvider.bestHead.hash)
// The most recent block’s τimeslot.
let timeslot = state.value.timeslot
let currentCoreAssignment = state.value.getCoreAssignment(
config: config,
randomness: state.value.entropyPool.t2,
timeslot: timeslot
)
let coreCount = currentCoreAssignment.count
for coreIndex in 0 ..< coreCount {
let core = currentCoreAssignment[coreIndex]
// workpackagepoolService workpackagepoolService same as ExtrinsicPoolService
let workPackage = await workPackagePool.getWorkPackage(for: core)
// let workPackage = WorkPackage.dummy(config: config)
// validateWorkPackage & mock wp
let validateWP = try validateWorkPackage(workPackage)
if validateWP {
let workReport = try await createWorkReport(for: workPackage, core: core)
// sign work report
// eventbus
} else {
logger.error("WorkPackage validation failed")
}

//
}
}

private func createWorkReport(for _: WorkPackage, core _: CoreIndex) async throws -> WorkReport {
// TODO:
// RefineInvocation ouput
// outdata -> workreport struct
WorkReport.dummy(config: config)
}

private func validateWorkPackage(_: WorkPackage) throws -> Bool {
// Add validate func
true
}

private func onGuaranteeingEpoch(epoch: EpochIndex) async {
logger.debug("Processing guarantees for epoch \(epoch)")
await withSpan("GuaranteeingService.onBeforeEpoch", logger: logger) { _ in
guarantees.value = []
let timeslot = epoch.epochToTimeslotIndex(config: config)

let bestHead = await dataProvider.bestHead
let bestHeadEpoch = bestHead.timeslot.timeslotToEpochIndex(config: config)
if bestHeadEpoch >= epoch {
logger.error("Attempting to process guarantees for epoch \(epoch) but best head epoch is \(bestHeadEpoch)")
return
}

let state = try await dataProvider.getState(hash: bestHead.hash)
let coreAuthorizationPool = state.value.coreAuthorizationPool

// await scheduleGuaranteeTasks()
}
}

private func onGuaranteeGenerated(event: RuntimeEvents.GuaranteeGenerated) async throws {
guarantees.write { $0.append(event) }
}
}
35 changes: 35 additions & 0 deletions Blockchain/Sources/Blockchain/Validator/SafroleService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ public struct TicketItemAndOutput: Comparable, Sendable, Codable {
}
}

public struct WorkPackageAndOutput: Comparable, Sendable, Codable {
public let guarantee: ExtrinsicGuarantees.GuaranteeItem
public let output: Data32

public static func < (lhs: WorkPackageAndOutput, rhs: WorkPackageAndOutput) -> Bool {
lhs.output < rhs.output
}

public static func == (lhs: WorkPackageAndOutput, rhs: WorkPackageAndOutput) -> Bool {
lhs.output == rhs.output && lhs.guarantee == rhs.guarantee
}
}

public final class SafroleService: ServiceBase, @unchecked Sendable {
private let keystore: KeyStore
private let ringContext: Bandersnatch.RingContext
Expand Down Expand Up @@ -118,4 +131,26 @@ public final class SafroleService: ServiceBase, @unchecked Sendable {

return tickets
}

public static func generateWorkPackages(
core _: CoreIndex,
validators: [ValidatorKey],
entropy: Data32,
ringContext: Bandersnatch.RingContext,
secret: Bandersnatch.SecretKey,
idx: UInt32
) throws -> [WorkPackageAndOutput] {
let pubkeys = try validators.map {
try Bandersnatch.PublicKey(data: $0.bandersnatch)
}

let prover = Bandersnatch.Prover(sercret: secret, ring: pubkeys, proverIdx: UInt(idx), ctx: ringContext)

var vrfInputData = SigningContext.entropyInputData(entropy: entropy)

var wps: [WorkPackageAndOutput] = []
// TODO: generateWorkPackages

return wps
}
}
15 changes: 15 additions & 0 deletions Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,19 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable {
}
}
}

@discardableResult
public func scheduleForGuaranteeing(_ id: UniqueId, timeslot: TimeslotIndex,
task: @escaping @Sendable () async -> Void) -> Cancellable
{
let scheduleTime = config.scheduleTimeForGuaranteeing(timeslot: timeslot)
let now = timeProvider.getTimeInterval()
var delay = scheduleTime - now
if delay < 0 {
logger.info("\(id): late guaranteeing for timeslot \(timeslot), expectedDelay \(delay)")
delay = 0
}
logger.info("\(id): scheduling guaranteeing for timeslot \(timeslot) in \(delay)")
return schedule(id: id, delay: delay, task: task)
}
}
111 changes: 111 additions & 0 deletions Blockchain/Sources/Blockchain/Validator/WorkPackagePoolService.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import TracingUtils
import Utils

private actor WorkPackageStorage {
var logger: Logger!
var pendingWorkPackages: SortedUniqueArray<WorkPackageAndOutput> = .init()
var epoch: EpochIndex = 0
// add core
// var core: CoreIndex = 0
let ringContext: Bandersnatch.RingContext
var verifier: Bandersnatch.Verifier!
var entropy: Data32 = .init()

init(ringContext: Bandersnatch.RingContext) {
self.ringContext = ringContext
}

func setLogger(_ logger: Logger) {
self.logger = logger
}

func update(state: StateRef, config: ProtocolConfigRef) throws {
// change epoch to core?
let newEpoch = state.value.timeslot.timeslotToEpochIndex(config: config)
if epoch != newEpoch {
logger.info("Updating verifier for epoch \(newEpoch)")
// TODO: change to core verifier
let commitment = try Bandersnatch.RingCommitment(data: state.value.safroleState.ticketsVerifier)
verifier = Bandersnatch.Verifier(ctx: ringContext, commitment: commitment)
epoch = newEpoch
entropy = state.value.entropyPool.t3
pendingWorkPackages.removeAll()
}
}

func add(packages: [WorkPackageAndOutput], config: ProtocolConfigRef) {
for package in packages {
guard validatePackage(package, config: config) else {
logger.warning("Invalid work package: \(package)")
continue
}
pendingWorkPackages.insert(package)
}
}

private func validatePackage(_: WorkPackageAndOutput, config _: ProtocolConfigRef) -> Bool {
// TODO: add validate logic
true
}

func removeWorkPackages(workPackages: [WorkPackageAndOutput]) {
pendingWorkPackages.remove { guarantee in
workPackages.contains { $0.guarantee == guarantee.guarantee }
}
}

func getWorkPackage(for _: CoreIndex) -> SortedUniqueArray<WorkPackageAndOutput> {
// return pendingWorkPackages.filter { $0.guarantee.workReport.coreIndex == core }
pendingWorkPackages
}
}

public final class WorkPackagePoolService: ServiceBase, @unchecked Sendable {
private var storage: WorkPackageStorage
private let dataProvider: BlockchainDataProvider

public init(
config: ProtocolConfigRef,
dataProvider: BlockchainDataProvider,
eventBus: EventBus
) async {
self.dataProvider = dataProvider

let ringContext = try! Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators))
storage = WorkPackageStorage(ringContext: ringContext)

super.init(id: "WorkPackagePoolService", config: config, eventBus: eventBus)
await storage.setLogger(logger)

await subscribe(RuntimeEvents.WorkPackagesGenerated.self, id: "WorkPackagePool.WorkPackagesGenerated") { [weak self] event in
try await self?.on(workPackagesGenerated: event)
}

await subscribe(RuntimeEvents.BlockFinalized.self, id: "WorkPackagePool.BlockFinalized") { [weak self] event in
try await self?.on(blockFinalized: event)
}
}

private func on(workPackagesGenerated event: RuntimeEvents.WorkPackagesGenerated) async throws {
let state = try await dataProvider.getBestState()
try await storage.update(state: state, config: config)
await storage.add(packages: event.items, config: config)
}

private func on(blockFinalized event: RuntimeEvents.BlockFinalized) async throws {
let block = try await dataProvider.getBlock(hash: event.hash)
// await storage.removeWorkPackages(workPackages: block.extrinsic.reports.guarantees.)
}

public func update(state: StateRef, config: ProtocolConfigRef) async throws {
try await storage.update(state: state, config: config)
}

public func addWorkPackages(packages: [WorkPackageAndOutput]) async throws {
await storage.add(packages: packages, config: config)
}

public func getWorkPackage(for core: CoreIndex) async -> SortedUniqueArray<WorkPackageAndOutput> {
await storage.getWorkPackage(for: core)
}
}
60 changes: 60 additions & 0 deletions Blockchain/Tests/BlockchainTests/WorkPackagePoolServiceTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import Foundation
import Testing
import TracingUtils
import Utils

@testable import Blockchain

struct WorkPackagePoolServiceTests {
let config: ProtocolConfigRef
let timeProvider: MockTimeProvider
let dataProvider: BlockchainDataProvider
let eventBus: EventBus
let keystore: KeyStore
let storeMiddleware: StoreMiddleware
let workPackagecPoolService: WorkPackagePoolService

let ringContext: Bandersnatch.RingContext

init() async throws {
config = ProtocolConfigRef.dev.mutate { config in
config.ticketEntriesPerValidator = 4
}
timeProvider = MockTimeProvider(time: 1000)

let (genesisState, genesisBlock) = try State.devGenesis(config: config)
dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock))

storeMiddleware = StoreMiddleware()
eventBus = EventBus(eventMiddleware: .serial(Middleware(storeMiddleware), .noError), handlerMiddleware: .noError)

keystore = try await DevKeyStore(devKeysCount: config.value.totalNumberOfValidators)

workPackagecPoolService = await WorkPackagePoolService(config: config, dataProvider: dataProvider, eventBus: eventBus)
try await workPackagecPoolService.addWorkPackages(packages: [])
ringContext = try Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators))

// setupTestLogger()
}

@Test
func testAddPendingWorkPackage() async throws {
let state = try await dataProvider.getBestState()

var allWorkPackages = SortedUniqueArray<WorkPackageAndOutput>()

for (i, validatorKey) in state.value.nextValidators.enumerated() {
let secretKey = try await keystore.get(Bandersnatch.self, publicKey: Bandersnatch.PublicKey(data: validatorKey.bandersnatch))!
// generate work package
// eventBus.publish
// Wait for the event to be processed
await storeMiddleware.wait()
}
}

@Test
func testAddAndInvalidWorkPackage() async throws {
let state = try await dataProvider.getBestState()
let validatorKey = state.value.currentValidators[0]
}
}

0 comments on commit d1bbfdb

Please sign in to comment.