@@ -16,7 +16,6 @@ import Logging
1616import NIOCore
1717import NIOHTTP1
1818import NIOPosix
19- import _NIOBase64
2019
2120final actor NewLambdaRuntimeClient : LambdaRuntimeClientProtocol {
2221 nonisolated let unownedExecutor : UnownedSerialExecutor
@@ -69,11 +68,22 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
6968 case sentResponse( requestID: String )
7069 }
7170
71+ enum ClosingState {
72+ case notClosing
73+ case closing( CheckedContinuation < Void , Never > )
74+ }
75+
7276 private let eventLoop : any EventLoop
7377 private let logger : Logger
7478 private let configuration : Configuration
79+
7580 private var connectionState : ConnectionState = . disconnected
7681 private var lambdaState : LambdaState = . idle( previousRequestID: nil )
82+ private var closingState : ClosingState = . notClosing
83+
84+ // connections that are currently being closed. In the `run` method we must await all of them
85+ // being fully closed before we can return from it.
86+ private var closingConnections : [ any Channel ] = [ ]
7787
7888 static func withRuntimeClient< Result> (
7989 configuration: Configuration ,
@@ -89,6 +99,8 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
8999 result = . failure( error)
90100 }
91101
102+ await runtime. close ( )
103+
92104 //try? await runtime.close()
93105 return try result. get ( )
94106 }
@@ -100,6 +112,31 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
100112 self . logger = logger
101113 }
102114
115+ private func close( ) async {
116+ guard case . notClosing = self . closingState else {
117+ return
118+ }
119+ await withCheckedContinuation { continuation in
120+ self . closingState = . closing( continuation)
121+
122+ switch self . connectionState {
123+ case . disconnected:
124+ break
125+
126+ case . connecting( let continuations) :
127+ for continuation in continuations {
128+ continuation. resume ( throwing: NewLambdaRuntimeError ( code: . closingRuntimeClient) )
129+ }
130+ self . connectionState = . connecting( [ ] )
131+
132+ case . connected( let channel, let lambdaChannelHandler) :
133+ channel. clo
134+ }
135+ }
136+
137+
138+ }
139+
103140 func nextInvocation( ) async throws -> ( Invocation , Writer ) {
104141 switch self . lambdaState {
105142 case . idle:
@@ -190,7 +227,20 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
190227 }
191228
192229 private func channelClosed( _ channel: any Channel ) {
193- // TODO: Fill out
230+ switch self . connectionState {
231+ case . disconnected:
232+ break
233+
234+ case . connecting( let array) :
235+ self . connectionState = . disconnected
236+
237+ for continuation in array {
238+ continuation. resume ( throwing: NewLambdaRuntimeError ( code: . lostConnectionToControlPlane) )
239+ }
240+
241+ case . connected:
242+ self . connectionState = . disconnected
243+ }
194244 }
195245
196246 private func makeOrGetConnection( ) async throws -> LambdaChannelHandler < NewLambdaRuntimeClient > {
@@ -227,6 +277,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
227277 return channel. eventLoop. makeFailedFuture ( error)
228278 }
229279 }
280+ . connectTimeout ( . seconds( 2 ) )
230281
231282 do {
232283 // connect directly via socket address to avoid happy eyeballs (perf)
@@ -279,6 +330,35 @@ extension NewLambdaRuntimeClient: LambdaChannelHandlerDelegate {
279330 }
280331
281332 nonisolated func connectionWillClose( channel: any Channel ) {
333+ self . assumeIsolated { isolated in
334+ switch isolated. connectionState {
335+ case . disconnected:
336+ // this case should never happen. But whatever
337+ if channel. isActive {
338+ isolated. closingConnections. append ( channel)
339+ }
340+
341+ case . connecting( let continuations) :
342+ // this case should never happen. But whatever
343+ if channel. isActive {
344+ isolated. closingConnections. append ( channel)
345+ }
346+
347+ for continuation in continuations {
348+ continuation. resume ( throwing: NewLambdaRuntimeError ( code: . connectionToControlPlaneLost) )
349+ }
350+
351+ case . connected( let stateChannel, _) :
352+ guard channel === stateChannel else {
353+ isolated. closingConnections. append ( channel)
354+ return
355+ }
356+
357+ isolated. connectionState = . disconnected
358+
359+
360+ }
361+ }
282362
283363 }
284364}
@@ -288,7 +368,7 @@ private protocol LambdaChannelHandlerDelegate {
288368 func connectionErrorHappened( _ error: any Error , channel: any Channel )
289369}
290370
291- private final class LambdaChannelHandler < Delegate> {
371+ private final class LambdaChannelHandler < Delegate: LambdaChannelHandlerDelegate > {
292372 let nextInvocationPath = Consts . invocationURLPrefix + Consts. getNextInvocationURLSuffix
293373
294374 enum State {
@@ -652,27 +732,18 @@ extension LambdaChannelHandler: ChannelInboundHandler {
652732
653733 func errorCaught( context: ChannelHandlerContext , error: Error ) {
654734 // pending responses will fail with lastError in channelInactive since we are calling context.close
735+ self . delegate. connectionErrorHappened ( error, channel: context. channel)
736+
655737 self . lastError = error
656738 context. channel. close ( promise: nil )
657739 }
658740
659741 func channelInactive( context: ChannelHandlerContext ) {
660742 // fail any pending responses with last error or assume peer disconnected
661- context. fireChannelInactive ( )
662743
663- // switch self.state {
664- // case .idle:
665- // break
666- //
667- // case .running(let promise, let timeout):
668- // self.state = .idle
669- // timeout?.cancel()
670- // promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer)
671- //
672- // case .waitForConnectionClose(let response, let promise):
673- // self.state = .idle
674- // promise.succeed(response)
675- // }
744+ // we don't need to forward channelInactive to the delegate, as the delegate observes the
745+ // closeFuture
746+ context. fireChannelInactive ( )
676747 }
677748}
678749
0 commit comments