Skip to content

Commit 44633a2

Browse files
committed
Merge branch 'pipelining'
* pipelining: Add `NIORedisPipeline` and `RedisPipeline` to allow chaining requests and receiving a single response Rename `redisPipeline` property in `NIORedisConnection` to remove possible confusion on upcoming `NIORedisPipeline` structure Restructure `RedisMessenger` to be a little more legible
2 parents 87618be + a35de88 commit 44633a2

File tree

7 files changed

+306
-62
lines changed

7 files changed

+306
-62
lines changed
Lines changed: 64 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,81 @@
11
import NIO
22

3-
/// `ChannelInboundHandler` that handles the responsibility of coordinating incoming and outgoing messages
4-
/// on a particular connection to Redis.
5-
internal final class RedisMessenger: ChannelInboundHandler {
6-
/// See `ChannelInboundHandler.InboundIn`
7-
public typealias InboundIn = RedisData
8-
9-
/// See `ChannelInboundHandler.OutboundOut`
10-
public typealias OutboundOut = RedisData
11-
12-
/// Queue of promises waiting to receive an incoming response value from a outgoing message.
13-
private var waitingResponseQueue: [EventLoopPromise<InboundIn>]
14-
/// Queue of unsent outgoing messages, with the oldest objects at the end of the array.
15-
private var outgoingMessageQueue: [OutboundOut]
16-
17-
/// This handler's event loop.
3+
/// `ChannelInboundHandler` that is responsible for coordinating incoming and outgoing messages on a particular
4+
/// connection to Redis.
5+
internal final class RedisMessenger {
186
private let eventLoop: EventLoop
197

20-
/// Context used for writing outgoing messages with.
21-
private weak var outputContext: ChannelHandlerContext?
8+
/// Context to be used for writing outgoing messages with.
9+
private var channelContext: ChannelHandlerContext?
10+
11+
/// Queue of promises waiting to receive an incoming response value from an outgoing message.
12+
private var waitingResponseQueue: [EventLoopPromise<RedisData>]
13+
/// Queue of unset outgoing messages, with the oldest messages at the end of the array.
14+
private var outgoingMessageQueue: [RedisData]
2215

2316
/// Creates a new handler that works on the specified `EventLoop`.
24-
public init(on eventLoop: EventLoop) {
17+
init(on eventLoop: EventLoop) {
2518
self.waitingResponseQueue = []
2619
self.outgoingMessageQueue = []
2720
self.eventLoop = eventLoop
2821
}
22+
23+
/// Adds a complete message encoded as `RedisData` to the queue and returns an `EventLoopFuture` that resolves
24+
/// the response from Redis.
25+
func enqueue(_ output: RedisData) -> EventLoopFuture<RedisData> {
26+
// ensure that we are on the event loop before modifying our data
27+
guard eventLoop.inEventLoop else {
28+
return eventLoop.submit({}).then { return self.enqueue(output) }
29+
}
30+
31+
// add the new output to the writing queue at the front
32+
outgoingMessageQueue.insert(output, at: 0)
33+
34+
// every outgoing message is expected to receive some form of response, so create a promise that we'll resolve
35+
// with the response
36+
let promise = eventLoop.makePromise(of: RedisData.self)
37+
waitingResponseQueue.insert(promise, at: 0)
38+
39+
// if we have a context for writing, flush the outgoing queue
40+
channelContext?.eventLoop.execute {
41+
self._flushOutgoingQueue()
42+
}
43+
44+
return promise.futureResult
45+
}
46+
47+
/// Writes all queued outgoing messages to the channel.
48+
func _flushOutgoingQueue() {
49+
guard let context = channelContext else { return }
50+
51+
while let output = outgoingMessageQueue.popLast() {
52+
context.write(wrapOutboundOut(output), promise: nil)
53+
context.flush()
54+
}
55+
}
56+
}
2957

58+
// MARK: ChannelInboundHandler
59+
60+
extension RedisMessenger: ChannelInboundHandler {
61+
/// See `ChannelInboundHandler.InboundIn`
62+
public typealias InboundIn = RedisData
63+
64+
/// See `ChannelInboundHandler.OutboundOut`
65+
public typealias OutboundOut = RedisData
66+
67+
/// Invoked by NIO when the channel for this handler has become active, receiving a context that is ready to
68+
/// send messages.
69+
///
70+
/// Any queued messages will be flushed at this point.
3071
/// See `ChannelInboundHandler.channelActive(ctx:)`
3172
public func channelActive(ctx: ChannelHandlerContext) {
32-
outputContext = ctx
73+
channelContext = ctx
3374
_flushOutgoingQueue()
3475
}
3576

77+
/// Invoked by NIO when an error was thrown earlier in the response chain. The waiting promise at the front
78+
/// of the queue will be failed with the error.
3679
/// See `ChannelInboundHandler.errorCaught(ctx:error:)`
3780
public func errorCaught(ctx: ChannelHandlerContext, error: Error) {
3881
guard let leadPromise = waitingResponseQueue.last else {
@@ -41,6 +84,8 @@ internal final class RedisMessenger: ChannelInboundHandler {
4184
leadPromise.fail(error: error)
4285
}
4386

87+
/// Invoked by NIO when a read has been fired from earlier in the response chain. This forwards the unwrapped
88+
/// `RedisData` to the response at the front of the queue.
4489
/// See `ChannelInboundHandler.channelRead(ctx:data:)`
4590
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
4691
let input = unwrapInboundIn(data)
@@ -54,38 +99,4 @@ internal final class RedisMessenger: ChannelInboundHandler {
5499

55100
leadPromise.succeed(result: input)
56101
}
57-
58-
/// Adds a complete message encoded as `RedisData` to the queue and returns an `EventLoopFuture` that resolves
59-
/// the response from Redis.
60-
func enqueue(_ output: OutboundOut) -> EventLoopFuture<InboundIn> {
61-
// ensure that we are on the event loop before modifying our data
62-
guard eventLoop.inEventLoop else {
63-
return eventLoop.submit { }.then { return self.enqueue(output) }
64-
}
65-
66-
// add the new output to the writing queue at the front
67-
outgoingMessageQueue.insert(output, at: 0)
68-
69-
// every outgoing message is expected to receive some form of response, so build
70-
// the context in the readQueue that we resolve with the response
71-
let promise = eventLoop.makePromise(of: InboundIn.self)
72-
waitingResponseQueue.insert(promise, at: 0)
73-
74-
// if we have a context for writing, flush the outgoing queue
75-
outputContext?.eventLoop.execute {
76-
self._flushOutgoingQueue()
77-
}
78-
79-
return promise.futureResult
80-
}
81-
82-
/// Writes all queued outgoing messages to the channel.
83-
func _flushOutgoingQueue() {
84-
guard let context = outputContext else { return }
85-
86-
while let output = outgoingMessageQueue.popLast() {
87-
context.write(wrapOutboundOut(output), promise: nil)
88-
context.flush()
89-
}
90-
}
91102
}

Sources/NIORedis/NIORedisConnection.swift

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ public final class NIORedisConnection {
88
/// Has the connection been closed?
99
public private(set) var isClosed = Atomic<Bool>(value: false)
1010

11-
internal let redisPipeline: RedisMessenger
11+
internal let messenger: RedisMessenger
1212

1313
private let channel: Channel
1414

@@ -18,7 +18,7 @@ public final class NIORedisConnection {
1818
/// - Important: Call `close()` before deinitializing to properly cleanup resources!
1919
init(channel: Channel, handler: RedisMessenger) {
2020
self.channel = channel
21-
self.redisPipeline = handler
21+
self.messenger = handler
2222
}
2323

2424
/// Closes the connection to Redis.
@@ -31,7 +31,7 @@ public final class NIORedisConnection {
3131
/// Executes the desired command with the specified arguments.
3232
/// - Important: All arguments should be in `.bulkString` format.
3333
public func command(_ command: String, _ arguments: [RedisData] = []) -> EventLoopFuture<RedisData> {
34-
return send(.array([RedisData(bulk: command)] + arguments))
34+
return _send(.array([RedisData(bulk: command)] + arguments))
3535
.thenThrowing { response in
3636
switch response {
3737
case let .error(error): throw error
@@ -40,15 +40,20 @@ public final class NIORedisConnection {
4040
}
4141
}
4242

43-
private func send(_ message: RedisData) -> EventLoopFuture<RedisData> {
43+
/// Creates a `NIORedisPipeline` for executing a batch of commands.
44+
public func makePipeline() -> NIORedisPipeline {
45+
return .init(using: self)
46+
}
47+
48+
func _send(_ message: RedisData) -> EventLoopFuture<RedisData> {
4449
// ensure the connection is still open
4550
guard !isClosed.load() else { return eventLoop.makeFailedFuture(error: RedisError.connectionClosed) }
4651

4752
// create a new promise to store
4853
let promise = eventLoop.makePromise(of: RedisData.self)
4954

5055
// cascade this enqueue to the newly created promise
51-
redisPipeline.enqueue(message).cascade(promise: promise)
56+
messenger.enqueue(message).cascade(promise: promise)
5257

5358
return promise.futureResult
5459
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import Foundation
2+
import NIO
3+
4+
/// An object that provides a mechanism to "pipeline" multiple Redis commands in sequence, providing an aggregate response
5+
/// of all the Redis responses for each individual command.
6+
///
7+
/// let results = connection.makePipeline()
8+
/// .enqueue(command: "SET", arguments: ["my_key", 3])
9+
/// .enqueue(command: "INCR", arguments: ["my_key"])
10+
/// .execute()
11+
/// // results == Future<[RedisData]>
12+
/// // results[0].string == Optional("OK")
13+
/// // results[1].int == Optional(4)
14+
/// - Important: The larger the pipeline queue, the more memory both NIORedis and Redis will use.
15+
/// See https://redis.io/topics/pipelining#redis-pipelining
16+
public final class NIORedisPipeline {
17+
/// The client to execute the commands on.
18+
private let connection: NIORedisConnection
19+
private let encoder: RedisDataEncoder = .init()
20+
21+
/// The queue of complete, encoded commands to execute.
22+
private var queue: [RedisData]
23+
private var messageCount: Int
24+
25+
/// Creates a new pipeline queue using the provided `NIORedisConnection`.
26+
/// - Parameter using: The connection to execute the commands on.
27+
public init(using connection: NIORedisConnection) {
28+
self.connection = connection
29+
self.queue = []
30+
self.messageCount = 0
31+
}
32+
33+
/// Queues the provided command and arguments to be executed when `execute()` is invoked.
34+
/// - Parameters:
35+
/// - command: The command to execute. See https://redis.io/commands
36+
/// - arguments: The arguments, if any, to send with the command.
37+
/// - Returns: A self-reference to this `NIORedisPipeline` instance for chaining commands.
38+
@discardableResult
39+
public func enqueue(command: String, arguments: [RedisDataConvertible] = []) throws -> NIORedisPipeline {
40+
let args = try arguments.map { try $0.convertToRedisData() }
41+
42+
queue.append(.array([RedisData(bulk: command)] + args))
43+
44+
return self
45+
}
46+
47+
/// Flushes the queue, sending all of the commands to Redis in the same order as they were enqueued.
48+
/// - Important: If any of the commands fail, the remaining commands will not execute and the `EventLoopFuture` will fail.
49+
/// - Returns: A `EventLoopFuture` that resolves the `RedisData` responses, in the same order as the command queue.
50+
public func execute() -> EventLoopFuture<[RedisData]> {
51+
let promise = connection.eventLoop.makePromise(of: [RedisData].self)
52+
53+
var results = [RedisData]()
54+
var iterator = queue.makeIterator()
55+
56+
// recursive internal method for chaining each request and
57+
// attaching callbacks for failing or ultimately succeeding
58+
func handle(_ command: RedisData) {
59+
let future = connection._send(command)
60+
future.whenSuccess { response in
61+
switch response {
62+
case let .error(error): promise.fail(error: error)
63+
default:
64+
results.append(response)
65+
66+
if let next = iterator.next() {
67+
handle(next)
68+
} else {
69+
promise.succeed(result: results)
70+
}
71+
}
72+
}
73+
future.whenFailure { promise.fail(error: $0) }
74+
}
75+
76+
if let first = iterator.next() {
77+
handle(first)
78+
} else {
79+
promise.succeed(result: [])
80+
}
81+
82+
promise.futureResult.whenComplete { self.queue = [] }
83+
84+
return promise.futureResult
85+
}
86+
}

Sources/Redis/RedisConnection.swift

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,28 @@ import Foundation
22
import NIORedis
33

44
public final class RedisConnection {
5-
private let driverConnection: NIORedisConnection
5+
let _driverConnection: NIORedisConnection
6+
67
private let queue: DispatchQueue
78

8-
deinit { driverConnection.close() }
9+
deinit { _driverConnection.close() }
910

1011
init(driver: NIORedisConnection, callbackQueue: DispatchQueue) {
11-
self.driverConnection = driver
12+
self._driverConnection = driver
1213
self.queue = callbackQueue
1314
}
1415

16+
/// Creates a `RedisPipeline` for executing a batch of commands.
17+
public func makePipeline(callbackQueue: DispatchQueue? = nil) -> RedisPipeline {
18+
return .init(using: self, callbackQueue: callbackQueue ?? queue)
19+
}
20+
1521
public func get(
1622
_ key: String,
1723
_ callback: @escaping (Result<String?, Error>
1824
) -> Void) {
1925
// TODO: Make this a generic method to avoid copy/paste
20-
driverConnection.get(key)
26+
_driverConnection.get(key)
2127
.map { result in
2228
self.queue.async { callback(.success(result)) }
2329
}

Sources/Redis/RedisPipeline.swift

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import Foundation
2+
import NIORedis
3+
4+
/// An object that provides a mechanism to "pipeline" multiple Redis commands in sequence, providing an aggregate response
5+
/// of all the Redis responses for each individual command.
6+
///
7+
/// connection.makePipeline()
8+
/// .enqueue(command: "SET", arguments: ["my_key", 3])
9+
/// .enqueue(command: "INCR", arguments: ["my_key"])
10+
/// .execute { results in
11+
/// // results[0].string == Optional("OK")
12+
/// // results[1].int == Optional(4)
13+
/// }
14+
/// - Important: The larger the pipeline queue, the more memory both the Redis driver and Redis server will use.
15+
/// See https://redis.io/topics/pipelining#redis-pipelining
16+
public final class RedisPipeline {
17+
private let _driverPipeline: NIORedisPipeline
18+
private let queue: DispatchQueue
19+
20+
/// Creates a new pipeline queue using the provided `RedisConnection`, executing callbacks on the provided `DispatchQueue`.
21+
/// - Parameters:
22+
/// - using: The connection to execute the commands on.
23+
/// - callbackQueue: The queue to execute all callbacks on.
24+
public init(using connection: RedisConnection, callbackQueue: DispatchQueue) {
25+
self._driverPipeline = NIORedisPipeline(using: connection._driverConnection)
26+
self.queue = callbackQueue
27+
}
28+
29+
/// Queues the provided command and arguments to be executed when `execute()` is invoked.
30+
/// - Parameters:
31+
/// - command: The command to execute. See https://redis.io/commands
32+
/// - arguments: The arguments, if any, to send with the command.
33+
/// - Returns: A self-reference to this `RedisPipeline` instance for chaining commands.
34+
@discardableResult
35+
public func enqueue(command: String, arguments: [RedisDataConvertible] = []) throws -> RedisPipeline {
36+
try _driverPipeline.enqueue(command: command, arguments: arguments)
37+
return self
38+
}
39+
40+
/// Flushes the queue, sending all of the commands to Redis in the same order as they were enqueued.
41+
/// - Important: If any of the commands fail, the remaining commands will not execute and the callback will receive a failure.
42+
/// - Parameter callback: The callback to receive the results of the pipeline of commands, or an error if thrown.
43+
public func execute(_ callback: @escaping (Result<[RedisData], Error>) -> Void) {
44+
_driverPipeline.execute()
45+
.map { results in
46+
self.queue.async { callback(.success(results)) }
47+
}
48+
.whenFailure { error in
49+
self.queue.async { callback(.failure(error)) }
50+
}
51+
}
52+
}

0 commit comments

Comments
 (0)