Skip to content

Commit

Permalink
refactor request queue mechanics
Browse files Browse the repository at this point in the history
These are just the reqd changes from #172
  • Loading branch information
dpatti authored and seliopou committed Apr 28, 2020
1 parent 16a2486 commit 91e84d4
Showing 1 changed file with 37 additions and 20 deletions.
57 changes: 37 additions & 20 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,25 @@
type error =
[ `Bad_request | `Bad_gateway | `Internal_server_error | `Exn of exn ]

type response_state =
| Waiting of Optional_thunk.t ref
| Complete of Response.t
| Streaming of Response.t * [`write] Body.t
module Response_state = struct
type t =
| Waiting of Optional_thunk.t ref
| Complete of Response.t
| Streaming of Response.t * [`write] Body.t
end

module Input_state = struct
type t =
| Provide
| Complete
end

module Output_state = struct
type t =
| Consume
| Wait
| Complete
end

type error_handler =
?request:Request.t -> error -> (Headers.t -> [`write] Body.t) -> unit
Expand Down Expand Up @@ -74,7 +89,7 @@ type t =
; response_body_buffer : Bigstringaf.t
; error_handler : error_handler
; mutable persistent : bool
; mutable response_state : response_state
; mutable response_state : Response_state.t
; mutable error_code : [`Ok | error ]
}

Expand All @@ -101,21 +116,21 @@ let response { response_state; _ } =
match response_state with
| Waiting _ -> None
| Streaming(response, _)
| Complete (response) -> Some response
| Complete response -> Some response

let response_exn { response_state; _ } =
match response_state with
| Waiting _ -> failwith "httpaf.Reqd.response_exn: response has not started"
| Streaming(response, _)
| Complete (response) -> response
| Complete response -> response

let respond_with_string t response str =
if t.error_code <> `Ok then
failwith "httpaf.Reqd.respond_with_string: invalid state, currently handling error";
match t.response_state with
| Waiting when_done_waiting ->
(* XXX(seliopou): check response body length *)
Writer.write_response t.writer response;
Writer.write_response t.writer response;
Writer.write_string t.writer str;
if t.persistent then
t.persistent <- Response.persistent_connection response;
Expand Down Expand Up @@ -222,19 +237,21 @@ let on_more_output_available t f =
let persistent_connection t =
t.persistent

let requires_input { request_body; _ } =
not (Body.is_closed request_body)
let input_state t : Input_state.t =
if Body.is_closed t.request_body
then Complete
else Provide

let requires_output { response_state; _ } =
match response_state with
| Complete _ -> false
| Streaming (_, response_body) ->
not (Body.is_closed response_body)
|| Body.has_pending_output response_body
| Waiting _ -> true

let is_complete t =
not (requires_input t || requires_output t)
let output_state t : Output_state.t =
match t.response_state with
| Complete _ -> Complete
| Waiting _ -> Wait
| Streaming(_, response_body) ->
if Body.has_pending_output response_body
then Consume
else if Body.is_closed response_body
then Complete
else Wait

let flush_request_body t =
let request_body = request_body t in
Expand Down

0 comments on commit 91e84d4

Please sign in to comment.