Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UP and CE messages working #193

Merged
merged 4 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ extension BlockchainDataProvider {
try await dataProvider.getState(hash: hash)
}

public func getFinalizedHead() async throws -> Data32 {
try await dataProvider.getFinalizedHead()
}

public func getHeads() async throws -> Set<Data32> {
try await dataProvider.getHeads()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ struct BlockchainDataProviderTests {
@Test func testInitialization() async throws {
#expect(await provider.bestHead.hash == genesisBlock.hash)
#expect(await provider.finalizedHead.hash == genesisBlock.hash)
#expect(try await provider.getFinalizedHead() == genesisBlock.hash)
#expect(try await provider.getHeads() == [genesisBlock.hash])
}

Expand Down
6 changes: 5 additions & 1 deletion Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
</BuildableReference>
</BuildableProductRunnable>
<CommandLineArguments>
<CommandLineArgument
argument = "--p2p 127.0.0.1:19001"
isEnabled = "YES">
</CommandLineArgument>
<CommandLineArgument
argument = "--validator"
isEnabled = "YES">
Expand All @@ -75,7 +79,7 @@
<EnvironmentVariables>
<EnvironmentVariable
key = "LOG_LEVEL"
value = "trace,bandersnatch=info"
value = "trace"
isEnabled = "YES">
</EnvironmentVariable>
</EnvironmentVariables>
Expand Down
6 changes: 4 additions & 2 deletions Boka/Sources/Boka.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ struct Boka: AsyncParsableCommand {

let logger = Logger(label: "cli")

logger.info("Starting Boka. Chain: \(chain)")
logger.info("Starting Boka.")

logger.info("Chain: \(chain)")

if let name {
logger.info("Node name: \(name)")
Expand Down Expand Up @@ -131,7 +133,7 @@ struct Boka: AsyncParsableCommand {

logger.info("Network key: \(networkKey.publicKey.data.toHexString())")
let networkConfig = NetworkConfig(
mode: validator ? .validator : .builder,
role: validator ? .validator : .builder,
listenAddress: p2p,
key: networkKey
)
Expand Down
14 changes: 7 additions & 7 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ private let logger = Logger(label: "Connection")

public protocol ConnectionInfoProtocol {
var id: UniqueId { get }
var mode: PeerMode { get }
var role: PeerRole { get }
var remoteAddress: NetAddr { get }
}

public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoProtocol {
let connection: QuicConnection
let impl: PeerImpl<Handler>
public let mode: PeerMode
public let role: PeerRole
public let remoteAddress: NetAddr
let presistentStreams: ThreadSafeContainer<
[Handler.PresistentHandler.StreamKind: Stream<Handler>]
Expand All @@ -26,10 +26,10 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
connection.id
}

init(_ connection: QuicConnection, impl: PeerImpl<Handler>, mode: PeerMode, remoteAddress: NetAddr, initiatedByLocal: Bool) {
init(_ connection: QuicConnection, impl: PeerImpl<Handler>, role: PeerRole, remoteAddress: NetAddr, initiatedByLocal: Bool) {
self.connection = connection
self.impl = impl
self.mode = mode
self.role = role
self.remoteAddress = remoteAddress
self.initiatedByLocal = initiatedByLocal
}
Expand All @@ -42,7 +42,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
let data = try request.encode()
let kind = request.kind
let stream = try createStream(kind: kind)
try stream.send(data: data)
try stream.send(message: data)
// TODO: pipe this to decoder directly to be able to reject early
var response = Data()
while let nextData = await stream.receive() {
Expand Down Expand Up @@ -144,7 +144,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
}
let request = try decoder.decode(data: data)
let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request)
try stream.send(data: resp, finish: true)
try stream.send(message: resp, finish: true)
}
}
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func presistentStreamRunLoop<Handler: StreamHandler>(
guard let lengthData else {
break
}
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.load(as: UInt32.self) })
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) })
// sanity check for length
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
Expand Down
76 changes: 76 additions & 0 deletions Networking/Sources/Networking/MockPeerEventHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import Foundation
import MsQuicSwift
import Utils

public final class MockPeerEventHandler: QuicEventHandler {
public enum EventType {
case newConnection(listener: QuicListener, connection: QuicConnection, info: ConnectionInfo)
case shouldOpen(connection: QuicConnection, certificate: Data?)
case connected(connection: QuicConnection)
case shutdownInitiated(connection: QuicConnection, reason: ConnectionCloseReason)
case streamStarted(connection: QuicConnection, stream: QuicStream)
case dataReceived(stream: QuicStream, data: Data)
case closed(stream: QuicStream, status: QuicStatus, code: QuicErrorCode)
}

public let events: ThreadSafeContainer<[EventType]> = .init([])

public init() {}

public func newConnection(
_ listener: QuicListener, connection: QuicConnection, info: ConnectionInfo
) -> QuicStatus {
events.write { events in
events.append(.newConnection(listener: listener, connection: connection, info: info))
}

return .code(.success)
}

public func shouldOpen(_: QuicConnection, certificate: Data?) -> QuicStatus {
guard let certificate else {
return .code(.requiredCert)
}
do {
let (publicKey, alternativeName) = try parseCertificate(data: certificate, type: .x509)
if alternativeName != generateSubjectAlternativeName(pubkey: publicKey) {
return .code(.badCert)
}
} catch {
return .code(.badCert)
}
return .code(.success)
}

public func connected(_ connection: QuicConnection) {
events.write { events in
events.append(.connected(connection: connection))
}
}

public func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason) {
print("shutdownInitiated \(connection.id) with reason \(reason)")
events.write { events in
events.append(.shutdownInitiated(connection: connection, reason: reason))
}
}

public func streamStarted(_ connect: QuicConnection, stream: QuicStream) {
events.write { events in
events.append(.streamStarted(connection: connect, stream: stream))
}
}

public func dataReceived(_ stream: QuicStream, data: Data) {
events.write { events in
events.append(.dataReceived(stream: stream, data: data))
}
}

public func closed(_ stream: QuicStream, status: QuicStatus, code: QuicErrorCode) {
print("closed stream \(stream.id) with status \(status) and code \(code)")
events.write { events in
events.append(.closed(stream: stream, status: status, code: code))
}
}
}
Loading