Skip to content

Commit

Permalink
Merge branch 'master' into dev_peer
Browse files Browse the repository at this point in the history
* master:
  update async channels (#188)
  fix block author (#185)
  • Loading branch information
MacOMNI committed Oct 22, 2024
2 parents 4d98221 + 1bb83f2 commit f190901
Show file tree
Hide file tree
Showing 24 changed files with 346 additions and 229 deletions.
2 changes: 1 addition & 1 deletion Blockchain/Sources/Blockchain/Blockchain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public final class Blockchain: ServiceBase, @unchecked Sendable {
self.dataProvider = dataProvider
self.timeProvider = timeProvider

super.init(logger: Logger(label: "Blockchain"), config: config, eventBus: eventBus)
super.init(id: "Blockchain", config: config, eventBus: eventBus)

await subscribe(RuntimeEvents.BlockAuthored.self, id: "Blockchain.BlockAuthored") { [weak self] event in
try await self?.on(blockAuthored: event)
Expand Down
26 changes: 17 additions & 9 deletions Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import Codec
import Foundation
import TracingUtils
import Utils

private let logger = Logger(label: "Runtime")

// the STF
public final class Runtime {
public enum Error: Swift.Error {
Expand All @@ -22,7 +25,8 @@ public final class Runtime {
case preimagesNotSorted
case invalidPreimageServiceIndex
case duplicatedPreimage
case notBlockAuthor
case invalidAuthorTicket
case invalidAuthorKey
case invalidBlockSeal(any Swift.Error)
case invalidVrfSignature
case other(any Swift.Error)
Expand Down Expand Up @@ -70,19 +74,20 @@ public final class Runtime {
// winning tickets is validated at apply time by Safrole

// offendersMarkers is validated at apply time by Disputes
}

// validate block.header.seal
public func validateHeaderSeal(block: BlockRef, state: inout State) throws(Error) {
let vrfOutput: Data32
let blockAuthorKey = try Result {
try Bandersnatch.PublicKey(data: state.value.currentValidators[Int(block.header.authorIndex)].bandersnatch)
try Bandersnatch.PublicKey(data: state.currentValidators[Int(block.header.authorIndex)].bandersnatch)
}.mapError(Error.invalidBlockSeal).get()
let index = block.header.timeslot % UInt32(config.value.epochLength)
let encodedHeader = try Result { try JamEncoder.encode(block.header.unsigned) }.mapError(Error.invalidBlockSeal).get()
let entropyVRFInputData: Data
switch state.value.safroleState.ticketsOrKeys {
switch state.safroleState.ticketsOrKeys {
case let .left(tickets):
let ticket = tickets[Int(index)]
let vrfInputData = SigningContext.safroleTicketInputData(entropy: state.value.entropyPool.t3, attempt: ticket.attempt)
let vrfInputData = SigningContext.safroleTicketInputData(entropy: state.entropyPool.t3, attempt: ticket.attempt)
vrfOutput = try Result {
try blockAuthorKey.ietfVRFVerify(
vrfInputData: vrfInputData,
Expand All @@ -91,17 +96,18 @@ public final class Runtime {
)
}.mapError(Error.invalidBlockSeal).get()
guard ticket.id == vrfOutput else {
throw Error.notBlockAuthor
throw Error.invalidAuthorTicket
}

entropyVRFInputData = SigningContext.entropyInputData(entropy: vrfOutput)

case let .right(keys):
let key = keys[Int(index)]
guard key == blockAuthorKey.data else {
throw Error.notBlockAuthor
logger.debug("expected key: \(key.toHexString()), got key: \(blockAuthorKey.data.toHexString())")
throw Error.invalidAuthorKey
}
let vrfInputData = SigningContext.fallbackSealInputData(entropy: state.value.entropyPool.t3)
let vrfInputData = SigningContext.fallbackSealInputData(entropy: state.entropyPool.t3)
vrfOutput = try Result {
try blockAuthorKey.ietfVRFVerify(
vrfInputData: vrfInputData,
Expand All @@ -110,7 +116,7 @@ public final class Runtime {
)
}.mapError(Error.invalidBlockSeal).get()

entropyVRFInputData = SigningContext.fallbackSealInputData(entropy: state.value.entropyPool.t3)
entropyVRFInputData = SigningContext.fallbackSealInputData(entropy: state.entropyPool.t3)
}

_ = try Result {
Expand Down Expand Up @@ -150,6 +156,8 @@ public final class Runtime {
do {
try updateSafrole(block: block, state: &newState)

try validateHeaderSeal(block: block, state: &newState)

try updateDisputes(block: block, state: &newState)

// depends on Safrole and Disputes
Expand Down
4 changes: 4 additions & 0 deletions Blockchain/Sources/Blockchain/Types/StateRef.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public final class StateRef: Ref<State>, @unchecked Sendable {
public var stateRoot: Data32 {
lazyStateRoot.value.value
}

override public var description: String {
"StateRef(\(stateRoot.toHexString()))"
}
}

extension StateRef: Codable {
Expand Down
24 changes: 14 additions & 10 deletions Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Codec
import Foundation
import Synchronization
import TracingUtils
import Utils

Expand All @@ -8,7 +9,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
private let keystore: KeyStore
private let extrinsicPool: ExtrinsicPoolService

private var tickets: ThreadSafeContainer<[RuntimeEvents.SafroleTicketsGenerated]> = .init([])
private let tickets: ThreadSafeContainer<[RuntimeEvents.SafroleTicketsGenerated]> = .init([])

public init(
config: ProtocolConfigRef,
Expand All @@ -22,7 +23,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
self.keystore = keystore
self.extrinsicPool = extrinsicPool

super.init(logger: Logger(label: "BlockAuthor"), config: config, eventBus: eventBus, scheduler: scheduler)
super.init(id: "BlockAuthor", config: config, eventBus: eventBus, scheduler: scheduler)

await subscribe(RuntimeEvents.SafroleTicketsGenerated.self, id: "BlockAuthor.SafroleTicketsGenerated") { [weak self] event in
try await self?.on(safroleTicketsGenerated: event)
Expand All @@ -33,11 +34,11 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
}
}

public func on(genesis state: StateRef) async {
await scheduleNewBlocks(
ticketsOrKeys: state.value.safroleState.ticketsOrKeys,
timeslot: timeProvider.getTime().timeToTimeslot(config: config)
)
public func on(genesis _: StateRef) async {
let nowTimeslot = timeProvider.getTime().timeToTimeslot(config: config)
// schedule for current epoch
let epoch = (nowTimeslot + 1).timeslotToEpochIndex(config: config)
await onBeforeEpoch(epoch: epoch)
}

public func createNewBlock(
Expand All @@ -47,6 +48,9 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
-> BlockRef
{
let parentHash = dataProvider.bestHead

logger.trace("creating new block for timeslot: \(timeslot) with parent hash: \(parentHash)")

let state = try await dataProvider.getState(hash: parentHash)
let epoch = timeslot.timeslotToEpochIndex(config: config)

Expand Down Expand Up @@ -156,7 +160,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
await withSpan("BlockAuthor.newBlock", logger: logger) { _ in
// TODO: add timeout
let block = try await createNewBlock(timeslot: timeslot, claim: claim)
logger.info("New block created: #\(block.header.timeslot) \(block.hash)")
logger.info("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)")
publish(RuntimeEvents.BlockAuthored(block: block))
}
}
Expand Down Expand Up @@ -205,7 +209,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
if delay < 0 {
continue
}
logger.debug("Scheduling new block task at timeslot \(timeslot)")
logger.trace("Scheduling new block task at timeslot \(timeslot) for claim \(claim.1.data.toHexString())")
schedule(id: "BlockAuthor.newBlock", delay: delay) { [weak self] in
if let self {
await newBlock(timeslot: timeslot, claim: .left(claim))
Expand All @@ -223,7 +227,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
if delay < 0 {
continue
}
logger.debug("Scheduling new block task at timeslot \(timeslot)")
logger.trace("Scheduling new block task at timeslot \(timeslot) for key \(pubkey.data.toHexString())")
schedule(id: "BlockAuthor.newBlock", delay: delay) { [weak self] in
if let self {
await newBlock(timeslot: timeslot, claim: .right(pubkey))
Expand Down
17 changes: 10 additions & 7 deletions Blockchain/Sources/Blockchain/Validator/ExtrinsicPoolService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import Utils
private typealias TicketItem = ExtrinsicTickets.TicketItem

private actor ServiceStorage {
let logger: Logger
var logger: Logger!

// sorted array ordered by output
var pendingTickets: SortedUniqueArray<TicketItemAndOutput> = .init()
Expand All @@ -13,11 +13,14 @@ private actor ServiceStorage {
var entropy: Data32 = .init()
let ringContext: Bandersnatch.RingContext

init(logger: Logger, ringContext: Bandersnatch.RingContext) {
self.logger = logger
init(ringContext: Bandersnatch.RingContext) {
self.ringContext = ringContext
}

func setLogger(_ logger: Logger) {
self.logger = logger
}

func add(tickets: [TicketItem], config: ProtocolConfigRef) {
for ticket in tickets {
if (try? ticket.validate(config: config)) == nil {
Expand Down Expand Up @@ -77,12 +80,12 @@ public final class ExtrinsicPoolService: ServiceBase, @unchecked Sendable {
) async {
self.dataProvider = dataProvider

let logger = Logger(label: "ExtrinsicPoolService")

let ringContext = try! Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators))
storage = ServiceStorage(logger: logger, ringContext: ringContext)
storage = ServiceStorage(ringContext: ringContext)

super.init(id: "ExtrinsicPoolService", config: config, eventBus: eventBus)

super.init(logger: logger, config: config, eventBus: eventBus)
await storage.setLogger(logger)

await subscribe(RuntimeEvents.SafroleTicketsGenerated.self, id: "ExtrinsicPool.SafroleTicketsGenerated") { [weak self] event in
try await self?.on(safroleTicketsGenerated: event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable {
self.runtime = runtime
self.extrinsicPool = extrinsicPool

super.init(logger: Logger(label: "BlockAuthor"), config: config, eventBus: eventBus, scheduler: scheduler)
super.init(id: "BlockAuthor", config: config, eventBus: eventBus, scheduler: scheduler)
}

public func on(genesis _: StateRef) async {}
Expand Down
6 changes: 2 additions & 4 deletions Blockchain/Sources/Blockchain/Validator/SafroleService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public final class SafroleService: ServiceBase, @unchecked Sendable {
self.keystore = keystore
ringContext = try! Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators))

super.init(logger: Logger(label: "SafroleService"), config: config, eventBus: eventBus)
super.init(id: "SafroleService", config: config, eventBus: eventBus)

await subscribe(RuntimeEvents.BlockImported.self, id: "SafroleService.BlockImported") { [weak self] event in
try await self?.on(blockImported: event)
Expand Down Expand Up @@ -66,8 +66,6 @@ public final class SafroleService: ServiceBase, @unchecked Sendable {
continue
}

logger.debug("Generating tickets for validator \(pubkey)")

try withSpan("generateTickets") { _ in
let tickets = try SafroleService.generateTickets(
count: TicketIndex(config.value.ticketEntriesPerValidator),
Expand All @@ -86,7 +84,7 @@ public final class SafroleService: ServiceBase, @unchecked Sendable {
}

if events.isEmpty {
logger.debug("Not in next validators")
logger.trace("Not in next validators")
}

return events
Expand Down
8 changes: 5 additions & 3 deletions Blockchain/Sources/Blockchain/Validator/ServiceBase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ import TracingUtils
import Utils

public class ServiceBase {
public let logger: Logger
public let id: UniqueId
let logger: Logger
public let config: ProtocolConfigRef
private let eventBus: EventBus
private let subscriptionTokens: ThreadSafeContainer<[EventBus.SubscriptionToken]> = .init([])

init(logger: Logger, config: ProtocolConfigRef, eventBus: EventBus) {
self.logger = logger
init(id: UniqueId, config: ProtocolConfigRef, eventBus: EventBus) {
self.id = id
logger = Logger(label: id)
self.config = config
self.eventBus = eventBus
}
Expand Down
16 changes: 7 additions & 9 deletions Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable {
private let scheduler: Scheduler
private let cancellables: ThreadSafeContainer<Set<IdCancellable>> = .init([])

public init(logger: Logger, config: ProtocolConfigRef, eventBus: EventBus, scheduler: Scheduler) {
public init(id: UniqueId, config: ProtocolConfigRef, eventBus: EventBus, scheduler: Scheduler) {
self.scheduler = scheduler
super.init(logger: logger, config: config, eventBus: eventBus)
super.init(id: id, config: config, eventBus: eventBus)
}

deinit {
Expand Down Expand Up @@ -63,8 +63,8 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable {

@discardableResult
public func scheduleForNextEpoch(_ id: UniqueId, fn: @escaping @Sendable (EpochIndex) async -> Void) -> Cancellable {
let now = timeProvider.getTimeInterval()
let nowTimeslot = UInt32(now).timeToTimeslot(config: config)
let now = timeProvider.getTime()
let nowTimeslot = now.timeToTimeslot(config: config)
let nextEpoch = (nowTimeslot + 1).timeslotToEpochIndex(config: config) + 1
return scheduleFor(epoch: nextEpoch, id: id, fn: fn)
}
Expand All @@ -73,12 +73,10 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable {
private func scheduleFor(epoch: EpochIndex, id: UniqueId, fn: @escaping @Sendable (EpochIndex) async -> Void) -> Cancellable {
let scheduleTime = config.scheduleTimeForPrepareEpoch(epoch: epoch)
let now = timeProvider.getTimeInterval()
let delay = scheduleTime - now
var delay = scheduleTime - now
if delay < 0 {
// too late / current epoch is about to end
// schedule for the one after
logger.debug("\(id): skipping epoch \(epoch) because it is too late")
return scheduleFor(epoch: epoch + 1, id: id, fn: fn)
logger.debug("\(id): late epoch start \(epoch), expectedDelay \(delay)")
delay = 0
}
logger.trace("\(id): scheduling epoch \(epoch) in \(delay)")
return schedule(id: id, delay: delay) { [weak self] in
Expand Down
Loading

0 comments on commit f190901

Please sign in to comment.