Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix read loop with shutdown #210

Merged
merged 2 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,19 @@ and _final_read_operation_for t reqd =
Reader.next t.reader;
) else (
match Reqd.output_state reqd with
| Waiting | Ready -> `Yield
| Waiting | Ready ->
(* XXX(dpatti): This is a way in which the reader and writer are not
parallel -- we tell the writer when it needs to yield but the reader is
always asking for more data. This is the only branch in either
operation function that does not return `(Reader|Writer).next`, which
means there are surprising states you can get into. For example, we ask
the runtime to yield but then raise when it tries to because the reader
is closed. I don't think checking `is_closed` here makes sense
semantically, but I don't think checking it in `_next_read_operation`
makes sense either. I chose here so I could describe why. *)
if Reader.is_closed t.reader
then Reader.next t.reader
else `Yield
| Complete ->
advance_request_queue t;
_next_read_operation t;
Expand Down
74 changes: 63 additions & 11 deletions lib_test/test_server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ module Runtime : sig
val on_writer_unyield : t -> (unit -> unit) -> bool ref

val report_exn : t -> exn -> unit

val shutdown : t -> unit
end = struct
open Server_connection

Expand Down Expand Up @@ -158,32 +160,35 @@ end = struct
;;

let report_exn t = Server_connection.report_exn t.server_connection

let shutdown t = Server_connection.shutdown t.server_connection
end

open Runtime

let read t str ~off ~len =
do_read t (fun conn -> Server_connection.read conn str ~off ~len)
let read ?(eof=false) t str ~off ~len =
do_read t (fun conn ->
if eof
then Server_connection.read_eof conn str ~off ~len
else Server_connection.read conn str ~off ~len)
;;

let read_eof t str ~off ~len =
do_read t (fun conn -> Server_connection.read_eof conn str ~off ~len)
;;
let read_eof = read ~eof:true

let feed_string t str =
let feed_string ?eof t str =
let len = String.length str in
let input = Bigstringaf.of_string str ~off:0 ~len in
read t input ~off:0 ~len
read ?eof t input ~off:0 ~len
;;

let read_string t str =
let c = feed_string t str in
let read_string ?eof t str =
let c = feed_string ?eof t str in
Alcotest.(check int) "read consumes all input" (String.length str) c;
;;

let read_request t r =
let read_request ?eof t r =
let request_string = request_to_string r in
read_string t request_string
read_string ?eof t request_string
;;

let reader_ready t =
Expand Down Expand Up @@ -850,6 +855,21 @@ let test_multiple_requests_in_single_read_with_close () =
connection_is_shutdown t;
;;

let test_multiple_requests_in_single_read_with_eof () =
let response = Response.create `OK in
let t =
create (fun reqd -> Reqd.respond_with_string reqd response "")
in
let reqs =
request_to_string (Request.create `GET "/") ^
request_to_string (Request.create `GET "/")
in
read_string t reqs ~eof:true;
write_response t response;
write_response t response;
connection_is_shutdown t;
;;

let test_parse_failure_after_checkpoint () =
let error_queue = ref None in
let error_handler ?request:_ error _start_response =
Expand Down Expand Up @@ -894,6 +914,35 @@ let test_response_finished_before_body_read () =
write_response t response ~body:"done";
;;

let test_shutdown_in_request_handler () =
let request = Request.create `GET "/" in
let rec t =
lazy (create (fun _ -> shutdown (Lazy.force t)))
in
let t = Lazy.force t in
read_request t request;
reader_closed t;
writer_closed t
;;

let test_shutdown_during_asynchronous_request () =
let request = Request.create `GET "/" in
let response = Response.create `OK in
let continue = ref (fun () -> ()) in
let t = create (fun reqd ->
continue := (fun () ->
Reqd.respond_with_string reqd response ""))
in
read_request t request;
shutdown t;
(* This is raised from Faraday *)
Alcotest.check_raises "[continue] raises because writer is closed"
(Failure "cannot write to closed writer")
!continue;
reader_closed t;
writer_closed t
;;

let tests =
[ "initial reader state" , `Quick, test_initial_reader_state
; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof
Expand All @@ -920,6 +969,9 @@ let tests =
; "multiple requests in single read", `Quick, test_multiple_requests_in_single_read
; "multiple async requests in single read", `Quick, test_multiple_async_requests_in_single_read
; "multiple requests with connection close", `Quick, test_multiple_requests_in_single_read_with_close
; "multiple requests with eof", `Quick, test_multiple_requests_in_single_read_with_eof
; "parse failure after checkpoint", `Quick, test_parse_failure_after_checkpoint
; "response finished before body read", `Quick, test_response_finished_before_body_read
; "shutdown in request handler", `Quick, test_shutdown_in_request_handler
; "shutdown during asynchronous request", `Quick, test_shutdown_during_asynchronous_request
]