From 6250ee34c950b4b47bb7545582ac0b8752fbc7bd Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Sun, 12 Jan 2020 20:37:24 -0500 Subject: [PATCH] refactor request queue mechanics This is a prelude to #159 which introduces upgrade requests, and much of the diff is identical, with a few trivial changes in `Reqd` and a few major changes in `Server_connection`. The goals here are: 1. Make `Reqd` return better types that encapsulate its state instead of requiring the user to probe it with `requires_` and `is_complete`. 2. Try to make queue management easier to reason about by folding bits of logic from `advance_request_queue_if_necessary` into `next_read_operation` and `next_write_operation` such that we only perform side-effects when the operation in question demands it. One of the ways I tried to make this easier to reason about was to make the `yield_` and `next__operation` functions very parallel. For one, the extra logic in `yield_writer` was puzzling. Ideally, if you're calling `yield_writer`, you're doing so because you just called `next_action` and were told to `Yield`, so all of the conditions being checked should not be possible. Looking at the next-operation functions, they both start out with a short-circuit for shutting down when the server can no longer make progress (reader is closed and queue is empty). This doesn't feel like it belongs here. Perhaps this check should be part of `advance_request_queue` with some extra logic triggering in `shutdown_reader`? After that, the next-operation functions use some very simple probing of the input/output state of `Reqd` to determine what to do next. Only in the case of `Complete` do we move into a separate function (to make it easier to read): `_final__operation`. In these functions, we decide if we should shutdown the respective reader/writer or consider the `reqd` complete and move it off the queue. When we do shift it off, we recursively ask for the next operation given the new queue state. In all cases, before we return the result, we wakeup the other side so that it too can evaluate the next operation given the new queue state. Though on the surface, these pieces feel fairly straightforward, there are still a slew of re-entrancy bugs to consider. I think there are two things that we can do to make this drastically easier to manage: 1. We call `t.request_handler` in two places, and this is mostly because we want to keep the invariant that the head of the request queue has already been passed off to the handler. I feel like splitting this up into a simple queue of unhandled requests and a [Reqd.t option] that represents the current request would be easier to manage. 2. It would be nice to schedule calls. Things like waking up the writer before you let the read loop know its next operation just immediately makes my mind fall apart and lose track of state. There's a fairly obvious solution of asking for a `schedule : (unit -> unit) -> unit` function from the runtime that promises to not call the thunk synchronously, but rather waits until it is outside of the read and write loops. But maybe we can solve it using what we have now, like establishing a contract that when the reader/writer is woken up, they must schedule their work for a fresh call stack and not immediately ask for operations. I added a `Queue.clear` to shutdown, not because it was necessary in any sense, but because it was part of `advance_request_queue_if_necessary`, which could have come into play in certain situations where `shutdown` was called from the runtime (e.g., in case of some exception). I would like to note that despite the fact that all tests pass, I have very little confidence in this being correct right now and would like to do some further testing within the actual runtimes. --- lib/reqd.ml | 9 ---- lib/server_connection.ml | 110 +++++++++++++++++++++++---------------- 2 files changed, 64 insertions(+), 55 deletions(-) diff --git a/lib/reqd.ml b/lib/reqd.ml index 106c06f4..8277a637 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -243,15 +243,6 @@ let output_state t : Output_state.t = | Waiting -> Waiting ;; -let is_complete t = - match input_state t with - | Ready -> false - | Complete -> - (match output_state t with - | Waiting | Ready -> false - | Complete -> true) -;; - let flush_request_body t = let request_body = request_body t in if Body.has_pending_output request_body diff --git a/lib/server_connection.ml b/lib/server_connection.ml index bf358e42..959d542d 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -81,7 +81,7 @@ let current_reqd_exn t = let yield_reader t k = if is_closed t - then failwith "on_wakeup_reader on closed conn" + then failwith "yield_reader on closed conn" else if Optional_thunk.is_some t.wakeup_reader then failwith "yield_reader: only one callback can be registered at a time" else t.wakeup_reader <- Optional_thunk.some k @@ -155,6 +155,7 @@ let error_code t = else None let shutdown t = + Queue.clear t.request_queue; shutdown_reader t; shutdown_writer t; wakeup_reader t; @@ -182,53 +183,44 @@ let set_error_and_handle ?request t error = let report_exn t exn = set_error_and_handle t (`Exn exn) -let advance_request_queue_if_necessary t = - if is_active t then begin - let reqd = current_reqd_exn t in - if Reqd.persistent_connection reqd then begin - if Reqd.is_complete reqd then begin - ignore (Queue.take t.request_queue); - if not (Queue.is_empty t.request_queue) - then t.request_handler (current_reqd_exn t); - wakeup_reader t; - end - end else begin - (* Take the head of the queue, close the remaining request bodies, clear - * the queue, and push the head back on. We do not plan on processing any - * more requests after the current one. *) - ignore (Queue.take t.request_queue); - Queue.iter Reqd.close_request_body t.request_queue; - Queue.clear t.request_queue; - Queue.push reqd t.request_queue; - if Reqd.is_complete reqd - then shutdown t - else - match Reqd.input_state reqd with - | Ready -> () - | Complete -> shutdown_reader t - end - end else if Reader.is_closed t.reader - then shutdown t - -let _next_read_operation t = - advance_request_queue_if_necessary t; - if is_active t - then ( +let advance_request_queue t = + ignore (Queue.take t.request_queue); + if not (Queue.is_empty t.request_queue) + then t.request_handler (Queue.peek_exn t.request_queue); +;; + +let rec _next_read_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Reader.next t.reader + ) else ( let reqd = current_reqd_exn t in match Reqd.input_state reqd with | Ready -> Reader.next t.reader - | Complete -> - if Reqd.persistent_connection reqd - then `Yield - else ( - shutdown_reader t; - Reader.next t.reader) + | Complete -> _final_read_operation_for t reqd ) - else Reader.next t.reader + +and _final_read_operation_for t reqd = + let next = + if not (Reqd.persistent_connection reqd) then ( + shutdown_reader t; + Reader.next t.reader; + ) else ( + match Reqd.output_state reqd with + | Waiting | Ready -> `Yield + | Complete -> + advance_request_queue t; + _next_read_operation t; + ) + in + wakeup_writer t; + next ;; let next_read_operation t = match _next_read_operation t with + (* XXX(dpatti): These two [`Error _] constructors are never returned *) | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close | (`Read | `Yield | `Close) as operation -> operation @@ -259,13 +251,39 @@ let read t bs ~off ~len = let read_eof t bs ~off ~len = read_with_more t bs ~off ~len Complete -let next_write_operation t = - advance_request_queue_if_necessary t; - if is_active t - then ( +let rec _next_write_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Writer.next t.writer + ) else ( let reqd = current_reqd_exn t in - Reqd.flush_response_body reqd); - Writer.next t.writer + match Reqd.output_state reqd with + | Waiting -> `Yield + | Ready -> + Reqd.flush_response_body reqd; + Writer.next t.writer + | Complete -> _final_write_operation_for t reqd + ) + +and _final_write_operation_for t reqd = + let next = + if not (Reqd.persistent_connection reqd) then ( + shutdown_writer t; + Writer.next t.writer; + ) else ( + match Reqd.input_state reqd with + | Ready -> assert false + | Complete -> + advance_request_queue t; + _next_write_operation t; + ) + in + wakeup_reader t; + next +;; + +let next_write_operation t = _next_write_operation t let report_write_result t result = Writer.report_result t.writer result