Skip to content

Commit

Permalink
avoid race conditions in stream (#173)
Browse files Browse the repository at this point in the history
* avoid race conditions in stream

* fix
  • Loading branch information
xlc authored Oct 16, 2024
1 parent 1024df2 commit 372314d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
33 changes: 23 additions & 10 deletions Networking/Sources/Networking/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,32 +62,45 @@ public final class Stream: Sendable {
}

public func receive() async -> Data? {
if let data = nextData.value {
nextData.value = nil
if let data = nextData.exchange(nil) {
return data
}
return await channel.receive()
}

public func receiveByte() async -> UInt8? {
if var data = nextData.value {
let byte = data.removeFirst()
if data.isEmpty {
nextData.value = nil
let byte = nextData.write { nextData -> UInt8? in
if var data = nextData {
let byte = data.removeFirst()
if data.isEmpty {
nextData = nil
} else {
nextData = data
}
return byte
} else {
nextData.value = data
return nil
}
}
if let byte {
return byte
}

guard var data = await receive() else {
return nil
}

let byte = data.removeFirst()
let byte2 = data.removeFirst()
if !data.isEmpty {
nextData.value = data
// TODO: this can append data in wrong order if receiveByte is called concurrently
nextData.write { nextData in
if let currentData = nextData {
nextData = currentData + data
} else {
nextData = data
}
}
}
return byte
return byte2
}
}
8 changes: 8 additions & 0 deletions Utils/Sources/Utils/ThreadSafeContainer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ public final class ThreadSafeContainer<T>: @unchecked Sendable {
}
}

public func exchange(_ value: T) -> T {
lock.withWriteLock {
let ret = self.storage
self.storage = value
return ret
}
}

public var value: T {
get {
read { $0 }
Expand Down

0 comments on commit 372314d

Please sign in to comment.