diff --git a/Blockchain/Sources/Blockchain/Blockchain.swift b/Blockchain/Sources/Blockchain/Blockchain.swift index b7b64e48..a83cb5e5 100644 --- a/Blockchain/Sources/Blockchain/Blockchain.swift +++ b/Blockchain/Sources/Blockchain/Blockchain.swift @@ -54,4 +54,8 @@ public final class Blockchain: ServiceBase, @unchecked Sendable { publish(RuntimeEvents.BlockFinalized(hash: hash)) } + + public func publish(event: some Event) { + publish(event) + } } diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift index dd0cc200..ee0a9cb2 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProvider.swift @@ -17,9 +17,7 @@ public final class BlockchainDataProvider: Sendable { let heads = try await dataProvider.getHeads() var bestHead: (HeaderRef, Data32)? for head in heads { - guard let header = try? await dataProvider.getHeader(hash: head) else { - continue - } + let header = try await dataProvider.getHeader(hash: head) if bestHead == nil || header.value.timeslot > bestHead!.0.value.timeslot { bestHead = (header, head) } @@ -27,7 +25,7 @@ public final class BlockchainDataProvider: Sendable { let finalizedHead = try await dataProvider.getFinalizedHead() storage = ThreadSafeContainer(.init( - bestHead: bestHead?.1 ?? Data32(), + bestHead: bestHead?.1 ?? dataProvider.genesisBlockHash, bestHeadTimeslot: bestHead?.0.value.timeslot ?? 0, finalizedHead: finalizedHead )) @@ -129,4 +127,8 @@ extension BlockchainDataProvider { try await dataProvider.remove(hash: hash) } + + public var genesisBlockHash: Data32 { + dataProvider.genesisBlockHash + } } diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift index c5c207d9..012373a2 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/BlockchainDataProviderProtocol.swift @@ -34,4 +34,6 @@ public protocol BlockchainDataProviderProtocol: Sendable { /// remove header, block and state func remove(hash: Data32) async throws + + var genesisBlockHash: Data32 { get } } diff --git a/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift b/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift index 858f5d55..d075be2b 100644 --- a/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift +++ b/Blockchain/Sources/Blockchain/BlockchainDataProvider/InMemoryDataProvider.swift @@ -7,12 +7,15 @@ public actor InMemoryDataProvider: Sendable { private var blockByHash: [Data32: BlockRef] = [:] private var stateByBlockHash: [Data32: StateRef] = [:] private var hashByTimeslot: [TimeslotIndex: Set] = [:] + public let genesisBlockHash: Data32 - public init(genesis: StateRef) async { - heads = [Data32()] - finalizedHead = Data32() + public init(genesisState: StateRef, genesisBlock: BlockRef) async { + genesisBlockHash = genesisBlock.hash + heads = [genesisBlockHash] + finalizedHead = genesisBlockHash - add(state: genesis) + add(block: genesisBlock) + add(state: genesisState) } } @@ -80,8 +83,7 @@ extension InMemoryDataProvider: BlockchainDataProviderProtocol { // parent needs to be either // - existing head // - known block - // - genesis / all zeros - guard heads.remove(parent) != nil || hasBlock(hash: parent) || parent == Data32() else { + guard heads.remove(parent) != nil || hasBlock(hash: parent) else { throw BlockchainDataProviderError.noData(hash: parent) } heads.insert(hash) diff --git a/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift b/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift index aaa58940..b8994fd4 100644 --- a/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift +++ b/Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift @@ -20,6 +20,10 @@ public enum RuntimeEvents { // New safrole ticket received from network public struct SafroleTicketsReceived: Event { public let items: [ExtrinsicTickets.TicketItem] + + public init(items: [ExtrinsicTickets.TicketItem]) { + self.items = items + } } // New block authored by BlockAuthor service diff --git a/Blockchain/Sources/Blockchain/Types/RecentHistory.swift b/Blockchain/Sources/Blockchain/Types/RecentHistory.swift index c8c12936..b5848f84 100644 --- a/Blockchain/Sources/Blockchain/Types/RecentHistory.swift +++ b/Blockchain/Sources/Blockchain/Types/RecentHistory.swift @@ -34,7 +34,15 @@ public struct RecentHistory: Sendable, Equatable, Codable { extension RecentHistory: Dummy { public typealias Config = ProtocolConfigRef public static func dummy(config: Config) -> RecentHistory { - RecentHistory(items: try! ConfigLimitedSizeArray(config: config)) + RecentHistory(items: try! ConfigLimitedSizeArray( + config: config, + array: [HistoryItem( + headerHash: Data32(), + mmr: MMR([]), + stateRoot: Data32(), + workReportHashes: ConfigLimitedSizeArray(config: config) + )] + )) } } diff --git a/Blockchain/Sources/Blockchain/Types/State+Genesis.swift b/Blockchain/Sources/Blockchain/Types/State+Genesis.swift index ccaeebb7..2b60149b 100644 --- a/Blockchain/Sources/Blockchain/Types/State+Genesis.swift +++ b/Blockchain/Sources/Blockchain/Types/State+Genesis.swift @@ -1,7 +1,7 @@ import Utils extension State { - public static func devGenesis(config: ProtocolConfigRef) throws -> State { + public static func devGenesis(config: ProtocolConfigRef) throws -> (StateRef, BlockRef) { var devKeys = [ValidatorKey]() var state = State.dummy(config: config) @@ -32,7 +32,15 @@ extension State { ) state.safroleState.ticketsVerifier = commitment.data - return state + let block = BlockRef(Block.dummy(config: config)) + try state.recentHistory.items.append(RecentHistory.HistoryItem( + headerHash: block.hash, + mmr: MMR([]), + stateRoot: Data32(), + workReportHashes: ConfigLimitedSizeArray(config: config) + )) + + return (StateRef(state), block) } // TODO: add file genesis // public static func fileGenesis(config: ProtocolConfigRef) throws -> State diff --git a/Blockchain/Sources/Blockchain/Types/State.swift b/Blockchain/Sources/Blockchain/Types/State.swift index 6d1f443e..e467bea4 100644 --- a/Blockchain/Sources/Blockchain/Types/State.swift +++ b/Blockchain/Sources/Blockchain/Types/State.swift @@ -124,7 +124,7 @@ public struct State: Sendable, Equatable, Codable { extension State { public var lastBlockHash: Data32 { - recentHistory.items.last.map(\.headerHash) ?? Data32() + recentHistory.items.last.map(\.headerHash)! } } diff --git a/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift b/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift index 7861814b..56357026 100644 --- a/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift +++ b/Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift @@ -22,7 +22,8 @@ struct BlockAuthorTests { config = ProtocolConfigRef.dev timeProvider = MockTimeProvider(time: 988) - dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesis: StateRef(State.devGenesis(config: config)))) + let (genesisState, genesisBlock) = try State.devGenesis(config: config) + dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock)) storeMiddleware = StoreMiddleware() eventBus = EventBus(eventMiddleware: Middleware(storeMiddleware)) @@ -45,7 +46,7 @@ struct BlockAuthorTests { @Test func createNewBlockWithFallbackKey() async throws { - let genesisState = try await dataProvider.getState(hash: Data32()) + let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) let timeslot = timeProvider.getTime().timeToTimeslot(config: config) @@ -62,7 +63,7 @@ struct BlockAuthorTests { @Test func createNewBlockWithTicket() async throws { - let genesisState = try await dataProvider.getState(hash: Data32()) + let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) var state = genesisState.value state.safroleState.ticketsVerifier = try Bandersnatch.RingCommitment( @@ -108,7 +109,7 @@ struct BlockAuthorTests { @Test func scheduleNewBlocks() async throws { - let genesisState = try await dataProvider.getState(hash: Data32()) + let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) await blockAuthor.on(genesis: genesisState) diff --git a/Blockchain/Tests/BlockchainTests/DispatchQueueSchedulerTests.swift b/Blockchain/Tests/BlockchainTests/DispatchQueueSchedulerTests.swift index 1084be1b..97c2e60c 100644 --- a/Blockchain/Tests/BlockchainTests/DispatchQueueSchedulerTests.swift +++ b/Blockchain/Tests/BlockchainTests/DispatchQueueSchedulerTests.swift @@ -38,23 +38,23 @@ struct DispatchQueueSchedulerTests { let diff = try #require(end.value).timeIntervalSince(now) - delay let diffAbs = abs(diff) - #expect(diffAbs < 0.5) + #expect(diffAbs < 1) } } @Test func scheduleRepeatingTask() async throws { - try await confirmation(expectedCount: 3) { confirm in - let delay = 0.5 + try await confirmation(expectedCount: 2) { confirm in + let delay = 1.5 let now = Date() let executionTimes = ThreadSafeContainer<[Date]>([]) - let expectedExecutions = 3 + let expectedExecutions = 2 let cancel = scheduler.schedule(delay: delay, repeats: true) { executionTimes.value.append(Date()) confirm() } - try await Task.sleep(for: .seconds(1.6)) + try await Task.sleep(for: .seconds(3.1)) _ = cancel @@ -64,7 +64,7 @@ struct DispatchQueueSchedulerTests { let expectedInterval = delay * Double(index + 1) let actualInterval = time.timeIntervalSince(now) let difference = abs(actualInterval - expectedInterval) - #expect(difference < 0.5) + #expect(difference < 1) } } } @@ -83,13 +83,13 @@ struct DispatchQueueSchedulerTests { @Test func cancelRepeatingTask() async throws { try await confirmation(expectedCount: 2) { confirm in - let delay = 0.5 + let delay = 1.0 let cancel = scheduler.schedule(delay: delay, repeats: true) { confirm() } - try await Task.sleep(for: .seconds(1.2)) + try await Task.sleep(for: .seconds(2.2)) cancel.cancel() diff --git a/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift b/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift index b3d7ae9b..242d9b28 100644 --- a/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift +++ b/Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift @@ -21,7 +21,8 @@ struct ExtrinsicPoolServiceTests { } timeProvider = MockTimeProvider(time: 1000) - dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesis: StateRef(State.devGenesis(config: config)))) + let (genesisState, genesisBlock) = try State.devGenesis(config: config) + dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock)) storeMiddleware = StoreMiddleware() eventBus = EventBus(eventMiddleware: Middleware(storeMiddleware)) @@ -186,6 +187,7 @@ struct ExtrinsicPoolServiceTests { let newBlock = BlockRef.dummy(config: config).mutate { $0.header.unsigned.timeslot = nextTimeslot + $0.header.unsigned.parentHash = dataProvider.bestHead } let oldEntropyPool = state.value.entropyPool diff --git a/Blockchain/Tests/BlockchainTests/InMemoryDataProviderTests.swift b/Blockchain/Tests/BlockchainTests/InMemoryDataProviderTests.swift index 9012b40d..46bf59b0 100644 --- a/Blockchain/Tests/BlockchainTests/InMemoryDataProviderTests.swift +++ b/Blockchain/Tests/BlockchainTests/InMemoryDataProviderTests.swift @@ -7,18 +7,18 @@ struct InMemoryDataProviderTests { let config = ProtocolConfigRef.mainnet @Test func testInitialization() async throws { - let genesis = StateRef(State.dummy(config: config)) - let provider = await InMemoryDataProvider(genesis: genesis) + let (genesis, block) = try State.devGenesis(config: config) + let provider = await InMemoryDataProvider(genesisState: genesis, genesisBlock: block) - #expect(await (provider.getHeads()) == [Data32()]) - #expect(await (provider.getFinalizedHead()) == Data32()) + #expect(await (provider.getHeads()) == [block.hash]) + #expect(await (provider.getFinalizedHead()) == block.hash) } @Test func testAddAndRetrieveBlock() async throws { - let genesis = StateRef(State.dummy(config: config)) - let provider = await InMemoryDataProvider(genesis: genesis) + let (genesis, block) = try State.devGenesis(config: config) + + let provider = await InMemoryDataProvider(genesisState: genesis, genesisBlock: block) - let block = BlockRef(Block.dummy(config: config)) await provider.add(block: block) #expect(await (provider.hasBlock(hash: block.hash)) == true) @@ -29,8 +29,9 @@ struct InMemoryDataProviderTests { } @Test func testAddAndRetrieveState() async throws { - let genesis = StateRef(State.dummy(config: config)) - let provider = await InMemoryDataProvider(genesis: genesis) + let (genesis, block) = try State.devGenesis(config: config) + + let provider = await InMemoryDataProvider(genesisState: genesis, genesisBlock: block) let state = StateRef(State.dummy(config: config)) await provider.add(state: state) @@ -40,16 +41,18 @@ struct InMemoryDataProviderTests { } @Test func testUpdateHead() async throws { - let genesis = StateRef(State.dummy(config: config)) - let provider = await InMemoryDataProvider(genesis: genesis) + let (genesis, block) = try State.devGenesis(config: config) + let provider = await InMemoryDataProvider(genesisState: genesis, genesisBlock: block) - let newBlock = BlockRef(Block.dummy(config: config)) + let newBlock = BlockRef(Block.dummy(config: config)).mutate { + $0.header.unsigned.timeslot = 123 + } await provider.add(block: newBlock) - try await provider.updateHead(hash: newBlock.hash, parent: Data32()) + try await provider.updateHead(hash: newBlock.hash, parent: block.hash) #expect(await provider.isHead(hash: newBlock.hash) == true) - #expect(await provider.isHead(hash: Data32()) == false) + #expect(await provider.isHead(hash: block.hash) == false) let hash = Data32.random() await #expect(throws: BlockchainDataProviderError.noData(hash: hash)) { @@ -58,10 +61,10 @@ struct InMemoryDataProviderTests { } @Test func testSetFinalizedHead() async throws { - let genesis = StateRef(State.dummy(config: config)) - let provider = await InMemoryDataProvider(genesis: genesis) + let (genesis, block) = try State.devGenesis(config: config) + + let provider = await InMemoryDataProvider(genesisState: genesis, genesisBlock: block) - let block = BlockRef(Block.dummy(config: config)) await provider.add(block: block) await provider.setFinalizedHead(hash: block.hash) @@ -69,8 +72,9 @@ struct InMemoryDataProviderTests { } @Test func testRemoveHash() async throws { - let genesis = StateRef(State.dummy(config: config)) - let provider = await InMemoryDataProvider(genesis: genesis) + let (genesis, block) = try State.devGenesis(config: config) + + let provider = await InMemoryDataProvider(genesisState: genesis, genesisBlock: block) let state = StateRef(State.dummy(config: ProtocolConfigRef.dev)) let timeslotIndex = state.value.timeslot diff --git a/Blockchain/Tests/BlockchainTests/SafroleServiceTests.swift b/Blockchain/Tests/BlockchainTests/SafroleServiceTests.swift index f0c4e71c..33b38933 100644 --- a/Blockchain/Tests/BlockchainTests/SafroleServiceTests.swift +++ b/Blockchain/Tests/BlockchainTests/SafroleServiceTests.swift @@ -21,7 +21,7 @@ struct SafroleServiceTests { } timeProvider = MockTimeProvider(time: 1000) - genesisState = try StateRef(State.devGenesis(config: config)) + (genesisState, _) = try State.devGenesis(config: config) storeMiddleware = StoreMiddleware() eventBus = EventBus(eventMiddleware: Middleware(storeMiddleware)) diff --git a/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift b/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift index a918a604..87bee198 100644 --- a/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift +++ b/Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift @@ -21,7 +21,8 @@ struct ValidatorServiceTests { config = ProtocolConfigRef.dev timeProvider = MockTimeProvider(time: 988) - dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesis: StateRef(State.devGenesis(config: config)))) + let (genesisState, genesisBlock) = try State.devGenesis(config: config) + dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock)) storeMiddleware = StoreMiddleware() eventBus = EventBus(eventMiddleware: Middleware(storeMiddleware)) @@ -48,7 +49,7 @@ struct ValidatorServiceTests { @Test func onGenesis() async throws { - let genesisState = try await dataProvider.getState(hash: Data32()) + let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) await validatorService.on(genesis: genesisState) @@ -64,7 +65,7 @@ struct ValidatorServiceTests { @Test func produceBlocks() async throws { - let genesisState = try await dataProvider.getState(hash: Data32()) + let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) await validatorService.on(genesis: genesisState) @@ -103,7 +104,7 @@ struct ValidatorServiceTests { @Test func makeManyBlocks() async throws { - let genesisState = try await dataProvider.getState(hash: Data32()) + let genesisState = try await dataProvider.getState(hash: dataProvider.genesisBlockHash) await validatorService.on(genesis: genesisState) diff --git a/Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme b/Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme index ae134e00..207805d9 100644 --- a/Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme +++ b/Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme @@ -62,6 +62,12 @@ ReferencedContainer = "container:"> + + + + Bool { - inner.hasContent(record: &record) - } - - public func write(_ record: inout LogRecord, to: inout FragmentOutput) { - inner.write(&record, to: &to) - } -} +// TODO: renable after this issue is fixed: https://github.com/swiftlang/swift/issues/76690 +// public final class LogFragment: LoggerFragment { +// private let inner: LoggerFragment + +// public init() { +// inner = TimestampFragment() +// .and(InnerFragment().separated(" ")) +// .and(MessageFragment().separated(" ")) +// .and(MetadataFragment().separated(" ")) +// .and(SourceLocationFragment().separated(" ").maxLevel(.debug)) +// } + +// public func hasContent(record: inout LogRecord) -> Bool { +// inner.hasContent(record: &record) +// } + +// public func write(_ record: inout LogRecord, to: inout FragmentOutput) { +// inner.write(&record, to: &to) +// } +// } diff --git a/Boka/Sources/Tracing.swift b/Boka/Sources/Tracing.swift index 359b5900..861a3917 100644 --- a/Boka/Sources/Tracing.swift +++ b/Boka/Sources/Tracing.swift @@ -54,7 +54,7 @@ private func parseLevel(_ level: String) -> Logger.Level? { } public enum Tracing { - public static func bootstrap(_ serviceName: String, loggerOnly: Bool = false) async throws -> [Service] { + public static func bootstrap(_: String, loggerOnly _: Bool = false) async throws -> [Service] { let env = ProcessInfo.processInfo.environment let (filters, defaultLevel, minimalLevel) = parse(from: env["LOG_LEVEL"] ?? "") ?? { @@ -64,7 +64,8 @@ public enum Tracing { LoggingSystem.bootstrap({ label, metadataProvider in BokaLogger( - fragment: LogFragment(), + // fragment: LogFragment(), + fragment: timestampDefaultLoggerFragment(), label: label, level: minimalLevel, metadataProvider: metadataProvider, @@ -73,51 +74,54 @@ public enum Tracing { ) }, metadataProvider: .otel) - if loggerOnly { - return [] - } + return [] + + // TODO: renable after this issue is fixed: https://github.com/swiftlang/swift/issues/76690 + // if loggerOnly { + // return [] + // } - // Configure OTel resource detection to automatically apply helpful attributes to events. - let environment = OTelEnvironment.detected() - let resourceDetection = OTelResourceDetection(detectors: [ - OTelProcessResourceDetector(), - OTelEnvironmentResourceDetector(environment: environment), - .manual( - OTelResource( - attributes: SpanAttributes(["service.name": serviceName.toSpanAttribute()]) - ) - ), - ]) - let resource = await resourceDetection.resource(environment: environment, logLevel: .trace) + // // Configure OTel resource detection to automatically apply helpful attributes to events. + // let environment = OTelEnvironment.detected() + // let resourceDetection = OTelResourceDetection(detectors: [ + // OTelProcessResourceDetector(), + // OTelEnvironmentResourceDetector(environment: environment), + // .manual( + // OTelResource( + // attributes: SpanAttributes(["service.name": serviceName.toSpanAttribute()]) + // ) + // ), + // ]) + // let resource = await resourceDetection.resource(environment: environment, logLevel: .trace) - // Bootstrap the metrics backend to export metrics periodically in OTLP/gRPC. - let registry = OTelMetricRegistry() - let metricsExporter = try OTLPGRPCMetricExporter( - configuration: .init(environment: environment) - ) - let metrics = OTelPeriodicExportingMetricsReader( - resource: resource, - producer: registry, - exporter: metricsExporter, - configuration: .init(environment: environment) - ) - MetricsSystem.bootstrap(OTLPMetricsFactory(registry: registry)) + // // Bootstrap the metrics backend to export metrics periodically in OTLP/gRPC. + // let registry = OTelMetricRegistry() + // let metricsExporter = try OTLPGRPCMetricExporter( + // configuration: .init(environment: environment) + // ) + // let metrics = OTelPeriodicExportingMetricsReader( + // resource: resource, + // producer: registry, + // exporter: metricsExporter, + // configuration: .init(environment: environment) + // ) + // MetricsSystem.bootstrap(OTLPMetricsFactory(registry: registry)) - // Bootstrap the tracing backend to export traces periodically in OTLP/gRPC. - let exporter = try OTLPGRPCSpanExporter(configuration: .init(environment: environment)) - let processor = OTelBatchSpanProcessor( - exporter: exporter, configuration: .init(environment: environment) - ) - let tracer = OTelTracer( - idGenerator: OTelRandomIDGenerator(), - sampler: OTelConstantSampler(isOn: true), - propagator: OTelW3CPropagator(), - processor: processor, - environment: environment, - resource: resource - ) - InstrumentationSystem.bootstrap(tracer) + // // Bootstrap the tracing backend to export traces periodically in OTLP/gRPC. + // let exporter = try OTLPGRPCSpanExporter(configuration: .init(environment: environment)) + // let processor = OTelBatchSpanProcessor( + // exporter: exporter, configuration: .init(environment: environment) + // ) + // let tracer = OTelTracer( + // idGenerator: OTelRandomIDGenerator(), + // sampler: OTelConstantSampler(isOn: true), + // propagator: OTelW3CPropagator(), + // processor: processor, + // environment: environment, + // resource: resource + // ) + // InstrumentationSystem.bootstrap(tracer) - return [tracer, metrics] + // return [tracer, metrics] } } diff --git a/Boka/Sources/main.swift b/Boka/Sources/main.swift index 8c566d23..a07cec9d 100644 --- a/Boka/Sources/main.swift +++ b/Boka/Sources/main.swift @@ -4,9 +4,4 @@ import ConsoleKit let input = CommandInput(arguments: CommandLine.arguments) let console = Terminal() let boka = Boka() -do { - try await console.run(boka, input: input) -} catch { - console.error("\(error.localizedDescription)") - throw error -} +try await console.run(boka, input: input) diff --git a/Boka/Tests/BokaTests/BokaTests.swift b/Boka/Tests/BokaTests/BokaTests.swift index a70ca1c4..802ed3e2 100644 --- a/Boka/Tests/BokaTests/BokaTests.swift +++ b/Boka/Tests/BokaTests/BokaTests.swift @@ -40,7 +40,7 @@ final class BokaTests { @Test func commandWithAllConfig() async throws { let sepc = ResourceLoader.loadResource(named: "devnet_allconfig_spec.json")!.path() let genesis: Genesis = .file(path: sepc) - let (_, protocolConfig) = try await genesis.load() + let (_, _, protocolConfig) = try await genesis.load() #expect(protocolConfig.value.maxWorkItems == 2) #expect(protocolConfig.value.serviceMinBalance == 100) } @@ -49,7 +49,7 @@ final class BokaTests { let sepc = ResourceLoader.loadResource(named: "mainnet_someconfig_spec.json")!.path() let genesis: Genesis = .file(path: sepc) let config = ProtocolConfigRef.mainnet.value - let (_, protocolConfig) = try await genesis.load() + let (_, _, protocolConfig) = try await genesis.load() #expect(protocolConfig.value.auditTranchePeriod == 100) #expect(protocolConfig.value.pvmProgramInitSegmentSize == config.pvmProgramInitSegmentSize) } @@ -58,7 +58,7 @@ final class BokaTests { let sepc = ResourceLoader.loadResource(named: "devnet_noconfig_spec.json")!.path() let genesis: Genesis = .file(path: sepc) let config = ProtocolConfigRef.dev.value - let (_, protocolConfig) = try await genesis.load() + let (_, _, protocolConfig) = try await genesis.load() #expect(protocolConfig.value.maxWorkItems == config.maxWorkItems) #expect(protocolConfig.value.serviceMinBalance == config.serviceMinBalance) } diff --git a/Boka/boka b/Boka/boka deleted file mode 100755 index 57a0c312..00000000 Binary files a/Boka/boka and /dev/null differ diff --git a/Codec/Package.resolved b/Codec/Package.resolved index efc8b352..aa699247 100644 --- a/Codec/Package.resolved +++ b/Codec/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "2cf5fc4fee7b0857388938d03095bb30e57dc50320b617b963f68c0a9600a43e", + "originHash" : "027931fd6146ff8a703e948ec61abe2a52a77db3285ba4722225d611e17904e6", "pins" : [ { "identity" : "swift-syntax", diff --git a/Codec/Package.swift b/Codec/Package.swift index 9ae0475f..c8cf459c 100644 --- a/Codec/Package.swift +++ b/Codec/Package.swift @@ -22,7 +22,8 @@ let package = Package( // Targets are the basic building blocks of a package, defining a module or a test suite. // Targets can depend on other targets in this package and products from dependencies. .target( - name: "Codec" + name: "Codec", + dependencies: [] ), .testTarget( name: "CodecTests", diff --git a/Codec/Sources/Codec/DataInput.swift b/Codec/Sources/Codec/DataInput.swift new file mode 100644 index 00000000..31455066 --- /dev/null +++ b/Codec/Sources/Codec/DataInput.swift @@ -0,0 +1,45 @@ +import Foundation + +public protocol DataInput { + /// read some data chunk + /// throw when no more data + mutating func read(length: Int) throws -> Data + + var isEmpty: Bool { get } +} + +extension DataInput { + public mutating func read() throws -> UInt8 { + try read(length: 1).first! + } + + public mutating func decodeUInt64() throws -> UInt64 { + // TODO: improve this by use `read(minLength: 8)` to avoid read byte by byte + let res = try IntegerCodec.decode { try self.read() } + guard let res else { + throw DecodingError.dataCorrupted( + DecodingError.Context( + codingPath: [], + debugDescription: "Not enough data to perform variable length integer decoding" + ) + ) + } + return res + } +} + +extension Data: DataInput { + public mutating func read(length: Int) throws -> Data { + guard count >= length else { + throw DecodingError.dataCorrupted( + DecodingError.Context( + codingPath: [], + debugDescription: "Not enough data to decode \(length) bytes" + ) + ) + } + let res = self[startIndex ..< startIndex + length] + self = self[startIndex + length ..< endIndex] + return res + } +} diff --git a/Codec/Sources/Codec/JamDecoder.swift b/Codec/Sources/Codec/JamDecoder.swift index 9132d885..2e20e9de 100644 --- a/Codec/Sources/Codec/JamDecoder.swift +++ b/Codec/Sources/Codec/JamDecoder.swift @@ -1,23 +1,23 @@ import Foundation public class JamDecoder { - private var data: Data + private var input: DataInput private let config: Any? - public init(data: Data, config: Any? = nil) { - self.data = data + public init(data: DataInput, config: Any? = nil) { + input = data self.config = config } public func decode(_ type: T.Type) throws -> T { - let context = DecodeContext(data: data) + let context = DecodeContext(input: input) context.userInfo[.config] = config let res = try context.decode(type, key: nil) - data = context.data + input = context.input return res } - public static func decode(_ type: T.Type, from data: Data, withConfig config: Any? = nil) throws -> T { + public static func decode(_ type: T.Type, from data: DataInput, withConfig config: Any? = nil) throws -> T { let decoder = JamDecoder(data: data, config: config) let val = try decoder.decode(type) try decoder.finalize() @@ -25,7 +25,7 @@ public class JamDecoder { } public func finalize() throws { - guard data.isEmpty else { + guard input.isEmpty else { throw DecodingError.dataCorrupted( DecodingError.Context( codingPath: [], @@ -71,12 +71,12 @@ private class DecodeContext: Decoder { var codingPath: [CodingKey] = [] var userInfo: [CodingUserInfoKey: Any] = [:] - var data: Data + var input: DataInput - init(data: Data, codingPath: [CodingKey] = [], userInfo: [CodingUserInfoKey: Any] = [:]) { + init(input: DataInput, codingPath: [CodingKey] = [], userInfo: [CodingUserInfoKey: Any] = [:]) { self.codingPath = codingPath self.userInfo = userInfo - self.data = data + self.input = input self.userInfo[.isJamCodec] = true } @@ -92,72 +92,51 @@ private class DecodeContext: Decoder { JamSingleValueDecodingContainer(codingPath: codingPath, decoder: self) } - fileprivate func decodeInt(codingPath: @autoclosure () -> [CodingKey]) throws -> T { - guard data.count >= MemoryLayout.size else { - throw DecodingError.dataCorrupted( - DecodingError.Context( - codingPath: codingPath(), - debugDescription: "Not enough data to decode \(T.self)" - ) - ) - } - let res = data.withUnsafeBytes { ptr in + fileprivate func decodeInt(codingPath _: @autoclosure () -> [CodingKey]) throws -> T { + let data = try input.read(length: MemoryLayout.size) + return data.withUnsafeBytes { ptr in ptr.loadUnaligned(as: T.self) } - data.removeFirst(MemoryLayout.size) - return res } fileprivate func decodeData(codingPath: @autoclosure () -> [CodingKey]) throws -> Data { - guard let length = data.decode() else { - throw DecodingError.dataCorrupted( - DecodingError.Context( - codingPath: codingPath(), - debugDescription: "Unable to decode data length" - ) - ) - } - guard data.count >= Int(length) else { + let length = try input.decodeUInt64() + // sanity check: length must be less than 4gb + guard length < 0x1_0000_0000 else { throw DecodingError.dataCorrupted( DecodingError.Context( codingPath: codingPath(), - debugDescription: "Not enough data to decode" + debugDescription: "Invalid data length" ) ) } - let res = data[data.startIndex ..< data.startIndex + Int(length)] - data.removeFirst(Int(length)) + let res = try input.read(length: Int(length)) return res } fileprivate func decodeData(codingPath: @autoclosure () -> [CodingKey]) throws -> [UInt8] { - guard let length = data.decode() else { - throw DecodingError.dataCorrupted( - DecodingError.Context( - codingPath: codingPath(), - debugDescription: "Unable to decode data length" - ) - ) - } - guard data.count >= Int(length) else { + let length = try input.decodeUInt64() + // sanity check: length must be less than 4gb + guard length < 0x1_0000_0000 else { throw DecodingError.dataCorrupted( DecodingError.Context( codingPath: codingPath(), - debugDescription: "Not enough data to decode" + debugDescription: "Invalid data length" ) ) } - let res = Array(data[data.startIndex ..< data.startIndex + Int(length)]) - data.removeFirst(Int(length)) - return res + let res = try input.read(length: Int(length)) + return Array(res) } fileprivate func decodeArray(_ type: T.Type, key: CodingKey?) throws -> T { - guard let length = data.decode(), length < 0xFFFFFF else { + let length = try input.decodeUInt64() + // sanity check: length can't be unreasonably large + guard length < 0xFFFFFF else { throw DecodingError.dataCorrupted( DecodingError.Context( codingPath: codingPath, - debugDescription: "Unable to decode array length" + debugDescription: "Invalid array length" ) ) } @@ -172,17 +151,8 @@ private class DecodeContext: Decoder { fileprivate func decodeFixedLengthData(_ type: T.Type, key: CodingKey?) throws -> T { try withExtendedLifetime(PushCodingPath(decoder: self, key: key)) { let length = try type.length(decoder: self) - guard data.count >= length else { - throw DecodingError.dataCorrupted( - DecodingError.Context( - codingPath: codingPath, - debugDescription: "Not enough data to decode \(T.self)" - ) - ) - } - let value = data[data.startIndex ..< data.startIndex + length] - data.removeFirst(length) - return try type.init(decoder: self, data: value) + let data = try input.read(length: length) + return try type.init(decoder: self, data: data) } } @@ -214,24 +184,16 @@ private struct JamKeyedDecodingContainer: KeyedDecodingContainerPr } func contains(_: K) -> Bool { - decoder.data.count > 0 + !decoder.input.isEmpty } - func decodeNil(forKey key: K) throws -> Bool { - guard let byte = decoder.data.next() else { - throw DecodingError.keyNotFound( - key, DecodingError.Context(codingPath: codingPath, debugDescription: "Unexpected end of data") - ) - } + func decodeNil(forKey _: K) throws -> Bool { + let byte = try decoder.input.read() return byte == 0 } - func decode(_: Bool.Type, forKey key: K) throws -> Bool { - guard let byte = decoder.data.next() else { - throw DecodingError.keyNotFound( - key, DecodingError.Context(codingPath: codingPath, debugDescription: "Unexpected end of data") - ) - } + func decode(_: Bool.Type, forKey _: K) throws -> Bool { + let byte = try decoder.input.read() return byte == 1 } @@ -258,11 +220,9 @@ private struct JamKeyedDecodingContainer: KeyedDecodingContainerPr return value } - func decode(_: Int8.Type, forKey key: K) throws -> Int8 { - guard let value = decoder.data.next() else { - throw DecodingError.keyNotFound(key, DecodingError.Context(codingPath: codingPath, debugDescription: "Unexpected end of data")) - } - return Int8(bitPattern: value) + func decode(_: Int8.Type, forKey _: K) throws -> Int8 { + let byte = try decoder.input.read() + return Int8(bitPattern: byte) } func decode(_: Int16.Type, forKey key: K) throws -> Int16 { @@ -284,11 +244,9 @@ private struct JamKeyedDecodingContainer: KeyedDecodingContainerPr return value } - func decode(_: UInt8.Type, forKey key: K) throws -> UInt8 { - guard let value = decoder.data.next() else { - throw DecodingError.keyNotFound(key, DecodingError.Context(codingPath: codingPath, debugDescription: "Unexpected end of data")) - } - return value + func decode(_: UInt8.Type, forKey _: K) throws -> UInt8 { + let byte = try decoder.input.read() + return byte } func decode(_: UInt16.Type, forKey key: K) throws -> UInt16 { @@ -330,7 +288,7 @@ private struct JamUnkeyedDecodingContainer: UnkeyedDecodingContainer { var codingPath: [CodingKey] = [] let count: Int? = nil var isAtEnd: Bool { - decoder.data.count == 0 + decoder.input.isEmpty } var currentIndex: Int = 0 @@ -338,17 +296,13 @@ private struct JamUnkeyedDecodingContainer: UnkeyedDecodingContainer { let decoder: DecodeContext mutating func decodeNil() throws -> Bool { - guard let byte = decoder.data.next() else { - throw DecodingError.dataCorruptedError(in: self, debugDescription: "Unexpected end of data") - } + let byte = try decoder.input.read() currentIndex += 1 return byte == 0 } mutating func decode(_: Bool.Type) throws -> Bool { - guard let byte = decoder.data.next() else { - throw DecodingError.dataCorruptedError(in: self, debugDescription: "Unexpected end of data") - } + let byte = try decoder.input.read() currentIndex += 1 return byte == 1 } @@ -379,11 +333,9 @@ private struct JamUnkeyedDecodingContainer: UnkeyedDecodingContainer { } mutating func decode(_: Int8.Type) throws -> Int8 { - guard let value = decoder.data.next() else { - throw DecodingError.dataCorruptedError(in: self, debugDescription: "Unexpected end of data") - } + let byte = try decoder.input.read() currentIndex += 1 - return Int8(bitPattern: value) + return Int8(bitPattern: byte) } mutating func decode(_: Int16.Type) throws -> Int16 { @@ -410,11 +362,9 @@ private struct JamUnkeyedDecodingContainer: UnkeyedDecodingContainer { } mutating func decode(_: UInt8.Type) throws -> UInt8 { - guard let value = decoder.data.next() else { - throw DecodingError.dataCorruptedError(in: self, debugDescription: "Unexpected end of data") - } + let byte = try decoder.input.read() currentIndex += 1 - return value + return byte } mutating func decode(_: UInt16.Type) throws -> UInt16 { @@ -458,13 +408,12 @@ private struct JamSingleValueDecodingContainer: SingleValueDecodingContainer { let decoder: DecodeContext func decodeNil() -> Bool { - decoder.data.next() == 0 + let byte = try? decoder.input.read() + return byte == 0 } func decode(_: Bool.Type) throws -> Bool { - guard let byte = decoder.data.next() else { - throw DecodingError.dataCorruptedError(in: self, debugDescription: "Unexpected end of data") - } + let byte = try decoder.input.read() return byte == 1 } @@ -492,10 +441,8 @@ private struct JamSingleValueDecodingContainer: SingleValueDecodingContainer { } func decode(_: Int8.Type) throws -> Int8 { - guard let value = decoder.data.next() else { - throw DecodingError.dataCorruptedError(in: self, debugDescription: "Unexpected end of data") - } - return Int8(bitPattern: value) + let byte = try decoder.input.read() + return Int8(bitPattern: byte) } func decode(_: Int16.Type) throws -> Int16 { @@ -518,10 +465,8 @@ private struct JamSingleValueDecodingContainer: SingleValueDecodingContainer { } func decode(_: UInt8.Type) throws -> UInt8 { - guard let value = decoder.data.next() else { - throw DecodingError.dataCorruptedError(in: self, debugDescription: "Unexpected end of data") - } - return value + let byte = try decoder.input.read() + return byte } func decode(_: UInt16.Type) throws -> UInt16 { diff --git a/Networking/Sources/MsQuicSwift/NetAddr.swift b/Networking/Sources/MsQuicSwift/NetAddr.swift index 1c776393..d9af4c92 100644 --- a/Networking/Sources/MsQuicSwift/NetAddr.swift +++ b/Networking/Sources/MsQuicSwift/NetAddr.swift @@ -7,50 +7,72 @@ import msquic import Darwin #endif -public struct NetAddr: Hashable, Sendable { - var ipAddress: String - var port: UInt16 - var ipv4: Bool +public struct NetAddr: Sendable { + var quicAddr: QUIC_ADDR - public init(ipAddress: String, port: UInt16, ipv4: Bool = false) { - self.ipAddress = ipAddress - self.port = port - self.ipv4 = ipv4 - // TODO: automatically determine the ip address family - } - - public init(quicAddr: QUIC_ADDR) { - let (host, port, ipv4) = parseQuicAddr(quicAddr) ?? ("::dead:beef", 0, false) - ipAddress = host - self.port = port - self.ipv4 = ipv4 + public init?(address: String) { + guard let res = parseIpv6Addr(address) ?? parseIpv4Addr(address) else { + return nil + } + let (host, port) = res + self.init(ipAddress: host, port: port) } - func toQuicAddr() -> QUIC_ADDR? { - var addr = QUIC_ADDR() - let cstring = ipAddress.cString(using: .utf8) - guard cstring != nil else { + public init?(ipAddress: String, port: UInt16) { + guard let cstring = ipAddress.cString(using: .utf8) else { return nil } - let success = QuicAddrFromString(cstring!, port, &addr) + quicAddr = QUIC_ADDR() + let success = QuicAddrFromString(cstring, port, &quicAddr) guard success == 1 else { return nil } - return addr + } + + public init(quicAddr: QUIC_ADDR) { + self.quicAddr = quicAddr + } + + public func getAddressAndPort() -> (String, UInt16) { + let (host, port, _) = parseQuicAddr(quicAddr) ?? ("::dead:beef", 0, false) + return (host, port) + } +} + +extension NetAddr: Equatable { + public static func == (lhs: NetAddr, rhs: NetAddr) -> Bool { + var addr1 = lhs.quicAddr + var addr2 = rhs.quicAddr + return QuicAddrCompare(&addr1, &addr2) == 1 + } +} + +extension NetAddr: Hashable { + public func hash(into hasher: inout Hasher) { + var addr = quicAddr + let hash = QuicAddrHash(&addr) + hasher.combine(hash) } } extension NetAddr: CustomStringConvertible { public var description: String { - if ipv4 { - "\(ipAddress):\(port)" - } else { - "[\(ipAddress)]:\(port)" + var buffer = QUIC_ADDR_STR() + var addr = quicAddr + let success = QuicAddrToString(&addr, &buffer) + guard success == 1 else { + return "::dead:beef" } + let ipAddr = withUnsafePointer(to: buffer.Address) { ptr in + ptr.withMemoryRebound(to: CChar.self, capacity: Int(64)) { ptr in + String(cString: ptr, encoding: .utf8)! + } + } + return ipAddr } } -func parseQuicAddr(_ addr: QUIC_ADDR) -> (String, UInt16, Bool)? { +private func parseQuicAddr(_ addr: QUIC_ADDR) -> (String, UInt16, Bool)? { let ipv6 = addr.Ip.sa_family == QUIC_ADDRESS_FAMILY(QUIC_ADDRESS_FAMILY_INET6) let port = if ipv6 { helper_ntohs(addr.Ipv6.sin6_port) @@ -68,11 +90,38 @@ func parseQuicAddr(_ addr: QUIC_ADDR) -> (String, UInt16, Bool)? { guard success == 1 else { return nil } - let ipaddress = withUnsafePointer(to: buffer.Address) { ptr in + let ipAddr = withUnsafePointer(to: buffer.Address) { ptr in ptr.withMemoryRebound(to: CChar.self, capacity: Int(64)) { ptr in String(cString: ptr, encoding: .utf8)! } } - return (ipaddress, port, ipv6) + return (ipAddr, port, ipv6) +} + +private func parseIpv6Addr(_ address: String) -> (String, UInt16)? { + let parts = address.split(separator: "]:") + guard parts.count == 2 else { + return nil + } + let host = String(parts[0]) + let port = parts[1].dropFirst() + guard let portNum = UInt16(port, radix: 10) else { + return nil + } + return (host, portNum) +} + +private func parseIpv4Addr(_ address: String) -> (String, UInt16)? { + print(address) + let parts = address.split(separator: ":") + guard parts.count == 2 else { + return nil + } + let host = String(parts[0]) + let port = parts[1].dropFirst() + guard let portNum = UInt16(port, radix: 10) else { + return nil + } + return (host, portNum) } diff --git a/Networking/Sources/MsQuicSwift/QuicConnection.swift b/Networking/Sources/MsQuicSwift/QuicConnection.swift index 59bf13a4..538ba56d 100644 --- a/Networking/Sources/MsQuicSwift/QuicConnection.swift +++ b/Networking/Sources/MsQuicSwift/QuicConnection.swift @@ -110,12 +110,13 @@ public final class QuicConnection: Sendable { guard storage2.state == .opened else { throw QuicError.alreadyStarted } + let (host, port) = address.getAddressAndPort() try storage2.registration.api.call("ConnectionStart") { api in api.pointee.ConnectionStart( storage2.handle.ptr, storage2.configuration.ptr, QUIC_ADDRESS_FAMILY(QUIC_ADDRESS_FAMILY_UNSPEC), - address.ipAddress, address.port + host, port ) } storage2.state = .started diff --git a/Networking/Sources/MsQuicSwift/QuicListener.swift b/Networking/Sources/MsQuicSwift/QuicListener.swift index bd6d840a..a8d6e371 100644 --- a/Networking/Sources/MsQuicSwift/QuicListener.swift +++ b/Networking/Sources/MsQuicSwift/QuicListener.swift @@ -41,10 +41,7 @@ public final class QuicListener: Sendable { _ = ListenerHandle(logger: logger, ptr: ptr!, api: registration.api, listener: self) - let address = listenAddress.toQuicAddr() - guard var address else { - throw QuicError.invalidAddress(listenAddress) - } + var address = listenAddress.quicAddr try alpns.withContentUnsafeBytes { alpnPtrs in var buffer = [QUIC_BUFFER](repeating: QUIC_BUFFER(), count: alpnPtrs.count) diff --git a/Networking/Sources/MsQuicSwift/QuicStream.swift b/Networking/Sources/MsQuicSwift/QuicStream.swift index d8975ab5..563a84b4 100644 --- a/Networking/Sources/MsQuicSwift/QuicStream.swift +++ b/Networking/Sources/MsQuicSwift/QuicStream.swift @@ -97,6 +97,7 @@ public final class QuicStream: Sendable { throw QuicError.alreadyClosed } + // TODO: improve the case when data is empty let messageLength = data.count let sendBufferRaw = UnsafeMutableRawPointer.allocate( // !! allocate diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index f4c2be01..a236610f 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -1,3 +1,4 @@ +import AsyncChannels import Foundation import MsQuicSwift import TracingUtils @@ -5,11 +6,17 @@ import Utils private let logger = Logger(label: "Connection") -public final class Connection: Sendable { +public protocol ConnectionInfoProtocol { + var id: UniqueId { get } + var mode: PeerMode { get } + var remoteAddress: NetAddr { get } +} + +public final class Connection: Sendable, ConnectionInfoProtocol { let connection: QuicConnection let impl: PeerImpl - let mode: PeerMode - let remoteAddress: NetAddr + public let mode: PeerMode + public let remoteAddress: NetAddr let presistentStreams: ThreadSafeContainer< [Handler.PresistentHandler.StreamKind: Stream] > = .init([:]) @@ -32,10 +39,11 @@ public final class Connection: Sendable { } public func request(_ request: Handler.EphemeralHandler.Request) async throws -> Data { - let data = request.encode() + let data = try request.encode() let kind = request.kind let stream = try createStream(kind: kind) try stream.send(data: data) + // TODO: pipe this to decoder directly to be able to reject early var response = Data() while let nextData = await stream.receive() { response.append(nextData) @@ -109,7 +117,34 @@ public final class Connection: Sendable { } if let ceKind = Handler.EphemeralHandler.StreamKind(rawValue: byte) { logger.debug("stream opened. kind: \(ceKind)") - // TODO: handle requests + + var decoder = impl.ephemeralStreamHandler.createDecoder(kind: ceKind) + + let lengthData = await stream.receive(count: 4) + guard let lengthData else { + stream.close(abort: true) + logger.debug("Invalid request length") + return + } + let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.load(as: UInt32.self) }) + // sanity check for length + // TODO: pick better value + guard length < 1024 * 1024 * 10 else { + stream.close(abort: true) + logger.debug("Invalid request length: \(length)") + // TODO: report bad peer + return + } + let data = await stream.receive(count: Int(length)) + guard let data else { + stream.close(abort: true) + logger.debug("Invalid request data") + // TODO: report bad peer + return + } + let request = try decoder.decode(data: data) + let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request) + try stream.send(data: resp, finish: true) } } } @@ -133,7 +168,7 @@ func presistentStreamRunLoop( ) { Task.detached { do { - try handler.streamOpened(stream: stream, kind: kind) + try await handler.streamOpened(connection: connection, stream: stream, kind: kind) } catch { logger.debug( "Failed to setup presistent stream", @@ -144,25 +179,37 @@ func presistentStreamRunLoop( "Starting presistent stream run loop", metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)"] ) + var decoder = handler.createDecoder(kind: kind) do { - let decoder = handler.createDecoder(kind: kind) - while let data = await stream.receive() { - if let message = try decoder.decode(data: data) { - try handler.handle(message: message) + while true { + let lengthData = await stream.receive(count: 4) + guard let lengthData else { + break + } + let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.load(as: UInt32.self) }) + // sanity check for length + // TODO: pick better value + guard length < 1024 * 1024 * 10 else { + stream.close(abort: true) + logger.debug("Invalid message length: \(length)") + // TODO: report bad peer + return + } + let data = await stream.receive(count: Int(length)) + guard let data else { + break } + let msg = try decoder.decode(data: data) + try await handler.handle(connection: connection, message: msg) } - // it is ok if there are some leftover data - _ = decoder.finish() - logger.debug( - "Ending presistent stream run loop", - metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)"] - ) } catch { - logger.debug( - "Failed to handle presistent stream data", - metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)", "error": "\(error)"] - ) + logger.debug("UP stream run loop failed: \(error)") stream.close(abort: true) } + + logger.debug( + "Ending presistent stream run loop", + metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)"] + ) } } diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 247a8e2f..33f026fe 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -3,6 +3,8 @@ import Logging import MsQuicSwift import Utils +public typealias NetAddr = MsQuicSwift.NetAddr + public enum StreamType: Sendable { case uniquePersistent case commonEphemeral @@ -123,6 +125,32 @@ public final class Peer: Sendable { return conn } } + + public func broadcast(kind: Handler.PresistentHandler.StreamKind, message: any MessageProtocol) { + let connections = impl.connections.read { connections in + connections.byId.values + } + + guard let messageData = try? message.encode() else { + impl.logger.warning("Failed to encode message: \(message)") + return + } + for connection in connections { + if let stream = try? connection.createPreistentStream(kind: kind) { + let res = Result(catching: { try stream.send(data: messageData) }) + switch res { + case .success: + break + case let .failure(error): + impl.logger.warning("Failed to send message", metadata: [ + "connectionId": "\(connection.id)", + "kind": "\(kind)", + "error": "\(error)", + ]) + } + } + } + } } final class PeerImpl: Sendable { @@ -206,7 +234,7 @@ final class PeerImpl: Sendable { } } -private final class PeerEventHandler: QuicEventHandler { +private struct PeerEventHandler: QuicEventHandler { private let impl: PeerImpl private var logger: Logger { diff --git a/Networking/Sources/Networking/Stream.swift b/Networking/Sources/Networking/Stream.swift index 3274356c..85adefaa 100644 --- a/Networking/Sources/Networking/Stream.swift +++ b/Networking/Sources/Networking/Stream.swift @@ -1,6 +1,7 @@ import AsyncChannels import Foundation import MsQuicSwift +import Synchronization import TracingUtils import Utils @@ -29,8 +30,7 @@ final class Stream: Sendable, StreamProtocol { let stream: QuicStream let impl: PeerImpl private let channel: Channel = .init(capacity: 100) - // TODO: https://github.com/gh123man/Async-Channels/issues/12 - private let nextData: ThreadSafeContainer = .init(nil) + private let nextData: Mutex = .init(nil) private let _status: ThreadSafeContainer = .init(.open) let connectionId: UniqueId let kind: Handler.PresistentHandler.StreamKind? @@ -95,32 +95,37 @@ final class Stream: Sendable, StreamProtocol { } func receive() async -> Data? { - if let data = nextData.value { - nextData.value = nil + let data = nextData.withLock { + let ret = $0 + $0 = nil + return ret + } + if let data { return data } return await channel.receive() } - func receiveByte() async -> UInt8? { - if var data = nextData.value { - let byte = data.removeFirst() - if data.isEmpty { - nextData.value = nil - } else { - nextData.value = data - } - return byte - } - - guard var data = await receive() else { + func receive(count: Int) async -> Data? { + guard var result = await receive() else { return nil } - - let byte = data.removeFirst() - if !data.isEmpty { - nextData.value = data + if result.count < count { + guard let more = await receive(count: count - result.count) else { + return nil + } + result.append(more) + return result + } else { + let ret = result.prefix(count) + nextData.withLock { + $0 = result.dropFirst(count) + } + return ret } - return byte + } + + func receiveByte() async -> UInt8? { + await receive(count: 1)?.first } } diff --git a/Networking/Sources/Networking/StreamHandler.swift b/Networking/Sources/Networking/StreamHandler.swift index 3a2944a8..2c848f28 100644 --- a/Networking/Sources/Networking/StreamHandler.swift +++ b/Networking/Sources/Networking/StreamHandler.swift @@ -2,8 +2,8 @@ import Foundation public protocol StreamKindProtocol: Sendable, Hashable, RawRepresentable, CaseIterable {} -public protocol MessageProtocol { - func encode() -> Data +public protocol MessageProtocol: Sendable { + func encode() throws -> Data } public protocol RequestProtocol: MessageProtocol { @@ -15,12 +15,7 @@ public protocol RequestProtocol: MessageProtocol { public protocol MessageDecoder { associatedtype Message - // return nil if need more data - // data will be kept in internal buffer - func decode(data: Data) throws -> Message? - - // return leftover data - func finish() -> Data? + mutating func decode(data: Data) throws -> Message } public protocol PresistentStreamHandler: Sendable { @@ -28,8 +23,8 @@ public protocol PresistentStreamHandler: Sendable { associatedtype Message: MessageProtocol func createDecoder(kind: StreamKind) -> any MessageDecoder - func streamOpened(stream: any StreamProtocol, kind: StreamKind) throws - func handle(message: Message) throws + func streamOpened(connection: any ConnectionInfoProtocol, stream: any StreamProtocol, kind: StreamKind) async throws + func handle(connection: any ConnectionInfoProtocol, message: Message) async throws } public protocol EphemeralStreamHandler: Sendable { @@ -37,7 +32,7 @@ public protocol EphemeralStreamHandler: Sendable { associatedtype Request: RequestProtocol func createDecoder(kind: StreamKind) -> any MessageDecoder - func handle(request: Request) async throws -> Data + func handle(connection: any ConnectionInfoProtocol, request: Request) async throws -> Data } public protocol StreamHandler: Sendable { diff --git a/Networking/Sources/module.modulemap b/Networking/Sources/module.modulemap index 505a426e..f676e89a 100644 --- a/Networking/Sources/module.modulemap +++ b/Networking/Sources/module.modulemap @@ -5,7 +5,5 @@ module msquic { module openssl { header "include/openssl/ssl3.h" - link "ssl" - link "crypto" export * } diff --git a/Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift b/Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift index eeefa7bb..8e15b935 100644 --- a/Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift +++ b/Networking/Tests/MsQuicSwiftTests/QuicListenerTests.swift @@ -58,13 +58,14 @@ struct QuicListenerTests { handler: serverHandler, registration: registration, configuration: serverConfiguration, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0), + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, alpns: [Data("testalpn".utf8)] ) let listenAddress = try listener.listenAddress() - #expect(listenAddress.ipAddress == "127.0.0.1") - #expect(listenAddress.port != 0) + let (ipAddress, port) = listenAddress.getAddressAndPort() + #expect(ipAddress == "127.0.0.1") + #expect(port != 0) // create connection to listener @@ -98,10 +99,12 @@ struct QuicListenerTests { } }.first! + let (ipAddress2, _) = info.remoteAddress.getAddressAndPort() + #expect(info.negotiatedAlpn == Data("testalpn".utf8)) #expect(info.serverName == "127.0.0.1") #expect(info.localAddress == listenAddress) - #expect(info.remoteAddress.ipAddress == "127.0.0.1") + #expect(ipAddress2 == "127.0.0.1") let stream2 = try serverConnection.createStream() try stream2.send(data: Data("other test data 2".utf8)) diff --git a/Networking/Tests/NetworkingTests/PKCS12Tests.swift b/Networking/Tests/NetworkingTests/PKCS12Tests.swift index 6b2a01a3..a884b6de 100644 --- a/Networking/Tests/NetworkingTests/PKCS12Tests.swift +++ b/Networking/Tests/NetworkingTests/PKCS12Tests.swift @@ -45,7 +45,7 @@ struct PKCS12Tests { handler: serverHandler, registration: registration, configuration: serverConfiguration, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0), + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, alpns: [Data("testalpn".utf8)] ) diff --git a/Networking/Tests/NetworkingTests/PeerTest.swift b/Networking/Tests/NetworkingTests/PeerTest.swift index 271d2823..d35f166a 100644 --- a/Networking/Tests/NetworkingTests/PeerTest.swift +++ b/Networking/Tests/NetworkingTests/PeerTest.swift @@ -55,7 +55,7 @@ final class PeerTests { handler: serverHandler, registration: registration, configuration: serverConfiguration, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0), + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, alpns: [Data("testalpn".utf8)] ) @@ -115,7 +115,7 @@ final class PeerTests { handler: serverHandler, registration: registration, configuration: serverConfiguration, - listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0), + listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!, alpns: [Data("testalpn".utf8)] ) diff --git a/Node/Package.swift b/Node/Package.swift index 7ff4460d..bf29c664 100644 --- a/Node/Package.swift +++ b/Node/Package.swift @@ -21,6 +21,7 @@ let package = Package( .package(path: "../RPC"), .package(path: "../TracingUtils"), .package(path: "../Utils"), + .package(url: "https://github.com/gh123man/Async-Channels.git", from: "1.0.0"), ], targets: [ // Targets are the basic building blocks of a package, defining a module or a test suite. @@ -32,6 +33,7 @@ let package = Package( "RPC", "TracingUtils", "Utils", + .product(name: "AsyncChannels", package: "Async-Channels"), ] ), .testTarget( diff --git a/Node/Sources/Node/Genesis.swift b/Node/Sources/Node/Genesis.swift index 52980efe..32419add 100644 --- a/Node/Sources/Node/Genesis.swift +++ b/Node/Sources/Node/Genesis.swift @@ -21,12 +21,12 @@ public enum GenesisError: Error { } extension Genesis { - public func load() async throws -> (StateRef, ProtocolConfigRef) { + public func load() async throws -> (StateRef, BlockRef, ProtocolConfigRef) { switch self { case .dev: let config = ProtocolConfigRef.dev - let state = try State.devGenesis(config: config) - return (StateRef(state), config) + let (state, block) = try State.devGenesis(config: config) + return (state, block, config) case let .file(path): let genesis = try readAndValidateGenesis(from: path) var config: ProtocolConfig @@ -44,8 +44,8 @@ extension Genesis { config = genesis.config! } let configRef = Ref(config) - let state = try State.devGenesis(config: configRef) - return (StateRef(state), configRef) + let (state, block) = try State.devGenesis(config: configRef) + return (state, block, configRef) } } diff --git a/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift new file mode 100644 index 00000000..893eea64 --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift @@ -0,0 +1,87 @@ +import Blockchain +import Codec +import Foundation +import Networking + +public enum CERequest: Sendable { + case safroleTicket1(SafroleTicketMessage) + case safroleTicket2(SafroleTicketMessage) +} + +extension CERequest: RequestProtocol { + public typealias StreamKind = CommonEphemeralStreamKind + + public func encode() throws -> Data { + switch self { + case let .safroleTicket1(message): + try JamEncoder.encode(message) + case let .safroleTicket2(message): + try JamEncoder.encode(message) + } + } + + public var kind: CommonEphemeralStreamKind { + switch self { + case .safroleTicket1: + .safroleTicket1 + case .safroleTicket2: + .safroleTicket2 + } + } + + static func getType(kind: CommonEphemeralStreamKind) -> Decodable.Type { + switch kind { + case .safroleTicket1: + SafroleTicketMessage.self + case .safroleTicket2: + SafroleTicketMessage.self + default: + fatalError("unimplemented") + } + } + + static func from(kind: CommonEphemeralStreamKind, data: any Decodable) -> CERequest? { + switch kind { + case .safroleTicket1: + guard let message = data as? SafroleTicketMessage else { + return nil + } + return .safroleTicket1(message) + case .safroleTicket2: + guard let message = data as? SafroleTicketMessage else { + return nil + } + return .safroleTicket2(message) + default: + fatalError("unimplemented") + } + } +} + +extension CERequest { + public func handle(blockchain: Blockchain) async throws -> (any Encodable)? { + switch self { + case let .safroleTicket1(message): + blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived( + items: [ + ExtrinsicTickets.TicketItem( + attempt: message.attempt, + signature: message.proof + ), + ] + )) + // TODO: rebroadcast to other peers after some time + return nil + case let .safroleTicket2(message): + blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived( + items: [ + ExtrinsicTickets.TicketItem( + attempt: message.attempt, + signature: message.proof + ), + ] + )) + return nil + } + } +} diff --git a/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/SafroleTicket.swift b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/SafroleTicket.swift new file mode 100644 index 00000000..4ed74c2a --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/CommonEphemeral/SafroleTicket.swift @@ -0,0 +1,7 @@ +import Blockchain + +public struct SafroleTicketMessage: Codable, Sendable { + public var epochIndex: EpochIndex + public var attempt: TicketIndex + public var proof: BandersnatchRingVRFProof +} diff --git a/Node/Sources/Node/NetworkingProtocol/MessageDecoder.swift b/Node/Sources/Node/NetworkingProtocol/MessageDecoder.swift new file mode 100644 index 00000000..d3a63722 --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/MessageDecoder.swift @@ -0,0 +1,55 @@ +import Blockchain +import Codec +import Dispatch +import Foundation +import Networking +import Synchronization +import TracingUtils + +class UPMessageDecoder: MessageDecoder { + typealias Message = UPMessage + + private let config: ProtocolConfigRef + private let kind: UniquePresistentStreamKind + + init(config: ProtocolConfigRef, kind: UniquePresistentStreamKind) { + self.config = config + self.kind = kind + } + + func decode(data: Data) throws -> Message { + let type = UPMessage.getType(kind: kind) + let payload = try JamDecoder.decode(type, from: data, withConfig: config) + guard let message = UPMessage.from(kind: kind, data: payload) else { + throw DecodingError.dataCorrupted(DecodingError.Context( + codingPath: [], + debugDescription: "unreachable: invalid UP message" + )) + } + return message + } +} + +class CEMessageDecoder: MessageDecoder { + typealias Message = CERequest + + private let config: ProtocolConfigRef + private let kind: CommonEphemeralStreamKind + + init(config: ProtocolConfigRef, kind: CommonEphemeralStreamKind) { + self.config = config + self.kind = kind + } + + func decode(data: Data) throws -> Message { + let type = CERequest.getType(kind: kind) + let payload = try JamDecoder.decode(type, from: data, withConfig: config) + guard let message = CERequest.from(kind: kind, data: payload) else { + throw DecodingError.dataCorrupted(DecodingError.Context( + codingPath: [], + debugDescription: "unreachable: invalid CE message" + )) + } + return message + } +} diff --git a/Node/Sources/Node/NetworkingProtocol/Network.swift b/Node/Sources/Node/NetworkingProtocol/Network.swift new file mode 100644 index 00000000..69d90340 --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/Network.swift @@ -0,0 +1,129 @@ +import Blockchain +import Codec +import Foundation +import Networking +import TracingUtils +import Utils + +public protocol NetworkProtocolHandler: Sendable { + func handle(ceRequest: CERequest) async throws -> (any Encodable)? + func handle(upMessage: UPMessage) async throws +} + +public final class Network: Sendable { + public struct Config { + public var mode: PeerMode + public var listenAddress: NetAddr + public var key: Ed25519.SecretKey + public var peerSettings: PeerSettings + + public init( + mode: PeerMode, + listenAddress: NetAddr, + key: Ed25519.SecretKey, + peerSettings: PeerSettings = .defaultSettings + ) { + self.mode = mode + self.listenAddress = listenAddress + self.key = key + self.peerSettings = peerSettings + } + } + + private let impl: NetworkImpl + private let peer: Peer + + public init( + config: Config, + protocolConfig: ProtocolConfigRef, + genesisHeader: Data32, + handler: NetworkProtocolHandler + ) throws { + let logger = Logger(label: "Network".uniqueId) + + impl = NetworkImpl( + logger: logger, + config: protocolConfig, + handler: handler + ) + + let option = PeerOptions( + mode: config.mode, + listenAddress: config.listenAddress, + genesisHeader: genesisHeader, + secretKey: config.key, + presistentStreamHandler: PresistentStreamHandlerImpl(impl: impl), + ephemeralStreamHandler: EphemeralStreamHandlerImpl(impl: impl), + serverSettings: .defaultSettings, + clientSettings: .defaultSettings + ) + + peer = try Peer(options: option) + } + + public func connect(to: NetAddr, mode: PeerMode) throws -> some ConnectionInfoProtocol { + try peer.connect(to: to, mode: mode) + } + + public func broadcast(kind: UniquePresistentStreamKind, message: any MessageProtocol) { + peer.broadcast(kind: kind, message: message) + } +} + +struct HandlerDef: StreamHandler { + typealias PresistentHandler = PresistentStreamHandlerImpl + typealias EphemeralHandler = EphemeralStreamHandlerImpl +} + +private final class NetworkImpl: Sendable { + let logger: Logger + let config: ProtocolConfigRef + let handler: NetworkProtocolHandler + + init(logger: Logger, config: ProtocolConfigRef, handler: NetworkProtocolHandler) { + self.logger = logger + self.config = config + self.handler = handler + } +} + +struct PresistentStreamHandlerImpl: PresistentStreamHandler { + typealias StreamKind = UniquePresistentStreamKind + typealias Message = UPMessage + + fileprivate let impl: NetworkImpl + + func createDecoder(kind: StreamKind) -> any MessageDecoder { + UPMessageDecoder(config: impl.config, kind: kind) + } + + func streamOpened(connection _: any ConnectionInfoProtocol, stream _: any StreamProtocol, kind _: StreamKind) throws { + // TODO: send handshake + } + + func handle(connection: any ConnectionInfoProtocol, message: Message) async throws { + impl.logger.trace("handling message: \(message) from \(connection.id)") + + try await impl.handler.handle(upMessage: message) + } +} + +struct EphemeralStreamHandlerImpl: EphemeralStreamHandler { + typealias StreamKind = CommonEphemeralStreamKind + typealias Request = CERequest + + fileprivate let impl: NetworkImpl + + func createDecoder(kind: StreamKind) -> any MessageDecoder { + CEMessageDecoder(config: impl.config, kind: kind) + } + + func handle(connection: any ConnectionInfoProtocol, request: Request) async throws -> Data { + impl.logger.trace("handling request: \(request) from \(connection.id)") + let resp = try await impl.handler.handle(ceRequest: request) + if let resp { + return try JamEncoder.encode(resp) + } + return Data() + } +} diff --git a/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift new file mode 100644 index 00000000..7dcd245d --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/NetworkManager.swift @@ -0,0 +1,61 @@ +import Blockchain +import Foundation +import TracingUtils +import Utils + +private let logger = Logger(label: "NetworkManager") + +public final class NetworkManager: Sendable { + private let network: Network + // This is for development only + // Will assume those peers are also validators + private let devPeers: Set = [] + + public init(config: Network.Config, blockchain: Blockchain) throws { + let handler = HandlerImpl(blockchain: blockchain) + network = try Network( + config: config, + protocolConfig: blockchain.config, + genesisHeader: blockchain.dataProvider.genesisBlockHash, + handler: handler + ) + } +} + +struct HandlerImpl: NetworkProtocolHandler { + let blockchain: Blockchain + + func handle(ceRequest: CERequest) async throws -> (any Encodable)? { + switch ceRequest { + case let .safroleTicket1(message): + blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived( + items: [ + ExtrinsicTickets.TicketItem( + attempt: message.attempt, + signature: message.proof + ), + ] + )) + // TODO: rebroadcast to other peers after some time + return nil + case let .safroleTicket2(message): + blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived( + items: [ + ExtrinsicTickets.TicketItem( + attempt: message.attempt, + signature: message.proof + ), + ] + )) + return nil + } + } + + func handle(upMessage: UPMessage) async throws { + switch upMessage { + case let .blockAnnouncement(message): + logger.debug("received block announcement: \(message)") + // TODO: handle it + } + } +} diff --git a/Node/Sources/Node/NetworkingProtocol/Typealias.swift b/Node/Sources/Node/NetworkingProtocol/Typealias.swift new file mode 100644 index 00000000..4b86e1dd --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/Typealias.swift @@ -0,0 +1,5 @@ +import Networking + +public typealias PeerMode = Networking.PeerMode +public typealias PeerSettings = Networking.PeerSettings +public typealias NetAddr = Networking.NetAddr diff --git a/Node/Sources/Node/NetworkingProtocol/UniquePresistent/BlockAnnouncement.swift b/Node/Sources/Node/NetworkingProtocol/UniquePresistent/BlockAnnouncement.swift new file mode 100644 index 00000000..f23f1e90 --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/UniquePresistent/BlockAnnouncement.swift @@ -0,0 +1,8 @@ +import Blockchain +import Utils + +public struct BlockAnnouncement: Codable, Sendable { + public var header: Header + public var headerHash: Data32 + public var timeslot: TimeslotIndex +} diff --git a/Node/Sources/Node/NetworkingProtocol/UniquePresistent/UPMessage.swift b/Node/Sources/Node/NetworkingProtocol/UniquePresistent/UPMessage.swift new file mode 100644 index 00000000..77cfe036 --- /dev/null +++ b/Node/Sources/Node/NetworkingProtocol/UniquePresistent/UPMessage.swift @@ -0,0 +1,40 @@ +import Codec +import Foundation +import Networking + +public enum UPMessage: Sendable { + case blockAnnouncement(BlockAnnouncement) +} + +extension UPMessage: MessageProtocol { + public func encode() throws -> Data { + switch self { + case let .blockAnnouncement(message): + try JamEncoder.encode(message) + } + } + + public var kind: UniquePresistentStreamKind { + switch self { + case .blockAnnouncement: + .blockAnnouncement + } + } + + static func getType(kind: UniquePresistentStreamKind) -> Decodable.Type { + switch kind { + case .blockAnnouncement: + BlockAnnouncement.self + } + } + + static func from(kind: UniquePresistentStreamKind, data: any Decodable) -> UPMessage? { + switch kind { + case .blockAnnouncement: + guard let message = data as? BlockAnnouncement else { + return nil + } + return .blockAnnouncement(message) + } + } +} diff --git a/Node/Sources/Node/Node.swift b/Node/Sources/Node/Node.swift index d6a4b57b..56cca98d 100644 --- a/Node/Sources/Node/Node.swift +++ b/Node/Sources/Node/Node.swift @@ -1,47 +1,64 @@ import Blockchain +import Networking import RPC import TracingUtils import Utils -let logger = Logger(label: "node") +private let logger = Logger(label: "node") public typealias RPCConfig = Server.Config +public typealias NetworkConfig = Network.Config public class Node { - public class Config { - public let rpc: Server.Config + public struct Config { + public var rpc: Server.Config? + public var network: Network.Config - public init(rpc: Server.Config) { + public init(rpc: Server.Config?, network: Network.Config) { self.rpc = rpc + self.network = network } } public let blockchain: Blockchain - public let rpcServer: Server + public let rpcServer: Server? public let timeProvider: TimeProvider public let dataProvider: BlockchainDataProvider + public let keystore: KeyStore + public let network: NetworkManager public init( config: Config, genesis: Genesis, - eventBus: EventBus + eventBus: EventBus, + keystore: KeyStore ) async throws { logger.debug("Initializing node") - let (genesisState, protocolConfig) = try await genesis.load() - dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesis: genesisState)) + let (genesisState, genesisBlock, protocolConfig) = try await genesis.load() + dataProvider = try await BlockchainDataProvider(InMemoryDataProvider(genesisState: genesisState, genesisBlock: genesisBlock)) timeProvider = SystemTimeProvider() - blockchain = try await Blockchain( + let blockchain = try await Blockchain( config: protocolConfig, dataProvider: dataProvider, timeProvider: timeProvider, eventBus: eventBus ) + self.blockchain = blockchain - rpcServer = try Server(config: config.rpc, source: blockchain) + self.keystore = keystore + + network = try NetworkManager( + config: config.network, + blockchain: blockchain + ) + + rpcServer = try config.rpc.map { + try Server(config: $0, source: blockchain) + } } public func wait() async throws { - try await rpcServer.wait() + try await rpcServer?.wait() } } diff --git a/Node/Sources/Node/ValidatorNode.swift b/Node/Sources/Node/ValidatorNode.swift index 7307a55f..b0a0fb6b 100644 --- a/Node/Sources/Node/ValidatorNode.swift +++ b/Node/Sources/Node/ValidatorNode.swift @@ -6,10 +6,10 @@ import Utils public class ValidatorNode: Node { private var validator: ValidatorService! - public required init( - genesis: Genesis, config: Config, eventBus: EventBus, keystore: KeyStore + override public required init( + config: Config, genesis: Genesis, eventBus: EventBus, keystore: KeyStore ) async throws { - try await super.init(config: config, genesis: genesis, eventBus: eventBus) + try await super.init(config: config, genesis: genesis, eventBus: eventBus, keystore: keystore) let scheduler = DispatchQueueScheduler(timeProvider: timeProvider) validator = await ValidatorService( @@ -20,7 +20,7 @@ public class ValidatorNode: Node { dataProvider: dataProvider ) - let genesisState = try await blockchain.getState(hash: Data32()) + let genesisState = try await blockchain.getState(hash: blockchain.dataProvider.genesisBlockHash) await validator.on(genesis: genesisState!) } diff --git a/TracingUtils/Sources/TracingUtils/unreachable.swift b/TracingUtils/Sources/TracingUtils/unreachable.swift index fb382c29..2b0ed647 100644 --- a/TracingUtils/Sources/TracingUtils/unreachable.swift +++ b/TracingUtils/Sources/TracingUtils/unreachable.swift @@ -2,7 +2,7 @@ import Logging let logger = Logger(label: "tracing-utils.assertions") -enum AssertionError: Error { +public enum AssertionError: Error { case unreachable(String) } diff --git a/Utils/Sources/Utils/Regexs.swift b/Utils/Sources/Utils/Regexs.swift deleted file mode 100644 index c4cf42e1..00000000 --- a/Utils/Sources/Utils/Regexs.swift +++ /dev/null @@ -1,66 +0,0 @@ -import Foundation - -public enum RegexsError: Error { - case invalidFormat - case invalidPort -} - -public enum Regexs { - // Combined regex pattern for IP address with port - public static func parseAddress(_ address: String) throws -> (ip: String, port: Int) { - let ipv4Pattern: String = [ - "(?:", - "(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)", - "\\.", - "){3}", - "(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)", - ].reduce("", +) - - let ipv6Pattern = [ - "(?:", - "(?:(?:[0-9A-Fa-f]{1,4}:){6}", - "|::(?:[0-9A-Fa-f]{1,4}:){5}", - "|(?:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){4}", - "|(?:(?:[0-9A-Fa-f]{1,4}:){0,1}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){3}", - "|(?:(?:[0-9A-Fa-f]{1,4}:){0,2}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){2}", - "|(?:(?:[0-9A-Fa-f]{1,4}:){0,3}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}:", - "|(?:(?:[0-9A-Fa-f]{1,4}:){0,4}[0-9A-Fa-f]{1,4})?::)", - "(?:", - "[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}", - "|(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}", - "(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)", - ")", - "|(?:(?:[0-9A-Fa-f]{1,4}:){0,5}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}", - "|(?:(?:[0-9A-Fa-f]{1,4}:){0,6}[0-9A-Fa-f]{1,4})?::", - ")", - ].reduce("", +) - let ipAddressWithPortPattern = #"(?:(\#(ipv4Pattern))|\[(\#(ipv6Pattern))\]):(\d{1,5})"# - - let regex = try NSRegularExpression(pattern: ipAddressWithPortPattern, options: []) - let range = NSRange(location: 0, length: address.utf16.count) - - if let match = regex.firstMatch(in: address, options: [], range: range) { - let ipRange: Range - if let ipv4Range = Range(match.range(at: 1), in: address) { - ipRange = ipv4Range - } else if let ipv6Range = Range(match.range(at: 2), in: address) { - ipRange = ipv6Range - } else { - throw RegexsError.invalidFormat - } - - let portRange = Range(match.range(at: 3), in: address)! - - let ip = String(address[ipRange]) - let portString = String(address[portRange]) - - if let port = Int(portString), (0 ... 65535).contains(port) { - return (ip, port) - } else { - throw RegexsError.invalidPort - } - } else { - throw RegexsError.invalidFormat - } - } -} diff --git a/Utils/Tests/UtilsTests/RegexsTests.swift b/Utils/Tests/UtilsTests/RegexsTests.swift deleted file mode 100644 index b3eec3e4..00000000 --- a/Utils/Tests/UtilsTests/RegexsTests.swift +++ /dev/null @@ -1,30 +0,0 @@ -import Foundation -import Testing - -@testable import Utils - -struct RegexsTests { - @Test func parseAddress() throws { - // Correct IPv4 address - #expect(try Regexs.parseAddress("127.0.0.1:9955") == ("127.0.0.1", 9955)) - - // Correct IPv6 addresses - #expect(try Regexs - .parseAddress("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:8080") == ("2001:0db8:85a3:0000:0000:8a2e:0370:7334", 8080)) - #expect(try Regexs.parseAddress("[2001:db8:85a3::8a2e:370:7334]:8080") == ("2001:db8:85a3::8a2e:370:7334", 8080)) - #expect(try Regexs.parseAddress("[::1]:8080") == ("::1", 8080)) - - // Exception case: Missing port - #expect(throws: RegexsError.invalidFormat) { try Regexs.parseAddress("127.0.0.1") } - #expect(throws: RegexsError.invalidFormat) { try Regexs.parseAddress("abcd:::") } - // Exception case: Invalid port - #expect(throws: RegexsError.invalidPort) { try Regexs.parseAddress("127.0.0.1:75535") } - #expect(throws: RegexsError.invalidPort) { try Regexs.parseAddress("[2001:db8::1]:75535") } - - // Exception case: Invalid IPv4 format - #expect(throws: RegexsError.invalidFormat) { try Regexs.parseAddress("256.256.256.256:8080") } - - // Exception case: Invalid IPv6 format - #expect(throws: RegexsError.invalidFormat) { try Regexs.parseAddress("[2001:db8:::1]:8080") } - } -}