Skip to content

Commit

Permalink
Merge pull request #3 from ada2k/expose-write-failures-in-flush
Browse files Browse the repository at this point in the history
Expose write failures in flush
  • Loading branch information
dinosaure authored Aug 19, 2024
2 parents d4092f1 + 2d95e23 commit ed3b95c
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 118 deletions.
2 changes: 1 addition & 1 deletion examples/lwt/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(executables
(libraries h1 h1-lwt-unix h1_examples base stdio lwt lwt.unix)
(optional true)
(names lwt_get lwt_post lwt_echo_post lwt_echo_upgrade))
(names lwt_get lwt_post lwt_echo_post lwt_echo_upgrade lwt_chunked))

(alias
(name runtest)
Expand Down
40 changes: 40 additions & 0 deletions examples/lwt/lwt_chunked.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
open Base
open Lwt.Infix
module Arg = Stdlib.Arg

open H1_lwt_unix

let request_handler (_ : Unix.sockaddr) reqd =
let body = H1.Reqd.respond_with_streaming reqd (H1.Response.create ~headers:(H1.Headers.of_list ["connection", "close"]) `OK) in
let rec respond_loop i =
H1.Body.Writer.write_string body (Printf.sprintf "Chunk %i\n" i);
H1.Body.Writer.flush_with_reason body (function
| `Closed -> Stdio.print_endline "closed"
| `Written -> Stdio.print_endline "written"; Lwt.bind (Lwt_unix.sleep 5.) (fun () -> respond_loop (i+1)) |> ignore
);
Lwt.return_unit
in ignore (respond_loop 0)

let error_handler (_ : Unix.sockaddr) = H1_examples.Server.error_handler

let main port =
let listen_address = Unix.(ADDR_INET (inet_addr_loopback, port)) in
Lwt.async (fun () ->
Lwt_io.establish_server_with_client_socket
listen_address
(Server.create_connection_handler ~upgrade_handler:None ~request_handler ~error_handler)
>|= fun _server ->
Stdio.printf "Listening on port %i.\n" port);
let forever, _ = Lwt.wait () in
Lwt_main.run forever
;;

let () =
Stdlib.Sys.set_signal Stdlib.Sys.sigpipe Stdlib.Sys.Signal_ignore;
let port = ref 8080 in
Arg.parse
["-p", Arg.Set_int port, " Listening port number (8080 by default)"]
ignore
"Echoes POST requests. Runs forever.";
main !port
;;
111 changes: 71 additions & 40 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -102,53 +102,78 @@ module Reader = struct
end

module Writer = struct
module Writer = Serialize.Writer

type encoding =
| Identity
| Chunked of { mutable written_final_chunk : bool }

type t =
{ faraday : Faraday.t
; encoding : encoding
; when_ready_to_write : unit -> unit
; buffered_bytes : int ref
{ faraday : Faraday.t
; writer : Writer.t
; encoding : encoding
; buffered_bytes : int ref
}

let of_faraday faraday ~encoding ~when_ready_to_write =
let of_faraday faraday writer ~encoding =
let encoding =
match encoding with
| `Fixed _ | `Close_delimited -> Identity
| `Chunked -> Chunked { written_final_chunk = false }
in
{ faraday
; encoding
; when_ready_to_write
; writer
; buffered_bytes = ref 0
}

let create buffer ~encoding ~when_ready_to_write =
of_faraday (Faraday.of_bigstring buffer) ~encoding ~when_ready_to_write
let create buffer writer ~encoding =
of_faraday (Faraday.of_bigstring buffer) writer ~encoding

let write_char t c =
Faraday.write_char t.faraday c
if not (Faraday.is_closed t.faraday) then
Faraday.write_char t.faraday c

let write_string t ?off ?len s =
Faraday.write_string ?off ?len t.faraday s
if not (Faraday.is_closed t.faraday) then
Faraday.write_string ?off ?len t.faraday s

let write_bigstring t ?off ?len b =
Faraday.write_bigstring ?off ?len t.faraday b
if not (Faraday.is_closed t.faraday) then
Faraday.write_bigstring ?off ?len t.faraday b

let schedule_bigstring t ?off ?len (b:Bigstringaf.t) =
Faraday.schedule_bigstring ?off ?len t.faraday b
if not (Faraday.is_closed t.faraday) then
Faraday.schedule_bigstring ?off ?len t.faraday b

let ready_to_write t = t.when_ready_to_write ()
let ready_to_write t = Writer.wakeup t.writer

let flush t kontinue =
Faraday.flush t.faraday kontinue;
ready_to_write t

let flush_with_reason t kontinue =
if Writer.is_closed t.writer then
kontinue `Closed
else begin
Faraday.flush_with_reason t.faraday (fun reason ->
let result =
match reason with
| Nothing_pending | Shift -> `Written
| Drain -> `Closed
in
kontinue result);
ready_to_write t
end

let is_closed t =
Faraday.is_closed t.faraday

let close_and_drain t =
Faraday.close t.faraday;
(* Resolve all pending flushes *)
ignore (Faraday.drain t.faraday : int)

let close t =
Faraday.close t.faraday;
ready_to_write t;
Expand All @@ -166,33 +191,39 @@ module Writer = struct
in
faraday_has_output || additional_encoding_output

let transfer_to_writer t writer =
let transfer_to_writer t =
let faraday = t.faraday in
begin match Faraday.operation faraday with
| `Yield -> ()
| `Close ->
(match t.encoding with
| Identity -> ()
| Chunked ({ written_final_chunk } as chunked) ->
if not written_final_chunk then begin
chunked.written_final_chunk <- true;
Serialize.Writer.schedule_chunk writer [];
end);
Serialize.Writer.unyield writer;
| `Writev iovecs ->
let buffered = t.buffered_bytes in
begin match IOVec.shiftv iovecs !buffered with
| [] -> ()
| iovecs ->
let lengthv = IOVec.lengthv iovecs in
buffered := !buffered + lengthv;
begin match t.encoding with
| Identity -> Serialize.Writer.schedule_fixed writer iovecs
| Chunked _ -> Serialize.Writer.schedule_chunk writer iovecs
end;
Serialize.Writer.flush writer (fun () ->
Faraday.shift faraday lengthv;
buffered := !buffered - lengthv)
end
if Writer.is_closed t.writer then
close_and_drain t
else begin
match Faraday.operation faraday with
| `Yield -> ()
| `Close ->
(match t.encoding with
| Identity -> ()
| Chunked ({ written_final_chunk } as chunked) ->
if not written_final_chunk then begin
chunked.written_final_chunk <- true;
Serialize.Writer.schedule_chunk t.writer [];
end);
Serialize.Writer.unyield t.writer;
| `Writev iovecs ->
let buffered = t.buffered_bytes in
begin match IOVec.shiftv iovecs !buffered with
| [] -> ()
| iovecs ->
let lengthv = IOVec.lengthv iovecs in
buffered := !buffered + lengthv;
begin match t.encoding with
| Identity -> Serialize.Writer.schedule_fixed t.writer iovecs
| Chunked _ -> Serialize.Writer.schedule_chunk t.writer iovecs
end;
Serialize.Writer.flush t.writer (fun result ->
match result with
| `Closed -> close_and_drain t
| `Written ->
Faraday.shift faraday lengthv;
buffered := !buffered - lengthv)
end
end
end
6 changes: 3 additions & 3 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ module Oneshot = struct
| `Error `Bad_request ->
failwith "H1.Client_connection.request: invalid body length"
in
Body.Writer.create (Bigstringaf.create config.request_body_buffer_size)
~encoding ~when_ready_to_write:(fun () -> Writer.wakeup writer)
Body.Writer.create (Bigstringaf.create config.request_body_buffer_size) writer
~encoding
in
let t =
{ request
Expand All @@ -89,7 +89,7 @@ module Oneshot = struct

let flush_request_body t =
if Body.Writer.has_pending_output t.request_body
then Body.Writer.transfer_to_writer t.request_body t.writer
then Body.Writer.transfer_to_writer t.request_body
;;

let set_error_and_handle_without_shutdown t error =
Expand Down
18 changes: 12 additions & 6 deletions lib/h1.mli
Original file line number Diff line number Diff line change
Expand Up @@ -494,23 +494,29 @@ module Body : sig
modified until a subsequent call to {!flush} has successfully
completed. *)

val flush : t -> (unit -> unit) -> unit
(** [flush t f] makes all bytes in [t] available for writing to the awaiting
output channel. Once those bytes have reached that output channel, [f]
will be called.
val flush_with_reason : t -> ([ `Written | `Closed ] -> unit) -> unit
(** [flush_with_reason t f] makes all bytes in [t] available for writing to the awaiting output
channel. Once those bytes have reached that output channel, [f `Written] will be
called. If instead, the output channel is closed before all of those bytes are
successfully written, [f `Closed] will be called.
The type of the output channel is runtime-dependent, as are guarantees
about whether those packets have been queued for delivery or have
actually been received by the intended recipient. *)

val flush: t -> (unit -> unit) -> unit
(** [flush t f] is identical to [flush_with_reason t], except ignoring the result of the flush.
In most situations, you should use flush_with_reason and properly handle a closed output channel. *)

val close : t -> unit
(** [close t] closes [t], causing subsequent write calls to raise. If
[t] is writable, this will cause any pending output to become available
to the output channel. *)

val is_closed : t -> bool
(** [is_closed t] is [true] if {!close} has been called on [t] and [false]
otherwise. A closed [t] may still have pending output. *)
(** [is_closed t] is [true] if {!close} has been called on [t], or if the attached
output channel is closed (e.g. because [report_write_result `Closed] has been
called). A closed [t] may still have pending output. *)
end

end
Expand Down
6 changes: 2 additions & 4 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response =
"H1.Reqd.respond_with_streaming: invalid response body length"
in
let response_body =
Body.Writer.create t.response_body_buffer ~encoding
~when_ready_to_write:(fun () -> Writer.wakeup t.writer)
Body.Writer.create t.response_body_buffer t.writer ~encoding
in
Writer.write_response t.writer response;
if t.persistent then
Expand Down Expand Up @@ -288,6 +287,5 @@ let flush_request_body t =

let flush_response_body t =
match t.response_state with
| Streaming (_, response_body) ->
Body.Writer.transfer_to_writer response_body t.writer
| Streaming (_, response_body) -> Body.Writer.transfer_to_writer response_body
| _ -> ()
28 changes: 17 additions & 11 deletions lib/serialize.ml
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,18 @@ let schedule_bigstring_chunk t chunk =
module Writer = struct
type t =
{ buffer : Bigstringaf.t
(* The buffer that the encoder uses for buffered writes. Managed by the
* control module for the encoder. *)
(* The buffer that the encoder uses for buffered writes. Managed by the
* control module for the encoder. *)
; encoder : Faraday.t
(* The encoder that handles encoding for writes. Uses the [buffer]
* referenced above internally. *)
(* The encoder that handles encoding for writes. Uses the [buffer]
* referenced above internally. *)
; mutable drained_bytes : int
(* The number of bytes that were not written due to the output stream
* being closed before all buffered output could be written. Useful for
* detecting error cases. *)
(* The number of bytes that were not written due to the output stream
* being closed before all buffered output could be written. Useful for
* detecting error cases. *)
; mutable wakeup : Optional_thunk.t
(* The callback from the runtime to be invoked when output is ready to be
* flushed. *)
(* The callback from the runtime to be invoked when output is ready to be
* flushed. *)
}

let create ?(buffer_size=0x800) () =
Expand Down Expand Up @@ -158,13 +158,19 @@ module Writer = struct
;;

let flush t f =
flush t.encoder f
flush_with_reason t.encoder (fun reason ->
let result =
match reason with
| Nothing_pending | Shift -> `Written
| Drain -> `Closed
in
f result)

let unyield t =
(* This would be better implemented by a function that just takes the
encoder out of a yielded state if it's in that state. Requires a change
to the faraday library. *)
flush t (fun () -> ())
flush t (fun _result -> ())

let yield t =
Faraday.yield t.encoder
Expand Down
5 changes: 3 additions & 2 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,9 @@ let set_error_and_handle ?request t error =
"H1.Server_connection.error_handler: invalid response body \
length"
in
Body.Writer.of_faraday (Writer.faraday writer) ~encoding
~when_ready_to_write:(fun () -> Writer.wakeup writer)))
Body.Writer.of_faraday (Writer.faraday writer) writer ~encoding
)
)

let report_exn t exn = set_error_and_handle t (`Exn exn)

Expand Down
Loading

0 comments on commit ed3b95c

Please sign in to comment.