Skip to content

Commit

Permalink
Sync manager (#196)
Browse files Browse the repository at this point in the history
* rename

* syncing

* sync
  • Loading branch information
xlc authored Oct 25, 2024
1 parent 98694d8 commit 1d51337
Show file tree
Hide file tree
Showing 16 changed files with 364 additions and 82 deletions.
5 changes: 5 additions & 0 deletions Blockchain/Sources/Blockchain/Blockchain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public final class Blockchain: ServiceBase, @unchecked Sendable {
public func importBlock(_ block: BlockRef) async throws {
logger.debug("importing block: #\(block.header.timeslot) \(block.hash)")

if try await dataProvider.hasBlock(hash: block.hash) {
logger.debug("block already imported", metadata: ["hash": "\(block.hash)"])
return
}

try await withSpan("importBlock") { span in
span.attributes.blockHash = block.hash.description

Expand Down
30 changes: 29 additions & 1 deletion Blockchain/Sources/Blockchain/Types/Header.swift
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,35 @@ extension Header {
}
}

public typealias HeaderRef = Ref<Header>
public final class HeaderRef: Ref<Header>, @unchecked Sendable {
public required init(_ value: Header) {
lazyHash = Lazy {
Ref(value.hash())
}

super.init(value)
}

private let lazyHash: Lazy<Ref<Data32>>

public var hash: Data32 {
lazyHash.value.value
}

override public var description: String {
"Header(hash: \(hash), timeslot: \(value.timeslot))"
}
}

extension HeaderRef: Codable {
public convenience init(from decoder: Decoder) throws {
try self.init(.init(from: decoder))
}

public func encode(to encoder: Encoder) throws {
try value.encode(to: encoder)
}
}

extension Header.Unsigned: Dummy {
public typealias Config = ProtocolConfigRef
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Foundation
import Utils

public class ValidatorService {
public final class ValidatorService: Sendable {
private let blockchain: Blockchain
private let keystore: KeyStore
private let safrole: SafroleService
Expand Down
25 changes: 23 additions & 2 deletions Boka/Sources/Boka.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct Boka: AsyncParsableCommand {
var basePath: String?

@Option(name: .long, help: "A preset config or path to chain config file.")
var chain: Genesis = .preset(.dev)
var chain: Genesis = .preset(.minimal)

@Option(name: .long, help: "Listen address for RPC server. Pass 'no' to disable RPC server. Default to 127.0.0.1:9955.")
var rpc: MaybeEnabled<NetAddr> = .enabled(NetAddr(address: "127.0.0.1:9955")!)
Expand All @@ -81,6 +81,12 @@ struct Boka: AsyncParsableCommand {
@Option(name: .long, help: "Node name. For telemetry only.")
var name: String?

@Flag(name: .long, help: "Enable local mode, whereas peers are not expected.")
var local: Bool = false

@Flag(name: .long, help: "Enable dev mode. This is equivalent to --local --validator")
var dev: Bool = false

mutating func run() async throws {
let services = try await Tracing.bootstrap("Boka", loggerOnly: true)
for service in services {
Expand All @@ -95,6 +101,16 @@ struct Boka: AsyncParsableCommand {

logger.info("Chain: \(chain)")

if dev {
local = true
validator = true
logger.info("Dev mode enabled. Enabling local and validator.")
}

if local {
logger.info("Local mode enabled.")
}

if let name {
logger.info("Node name: \(name)")
}
Expand Down Expand Up @@ -146,7 +162,12 @@ struct Boka: AsyncParsableCommand {
handlerMiddleware: .tracing(prefix: "Handler")
)

let config = Node.Config(rpc: rpcConfig, network: networkConfig, peers: peers)
let config = Node.Config(
rpc: rpcConfig,
network: networkConfig,
peers: peers,
local: local
)

let node: Node = if validator {
try await ValidatorNode(
Expand Down
4 changes: 2 additions & 2 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ public final class Peer<Handler: StreamHandler>: Sendable {
}

// there should be only one connection per peer
public func getPeersCount() -> Int {
impl.connections.value.byId.values.count
public var peersCount: Int {
impl.connections.read { $0.byId.count }
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import Utils

public struct BlockRequest: Codable, Sendable {
public enum Direction: UInt8, Codable, Sendable {
case ascendingExcludsive = 0
case descendingInclusive = 1
}

public var hash: Data32
public var direction: Direction
public var maxBlocks: UInt32
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import Foundation
import Networking

public enum CERequest: Sendable {
case blockRequest(BlockRequest)
case safroleTicket1(SafroleTicketMessage)
case safroleTicket2(SafroleTicketMessage)
}
Expand All @@ -13,6 +14,8 @@ extension CERequest: RequestProtocol {

public func encode() throws -> Data {
switch self {
case let .blockRequest(message):
try JamEncoder.encode(message)
case let .safroleTicket1(message):
try JamEncoder.encode(message)
case let .safroleTicket2(message):
Expand All @@ -22,6 +25,8 @@ extension CERequest: RequestProtocol {

public var kind: CommonEphemeralStreamKind {
switch self {
case .blockRequest:
.blockRequest
case .safroleTicket1:
.safroleTicket1
case .safroleTicket2:
Expand All @@ -31,6 +36,8 @@ extension CERequest: RequestProtocol {

static func getType(kind: CommonEphemeralStreamKind) -> Decodable.Type {
switch kind {
case .blockRequest:
BlockRequest.self
case .safroleTicket1:
SafroleTicketMessage.self
case .safroleTicket2:
Expand All @@ -42,6 +49,11 @@ extension CERequest: RequestProtocol {

static func from(kind: CommonEphemeralStreamKind, data: any Decodable) -> CERequest? {
switch kind {
case .blockRequest:
guard let message = data as? BlockRequest else {
return nil
}
return .blockRequest(message)
case .safroleTicket1:
guard let message = data as? SafroleTicketMessage else {
return nil
Expand All @@ -57,31 +69,3 @@ extension CERequest: RequestProtocol {
}
}
}

extension CERequest {
public func handle(blockchain: Blockchain) async throws -> (any Encodable)? {
switch self {
case let .safroleTicket1(message):
blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived(
items: [
ExtrinsicTickets.TicketItem(
attempt: message.attempt,
signature: message.proof
),
]
))
// TODO: rebroadcast to other peers after some time
return nil
case let .safroleTicket2(message):
blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived(
items: [
ExtrinsicTickets.TicketItem(
attempt: message.attempt,
signature: message.proof
),
]
))
return nil
}
}
}
4 changes: 2 additions & 2 deletions Node/Sources/Node/NetworkingProtocol/Network.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public final class Network: Sendable {
try peer.listenAddress()
}

public func getPeersCount() -> Int {
peer.getPeersCount()
public var peersCount: Int {
peer.peersCount
}
}

Expand Down
15 changes: 15 additions & 0 deletions Node/Sources/Node/NetworkingProtocol/NetworkEvents.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import Blockchain
import Utils

public enum NetworkEvents {
public struct PeerAdded: Event {
public let info: PeerInfo
}

public struct PeerUpdated: Event {
public let info: PeerInfo
public let newBlockHeader: HeaderRef
}

public struct BulkSyncCompleted: Event {}
}
Loading

0 comments on commit 1d51337

Please sign in to comment.