Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose write failures in flush #3

Merged
merged 2 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/lwt/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(executables
(libraries h1 h1-lwt-unix h1_examples base stdio lwt lwt.unix)
(optional true)
(names lwt_get lwt_post lwt_echo_post lwt_echo_upgrade))
(names lwt_get lwt_post lwt_echo_post lwt_echo_upgrade lwt_chunked))

(alias
(name runtest)
Expand Down
40 changes: 40 additions & 0 deletions examples/lwt/lwt_chunked.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
open Base
open Lwt.Infix
module Arg = Stdlib.Arg

open H1_lwt_unix

let request_handler (_ : Unix.sockaddr) reqd =
let body = H1.Reqd.respond_with_streaming reqd (H1.Response.create ~headers:(H1.Headers.of_list ["connection", "close"]) `OK) in
let rec respond_loop i =
H1.Body.Writer.write_string body (Printf.sprintf "Chunk %i\n" i);
H1.Body.Writer.flush_with_reason body (function
| `Closed -> Stdio.print_endline "closed"
| `Written -> Stdio.print_endline "written"; Lwt.bind (Lwt_unix.sleep 5.) (fun () -> respond_loop (i+1)) |> ignore
);
Lwt.return_unit
in ignore (respond_loop 0)

let error_handler (_ : Unix.sockaddr) = H1_examples.Server.error_handler

let main port =
let listen_address = Unix.(ADDR_INET (inet_addr_loopback, port)) in
Lwt.async (fun () ->
Lwt_io.establish_server_with_client_socket
listen_address
(Server.create_connection_handler ~upgrade_handler:None ~request_handler ~error_handler)
>|= fun _server ->
Stdio.printf "Listening on port %i.\n" port);
let forever, _ = Lwt.wait () in
Lwt_main.run forever
;;

let () =
Stdlib.Sys.set_signal Stdlib.Sys.sigpipe Stdlib.Sys.Signal_ignore;
let port = ref 8080 in
Arg.parse
["-p", Arg.Set_int port, " Listening port number (8080 by default)"]
ignore
"Echoes POST requests. Runs forever.";
main !port
;;
111 changes: 71 additions & 40 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -102,53 +102,78 @@ module Reader = struct
end

module Writer = struct
module Writer = Serialize.Writer

type encoding =
| Identity
| Chunked of { mutable written_final_chunk : bool }

type t =
{ faraday : Faraday.t
; encoding : encoding
; when_ready_to_write : unit -> unit
; buffered_bytes : int ref
{ faraday : Faraday.t
; writer : Writer.t
; encoding : encoding
; buffered_bytes : int ref
}

let of_faraday faraday ~encoding ~when_ready_to_write =
let of_faraday faraday writer ~encoding =
let encoding =
match encoding with
| `Fixed _ | `Close_delimited -> Identity
| `Chunked -> Chunked { written_final_chunk = false }
in
{ faraday
; encoding
; when_ready_to_write
; writer
; buffered_bytes = ref 0
}

let create buffer ~encoding ~when_ready_to_write =
of_faraday (Faraday.of_bigstring buffer) ~encoding ~when_ready_to_write
let create buffer writer ~encoding =
of_faraday (Faraday.of_bigstring buffer) writer ~encoding

let write_char t c =
Faraday.write_char t.faraday c
if not (Faraday.is_closed t.faraday) then
Faraday.write_char t.faraday c

let write_string t ?off ?len s =
Faraday.write_string ?off ?len t.faraday s
if not (Faraday.is_closed t.faraday) then
Faraday.write_string ?off ?len t.faraday s

let write_bigstring t ?off ?len b =
Faraday.write_bigstring ?off ?len t.faraday b
if not (Faraday.is_closed t.faraday) then
Faraday.write_bigstring ?off ?len t.faraday b

let schedule_bigstring t ?off ?len (b:Bigstringaf.t) =
Faraday.schedule_bigstring ?off ?len t.faraday b
if not (Faraday.is_closed t.faraday) then
Faraday.schedule_bigstring ?off ?len t.faraday b

let ready_to_write t = t.when_ready_to_write ()
let ready_to_write t = Writer.wakeup t.writer

let flush t kontinue =
Faraday.flush t.faraday kontinue;
ready_to_write t

let flush_with_reason t kontinue =
if Writer.is_closed t.writer then
kontinue `Closed
else begin
Faraday.flush_with_reason t.faraday (fun reason ->
let result =
match reason with
| Nothing_pending | Shift -> `Written
| Drain -> `Closed
in
kontinue result);
ready_to_write t
end

let is_closed t =
Faraday.is_closed t.faraday

let close_and_drain t =
Faraday.close t.faraday;
(* Resolve all pending flushes *)
ignore (Faraday.drain t.faraday : int)

let close t =
Faraday.close t.faraday;
ready_to_write t;
Expand All @@ -166,33 +191,39 @@ module Writer = struct
in
faraday_has_output || additional_encoding_output

let transfer_to_writer t writer =
let transfer_to_writer t =
let faraday = t.faraday in
begin match Faraday.operation faraday with
| `Yield -> ()
| `Close ->
(match t.encoding with
| Identity -> ()
| Chunked ({ written_final_chunk } as chunked) ->
if not written_final_chunk then begin
chunked.written_final_chunk <- true;
Serialize.Writer.schedule_chunk writer [];
end);
Serialize.Writer.unyield writer;
| `Writev iovecs ->
let buffered = t.buffered_bytes in
begin match IOVec.shiftv iovecs !buffered with
| [] -> ()
| iovecs ->
let lengthv = IOVec.lengthv iovecs in
buffered := !buffered + lengthv;
begin match t.encoding with
| Identity -> Serialize.Writer.schedule_fixed writer iovecs
| Chunked _ -> Serialize.Writer.schedule_chunk writer iovecs
end;
Serialize.Writer.flush writer (fun () ->
Faraday.shift faraday lengthv;
buffered := !buffered - lengthv)
end
if Writer.is_closed t.writer then
close_and_drain t
else begin
match Faraday.operation faraday with
| `Yield -> ()
| `Close ->
(match t.encoding with
| Identity -> ()
| Chunked ({ written_final_chunk } as chunked) ->
if not written_final_chunk then begin
chunked.written_final_chunk <- true;
Serialize.Writer.schedule_chunk t.writer [];
end);
Serialize.Writer.unyield t.writer;
| `Writev iovecs ->
let buffered = t.buffered_bytes in
begin match IOVec.shiftv iovecs !buffered with
| [] -> ()
| iovecs ->
let lengthv = IOVec.lengthv iovecs in
buffered := !buffered + lengthv;
begin match t.encoding with
| Identity -> Serialize.Writer.schedule_fixed t.writer iovecs
| Chunked _ -> Serialize.Writer.schedule_chunk t.writer iovecs
end;
Serialize.Writer.flush t.writer (fun result ->
match result with
| `Closed -> close_and_drain t
| `Written ->
Faraday.shift faraday lengthv;
buffered := !buffered - lengthv)
end
end
end
6 changes: 3 additions & 3 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ module Oneshot = struct
| `Error `Bad_request ->
failwith "H1.Client_connection.request: invalid body length"
in
Body.Writer.create (Bigstringaf.create config.request_body_buffer_size)
~encoding ~when_ready_to_write:(fun () -> Writer.wakeup writer)
Body.Writer.create (Bigstringaf.create config.request_body_buffer_size) writer
~encoding
in
let t =
{ request
Expand All @@ -89,7 +89,7 @@ module Oneshot = struct

let flush_request_body t =
if Body.Writer.has_pending_output t.request_body
then Body.Writer.transfer_to_writer t.request_body t.writer
then Body.Writer.transfer_to_writer t.request_body
;;

let set_error_and_handle_without_shutdown t error =
Expand Down
18 changes: 12 additions & 6 deletions lib/h1.mli
Original file line number Diff line number Diff line change
Expand Up @@ -494,23 +494,29 @@ module Body : sig
modified until a subsequent call to {!flush} has successfully
completed. *)

val flush : t -> (unit -> unit) -> unit
(** [flush t f] makes all bytes in [t] available for writing to the awaiting
output channel. Once those bytes have reached that output channel, [f]
will be called.
val flush_with_reason : t -> ([ `Written | `Closed ] -> unit) -> unit
(** [flush_with_reason t f] makes all bytes in [t] available for writing to the awaiting output
channel. Once those bytes have reached that output channel, [f `Written] will be
called. If instead, the output channel is closed before all of those bytes are
successfully written, [f `Closed] will be called.

The type of the output channel is runtime-dependent, as are guarantees
about whether those packets have been queued for delivery or have
actually been received by the intended recipient. *)

val flush: t -> (unit -> unit) -> unit
(** [flush t f] is identical to [flush_with_reason t], except ignoring the result of the flush.
In most situations, you should use flush_with_reason and properly handle a closed output channel. *)

val close : t -> unit
(** [close t] closes [t], causing subsequent write calls to raise. If
[t] is writable, this will cause any pending output to become available
to the output channel. *)

val is_closed : t -> bool
(** [is_closed t] is [true] if {!close} has been called on [t] and [false]
otherwise. A closed [t] may still have pending output. *)
(** [is_closed t] is [true] if {!close} has been called on [t], or if the attached
output channel is closed (e.g. because [report_write_result `Closed] has been
called). A closed [t] may still have pending output. *)
end

end
Expand Down
6 changes: 2 additions & 4 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response =
"H1.Reqd.respond_with_streaming: invalid response body length"
in
let response_body =
Body.Writer.create t.response_body_buffer ~encoding
~when_ready_to_write:(fun () -> Writer.wakeup t.writer)
Body.Writer.create t.response_body_buffer t.writer ~encoding
in
Writer.write_response t.writer response;
if t.persistent then
Expand Down Expand Up @@ -288,6 +287,5 @@ let flush_request_body t =

let flush_response_body t =
match t.response_state with
| Streaming (_, response_body) ->
Body.Writer.transfer_to_writer response_body t.writer
| Streaming (_, response_body) -> Body.Writer.transfer_to_writer response_body
| _ -> ()
28 changes: 17 additions & 11 deletions lib/serialize.ml
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,18 @@ let schedule_bigstring_chunk t chunk =
module Writer = struct
type t =
{ buffer : Bigstringaf.t
(* The buffer that the encoder uses for buffered writes. Managed by the
* control module for the encoder. *)
(* The buffer that the encoder uses for buffered writes. Managed by the
* control module for the encoder. *)
; encoder : Faraday.t
(* The encoder that handles encoding for writes. Uses the [buffer]
* referenced above internally. *)
(* The encoder that handles encoding for writes. Uses the [buffer]
* referenced above internally. *)
; mutable drained_bytes : int
(* The number of bytes that were not written due to the output stream
* being closed before all buffered output could be written. Useful for
* detecting error cases. *)
(* The number of bytes that were not written due to the output stream
* being closed before all buffered output could be written. Useful for
* detecting error cases. *)
; mutable wakeup : Optional_thunk.t
(* The callback from the runtime to be invoked when output is ready to be
* flushed. *)
(* The callback from the runtime to be invoked when output is ready to be
* flushed. *)
}

let create ?(buffer_size=0x800) () =
Expand Down Expand Up @@ -158,13 +158,19 @@ module Writer = struct
;;

let flush t f =
flush t.encoder f
flush_with_reason t.encoder (fun reason ->
let result =
match reason with
| Nothing_pending | Shift -> `Written
| Drain -> `Closed
in
f result)

let unyield t =
(* This would be better implemented by a function that just takes the
encoder out of a yielded state if it's in that state. Requires a change
to the faraday library. *)
flush t (fun () -> ())
flush t (fun _result -> ())

let yield t =
Faraday.yield t.encoder
Expand Down
5 changes: 3 additions & 2 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,9 @@ let set_error_and_handle ?request t error =
"H1.Server_connection.error_handler: invalid response body \
length"
in
Body.Writer.of_faraday (Writer.faraday writer) ~encoding
~when_ready_to_write:(fun () -> Writer.wakeup writer)))
Body.Writer.of_faraday (Writer.faraday writer) writer ~encoding
)
)

let report_exn t exn = set_error_and_handle t (`Exn exn)

Expand Down
Loading
Loading