|
| 1 | +import |
| 2 | + std/[options, sequtils], |
| 3 | + asynctest, |
| 4 | + bearssl/rand, |
| 5 | + chronicles, |
| 6 | + chronos, |
| 7 | + nimcrypto, |
| 8 | + libp2p/crypto/[crypto, secp], |
| 9 | + libp2p/[multiaddress, multicodec, multihash, routing_record, signed_envelope], |
| 10 | + libp2pdht/dht, |
| 11 | + libp2pdht/discv5/crypto as dhtcrypto, |
| 12 | + libp2pdht/discv5/protocol as discv5_protocol, |
| 13 | + stew/byteutils, |
| 14 | + tests/dht/test_helper |
| 15 | + |
| 16 | +logScope: |
| 17 | + topics = "DAS emulator" |
| 18 | + |
| 19 | +proc bootstrapNodes( |
| 20 | + nodecount: int, |
| 21 | + bootnodes: seq[SignedPeerRecord], |
| 22 | + rng = newRng(), |
| 23 | + delay: int = 0 |
| 24 | + ) : Future[seq[(discv5_protocol.Protocol, PrivateKey)]] {.async.} = |
| 25 | + |
| 26 | + debug "---- STARTING BOOSTRAPS ---" |
| 27 | + for i in 0..<nodecount: |
| 28 | + try: |
| 29 | + let privKey = PrivateKey.example(rng) |
| 30 | + let node = initDiscoveryNode(rng, privKey, localAddress(20302 + i), bootnodes) |
| 31 | + await node.start() |
| 32 | + result.add((node, privKey)) |
| 33 | + if delay > 0: |
| 34 | + await sleepAsync(chronos.milliseconds(delay)) |
| 35 | + except TransportOsError as e: |
| 36 | + echo "skipping node ",i ,":", e.msg |
| 37 | + |
| 38 | + #await allFutures(result.mapIt(it.bootstrap())) # this waits for bootstrap based on bootENode, which includes bonding with all its ping pongs |
| 39 | + |
| 40 | +proc bootstrapNetwork( |
| 41 | + nodecount: int, |
| 42 | + rng = newRng(), |
| 43 | + delay: int = 0 |
| 44 | + ) : Future[seq[(discv5_protocol.Protocol, PrivateKey)]] {.async.} = |
| 45 | + |
| 46 | + let |
| 47 | + bootNodeKey = PrivateKey.fromHex( |
| 48 | + "a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617") |
| 49 | + .expect("Valid private key hex") |
| 50 | + bootNodeAddr = localAddress(20301) |
| 51 | + bootNode = initDiscoveryNode(rng, bootNodeKey, bootNodeAddr, @[]) # just a shortcut for new and open |
| 52 | + |
| 53 | + #waitFor bootNode.bootstrap() # immediate, since no bootnodes are defined above |
| 54 | + |
| 55 | + var res = await bootstrapNodes(nodecount - 1, |
| 56 | + @[bootnode.localNode.record], |
| 57 | + rng, |
| 58 | + delay) |
| 59 | + res.insert((bootNode, bootNodeKey), 0) |
| 60 | + return res |
| 61 | + |
| 62 | +proc toNodeId(data: openArray[byte]): NodeId = |
| 63 | + readUintBE[256](keccak256.digest(data).data) |
| 64 | + |
| 65 | +proc segmentData(s: int, segmentsize: int) : seq[byte] = |
| 66 | + result = newSeq[byte](segmentsize) |
| 67 | + result[0] = byte(s mod 256) |
| 68 | + |
| 69 | +when isMainModule: |
| 70 | + proc main() {.async.} = |
| 71 | + let |
| 72 | + nodecount = 5 |
| 73 | + delay_pernode = 10 # in millisec |
| 74 | + delay_init = 2*1000 # in millisec |
| 75 | + blocksize = 16 |
| 76 | + segmentsize = 10 |
| 77 | + samplesize = 3 |
| 78 | + |
| 79 | + var |
| 80 | + rng: ref HmacDrbgContext |
| 81 | + nodes: seq[(discv5_protocol.Protocol, PrivateKey)] |
| 82 | + node0: discv5_protocol.Protocol |
| 83 | + privKey0: PrivateKey |
| 84 | + signedPeerRec0: SignedPeerRecord |
| 85 | + peerRec0: PeerRecord |
| 86 | + segmentIDs = newSeq[NodeId](blocksize) |
| 87 | + |
| 88 | + # start network |
| 89 | + rng = newRng() |
| 90 | + nodes = await bootstrapNetwork(nodecount=nodecount, delay=delay_pernode) |
| 91 | + (node0, privKey0) = nodes[0] |
| 92 | + signedPeerRec0 = privKey0.toSignedPeerRecord |
| 93 | + peerRec0 = signedPeerRec0.data |
| 94 | + |
| 95 | + # wait for network to settle |
| 96 | + await sleepAsync(chronos.milliseconds(delay_init)) |
| 97 | + |
| 98 | + # generate block and push data |
| 99 | + for s in 0 ..< blocksize: |
| 100 | + let |
| 101 | + segment = segmentData(s, segmentsize) |
| 102 | + key = toNodeId(segment) |
| 103 | + |
| 104 | + segmentIDs[s] = key |
| 105 | + |
| 106 | + let addedTo = await node0.addValue(key, segment) |
| 107 | + debug "Value added to: ", addedTo |
| 108 | + |
| 109 | + # sample |
| 110 | + for n in 1 ..< nodecount: |
| 111 | + for s in 0 ..< blocksize: |
| 112 | + let startTime = Moment.now() |
| 113 | + let res = await nodes[n][0].getValue(segmentIDs[s]) |
| 114 | + let pass = res.isOk() |
| 115 | + info "sample", pass, by = n, sample = s, time = Moment.now() - startTime |
| 116 | + |
| 117 | + waitfor main() |
| 118 | + |
| 119 | +# proc teardownAll() = |
| 120 | +# for (n, _) in nodes: # if last test is enabled, we need nodes[1..^1] here |
| 121 | +# await n.closeWait() |
| 122 | + |
| 123 | + |
0 commit comments