Skip to content

Commit

Permalink
UP and CE messages working (#193)
Browse files Browse the repository at this point in the history
* peer mode to peer role

* peer manager

* UP and CE messages working

* fix
  • Loading branch information
xlc authored Oct 24, 2024
1 parent b4763b4 commit c095281
Show file tree
Hide file tree
Showing 17 changed files with 304 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ extension BlockchainDataProvider {
try await dataProvider.getState(hash: hash)
}

public func getFinalizedHead() async throws -> Data32 {
try await dataProvider.getFinalizedHead()
}

public func getHeads() async throws -> Set<Data32> {
try await dataProvider.getHeads()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ struct BlockchainDataProviderTests {
@Test func testInitialization() async throws {
#expect(await provider.bestHead.hash == genesisBlock.hash)
#expect(await provider.finalizedHead.hash == genesisBlock.hash)
#expect(try await provider.getFinalizedHead() == genesisBlock.hash)
#expect(try await provider.getHeads() == [genesisBlock.hash])
}

Expand Down
6 changes: 5 additions & 1 deletion Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
</BuildableReference>
</BuildableProductRunnable>
<CommandLineArguments>
<CommandLineArgument
argument = "--p2p 127.0.0.1:19001"
isEnabled = "YES">
</CommandLineArgument>
<CommandLineArgument
argument = "--validator"
isEnabled = "YES">
Expand All @@ -75,7 +79,7 @@
<EnvironmentVariables>
<EnvironmentVariable
key = "LOG_LEVEL"
value = "trace,bandersnatch=info"
value = "trace"
isEnabled = "YES">
</EnvironmentVariable>
</EnvironmentVariables>
Expand Down
6 changes: 4 additions & 2 deletions Boka/Sources/Boka.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ struct Boka: AsyncParsableCommand {

let logger = Logger(label: "cli")

logger.info("Starting Boka. Chain: \(chain)")
logger.info("Starting Boka.")

logger.info("Chain: \(chain)")

if let name {
logger.info("Node name: \(name)")
Expand Down Expand Up @@ -131,7 +133,7 @@ struct Boka: AsyncParsableCommand {

logger.info("Network key: \(networkKey.publicKey.data.toHexString())")
let networkConfig = NetworkConfig(
mode: validator ? .validator : .builder,
role: validator ? .validator : .builder,
listenAddress: p2p,
key: networkKey
)
Expand Down
14 changes: 7 additions & 7 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ private let logger = Logger(label: "Connection")

public protocol ConnectionInfoProtocol {
var id: UniqueId { get }
var mode: PeerMode { get }
var role: PeerRole { get }
var remoteAddress: NetAddr { get }
}

public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoProtocol {
let connection: QuicConnection
let impl: PeerImpl<Handler>
public let mode: PeerMode
public let role: PeerRole
public let remoteAddress: NetAddr
let presistentStreams: ThreadSafeContainer<
[Handler.PresistentHandler.StreamKind: Stream<Handler>]
Expand All @@ -26,10 +26,10 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
connection.id
}

init(_ connection: QuicConnection, impl: PeerImpl<Handler>, mode: PeerMode, remoteAddress: NetAddr, initiatedByLocal: Bool) {
init(_ connection: QuicConnection, impl: PeerImpl<Handler>, role: PeerRole, remoteAddress: NetAddr, initiatedByLocal: Bool) {
self.connection = connection
self.impl = impl
self.mode = mode
self.role = role
self.remoteAddress = remoteAddress
self.initiatedByLocal = initiatedByLocal
}
Expand All @@ -42,7 +42,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
let data = try request.encode()
let kind = request.kind
let stream = try createStream(kind: kind)
try stream.send(data: data)
try stream.send(message: data)
// TODO: pipe this to decoder directly to be able to reject early
var response = Data()
while let nextData = await stream.receive() {
Expand Down Expand Up @@ -144,7 +144,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
}
let request = try decoder.decode(data: data)
let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request)
try stream.send(data: resp, finish: true)
try stream.send(message: resp, finish: true)
}
}
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func presistentStreamRunLoop<Handler: StreamHandler>(
guard let lengthData else {
break
}
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.load(as: UInt32.self) })
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) })
// sanity check for length
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
Expand Down
76 changes: 76 additions & 0 deletions Networking/Sources/Networking/MockPeerEventHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import Foundation
import MsQuicSwift
import Utils

public final class MockPeerEventHandler: QuicEventHandler {
public enum EventType {
case newConnection(listener: QuicListener, connection: QuicConnection, info: ConnectionInfo)
case shouldOpen(connection: QuicConnection, certificate: Data?)
case connected(connection: QuicConnection)
case shutdownInitiated(connection: QuicConnection, reason: ConnectionCloseReason)
case streamStarted(connection: QuicConnection, stream: QuicStream)
case dataReceived(stream: QuicStream, data: Data)
case closed(stream: QuicStream, status: QuicStatus, code: QuicErrorCode)
}

public let events: ThreadSafeContainer<[EventType]> = .init([])

public init() {}

public func newConnection(
_ listener: QuicListener, connection: QuicConnection, info: ConnectionInfo
) -> QuicStatus {
events.write { events in
events.append(.newConnection(listener: listener, connection: connection, info: info))
}

return .code(.success)
}

public func shouldOpen(_: QuicConnection, certificate: Data?) -> QuicStatus {
guard let certificate else {
return .code(.requiredCert)
}
do {
let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509)
if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) {
return .code(.badCert)
}
} catch {
return .code(.badCert)
}
return .code(.success)
}

public func connected(_ connection: QuicConnection) {
events.write { events in
events.append(.connected(connection: connection))
}
}

public func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason) {
print("shutdownInitiated \(connection.id) with reason \(reason)")
events.write { events in
events.append(.shutdownInitiated(connection: connection, reason: reason))
}
}

public func streamStarted(_ connect: QuicConnection, stream: QuicStream) {
events.write { events in
events.append(.streamStarted(connection: connect, stream: stream))
}
}

public func dataReceived(_ stream: QuicStream, data: Data) {
events.write { events in
events.append(.dataReceived(stream: stream, data: data))
}
}

public func closed(_ stream: QuicStream, status: QuicStatus, code: QuicErrorCode) {
print("closed stream \(stream.id) with status \(status) and code \(code)")
events.write { events in
events.append(.closed(stream: stream, status: status, code: code))
}
}
}
Loading

0 comments on commit c095281

Please sign in to comment.