diff --git a/lib/reqd.ml b/lib/reqd.ml index 106c06f4..8277a637 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -243,15 +243,6 @@ let output_state t : Output_state.t = | Waiting -> Waiting ;; -let is_complete t = - match input_state t with - | Ready -> false - | Complete -> - (match output_state t with - | Waiting | Ready -> false - | Complete -> true) -;; - let flush_request_body t = let request_body = request_body t in if Body.has_pending_output request_body diff --git a/lib/server_connection.ml b/lib/server_connection.ml index bf358e42..f81cd5bd 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -81,7 +81,7 @@ let current_reqd_exn t = let yield_reader t k = if is_closed t - then failwith "on_wakeup_reader on closed conn" + then failwith "yield_reader on closed conn" else if Optional_thunk.is_some t.wakeup_reader then failwith "yield_reader: only one callback can be registered at a time" else t.wakeup_reader <- Optional_thunk.some k @@ -182,49 +182,36 @@ let set_error_and_handle ?request t error = let report_exn t exn = set_error_and_handle t (`Exn exn) -let advance_request_queue_if_necessary t = - if is_active t then begin - let reqd = current_reqd_exn t in - if Reqd.persistent_connection reqd then begin - if Reqd.is_complete reqd then begin - ignore (Queue.take t.request_queue); - if not (Queue.is_empty t.request_queue) - then t.request_handler (current_reqd_exn t); - wakeup_reader t; - end - end else begin - (* Take the head of the queue, close the remaining request bodies, clear - * the queue, and push the head back on. We do not plan on processing any - * more requests after the current one. *) - ignore (Queue.take t.request_queue); - Queue.iter Reqd.close_request_body t.request_queue; - Queue.clear t.request_queue; - Queue.push reqd t.request_queue; - if Reqd.is_complete reqd - then shutdown t - else - match Reqd.input_state reqd with - | Ready -> () - | Complete -> shutdown_reader t - end - end else if Reader.is_closed t.reader - then shutdown t - -let _next_read_operation t = - advance_request_queue_if_necessary t; - if is_active t +let advance_request_queue t = + ignore (Queue.take t.request_queue); + if not (Queue.is_empty t.request_queue) + then t.request_handler (Queue.peek_exn t.request_queue); +;; + +let rec _next_read_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Reader.next t.reader + ) else ( let reqd = current_reqd_exn t in match Reqd.input_state reqd with | Ready -> Reader.next t.reader + | Complete -> _final_read_operation_for t reqd + ) + +and _final_read_operation_for t reqd = + if not (Reqd.persistent_connection reqd) then ( + shutdown_reader t; + Reader.next t.reader; + ) else ( + match Reqd.output_state reqd with + | Waiting | Ready -> `Yield | Complete -> - if Reqd.persistent_connection reqd - then `Yield - else ( - shutdown_reader t; - Reader.next t.reader) + advance_request_queue t; + _next_read_operation t; ) - else Reader.next t.reader ;; let next_read_operation t = @@ -259,13 +246,42 @@ let read t bs ~off ~len = let read_eof t bs ~off ~len = read_with_more t bs ~off ~len Complete -let next_write_operation t = - advance_request_queue_if_necessary t; - if is_active t - then ( +let rec _next_write_operation t = + if not (is_active t) + then Writer.next t.writer + else ( let reqd = current_reqd_exn t in - Reqd.flush_response_body reqd); - Writer.next t.writer + match Reqd.output_state reqd with + | Waiting -> + (* XXX(dpatti): I don't think we should need to call this, but it is + necessary in the case of a streaming, non-chunked body so that you can + set the appropriate flag. *) + Reqd.flush_response_body reqd; + Writer.next t.writer + | Ready -> + Reqd.flush_response_body reqd; + Writer.next t.writer + | Complete -> _final_write_operation_for t reqd + ) + +and _final_write_operation_for t reqd = + let next = + if not (Reqd.persistent_connection reqd) then ( + shutdown_writer t; + Writer.next t.writer; + ) else ( + match Reqd.input_state reqd with + | Ready -> Writer.next t.writer; + | Complete -> + advance_request_queue t; + _next_write_operation t; + ) + in + wakeup_reader t; + next +;; + +let next_write_operation t = _next_write_operation t let report_write_result t result = Writer.report_result t.writer result