diff --git a/lib/reqd.ml b/lib/reqd.ml index 755e5906..77645fd4 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -47,6 +47,13 @@ module Input_state = struct | Complete end +module Output_state = struct + type t = + | Ready + | Wait + | Complete +end + type error_handler = ?request:Request.t -> error -> (Headers.t -> [`write] Body.t) -> unit @@ -109,13 +116,13 @@ let response { response_state; _ } = match response_state with | Waiting _ -> None | Streaming(response, _) - | Complete (response) -> Some response + | Complete response -> Some response let response_exn { response_state; _ } = match response_state with | Waiting _ -> failwith "httpaf.Reqd.response_exn: response has not started" | Streaming(response, _) - | Complete (response) -> response + | Complete response -> response let respond_with_string t response str = if t.error_code <> `Ok then @@ -123,7 +130,7 @@ let respond_with_string t response str = match t.response_state with | Waiting when_done_waiting -> (* XXX(seliopou): check response body length *) - Writer.write_response t.writer response; + Writer.write_response t.writer response; Writer.write_string t.writer str; if t.persistent then t.persistent <- Response.persistent_connection response; @@ -236,19 +243,16 @@ let input_state t : Input_state.t = else Ready ;; -let requires_output { response_state; _ } = - match response_state with - | Complete _ -> false - | Streaming (_, response_body) -> - not (Body.is_closed response_body) - || Body.has_pending_output response_body - | Waiting _ -> true - -let is_complete t = - match input_state t with - | Complete -> not (requires_output t) - | Ready -> false -;; +let output_state t : Output_state.t = + match t.response_state with + | Complete _ -> Complete + | Waiting _ -> Wait + | Streaming(_, response_body) -> + if Body.has_pending_output response_body + then Ready + else if Body.is_closed response_body + then Complete + else Wait let flush_request_body t = let request_body = request_body t in diff --git a/lib/server_connection.ml b/lib/server_connection.ml index faf1de1b..a36a78a1 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -82,7 +82,7 @@ let current_reqd_exn t = let yield_reader t k = if is_closed t - then failwith "on_wakeup_reader on closed conn" + then failwith "yield_reader on closed conn" else if Optional_thunk.is_some t.wakeup_reader then failwith "yield_reader: only one callback can be registered at a time" else t.wakeup_reader <- Optional_thunk.some k @@ -94,9 +94,9 @@ let wakeup_reader t = Optional_thunk.call_if_some f ;; -let on_wakeup_writer t k = +let yield_writer t k = if is_closed t - then failwith "on_wakeup_writer on closed conn" + then failwith "yield_writer on closed conn" else if Optional_thunk.is_some t.wakeup_writer then failwith "yield_writer: only one callback can be registered at a time" else t.wakeup_writer <- Optional_thunk.some k @@ -171,6 +171,7 @@ let error_code t = else None let shutdown t = + Queue.clear t.request_queue; shutdown_reader t; shutdown_writer t; wakeup_reader t; @@ -197,51 +198,44 @@ let set_error_and_handle ?request t error = let report_exn t exn = set_error_and_handle t (`Exn exn) -let advance_request_queue_if_necessary t = - if is_active t then begin - let reqd = current_reqd_exn t in - if Reqd.persistent_connection reqd then begin - if Reqd.is_complete reqd then begin - ignore (Queue.take t.request_queue); - if not (Queue.is_empty t.request_queue) - then t.request_handler (current_reqd_exn t); - wakeup_reader t; - end - end else begin - ignore (Queue.take t.request_queue); - Queue.iter Reqd.close_request_body t.request_queue; - Queue.clear t.request_queue; - Queue.push reqd t.request_queue; - wakeup_writer t; - if Reqd.is_complete reqd - then shutdown t - else - match Reqd.input_state reqd with - | Ready -> () - | Complete -> shutdown_reader t - end - end else if Reader.is_closed t.reader - then shutdown t - -let _next_read_operation t = - advance_request_queue_if_necessary t; - if is_active t - then ( +let advance_request_queue t = + ignore (Queue.take t.request_queue); + if not (Queue.is_empty t.request_queue) + then t.request_handler (Queue.peek_exn t.request_queue); +;; + +let rec _next_read_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Reader.next t.reader + ) else ( let reqd = current_reqd_exn t in match Reqd.input_state reqd with | Ready -> Reader.next t.reader - | Complete -> - if Reqd.persistent_connection reqd - then `Yield - else ( - shutdown_reader t; - Reader.next t.reader) + | Complete -> _final_read_operation_for t reqd ) - else Reader.next t.reader + +and _final_read_operation_for t reqd = + let next = + if not (Reqd.persistent_connection reqd) then ( + shutdown_reader t; + Reader.next t.reader; + ) else ( + match Reqd.output_state reqd with + | Wait | Ready -> `Yield + | Complete -> + advance_request_queue t; + _next_read_operation t; + ) + in + wakeup_writer t; + next ;; let next_read_operation t = match _next_read_operation t with + (* XXX(dpatti): These two [`Error _] constructors are never returned *) | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close | (`Read | `Yield | `Close) as operation -> operation @@ -274,22 +268,39 @@ let flush_response_body t = Reqd.flush_response_body reqd ;; -let next_write_operation t = - advance_request_queue_if_necessary t; - flush_response_body t; - Writer.next t.writer +let rec _next_write_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Writer.next t.writer + ) else ( + let reqd = current_reqd_exn t in + match Reqd.output_state reqd with + | Wait -> `Yield + | Ready -> + Reqd.flush_response_body reqd; + Writer.next t.writer + | Complete -> _final_write_operation_for t reqd + ) + +and _final_write_operation_for t reqd = + let next = + if not (Reqd.persistent_connection reqd) then ( + shutdown_writer t; + Writer.next t.writer; + ) else ( + match Reqd.input_state reqd with + | Ready -> assert false + | Complete -> + advance_request_queue t; + _next_write_operation t; + ) + in + wakeup_reader t; + next +;; + +let next_write_operation t = _next_write_operation t let report_write_result t result = Writer.report_result t.writer result - -let yield_writer t k = - if is_active t then begin - let reqd = current_reqd_exn t in - if Reqd.requires_output reqd - then Reqd.on_more_output_available reqd k - else if Reqd.persistent_connection reqd - then on_wakeup_writer t k - else begin shutdown t; k () end - end else if Writer.is_closed t.writer then k () else begin - on_wakeup_writer t k - end