From 63c61678265a987593db3144c448f62a609cdd04 Mon Sep 17 00:00:00 2001 From: Xiliang Chen Date: Wed, 23 Oct 2024 18:15:39 +1300 Subject: [PATCH] Multinode (#190) * minor fix * fix connection * broadcase safrole tickets * update logging * fix unaligned load * trace * fix tests --- .../RuntimeProtocols/RuntimeEvents.swift | 1 + .../Blockchain/Validator/SafroleService.swift | 1 + .../Blockchain/Validator/ServiceBase.swift | 32 ++----- .../ExtrinsicPoolServiceTests.swift | 24 ++++- .../xcshareddata/xcschemes/Boka.xcscheme | 4 + Boka/Package.resolved | 4 +- Boka/Sources/Boka.swift | 12 +-- Networking/Sources/MsQuicSwift/NetAddr.swift | 6 +- .../Sources/MsQuicSwift/QuicConnection.swift | 3 + .../MsQuicSwift/QuicEventHandler.swift | 9 ++ .../Sources/MsQuicSwift/QuicSettings.swift | 2 +- .../Sources/Networking/Connection.swift | 2 +- Networking/Sources/Networking/Peer.swift | 39 +++++--- .../Node/NetworkingProtocol/Network.swift | 11 ++- .../NetworkingProtocol/NetworkManager.swift | 90 ++++++++++++++++++- Node/Sources/Node/Node.swift | 14 +-- .../Utils/EventBus/EventSubscriptions.swift | 42 +++++++++ .../Utils/Extensions/Mutex+Utils.swift | 14 +++ .../xcshareddata/swiftpm/Package.resolved | 11 ++- scripts/devnet.sh | 2 +- 20 files changed, 254 insertions(+), 69 deletions(-) create mode 100644 Utils/Sources/Utils/EventBus/EventSubscriptions.swift create mode 100644 Utils/Sources/Utils/Extensions/Mutex+Utils.swift diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift index b8994fd4..2d4719ae 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift @@ -13,6 +13,7 @@ public enum RuntimeEvents { // New safrole ticket generated from SafroleService public struct SafroleTicketsGenerated: Event { + public let epochIndex: EpochIndex public let items: [TicketItemAndOutput] public let publicKey: Bandersnatch.PublicKey } diff --git a/Blockchain/Sources/Blockchain/Validator/SafroleService.swift b/Blockchain/Sources/Blockchain/Validator/SafroleService.swift index 646c4d69..b6879b07 100644 --- a/Blockchain/Sources/Blockchain/Validator/SafroleService.swift +++ b/Blockchain/Sources/Blockchain/Validator/SafroleService.swift @@ -77,6 +77,7 @@ public final class SafroleService: ServiceBase, @unchecked Sendable { ) events.append(.init( + epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config), items: tickets, publicKey: secret.publicKey )) diff --git a/Blockchain/Sources/Blockchain/Validator/ServiceBase.swift b/Blockchain/Sources/Blockchain/Validator/ServiceBase.swift index 3eb81503..4cb05795 100644 --- a/Blockchain/Sources/Blockchain/Validator/ServiceBase.swift +++ b/Blockchain/Sources/Blockchain/Validator/ServiceBase.swift @@ -5,43 +5,27 @@ public class ServiceBase { public let id: UniqueId let logger: Logger public let config: ProtocolConfigRef - private let eventBus: EventBus - private let subscriptionTokens: ThreadSafeContainer<[EventBus.SubscriptionToken]> = .init([]) + private let subscriptions: EventSubscriptions init(id: UniqueId, config: ProtocolConfigRef, eventBus: EventBus) { self.id = id logger = Logger(label: id) self.config = config - self.eventBus = eventBus - } - - deinit { - let eventBus = self.eventBus - let subscriptionTokens = self.subscriptionTokens - Task { - for token in subscriptionTokens.value { - await eventBus.unsubscribe(token: token) - } - } + subscriptions = EventSubscriptions(eventBus: eventBus) } @discardableResult - func subscribe(_ eventType: T.Type, id _: UniqueId, handler: @escaping @Sendable (T) async throws -> Void) async -> EventBus - .SubscriptionToken - { - let token = await eventBus.subscribe(eventType, handler: handler) - subscriptionTokens.write { $0.append(token) } - return token + func subscribe( + _ eventType: T.Type, id _: UniqueId, handler: @escaping @Sendable (T) async throws -> Void + ) async -> EventBus.SubscriptionToken { + await subscriptions.subscribe(eventType, id: id, handler: handler) } func unsubscribe(token: EventBus.SubscriptionToken) async { - subscriptionTokens.write { tokens in - tokens.removeAll { $0 == token } - } - await eventBus.unsubscribe(token: token) + await subscriptions.unsubscribe(token: token) } func publish(_ event: some Event) { - eventBus.publish(event) + subscriptions.publish(event) } } diff --git a/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift b/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift index 175ba3ce..fe05c18b 100644 --- a/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift +++ b/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift @@ -56,7 +56,11 @@ struct ExtrinsicPoolServiceTests { allTickets.append(contentsOf: tickets) - let event = RuntimeEvents.SafroleTicketsGenerated(items: tickets, publicKey: secretKey.publicKey) + let event = RuntimeEvents.SafroleTicketsGenerated( + epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config), + items: tickets, + publicKey: secretKey.publicKey + ) await eventBus.publish(event) // Wait for the event to be processed @@ -126,7 +130,11 @@ struct ExtrinsicPoolServiceTests { idx: 0 ) - let addEvent = RuntimeEvents.SafroleTicketsGenerated(items: tickets, publicKey: secretKey.publicKey) + let addEvent = RuntimeEvents.SafroleTicketsGenerated( + epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config), + items: tickets, + publicKey: secretKey.publicKey + ) await eventBus.publish(addEvent) // Wait for the event to be processed @@ -173,7 +181,11 @@ struct ExtrinsicPoolServiceTests { idx: 0 ) - let addEvent = RuntimeEvents.SafroleTicketsGenerated(items: oldTickets, publicKey: secretKey.publicKey) + let addEvent = RuntimeEvents.SafroleTicketsGenerated( + epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config), + items: oldTickets, + publicKey: secretKey.publicKey + ) await eventBus.publish(addEvent) await storeMiddleware.wait() @@ -216,7 +228,11 @@ struct ExtrinsicPoolServiceTests { ) // Ensure new tickets are accepted - let newAddEvent = RuntimeEvents.SafroleTicketsGenerated(items: newTickets, publicKey: secretKey.publicKey) + let newAddEvent = RuntimeEvents.SafroleTicketsGenerated( + epochIndex: newState.value.timeslot.timeslotToEpochIndex(config: config), + items: newTickets, + publicKey: secretKey.publicKey + ) await eventBus.publish(newAddEvent) await storeMiddleware.wait() diff --git a/Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme b/Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme index 1dec0787..e3f4eaf8 100644 --- a/Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme +++ b/Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme @@ -67,6 +67,10 @@ argument = "--validator" isEnabled = "YES"> + + : ExpressibleByArgument { case disabled init?(argument: String) { - if argument.lowercased() == "false" { + if argument.lowercased() == "no" { self = .disabled } else { guard let argument = T(argument: argument) else { @@ -60,11 +60,11 @@ struct Boka: AsyncParsableCommand { @Option(name: .long, help: "A preset config or path to chain config file.") var chain: Genesis = .preset(.dev) - @Option(name: .long, help: "Listen address for RPC server. Pass 'false' to disable RPC server. Default to 127.0.0.1:9955.") + @Option(name: .long, help: "Listen address for RPC server. Pass 'no' to disable RPC server. Default to 127.0.0.1:9955.") var rpc: MaybeEnabled = .enabled(NetAddr(address: "127.0.0.1:9955")!) @Option(name: .long, help: "Listen address for P2P protocol.") - var p2p: NetAddr = .init(address: "127.0.0.1:19955")! + var p2p: NetAddr = .init(address: "127.0.0.1:0")! @Option(name: .long, help: "Specify peer P2P addresses.") var peers: [NetAddr] = [] @@ -114,12 +114,11 @@ struct Boka: AsyncParsableCommand { } let rpcConfig = rpc.asOptional.map { addr -> RPCConfig in - logger.info("RPC listen address: \(addr)") let (address, port) = addr.getAddressAndPort() return RPCConfig(listenAddress: address, port: Int(port)) } - let keystore = try await DevKeyStore() + let keystore = try await DevKeyStore(devKeysCount: devSeed == nil ? 12 : 0) let networkKey: Ed25519.SecretKey = try await { if let devSeed { @@ -130,6 +129,7 @@ struct Boka: AsyncParsableCommand { } }() + logger.info("Network key: \(networkKey.publicKey.data.toHexString())") let networkConfig = NetworkConfig( mode: validator ? .validator : .builder, listenAddress: p2p, @@ -144,7 +144,7 @@ struct Boka: AsyncParsableCommand { handlerMiddleware: .tracing(prefix: "Handler") ) - let config = Node.Config(rpc: rpcConfig, network: networkConfig) + let config = Node.Config(rpc: rpcConfig, network: networkConfig, peers: peers) let node: Node = if validator { try await ValidatorNode( diff --git a/Networking/Sources/MsQuicSwift/NetAddr.swift b/Networking/Sources/MsQuicSwift/NetAddr.swift index 245e201d..aabcbd6d 100644 --- a/Networking/Sources/MsQuicSwift/NetAddr.swift +++ b/Networking/Sources/MsQuicSwift/NetAddr.swift @@ -8,7 +8,7 @@ import msquic import Darwin #endif -public struct NetAddr: Sendable { +public struct NetAddr: Sendable, Equatable, Hashable { var quicAddr: QUIC_ADDR public init?(address: String) { @@ -38,17 +38,13 @@ public struct NetAddr: Sendable { let (host, port, _) = parseQuicAddr(quicAddr) ?? ("::dead:beef", 0, false) return (host, port) } -} -extension NetAddr: Equatable { public static func == (lhs: NetAddr, rhs: NetAddr) -> Bool { var addr1 = lhs.quicAddr var addr2 = rhs.quicAddr return QuicAddrCompare(&addr1, &addr2) == 1 } -} -extension NetAddr: Hashable { public func hash(into hasher: inout Hasher) { var addr = quicAddr let hash = QuicAddrHash(&addr) diff --git a/Networking/Sources/MsQuicSwift/QuicConnection.swift b/Networking/Sources/MsQuicSwift/QuicConnection.swift index 538ba56d..75956d7e 100644 --- a/Networking/Sources/MsQuicSwift/QuicConnection.swift +++ b/Networking/Sources/MsQuicSwift/QuicConnection.swift @@ -250,6 +250,9 @@ private class ConnectionHandle { case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: logger.trace("Shutdown complete") + if let connection { + connection.handler.shutdownComplete(connection) + } if event.pointee.SHUTDOWN_COMPLETE.AppCloseInProgress == 0 { // avoid closing twice api.call { api in diff --git a/Networking/Sources/MsQuicSwift/QuicEventHandler.swift b/Networking/Sources/MsQuicSwift/QuicEventHandler.swift index 26395d07..3c9e2456 100644 --- a/Networking/Sources/MsQuicSwift/QuicEventHandler.swift +++ b/Networking/Sources/MsQuicSwift/QuicEventHandler.swift @@ -33,6 +33,7 @@ public protocol QuicEventHandler: Sendable { func shouldOpen(_ connection: QuicConnection, certificate: Data?) -> QuicStatus func connected(_ connection: QuicConnection) func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason) + func shutdownComplete(_ connection: QuicConnection) func streamStarted(_ connect: QuicConnection, stream: QuicStream) // stream events @@ -55,6 +56,7 @@ extension QuicEventHandler { public func connected(_: QuicConnection) {} public func shutdownInitiated(_: QuicConnection, reason _: ConnectionCloseReason) {} + public func shutdownComplete(_: QuicConnection) {} public func streamStarted(_: QuicConnection, stream _: QuicStream) {} @@ -69,6 +71,7 @@ public final class MockQuicEventHandler: QuicEventHandler { case shouldOpen(connection: QuicConnection, certificate: Data?) case connected(connection: QuicConnection) case shutdownInitiated(connection: QuicConnection, reason: ConnectionCloseReason) + case shutdownComplete(connection: QuicConnection) case streamStarted(connection: QuicConnection, stream: QuicStream) case dataReceived(stream: QuicStream, data: Data) case closed(stream: QuicStream, status: QuicStatus, code: QuicErrorCode) @@ -107,6 +110,12 @@ public final class MockQuicEventHandler: QuicEventHandler { } } + public func shutdownComplete(_ connection: QuicConnection) { + events.write { events in + events.append(.shutdownComplete(connection: connection)) + } + } + public func streamStarted(_ connect: QuicConnection, stream: QuicStream) { events.write { events in events.append(.streamStarted(connection: connect, stream: stream)) diff --git a/Networking/Sources/MsQuicSwift/QuicSettings.swift b/Networking/Sources/MsQuicSwift/QuicSettings.swift index 973c599a..6529a46c 100644 --- a/Networking/Sources/MsQuicSwift/QuicSettings.swift +++ b/Networking/Sources/MsQuicSwift/QuicSettings.swift @@ -5,7 +5,7 @@ public typealias QuicSettings = QUIC_SETTINGS extension QuicSettings { public static let defaultSettings = { var settings = QuicSettings() - settings.IdleTimeoutMs = 60000 + settings.IdleTimeoutMs = 300_000 // 5 minutes settings.IsSet.IdleTimeoutMs = 1 settings.ServerResumptionLevel = 2 // QUIC_SERVER_RESUME_AND_ZERORTT settings.IsSet.ServerResumptionLevel = 1 diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index a236610f..7e28ab9b 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -126,7 +126,7 @@ public final class Connection: Sendable, ConnectionInfoP logger.debug("Invalid request length") return } - let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.load(as: UInt32.self) }) + let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) }) // sanity check for length // TODO: pick better value guard length < 1024 * 1024 * 10 else { diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 43e10752..61513ad1 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -100,6 +100,10 @@ public final class Peer: Sendable { ) } + public func listenAddress() throws -> NetAddr { + try listener.listenAddress() + } + public func connect(to address: NetAddr, mode: PeerMode) throws -> Connection { let conn = impl.connections.read { connections in connections.byType[mode]?[address] @@ -109,12 +113,14 @@ public final class Peer: Sendable { if let curr { return curr } - let conn = try Connection( - QuicConnection( - handler: PeerEventHandler(self.impl), - registration: self.impl.clientConfiguration.registration, - configuration: self.impl.clientConfiguration - ), + let quicConn = try QuicConnection( + handler: PeerEventHandler(self.impl), + registration: self.impl.clientConfiguration.registration, + configuration: self.impl.clientConfiguration + ) + try quicConn.connect(to: address) + let conn = Connection( + quicConn, impl: self.impl, mode: mode, remoteAddress: address, @@ -126,7 +132,7 @@ public final class Peer: Sendable { } } - public func broadcast(kind: Handler.PresistentHandler.StreamKind, message: any MessageProtocol) { + public func broadcast(kind: Handler.PresistentHandler.StreamKind, message: Handler.PresistentHandler.Message) { let connections = impl.connections.read { connections in connections.byId.values } @@ -261,16 +267,18 @@ private struct PeerEventHandler: QuicEventHandler { } // TODO: implement a peer and test this - func shouldOpen(_: QuicConnection, certificate: Data?) -> QuicStatus { + func shouldOpen(_ connection: QuicConnection, certificate: Data?) -> QuicStatus { + // TODO: enable certificate validation logic once parsing logic is fixed guard let certificate else { return .code(.requiredCert) } do { let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509) - logger.debug( - "Certificate parsed", - metadata: ["publicKey": "\(publicKey.toHexString())", "alternativeName": "\(alternativeName)"] - ) + logger.trace("Certificate parsed", metadata: [ + "connectionId": "\(connection.id)", + "publicKey": "\(publicKey.toHexString())", + "alternativeName": "\(alternativeName)", + ]) if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) { return .code(.badCert) } @@ -278,7 +286,9 @@ private struct PeerEventHandler: QuicEventHandler { // TODO: verify if it is current or next validator } } catch { - logger.error("Failed to parse certificate", metadata: ["error": "\(error)"]) + logger.warning("Failed to parse certificate", metadata: [ + "connectionId": "\(connection.id)", + "error": "\(error)"]) return .code(.badCert) } return .code(.success) @@ -309,7 +319,8 @@ private struct PeerEventHandler: QuicEventHandler { } } - func shutdownInitiated(_ connection: QuicConnection, reason _: ConnectionCloseReason) { + func shutdownComplete(_ connection: QuicConnection) { + logger.trace("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"]) impl.connections.write { connections in if let conn = connections.byId[connection.id] { connections.byId.removeValue(forKey: connection.id) diff --git a/Node/Sources/Node/NetworkingProtocol/Network.swift b/Node/Sources/Node/NetworkingProtocol/Network.swift index 69d90340..6a94abb2 100644 --- a/Node/Sources/Node/NetworkingProtocol/Network.swift +++ b/Node/Sources/Node/NetworkingProtocol/Network.swift @@ -65,9 +65,18 @@ public final class Network: Sendable { try peer.connect(to: to, mode: mode) } - public func broadcast(kind: UniquePresistentStreamKind, message: any MessageProtocol) { + public func send(to: NetAddr, message: CERequest) async throws -> Data { + let conn = try peer.connect(to: to, mode: .builder) + return try await conn.request(message) + } + + public func broadcast(kind: UniquePresistentStreamKind, message: UPMessage) { peer.broadcast(kind: kind, message: message) } + + public func listenAddress() throws -> NetAddr { + try peer.listenAddress() + } } struct HandlerDef: StreamHandler { diff --git a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift index 7dcd245d..dc8cc320 100644 --- a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift +++ b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift @@ -1,17 +1,25 @@ import Blockchain +import Codec import Foundation import TracingUtils import Utils private let logger = Logger(label: "NetworkManager") +enum SendTarget { + case safroleStep1Validator + case currentValidators +} + public final class NetworkManager: Sendable { private let network: Network + private let blockchain: Blockchain + private let subscriptions: EventSubscriptions // This is for development only - // Will assume those peers are also validators - private let devPeers: Set = [] + // Those peers will receive all the messages regardless the target + private let devPeers: Set - public init(config: Network.Config, blockchain: Blockchain) throws { + public init(config: Network.Config, blockchain: Blockchain, eventBus: EventBus, devPeers: Set) throws { let handler = HandlerImpl(blockchain: blockchain) network = try Network( config: config, @@ -19,6 +27,82 @@ public final class NetworkManager: Sendable { genesisHeader: blockchain.dataProvider.genesisBlockHash, handler: handler ) + self.blockchain = blockchain + subscriptions = EventSubscriptions(eventBus: eventBus) + self.devPeers = devPeers + + for peer in devPeers { + _ = try network.connect(to: peer, mode: .validator) + } + + logger.info("P2P Listening on \(try! network.listenAddress())") + + Task { + await subscriptions.subscribe( + RuntimeEvents.SafroleTicketsGenerated.self, + id: "NetworkManager.SafroleTicketsGenerated" + ) { [weak self] event in + await self?.on(safroleTicketsGenerated: event) + } + } + } + + private func getSendTarget(target: SendTarget) -> Set { + // TODO: get target from onchain state + switch target { + case .safroleStep1Validator: + // TODO: only send to the selected validator in the spec + devPeers + case .currentValidators: + // TODO: read onchain state for validators + devPeers + } + } + + private func send( + message: CERequest, + target: SendTarget, + responseType: R.Type, + responseHandler: @Sendable @escaping (Result) async -> Void + ) async { + let targets = getSendTarget(target: target) + for target in targets { + Task { + let res = await Result { + try await network.send(to: target, message: message) + } + .flatMap { data in + Result(catching: { + try JamDecoder.decode(responseType, from: data, withConfig: blockchain.config) + }) + } + await responseHandler(res) + } + } + } + + private func send(message: CERequest, target: SendTarget) async { + let targets = getSendTarget(target: target) + for target in targets { + Task { + // not expecting a response + // TODO: handle errors and ensure no data is returned + _ = try await network.send(to: target, message: message) + } + } + } + + private func on(safroleTicketsGenerated event: RuntimeEvents.SafroleTicketsGenerated) async { + for ticket in event.items { + await send( + message: .safroleTicket1(.init( + epochIndex: event.epochIndex, + attempt: ticket.ticket.attempt, + proof: ticket.ticket.signature + )), + target: .safroleStep1Validator + ) + } } } diff --git a/Node/Sources/Node/Node.swift b/Node/Sources/Node/Node.swift index 56cca98d..807bcad0 100644 --- a/Node/Sources/Node/Node.swift +++ b/Node/Sources/Node/Node.swift @@ -11,12 +11,14 @@ public typealias NetworkConfig = Network.Config public class Node { public struct Config { - public var rpc: Server.Config? - public var network: Network.Config + public var rpc: RPCConfig? + public var network: NetworkConfig + public var peers: [NetAddr] - public init(rpc: Server.Config?, network: Network.Config) { + public init(rpc: RPCConfig?, network: NetworkConfig, peers: [NetAddr] = []) { self.rpc = rpc self.network = network + self.peers = peers } } @@ -33,8 +35,6 @@ public class Node { eventBus: EventBus, keystore: KeyStore ) async throws { - logger.debug("Initializing node") - let (genesisState, genesisBlock, protocolConfig) = try await genesis.load() dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock)) timeProvider = SystemTimeProvider() @@ -50,7 +50,9 @@ public class Node { network = try NetworkManager( config: config.network, - blockchain: blockchain + blockchain: blockchain, + eventBus: eventBus, + devPeers: Set(config.peers) ) rpcServer = try config.rpc.map { diff --git a/Utils/Sources/Utils/EventBus/EventSubscriptions.swift b/Utils/Sources/Utils/EventBus/EventSubscriptions.swift new file mode 100644 index 00000000..351af076 --- /dev/null +++ b/Utils/Sources/Utils/EventBus/EventSubscriptions.swift @@ -0,0 +1,42 @@ +import Synchronization + +public struct EventSubscriptions: ~Copyable, Sendable { + private let eventBus: EventBus + private let subscriptionTokens: Mutex<[EventBus.SubscriptionToken]> = .init([]) + + public init(eventBus: EventBus) { + self.eventBus = eventBus + } + + deinit { + let eventBus = self.eventBus + let tokens = subscriptionTokens.value + Task { + for token in tokens { + await eventBus.unsubscribe(token: token) + } + } + } + + @discardableResult + public func subscribe( + _ eventType: T.Type, + id _: UniqueId, + handler: @escaping @Sendable (T) async throws -> Void + ) async -> EventBus.SubscriptionToken { + let token = await eventBus.subscribe(eventType, handler: handler) + subscriptionTokens.withLock { $0.append(token) } + return token + } + + public func unsubscribe(token: EventBus.SubscriptionToken) async { + subscriptionTokens.withLock { tokens in + tokens.removeAll { $0 == token } + } + await eventBus.unsubscribe(token: token) + } + + public func publish(_ event: some Event) { + eventBus.publish(event) + } +} diff --git a/Utils/Sources/Utils/Extensions/Mutex+Utils.swift b/Utils/Sources/Utils/Extensions/Mutex+Utils.swift new file mode 100644 index 00000000..18c2bbd5 --- /dev/null +++ b/Utils/Sources/Utils/Extensions/Mutex+Utils.swift @@ -0,0 +1,14 @@ +import Synchronization + +extension Mutex where Value: Sendable { + public var value: Value { + get { + withLock { $0 } + } + set { + withLock { + $0 = newValue + } + } + } +} diff --git a/boka.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/boka.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index be679657..4463c998 100644 --- a/boka.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/boka.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "c109234482ca746f1aec9c865330521dcffc1ddb2a6ca3220ec46803cb7f10b0", + "originHash" : "2856b6aedabfe15f31e49c84318b9d7a155b1858de6e4c24b935eed8adb6cc4c", "pins" : [ { "identity" : "async-channels", @@ -82,6 +82,15 @@ "version" : "1.2.0" } }, + { + "identity" : "swift-argument-parser", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-argument-parser.git", + "state" : { + "revision" : "41982a3656a71c768319979febd796c6fd111d5c", + "version" : "1.5.0" + } + }, { "identity" : "swift-asn1", "kind" : "remoteSourceControl", diff --git a/scripts/devnet.sh b/scripts/devnet.sh index 9cc3382d..8861077c 100755 --- a/scripts/devnet.sh +++ b/scripts/devnet.sh @@ -11,7 +11,7 @@ create_node() { local port=$((9000 + node_number)) local p2p_port=$((19000 + node_number)) - tmux send-keys -t boka "$bin_path --chain=minimal --rpc 127.0.0.1:$port --validator --dev-seed $node_number --p2p 127.0.0.1:$p2p_port --peers=127.0.0.1:19001 --peers=127.0.0.1:19002 --peers=127.0.0.1:19003" C-m + tmux send-keys -t boka "LOG_LEVEL=trace $bin_path --chain=minimal --rpc 127.0.0.1:$port --validator --dev-seed $node_number --p2p 127.0.0.1:$p2p_port --peers=127.0.0.1:19001 --peers=127.0.0.1:19002 --peers=127.0.0.1:19003 --name=node-$node_number" C-m } # Start a new tmux session