diff --git a/lib/server_connection.ml b/lib/server_connection.ml index f81cd5bd..39336452 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -144,6 +144,7 @@ let shutdown_reader t = else wakeup_reader t let shutdown_writer t = + if is_active t then Reqd.flush_response_body (current_reqd_exn t); Writer.close t.writer; if is_active t then Reqd.close_request_body (current_reqd_exn t) diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index e1b0e696..ea093da2 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -40,6 +40,8 @@ module Runtime : sig val on_writer_unyield : t -> (unit -> unit) -> bool ref val report_exn : t -> exn -> unit + + val shutdown : t -> unit end = struct open Server_connection @@ -51,6 +53,7 @@ end = struct ; write_loop : (unit -> unit) ; mutable read_unyield_hook : (unit -> unit) option ; mutable write_unyield_hook : (unit -> unit) option + ; mutable stopped : bool } let rec read_step t = @@ -61,12 +64,13 @@ end = struct | `Yield -> trace "reader: Yield"; t.read_operation <- `Yield; - yield_reader t.server_connection (fun () -> - trace "reader: Yield callback"; - read_step t; - t.read_unyield_hook |> Option.iter (fun f -> - t.read_unyield_hook <- None; - f ())) + if not t.stopped then + yield_reader t.server_connection (fun () -> + trace "reader: Yield callback"; + read_step t; + t.read_unyield_hook |> Option.iter (fun f -> + t.read_unyield_hook <- None; + f ())) | `Close -> trace "reader: Close"; t.read_operation <- `Close @@ -80,12 +84,13 @@ end = struct | `Yield -> t.write_operation <- `Yield; trace "writer: Yield"; - yield_writer t.server_connection (fun () -> - trace "writer: Yield callback"; - write_step t; - t.write_unyield_hook |> Option.iter (fun f -> - t.write_unyield_hook <- None; - f ())) + if not t.stopped then + yield_writer t.server_connection (fun () -> + trace "writer: Yield callback"; + write_step t; + t.write_unyield_hook |> Option.iter (fun f -> + t.write_unyield_hook <- None; + f ())) | `Close n -> trace "writer: Close"; t.write_operation <- `Close n @@ -101,6 +106,7 @@ end = struct ; write_loop = (fun () -> write_step (Lazy.force_val t)) ; read_unyield_hook = None ; write_unyield_hook = None + ; stopped = false }) in let t = Lazy.force_val t in @@ -158,6 +164,10 @@ end = struct ;; let report_exn t = Server_connection.report_exn t.server_connection + + let shutdown t = + t.stopped <- true; + Server_connection.shutdown t.server_connection end open Runtime @@ -894,6 +904,21 @@ let test_response_finished_before_body_read () = write_response t response ~body:"done"; ;; +let test_flush_response_before_shutdown () = + let reqd = ref None in + let request_handler reqd' = reqd := Some reqd' in + let t = create request_handler in + let request = Request.create `GET "/" ~headers:(Headers.encoding_fixed 0) in + read_request t request; + let reqd = Option.get !reqd in + let response = Response.create `OK in + let body = Reqd.respond_with_streaming ~flush_headers_immediately:true reqd response in + write_response t response; + Body.write_string body "hello world"; + shutdown t; + write_string t "hello world"; +;; + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -922,4 +947,5 @@ let tests = ; "multiple requests with connection close", `Quick, test_multiple_requests_in_single_read_with_close ; "parse failure after checkpoint", `Quick, test_parse_failure_after_checkpoint ; "response finished before body read", `Quick, test_response_finished_before_body_read + ; "flush response before shutdown", `Quick, test_flush_response_before_shutdown ]