From d73822bc2442fb13143a9f433c36d25a3c2e165c Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Sun, 12 Jan 2020 20:37:24 -0500 Subject: [PATCH 1/7] refactor request queue mechanics This is a prelude to #159 which introduces upgrade requests, with a few major changes in `Server_connection`. The goals here is to try to make queue management easier to reason about by folding bits of logic from `advance_request_queue_if_necessary` into `next_read_operation` and `next_write_operation` such that we only perform side-effects when the operation in question demands it. One of the ways I tried to make this easier to reason about was to make the `next__operation` functions very parallel. Getting the read operation starts out with a short-circuit for shutting down when the server can no longer make progress (reader is closed and queue is empty). This doesn't feel like it belongs here. Perhaps this check should be part of `advance_request_queue` with some extra logic triggering in `shutdown_reader`? After that, the next-operation functions use some very simple probing of the input/output state of `Reqd` to determine what to do next. Only in the case of `Complete` do we move into a separate function (to make it easier to read): `_final__operation`. In these functions, we decide if we should shutdown the respective reader/writer or consider the `reqd` complete and move it off the queue. What's happening is that we don't know if the write action or read action will be last, so each function checks the state of the other to see if they're both complete. When we do shift it off, we recursively ask for the next operation given the new queue state. In the case of the writer triggering the advancing, before we return the result, we wakeup the reader so that it can evaluate the next operation given the new queue state. Note that in the case of a non-persistent connection, the queue is never advanced and the connection is shut down when both sides are done. Though on the surface, these pieces feel fairly straightforward, there are still a slew of re-entrancy bugs to consider. I think there are two things that we can do to make this drastically easier to manage: 1. We call `t.request_handler` in two places, and this is mostly because we want to keep the invariant that the head of the request queue has already been passed off to the handler. I feel like splitting this up into a simple queue of unhandled requests and a [Reqd.t option] that represents the current request would be easier to manage. 2. It would be nice to schedule calls. Things like waking up the writer before you let the read loop know its next operation just immediately makes my mind fall apart and lose track of state. There's a fairly obvious solution of asking for a `schedule : (unit -> unit) -> unit` function from the runtime that promises to not call the thunk synchronously, but rather waits until it is outside of the read and write loops. But maybe we can solve it using what we have now, like establishing a contract that when the reader/writer is woken up, they must schedule their work for a fresh call stack and not immediately ask for operations. --- lib/reqd.ml | 9 ---- lib/server_connection.ml | 110 +++++++++++++++++++++++---------------- 2 files changed, 64 insertions(+), 55 deletions(-) diff --git a/lib/reqd.ml b/lib/reqd.ml index 106c06f4..8277a637 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -243,15 +243,6 @@ let output_state t : Output_state.t = | Waiting -> Waiting ;; -let is_complete t = - match input_state t with - | Ready -> false - | Complete -> - (match output_state t with - | Waiting | Ready -> false - | Complete -> true) -;; - let flush_request_body t = let request_body = request_body t in if Body.has_pending_output request_body diff --git a/lib/server_connection.ml b/lib/server_connection.ml index bf358e42..959d542d 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -81,7 +81,7 @@ let current_reqd_exn t = let yield_reader t k = if is_closed t - then failwith "on_wakeup_reader on closed conn" + then failwith "yield_reader on closed conn" else if Optional_thunk.is_some t.wakeup_reader then failwith "yield_reader: only one callback can be registered at a time" else t.wakeup_reader <- Optional_thunk.some k @@ -155,6 +155,7 @@ let error_code t = else None let shutdown t = + Queue.clear t.request_queue; shutdown_reader t; shutdown_writer t; wakeup_reader t; @@ -182,53 +183,44 @@ let set_error_and_handle ?request t error = let report_exn t exn = set_error_and_handle t (`Exn exn) -let advance_request_queue_if_necessary t = - if is_active t then begin - let reqd = current_reqd_exn t in - if Reqd.persistent_connection reqd then begin - if Reqd.is_complete reqd then begin - ignore (Queue.take t.request_queue); - if not (Queue.is_empty t.request_queue) - then t.request_handler (current_reqd_exn t); - wakeup_reader t; - end - end else begin - (* Take the head of the queue, close the remaining request bodies, clear - * the queue, and push the head back on. We do not plan on processing any - * more requests after the current one. *) - ignore (Queue.take t.request_queue); - Queue.iter Reqd.close_request_body t.request_queue; - Queue.clear t.request_queue; - Queue.push reqd t.request_queue; - if Reqd.is_complete reqd - then shutdown t - else - match Reqd.input_state reqd with - | Ready -> () - | Complete -> shutdown_reader t - end - end else if Reader.is_closed t.reader - then shutdown t - -let _next_read_operation t = - advance_request_queue_if_necessary t; - if is_active t - then ( +let advance_request_queue t = + ignore (Queue.take t.request_queue); + if not (Queue.is_empty t.request_queue) + then t.request_handler (Queue.peek_exn t.request_queue); +;; + +let rec _next_read_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Reader.next t.reader + ) else ( let reqd = current_reqd_exn t in match Reqd.input_state reqd with | Ready -> Reader.next t.reader - | Complete -> - if Reqd.persistent_connection reqd - then `Yield - else ( - shutdown_reader t; - Reader.next t.reader) + | Complete -> _final_read_operation_for t reqd ) - else Reader.next t.reader + +and _final_read_operation_for t reqd = + let next = + if not (Reqd.persistent_connection reqd) then ( + shutdown_reader t; + Reader.next t.reader; + ) else ( + match Reqd.output_state reqd with + | Waiting | Ready -> `Yield + | Complete -> + advance_request_queue t; + _next_read_operation t; + ) + in + wakeup_writer t; + next ;; let next_read_operation t = match _next_read_operation t with + (* XXX(dpatti): These two [`Error _] constructors are never returned *) | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close | (`Read | `Yield | `Close) as operation -> operation @@ -259,13 +251,39 @@ let read t bs ~off ~len = let read_eof t bs ~off ~len = read_with_more t bs ~off ~len Complete -let next_write_operation t = - advance_request_queue_if_necessary t; - if is_active t - then ( +let rec _next_write_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Writer.next t.writer + ) else ( let reqd = current_reqd_exn t in - Reqd.flush_response_body reqd); - Writer.next t.writer + match Reqd.output_state reqd with + | Waiting -> `Yield + | Ready -> + Reqd.flush_response_body reqd; + Writer.next t.writer + | Complete -> _final_write_operation_for t reqd + ) + +and _final_write_operation_for t reqd = + let next = + if not (Reqd.persistent_connection reqd) then ( + shutdown_writer t; + Writer.next t.writer; + ) else ( + match Reqd.input_state reqd with + | Ready -> assert false + | Complete -> + advance_request_queue t; + _next_write_operation t; + ) + in + wakeup_reader t; + next +;; + +let next_write_operation t = _next_write_operation t let report_write_result t result = Writer.report_result t.writer result From 94e4480366e61f4d31c6749608f654681d490372 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Sat, 16 May 2020 15:11:13 -0400 Subject: [PATCH 2/7] refactor-request-queue: read loop no longer needs to wake writer This is because the writer is always woken by the appropriate calls that push chunks onto the body or writer or calls that close the body. Had to import an additional line from a recent band-aid fix regarding setting the flag on non-chunked streaming responses. It feels like we should find an alternative means of maintaining this piece of information. --- lib/server_connection.ml | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 959d542d..db6f08ef 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -202,20 +202,16 @@ let rec _next_read_operation t = ) and _final_read_operation_for t reqd = - let next = - if not (Reqd.persistent_connection reqd) then ( - shutdown_reader t; - Reader.next t.reader; - ) else ( - match Reqd.output_state reqd with - | Waiting | Ready -> `Yield - | Complete -> - advance_request_queue t; - _next_read_operation t; - ) - in - wakeup_writer t; - next + if not (Reqd.persistent_connection reqd) then ( + shutdown_reader t; + Reader.next t.reader; + ) else ( + match Reqd.output_state reqd with + | Waiting | Ready -> `Yield + | Complete -> + advance_request_queue t; + _next_read_operation t; + ) ;; let next_read_operation t = @@ -259,7 +255,12 @@ let rec _next_write_operation t = ) else ( let reqd = current_reqd_exn t in match Reqd.output_state reqd with - | Waiting -> `Yield + | Waiting -> + (* XXX(dpatti): I don't think we should need to call this, but it is + necessary in the case of a streaming, non-chunked body so that you can + set the appropriate flag. *) + Reqd.flush_response_body reqd; + Writer.next t.writer | Ready -> Reqd.flush_response_body reqd; Writer.next t.writer From cce55fd5e3b3d91e821ffddabbe9bf6e43f58dd2 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Fri, 2 Apr 2021 15:58:19 -0400 Subject: [PATCH 3/7] refactor-request-queue: fixes We basically never want to call `Queue.clear` because the head of the queue has special semantic meaning. Instead, we never try to clear the queue and rely on the fact that the queue will never be advanced. This is easy to reason about because the only time we advance the request queue is when the current request is not persistent. I added an explicit test of this situation to build confidence. Additionally, there was an incorrect assertion that you couldn't finish a write with reads still pending. A test was added upstream and it no longer fails with this fix. The final change was some subtle but unused code. In the write loop, we have something that decides to shutdown the connection if the reader is closed, parallel to the next read operation. But this felt weird, the reader should always be awake in the case that it is closed, which means that either 1) it will shutdown the connection or 2) it will wait for the writer, which will wake the reader once it's advanced the request queue, and then it will shutdown the connection. --- lib/server_connection.ml | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/lib/server_connection.ml b/lib/server_connection.ml index db6f08ef..f81cd5bd 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -155,7 +155,6 @@ let error_code t = else None let shutdown t = - Queue.clear t.request_queue; shutdown_reader t; shutdown_writer t; wakeup_reader t; @@ -190,7 +189,8 @@ let advance_request_queue t = ;; let rec _next_read_operation t = - if not (is_active t) then ( + if not (is_active t) + then ( if Reader.is_closed t.reader then shutdown t; Reader.next t.reader @@ -216,7 +216,6 @@ and _final_read_operation_for t reqd = let next_read_operation t = match _next_read_operation t with - (* XXX(dpatti): These two [`Error _] constructors are never returned *) | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close | (`Read | `Yield | `Close) as operation -> operation @@ -248,11 +247,9 @@ let read_eof t bs ~off ~len = read_with_more t bs ~off ~len Complete let rec _next_write_operation t = - if not (is_active t) then ( - if Reader.is_closed t.reader - then shutdown t; - Writer.next t.writer - ) else ( + if not (is_active t) + then Writer.next t.writer + else ( let reqd = current_reqd_exn t in match Reqd.output_state reqd with | Waiting -> @@ -274,7 +271,7 @@ and _final_write_operation_for t reqd = Writer.next t.writer; ) else ( match Reqd.input_state reqd with - | Ready -> assert false + | Ready -> Writer.next t.writer; | Complete -> advance_request_queue t; _next_write_operation t; From cad867785da525d0637511b427d845dd6737cf5e Mon Sep 17 00:00:00 2001 From: David House Date: Thu, 29 Apr 2021 19:42:09 +0100 Subject: [PATCH 4/7] Don't hang the reader thread after write eof In many runtimes, there are separate reader and writer threads that drive the reading and writing from httpaf independently. So a thing that can happen is: - A request arrives. - The response handler begins but does not finish responding to it. - The writer thread calls [Server_connection.report_write_result `Closed]. - The reader thread delivers another request. In this case, httpaf will never deliver the second request to the handler, because the first request's output_state never gets to Complete. We have no hope of responding to the second request, but we should still deliver it to the handler in case the request has side-effects (e.g. as many POST requests do). This PR fixes this by noticing when the writer is closed in [Reqd.output_state] and just always returning [Complete] in this case, as no more output progress can be made. --- lib/reqd.ml | 9 +++++++-- lib_test/test_server_connection.ml | 24 ++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/lib/reqd.ml b/lib/reqd.ml index 8277a637..2e9d4b8a 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -235,12 +235,17 @@ let output_state t : Output_state.t = match t.response_state with | Fixed _ -> Complete | Streaming (_, response_body) -> - if Body.has_pending_output response_body + if Writer.is_closed t.writer + then Complete + else if Body.has_pending_output response_body then Ready else if Body.is_closed response_body then Complete else Waiting - | Waiting -> Waiting + | Waiting -> + if Writer.is_closed t.writer + then Complete + else Waiting ;; let flush_request_body t = diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index e1b0e696..6783db65 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -894,6 +894,29 @@ let test_response_finished_before_body_read () = write_response t response ~body:"done"; ;; +let test_can_read_more_requests_after_write_eof () = + let request = Request.create `GET "/" ~headers:(Headers.encoding_fixed 0) in + let reqd = ref None in + let request_handler reqd' = reqd := Some reqd' in + let t = create request_handler in + let response = Response.create `OK ~headers:Headers.encoding_chunked in + read_request t request; + Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true + |> (ignore : [ `write ] Body.t -> unit); + write_eof t; + (* In many runtimes, there are separate reader and writer threads that drive the reading + and writing from httpaf independently. So just because the writer thread has told us + that the socket is closed doesn't mean we won't get a bunch more requests delivered + to us from the reader thread. We should be ready to receive them, and call the + request handler for them, even if there is no possibility of writing responses (e.g. + those requests might be side-effecting requests like POST requests). *) + reqd := None; + reader_ready t; + read_request t request; + Alcotest.(check bool) "request handler fired" true (Option.is_some !reqd) +;; + + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -922,4 +945,5 @@ let tests = ; "multiple requests with connection close", `Quick, test_multiple_requests_in_single_read_with_close ; "parse failure after checkpoint", `Quick, test_parse_failure_after_checkpoint ; "response finished before body read", `Quick, test_response_finished_before_body_read + ; "can read more requests after write eof", `Quick, test_can_read_more_requests_after_write_eof ] From e5fd51f1cd1cdf91de0a8e8df693b7b16951ebb8 Mon Sep 17 00:00:00 2001 From: David House Date: Thu, 29 Apr 2021 19:42:09 +0100 Subject: [PATCH 5/7] Don't hang the reader thread after write eof In many runtimes, there are separate reader and writer threads that drive the reading and writing from httpaf independently. So a thing that can happen is: - A request arrives. - The response handler begins but does not finish responding to it. - The writer thread calls [Server_connection.report_write_result `Closed]. - The reader thread delivers another request. In this case, httpaf will never deliver the second request to the handler, because the first request's output_state never gets to Complete. We have no hope of responding to the second request, but we should still deliver it to the handler in case the request has side-effects (e.g. as many POST requests do). This PR fixes this by noticing when the writer is closed in [Reqd.output_state] and just always returning [Complete] in this case, as no more output progress can be made. --- lib/reqd.ml | 9 +++++++-- lib_test/test_server_connection.ml | 23 +++++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/lib/reqd.ml b/lib/reqd.ml index 8277a637..2e9d4b8a 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -235,12 +235,17 @@ let output_state t : Output_state.t = match t.response_state with | Fixed _ -> Complete | Streaming (_, response_body) -> - if Body.has_pending_output response_body + if Writer.is_closed t.writer + then Complete + else if Body.has_pending_output response_body then Ready else if Body.is_closed response_body then Complete else Waiting - | Waiting -> Waiting + | Waiting -> + if Writer.is_closed t.writer + then Complete + else Waiting ;; let flush_request_body t = diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index e1b0e696..d47c62af 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -894,6 +894,28 @@ let test_response_finished_before_body_read () = write_response t response ~body:"done"; ;; +let test_can_read_more_requests_after_write_eof () = + let request = Request.create `GET "/" ~headers:(Headers.encoding_fixed 0) in + let reqd = ref None in + let request_handler reqd' = reqd := Some reqd' in + let t = create request_handler in + let response = Response.create `OK ~headers:Headers.encoding_chunked in + read_request t request; + Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true + |> (ignore : [ `write ] Body.t -> unit); + write_eof t; + (* In many runtimes, there are separate reader and writer threads that drive the reading + and writing from httpaf independently. So just because the writer thread has told us + that the socket is closed doesn't mean we won't get a bunch more requests delivered + to us from the reader thread. We should be ready to receive them, and call the + request handler for them, even if there is no possibility of writing responses (e.g. + those requests might be side-effecting requests like POST requests). *) + reqd := None; + reader_ready t; + read_request t request; + Alcotest.(check bool) "request handler fired" true (Option.is_some !reqd) +;; + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -922,4 +944,5 @@ let tests = ; "multiple requests with connection close", `Quick, test_multiple_requests_in_single_read_with_close ; "parse failure after checkpoint", `Quick, test_parse_failure_after_checkpoint ; "response finished before body read", `Quick, test_response_finished_before_body_read + ; "can read more requests after write eof", `Quick, test_can_read_more_requests_after_write_eof ] From ed4780bfccea8685b43ed1487c1d4d1ab6608a59 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Wed, 2 Jun 2021 19:03:32 -0400 Subject: [PATCH 6/7] demonstrate writing to closed writer --- lib_test/test_server_connection.ml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index d47c62af..f55af7ac 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -913,6 +913,8 @@ let test_can_read_more_requests_after_write_eof () = reqd := None; reader_ready t; read_request t request; + Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true + |> (ignore : [ `write ] Body.t -> unit); Alcotest.(check bool) "request handler fired" true (Option.is_some !reqd) ;; From edfb152dbf6c67e9346a6664bbd39990636fbc51 Mon Sep 17 00:00:00 2001 From: David House Date: Thu, 3 Jun 2021 07:42:36 +0100 Subject: [PATCH 7/7] cosmetic --move test to end of list --- lib_test/test_server_connection.ml | 50 +++++++++++++++--------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index e08832ac..78ae333a 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -914,30 +914,6 @@ let test_response_finished_before_body_read () = write_response t response ~body:"done"; ;; -let test_can_read_more_requests_after_write_eof () = - let request = Request.create `GET "/" ~headers:(Headers.encoding_fixed 0) in - let reqd = ref None in - let request_handler reqd' = reqd := Some reqd' in - let t = create request_handler in - let response = Response.create `OK ~headers:Headers.encoding_chunked in - read_request t request; - Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true - |> (ignore : [ `write ] Body.t -> unit); - write_eof t; - (* In many runtimes, there are separate reader and writer threads that drive the reading - and writing from httpaf independently. So just because the writer thread has told us - that the socket is closed doesn't mean we won't get a bunch more requests delivered - to us from the reader thread. We should be ready to receive them, and call the - request handler for them, even if there is no possibility of writing responses (e.g. - those requests might be side-effecting requests like POST requests). *) - reqd := None; - reader_ready t; - read_request t request; - Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true - |> (ignore : [ `write ] Body.t -> unit); - Alcotest.(check bool) "request handler fired" true (Option.is_some !reqd) -;; - let test_shutdown_in_request_handler () = let request = Request.create `GET "/" in let rec t = @@ -1005,6 +981,30 @@ let test_schedule_read_with_data_available () = write_response t response; ;; +let test_can_read_more_requests_after_write_eof () = + let request = Request.create `GET "/" ~headers:(Headers.encoding_fixed 0) in + let reqd = ref None in + let request_handler reqd' = reqd := Some reqd' in + let t = create request_handler in + let response = Response.create `OK ~headers:Headers.encoding_chunked in + read_request t request; + Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true + |> (ignore : [ `write ] Body.t -> unit); + write_eof t; + (* In many runtimes, there are separate reader and writer threads that drive the reading + and writing from httpaf independently. So just because the writer thread has told us + that the socket is closed doesn't mean we won't get a bunch more requests delivered + to us from the reader thread. We should be ready to receive them, and call the + request handler for them, even if there is no possibility of writing responses (e.g. + those requests might be side-effecting requests like POST requests). *) + reqd := None; + reader_ready t; + read_request t request; + Reqd.respond_with_streaming (Option.get !reqd) response ~flush_headers_immediately:true + |> (ignore : [ `write ] Body.t -> unit); + Alcotest.(check bool) "request handler fired" true (Option.is_some !reqd) +;; + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -1034,8 +1034,8 @@ let tests = ; "multiple requests with eof", `Quick, test_multiple_requests_in_single_read_with_eof ; "parse failure after checkpoint", `Quick, test_parse_failure_after_checkpoint ; "response finished before body read", `Quick, test_response_finished_before_body_read - ; "can read more requests after write eof", `Quick, test_can_read_more_requests_after_write_eof ; "shutdown in request handler", `Quick, test_shutdown_in_request_handler ; "shutdown during asynchronous request", `Quick, test_shutdown_during_asynchronous_request ; "schedule read with data available", `Quick, test_schedule_read_with_data_available + ; "can read more requests after write eof", `Quick, test_can_read_more_requests_after_write_eof ]