From 3f7c89b1efabe036586343ec88ced346caa36eba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mihael=20Ber=C4=8Di=C4=8D?= Date: Sat, 29 Jul 2023 22:28:17 +0200 Subject: [PATCH 1/8] Implemented WebSockets for Nion Clients to retrieve latest and updated information from Nion nodes. --- build.gradle | 2 + config.json | 5 +- src/main/kotlin/Configuration.kt | 1 + src/main/kotlin/chain/ChainBuilder.kt | 4 +- src/main/kotlin/network/SocketHolder.kt | 3 +- src/main/kotlin/network/rpc/RPCManager.kt | 71 +++++++++++++++++++++++ 6 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 src/main/kotlin/network/rpc/RPCManager.kt diff --git a/build.gradle b/build.gradle index 895fd6e..68897e2 100644 --- a/build.gradle +++ b/build.gradle @@ -22,6 +22,8 @@ dependencies { implementation('org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.0') implementation("com.influxdb:influxdb-client-kotlin:4.0.0") implementation('org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2-native-mt') + implementation("io.javalin:javalin:5.6.1") + implementation("org.slf4j:slf4j-simple:2.0.7") } tasks.withType(Test).configureEach { diff --git a/config.json b/config.json index 80e37ab..c5c55a8 100644 --- a/config.json +++ b/config.json @@ -1,8 +1,9 @@ { "trustedNodeIP": "88.200.63.144", - "trustedNodePort": 5005, + "trustedNodePort": 2517, "keystorePath": ".", "port": 5005, + "webSocketPort": 5010, "maxNodes": 10000, "slotDuration": 10000, "initialDifficulty": 10, @@ -11,7 +12,7 @@ "influxUrl": "", "influxToken": "", "dashboardEnabled": false, - "loggingEnabled": false, + "loggingEnabled": true, "trustedLoggingEnabled": false, "historyMinuteClearance": 10, "historyCleaningFrequency": 5, diff --git a/src/main/kotlin/Configuration.kt b/src/main/kotlin/Configuration.kt index d0da081..f6d5b47 100644 --- a/src/main/kotlin/Configuration.kt +++ b/src/main/kotlin/Configuration.kt @@ -10,6 +10,7 @@ data class Configuration( val trustedNodeIP: String, val trustedNodePort: Int, val port: Int, + val webSocketPort: Int, val maxNodes: Int, val keystorePath: String, val slotDuration: Long, diff --git a/src/main/kotlin/chain/ChainBuilder.kt b/src/main/kotlin/chain/ChainBuilder.kt index 7ab42b0..d50476b 100644 --- a/src/main/kotlin/chain/ChainBuilder.kt +++ b/src/main/kotlin/chain/ChainBuilder.kt @@ -11,6 +11,7 @@ import network.data.clusters.ClusterUtils import network.data.messages.InclusionRequest import network.data.messages.Message import network.data.messages.SyncRequest +import network.rpc.Topic import utils.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean @@ -47,7 +48,8 @@ abstract class ChainBuilder(configuration: Configuration) : DockerProxy(configur votes.entries.removeIf { (key, _) -> key == block.hash.asHex } if (blockAdded) { - if(isTrustedNode) println("Added block\t[${block.slot}]") + if(isTrustedNode) Logger.chain("Added block\t[${block.slot}].") + sendToSubscribed(Topic.Block, block) removeOutdatedStatistics(block.slot - 1) if (block.slot <= 2) validatorSet.inclusionChanges(block) diff --git a/src/main/kotlin/network/SocketHolder.kt b/src/main/kotlin/network/SocketHolder.kt index d89e7e9..8dbdd71 100644 --- a/src/main/kotlin/network/SocketHolder.kt +++ b/src/main/kotlin/network/SocketHolder.kt @@ -1,6 +1,7 @@ package network import Configuration +import network.rpc.RPCManager import java.net.DatagramSocket import java.net.ServerSocket @@ -9,7 +10,7 @@ import java.net.ServerSocket * on 21/01/2022 at 10:44 * using IntelliJ IDEA */ -open class SocketHolder(config: Configuration) { +open class SocketHolder(configuration: Configuration) : RPCManager(configuration) { protected val udpSocket: DatagramSocket = DatagramSocket() protected val tcpSocket: ServerSocket = ServerSocket(0) diff --git a/src/main/kotlin/network/rpc/RPCManager.kt b/src/main/kotlin/network/rpc/RPCManager.kt new file mode 100644 index 0000000..fdf05e0 --- /dev/null +++ b/src/main/kotlin/network/rpc/RPCManager.kt @@ -0,0 +1,71 @@ +package network.rpc + +import Configuration +import io.javalin.Javalin +import io.javalin.websocket.WsCloseContext +import io.javalin.websocket.WsConnectContext +import io.javalin.websocket.WsContext +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.Json +import java.net.http.WebSocket +import java.util.* +import java.util.concurrent.CompletionStage +import kotlin.collections.HashMap + +open class RPCManager(configuration: Configuration) { + + private val webServer = Javalin.create { + it.showJavalinBanner = false + }.start(configuration.webSocketPort) + + private val pendingClients: MutableList = mutableListOf() + private val subscribedClients: HashMap> = hashMapOf() + private val subscribedTopics: HashMap> = hashMapOf() + + init { + webServer.ws("/") { + it.onConnect(this::onConnect) + it.onClose(this::onClose) + } + } + + /** + * On new WebSocket connection subscibe to all topics (for now, while in MVP). + * After MVP the open WebSocket will be added to [pendingClients] and moved to appropriate topic subscriptions + * once requested. + */ + private fun onConnect(webSocket: WsConnectContext) { + webSocket.send("Hello, this is Nion node!") + Topic.entries.forEach { topic -> + subscribedClients + .computeIfAbsent(topic) { mutableListOf() } + .add(webSocket) + } + } + + /** + * On Client disconnect, remove the context from all subscribed topics. + */ + private fun onClose(webSocket: WsCloseContext) { + val subscriptions = subscribedTopics.remove(webSocket) ?: return + subscriptions.forEach { topic -> + subscribedClients[topic]?.remove(webSocket) + } + } + + /** + * Sends serialised data to all clients that are subscribed to the [topic]. + */ + fun sendToSubscribed(topic: Topic, data: Any) { + val serialisedData = Json.encodeToString(data) + val clientList = subscribedClients[topic] ?: return + clientList.forEach { it.send(serialisedData) } + } + +} + +enum class Topic { + Block, + Migration, + LocalApplication +} \ No newline at end of file From ebc33014a79190de8beefd5ee7338ac2867172ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mihael=20Ber=C4=8Di=C4=8D?= Date: Sat, 29 Jul 2023 22:53:42 +0200 Subject: [PATCH 2/8] Added documentation comments. --- src/main/kotlin/network/rpc/RPCManager.kt | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/kotlin/network/rpc/RPCManager.kt b/src/main/kotlin/network/rpc/RPCManager.kt index fdf05e0..565e948 100644 --- a/src/main/kotlin/network/rpc/RPCManager.kt +++ b/src/main/kotlin/network/rpc/RPCManager.kt @@ -12,6 +12,12 @@ import java.util.* import java.util.concurrent.CompletionStage import kotlin.collections.HashMap + +/** + * Created by Mihael Berčič + * on 29/07/2022 at 10:44 + * using IntelliJ IDEA + */ open class RPCManager(configuration: Configuration) { private val webServer = Javalin.create { From de7c89656804072fd19a07ff931d08393c4e39f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mihael=20Ber=C4=8Di=C4=8D?= Date: Sun, 30 Jul 2023 00:09:00 +0200 Subject: [PATCH 3/8] Fixed main node IP set in configuration file. Cleaned up code regarding UDP datagram sockets and configuration setup. --- config.json | 2 +- src/main/kotlin/Configuration.kt | 6 ++---- src/main/kotlin/Launch.kt | 3 +-- src/main/kotlin/Nion.kt | 2 +- src/main/kotlin/logging/Dashboard.kt | 1 + src/main/kotlin/network/kademlia/Kademlia.kt | 6 +++--- 6 files changed, 9 insertions(+), 11 deletions(-) diff --git a/config.json b/config.json index c5c55a8..fd01edb 100644 --- a/config.json +++ b/config.json @@ -1,5 +1,5 @@ { - "trustedNodeIP": "88.200.63.144", + "trustedNodeIP": "88.200.63.133", "trustedNodePort": 2517, "keystorePath": ".", "port": 5005, diff --git a/src/main/kotlin/Configuration.kt b/src/main/kotlin/Configuration.kt index f6d5b47..d79845b 100644 --- a/src/main/kotlin/Configuration.kt +++ b/src/main/kotlin/Configuration.kt @@ -9,7 +9,7 @@ import kotlinx.serialization.Serializable data class Configuration( val trustedNodeIP: String, val trustedNodePort: Int, - val port: Int, + var port: Int?, val webSocketPort: Int, val maxNodes: Int, val keystorePath: String, @@ -30,6 +30,4 @@ data class Configuration( val useCriu: Boolean, val useTreeBasedMessageRoutingProtocol: Boolean, val treeChildrenCount: Int -) { - var passedPort: Int = -1 -} \ No newline at end of file +) \ No newline at end of file diff --git a/src/main/kotlin/Launch.kt b/src/main/kotlin/Launch.kt index 258b3bf..4c5e0e3 100644 --- a/src/main/kotlin/Launch.kt +++ b/src/main/kotlin/Launch.kt @@ -19,8 +19,7 @@ fun main(args: Array) { Logger.toggleLogging(configuration.loggingEnabled) args.getOrNull(0)?.toInt()?.apply { - configuration.passedPort = this - println("Passed udpPort: $this...") + configuration.port = this } Nion(configuration).apply { diff --git a/src/main/kotlin/Nion.kt b/src/main/kotlin/Nion.kt index b44c25b..c921c8a 100644 --- a/src/main/kotlin/Nion.kt +++ b/src/main/kotlin/Nion.kt @@ -55,7 +55,7 @@ class Nion(configuration: Configuration) : ChainBuilder(configuration) { private fun attemptBootstrap() { if (isTrustedNode || isBootstrapped) return - Logger.info("Attempting bootstrapping.") + Logger.info("Attempting bootstrapping to ${configuration.trustedNodeIP}:${configuration.trustedNodePort}.") bootstrap(configuration.trustedNodeIP, configuration.trustedNodePort) runAfter(Random.nextLong(10000, 20000), this::attemptBootstrap) } diff --git a/src/main/kotlin/logging/Dashboard.kt b/src/main/kotlin/logging/Dashboard.kt index 13f106c..e30f5e8 100644 --- a/src/main/kotlin/logging/Dashboard.kt +++ b/src/main/kotlin/logging/Dashboard.kt @@ -151,6 +151,7 @@ object Dashboard { /** Reports that an exception was caught */ fun reportException(e: Throwable, additionalInfo: String = "") { + if (!configuration.dashboardEnabled) return Logger.reportException(e) val point = Point.measurement("exceptions") .time(Instant.now(), WritePrecision.NS) diff --git a/src/main/kotlin/network/kademlia/Kademlia.kt b/src/main/kotlin/network/kademlia/Kademlia.kt index 33804c9..e16e269 100644 --- a/src/main/kotlin/network/kademlia/Kademlia.kt +++ b/src/main/kotlin/network/kademlia/Kademlia.kt @@ -31,7 +31,7 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) val crypto = Crypto(".") - private val kademliaSocket: DatagramSocket = if (configuration.passedPort != -1) DatagramSocket(configuration.passedPort) else DatagramSocket() + private val kademliaSocket: DatagramSocket = configuration.port?.let { DatagramSocket(it) } ?: DatagramSocket() val localAddress = getLocalAddress() val localNode = Node( @@ -43,7 +43,7 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) publicKey = crypto.publicKey ).apply { Dashboard.myInfo = "$ip:$kademliaPort" - println(Dashboard.myInfo) + Logger.info("Kademlia listening on: ${Dashboard.myInfo}") } val isTrustedNode = localNode.let { node -> node.ip == configuration.trustedNodeIP && node.kademliaPort == configuration.trustedNodePort } @@ -68,7 +68,7 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) lookForInactiveQueries() if (isTrustedNode) add(localNode) - printTree() + // printTree() } /** Sends a FIND_NODE request of our key to the known bootstrapping [Node]. */ From 1b16feb85f6ddcc6f1ac9420024d9b66be12b76e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mihael=20Ber=C4=8Di=C4=8D?= Date: Mon, 31 Jul 2023 00:33:51 +0200 Subject: [PATCH 4/8] Modified to run latest openjdk-20-jre. --- Dockerfile | 3 ++- Start.sh | 13 ++++++------- config.json | 2 +- src/main/kotlin/logging/Logger.kt | 1 - 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3ba93e2..6d1b436 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,9 +4,10 @@ FROM docker:dind WORKDIR /root RUN apk update +RUN apk add openjdk20-jre-headless --repository=http://dl-cdn.alpinelinux.org/alpine/edge/testing/ + RUN apk add make \ bash \ - openjdk11-jre \ curl \ openssl-dev \ python3-dev \ diff --git a/Start.sh b/Start.sh index 9ae8871..8a25108 100644 --- a/Start.sh +++ b/Start.sh @@ -1,7 +1,7 @@ #!/bin/bash if [ "$#" -ne 1 ]; then echo "Sleeping..." -sleep $(shuf -i 1-120 -n 1) +# sleep $(shuf -i 1-120 -n 1) fi dockerd --experimental & @@ -11,11 +11,10 @@ echo "Waiting for docker daemon to initialize!" sleep 5 done -docker load -i stress.tar - -if [ "$1" -eq 5005 ]; then - echo "We're trusted. Running stress test." - bash stress.sh & -fi +# docker load -i stress.tar +# if [ "$1" -eq 5005 ]; then +# echo "We're trusted. Running stress test." +# bash stress.sh & +# fi java -jar Node.jar $1 \ No newline at end of file diff --git a/config.json b/config.json index fd01edb..47a4cba 100644 --- a/config.json +++ b/config.json @@ -2,7 +2,7 @@ "trustedNodeIP": "88.200.63.133", "trustedNodePort": 2517, "keystorePath": ".", - "port": 5005, + "port": 2517, "webSocketPort": 5010, "maxNodes": 10000, "slotDuration": 10000, diff --git a/src/main/kotlin/logging/Logger.kt b/src/main/kotlin/logging/Logger.kt index b933de9..c2f55af 100644 --- a/src/main/kotlin/logging/Logger.kt +++ b/src/main/kotlin/logging/Logger.kt @@ -21,7 +21,6 @@ object Logger { data class Log(val type: DebugType, val log: String, val ip: String, val timestamp: Long) private var isLoggingEnabled = false - private var currentDebug: DebugType = DebugType.ALL private val timeFormatter = DateTimeFormatter.ofPattern("dd. MM | HH:mm:ss.SSS") const val red = "\u001b[31m" From a8900680c87e654bb512b5e958ebf0130f14e153 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mihael=20Ber=C4=8Di=C4=8D?= Date: Mon, 31 Jul 2023 20:36:49 +0200 Subject: [PATCH 5/8] Implemented live client connection and data transfer, completely functional chain. --- Dockerfile | 11 +++++------ config.json | 7 +++---- src/main/kotlin/Configuration.kt | 2 +- src/main/kotlin/chain/ChainBuilder.kt | 7 +++++-- src/main/kotlin/chain/ValidatorSet.kt | 5 ++++- src/main/kotlin/network/rpc/RPCManager.kt | 24 +++++++++++++++++++---- 6 files changed, 38 insertions(+), 18 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6d1b436..9b859d6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,20 +1,19 @@ -FROM docker:dind +FROM docker:20-dind # RUN sed 's/http:\/\/fr\./http:\/\//' /etc/apt/sources.list WORKDIR /root RUN apk update -RUN apk add openjdk20-jre-headless --repository=http://dl-cdn.alpinelinux.org/alpine/edge/testing/ - RUN apk add make \ bash \ curl \ openssl-dev \ python3-dev \ - gmp-dev + gmp-dev \ + tar -RUN apk add --repository http://dl-cdn.alpinelinux.org/alpine/edge/testing criu-dev -RUN apk add tar +RUN apk add --repository http://dl-cdn.alpinelinux.org/alpine/edge/testing criu-dev \ + openjdk20-jre-headless ADD *.jar Node.jar ADD config.json config.json diff --git a/config.json b/config.json index 47a4cba..bb25f95 100644 --- a/config.json +++ b/config.json @@ -1,9 +1,8 @@ { - "trustedNodeIP": "88.200.63.133", - "trustedNodePort": 2517, + "trustedNodeIP": "172.16.150.159", + "trustedNodePort": 5005, + "webSocketPort": 2517, "keystorePath": ".", - "port": 2517, - "webSocketPort": 5010, "maxNodes": 10000, "slotDuration": 10000, "initialDifficulty": 10, diff --git a/src/main/kotlin/Configuration.kt b/src/main/kotlin/Configuration.kt index d79845b..1b10c1e 100644 --- a/src/main/kotlin/Configuration.kt +++ b/src/main/kotlin/Configuration.kt @@ -9,7 +9,7 @@ import kotlinx.serialization.Serializable data class Configuration( val trustedNodeIP: String, val trustedNodePort: Int, - var port: Int?, + var port: Int? = null, val webSocketPort: Int, val maxNodes: Int, val keystorePath: String, diff --git a/src/main/kotlin/chain/ChainBuilder.kt b/src/main/kotlin/chain/ChainBuilder.kt index d50476b..5c6936e 100644 --- a/src/main/kotlin/chain/ChainBuilder.kt +++ b/src/main/kotlin/chain/ChainBuilder.kt @@ -228,12 +228,15 @@ abstract class ChainBuilder(configuration: Configuration) : DockerProxy(configur validatorSet.scheduleChange(inclusionRequest.publicKey, true) // send(message) } - Logger.debug("Received inclusion request! ") + val canStartTheChain = isTrustedNode && lastBlock == null + Logger.debug("Received inclusion request! Can start the chain: $canStartTheChain") - if (isTrustedNode && lastBlock == null) { + if (canStartTheChain) { val scheduledChanges = validatorSet.getScheduledChanges().count { it.value } val isEnoughToStart = scheduledChanges > configuration.committeeSize + Logger.debug("There is enough to start: $isEnoughToStart") if (isEnoughToStart && !sentGenesis.getAndSet(true)) { + Logger.debug("Computing proof for genesis block.") val proof = verifiableDelay.computeProof(configuration.initialDifficulty, "FFFF".encodeToByteArray()) val genesisBlock = Block(1, configuration.initialDifficulty, localNode.publicKey, emptySet(), proof, System.currentTimeMillis(), byteArrayOf(), validatorSet.getScheduledChanges()) send(Endpoint.NewBlock, genesisBlock) diff --git a/src/main/kotlin/chain/ValidatorSet.kt b/src/main/kotlin/chain/ValidatorSet.kt index 4688c7f..559ee6e 100644 --- a/src/main/kotlin/chain/ValidatorSet.kt +++ b/src/main/kotlin/chain/ValidatorSet.kt @@ -29,7 +29,10 @@ class ValidatorSet(private val localNode: Node, isTrustedNode: Boolean) { init { - if (isTrustedNode) scheduledChanges[localNode.publicKey] = true + if (isTrustedNode) { + scheduledChanges[localNode.publicKey] = true + Logger.info("We're the trusted node...") + } } /** Returns shuffled validator set using passed [random]. */ diff --git a/src/main/kotlin/network/rpc/RPCManager.kt b/src/main/kotlin/network/rpc/RPCManager.kt index 565e948..e91a40c 100644 --- a/src/main/kotlin/network/rpc/RPCManager.kt +++ b/src/main/kotlin/network/rpc/RPCManager.kt @@ -7,9 +7,12 @@ import io.javalin.websocket.WsConnectContext import io.javalin.websocket.WsContext import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json +import java.lang.Exception import java.net.http.WebSocket +import java.time.Duration import java.util.* import java.util.concurrent.CompletionStage +import java.util.concurrent.TimeUnit import kotlin.collections.HashMap @@ -20,12 +23,14 @@ import kotlin.collections.HashMap */ open class RPCManager(configuration: Configuration) { + val subscribedClients: HashMap> = hashMapOf() + private val webServer = Javalin.create { it.showJavalinBanner = false }.start(configuration.webSocketPort) private val pendingClients: MutableList = mutableListOf() - private val subscribedClients: HashMap> = hashMapOf() + private val subscribedTopics: HashMap> = hashMapOf() init { @@ -41,6 +46,7 @@ open class RPCManager(configuration: Configuration) { * once requested. */ private fun onConnect(webSocket: WsConnectContext) { + webSocket.enableAutomaticPings(5, TimeUnit.SECONDS) webSocket.send("Hello, this is Nion node!") Topic.entries.forEach { topic -> subscribedClients @@ -53,6 +59,10 @@ open class RPCManager(configuration: Configuration) { * On Client disconnect, remove the context from all subscribed topics. */ private fun onClose(webSocket: WsCloseContext) { + removeConnection(webSocket) + } + + fun removeConnection(webSocket: WsContext) { val subscriptions = subscribedTopics.remove(webSocket) ?: return subscriptions.forEach { topic -> subscribedClients[topic]?.remove(webSocket) @@ -62,10 +72,16 @@ open class RPCManager(configuration: Configuration) { /** * Sends serialised data to all clients that are subscribed to the [topic]. */ - fun sendToSubscribed(topic: Topic, data: Any) { - val serialisedData = Json.encodeToString(data) + inline fun sendToSubscribed(topic: Topic, data: T) { + val serialisedData = Json.encodeToString(data) val clientList = subscribedClients[topic] ?: return - clientList.forEach { it.send(serialisedData) } + clientList.forEach { + try { + it.send(serialisedData) + } catch (e:Exception) { + removeConnection(it) + } + } } } From b58b578462e4c9cd4ec29baa7f6e830f3bf84b36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mihael=20Ber=C4=8Di=C4=8D?= Date: Mon, 31 Jul 2023 23:31:34 +0200 Subject: [PATCH 6/8] Disabled logging for testnet. --- config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.json b/config.json index bb25f95..6262f8d 100644 --- a/config.json +++ b/config.json @@ -11,7 +11,7 @@ "influxUrl": "", "influxToken": "", "dashboardEnabled": false, - "loggingEnabled": true, + "loggingEnabled": false, "trustedLoggingEnabled": false, "historyMinuteClearance": 10, "historyCleaningFrequency": 5, From 73f7fdedf9864b171f73635d1997df79c91418da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mihael=20Ber=C4=8Di=C4=8D?= Date: Sat, 5 Aug 2023 23:15:12 +0200 Subject: [PATCH 7/8] Looking for reasons of chain not continuing. --- config.json | 2 +- src/main/kotlin/chain/Chain.kt | 7 +++---- src/main/kotlin/chain/VerifiableDelay.kt | 1 + src/main/kotlin/utils/CircularList.kt | 23 +++++------------------ 4 files changed, 10 insertions(+), 23 deletions(-) diff --git a/config.json b/config.json index 6262f8d..62ffe75 100644 --- a/config.json +++ b/config.json @@ -4,7 +4,7 @@ "webSocketPort": 2517, "keystorePath": ".", "maxNodes": 10000, - "slotDuration": 10000, + "slotDuration": 100, "initialDifficulty": 10, "broadcastSpreadPercentage": 5, "committeeSize": 5, diff --git a/src/main/kotlin/chain/Chain.kt b/src/main/kotlin/chain/Chain.kt index e282598..0a5c69a 100644 --- a/src/main/kotlin/chain/Chain.kt +++ b/src/main/kotlin/chain/Chain.kt @@ -2,6 +2,7 @@ package chain import chain.data.Block import logging.Logger +import utils.CircularList import utils.asHex import utils.tryWithLock import java.util.concurrent.locks.ReentrantLock @@ -15,7 +16,7 @@ import kotlin.concurrent.withLock class Chain(private val verifiableDelay: VerifiableDelay, private val initialDifficulty: Int, private val committeeSize: Int) { private val lock = ReentrantLock(true) - private val blocks = mutableListOf() + private val blocks = CircularList(50) // ToDo: Remove, do not use Circular List but use persistent storage! /** Returns the last block in the chain. */ fun getLastBlock(): Block? { @@ -33,7 +34,7 @@ class Chain(private val verifiableDelay: VerifiableDelay, private val initialDif val lastBlock = getLastBlock() val lastHash = lastBlock?.hash ?: "FFFF".toByteArray() val difficulty = lastBlock?.difficulty ?: initialDifficulty - val isLegitimate = nextBlock.slot == (lastBlock?.slot ?: 0) + 1 // verifiableDelay.verifyProof(lastHash, difficulty, nextBlock.vdfProof) + val isLegitimate = verifiableDelay.verifyProof(lastHash, difficulty, nextBlock.vdfProof) if (!isLegitimate) { Logger.trace("Proof is not legitimate for block ${nextBlock.slot}!") Logger.chain("Last hash: ${lastHash.asHex}") @@ -46,8 +47,6 @@ class Chain(private val verifiableDelay: VerifiableDelay, private val initialDif lock.tryWithLock { blocks.add(nextBlock) // ToDo: Put chain history in some sort of storage instead of keeping in memory. - val lastBlocks = blocks.takeLast(50) - blocks.removeAll { !lastBlocks.contains(it) } } Logger.chain("Block[${nextBlock.votes}/$committeeSize] added [${nextBlock.slot}].") } diff --git a/src/main/kotlin/chain/VerifiableDelay.kt b/src/main/kotlin/chain/VerifiableDelay.kt index 1afbb53..548e2cb 100644 --- a/src/main/kotlin/chain/VerifiableDelay.kt +++ b/src/main/kotlin/chain/VerifiableDelay.kt @@ -11,6 +11,7 @@ class VerifiableDelay { /** Runs a vdf-cli command and returns the output of vdf computation. */ fun computeProof(difficulty: Int, hash: ByteArray): String { + return "PROOF" val hexHash = hash.asHex val needed = hexHash.length % 2 val processBuilder = ProcessBuilder() diff --git a/src/main/kotlin/utils/CircularList.kt b/src/main/kotlin/utils/CircularList.kt index b6bc735..89a8ece 100644 --- a/src/main/kotlin/utils/CircularList.kt +++ b/src/main/kotlin/utils/CircularList.kt @@ -8,28 +8,15 @@ import kotlinx.serialization.Serializable * using IntelliJ IDEA */ @Serializable -class CircularList(private val maxCapacity: Int) { - - private val elements = ArrayList(maxCapacity) +class CircularList(private val maxCapacity: Int) : ArrayList(maxCapacity) { /** Adds a new element to the list and removes the oldest element.*/ - fun add(element: T) { - if (elements.size == maxCapacity) elements.removeFirst() - elements.add(element) + override fun add(element: T): Boolean { + if (size == maxCapacity) removeFirst() + return super.add(element) } /** Returns all elements in this circular list. */ - fun elements() = elements.toList() - - override fun hashCode(): Int = elements.joinToString("").hashCode() - - override fun toString(): String { - return elements.toString() - } - - override fun equals(other: Any?): Boolean { - if (other is CircularList<*>) return other.elements == elements - return super.equals(other) - } + fun elements() = toList() } \ No newline at end of file From 3eb2e93909dd71a7e22312843e85e7bae70239d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mihael=20Ber=C4=8Di=C4=8D?= Date: Tue, 8 Aug 2023 19:50:11 +0200 Subject: [PATCH 8/8] Fixed memory leak that has been affecting us since a long time. --- config.json | 8 ++++---- src/main/kotlin/Nion.kt | 2 +- src/main/kotlin/chain/Chain.kt | 2 +- src/main/kotlin/chain/ChainBuilder.kt | 1 - src/main/kotlin/chain/VerifiableDelay.kt | 1 - src/main/kotlin/docker/DockerProxy.kt | 11 +++++++---- src/main/kotlin/logging/Dashboard.kt | 2 ++ src/main/kotlin/network/kademlia/Kademlia.kt | 13 +++++++++---- src/main/kotlin/network/messaging/Server.kt | 4 +++- src/main/kotlin/network/rpc/RPCManager.kt | 15 +++++++++++---- src/main/kotlin/utils/Utils.kt | 5 +---- src/test/kotlin/network/ClusterTest.kt | 6 +++--- 12 files changed, 42 insertions(+), 28 deletions(-) diff --git a/config.json b/config.json index 62ffe75..3d032b2 100644 --- a/config.json +++ b/config.json @@ -4,17 +4,17 @@ "webSocketPort": 2517, "keystorePath": ".", "maxNodes": 10000, - "slotDuration": 100, + "slotDuration": 5000, "initialDifficulty": 10, "broadcastSpreadPercentage": 5, "committeeSize": 5, "influxUrl": "", "influxToken": "", - "dashboardEnabled": false, + "dashboardEnabled": true, "loggingEnabled": false, "trustedLoggingEnabled": false, - "historyMinuteClearance": 10, - "historyCleaningFrequency": 5, + "historyMinuteClearance": 1, + "historyCleaningFrequency": 1, "nodesPerCluster": 10, "maxIterations": 5, "packetSplitSize": 60000, diff --git a/src/main/kotlin/Nion.kt b/src/main/kotlin/Nion.kt index c921c8a..899ae1f 100644 --- a/src/main/kotlin/Nion.kt +++ b/src/main/kotlin/Nion.kt @@ -71,7 +71,7 @@ class Nion(configuration: Configuration) : ChainBuilder(configuration) { } // TODO ------------------------------------- - val execution = endpoints[endpoint] ?: throw Exception("Endpoint $endpoint has no handler set.") + val execution = endpoints[endpoint] ?: return // throw Exception("Endpoint $endpoint has no handler set.") if (endpoint.processing == MessageProcessing.Queued) processingQueue.put { execution(message) } else launchCoroutine { execution(message) } } diff --git a/src/main/kotlin/chain/Chain.kt b/src/main/kotlin/chain/Chain.kt index 0a5c69a..2dc9433 100644 --- a/src/main/kotlin/chain/Chain.kt +++ b/src/main/kotlin/chain/Chain.kt @@ -34,7 +34,7 @@ class Chain(private val verifiableDelay: VerifiableDelay, private val initialDif val lastBlock = getLastBlock() val lastHash = lastBlock?.hash ?: "FFFF".toByteArray() val difficulty = lastBlock?.difficulty ?: initialDifficulty - val isLegitimate = verifiableDelay.verifyProof(lastHash, difficulty, nextBlock.vdfProof) + val isLegitimate = nextBlock.slot == (lastBlock?.slot ?: 0) + 1 // verifiableDelay.verifyProof(lastHash, difficulty, nextBlock.vdfProof) if (!isLegitimate) { Logger.trace("Proof is not legitimate for block ${nextBlock.slot}!") Logger.chain("Last hash: ${lastHash.asHex}") diff --git a/src/main/kotlin/chain/ChainBuilder.kt b/src/main/kotlin/chain/ChainBuilder.kt index 5c6936e..9d5f872 100644 --- a/src/main/kotlin/chain/ChainBuilder.kt +++ b/src/main/kotlin/chain/ChainBuilder.kt @@ -51,7 +51,6 @@ abstract class ChainBuilder(configuration: Configuration) : DockerProxy(configur if(isTrustedNode) Logger.chain("Added block\t[${block.slot}].") sendToSubscribed(Topic.Block, block) - removeOutdatedStatistics(block.slot - 1) if (block.slot <= 2) validatorSet.inclusionChanges(block) val nextTask = validatorSet.computeNextTask(block, configuration.committeeSize) diff --git a/src/main/kotlin/chain/VerifiableDelay.kt b/src/main/kotlin/chain/VerifiableDelay.kt index 548e2cb..1afbb53 100644 --- a/src/main/kotlin/chain/VerifiableDelay.kt +++ b/src/main/kotlin/chain/VerifiableDelay.kt @@ -11,7 +11,6 @@ class VerifiableDelay { /** Runs a vdf-cli command and returns the output of vdf computation. */ fun computeProof(difficulty: Int, hash: ByteArray): String { - return "PROOF" val hexHash = hash.asHex val needed = hexHash.length % 2 val processBuilder = ProcessBuilder() diff --git a/src/main/kotlin/docker/DockerProxy.kt b/src/main/kotlin/docker/DockerProxy.kt index 6e4b07b..4f2f1ee 100644 --- a/src/main/kotlin/docker/DockerProxy.kt +++ b/src/main/kotlin/docker/DockerProxy.kt @@ -8,6 +8,7 @@ import logging.Logger import network.data.Endpoint import network.data.clusters.Cluster import network.data.messages.Message +import network.rpc.Topic import utils.CircularList import utils.runAfter import java.nio.ByteBuffer @@ -39,8 +40,10 @@ abstract class DockerProxy(configuration: Configuration) : MigrationStrategy(con * * @param slot */ - protected fun removeOutdatedStatistics(slot: Long) { - networkStatistics.remove(slot) + private fun removeOutdatedStatistics(slot: Long) { + networkStatistics.clear() + val keys = networkStatistics.keys.filter { it < slot } + for (key in keys) networkStatistics.remove(key) } /** Stores all [statistics] into latest [networkStatistics] using [networkLock]. */ @@ -65,17 +68,17 @@ abstract class DockerProxy(configuration: Configuration) : MigrationStrategy(con val currentTime = System.currentTimeMillis() localContainers.entries.removeIf { (_, container) -> currentTime - container.updated >= 1000 } - val mapped = localContainers.values.map { it.copy(id = networkMappings[it.id] ?: it.id) } + val mapped: List = localContainers.values.map { it.copy(id = networkMappings[it.id] ?: it.id) } val localStatistics = DockerStatistics(localNode.publicKey, mapped, slot) val ourPublicKey = localNode.publicKey val ourCluster = clusters[ourPublicKey] if (ourCluster != null) { val clusterRepresentative = ourCluster.centroid val isRepresentative = clusterRepresentative == ourPublicKey + removeOutdatedStatistics(slot) if (!isRepresentative) send(Endpoint.NodeStatistics, arrayOf(localStatistics), clusterRepresentative) else runAfter(configuration.slotDuration / 4) { val networkStatistics = getNetworkStatistics(slot).plus(localStatistics) - networkStatistics.forEach { Dashboard.statisticSent(localNode.publicKey, it, blockProducer, block.slot) } send(Endpoint.NodeStatistics, networkStatistics, blockProducer) } } else Dashboard.reportException(Exception("Our cluster does not exist.")) diff --git a/src/main/kotlin/logging/Dashboard.kt b/src/main/kotlin/logging/Dashboard.kt index e30f5e8..d954f7d 100644 --- a/src/main/kotlin/logging/Dashboard.kt +++ b/src/main/kotlin/logging/Dashboard.kt @@ -61,6 +61,7 @@ object Dashboard { } fun reportDHTQuery(identifier: String, seekerIp: String, seeker: String, hops: Int, revives: Int, duration: Long) { + if (!configuration.dashboardEnabled) return val point = Point.measurement("dht") .addField("hops", hops) .addField("ip", seekerIp) @@ -77,6 +78,7 @@ object Dashboard { * @param statistics Docker statistics that are reported by all representers of clusters. */ fun reportStatistics(statistics: Set, slot: Long) { + if (!configuration.dashboardEnabled) return var total = 0L for (measurement in statistics) { val publicKey = sha256(measurement.publicKey).asHex diff --git a/src/main/kotlin/network/kademlia/Kademlia.kt b/src/main/kotlin/network/kademlia/Kademlia.kt index e16e269..5d2546c 100644 --- a/src/main/kotlin/network/kademlia/Kademlia.kt +++ b/src/main/kotlin/network/kademlia/Kademlia.kt @@ -9,6 +9,7 @@ import logging.Dashboard import logging.Logger import network.SocketHolder import network.data.Node +import network.rpc.Topic import utils.* import java.io.ByteArrayInputStream import java.io.DataInputStream @@ -16,9 +17,12 @@ import java.net.DatagramPacket import java.net.DatagramSocket import java.net.InetSocketAddress import java.nio.ByteBuffer +import java.util.Timer import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.scheduleAtFixedRate +import kotlin.concurrent.withLock import kotlin.random.Random /** @@ -58,14 +62,16 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) private val incomingQueue = LinkedBlockingQueue() private val queryStorage = ConcurrentHashMap() private val bucketSize = 20 - private val testLock = ReentrantLock(true) + private val storageLock = ReentrantLock(true) init { Logger.debug("Our identifier is: ${localNode.identifier}") Thread(::sendOutgoing).start() Thread(::receiveIncoming).start() Thread(::processIncoming).start() - lookForInactiveQueries() + Timer().scheduleAtFixedRate(5000, 5000) { + lookForInactiveQueries() + } if (isTrustedNode) add(localNode) // printTree() @@ -157,6 +163,7 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) addToQueue(sender, KademliaEndpoint.CLOSEST_NODES, encodedReply) add(sender) } + KademliaEndpoint.CLOSEST_NODES -> { val closestNodes = ProtoBuf.decodeFromByteArray(kademliaMessage.data) val receivedNodes = closestNodes.nodes @@ -176,7 +183,6 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) queryHolder.hops++ val node = knownNodes[queryHolder.identifier] ?: return@forEach val actionsToDo = mutableListOf<(Node) -> Unit>() - queryHolder.queue.drainTo(actionsToDo) // Logger.trace("Drained $drained actions.") launchCoroutine { @@ -247,7 +253,6 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) Logger.info("Reviving ${inactiveQueries.size} inactive queries.") Dashboard.reportException(Exception("Reviving ${inactiveQueries.size} inactive queries.")) } - runAfter(5000, this::lookForInactiveQueries) } /** Debugs the built kademlia tree [development purposes only]. */ diff --git a/src/main/kotlin/network/messaging/Server.kt b/src/main/kotlin/network/messaging/Server.kt index f43fb11..bf07823 100644 --- a/src/main/kotlin/network/messaging/Server.kt +++ b/src/main/kotlin/network/messaging/Server.kt @@ -16,6 +16,7 @@ import network.data.TransmissionLayer import network.data.TransmissionType import network.data.messages.Message import network.kademlia.Kademlia +import network.rpc.Topic import utils.* import java.io.ByteArrayInputStream import java.io.DataInputStream @@ -157,7 +158,7 @@ abstract class Server(val configuration: Configuration) : Kademlia(configuration tcpSocket.accept().use { socket -> val data = socket.getInputStream().readAllBytes() val message = ProtoBuf.decodeFromByteArray(data) - if (alreadySeen(message.uid.asHex)) return@tryAndReport + if (alreadySeen(message.uid.asHex)) return@use processingQueue.add(message) if (message.endpoint.transmissionType == TransmissionType.Broadcast) broadcast(TransmissionLayer.TCP, message.uid.asHex, data) } @@ -253,6 +254,7 @@ abstract class Server(val configuration: Configuration) : Kademlia(configuration val gluedData = messageBuilder.gluedData() val decoded = ProtoBuf.decodeFromByteArray(gluedData) processingQueue.add(decoded) + messageBuilders.remove(messageId) } } } diff --git a/src/main/kotlin/network/rpc/RPCManager.kt b/src/main/kotlin/network/rpc/RPCManager.kt index e91a40c..60fa246 100644 --- a/src/main/kotlin/network/rpc/RPCManager.kt +++ b/src/main/kotlin/network/rpc/RPCManager.kt @@ -5,8 +5,10 @@ import io.javalin.Javalin import io.javalin.websocket.WsCloseContext import io.javalin.websocket.WsConnectContext import io.javalin.websocket.WsContext +import kotlinx.serialization.Serializable import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json +import network.data.messages.Message import java.lang.Exception import java.net.http.WebSocket import java.time.Duration @@ -47,12 +49,12 @@ open class RPCManager(configuration: Configuration) { */ private fun onConnect(webSocket: WsConnectContext) { webSocket.enableAutomaticPings(5, TimeUnit.SECONDS) - webSocket.send("Hello, this is Nion node!") Topic.entries.forEach { topic -> subscribedClients .computeIfAbsent(topic) { mutableListOf() } .add(webSocket) } + sendToSubscribed(Topic.Logging, "Hello, this is Nion node!") } /** @@ -73,12 +75,13 @@ open class RPCManager(configuration: Configuration) { * Sends serialised data to all clients that are subscribed to the [topic]. */ inline fun sendToSubscribed(topic: Topic, data: T) { - val serialisedData = Json.encodeToString(data) + val message = RPCMessage(topic, data) + val serialisedData = Json.encodeToString(message) val clientList = subscribedClients[topic] ?: return clientList.forEach { try { it.send(serialisedData) - } catch (e:Exception) { + } catch (e: Exception) { removeConnection(it) } } @@ -86,8 +89,12 @@ open class RPCManager(configuration: Configuration) { } +@Serializable +data class RPCMessage(val topic: Topic, val data: T) + enum class Topic { Block, Migration, - LocalApplication + LocalApplication, + Logging } \ No newline at end of file diff --git a/src/main/kotlin/utils/Utils.kt b/src/main/kotlin/utils/Utils.kt index 8e8ab62..7540286 100644 --- a/src/main/kotlin/utils/Utils.kt +++ b/src/main/kotlin/utils/Utils.kt @@ -1,9 +1,6 @@ package utils -import kotlinx.coroutines.CoroutineExceptionHandler -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.launch +import kotlinx.coroutines.* import logging.Dashboard import logging.Logger import java.net.InetAddress diff --git a/src/test/kotlin/network/ClusterTest.kt b/src/test/kotlin/network/ClusterTest.kt index 3a6e26e..48ea49f 100644 --- a/src/test/kotlin/network/ClusterTest.kt +++ b/src/test/kotlin/network/ClusterTest.kt @@ -18,7 +18,7 @@ class ClusterTest { @Test fun computeClusters() { val publicKeys = (0..100).map { "${Random.nextInt(1000)}" } - val clusters = ClusterUtils.computeClusters(10, 1, publicKeys) { centroid, element -> + val clusters = ClusterUtils.computeClusters(50, 3, publicKeys) { centroid, element -> val elementBitSet = sha256(element).asHex.asBitSet val centroidBitset = sha256(centroid).asHex.asBitSet.apply { xor(elementBitSet) } centroidBitset.nextSetBit(0) @@ -28,8 +28,8 @@ class ClusterTest { val cluster = clusters[element] ?: Cluster("null") val centroid = cluster.centroid val isRepresentative = element == centroid - if (isRepresentative) println("$centroid -> PRODUCER") - println("$element -> $centroid") + if (isRepresentative) println("$centroid -> PRODUCER [color=red]") + else println("$element -> $centroid") } }