
Implementation of WebSocket communication between Clients and Nodes. #40


Open
wants to merge 8 commits into base: jungle-boogie
10 changes: 5 additions & 5 deletions Dockerfile
@@ -1,19 +1,19 @@
FROM docker:dind
FROM docker:20-dind
# RUN sed 's/http:\/\/fr\./http:\/\//' /etc/apt/sources.list

WORKDIR /root

RUN apk update
RUN apk add make \
bash \
openjdk11-jre \
curl \
openssl-dev \
python3-dev \
gmp-dev
gmp-dev \
tar

RUN apk add --repository http://dl-cdn.alpinelinux.org/alpine/edge/testing criu-dev
RUN apk add tar
RUN apk add --repository http://dl-cdn.alpinelinux.org/alpine/edge/testing criu-dev \
openjdk20-jre-headless

ADD *.jar Node.jar
ADD config.json config.json
13 changes: 6 additions & 7 deletions Start.sh
@@ -1,7 +1,7 @@
#!/bin/bash
if [ "$#" -ne 1 ]; then
echo "Sleeping..."
sleep $(shuf -i 1-120 -n 1)
# sleep $(shuf -i 1-120 -n 1)
fi

dockerd --experimental &
@@ -11,11 +11,10 @@ echo "Waiting for docker daemon to initialize!"
sleep 5
done

docker load -i stress.tar

if [ "$1" -eq 5005 ]; then
echo "We're trusted. Running stress test."
bash stress.sh &
fi
# docker load -i stress.tar
# if [ "$1" -eq 5005 ]; then
# echo "We're trusted. Running stress test."
# bash stress.sh &
# fi

java -jar Node.jar $1
2 changes: 2 additions & 0 deletions build.gradle
@@ -22,6 +22,8 @@ dependencies {
implementation('org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.0')
implementation("com.influxdb:influxdb-client-kotlin:4.0.0")
implementation('org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2-native-mt')
implementation("io.javalin:javalin:5.6.1")
implementation("org.slf4j:slf4j-simple:2.0.7")
}

tasks.withType(Test).configureEach {
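
The two new dependencies back the WebSocket layer this PR introduces: Javalin provides the embedded HTTP/WebSocket server and slf4j-simple gives it a logging backend. As a rough, hedged illustration of Javalin 5 WebSocket handler registration (the "/echo" path and the port below are placeholders, not necessarily what this PR registers):

```kotlin
import io.javalin.Javalin

// Minimal sketch of what the new Javalin dependency enables; path and port are placeholders.
fun main() {
    Javalin.create()
        .ws("/echo") { ws ->
            ws.onConnect { ctx -> ctx.send("connected") }
            ws.onMessage { ctx -> ctx.send(ctx.message()) } // echo back whatever the client sends
        }
        .start(2517) // webSocketPort value taken from config.json
}
```
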
12 changes: 6 additions & 6 deletions config.json
@@ -1,20 +1,20 @@
{
"trustedNodeIP": "88.200.63.144",
"trustedNodeIP": "172.16.150.159",
"trustedNodePort": 5005,
"webSocketPort": 2517,
"keystorePath": ".",
"port": 5005,
"maxNodes": 10000,
"slotDuration": 10000,
"slotDuration": 5000,
"initialDifficulty": 10,
"broadcastSpreadPercentage": 5,
"committeeSize": 5,
"influxUrl": "",
"influxToken": "",
"dashboardEnabled": false,
"dashboardEnabled": true,
"loggingEnabled": false,
"trustedLoggingEnabled": false,
"historyMinuteClearance": 10,
"historyCleaningFrequency": 5,
"historyMinuteClearance": 1,
"historyCleaningFrequency": 1,
"nodesPerCluster": 10,
"maxIterations": 5,
"packetSplitSize": 60000,
7 changes: 3 additions & 4 deletions src/main/kotlin/Configuration.kt
@@ -9,7 +9,8 @@ import kotlinx.serialization.Serializable
data class Configuration(
val trustedNodeIP: String,
val trustedNodePort: Int,
val port: Int,
var port: Int? = null,
val webSocketPort: Int,
val maxNodes: Int,
val keystorePath: String,
val slotDuration: Long,
@@ -29,6 +30,4 @@ data class Configuration(
val useCriu: Boolean,
val useTreeBasedMessageRoutingProtocol: Boolean,
val treeChildrenCount: Int
) {
var passedPort: Int = -1
}
)
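
Making port nullable with a null default means config.json no longer has to carry it; the command-line argument handled in Launch.kt (next file) fills it in at startup. The sketch below shows one way the configuration could be loaded with the kotlinx-serialization dependency already present in build.gradle; the loader function is hypothetical, since the actual parsing code is not part of this diff.

```kotlin
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json
import java.io.File

// Hypothetical loader: the real parsing lives in Launch.kt, which is only partially shown here.
fun loadConfiguration(path: String = "config.json"): Configuration {
    val json = Json { ignoreUnknownKeys = true }
    return json.decodeFromString<Configuration>(File(path).readText())
}
```
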
3 changes: 1 addition & 2 deletions src/main/kotlin/Launch.kt
@@ -19,8 +19,7 @@ fun main(args: Array<String>) {

Logger.toggleLogging(configuration.loggingEnabled)
args.getOrNull(0)?.toInt()?.apply {
configuration.passedPort = this
println("Passed udpPort: $this...")
configuration.port = this
}

Nion(configuration).apply {
4 changes: 2 additions & 2 deletions src/main/kotlin/Nion.kt
@@ -55,7 +55,7 @@ class Nion(configuration: Configuration) : ChainBuilder(configuration) {
private fun attemptBootstrap() {
if (isTrustedNode || isBootstrapped) return

Logger.info("Attempting bootstrapping.")
Logger.info("Attempting bootstrapping to ${configuration.trustedNodeIP}:${configuration.trustedNodePort}.")
bootstrap(configuration.trustedNodeIP, configuration.trustedNodePort)
runAfter(Random.nextLong(10000, 20000), this::attemptBootstrap)
}
@@ -71,7 +71,7 @@
}

// TODO -------------------------------------
val execution = endpoints[endpoint] ?: throw Exception("Endpoint $endpoint has no handler set.")
val execution = endpoints[endpoint] ?: return // throw Exception("Endpoint $endpoint has no handler set.")
if (endpoint.processing == MessageProcessing.Queued) processingQueue.put { execution(message) }
else launchCoroutine { execution(message) }
}
5 changes: 2 additions & 3 deletions src/main/kotlin/chain/Chain.kt
@@ -2,6 +2,7 @@ package chain

import chain.data.Block
import logging.Logger
import utils.CircularList
import utils.asHex
import utils.tryWithLock
import java.util.concurrent.locks.ReentrantLock
@@ -15,7 +16,7 @@ import kotlin.concurrent.withLock
class Chain(private val verifiableDelay: VerifiableDelay, private val initialDifficulty: Int, private val committeeSize: Int) {

private val lock = ReentrantLock(true)
private val blocks = mutableListOf<Block>()
private val blocks = CircularList<Block>(50) // ToDo: Remove, do not use Circular List but use persistent storage!

/** Returns the last block in the chain. */
fun getLastBlock(): Block? {
@@ -46,8 +47,6 @@ class Chain(private val verifiableDelay: VerifiableDelay, private val initialDif
lock.tryWithLock {
blocks.add(nextBlock)
// ToDo: Put chain history in some sort of storage instead of keeping in memory.
val lastBlocks = blocks.takeLast(50)
blocks.removeAll { !lastBlocks.contains(it) }
}
Logger.chain("Block[${nextBlock.votes}/$committeeSize] added [${nextBlock.slot}].")
}
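
The unbounded block list becomes a CircularList<Block>(50). utils.CircularList itself is not included in the visible diff, so the following is only a sketch of the bounded-list behaviour the chain appears to rely on (oldest blocks evicted once capacity is reached); the project's real CircularList API may differ.

```kotlin
// Sketch only; the project's utils.CircularList may expose a different API.
class CircularList<T>(private val capacity: Int) : Iterable<T> {

    private val elements = ArrayDeque<T>(capacity)

    /** Appends [element], evicting the oldest entry once the capacity is reached. */
    fun add(element: T) {
        if (elements.size == capacity) elements.removeFirst()
        elements.addLast(element)
    }

    /** Returns the most recently added element, or null when empty. */
    fun lastOrNull(): T? = elements.lastOrNull()

    override fun iterator(): Iterator<T> = elements.iterator()
}
```
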
12 changes: 8 additions & 4 deletions src/main/kotlin/chain/ChainBuilder.kt
@@ -11,6 +11,7 @@ import network.data.clusters.ClusterUtils
import network.data.messages.InclusionRequest
import network.data.messages.Message
import network.data.messages.SyncRequest
import network.rpc.Topic
import utils.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
@@ -47,9 +48,9 @@ abstract class ChainBuilder(configuration: Configuration) : DockerProxy(configur
votes.entries.removeIf { (key, _) -> key == block.hash.asHex }
if (blockAdded) {

if(isTrustedNode) println("Added block\t[${block.slot}]")
if(isTrustedNode) Logger.chain("Added block\t[${block.slot}].")
sendToSubscribed(Topic.Block, block)

removeOutdatedStatistics(block.slot - 1)
if (block.slot <= 2) validatorSet.inclusionChanges(block)
val nextTask = validatorSet.computeNextTask(block, configuration.committeeSize)

@@ -226,12 +227,15 @@ abstract class ChainBuilder(configuration: Configuration) : DockerProxy(configur
validatorSet.scheduleChange(inclusionRequest.publicKey, true)
// send(message)
}
Logger.debug("Received inclusion request! ")
val canStartTheChain = isTrustedNode && lastBlock == null
Logger.debug("Received inclusion request! Can start the chain: $canStartTheChain")

if (isTrustedNode && lastBlock == null) {
if (canStartTheChain) {
val scheduledChanges = validatorSet.getScheduledChanges().count { it.value }
val isEnoughToStart = scheduledChanges > configuration.committeeSize
Logger.debug("There is enough to start: $isEnoughToStart")
if (isEnoughToStart && !sentGenesis.getAndSet(true)) {
Logger.debug("Computing proof for genesis block.")
val proof = verifiableDelay.computeProof(configuration.initialDifficulty, "FFFF".encodeToByteArray())
val genesisBlock = Block(1, configuration.initialDifficulty, localNode.publicKey, emptySet(), proof, System.currentTimeMillis(), byteArrayOf(), validatorSet.getScheduledChanges())
send(Endpoint.NewBlock, genesisBlock)
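
With sendToSubscribed(Topic.Block, block), every block that is successfully added gets pushed to WebSocket subscribers. Neither the endpoint path nor the subscription handshake or payload encoding is visible in this diff, so the client below is purely hypothetical; it uses the JDK 11 WebSocket API to subscribe and print whatever the node pushes.

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.WebSocket
import java.util.concurrent.CompletionStage

// Hypothetical client: the "/rpc" style endpoint, the subscription message and the payload
// format are assumptions, not the protocol actually implemented by this PR.
fun main() {
    val listener = object : WebSocket.Listener {
        override fun onText(webSocket: WebSocket, data: CharSequence, last: Boolean): CompletionStage<*>? {
            println("Received block payload: $data")
            webSocket.request(1) // ask for the next message
            return null
        }
    }
    val webSocket = HttpClient.newHttpClient()
        .newWebSocketBuilder()
        .buildAsync(URI.create("ws://127.0.0.1:2517/"), listener)
        .join()
    webSocket.sendText("Block", true) // assumed plain-text topic subscription
    Thread.sleep(60_000) // keep the process alive long enough to receive pushes
}
```
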
5 changes: 4 additions & 1 deletion src/main/kotlin/chain/ValidatorSet.kt
@@ -29,7 +29,10 @@ class ValidatorSet(private val localNode: Node, isTrustedNode: Boolean) {


init {
if (isTrustedNode) scheduledChanges[localNode.publicKey] = true
if (isTrustedNode) {
scheduledChanges[localNode.publicKey] = true
Logger.info("We're the trusted node...")
}
}

/** Returns shuffled validator set using passed [random]. */
11 changes: 7 additions & 4 deletions src/main/kotlin/docker/DockerProxy.kt
@@ -8,6 +8,7 @@ import logging.Logger
import network.data.Endpoint
import network.data.clusters.Cluster
import network.data.messages.Message
import network.rpc.Topic
import utils.CircularList
import utils.runAfter
import java.nio.ByteBuffer
@@ -39,8 +40,10 @@ abstract class DockerProxy(configuration: Configuration) : MigrationStrategy(con
*
* @param slot
*/
protected fun removeOutdatedStatistics(slot: Long) {
networkStatistics.remove(slot)
private fun removeOutdatedStatistics(slot: Long) {
// Drop statistics recorded for slots older than the given slot.
val keys = networkStatistics.keys.filter { it < slot }
for (key in keys) networkStatistics.remove(key)
}

/** Stores all [statistics] into latest [networkStatistics] using [networkLock]. */
@@ -65,17 +68,17 @@
val currentTime = System.currentTimeMillis()
localContainers.entries.removeIf { (_, container) -> currentTime - container.updated >= 1000 }

val mapped = localContainers.values.map { it.copy(id = networkMappings[it.id] ?: it.id) }
val mapped: List<DockerContainer> = localContainers.values.map { it.copy(id = networkMappings[it.id] ?: it.id) }
val localStatistics = DockerStatistics(localNode.publicKey, mapped, slot)
val ourPublicKey = localNode.publicKey
val ourCluster = clusters[ourPublicKey]
if (ourCluster != null) {
val clusterRepresentative = ourCluster.centroid
val isRepresentative = clusterRepresentative == ourPublicKey
removeOutdatedStatistics(slot)
if (!isRepresentative) send(Endpoint.NodeStatistics, arrayOf(localStatistics), clusterRepresentative)
else runAfter(configuration.slotDuration / 4) {
val networkStatistics = getNetworkStatistics(slot).plus(localStatistics)
networkStatistics.forEach { Dashboard.statisticSent(localNode.publicKey, it, blockProducer, block.slot) }
send(Endpoint.NodeStatistics, networkStatistics, blockProducer)
}
} else Dashboard.reportException(Exception("Our cluster does not exist."))
3 changes: 3 additions & 0 deletions src/main/kotlin/logging/Dashboard.kt
@@ -61,6 +61,7 @@ object Dashboard {
}

fun reportDHTQuery(identifier: String, seekerIp: String, seeker: String, hops: Int, revives: Int, duration: Long) {
if (!configuration.dashboardEnabled) return
val point = Point.measurement("dht")
.addField("hops", hops)
.addField("ip", seekerIp)
@@ -77,6 +78,7 @@
* @param statistics Docker statistics that are reported by all representers of clusters.
*/
fun reportStatistics(statistics: Set<DockerStatistics>, slot: Long) {
if (!configuration.dashboardEnabled) return
var total = 0L
for (measurement in statistics) {
val publicKey = sha256(measurement.publicKey).asHex
@@ -151,6 +153,7 @@

/** Reports that an exception was caught */
fun reportException(e: Throwable, additionalInfo: String = "") {
if (!configuration.dashboardEnabled) return
Logger.reportException(e)
val point = Point.measurement("exceptions")
.time(Instant.now(), WritePrecision.NS)
1 change: 0 additions & 1 deletion src/main/kotlin/logging/Logger.kt
@@ -21,7 +21,6 @@ object Logger {
data class Log(val type: DebugType, val log: String, val ip: String, val timestamp: Long)

private var isLoggingEnabled = false
private var currentDebug: DebugType = DebugType.ALL
private val timeFormatter = DateTimeFormatter.ofPattern("dd. MM | HH:mm:ss.SSS")

const val red = "\u001b[31m"
3 changes: 2 additions & 1 deletion src/main/kotlin/network/SocketHolder.kt
@@ -1,6 +1,7 @@
package network

import Configuration
import network.rpc.RPCManager
import java.net.DatagramSocket
import java.net.ServerSocket

@@ -9,7 +10,7 @@
* on 21/01/2022 at 10:44
* using IntelliJ IDEA
*/
open class SocketHolder(config: Configuration) {
open class SocketHolder(configuration: Configuration) : RPCManager(configuration) {

protected val udpSocket: DatagramSocket = DatagramSocket()
protected val tcpSocket: ServerSocket = ServerSocket(0)
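
SocketHolder now inherits from network.rpc.RPCManager, a class added by this PR but not included in the loaded portion of the diff. Based on the Javalin dependency, the webSocketPort configuration key and the Topic/sendToSubscribed usages elsewhere in the change set, a plausible sketch could look like the following; the "/rpc" path, the plain-text subscription message and the toString() serialization are assumptions, not the PR's actual protocol.

```kotlin
package network.rpc

import Configuration
import io.javalin.Javalin
import io.javalin.websocket.WsContext
import java.util.concurrent.ConcurrentHashMap

// Sketch only: topic names, the "/rpc" path and the subscription message format are assumptions.
enum class Topic { Block }

open class RPCManager(configuration: Configuration) {

    private val subscribers = ConcurrentHashMap<Topic, MutableSet<WsContext>>()

    private val server: Javalin = Javalin.create().apply {
        ws("/rpc") { ws ->
            ws.onMessage { ctx ->
                // The client sends the name of the topic it wants to subscribe to.
                val topic = runCatching { Topic.valueOf(ctx.message()) }.getOrNull() ?: return@onMessage
                subscribers.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet() }.add(ctx)
            }
            ws.onClose { ctx -> subscribers.values.forEach { it.remove(ctx) } }
        }
    }.start(configuration.webSocketPort)

    /** Pushes [payload] to every session subscribed to [topic]; the real code presumably serializes it properly. */
    fun sendToSubscribed(topic: Topic, payload: Any) {
        subscribers[topic]?.forEach { session ->
            if (session.session.isOpen) session.send(payload.toString())
        }
    }
}
```

Keeping the subscriber registry keyed by Topic would let additional streams (statistics, logs) be exposed later over the same WebSocket endpoint without new server plumbing.
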
19 changes: 12 additions & 7 deletions src/main/kotlin/network/kademlia/Kademlia.kt
@@ -9,16 +9,20 @@ import logging.Dashboard
import logging.Logger
import network.SocketHolder
import network.data.Node
import network.rpc.Topic
import utils.*
import java.io.ByteArrayInputStream
import java.io.DataInputStream
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.Timer
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.scheduleAtFixedRate
import kotlin.concurrent.withLock
import kotlin.random.Random

/**
@@ -31,7 +35,7 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration) {

val crypto = Crypto(".")

private val kademliaSocket: DatagramSocket = if (configuration.passedPort != -1) DatagramSocket(configuration.passedPort) else DatagramSocket()
private val kademliaSocket: DatagramSocket = configuration.port?.let { DatagramSocket(it) } ?: DatagramSocket()

val localAddress = getLocalAddress()
val localNode = Node(
@@ -43,7 +47,7 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration)
publicKey = crypto.publicKey
).apply {
Dashboard.myInfo = "$ip:$kademliaPort"
println(Dashboard.myInfo)
Logger.info("Kademlia listening on: ${Dashboard.myInfo}")
}
val isTrustedNode = localNode.let { node -> node.ip == configuration.trustedNodeIP && node.kademliaPort == configuration.trustedNodePort }

@@ -58,17 +62,19 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration)
private val incomingQueue = LinkedBlockingQueue<KademliaMessage>()
private val queryStorage = ConcurrentHashMap<String, KademliaQuery>()
private val bucketSize = 20
private val testLock = ReentrantLock(true)
private val storageLock = ReentrantLock(true)

init {
Logger.debug("Our identifier is: ${localNode.identifier}")
Thread(::sendOutgoing).start()
Thread(::receiveIncoming).start()
Thread(::processIncoming).start()
lookForInactiveQueries()
Timer().scheduleAtFixedRate(5000, 5000) {
lookForInactiveQueries()
}

if (isTrustedNode) add(localNode)
printTree()
// printTree()
}

/** Sends a FIND_NODE request of our key to the known bootstrapping [Node]. */
@@ -157,6 +163,7 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration)
addToQueue(sender, KademliaEndpoint.CLOSEST_NODES, encodedReply)
add(sender)
}

KademliaEndpoint.CLOSEST_NODES -> {
val closestNodes = ProtoBuf.decodeFromByteArray<ClosestNodes>(kademliaMessage.data)
val receivedNodes = closestNodes.nodes
@@ -176,7 +183,6 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration)
queryHolder.hops++
val node = knownNodes[queryHolder.identifier] ?: return@forEach
val actionsToDo = mutableListOf<(Node) -> Unit>()

queryHolder.queue.drainTo(actionsToDo)
// Logger.trace("Drained $drained actions.")
launchCoroutine {
@@ -247,7 +253,6 @@ open class Kademlia(configuration: Configuration) : SocketHolder(configuration)
Logger.info("Reviving ${inactiveQueries.size} inactive queries.")
Dashboard.reportException(Exception("Reviving ${inactiveQueries.size} inactive queries."))
}
runAfter(5000, this::lookForInactiveQueries)
}

/** Debugs the built kademlia tree [development purposes only]. */