Skip to content

Commit

Permalink
Multinode (#190)
Browse files Browse the repository at this point in the history
* minor fix

* fix connection

* broadcase safrole tickets

* update logging

* fix unaligned load

* trace

* fix tests
  • Loading branch information
xlc authored Oct 23, 2024
1 parent 86e32b5 commit 63c6167
Show file tree
Hide file tree
Showing 20 changed files with 254 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public enum RuntimeEvents {

// New safrole ticket generated from SafroleService
public struct SafroleTicketsGenerated: Event {
public let epochIndex: EpochIndex
public let items: [TicketItemAndOutput]
public let publicKey: Bandersnatch.PublicKey
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public final class SafroleService: ServiceBase, @unchecked Sendable {
)

events.append(.init(
epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config),
items: tickets,
publicKey: secret.publicKey
))
Expand Down
32 changes: 8 additions & 24 deletions Blockchain/Sources/Blockchain/Validator/ServiceBase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,27 @@ public class ServiceBase {
public let id: UniqueId
let logger: Logger
public let config: ProtocolConfigRef
private let eventBus: EventBus
private let subscriptionTokens: ThreadSafeContainer<[EventBus.SubscriptionToken]> = .init([])
private let subscriptions: EventSubscriptions

init(id: UniqueId, config: ProtocolConfigRef, eventBus: EventBus) {
self.id = id
logger = Logger(label: id)
self.config = config
self.eventBus = eventBus
}

deinit {
let eventBus = self.eventBus
let subscriptionTokens = self.subscriptionTokens
Task {
for token in subscriptionTokens.value {
await eventBus.unsubscribe(token: token)
}
}
subscriptions = EventSubscriptions(eventBus: eventBus)
}

@discardableResult
func subscribe<T: Event>(_ eventType: T.Type, id _: UniqueId, handler: @escaping @Sendable (T) async throws -> Void) async -> EventBus
.SubscriptionToken
{
let token = await eventBus.subscribe(eventType, handler: handler)
subscriptionTokens.write { $0.append(token) }
return token
func subscribe<T: Event>(
_ eventType: T.Type, id _: UniqueId, handler: @escaping @Sendable (T) async throws -> Void
) async -> EventBus.SubscriptionToken {
await subscriptions.subscribe(eventType, id: id, handler: handler)
}

func unsubscribe(token: EventBus.SubscriptionToken) async {
subscriptionTokens.write { tokens in
tokens.removeAll { $0 == token }
}
await eventBus.unsubscribe(token: token)
await subscriptions.unsubscribe(token: token)
}

func publish(_ event: some Event) {
eventBus.publish(event)
subscriptions.publish(event)
}
}
24 changes: 20 additions & 4 deletions Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ struct ExtrinsicPoolServiceTests {

allTickets.append(contentsOf: tickets)

let event = RuntimeEvents.SafroleTicketsGenerated(items: tickets, publicKey: secretKey.publicKey)
let event = RuntimeEvents.SafroleTicketsGenerated(
epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config),
items: tickets,
publicKey: secretKey.publicKey
)
await eventBus.publish(event)

// Wait for the event to be processed
Expand Down Expand Up @@ -126,7 +130,11 @@ struct ExtrinsicPoolServiceTests {
idx: 0
)

let addEvent = RuntimeEvents.SafroleTicketsGenerated(items: tickets, publicKey: secretKey.publicKey)
let addEvent = RuntimeEvents.SafroleTicketsGenerated(
epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config),
items: tickets,
publicKey: secretKey.publicKey
)
await eventBus.publish(addEvent)

// Wait for the event to be processed
Expand Down Expand Up @@ -173,7 +181,11 @@ struct ExtrinsicPoolServiceTests {
idx: 0
)

let addEvent = RuntimeEvents.SafroleTicketsGenerated(items: oldTickets, publicKey: secretKey.publicKey)
let addEvent = RuntimeEvents.SafroleTicketsGenerated(
epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config),
items: oldTickets,
publicKey: secretKey.publicKey
)
await eventBus.publish(addEvent)
await storeMiddleware.wait()

Expand Down Expand Up @@ -216,7 +228,11 @@ struct ExtrinsicPoolServiceTests {
)

// Ensure new tickets are accepted
let newAddEvent = RuntimeEvents.SafroleTicketsGenerated(items: newTickets, publicKey: secretKey.publicKey)
let newAddEvent = RuntimeEvents.SafroleTicketsGenerated(
epochIndex: newState.value.timeslot.timeslotToEpochIndex(config: config),
items: newTickets,
publicKey: secretKey.publicKey
)
await eventBus.publish(newAddEvent)
await storeMiddleware.wait()

Expand Down
4 changes: 4 additions & 0 deletions Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@
argument = "--validator"
isEnabled = "YES">
</CommandLineArgument>
<CommandLineArgument
argument = "--chain=minimal"
isEnabled = "YES">
</CommandLineArgument>
</CommandLineArguments>
<EnvironmentVariables>
<EnvironmentVariable
Expand Down
4 changes: 2 additions & 2 deletions Boka/Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Boka/Sources/Boka.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ enum MaybeEnabled<T: ExpressibleByArgument>: ExpressibleByArgument {
case disabled

init?(argument: String) {
if argument.lowercased() == "false" {
if argument.lowercased() == "no" {
self = .disabled
} else {
guard let argument = T(argument: argument) else {
Expand Down Expand Up @@ -60,11 +60,11 @@ struct Boka: AsyncParsableCommand {
@Option(name: .long, help: "A preset config or path to chain config file.")
var chain: Genesis = .preset(.dev)

@Option(name: .long, help: "Listen address for RPC server. Pass 'false' to disable RPC server. Default to 127.0.0.1:9955.")
@Option(name: .long, help: "Listen address for RPC server. Pass 'no' to disable RPC server. Default to 127.0.0.1:9955.")
var rpc: MaybeEnabled<NetAddr> = .enabled(NetAddr(address: "127.0.0.1:9955")!)

@Option(name: .long, help: "Listen address for P2P protocol.")
var p2p: NetAddr = .init(address: "127.0.0.1:19955")!
var p2p: NetAddr = .init(address: "127.0.0.1:0")!

@Option(name: .long, help: "Specify peer P2P addresses.")
var peers: [NetAddr] = []
Expand Down Expand Up @@ -114,12 +114,11 @@ struct Boka: AsyncParsableCommand {
}

let rpcConfig = rpc.asOptional.map { addr -> RPCConfig in
logger.info("RPC listen address: \(addr)")
let (address, port) = addr.getAddressAndPort()
return RPCConfig(listenAddress: address, port: Int(port))
}

let keystore = try await DevKeyStore()
let keystore = try await DevKeyStore(devKeysCount: devSeed == nil ? 12 : 0)

let networkKey: Ed25519.SecretKey = try await {
if let devSeed {
Expand All @@ -130,6 +129,7 @@ struct Boka: AsyncParsableCommand {
}
}()

logger.info("Network key: \(networkKey.publicKey.data.toHexString())")
let networkConfig = NetworkConfig(
mode: validator ? .validator : .builder,
listenAddress: p2p,
Expand All @@ -144,7 +144,7 @@ struct Boka: AsyncParsableCommand {
handlerMiddleware: .tracing(prefix: "Handler")
)

let config = Node.Config(rpc: rpcConfig, network: networkConfig)
let config = Node.Config(rpc: rpcConfig, network: networkConfig, peers: peers)

let node: Node = if validator {
try await ValidatorNode(
Expand Down
6 changes: 1 addition & 5 deletions Networking/Sources/MsQuicSwift/NetAddr.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import msquic
import Darwin
#endif

public struct NetAddr: Sendable {
public struct NetAddr: Sendable, Equatable, Hashable {
var quicAddr: QUIC_ADDR

public init?(address: String) {
Expand Down Expand Up @@ -38,17 +38,13 @@ public struct NetAddr: Sendable {
let (host, port, _) = parseQuicAddr(quicAddr) ?? ("::dead:beef", 0, false)
return (host, port)
}
}

extension NetAddr: Equatable {
public static func == (lhs: NetAddr, rhs: NetAddr) -> Bool {
var addr1 = lhs.quicAddr
var addr2 = rhs.quicAddr
return QuicAddrCompare(&addr1, &addr2) == 1
}
}

extension NetAddr: Hashable {
public func hash(into hasher: inout Hasher) {
var addr = quicAddr
let hash = QuicAddrHash(&addr)
Expand Down
3 changes: 3 additions & 0 deletions Networking/Sources/MsQuicSwift/QuicConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ private class ConnectionHandle {

case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE:
logger.trace("Shutdown complete")
if let connection {
connection.handler.shutdownComplete(connection)
}
if event.pointee.SHUTDOWN_COMPLETE.AppCloseInProgress == 0 {
// avoid closing twice
api.call { api in
Expand Down
9 changes: 9 additions & 0 deletions Networking/Sources/MsQuicSwift/QuicEventHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public protocol QuicEventHandler: Sendable {
func shouldOpen(_ connection: QuicConnection, certificate: Data?) -> QuicStatus
func connected(_ connection: QuicConnection)
func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason)
func shutdownComplete(_ connection: QuicConnection)
func streamStarted(_ connect: QuicConnection, stream: QuicStream)

// stream events
Expand All @@ -55,6 +56,7 @@ extension QuicEventHandler {
public func connected(_: QuicConnection) {}

public func shutdownInitiated(_: QuicConnection, reason _: ConnectionCloseReason) {}
public func shutdownComplete(_: QuicConnection) {}

public func streamStarted(_: QuicConnection, stream _: QuicStream) {}

Expand All @@ -69,6 +71,7 @@ public final class MockQuicEventHandler: QuicEventHandler {
case shouldOpen(connection: QuicConnection, certificate: Data?)
case connected(connection: QuicConnection)
case shutdownInitiated(connection: QuicConnection, reason: ConnectionCloseReason)
case shutdownComplete(connection: QuicConnection)
case streamStarted(connection: QuicConnection, stream: QuicStream)
case dataReceived(stream: QuicStream, data: Data)
case closed(stream: QuicStream, status: QuicStatus, code: QuicErrorCode)
Expand Down Expand Up @@ -107,6 +110,12 @@ public final class MockQuicEventHandler: QuicEventHandler {
}
}

public func shutdownComplete(_ connection: QuicConnection) {
events.write { events in
events.append(.shutdownComplete(connection: connection))
}
}

public func streamStarted(_ connect: QuicConnection, stream: QuicStream) {
events.write { events in
events.append(.streamStarted(connection: connect, stream: stream))
Expand Down
2 changes: 1 addition & 1 deletion Networking/Sources/MsQuicSwift/QuicSettings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ public typealias QuicSettings = QUIC_SETTINGS
extension QuicSettings {
public static let defaultSettings = {
var settings = QuicSettings()
settings.IdleTimeoutMs = 60000
settings.IdleTimeoutMs = 300_000 // 5 minutes
settings.IsSet.IdleTimeoutMs = 1
settings.ServerResumptionLevel = 2 // QUIC_SERVER_RESUME_AND_ZERORTT
settings.IsSet.ServerResumptionLevel = 1
Expand Down
2 changes: 1 addition & 1 deletion Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
logger.debug("Invalid request length")
return
}
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.load(as: UInt32.self) })
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) })
// sanity check for length
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
Expand Down
39 changes: 25 additions & 14 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public final class Peer<Handler: StreamHandler>: Sendable {
)
}

public func listenAddress() throws -> NetAddr {
try listener.listenAddress()
}

public func connect(to address: NetAddr, mode: PeerMode) throws -> Connection<Handler> {
let conn = impl.connections.read { connections in
connections.byType[mode]?[address]
Expand All @@ -109,12 +113,14 @@ public final class Peer<Handler: StreamHandler>: Sendable {
if let curr {
return curr
}
let conn = try Connection(
QuicConnection(
handler: PeerEventHandler(self.impl),
registration: self.impl.clientConfiguration.registration,
configuration: self.impl.clientConfiguration
),
let quicConn = try QuicConnection(
handler: PeerEventHandler(self.impl),
registration: self.impl.clientConfiguration.registration,
configuration: self.impl.clientConfiguration
)
try quicConn.connect(to: address)
let conn = Connection(
quicConn,
impl: self.impl,
mode: mode,
remoteAddress: address,
Expand All @@ -126,7 +132,7 @@ public final class Peer<Handler: StreamHandler>: Sendable {
}
}

public func broadcast(kind: Handler.PresistentHandler.StreamKind, message: any MessageProtocol) {
public func broadcast(kind: Handler.PresistentHandler.StreamKind, message: Handler.PresistentHandler.Message) {
let connections = impl.connections.read { connections in
connections.byId.values
}
Expand Down Expand Up @@ -261,24 +267,28 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}

// TODO: implement a peer and test this
func shouldOpen(_: QuicConnection, certificate: Data?) -> QuicStatus {
func shouldOpen(_ connection: QuicConnection, certificate: Data?) -> QuicStatus {
// TODO: enable certificate validation logic once parsing logic is fixed
guard let certificate else {
return .code(.requiredCert)
}
do {
let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509)
logger.debug(
"Certificate parsed",
metadata: ["publicKey": "\(publicKey.toHexString())", "alternativeName": "\(alternativeName)"]
)
logger.trace("Certificate parsed", metadata: [
"connectionId": "\(connection.id)",
"publicKey": "\(publicKey.toHexString())",
"alternativeName": "\(alternativeName)",
])
if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) {
return .code(.badCert)
}
if impl.mode == PeerMode.validator {
// TODO: verify if it is current or next validator
}
} catch {
logger.error("Failed to parse certificate", metadata: ["error": "\(error)"])
logger.warning("Failed to parse certificate", metadata: [
"connectionId": "\(connection.id)",
"error": "\(error)"])
return .code(.badCert)
}
return .code(.success)
Expand Down Expand Up @@ -309,7 +319,8 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
}

func shutdownInitiated(_ connection: QuicConnection, reason _: ConnectionCloseReason) {
func shutdownComplete(_ connection: QuicConnection) {
logger.trace("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"])
impl.connections.write { connections in
if let conn = connections.byId[connection.id] {
connections.byId.removeValue(forKey: connection.id)
Expand Down
11 changes: 10 additions & 1 deletion Node/Sources/Node/NetworkingProtocol/Network.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,18 @@ public final class Network: Sendable {
try peer.connect(to: to, mode: mode)
}

public func broadcast(kind: UniquePresistentStreamKind, message: any MessageProtocol) {
public func send(to: NetAddr, message: CERequest) async throws -> Data {
let conn = try peer.connect(to: to, mode: .builder)
return try await conn.request(message)
}

public func broadcast(kind: UniquePresistentStreamKind, message: UPMessage) {
peer.broadcast(kind: kind, message: message)
}

public func listenAddress() throws -> NetAddr {
try peer.listenAddress()
}
}

struct HandlerDef: StreamHandler {
Expand Down
Loading

0 comments on commit 63c6167

Please sign in to comment.