Skip to content

Commit

Permalink
refactor shutdown handlers and fix test
Browse files Browse the repository at this point in the history
In the parent feature, some changes to the test runtime were made, but
they weren't representative of what a normal runtime would do. This was
due to a bug that was fixed in another branch that this is based off of.
I reverted the changes to the runtime and reorganized the test to read
more clearly.

At the same time, I refactored `shutdown_reader` and `shutdown_writer`.
We were checking `is_active` twice in `shutdown_writer`, and it wasn't
clear if that was necessary. I also became curious of why we only wake
the reader/writer if there is no request. Presumably, if our actions on
the request cause it to wake the reader, then the wake call will be a
no-op anyway. I reorganized the code to both make the two
implementations more parallel and a bit more straightforward. This also
meant we could drop the extra wakes in shutdown.
  • Loading branch information
dpatti committed May 22, 2021
1 parent 1bae46c commit 084aef7
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 36 deletions.
18 changes: 8 additions & 10 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,17 @@ let create ?(config=Config.default) ?(error_handler=default_error_handler) reque
}

let shutdown_reader t =
Reader.force_close t.reader;
if is_active t
then Reqd.close_request_body (current_reqd_exn t)
else wakeup_reader t
then Reqd.close_request_body (current_reqd_exn t);
Reader.force_close t.reader;
wakeup_reader t

let shutdown_writer t =
if is_active t then Reqd.flush_response_body (current_reqd_exn t);
if is_active t then (
Reqd.close_request_body (current_reqd_exn t);
Reqd.flush_response_body (current_reqd_exn t));
Writer.close t.writer;
if is_active t
then Reqd.close_request_body (current_reqd_exn t)
else wakeup_writer t
wakeup_writer t

let error_code t =
if is_active t
Expand All @@ -157,9 +157,7 @@ let error_code t =

let shutdown t =
shutdown_reader t;
shutdown_writer t;
wakeup_reader t;
wakeup_writer t
shutdown_writer t

let set_error_and_handle ?request t error =
if is_active t then begin
Expand Down
49 changes: 23 additions & 26 deletions lib_test/test_server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ end = struct
; write_loop : (unit -> unit)
; mutable read_unyield_hook : (unit -> unit) option
; mutable write_unyield_hook : (unit -> unit) option
; mutable stopped : bool
}

let rec read_step t =
Expand All @@ -64,13 +63,12 @@ end = struct
| `Yield ->
trace "reader: Yield";
t.read_operation <- `Yield;
if not t.stopped then
yield_reader t.server_connection (fun () ->
trace "reader: Yield callback";
read_step t;
t.read_unyield_hook |> Option.iter (fun f ->
t.read_unyield_hook <- None;
f ()))
yield_reader t.server_connection (fun () ->
trace "reader: Yield callback";
read_step t;
t.read_unyield_hook |> Option.iter (fun f ->
t.read_unyield_hook <- None;
f ()))
| `Close ->
trace "reader: Close";
t.read_operation <- `Close
Expand All @@ -84,13 +82,12 @@ end = struct
| `Yield ->
t.write_operation <- `Yield;
trace "writer: Yield";
if not t.stopped then
yield_writer t.server_connection (fun () ->
trace "writer: Yield callback";
write_step t;
t.write_unyield_hook |> Option.iter (fun f ->
t.write_unyield_hook <- None;
f ()))
yield_writer t.server_connection (fun () ->
trace "writer: Yield callback";
write_step t;
t.write_unyield_hook |> Option.iter (fun f ->
t.write_unyield_hook <- None;
f ()))
| `Close n ->
trace "writer: Close";
t.write_operation <- `Close n
Expand All @@ -106,7 +103,6 @@ end = struct
; write_loop = (fun () -> write_step (Lazy.force_val t))
; read_unyield_hook = None
; write_unyield_hook = None
; stopped = false
})
in
let t = Lazy.force_val t in
Expand Down Expand Up @@ -165,9 +161,7 @@ end = struct

let report_exn t = Server_connection.report_exn t.server_connection

let shutdown t =
t.stopped <- true;
Server_connection.shutdown t.server_connection
let shutdown t = Server_connection.shutdown t.server_connection
end

open Runtime
Expand Down Expand Up @@ -932,18 +926,21 @@ let test_shutdown_in_request_handler () =
;;

let test_flush_response_before_shutdown () =
let reqd = ref None in
let request_handler reqd' = reqd := Some reqd' in
let t = create request_handler in
let request = Request.create `GET "/" ~headers:(Headers.encoding_fixed 0) in
read_request t request;
let reqd = Option.get !reqd in
let response = Response.create `OK in
let body = Reqd.respond_with_streaming ~flush_headers_immediately:true reqd response in
let continue = ref (fun () -> ()) in
let request_handler reqd =
let body = Reqd.respond_with_streaming ~flush_headers_immediately:true reqd response in
continue := (fun () ->
Body.write_string body "hello world");
in
let t = create request_handler in
read_request t request;
write_response t response;
Body.write_string body "hello world";
!continue ();
shutdown t;
write_string t "hello world";
connection_is_shutdown t
;;

let tests =
Expand Down

0 comments on commit 084aef7

Please sign in to comment.