Skip to content

Commit 902b18d

Browse files
authored
don't deliver events for unregistered fds (#341)
Motivation: Since forever we have had a major bug in the Selector. Under these conditions: - kqueue/epoll had many events - in one of the earlier events we unregister a Channel whose fd is one of the later events - we subsequently (still in the same event loop tick) register a new channel which gets the same fd as the previously closed one, we would deliver an event that was meant for the previous channel to the newly opened one. Thanks to @mcdappdev for hitting this bug, helping us debug it and also providing a reliably working repro. Modifications: If any fd gets unregistered during event delivery, we stop delivering the remaining events and rely on the selector to redeliver them next time. Result: We don't deliver events for previously closed channels to new ones.
1 parent 17a2aae commit 902b18d

File tree

7 files changed

+374
-11
lines changed

7 files changed

+374
-11
lines changed

Sources/NIO/BaseSocketChannel.swift

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ private struct SocketChannelLifecycleManager {
3434
// this is queried from the Channel, ie. must be thread-safe
3535
internal let isActiveAtomic: Atomic<Bool>
3636
// these are only to be accessed on the EventLoop
37+
38+
// have we seen the `.readEOF` notification
39+
// note: this can be `false` on a deactivated channel, we might just have torn it down.
40+
var hasSeenEOFNotification: Bool = false
41+
3742
private var currentState: State = .fresh {
3843
didSet {
3944
assert(self.eventLoop.inEventLoop)
@@ -228,8 +233,8 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
228233
/// Returned by the `private func readable0()` to inform the caller about the current state of the underlying read stream.
229234
/// This is mostly useful when receiving `.readEOF` as we then need to drain the read stream fully (ie. until we receive EOF or error of course)
230235
private enum ReadStreamState {
231-
/// Everything seems normal.
232-
case normal
236+
/// Everything seems normal
237+
case normal(ReadResult)
233238

234239
/// We saw EOF.
235240
case eof
@@ -619,18 +624,23 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
619624
}
620625
}
621626

622-
private func registerForReadable() {
627+
private final func registerForReadable() {
623628
assert(eventLoop.inEventLoop)
624629
assert(self.lifecycleManager.isRegistered)
625630

631+
guard !self.lifecycleManager.hasSeenEOFNotification else {
632+
// we have seen an EOF notification before so there's no point in registering for reads
633+
return
634+
}
635+
626636
guard !self.interestedEvent.contains(.read) else {
627637
return
628638
}
629639

630640
self.safeReregister(interested: self.interestedEvent.union(.read))
631641
}
632642

633-
func unregisterForReadable() {
643+
internal final func unregisterForReadable() {
634644
assert(eventLoop.inEventLoop)
635645
assert(self.lifecycleManager.isRegistered)
636646

@@ -776,6 +786,16 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
776786
}
777787

778788
final func readEOF() {
789+
assert(!self.lifecycleManager.hasSeenEOFNotification)
790+
self.lifecycleManager.hasSeenEOFNotification = true
791+
792+
self.readEOF0()
793+
794+
assert(!self.interestedEvent.contains(.read))
795+
assert(!self.interestedEvent.contains(.readEOF))
796+
}
797+
798+
final func readEOF0() {
779799
if self.lifecycleManager.isRegistered {
780800
// we're unregistering from `readEOF` here as we want this to be one-shot. We're then synchronously
781801
// reading all input until the EOF that we're guaranteed to see. After that `readEOF` becomes uninteresting
@@ -793,7 +813,9 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
793813
assert(!self.lifecycleManager.isActive)
794814
assert(!self.lifecycleManager.isRegistered)
795815
break loop
796-
case .normal:
816+
case .normal(.none):
817+
preconditionFailure("got .readEOF and read returned not reading any bytes, nor EOF.")
818+
case .normal(.some):
797819
// normal, note that there is no guarantee we're still active (as the user might have closed in callout)
798820
continue loop
799821
}
@@ -805,7 +827,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
805827
// other words: Failing to unregister the whole selector will cause NIO to spin at 100% CPU constantly delivering
806828
// the `reset` event.
807829
final func reset() {
808-
self.readEOF()
830+
self.readEOF0()
809831

810832
if self.socket.isOpen {
811833
assert(self.lifecycleManager.isRegistered)
@@ -831,6 +853,8 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
831853
}
832854

833855
public final func readable() {
856+
assert(!self.lifecycleManager.hasSeenEOFNotification,
857+
"got a read notification after having already seen .readEOF")
834858
self.readable0()
835859
}
836860

@@ -845,8 +869,9 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
845869
}
846870
}
847871

872+
let readResult: ReadResult
848873
do {
849-
try readFromSocket()
874+
readResult = try readFromSocket()
850875
} catch let err {
851876
let readStreamState: ReadStreamState
852877
// ChannelError.eof is not something we want to fire through the pipeline as it just means the remote
@@ -885,7 +910,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
885910
pipeline.fireChannelReadComplete0()
886911
}
887912
readIfNeeded0()
888-
return .normal
913+
return .normal(readResult)
889914
}
890915

891916
/// Returns `true` if the `Channel` should be closed as result of the given `Error` which happened during `readFromSocket`.

Sources/NIO/Selector.swift

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ final class Selector<R: Registration> {
259259
private var eventsCapacity = 64
260260
private var events: UnsafeMutablePointer<EventType>
261261
private var registrations = [Int: R]()
262+
// temporary workaround to stop us delivering outdated events; read in `whenReady`, set in `deregister`
263+
private var deregistrationsHappened: Bool = false
262264

263265
private static func allocateEventsArray(capacity: Int) -> UnsafeMutablePointer<EventType> {
264266
let events: UnsafeMutablePointer<EventType> = UnsafeMutablePointer.allocate(capacity: capacity)
@@ -453,6 +455,8 @@ final class Selector<R: Registration> {
453455
guard self.lifecycleState == .open else {
454456
throw IOError(errnoCode: EBADF, reason: "can't deregister from selector as it's \(self.lifecycleState).")
455457
}
458+
// temporary workaround to stop us delivering outdated events
459+
self.deregistrationsHappened = true
456460
try selectable.withUnsafeFileDescriptor { fd in
457461
guard let reg = registrations.removeValue(forKey: Int(fd)) else {
458462
return
@@ -500,7 +504,10 @@ final class Selector<R: Registration> {
500504
ready = Int(try Epoll.epoll_wait(epfd: self.fd, events: events, maxevents: Int32(eventsCapacity), timeout: -1))
501505
}
502506

503-
for i in 0..<ready {
507+
// start with no deregistrations happened
508+
self.deregistrationsHappened = false
509+
// temporary workaround to stop us delivering outdated events; possibly set in `deregister`
510+
for i in 0..<ready where !self.deregistrationsHappened {
504511
let ev = events[i]
505512
switch ev.data.fd {
506513
case eventfd:
@@ -540,7 +547,10 @@ final class Selector<R: Registration> {
540547
Int(try KQueue.kevent(kq: self.fd, changelist: nil, nchanges: 0, eventlist: events, nevents: Int32(eventsCapacity), timeout: ts))
541548
}
542549

543-
for i in 0..<ready {
550+
// start with no deregistrations happened
551+
self.deregistrationsHappened = false
552+
// temporary workaround to stop us delivering outdated events; possibly set in `deregister`
553+
for i in 0..<ready where !self.deregistrationsHappened {
544554
let ev = events[i]
545555
let filter = Int32(ev.filter)
546556
guard Int32(ev.flags) & EV_ERROR == 0 else {

Tests/NIOTests/EchoServerClientTest+XCTest.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ extension EchoServerClientTest {
4040
("testPendingReadProcessedAfterWriteError", testPendingReadProcessedAfterWriteError),
4141
("testChannelErrorEOFNotFiredThroughPipeline", testChannelErrorEOFNotFiredThroughPipeline),
4242
("testPortNumbers", testPortNumbers),
43+
("testConnectingToIPv4And6ButServerOnlyWaitsOnIPv4", testConnectingToIPv4And6ButServerOnlyWaitsOnIPv4),
4344
]
4445
}
4546
}

Tests/NIOTests/EchoServerClientTest.swift

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -779,4 +779,51 @@ class EchoServerClientTest : XCTestCase {
779779
}
780780
XCTAssertTrue(atLeastOneSucceeded)
781781
}
782+
783+
func testConnectingToIPv4And6ButServerOnlyWaitsOnIPv4() throws {
784+
let group = MultiThreadedEventLoopGroup(numThreads: 1)
785+
defer {
786+
XCTAssertNoThrow(try group.syncShutdownGracefully())
787+
}
788+
789+
let numBytes = 16 * 1024
790+
let promise: EventLoopPromise<ByteBuffer> = group.next().newPromise()
791+
let countingHandler = ByteCountingHandler(numBytes: numBytes, promise: promise)
792+
793+
// we're binding to IPv4 only
794+
let serverChannel = try ServerBootstrap(group: group)
795+
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
796+
.childChannelInitializer { channel in
797+
channel.pipeline.add(handler: countingHandler)
798+
}
799+
.bind(host: "127.0.0.1", port: 0)
800+
.wait()
801+
802+
defer {
803+
XCTAssertNoThrow(try serverChannel.syncCloseAcceptingAlreadyClosed())
804+
}
805+
806+
// but we're trying to connect to (depending on the system configuration and resolver) IPv4 and IPv6
807+
let clientChannel = try ClientBootstrap(group: group)
808+
.connect(host: "localhost", port: Int(serverChannel.localAddress!.port!))
809+
.thenIfError {
810+
promise.fail(error: $0)
811+
return group.next().newFailedFuture(error: $0)
812+
}
813+
.wait()
814+
815+
defer {
816+
XCTAssertNoThrow(try clientChannel.syncCloseAcceptingAlreadyClosed())
817+
}
818+
819+
var buffer = clientChannel.allocator.buffer(capacity: numBytes)
820+
821+
for i in 0..<numBytes {
822+
buffer.write(integer: UInt8(i % 256))
823+
}
824+
825+
try clientChannel.writeAndFlush(NIOAny(buffer)).wait()
826+
827+
try countingHandler.assertReceived(buffer: buffer)
828+
}
782829
}

Tests/NIOTests/SelectorTest+XCTest.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ extension SelectorTest {
2828
return [
2929
("testDeregisterWhileProcessingEvents", testDeregisterWhileProcessingEvents),
3030
("testDeregisterAndCloseWhileProcessingEvents", testDeregisterAndCloseWhileProcessingEvents),
31+
("testWeDoNotDeliverEventsForPreviouslyClosedChannels", testWeDoNotDeliverEventsForPreviouslyClosedChannels),
3132
]
3233
}
3334
}

0 commit comments

Comments
 (0)