Skip to content

Commit ae0ad23

Browse files
authored
Merge branch 'main' into swift_config
2 parents 4c55cd2 + e2ab0d1 commit ae0ad23

17 files changed

+505
-192
lines changed

Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift

Lines changed: 71 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ extension Transaction {
3838
case requestHeadSent
3939
case producing
4040
case paused(continuation: CheckedContinuation<Void, Error>?)
41+
case endForwarded
4142
case finished
4243
}
4344

@@ -97,7 +98,8 @@ extension Transaction {
9798
bodyStreamContinuation: CheckedContinuation<Void, Error>?
9899
)
99100

100-
case failRequestStreamContinuation(CheckedContinuation<Void, Error>, Error)
101+
case failRequestStreamContinuation(CheckedContinuation<Void, Error>, Error, HTTPRequestExecutor)
102+
case cancelExecutor(HTTPRequestExecutor)
101103
}
102104

103105
mutating func fail(_ error: Error) -> FailAction {
@@ -135,7 +137,7 @@ extension Transaction {
135137
bodyStreamContinuation: continuation
136138
)
137139

138-
case .requestHeadSent, .finished, .producing, .paused(continuation: .none):
140+
case .requestHeadSent, .endForwarded, .finished, .producing, .paused(continuation: .none):
139141
self.state = .finished(error: error)
140142
return .failResponseHead(
141143
context.continuation,
@@ -156,12 +158,29 @@ extension Transaction {
156158
context.executor,
157159
bodyStreamContinuation: bodyStreamContinuation
158160
)
159-
case .finished, .producing, .requestHeadSent:
161+
case .endForwarded, .finished, .producing, .requestHeadSent:
160162
return .failResponseStream(source, error, context.executor, bodyStreamContinuation: nil)
161163
}
162164

163-
case .finished(error: _),
164-
.executing(_, _, .finished):
165+
case .executing(let context, let requestStreamState, .finished):
166+
// an error occured after full response received, but before the full request was sent
167+
self.state = .finished(error: error)
168+
switch requestStreamState {
169+
case .paused(let bodyStreamContinuation):
170+
if let bodyStreamContinuation {
171+
return .failRequestStreamContinuation(
172+
bodyStreamContinuation,
173+
error,
174+
context.executor
175+
)
176+
} else {
177+
return .cancelExecutor(context.executor)
178+
}
179+
case .endForwarded, .finished, .producing, .requestHeadSent:
180+
return .cancelExecutor(context.executor)
181+
}
182+
183+
case .finished(error: _):
165184
return .none
166185
}
167186
}
@@ -232,7 +251,7 @@ extension Transaction {
232251
self.state = .executing(context, .producing, responseState)
233252
return .resumeStream(continuation)
234253

235-
case .executing(_, .finished, _):
254+
case .executing(_, .endForwarded, _), .executing(_, .finished, _):
236255
// the channels writability changed to writable after we have forwarded all the
237256
// request bytes. Can be ignored.
238257
return .none
@@ -254,6 +273,7 @@ extension Transaction {
254273
self.state = .executing(context, .paused(continuation: nil), responseSteam)
255274

256275
case .executing(_, .paused, _),
276+
.executing(_, .endForwarded, _),
257277
.executing(_, .finished, _),
258278
.finished:
259279
// the channels writability changed to paused after we have already forwarded all
@@ -298,7 +318,7 @@ extension Transaction {
298318
"A write continuation already exists, but we tried to set another one. Invalid state: \(self.state)"
299319
)
300320

301-
case .finished, .executing(_, .finished, _):
321+
case .finished, .executing(_, .endForwarded, _), .executing(_, .finished, _):
302322
return .fail
303323
}
304324
}
@@ -309,6 +329,7 @@ extension Transaction {
309329
.queued,
310330
.deadlineExceededWhileQueued,
311331
.executing(_, .requestHeadSent, _),
332+
.executing(_, .endForwarded, _),
312333
.executing(_, .finished, _):
313334
preconditionFailure(
314335
"A request stream can only produce, if the request was started. Invalid state: \(self.state)"
@@ -343,6 +364,7 @@ extension Transaction {
343364
case .initialized,
344365
.queued,
345366
.deadlineExceededWhileQueued,
367+
.executing(_, .endForwarded, _),
346368
.executing(_, .finished, _):
347369
preconditionFailure("Invalid state: \(self.state)")
348370

@@ -355,17 +377,38 @@ extension Transaction {
355377
.executing(let context, .paused(continuation: .none), let responseState),
356378
.executing(let context, .requestHeadSent, let responseState):
357379

358-
switch responseState {
359-
case .finished:
360-
// if the response stream has already finished before the request, we must succeed
361-
// the final continuation.
362-
self.state = .finished(error: nil)
363-
return .forwardStreamFinished(context.executor)
380+
self.state = .executing(context, .endForwarded, responseState)
381+
return .forwardStreamFinished(context.executor)
364382

365-
case .waitingForResponseHead, .streamingBody:
366-
self.state = .executing(context, .finished, responseState)
367-
return .forwardStreamFinished(context.executor)
368-
}
383+
case .finished:
384+
return .none
385+
}
386+
}
387+
388+
enum RequestBodyStreamSentAction {
389+
case none
390+
case failure(Error)
391+
}
392+
393+
mutating func requestBodyStreamSent() -> RequestBodyStreamSentAction {
394+
switch self.state {
395+
case .initialized,
396+
.queued,
397+
.deadlineExceededWhileQueued,
398+
.executing(_, .requestHeadSent, _),
399+
.executing(_, .finished, _),
400+
.executing(_, .producing, _),
401+
.executing(_, .paused, _):
402+
assertionFailure("Invalid state: \(self.state)")
403+
return .failure(HTTPClientError.internalStateFailure())
404+
405+
case .executing(_, .endForwarded, .finished):
406+
self.state = .finished(error: nil)
407+
return .none
408+
409+
case .executing(let context, .endForwarded, let responseState):
410+
self.state = .executing(context, .finished, responseState)
411+
return .none
369412

370413
case .finished:
371414
return .none
@@ -482,7 +525,7 @@ extension Transaction {
482525
switch requestState {
483526
case .finished:
484527
self.state = .finished(error: nil)
485-
case .paused, .producing, .requestHeadSent:
528+
case .paused, .producing, .requestHeadSent, .endForwarded:
486529
self.state = .executing(context, requestState, .finished)
487530
}
488531
return .finishResponseStream(source, finalBody: newChunks)
@@ -497,6 +540,15 @@ extension Transaction {
497540
}
498541
}
499542

543+
mutating func httpResponseStreamTerminated() -> FailAction {
544+
switch self.state {
545+
case .executing(_, _, .finished), .finished:
546+
return .none
547+
default:
548+
return self.fail(HTTPClientError.cancelled)
549+
}
550+
}
551+
500552
enum DeadlineExceededAction {
501553
case none
502554
case cancelSchedulerOnly(scheduler: HTTPRequestScheduler)
@@ -538,7 +590,7 @@ extension Transaction {
538590
executor: context.executor,
539591
bodyStreamContinuation: continuation
540592
)
541-
case .requestHeadSent, .finished, .producing, .paused(continuation: .none):
593+
case .requestHeadSent, .endForwarded, .finished, .producing, .paused(continuation: .none):
542594
self.state = .finished(error: error)
543595
return .cancel(
544596
requestContinuation: context.continuation,

Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,9 @@ extension Transaction: HTTPExecutableRequest {
206206
}
207207
}
208208

209-
func requestHeadSent() {}
209+
func requestHeadSent() {
210+
// protocol requirement. Intentionally not needed.
211+
}
210212

211213
func resumeRequestBodyStream() {
212214
let action = self.state.withLockedValue { state in
@@ -245,6 +247,19 @@ extension Transaction: HTTPExecutableRequest {
245247
}
246248
}
247249

250+
func requestBodyStreamSent() {
251+
let action = self.state.withLockedValue { state in
252+
state.requestBodyStreamSent()
253+
}
254+
255+
switch action {
256+
case .none:
257+
break
258+
case .failure(let error):
259+
self.fail(error)
260+
}
261+
}
262+
248263
// MARK: Response
249264

250265
func receiveResponseHead(_ head: HTTPResponseHead) {
@@ -302,6 +317,13 @@ extension Transaction: HTTPExecutableRequest {
302317
}
303318
}
304319

320+
func httpResponseStreamTerminated() {
321+
let action = self.state.withLockedValue { state in
322+
state.httpResponseStreamTerminated()
323+
}
324+
self.performFailAction(action)
325+
}
326+
305327
func fail(_ error: Error) {
306328
let action = self.state.withLockedValue { state in
307329
state.fail(error)
@@ -325,8 +347,12 @@ extension Transaction: HTTPExecutableRequest {
325347
requestBodyStreamContinuation?.resume(throwing: error)
326348
executor.cancelRequest(self)
327349

328-
case .failRequestStreamContinuation(let bodyStreamContinuation, let error):
350+
case .failRequestStreamContinuation(let bodyStreamContinuation, let error, let executor):
329351
bodyStreamContinuation.resume(throwing: error)
352+
executor.cancelRequest(self)
353+
354+
case .cancelExecutor(let executor):
355+
executor.cancelRequest(self)
330356
}
331357
}
332358

@@ -369,6 +395,6 @@ extension Transaction: NIOAsyncSequenceProducerDelegate {
369395

370396
@usableFromInline
371397
func didTerminate() {
372-
self.fail(HTTPClientError.cancelled)
398+
self.httpResponseStreamTerminated()
373399
}
374400
}

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,46 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
242242
case .sendBodyPart(let part, let writePromise):
243243
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise)
244244

245-
case .sendRequestEnd(let writePromise):
245+
case .sendRequestEnd(let writePromise, let finalAction):
246+
247+
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
248+
// We need to defer succeeding the old request to avoid ordering issues
249+
250+
writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in
251+
guard let oldRequest = self.request else {
252+
// in the meantime an error might have happened, which is why this request is
253+
// not reference anymore.
254+
return
255+
}
256+
oldRequest.requestBodyStreamSent()
257+
switch result {
258+
case .success:
259+
// If our final action is not `none`, that means we've already received
260+
// the complete response. As a result, once we've uploaded all the body parts
261+
// we need to tell the pool that the connection is idle or, if we were asked to
262+
// close when we're done, send the close. Either way, we then succeed the request
263+
264+
switch finalAction {
265+
case .none:
266+
// we must not nil out the request here, as we are still uploading the request
267+
// and therefore still need the reference to it.
268+
break
269+
270+
case .informConnectionIsIdle:
271+
self.request = nil
272+
self.onConnectionIdle()
273+
274+
case .close:
275+
self.request = nil
276+
context.close(promise: nil)
277+
}
278+
279+
case .failure(let error):
280+
context.close(promise: nil)
281+
oldRequest.fail(error)
282+
}
283+
}
284+
246285
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
247286

248287
if let readTimeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
@@ -300,7 +339,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
300339
// that the request is neither failed nor finished yet
301340
self.request!.receiveResponseBodyParts(buffer)
302341

303-
case .succeedRequest(let finalAction, let buffer):
342+
case .forwardResponseEnd(let finalAction, let buffer):
304343
// We can force unwrap the request here, as we have just validated in the state machine,
305344
// that the request is neither failed nor finished yet
306345

@@ -312,39 +351,20 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
312351
// other way around.
313352

314353
let oldRequest = self.request!
315-
self.request = nil
316354
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
317355
self.runTimeoutAction(.clearIdleWriteTimeoutTimer, context: context)
318356

319357
switch finalAction {
320358
case .close:
359+
self.request = nil
321360
context.close(promise: nil)
322361
oldRequest.receiveResponseEnd(buffer, trailers: nil)
323-
case .sendRequestEnd(let writePromise, let shouldClose):
324-
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
325-
// We need to defer succeeding the old request to avoid ordering issues
326-
writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in
327-
switch result {
328-
case .success:
329-
// If our final action was `sendRequestEnd`, that means we've already received
330-
// the complete response. As a result, once we've uploaded all the body parts
331-
// we need to tell the pool that the connection is idle or, if we were asked to
332-
// close when we're done, send the close. Either way, we then succeed the request
333-
if shouldClose {
334-
context.close(promise: nil)
335-
} else {
336-
self.onConnectionIdle()
337-
}
338-
339-
oldRequest.receiveResponseEnd(buffer, trailers: nil)
340-
case .failure(let error):
341-
context.close(promise: nil)
342-
oldRequest.fail(error)
343-
}
344-
}
345362

346-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
363+
case .none:
364+
oldRequest.receiveResponseEnd(buffer, trailers: nil)
365+
347366
case .informConnectionIsIdle:
367+
self.request = nil
348368
self.onConnectionIdle()
349369
oldRequest.receiveResponseEnd(buffer, trailers: nil)
350370
}

0 commit comments

Comments
 (0)