diff --git a/Networking/Sources/MsQuicSwift/QuicStream.swift b/Networking/Sources/MsQuicSwift/QuicStream.swift index 9690d75f..d19a421e 100644 --- a/Networking/Sources/MsQuicSwift/QuicStream.swift +++ b/Networking/Sources/MsQuicSwift/QuicStream.swift @@ -8,6 +8,10 @@ private struct Storage { let connection: QuicConnection } +public enum SendError: Error { + case emptyData +} + public final class QuicStream: Sendable { public let id: UniqueId private let logger: Logger @@ -97,9 +101,13 @@ public final class QuicStream: Sendable { throw QuicError.alreadyClosed } - // TODO: improve the case when data is empty let messageLength = data.count + if messageLength == 0 { + logger.trace("No data to send.") + throw SendError.emptyData // Throw a specific error or return + } + let sendBufferRaw = UnsafeMutableRawPointer.allocate( // !! allocate byteCount: MemoryLayout.size + messageLength, alignment: MemoryLayout.alignment @@ -107,8 +115,7 @@ public final class QuicStream: Sendable { let sendBuffer = sendBufferRaw.assumingMemoryBound(to: QUIC_BUFFER.self) let bufferPointer = sendBufferRaw.advanced(by: MemoryLayout.size).assumingMemoryBound(to: UInt8.self) - data.copyBytes(to: bufferPointer, count: messageLength) // TODO: figure out a better way to avoid memory copy here - + data.withUnsafeBytes { bufferPointer.update(from: $0.baseAddress!.assumingMemoryBound(to: UInt8.self), count: messageLength) } sendBuffer.pointee.Buffer = bufferPointer sendBuffer.pointee.Length = UInt32(messageLength) diff --git a/Networking/Sources/Networking/Connection.swift b/Networking/Sources/Networking/Connection.swift index 46f2e7e4..998b2116 100644 --- a/Networking/Sources/Networking/Connection.swift +++ b/Networking/Sources/Networking/Connection.swift @@ -1,6 +1,7 @@ import AsyncChannels import Foundation import MsQuicSwift +import Synchronization import TracingUtils import Utils @@ -36,8 +37,7 @@ public final class Connection: Sendable, ConnectionInfoP public let role: PeerRole public let remoteAddress: NetAddr - private let lastActive: ThreadSafeContainer = .init(0) - + private let lastActive: Atomic = Atomic(0) let presistentStreams: ThreadSafeContainer< [Handler.PresistentHandler.StreamKind: Stream] > = .init([:]) @@ -59,8 +59,8 @@ public final class Connection: Sendable, ConnectionInfoP } } - public var lastActiveTimeStamp: TimeInterval { - lastActive.read { $0 } + func getLastActive() -> TimeInterval { + lastActive.load(ordering: .sequentiallyConsistent) } public var id: UniqueId { @@ -73,11 +73,11 @@ public final class Connection: Sendable, ConnectionInfoP self.role = role self.remoteAddress = remoteAddress self.initiatedByLocal = initiatedByLocal - lastActive.write { $0 = Date().timeIntervalSince1970 } + updateLastActive() } func updateLastActive() { - lastActive.write { $0 = Date().timeIntervalSince1970 } + lastActive.store(Date().timeIntervalSince1970, ordering: .releasing) } func opened(publicKey: Data) throws { diff --git a/Networking/Sources/Networking/Peer.swift b/Networking/Sources/Networking/Peer.swift index 16d7717a..fcccccca 100644 --- a/Networking/Sources/Networking/Peer.swift +++ b/Networking/Sources/Networking/Peer.swift @@ -70,7 +70,7 @@ public struct PeerOptions: Sendable { } } -// TODO: reopen UP stream, peer reputation system to ban peers not following the protocol +// TODO: peer reputation system to ban peers not following the protocol public final class Peer: Sendable { private let impl: PeerImpl @@ -272,7 +272,7 @@ final class PeerImpl: Sendable { let currentCount = connections.byAddr.values.filter { $0.role == role }.count if currentCount >= self.settings.maxBuilderConnections { if let conn = connections.byAddr.values.filter({ $0.role == .builder }) - .sorted(by: { $0.lastActiveTimeStamp < $1.lastActiveTimeStamp }).first + .sorted(by: { $0.getLastActive() < $1.getLastActive() }).first { self.logger.warning("Replacing least active builder connection at \(conn.remoteAddress)") conn.close(abort: false)