From b0d251fd72f9a6586300bf172388c0d91e4aec7c Mon Sep 17 00:00:00 2001 From: MacOMNI <414294494@qq.com> Date: Mon, 28 Oct 2024 18:06:28 +0800 Subject: [PATCH] more p2p test (#198) * Merge branch 'dev-p2p-test' of github.com:AcalaNetwork/boka into dev-p2p-test * update more tests * update more peer test * update peer test * update random --- .../Tests/NetworkingTests/PeerTests.swift | 227 +++++++++++++++++- 1 file changed, 221 insertions(+), 6 deletions(-) diff --git a/Networking/Tests/NetworkingTests/PeerTests.swift b/Networking/Tests/NetworkingTests/PeerTests.swift index 1161cefb..c6ddf2b1 100644 --- a/Networking/Tests/NetworkingTests/PeerTests.swift +++ b/Networking/Tests/NetworkingTests/PeerTests.swift @@ -78,9 +78,22 @@ struct PeerTests { } } + actor DataStorage { + private(set) var lastReceivedData: Data? + + func updateData(_ data: Data?) { + lastReceivedData = data + } + } + struct MockEphemeralStreamHandler: EphemeralStreamHandler { typealias StreamKind = EphemeralStreamKind typealias Request = MockRequest + private let dataStorage = DataStorage() + + var lastReceivedData: Data? { + get async { await dataStorage.lastReceivedData } + } func createDecoder(kind: StreamKind) -> any MessageDecoder { MockEphemeralMessageDecoder(kind: kind) @@ -99,12 +112,18 @@ struct PeerTests { littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) } ) let actualData = data.dropFirst(4).prefix(Int(length)) - + await dataStorage.updateData(actualData) return actualData } } struct MockPresentStreamHandler: PresistentStreamHandler { + private let dataStorage = DataStorage() + + var lastReceivedData: Data? { + get async { await dataStorage.lastReceivedData } + } + func streamOpened( connection _: any Networking.ConnectionInfoProtocol, stream _: any Networking.StreamProtocol>, @@ -113,8 +132,11 @@ struct PeerTests { func handle( connection _: any Networking.ConnectionInfoProtocol, - message _: PeerTests.MockRequest - ) async throws {} + message: PeerTests.MockRequest + ) async throws { + let data = message.data + await dataStorage.updateData(data) + } typealias StreamKind = UniquePresistentStreamKind typealias Request = MockRequest @@ -132,13 +154,16 @@ struct PeerTests { @Test func peerBroadcast() async throws { + let handler1 = MockPresentStreamHandler() + let handler2 = MockPresentStreamHandler() + let peer1 = try Peer( options: PeerOptions( role: .validator, listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8081)!, genesisHeader: Data32(), secretKey: Ed25519.SecretKey(from: Data32()), - presistentStreamHandler: MockPresentStreamHandler(), + presistentStreamHandler: handler1, ephemeralStreamHandler: MockEphemeralStreamHandler(), serverSettings: .defaultSettings, clientSettings: .defaultSettings @@ -150,7 +175,7 @@ struct PeerTests { listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8082)!, genesisHeader: Data32(), secretKey: Ed25519.SecretKey(from: Data32()), - presistentStreamHandler: MockPresentStreamHandler(), + presistentStreamHandler: handler2, ephemeralStreamHandler: MockEphemeralStreamHandler(), serverSettings: .defaultSettings, clientSettings: .defaultSettings @@ -171,7 +196,10 @@ struct PeerTests { peer2.broadcast( kind: .uniqueB, message: .init(kind: .uniqueB, data: Data("I am jam".utf8)) ) - try? await Task.sleep(for: .milliseconds(500)) + // Verify last received data + try? await Task.sleep(for: .milliseconds(1000)) + await #expect(handler2.lastReceivedData == Data("hello world".utf8)) + await #expect(handler1.lastReceivedData == Data("I am jam".utf8)) } @Test @@ -222,4 +250,191 @@ struct PeerTests { ) #expect(dataList2 == Data("I am jam".utf8)) } + + @Test + func multiplePeerBroadcast() async throws { + var peers: [Peer] = [] + // Create 100 peer nodes + for i in 0 ..< 100 { + let peer = try Peer( + options: PeerOptions( + role: .builder, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(7081 + i))!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + peers.append(peer) + } + + try? await Task.sleep(for: .milliseconds(100)) + + // Connect each peer to the next one in a circular network + for i in 0 ..< peers.count { + let nextPeerIndex = (i + 1) % peers.count + _ = try peers[i].connect( + to: NetAddr(ipAddress: "127.0.0.1", port: UInt16(7081 + nextPeerIndex))!, + role: .validator + ) + } + + try? await Task.sleep(for: .milliseconds(100)) + + // Broadcast a message from each peer + for (i, peer) in peers.enumerated() { + let message = MockRequest( + kind: i % 2 == 0 ? UniquePresistentStreamKind.uniqueA : UniquePresistentStreamKind.uniqueB, + data: Data("Message from peer \(i)".utf8) + ) + peer.broadcast(kind: message.kind, message: message) + } + + // Wait for message propagation + try? await Task.sleep(for: .milliseconds(200)) + } + + @Test + func multiplePeerRequest() async throws { + var peers: [Peer] = [] + + // Create 100 peer nodes + for i in 0 ..< 100 { + let peer = try Peer( + options: PeerOptions( + role: .builder, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(6091 + i))!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + peers.append(peer) + } + + // Wait for peers to initialize + try? await Task.sleep(for: .milliseconds(100)) + + // Test request-response by having each peer request from the next peer + for i in 0 ..< 100 { + let messageData = Data("Request from peer \(i)".utf8) + let port = UInt16(6091 + (i + 1) % 100) + let type = (i + 1) % 2 == 0 ? EphemeralStreamKind.typeA : EphemeralStreamKind.typeB + let response = try await peers[i].connect( + to: NetAddr(ipAddress: "127.0.0.1", port: port)!, + role: .validator + ).request(MockRequest(kind: type, data: messageData)) + #expect(response == messageData, "Peer \(i) should receive correct response") + } + } + + @Test + func highConcurrentRequest() async throws { + var peers: [Peer] = [] + + // Create 100 peers + for i in 0 ..< 100 { + let peer = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(8300 + i))!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32()), + presistentStreamHandler: MockPresentStreamHandler(), + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + peers.append(peer) + } + + for i in 0 ..< peers.count - 1 { + _ = try peers[i].connect( + to: NetAddr(ipAddress: "127.0.0.1", port: UInt16(8300 + i + 1))!, + role: .validator + ) + } + + // Allow connections to establish + try? await Task.sleep(for: .milliseconds(100)) + + // Send multiple requests from each peer + for peer in peers { + let tasks = (1 ... 88).map { _ in + Task { + let net = try peer.listenAddress() + let random = Int.random(in: 0 ..< 100) + let type = random % 2 == 0 ? EphemeralStreamKind.typeA : EphemeralStreamKind.typeB + let messageData = Data("Concurrent request \(net.description) + \(random)".utf8) + let response = try await peer.connect( + to: net, + role: .validator + ).request(MockRequest(kind: type, data: messageData)) + #expect(response == messageData, "Peer should receive correct response") + } + } + // Wait for all tasks to complete + for task in tasks { + try await task.value + } + } + } + + @Test + func broadcastSynchronization() async throws { + var peers: [Peer] = [] + var handles: [MockPresentStreamHandler] = [] + + // Create 50 peers with unique addresses + for i in 0 ..< 50 { + let handle = MockPresentStreamHandler() + let peer = try Peer( + options: PeerOptions( + role: .validator, + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(100 + i))!, + genesisHeader: Data32(), + secretKey: Ed25519.SecretKey(from: Data32()), + presistentStreamHandler: handle, + ephemeralStreamHandler: MockEphemeralStreamHandler(), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + ) + handles.append(handle) + peers.append(peer) + } + + // Connect each peer to form a fully connected network + for i in 0 ..< peers.count { + for j in 0 ..< peers.count where i != j { + _ = try peers[i].connect( + to: NetAddr(ipAddress: "127.0.0.1", port: UInt16(100 + j))!, + role: .validator + ) + } + } + + // Wait for all connections to establish + try? await Task.sleep(for: .seconds(10)) + + let centralPeer = peers[0] + let messagedata = Data("Sync message".utf8) + centralPeer.broadcast(kind: .uniqueA, message: MockRequest(kind: .uniqueA, data: messagedata)) + + // Wait for message to propagate + try? await Task.sleep(for: .seconds(60)) + + // Check that each peer received the broadcast + for i in 1 ..< handles.count { + let receivedData = await handles[i].lastReceivedData + #expect(receivedData == messagedata, "Handle should have received the broadcast message") + } + } }