Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Typed rpc eio #55

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
(>= 0.9.1))
(h2
(>= 0.9.0))
(odoc
(and
(>= 2.4.0)
:with-doc))
ppx_deriving
(uri
(>= 4.0.0))))
Expand Down Expand Up @@ -92,6 +96,32 @@
(>= 0.9.0))
stringext))

(package
(name grpc-protoc-plugin)
(synopsis "An implementation of gRPC using ocaml-protoc-plugin")
(description
"Functionality for building gRPC services and rpcs with `ocaml-protoc-plugin`")
(depends
(grpc
(= :version))
(ocaml-protoc-plugin
(>= 4.5))))

(package
(name grpc-protoc)
(synopsis "An implementation of gRPC using ocaml-protoc")
(description
"Functionality for building gRPC services and rpcs with `ocaml-protoc`")
(depends
(grpc
(= :version))
(ocaml-protoc
(>= 3.0))
(pbrt
(>= 3.0))
(pbrt_services
(>= 3.0))))

(package
(name grpc-examples)
(synopsis "Various gRPC examples")
Expand Down
2 changes: 1 addition & 1 deletion examples/greeter-client-eio/greeter_client_eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ let main env =
in

let result =
Grpc_eio.Client.call ~service:"mypackage.Greeter" ~rpc:"SayHello"
Grpc_eio.Client.Rpc.call ~service:"mypackage.Greeter" ~rpc:"SayHello"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:(Grpc_eio.Client.Rpc.unary encoded_request ~f)
()
Expand Down
8 changes: 4 additions & 4 deletions examples/routeguide-tutorial.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ Calling the simple RPC `get_feature` requires building up a `Client.call` repres
let call_get_feature connection point =
let encode, decode = Service.make_client_functions RouteGuide.getFeature in
let response =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.unary
Expand Down Expand Up @@ -476,7 +476,7 @@ let print_features connection =

let encode, decode = Service.make_client_functions RouteGuide.listFeatures in
let stream =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.server_streaming
Expand Down Expand Up @@ -528,7 +528,7 @@ let run_record_route connection =

let encode, decode = Service.make_client_functions RouteGuide.recordRoute in
let response =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.client_streaming ~f:(fun f response ->
Expand Down Expand Up @@ -615,7 +615,7 @@ We start by generating a short sequence of locations, similar to how we did for
go writer reader' xs)
in
let result =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.bidirectional_streaming ~f:(fun writer reader ->
Expand Down
8 changes: 4 additions & 4 deletions examples/routeguide/src/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ let client ~sw host port network =
let call_get_feature connection point =
let encode, decode = Service.make_client_functions RouteGuide.getFeature in
let response =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.unary
Expand Down Expand Up @@ -55,7 +55,7 @@ let print_features connection =

let encode, decode = Service.make_client_functions RouteGuide.listFeatures in
let stream =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.server_streaming
Expand Down Expand Up @@ -100,7 +100,7 @@ let run_record_route connection =

let encode, decode = Service.make_client_functions RouteGuide.recordRoute in
let response =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.client_streaming ~f:(fun f response ->
Expand Down Expand Up @@ -178,7 +178,7 @@ let run_route_chat clock connection =
go writer reader' xs)
in
let result =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat"
Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.bidirectional_streaming ~f:(fun writer reader ->
Expand Down
40 changes: 40 additions & 0 deletions grpc-protoc-plugin.opam
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "An implementation of gRPC using ocaml-protoc-plugin"
description:
"Functionality for building gRPC services and rpcs with `ocaml-protoc-plugin`"
maintainer: ["Daniel Quernheim <[email protected]>"]
authors: [
"Andrew Jeffery <[email protected]>"
"Daniel Quernheim <[email protected]>"
"Michael Bacarella <[email protected]>"
"Sven Anderson <[email protected]>"
"Tim McGilchrist <[email protected]>"
"Wojtek Czekalski <[email protected]>"
"dimitris.mostrous <[email protected]>"
]
license: "BSD-3-Clause"
homepage: "https://github.com/dialohq/ocaml-grpc"
doc: "https://dialohq.github.io/ocaml-grpc"
bug-reports: "https://github.com/dialohq/ocaml-grpc/issues"
depends: [
"dune" {>= "3.7"}
"grpc" {= version}
"ocaml-protoc-plugin" {>= "4.5"}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git"
42 changes: 42 additions & 0 deletions grpc-protoc.opam
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "An implementation of gRPC using ocaml-protoc"
description:
"Functionality for building gRPC services and rpcs with `ocaml-protoc`"
maintainer: ["Daniel Quernheim <[email protected]>"]
authors: [
"Andrew Jeffery <[email protected]>"
"Daniel Quernheim <[email protected]>"
"Michael Bacarella <[email protected]>"
"Sven Anderson <[email protected]>"
"Tim McGilchrist <[email protected]>"
"Wojtek Czekalski <[email protected]>"
"dimitris.mostrous <[email protected]>"
]
license: "BSD-3-Clause"
homepage: "https://github.com/dialohq/ocaml-grpc"
doc: "https://dialohq.github.io/ocaml-grpc"
bug-reports: "https://github.com/dialohq/ocaml-grpc/issues"
depends: [
"dune" {>= "3.7"}
"grpc" {= version}
"ocaml-protoc" {>= "3.0"}
"pbrt" {>= "3.0"}
"pbrt_services" {>= "3.0"}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git"
2 changes: 1 addition & 1 deletion grpc.opam
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ depends: [
"ocaml" {>= "4.08"}
"bigstringaf" {>= "0.9.1"}
"h2" {>= "0.9.0"}
"odoc" {>= "2.4.0" & with-doc}
"ppx_deriving"
"uri" {>= "4.0.0"}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
Expand Down
109 changes: 96 additions & 13 deletions lib/grpc-eio/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ let get_response_and_bodies request =
let read_body = Eio.Promise.await read_body in
(response, read_body, write_body)

let call ~service ~rpc ?(scheme = "https") ~handler ~(do_request : do_request)
?(headers = default_headers) () =
let call_internal ~service ~rpc ?(scheme = "https") ~handler
~(do_request : do_request) ?(headers = default_headers) () =
let request = make_request ~service ~rpc ~scheme ~headers in
let status, trailers_handler = make_trailers_handler () in
let response, read_body, write_body =
Expand All @@ -66,20 +66,98 @@ let call ~service ~rpc ?(scheme = "https") ~handler ~(do_request : do_request)
Ok (result, status)
| error_status -> Error error_status

let make_handler ~encode_request ~decode_response ~f write_body read_body =
let response_reader, response_writer = Seq.create_reader_writer () in
let request_reader, request_writer = Seq.create_reader_writer () in
Connection.grpc_recv_streaming ~decode:decode_response read_body
response_writer;
let res, res_notify = Eio.Promise.create () in
Eio.Fiber.both
(fun () ->
Eio.Promise.resolve res_notify (f request_writer response_reader))
(fun () ->
Connection.grpc_send_streaming_client ~encode:encode_request write_body
request_reader);
Eio.Promise.await res

module Typed_rpc = struct
type ('request, 'request_mode, 'response, 'response_mode, 'a) handler =
('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t ->
H2.Body.Writer.t ->
H2.Body.Reader.t ->
'a

let make_handler (type request response)
~(rpc : (request, _, response, _) Grpc.Rpc.Client_rpc.t) ~f =
make_handler ~encode_request:rpc.encode_request
~decode_response:rpc.decode_response ~f

let bidirectional_streaming (type request response) ~f
(rpc :
( request,
Grpc.Rpc.Value_mode.stream,
response,
Grpc.Rpc.Value_mode.stream )
Grpc.Rpc.Client_rpc.t) =
make_handler ~rpc ~f

let client_streaming (type request response) ~f
(rpc :
( request,
Grpc.Rpc.Value_mode.stream,
response,
Grpc.Rpc.Value_mode.unary )
Grpc.Rpc.Client_rpc.t) =
make_handler ~rpc ~f:(fun request_writer responses ->
let response, response_resolver = Eio.Promise.create () in
Eio.Fiber.pair
(fun () -> f request_writer response)
(fun () ->
Eio.Promise.resolve response_resolver
(Seq.read_and_exhaust responses))
|> fst)

let server_streaming (type request response) ~f (request : request)
(rpc :
( request,
Grpc.Rpc.Value_mode.unary,
response,
Grpc.Rpc.Value_mode.stream )
Grpc.Rpc.Client_rpc.t) =
make_handler ~rpc ~f:(fun request_writer responses ->
Seq.write request_writer request;
Seq.close_writer request_writer;
f responses)

let unary (type request response) ~f (request : request)
(rpc :
( request,
Grpc.Rpc.Value_mode.unary,
response,
Grpc.Rpc.Value_mode.unary )
Grpc.Rpc.Client_rpc.t) =
make_handler ~rpc ~f:(fun request_writer responses ->
Seq.write request_writer request;
Seq.close_writer request_writer;
let response = Seq.read_and_exhaust responses in
f response)

let call (type request request_mode response response_mode a)
(rpc :
(request, request_mode, response, response_mode) Grpc.Rpc.Client_rpc.t)
?scheme
~(handler : (request, request_mode, response, response_mode, a) handler)
~do_request ?headers () =
call_internal
~service:(Grpc.Rpc.Service_spec.packaged_service_name rpc.service_spec)
~rpc:rpc.rpc_name ?scheme ~handler:(handler rpc) ~do_request ?headers ()
end

module Rpc = struct
type 'a handler = H2.Body.Writer.t -> H2.Body.Reader.t -> 'a

let bidirectional_streaming ~f write_body read_body =
let response_reader, response_writer = Seq.create_reader_writer () in
let request_reader, request_writer = Seq.create_reader_writer () in
Connection.grpc_recv_streaming read_body response_writer;
let res, res_notify = Eio.Promise.create () in
Eio.Fiber.both
(fun () ->
Eio.Promise.resolve res_notify (f request_writer response_reader))
(fun () ->
Connection.grpc_send_streaming_client write_body request_reader);
Eio.Promise.await res
let bidirectional_streaming ~f =
make_handler ~encode_request:Fun.id ~decode_response:Fun.id ~f

let client_streaming ~f =
bidirectional_streaming ~f:(fun request_writer responses ->
Expand All @@ -103,4 +181,9 @@ module Rpc = struct
Seq.close_writer request_writer;
let response = Seq.read_and_exhaust responses in
f response)

let call = call_internal
end

(* Deprecated in the mli. *)
let call = call_internal
Loading
Loading