diff --git a/Blockchain/Sources/Blockchain/Blockchain.swift b/Blockchain/Sources/Blockchain/Blockchain.swift index a83cb5e5..b2284727 100644 --- a/Blockchain/Sources/Blockchain/Blockchain.swift +++ b/Blockchain/Sources/Blockchain/Blockchain.swift @@ -15,7 +15,7 @@ public final class Blockchain: ServiceBase, @unchecked Sendable { self.dataProvider = dataProvider self.timeProvider = timeProvider - super.init(logger: Logger(label: "Blockchain"), config: config, eventBus: eventBus) + super.init(id: "Blockchain", config: config, eventBus: eventBus) await subscribe(RuntimeEvents.BlockAuthored.self, id: "Blockchain.BlockAuthored") { [weak self] event in try await self?.on(blockAuthored: event) diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift index 6da6d5f5..4fb14c64 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/Runtime.swift @@ -1,7 +1,10 @@ import Codec import Foundation +import TracingUtils import Utils +private let logger = Logger(label: "Runtime") + // the STF public final class Runtime { public enum Error: Swift.Error { @@ -22,7 +25,8 @@ public final class Runtime { case preimagesNotSorted case invalidPreimageServiceIndex case duplicatedPreimage - case notBlockAuthor + case invalidAuthorTicket + case invalidAuthorKey case invalidBlockSeal(any Swift.Error) case invalidVrfSignature case other(any Swift.Error) @@ -70,19 +74,20 @@ public final class Runtime { // winning tickets is validated at apply time by Safrole // offendersMarkers is validated at apply time by Disputes + } - // validate block.header.seal + public func validateHeaderSeal(block: BlockRef, state: inout State) throws(Error) { let vrfOutput: Data32 let blockAuthorKey = try Result { - try Bandersnatch.PublicKey(data: state.value.currentValidators[Int(block.header.authorIndex)].bandersnatch) + try Bandersnatch.PublicKey(data: state.currentValidators[Int(block.header.authorIndex)].bandersnatch) }.mapError(Error.invalidBlockSeal).get() let index = block.header.timeslot % UInt32(config.value.epochLength) let encodedHeader = try Result { try JamEncoder.encode(block.header.unsigned) }.mapError(Error.invalidBlockSeal).get() let entropyVRFInputData: Data - switch state.value.safroleState.ticketsOrKeys { + switch state.safroleState.ticketsOrKeys { case let .left(tickets): let ticket = tickets[Int(index)] - let vrfInputData = SigningContext.safroleTicketInputData(entropy: state.value.entropyPool.t3, attempt: ticket.attempt) + let vrfInputData = SigningContext.safroleTicketInputData(entropy: state.entropyPool.t3, attempt: ticket.attempt) vrfOutput = try Result { try blockAuthorKey.ietfVRFVerify( vrfInputData: vrfInputData, @@ -91,7 +96,7 @@ public final class Runtime { ) }.mapError(Error.invalidBlockSeal).get() guard ticket.id == vrfOutput else { - throw Error.notBlockAuthor + throw Error.invalidAuthorTicket } entropyVRFInputData = SigningContext.entropyInputData(entropy: vrfOutput) @@ -99,9 +104,10 @@ public final class Runtime { case let .right(keys): let key = keys[Int(index)] guard key == blockAuthorKey.data else { - throw Error.notBlockAuthor + logger.debug("expected key: \(key.toHexString()), got key: \(blockAuthorKey.data.toHexString())") + throw Error.invalidAuthorKey } - let vrfInputData = SigningContext.fallbackSealInputData(entropy: state.value.entropyPool.t3) + let vrfInputData = SigningContext.fallbackSealInputData(entropy: state.entropyPool.t3) vrfOutput = try Result { try blockAuthorKey.ietfVRFVerify( vrfInputData: vrfInputData, @@ -110,7 +116,7 @@ public final class Runtime { ) }.mapError(Error.invalidBlockSeal).get() - entropyVRFInputData = SigningContext.fallbackSealInputData(entropy: state.value.entropyPool.t3) + entropyVRFInputData = SigningContext.fallbackSealInputData(entropy: state.entropyPool.t3) } _ = try Result { @@ -150,6 +156,8 @@ public final class Runtime { do { try updateSafrole(block: block, state: &newState) + try validateHeaderSeal(block: block, state: &newState) + try updateDisputes(block: block, state: &newState) // depends on Safrole and Disputes diff --git a/Blockchain/Sources/Blockchain/Types/StateRef.swift b/Blockchain/Sources/Blockchain/Types/StateRef.swift index c8e45b01..2ca1720c 100644 --- a/Blockchain/Sources/Blockchain/Types/StateRef.swift +++ b/Blockchain/Sources/Blockchain/Types/StateRef.swift @@ -22,6 +22,10 @@ public final class StateRef: Ref, @unchecked Sendable { public var stateRoot: Data32 { lazyStateRoot.value.value } + + override public var description: String { + "StateRef(\(stateRoot.toHexString()))" + } } extension StateRef: Codable { diff --git a/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift b/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift index 4a31c64a..728c11b2 100644 --- a/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift +++ b/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift @@ -1,5 +1,6 @@ import Codec import Foundation +import Synchronization import TracingUtils import Utils @@ -8,7 +9,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { private let keystore: KeyStore private let extrinsicPool: ExtrinsicPoolService - private var tickets: ThreadSafeContainer<[RuntimeEvents.SafroleTicketsGenerated]> = .init([]) + private let tickets: ThreadSafeContainer<[RuntimeEvents.SafroleTicketsGenerated]> = .init([]) public init( config: ProtocolConfigRef, @@ -22,7 +23,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { self.keystore = keystore self.extrinsicPool = extrinsicPool - super.init(logger: Logger(label: "BlockAuthor"), config: config, eventBus: eventBus, scheduler: scheduler) + super.init(id: "BlockAuthor", config: config, eventBus: eventBus, scheduler: scheduler) await subscribe(RuntimeEvents.SafroleTicketsGenerated.self, id: "BlockAuthor.SafroleTicketsGenerated") { [weak self] event in try await self?.on(safroleTicketsGenerated: event) @@ -33,11 +34,11 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { } } - public func on(genesis state: StateRef) async { - await scheduleNewBlocks( - ticketsOrKeys: state.value.safroleState.ticketsOrKeys, - timeslot: timeProvider.getTime().timeToTimeslot(config: config) - ) + public func on(genesis _: StateRef) async { + let nowTimeslot = timeProvider.getTime().timeToTimeslot(config: config) + // schedule for current epoch + let epoch = (nowTimeslot + 1).timeslotToEpochIndex(config: config) + await onBeforeEpoch(epoch: epoch) } public func createNewBlock( @@ -47,6 +48,9 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { -> BlockRef { let parentHash = dataProvider.bestHead + + logger.trace("creating new block for timeslot: \(timeslot) with parent hash: \(parentHash)") + let state = try await dataProvider.getState(hash: parentHash) let epoch = timeslot.timeslotToEpochIndex(config: config) @@ -156,7 +160,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { await withSpan("BlockAuthor.newBlock", logger: logger) { _ in // TODO: add timeout let block = try await createNewBlock(timeslot: timeslot, claim: claim) - logger.info("New block created: #\(block.header.timeslot) \(block.hash)") + logger.info("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)") publish(RuntimeEvents.BlockAuthored(block: block)) } } @@ -205,7 +209,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { if delay < 0 { continue } - logger.debug("Scheduling new block task at timeslot \(timeslot)") + logger.trace("Scheduling new block task at timeslot \(timeslot) for claim \(claim.1.data.toHexString())") schedule(id: "BlockAuthor.newBlock", delay: delay) { [weak self] in if let self { await newBlock(timeslot: timeslot, claim: .left(claim)) @@ -223,7 +227,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { if delay < 0 { continue } - logger.debug("Scheduling new block task at timeslot \(timeslot)") + logger.trace("Scheduling new block task at timeslot \(timeslot) for key \(pubkey.data.toHexString())") schedule(id: "BlockAuthor.newBlock", delay: delay) { [weak self] in if let self { await newBlock(timeslot: timeslot, claim: .right(pubkey)) diff --git a/Blockchain/Sources/Blockchain/Validator/ExtrinsicPoolService.swift b/Blockchain/Sources/Blockchain/Validator/ExtrinsicPoolService.swift index 6a9f0522..7a03f478 100644 --- a/Blockchain/Sources/Blockchain/Validator/ExtrinsicPoolService.swift +++ b/Blockchain/Sources/Blockchain/Validator/ExtrinsicPoolService.swift @@ -4,7 +4,7 @@ import Utils private typealias TicketItem = ExtrinsicTickets.TicketItem private actor ServiceStorage { - let logger: Logger + var logger: Logger! // sorted array ordered by output var pendingTickets: SortedUniqueArray = .init() @@ -13,11 +13,14 @@ private actor ServiceStorage { var entropy: Data32 = .init() let ringContext: Bandersnatch.RingContext - init(logger: Logger, ringContext: Bandersnatch.RingContext) { - self.logger = logger + init(ringContext: Bandersnatch.RingContext) { self.ringContext = ringContext } + func setLogger(_ logger: Logger) { + self.logger = logger + } + func add(tickets: [TicketItem], config: ProtocolConfigRef) { for ticket in tickets { if (try? ticket.validate(config: config)) == nil { @@ -77,12 +80,12 @@ public final class ExtrinsicPoolService: ServiceBase, @unchecked Sendable { ) async { self.dataProvider = dataProvider - let logger = Logger(label: "ExtrinsicPoolService") - let ringContext = try! Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators)) - storage = ServiceStorage(logger: logger, ringContext: ringContext) + storage = ServiceStorage(ringContext: ringContext) + + super.init(id: "ExtrinsicPoolService", config: config, eventBus: eventBus) - super.init(logger: logger, config: config, eventBus: eventBus) + await storage.setLogger(logger) await subscribe(RuntimeEvents.SafroleTicketsGenerated.self, id: "ExtrinsicPool.SafroleTicketsGenerated") { [weak self] event in try await self?.on(safroleTicketsGenerated: event) diff --git a/Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift b/Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift index 2c58c0ac..b893bd6f 100644 --- a/Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift +++ b/Blockchain/Sources/Blockchain/Validator/GuaranteeingService.swift @@ -28,7 +28,7 @@ public final class GuaranteeingService: ServiceBase2, @unchecked Sendable { self.runtime = runtime self.extrinsicPool = extrinsicPool - super.init(logger: Logger(label: "BlockAuthor"), config: config, eventBus: eventBus, scheduler: scheduler) + super.init(id: "BlockAuthor", config: config, eventBus: eventBus, scheduler: scheduler) } public func on(genesis _: StateRef) async {} diff --git a/Blockchain/Sources/Blockchain/Validator/SafroleService.swift b/Blockchain/Sources/Blockchain/Validator/SafroleService.swift index 22d59a90..646c4d69 100644 --- a/Blockchain/Sources/Blockchain/Validator/SafroleService.swift +++ b/Blockchain/Sources/Blockchain/Validator/SafroleService.swift @@ -27,7 +27,7 @@ public final class SafroleService: ServiceBase, @unchecked Sendable { self.keystore = keystore ringContext = try! Bandersnatch.RingContext(size: UInt(config.value.totalNumberOfValidators)) - super.init(logger: Logger(label: "SafroleService"), config: config, eventBus: eventBus) + super.init(id: "SafroleService", config: config, eventBus: eventBus) await subscribe(RuntimeEvents.BlockImported.self, id: "SafroleService.BlockImported") { [weak self] event in try await self?.on(blockImported: event) @@ -66,8 +66,6 @@ public final class SafroleService: ServiceBase, @unchecked Sendable { continue } - logger.debug("Generating tickets for validator \(pubkey)") - try withSpan("generateTickets") { _ in let tickets = try SafroleService.generateTickets( count: TicketIndex(config.value.ticketEntriesPerValidator), @@ -86,7 +84,7 @@ public final class SafroleService: ServiceBase, @unchecked Sendable { } if events.isEmpty { - logger.debug("Not in next validators") + logger.trace("Not in next validators") } return events diff --git a/Blockchain/Sources/Blockchain/Validator/ServiceBase.swift b/Blockchain/Sources/Blockchain/Validator/ServiceBase.swift index 84effa38..3eb81503 100644 --- a/Blockchain/Sources/Blockchain/Validator/ServiceBase.swift +++ b/Blockchain/Sources/Blockchain/Validator/ServiceBase.swift @@ -2,13 +2,15 @@ import TracingUtils import Utils public class ServiceBase { - public let logger: Logger + public let id: UniqueId + let logger: Logger public let config: ProtocolConfigRef private let eventBus: EventBus private let subscriptionTokens: ThreadSafeContainer<[EventBus.SubscriptionToken]> = .init([]) - init(logger: Logger, config: ProtocolConfigRef, eventBus: EventBus) { - self.logger = logger + init(id: UniqueId, config: ProtocolConfigRef, eventBus: EventBus) { + self.id = id + logger = Logger(label: id) self.config = config self.eventBus = eventBus } diff --git a/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift b/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift index 07acdf08..eaae3717 100644 --- a/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift +++ b/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift @@ -20,9 +20,9 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable { private let scheduler: Scheduler private let cancellables: ThreadSafeContainer> = .init([]) - public init(logger: Logger, config: ProtocolConfigRef, eventBus: EventBus, scheduler: Scheduler) { + public init(id: UniqueId, config: ProtocolConfigRef, eventBus: EventBus, scheduler: Scheduler) { self.scheduler = scheduler - super.init(logger: logger, config: config, eventBus: eventBus) + super.init(id: id, config: config, eventBus: eventBus) } deinit { @@ -63,8 +63,8 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable { @discardableResult public func scheduleForNextEpoch(_ id: UniqueId, fn: @escaping @Sendable (EpochIndex) async -> Void) -> Cancellable { - let now = timeProvider.getTimeInterval() - let nowTimeslot = UInt32(now).timeToTimeslot(config: config) + let now = timeProvider.getTime() + let nowTimeslot = now.timeToTimeslot(config: config) let nextEpoch = (nowTimeslot + 1).timeslotToEpochIndex(config: config) + 1 return scheduleFor(epoch: nextEpoch, id: id, fn: fn) } @@ -73,12 +73,10 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable { private func scheduleFor(epoch: EpochIndex, id: UniqueId, fn: @escaping @Sendable (EpochIndex) async -> Void) -> Cancellable { let scheduleTime = config.scheduleTimeForPrepareEpoch(epoch: epoch) let now = timeProvider.getTimeInterval() - let delay = scheduleTime - now + var delay = scheduleTime - now if delay < 0 { - // too late / current epoch is about to end - // schedule for the one after - logger.debug("\(id): skipping epoch \(epoch) because it is too late") - return scheduleFor(epoch: epoch + 1, id: id, fn: fn) + logger.debug("\(id): late epoch start \(epoch), expectedDelay \(delay)") + delay = 0 } logger.trace("\(id): scheduling epoch \(epoch) in \(delay)") return schedule(id: id, delay: delay) { [weak self] in diff --git a/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift b/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift index 56357026..9460ad54 100644 --- a/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift +++ b/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift @@ -6,56 +6,39 @@ import Utils @testable import Blockchain struct BlockAuthorTests { - let config: ProtocolConfigRef - let timeProvider: MockTimeProvider - let dataProvider: BlockchainDataProvider - let eventBus: EventBus - let scheduler: MockScheduler - let keystore: KeyStore - let blockAuthor: BlockAuthor - let runtime: Runtime - let storeMiddleware: StoreMiddleware - - init() async throws { + func setup() async -> (BlockchainServices, BlockAuthor, Runtime) { // setupTestLogger() - config = ProtocolConfigRef.dev - timeProvider = MockTimeProvider(time: 988) - - let (genesisState, genesisBlock) = try State.devGenesis(config: config) - dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock)) - - storeMiddleware = StoreMiddleware() - eventBus = EventBus(eventMiddleware: Middleware(storeMiddleware)) - - scheduler = MockScheduler(timeProvider: timeProvider) - - keystore = try await DevKeyStore(devKeysCount: config.value.totalNumberOfValidators) - - blockAuthor = await BlockAuthor( - config: config, - dataProvider: dataProvider, - eventBus: eventBus, - keystore: keystore, - scheduler: scheduler, - extrinsicPool: ExtrinsicPoolService(config: config, dataProvider: dataProvider, eventBus: eventBus) - ) - - runtime = Runtime(config: config) + let services = await BlockchainServices() + let blockAuthor = await services.blockAuthor + let runtime = Runtime(config: services.config) + return (services, blockAuthor, runtime) } @Test func createNewBlockWithFallbackKey() async throws { - let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) + let (services, blockAuthor, runtime) = await setup() + let config = services.config + let timeProvider = services.timeProvider + let genesisState = services.genesisState let timeslot = timeProvider.getTime().timeToTimeslot(config: config) - // get the validator key - let idx = timeslot % UInt32(config.value.totalNumberOfValidators) - let devKey = try DevKeyStore.getDevKey(seed: idx) + // dry run Safrole to get the validator key + let res = try genesisState.value.updateSafrole( + config: config, + slot: timeslot, + entropy: Data32(), + offenders: [], + extrinsics: .dummy(config: config) + ) + + let idx = timeslot % UInt32(config.value.epochLength) + let key = res.state.ticketsOrKeys.right!.array[Int(idx)] + let pubkey = try! Bandersnatch.PublicKey(data: key) // Create a new block - let block = try await blockAuthor.createNewBlock(timeslot: timeslot, claim: .right(devKey.bandersnatch)) + let block = try await blockAuthor.createNewBlock(timeslot: timeslot, claim: .right(pubkey)) // Verify block try _ = runtime.apply(block: block, state: genesisState, context: .init(timeslot: timeslot + 1)) @@ -63,7 +46,13 @@ struct BlockAuthorTests { @Test func createNewBlockWithTicket() async throws { - let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) + let (services, blockAuthor, runtime) = await setup() + let config = services.config + let timeProvider = services.timeProvider + let genesisState = services.genesisState + let keystore = services.keystore + let dataProvider = services.dataProvider + var state = genesisState.value state.safroleState.ticketsVerifier = try Bandersnatch.RingCommitment( @@ -95,6 +84,7 @@ struct BlockAuthorTests { ) state.safroleState.ticketsOrKeys = try .left(ConfigFixedSizeArray(config: config, array: validatorTickets)) + state.timeslot = timeslot - 1 let newStateRef = StateRef(state) // modify genesis state @@ -108,14 +98,19 @@ struct BlockAuthorTests { } @Test - func scheduleNewBlocks() async throws { - let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) + func firstBlock() async throws { + let (services, blockAuthor, runtime) = await setup() + let config = services.config + let timeProvider = services.timeProvider + let genesisState = services.genesisState + let scheduler = services.scheduler + let storeMiddleware = services.storeMiddleware await blockAuthor.on(genesis: genesisState) #expect(scheduler.storage.value.tasks.count > 0) - await scheduler.advance(by: 2) + await scheduler.advance(by: TimeInterval(2)) let events = await storeMiddleware.wait() #expect(events.count == 1) diff --git a/Blockchain/Tests/BlockchainTests/BlockchainServices.swift b/Blockchain/Tests/BlockchainTests/BlockchainServices.swift new file mode 100644 index 00000000..140b12c1 --- /dev/null +++ b/Blockchain/Tests/BlockchainTests/BlockchainServices.swift @@ -0,0 +1,87 @@ +import Blockchain +import Utils + +class BlockchainServices { + let config: ProtocolConfigRef + let timeProvider: MockTimeProvider + let dataProvider: BlockchainDataProvider + let eventBus: EventBus + let scheduler: MockScheduler + let keystore: KeyStore + let storeMiddleware: StoreMiddleware + let genesisBlock: BlockRef + let genesisState: StateRef + + private var _blockchain: Blockchain? + private weak var _blockchainRef: Blockchain? + + private var _blockAuthor: BlockAuthor? + private weak var _blockAuthorRef: BlockAuthor? + + init( + config: ProtocolConfigRef = .dev, + timeProvider: MockTimeProvider = MockTimeProvider(time: 988) + ) async { + self.config = config + self.timeProvider = timeProvider + + let (genesisState, genesisBlock) = try! State.devGenesis(config: config) + self.genesisBlock = genesisBlock + self.genesisState = genesisState + dataProvider = try! await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock)) + + storeMiddleware = StoreMiddleware() + eventBus = EventBus(eventMiddleware: .serial(Middleware(storeMiddleware), .noError), handlerMiddleware: .noError) + + scheduler = MockScheduler(timeProvider: timeProvider) + + keystore = try! await DevKeyStore() + } + + deinit { + _blockchain = nil + _blockAuthor = nil + + if let _blockchainRef { + fatalError("BlockchainServices: blockchain still alive. retain count: \(_getRetainCount(_blockchainRef))") + } + + if let _blockAuthorRef { + fatalError("BlockchainServices: blockAuthor still alive. retain count: \(_getRetainCount(_blockAuthorRef))") + } + } + + var blockchain: Blockchain { + get async { + if let _blockchain { + return _blockchain + } + _blockchain = try! await Blockchain( + config: config, + dataProvider: dataProvider, + timeProvider: timeProvider, + eventBus: eventBus + ) + _blockchainRef = _blockchain + return _blockchain! + } + } + + var blockAuthor: BlockAuthor { + get async { + if let _blockAuthor { + return _blockAuthor + } + _blockAuthor = await BlockAuthor( + config: config, + dataProvider: dataProvider, + eventBus: eventBus, + keystore: keystore, + scheduler: scheduler, + extrinsicPool: ExtrinsicPoolService(config: config, dataProvider: dataProvider, eventBus: eventBus) + ) + _blockAuthorRef = _blockAuthor + return _blockAuthor! + } + } +} diff --git a/Blockchain/Tests/BlockchainTests/DispatchQueueSchedulerTests.swift b/Blockchain/Tests/BlockchainTests/DispatchQueueSchedulerTests.swift index 97c2e60c..6f87ce7b 100644 --- a/Blockchain/Tests/BlockchainTests/DispatchQueueSchedulerTests.swift +++ b/Blockchain/Tests/BlockchainTests/DispatchQueueSchedulerTests.swift @@ -23,77 +23,85 @@ struct DispatchQueueSchedulerTests { } @Test func scheduleDelayedTask() async throws { - try await confirmation { confirm in - let delay = 0.5 - let now = Date() - let end: ThreadSafeContainer = .init(nil) - let cancel = scheduler.schedule(delay: delay, repeats: false) { - end.value = Date() - confirm() + await withKnownIssue("unstable when cpu is busy", isIntermittent: true) { + try await confirmation { confirm in + let delay = 0.5 + let now = Date() + let end: ThreadSafeContainer = .init(nil) + let cancel = scheduler.schedule(delay: delay, repeats: false) { + end.value = Date() + confirm() + } + + try await Task.sleep(for: .seconds(1)) + + _ = cancel + + let diff = try #require(end.value).timeIntervalSince(now) - delay + let diffAbs = abs(diff) + #expect(diffAbs < 0.5) } - - try await Task.sleep(for: .seconds(1)) - - _ = cancel - - let diff = try #require(end.value).timeIntervalSince(now) - delay - let diffAbs = abs(diff) - #expect(diffAbs < 1) } } @Test func scheduleRepeatingTask() async throws { - try await confirmation(expectedCount: 2) { confirm in - let delay = 1.5 - let now = Date() - let executionTimes = ThreadSafeContainer<[Date]>([]) - let expectedExecutions = 2 - - let cancel = scheduler.schedule(delay: delay, repeats: true) { - executionTimes.value.append(Date()) - confirm() - } - - try await Task.sleep(for: .seconds(3.1)) - - _ = cancel - - #expect(executionTimes.value.count == expectedExecutions) - - for (index, time) in executionTimes.value.enumerated() { - let expectedInterval = delay * Double(index + 1) - let actualInterval = time.timeIntervalSince(now) - let difference = abs(actualInterval - expectedInterval) - #expect(difference < 1) + await withKnownIssue("unstable when cpu is busy", isIntermittent: true) { + try await confirmation(expectedCount: 2) { confirm in + let delay = 1.5 + let now = Date() + let executionTimes = ThreadSafeContainer<[Date]>([]) + let expectedExecutions = 2 + + let cancel = scheduler.schedule(delay: delay, repeats: true) { + executionTimes.value.append(Date()) + confirm() + } + + try await Task.sleep(for: .seconds(3.1)) + + _ = cancel + + #expect(executionTimes.value.count == expectedExecutions) + + for (index, time) in executionTimes.value.enumerated() { + let expectedInterval = delay * Double(index + 1) + let actualInterval = time.timeIntervalSince(now) + let difference = abs(actualInterval - expectedInterval) + #expect(difference < 0.5) + } } } } @Test func cancelTask() async throws { - try await confirmation(expectedCount: 0) { confirm in - let cancel = scheduler.schedule(delay: 0, repeats: false) { - confirm() - } + await withKnownIssue("unstable when cpu is busy", isIntermittent: true) { + try await confirmation(expectedCount: 0) { confirm in + let cancel = scheduler.schedule(delay: 0, repeats: false) { + confirm() + } - cancel.cancel() + cancel.cancel() - try await Task.sleep(for: .seconds(0.1)) + try await Task.sleep(for: .seconds(0.1)) + } } } @Test func cancelRepeatingTask() async throws { - try await confirmation(expectedCount: 2) { confirm in - let delay = 1.0 + await withKnownIssue("unstable when cpu is busy", isIntermittent: true) { + try await confirmation(expectedCount: 2) { confirm in + let delay = 1.0 - let cancel = scheduler.schedule(delay: delay, repeats: true) { - confirm() - } + let cancel = scheduler.schedule(delay: delay, repeats: true) { + confirm() + } - try await Task.sleep(for: .seconds(2.2)) + try await Task.sleep(for: .seconds(2.2)) - cancel.cancel() + cancel.cancel() - try await Task.sleep(for: .seconds(0.1)) + try await Task.sleep(for: .seconds(0.1)) + } } } diff --git a/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift b/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift index 242d9b28..175ba3ce 100644 --- a/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift +++ b/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift @@ -25,7 +25,7 @@ struct ExtrinsicPoolServiceTests { dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock)) storeMiddleware = StoreMiddleware() - eventBus = EventBus(eventMiddleware: Middleware(storeMiddleware)) + eventBus = EventBus(eventMiddleware: .serial(Middleware(storeMiddleware), .noError), handlerMiddleware: .noError) keystore = try await DevKeyStore(devKeysCount: config.value.totalNumberOfValidators) diff --git a/Blockchain/Tests/BlockchainTests/MockScheduler.swift b/Blockchain/Tests/BlockchainTests/MockScheduler.swift index db55836b..9266ca47 100644 --- a/Blockchain/Tests/BlockchainTests/MockScheduler.swift +++ b/Blockchain/Tests/BlockchainTests/MockScheduler.swift @@ -1,17 +1,19 @@ -import Atomics import Blockchain import Foundation +import TracingUtils import Utils +private let logger = Logger(label: "MockScheduler") + final class SchedulerTask: Sendable, Comparable { - let id: Int + let id: UniqueId let scheduleTime: TimeInterval let repeats: TimeInterval? let task: @Sendable () async -> Void let cancel: (@Sendable () -> Void)? init( - id: Int, + id: UniqueId, scheduleTime: TimeInterval, repeats: TimeInterval?, task: @escaping @Sendable () async -> Void, @@ -38,8 +40,6 @@ struct Storage: Sendable { } final class MockScheduler: Scheduler, Sendable { - static let idGenerator = ManagedAtomic(0) - let mockTimeProvider: MockTimeProvider var timeProvider: TimeProvider { mockTimeProvider @@ -59,7 +59,7 @@ final class MockScheduler: Scheduler, Sendable { ) -> Cancellable { let now = timeProvider.getTimeInterval() let scheduleTime = now + delay - let id = Self.idGenerator.loadThenWrappingIncrement(ordering: .relaxed) + let id = UniqueId() let task = SchedulerTask(id: id, scheduleTime: scheduleTime, repeats: repeats ? delay : nil, task: task, cancel: onCancel) storage.write { storage in storage.tasks.insert(task) @@ -90,6 +90,7 @@ final class MockScheduler: Scheduler, Sendable { if let task { mockTimeProvider.advance(to: task.scheduleTime) + logger.debug("executing task \(task.id) at time \(task.scheduleTime)") await task.task() return true diff --git a/Blockchain/Tests/BlockchainTests/SafroleServiceTests.swift b/Blockchain/Tests/BlockchainTests/SafroleServiceTests.swift index 33b38933..0f50da12 100644 --- a/Blockchain/Tests/BlockchainTests/SafroleServiceTests.swift +++ b/Blockchain/Tests/BlockchainTests/SafroleServiceTests.swift @@ -24,7 +24,7 @@ struct SafroleServiceTests { (genesisState, _) = try State.devGenesis(config: config) storeMiddleware = StoreMiddleware() - eventBus = EventBus(eventMiddleware: Middleware(storeMiddleware)) + eventBus = EventBus(eventMiddleware: .serial(Middleware(storeMiddleware), .noError), handlerMiddleware: .noError) keystore = try await DevKeyStore(devKeysCount: 2) diff --git a/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift b/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift index 87bee198..07c74cc2 100644 --- a/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift +++ b/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift @@ -6,50 +6,29 @@ import Utils @testable import Blockchain struct ValidatorServiceTests { - let config: ProtocolConfigRef - let timeProvider: MockTimeProvider - let dataProvider: BlockchainDataProvider - let eventBus: EventBus - let scheduler: MockScheduler - let keystore: KeyStore - let validatorService: ValidatorService - let storeMiddleware: StoreMiddleware + func setup(time: TimeInterval = 988) async throws -> (BlockchainServices, ValidatorService) { + setupTestLogger() - init() async throws { - // setupTestLogger() - - config = ProtocolConfigRef.dev - timeProvider = MockTimeProvider(time: 988) - - let (genesisState, genesisBlock) = try State.devGenesis(config: config) - dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock)) - - storeMiddleware = StoreMiddleware() - eventBus = EventBus(eventMiddleware: Middleware(storeMiddleware)) - - scheduler = MockScheduler(timeProvider: timeProvider) - - keystore = try await DevKeyStore(devKeysCount: config.value.totalNumberOfValidators) - - let blockchain = try await Blockchain( - config: config, - dataProvider: dataProvider, - timeProvider: timeProvider, - eventBus: eventBus + let services = await BlockchainServices( + timeProvider: MockTimeProvider(time: time) ) - - validatorService = await ValidatorService( - blockchain: blockchain, - keystore: keystore, - eventBus: eventBus, - scheduler: scheduler, - dataProvider: dataProvider + let validatorService = await ValidatorService( + blockchain: services.blockchain, + keystore: services.keystore, + eventBus: services.eventBus, + scheduler: services.scheduler, + dataProvider: services.dataProvider ) + return (services, validatorService) } @Test func onGenesis() async throws { - let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) + let (services, validatorService) = try await setup() + let genesisState = services.genesisState + let storeMiddleware = services.storeMiddleware + let config = services.config + let scheduler = services.scheduler await validatorService.on(genesis: genesisState) @@ -65,7 +44,14 @@ struct ValidatorServiceTests { @Test func produceBlocks() async throws { - let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) + let (services, validatorService) = try await setup() + let genesisState = services.genesisState + let storeMiddleware = services.storeMiddleware + let config = services.config + let scheduler = services.scheduler + let timeProvider = services.timeProvider + let keystore = services.keystore + let dataProvider = services.dataProvider await validatorService.on(genesis: genesisState) @@ -102,18 +88,28 @@ struct ValidatorServiceTests { #expect(try await dataProvider.getHeads().contains(block.hash)) } - @Test - func makeManyBlocks() async throws { - let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) + // try different genesis time offset to ensure edge cases are covered + @Test(arguments: [988, 1000, 1003, 1021]) + func makeManyBlocks(time: Int) async throws { + let (services, validatorService) = try await setup(time: TimeInterval(time)) + let genesisState = services.genesisState + let storeMiddleware = services.storeMiddleware + let config = services.config + let scheduler = services.scheduler await validatorService.on(genesis: genesisState) - await scheduler.advance(by: TimeInterval(config.value.slotPeriodSeconds) * 20) + await storeMiddleware.wait() + + for _ in 0 ..< 25 { + await scheduler.advance(by: TimeInterval(config.value.slotPeriodSeconds)) + await storeMiddleware.wait() // let events to be processed + } let events = await storeMiddleware.wait() let blockAuthoredEvents = events.filter { $0 is RuntimeEvents.BlockAuthored } - #expect(blockAuthoredEvents.count == 20) + #expect(blockAuthoredEvents.count == 25) } } diff --git a/Networking/Package.swift b/Networking/Package.swift index 36e29a4d..ee9b929a 100644 --- a/Networking/Package.swift +++ b/Networking/Package.swift @@ -20,7 +20,7 @@ let package = Package( .package(url: "https://github.com/apple/swift-log.git", from: "1.6.0"), .package(url: "https://github.com/apple/swift-certificates.git", from: "1.5.0"), .package(url: "https://github.com/apple/swift-testing.git", branch: "0.10.0"), - .package(url: "https://github.com/gh123man/Async-Channels.git", from: "1.0.0"), + .package(url: "https://github.com/gh123man/Async-Channels.git", from: "1.0.2"), ], targets: [ .target( diff --git a/Networking/Sources/Networking/Stream.swift b/Networking/Sources/Networking/Stream.swift index 85adefaa..03aaca32 100644 --- a/Networking/Sources/Networking/Stream.swift +++ b/Networking/Sources/Networking/Stream.swift @@ -71,17 +71,17 @@ final class Stream: Sendable, StreamProtocol { if data.isEmpty { return } - // TODO: backpressure handling - // https://github.com/gh123man/Async-Channels/issues/11 - Task { - await channel.send(data) + + if !channel.syncSend(data) { + logger.warning("stream \(id) is full") + // TODO: backpressure handling } } // initiate stream close public func close(abort: Bool = false) { if status != .open { - logger.warning("Trying to close stream \(stream.id) in status \(status)") + logger.warning("Trying to close stream \(id) in status \(status)") return } status = abort ? .aborted : .closed diff --git a/Utils/Sources/Utils/ConfigLimitedSizeArray.swift b/Utils/Sources/Utils/ConfigLimitedSizeArray.swift index 4c37a5d1..80b9ac68 100644 --- a/Utils/Sources/Utils/ConfigLimitedSizeArray.swift +++ b/Utils/Sources/Utils/ConfigLimitedSizeArray.swift @@ -187,6 +187,16 @@ extension ConfigLimitedSizeArray { } } +extension ConfigLimitedSizeArray: CustomStringConvertible, CustomDebugStringConvertible { + public var description: String { + array.description + } + + public var debugDescription: String { + "ConfigLimitedSizeArray<\(T.self), \(minLength), \(maxLength)>(\(description))" + } +} + public typealias ConfigFixedSizeArray = ConfigLimitedSizeArray extension ConfigLimitedSizeArray: Decodable where T: Decodable { diff --git a/Utils/Sources/Utils/Crypto/Bandersnatch.swift b/Utils/Sources/Utils/Crypto/Bandersnatch.swift index 7b95e0ab..cbd4cd1f 100644 --- a/Utils/Sources/Utils/Crypto/Bandersnatch.swift +++ b/Utils/Sources/Utils/Crypto/Bandersnatch.swift @@ -2,8 +2,6 @@ import bandersnatch_vrfs import Foundation import TracingUtils -private let logger = Logger(label: "Bandersnatch") - public enum Bandersnatch: KeyType { public enum Error: Swift.Error { case createSecretFailed(Int) @@ -39,11 +37,6 @@ public enum Bandersnatch: KeyType { /// /// Used for ticket claiming during block production. public func ietfVRFSign(vrfInputData: Data, auxData: Data = Data()) throws -> Data96 { - logger.trace( - "ietfVRFSign", - metadata: ["vrfInputData": "\(vrfInputData.toDebugHexString())", "auxData": "\(auxData.toDebugHexString())"] - ) - var output = Data(repeating: 0, count: 96) try FFIUtils.call(vrfInputData, auxData, out: &output) { ptrs, out_buf in @@ -64,8 +57,6 @@ public enum Bandersnatch: KeyType { } public func getOutput(vrfInputData: Data) throws -> Data32 { - logger.trace("getOutput", metadata: ["vrfInputData": "\(vrfInputData.toDebugHexString())"]) - var output = Data(repeating: 0, count: 32) try FFIUtils.call(vrfInputData, out: &output) { ptrs, out_buf in @@ -152,15 +143,6 @@ public enum Bandersnatch: KeyType { public func ietfVRFVerify( vrfInputData: Data, auxData: Data = Data(), signature: Data96 ) throws(Error) -> Data32 { - logger.trace( - "ietfVRFVerify", - metadata: [ - "vrfInputData": "\(vrfInputData.toDebugHexString())", - "auxData": "\(auxData.toDebugHexString())", - "signature": "\(signature.data.toDebugHexString())", - ] - ) - var output = Data(repeating: 0, count: 32) try FFIUtils.call(vrfInputData, auxData, signature.data, out: &output) { ptrs, out_buf in @@ -220,11 +202,6 @@ public enum Bandersnatch: KeyType { /// /// Used for tickets submission. public func ringVRFSign(vrfInputData: Data, auxData: Data = Data()) throws(Error) -> Data784 { - logger.trace( - "ringVRFSign", - metadata: ["vrfInputData": "\(vrfInputData.toDebugHexString())", "auxData": "\(auxData.toDebugHexString())"] - ) - var output = Data(repeating: 0, count: 784) try FFIUtils.call(vrfInputData, auxData, out: &output) { ptrs, out_buf in @@ -315,15 +292,6 @@ public enum Bandersnatch: KeyType { /// /// On success returns the VRF output hash. public func ringVRFVerify(vrfInputData: Data, auxData: Data = Data(), signature: Data784) throws(Error) -> Data32 { - logger.trace( - "ringVRFVerify", - metadata: [ - "vrfInputData": "\(vrfInputData.toDebugHexString())", - "auxData": "\(auxData.toDebugHexString())", - "signature": "\(signature.data.toDebugHexString())", - ] - ) - var output = Data(repeating: 0, count: 32) try FFIUtils.call(vrfInputData, auxData, signature.data, out: &output) { ptrs, out_buf in diff --git a/Utils/Sources/Utils/Either.swift b/Utils/Sources/Utils/Either.swift index f892c843..35c5a767 100644 --- a/Utils/Sources/Utils/Either.swift +++ b/Utils/Sources/Utils/Either.swift @@ -3,6 +3,20 @@ import Codec public enum Either { case left(Left) case right(Right) + + public var left: Left? { + if case let .left(left) = self { + return left + } + return nil + } + + public var right: Right? { + if case let .right(right) = self { + return right + } + return nil + } } extension Either: Equatable where Left: Equatable, Right: Equatable {} diff --git a/Utils/Sources/Utils/EventBus/NoErrorMiddleware.swift b/Utils/Sources/Utils/EventBus/NoErrorMiddleware.swift new file mode 100644 index 00000000..c481a374 --- /dev/null +++ b/Utils/Sources/Utils/EventBus/NoErrorMiddleware.swift @@ -0,0 +1,17 @@ +private struct NoErrorMiddleware: MiddlewareProtocol { + public func handle(_ event: T, next: MiddlewareHandler) async throws { + do { + try await next(event) + } catch { + assertionFailure("NoErrorMiddleware failed: \(error)") + } + } +} + +extension Middleware { + /// Asserts that no error is thrown by the next middleware + /// NOTE: Used for testing only + public static var noError: Middleware { + Middleware(NoErrorMiddleware()) + } +} diff --git a/Utils/Sources/Utils/FixedSizeData.swift b/Utils/Sources/Utils/FixedSizeData.swift index cdf35c22..8503b8e9 100644 --- a/Utils/Sources/Utils/FixedSizeData.swift +++ b/Utils/Sources/Utils/FixedSizeData.swift @@ -30,7 +30,7 @@ extension FixedSizeData: Codable { } } -extension FixedSizeData: CustomStringConvertible { +extension FixedSizeData: CustomStringConvertible, CustomDebugStringConvertible { public var description: String { if T.value > 32 { let prefix = data.prefix(8).map { String(format: "%02x", $0) }.joined() @@ -40,6 +40,10 @@ extension FixedSizeData: CustomStringConvertible { return "0x\(data.map { String(format: "%02x", $0) }.joined())" } } + + public var debugDescription: String { + description + } } extension FixedSizeData: Comparable { diff --git a/Utils/Sources/Utils/UniqueId.swift b/Utils/Sources/Utils/UniqueId.swift index 97c8eb63..82e46d3e 100644 --- a/Utils/Sources/Utils/UniqueId.swift +++ b/Utils/Sources/Utils/UniqueId.swift @@ -7,7 +7,7 @@ public struct UniqueId: Sendable { public let id: Int public let name: String - public init(_ name: String) { + public init(_ name: String = "") { id = UniqueId.idGenerator.loadThenWrappingIncrement(ordering: .relaxed) self.name = name }