Skip to content

Commit

Permalink
Implement typed-rpc for Eio
Browse files Browse the repository at this point in the history
- Mark `Grpc_eio.Client.call` as deprecated
  • Loading branch information
mbarbin committed Jan 27, 2024
1 parent 9abcb67 commit 10ab1f9
Show file tree
Hide file tree
Showing 9 changed files with 496 additions and 82 deletions.
2 changes: 1 addition & 1 deletion examples/greeter-client-eio/greeter_client_eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ let main env =
in

let result =
Grpc_eio.Client.call ~service:"mypackage.Greeter" ~rpc:"SayHello"
Grpc_eio.Client.Rpc.call ~service:"mypackage.Greeter" ~rpc:"SayHello"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:(Grpc_eio.Client.Rpc.unary encoded_request ~f)
()
Expand Down
8 changes: 4 additions & 4 deletions examples/routeguide-tutorial.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ Calling the simple RPC `get_feature` requires building up a `Client.call` repres
let call_get_feature connection point =
let encode, decode = Service.make_client_functions RouteGuide.getFeature in
let response =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.unary
Expand Down Expand Up @@ -476,7 +476,7 @@ let print_features connection =
let encode, decode = Service.make_client_functions RouteGuide.listFeatures in
let stream =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.server_streaming
Expand Down Expand Up @@ -528,7 +528,7 @@ let run_record_route connection =
let encode, decode = Service.make_client_functions RouteGuide.recordRoute in
let response =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.client_streaming ~f:(fun f response ->
Expand Down Expand Up @@ -615,7 +615,7 @@ We start by generating a short sequence of locations, similar to how we did for
go writer reader' xs)
in
let result =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.bidirectional_streaming ~f:(fun writer reader ->
Expand Down
8 changes: 4 additions & 4 deletions examples/routeguide/src/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ let client ~sw host port network =
let call_get_feature connection point =
let encode, decode = Service.make_client_functions RouteGuide.getFeature in
let response =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.unary
Expand Down Expand Up @@ -55,7 +55,7 @@ let print_features connection =

let encode, decode = Service.make_client_functions RouteGuide.listFeatures in
let stream =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.server_streaming
Expand Down Expand Up @@ -100,7 +100,7 @@ let run_record_route connection =

let encode, decode = Service.make_client_functions RouteGuide.recordRoute in
let response =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.client_streaming ~f:(fun f response ->
Expand Down Expand Up @@ -178,7 +178,7 @@ let run_route_chat clock connection =
go writer reader' xs)
in
let result =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.bidirectional_streaming ~f:(fun writer reader ->
Expand Down
109 changes: 96 additions & 13 deletions lib/grpc-eio/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ let get_response_and_bodies request =
let read_body = Eio.Promise.await read_body in
(response, read_body, write_body)

let call ~service ~rpc ?(scheme = "https") ~handler ~(do_request : do_request)
?(headers = default_headers) () =
let call_internal ~service ~rpc ?(scheme = "https") ~handler
~(do_request : do_request) ?(headers = default_headers) () =
let request = make_request ~service ~rpc ~scheme ~headers in
let status, trailers_handler = make_trailers_handler () in
let response, read_body, write_body =
Expand All @@ -66,20 +66,98 @@ let call ~service ~rpc ?(scheme = "https") ~handler ~(do_request : do_request)
Ok (result, status)
| error_status -> Error error_status

let make_handler ~encode_request ~decode_response ~f write_body read_body =
let response_reader, response_writer = Seq.create_reader_writer () in
let request_reader, request_writer = Seq.create_reader_writer () in
Connection.grpc_recv_streaming ~decode:decode_response read_body
response_writer;
let res, res_notify = Eio.Promise.create () in
Eio.Fiber.both
(fun () ->
Eio.Promise.resolve res_notify (f request_writer response_reader))
(fun () ->
Connection.grpc_send_streaming_client ~encode:encode_request write_body
request_reader);
Eio.Promise.await res

module Typed_rpc = struct
type ('request, 'request_mode, 'response, 'response_mode, 'a) handler =
('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t ->
H2.Body.Writer.t ->
H2.Body.Reader.t ->
'a

let make_handler (type request response)
~(rpc : (request, _, response, _) Grpc.Rpc.Client_rpc.t) ~f =
make_handler ~encode_request:rpc.encode_request
~decode_response:rpc.decode_response ~f

let bidirectional_streaming (type request response) ~f
(rpc :
( request,
Grpc.Rpc.Value_mode.stream,
response,
Grpc.Rpc.Value_mode.stream )
Grpc.Rpc.Client_rpc.t) =
make_handler ~rpc ~f

let client_streaming (type request response) ~f
(rpc :
( request,
Grpc.Rpc.Value_mode.stream,
response,
Grpc.Rpc.Value_mode.unary )
Grpc.Rpc.Client_rpc.t) =
make_handler ~rpc ~f:(fun request_writer responses ->
let response, response_resolver = Eio.Promise.create () in
Eio.Fiber.pair
(fun () -> f request_writer response)
(fun () ->
Eio.Promise.resolve response_resolver
(Seq.read_and_exhaust responses))
|> fst)

let server_streaming (type request response) ~f (request : request)
(rpc :
( request,
Grpc.Rpc.Value_mode.unary,
response,
Grpc.Rpc.Value_mode.stream )
Grpc.Rpc.Client_rpc.t) =
make_handler ~rpc ~f:(fun request_writer responses ->
Seq.write request_writer request;
Seq.close_writer request_writer;
f responses)

let unary (type request response) ~f (request : request)
(rpc :
( request,
Grpc.Rpc.Value_mode.unary,
response,
Grpc.Rpc.Value_mode.unary )
Grpc.Rpc.Client_rpc.t) =
make_handler ~rpc ~f:(fun request_writer responses ->
Seq.write request_writer request;
Seq.close_writer request_writer;
let response = Seq.read_and_exhaust responses in
f response)

let call (type request request_mode response response_mode a)
(rpc :
(request, request_mode, response, response_mode) Grpc.Rpc.Client_rpc.t)
?scheme
~(handler : (request, request_mode, response, response_mode, a) handler)
~do_request ?headers () =
call_internal
~service:(Grpc.Rpc.Service_spec.packaged_service_name rpc.service_spec)
~rpc:rpc.rpc_name ?scheme ~handler:(handler rpc) ~do_request ?headers ()
end

module Rpc = struct
type 'a handler = H2.Body.Writer.t -> H2.Body.Reader.t -> 'a

let bidirectional_streaming ~f write_body read_body =
let response_reader, response_writer = Seq.create_reader_writer () in
let request_reader, request_writer = Seq.create_reader_writer () in
Connection.grpc_recv_streaming read_body response_writer;
let res, res_notify = Eio.Promise.create () in
Eio.Fiber.both
(fun () ->
Eio.Promise.resolve res_notify (f request_writer response_reader))
(fun () ->
Connection.grpc_send_streaming_client write_body request_reader);
Eio.Promise.await res
let bidirectional_streaming ~f =
make_handler ~encode_request:Fun.id ~decode_response:Fun.id ~f

let client_streaming ~f =
bidirectional_streaming ~f:(fun request_writer responses ->
Expand All @@ -103,4 +181,9 @@ module Rpc = struct
Seq.close_writer request_writer;
let response = Seq.read_and_exhaust responses in
f response)

let call = call_internal
end

(* Deprecated in the mli. *)
let call = call_internal
111 changes: 98 additions & 13 deletions lib/grpc-eio/client.mli
Original file line number Diff line number Diff line change
@@ -1,3 +1,82 @@
type response_handler = H2.Client_connection.response_handler

type do_request =
?flush_headers_immediately:bool ->
?trailers_handler:(H2.Headers.t -> unit) ->
H2.Request.t ->
response_handler:response_handler ->
H2.Body.Writer.t
(** [do_request] is the type of a function that performs the request *)

(** {1 Typed API} *)

module Typed_rpc : sig
(** A typed interface to call RPC from the client side.
Compared to {!module:Rpc}, this interface will:
- handle the coding/decoding of messages for you under the hood;
- use the service and RPC names provided by the rpc specification to
call the services with their expected names. *)

type ('request, 'request_mode, 'response, 'response_mode, 'a) handler

(** The next functions are meant to be used by the client to handle
call to RPCs. *)

val bidirectional_streaming :
f:('request Seq.writer -> 'response Seq.t -> 'a) ->
( 'request,
Grpc.Rpc.Value_mode.stream,
'response,
Grpc.Rpc.Value_mode.stream,
'a )
handler

val client_streaming :
f:('request Seq.writer -> 'response option Eio.Promise.t -> 'a) ->
( 'request,
Grpc.Rpc.Value_mode.stream,
'response,
Grpc.Rpc.Value_mode.unary,
'a )
handler

val server_streaming :
f:('response Seq.t -> 'a) ->
'request ->
( 'request,
Grpc.Rpc.Value_mode.unary,
'response,
Grpc.Rpc.Value_mode.stream,
'a )
handler

val unary :
f:('response option -> 'a) ->
'request ->
( 'request,
Grpc.Rpc.Value_mode.unary,
'response,
Grpc.Rpc.Value_mode.unary,
'a )
handler

val call :
('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t ->
?scheme:string ->
handler:('request, 'request_mode, 'response, 'response_mode, 'a) handler ->
do_request:do_request ->
?headers:H2.Headers.t ->
unit ->
('a * Grpc.Status.t, H2.Status.t) result
(** [call rpc ~handler ~do_request ()] calls the rpc endpoint given
by [service] and [rpc] using the [do_request] function. The [handler] is
called when this request is set up to send and receive data. *)
end

(** {1 Untyped API} *)

module Rpc : sig
type 'a handler = H2.Body.Writer.t -> H2.Body.Reader.t -> 'a

Expand All @@ -22,17 +101,20 @@ module Rpc : sig
(** [unary ~f enc write read] sets up the sending and receiving
logic using [write] and [read], then sends [enc] and calls [f] with a
promise for the response. *)
end

type response_handler = H2.Client_connection.response_handler

type do_request =
?flush_headers_immediately:bool ->
?trailers_handler:(H2.Headers.t -> unit) ->
H2.Request.t ->
response_handler:response_handler ->
H2.Body.Writer.t
(** [do_request] is the type of a function that performs the request *)
val call :
service:string ->
rpc:string ->
?scheme:string ->
handler:'a handler ->
do_request:do_request ->
?headers:H2.Headers.t ->
unit ->
('a * Grpc.Status.t, H2.Status.t) result
(** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given
by [service] and [rpc] using the [do_request] function. The [handler] is
called when this request is set up to send and receive data. *)
end

val call :
service:string ->
Expand All @@ -43,6 +125,9 @@ val call :
?headers:H2.Headers.t ->
unit ->
('a * Grpc.Status.t, H2.Status.t) result
(** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given
by [service] and [rpc] using the [do_request] function. The [handler] is
called when this request is set up to send and receive data. *)
[@@ocaml.alert
deprecated "This function was renamed [Grpc_eio.Client.Rpc.call]."]
(** Deprecating this function makes the distinction between the typed and
untyped API more clear. Also, it frees up the name [call] at top level,
which we could use in the future if we decide to "promote" the typed API
to the toplevel scope of this module. *)
14 changes: 8 additions & 6 deletions lib/grpc-eio/connection.ml
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
let grpc_recv_streaming body message_buffer_writer =
let grpc_recv_streaming ~decode body message_buffer_writer =
let request_buffer = Grpc.Buffer.v () in
let on_eof () = Seq.close_writer message_buffer_writer in
let rec on_read buffer ~off ~len =
Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:buffer
~dst:request_buffer ~length:len;
Grpc.Message.extract_all (Seq.write message_buffer_writer) request_buffer;
Grpc.Message.extract_all
(fun message -> Seq.write message_buffer_writer (decode message))
request_buffer;
H2.Body.Reader.schedule_read body ~on_read ~on_eof
in
H2.Body.Reader.schedule_read body ~on_read ~on_eof

let grpc_send_streaming_client body encoder_stream =
let grpc_send_streaming_client ~encode body encoder_stream =
Seq.iter
(fun encoder ->
let payload = Grpc.Message.make encoder in
let payload = Grpc.Message.make (encode encoder) in
H2.Body.Writer.write_string body payload)
encoder_stream;
H2.Body.Writer.close body

let grpc_send_streaming request encoder_stream status_promise =
let grpc_send_streaming ~encode request encoder_stream status_promise =
let body =
H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request
(H2.Response.create
Expand All @@ -27,7 +29,7 @@ let grpc_send_streaming request encoder_stream status_promise =
in
Seq.iter
(fun input ->
let payload = Grpc.Message.make input in
let payload = Grpc.Message.make (encode input) in
H2.Body.Writer.write_string body payload;
H2.Body.Writer.flush body (fun () -> ()))
encoder_stream;
Expand Down
12 changes: 12 additions & 0 deletions lib/grpc-eio/connection.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
val grpc_recv_streaming :
decode:(string -> 'a) -> H2.Body.Reader.t -> 'a Seq.writer -> unit

val grpc_send_streaming_client :
encode:('a -> string) -> H2.Body.Writer.t -> 'a Seq.reader -> unit

val grpc_send_streaming :
encode:('a -> string) ->
H2.Reqd.t ->
'a Seq.reader ->
Grpc.Status.t Eio.Promise.t ->
unit
Loading

0 comments on commit 10ab1f9

Please sign in to comment.