Skip to content

Commit 68e8c5e

Browse files
committed
Restructure RedisMessenger to be a little more legible
1 parent 87618be commit 68e8c5e

File tree

1 file changed

+64
-53
lines changed

1 file changed

+64
-53
lines changed
Lines changed: 64 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,81 @@
11
import NIO
22

3-
/// `ChannelInboundHandler` that handles the responsibility of coordinating incoming and outgoing messages
4-
/// on a particular connection to Redis.
5-
internal final class RedisMessenger: ChannelInboundHandler {
6-
/// See `ChannelInboundHandler.InboundIn`
7-
public typealias InboundIn = RedisData
8-
9-
/// See `ChannelInboundHandler.OutboundOut`
10-
public typealias OutboundOut = RedisData
11-
12-
/// Queue of promises waiting to receive an incoming response value from a outgoing message.
13-
private var waitingResponseQueue: [EventLoopPromise<InboundIn>]
14-
/// Queue of unsent outgoing messages, with the oldest objects at the end of the array.
15-
private var outgoingMessageQueue: [OutboundOut]
16-
17-
/// This handler's event loop.
3+
/// `ChannelInboundHandler` that is responsible for coordinating incoming and outgoing messages on a particular
4+
/// connection to Redis.
5+
internal final class RedisMessenger {
186
private let eventLoop: EventLoop
197

20-
/// Context used for writing outgoing messages with.
21-
private weak var outputContext: ChannelHandlerContext?
8+
/// Context to be used for writing outgoing messages with.
9+
private var channelContext: ChannelHandlerContext?
10+
11+
/// Queue of promises waiting to receive an incoming response value from an outgoing message.
12+
private var waitingResponseQueue: [EventLoopPromise<RedisData>]
13+
/// Queue of unset outgoing messages, with the oldest messages at the end of the array.
14+
private var outgoingMessageQueue: [RedisData]
2215

2316
/// Creates a new handler that works on the specified `EventLoop`.
24-
public init(on eventLoop: EventLoop) {
17+
init(on eventLoop: EventLoop) {
2518
self.waitingResponseQueue = []
2619
self.outgoingMessageQueue = []
2720
self.eventLoop = eventLoop
2821
}
22+
23+
/// Adds a complete message encoded as `RedisData` to the queue and returns an `EventLoopFuture` that resolves
24+
/// the response from Redis.
25+
func enqueue(_ output: RedisData) -> EventLoopFuture<RedisData> {
26+
// ensure that we are on the event loop before modifying our data
27+
guard eventLoop.inEventLoop else {
28+
return eventLoop.submit({}).then { return self.enqueue(output) }
29+
}
30+
31+
// add the new output to the writing queue at the front
32+
outgoingMessageQueue.insert(output, at: 0)
33+
34+
// every outgoing message is expected to receive some form of response, so create a promise that we'll resolve
35+
// with the response
36+
let promise = eventLoop.makePromise(of: RedisData.self)
37+
waitingResponseQueue.insert(promise, at: 0)
38+
39+
// if we have a context for writing, flush the outgoing queue
40+
channelContext?.eventLoop.execute {
41+
self._flushOutgoingQueue()
42+
}
43+
44+
return promise.futureResult
45+
}
46+
47+
/// Writes all queued outgoing messages to the channel.
48+
func _flushOutgoingQueue() {
49+
guard let context = channelContext else { return }
50+
51+
while let output = outgoingMessageQueue.popLast() {
52+
context.write(wrapOutboundOut(output), promise: nil)
53+
context.flush()
54+
}
55+
}
56+
}
2957

58+
// MARK: ChannelInboundHandler
59+
60+
extension RedisMessenger: ChannelInboundHandler {
61+
/// See `ChannelInboundHandler.InboundIn`
62+
public typealias InboundIn = RedisData
63+
64+
/// See `ChannelInboundHandler.OutboundOut`
65+
public typealias OutboundOut = RedisData
66+
67+
/// Invoked by NIO when the channel for this handler has become active, receiving a context that is ready to
68+
/// send messages.
69+
///
70+
/// Any queued messages will be flushed at this point.
3071
/// See `ChannelInboundHandler.channelActive(ctx:)`
3172
public func channelActive(ctx: ChannelHandlerContext) {
32-
outputContext = ctx
73+
channelContext = ctx
3374
_flushOutgoingQueue()
3475
}
3576

77+
/// Invoked by NIO when an error was thrown earlier in the response chain. The waiting promise at the front
78+
/// of the queue will be failed with the error.
3679
/// See `ChannelInboundHandler.errorCaught(ctx:error:)`
3780
public func errorCaught(ctx: ChannelHandlerContext, error: Error) {
3881
guard let leadPromise = waitingResponseQueue.last else {
@@ -41,6 +84,8 @@ internal final class RedisMessenger: ChannelInboundHandler {
4184
leadPromise.fail(error: error)
4285
}
4386

87+
/// Invoked by NIO when a read has been fired from earlier in the response chain. This forwards the unwrapped
88+
/// `RedisData` to the response at the front of the queue.
4489
/// See `ChannelInboundHandler.channelRead(ctx:data:)`
4590
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
4691
let input = unwrapInboundIn(data)
@@ -54,38 +99,4 @@ internal final class RedisMessenger: ChannelInboundHandler {
5499

55100
leadPromise.succeed(result: input)
56101
}
57-
58-
/// Adds a complete message encoded as `RedisData` to the queue and returns an `EventLoopFuture` that resolves
59-
/// the response from Redis.
60-
func enqueue(_ output: OutboundOut) -> EventLoopFuture<InboundIn> {
61-
// ensure that we are on the event loop before modifying our data
62-
guard eventLoop.inEventLoop else {
63-
return eventLoop.submit { }.then { return self.enqueue(output) }
64-
}
65-
66-
// add the new output to the writing queue at the front
67-
outgoingMessageQueue.insert(output, at: 0)
68-
69-
// every outgoing message is expected to receive some form of response, so build
70-
// the context in the readQueue that we resolve with the response
71-
let promise = eventLoop.makePromise(of: InboundIn.self)
72-
waitingResponseQueue.insert(promise, at: 0)
73-
74-
// if we have a context for writing, flush the outgoing queue
75-
outputContext?.eventLoop.execute {
76-
self._flushOutgoingQueue()
77-
}
78-
79-
return promise.futureResult
80-
}
81-
82-
/// Writes all queued outgoing messages to the channel.
83-
func _flushOutgoingQueue() {
84-
guard let context = outputContext else { return }
85-
86-
while let output = outgoingMessageQueue.popLast() {
87-
context.write(wrapOutboundOut(output), promise: nil)
88-
context.flush()
89-
}
90-
}
91102
}

0 commit comments

Comments
 (0)