Skip to content

Commit

Permalink
Merge branch 'master' into rpc/telemetry-basics
Browse files Browse the repository at this point in the history
  • Loading branch information
qiweiii committed Oct 23, 2024
2 parents 79d67a4 + 63c6167 commit 37a021e
Show file tree
Hide file tree
Showing 55 changed files with 1,304 additions and 502 deletions.
2 changes: 1 addition & 1 deletion Blockchain/Sources/Blockchain/Blockchain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public final class Blockchain: ServiceBase, @unchecked Sendable {
self.dataProvider = dataProvider
self.timeProvider = timeProvider

super.init(logger: Logger(label: "Blockchain"), config: config, eventBus: eventBus)
super.init(id: "Blockchain", config: config, eventBus: eventBus)

await subscribe(RuntimeEvents.BlockAuthored.self, id: "Blockchain.BlockAuthored") { [weak self] event in
try await self?.on(blockAuthored: event)
Expand Down
38 changes: 38 additions & 0 deletions Blockchain/Sources/Blockchain/Config/ProtocolConfig+Preset.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,44 @@
import Utils

extension Ref where T == ProtocolConfig {
// TODO: pick some good numbers for dev env
public static let minimal = Ref(ProtocolConfig(
auditTranchePeriod: 8,
additionalMinBalancePerStateItem: 10,
additionalMinBalancePerStateByte: 1,
serviceMinBalance: 100,
totalNumberOfCores: 1,
preimagePurgePeriod: 28800,
epochLength: 6,
auditBiasFactor: 2,
coreAccumulationGas: Gas(10_000_000), // TODO: check this
workPackageAuthorizerGas: Gas(10_000_000), // TODO: check this
workPackageRefineGas: Gas(10_000_000), // TODO: check this
recentHistorySize: 8,
maxWorkItems: 4,
maxTicketsPerExtrinsic: 4,
maxLookupAnchorAge: 14400,
transferMemoSize: 128,
ticketEntriesPerValidator: 2,
maxAuthorizationsPoolItems: 8,
slotPeriodSeconds: 4,
maxAuthorizationsQueueItems: 10,
coreAssignmentRotationPeriod: 6,
maxServiceCodeSize: 4_000_000,
preimageReplacementPeriod: 5,
totalNumberOfValidators: 3,
erasureCodedPieceSize: 684,
maxWorkPackageManifestEntries: 1 << 11,
maxEncodedWorkPackageSize: 12 * 1 << 20,
maxEncodedWorkReportSize: 96 * 1 << 10,
erasureCodedSegmentSize: 6,
ticketSubmissionEndSlot: 2,
pvmDynamicAddressAlignmentFactor: 2,
pvmProgramInitInputDataSize: 1 << 24,
pvmProgramInitPageSize: 1 << 14,
pvmProgramInitSegmentSize: 1 << 16
))

// TODO: pick some good numbers for dev env
public static let dev = Ref(ProtocolConfig(
auditTranchePeriod: 8,
Expand Down
26 changes: 17 additions & 9 deletions Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import Codec
import Foundation
import TracingUtils
import Utils

private let logger = Logger(label: "Runtime")

// the STF
public final class Runtime {
public enum Error: Swift.Error {
Expand All @@ -22,7 +25,8 @@ public final class Runtime {
case preimagesNotSorted
case invalidPreimageServiceIndex
case duplicatedPreimage
case notBlockAuthor
case invalidAuthorTicket
case invalidAuthorKey
case invalidBlockSeal(any Swift.Error)
case invalidVrfSignature
case other(any Swift.Error)
Expand Down Expand Up @@ -70,19 +74,20 @@ public final class Runtime {
// winning tickets is validated at apply time by Safrole

// offendersMarkers is validated at apply time by Disputes
}

// validate block.header.seal
public func validateHeaderSeal(block: BlockRef, state: inout State) throws(Error) {
let vrfOutput: Data32
let blockAuthorKey = try Result {
try Bandersnatch.PublicKey(data: state.value.currentValidators[Int(block.header.authorIndex)].bandersnatch)
try Bandersnatch.PublicKey(data: state.currentValidators[Int(block.header.authorIndex)].bandersnatch)
}.mapError(Error.invalidBlockSeal).get()
let index = block.header.timeslot % UInt32(config.value.epochLength)
let encodedHeader = try Result { try JamEncoder.encode(block.header.unsigned) }.mapError(Error.invalidBlockSeal).get()
let entropyVRFInputData: Data
switch state.value.safroleState.ticketsOrKeys {
switch state.safroleState.ticketsOrKeys {
case let .left(tickets):
let ticket = tickets[Int(index)]
let vrfInputData = SigningContext.safroleTicketInputData(entropy: state.value.entropyPool.t3, attempt: ticket.attempt)
let vrfInputData = SigningContext.safroleTicketInputData(entropy: state.entropyPool.t3, attempt: ticket.attempt)
vrfOutput = try Result {
try blockAuthorKey.ietfVRFVerify(
vrfInputData: vrfInputData,
Expand All @@ -91,17 +96,18 @@ public final class Runtime {
)
}.mapError(Error.invalidBlockSeal).get()
guard ticket.id == vrfOutput else {
throw Error.notBlockAuthor
throw Error.invalidAuthorTicket
}

entropyVRFInputData = SigningContext.entropyInputData(entropy: vrfOutput)

case let .right(keys):
let key = keys[Int(index)]
guard key == blockAuthorKey.data else {
throw Error.notBlockAuthor
logger.debug("expected key: \(key.toHexString()), got key: \(blockAuthorKey.data.toHexString())")
throw Error.invalidAuthorKey
}
let vrfInputData = SigningContext.fallbackSealInputData(entropy: state.value.entropyPool.t3)
let vrfInputData = SigningContext.fallbackSealInputData(entropy: state.entropyPool.t3)
vrfOutput = try Result {
try blockAuthorKey.ietfVRFVerify(
vrfInputData: vrfInputData,
Expand All @@ -110,7 +116,7 @@ public final class Runtime {
)
}.mapError(Error.invalidBlockSeal).get()

entropyVRFInputData = SigningContext.fallbackSealInputData(entropy: state.value.entropyPool.t3)
entropyVRFInputData = SigningContext.fallbackSealInputData(entropy: state.entropyPool.t3)
}

_ = try Result {
Expand Down Expand Up @@ -150,6 +156,8 @@ public final class Runtime {
do {
try updateSafrole(block: block, state: &newState)

try validateHeaderSeal(block: block, state: &newState)

try updateDisputes(block: block, state: &newState)

// depends on Safrole and Disputes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public enum RuntimeEvents {

// New safrole ticket generated from SafroleService
public struct SafroleTicketsGenerated: Event {
public let epochIndex: EpochIndex
public let items: [TicketItemAndOutput]
public let publicKey: Bandersnatch.PublicKey
}
Expand Down
4 changes: 4 additions & 0 deletions Blockchain/Sources/Blockchain/Types/StateRef.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public final class StateRef: Ref<State>, @unchecked Sendable {
public var stateRoot: Data32 {
lazyStateRoot.value.value
}

override public var description: String {
"StateRef(\(stateRoot.toHexString()))"
}
}

extension StateRef: Codable {
Expand Down
24 changes: 14 additions & 10 deletions Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Codec
import Foundation
import Synchronization
import TracingUtils
import Utils

Expand All @@ -8,7 +9,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
private let keystore: KeyStore
private let extrinsicPool: ExtrinsicPoolService

private var tickets: ThreadSafeContainer<[RuntimeEvents.SafroleTicketsGenerated]> = .init([])
private let tickets: ThreadSafeContainer<[RuntimeEvents.SafroleTicketsGenerated]> = .init([])

public init(
config: ProtocolConfigRef,
Expand All @@ -22,7 +23,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
self.keystore = keystore
self.extrinsicPool = extrinsicPool

super.init(logger: Logger(label: "BlockAuthor"), config: config, eventBus: eventBus, scheduler: scheduler)
super.init(id: "BlockAuthor", config: config, eventBus: eventBus, scheduler: scheduler)

await subscribe(RuntimeEvents.SafroleTicketsGenerated.self, id: "BlockAuthor.SafroleTicketsGenerated") { [weak self] event in
try await self?.on(safroleTicketsGenerated: event)
Expand All @@ -33,11 +34,11 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
}
}

public func on(genesis state: StateRef) async {
await scheduleNewBlocks(
ticketsOrKeys: state.value.safroleState.ticketsOrKeys,
timeslot: timeProvider.getTime().timeToTimeslot(config: config)
)
public func on(genesis _: StateRef) async {
let nowTimeslot = timeProvider.getTime().timeToTimeslot(config: config)
// schedule for current epoch
let epoch = (nowTimeslot + 1).timeslotToEpochIndex(config: config)
await onBeforeEpoch(epoch: epoch)
}

public func createNewBlock(
Expand All @@ -47,6 +48,9 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
-> BlockRef
{
let parentHash = dataProvider.bestHead

logger.trace("creating new block for timeslot: \(timeslot) with parent hash: \(parentHash)")

let state = try await dataProvider.getState(hash: parentHash)
let epoch = timeslot.timeslotToEpochIndex(config: config)

Expand Down Expand Up @@ -156,7 +160,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
await withSpan("BlockAuthor.newBlock", logger: logger) { _ in
// TODO: add timeout
let block = try await createNewBlock(timeslot: timeslot, claim: claim)
logger.info("New block created: #\(block.header.timeslot) \(block.hash)")
logger.info("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)")
publish(RuntimeEvents.BlockAuthored(block: block))
}
}
Expand Down Expand Up @@ -205,7 +209,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
if delay < 0 {
continue
}
logger.debug("Scheduling new block task at timeslot \(timeslot)")
logger.trace("Scheduling new block task at timeslot \(timeslot) for claim \(claim.1.data.toHexString())")
schedule(id: "BlockAuthor.newBlock", delay: delay) { [weak self] in
if let self {
await newBlock(timeslot: timeslot, claim: .left(claim))
Expand All @@ -223,7 +227,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
if delay < 0 {
continue
}
logger.debug("Scheduling new block task at timeslot \(timeslot)")
logger.trace("Scheduling new block task at timeslot \(timeslot) for key \(pubkey.data.toHexString())")
schedule(id: "BlockAuthor.newBlock", delay: delay) { [weak self] in
if let self {
await newBlock(timeslot: timeslot, claim: .right(pubkey))
Expand Down
17 changes: 10 additions & 7 deletions Blockchain/Sources/Blockchain/Validator/ExtrinsicPoolService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import Utils
private typealias TicketItem = ExtrinsicTickets.TicketItem

private actor ServiceStorage {
let logger: Logger
var logger: Logger!

// sorted array ordered by output
var pendingTickets: SortedUniqueArray<TicketItemAndOutput> = .init()
Expand All @@ -13,11 +13,14 @@ private actor ServiceStorage {
var entropy: Data32 = .init()
let ringContext: Bandersnatch.RingContext

init(logger: Logger, ringContext: Bandersnatch.RingContext) {
self.logger = logger
init(ringContext: Bandersnatch.RingContext) {
self.ringContext = ringContext
}

func setLogger(_ logger: Logger) {
self.logger = logger
}

func add(tickets: [TicketItem], config: ProtocolConfigRef) {
for ticket in tickets {
if (try? ticket.validate(config: config)) == nil {
Expand Down Expand Up @@ -77,12 +80,12 @@ public final class ExtrinsicPoolService: ServiceBase, @unchecked Sendable {
) async {
self.dataProvider = dataProvider

let logger = Logger(label: "ExtrinsicPoolService")

let ringContext = try! Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators))
storage = ServiceStorage(logger: logger, ringContext: ringContext)
storage = ServiceStorage(ringContext: ringContext)

super.init(id: "ExtrinsicPoolService", config: config, eventBus: eventBus)

super.init(logger: logger, config: config, eventBus: eventBus)
await storage.setLogger(logger)

await subscribe(RuntimeEvents.SafroleTicketsGenerated.self, id: "ExtrinsicPool.SafroleTicketsGenerated") { [weak self] event in
try await self?.on(safroleTicketsGenerated: event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable {
self.runtime = runtime
self.extrinsicPool = extrinsicPool

super.init(logger: Logger(label: "BlockAuthor"), config: config, eventBus: eventBus, scheduler: scheduler)
super.init(id: "BlockAuthor", config: config, eventBus: eventBus, scheduler: scheduler)
}

public func on(genesis _: StateRef) async {}
Expand Down
7 changes: 3 additions & 4 deletions Blockchain/Sources/Blockchain/Validator/SafroleService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public final class SafroleService: ServiceBase, @unchecked Sendable {
self.keystore = keystore
ringContext = try! Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators))

super.init(logger: Logger(label: "SafroleService"), config: config, eventBus: eventBus)
super.init(id: "SafroleService", config: config, eventBus: eventBus)

await subscribe(RuntimeEvents.BlockImported.self, id: "SafroleService.BlockImported") { [weak self] event in
try await self?.on(blockImported: event)
Expand Down Expand Up @@ -66,8 +66,6 @@ public final class SafroleService: ServiceBase, @unchecked Sendable {
continue
}

logger.debug("Generating tickets for validator \(pubkey)")

try withSpan("generateTickets") { _ in
let tickets = try SafroleService.generateTickets(
count: TicketIndex(config.value.ticketEntriesPerValidator),
Expand All @@ -79,14 +77,15 @@ public final class SafroleService: ServiceBase, @unchecked Sendable {
)

events.append(.init(
epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config),
items: tickets,
publicKey: secret.publicKey
))
}
}

if events.isEmpty {
logger.debug("Not in next validators")
logger.trace("Not in next validators")
}

return events
Expand Down
40 changes: 13 additions & 27 deletions Blockchain/Sources/Blockchain/Validator/ServiceBase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,30 @@ import TracingUtils
import Utils

public class ServiceBase {
public let logger: Logger
public let id: UniqueId
let logger: Logger
public let config: ProtocolConfigRef
private let eventBus: EventBus
private let subscriptionTokens: ThreadSafeContainer<[EventBus.SubscriptionToken]> = .init([])
private let subscriptions: EventSubscriptions

init(logger: Logger, config: ProtocolConfigRef, eventBus: EventBus) {
self.logger = logger
init(id: UniqueId, config: ProtocolConfigRef, eventBus: EventBus) {
self.id = id
logger = Logger(label: id)
self.config = config
self.eventBus = eventBus
}

deinit {
let eventBus = self.eventBus
let subscriptionTokens = self.subscriptionTokens
Task {
for token in subscriptionTokens.value {
await eventBus.unsubscribe(token: token)
}
}
subscriptions = EventSubscriptions(eventBus: eventBus)
}

@discardableResult
func subscribe<T: Event>(_ eventType: T.Type, id _: UniqueId, handler: @escaping @Sendable (T) async throws -> Void) async -> EventBus
.SubscriptionToken
{
let token = await eventBus.subscribe(eventType, handler: handler)
subscriptionTokens.write { $0.append(token) }
return token
func subscribe<T: Event>(
_ eventType: T.Type, id _: UniqueId, handler: @escaping @Sendable (T) async throws -> Void
) async -> EventBus.SubscriptionToken {
await subscriptions.subscribe(eventType, id: id, handler: handler)
}

func unsubscribe(token: EventBus.SubscriptionToken) async {
subscriptionTokens.write { tokens in
tokens.removeAll { $0 == token }
}
await eventBus.unsubscribe(token: token)
await subscriptions.unsubscribe(token: token)
}

func publish(_ event: some Event) {
eventBus.publish(event)
subscriptions.publish(event)
}
}
Loading

0 comments on commit 37a021e

Please sign in to comment.