Skip to content

Commit a35de88

Browse files
committed
Add NIORedisPipeline and RedisPipeline to allow chaining requests and receiving a single response
1 parent 75efad9 commit a35de88

File tree

6 files changed

+239
-6
lines changed

6 files changed

+239
-6
lines changed

Sources/NIORedis/NIORedisConnection.swift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public final class NIORedisConnection {
3131
/// Executes the desired command with the specified arguments.
3232
/// - Important: All arguments should be in `.bulkString` format.
3333
public func command(_ command: String, _ arguments: [RedisData] = []) -> EventLoopFuture<RedisData> {
34-
return send(.array([RedisData(bulk: command)] + arguments))
34+
return _send(.array([RedisData(bulk: command)] + arguments))
3535
.thenThrowing { response in
3636
switch response {
3737
case let .error(error): throw error
@@ -40,7 +40,12 @@ public final class NIORedisConnection {
4040
}
4141
}
4242

43-
private func send(_ message: RedisData) -> EventLoopFuture<RedisData> {
43+
/// Creates a `NIORedisPipeline` for executing a batch of commands.
44+
public func makePipeline() -> NIORedisPipeline {
45+
return .init(using: self)
46+
}
47+
48+
func _send(_ message: RedisData) -> EventLoopFuture<RedisData> {
4449
// ensure the connection is still open
4550
guard !isClosed.load() else { return eventLoop.makeFailedFuture(error: RedisError.connectionClosed) }
4651

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import Foundation
2+
import NIO
3+
4+
/// An object that provides a mechanism to "pipeline" multiple Redis commands in sequence, providing an aggregate response
5+
/// of all the Redis responses for each individual command.
6+
///
7+
/// let results = connection.makePipeline()
8+
/// .enqueue(command: "SET", arguments: ["my_key", 3])
9+
/// .enqueue(command: "INCR", arguments: ["my_key"])
10+
/// .execute()
11+
/// // results == Future<[RedisData]>
12+
/// // results[0].string == Optional("OK")
13+
/// // results[1].int == Optional(4)
14+
/// - Important: The larger the pipeline queue, the more memory both NIORedis and Redis will use.
15+
/// See https://redis.io/topics/pipelining#redis-pipelining
16+
public final class NIORedisPipeline {
17+
/// The client to execute the commands on.
18+
private let connection: NIORedisConnection
19+
private let encoder: RedisDataEncoder = .init()
20+
21+
/// The queue of complete, encoded commands to execute.
22+
private var queue: [RedisData]
23+
private var messageCount: Int
24+
25+
/// Creates a new pipeline queue using the provided `NIORedisConnection`.
26+
/// - Parameter using: The connection to execute the commands on.
27+
public init(using connection: NIORedisConnection) {
28+
self.connection = connection
29+
self.queue = []
30+
self.messageCount = 0
31+
}
32+
33+
/// Queues the provided command and arguments to be executed when `execute()` is invoked.
34+
/// - Parameters:
35+
/// - command: The command to execute. See https://redis.io/commands
36+
/// - arguments: The arguments, if any, to send with the command.
37+
/// - Returns: A self-reference to this `NIORedisPipeline` instance for chaining commands.
38+
@discardableResult
39+
public func enqueue(command: String, arguments: [RedisDataConvertible] = []) throws -> NIORedisPipeline {
40+
let args = try arguments.map { try $0.convertToRedisData() }
41+
42+
queue.append(.array([RedisData(bulk: command)] + args))
43+
44+
return self
45+
}
46+
47+
/// Flushes the queue, sending all of the commands to Redis in the same order as they were enqueued.
48+
/// - Important: If any of the commands fail, the remaining commands will not execute and the `EventLoopFuture` will fail.
49+
/// - Returns: A `EventLoopFuture` that resolves the `RedisData` responses, in the same order as the command queue.
50+
public func execute() -> EventLoopFuture<[RedisData]> {
51+
let promise = connection.eventLoop.makePromise(of: [RedisData].self)
52+
53+
var results = [RedisData]()
54+
var iterator = queue.makeIterator()
55+
56+
// recursive internal method for chaining each request and
57+
// attaching callbacks for failing or ultimately succeeding
58+
func handle(_ command: RedisData) {
59+
let future = connection._send(command)
60+
future.whenSuccess { response in
61+
switch response {
62+
case let .error(error): promise.fail(error: error)
63+
default:
64+
results.append(response)
65+
66+
if let next = iterator.next() {
67+
handle(next)
68+
} else {
69+
promise.succeed(result: results)
70+
}
71+
}
72+
}
73+
future.whenFailure { promise.fail(error: $0) }
74+
}
75+
76+
if let first = iterator.next() {
77+
handle(first)
78+
} else {
79+
promise.succeed(result: [])
80+
}
81+
82+
promise.futureResult.whenComplete { self.queue = [] }
83+
84+
return promise.futureResult
85+
}
86+
}

Sources/Redis/RedisConnection.swift

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,28 @@ import Foundation
22
import NIORedis
33

44
public final class RedisConnection {
5-
private let driverConnection: NIORedisConnection
5+
let _driverConnection: NIORedisConnection
6+
67
private let queue: DispatchQueue
78

8-
deinit { driverConnection.close() }
9+
deinit { _driverConnection.close() }
910

1011
init(driver: NIORedisConnection, callbackQueue: DispatchQueue) {
11-
self.driverConnection = driver
12+
self._driverConnection = driver
1213
self.queue = callbackQueue
1314
}
1415

16+
/// Creates a `RedisPipeline` for executing a batch of commands.
17+
public func makePipeline(callbackQueue: DispatchQueue? = nil) -> RedisPipeline {
18+
return .init(using: self, callbackQueue: callbackQueue ?? queue)
19+
}
20+
1521
public func get(
1622
_ key: String,
1723
_ callback: @escaping (Result<String?, Error>
1824
) -> Void) {
1925
// TODO: Make this a generic method to avoid copy/paste
20-
driverConnection.get(key)
26+
_driverConnection.get(key)
2127
.map { result in
2228
self.queue.async { callback(.success(result)) }
2329
}

Sources/Redis/RedisPipeline.swift

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import Foundation
2+
import NIORedis
3+
4+
/// An object that provides a mechanism to "pipeline" multiple Redis commands in sequence, providing an aggregate response
5+
/// of all the Redis responses for each individual command.
6+
///
7+
/// connection.makePipeline()
8+
/// .enqueue(command: "SET", arguments: ["my_key", 3])
9+
/// .enqueue(command: "INCR", arguments: ["my_key"])
10+
/// .execute { results in
11+
/// // results[0].string == Optional("OK")
12+
/// // results[1].int == Optional(4)
13+
/// }
14+
/// - Important: The larger the pipeline queue, the more memory both the Redis driver and Redis server will use.
15+
/// See https://redis.io/topics/pipelining#redis-pipelining
16+
public final class RedisPipeline {
17+
private let _driverPipeline: NIORedisPipeline
18+
private let queue: DispatchQueue
19+
20+
/// Creates a new pipeline queue using the provided `RedisConnection`, executing callbacks on the provided `DispatchQueue`.
21+
/// - Parameters:
22+
/// - using: The connection to execute the commands on.
23+
/// - callbackQueue: The queue to execute all callbacks on.
24+
public init(using connection: RedisConnection, callbackQueue: DispatchQueue) {
25+
self._driverPipeline = NIORedisPipeline(using: connection._driverConnection)
26+
self.queue = callbackQueue
27+
}
28+
29+
/// Queues the provided command and arguments to be executed when `execute()` is invoked.
30+
/// - Parameters:
31+
/// - command: The command to execute. See https://redis.io/commands
32+
/// - arguments: The arguments, if any, to send with the command.
33+
/// - Returns: A self-reference to this `RedisPipeline` instance for chaining commands.
34+
@discardableResult
35+
public func enqueue(command: String, arguments: [RedisDataConvertible] = []) throws -> RedisPipeline {
36+
try _driverPipeline.enqueue(command: command, arguments: arguments)
37+
return self
38+
}
39+
40+
/// Flushes the queue, sending all of the commands to Redis in the same order as they were enqueued.
41+
/// - Important: If any of the commands fail, the remaining commands will not execute and the callback will receive a failure.
42+
/// - Parameter callback: The callback to receive the results of the pipeline of commands, or an error if thrown.
43+
public func execute(_ callback: @escaping (Result<[RedisData], Error>) -> Void) {
44+
_driverPipeline.execute()
45+
.map { results in
46+
self.queue.async { callback(.success(results)) }
47+
}
48+
.whenFailure { error in
49+
self.queue.async { callback(.failure(error)) }
50+
}
51+
}
52+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
@testable import NIORedis
2+
import XCTest
3+
4+
final class NIORedisPipelineTests: XCTestCase {
5+
private var redis: NIORedis!
6+
private var connection: NIORedisConnection!
7+
8+
override func setUp() {
9+
let redis = NIORedis(executionModel: .spawnThreads(2))
10+
11+
guard let connection = try? redis.makeConnection().wait() else {
12+
return XCTFail("Failed to create connection!")
13+
}
14+
15+
self.redis = redis
16+
self.connection = connection
17+
}
18+
19+
override func tearDown() {
20+
_ = try? connection.command("FLUSHALL").wait()
21+
connection.close()
22+
try? redis.terminate()
23+
}
24+
25+
func test_enqueue() {
26+
let pipeline = connection.makePipeline()
27+
28+
XCTAssertNoThrow(try pipeline.enqueue(command: "PING"))
29+
XCTAssertNoThrow(try pipeline.enqueue(command: "SET", arguments: ["KEY", 3]))
30+
}
31+
32+
func test_executeFails() throws {
33+
let pipeline = try connection.makePipeline()
34+
.enqueue(command: "GET")
35+
36+
XCTAssertThrowsError(try pipeline.execute().wait())
37+
}
38+
39+
func test_singleCommand() throws {
40+
let results = try connection.makePipeline()
41+
.enqueue(command: "PING")
42+
.execute()
43+
.wait()
44+
45+
XCTAssertEqual(results[0].string, "PONG")
46+
}
47+
48+
func test_multipleCommands() throws {
49+
let results = try connection.makePipeline()
50+
.enqueue(command: "PING")
51+
.enqueue(command: "SET", arguments: ["my_key", 3])
52+
.enqueue(command: "GET", arguments: ["my_key"])
53+
.execute()
54+
.wait()
55+
56+
XCTAssertEqual(results[0].string, "PONG")
57+
XCTAssertEqual(results[1].string, "OK")
58+
XCTAssertEqual(results[2].data, "3".convertedToData())
59+
}
60+
61+
func test_executeIsOrdered() throws {
62+
let results = try connection.makePipeline()
63+
.enqueue(command: "SET", arguments: ["key", 1])
64+
.enqueue(command: "INCR", arguments: ["key"])
65+
.enqueue(command: "DECR", arguments: ["key"])
66+
.enqueue(command: "INCRBY", arguments: ["key", 15])
67+
.execute()
68+
.wait()
69+
70+
XCTAssertEqual(results[0].string, "OK")
71+
XCTAssertEqual(results[1].int, 2)
72+
XCTAssertEqual(results[2].int, 1)
73+
XCTAssertEqual(results[3].int, 16)
74+
}
75+
76+
static var allTests = [
77+
("test_enqueue", test_enqueue),
78+
("test_executeFails", test_executeFails),
79+
("test_singleCommand", test_singleCommand),
80+
("test_multipleCommands", test_multipleCommands),
81+
("test_executeIsOrdered", test_executeIsOrdered),
82+
]
83+
}

Tests/NIORedisTests/XCTestManifests.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public func allTests() -> [XCTestCaseEntry] {
1010
testCase(RedisDataEncoderTests.allTests),
1111
testCase(RedisDataEncoderParsingTests.allTests),
1212
testCase(BasicCommandsTests.allTests),
13+
testCase(NIORedisPipelineTests.allTests)
1314
]
1415
}
1516
#endif

0 commit comments

Comments
 (0)