diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index 86104cde..4dcc5b0d 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -147,7 +147,7 @@ extension BlockchainDataProvider { try await dataProvider.remove(hash: hash) } - public var genesisBlockHash: Data32 { + public nonisolated var genesisBlockHash: Data32 { dataProvider.genesisBlockHash } diff --git a/Blockchain/Tests/BlockchainTests/MockScheduler.swift b/Blockchain/Sources/Blockchain/Scheduler/MockScheduler.swift similarity index 78% rename from Blockchain/Tests/BlockchainTests/MockScheduler.swift rename to Blockchain/Sources/Blockchain/Scheduler/MockScheduler.swift index 596edaee..5f322200 100644 --- a/Blockchain/Tests/BlockchainTests/MockScheduler.swift +++ b/Blockchain/Sources/Blockchain/Scheduler/MockScheduler.swift @@ -1,11 +1,10 @@ -import Blockchain import Foundation import TracingUtils import Utils private let logger = Logger(label: "MockScheduler") -final class SchedulerTask: Sendable, Comparable { +private final class SchedulerTask: Sendable, Comparable { let id: UniqueId let scheduleTime: TimeInterval let repeats: TimeInterval? @@ -35,23 +34,27 @@ final class SchedulerTask: Sendable, Comparable { } } -struct Storage: Sendable { - var tasks: SortedArray = .init([]) +private struct Storage: Sendable { + fileprivate var tasks: SortedArray = .init([]) } -final class MockScheduler: Scheduler, Sendable { - let mockTimeProvider: MockTimeProvider - var timeProvider: TimeProvider { +public final class MockScheduler: Scheduler, Sendable { + public let mockTimeProvider: MockTimeProvider + public var timeProvider: TimeProvider { mockTimeProvider } - let storage: ThreadSafeContainer = .init(.init()) + private let storage: ThreadSafeContainer = .init(.init()) - init(timeProvider: MockTimeProvider) { + public init(timeProvider: MockTimeProvider) { mockTimeProvider = timeProvider } - func scheduleImpl( + public var taskCount: Int { + storage.read { $0.tasks.array.count } + } + + public func scheduleImpl( delay: TimeInterval, repeats: Bool, task: @escaping @Sendable () async -> Void, @@ -74,13 +77,13 @@ final class MockScheduler: Scheduler, Sendable { } } - func advance(by interval: TimeInterval) async { + public func advance(by interval: TimeInterval) async { let to = timeProvider.getTimeInterval() + interval while await advanceNext(to: to) {} mockTimeProvider.advance(to: to) } - func advanceNext(to time: TimeInterval) async -> Bool { + private func advanceNext(to time: TimeInterval) async -> Bool { let task: SchedulerTask? = storage.write { storage in if let task = storage.tasks.array.first, task.scheduleTime <= time { storage.tasks.remove(at: 0) diff --git a/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift b/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift index 86f638af..137ab636 100644 --- a/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift +++ b/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift @@ -28,7 +28,9 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { await subscribe(RuntimeEvents.SafroleTicketsGenerated.self, id: "BlockAuthor.SafroleTicketsGenerated") { [weak self] event in try await self?.on(safroleTicketsGenerated: event) } + } + public func onSyncCompleted() async { scheduleForNextEpoch("BlockAuthor.scheduleForNextEpoch") { [weak self] epoch in await self?.onBeforeEpoch(epoch: epoch) } @@ -37,7 +39,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { public func on(genesis _: StateRef) async { let nowTimeslot = timeProvider.getTime().timeToTimeslot(config: config) // schedule for current epoch - let epoch = (nowTimeslot + 1).timeslotToEpochIndex(config: config) + let epoch = nowTimeslot.timeslotToEpochIndex(config: config) await onBeforeEpoch(epoch: epoch) } @@ -179,8 +181,10 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { let bestHeadTimeslot = bestHead.timeslot let bestHeadEpoch = bestHeadTimeslot.timeslotToEpochIndex(config: config) - if bestHeadEpoch + 1 != epoch { + if bestHeadEpoch != 0, bestHeadEpoch + 1 < epoch { logger.warning("best head epoch \(bestHeadEpoch) is too far from current epoch \(epoch)") + } else if bestHeadEpoch >= epoch { + logger.error("trying to do onBeforeEpoch for epoch \(epoch) but best head epoch is \(bestHeadEpoch)") } let state = try await dataProvider.getState(hash: bestHead.hash) diff --git a/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift b/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift index eaae3717..1f8322c6 100644 --- a/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift +++ b/Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift @@ -65,7 +65,7 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable { public func scheduleForNextEpoch(_ id: UniqueId, fn: @escaping @Sendable (EpochIndex) async -> Void) -> Cancellable { let now = timeProvider.getTime() let nowTimeslot = now.timeToTimeslot(config: config) - let nextEpoch = (nowTimeslot + 1).timeslotToEpochIndex(config: config) + 1 + let nextEpoch = nowTimeslot.timeslotToEpochIndex(config: config) + 1 return scheduleFor(epoch: nextEpoch, id: id, fn: fn) } diff --git a/Blockchain/Sources/Blockchain/Validator/ValidatorService.swift b/Blockchain/Sources/Blockchain/Validator/ValidatorService.swift index 4f846539..3ee6d2a6 100644 --- a/Blockchain/Sources/Blockchain/Validator/ValidatorService.swift +++ b/Blockchain/Sources/Blockchain/Validator/ValidatorService.swift @@ -40,6 +40,10 @@ public final class ValidatorService: Sendable { ) } + public func onSyncCompleted() async { + await blockAuthor.onSyncCompleted() + } + public func on(genesis: StateRef) async { await safrole.on(genesis: genesis) await blockAuthor.on(genesis: genesis) diff --git a/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift b/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift index 9460ad54..d632d4a6 100644 --- a/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift +++ b/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift @@ -12,6 +12,7 @@ struct BlockAuthorTests { let services = await BlockchainServices() let blockAuthor = await services.blockAuthor let runtime = Runtime(config: services.config) + await blockAuthor.onSyncCompleted() return (services, blockAuthor, runtime) } @@ -108,7 +109,7 @@ struct BlockAuthorTests { await blockAuthor.on(genesis: genesisState) - #expect(scheduler.storage.value.tasks.count > 0) + #expect(scheduler.taskCount > 0) await scheduler.advance(by: TimeInterval(2)) diff --git a/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift b/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift index 90abe888..8493157d 100644 --- a/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift +++ b/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift @@ -11,7 +11,7 @@ struct ValidatorServiceTests { time: TimeInterval = 988, keysCount: Int = 12 ) async throws -> (BlockchainServices, ValidatorService) { - setupTestLogger() + // setupTestLogger() let services = await BlockchainServices( config: config, @@ -25,6 +25,7 @@ struct ValidatorServiceTests { scheduler: services.scheduler, dataProvider: services.dataProvider ) + await validatorService.onSyncCompleted() return (services, validatorService) } @@ -45,7 +46,7 @@ struct ValidatorServiceTests { #expect(safroleEvents.count == config.value.totalNumberOfValidators) // Check if block author tasks were scheduled - #expect(scheduler.storage.value.tasks.count > 0) + #expect(scheduler.taskCount > 0) } @Test @@ -96,7 +97,7 @@ struct ValidatorServiceTests { // try different genesis time offset to ensure edge cases are covered @Test(arguments: [988, 1000, 1003, 1021]) - func makeManyBlocks(time: Int) async throws { + func makeManyBlocksWithAllKeys(time: Int) async throws { let (services, validatorService) = try await setup(time: TimeInterval(time)) let genesisState = services.genesisState let storeMiddleware = services.storeMiddleware diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index 8f873f76..5d9e3532 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -44,6 +44,7 @@ public final class Connection: Sendable, ConnectionInfoP } public func request(_ request: Handler.EphemeralHandler.Request) async throws -> Data { + logger.trace("sending request", metadata: ["kind": "\(request.kind)"]) let data = try request.encode() let kind = request.kind let stream = try createStream(kind: kind) @@ -194,11 +195,7 @@ func presistentStreamRunLoop( ) var decoder = handler.createDecoder(kind: kind) do { - while true { - let data = try await receiveMaybeData(stream: stream) - guard let data else { - break - } + while let data = try await receiveMaybeData(stream: stream) { let msg = try decoder.decode(data: data) try await handler.handle(connection: connection, message: msg) } diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index fe93a5cc..8f203665 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -107,13 +107,16 @@ public final class Peer: Sendable { // TODO: see if we can remove the role parameter public func connect(to address: NetAddr, role: PeerRole) throws -> Connection { let conn = impl.connections.read { connections in - connections.byType[role]?[address] + connections.byAddr[address]?.0 } return try conn ?? impl.connections.write { connections in - let curr = connections.byType[role, default: [:]][address] + let curr = connections.byAddr[address]?.0 if let curr { return curr } + + logger.debug("connecting to peer", metadata: ["address": "\(address)", "role": "\(role)"]) + let quicConn = try QuicConnection( handler: PeerEventHandler(self.impl), registration: self.impl.clientConfiguration.registration, @@ -127,7 +130,7 @@ public final class Peer: Sendable { remoteAddress: address, initiatedByLocal: true ) - connections.byType[role, default: [:]][address] = conn + connections.byAddr[address] = (conn, role) connections.byId[conn.id] = conn return conn } @@ -144,7 +147,7 @@ public final class Peer: Sendable { } for connection in connections { if let stream = try? connection.createPreistentStream(kind: kind) { - let res = Result(catching: { try stream.send(data: messageData) }) + let res = Result(catching: { try stream.send(message: messageData) }) switch res { case .success: break @@ -167,7 +170,7 @@ public final class Peer: Sendable { final class PeerImpl: Sendable { struct ConnectionStorage { - var byType: [PeerRole: [NetAddr: Connection]] = [:] + var byAddr: [NetAddr: (Connection, PeerRole)] = [:] var byId: [UniqueId: Connection] = [:] } @@ -212,14 +215,14 @@ final class PeerImpl: Sendable { func addConnection(_ connection: QuicConnection, addr: NetAddr, role: PeerRole) -> Bool { connections.write { connections in if role == .builder { - let currentCount = connections.byType[role]?.count ?? 0 + let currentCount = connections.byAddr.values.filter { $0.1 == role }.count if currentCount >= self.settings.maxBuilderConnections { self.logger.warning("max builder connections reached") // TODO: consider connection rotation strategy return false } } - if connections.byType[role, default: [:]][addr] != nil { + if connections.byAddr[addr] != nil { self.logger.warning("connection already exists") return false } @@ -230,7 +233,7 @@ final class PeerImpl: Sendable { remoteAddress: addr, initiatedByLocal: false ) - connections.byType[role, default: [:]][addr] = conn + connections.byAddr[addr] = (conn, role) connections.byId[connection.id] = conn return true } @@ -328,7 +331,7 @@ private struct PeerEventHandler: QuicEventHandler { impl.connections.write { connections in if let conn = connections.byId[connection.id] { connections.byId.removeValue(forKey: connection.id) - connections.byType[conn.role]?.removeValue(forKey: conn.remoteAddress) + connections.byAddr[conn.remoteAddress] = nil } } } diff --git a/Networking/Sources/Networking/Stream.swift b/Networking/Sources/Networking/Stream.swift index 0a79d362..90bf76fa 100644 --- a/Networking/Sources/Networking/Stream.swift +++ b/Networking/Sources/Networking/Stream.swift @@ -120,6 +120,10 @@ final class Stream: Sendable, StreamProtocol { if data.isEmpty { return } + guard canReceive else { + logger.warning("unexpected status: \(status)") + return + } if !channel.syncSend(data) { logger.warning("stream \(id) is full") // TODO: backpressure handling diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index c6ddf2b1..25c00e36 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -17,10 +17,7 @@ struct PeerTests { var kind: Kind var data: Data func encode() throws -> Data { - let length = UInt32(data.count) - var lengthData = withUnsafeBytes(of: length.littleEndian) { Data($0) } - lengthData.append(data) - return lengthData + data } typealias StreamKind = Kind @@ -101,19 +98,8 @@ struct PeerTests { func handle(connection _: any ConnectionInfoProtocol, request: Request) async throws -> Data { let data = request.data - guard data.count >= 4 else { - throw NSError( - domain: "ExtractError", code: 1, - userInfo: [NSLocalizedDescriptionKey: "Data too short to contain length"] - ) - } - let lengthData = data.prefix(4) - let length = UInt32( - littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) } - ) - let actualData = data.dropFirst(4).prefix(Int(length)) - await dataStorage.updateData(actualData) - return actualData + await dataStorage.updateData(data) + return data } } @@ -197,7 +183,7 @@ struct PeerTests { kind: .uniqueB, message: .init(kind: .uniqueB, data: Data("I am jam".utf8)) ) // Verify last received data - try? await Task.sleep(for: .milliseconds(1000)) + try? await Task.sleep(for: .milliseconds(100)) await #expect(handler2.lastReceivedData == Data("hello world".utf8)) await #expect(handler1.lastReceivedData == Data("I am jam".utf8)) } @@ -422,14 +408,14 @@ struct PeerTests { } // Wait for all connections to establish - try? await Task.sleep(for: .seconds(10)) + try? await Task.sleep(for: .seconds(1)) let centralPeer = peers[0] let messagedata = Data("Sync message".utf8) centralPeer.broadcast(kind: .uniqueA, message: MockRequest(kind: .uniqueA, data: messagedata)) // Wait for message to propagate - try? await Task.sleep(for: .seconds(60)) + try? await Task.sleep(for: .seconds(2)) // Check that each peer received the broadcast for i in 1 ..< handles.count { diff --git a/Node/Package.resolved b/Node/Package.resolved index cdee67bc..f1dd53fc 100644 --- a/Node/Package.resolved +++ b/Node/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "7dcc3c8ac80d868e86196be8e7bbe367d44c35130f5b194c66eec8cb296fa12b", + "originHash" : "33177d9b5bd122b13d37a99e2337556cfc5a1b7e466229e7bdb19fd9423a117a", "pins" : [ { "identity" : "async-channels", @@ -217,6 +217,15 @@ "version" : "1.1.0" } }, + { + "identity" : "swift-syntax", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-syntax.git", + "state" : { + "revision" : "0687f71944021d616d34d922343dcef086855920", + "version" : "600.0.1" + } + }, { "identity" : "swift-system", "kind" : "remoteSourceControl", @@ -226,6 +235,15 @@ "version" : "1.3.2" } }, + { + "identity" : "swift-testing", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-testing.git", + "state" : { + "branch" : "0.10.0", + "revision" : "69d59cfc76e5daf498ca61f5af409f594768eef9" + } + }, { "identity" : "vapor", "kind" : "remoteSourceControl", diff --git a/Node/Package.swift b/Node/Package.swift index bf29c664..cfb6c483 100644 --- a/Node/Package.swift +++ b/Node/Package.swift @@ -21,7 +21,8 @@ let package = Package( .package(path: "../RPC"), .package(path: "../TracingUtils"), .package(path: "../Utils"), - .package(url: "https://github.com/gh123man/Async-Channels.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-testing.git", branch: "0.10.0"), + .package(url: "https://github.com/gh123man/Async-Channels.git", from: "1.0.2"), ], targets: [ // Targets are the basic building blocks of a package, defining a module or a test suite. @@ -38,7 +39,10 @@ let package = Package( ), .testTarget( name: "NodeTests", - dependencies: ["Node"] + dependencies: [ + "Node", + .product(name: "Testing", package: "swift-testing"), + ] ), ], swiftLanguageModes: [.version("6")] diff --git a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift index 25be8887..724be134 100644 --- a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift @@ -15,10 +15,10 @@ enum BroadcastTarget { } public final class NetworkManager: Sendable { - let peerManager: PeerManager - let network: Network - let syncManager: SyncManager - let blockchain: Blockchain + public let peerManager: PeerManager + public let network: Network + public let syncManager: SyncManager + public let blockchain: Blockchain private let subscriptions: EventSubscriptions // This is for development only @@ -33,7 +33,7 @@ public final class NetworkManager: Sendable { ) async throws { peerManager = PeerManager(eventBus: eventBus) - network = try await Network( + network = try Network( config: config, protocolConfig: blockchain.config, genesisHeader: blockchain.dataProvider.genesisBlockHash, @@ -61,6 +61,13 @@ public final class NetworkManager: Sendable { ) { [weak self] event in await self?.on(safroleTicketsGenerated: event) } + + await subscriptions.subscribe( + RuntimeEvents.BlockImported.self, + id: "NetworkManager.BlockImported" + ) { [weak self] event in + await self?.on(blockImported: event) + } } } @@ -123,6 +130,18 @@ public final class NetworkManager: Sendable { } } + private func on(blockImported event: RuntimeEvents.BlockImported) async { + logger.debug("sending blocks", metadata: ["hash": "\(event.block.hash)"]) + let finalized = await blockchain.dataProvider.finalizedHead + network.broadcast( + kind: .blockAnnouncement, + message: .blockAnnouncement(BlockAnnouncement( + header: event.block.header.asRef(), + finalized: HashAndSlot(hash: finalized.hash, timeslot: finalized.timeslot) + )) + ) + } + public var peersCount: Int { network.peersCount } @@ -133,6 +152,7 @@ struct HandlerImpl: NetworkProtocolHandler { let peerManager: PeerManager func handle(ceRequest: CERequest) async throws -> [any Encodable] { + logger.trace("handling request", metadata: ["request": "\(ceRequest)"]) switch ceRequest { case let .blockRequest(message): let dataProvider = blockchain.dataProvider @@ -164,6 +184,9 @@ struct HandlerImpl: NetworkProtocolHandler { for _ in 0 ..< count { let block = try await dataProvider.getBlock(hash: hash) resp.append(block) + if hash == dataProvider.genesisBlockHash { + break + } hash = block.header.parentHash } } diff --git a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift index d4e5d597..41858774 100644 --- a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift @@ -8,11 +8,18 @@ private let logger = Logger(label: "SyncManager") let BLOCK_REQUEST_BLOCK_COUNT: UInt32 = 50 +enum SyncStatus { + case discovering + case bulkSyncing + case syncing +} + // TODO: // - pick best peer // - remove slow one // - sync peer rotation // - fast sync mode (no verification) +// - re-enter to bulk sync mode if new peer with better head is discovered public actor SyncManager: Sendable { private let blockchain: Blockchain private let network: Network @@ -20,8 +27,7 @@ public actor SyncManager: Sendable { private let subscriptions: EventSubscriptions - // starts with bulk syncing mode, until our best have catched up with the peer best - private var bulkSyncing = false + private var status = SyncStatus.discovering private var syncContinuation: [CheckedContinuation] = [] private var networkBest: HashAndSlot? @@ -45,7 +51,7 @@ public actor SyncManager: Sendable { } public func waitForSyncCompletion() async { - if !bulkSyncing { + if status == .syncing { return } await withCheckedContinuation { continuation in @@ -72,11 +78,21 @@ public actor SyncManager: Sendable { } let currentHead = await blockchain.dataProvider.bestHead + if currentHead.timeslot >= networkBest!.timeslot { + syncCompleted() + return + } - if bulkSyncing { + switch status { + case .discovering: + status = .bulkSyncing + await bulkSync(currentHead: currentHead) + case .bulkSyncing: await bulkSync(currentHead: currentHead) - } else if let newBlockHeader { - importBlock(currentTimeslot: currentHead.timeslot, newHeader: newBlockHeader, peer: info.address) + case .syncing: + if let newBlockHeader { + importBlock(currentTimeslot: currentHead.timeslot, newHeader: newBlockHeader, peer: info.address) + } } } @@ -106,13 +122,8 @@ public actor SyncManager: Sendable { let currentHead = await blockchain.dataProvider.bestHead if currentHead.timeslot >= networkBest!.timeslot { - if bulkSyncing { - bulkSyncing = false - syncContinuation.forEach { $0.resume() } - syncContinuation = [] - logger.info("bulk sync completed") - return - } + syncCompleted() + return } await bulkSync(currentHead: blockchain.dataProvider.bestHead) @@ -123,7 +134,17 @@ public actor SyncManager: Sendable { } } + private func syncCompleted() { + if status != .syncing { + status = .syncing + syncContinuation.forEach { $0.resume() } + syncContinuation = [] + logger.info("sync completed") + } + } + private func importBlock(currentTimeslot: TimeslotIndex, newHeader: HeaderRef, peer: NetAddr) { + logger.debug("importing block", metadata: ["hash": "\(newHeader.hash)", "remote": "\(peer)"]) let blockchain = blockchain let network = network Task.detached { @@ -135,9 +156,13 @@ public actor SyncManager: Sendable { direction: .descendingInclusive, maxBlocks: max(1, newHeader.value.timeslot - currentTimeslot) ))) - let decoded = try JamDecoder.decode([BlockRef].self, from: resp, withConfig: blockchain.config) + let decoder = JamDecoder(data: resp, config: blockchain.config) + var blocks = [BlockRef]() + while !decoder.isAtEnd { + try blocks.append(decoder.decode(BlockRef.self)) + } // reverse to import old block first - for block in decoded.reversed() { + for block in blocks.reversed() { try await blockchain.importBlock(block) } } catch { diff --git a/Node/Sources/Node/Node.swift b/Node/Sources/Node/Node.swift index 3b7bb7b8..9ab7da4d 100644 --- a/Node/Sources/Node/Node.swift +++ b/Node/Sources/Node/Node.swift @@ -27,16 +27,17 @@ public class Node { public let config: Config public let blockchain: Blockchain public let rpcServer: Server? - public let timeProvider: TimeProvider + public let scheduler: Scheduler public let dataProvider: BlockchainDataProvider public let keystore: KeyStore public let network: NetworkManager - public init( + public required init( config: Config, genesis: Genesis, eventBus: EventBus, - keystore: KeyStore + keystore: KeyStore, + scheduler: Scheduler = DispatchQueueScheduler(timeProvider: SystemTimeProvider()) ) async throws { self.config = config @@ -45,11 +46,11 @@ public class Node { logger.info("Genesis: \(genesisBlock.hash)") dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock)) - timeProvider = SystemTimeProvider() + self.scheduler = scheduler let blockchain = try await Blockchain( config: protocolConfig, dataProvider: dataProvider, - timeProvider: timeProvider, + timeProvider: scheduler.timeProvider, eventBus: eventBus ) self.blockchain = blockchain diff --git a/Node/Sources/Node/ValidatorNode.swift b/Node/Sources/Node/ValidatorNode.swift index 86c1ac9b..63af3755 100644 --- a/Node/Sources/Node/ValidatorNode.swift +++ b/Node/Sources/Node/ValidatorNode.swift @@ -6,12 +6,21 @@ import Utils public class ValidatorNode: Node { private var validator: ValidatorService! - override public required init( - config: Config, genesis: Genesis, eventBus: EventBus, keystore: KeyStore + public required init( + config: Config, + genesis: Genesis, + eventBus: EventBus, + keystore: KeyStore, + scheduler: Scheduler = DispatchQueueScheduler(timeProvider: SystemTimeProvider()) ) async throws { - try await super.init(config: config, genesis: genesis, eventBus: eventBus, keystore: keystore) + try await super.init( + config: config, + genesis: genesis, + eventBus: eventBus, + keystore: keystore, + scheduler: scheduler + ) - let scheduler = DispatchQueueScheduler(timeProvider: timeProvider) let validator = await ValidatorService( blockchain: blockchain, keystore: keystore, @@ -22,14 +31,17 @@ public class ValidatorNode: Node { self.validator = validator let syncManager = network.syncManager - let dataProvider = blockchain.dataProvider + let dataProvider: BlockchainDataProvider = blockchain.dataProvider let local = config.local Task { let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) if !local { await syncManager.waitForSyncCompletion() } - await validator.on(genesis: genesisState) + await validator.onSyncCompleted() + if await dataProvider.bestHead.hash == dataProvider.genesisBlockHash { + await validator.on(genesis: genesisState) + } } } } diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 8fba66d3..635880c4 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -1,13 +1,133 @@ -import XCTest +import Blockchain +import Foundation +import Testing +import Utils @testable import Node -final class NodeTests: XCTestCase { - func testExample() throws { - // XCTest Documentation - // https://developer.apple.com/documentation/xctest +struct NodeTests { + @Test + func validatorNode() async throws { + let (nodes, scheduler) = try await Topology( + nodes: [NodeDescription(isValidator: true)] + ).build(genesis: .preset(.minimal)) - // Defining Test Cases and Test Methods - // https://developer.apple.com/documentation/xctest/defining_test_cases_and_test_methods + let (validatorNode, storeMiddlware) = nodes[0] + + // Get initial state + let initialBestHead = await validatorNode.dataProvider.bestHead + let initialTimeslot = initialBestHead.timeslot + + // Advance time + for _ in 0 ..< 10 { + await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await storeMiddlware.wait() + } + + // Wait for block production + try await Task.sleep(for: .milliseconds(500)) + + // Get new state + let newBestHead = await validatorNode.dataProvider.bestHead + let newTimeslot = newBestHead.timeslot + + // Verify block was produced + #expect(newTimeslot > initialTimeslot) + #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) + } + + @Test + func sync() async throws { + // Create validator and full node + let (nodes, scheduler) = try await Topology( + nodes: [ + NodeDescription(isValidator: true), + NodeDescription(devSeed: 1), + ], + connections: [(0, 1)] + ).build(genesis: .preset(.minimal)) + + let (validatorNode, validatorStoreMiddlware) = nodes[0] + let (node, nodeStoreMiddlware) = nodes[1] + + // Advance time to produce blocks + for _ in 0 ..< 10 { + await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await validatorStoreMiddlware.wait() + await nodeStoreMiddlware.wait() + } + + // Wait for sync + try await Task.sleep(for: .milliseconds(500)) + + // Verify sync + let validatorBestHead = await validatorNode.dataProvider.bestHead + let nodeBestHead = await node.dataProvider.bestHead + + #expect(validatorBestHead.hash == nodeBestHead.hash) + + // Produce more blocks + for _ in 0 ..< 10 { + await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await validatorStoreMiddlware.wait() + await nodeStoreMiddlware.wait() + } + + try await Task.sleep(for: .milliseconds(500)) + + await validatorStoreMiddlware.wait() + await nodeStoreMiddlware.wait() + + // Verify new blocks are synced + let newValidatorBestHead = await validatorNode.dataProvider.bestHead + let newNodeBestHead = await node.dataProvider.bestHead + + #expect(newValidatorBestHead.hash == newNodeBestHead.hash) + #expect(newValidatorBestHead.timeslot > validatorBestHead.timeslot) + } + + @Test + func multiplePeers() async throws { + // Create multiple nodes + let (nodes, scheduler) = try await Topology( + nodes: [ + NodeDescription(isValidator: true), + NodeDescription(isValidator: true, devSeed: 1), + NodeDescription(devSeed: 2), + NodeDescription(devSeed: 3), + ], + connections: [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3)] + ).build(genesis: .preset(.minimal)) + + let (validator1, validator1StoreMiddlware) = nodes[0] + let (validator2, validator2StoreMiddlware) = nodes[1] + let (node1, node1StoreMiddlware) = nodes[2] + let (node2, node2StoreMiddlware) = nodes[3] + + try await Task.sleep(for: .milliseconds(500)) + + // Verify connections + #expect(node1.network.peersCount == 2) + #expect(node2.network.peersCount == 2) + + // Advance time and verify sync + for _ in 0 ..< 10 { + await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) + await validator1StoreMiddlware.wait() + await validator2StoreMiddlware.wait() + await node1StoreMiddlware.wait() + await node2StoreMiddlware.wait() + } + + try await Task.sleep(for: .milliseconds(500)) + + let validator1BestHead = await validator1.dataProvider.bestHead + let validator2BestHead = await validator2.dataProvider.bestHead + let node1BestHead = await node1.dataProvider.bestHead + let node2BestHead = await node2.dataProvider.bestHead + + #expect(validator1BestHead.hash == node1BestHead.hash) + #expect(validator1BestHead.hash == node2BestHead.hash) + #expect(validator2BestHead.hash == node1BestHead.hash) } } diff --git a/Node/Tests/NodeTests/Topology.swift b/Node/Tests/NodeTests/Topology.swift new file mode 100644 index 00000000..30014ad3 --- /dev/null +++ b/Node/Tests/NodeTests/Topology.swift @@ -0,0 +1,67 @@ +import Blockchain +import Node +import TracingUtils +import Utils + +struct NodeDescription { + let isValidator: Bool + let devSeed: UInt32 + + public init(isValidator: Bool = false, devSeed: UInt32 = 0) { + self.isValidator = isValidator + self.devSeed = devSeed + } +} + +struct Topology { + let nodes: [NodeDescription] + let connections: [(Int, Int)] + + init(nodes: [NodeDescription], connections: [(Int, Int)] = []) { + self.nodes = nodes + self.connections = connections + } + + func build(genesis: Genesis) async throws -> ([(Node, StoreMiddleware)], MockScheduler) { + // setupTestLogger() + + let timeProvider = MockTimeProvider(time: 1000) + let scheduler = MockScheduler(timeProvider: timeProvider) + var ret: [(Node, StoreMiddleware)] = [] + for desc in nodes { + let storeMiddleware = StoreMiddleware() + let eventBus = EventBus(eventMiddleware: .serial(Middleware(storeMiddleware), .noError), handlerMiddleware: .noError) + let keystore = try await DevKeyStore(devKeysCount: 0) + let keys = try await keystore.addDevKeys(seed: desc.devSeed) + let nodeConfig = await Node.Config( + rpc: nil, + network: Network.Config( + role: desc.isValidator ? .validator : .builder, + listenAddress: NetAddr(address: "127.0.0.1:0")!, + key: keystore.get(Ed25519.self, publicKey: keys.ed25519)! + ), + peers: [], + local: nodes.count == 1 + ) + let nodeCls = desc.isValidator ? ValidatorNode.self : Node.self + let node = try await nodeCls.init( + config: nodeConfig, + genesis: genesis, + eventBus: eventBus, + keystore: keystore, + scheduler: scheduler + ) + ret.append((node, storeMiddleware)) + } + + // wait for the listeners to be ready + try await Task.sleep(for: .milliseconds(500)) + + for (from, to) in connections { + let fromNode = ret[from].0 + let toNode = ret[to].0 + _ = try fromNode.network.network.connect(to: toNode.network.network.listenAddress(), role: .validator) + } + return (ret, scheduler) + } +}