Skip to content

Commit

Permalink
Merge pull request #14 from kit-ty-kate/fold
Browse files Browse the repository at this point in the history
Add Http_lwt_client.fold_request for handling receiving streams of data instead of one large string that might not fit into RAM
  • Loading branch information
hannesm authored Oct 25, 2022
2 parents f44b228 + 0b85f82 commit 42c2fbc
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 52 deletions.
45 changes: 27 additions & 18 deletions app/hurl.ml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
let jump () protocol uri meth headers output input no_follow =
let jump () protocol uri meth headers input output no_follow =
let ( let* ) = Result.bind in
let config = match protocol with
| None -> None
| Some `HTTP_1_1 -> Some (`HTTP_1_1 Httpaf.Config.default)
| Some `H2 -> Some (`H2 H2.Config.default)
in
let open Lwt.Infix in
let* body, default_meth =
match input with
| None -> Ok (None, `GET)
Expand All @@ -13,23 +14,31 @@ let jump () protocol uri meth headers output input no_follow =
Ok (Some d, `POST)
in
let meth = match meth with None -> default_meth | Some x -> x in
let open Lwt.Infix in
Lwt_main.run ((
Http_lwt_client.one_request ?config ~meth ~headers ?body ~follow_redirect:(not no_follow) uri >|= function
| Ok (resp, body) ->
Format.fprintf Format.std_formatter "%a\n%!"
Http_lwt_client.pp_response resp;
(match body with
| None -> Ok ()
| Some data ->
match output with
| None -> Format.fprintf Format.std_formatter "%s\n%!" data ; Ok ()
| Some fn -> Bos.OS.File.write (Fpath.v fn) data)
| Error `Msg msg as e ->
Logs.err (fun m -> m "error %s" msg);
e) >|= fun r ->
Unix.sleep 100;
r)
Lwt_main.run (
let fd, close = match output with
| None -> Unix.stdout, fun () -> ()
| Some fn ->
let fd = Unix.openfile fn [ Unix.O_WRONLY ] 0o644 in
fd, fun () -> Unix.close fd
in
let reply () data =
let bytes = Bytes.of_string data in
let blen = String.length data in
let written = Unix.write fd bytes 0 blen in
if written <> blen then
Printf.printf "couldn't fully write (%d of %d bytes)" written blen;
Lwt.return_unit
in
(Http_lwt_client.request ?config ~meth ~headers ?body ~follow_redirect:(not no_follow) uri reply () >|= function
| Ok (resp, ()) ->
Format.fprintf Format.std_formatter "\n%a\n%!"
Http_lwt_client.pp_response resp;
Ok ()
| Error `Msg msg as e ->
Logs.err (fun m -> m "error %s" msg);
e) >|= fun r ->
close ();
r)

let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer ();
Expand Down
68 changes: 41 additions & 27 deletions src/http_lwt_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ let prep_http_1_1_headers headers host user_pass blen =
let headers = add headers "user-agent" ("http-lwt-client/%%VERSION_NUM%%") in
let headers = add headers "host" host in
let headers = add headers "connection" "close" in
let headers = match blen with
| None -> headers
| Some x -> add headers "content-length" (string_of_int x)
let headers =
add headers "content-length"
(string_of_int (Option.value ~default:0 blen))
in
add_authentication ~add headers user_pass

Expand All @@ -82,7 +82,8 @@ let prep_h2_headers headers host user_pass blen =
let add hdr = H2.Headers.add_unless_exists hdr ?sensitive:None in
let headers = add headers ":authority" host in
let headers =
add headers "content-length" (string_of_int (Option.value ~default:0 blen))
add headers "content-length"
(string_of_int (Option.value ~default:0 blen))
in
add_authentication ~add headers user_pass

Expand All @@ -100,8 +101,8 @@ let pp_response ppf { version ; status ; reason ; headers } =
Format.fprintf ppf "((version \"%a\") (status %a) (reason %S) (headers %a))"
Version.pp_hum version Status.pp_hum status reason Headers.pp_hum headers

let single_http_1_1_request ?config fd user_pass host meth path headers body =
let blen = match body with None -> None | Some x -> Some (String.length x) in
let single_http_1_1_request ?config fd user_pass host meth path headers body f f_init =
let blen = Option.map String.length body in
let headers = prep_http_1_1_headers headers host user_pass blen in
let req = Httpaf.Request.create ~headers meth path in
let finished, notify_finished = Lwt.wait () in
Expand All @@ -126,15 +127,20 @@ let single_http_1_1_request ?config fd user_pass host meth path headers body =
wakeup (Ok (response, data))
in
let response_handler response response_body =
let open Lwt.Infix in
let rec on_read on_eof data bs ~off ~len =
let data = data ^ Bigstringaf.substring ~off ~len bs in
let data =
data >>= fun data ->
f data (Bigstringaf.substring ~off ~len bs)
in
Httpaf.Body.schedule_read response_body
~on_read:(on_read on_eof data)
~on_eof:(on_eof response (Some data))
~on_eof:(on_eof response data)
in
let f_init = Lwt.return f_init in
Httpaf.Body.schedule_read response_body
~on_read:(on_read on_eof "")
~on_eof:(on_eof response None)
~on_read:(on_read on_eof f_init)
~on_eof:(on_eof response f_init)
in
let error_handler e =
let err = match e with
Expand All @@ -158,8 +164,8 @@ let single_http_1_1_request ?config fd user_pass host meth path headers body =
Httpaf.Body.close_writer request_body;
finished

let single_h2_request ?config fd scheme user_pass host meth path headers body =
let blen = match body with None -> None | Some x -> Some (String.length x) in
let single_h2_request ?config fd scheme user_pass host meth path headers body f f_init =
let blen = Option.map String.length body in
let headers = prep_h2_headers headers host user_pass blen in
let req = H2.Request.create ~scheme ~headers meth path in
Logs.debug (fun m -> m "Sending @[<v 0>%a@]" H2.Request.pp_hum req);
Expand All @@ -182,15 +188,20 @@ let single_h2_request ?config fd scheme user_pass host meth path headers body =
wakeup (Ok (response, data))
in
let response_handler response response_body =
let open Lwt.Infix in
let rec on_read on_eof data bs ~off ~len =
let data = data ^ Bigstringaf.substring ~off ~len bs in
let data =
data >>= fun data ->
f data (Bigstringaf.substring ~off ~len bs)
in
H2.Body.Reader.schedule_read response_body
~on_read:(on_read on_eof data)
~on_eof:(on_eof response (Some data))
~on_eof:(on_eof response data)
in
let f_init = Lwt.return f_init in
H2.Body.Reader.schedule_read response_body
~on_read:(on_read on_eof "")
~on_eof:(on_eof response None)
~on_read:(on_read on_eof f_init)
~on_eof:(on_eof response f_init)
in
let error_handler e =
let err = match e with
Expand Down Expand Up @@ -233,7 +244,7 @@ let alpn_protocol = function
None
| Error () -> None

let single_request resolver ?config tls_config ~meth ~headers ?body uri =
let single_request resolver ?config tls_config ~meth ~headers ?body uri f f_init =
Lwt_result.lift (decode_uri uri) >>= fun (tls, scheme, user_pass, host, port, path) ->
(if tls then
Lwt_result.lift (Lazy.force tls_config) >|= function
Expand All @@ -245,25 +256,27 @@ let single_request resolver ?config tls_config ~meth ~headers ?body uri =
else
Lwt_result.return None) >>= fun tls_config ->
connect resolver ?port ?tls_config host >>= fun fd ->
match alpn_protocol fd, config with
begin match alpn_protocol fd, config with
| (Some `HTTP_1_1 | None), Some (`HTTP_1_1 config) ->
Logs.debug (fun m -> m "Start an http/1.1 connection as expected.");
single_http_1_1_request ~config fd user_pass host meth path headers body
single_http_1_1_request ~config fd user_pass host meth path headers body f f_init
| (Some `HTTP_1_1 | None), None ->
Logs.debug (fun m -> m "Start an http/1.1 connection by default.");
single_http_1_1_request fd user_pass host meth path headers body
single_http_1_1_request fd user_pass host meth path headers body f f_init
| (Some `H2 | None), Some (`H2 config) ->
Logs.debug (fun m -> m "Start an h2 connection as expected.");
single_h2_request ~config fd scheme user_pass host meth path headers body
single_h2_request ~config fd scheme user_pass host meth path headers body f f_init
| Some `H2, None ->
Logs.debug (fun m -> m "Start an h2 connection as requested by the server.");
single_h2_request fd scheme user_pass host meth path headers body
single_h2_request fd scheme user_pass host meth path headers body f f_init
| Some `H2, Some (`HTTP_1_1 _config) ->
Logs.warn (fun m -> m "Initiate an h2 connection despite a requested http/1.1 connection.");
single_h2_request fd scheme user_pass host meth path headers body
single_h2_request fd scheme user_pass host meth path headers body f f_init
| Some `HTTP_1_1, Some (`H2 _config) ->
Logs.warn (fun m -> m "Initiate an http/1.1 connection despite a requested h2 connection.");
single_http_1_1_request fd user_pass host meth path headers body
single_http_1_1_request fd user_pass host meth path headers body f f_init
end >>= fun (resp, body) ->
Lwt.map (fun body -> Ok (resp, body)) body

let resolve_location ~uri ~location =
match String.split_on_char '/' location with
Expand All @@ -281,7 +294,7 @@ let resolve_location ~uri ~location =

let default_auth = lazy (Ca_certs.authenticator ())

let one_request
let request
?config
?tls_config
?authenticator
Expand All @@ -292,6 +305,7 @@ let one_request
?(follow_redirect = true)
?(happy_eyeballs = Happy_eyeballs_lwt.create ())
uri
f f_init
=
let tls_config : ([`Custom of Tls.Config.client | `Default of Tls.Config.client ], [> `Msg of string ]) result Lazy.t =
lazy
Expand All @@ -312,13 +326,13 @@ let one_request
auth)
in
if not follow_redirect then
single_request happy_eyeballs ?config tls_config ~meth ~headers ?body uri
single_request happy_eyeballs ?config tls_config ~meth ~headers ?body uri f f_init
else
let rec follow_redirect count uri =
if count = 0 then
Lwt.return (Error (`Msg "redirect limit exceeded"))
else
single_request happy_eyeballs ?config tls_config ~meth ~headers ?body uri
single_request happy_eyeballs ?config tls_config ~meth ~headers ?body uri f f_init
>>= fun (resp, body) ->
if Status.is_redirection resp.status then
(match Headers.get resp.headers "location" with
Expand Down
20 changes: 13 additions & 7 deletions src/http_lwt_client.mli
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,15 @@ type response =
(** [pp_response ppf response] pretty-prints the [response] on [ppf]. *)
val pp_response : Format.formatter -> response -> unit

(** [one_request ~config ~authenticator ~meth ~headers ~body ~max_redirect
~follow_redirect ~happy_eyeballs uri] does a single request of [uri] and
returns the response. By default, up to 5 redirects ([max_redirect]) are
followed. If [follow_redirect] is false, no redirect is followed (defaults
to true). The default HTTP request type ([meth]) is [GET].
(** [request ~config ~authenticator ~meth ~headers ~body ~max_redirect
~follow_redirect ~happy_eyeballs uri f init] does a single request of [uri]
and returns the response. Each time part of the body is received,
[f acc part] is called, with [acc] being the last return value of [f]
(or [init] if it is the first time) and [part] being the body part received.
By default, up to 5 redirects ([max_redirect]) are followed.
If [follow_redirect] is false, no redirect is followed (defaults to true).
The default HTTP request type ([meth]) is [GET].
If no [tls_config] is provided, a default one is used, with alpn and
authenticators. If a [tls_config] is provided, this is used unmodified.
Expand All @@ -53,7 +57,7 @@ val pp_response : Format.formatter -> response -> unit
The [happy-eyeballs] opam package is used to establish the TCP connection,
which prefers IPv6 over IPv4.
*)
val one_request
val request
: ?config : [ `HTTP_1_1 of Httpaf.Config.t | `H2 of H2.Config.t ]
-> ?tls_config:Tls.Config.client
-> ?authenticator:X509.Authenticator.t
Expand All @@ -64,4 +68,6 @@ val one_request
-> ?follow_redirect:bool
-> ?happy_eyeballs:Happy_eyeballs_lwt.t
-> string
-> (response * string option, [> `Msg of string ]) Lwt_result.t
-> ('a -> string -> 'a Lwt.t)
-> 'a
-> (response * 'a, [> `Msg of string ]) Lwt_result.t

0 comments on commit 42c2fbc

Please sign in to comment.