Skip to content

Commit a5c3625

Browse files
authored
Add ValkeyConnection.triggerGracefulShutdown (#207)
1 parent f065fc2 commit a5c3625

File tree

5 files changed

+53
-15
lines changed

5 files changed

+53
-15
lines changed

Sources/Valkey/Connection/ValkeyChannelHandler+stateMachine.swift

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -330,14 +330,13 @@ extension ValkeyChannelHandler {
330330
}
331331

332332
@usableFromInline
333-
enum GracefulShutdownAction {
334-
case waitForPendingCommands(Context)
333+
enum TriggerGracefulShutdownAction {
335334
case closeConnection(Context)
336335
case doNothing
337336
}
338337
/// Want to gracefully shutdown the handler
339338
@usableFromInline
340-
mutating func gracefulShutdown() -> GracefulShutdownAction {
339+
mutating func triggerGracefulShutdown() -> TriggerGracefulShutdownAction {
341340
switch consume self.state {
342341
case .initialized:
343342
self = .closed(nil)
@@ -346,11 +345,11 @@ extension ValkeyChannelHandler {
346345
var pendingCommands = state.pendingCommands
347346
pendingCommands.prepend(state.pendingHelloCommand)
348347
self = .closing(.init(context: state.context, pendingCommands: pendingCommands))
349-
return .waitForPendingCommands(state.context)
348+
return .doNothing
350349
case .active(let state):
351350
if state.pendingCommands.count > 0 {
352351
self = .closing(.init(context: state.context, pendingCommands: state.pendingCommands))
353-
return .waitForPendingCommands(state.context)
352+
return .doNothing
354353
} else {
355354
self = .closed(nil)
356355
return .closeConnection(state.context)

Sources/Valkey/Connection/ValkeyChannelHandler.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,15 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
517517
break
518518
}
519519
}
520+
521+
func triggerGracefulShutdown() {
522+
switch self.stateMachine.triggerGracefulShutdown() {
523+
case .closeConnection(let context):
524+
context.close(mode: .all, promise: nil)
525+
case .doNothing:
526+
break
527+
}
528+
}
520529
}
521530

522531
@available(valkeySwift 1.0, *)

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,14 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
165165
try await self.channelHandler.waitOnActive().get()
166166
}
167167

168+
/// Trigger graceful shutdown of connection
169+
///
170+
/// The connection will wait until all pending commands have been processed before
171+
/// closing the connection.
172+
func triggerGracefulShutdown() {
173+
self.channelHandler.triggerGracefulShutdown()
174+
}
175+
168176
/// Send RESP command to Valkey connection
169177
/// - Parameter command: ValkeyCommand structure
170178
/// - Returns: The command response as defined in the ValkeyCommand

Tests/ValkeyTests/ValkeyChannelHandlerStateMachineTests.swift

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ struct ValkeyChannelHandlerStateMachineTests {
147147
var stateMachine = ValkeyChannelHandler.StateMachine<String>()
148148
stateMachine.setConnected(context: "testGracefulShutdown")
149149
stateMachine.receiveHelloResponse()
150-
switch stateMachine.gracefulShutdown() {
150+
switch stateMachine.triggerGracefulShutdown() {
151151
case .closeConnection(let context):
152152
#expect(context == "testGracefulShutdown")
153153
default:
@@ -169,10 +169,10 @@ struct ValkeyChannelHandlerStateMachineTests {
169169
case .throwError:
170170
Issue.record("Invalid sendCommand action")
171171
}
172-
switch stateMachine.gracefulShutdown() {
173-
case .waitForPendingCommands(let context):
174-
#expect(context == "testGracefulShutdown")
175-
default:
172+
switch stateMachine.triggerGracefulShutdown() {
173+
case .doNothing:
174+
break
175+
case .closeConnection:
176176
Issue.record("Invalid waitForPendingCommands action")
177177
}
178178
expect(
@@ -208,10 +208,10 @@ struct ValkeyChannelHandlerStateMachineTests {
208208
case .throwError:
209209
Issue.record("Invalid sendCommand action")
210210
}
211-
switch stateMachine.gracefulShutdown() {
212-
case .waitForPendingCommands(let context):
213-
#expect(context == "testClosedClosingState")
214-
default:
211+
switch stateMachine.triggerGracefulShutdown() {
212+
case .doNothing:
213+
break
214+
case .closeConnection:
215215
Issue.record("Invalid waitForPendingCommands action")
216216
}
217217
expect(
@@ -334,7 +334,7 @@ struct ValkeyChannelHandlerStateMachineTests {
334334
case .throwError:
335335
Issue.record("Invalid sendCommand action")
336336
}
337-
_ = stateMachine.gracefulShutdown()
337+
_ = stateMachine.triggerGracefulShutdown()
338338
switch stateMachine.cancel(requestID: 23) {
339339
case .failPendingCommandsAndClose(let context, let cancel, let closeConnectionDueToCancel):
340340
#expect(context == "testCancelGracefulShutdown")

Tests/ValkeyTests/ValkeyConnectionTests.swift

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,28 @@ struct ConnectionTests {
587587
try await channel.close()
588588
}
589589

590+
@Test
591+
@available(valkeySwift 1.0, *)
592+
func testTriggerGracefulShutdown() async throws {
593+
let channel = NIOAsyncTestingChannel()
594+
let logger = Logger(label: "test")
595+
let connection = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: .init(), logger: logger)
596+
try await channel.processHello()
597+
598+
async let fooResult = connection.get("foo").map { String(buffer: $0) }
599+
600+
let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self)
601+
#expect(outbound == RESPToken(.command(["GET", "foo"])).base)
602+
603+
await connection.triggerGracefulShutdown()
604+
#expect(channel.isActive)
605+
606+
try await channel.writeInbound(RESPToken(.bulkString("Bar")).base)
607+
#expect(try await fooResult == "Bar")
608+
609+
try await channel.closeFuture.get()
610+
}
611+
590612
#if DistributedTracingSupport
591613
@Suite
592614
struct DistributedTracingTests {

0 commit comments

Comments
 (0)