From b93a14c2d1b5ecd7ba61fb5c6479141c2785dc97 Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Wed, 4 Sep 2024 16:12:44 +0100 Subject: [PATCH 1/7] LambdaRuntime and closure initializer extensions --- Sources/AWSLambdaRuntime/Lambda+Codable.swift | 114 ++++++++++++++++++ Sources/AWSLambdaRuntimeCore/NewLambda.swift | 8 ++ .../NewLambdaRuntime.swift | 38 ++++++ 3 files changed, 160 insertions(+) create mode 100644 Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift diff --git a/Sources/AWSLambdaRuntime/Lambda+Codable.swift b/Sources/AWSLambdaRuntime/Lambda+Codable.swift index fedd85c9..30fa642f 100644 --- a/Sources/AWSLambdaRuntime/Lambda+Codable.swift +++ b/Sources/AWSLambdaRuntime/Lambda+Codable.swift @@ -184,3 +184,117 @@ extension LambdaCodableAdapter { ) } } + +extension NewLambdaRuntime { + /// Initialize an instance with a ``StreamingLambdaHandler`` in the form of a closure. + /// - Parameter body: The handler in the form of a closure. + package convenience init( + body: @Sendable @escaping (ByteBuffer, LambdaResponseStreamWriter, NewLambdaContext) async throws -> Void + ) where Handler == StreamingClosureHandler { + self.init(handler: StreamingClosureHandler(body: body)) + } + + /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**, an encoder, and a decoder. + /// - Parameter body: The handler in the form of a closure. + /// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. + /// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. + package convenience init< + Event: Decodable, + Output: Encodable, + Encoder: LambdaOutputEncoder, + Decoder: LambdaEventDecoder + >( + encoder: Encoder, + decoder: Decoder, + body: @escaping (Event, NewLambdaContext) async throws -> Output + ) + where + Handler == LambdaCodableAdapter< + LambdaHandlerAdapter>, + Event, + Output, + Decoder, + Encoder + > + { + let handler = LambdaCodableAdapter( + encoder: encoder, + decoder: decoder, + handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) + ) + + self.init(handler: handler) + } + + /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**, an encoder, and a decoder. + /// - Parameter body: The handler in the form of a closure. + /// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. + /// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. + package convenience init( + decoder: Decoder, + body: @escaping (Event, NewLambdaContext) async throws -> Void + ) + where + Handler == LambdaCodableAdapter< + LambdaHandlerAdapter>, + Event, + Void, + Decoder, + VoidEncoder + > + { + let handler = LambdaCodableAdapter( + decoder: decoder, + handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) + ) + + self.init(handler: handler) + } + + /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**. + /// - note: ``JSONEncoder`` and ``JSONDecoder`` are used as the encoder and decoder objects. Use ``init(encoder:decoder:body:)`` to specify custom encoder and decoder objects. + /// - Parameter body: The handler in the form of a closure. + package convenience init( + body: @escaping (Event, NewLambdaContext) async throws -> Output + ) + where + Handler == LambdaCodableAdapter< + LambdaHandlerAdapter>, + Event, + Output, + JSONDecoder, + LambdaJSONOutputEncoder + > + { + let handler = LambdaCodableAdapter( + encoder: JSONEncoder(), + decoder: JSONDecoder(), + handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) + ) + + self.init(handler: handler) + } + + /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**. + /// - note: ``JSONDecoder`` is used as the decoder object. Use ``init(decoder:body:)`` to specify a custom decoder object. + /// - Parameter body: The handler in the form of a closure. + package convenience init( + body: @escaping (Event, NewLambdaContext) async throws -> Void + ) + where + Handler == LambdaCodableAdapter< + LambdaHandlerAdapter>, + Event, + Void, + JSONDecoder, + VoidEncoder + > + { + let handler = LambdaCodableAdapter( + decoder: JSONDecoder(), + handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) + ) + + self.init(handler: handler) + } +} diff --git a/Sources/AWSLambdaRuntimeCore/NewLambda.swift b/Sources/AWSLambdaRuntimeCore/NewLambda.swift index 28eb7df9..d920e24b 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambda.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambda.swift @@ -14,6 +14,7 @@ import Dispatch import Logging +import NIOCore extension Lambda { package static func runLoop( @@ -44,4 +45,11 @@ extension Lambda { } } } + + /// The default EventLoop the Lambda is scheduled on. + package static var defaultEventLoop: any EventLoop { + get { + NIOSingletons.posixEventLoopGroup.next() + } + } } diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift new file mode 100644 index 00000000..40323212 --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift @@ -0,0 +1,38 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +//import ServiceLifecycle +import Logging +import NIOCore +import Synchronization + +package final class NewLambdaRuntime: Sendable where Handler: StreamingLambdaHandler { + let handlerMutex: Mutex + let logger: Logger + let eventLoop: EventLoop + + package init( + handler: sending Handler, + eventLoop: EventLoop = Lambda.defaultEventLoop, + logger: Logger = Logger(label: "LambdaRuntime") + ) { + self.handlerMutex = Mutex(handler) + self.eventLoop = eventLoop + self.logger = logger + } + + package func run() async throws { + } +} From ddc538d22b17f12cce8a9e5ddc989a8151ef5432 Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Thu, 5 Sep 2024 10:52:58 +0100 Subject: [PATCH 2/7] Fix missing host header, implement runtime's run function, fix defaultEventLoop --- Sources/AWSLambdaRuntimeCore/NewLambda.swift | 6 +-- .../NewLambdaRuntime.swift | 32 ++++++++++- .../NewLambdaRuntimeClient.swift | 54 ++++++++++--------- .../NewLambdaRuntimeError.swift | 2 + 4 files changed, 63 insertions(+), 31 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambda.swift b/Sources/AWSLambdaRuntimeCore/NewLambda.swift index d920e24b..d72df20b 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambda.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambda.swift @@ -47,9 +47,5 @@ extension Lambda { } /// The default EventLoop the Lambda is scheduled on. - package static var defaultEventLoop: any EventLoop { - get { - NIOSingletons.posixEventLoopGroup.next() - } - } + package static var defaultEventLoop: any EventLoop = NIOSingletons.posixEventLoopGroup.next() } diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift index 40323212..2a04e25b 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift @@ -19,7 +19,7 @@ import NIOCore import Synchronization package final class NewLambdaRuntime: Sendable where Handler: StreamingLambdaHandler { - let handlerMutex: Mutex + let handlerMutex: Mutex let logger: Logger let eventLoop: EventLoop @@ -34,5 +34,35 @@ package final class NewLambdaRuntime: Sendable where Handler: Streaming } package func run() async throws { + guard let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") else { + throw NewLambdaRuntimeError(code: .cannotStartLambdaRuntime) + } + + let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1) + let ip = String(ipAndPort[0]) + let port = Int(ipAndPort[1])! + + let handler = self.handlerMutex.withLock { maybeHandler in + defer { + maybeHandler = nil + } + return maybeHandler + } + + guard let handler else { + throw NewLambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce) + } + + try await NewLambdaRuntimeClient.withRuntimeClient( + configuration: .init(ip: ip, port: port), + eventLoop: self.eventLoop, + logger: self.logger + ) { runtimeClient in + try await Lambda.runLoop( + runtimeClient: runtimeClient, + handler: handler, + logger: self.logger + ) + } } } diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index 98db7bf9..7cf0de0a 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -303,7 +303,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024) ) try channel.pipeline.syncOperations.addHandler( - LambdaChannelHandler(delegate: self, logger: self.logger) + LambdaChannelHandler(delegate: self, logger: self.logger, configuration: self.configuration) ) return channel.eventLoop.makeSucceededFuture(()) } catch { @@ -433,10 +433,32 @@ private final class LambdaChannelHandler private var reusableErrorBuffer: ByteBuffer? private let logger: Logger private let delegate: Delegate + private let configuration: NewLambdaRuntimeClient.Configuration - init(delegate: Delegate, logger: Logger) { + let defaultHeaders: HTTPHeaders + /// These headers must be sent along an invocation or initialization error report + let errorHeaders: HTTPHeaders + /// These headers must be sent along an invocation or initialization error report + let streamingHeaders: HTTPHeaders + + init(delegate: Delegate, logger: Logger, configuration: NewLambdaRuntimeClient.Configuration) { self.delegate = delegate self.logger = logger + self.configuration = configuration + self.defaultHeaders = [ + "host": "\(self.configuration.ip):\(self.configuration.port)", + "user-agent": "Swift-Lambda/Unknown", + ] + self.errorHeaders = [ + "host": "\(self.configuration.ip):\(self.configuration.port)", + "user-agent": "Swift-Lambda/Unknown", + "lambda-runtime-function-error-type": "Unhandled", + ] + self.streamingHeaders = [ + "host": "\(self.configuration.ip):\(self.configuration.port)", + "user-agent": "Swift-Lambda/Unknown", + "transfer-encoding": "streaming", + ] } func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation { @@ -578,7 +600,7 @@ private final class LambdaChannelHandler version: .http1_1, method: .POST, uri: url, - headers: NewLambdaRuntimeClient.streamingHeaders + headers: self.streamingHeaders ) context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) @@ -604,11 +626,12 @@ private final class LambdaChannelHandler let headers: HTTPHeaders = if byteBuffer?.readableBytes ?? 0 < 6_000_000 { [ + "host": "\(self.configuration.ip):\(self.configuration.port)", "user-agent": "Swift-Lambda/Unknown", "content-length": "\(byteBuffer?.readableBytes ?? 0)", ] } else { - NewLambdaRuntimeClient.streamingHeaders + self.streamingHeaders } let httpRequest = HTTPRequestHead( @@ -634,7 +657,7 @@ private final class LambdaChannelHandler version: .http1_1, method: .GET, uri: self.nextInvocationPath, - headers: NewLambdaRuntimeClient.defaultHeaders + headers: self.defaultHeaders ) context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) @@ -650,7 +673,7 @@ private final class LambdaChannelHandler version: .http1_1, method: .POST, uri: url, - headers: NewLambdaRuntimeClient.errorHeaders + headers: self.errorHeaders ) if self.reusableErrorBuffer == nil { @@ -797,22 +820,3 @@ extension LambdaChannelHandler: ChannelInboundHandler { context.fireChannelInactive() } } - -extension NewLambdaRuntimeClient { - static let defaultHeaders: HTTPHeaders = [ - "user-agent": "Swift-Lambda/Unknown" - ] - - /// These headers must be sent along an invocation or initialization error report - static let errorHeaders: HTTPHeaders = [ - "user-agent": "Swift-Lambda/Unknown", - "lambda-runtime-function-error-type": "Unhandled", - ] - - /// These headers must be sent along an invocation or initialization error report - static let streamingHeaders: HTTPHeaders = [ - "user-agent": "Swift-Lambda/Unknown", - "transfer-encoding": "streaming", - ] - -} diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift index 2468ef99..7a7b4cfb 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift @@ -29,6 +29,8 @@ package struct NewLambdaRuntimeError: Error { case nextInvocationMissingHeaderDeadline case nextInvocationMissingHeaderInvokeFuctionARN + case cannotStartLambdaRuntime + case runtimeCanOnlyBeStartedOnce } package init(code: Code, underlying: (any Error)? = nil) { From 3804510134d88a81f8cf536eec826e5f4e82d4ef Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Thu, 5 Sep 2024 11:07:14 +0100 Subject: [PATCH 3/7] Move StreamingClosureHandler initializer to AWSLambdaRuntimeCore, fix codable initializers. --- Sources/AWSLambdaRuntime/Lambda+Codable.swift | 26 ++++++++----------- .../AWSLambdaRuntimeCore/NewLambda+JSON.swift | 10 +++++++ 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+Codable.swift b/Sources/AWSLambdaRuntime/Lambda+Codable.swift index 9b487890..91be77dc 100644 --- a/Sources/AWSLambdaRuntime/Lambda+Codable.swift +++ b/Sources/AWSLambdaRuntime/Lambda+Codable.swift @@ -67,14 +67,6 @@ extension LambdaCodableAdapter { } extension NewLambdaRuntime { - /// Initialize an instance with a ``StreamingLambdaHandler`` in the form of a closure. - /// - Parameter body: The handler in the form of a closure. - package convenience init( - body: @Sendable @escaping (ByteBuffer, LambdaResponseStreamWriter, NewLambdaContext) async throws -> Void - ) where Handler == StreamingClosureHandler { - self.init(handler: StreamingClosureHandler(body: body)) - } - /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**, an encoder, and a decoder. /// - Parameter body: The handler in the form of a closure. /// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. @@ -133,10 +125,13 @@ extension NewLambdaRuntime { } /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**. - /// - note: ``JSONEncoder`` and ``JSONDecoder`` are used as the encoder and decoder objects. Use ``init(encoder:decoder:body:)`` to specify custom encoder and decoder objects. /// - Parameter body: The handler in the form of a closure. + /// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. ``JSONEncoder()`` used as default. + /// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. ``JSONDecoder()`` used as default. package convenience init( - body: @escaping (Event, NewLambdaContext) async throws -> Output + body: @escaping (Event, NewLambdaContext) async throws -> Output, + encoder: JSONEncoder = JSONEncoder(), + decoder: JSONDecoder = JSONDecoder() ) where Handler == LambdaCodableAdapter< @@ -148,8 +143,8 @@ extension NewLambdaRuntime { > { let handler = LambdaCodableAdapter( - encoder: JSONEncoder(), - decoder: JSONDecoder(), + encoder: encoder, + decoder: decoder, handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) ) @@ -157,10 +152,11 @@ extension NewLambdaRuntime { } /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**. - /// - note: ``JSONDecoder`` is used as the decoder object. Use ``init(decoder:body:)`` to specify a custom decoder object. /// - Parameter body: The handler in the form of a closure. + /// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. ``JSONDecoder()`` used as default. package convenience init( - body: @escaping (Event, NewLambdaContext) async throws -> Void + body: @escaping (Event, NewLambdaContext) async throws -> Void, + decoder: JSONDecoder = JSONDecoder() ) where Handler == LambdaCodableAdapter< @@ -172,7 +168,7 @@ extension NewLambdaRuntime { > { let handler = LambdaCodableAdapter( - decoder: JSONDecoder(), + decoder: decoder, handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) ) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambda+JSON.swift b/Sources/AWSLambdaRuntimeCore/NewLambda+JSON.swift index 4475a81f..f5e81509 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambda+JSON.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambda+JSON.swift @@ -159,3 +159,13 @@ where Output == Encoder.Output { try await self.underlyingStreamWriter.writeAndFinish(outputBuffer) } } + +extension NewLambdaRuntime { + /// Initialize an instance with a ``StreamingLambdaHandler`` in the form of a closure. + /// - Parameter body: The handler in the form of a closure. + package convenience init( + body: @Sendable @escaping (ByteBuffer, LambdaResponseStreamWriter, NewLambdaContext) async throws -> Void + ) where Handler == StreamingClosureHandler { + self.init(handler: StreamingClosureHandler(body: body)) + } +} From aed1a8ec546fe9dde1139e6cb888d87ee0e8fd5d Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Thu, 5 Sep 2024 11:13:28 +0100 Subject: [PATCH 4/7] Refactor ip and port extraction --- Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift | 2 +- Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift index 2a04e25b..35e36061 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift @@ -40,7 +40,7 @@ package final class NewLambdaRuntime: Sendable where Handler: Streaming let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1) let ip = String(ipAndPort[0]) - let port = Int(ipAndPort[1])! + guard let port = Int(ipAndPort[1]) else { throw NewLambdaRuntimeError(code: .invalidPort) } let handler = self.handlerMutex.withLock { maybeHandler in defer { diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift index 7a7b4cfb..29b3d7ee 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift @@ -31,6 +31,7 @@ package struct NewLambdaRuntimeError: Error { case cannotStartLambdaRuntime case runtimeCanOnlyBeStartedOnce + case invalidPort } package init(code: Code, underlying: (any Error)? = nil) { From ea0f0d30f218393eb24944d3d288cf86c546168d Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Thu, 5 Sep 2024 11:54:52 +0100 Subject: [PATCH 5/7] Fix headers comment, fix transfer-encoding value of streamingHeaders, update env var missing error name --- Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift | 2 +- Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift | 5 +++-- Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift index 35e36061..881efa31 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift @@ -35,7 +35,7 @@ package final class NewLambdaRuntime: Sendable where Handler: Streaming package func run() async throws { guard let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") else { - throw NewLambdaRuntimeError(code: .cannotStartLambdaRuntime) + throw NewLambdaRuntimeError(code: .missingLambdaRuntimeAPIEnvironmentVariable) } let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index 7cf0de0a..46199e98 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -435,10 +435,11 @@ private final class LambdaChannelHandler private let delegate: Delegate private let configuration: NewLambdaRuntimeClient.Configuration + /// These are the default headers that must be sent along an invocation let defaultHeaders: HTTPHeaders /// These headers must be sent along an invocation or initialization error report let errorHeaders: HTTPHeaders - /// These headers must be sent along an invocation or initialization error report + /// These headers must be sent when streaming a response let streamingHeaders: HTTPHeaders init(delegate: Delegate, logger: Logger, configuration: NewLambdaRuntimeClient.Configuration) { @@ -457,7 +458,7 @@ private final class LambdaChannelHandler self.streamingHeaders = [ "host": "\(self.configuration.ip):\(self.configuration.port)", "user-agent": "Swift-Lambda/Unknown", - "transfer-encoding": "streaming", + "transfer-encoding": "chunked", ] } diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift index 29b3d7ee..02a4d8b8 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift @@ -29,7 +29,7 @@ package struct NewLambdaRuntimeError: Error { case nextInvocationMissingHeaderDeadline case nextInvocationMissingHeaderInvokeFuctionARN - case cannotStartLambdaRuntime + case missingLambdaRuntimeAPIEnvironmentVariable case runtimeCanOnlyBeStartedOnce case invalidPort } From 68bca217b69e3a9eef9fe4ec9f2c2c2c6d5ef2bc Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Thu, 5 Sep 2024 12:58:39 +0100 Subject: [PATCH 6/7] Move closure initializers not using Foundation to NewLambdaHandlers.swift in AWSLambdaRuntimeCore --- Sources/AWSLambdaRuntime/Lambda+Codable.swift | 57 ---------------- .../AWSLambdaRuntimeCore/NewLambda+JSON.swift | 10 --- .../NewLambdaHandlers.swift | 67 +++++++++++++++++++ 3 files changed, 67 insertions(+), 67 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+Codable.swift b/Sources/AWSLambdaRuntime/Lambda+Codable.swift index 91be77dc..1f59b8f7 100644 --- a/Sources/AWSLambdaRuntime/Lambda+Codable.swift +++ b/Sources/AWSLambdaRuntime/Lambda+Codable.swift @@ -67,63 +67,6 @@ extension LambdaCodableAdapter { } extension NewLambdaRuntime { - /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**, an encoder, and a decoder. - /// - Parameter body: The handler in the form of a closure. - /// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. - /// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. - package convenience init< - Event: Decodable, - Output: Encodable, - Encoder: LambdaOutputEncoder, - Decoder: LambdaEventDecoder - >( - encoder: Encoder, - decoder: Decoder, - body: @escaping (Event, NewLambdaContext) async throws -> Output - ) - where - Handler == LambdaCodableAdapter< - LambdaHandlerAdapter>, - Event, - Output, - Decoder, - Encoder - > - { - let handler = LambdaCodableAdapter( - encoder: encoder, - decoder: decoder, - handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) - ) - - self.init(handler: handler) - } - - /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**, an encoder, and a decoder. - /// - Parameter body: The handler in the form of a closure. - /// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. - /// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. - package convenience init( - decoder: Decoder, - body: @escaping (Event, NewLambdaContext) async throws -> Void - ) - where - Handler == LambdaCodableAdapter< - LambdaHandlerAdapter>, - Event, - Void, - Decoder, - VoidEncoder - > - { - let handler = LambdaCodableAdapter( - decoder: decoder, - handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) - ) - - self.init(handler: handler) - } - /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**. /// - Parameter body: The handler in the form of a closure. /// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. ``JSONEncoder()`` used as default. diff --git a/Sources/AWSLambdaRuntimeCore/NewLambda+JSON.swift b/Sources/AWSLambdaRuntimeCore/NewLambda+JSON.swift index f5e81509..4475a81f 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambda+JSON.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambda+JSON.swift @@ -159,13 +159,3 @@ where Output == Encoder.Output { try await self.underlyingStreamWriter.writeAndFinish(outputBuffer) } } - -extension NewLambdaRuntime { - /// Initialize an instance with a ``StreamingLambdaHandler`` in the form of a closure. - /// - Parameter body: The handler in the form of a closure. - package convenience init( - body: @Sendable @escaping (ByteBuffer, LambdaResponseStreamWriter, NewLambdaContext) async throws -> Void - ) where Handler == StreamingClosureHandler { - self.init(handler: StreamingClosureHandler(body: body)) - } -} diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaHandlers.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaHandlers.swift index 0f9e7412..2464a486 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaHandlers.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaHandlers.swift @@ -171,3 +171,70 @@ package struct ClosureHandler: NewLambdaHandler { try await self.body(event, context) } } + +extension NewLambdaRuntime { + /// Initialize an instance with a ``StreamingLambdaHandler`` in the form of a closure. + /// - Parameter body: The handler in the form of a closure. + package convenience init( + body: @Sendable @escaping (ByteBuffer, LambdaResponseStreamWriter, NewLambdaContext) async throws -> Void + ) where Handler == StreamingClosureHandler { + self.init(handler: StreamingClosureHandler(body: body)) + } + + /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**, an encoder, and a decoder. + /// - Parameter body: The handler in the form of a closure. + /// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. + /// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. + package convenience init< + Event: Decodable, + Output: Encodable, + Encoder: LambdaOutputEncoder, + Decoder: LambdaEventDecoder + >( + encoder: Encoder, + decoder: Decoder, + body: @escaping (Event, NewLambdaContext) async throws -> Output + ) + where + Handler == LambdaCodableAdapter< + LambdaHandlerAdapter>, + Event, + Output, + Decoder, + Encoder + > + { + let handler = LambdaCodableAdapter( + encoder: encoder, + decoder: decoder, + handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) + ) + + self.init(handler: handler) + } + + /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**, an encoder, and a decoder. + /// - Parameter body: The handler in the form of a closure. + /// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. + /// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. + package convenience init( + decoder: Decoder, + body: @escaping (Event, NewLambdaContext) async throws -> Void + ) + where + Handler == LambdaCodableAdapter< + LambdaHandlerAdapter>, + Event, + Void, + Decoder, + VoidEncoder + > + { + let handler = LambdaCodableAdapter( + decoder: decoder, + handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) + ) + + self.init(handler: handler) + } +} From 58f168e37a1c8864c88c6b8a7381c1ec265d738f Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Thu, 5 Sep 2024 14:29:07 +0100 Subject: [PATCH 7/7] Use `NIOLockedValueBox` instead of `Mutex` --- .../NewLambdaRuntime.swift | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift index 881efa31..41212b22 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift @@ -13,13 +13,16 @@ //===----------------------------------------------------------------------===// import Foundation -//import ServiceLifecycle import Logging import NIOCore -import Synchronization +import NIOConcurrencyHelpers -package final class NewLambdaRuntime: Sendable where Handler: StreamingLambdaHandler { - let handlerMutex: Mutex +// We need `@unchecked` Sendable here, as `NIOLockedValueBox` does not understand `sending` today. +// We don't want to use `NIOLockedValueBox` here anyway. We would love to use Mutex here, but this +// sadly crashes the compiler today. +package final class NewLambdaRuntime: @unchecked Sendable where Handler: StreamingLambdaHandler { + // TODO: We want to change this to Mutex as soon as this doesn't crash the Swift compiler on Linux anymore + let handlerMutex: NIOLockedValueBox> let logger: Logger let eventLoop: EventLoop @@ -28,7 +31,7 @@ package final class NewLambdaRuntime: Sendable where Handler: Streaming eventLoop: EventLoop = Lambda.defaultEventLoop, logger: Logger = Logger(label: "LambdaRuntime") ) { - self.handlerMutex = Mutex(handler) + self.handlerMutex = NIOLockedValueBox(handler) self.eventLoop = eventLoop self.logger = logger } @@ -42,11 +45,10 @@ package final class NewLambdaRuntime: Sendable where Handler: Streaming let ip = String(ipAndPort[0]) guard let port = Int(ipAndPort[1]) else { throw NewLambdaRuntimeError(code: .invalidPort) } - let handler = self.handlerMutex.withLock { maybeHandler in - defer { - maybeHandler = nil - } - return maybeHandler + let handler = self.handlerMutex.withLockedValue { handler in + let result = handler + handler = nil + return result } guard let handler else {