Skip to content

Commit 0c020df

Browse files
glbrnttrnro
andauthored
Strict concurrency for pipeline helpers (#503)
Motivation: To increase concurrency safety the NIOHTTP2 module should compile without warning under strict concurrency checking. Modifications: - Add a few temporary internal helpers - Annotate the pipeline helpers Result: Fewer concurrency warnings Co-authored-by: Rick Newton-Rogers <[email protected]>
1 parent 9f0d47e commit 0c020df

File tree

2 files changed

+186
-91
lines changed

2 files changed

+186
-91
lines changed

Sources/NIOHTTP2/HTTP2PipelineHelpers.swift

Lines changed: 151 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -71,26 +71,38 @@ extension ChannelPipeline {
7171
deprecated,
7272
renamed: "Channel.configureHTTP2SecureUpgrade(h2ChannelConfigurator:http1ChannelConfigurator:)"
7373
)
74+
@preconcurrency
7475
public func configureHTTP2SecureUpgrade(
75-
h2PipelineConfigurator: @escaping (ChannelPipeline) -> EventLoopFuture<Void>,
76-
http1PipelineConfigurator: @escaping (ChannelPipeline) -> EventLoopFuture<Void>
76+
h2PipelineConfigurator: @escaping @Sendable (ChannelPipeline) -> EventLoopFuture<Void>,
77+
http1PipelineConfigurator: @escaping @Sendable (ChannelPipeline) -> EventLoopFuture<Void>
7778
) -> EventLoopFuture<Void> {
78-
let alpnHandler = ApplicationProtocolNegotiationHandler { result in
79-
switch result {
80-
case .negotiated("h2"):
81-
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
82-
return h2PipelineConfigurator(self)
83-
case .negotiated("http/1.1"), .fallback:
84-
// Explicit or implicit HTTP/1.1 choice.
85-
return http1PipelineConfigurator(self)
86-
case .negotiated:
87-
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
88-
// of a user configuration error. We're going to close the connection directly.
89-
return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.InvalidALPNToken()) }
79+
@Sendable
80+
func makeALPNHandler() -> ApplicationProtocolNegotiationHandler {
81+
ApplicationProtocolNegotiationHandler { result in
82+
switch result {
83+
case .negotiated("h2"):
84+
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
85+
return h2PipelineConfigurator(self)
86+
case .negotiated("http/1.1"), .fallback:
87+
// Explicit or implicit HTTP/1.1 choice.
88+
return http1PipelineConfigurator(self)
89+
case .negotiated:
90+
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
91+
// of a user configuration error. We're going to close the connection directly.
92+
return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.InvalidALPNToken()) }
93+
}
9094
}
9195
}
9296

93-
return self.addHandler(alpnHandler)
97+
if self.eventLoop.inEventLoop {
98+
return self.eventLoop.assumeIsolatedUnsafeUnchecked().makeCompletedFuture {
99+
try self.syncOperations.addHandler(makeALPNHandler())
100+
}
101+
} else {
102+
return self.eventLoop.submit {
103+
try self.syncOperations.addHandler(makeALPNHandler())
104+
}
105+
}
94106
}
95107
}
96108

@@ -156,20 +168,29 @@ extension Channel {
156168
targetWindowSize: Int,
157169
inboundStreamStateInitializer: NIOChannelInitializerWithStreamID? = nil
158170
) -> EventLoopFuture<HTTP2StreamMultiplexer> {
159-
var handlers = [ChannelHandler]()
160-
handlers.reserveCapacity(2) // Update this if we need to add more handlers, to avoid unnecessary reallocation.
161-
handlers.append(NIOHTTP2Handler(mode: mode, initialSettings: initialLocalSettings))
162-
handlers.append(
163-
HTTP2StreamMultiplexer(
171+
@Sendable
172+
func configure() throws -> HTTP2StreamMultiplexer {
173+
let http2 = NIOHTTP2Handler(mode: mode, initialSettings: initialLocalSettings)
174+
let multiplexer = HTTP2StreamMultiplexer(
164175
mode: mode,
165176
channel: self,
166177
targetWindowSize: targetWindowSize,
167178
inboundStreamStateInitializer: inboundStreamStateInitializer
168179
)
169-
)
170180

171-
return self.pipeline.addHandlers(handlers, position: position).flatMap {
172-
self.pipeline.handler(type: HTTP2StreamMultiplexer.self)
181+
let handlers: [any ChannelHandler] = [http2, multiplexer]
182+
try self.pipeline.syncOperations.addHandlers(handlers, position: position)
183+
return multiplexer
184+
}
185+
186+
if self.eventLoop.inEventLoop {
187+
return self.eventLoop.assumeIsolatedUnsafeUnchecked().makeCompletedFuture {
188+
try configure()
189+
}
190+
} else {
191+
return self.eventLoop.submit {
192+
try configure()
193+
}
173194
}
174195
}
175196

@@ -195,28 +216,25 @@ extension Channel {
195216
targetWindowSize: Int = 65535,
196217
inboundStreamInitializer: NIOChannelInitializer?
197218
) -> EventLoopFuture<HTTP2StreamMultiplexer> {
219+
@Sendable
220+
func configure() throws -> HTTP2StreamMultiplexer {
221+
try self.pipeline.syncOperations.configureHTTP2Pipeline(
222+
mode: mode,
223+
channel: self,
224+
initialLocalSettings: initialLocalSettings,
225+
position: ChannelPipeline.SynchronousOperations.Position(position),
226+
targetWindowSize: targetWindowSize,
227+
inboundStreamInitializer: inboundStreamInitializer
228+
)
229+
}
198230

199231
if self.eventLoop.inEventLoop {
200-
return self.eventLoop.makeCompletedFuture {
201-
try self.pipeline.syncOperations.configureHTTP2Pipeline(
202-
mode: mode,
203-
channel: self,
204-
initialLocalSettings: initialLocalSettings,
205-
position: position,
206-
targetWindowSize: targetWindowSize,
207-
inboundStreamInitializer: inboundStreamInitializer
208-
)
232+
return self.eventLoop.assumeIsolatedUnsafeUnchecked().makeCompletedFuture {
233+
try configure()
209234
}
210235
} else {
211236
return self.eventLoop.submit {
212-
try self.pipeline.syncOperations.configureHTTP2Pipeline(
213-
mode: mode,
214-
channel: self,
215-
initialLocalSettings: initialLocalSettings,
216-
position: position,
217-
targetWindowSize: targetWindowSize,
218-
inboundStreamInitializer: inboundStreamInitializer
219-
)
237+
try configure()
220238
}
221239
}
222240
}
@@ -298,22 +316,35 @@ extension Channel {
298316
h2ChannelConfigurator: @escaping NIOChannelInitializer,
299317
http1ChannelConfigurator: @escaping NIOChannelInitializer
300318
) -> EventLoopFuture<Void> {
301-
let alpnHandler = ApplicationProtocolNegotiationHandler { result in
302-
switch result {
303-
case .negotiated("h2"):
304-
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
305-
return h2ChannelConfigurator(self)
306-
case .negotiated("http/1.1"), .fallback:
307-
// Explicit or implicit HTTP/1.1 choice.
308-
return http1ChannelConfigurator(self)
309-
case .negotiated:
310-
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
311-
// of a user configuration error. We're going to close the connection directly.
312-
return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.invalidALPNToken()) }
319+
@Sendable
320+
func makeALPNHandler() -> ApplicationProtocolNegotiationHandler {
321+
ApplicationProtocolNegotiationHandler { result in
322+
switch result {
323+
case .negotiated("h2"):
324+
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
325+
return h2ChannelConfigurator(self)
326+
case .negotiated("http/1.1"), .fallback:
327+
// Explicit or implicit HTTP/1.1 choice.
328+
return http1ChannelConfigurator(self)
329+
case .negotiated:
330+
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
331+
// of a user configuration error. We're going to close the connection directly.
332+
return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.invalidALPNToken()) }
333+
}
313334
}
314335
}
315336

316-
return self.pipeline.addHandler(alpnHandler)
337+
if self.eventLoop.inEventLoop {
338+
let alpnHandler = makeALPNHandler()
339+
return self.eventLoop.makeCompletedFuture {
340+
try self.pipeline.syncOperations.addHandler(alpnHandler)
341+
}
342+
} else {
343+
return self.eventLoop.submit {
344+
let alpnHandler = makeALPNHandler()
345+
try self.pipeline.syncOperations.addHandler(alpnHandler)
346+
}
347+
}
317348
}
318349

319350
/// Configures a `ChannelPipeline` to speak either HTTP/1.1 or HTTP/2 according to what can be negotiated with the client.
@@ -369,12 +400,18 @@ extension Channel {
369400
configurator: configurator,
370401
h2ConnectionChannelConfigurator: h2ConnectionChannelConfigurator
371402
) { channel in
372-
channel.configureHTTP2Pipeline(mode: .server, targetWindowSize: targetWindowSize) {
373-
streamChannel -> EventLoopFuture<Void> in
374-
streamChannel.pipeline.addHandler(HTTP2FramePayloadToHTTP1ServerCodec()).flatMap {
375-
() -> EventLoopFuture<Void> in
376-
configurator(streamChannel)
403+
channel.configureHTTP2Pipeline(
404+
mode: .server,
405+
targetWindowSize: targetWindowSize
406+
) { streamChannel -> EventLoopFuture<Void> in
407+
do {
408+
let sync = streamChannel.pipeline.syncOperations
409+
try sync.addHandlers(HTTP2FramePayloadToHTTP1ServerCodec())
410+
} catch {
411+
return streamChannel.eventLoop.makeFailedFuture(error)
377412
}
413+
414+
return configurator(streamChannel)
378415
}.map { _ in () }
379416
}
380417
}
@@ -416,10 +453,14 @@ extension Channel {
416453
streamConfiguration: streamConfiguration,
417454
streamDelegate: streamDelegate
418455
) { streamChannel -> EventLoopFuture<Void> in
419-
streamChannel.pipeline.addHandler(HTTP2FramePayloadToHTTP1ServerCodec()).flatMap {
420-
() -> EventLoopFuture<Void> in
421-
configurator(streamChannel)
456+
do {
457+
let sync = streamChannel.pipeline.syncOperations
458+
try sync.addHandlers(HTTP2FramePayloadToHTTP1ServerCodec())
459+
} catch {
460+
return streamChannel.eventLoop.makeFailedFuture(error)
422461
}
462+
463+
return configurator(streamChannel)
423464
}.map { _ in () }
424465
}
425466
}
@@ -485,7 +526,7 @@ extension ChannelPipeline.SynchronousOperations {
485526
inboundStreamInitializer: inboundStreamInitializer
486527
)
487528

488-
try self.addHandler(handler, position: position)
529+
try self.addHandler(handler, position: Position(position))
489530

490531
// `multiplexer` will always be non-nil when we are initializing with an `inboundStreamInitializer`
491532
return try handler.syncMultiplexer()
@@ -511,11 +552,10 @@ extension ChannelPipeline.SynchronousOperations {
511552
mode: NIOHTTP2Handler.ParserMode,
512553
channel: Channel,
513554
initialLocalSettings: [HTTP2Setting] = nioDefaultSettings,
514-
position: ChannelPipeline.Position = .last,
555+
position: ChannelPipeline.SynchronousOperations.Position = .last,
515556
targetWindowSize: Int = 65535,
516557
inboundStreamInitializer: NIOChannelInitializer?
517558
) throws -> HTTP2StreamMultiplexer {
518-
519559
let http2Handler = NIOHTTP2Handler(mode: mode, initialSettings: initialLocalSettings)
520560
let multiplexer = HTTP2StreamMultiplexer(
521561
mode: mode,
@@ -636,36 +676,24 @@ extension Channel {
636676
public func configureHTTP2AsyncSecureUpgrade<HTTP1Output: Sendable, HTTP2Output: Sendable>(
637677
http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput<HTTP1Output>,
638678
http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput<HTTP2Output>
639-
640679
) -> EventLoopFuture<EventLoopFuture<NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>>> {
641-
let alpnHandler = NIOTypedApplicationProtocolNegotiationHandler<
642-
NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>
643-
> { result in
644-
switch result {
645-
case .negotiated("h2"):
646-
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
647-
return http2ConnectionInitializer(self).map { http2Output in .http2(http2Output) }
648-
case .negotiated("http/1.1"), .fallback:
649-
// Explicit or implicit HTTP/1.1 choice.
650-
return http1ConnectionInitializer(self).map { http1Output in .http1_1(http1Output) }
651-
case .negotiated:
652-
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
653-
// of a user configuration error. We're going to close the connection directly.
654-
return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.invalidALPNToken()) }
680+
if self.eventLoop.inEventLoop {
681+
return self.eventLoop.makeCompletedFuture {
682+
self.pipeline.syncOperations.configureHTTP2AsyncSecureUpgrade(
683+
on: self,
684+
http1ConnectionInitializer: http1ConnectionInitializer,
685+
http2ConnectionInitializer: http2ConnectionInitializer
686+
)
655687
}
656-
}
657-
658-
return self.pipeline
659-
.addHandler(alpnHandler)
660-
.flatMap { _ in
661-
self.pipeline.handler(
662-
type: NIOTypedApplicationProtocolNegotiationHandler<
663-
NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>
664-
>.self
665-
).map { alpnHandler in
666-
alpnHandler.protocolNegotiationResult
667-
}
688+
} else {
689+
return self.eventLoop.submit {
690+
self.pipeline.syncOperations.configureHTTP2AsyncSecureUpgrade(
691+
on: self,
692+
http1ConnectionInitializer: http1ConnectionInitializer,
693+
http2ConnectionInitializer: http2ConnectionInitializer
694+
)
668695
}
696+
}
669697
}
670698

671699
/// Configures a `ChannelPipeline` to speak either HTTP/1.1 or HTTP/2 according to what can be negotiated with the client.
@@ -861,6 +889,38 @@ extension ChannelPipeline.SynchronousOperations {
861889
inboundStreamChannels: inboundStreamChannels
862890
)
863891
}
892+
893+
@inlinable
894+
func configureHTTP2AsyncSecureUpgrade<HTTP1Output: Sendable, HTTP2Output: Sendable>(
895+
on channel: any Channel,
896+
http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput<HTTP1Output>,
897+
http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput<HTTP2Output>
898+
) -> EventLoopFuture<NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>> {
899+
let alpnHandler = NIOTypedApplicationProtocolNegotiationHandler<
900+
NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>
901+
> { result in
902+
switch result {
903+
case .negotiated("h2"):
904+
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
905+
return http2ConnectionInitializer(channel).map { http2Output in .http2(http2Output) }
906+
case .negotiated("http/1.1"), .fallback:
907+
// Explicit or implicit HTTP/1.1 choice.
908+
return http1ConnectionInitializer(channel).map { http1Output in .http1_1(http1Output) }
909+
case .negotiated:
910+
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
911+
// of a user configuration error. We're going to close the connection directly.
912+
return channel.close().flatMap { channel.eventLoop.makeFailedFuture(NIOHTTP2Errors.invalidALPNToken()) }
913+
}
914+
}
915+
916+
do {
917+
try self.addHandler(alpnHandler)
918+
} catch {
919+
return channel.eventLoop.makeFailedFuture(error)
920+
}
921+
922+
return alpnHandler.protocolNegotiationResult
923+
}
864924
}
865925

866926
/// `NIONegotiatedHTTPVersion` is a generic negotiation result holder for HTTP/1.1 and HTTP/2
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
// TODO: remove this extension and bump the NIO dependency
18+
// when https://github.com/apple/swift-nio/pull/3152 is released.
19+
extension NIOIsolatedEventLoop {
20+
@inlinable
21+
@available(*, noasync)
22+
func makeCompletedFuture<Success>(_ result: Result<Success, Error>) -> EventLoopFuture<Success> {
23+
let promise = self.nonisolated().makePromise(of: Success.self)
24+
promise.assumeIsolatedUnsafeUnchecked().completeWith(result)
25+
return promise.futureResult
26+
}
27+
28+
@inlinable
29+
@available(*, noasync)
30+
func makeCompletedFuture<Success>(
31+
withResultOf body: () throws -> Success
32+
) -> EventLoopFuture<Success> {
33+
self.makeCompletedFuture(Result(catching: body))
34+
}
35+
}

0 commit comments

Comments
 (0)