diff --git a/lib/body.ml b/lib/body.ml index d5f04244..1cdba678 100644 --- a/lib/body.ml +++ b/lib/body.ml @@ -33,20 +33,20 @@ module Reader = struct type t = - { faraday : Faraday.t - ; mutable read_scheduled : bool - ; mutable on_eof : unit -> unit - ; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit + { faraday : Faraday.t + ; mutable read_scheduled : bool + ; mutable on_eof : unit -> unit + ; mutable on_read : Bigstringaf.t -> off:int -> len:int -> unit } - let default_on_eof = Sys.opaque_identity (fun () -> ()) - let default_on_read = Sys.opaque_identity (fun _ ~off:_ ~len:_ -> ()) + let default_on_eof = Sys.opaque_identity (fun () -> ()) + let default_on_read = Sys.opaque_identity (fun _ ~off:_ ~len:_ -> ()) let create buffer = - { faraday = Faraday.of_bigstring buffer - ; read_scheduled = false - ; on_eof = default_on_eof - ; on_read = default_on_read + { faraday = Faraday.of_bigstring buffer + ; read_scheduled = false + ; on_eof = default_on_eof + ; on_read = default_on_read } let create_empty () = @@ -107,13 +107,13 @@ module Writer = struct | Chunked of { mutable written_final_chunk : bool } type t = - { faraday : Faraday.t - ; encoding : encoding - ; when_ready_to_write : unit -> unit - ; buffered_bytes : int ref + { faraday : Faraday.t + ; encoding : encoding + ; writer : Serialize.Writer.t + ; buffered_bytes : int ref } - let of_faraday faraday ~encoding ~when_ready_to_write = + let _of_faraday faraday ~encoding ~writer = let encoding = match encoding with | `Fixed _ | `Close_delimited -> Identity @@ -121,12 +121,19 @@ module Writer = struct in { faraday ; encoding - ; when_ready_to_write + ; writer ; buffered_bytes = ref 0 } - let create buffer ~encoding ~when_ready_to_write = - of_faraday (Faraday.of_bigstring buffer) ~encoding ~when_ready_to_write + let create buffer ~encoding ~writer = + _of_faraday (Faraday.of_bigstring buffer) ~encoding ~writer + + (* XXX(dpatti): [create_direct] allows you to write directly to the response + writer instead of going through an intermediary buffer, but at the cost of + not having the ability to transfer with the correct encoding. We should get + rid of this. *) + let create_direct ~encoding ~writer = + _of_faraday (Serialize.Writer.faraday writer) ~encoding ~writer let write_char t c = Faraday.write_char t.faraday c @@ -140,7 +147,7 @@ module Writer = struct let schedule_bigstring t ?off ?len (b:Bigstringaf.t) = Faraday.schedule_bigstring ?off ?len t.faraday b - let ready_to_write t = t.when_ready_to_write () + let ready_to_write t = Serialize.Writer.wakeup t.writer let flush t kontinue = Faraday.flush t.faraday kontinue; @@ -166,7 +173,7 @@ module Writer = struct in faraday_has_output || additional_encoding_output - let transfer_to_writer t writer = + let transfer_to_writer t = let faraday = t.faraday in begin match Faraday.operation faraday with | `Yield -> () @@ -176,9 +183,9 @@ module Writer = struct | Chunked ({ written_final_chunk } as chunked) -> if not written_final_chunk then begin chunked.written_final_chunk <- true; - Serialize.Writer.schedule_chunk writer []; + Serialize.Writer.schedule_chunk t.writer []; end); - Serialize.Writer.unyield writer; + Serialize.Writer.unyield t.writer; | `Writev iovecs -> let buffered = t.buffered_bytes in begin match IOVec.shiftv iovecs !buffered with @@ -187,10 +194,10 @@ module Writer = struct let lengthv = IOVec.lengthv iovecs in buffered := !buffered + lengthv; begin match t.encoding with - | Identity -> Serialize.Writer.schedule_fixed writer iovecs - | Chunked _ -> Serialize.Writer.schedule_chunk writer iovecs + | Identity -> Serialize.Writer.schedule_fixed t.writer iovecs + | Chunked _ -> Serialize.Writer.schedule_chunk t.writer iovecs end; - Serialize.Writer.flush writer (fun () -> + Serialize.Writer.flush t.writer (fun () -> Faraday.shift faraday lengthv; buffered := !buffered - lengthv) end diff --git a/lib/client_connection.ml b/lib/client_connection.ml index 443aad5e..7a2ca4c9 100644 --- a/lib/client_connection.ml +++ b/lib/client_connection.ml @@ -72,7 +72,7 @@ module Oneshot = struct failwith "Httpaf.Client_connection.request: invalid body length" in Body.Writer.create (Bigstringaf.create config.request_body_buffer_size) - ~encoding ~when_ready_to_write:(fun () -> Writer.wakeup writer) + ~encoding ~writer in let t = { request @@ -89,7 +89,7 @@ module Oneshot = struct let flush_request_body t = if Body.Writer.has_pending_output t.request_body - then Body.Writer.transfer_to_writer t.request_body t.writer + then Body.Writer.transfer_to_writer t.request_body ;; let set_error_and_handle_without_shutdown t error = diff --git a/lib/reqd.ml b/lib/reqd.ml index 5569bf1a..7178bad3 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -163,8 +163,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response = failwith "httpaf.Reqd.respond_with_streaming: invalid response body length" in let response_body = - Body.Writer.create t.response_body_buffer ~encoding ~when_ready_to_write:(fun () -> - Writer.wakeup t.writer) + Body.Writer.create t.response_body_buffer ~encoding ~writer:t.writer in Writer.write_response t.writer response; if t.persistent then @@ -257,5 +256,5 @@ let flush_request_body t = let flush_response_body t = match t.response_state with | Streaming (_, response_body) -> - Body.Writer.transfer_to_writer response_body t.writer + Body.Writer.transfer_to_writer response_body | _ -> () diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 839a10b3..08ea6279 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -200,8 +200,7 @@ let set_error_and_handle ?request t error = | `Error (`Bad_gateway | `Internal_server_error) -> failwith "httpaf.Server_connection.error_handler: invalid response body length" in - Body.Writer.of_faraday (Writer.faraday writer) ~encoding - ~when_ready_to_write:(fun () -> Writer.wakeup writer)); + Body.Writer.create_direct ~encoding ~writer); end let report_exn t exn =