Skip to content

Commit

Permalink
add node test and fixes (#199)
Browse files Browse the repository at this point in the history
* add node test and fixes

* fix

* fix block author

* fix test
  • Loading branch information
xlc authored Oct 28, 2024
1 parent b0d251f commit 80a45d5
Show file tree
Hide file tree
Showing 19 changed files with 368 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ extension BlockchainDataProvider {
try await dataProvider.remove(hash: hash)
}

public var genesisBlockHash: Data32 {
public nonisolated var genesisBlockHash: Data32 {
dataProvider.genesisBlockHash
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import Blockchain
import Foundation
import TracingUtils
import Utils

private let logger = Logger(label: "MockScheduler")

final class SchedulerTask: Sendable, Comparable {
private final class SchedulerTask: Sendable, Comparable {
let id: UniqueId
let scheduleTime: TimeInterval
let repeats: TimeInterval?
Expand Down Expand Up @@ -35,23 +34,27 @@ final class SchedulerTask: Sendable, Comparable {
}
}

struct Storage: Sendable {
var tasks: SortedArray<SchedulerTask> = .init([])
private struct Storage: Sendable {
fileprivate var tasks: SortedArray<SchedulerTask> = .init([])
}

final class MockScheduler: Scheduler, Sendable {
let mockTimeProvider: MockTimeProvider
var timeProvider: TimeProvider {
public final class MockScheduler: Scheduler, Sendable {
public let mockTimeProvider: MockTimeProvider
public var timeProvider: TimeProvider {
mockTimeProvider
}

let storage: ThreadSafeContainer<Storage> = .init(.init())
private let storage: ThreadSafeContainer<Storage> = .init(.init())

init(timeProvider: MockTimeProvider) {
public init(timeProvider: MockTimeProvider) {
mockTimeProvider = timeProvider
}

func scheduleImpl(
public var taskCount: Int {
storage.read { $0.tasks.array.count }
}

public func scheduleImpl(
delay: TimeInterval,
repeats: Bool,
task: @escaping @Sendable () async -> Void,
Expand All @@ -74,13 +77,13 @@ final class MockScheduler: Scheduler, Sendable {
}
}

func advance(by interval: TimeInterval) async {
public func advance(by interval: TimeInterval) async {
let to = timeProvider.getTimeInterval() + interval
while await advanceNext(to: to) {}
mockTimeProvider.advance(to: to)
}

func advanceNext(to time: TimeInterval) async -> Bool {
private func advanceNext(to time: TimeInterval) async -> Bool {
let task: SchedulerTask? = storage.write { storage in
if let task = storage.tasks.array.first, task.scheduleTime <= time {
storage.tasks.remove(at: 0)
Expand Down
8 changes: 6 additions & 2 deletions Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
await subscribe(RuntimeEvents.SafroleTicketsGenerated.self, id: "BlockAuthor.SafroleTicketsGenerated") { [weak self] event in
try await self?.on(safroleTicketsGenerated: event)
}
}

public func onSyncCompleted() async {
scheduleForNextEpoch("BlockAuthor.scheduleForNextEpoch") { [weak self] epoch in
await self?.onBeforeEpoch(epoch: epoch)
}
Expand All @@ -37,7 +39,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
public func on(genesis _: StateRef) async {
let nowTimeslot = timeProvider.getTime().timeToTimeslot(config: config)
// schedule for current epoch
let epoch = (nowTimeslot + 1).timeslotToEpochIndex(config: config)
let epoch = nowTimeslot.timeslotToEpochIndex(config: config)
await onBeforeEpoch(epoch: epoch)
}

Expand Down Expand Up @@ -179,8 +181,10 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {

let bestHeadTimeslot = bestHead.timeslot
let bestHeadEpoch = bestHeadTimeslot.timeslotToEpochIndex(config: config)
if bestHeadEpoch + 1 != epoch {
if bestHeadEpoch != 0, bestHeadEpoch + 1 < epoch {
logger.warning("best head epoch \(bestHeadEpoch) is too far from current epoch \(epoch)")
} else if bestHeadEpoch >= epoch {
logger.error("trying to do onBeforeEpoch for epoch \(epoch) but best head epoch is \(bestHeadEpoch)")
}

let state = try await dataProvider.getState(hash: bestHead.hash)
Expand Down
2 changes: 1 addition & 1 deletion Blockchain/Sources/Blockchain/Validator/ServiceBase2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class ServiceBase2: ServiceBase, @unchecked Sendable {
public func scheduleForNextEpoch(_ id: UniqueId, fn: @escaping @Sendable (EpochIndex) async -> Void) -> Cancellable {
let now = timeProvider.getTime()
let nowTimeslot = now.timeToTimeslot(config: config)
let nextEpoch = (nowTimeslot + 1).timeslotToEpochIndex(config: config) + 1
let nextEpoch = nowTimeslot.timeslotToEpochIndex(config: config) + 1
return scheduleFor(epoch: nextEpoch, id: id, fn: fn)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public final class ValidatorService: Sendable {
)
}

public func onSyncCompleted() async {
await blockAuthor.onSyncCompleted()
}

public func on(genesis: StateRef) async {
await safrole.on(genesis: genesis)
await blockAuthor.on(genesis: genesis)
Expand Down
3 changes: 2 additions & 1 deletion Blockchain/Tests/BlockchainTests/BlockAuthorTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ struct BlockAuthorTests {
let services = await BlockchainServices()
let blockAuthor = await services.blockAuthor
let runtime = Runtime(config: services.config)
await blockAuthor.onSyncCompleted()
return (services, blockAuthor, runtime)
}

Expand Down Expand Up @@ -108,7 +109,7 @@ struct BlockAuthorTests {

await blockAuthor.on(genesis: genesisState)

#expect(scheduler.storage.value.tasks.count > 0)
#expect(scheduler.taskCount > 0)

await scheduler.advance(by: TimeInterval(2))

Expand Down
7 changes: 4 additions & 3 deletions Blockchain/Tests/BlockchainTests/ValidatorServiceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ struct ValidatorServiceTests {
time: TimeInterval = 988,
keysCount: Int = 12
) async throws -> (BlockchainServices, ValidatorService) {
setupTestLogger()
// setupTestLogger()

let services = await BlockchainServices(
config: config,
Expand All @@ -25,6 +25,7 @@ struct ValidatorServiceTests {
scheduler: services.scheduler,
dataProvider: services.dataProvider
)
await validatorService.onSyncCompleted()
return (services, validatorService)
}

Expand All @@ -45,7 +46,7 @@ struct ValidatorServiceTests {
#expect(safroleEvents.count == config.value.totalNumberOfValidators)

// Check if block author tasks were scheduled
#expect(scheduler.storage.value.tasks.count > 0)
#expect(scheduler.taskCount > 0)
}

@Test
Expand Down Expand Up @@ -96,7 +97,7 @@ struct ValidatorServiceTests {

// try different genesis time offset to ensure edge cases are covered
@Test(arguments: [988, 1000, 1003, 1021])
func makeManyBlocks(time: Int) async throws {
func makeManyBlocksWithAllKeys(time: Int) async throws {
let (services, validatorService) = try await setup(time: TimeInterval(time))
let genesisState = services.genesisState
let storeMiddleware = services.storeMiddleware
Expand Down
7 changes: 2 additions & 5 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
}

public func request(_ request: Handler.EphemeralHandler.Request) async throws -> Data {
logger.trace("sending request", metadata: ["kind": "\(request.kind)"])
let data = try request.encode()
let kind = request.kind
let stream = try createStream(kind: kind)
Expand Down Expand Up @@ -194,11 +195,7 @@ func presistentStreamRunLoop<Handler: StreamHandler>(
)
var decoder = handler.createDecoder(kind: kind)
do {
while true {
let data = try await receiveMaybeData(stream: stream)
guard let data else {
break
}
while let data = try await receiveMaybeData(stream: stream) {
let msg = try decoder.decode(data: data)
try await handler.handle(connection: connection, message: msg)
}
Expand Down
21 changes: 12 additions & 9 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,16 @@ public final class Peer<Handler: StreamHandler>: Sendable {
// TODO: see if we can remove the role parameter
public func connect(to address: NetAddr, role: PeerRole) throws -> Connection<Handler> {
let conn = impl.connections.read { connections in
connections.byType[role]?[address]
connections.byAddr[address]?.0
}
return try conn ?? impl.connections.write { connections in
let curr = connections.byType[role, default: [:]][address]
let curr = connections.byAddr[address]?.0
if let curr {
return curr
}

logger.debug("connecting to peer", metadata: ["address": "\(address)", "role": "\(role)"])

let quicConn = try QuicConnection(
handler: PeerEventHandler(self.impl),
registration: self.impl.clientConfiguration.registration,
Expand All @@ -127,7 +130,7 @@ public final class Peer<Handler: StreamHandler>: Sendable {
remoteAddress: address,
initiatedByLocal: true
)
connections.byType[role, default: [:]][address] = conn
connections.byAddr[address] = (conn, role)
connections.byId[conn.id] = conn
return conn
}
Expand All @@ -144,7 +147,7 @@ public final class Peer<Handler: StreamHandler>: Sendable {
}
for connection in connections {
if let stream = try? connection.createPreistentStream(kind: kind) {
let res = Result(catching: { try stream.send(data: messageData) })
let res = Result(catching: { try stream.send(message: messageData) })
switch res {
case .success:
break
Expand All @@ -167,7 +170,7 @@ public final class Peer<Handler: StreamHandler>: Sendable {

final class PeerImpl<Handler: StreamHandler>: Sendable {
struct ConnectionStorage {
var byType: [PeerRole: [NetAddr: Connection<Handler>]] = [:]
var byAddr: [NetAddr: (Connection<Handler>, PeerRole)] = [:]
var byId: [UniqueId: Connection<Handler>] = [:]
}

Expand Down Expand Up @@ -212,14 +215,14 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
func addConnection(_ connection: QuicConnection, addr: NetAddr, role: PeerRole) -> Bool {
connections.write { connections in
if role == .builder {
let currentCount = connections.byType[role]?.count ?? 0
let currentCount = connections.byAddr.values.filter { $0.1 == role }.count
if currentCount >= self.settings.maxBuilderConnections {
self.logger.warning("max builder connections reached")
// TODO: consider connection rotation strategy
return false
}
}
if connections.byType[role, default: [:]][addr] != nil {
if connections.byAddr[addr] != nil {
self.logger.warning("connection already exists")
return false
}
Expand All @@ -230,7 +233,7 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
remoteAddress: addr,
initiatedByLocal: false
)
connections.byType[role, default: [:]][addr] = conn
connections.byAddr[addr] = (conn, role)
connections.byId[connection.id] = conn
return true
}
Expand Down Expand Up @@ -328,7 +331,7 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
impl.connections.write { connections in
if let conn = connections.byId[connection.id] {
connections.byId.removeValue(forKey: connection.id)
connections.byType[conn.role]?.removeValue(forKey: conn.remoteAddress)
connections.byAddr[conn.remoteAddress] = nil
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions Networking/Sources/Networking/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
if data.isEmpty {
return
}
guard canReceive else {
logger.warning("unexpected status: \(status)")
return
}
if !channel.syncSend(data) {
logger.warning("stream \(id) is full")
// TODO: backpressure handling
Expand Down
26 changes: 6 additions & 20 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ struct PeerTests {
var kind: Kind
var data: Data
func encode() throws -> Data {
let length = UInt32(data.count)
var lengthData = withUnsafeBytes(of: length.littleEndian) { Data($0) }
lengthData.append(data)
return lengthData
data
}

typealias StreamKind = Kind
Expand Down Expand Up @@ -101,19 +98,8 @@ struct PeerTests {

func handle(connection _: any ConnectionInfoProtocol, request: Request) async throws -> Data {
let data = request.data
guard data.count >= 4 else {
throw NSError(
domain: "ExtractError", code: 1,
userInfo: [NSLocalizedDescriptionKey: "Data too short to contain length"]
)
}
let lengthData = data.prefix(4)
let length = UInt32(
littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) }
)
let actualData = data.dropFirst(4).prefix(Int(length))
await dataStorage.updateData(actualData)
return actualData
await dataStorage.updateData(data)
return data
}
}

Expand Down Expand Up @@ -197,7 +183,7 @@ struct PeerTests {
kind: .uniqueB, message: .init(kind: .uniqueB, data: Data("I am jam".utf8))
)
// Verify last received data
try? await Task.sleep(for: .milliseconds(1000))
try? await Task.sleep(for: .milliseconds(100))
await #expect(handler2.lastReceivedData == Data("hello world".utf8))
await #expect(handler1.lastReceivedData == Data("I am jam".utf8))
}
Expand Down Expand Up @@ -422,14 +408,14 @@ struct PeerTests {
}

// Wait for all connections to establish
try? await Task.sleep(for: .seconds(10))
try? await Task.sleep(for: .seconds(1))

let centralPeer = peers[0]
let messagedata = Data("Sync message".utf8)
centralPeer.broadcast(kind: .uniqueA, message: MockRequest(kind: .uniqueA, data: messagedata))

// Wait for message to propagate
try? await Task.sleep(for: .seconds(60))
try? await Task.sleep(for: .seconds(2))

// Check that each peer received the broadcast
for i in 1 ..< handles.count {
Expand Down
20 changes: 19 additions & 1 deletion Node/Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 80a45d5

Please sign in to comment.