Skip to content

Commit

Permalink
refactor request queue mechanics
Browse files Browse the repository at this point in the history
This is a prelude to #159 which introduces streaming requests, and much
of the diff is identical, with a few trivial changes in `Reqd` and a few
major changes in `Server_connection`.

The goals here are:

1. Make `Reqd` return better types that encapsulate its state instead of
   requiring the user to probe it with `requires_<input|output>` and
   `is_complete`.

2. Try to make queue management easier to reason about by folding bits
   of logic from `advance_request_queue_if_necessary` into
   `next_read_operation` and `next_write_operation` such that we only
   perform side-effects when the operation in question demands it.

One of the ways I tried to make this easier to reason about was to make
the `yield_<reader|writer>` and `next_<read|write>_operation` functions
very parallel. For one, the extra logic in `yield_writer` was puzzling.
Ideally, if you're calling `yield_writer`, you're doing so because you
just called `next_action` and were told to `Yield`, so all of the
conditions being checked should not be possible.

Looking at the next-operation functions, they both start out with a
short-circuit for shutting down when the server can no longer make
progress (reader is closed and queue is empty). This doesn't feel like
it belongs here. Perhaps this check should be part of
`advance_request_queue` with some extra logic triggering in
`shutdown_reader`? After that, the next-operation functions use some
very simple probing of the input/output state of `Reqd` to determine
what to do next. Only in the case of `Complete` do we move into a
separate function (to make it easier to read):
`_final_<read|write>_operation`.

In these functions, we decide if we should shutdown the respective
reader/writer or consider the `reqd` complete and move it off the queue.
When we do shift it off, we recursively ask for the next operation given
the new queue state. In all cases, before we return the result, we
wakeup the other side so that it too can evaluate the next operation
given the new queue state.

Though on the surface, these pieces feel fairly straightforward, there
are still a slew of re-entrancy bugs to consider. I think there are two
things that we can do to make this drastically easier to manage:

1. We call `t.request_handler` in two places, and this is mostly because
   we want to keep the invariant that the head of the request queue has
   already been passed off to the handler. I feel like splitting this up
   into a simple queue of unhandled requests and a [Reqd.t option] that
   represents the current request would be easier to manage.

2. It would be nice to schedule calls. Things like waking up the writer
   before you let the read loop know its next operation just immediately
   makes my mind fall apart and lose track of state. There's a fairly
   obvious solution of asking for a `schedule : (unit -> unit) -> unit`
   function from the runtime that promises to not call the thunk
   synchronously, but rather waits until it is outside of the read and
   write loops. But maybe we can solve it using what we have now, like
   establishing a contract that when the reader/writer is woken up, they
   must schedule their work for a fresh call stack and not immediately
   ask for operations.

I added a `Queue.clear` to shutdown, not because it was necessary in any
sense, but because it was part of `advance_request_queue_if_necessary`,
which could have come into play in certain situations where `shutdown`
was called from the runtime (e.g., in case of some exception).

I would like to note that despite the fact that all tests pass, I have
very little confidence in this being correct right now and would like to
do some further testing within the actual runtimes.
  • Loading branch information
dpatti committed Apr 12, 2020
1 parent 16a2486 commit e6980ca
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 73 deletions.
57 changes: 37 additions & 20 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,25 @@
type error =
[ `Bad_request | `Bad_gateway | `Internal_server_error | `Exn of exn ]

type response_state =
| Waiting of Optional_thunk.t ref
| Complete of Response.t
| Streaming of Response.t * [`write] Body.t
module Response_state = struct
type t =
| Waiting of Optional_thunk.t ref
| Complete of Response.t
| Streaming of Response.t * [`write] Body.t
end

module Input_state = struct
type t =
| Provide
| Complete
end

module Output_state = struct
type t =
| Consume
| Wait
| Complete
end

type error_handler =
?request:Request.t -> error -> (Headers.t -> [`write] Body.t) -> unit
Expand Down Expand Up @@ -74,7 +89,7 @@ type t =
; response_body_buffer : Bigstringaf.t
; error_handler : error_handler
; mutable persistent : bool
; mutable response_state : response_state
; mutable response_state : Response_state.t
; mutable error_code : [`Ok | error ]
}

Expand All @@ -101,21 +116,21 @@ let response { response_state; _ } =
match response_state with
| Waiting _ -> None
| Streaming(response, _)
| Complete (response) -> Some response
| Complete response -> Some response

let response_exn { response_state; _ } =
match response_state with
| Waiting _ -> failwith "httpaf.Reqd.response_exn: response has not started"
| Streaming(response, _)
| Complete (response) -> response
| Complete response -> response

let respond_with_string t response str =
if t.error_code <> `Ok then
failwith "httpaf.Reqd.respond_with_string: invalid state, currently handling error";
match t.response_state with
| Waiting when_done_waiting ->
(* XXX(seliopou): check response body length *)
Writer.write_response t.writer response;
Writer.write_response t.writer response;
Writer.write_string t.writer str;
if t.persistent then
t.persistent <- Response.persistent_connection response;
Expand Down Expand Up @@ -222,19 +237,21 @@ let on_more_output_available t f =
let persistent_connection t =
t.persistent

let requires_input { request_body; _ } =
not (Body.is_closed request_body)
let input_state t : Input_state.t =
if Body.is_closed t.request_body
then Complete
else Provide

let requires_output { response_state; _ } =
match response_state with
| Complete _ -> false
| Streaming (_, response_body) ->
not (Body.is_closed response_body)
|| Body.has_pending_output response_body
| Waiting _ -> true

let is_complete t =
not (requires_input t || requires_output t)
let output_state t : Output_state.t =
match t.response_state with
| Complete _ -> Complete
| Waiting _ -> Wait
| Streaming(_, response_body) ->
if Body.has_pending_output response_body
then Consume
else if Body.is_closed response_body
then Complete
else Wait

let flush_request_body t =
let request_body = request_body t in
Expand Down
123 changes: 70 additions & 53 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ let current_reqd_exn t =

let yield_reader t k =
if is_closed t
then failwith "on_wakeup_reader on closed conn"
then failwith "yield_reader on closed conn"
else if Optional_thunk.is_some t.wakeup_reader
then failwith "yield_reader: only one callback can be registered at a time"
else t.wakeup_reader <- Optional_thunk.some k
Expand All @@ -94,9 +94,9 @@ let wakeup_reader t =
Optional_thunk.call_if_some f
;;

let on_wakeup_writer t k =
let yield_writer t k =
if is_closed t
then failwith "on_wakeup_writer on closed conn"
then failwith "yield_writer on closed conn"
else if Optional_thunk.is_some t.wakeup_writer
then failwith "yield_writer: only one callback can be registered at a time"
else t.wakeup_writer <- Optional_thunk.some k
Expand Down Expand Up @@ -171,6 +171,7 @@ let error_code t =
else None

let shutdown t =
Queue.clear t.request_queue;
shutdown_reader t;
shutdown_writer t;
wakeup_reader t;
Expand All @@ -197,45 +198,44 @@ let set_error_and_handle ?request t error =
let report_exn t exn =
set_error_and_handle t (`Exn exn)

let advance_request_queue_if_necessary t =
if is_active t then begin
let reqd = current_reqd_exn t in
if Reqd.persistent_connection reqd then begin
if Reqd.is_complete reqd then begin
ignore (Queue.take t.request_queue);
if not (Queue.is_empty t.request_queue)
then t.request_handler (current_reqd_exn t);
wakeup_reader t;
end
end else begin
ignore (Queue.take t.request_queue);
Queue.iter Reqd.close_request_body t.request_queue;
Queue.clear t.request_queue;
Queue.push reqd t.request_queue;
wakeup_writer t;
if Reqd.is_complete reqd
then shutdown t
else if not (Reqd.requires_input reqd)
then shutdown_reader t
end
end else if Reader.is_closed t.reader
then shutdown t

let _next_read_operation t =
advance_request_queue_if_necessary t;
if is_active t then begin
let advance_request_queue t =
ignore (Queue.take t.request_queue);
if not (Queue.is_empty t.request_queue)
then t.request_handler (Queue.peek_exn t.request_queue);
;;

let rec _next_read_operation t =
if not (is_active t) then (
if Reader.is_closed t.reader
then shutdown t;
Reader.next t.reader
) else (
let reqd = current_reqd_exn t in
if Reqd.requires_input reqd then Reader.next t.reader
else if Reqd.persistent_connection reqd then `Yield
else begin
match Reqd.input_state reqd with
| Provide -> Reader.next t.reader
| Complete -> _final_read_operation_for t reqd
)

and _final_read_operation_for t reqd =
let next =
if not (Reqd.persistent_connection reqd) then (
shutdown_reader t;
Reader.next t.reader
end
end else
Reader.next t.reader
Reader.next t.reader;
) else (
match Reqd.output_state reqd with
| Wait | Consume -> `Yield
| Complete ->
advance_request_queue t;
_next_read_operation t;
)
in
wakeup_writer t;
next
;;

let next_read_operation t =
match _next_read_operation t with
(* XXX(dpatti): These two [`Error _] constructors are never returned *)
| `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close
| `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close
| (`Read | `Yield | `Close) as operation -> operation
Expand Down Expand Up @@ -268,22 +268,39 @@ let flush_response_body t =
Reqd.flush_response_body reqd
;;

let next_write_operation t =
advance_request_queue_if_necessary t;
flush_response_body t;
Writer.next t.writer
let rec _next_write_operation t =
if not (is_active t) then (
if Reader.is_closed t.reader
then shutdown t;
Writer.next t.writer
) else (
let reqd = current_reqd_exn t in
match Reqd.output_state reqd with
| Wait -> `Yield
| Consume ->
Reqd.flush_response_body reqd;
Writer.next t.writer
| Complete -> _final_write_operation_for t reqd
)

and _final_write_operation_for t reqd =
let next =
if not (Reqd.persistent_connection reqd) then (
shutdown_writer t;
Writer.next t.writer;
) else (
match Reqd.input_state reqd with
| Provide -> assert false
| Complete ->
advance_request_queue t;
_next_write_operation t;
)
in
wakeup_reader t;
next
;;

let next_write_operation t = _next_write_operation t

let report_write_result t result =
Writer.report_result t.writer result

let yield_writer t k =
if is_active t then begin
let reqd = current_reqd_exn t in
if Reqd.requires_output reqd
then Reqd.on_more_output_available reqd k
else if Reqd.persistent_connection reqd
then on_wakeup_writer t k
else begin shutdown t; k () end
end else if Writer.is_closed t.writer then k () else begin
on_wakeup_writer t k
end

0 comments on commit e6980ca

Please sign in to comment.