Skip to content

Commit cfb99ba

Browse files
committed
#100 -- Fix addPubSubHandler not checking if already added
1 parent 284b7f0 commit cfb99ba

File tree

3 files changed

+74
-22
lines changed

3 files changed

+74
-22
lines changed

Sources/RediStack/Extensions/SwiftNIO.swift

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the RediStack open source project
44
//
5-
// Copyright (c) 2019-2020 RediStack project authors
5+
// Copyright (c) 2019-2022 RediStack project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -26,10 +26,10 @@ extension TimeAmount {
2626

2727
// MARK: Pipeline manipulation
2828

29-
extension Channel {
29+
extension ChannelPipeline {
3030
/// Adds the baseline channel handlers needed to support sending and receiving messages in Redis Serialization Protocol (RESP) format to the pipeline.
3131
///
32-
/// For implementation details, see `RedisMessageEncoder`, `RedisByteDecoder`, and `RedisCommandHandler`.
32+
/// For implementation details, see ``RedisMessageEncoder``, ``RedisByteDecoder``, and ``RedisCommandHandler``.
3333
///
3434
/// # Pipeline chart
3535
/// RedisClient.send
@@ -62,20 +62,20 @@ extension Channel {
6262
(RedisCommandHandler(), "RediStack.CommandHandler")
6363
]
6464
return .andAllSucceed(
65-
handlers.map { self.pipeline.addHandler($0, name: $1) },
65+
handlers.map { self.addHandler($0, name: $1) },
6666
on: self.eventLoop
6767
)
6868
}
6969

7070
/// Adds the channel handler that is responsible for handling everything related to Redis PubSub.
71-
/// - Important: The connection that manages this channel is responsible for removing the `RedisPubSubHandler`.
71+
/// - Important: The connection that manages this channel is responsible for removing the ``RedisPubSubHandler``.
7272
///
7373
/// # Discussion
7474
/// PubSub responsibilities include managing subscription callbacks as well as parsing and dispatching messages received from Redis.
7575
///
76-
/// For implementation details, see `RedisPubSubHandler`.
76+
/// For implementation details, see ``RedisPubSubHandler``.
7777
///
78-
/// The handler will be inserted in the `NIO.ChannelPipeline` just before the `RedisCommandHandler` instance.
78+
/// The handler will be inserted in the `NIO.ChannelPipeline` just before the ``RedisCommandHandler`` instance.
7979
///
8080
/// # Pipeline chart
8181
/// RedisClient.send
@@ -106,14 +106,38 @@ extension Channel {
106106
/// | [ Socket.read ] | | [ Socket.write ] |
107107
/// +-----------------+ +------------------+
108108
/// - Returns: A `NIO.EventLoopFuture` that resolves the instance of the PubSubHandler that was added to the pipeline.
109-
public func addPubSubHandler() -> EventLoopFuture<RedisPubSubHandler> {
110-
return self.pipeline
111-
.handler(type: RedisCommandHandler.self)
112-
.flatMap {
113-
let pubsubHandler = RedisPubSubHandler(eventLoop: self.eventLoop)
114-
return self.pipeline
115-
.addHandler(pubsubHandler, name: "RediStack.PubSubHandler", position: .before($0))
116-
.map { pubsubHandler }
109+
public func addRedisPubSubHandler() -> EventLoopFuture<RedisPubSubHandler> {
110+
// first try to return the handler that already exists in the pipeline
111+
112+
return self.handler(type: RedisPubSubHandler.self)
113+
.flatMapError {
114+
// if it doesn't exist, add it to the pipeline
115+
guard
116+
let error = $0 as? ChannelPipelineError,
117+
error == .notFound
118+
else { return self.eventLoop.makeFailedFuture($0) }
119+
120+
return self.handler(type: RedisCommandHandler.self)
121+
.flatMap {
122+
let pubsubHandler = RedisPubSubHandler(eventLoop: self.eventLoop)
123+
return self.addHandler(pubsubHandler, name: "RediStack.PubSubHandler", position: .before($0))
124+
.map { pubsubHandler }
125+
}
126+
}
127+
}
128+
129+
/// Removes the provided Redis PubSub handler.
130+
/// - Returns: A `NIO.EventLoopFuture` that resolves when the handler was removed from the pipeline.
131+
public func removeRedisPubSubHandler(_ handler: RedisPubSubHandler) -> EventLoopFuture<Void> {
132+
self.removeHandler(handler)
133+
.flatMapError {
134+
// if it was already removed, then we can just succeed
135+
guard
136+
let error = $0 as? ChannelPipelineError,
137+
error == .alreadyRemoved
138+
else { return self.eventLoop.makeFailedFuture($0) }
139+
140+
return self.eventLoop.makeSucceededVoidFuture()
117141
}
118142
}
119143
}
@@ -124,9 +148,9 @@ extension ClientBootstrap {
124148
/// Makes a new `ClientBootstrap` instance with a baseline Redis `Channel` pipeline
125149
/// for sending and receiving messages in Redis Serialization Protocol (RESP) format.
126150
///
127-
/// For implementation details, see `RedisMessageEncoder`, `RedisByteDecoder`, and `RedisCommandHandler`.
151+
/// For implementation details, see ``RedisMessageEncoder``, ``RedisByteDecoder``, and ``RedisCommandHandler``.
128152
///
129-
/// See also `Channel.addBaseRedisHandlers()`.
153+
/// See also `ChannelPipeline.addBaseRedisHandlers()`.
130154
/// - Parameter group: The `EventLoopGroup` to create the `ClientBootstrap` with.
131155
/// - Returns: A TCP connection with the base configuration of a `Channel` pipeline for RESP messages.
132156
public static func makeRedisTCPClient(group: EventLoopGroup) -> ClientBootstrap {
@@ -135,6 +159,6 @@ extension ClientBootstrap {
135159
ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR),
136160
value: 1
137161
)
138-
.channelInitializer { $0.addBaseRedisHandlers() }
162+
.channelInitializer { $0.pipeline.addBaseRedisHandlers() }
139163
}
140164
}

Sources/RediStack/RedisConnection.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -427,8 +427,8 @@ extension RedisConnection {
427427
guard case let .pubsub(handler) = self.state else {
428428
logger.debug("not in pubsub mode, moving to pubsub mode")
429429
// otherwise, add it to the pipeline, add the subscriptions, and update our state after it was successful
430-
return self.channel
431-
.addPubSubHandler()
430+
return self.channel.pipeline
431+
.addRedisPubSubHandler()
432432
.flatMap { handler in
433433
logger.trace("handler added, adding subscription")
434434
return handler
@@ -442,7 +442,7 @@ extension RedisConnection {
442442
)
443443
// if there was an error, no subscriptions were made
444444
// so remove the handler and propogate the error to the caller by rethrowing it
445-
return self.channel.pipeline.removeHandler(handler)
445+
return self.channel.pipeline.removeRedisPubSubHandler(handler)
446446
.flatMapThrowing { throw error }
447447
}
448448
// success, return the handler

Tests/RediStackIntegrationTests/Commands/PubSubCommandsTests.swift

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the RediStack open source project
44
//
5-
// Copyright (c) 2020 RediStack project authors
5+
// Copyright (c) 2020-2022 RediStack project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -12,6 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import NIO
1516
import RediStack
1617
import RediStackTestUtils
1718
import XCTest
@@ -341,3 +342,30 @@ final class RedisPubSubCommandsPoolTests: RediStackConnectionPoolIntegrationTest
341342
XCTAssertEqual(self.pool.leasedConnectionCount, 0)
342343
}
343344
}
345+
346+
// MARK: - #100 subscribe race condition
347+
348+
extension RedisPubSubCommandsTests {
349+
func test_pubsub_pipelineChanges_hasNoRaceCondition() throws {
350+
func runOperation(_ factory: (RedisChannelName) -> EventLoopFuture<Void>) -> EventLoopFuture<Void> {
351+
return .andAllSucceed(
352+
(0...100_000).reduce(into: []) {
353+
result, index in
354+
355+
result.append(factory("\(#function)-\(index)"))
356+
},
357+
on: self.connection.eventLoop
358+
)
359+
}
360+
361+
// subscribing (adding handler)
362+
try runOperation { self.connection.subscribe(to: $0) { _, _ in } }
363+
.wait()
364+
365+
// unsubscribing (removing handler)
366+
try runOperation { self.connection.unsubscribe(from: $0) }
367+
.wait()
368+
369+
try self.connection.close().wait()
370+
}
371+
}

0 commit comments

Comments
 (0)