From 40bf0d72a5e5bfd1b88a90691a7b0e3f30966f63 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 8 Jan 2025 15:06:26 +0800 Subject: [PATCH 01/27] multiplePeers tests --- .../BlockchainDataProvider.swift | 2 +- .../xcode/xcshareddata/xcschemes/Node.xcscheme | 12 ++++++++++++ Node/Tests/NodeTests/NodeTests.swift | 6 +++--- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index d42c9a0f..83a69a18 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -53,7 +53,7 @@ public actor BlockchainDataProvider: Sendable { bestHead = HeadInfo(hash: block.hash, timeslot: block.header.timeslot, number: number) } - logger.debug("block imported: \(block.hash)") + logger.info("block Imported: #\(bestHead.timeslot) \(block.hash)") } } diff --git a/Node/.swiftpm/xcode/xcshareddata/xcschemes/Node.xcscheme b/Node/.swiftpm/xcode/xcshareddata/xcschemes/Node.xcscheme index 4490e214..8501d959 100644 --- a/Node/.swiftpm/xcode/xcshareddata/xcschemes/Node.xcscheme +++ b/Node/.swiftpm/xcode/xcshareddata/xcschemes/Node.xcscheme @@ -29,6 +29,18 @@ selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB" shouldUseLaunchSchemeArgsEnv = "YES" shouldAutocreateTestPlan = "YES"> + + + + + + Date: Wed, 8 Jan 2025 15:32:06 +0800 Subject: [PATCH 02/27] update swiftlint --- Node/Tests/NodeTests/NodeTests.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index a49cbceb..2e1b1c6f 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -168,7 +168,6 @@ final class NodeTests { let validator2BestHead = await validator2.dataProvider.bestHead let node1BestHead = await node1.dataProvider.bestHead let node2BestHead = await node2.dataProvider.bestHead - #expect(validator1BestHead.hash == node1BestHead.hash) #expect(validator1BestHead.hash == node2BestHead.hash) #expect(validator2BestHead.hash == node1BestHead.hash) From a15467c5c5d5fbb50338da622e0acdaabd6993eb Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 8 Jan 2025 16:31:02 +0800 Subject: [PATCH 03/27] update more tests --- Node/Tests/NodeTests/NodeTests.swift | 57 ++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 2e1b1c6f..ea335230 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -172,4 +172,61 @@ final class NodeTests { #expect(validator1BestHead.hash == node2BestHead.hash) #expect(validator2BestHead.hash == node1BestHead.hash) } + + @Test + func moreMultiplePeers() async throws { + // Create multiple nodes + var nodeDescriptions: [NodeDescription] = [ + NodeDescription(isValidator: true, database: getDatabase(0)), + NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)) + ] + + // Add 18 non-validator nodes + for i in 2...19 { + nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: .inMemory)) + } + + let (nodes, scheduler) = try await Topology( + nodes: nodeDescriptions, + connections: (0..<20).flatMap { i in + (i + 1..<20).map { j in (i, j) } // Fully connected topology + } + ).build(genesis: .preset(.minimal)) + + let (validator1, validator1StoreMiddlware) = nodes[0] + let (validator2, validator2StoreMiddlware) = nodes[1] + + // Extract non-validator nodes and their middleware + let nonValidatorNodes = nodes[2...].map { $0 } + + try await Task.sleep(for: .milliseconds(nodes.count * 200)) + let (node1, node1StoreMiddlware) = nonValidatorNodes[0] + let (node2, node2StoreMiddlware) = nonValidatorNodes[1] + // Verify connections for a sample of non-validator nodes + #expect(node1.network.peersCount == 19) + #expect(node2.network.peersCount == 19) + // Advance time and verify sync + for _ in 0 ..< 10 { + await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) + await validator1StoreMiddlware.wait() + await validator2StoreMiddlware.wait() + + for (_, middleware) in nonValidatorNodes { + await middleware.wait() + } + } + + try await Task.sleep(for: .milliseconds(nodes.count * 200)) + + let validator1BestHead = await validator1.dataProvider.bestHead + let validator2BestHead = await validator2.dataProvider.bestHead + + for (node, _) in nonValidatorNodes { + let nodeBestHead = await node.dataProvider.bestHead + #expect(validator1BestHead.hash == nodeBestHead.hash) + #expect(validator2BestHead.hash == nodeBestHead.hash) + } + } + + } From b7c9e51f2dcdf84112c5206ee7422c66b40f708d Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 8 Jan 2025 16:52:15 +0800 Subject: [PATCH 04/27] update --- Node/Tests/NodeTests/NodeTests.swift | 80 ++++++++++++++-------------- 1 file changed, 39 insertions(+), 41 deletions(-) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index ea335230..2b1683ec 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -10,75 +10,75 @@ final class NodeTests { let tmpDir = FileManager.default.temporaryDirectory return tmpDir.appendingPathComponent("\(UUID().uuidString)") }() - + func getDatabase(_ idx: Int) -> Database { Database.rocksDB(path: path.appendingPathComponent("\(idx)")) } - + deinit { try? FileManager.default.removeItem(at: path) } - + @Test func validatorNodeInMemory() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true)] ).build(genesis: .preset(.minimal)) - + let (validatorNode, storeMiddlware) = nodes[0] - + // Get initial state let initialBestHead = await validatorNode.dataProvider.bestHead let initialTimeslot = initialBestHead.timeslot - + // Advance time for _ in 0 ..< 10 { await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await storeMiddlware.wait() } - + // Wait for block production try await Task.sleep(for: .milliseconds(500)) - + // Get new state let newBestHead = await validatorNode.dataProvider.bestHead let newTimeslot = newBestHead.timeslot - + // Verify block was produced #expect(newTimeslot > initialTimeslot) #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) } - + @Test func validatorNodeRocksDB() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true, database: getDatabase(0))] ).build(genesis: .preset(.minimal)) - + let (validatorNode, storeMiddlware) = nodes[0] - + // Get initial state let initialBestHead = await validatorNode.dataProvider.bestHead let initialTimeslot = initialBestHead.timeslot - + // Advance time for _ in 0 ..< 10 { await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await storeMiddlware.wait() } - + // Wait for block production try await Task.sleep(for: .milliseconds(500)) - + // Get new state let newBestHead = await validatorNode.dataProvider.bestHead let newTimeslot = newBestHead.timeslot - + // Verify block was produced #expect(newTimeslot > initialTimeslot) #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) } - + @Test func sync() async throws { // Create validator and full node @@ -89,46 +89,46 @@ final class NodeTests { ], connections: [(0, 1)] ).build(genesis: .preset(.minimal)) - + let (validatorNode, validatorStoreMiddlware) = nodes[0] let (node, nodeStoreMiddlware) = nodes[1] - + // Advance time to produce blocks for _ in 0 ..< 10 { await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } - + // Wait for sync try await Task.sleep(for: .milliseconds(500)) - + // Verify sync let validatorBestHead = await validatorNode.dataProvider.bestHead let nodeBestHead = await node.dataProvider.bestHead - + #expect(validatorBestHead.hash == nodeBestHead.hash) - + // Produce more blocks for _ in 0 ..< 10 { await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } - + try await Task.sleep(for: .milliseconds(500)) - + await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() - + // Verify new blocks are synced let newValidatorBestHead = await validatorNode.dataProvider.bestHead let newNodeBestHead = await node.dataProvider.bestHead - + #expect(newValidatorBestHead.hash == newNodeBestHead.hash) #expect(newValidatorBestHead.timeslot > validatorBestHead.timeslot) } - + @Test func multiplePeers() async throws { // Create multiple nodes @@ -141,18 +141,18 @@ final class NodeTests { ], connections: [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3)] ).build(genesis: .preset(.minimal)) - + let (validator1, validator1StoreMiddlware) = nodes[0] let (validator2, validator2StoreMiddlware) = nodes[1] let (node1, node1StoreMiddlware) = nodes[2] let (node2, node2StoreMiddlware) = nodes[3] - + try await Task.sleep(for: .milliseconds(nodes.count * 500)) - + // Verify connections #expect(node1.network.peersCount == 2) #expect(node2.network.peersCount == 2) - + // Advance time and verify sync for _ in 0 ..< 10 { await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) @@ -161,9 +161,9 @@ final class NodeTests { await node1StoreMiddlware.wait() await node2StoreMiddlware.wait() } - + try await Task.sleep(for: .milliseconds(nodes.count * 500)) - + let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead let node1BestHead = await node1.dataProvider.bestHead @@ -192,13 +192,13 @@ final class NodeTests { (i + 1..<20).map { j in (i, j) } // Fully connected topology } ).build(genesis: .preset(.minimal)) - + let (validator1, validator1StoreMiddlware) = nodes[0] let (validator2, validator2StoreMiddlware) = nodes[1] - + // Extract non-validator nodes and their middleware let nonValidatorNodes = nodes[2...].map { $0 } - + try await Task.sleep(for: .milliseconds(nodes.count * 200)) let (node1, node1StoreMiddlware) = nonValidatorNodes[0] let (node2, node2StoreMiddlware) = nonValidatorNodes[1] @@ -215,9 +215,9 @@ final class NodeTests { await middleware.wait() } } - + try await Task.sleep(for: .milliseconds(nodes.count * 200)) - + let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead @@ -227,6 +227,4 @@ final class NodeTests { #expect(validator2BestHead.hash == nodeBestHead.hash) } } - - } From 54d1efe7801c308e8e60248197b3d51d8ae757d3 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 8 Jan 2025 16:56:15 +0800 Subject: [PATCH 05/27] update more test --- Node/Tests/NodeTests/NodeTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 2b1683ec..054cee4a 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -147,7 +147,7 @@ final class NodeTests { let (node1, node1StoreMiddlware) = nodes[2] let (node2, node2StoreMiddlware) = nodes[3] - try await Task.sleep(for: .milliseconds(nodes.count * 500)) + try await Task.sleep(for: .milliseconds(nodes.count * 200)) // Verify connections #expect(node1.network.peersCount == 2) @@ -199,7 +199,7 @@ final class NodeTests { // Extract non-validator nodes and their middleware let nonValidatorNodes = nodes[2...].map { $0 } - try await Task.sleep(for: .milliseconds(nodes.count * 200)) + try await Task.sleep(for: .milliseconds(nodes.count * 100)) let (node1, node1StoreMiddlware) = nonValidatorNodes[0] let (node2, node2StoreMiddlware) = nonValidatorNodes[1] // Verify connections for a sample of non-validator nodes From 773e12580734facdf11c491c7b79ce14730d18fb Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 8 Jan 2025 17:23:24 +0800 Subject: [PATCH 06/27] update swiftlint --- Node/Tests/NodeTests/NodeTests.swift | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 054cee4a..c2fec61f 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -10,23 +10,22 @@ final class NodeTests { let tmpDir = FileManager.default.temporaryDirectory return tmpDir.appendingPathComponent("\(UUID().uuidString)") }() - + func getDatabase(_ idx: Int) -> Database { Database.rocksDB(path: path.appendingPathComponent("\(idx)")) } - + deinit { try? FileManager.default.removeItem(at: path) } - - @Test + + @Test func validatorNodeInMemory() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true)] ).build(genesis: .preset(.minimal)) - + let (validatorNode, storeMiddlware) = nodes[0] - // Get initial state let initialBestHead = await validatorNode.dataProvider.bestHead let initialTimeslot = initialBestHead.timeslot @@ -180,25 +179,21 @@ final class NodeTests { NodeDescription(isValidator: true, database: getDatabase(0)), NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)) ] - // Add 18 non-validator nodes for i in 2...19 { nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: .inMemory)) } - + let (nodes, scheduler) = try await Topology( nodes: nodeDescriptions, connections: (0..<20).flatMap { i in (i + 1..<20).map { j in (i, j) } // Fully connected topology } ).build(genesis: .preset(.minimal)) - let (validator1, validator1StoreMiddlware) = nodes[0] let (validator2, validator2StoreMiddlware) = nodes[1] - // Extract non-validator nodes and their middleware let nonValidatorNodes = nodes[2...].map { $0 } - try await Task.sleep(for: .milliseconds(nodes.count * 100)) let (node1, node1StoreMiddlware) = nonValidatorNodes[0] let (node2, node2StoreMiddlware) = nonValidatorNodes[1] @@ -215,12 +210,11 @@ final class NodeTests { await middleware.wait() } } - try await Task.sleep(for: .milliseconds(nodes.count * 200)) - + let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead - + for (node, _) in nonValidatorNodes { let nodeBestHead = await node.dataProvider.bestHead #expect(validator1BestHead.hash == nodeBestHead.hash) From c99ad210483dac576c8e5cc7511ac17c28c81f5b Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 8 Jan 2025 17:28:15 +0800 Subject: [PATCH 07/27] update --- Node/Tests/NodeTests/NodeTests.swift | 29 ---------------------------- 1 file changed, 29 deletions(-) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index c2fec61f..45f6c3aa 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -29,20 +29,16 @@ final class NodeTests { // Get initial state let initialBestHead = await validatorNode.dataProvider.bestHead let initialTimeslot = initialBestHead.timeslot - // Advance time for _ in 0 ..< 10 { await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await storeMiddlware.wait() } - // Wait for block production try await Task.sleep(for: .milliseconds(500)) - // Get new state let newBestHead = await validatorNode.dataProvider.bestHead let newTimeslot = newBestHead.timeslot - // Verify block was produced #expect(newTimeslot > initialTimeslot) #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) @@ -53,26 +49,20 @@ final class NodeTests { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true, database: getDatabase(0))] ).build(genesis: .preset(.minimal)) - let (validatorNode, storeMiddlware) = nodes[0] - // Get initial state let initialBestHead = await validatorNode.dataProvider.bestHead let initialTimeslot = initialBestHead.timeslot - // Advance time for _ in 0 ..< 10 { await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await storeMiddlware.wait() } - // Wait for block production try await Task.sleep(for: .milliseconds(500)) - // Get new state let newBestHead = await validatorNode.dataProvider.bestHead let newTimeslot = newBestHead.timeslot - // Verify block was produced #expect(newTimeslot > initialTimeslot) #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) @@ -88,42 +78,32 @@ final class NodeTests { ], connections: [(0, 1)] ).build(genesis: .preset(.minimal)) - let (validatorNode, validatorStoreMiddlware) = nodes[0] let (node, nodeStoreMiddlware) = nodes[1] - // Advance time to produce blocks for _ in 0 ..< 10 { await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } - // Wait for sync try await Task.sleep(for: .milliseconds(500)) - // Verify sync let validatorBestHead = await validatorNode.dataProvider.bestHead let nodeBestHead = await node.dataProvider.bestHead - #expect(validatorBestHead.hash == nodeBestHead.hash) - // Produce more blocks for _ in 0 ..< 10 { await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } - try await Task.sleep(for: .milliseconds(500)) - await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() - // Verify new blocks are synced let newValidatorBestHead = await validatorNode.dataProvider.bestHead let newNodeBestHead = await node.dataProvider.bestHead - #expect(newValidatorBestHead.hash == newNodeBestHead.hash) #expect(newValidatorBestHead.timeslot > validatorBestHead.timeslot) } @@ -140,18 +120,14 @@ final class NodeTests { ], connections: [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3)] ).build(genesis: .preset(.minimal)) - let (validator1, validator1StoreMiddlware) = nodes[0] let (validator2, validator2StoreMiddlware) = nodes[1] let (node1, node1StoreMiddlware) = nodes[2] let (node2, node2StoreMiddlware) = nodes[3] - try await Task.sleep(for: .milliseconds(nodes.count * 200)) - // Verify connections #expect(node1.network.peersCount == 2) #expect(node2.network.peersCount == 2) - // Advance time and verify sync for _ in 0 ..< 10 { await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) @@ -160,9 +136,7 @@ final class NodeTests { await node1StoreMiddlware.wait() await node2StoreMiddlware.wait() } - try await Task.sleep(for: .milliseconds(nodes.count * 500)) - let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead let node1BestHead = await node1.dataProvider.bestHead @@ -183,7 +157,6 @@ final class NodeTests { for i in 2...19 { nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: .inMemory)) } - let (nodes, scheduler) = try await Topology( nodes: nodeDescriptions, connections: (0..<20).flatMap { i in @@ -211,10 +184,8 @@ final class NodeTests { } } try await Task.sleep(for: .milliseconds(nodes.count * 200)) - let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead - for (node, _) in nonValidatorNodes { let nodeBestHead = await node.dataProvider.bestHead #expect(validator1BestHead.hash == nodeBestHead.hash) From 3afa5520a4858942fccc36bde4939c9ad9f45f2e Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 8 Jan 2025 17:30:56 +0800 Subject: [PATCH 08/27] update --- Node/Tests/NodeTests/NodeTests.swift | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 45f6c3aa..f75083c6 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -19,8 +19,7 @@ final class NodeTests { try? FileManager.default.removeItem(at: path) } - @Test - func validatorNodeInMemory() async throws { + @Test func validatorNodeInMemory() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true)] ).build(genesis: .preset(.minimal)) @@ -44,8 +43,7 @@ final class NodeTests { #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) } - @Test - func validatorNodeRocksDB() async throws { + @Test func validatorNodeRocksDB() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true, database: getDatabase(0))] ).build(genesis: .preset(.minimal)) @@ -68,8 +66,7 @@ final class NodeTests { #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) } - @Test - func sync() async throws { + @Test func sync() async throws { // Create validator and full node let (nodes, scheduler) = try await Topology( nodes: [ @@ -108,8 +105,7 @@ final class NodeTests { #expect(newValidatorBestHead.timeslot > validatorBestHead.timeslot) } - @Test - func multiplePeers() async throws { + @Test func multiplePeers() async throws { // Create multiple nodes let (nodes, scheduler) = try await Topology( nodes: [ @@ -146,8 +142,7 @@ final class NodeTests { #expect(validator2BestHead.hash == node1BestHead.hash) } - @Test - func moreMultiplePeers() async throws { + @Test func moreMultiplePeers() async throws { // Create multiple nodes var nodeDescriptions: [NodeDescription] = [ NodeDescription(isValidator: true, database: getDatabase(0)), From cf261066ab98cfa96717a1a650529bc07e408687 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 8 Jan 2025 17:33:01 +0800 Subject: [PATCH 09/27] update --- Node/Tests/NodeTests/NodeTests.swift | 4 ---- 1 file changed, 4 deletions(-) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index f75083c6..a99a6451 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -42,7 +42,6 @@ final class NodeTests { #expect(newTimeslot > initialTimeslot) #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) } - @Test func validatorNodeRocksDB() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true, database: getDatabase(0))] @@ -65,7 +64,6 @@ final class NodeTests { #expect(newTimeslot > initialTimeslot) #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) } - @Test func sync() async throws { // Create validator and full node let (nodes, scheduler) = try await Topology( @@ -104,7 +102,6 @@ final class NodeTests { #expect(newValidatorBestHead.hash == newNodeBestHead.hash) #expect(newValidatorBestHead.timeslot > validatorBestHead.timeslot) } - @Test func multiplePeers() async throws { // Create multiple nodes let (nodes, scheduler) = try await Topology( @@ -141,7 +138,6 @@ final class NodeTests { #expect(validator1BestHead.hash == node2BestHead.hash) #expect(validator2BestHead.hash == node1BestHead.hash) } - @Test func moreMultiplePeers() async throws { // Create multiple nodes var nodeDescriptions: [NodeDescription] = [ From 85f7f986c9d34bfe3001bc6c3d3c56b0082c0306 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 8 Jan 2025 19:05:49 +0800 Subject: [PATCH 10/27] update swiftlint --- .../RuntimeProtocols/Accumulation.swift | 2 +- .../RuntimeProtocols/Guaranteeing.swift | 4 +- .../VMInvocations/HostCall/HostCalls.swift | 2 +- Node/Tests/NodeTests/NodeTests.swift | 78 +++++++++++++++---- Utils/Sources/Utils/Merklization/MMR.swift | 2 +- 5 files changed, 66 insertions(+), 22 deletions(-) diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift index 5ccfdc2e..9885d90d 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/Accumulation.swift @@ -319,7 +319,7 @@ extension Accumulation { let rightQueueItems = accumulationQueue.array[index...] let leftQueueItems = accumulationQueue.array[0 ..< index] - var allQueueItems = rightQueueItems.flatMap { $0 } + leftQueueItems.flatMap { $0 } + newQueueItems + var allQueueItems = rightQueueItems.flatMap(\.self) + leftQueueItems.flatMap(\.self) + newQueueItems editAccumulatedItems(items: &allQueueItems, accumulatedPackages: Set(zeroPrereqReports.map(\.packageSpecification.workPackageHash))) diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/Guaranteeing.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/Guaranteeing.swift index cdfd57c0..3b08ee14 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/Guaranteeing.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/Guaranteeing.swift @@ -185,8 +185,8 @@ extension Guaranteeing { } let recentWorkPackageHashes: Set = Set(recentHistory.items.flatMap(\.lookup.keys)) - let accumulateHistoryReports = Set(accumulationHistory.array.flatMap { $0 }) - let accumulateQueueReports = Set(accumulationQueue.array.flatMap { $0 } + let accumulateHistoryReports = Set(accumulationHistory.array.flatMap(\.self)) + let accumulateQueueReports = Set(accumulationQueue.array.flatMap(\.self) .flatMap(\.workReport.refinementContext.prerequisiteWorkPackages)) let pendingWorkReportHashes = Set(reports.array.flatMap { $0?.workReport.refinementContext.prerequisiteWorkPackages ?? [] }) let pipelinedWorkReportHashes = recentWorkPackageHashes.union(accumulateHistoryReports).union(accumulateQueueReports) diff --git a/Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift b/Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift index d5616ce0..5267f74b 100644 --- a/Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift +++ b/Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift @@ -909,7 +909,7 @@ public class Invoke: HostCall { self.context = context } - public func _callImpl(config: ProtocolConfigRef, state: VMState) async throws { + public func _callImpl(config _: ProtocolConfigRef, state: VMState) async throws { let pvmIndex: UInt64 = state.readRegister(Registers.Index(raw: 7)) let startAddr: UInt32 = state.readRegister(Registers.Index(raw: 8)) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index a99a6451..b7ce96d6 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -18,52 +18,66 @@ final class NodeTests { deinit { try? FileManager.default.removeItem(at: path) } - @Test func validatorNodeInMemory() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true)] ).build(genesis: .preset(.minimal)) let (validatorNode, storeMiddlware) = nodes[0] + // Get initial state let initialBestHead = await validatorNode.dataProvider.bestHead let initialTimeslot = initialBestHead.timeslot + // Advance time - for _ in 0 ..< 10 { - await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + for _ in 0..<10 { + await scheduler.advance( + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await storeMiddlware.wait() } + // Wait for block production try await Task.sleep(for: .milliseconds(500)) + // Get new state let newBestHead = await validatorNode.dataProvider.bestHead let newTimeslot = newBestHead.timeslot + // Verify block was produced #expect(newTimeslot > initialTimeslot) #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) } + @Test func validatorNodeRocksDB() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true, database: getDatabase(0))] ).build(genesis: .preset(.minimal)) + let (validatorNode, storeMiddlware) = nodes[0] + // Get initial state let initialBestHead = await validatorNode.dataProvider.bestHead let initialTimeslot = initialBestHead.timeslot + // Advance time - for _ in 0 ..< 10 { - await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + for _ in 0..<10 { + await scheduler.advance( + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await storeMiddlware.wait() } + // Wait for block production try await Task.sleep(for: .milliseconds(500)) + // Get new state let newBestHead = await validatorNode.dataProvider.bestHead let newTimeslot = newBestHead.timeslot + // Verify block was produced #expect(newTimeslot > initialTimeslot) #expect(try await validatorNode.blockchain.dataProvider.hasBlock(hash: newBestHead.hash)) } + @Test func sync() async throws { // Create validator and full node let (nodes, scheduler) = try await Topology( @@ -73,35 +87,48 @@ final class NodeTests { ], connections: [(0, 1)] ).build(genesis: .preset(.minimal)) + let (validatorNode, validatorStoreMiddlware) = nodes[0] let (node, nodeStoreMiddlware) = nodes[1] + // Advance time to produce blocks - for _ in 0 ..< 10 { - await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + for _ in 0..<10 { + await scheduler.advance( + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } + // Wait for sync try await Task.sleep(for: .milliseconds(500)) + // Verify sync let validatorBestHead = await validatorNode.dataProvider.bestHead let nodeBestHead = await node.dataProvider.bestHead + #expect(validatorBestHead.hash == nodeBestHead.hash) + // Produce more blocks - for _ in 0 ..< 10 { - await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + for _ in 0..<10 { + await scheduler.advance( + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } + try await Task.sleep(for: .milliseconds(500)) + await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() + // Verify new blocks are synced let newValidatorBestHead = await validatorNode.dataProvider.bestHead let newNodeBestHead = await node.dataProvider.bestHead + #expect(newValidatorBestHead.hash == newNodeBestHead.hash) #expect(newValidatorBestHead.timeslot > validatorBestHead.timeslot) } + @Test func multiplePeers() async throws { // Create multiple nodes let (nodes, scheduler) = try await Topology( @@ -113,23 +140,30 @@ final class NodeTests { ], connections: [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3)] ).build(genesis: .preset(.minimal)) + let (validator1, validator1StoreMiddlware) = nodes[0] let (validator2, validator2StoreMiddlware) = nodes[1] let (node1, node1StoreMiddlware) = nodes[2] let (node2, node2StoreMiddlware) = nodes[3] + try await Task.sleep(for: .milliseconds(nodes.count * 200)) + // Verify connections #expect(node1.network.peersCount == 2) #expect(node2.network.peersCount == 2) + // Advance time and verify sync - for _ in 0 ..< 10 { - await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) + for _ in 0..<10 { + await scheduler.advance( + by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() await validator2StoreMiddlware.wait() await node1StoreMiddlware.wait() await node2StoreMiddlware.wait() } + try await Task.sleep(for: .milliseconds(nodes.count * 500)) + let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead let node1BestHead = await node1.dataProvider.bestHead @@ -138,26 +172,32 @@ final class NodeTests { #expect(validator1BestHead.hash == node2BestHead.hash) #expect(validator2BestHead.hash == node1BestHead.hash) } + @Test func moreMultiplePeers() async throws { // Create multiple nodes var nodeDescriptions: [NodeDescription] = [ NodeDescription(isValidator: true, database: getDatabase(0)), - NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)) + NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)), ] + // Add 18 non-validator nodes for i in 2...19 { nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: .inMemory)) } + let (nodes, scheduler) = try await Topology( nodes: nodeDescriptions, connections: (0..<20).flatMap { i in - (i + 1..<20).map { j in (i, j) } // Fully connected topology + (i + 1..<20).map { j in (i, j) } // Fully connected topology } ).build(genesis: .preset(.minimal)) + let (validator1, validator1StoreMiddlware) = nodes[0] let (validator2, validator2StoreMiddlware) = nodes[1] + // Extract non-validator nodes and their middleware - let nonValidatorNodes = nodes[2...].map { $0 } + let nonValidatorNodes = nodes[2...].map(\.self) + try await Task.sleep(for: .milliseconds(nodes.count * 100)) let (node1, node1StoreMiddlware) = nonValidatorNodes[0] let (node2, node2StoreMiddlware) = nonValidatorNodes[1] @@ -165,18 +205,22 @@ final class NodeTests { #expect(node1.network.peersCount == 19) #expect(node2.network.peersCount == 19) // Advance time and verify sync - for _ in 0 ..< 10 { - await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) + for _ in 0..<10 { + await scheduler.advance( + by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() await validator2StoreMiddlware.wait() - + for (_, middleware) in nonValidatorNodes { await middleware.wait() } } + try await Task.sleep(for: .milliseconds(nodes.count * 200)) + let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead + for (node, _) in nonValidatorNodes { let nodeBestHead = await node.dataProvider.bestHead #expect(validator1BestHead.hash == nodeBestHead.hash) diff --git a/Utils/Sources/Utils/Merklization/MMR.swift b/Utils/Sources/Utils/Merklization/MMR.swift index e782a224..591bd112 100644 --- a/Utils/Sources/Utils/Merklization/MMR.swift +++ b/Utils/Sources/Utils/Merklization/MMR.swift @@ -34,7 +34,7 @@ public struct MMR: Sendable, Equatable, Codable { } } - let nonNilPeaks = peaks.compactMap { $0 } + let nonNilPeaks = peaks.compactMap(\.self) return helper(nonNilPeaks[...]) } } From 38e0fa570e58747b29f5637f2962241b248ed3b7 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Thu, 9 Jan 2025 11:32:45 +0800 Subject: [PATCH 11/27] update tests --- Node/Tests/NodeTests/NodeTests.swift | 22 ++++++++-------------- Node/Tests/NodeTests/Topology.swift | 4 +++- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index b7ce96d6..29af3a40 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -31,8 +31,7 @@ final class NodeTests { // Advance time for _ in 0..<10 { - await scheduler.advance( - by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await storeMiddlware.wait() } @@ -61,8 +60,7 @@ final class NodeTests { // Advance time for _ in 0..<10 { - await scheduler.advance( - by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await storeMiddlware.wait() } @@ -93,8 +91,7 @@ final class NodeTests { // Advance time to produce blocks for _ in 0..<10 { - await scheduler.advance( - by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } @@ -110,8 +107,7 @@ final class NodeTests { // Produce more blocks for _ in 0..<10 { - await scheduler.advance( - by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } @@ -154,8 +150,7 @@ final class NodeTests { // Advance time and verify sync for _ in 0..<10 { - await scheduler.advance( - by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() await validator2StoreMiddlware.wait() await node1StoreMiddlware.wait() @@ -199,15 +194,14 @@ final class NodeTests { let nonValidatorNodes = nodes[2...].map(\.self) try await Task.sleep(for: .milliseconds(nodes.count * 100)) - let (node1, node1StoreMiddlware) = nonValidatorNodes[0] - let (node2, node2StoreMiddlware) = nonValidatorNodes[1] + let (node1, _ ) = nonValidatorNodes[0] + let (node2, _ ) = nonValidatorNodes[1] // Verify connections for a sample of non-validator nodes #expect(node1.network.peersCount == 19) #expect(node2.network.peersCount == 19) // Advance time and verify sync for _ in 0..<10 { - await scheduler.advance( - by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() await validator2StoreMiddlware.wait() diff --git a/Node/Tests/NodeTests/Topology.swift b/Node/Tests/NodeTests/Topology.swift index be8654c2..74625c96 100644 --- a/Node/Tests/NodeTests/Topology.swift +++ b/Node/Tests/NodeTests/Topology.swift @@ -63,7 +63,9 @@ struct Topology { for (from, to) in connections { let fromNode = ret[from].0 let toNode = ret[to].0 - _ = try fromNode.network.network.connect(to: toNode.network.network.listenAddress(), role: .validator) + let conn = try fromNode.network.network.connect(to: toNode.network.network.listenAddress(), role: .validator) + try? await conn.ready() + try? await Task.sleep(for: .milliseconds(50)) } return (ret, scheduler) } From a5d97d8e13355cbd01be9cab447b8d066880d1b2 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Thu, 9 Jan 2025 14:27:17 +0800 Subject: [PATCH 12/27] update node test --- .../Sources/Blockchain/Blockchain.swift | 3 +-- .../BlockchainDataProvider.swift | 2 +- Node/Tests/NodeTests/NodeTests.swift | 25 +++++++++++-------- Node/Tests/NodeTests/Topology.swift | 1 - 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/Blockchain/Sources/Blockchain/Blockchain.swift b/Blockchain/Sources/Blockchain/Blockchain.swift index 8b578e33..3288c2b3 100644 --- a/Blockchain/Sources/Blockchain/Blockchain.swift +++ b/Blockchain/Sources/Blockchain/Blockchain.swift @@ -48,8 +48,7 @@ public final class Blockchain: ServiceBase, @unchecked Sendable { try await dataProvider.blockImported(block: block, state: state) publish(RuntimeEvents.BlockImported(block: block, state: state, parentState: parent)) - - logger.info("Block imported: #\(block.header.timeslot) \(block.hash)") + logger.debug(" Import block: #\(block.header.timeslot) \(block.hash)") } } diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index 83a69a18..3ad95efb 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -53,7 +53,7 @@ public actor BlockchainDataProvider: Sendable { bestHead = HeadInfo(hash: block.hash, timeslot: block.header.timeslot, number: number) } - logger.info("block Imported: #\(bestHead.timeslot) \(block.hash)") + logger.debug("Block imported: #\(bestHead.timeslot) \(block.hash)") } } diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 29af3a40..5ad651d3 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -6,13 +6,16 @@ import Utils @testable import Node final class NodeTests { + var dataBaseIndex: Int = 0; + let path = { let tmpDir = FileManager.default.temporaryDirectory return tmpDir.appendingPathComponent("\(UUID().uuidString)") }() - func getDatabase(_ idx: Int) -> Database { - Database.rocksDB(path: path.appendingPathComponent("\(idx)")) + func getDatabase() -> Database { + dataBaseIndex += 1; + return Database.rocksDB(path: path.appendingPathComponent("\(dataBaseIndex)")) } deinit { @@ -49,7 +52,7 @@ final class NodeTests { @Test func validatorNodeRocksDB() async throws { let (nodes, scheduler) = try await Topology( - nodes: [NodeDescription(isValidator: true, database: getDatabase(0))] + nodes: [NodeDescription(isValidator: true, database: getDatabase())] ).build(genesis: .preset(.minimal)) let (validatorNode, storeMiddlware) = nodes[0] @@ -80,8 +83,8 @@ final class NodeTests { // Create validator and full node let (nodes, scheduler) = try await Topology( nodes: [ - NodeDescription(isValidator: true, database: getDatabase(0)), - NodeDescription(devSeed: 1, database: getDatabase(1)), + NodeDescription(isValidator: true, database: getDatabase()), + NodeDescription(devSeed: 1, database: getDatabase()), ], connections: [(0, 1)] ).build(genesis: .preset(.minimal)) @@ -129,9 +132,9 @@ final class NodeTests { // Create multiple nodes let (nodes, scheduler) = try await Topology( nodes: [ - NodeDescription(isValidator: true, database: getDatabase(0)), - NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)), - NodeDescription(devSeed: 2, database: getDatabase(2)), + NodeDescription(isValidator: true, database: getDatabase()), + NodeDescription(isValidator: true, devSeed: 1, database: getDatabase()), + NodeDescription(devSeed: 2, database: getDatabase()), NodeDescription(devSeed: 3, database: .inMemory), ], connections: [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3)] @@ -171,8 +174,8 @@ final class NodeTests { @Test func moreMultiplePeers() async throws { // Create multiple nodes var nodeDescriptions: [NodeDescription] = [ - NodeDescription(isValidator: true, database: getDatabase(0)), - NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)), + NodeDescription(isValidator: true, database: getDatabase()), + NodeDescription(isValidator: true, devSeed: 1, database: getDatabase()), ] // Add 18 non-validator nodes @@ -210,7 +213,7 @@ final class NodeTests { } } - try await Task.sleep(for: .milliseconds(nodes.count * 200)) + try await Task.sleep(for: .milliseconds(nodes.count * 500)) let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead diff --git a/Node/Tests/NodeTests/Topology.swift b/Node/Tests/NodeTests/Topology.swift index 74625c96..7b42fb14 100644 --- a/Node/Tests/NodeTests/Topology.swift +++ b/Node/Tests/NodeTests/Topology.swift @@ -65,7 +65,6 @@ struct Topology { let toNode = ret[to].0 let conn = try fromNode.network.network.connect(to: toNode.network.network.listenAddress(), role: .validator) try? await conn.ready() - try? await Task.sleep(for: .milliseconds(50)) } return (ret, scheduler) } From b3514f9e4feb0c5dc2004bd818f950582e05e7cc Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Thu, 9 Jan 2025 16:57:15 +0800 Subject: [PATCH 13/27] update logger --- .../Sources/Blockchain/Blockchain.swift | 1 - .../BlockchainDataProvider.swift | 2 + .../Sources/MsQuicSwift/QuicConnection.swift | 16 +++--- .../Sources/MsQuicSwift/QuicStream.swift | 16 +++--- .../Node/NetworkingProtocol/SyncManager.swift | 3 +- Node/Tests/NodeTests/NodeTests.swift | 55 ++++++++++++------- Node/Tests/NodeTests/Topology.swift | 1 + 7 files changed, 55 insertions(+), 39 deletions(-) diff --git a/Blockchain/Sources/Blockchain/Blockchain.swift b/Blockchain/Sources/Blockchain/Blockchain.swift index 3288c2b3..463145b2 100644 --- a/Blockchain/Sources/Blockchain/Blockchain.swift +++ b/Blockchain/Sources/Blockchain/Blockchain.swift @@ -48,7 +48,6 @@ public final class Blockchain: ServiceBase, @unchecked Sendable { try await dataProvider.blockImported(block: block, state: state) publish(RuntimeEvents.BlockImported(block: block, state: state, parentState: parent)) - logger.debug(" Import block: #\(block.header.timeslot) \(block.hash)") } } diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index 3ad95efb..611f38d1 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -27,6 +27,8 @@ public actor BlockchainDataProvider: Sendable { if header.value.timeslot > bestHead.timeslot { let number = try await dataProvider.getBlockNumber(hash: head).unwrap() bestHead = HeadInfo(hash: head, timeslot: header.value.timeslot, number: number) + } else { + logger.warning("Found a block with timeslot \(header.value.timeslot) but best head is \(bestHead.timeslot)") } } diff --git a/Networking/Sources/MsQuicSwift/QuicConnection.swift b/Networking/Sources/MsQuicSwift/QuicConnection.swift index 48b88ff8..021e4f8d 100644 --- a/Networking/Sources/MsQuicSwift/QuicConnection.swift +++ b/Networking/Sources/MsQuicSwift/QuicConnection.swift @@ -102,7 +102,7 @@ public final class QuicConnection: Sendable { } public func connect(to address: NetAddr) throws { - logger.debug("connecting to \(address)") + logger.info("connecting to \(address)") try storage.write { storage in guard var storage2 = storage else { throw QuicError.alreadyClosed @@ -131,7 +131,7 @@ public final class QuicConnection: Sendable { } public func shutdown(errorCode: QuicErrorCode = .success) throws { - logger.debug("closing connection") + logger.info("closing connection") try storage.write { storage in guard let storage2 = storage else { throw QuicError.alreadyClosed @@ -152,7 +152,7 @@ public final class QuicConnection: Sendable { } public func createStream() throws -> QuicStream { - logger.debug("creating stream") + logger.info("creating stream") return try storage.read { storage in guard let storage, storage.state == .started else { @@ -210,7 +210,7 @@ private class ConnectionHandle { fileprivate func callbackHandler(event: UnsafePointer) -> QuicStatus { switch event.pointee.Type { case QUIC_CONNECTION_EVENT_PEER_CERTIFICATE_RECEIVED: - logger.debug("Peer certificate received") + logger.info("Peer certificate received") if let connection { let evtData = event.pointee.PEER_CERTIFICATE_RECEIVED let data: Data? @@ -239,7 +239,7 @@ private class ConnectionHandle { connection.handler.shutdownInitiated(connection, reason: .idle) } } else { - logger.debug("Shut down by transport. Status: \(status) Error: \(evtData.ErrorCode)") + logger.info("Shut down by transport. Status: \(status) Error: \(evtData.ErrorCode)") if let connection { connection.handler.shutdownInitiated( connection, @@ -249,14 +249,14 @@ private class ConnectionHandle { } case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER: - logger.debug("Shut down by peer. Error: \(event.pointee.SHUTDOWN_INITIATED_BY_PEER.ErrorCode)") + logger.info("Shut down by peer. Error: \(event.pointee.SHUTDOWN_INITIATED_BY_PEER.ErrorCode)") if let connection { let errorCode = QuicErrorCode(event.pointee.SHUTDOWN_INITIATED_BY_PEER.ErrorCode) connection.handler.shutdownInitiated(connection, reason: .byPeer(code: errorCode)) } case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: - logger.debug("Shutdown complete") + logger.info("Shutdown complete") if let connection { connection.handler.shutdownComplete(connection) } @@ -270,7 +270,7 @@ private class ConnectionHandle { Unmanaged.passUnretained(self).release() // !! release -1 case QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED: - logger.debug("Peer stream started") + logger.info("Peer stream started") let streamPtr = event.pointee.PEER_STREAM_STARTED.Stream if let connection, let streamPtr, connection.api != nil { let stream = QuicStream(connection: connection, stream: streamPtr, handler: connection.handler) diff --git a/Networking/Sources/MsQuicSwift/QuicStream.swift b/Networking/Sources/MsQuicSwift/QuicStream.swift index 22f16c4b..7febfa47 100644 --- a/Networking/Sources/MsQuicSwift/QuicStream.swift +++ b/Networking/Sources/MsQuicSwift/QuicStream.swift @@ -75,7 +75,7 @@ public final class QuicStream: Sendable { } public func shutdown(errorCode: QuicErrorCode = .success) throws { - logger.debug("closing stream \(errorCode)") + logger.info("closing stream \(errorCode)") try storage.write { storage in guard let storage2 = storage else { @@ -94,7 +94,7 @@ public final class QuicStream: Sendable { } public func send(data: Data, start: Bool = false, finish: Bool = false) throws { - logger.trace("Sending \(data.count) bytes") + logger.info("Sending \(data.count) bytes") try storage.read { storage in guard let storage, let api = storage.connection.api else { @@ -104,7 +104,7 @@ public final class QuicStream: Sendable { let messageLength = data.count if messageLength == 0 { - logger.trace("No data to send.") + logger.info("No data to send.") throw SendError.emptyData // Throw a specific error or return } @@ -173,7 +173,7 @@ private class StreamHandle { fileprivate func callbackHandler(event: UnsafePointer) -> QuicStatus { switch event.pointee.Type { case QUIC_STREAM_EVENT_SEND_COMPLETE: - logger.trace("Stream send completed") + logger.info("Stream send completed") if let clientContext = event.pointee.SEND_COMPLETE.ClientContext { clientContext.deallocate() // !! deallocate } @@ -188,7 +188,7 @@ private class StreamHandle { totalSize += Int(buffer.Length) } - logger.trace("Stream received \(totalSize) bytes") + logger.info("Stream received \(totalSize) bytes") var receivedData = Data(capacity: totalSize) @@ -207,16 +207,16 @@ private class StreamHandle { } case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: - logger.trace("Peer send shutdown") + logger.info("Peer send shutdown") if let stream { stream.handler.dataReceived(stream, data: nil) } case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: - logger.trace("Peer send aborted") + logger.info("Peer send aborted") case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: - logger.trace("Stream shutdown complete") + logger.info("Stream shutdown complete") let evtData = event.pointee.SHUTDOWN_COMPLETE if let stream { diff --git a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift index 7fa410b8..0569a4d3 100644 --- a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift @@ -144,7 +144,7 @@ public actor SyncManager: Sendable { } private func importBlock(currentTimeslot: TimeslotIndex, newHeader: HeaderRef, peer: PeerId) { - logger.debug("importing block", metadata: ["hash": "\(newHeader.hash)", "remote": "\(peer)"]) + logger.info("importing block", metadata: ["hash": "\(newHeader.hash)", "remote": "\(peer)"]) let blockchain = blockchain let network = network Task.detached { @@ -163,6 +163,7 @@ public actor SyncManager: Sendable { } // reverse to import old block first for block in blocks.reversed() { + logger.info("blocks reversed", metadata: ["hash": "\(String(describing: block.hash))"]) try await blockchain.importBlock(block) } } catch { diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 5ad651d3..00a910b9 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -130,45 +130,58 @@ final class NodeTests { @Test func multiplePeers() async throws { // Create multiple nodes + var nodeDescriptions: [NodeDescription] = [ + NodeDescription(isValidator: true, database: getDatabase()), + NodeDescription(isValidator: true, devSeed: 1, database: getDatabase()), + ] + // Add 18 non-validator nodes + for i in 2...10 { + nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: getDatabase())) + } + for i in 11...19 { + nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: .inMemory)) + } + let (nodes, scheduler) = try await Topology( - nodes: [ - NodeDescription(isValidator: true, database: getDatabase()), - NodeDescription(isValidator: true, devSeed: 1, database: getDatabase()), - NodeDescription(devSeed: 2, database: getDatabase()), - NodeDescription(devSeed: 3, database: .inMemory), - ], - connections: [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3)] + nodes: nodeDescriptions, + connections: (0..<20).flatMap { i in + (i + 1..<20).map { j in (i, j) } // Fully connected topology + } ).build(genesis: .preset(.minimal)) let (validator1, validator1StoreMiddlware) = nodes[0] let (validator2, validator2StoreMiddlware) = nodes[1] - let (node1, node1StoreMiddlware) = nodes[2] - let (node2, node2StoreMiddlware) = nodes[3] - try await Task.sleep(for: .milliseconds(nodes.count * 200)) - - // Verify connections - #expect(node1.network.peersCount == 2) - #expect(node2.network.peersCount == 2) + // Extract non-validator nodes and their middleware + let nonValidatorNodes = nodes[2...].map(\.self) + try await Task.sleep(for: .milliseconds(nodes.count * 100)) + let (node1, _ ) = nonValidatorNodes[0] + let (node2, _ ) = nonValidatorNodes[1] + // Verify connections for a sample of non-validator nodes + #expect(node1.network.peersCount == 19) + #expect(node2.network.peersCount == 19) // Advance time and verify sync for _ in 0..<10 { await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() await validator2StoreMiddlware.wait() - await node1StoreMiddlware.wait() - await node2StoreMiddlware.wait() + + for (_, middleware) in nonValidatorNodes { + await middleware.wait() + } } try await Task.sleep(for: .milliseconds(nodes.count * 500)) let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead - let node1BestHead = await node1.dataProvider.bestHead - let node2BestHead = await node2.dataProvider.bestHead - #expect(validator1BestHead.hash == node1BestHead.hash) - #expect(validator1BestHead.hash == node2BestHead.hash) - #expect(validator2BestHead.hash == node1BestHead.hash) + + for (node, _) in nonValidatorNodes { + let nodeBestHead = await node.dataProvider.bestHead + #expect(validator1BestHead.hash == nodeBestHead.hash) + #expect(validator2BestHead.hash == nodeBestHead.hash) + } } @Test func moreMultiplePeers() async throws { diff --git a/Node/Tests/NodeTests/Topology.swift b/Node/Tests/NodeTests/Topology.swift index 7b42fb14..0738182b 100644 --- a/Node/Tests/NodeTests/Topology.swift +++ b/Node/Tests/NodeTests/Topology.swift @@ -65,6 +65,7 @@ struct Topology { let toNode = ret[to].0 let conn = try fromNode.network.network.connect(to: toNode.network.network.listenAddress(), role: .validator) try? await conn.ready() + try await Task.sleep(for: .milliseconds(50)) } return (ret, scheduler) } From 8793718bf4c6c035c470cd22331f734bfb7fe2b5 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Thu, 9 Jan 2025 17:56:06 +0800 Subject: [PATCH 14/27] update test --- .../BlockchainDataProvider.swift | 3 ++- Database/Sources/Database/RocksDBBackend.swift | 2 +- Node/Sources/Node/Config.swift | 4 ++-- Node/Tests/NodeTests/NodeTests.swift | 17 ++++++++--------- Node/Tests/NodeTests/Topology.swift | 1 - 5 files changed, 13 insertions(+), 14 deletions(-) diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index 611f38d1..ccbfca01 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -27,6 +27,7 @@ public actor BlockchainDataProvider: Sendable { if header.value.timeslot > bestHead.timeslot { let number = try await dataProvider.getBlockNumber(hash: head).unwrap() bestHead = HeadInfo(hash: head, timeslot: header.value.timeslot, number: number) + logger.info("best head with timeslot \(header.value.timeslot) updated to \(bestHead.hash)") } else { logger.warning("Found a block with timeslot \(header.value.timeslot) but best head is \(bestHead.timeslot)") } @@ -102,7 +103,7 @@ extension BlockchainDataProvider { // add forks of finalized head is not allowed public func add(block: BlockRef) async throws { - logger.debug("adding block: \(block.hash)") + logger.info("adding block: \(block.hash)") // require parent exists (i.e. not purged) and block is not fork of any finalized block guard try await hasBlock(hash: block.header.parentHash), block.header.timeslot > finalizedHead.timeslot else { diff --git a/Database/Sources/Database/RocksDBBackend.swift b/Database/Sources/Database/RocksDBBackend.swift index 83034f1f..649b42d0 100644 --- a/Database/Sources/Database/RocksDBBackend.swift +++ b/Database/Sources/Database/RocksDBBackend.swift @@ -126,7 +126,7 @@ extension RocksDBBackend: BlockchainDataProviderProtocol { } public func add(block: BlockRef) async throws { - logger.trace("add(block:) \(block.hash)") + logger.info("add(block:) \(block.hash)") // TODO: batch put diff --git a/Node/Sources/Node/Config.swift b/Node/Sources/Node/Config.swift index a0989727..958d6fe1 100644 --- a/Node/Sources/Node/Config.swift +++ b/Node/Sources/Node/Config.swift @@ -18,7 +18,7 @@ public enum Database { public func open(chainspec: ChainSpec) async throws -> BlockchainDataProvider { switch self { case let .rocksDB(path): - logger.info("Using RocksDB backend at \(path.absoluteString)") + logger.debug("Using RocksDB backend at \(path.absoluteString)") let backend = try await RocksDBBackend( path: path, config: chainspec.getConfig(), @@ -27,7 +27,7 @@ public enum Database { ) return try await BlockchainDataProvider(backend) case .inMemory: - logger.info("Using in-memory backend") + logger.debug("Using in-memory backend") let genesisBlock = try chainspec.getBlock() let genesisStateData = try chainspec.getState() let backend = try StateBackend(InMemoryBackend(), config: chainspec.getConfig(), rootHash: Data32()) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 00a910b9..4544d2fa 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -6,13 +6,14 @@ import Utils @testable import Node final class NodeTests { - var dataBaseIndex: Int = 0; + + var dataBaseIndex: Int = 0 let path = { let tmpDir = FileManager.default.temporaryDirectory return tmpDir.appendingPathComponent("\(UUID().uuidString)") }() - + func getDatabase() -> Database { dataBaseIndex += 1; return Database.rocksDB(path: path.appendingPathComponent("\(dataBaseIndex)")) @@ -21,6 +22,7 @@ final class NodeTests { deinit { try? FileManager.default.removeItem(at: path) } + @Test func validatorNodeInMemory() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true)] @@ -135,12 +137,9 @@ final class NodeTests { NodeDescription(isValidator: true, devSeed: 1, database: getDatabase()), ] // Add 18 non-validator nodes - for i in 2...10 { + for i in 2...19 { nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: getDatabase())) } - for i in 11...19 { - nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: .inMemory)) - } let (nodes, scheduler) = try await Topology( nodes: nodeDescriptions, @@ -162,7 +161,7 @@ final class NodeTests { #expect(node1.network.peersCount == 19) #expect(node2.network.peersCount == 19) // Advance time and verify sync - for _ in 0..<10 { + for _ in 0..<20 { await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() await validator2StoreMiddlware.wait() @@ -172,7 +171,7 @@ final class NodeTests { } } - try await Task.sleep(for: .milliseconds(nodes.count * 500)) + try await Task.sleep(for: .milliseconds(nodes.count * 100)) let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead @@ -216,7 +215,7 @@ final class NodeTests { #expect(node1.network.peersCount == 19) #expect(node2.network.peersCount == 19) // Advance time and verify sync - for _ in 0..<10 { + for _ in 0..<3 { await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() await validator2StoreMiddlware.wait() diff --git a/Node/Tests/NodeTests/Topology.swift b/Node/Tests/NodeTests/Topology.swift index 0738182b..7b42fb14 100644 --- a/Node/Tests/NodeTests/Topology.swift +++ b/Node/Tests/NodeTests/Topology.swift @@ -65,7 +65,6 @@ struct Topology { let toNode = ret[to].0 let conn = try fromNode.network.network.connect(to: toNode.network.network.listenAddress(), role: .validator) try? await conn.ready() - try await Task.sleep(for: .milliseconds(50)) } return (ret, scheduler) } From b85254f6585910d46edeb95cc56e03f7af2ba9a8 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Thu, 9 Jan 2025 20:00:57 +0800 Subject: [PATCH 15/27] update node --- Node/Tests/NodeTests/NodeTests.swift | 35 ++++++++++++++++------------ 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 4544d2fa..e6dda530 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -6,23 +6,22 @@ import Utils @testable import Node final class NodeTests { - var dataBaseIndex: Int = 0 - + let path = { let tmpDir = FileManager.default.temporaryDirectory return tmpDir.appendingPathComponent("\(UUID().uuidString)") }() - + func getDatabase() -> Database { - dataBaseIndex += 1; + dataBaseIndex += 1 return Database.rocksDB(path: path.appendingPathComponent("\(dataBaseIndex)")) } deinit { try? FileManager.default.removeItem(at: path) } - + @Test func validatorNodeInMemory() async throws { let (nodes, scheduler) = try await Topology( nodes: [NodeDescription(isValidator: true)] @@ -36,7 +35,8 @@ final class NodeTests { // Advance time for _ in 0..<10 { - await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance( + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await storeMiddlware.wait() } @@ -65,7 +65,8 @@ final class NodeTests { // Advance time for _ in 0..<10 { - await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance( + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await storeMiddlware.wait() } @@ -96,7 +97,8 @@ final class NodeTests { // Advance time to produce blocks for _ in 0..<10 { - await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance( + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } @@ -112,7 +114,8 @@ final class NodeTests { // Produce more blocks for _ in 0..<10 { - await scheduler.advance(by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance( + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } @@ -155,14 +158,15 @@ final class NodeTests { let nonValidatorNodes = nodes[2...].map(\.self) try await Task.sleep(for: .milliseconds(nodes.count * 100)) - let (node1, _ ) = nonValidatorNodes[0] - let (node2, _ ) = nonValidatorNodes[1] + let (node1, _) = nonValidatorNodes[0] + let (node2, _) = nonValidatorNodes[1] // Verify connections for a sample of non-validator nodes #expect(node1.network.peersCount == 19) #expect(node2.network.peersCount == 19) // Advance time and verify sync for _ in 0..<20 { - await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance( + by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() await validator2StoreMiddlware.wait() @@ -209,14 +213,15 @@ final class NodeTests { let nonValidatorNodes = nodes[2...].map(\.self) try await Task.sleep(for: .milliseconds(nodes.count * 100)) - let (node1, _ ) = nonValidatorNodes[0] - let (node2, _ ) = nonValidatorNodes[1] + let (node1, _) = nonValidatorNodes[0] + let (node2, _) = nonValidatorNodes[1] // Verify connections for a sample of non-validator nodes #expect(node1.network.peersCount == 19) #expect(node2.network.peersCount == 19) // Advance time and verify sync for _ in 0..<3 { - await scheduler.advance(by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) + await scheduler.advance( + by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() await validator2StoreMiddlware.wait() From 5c68e4acfd3ad14d13445655fe4d7c31b263f740 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Fri, 10 Jan 2025 10:36:48 +0800 Subject: [PATCH 16/27] update tests --- .../BlockchainDataProvider.swift | 3 +-- .../Blockchain/Validator/BlockAuthor.swift | 2 +- Database/Sources/Database/RocksDBBackend.swift | 2 +- .../Sources/MsQuicSwift/QuicConnection.swift | 16 ++++++++-------- Networking/Sources/MsQuicSwift/QuicStream.swift | 16 ++++++++-------- .../Node/NetworkingProtocol/SyncManager.swift | 4 ++-- Node/Tests/NodeTests/NodeTests.swift | 10 +++------- .../Sources/Utils/EventBus/StoreMiddleware.swift | 2 +- 8 files changed, 25 insertions(+), 30 deletions(-) diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index ccbfca01..611f38d1 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -27,7 +27,6 @@ public actor BlockchainDataProvider: Sendable { if header.value.timeslot > bestHead.timeslot { let number = try await dataProvider.getBlockNumber(hash: head).unwrap() bestHead = HeadInfo(hash: head, timeslot: header.value.timeslot, number: number) - logger.info("best head with timeslot \(header.value.timeslot) updated to \(bestHead.hash)") } else { logger.warning("Found a block with timeslot \(header.value.timeslot) but best head is \(bestHead.timeslot)") } @@ -103,7 +102,7 @@ extension BlockchainDataProvider { // add forks of finalized head is not allowed public func add(block: BlockRef) async throws { - logger.info("adding block: \(block.hash)") + logger.debug("adding block: \(block.hash)") // require parent exists (i.e. not purged) and block is not fork of any finalized block guard try await hasBlock(hash: block.header.parentHash), block.header.timeslot > finalizedHead.timeslot else { diff --git a/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift b/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift index a55f72e8..75baa9d3 100644 --- a/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift +++ b/Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift @@ -163,7 +163,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable { await withSpan("BlockAuthor.newBlock", logger: logger) { _ in // TODO: add timeout let block = try await createNewBlock(timeslot: timeslot, claim: claim) - logger.info("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)") + logger.debug("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)") publish(RuntimeEvents.BlockAuthored(block: block)) } } diff --git a/Database/Sources/Database/RocksDBBackend.swift b/Database/Sources/Database/RocksDBBackend.swift index 649b42d0..a56dddbd 100644 --- a/Database/Sources/Database/RocksDBBackend.swift +++ b/Database/Sources/Database/RocksDBBackend.swift @@ -126,7 +126,7 @@ extension RocksDBBackend: BlockchainDataProviderProtocol { } public func add(block: BlockRef) async throws { - logger.info("add(block:) \(block.hash)") + logger.debug("add(block:) \(block.hash)") // TODO: batch put diff --git a/Networking/Sources/MsQuicSwift/QuicConnection.swift b/Networking/Sources/MsQuicSwift/QuicConnection.swift index 021e4f8d..48b88ff8 100644 --- a/Networking/Sources/MsQuicSwift/QuicConnection.swift +++ b/Networking/Sources/MsQuicSwift/QuicConnection.swift @@ -102,7 +102,7 @@ public final class QuicConnection: Sendable { } public func connect(to address: NetAddr) throws { - logger.info("connecting to \(address)") + logger.debug("connecting to \(address)") try storage.write { storage in guard var storage2 = storage else { throw QuicError.alreadyClosed @@ -131,7 +131,7 @@ public final class QuicConnection: Sendable { } public func shutdown(errorCode: QuicErrorCode = .success) throws { - logger.info("closing connection") + logger.debug("closing connection") try storage.write { storage in guard let storage2 = storage else { throw QuicError.alreadyClosed @@ -152,7 +152,7 @@ public final class QuicConnection: Sendable { } public func createStream() throws -> QuicStream { - logger.info("creating stream") + logger.debug("creating stream") return try storage.read { storage in guard let storage, storage.state == .started else { @@ -210,7 +210,7 @@ private class ConnectionHandle { fileprivate func callbackHandler(event: UnsafePointer) -> QuicStatus { switch event.pointee.Type { case QUIC_CONNECTION_EVENT_PEER_CERTIFICATE_RECEIVED: - logger.info("Peer certificate received") + logger.debug("Peer certificate received") if let connection { let evtData = event.pointee.PEER_CERTIFICATE_RECEIVED let data: Data? @@ -239,7 +239,7 @@ private class ConnectionHandle { connection.handler.shutdownInitiated(connection, reason: .idle) } } else { - logger.info("Shut down by transport. Status: \(status) Error: \(evtData.ErrorCode)") + logger.debug("Shut down by transport. Status: \(status) Error: \(evtData.ErrorCode)") if let connection { connection.handler.shutdownInitiated( connection, @@ -249,14 +249,14 @@ private class ConnectionHandle { } case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER: - logger.info("Shut down by peer. Error: \(event.pointee.SHUTDOWN_INITIATED_BY_PEER.ErrorCode)") + logger.debug("Shut down by peer. Error: \(event.pointee.SHUTDOWN_INITIATED_BY_PEER.ErrorCode)") if let connection { let errorCode = QuicErrorCode(event.pointee.SHUTDOWN_INITIATED_BY_PEER.ErrorCode) connection.handler.shutdownInitiated(connection, reason: .byPeer(code: errorCode)) } case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: - logger.info("Shutdown complete") + logger.debug("Shutdown complete") if let connection { connection.handler.shutdownComplete(connection) } @@ -270,7 +270,7 @@ private class ConnectionHandle { Unmanaged.passUnretained(self).release() // !! release -1 case QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED: - logger.info("Peer stream started") + logger.debug("Peer stream started") let streamPtr = event.pointee.PEER_STREAM_STARTED.Stream if let connection, let streamPtr, connection.api != nil { let stream = QuicStream(connection: connection, stream: streamPtr, handler: connection.handler) diff --git a/Networking/Sources/MsQuicSwift/QuicStream.swift b/Networking/Sources/MsQuicSwift/QuicStream.swift index 7febfa47..8f094f88 100644 --- a/Networking/Sources/MsQuicSwift/QuicStream.swift +++ b/Networking/Sources/MsQuicSwift/QuicStream.swift @@ -75,7 +75,7 @@ public final class QuicStream: Sendable { } public func shutdown(errorCode: QuicErrorCode = .success) throws { - logger.info("closing stream \(errorCode)") + logger.debug("closing stream \(errorCode)") try storage.write { storage in guard let storage2 = storage else { @@ -94,7 +94,7 @@ public final class QuicStream: Sendable { } public func send(data: Data, start: Bool = false, finish: Bool = false) throws { - logger.info("Sending \(data.count) bytes") + logger.debug("Sending \(data.count) bytes") try storage.read { storage in guard let storage, let api = storage.connection.api else { @@ -104,7 +104,7 @@ public final class QuicStream: Sendable { let messageLength = data.count if messageLength == 0 { - logger.info("No data to send.") + logger.debug("No data to send.") throw SendError.emptyData // Throw a specific error or return } @@ -173,7 +173,7 @@ private class StreamHandle { fileprivate func callbackHandler(event: UnsafePointer) -> QuicStatus { switch event.pointee.Type { case QUIC_STREAM_EVENT_SEND_COMPLETE: - logger.info("Stream send completed") + logger.debug("Stream send completed") if let clientContext = event.pointee.SEND_COMPLETE.ClientContext { clientContext.deallocate() // !! deallocate } @@ -188,7 +188,7 @@ private class StreamHandle { totalSize += Int(buffer.Length) } - logger.info("Stream received \(totalSize) bytes") + logger.debug("Stream received \(totalSize) bytes") var receivedData = Data(capacity: totalSize) @@ -207,16 +207,16 @@ private class StreamHandle { } case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: - logger.info("Peer send shutdown") + logger.trace("Peer send shutdown") if let stream { stream.handler.dataReceived(stream, data: nil) } case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: - logger.info("Peer send aborted") + logger.trace("Peer send aborted") case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: - logger.info("Stream shutdown complete") + logger.trace("Stream shutdown complete") let evtData = event.pointee.SHUTDOWN_COMPLETE if let stream { diff --git a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift index 0569a4d3..7c805920 100644 --- a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift @@ -144,7 +144,7 @@ public actor SyncManager: Sendable { } private func importBlock(currentTimeslot: TimeslotIndex, newHeader: HeaderRef, peer: PeerId) { - logger.info("importing block", metadata: ["hash": "\(newHeader.hash)", "remote": "\(peer)"]) + logger.debug("importing block", metadata: ["hash": "\(newHeader.hash)", "remote": "\(peer)"]) let blockchain = blockchain let network = network Task.detached { @@ -163,7 +163,7 @@ public actor SyncManager: Sendable { } // reverse to import old block first for block in blocks.reversed() { - logger.info("blocks reversed", metadata: ["hash": "\(String(describing: block.hash))"]) + logger.debug("blocks reversed", metadata: ["hash": "\(String(describing: block.hash))"]) try await blockchain.importBlock(block) } } catch { diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index e6dda530..122cf548 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -163,7 +163,7 @@ final class NodeTests { // Verify connections for a sample of non-validator nodes #expect(node1.network.peersCount == 19) #expect(node2.network.peersCount == 19) - // Advance time and verify sync + // Advance time to produce blocks for _ in 0..<20 { await scheduler.advance( by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) @@ -175,8 +175,6 @@ final class NodeTests { } } - try await Task.sleep(for: .milliseconds(nodes.count * 100)) - let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead @@ -218,8 +216,8 @@ final class NodeTests { // Verify connections for a sample of non-validator nodes #expect(node1.network.peersCount == 19) #expect(node2.network.peersCount == 19) - // Advance time and verify sync - for _ in 0..<3 { + // Advance time to produce blocks + for _ in 0..<20 { await scheduler.advance( by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() @@ -230,8 +228,6 @@ final class NodeTests { } } - try await Task.sleep(for: .milliseconds(nodes.count * 500)) - let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead diff --git a/Utils/Sources/Utils/EventBus/StoreMiddleware.swift b/Utils/Sources/Utils/EventBus/StoreMiddleware.swift index cfc0cbcd..7071dbad 100644 --- a/Utils/Sources/Utils/EventBus/StoreMiddleware.swift +++ b/Utils/Sources/Utils/EventBus/StoreMiddleware.swift @@ -21,7 +21,7 @@ public struct StoreMiddleware: MiddlewareProtocol { @discardableResult public func wait() async -> [Sendable] { - try? await Task.sleep(for: .milliseconds(5)) + try? await Task.sleep(for: .milliseconds(50)) let value = storage.value From 115d38c6bd64ac4198552a7f06840f6d1088b8ee Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Fri, 10 Jan 2025 11:06:21 +0800 Subject: [PATCH 17/27] update nodetest --- Node/Tests/NodeTests/NodeTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 122cf548..a6131bc8 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -174,7 +174,7 @@ final class NodeTests { await middleware.wait() } } - + let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead @@ -217,7 +217,7 @@ final class NodeTests { #expect(node1.network.peersCount == 19) #expect(node2.network.peersCount == 19) // Advance time to produce blocks - for _ in 0..<20 { + for _ in 0..<50 { await scheduler.advance( by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() From 0f9136d0dc6702f04777d2bdbf5e24a7703af3a6 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Fri, 10 Jan 2025 11:11:03 +0800 Subject: [PATCH 18/27] update test --- Node/Tests/NodeTests/NodeTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index a6131bc8..d3793ad7 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -217,7 +217,7 @@ final class NodeTests { #expect(node1.network.peersCount == 19) #expect(node2.network.peersCount == 19) // Advance time to produce blocks - for _ in 0..<50 { + for _ in 0..<10 { await scheduler.advance( by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() @@ -227,7 +227,7 @@ final class NodeTests { await middleware.wait() } } - + try await Task.sleep(for: .milliseconds(nodes.count * 200)) let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead From f0ab6985ec82050ba72aa38506e9ce20082644b3 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Fri, 10 Jan 2025 17:53:04 +0800 Subject: [PATCH 19/27] update tests --- .../BlockchainDataProvider.swift | 2 -- Networking/Sources/Networking/Connection.swift | 10 ++++++---- Node/Sources/Node/NetworkingProtocol/Network.swift | 2 +- Node/Sources/Node/NetworkingProtocol/SyncManager.swift | 2 +- Node/Tests/NodeTests/NodeTests.swift | 5 ++--- 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index 611f38d1..3ad95efb 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -27,8 +27,6 @@ public actor BlockchainDataProvider: Sendable { if header.value.timeslot > bestHead.timeslot { let number = try await dataProvider.getBlockNumber(hash: head).unwrap() bestHead = HeadInfo(hash: head, timeslot: header.value.timeslot, number: number) - } else { - logger.warning("Found a block with timeslot \(header.value.timeslot) but best head is \(bestHead.timeslot)") } } diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index b02380ac..6b4adc47 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -280,7 +280,7 @@ public final class Connection: Sendable, ConnectionInfoP let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request) try stream.send(message: resp, finish: true) } catch { - logger.debug("Failed to handle request", metadata: ["error": "\(error)"]) + logger.info("Failed to handle request", metadata: ["error": "\(error)"]) stream.close(abort: true) } } @@ -318,7 +318,7 @@ private func receiveMaybeData(stream: Stream) async throws - // TODO: pick better value guard length < 1024 * 1024 * 10 else { stream.close(abort: true) - logger.debug("Invalid request length: \(length)") + logger.info("Invalid request length: \(length)") // TODO: report bad peer throw ConnectionError.invalidLength } @@ -336,7 +336,7 @@ func presistentStreamRunLoop( do { try await handler.streamOpened(connection: connection, stream: stream, kind: kind) } catch { - logger.debug( + logger.error( "Failed to setup presistent stream", metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)", "error": "\(error)"] ) @@ -348,11 +348,13 @@ func presistentStreamRunLoop( var decoder = handler.createDecoder(kind: kind) do { while let data = try await receiveMaybeData(stream: stream) { + logger.debug("receiveMaybeData length: \(data.count) from \(connection.id)") let msg = try decoder.decode(data: data) + logger.debug("handling message: \(msg) from \(connection.id)") try await handler.handle(connection: connection, message: msg) } } catch { - logger.debug("UP stream run loop failed: \(error)") + logger.error("UP stream run loop failed: \(error) from \(connection.id)") stream.close(abort: true) } diff --git a/Node/Sources/Node/NetworkingProtocol/Network.swift b/Node/Sources/Node/NetworkingProtocol/Network.swift index 31001360..ee3880fc 100644 --- a/Node/Sources/Node/NetworkingProtocol/Network.swift +++ b/Node/Sources/Node/NetworkingProtocol/Network.swift @@ -129,7 +129,7 @@ struct PresistentStreamHandlerImpl: PresistentStreamHandler { } func handle(connection: any ConnectionInfoProtocol, message: Message) async throws { - impl.logger.trace("handling message: \(message) from \(connection.id)") + impl.logger.debug("handling message: \(message) from \(connection.id)") try await impl.handler.handle(connection: connection, upMessage: message) } diff --git a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift index 7c805920..7aa30985 100644 --- a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift @@ -139,7 +139,7 @@ public actor SyncManager: Sendable { status = .syncing syncContinuation.forEach { $0.resume() } syncContinuation = [] - logger.info("sync completed") + logger.debug("sync completed") } } diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index d3793ad7..001f047d 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -174,7 +174,6 @@ final class NodeTests { await middleware.wait() } } - let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead @@ -217,7 +216,7 @@ final class NodeTests { #expect(node1.network.peersCount == 19) #expect(node2.network.peersCount == 19) // Advance time to produce blocks - for _ in 0..<10 { + for _ in 0..<30 { await scheduler.advance( by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) await validator1StoreMiddlware.wait() @@ -227,7 +226,7 @@ final class NodeTests { await middleware.wait() } } - try await Task.sleep(for: .milliseconds(nodes.count * 200)) + try await Task.sleep(for: .milliseconds(nodes.count * 100)) let validator1BestHead = await validator1.dataProvider.bestHead let validator2BestHead = await validator2.dataProvider.bestHead From 3af4f9e90ac9be85301758a9417d58969b0cc999 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Mon, 13 Jan 2025 14:26:04 +0800 Subject: [PATCH 20/27] update node test --- .../Blockchain/Types/EpochMarker.swift | 12 +++++++ .../Sources/Blockchain/Types/Header.swift | 4 +-- Node/Tests/NodeTests/NodeTests.swift | 31 +++++++++---------- 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/Blockchain/Sources/Blockchain/Types/EpochMarker.swift b/Blockchain/Sources/Blockchain/Types/EpochMarker.swift index f7c5a700..5f63a00d 100644 --- a/Blockchain/Sources/Blockchain/Types/EpochMarker.swift +++ b/Blockchain/Sources/Blockchain/Types/EpochMarker.swift @@ -21,3 +21,15 @@ public struct EpochMarker: Sendable, Equatable, Codable { self.validators = validators } } + +extension EpochMarker: Dummy { + public typealias Config = ProtocolConfigRef + + public static func dummy(config: Config) -> EpochMarker { + EpochMarker( + entropy: Data32(), + ticketsEntropy: Data32(), + validators: try! ConfigFixedSizeArray(config: config, defaultValue: Data32()) + ) + } +} diff --git a/Blockchain/Sources/Blockchain/Types/Header.swift b/Blockchain/Sources/Blockchain/Types/Header.swift index 8c742810..696fdf1c 100644 --- a/Blockchain/Sources/Blockchain/Types/Header.swift +++ b/Blockchain/Sources/Blockchain/Types/Header.swift @@ -168,13 +168,13 @@ extension HeaderRef: Codable { extension Header.Unsigned: Dummy { public typealias Config = ProtocolConfigRef - public static func dummy(config _: Config) -> Header.Unsigned { + public static func dummy(config: Config) -> Header.Unsigned { Header.Unsigned( parentHash: Data32(), priorStateRoot: Data32(), extrinsicsHash: Data32(), timeslot: 0, - epoch: nil, + epoch: EpochMarker.dummy(config: config), winningTickets: nil, offendersMarkers: [], authorIndex: 0, diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index 001f047d..c0665947 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -6,16 +6,14 @@ import Utils @testable import Node final class NodeTests { - var dataBaseIndex: Int = 0 let path = { let tmpDir = FileManager.default.temporaryDirectory return tmpDir.appendingPathComponent("\(UUID().uuidString)") }() - func getDatabase() -> Database { - dataBaseIndex += 1 - return Database.rocksDB(path: path.appendingPathComponent("\(dataBaseIndex)")) + func getDatabase(_ index: Int) -> Database { + return Database.rocksDB(path: path.appendingPathComponent("\(index)")) } deinit { @@ -54,7 +52,7 @@ final class NodeTests { @Test func validatorNodeRocksDB() async throws { let (nodes, scheduler) = try await Topology( - nodes: [NodeDescription(isValidator: true, database: getDatabase())] + nodes: [NodeDescription(isValidator: true, database: getDatabase(0))] ).build(genesis: .preset(.minimal)) let (validatorNode, storeMiddlware) = nodes[0] @@ -86,8 +84,8 @@ final class NodeTests { // Create validator and full node let (nodes, scheduler) = try await Topology( nodes: [ - NodeDescription(isValidator: true, database: getDatabase()), - NodeDescription(devSeed: 1, database: getDatabase()), + NodeDescription(isValidator: true, database: getDatabase(0)), + NodeDescription(devSeed: 1, database: getDatabase(1)), ], connections: [(0, 1)] ).build(genesis: .preset(.minimal)) @@ -136,12 +134,12 @@ final class NodeTests { @Test func multiplePeers() async throws { // Create multiple nodes var nodeDescriptions: [NodeDescription] = [ - NodeDescription(isValidator: true, database: getDatabase()), - NodeDescription(isValidator: true, devSeed: 1, database: getDatabase()), + NodeDescription(isValidator: true, database: getDatabase(0)), + NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)), ] // Add 18 non-validator nodes for i in 2...19 { - nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: getDatabase())) + nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: getDatabase(i))) } let (nodes, scheduler) = try await Topology( @@ -187,8 +185,8 @@ final class NodeTests { @Test func moreMultiplePeers() async throws { // Create multiple nodes var nodeDescriptions: [NodeDescription] = [ - NodeDescription(isValidator: true, database: getDatabase()), - NodeDescription(isValidator: true, devSeed: 1, database: getDatabase()), + NodeDescription(isValidator: true, database: getDatabase(0)), + NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)), ] // Add 18 non-validator nodes @@ -198,8 +196,8 @@ final class NodeTests { let (nodes, scheduler) = try await Topology( nodes: nodeDescriptions, - connections: (0..<20).flatMap { i in - (i + 1..<20).map { j in (i, j) } // Fully connected topology + connections: (0..<2).flatMap { i in + (2..<20).map { j in (i, j) } //connected topology } ).build(genesis: .preset(.minimal)) @@ -213,8 +211,8 @@ final class NodeTests { let (node1, _) = nonValidatorNodes[0] let (node2, _) = nonValidatorNodes[1] // Verify connections for a sample of non-validator nodes - #expect(node1.network.peersCount == 19) - #expect(node2.network.peersCount == 19) + #expect(node1.network.peersCount == 2) + #expect(node2.network.peersCount == 2) // Advance time to produce blocks for _ in 0..<30 { await scheduler.advance( @@ -225,6 +223,7 @@ final class NodeTests { for (_, middleware) in nonValidatorNodes { await middleware.wait() } + try await Task.sleep(for: .milliseconds(500)) } try await Task.sleep(for: .milliseconds(nodes.count * 100)) let validator1BestHead = await validator1.dataProvider.bestHead From 261bfc465665d673877855ce5a1962fbb27f2494 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Mon, 13 Jan 2025 14:43:55 +0800 Subject: [PATCH 21/27] update swiftlint --- .../Blockchain/Types/EpochMarker.swift | 2 +- Node/Tests/NodeTests/NodeTests.swift | 45 ++++++++++--------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/Blockchain/Sources/Blockchain/Types/EpochMarker.swift b/Blockchain/Sources/Blockchain/Types/EpochMarker.swift index 5f63a00d..8ce21ab3 100644 --- a/Blockchain/Sources/Blockchain/Types/EpochMarker.swift +++ b/Blockchain/Sources/Blockchain/Types/EpochMarker.swift @@ -24,7 +24,7 @@ public struct EpochMarker: Sendable, Equatable, Codable { extension EpochMarker: Dummy { public typealias Config = ProtocolConfigRef - + public static func dummy(config: Config) -> EpochMarker { EpochMarker( entropy: Data32(), diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index c0665947..b4585bfa 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -6,14 +6,13 @@ import Utils @testable import Node final class NodeTests { - let path = { let tmpDir = FileManager.default.temporaryDirectory return tmpDir.appendingPathComponent("\(UUID().uuidString)") }() func getDatabase(_ index: Int) -> Database { - return Database.rocksDB(path: path.appendingPathComponent("\(index)")) + Database.rocksDB(path: path.appendingPathComponent("\(index)")) } deinit { @@ -32,9 +31,10 @@ final class NodeTests { let initialTimeslot = initialBestHead.timeslot // Advance time - for _ in 0..<10 { + for _ in 0 ..< 10 { await scheduler.advance( - by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds) + ) await storeMiddlware.wait() } @@ -62,9 +62,10 @@ final class NodeTests { let initialTimeslot = initialBestHead.timeslot // Advance time - for _ in 0..<10 { + for _ in 0 ..< 10 { await scheduler.advance( - by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds) + ) await storeMiddlware.wait() } @@ -94,9 +95,10 @@ final class NodeTests { let (node, nodeStoreMiddlware) = nodes[1] // Advance time to produce blocks - for _ in 0..<10 { + for _ in 0 ..< 10 { await scheduler.advance( - by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds) + ) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } @@ -111,9 +113,10 @@ final class NodeTests { #expect(validatorBestHead.hash == nodeBestHead.hash) // Produce more blocks - for _ in 0..<10 { + for _ in 0 ..< 10 { await scheduler.advance( - by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds)) + by: TimeInterval(validatorNode.blockchain.config.value.slotPeriodSeconds) + ) await validatorStoreMiddlware.wait() await nodeStoreMiddlware.wait() } @@ -138,14 +141,14 @@ final class NodeTests { NodeDescription(isValidator: true, devSeed: 1, database: getDatabase(1)), ] // Add 18 non-validator nodes - for i in 2...19 { + for i in 2 ... 19 { nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: getDatabase(i))) } let (nodes, scheduler) = try await Topology( nodes: nodeDescriptions, - connections: (0..<20).flatMap { i in - (i + 1..<20).map { j in (i, j) } // Fully connected topology + connections: (0 ..< 20).flatMap { i in + (i + 1 ..< 20).map { j in (i, j) } // Fully connected topology } ).build(genesis: .preset(.minimal)) @@ -162,9 +165,10 @@ final class NodeTests { #expect(node1.network.peersCount == 19) #expect(node2.network.peersCount == 19) // Advance time to produce blocks - for _ in 0..<20 { + for _ in 0 ..< 20 { await scheduler.advance( - by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) + by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds) + ) await validator1StoreMiddlware.wait() await validator2StoreMiddlware.wait() @@ -190,14 +194,14 @@ final class NodeTests { ] // Add 18 non-validator nodes - for i in 2...19 { + for i in 2 ... 19 { nodeDescriptions.append(NodeDescription(devSeed: UInt32(i), database: .inMemory)) } let (nodes, scheduler) = try await Topology( nodes: nodeDescriptions, - connections: (0..<2).flatMap { i in - (2..<20).map { j in (i, j) } //connected topology + connections: (0 ..< 2).flatMap { i in + (2 ..< 20).map { j in (i, j) } // connected topology } ).build(genesis: .preset(.minimal)) @@ -214,9 +218,10 @@ final class NodeTests { #expect(node1.network.peersCount == 2) #expect(node2.network.peersCount == 2) // Advance time to produce blocks - for _ in 0..<30 { + for _ in 0 ..< 30 { await scheduler.advance( - by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds)) + by: TimeInterval(validator1.blockchain.config.value.slotPeriodSeconds) + ) await validator1StoreMiddlware.wait() await validator2StoreMiddlware.wait() From e116f60285b6ab7fd703d10fb381d86d18f9d066 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Mon, 13 Jan 2025 14:48:47 +0800 Subject: [PATCH 22/27] update swiftlint --- .../BlockchainDataProvider/BlockchainDataProvider.swift | 2 +- .../BlockchainDataProvider/InMemoryDataProvider.swift | 2 +- Blockchain/Sources/Blockchain/State/StateTrie.swift | 2 +- Node/Sources/Node/NetworkingProtocol/PeerManager.swift | 2 +- Node/Sources/Node/NetworkingProtocol/SyncManager.swift | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index 3ad95efb..3da0693b 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -14,7 +14,7 @@ public enum BlockchainDataProviderError: Error, Equatable { case uncanonical(hash: Data32) } -public actor BlockchainDataProvider: Sendable { +public actor BlockchainDataProvider { public private(set) var bestHead: HeadInfo public private(set) var finalizedHead: HeadInfo private let dataProvider: BlockchainDataProviderProtocol diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift index 5ddd0dc9..93d66c7c 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift @@ -1,6 +1,6 @@ import Utils -public actor InMemoryDataProvider: Sendable { +public actor InMemoryDataProvider { public private(set) var heads: Set public private(set) var finalizedHead: Data32 diff --git a/Blockchain/Sources/Blockchain/State/StateTrie.swift b/Blockchain/Sources/Blockchain/State/StateTrie.swift index 7c7dffb5..627461b5 100644 --- a/Blockchain/Sources/Blockchain/State/StateTrie.swift +++ b/Blockchain/Sources/Blockchain/State/StateTrie.swift @@ -95,7 +95,7 @@ public enum StateTrieError: Error { case invalidParent } -public actor StateTrie: Sendable { +public actor StateTrie { private let backend: StateBackendProtocol public private(set) var rootHash: Data32 private var nodes: [Data: TrieNode] = [:] diff --git a/Node/Sources/Node/NetworkingProtocol/PeerManager.swift b/Node/Sources/Node/NetworkingProtocol/PeerManager.swift index f65462b6..afb4a6c8 100644 --- a/Node/Sources/Node/NetworkingProtocol/PeerManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/PeerManager.swift @@ -20,7 +20,7 @@ public struct PeerInfo: Sendable { // - distinguish between connect peers and offline peers // - peer reputation // - purge offline peers -public actor PeerManager: Sendable { +public actor PeerManager { private let eventBus: EventBus public private(set) var peers: [Data: PeerInfo] = [:] diff --git a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift index 7aa30985..305fa38b 100644 --- a/Node/Sources/Node/NetworkingProtocol/SyncManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/SyncManager.swift @@ -20,7 +20,7 @@ enum SyncStatus { // - sync peer rotation // - fast sync mode (no verification) // - re-enter to bulk sync mode if new peer with better head is discovered -public actor SyncManager: Sendable { +public actor SyncManager { private let blockchain: Blockchain private let network: Network private let peerManager: PeerManager From 5361558b167e3215b4080c10bef2115988a64489 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Tue, 14 Jan 2025 17:23:10 +0800 Subject: [PATCH 23/27] add some logger --- .../Sources/MsQuicSwift/QuicStream.swift | 3 +- .../Sources/Networking/Connection.swift | 20 ++++-- Networking/Sources/Networking/Peer.swift | 4 +- .../BlockAnnouncementDecoderTests.swift | 66 +++++++++++++++++++ Node/Tests/NodeTests/NodeTests.swift | 2 +- Node/Tests/NodeTests/Topology.swift | 4 ++ 6 files changed, 88 insertions(+), 11 deletions(-) create mode 100644 Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift diff --git a/Networking/Sources/MsQuicSwift/QuicStream.swift b/Networking/Sources/MsQuicSwift/QuicStream.swift index 8f094f88..50a62012 100644 --- a/Networking/Sources/MsQuicSwift/QuicStream.swift +++ b/Networking/Sources/MsQuicSwift/QuicStream.swift @@ -94,8 +94,7 @@ public final class QuicStream: Sendable { } public func send(data: Data, start: Bool = false, finish: Bool = false) throws { - logger.debug("Sending \(data.count) bytes") - + logger.debug("Stream \(id) sending \(data.count) bytes data \(data.toHexString())") try storage.read { storage in guard let storage, let api = storage.connection.api else { throw QuicError.alreadyClosed diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index 6b4adc47..5934c6b2 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -234,7 +234,7 @@ public final class Connection: Sendable, ConnectionInfoP impl.addStream(stream) Task { guard let byte = await stream.receiveByte() else { - logger.debug("stream closed without receiving kind. status: \(stream.status)") + logger.info("stream closed without receiving kind. status: \(stream.status)") return } if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) { @@ -246,14 +246,14 @@ public final class Connection: Sendable, ConnectionInfoP if existingStream.stream.id < stream.stream.id { // The new stream has a higher ID, so reset the existing one existingStream.close(abort: false) - logger.debug( + logger.info( "Reset older UP stream with lower ID", metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"] ) } else { // The existing stream has a higher ID or is equal, so reset the new one stream.close(abort: false) - logger.debug( + logger.info( "Duplicate UP stream detected, closing new stream with lower or equal ID", metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"] ) @@ -278,9 +278,13 @@ public final class Connection: Sendable, ConnectionInfoP let data = try await receiveData(stream: stream) let request = try decoder.decode(data: data) let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request) +// logger +// .info( +// "sending addr \(remoteAddress.description) request data \(resp.toHexString()) with \(resp.count) bytes " +// ) try stream.send(message: resp, finish: true) } catch { - logger.info("Failed to handle request", metadata: ["error": "\(error)"]) + logger.error("Failed to handle request", metadata: ["error": "\(error)"]) stream.close(abort: true) } } @@ -348,13 +352,15 @@ func presistentStreamRunLoop( var decoder = handler.createDecoder(kind: kind) do { while let data = try await receiveMaybeData(stream: stream) { - logger.debug("receiveMaybeData length: \(data.count) from \(connection.id)") + logger + .debug( + "receiveMaybeData \(data.count) length from \(connection.id) stream \(stream.id) data \(String(describing: data.toHexString()))" + ) let msg = try decoder.decode(data: data) - logger.debug("handling message: \(msg) from \(connection.id)") try await handler.handle(connection: connection, message: msg) } } catch { - logger.error("UP stream run loop failed: \(error) from \(connection.id)") + logger.error("UP stream run loop failed: \(error) from \(connection.remoteAddress) \(connection.id) \(stream.id)") stream.close(abort: true) } diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 5ca00014..0e880276 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -196,6 +196,7 @@ public final class Peer: Sendable { metadata: [ "connectionId": "\(connection.id)", "kind": "\(kind)", + "message": "\(messageData)", "error": "\(error)", ] ) @@ -298,7 +299,7 @@ final class PeerImpl: Sendable { var state = reconnectStates.read { reconnectStates in reconnectStates[address] ?? .init() } - + logger.info("reconnecting to \(address) \(state.attempt) attempts") guard state.attempt < maxRetryAttempts else { logger.warning("reconnecting to \(address) exceeded max attempts") return @@ -338,6 +339,7 @@ final class PeerImpl: Sendable { states[connection.id] ?? .init() } + logger.info("Reopen attempt for stream \(kind) on connection \(connection.id) \(state.attempt) attempts") guard state.attempt < maxRetryAttempts else { logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts") return diff --git a/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift b/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift new file mode 100644 index 00000000..e63419c9 --- /dev/null +++ b/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift @@ -0,0 +1,66 @@ +import Blockchain +import Codec +import Foundation +import Testing +import Utils + +@testable import Node + +final class BlockAnnouncementDecoderTests { + @Test + func decodeInvalidEpoch() throws { + let hexString = """ + ed0100007d810035df13056b2e28c4c331a7a53094e97a2b8bceff223ecd34b6cffd2d9ab69998d70\ + 574f759fc057a99604de71d302cb16d365985a180bd8f3387d2d736189d15af832dfe4f67744008b6\ + 2c334b569fcbb4c261e0f065655697306ca252ff000000010000000000000000000000000000000000\ + 0000000000000000000000000000000000000000000000000000000000000000000000000000000000\ + 5e465beb01dbafe160ce8216047f2155dd0569f058afd52dcea601025a8d161d2c5da3a09d66a5d43\ + e7d523e6108736db99d2c2f08fbdcb72a4e8e5aced3482a8552b36000b454fdf6b5418e22ef5d6609\ + e8fc6b816822f02727e085c514d560000000004fbacc2baea15e2e69185623e37a51ee9372ebd80dd\ + 405a34d24a0a40f79e1d92d49a247d13acca8ccaf7cb6d3eb9ef10b3ef29a93e01e9ddce0a4266c4a\ + 2c0e96a3b8c26c8ac6c9063ed7dcdb18479736c7c7be7fbfd006b4cb4b44ffa948154fbacc2baea15\ + e2e69185623e37a51ee9372ebd80dd405a34d24a0a40f79e1d993ccc91f31f5d8657ef98d203ddcc7\ + 38482fe2caaa41f51d983239ac0dbbba04ca820ff3eb8d2ab3b9e1421f7955d876776b0c293f2e31e\ + aa2da18c3b580f5067d810035df13056b2e28c4c331a7a53094e97a2b8bceff223ecd34b6cffd2d9a + """ + let hex = hexString.replacingOccurrences(of: "\\s+", with: "", options: .regularExpression) + let data = Data(fromHexString: hex)! + let config = ProtocolConfigRef.minimal + #expect(throws: DecodingError.self) { + _ = try JamDecoder.decode(BlockAnnouncement.self, from: data, withConfig: config) + } + } + + @Test + func decodeNotEnoughDataToDecode() throws { + let hexString = """ + 371134fcf189799fea21d2b9a50bd8352c7814a120617700e1f984af1cb3698fb1aed1999185b3800\ + 51235aa97f33306a9682e486b12c6016e6df19ea71f1ff6189d15af832dfe4f67744008b62c334b56\ + 9fcbb4c261e0f065655697306ca252ab00000001000000000000000000000000000000000000000000\ + 0000000000000000000000000000000000000000000000000000000000000000000000000000000000\ + 5e465beb01dbafe160ce8216047f2155dd0569f058afd52dcea601025a8d161d2c5da3a09d66a5d43\ + e7d523e6108736db99d2c2f08fbdcb72a4e8e5aced3482a8552b36000b454fdf6b5418e22ef5d6609\ + e8fc6b816822f02727e085c514d5605d069d7591ea55d9cc7adb9e8eaff66a1688d075c69fa94815e\ + f0fe9a56025699a565d486952598747cb9b3b78bb97694100a1cbf8d7af4eb2ea740b844b41d19a99\ + 09db141ee10d89f9bff13d651831cc91098bdf30c917ce89d1b8416af719000000004fbacc2baea15\ + e2e69185623e37a51ee9372ebd80dd405a34d24a0a40f79e1d92d49a247d13acca8ccaf7cb6d3eb9e\ + f10b3ef29a93e01e9ddce0a4266c4a2c0e96a3b8c26c8ac6c9063ed7dcdb18479736c7c7be7fbfd00\ + 6b4cb4b44ffa948154fbacc2baea15e2e69185623e37a51ee9372ebd80dd405a34d24a0a40f79e1d9\ + 575f1115fe3f8a903ac06a3578eeb154ef3f75d2d18fcabfafa4f14530a27f050258515937b7f7bfe\ + f6bf7aa67adf39b59057bbbe0433c9e8a057917c836f814371134fcf189799fea21d2b9a50bd8352c\ + 7814a120617700e1f984af1cb3698f00000000 + """ + let hex = hexString.replacingOccurrences(of: "\\s+", with: "", options: .regularExpression) + let data = Data(fromHexString: hex)! + let config = ProtocolConfigRef.minimal + #expect(throws: DecodingError.self) { + _ = try JamDecoder.decode(BlockAnnouncement.self, from: data, withConfig: config) + } + } + + @Test + func decodeNotAllDataWasConsumed() throws {} + + @Test + func decodeNilValue() throws {} +} diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index b4585bfa..d03a98b3 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -228,7 +228,7 @@ final class NodeTests { for (_, middleware) in nonValidatorNodes { await middleware.wait() } - try await Task.sleep(for: .milliseconds(500)) + try await Task.sleep(for: .milliseconds(100)) } try await Task.sleep(for: .milliseconds(nodes.count * 100)) let validator1BestHead = await validator1.dataProvider.bestHead diff --git a/Node/Tests/NodeTests/Topology.swift b/Node/Tests/NodeTests/Topology.swift index 7b42fb14..025e6f89 100644 --- a/Node/Tests/NodeTests/Topology.swift +++ b/Node/Tests/NodeTests/Topology.swift @@ -64,6 +64,10 @@ struct Topology { let fromNode = ret[from].0 let toNode = ret[to].0 let conn = try fromNode.network.network.connect(to: toNode.network.network.listenAddress(), role: .validator) + try print( + "connect from \(fromNode.network.network.listenAddress().description) to \(toNode.network.network.listenAddress().description)" + ) + print("connect \(conn.id) address: \(conn.remoteAddress)") try? await conn.ready() } return (ret, scheduler) From 9c138960324936cc4fbe22d6f37aa50ad2a41020 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Tue, 14 Jan 2025 19:30:21 +0800 Subject: [PATCH 24/27] update some logger --- Codec/Sources/Codec/JamDecoder.swift | 2 +- Networking/Sources/MsQuicSwift/QuicStream.swift | 2 +- Networking/Sources/Networking/Connection.swift | 11 ++++++----- Networking/Sources/Networking/Peer.swift | 5 ++++- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/Codec/Sources/Codec/JamDecoder.swift b/Codec/Sources/Codec/JamDecoder.swift index 2f6339c0..41497f27 100644 --- a/Codec/Sources/Codec/JamDecoder.swift +++ b/Codec/Sources/Codec/JamDecoder.swift @@ -310,7 +310,7 @@ private struct JamKeyedDecodingContainer: KeyedDecodingContainerPr throw DecodingError.dataCorrupted( DecodingError.Context( codingPath: decoder.codingPath, - debugDescription: "Invalid boolean value: \(byte)" + debugDescription: "Decode key \(key.stringValue) with invalid boolean value: \(byte)" ) ) } diff --git a/Networking/Sources/MsQuicSwift/QuicStream.swift b/Networking/Sources/MsQuicSwift/QuicStream.swift index 50a62012..070bc50a 100644 --- a/Networking/Sources/MsQuicSwift/QuicStream.swift +++ b/Networking/Sources/MsQuicSwift/QuicStream.swift @@ -94,12 +94,12 @@ public final class QuicStream: Sendable { } public func send(data: Data, start: Bool = false, finish: Bool = false) throws { - logger.debug("Stream \(id) sending \(data.count) bytes data \(data.toHexString())") try storage.read { storage in guard let storage, let api = storage.connection.api else { throw QuicError.alreadyClosed } + logger.info("\(storage.connection.id) \(id) sending \(data.count) bytes data \(data.toHexString())") let messageLength = data.count if messageLength == 0 { diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index 5934c6b2..de96c2c2 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -350,17 +350,18 @@ func presistentStreamRunLoop( metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)"] ) var decoder = handler.createDecoder(kind: kind) + var msg = Data() do { while let data = try await receiveMaybeData(stream: stream) { - logger - .debug( - "receiveMaybeData \(data.count) length from \(connection.id) stream \(stream.id) data \(String(describing: data.toHexString()))" - ) + msg = data let msg = try decoder.decode(data: data) try await handler.handle(connection: connection, message: msg) } } catch { - logger.error("UP stream run loop failed: \(error) from \(connection.remoteAddress) \(connection.id) \(stream.id)") + logger + .error( + "UP stream run loop failed: \(error) from \(connection.remoteAddress) \(connection.id) \(stream.id) data \(msg.toHexString())" + ) stream.close(abort: true) } diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 0e880276..ef71a3fb 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -393,7 +393,10 @@ private struct PeerEventHandler: QuicEventHandler { ) return .code(.alpnNegFailure) } - logger.debug("new connection: \(addr) role: \(role)") + logger + .info( + "new connection: \(addr) id: \(connection.id) local addr: \(info.localAddress) remote addr: \(info.remoteAddress), role: \(role)" + ) if impl.addConnection(connection, addr: addr, role: role) { return .code(.success) } else { From b1d773731cb5154dbf0e13ba914918b4cc05e22f Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 15 Jan 2025 14:55:20 +0800 Subject: [PATCH 25/27] update test --- .../Sources/MsQuicSwift/QuicStream.swift | 2 +- .../Sources/Networking/Connection.swift | 14 ++--- Networking/Sources/Networking/Peer.swift | 38 +++++++------ Networking/Sources/Networking/Stream.swift | 54 ++++++++++++------- .../NetworkingProtocol/NetworkManager.swift | 2 +- .../BlockAnnouncementDecoderTests.swift | 6 --- Node/Tests/NodeTests/Topology.swift | 4 -- 7 files changed, 64 insertions(+), 56 deletions(-) diff --git a/Networking/Sources/MsQuicSwift/QuicStream.swift b/Networking/Sources/MsQuicSwift/QuicStream.swift index 070bc50a..5a10cc3a 100644 --- a/Networking/Sources/MsQuicSwift/QuicStream.swift +++ b/Networking/Sources/MsQuicSwift/QuicStream.swift @@ -99,7 +99,7 @@ public final class QuicStream: Sendable { throw QuicError.alreadyClosed } - logger.info("\(storage.connection.id) \(id) sending \(data.count) bytes data \(data.toHexString())") + logger.debug("\(storage.connection.id) \(id) sending \(data.count) bytes data \(data.toHexString())") let messageLength = data.count if messageLength == 0 { diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index de96c2c2..18d702b1 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -176,7 +176,7 @@ public final class Connection: Sendable, ConnectionInfoP let data = try request.encode() let kind = request.kind let stream = try createStream(kind: kind) - try stream.send(message: data) + try await stream.send(message: data) return try await receiveData(stream: stream) } @@ -234,7 +234,7 @@ public final class Connection: Sendable, ConnectionInfoP impl.addStream(stream) Task { guard let byte = await stream.receiveByte() else { - logger.info("stream closed without receiving kind. status: \(stream.status)") + logger.warning("stream closed without receiving kind. status: \(stream.status)") return } if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) { @@ -278,11 +278,7 @@ public final class Connection: Sendable, ConnectionInfoP let data = try await receiveData(stream: stream) let request = try decoder.decode(data: data) let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request) -// logger -// .info( -// "sending addr \(remoteAddress.description) request data \(resp.toHexString()) with \(resp.count) bytes " -// ) - try stream.send(message: resp, finish: true) + try await stream.send(message: resp, finish: true) } catch { logger.error("Failed to handle request", metadata: ["error": "\(error)"]) stream.close(abort: true) @@ -322,7 +318,7 @@ private func receiveMaybeData(stream: Stream) async throws - // TODO: pick better value guard length < 1024 * 1024 * 10 else { stream.close(abort: true) - logger.info("Invalid request length: \(length)") + logger.error("Invalid request length: \(length)") // TODO: report bad peer throw ConnectionError.invalidLength } @@ -360,7 +356,7 @@ func presistentStreamRunLoop( } catch { logger .error( - "UP stream run loop failed: \(error) from \(connection.remoteAddress) \(connection.id) \(stream.id) data \(msg.toHexString())" + "UP stream run loop failed: \(error) remote \(connection.remoteAddress) \(connection.id) \(stream.id) kind: \(kind) data \(msg.toHexString()) bytes \(msg.count)" ) stream.close(abort: true) } diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index ef71a3fb..2ce4eb08 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -186,20 +186,24 @@ public final class Peer: Sendable { } for connection in connections { if let stream = try? connection.createPreistentStream(kind: kind) { - let res = Result(catching: { try stream.send(message: messageData) }) - switch res { - case .success: - break - case let .failure(error): - impl.logger.warning( - "Failed to send message", - metadata: [ - "connectionId": "\(connection.id)", - "kind": "\(kind)", - "message": "\(messageData)", - "error": "\(error)", - ] - ) + Task { + let res = await Result { + try await stream.send(message: messageData) + } + switch res { + case .success: + break + case let .failure(error): + impl.logger.warning( + "Failed to send message", + metadata: [ + "connectionId": "\(connection.id)", + "kind": "\(kind)", + "message": "\(messageData)", + "error": "\(error)", + ] + ) + } } } } @@ -299,7 +303,7 @@ final class PeerImpl: Sendable { var state = reconnectStates.read { reconnectStates in reconnectStates[address] ?? .init() } - logger.info("reconnecting to \(address) \(state.attempt) attempts") + logger.debug("reconnecting to \(address) \(state.attempt) attempts") guard state.attempt < maxRetryAttempts else { logger.warning("reconnecting to \(address) exceeded max attempts") return @@ -339,7 +343,7 @@ final class PeerImpl: Sendable { states[connection.id] ?? .init() } - logger.info("Reopen attempt for stream \(kind) on connection \(connection.id) \(state.attempt) attempts") + logger.debug("Reopen attempt for stream \(kind) on connection \(connection.id) \(state.attempt) attempts") guard state.attempt < maxRetryAttempts else { logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts") return @@ -395,7 +399,7 @@ private struct PeerEventHandler: QuicEventHandler { } logger .info( - "new connection: \(addr) id: \(connection.id) local addr: \(info.localAddress) remote addr: \(info.remoteAddress), role: \(role)" + "new connection: \(connection.id) local addr: \(info.localAddress) remote addr: \(info.remoteAddress), role: \(role)" ) if impl.addConnection(connection, addr: addr, role: role) { return .code(.success) diff --git a/Networking/Sources/Networking/Stream.swift b/Networking/Sources/Networking/Stream.swift index 90bf76fa..bf1839b7 100644 --- a/Networking/Sources/Networking/Stream.swift +++ b/Networking/Sources/Networking/Stream.swift @@ -27,10 +27,39 @@ public protocol StreamProtocol { var id: UniqueId { get } var status: StreamStatus { get } - func send(message: Message) throws + func send(message: Message) async throws func close(abort: Bool) } +actor StreamSender { + private let stream: QuicStream + private var status: StreamStatus + + init(stream: QuicStream, status: StreamStatus) { + self.stream = stream + self.status = status + } + + func send(message: Data, finish: Bool = false) throws { + guard status == .open || status == .sendOnly else { + throw StreamError.notOpen + } + + let length = UInt32(message.count) + var lengthData = Data(repeating: 0, count: 4) + lengthData.withUnsafeMutableBytes { ptr in + ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self) + } + + try stream.send(data: lengthData, finish: false) + try stream.send(data: message, finish: finish) + + if finish { + status = .receiveOnly + } + } +} + final class Stream: Sendable, StreamProtocol { typealias Message = Handler.PresistentHandler.Message @@ -41,6 +70,7 @@ final class Stream: Sendable, StreamProtocol { private let channel: Channel = .init(capacity: 100) private let nextData: Mutex = .init(nil) private let _status: ThreadSafeContainer = .init(.open) + private let sender: StreamSender let connectionId: UniqueId let kind: Handler.PresistentHandler.StreamKind? @@ -63,10 +93,11 @@ final class Stream: Sendable, StreamProtocol { self.connectionId = connectionId self.impl = impl self.kind = kind + sender = StreamSender(stream: stream, status: .open) } - public func send(message: Handler.PresistentHandler.Message) throws { - try send(message: message.encode(), finish: false) + public func send(message: Handler.PresistentHandler.Message) async throws { + try await send(message: message.encode(), finish: false) } /// send raw data @@ -91,21 +122,8 @@ final class Stream: Sendable, StreamProtocol { } // send message with length prefix - func send(message: Data, finish: Bool = false) throws { - guard canSend else { - throw StreamError.notOpen - } - - let length = UInt32(message.count) - var lengthData = Data(repeating: 0, count: 4) - lengthData.withUnsafeMutableBytes { ptr in - ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self) - } - try stream.send(data: lengthData, finish: false) - try stream.send(data: message, finish: finish) - if finish { - status = .receiveOnly - } + func send(message: Data, finish: Bool = false) async throws { + try await sender.send(message: message, finish: finish) } func received(data: Data?) { diff --git a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift index bbc687bd..e534828c 100644 --- a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift @@ -273,7 +273,7 @@ struct HandlerImpl: NetworkProtocolHandler { heads: headsWithTimeslot ) - try stream.send(message: .blockAnnouncementHandshake(handshake)) + try await stream.send(message: .blockAnnouncementHandshake(handshake)) } } } diff --git a/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift b/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift index e63419c9..199078a8 100644 --- a/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift +++ b/Node/Tests/NodeTests/BlockAnnouncementDecoderTests.swift @@ -57,10 +57,4 @@ final class BlockAnnouncementDecoderTests { _ = try JamDecoder.decode(BlockAnnouncement.self, from: data, withConfig: config) } } - - @Test - func decodeNotAllDataWasConsumed() throws {} - - @Test - func decodeNilValue() throws {} } diff --git a/Node/Tests/NodeTests/Topology.swift b/Node/Tests/NodeTests/Topology.swift index 025e6f89..7b42fb14 100644 --- a/Node/Tests/NodeTests/Topology.swift +++ b/Node/Tests/NodeTests/Topology.swift @@ -64,10 +64,6 @@ struct Topology { let fromNode = ret[from].0 let toNode = ret[to].0 let conn = try fromNode.network.network.connect(to: toNode.network.network.listenAddress(), role: .validator) - try print( - "connect from \(fromNode.network.network.listenAddress().description) to \(toNode.network.network.listenAddress().description)" - ) - print("connect \(conn.id) address: \(conn.remoteAddress)") try? await conn.ready() } return (ret, scheduler) From c10f836d31d90fc1799835d403df6653f86a59b4 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 15 Jan 2025 15:44:34 +0800 Subject: [PATCH 26/27] update swiftlint --- Networking/Sources/Networking/Connection.swift | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index 18d702b1..44a3e693 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -338,7 +338,7 @@ func presistentStreamRunLoop( } catch { logger.error( "Failed to setup presistent stream", - metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)", "error": "\(error)"] + metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "error": "\(error)"] ) } logger.debug( @@ -346,18 +346,13 @@ func presistentStreamRunLoop( metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)"] ) var decoder = handler.createDecoder(kind: kind) - var msg = Data() do { while let data = try await receiveMaybeData(stream: stream) { - msg = data let msg = try decoder.decode(data: data) try await handler.handle(connection: connection, message: msg) } } catch { - logger - .error( - "UP stream run loop failed: \(error) remote \(connection.remoteAddress) \(connection.id) \(stream.id) kind: \(kind) data \(msg.toHexString()) bytes \(msg.count)" - ) + logger.error("UP stream run loop failed: \(error) \(connection.id) \(stream.id)") stream.close(abort: true) } From 74b83d0d7d18d5cfbee48926a9dc84d5a7c7a260 Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Wed, 15 Jan 2025 16:06:59 +0800 Subject: [PATCH 27/27] update node test --- Networking/Sources/Networking/Peer.swift | 4 ---- Node/Tests/NodeTests/NodeTests.swift | 1 - Utils/Sources/Utils/EventBus/StoreMiddleware.swift | 2 +- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 2ce4eb08..ca97e5c1 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -397,10 +397,6 @@ private struct PeerEventHandler: QuicEventHandler { ) return .code(.alpnNegFailure) } - logger - .info( - "new connection: \(connection.id) local addr: \(info.localAddress) remote addr: \(info.remoteAddress), role: \(role)" - ) if impl.addConnection(connection, addr: addr, role: role) { return .code(.success) } else { diff --git a/Node/Tests/NodeTests/NodeTests.swift b/Node/Tests/NodeTests/NodeTests.swift index d03a98b3..ae048af8 100644 --- a/Node/Tests/NodeTests/NodeTests.swift +++ b/Node/Tests/NodeTests/NodeTests.swift @@ -228,7 +228,6 @@ final class NodeTests { for (_, middleware) in nonValidatorNodes { await middleware.wait() } - try await Task.sleep(for: .milliseconds(100)) } try await Task.sleep(for: .milliseconds(nodes.count * 100)) let validator1BestHead = await validator1.dataProvider.bestHead diff --git a/Utils/Sources/Utils/EventBus/StoreMiddleware.swift b/Utils/Sources/Utils/EventBus/StoreMiddleware.swift index 7071dbad..cfc0cbcd 100644 --- a/Utils/Sources/Utils/EventBus/StoreMiddleware.swift +++ b/Utils/Sources/Utils/EventBus/StoreMiddleware.swift @@ -21,7 +21,7 @@ public struct StoreMiddleware: MiddlewareProtocol { @discardableResult public func wait() async -> [Sendable] { - try? await Task.sleep(for: .milliseconds(50)) + try? await Task.sleep(for: .milliseconds(5)) let value = storage.value