diff --git a/Dockerfile b/Dockerfile index 3ba93e2..9b859d6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM docker:dind +FROM docker:20-dind # RUN sed 's/http:\/\/fr\./http:\/\//' /etc/apt/sources.list WORKDIR /root @@ -6,14 +6,14 @@ WORKDIR /root RUN apk update RUN apk add make \ bash \ - openjdk11-jre \ curl \ openssl-dev \ python3-dev \ - gmp-dev + gmp-dev \ + tar -RUN apk add --repository http://dl-cdn.alpinelinux.org/alpine/edge/testing criu-dev -RUN apk add tar +RUN apk add --repository http://dl-cdn.alpinelinux.org/alpine/edge/testing criu-dev \ + openjdk20-jre-headless ADD *.jar Node.jar ADD config.json config.json diff --git a/Start.sh b/Start.sh index 9ae8871..8a25108 100644 --- a/Start.sh +++ b/Start.sh @@ -1,7 +1,7 @@ #!/bin/bash if [ "$#" -ne 1 ]; then echo "Sleeping..." -sleep $(shuf -i 1-120 -n 1) +# sleep $(shuf -i 1-120 -n 1) fi dockerd --experimental & @@ -11,11 +11,10 @@ echo "Waiting for docker daemon to initialize!" sleep 5 done -docker load -i stress.tar - -if [ "$1" -eq 5005 ]; then - echo "We're trusted. Running stress test." - bash stress.sh & -fi +# docker load -i stress.tar +# if [ "$1" -eq 5005 ]; then +# echo "We're trusted. Running stress test." +# bash stress.sh & +# fi java -jar Node.jar $1 \ No newline at end of file diff --git a/build.gradle b/build.gradle index 895fd6e..68897e2 100644 --- a/build.gradle +++ b/build.gradle @@ -22,6 +22,8 @@ dependencies { implementation('org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.0') implementation("com.influxdb:influxdb-client-kotlin:4.0.0") implementation('org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2-native-mt') + implementation("io.javalin:javalin:5.6.1") + implementation("org.slf4j:slf4j-simple:2.0.7") } tasks.withType(Test).configureEach { diff --git a/config.json b/config.json index 80e37ab..3d032b2 100644 --- a/config.json +++ b/config.json @@ -1,20 +1,20 @@ { - "trustedNodeIP": "88.200.63.144", + "trustedNodeIP": "172.16.150.159", "trustedNodePort": 5005, + "webSocketPort": 2517, "keystorePath": ".", - "port": 5005, "maxNodes": 10000, - "slotDuration": 10000, + "slotDuration": 5000, "initialDifficulty": 10, "broadcastSpreadPercentage": 5, "committeeSize": 5, "influxUrl": "", "influxToken": "", - "dashboardEnabled": false, + "dashboardEnabled": true, "loggingEnabled": false, "trustedLoggingEnabled": false, - "historyMinuteClearance": 10, - "historyCleaningFrequency": 5, + "historyMinuteClearance": 1, + "historyCleaningFrequency": 1, "nodesPerCluster": 10, "maxIterations": 5, "packetSplitSize": 60000, diff --git a/src/main/kotlin/Configuration.kt b/src/main/kotlin/Configuration.kt index d0da081..1b10c1e 100644 --- a/src/main/kotlin/Configuration.kt +++ b/src/main/kotlin/Configuration.kt @@ -9,7 +9,8 @@ import kotlinx.serialization.Serializable data class Configuration( val trustedNodeIP: String, val trustedNodePort: Int, - val port: Int, + var port: Int? = null, + val webSocketPort: Int, val maxNodes: Int, val keystorePath: String, val slotDuration: Long, @@ -29,6 +30,4 @@ data class Configuration( val useCriu: Boolean, val useTreeBasedMessageRoutingProtocol: Boolean, val treeChildrenCount: Int -) { - var passedPort: Int = -1 -} \ No newline at end of file +) \ No newline at end of file diff --git a/src/main/kotlin/Launch.kt b/src/main/kotlin/Launch.kt index 258b3bf..4c5e0e3 100644 --- a/src/main/kotlin/Launch.kt +++ b/src/main/kotlin/Launch.kt @@ -19,8 +19,7 @@ fun main(args: Array) { Logger.toggleLogging(configuration.loggingEnabled) args.getOrNull(0)?.toInt()?.apply { - configuration.passedPort = this - println("Passed udpPort: $this...") + configuration.port = this } Nion(configuration).apply { diff --git a/src/main/kotlin/Nion.kt b/src/main/kotlin/Nion.kt index b44c25b..899ae1f 100644 --- a/src/main/kotlin/Nion.kt +++ b/src/main/kotlin/Nion.kt @@ -55,7 +55,7 @@ class Nion(configuration: Configuration) : ChainBuilder(configuration) { private fun attemptBootstrap() { if (isTrustedNode || isBootstrapped) return - Logger.info("Attempting bootstrapping.") + Logger.info("Attempting bootstrapping to ${configuration.trustedNodeIP}:${configuration.trustedNodePort}.") bootstrap(configuration.trustedNodeIP, configuration.trustedNodePort) runAfter(Random.nextLong(10000, 20000), this::attemptBootstrap) } @@ -71,7 +71,7 @@ class Nion(configuration: Configuration) : ChainBuilder(configuration) { } // TODO ------------------------------------- - val execution = endpoints[endpoint] ?: throw Exception("Endpoint $endpoint has no handler set.") + val execution = endpoints[endpoint] ?: return // throw Exception("Endpoint $endpoint has no handler set.") if (endpoint.processing == MessageProcessing.Queued) processingQueue.put { execution(message) } else launchCoroutine { execution(message) } } diff --git a/src/main/kotlin/chain/Chain.kt b/src/main/kotlin/chain/Chain.kt index e282598..2dc9433 100644 --- a/src/main/kotlin/chain/Chain.kt +++ b/src/main/kotlin/chain/Chain.kt @@ -2,6 +2,7 @@ package chain import chain.data.Block import logging.Logger +import utils.CircularList import utils.asHex import utils.tryWithLock import java.util.concurrent.locks.ReentrantLock @@ -15,7 +16,7 @@ import kotlin.concurrent.withLock class Chain(private val verifiableDelay: VerifiableDelay, private val initialDifficulty: Int, private val committeeSize: Int) { private val lock = ReentrantLock(true) - private val blocks = mutableListOf() + private val blocks = CircularList(50) // ToDo: Remove, do not use Circular List but use persistent storage! /** Returns the last block in the chain. */ fun getLastBlock(): Block? { @@ -46,8 +47,6 @@ class Chain(private val verifiableDelay: VerifiableDelay, private val initialDif lock.tryWithLock { blocks.add(nextBlock) // ToDo: Put chain history in some sort of storage instead of keeping in memory. - val lastBlocks = blocks.takeLast(50) - blocks.removeAll { !lastBlocks.contains(it) } } Logger.chain("Block[${nextBlock.votes}/$committeeSize] added [${nextBlock.slot}].") } diff --git a/src/main/kotlin/chain/ChainBuilder.kt b/src/main/kotlin/chain/ChainBuilder.kt index 7ab42b0..9d5f872 100644 --- a/src/main/kotlin/chain/ChainBuilder.kt +++ b/src/main/kotlin/chain/ChainBuilder.kt @@ -11,6 +11,7 @@ import network.data.clusters.ClusterUtils import network.data.messages.InclusionRequest import network.data.messages.Message import network.data.messages.SyncRequest +import network.rpc.Topic import utils.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean @@ -47,9 +48,9 @@ abstract class ChainBuilder(configuration: Configuration) : DockerProxy(configur votes.entries.removeIf { (key, _) -> key == block.hash.asHex } if (blockAdded) { - if(isTrustedNode) println("Added block\t[${block.slot}]") + if(isTrustedNode) Logger.chain("Added block\t[${block.slot}].") + sendToSubscribed(Topic.Block, block) - removeOutdatedStatistics(block.slot - 1) if (block.slot <= 2) validatorSet.inclusionChanges(block) val nextTask = validatorSet.computeNextTask(block, configuration.committeeSize) @@ -226,12 +227,15 @@ abstract class ChainBuilder(configuration: Configuration) : DockerProxy(configur validatorSet.scheduleChange(inclusionRequest.publicKey, true) // send(message) } - Logger.debug("Received inclusion request! ") + val canStartTheChain = isTrustedNode && lastBlock == null + Logger.debug("Received inclusion request! Can start the chain: $canStartTheChain") - if (isTrustedNode && lastBlock == null) { + if (canStartTheChain) { val scheduledChanges = validatorSet.getScheduledChanges().count { it.value } val isEnoughToStart = scheduledChanges > configuration.committeeSize + Logger.debug("There is enough to start: $isEnoughToStart") if (isEnoughToStart && !sentGenesis.getAndSet(true)) { + Logger.debug("Computing proof for genesis block.") val proof = verifiableDelay.computeProof(configuration.initialDifficulty, "FFFF".encodeToByteArray()) val genesisBlock = Block(1, configuration.initialDifficulty, localNode.publicKey, emptySet(), proof, System.currentTimeMillis(), byteArrayOf(), validatorSet.getScheduledChanges()) send(Endpoint.NewBlock, genesisBlock) diff --git a/src/main/kotlin/chain/ValidatorSet.kt b/src/main/kotlin/chain/ValidatorSet.kt index 4688c7f..559ee6e 100644 --- a/src/main/kotlin/chain/ValidatorSet.kt +++ b/src/main/kotlin/chain/ValidatorSet.kt @@ -29,7 +29,10 @@ class ValidatorSet(private val localNode: Node, isTrustedNode: Boolean) { init { - if (isTrustedNode) scheduledChanges[localNode.publicKey] = true + if (isTrustedNode) { + scheduledChanges[localNode.publicKey] = true + Logger.info("We're the trusted node...") + } } /** Returns shuffled validator set using passed [random]. */ diff --git a/src/main/kotlin/docker/DockerProxy.kt b/src/main/kotlin/docker/DockerProxy.kt index 6e4b07b..4f2f1ee 100644 --- a/src/main/kotlin/docker/DockerProxy.kt +++ b/src/main/kotlin/docker/DockerProxy.kt @@ -8,6 +8,7 @@ import logging.Logger import network.data.Endpoint import network.data.clusters.Cluster import network.data.messages.Message +import network.rpc.Topic import utils.CircularList import utils.runAfter import java.nio.ByteBuffer @@ -39,8 +40,10 @@ abstract class DockerProxy(configuration: Configuration) : MigrationStrategy(con * * @param slot */ - protected fun removeOutdatedStatistics(slot: Long) { - networkStatistics.remove(slot) + private fun removeOutdatedStatistics(slot: Long) { + networkStatistics.clear() + val keys = networkStatistics.keys.filter { it < slot } + for (key in keys) networkStatistics.remove(key) } /** Stores all [statistics] into latest [networkStatistics] using [networkLock]. */ @@ -65,17 +68,17 @@ abstract class DockerProxy(configuration: Configuration) : MigrationStrategy(con val currentTime = System.currentTimeMillis() localContainers.entries.removeIf { (_, container) -> currentTime - container.updated >= 1000 } - val mapped = localContainers.values.map { it.copy(id = networkMappings[it.id] ?: it.id) } + val mapped: List = localContainers.values.map { it.copy(id = networkMappings[it.id] ?: it.id) } val localStatistics = DockerStatistics(localNode.publicKey, mapped, slot) val ourPublicKey = localNode.publicKey val ourCluster = clusters[ourPublicKey] if (ourCluster != null) { val clusterRepresentative = ourCluster.centroid val isRepresentative = clusterRepresentative == ourPublicKey + removeOutdatedStatistics(slot) if (!isRepresentative) send(Endpoint.NodeStatistics, arrayOf(localStatistics), clusterRepresentative) else runAfter(configuration.slotDuration / 4) { val networkStatistics = getNetworkStatistics(slot).plus(localStatistics) - networkStatistics.forEach { Dashboard.statisticSent(localNode.publicKey, it, blockProducer, block.slot) } send(Endpoint.NodeStatistics, networkStatistics, blockProducer) } } else Dashboard.reportException(Exception("Our cluster does not exist.")) diff --git a/src/main/kotlin/logging/Dashboard.kt b/src/main/kotlin/logging/Dashboard.kt index 13f106c..d954f7d 100644 --- a/src/main/kotlin/logging/Dashboard.kt +++ b/src/main/kotlin/logging/Dashboard.kt @@ -61,6 +61,7 @@ object Dashboard { } fun reportDHTQuery(identifier: String, seekerIp: String, seeker: String, hops: Int, revives: Int, duration: Long) { + if (!configuration.dashboardEnabled) return val point = Point.measurement("dht") .addField("hops", hops) .addField("ip", seekerIp) @@ -77,6 +78,7 @@ object Dashboard { * @param statistics Docker statistics that are reported by all representers of clusters. */ fun reportStatistics(statistics: Set, slot: Long) { + if (!configuration.dashboardEnabled) return var total = 0L for (measurement in statistics) { val publicKey = sha256(measurement.publicKey).asHex @@ -151,6 +153,7 @@ object Dashboard { /** Reports that an exception was caught */ fun reportException(e: Throwable, additionalInfo: String = "") { + if (!configuration.dashboardEnabled) return Logger.reportException(e) val point = Point.measurement("exceptions") .time(Instant.now(), WritePrecision.NS) diff --git a/src/main/kotlin/logging/Logger.kt b/src/main/kotlin/logging/Logger.kt index b933de9..c2f55af 100644 --- a/src/main/kotlin/logging/Logger.kt +++ b/src/main/kotlin/logging/Logger.kt @@ -21,7 +21,6 @@ object Logger { data class Log(val type: DebugType, val log: String, val ip: String, val timestamp: Long) private var isLoggingEnabled = false - private var currentDebug: DebugType = DebugType.ALL private val timeFormatter = DateTimeFormatter.ofPattern("dd. MM | HH:mm:ss.SSS") const val red = "\u001b[31m" diff --git a/src/main/kotlin/network/SocketHolder.kt b/src/main/kotlin/network/SocketHolder.kt index d89e7e9..8dbdd71 100644 --- a/src/main/kotlin/network/SocketHolder.kt +++ b/src/main/kotlin/network/SocketHolder.kt @@ -1,6 +1,7 @@ package network import Configuration +import network.rpc.RPCManager import java.net.DatagramSocket import java.net.ServerSocket @@ -9,7 +10,7 @@ import java.net.ServerSocket * on 21/01/2022 at 10:44 * using IntelliJ IDEA */ -open class SocketHolder(config: Configuration) { +open class SocketHolder(configuration: Configuration) : RPCManager(configuration) { protected val udpSocket: DatagramSocket = DatagramSocket() protected val tcpSocket: ServerSocket = ServerSocket(0) diff --git a/src/main/kotlin/network/kademlia/Kademlia.kt b/src/main/kotlin/network/kademlia/Kademlia.kt index 33804c9..5d2546c 100644 --- a/src/main/kotlin/network/kademlia/Kademlia.kt +++ b/src/main/kotlin/network/kademlia/Kademlia.kt @@ -9,6 +9,7 @@ import logging.Dashboard import logging.Logger import network.SocketHolder import network.data.Node +import network.rpc.Topic import utils.* import java.io.ByteArrayInputStream import java.io.DataInputStream @@ -16,9 +17,12 @@ import java.net.DatagramPacket import java.net.DatagramSocket import java.net.InetSocketAddress import java.nio.ByteBuffer +import java.util.Timer import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.scheduleAtFixedRate +import kotlin.concurrent.withLock import kotlin.random.Random /** @@ -31,7 +35,7 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) val crypto = Crypto(".") - private val kademliaSocket: DatagramSocket = if (configuration.passedPort != -1) DatagramSocket(configuration.passedPort) else DatagramSocket() + private val kademliaSocket: DatagramSocket = configuration.port?.let { DatagramSocket(it) } ?: DatagramSocket() val localAddress = getLocalAddress() val localNode = Node( @@ -43,7 +47,7 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) publicKey = crypto.publicKey ).apply { Dashboard.myInfo = "$ip:$kademliaPort" - println(Dashboard.myInfo) + Logger.info("Kademlia listening on: ${Dashboard.myInfo}") } val isTrustedNode = localNode.let { node -> node.ip == configuration.trustedNodeIP && node.kademliaPort == configuration.trustedNodePort } @@ -58,17 +62,19 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) private val incomingQueue = LinkedBlockingQueue() private val queryStorage = ConcurrentHashMap() private val bucketSize = 20 - private val testLock = ReentrantLock(true) + private val storageLock = ReentrantLock(true) init { Logger.debug("Our identifier is: ${localNode.identifier}") Thread(::sendOutgoing).start() Thread(::receiveIncoming).start() Thread(::processIncoming).start() - lookForInactiveQueries() + Timer().scheduleAtFixedRate(5000, 5000) { + lookForInactiveQueries() + } if (isTrustedNode) add(localNode) - printTree() + // printTree() } /** Sends a FIND_NODE request of our key to the known bootstrapping [Node]. */ @@ -157,6 +163,7 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) addToQueue(sender, KademliaEndpoint.CLOSEST_NODES, encodedReply) add(sender) } + KademliaEndpoint.CLOSEST_NODES -> { val closestNodes = ProtoBuf.decodeFromByteArray(kademliaMessage.data) val receivedNodes = closestNodes.nodes @@ -176,7 +183,6 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) queryHolder.hops++ val node = knownNodes[queryHolder.identifier] ?: return@forEach val actionsToDo = mutableListOf<(Node) -> Unit>() - queryHolder.queue.drainTo(actionsToDo) // Logger.trace("Drained $drained actions.") launchCoroutine { @@ -247,7 +253,6 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) Logger.info("Reviving ${inactiveQueries.size} inactive queries.") Dashboard.reportException(Exception("Reviving ${inactiveQueries.size} inactive queries.")) } - runAfter(5000, this::lookForInactiveQueries) } /** Debugs the built kademlia tree [development purposes only]. */ diff --git a/src/main/kotlin/network/messaging/Server.kt b/src/main/kotlin/network/messaging/Server.kt index f43fb11..bf07823 100644 --- a/src/main/kotlin/network/messaging/Server.kt +++ b/src/main/kotlin/network/messaging/Server.kt @@ -16,6 +16,7 @@ import network.data.TransmissionLayer import network.data.TransmissionType import network.data.messages.Message import network.kademlia.Kademlia +import network.rpc.Topic import utils.* import java.io.ByteArrayInputStream import java.io.DataInputStream @@ -157,7 +158,7 @@ abstract class Server(val configuration: Configuration) : Kademlia(configuration tcpSocket.accept().use { socket -> val data = socket.getInputStream().readAllBytes() val message = ProtoBuf.decodeFromByteArray(data) - if (alreadySeen(message.uid.asHex)) return@tryAndReport + if (alreadySeen(message.uid.asHex)) return@use processingQueue.add(message) if (message.endpoint.transmissionType == TransmissionType.Broadcast) broadcast(TransmissionLayer.TCP, message.uid.asHex, data) } @@ -253,6 +254,7 @@ abstract class Server(val configuration: Configuration) : Kademlia(configuration val gluedData = messageBuilder.gluedData() val decoded = ProtoBuf.decodeFromByteArray(gluedData) processingQueue.add(decoded) + messageBuilders.remove(messageId) } } } diff --git a/src/main/kotlin/network/rpc/RPCManager.kt b/src/main/kotlin/network/rpc/RPCManager.kt new file mode 100644 index 0000000..60fa246 --- /dev/null +++ b/src/main/kotlin/network/rpc/RPCManager.kt @@ -0,0 +1,100 @@ +package network.rpc + +import Configuration +import io.javalin.Javalin +import io.javalin.websocket.WsCloseContext +import io.javalin.websocket.WsConnectContext +import io.javalin.websocket.WsContext +import kotlinx.serialization.Serializable +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.Json +import network.data.messages.Message +import java.lang.Exception +import java.net.http.WebSocket +import java.time.Duration +import java.util.* +import java.util.concurrent.CompletionStage +import java.util.concurrent.TimeUnit +import kotlin.collections.HashMap + + +/** + * Created by Mihael Berčič + * on 29/07/2022 at 10:44 + * using IntelliJ IDEA + */ +open class RPCManager(configuration: Configuration) { + + val subscribedClients: HashMap> = hashMapOf() + + private val webServer = Javalin.create { + it.showJavalinBanner = false + }.start(configuration.webSocketPort) + + private val pendingClients: MutableList = mutableListOf() + + private val subscribedTopics: HashMap> = hashMapOf() + + init { + webServer.ws("/") { + it.onConnect(this::onConnect) + it.onClose(this::onClose) + } + } + + /** + * On new WebSocket connection subscibe to all topics (for now, while in MVP). + * After MVP the open WebSocket will be added to [pendingClients] and moved to appropriate topic subscriptions + * once requested. + */ + private fun onConnect(webSocket: WsConnectContext) { + webSocket.enableAutomaticPings(5, TimeUnit.SECONDS) + Topic.entries.forEach { topic -> + subscribedClients + .computeIfAbsent(topic) { mutableListOf() } + .add(webSocket) + } + sendToSubscribed(Topic.Logging, "Hello, this is Nion node!") + } + + /** + * On Client disconnect, remove the context from all subscribed topics. + */ + private fun onClose(webSocket: WsCloseContext) { + removeConnection(webSocket) + } + + fun removeConnection(webSocket: WsContext) { + val subscriptions = subscribedTopics.remove(webSocket) ?: return + subscriptions.forEach { topic -> + subscribedClients[topic]?.remove(webSocket) + } + } + + /** + * Sends serialised data to all clients that are subscribed to the [topic]. + */ + inline fun sendToSubscribed(topic: Topic, data: T) { + val message = RPCMessage(topic, data) + val serialisedData = Json.encodeToString(message) + val clientList = subscribedClients[topic] ?: return + clientList.forEach { + try { + it.send(serialisedData) + } catch (e: Exception) { + removeConnection(it) + } + } + } + +} + +@Serializable +data class RPCMessage(val topic: Topic, val data: T) + +enum class Topic { + Block, + Migration, + LocalApplication, + Logging +} \ No newline at end of file diff --git a/src/main/kotlin/utils/CircularList.kt b/src/main/kotlin/utils/CircularList.kt index b6bc735..89a8ece 100644 --- a/src/main/kotlin/utils/CircularList.kt +++ b/src/main/kotlin/utils/CircularList.kt @@ -8,28 +8,15 @@ import kotlinx.serialization.Serializable * using IntelliJ IDEA */ @Serializable -class CircularList(private val maxCapacity: Int) { - - private val elements = ArrayList(maxCapacity) +class CircularList(private val maxCapacity: Int) : ArrayList(maxCapacity) { /** Adds a new element to the list and removes the oldest element.*/ - fun add(element: T) { - if (elements.size == maxCapacity) elements.removeFirst() - elements.add(element) + override fun add(element: T): Boolean { + if (size == maxCapacity) removeFirst() + return super.add(element) } /** Returns all elements in this circular list. */ - fun elements() = elements.toList() - - override fun hashCode(): Int = elements.joinToString("").hashCode() - - override fun toString(): String { - return elements.toString() - } - - override fun equals(other: Any?): Boolean { - if (other is CircularList<*>) return other.elements == elements - return super.equals(other) - } + fun elements() = toList() } \ No newline at end of file diff --git a/src/main/kotlin/utils/Utils.kt b/src/main/kotlin/utils/Utils.kt index 8e8ab62..7540286 100644 --- a/src/main/kotlin/utils/Utils.kt +++ b/src/main/kotlin/utils/Utils.kt @@ -1,9 +1,6 @@ package utils -import kotlinx.coroutines.CoroutineExceptionHandler -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.launch +import kotlinx.coroutines.* import logging.Dashboard import logging.Logger import java.net.InetAddress diff --git a/src/test/kotlin/network/ClusterTest.kt b/src/test/kotlin/network/ClusterTest.kt index 3a6e26e..48ea49f 100644 --- a/src/test/kotlin/network/ClusterTest.kt +++ b/src/test/kotlin/network/ClusterTest.kt @@ -18,7 +18,7 @@ class ClusterTest { @Test fun computeClusters() { val publicKeys = (0..100).map { "${Random.nextInt(1000)}" } - val clusters = ClusterUtils.computeClusters(10, 1, publicKeys) { centroid, element -> + val clusters = ClusterUtils.computeClusters(50, 3, publicKeys) { centroid, element -> val elementBitSet = sha256(element).asHex.asBitSet val centroidBitset = sha256(centroid).asHex.asBitSet.apply { xor(elementBitSet) } centroidBitset.nextSetBit(0) @@ -28,8 +28,8 @@ class ClusterTest { val cluster = clusters[element] ?: Cluster("null") val centroid = cluster.centroid val isRepresentative = element == centroid - if (isRepresentative) println("$centroid -> PRODUCER") - println("$element -> $centroid") + if (isRepresentative) println("$centroid -> PRODUCER [color=red]") + else println("$element -> $centroid") } }