Skip to content

Commit

Permalink
Merge pull request #250 from libp2p/0.9.1
Browse files Browse the repository at this point in the history
0.9.1 release
  • Loading branch information
ajsutton authored Jul 26, 2022
2 parents 8d40099 + 0041e1a commit dce532a
Show file tree
Hide file tree
Showing 40 changed files with 1,413 additions and 842 deletions.
3 changes: 1 addition & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.nio.file.Paths
// ./gradlew publish -PcloudsmithUser=<user> -PcloudsmithApiKey=<api-key>

group = "io.libp2p"
version = "0.9.0-RELEASE"
version = "0.9.1-RELEASE"
description = "a minimal implementation of libp2p for the jvm"

plugins {
Expand Down Expand Up @@ -252,7 +252,6 @@ compileKotlin.kotlinOptions {
}

detekt {
baseline = file("$projectDir/detekt/baseline.xml")
config = files("$projectDir/detekt/config.yml")
buildUponDefaultConfig = true
}
20 changes: 0 additions & 20 deletions detekt/baseline.xml

This file was deleted.

4 changes: 3 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
kotlin.code.style=official
kotlin.code.style=official

org.gradle.jvmargs=-Xmx1G
8 changes: 8 additions & 0 deletions src/main/kotlin/io/libp2p/core/crypto/Key.kt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ abstract class PrivKey(override val keyType: Crypto.KeyType) : Key {
if (javaClass != other?.javaClass) return false
return bytes().contentEquals((other as PrivKey).bytes())
}

override fun hashCode(): Int {
return raw().contentHashCode()
}
}

/**
Expand All @@ -108,6 +112,10 @@ abstract class PubKey(override val keyType: Crypto.KeyType) : Key {
if (javaClass != other?.javaClass) return false
return bytes().contentEquals((other as PubKey).bytes())
}

override fun hashCode(): Int {
return raw().contentHashCode()
}
}

/**
Expand Down
3 changes: 3 additions & 0 deletions src/main/kotlin/io/libp2p/core/multiformats/MultiaddrDns.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.libp2p.core.multiformats

import org.apache.logging.log4j.LogManager
import java.net.Inet4Address
import java.net.Inet6Address
import java.net.InetAddress
Expand All @@ -12,6 +13,7 @@ class MultiaddrDns {
}

companion object {
private val log = LogManager.getLogger(MultiaddrDns::class.java)
private val dnsProtocols = arrayOf(Protocol.DNS4, Protocol.DNS6, Protocol.DNSADDR)

fun resolve(addr: Multiaddr, resolver: Resolver = DefaultResolver): List<Multiaddr> {
Expand Down Expand Up @@ -52,6 +54,7 @@ class MultiaddrDns {
}
}
} catch (e: UnknownHostException) {
log.debug(e)
return emptyList()
// squash, as this might not be fatal,
// and if it is we'll handle this higher up the call chain
Expand Down
4 changes: 0 additions & 4 deletions src/main/kotlin/io/libp2p/crypto/keys/Curve25519.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ class Curve25519PrivateKey(private val state: DHState) : PrivKey(Crypto.KeyType.
state.getPublicKey(ba, 0)
return Curve25519PublicKey(state)
}

override fun hashCode(): Int = raw().contentHashCode()
}

/**
Expand All @@ -56,8 +54,6 @@ class Curve25519PublicKey(private val state: DHState) : PubKey(Crypto.KeyType.Cu
override fun verify(data: ByteArray, signature: ByteArray): Boolean {
throw NotImplementedError("Verifying with Curve25519 public key currently unsupported.")
}

override fun hashCode(): Int = raw().contentHashCode()
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/main/kotlin/io/libp2p/crypto/keys/Ecdsa.kt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class EcdsaPrivateKey(val priv: JavaECPrivateKey) : PrivKey(Crypto.KeyType.ECDSA
}
}

override fun equals(other: Any?): Boolean = super.equals(other)

override fun hashCode(): Int = priv.hashCode()
}

Expand All @@ -99,6 +101,8 @@ class EcdsaPublicKey(val pub: JavaECPublicKey) : PubKey(Crypto.KeyType.ECDSA) {
verify(signature)
}

override fun equals(other: Any?): Boolean = super.equals(other)

override fun hashCode(): Int = pub.hashCode()
}

Expand Down
4 changes: 4 additions & 0 deletions src/main/kotlin/io/libp2p/crypto/keys/Ed25519.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Ed25519PrivateKey(private val priv: Ed25519PrivateKeyParameters) : PrivKey

override fun publicKey(): PubKey = Ed25519PublicKey(priv.generatePublicKey())

override fun equals(other: Any?): Boolean = super.equals(other)

override fun hashCode(): Int = priv.hashCode()
}

Expand All @@ -53,6 +55,8 @@ class Ed25519PublicKey(private val pub: Ed25519PublicKeyParameters) : PubKey(Cry
verifySignature(signature)
}

override fun equals(other: Any?): Boolean = super.equals(other)

override fun hashCode(): Int = pub.hashCode()
}

Expand Down
4 changes: 4 additions & 0 deletions src/main/kotlin/io/libp2p/crypto/keys/Rsa.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class RsaPrivateKey(private val sk: JavaPrivateKey, private val pk: JavaPublicKe

override fun publicKey(): PubKey = rsaPublicKey

override fun equals(other: Any?): Boolean = super.equals(other)

override fun hashCode(): Int = pk.hashCode()
}

Expand All @@ -85,6 +87,8 @@ class RsaPublicKey(private val k: JavaPublicKey) : PubKey(Crypto.KeyType.RSA) {
verify(signature)
}

override fun equals(other: Any?): Boolean = super.equals(other)

override fun hashCode(): Int = k.hashCode()
}

Expand Down
13 changes: 6 additions & 7 deletions src/main/kotlin/io/libp2p/crypto/keys/Secp256k1.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,14 @@ import io.libp2p.core.crypto.PrivKey
import io.libp2p.core.crypto.PubKey
import io.libp2p.core.crypto.sha256
import io.libp2p.crypto.SECP_256K1_ALGORITHM
import org.bouncycastle.asn1.ASN1InputStream
import org.bouncycastle.asn1.ASN1Integer
import org.bouncycastle.asn1.ASN1Primitive
import org.bouncycastle.asn1.ASN1Sequence
import org.bouncycastle.asn1.DERSequenceGenerator
import org.bouncycastle.asn1.*
import org.bouncycastle.asn1.sec.SECNamedCurves
import org.bouncycastle.crypto.ec.CustomNamedCurves
import org.bouncycastle.crypto.generators.ECKeyPairGenerator
import org.bouncycastle.crypto.params.ECDomainParameters
import org.bouncycastle.crypto.params.ECKeyGenerationParameters
import org.bouncycastle.crypto.params.ECPrivateKeyParameters
import org.bouncycastle.crypto.params.ECPublicKeyParameters
import org.bouncycastle.crypto.params.ParametersWithRandom
import org.bouncycastle.crypto.signers.ECDSASigner
import org.bouncycastle.math.ec.FixedPointCombMultiplier
import org.bouncycastle.math.ec.FixedPointUtil
Expand Down Expand Up @@ -61,7 +56,7 @@ class Secp256k1PrivateKey(private val privateKey: ECPrivateKeyParameters) : Priv

override fun sign(data: ByteArray): ByteArray {
val (r, s) = with(ECDSASigner()) {
init(true, ParametersWithRandom(privateKey, SecureRandom()))
init(true, privateKey)
generateSignature(sha256(data)).let {
Pair(it[0], it[1])
}
Expand All @@ -85,6 +80,8 @@ class Secp256k1PrivateKey(private val privateKey: ECPrivateKeyParameters) : Priv
return Secp256k1PublicKey(ECPublicKeyParameters(publicPoint, CURVE))
}

override fun equals(other: Any?): Boolean = super.equals(other)

override fun hashCode(): Int = priv.hashCode()
}

Expand Down Expand Up @@ -120,6 +117,8 @@ class Secp256k1PublicKey(private val pub: ECPublicKeyParameters) : PubKey(Crypto
return signer.verifySignature(sha256(data), r.abs(), s.abs())
}

override fun equals(other: Any?): Boolean = super.equals(other)

override fun hashCode(): Int = pub.hashCode()
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/kotlin/io/libp2p/etc/types/AsyncExt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fun <C> CompletableFuture<C>.forward(forwardTo: CompletableFuture<in C>) = forwa
/**
* The same as [CompletableFuture.get] but unwraps [ExecutionException]
*/
@Suppress("SwallowedException")
fun <C> CompletableFuture<C>.getX(): C {
try {
return get()
Expand All @@ -36,6 +37,7 @@ fun <C> CompletableFuture<C>.getX(): C {
/**
* The same as [CompletableFuture.get] but unwraps [ExecutionException]
*/
@Suppress("SwallowedException")
fun <C> CompletableFuture<C>.getX(timeoutSec: Double): C {
try {
return get((timeoutSec * 1000).toLong(), TimeUnit.MILLISECONDS)
Expand Down
104 changes: 50 additions & 54 deletions src/main/kotlin/io/libp2p/etc/util/P2PService.kt
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package io.libp2p.etc.util

import com.google.common.util.concurrent.ThreadFactoryBuilder
import io.libp2p.core.InternalErrorException
import io.libp2p.core.PeerId
import io.libp2p.core.Stream
import io.libp2p.etc.types.lazyVarInit
import io.libp2p.etc.types.submitAsync
import io.libp2p.etc.types.toVoidCompletableFuture
import io.libp2p.pubsub.AbstractRouter
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.util.ReferenceCountUtil
import org.apache.logging.log4j.LogManager
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.function.Supplier

private val logger = LogManager.getLogger(P2PService::class.java)

/**
* Base class for a service which manages many streams from different peers
Expand All @@ -27,8 +25,34 @@ import java.util.function.Supplier
* service API should be executed on this thread to be thread-safe.
* Consider using the following helpers [runOnEventThread], [submitOnEventThread], [submitAsyncOnEventThread]
* or use the [executor] directly
*
* @param executor Executor backed by a single event thread
* It is only safe to perform any service logic via this executor
*/
abstract class P2PService {
abstract class P2PService(
protected val executor: ScheduledExecutorService
) {

private val peersMutable = mutableListOf<PeerHandler>()
/**
* List of connected peers.
* Note that connected peer could not be ready for writing yet, so consider [activePeers]
* if any data is to be send
*/
val peers: List<PeerHandler> = peersMutable

private val activePeersMutable = mutableListOf<PeerHandler>()
/**
* List of active peers to which data could be written
*/
val activePeers: List<PeerHandler> = activePeersMutable

private val peerIdToPeerHandlerMapMutable = mutableMapOf<PeerId, PeerHandler>()

/**
* Maps [PeerId] to [PeerHandler] instance for connected peers
*/
val peerIdToPeerHandlerMap: Map<PeerId, PeerHandler> = peerIdToPeerHandlerMapMutable

/**
* Represents a single stream
Expand Down Expand Up @@ -106,31 +130,6 @@ abstract class P2PService {
}
}

/**
* Executor backed by a single event thread
* It is only safe to perform any service logic via this executor
*
* The executor can be altered right after the instance creation.
* Changing it later may have unpredictable results
*/
var executor: ScheduledExecutorService by lazyVarInit {
Executors.newSingleThreadScheduledExecutor(
threadFactory
)
}

/**
* List of connected peers.
* Note that connected peer could not be ready for writing yet, so consider [activePeers]
* if any data is to be send
*/
val peers = mutableListOf<PeerHandler>()

/**
* List of active peers to which data could be written
*/
val activePeers = mutableListOf<PeerHandler>()

/**
* Adds a new stream to service. This method should **synchronously** init the underlying
* [io.netty.channel.Channel]
Expand All @@ -140,26 +139,40 @@ abstract class P2PService {
*/
open fun addNewStream(stream: Stream) = initChannel(StreamHandler(stream))

/**
* Callback to initialize the [Stream] underlying [io.netty.channel.Channel]
*
* Is invoked **not** on the event thread
* [io.netty.channel.Channel] initialization must be performed **synchronously on the caller thread**.
* **Don't** initialize the channel on event thread!
* Any service logic related to adding a new stream could be performed
* within overridden [streamAdded] callback (which is invoked on event thread)
*/
protected abstract fun initChannel(streamHandler: StreamHandler)

protected open fun streamAdded(streamHandler: StreamHandler) {
val peerHandler = createPeerHandler(streamHandler)
streamHandler.initPeerHandler(peerHandler)
peers += peerHandler
peersMutable += peerHandler
peerIdToPeerHandlerMapMutable[peerHandler.peerId] = peerHandler
}

protected open fun createPeerHandler(streamHandler: StreamHandler) = PeerHandler(streamHandler)

protected open fun streamActive(stream: StreamHandler) {
if (stream.aborted) return
activePeers += stream.getPeerHandler()
activePeersMutable += stream.getPeerHandler()
onPeerActive(stream.getPeerHandler())
}

protected open fun streamDisconnected(stream: StreamHandler) {
val peerHandler = stream.getPeerHandler()
if (stream.aborted) return
activePeers -= stream.getPeerHandler()
if (peers.remove(stream.getPeerHandler())) {
onPeerDisconnected(stream.getPeerHandler())
activePeersMutable -= peerHandler
if (peersMutable.remove(peerHandler)) {
onPeerDisconnected(peerHandler)
}
peerIdToPeerHandlerMapMutable -= peerHandler.peerId
}

protected open fun streamException(stream: StreamHandler, cause: Throwable) {
Expand All @@ -171,17 +184,6 @@ abstract class P2PService {
onInbound(stream.getPeerHandler(), msg)
}

/**
* Callback to initialize the [Stream] underlying [io.netty.channel.Channel]
*
* Is invoked **not** on the event thread
* [io.netty.channel.Channel] initialization must be performed **synchronously on the caller thread**.
* **Don't** initialize the channel on event thread!
* Any service logic related to adding a new stream could be performed
* within overridden [streamAdded] callback (which is invoked on event thread)
*/
protected abstract fun initChannel(streamHandler: StreamHandler)

/**
* Callback notifies that the peer is active and ready for writing data
* Invoked on event thread
Expand Down Expand Up @@ -238,15 +240,9 @@ abstract class P2PService {
/**
* Executes the code on the service event thread
*/
fun <C> submitOnEventThread(run: () -> C): CompletableFuture<C> = CompletableFuture.supplyAsync(Supplier { run() }, executor)
fun <C> submitOnEventThread(run: () -> C): CompletableFuture<C> = CompletableFuture.supplyAsync({ run() }, executor)
/**
* Executes the code on the service event thread
*/
fun <C> submitAsyncOnEventThread(run: () -> CompletableFuture<C>): CompletableFuture<C> = executor.submitAsync(run)

companion object {
private val threadFactory = ThreadFactoryBuilder().setDaemon(true).setNameFormat("P2PService-event-thread-%d").build()
@JvmStatic
val logger = LogManager.getLogger(AbstractRouter::class.java)
}
}
Loading

0 comments on commit dce532a

Please sign in to comment.