diff --git a/examples/greeter-client-eio/greeter_client_eio.ml b/examples/greeter-client-eio/greeter_client_eio.ml index 23be377..bd19265 100644 --- a/examples/greeter-client-eio/greeter_client_eio.ml +++ b/examples/greeter-client-eio/greeter_client_eio.ml @@ -30,7 +30,7 @@ let main env = let result = Grpc_eio.Client.Typed_rpc.call - (Grpc_protoc_plugin.client_rpc (module Greeter.SayHello)) + (Grpc_protoc_plugin.Client_rpc.unary (module Greeter.SayHello)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler:(Grpc_eio.Client.Typed_rpc.unary request ~f) () diff --git a/examples/greeter-protoc-client-eio/greeter_client_eio.ml b/examples/greeter-protoc-client-eio/greeter_client_eio.ml index ea1d57f..5fc1521 100644 --- a/examples/greeter-protoc-client-eio/greeter_client_eio.ml +++ b/examples/greeter-protoc-client-eio/greeter_client_eio.ml @@ -29,7 +29,8 @@ let main env = let result = Grpc_eio.Client.Typed_rpc.call - (Grpc_protoc.client_rpc Greeter_protoc.Greeter.Greeter.Client.sayHello) + (Grpc_protoc.Client_rpc.unary + Greeter_protoc.Greeter.Greeter.Client.sayHello) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler:(Grpc_eio.Client.Typed_rpc.unary request ~f) () diff --git a/examples/greeter-protoc-server-eio/greeter_server_eio.ml b/examples/greeter-protoc-server-eio/greeter_server_eio.ml index ba2a672..ac11d5d 100644 --- a/examples/greeter-protoc-server-eio/greeter_server_eio.ml +++ b/examples/greeter-protoc-server-eio/greeter_server_eio.ml @@ -1,7 +1,7 @@ open Grpc_eio let sayHello rpc = - Grpc_eio.Server.Typed_rpc.unary (Grpc_protoc.server_rpc rpc) + Grpc_eio.Server.Typed_rpc.unary (Grpc_protoc.Server_rpc.unary rpc) ~f:(fun (request : Greeter_protoc.Greeter.hello_request) -> let message = if request.name = "" then "You forgot your name!" @@ -52,11 +52,8 @@ let serve server env = let () = let server = - let { Pbrt_services.Server.package; service_name; handlers } = - Greeter_protoc.Greeter.Greeter.Server.make ~sayHello () - in - Server.Typed_rpc.server - (With_service_spec { package; service_name; handlers }) + Greeter_protoc.Greeter.Greeter.Server.make ~sayHello () + |> Grpc_protoc.handlers |> Server.Typed_rpc.server in Eio_main.run (serve server) diff --git a/examples/greeter-server-eio/greeter_server_eio.ml b/examples/greeter-server-eio/greeter_server_eio.ml index a4e5df3..bd2a636 100644 --- a/examples/greeter-server-eio/greeter_server_eio.ml +++ b/examples/greeter-server-eio/greeter_server_eio.ml @@ -3,7 +3,7 @@ open Grpc_eio let say_hello = let module SayHello = Greeter.Mypackage.Greeter.SayHello in Grpc_eio.Server.Typed_rpc.unary - (Grpc_protoc_plugin.server_rpc (module SayHello)) + (Grpc_protoc_plugin.Server_rpc.unary (module SayHello)) ~f:(fun request -> let message = if request = "" then "You forgot your name!" @@ -53,6 +53,8 @@ let serve server env = listen () let () = - let server = Server.Typed_rpc.server (Handlers [ say_hello ]) in + let server = + Server.Typed_rpc.server (Grpc_protoc_plugin.handlers [ say_hello ]) + in Eio_main.run (serve server) diff --git a/examples/routeguide-protoc/src/client.ml b/examples/routeguide-protoc/src/client.ml index 0d7681b..860da38 100644 --- a/examples/routeguide-protoc/src/client.ml +++ b/examples/routeguide-protoc/src/client.ml @@ -21,7 +21,7 @@ let client ~sw host port network = let call_get_feature connection point = let response = Client.Typed_rpc.call - (Grpc_protoc.client_rpc Route_guide.RouteGuide.Client.getFeature) + (Grpc_protoc.Client_rpc.unary Route_guide.RouteGuide.Client.getFeature) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Typed_rpc.unary point ~f:(function @@ -52,7 +52,8 @@ let print_features connection = let stream = Client.Typed_rpc.call - (Grpc_protoc.client_rpc Route_guide.RouteGuide.Client.listFeatures) + (Grpc_protoc.Client_rpc.server_streaming + Route_guide.RouteGuide.Client.listFeatures) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler:(Client.Typed_rpc.server_streaming rectangle ~f:Fun.id) () @@ -82,7 +83,8 @@ let run_record_route connection = let response = Client.Typed_rpc.call - (Grpc_protoc.client_rpc Route_guide.RouteGuide.Client.recordRoute) + (Grpc_protoc.Client_rpc.client_streaming + Route_guide.RouteGuide.Client.recordRoute) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Typed_rpc.client_streaming ~f:(fun f response -> @@ -144,7 +146,8 @@ let run_route_chat clock connection = in let result = Client.Typed_rpc.call - (Grpc_protoc.client_rpc Route_guide.RouteGuide.Client.routeChat) + (Grpc_protoc.Client_rpc.bidirectional_streaming + Route_guide.RouteGuide.Client.routeChat) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Typed_rpc.bidirectional_streaming ~f:(fun writer reader -> diff --git a/examples/routeguide-protoc/src/server.ml b/examples/routeguide-protoc/src/server.ml index 4cb1e50..3c18fb3 100644 --- a/examples/routeguide-protoc/src/server.ml +++ b/examples/routeguide-protoc/src/server.ml @@ -77,7 +77,8 @@ let calc_distance (p1 : Route_guide.point) (p2 : Route_guide.point) : int = (* $MDX part-begin=server-get-feature *) let get_feature (t : t) rpc = - Grpc_eio.Server.Typed_rpc.unary (Grpc_protoc.server_rpc rpc) ~f:(fun point -> + Grpc_eio.Server.Typed_rpc.unary (Grpc_protoc.Server_rpc.unary rpc) + ~f:(fun point -> Eio.traceln "GetFeature = {:%a}" Route_guide.pp_point point; (* Lookup the feature and if found return it. *) @@ -103,8 +104,8 @@ let get_feature (t : t) rpc = (* $MDX part-end *) (* $MDX part-begin=server-list-features *) let list_features (t : t) rpc = - Grpc_eio.Server.Typed_rpc.server_streaming (Grpc_protoc.server_rpc rpc) - ~f:(fun rectangle f -> + Grpc_eio.Server.Typed_rpc.server_streaming + (Grpc_protoc.Server_rpc.server_streaming rpc) ~f:(fun rectangle f -> (* Lookup and reply with features found. *) let () = List.iter @@ -118,7 +119,8 @@ let list_features (t : t) rpc = (* $MDX part-end *) (* $MDX part-begin=server-record-route *) let record_route (t : t) (clock : _ Eio.Time.clock) rpc = - Grpc_eio.Server.Typed_rpc.client_streaming (Grpc_protoc.server_rpc rpc) + Grpc_eio.Server.Typed_rpc.client_streaming + (Grpc_protoc.Server_rpc.client_streaming rpc) ~f:(fun (stream : Route_guide.point Seq.t) -> Eio.traceln "RecordRoute"; @@ -168,7 +170,8 @@ let record_route (t : t) (clock : _ Eio.Time.clock) rpc = (* $MDX part-end *) (* $MDX part-begin=server-route-chat *) let route_chat (_ : t) rpc = - Grpc_eio.Server.Typed_rpc.bidirectional_streaming (Grpc_protoc.server_rpc rpc) + Grpc_eio.Server.Typed_rpc.bidirectional_streaming + (Grpc_protoc.Server_rpc.bidirectional_streaming rpc) ~f:(fun (stream : Route_guide.route_note Seq.t) (f : Route_guide.route_note -> unit) @@ -187,13 +190,10 @@ let route_chat (_ : t) rpc = (* $MDX part-end *) (* $MDX part-begin=server-grpc *) let server t clock = - let { Pbrt_services.Server.package; service_name; handlers } = - Route_guide.RouteGuide.Server.make ~getFeature:(get_feature t) - ~listFeatures:(list_features t) ~recordRoute:(record_route t clock) - ~routeChat:(route_chat t) () - in - Server.Typed_rpc.server - (With_service_spec { package; service_name; handlers }) + Route_guide.RouteGuide.Server.make ~getFeature:(get_feature t) + ~listFeatures:(list_features t) ~recordRoute:(record_route t clock) + ~routeChat:(route_chat t) () + |> Grpc_protoc.handlers |> Server.Typed_rpc.server (* $MDX part-end *) let connection_handler server ~sw = diff --git a/examples/routeguide-tutorial.md b/examples/routeguide-tutorial.md index 10261f7..b99e3b3 100644 --- a/examples/routeguide-tutorial.md +++ b/examples/routeguide-tutorial.md @@ -194,7 +194,7 @@ The individual service functions from our proto definition are implemented using ```ocaml let server t clock = Server.Typed_rpc.server - (Handlers + (Grpc_protoc_plugin.handlers [ get_feature t; list_features t; record_route t clock; route_chat t ]) ``` @@ -206,7 +206,7 @@ Let's look at the simplest type first, `GetFeature` which just gets a `Point` fr ```ocaml let get_feature (t : t) = Grpc_eio.Server.Typed_rpc.unary - (Grpc_protoc_plugin.server_rpc (module RouteGuide.GetFeature)) + (Grpc_protoc_plugin.Server_rpc.unary (module RouteGuide.GetFeature)) ~f:(fun point -> Eio.traceln "GetFeature = {:%s}" (Point.show point); @@ -238,7 +238,8 @@ Now let's look at one of our streaming RPCs. `list_features` is a server-side st ```ocaml let list_features (t : t) = Grpc_eio.Server.Typed_rpc.server_streaming - (Grpc_protoc_plugin.server_rpc (module RouteGuide.ListFeatures)) + (Grpc_protoc_plugin.Server_rpc.server_streaming + (module RouteGuide.ListFeatures)) ~f:(fun rectangle f -> (* Lookup and reply with features found. *) let () = @@ -261,7 +262,8 @@ Now let's look at something a little more complicated: the client-side streaming ```ocaml let record_route (t : t) (clock : _ Eio.Time.clock) = Grpc_eio.Server.Typed_rpc.client_streaming - (Grpc_protoc_plugin.server_rpc (module RouteGuide.RecordRoute)) + (Grpc_protoc_plugin.Server_rpc.client_streaming + (module RouteGuide.RecordRoute)) ~f:(fun (stream : Point.t Seq.t) -> Eio.traceln "RecordRoute"; @@ -312,7 +314,8 @@ Finally, let's look at our bidirectional streaming RPC `route_chat`, which recei ```ocaml let route_chat (_ : t) = Grpc_eio.Server.Typed_rpc.bidirectional_streaming - (Grpc_protoc_plugin.server_rpc (module RouteGuide.RouteChat)) + (Grpc_protoc_plugin.Server_rpc.bidirectional_streaming + (module RouteGuide.RouteChat)) ~f:(fun (stream : RouteNote.t Seq.t) (f : RouteNote.t -> unit) -> Printf.printf "RouteChat\n"; @@ -401,7 +404,7 @@ Calling the simple RPC `get_feature` requires building up a `Client.call` repres let call_get_feature connection point = let response = Client.Typed_rpc.call - (Grpc_protoc_plugin.client_rpc (module RouteGuide.GetFeature)) + (Grpc_protoc_plugin.Client_rpc.unary (module RouteGuide.GetFeature)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Typed_rpc.unary point ~f:(function @@ -429,7 +432,8 @@ let print_features connection = let stream = Client.Typed_rpc.call - (Grpc_protoc_plugin.client_rpc (module RouteGuide.ListFeatures)) + (Grpc_protoc_plugin.Client_rpc.server_streaming + (module RouteGuide.ListFeatures)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler:(Client.Typed_rpc.server_streaming rectangle ~f:Fun.id) () @@ -466,7 +470,8 @@ let run_record_route connection = let response = Client.Typed_rpc.call - (Grpc_protoc_plugin.client_rpc (module RouteGuide.RecordRoute)) + (Grpc_protoc_plugin.Client_rpc.client_streaming + (module RouteGuide.RecordRoute)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Typed_rpc.client_streaming ~f:(fun f response -> @@ -535,7 +540,8 @@ We start by generating a short sequence of locations, similar to how we did for in let result = Client.Typed_rpc.call - (Grpc_protoc_plugin.client_rpc (module RouteGuide.RouteChat)) + (Grpc_protoc_plugin.Client_rpc.bidirectional_streaming + (module RouteGuide.RouteChat)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Typed_rpc.bidirectional_streaming ~f:(fun writer reader -> diff --git a/examples/routeguide/src/client.ml b/examples/routeguide/src/client.ml index c55a304..94c8167 100644 --- a/examples/routeguide/src/client.ml +++ b/examples/routeguide/src/client.ml @@ -21,7 +21,7 @@ let client ~sw host port network = let call_get_feature connection point = let response = Client.Typed_rpc.call - (Grpc_protoc_plugin.client_rpc (module RouteGuide.GetFeature)) + (Grpc_protoc_plugin.Client_rpc.unary (module RouteGuide.GetFeature)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Typed_rpc.unary point ~f:(function @@ -45,7 +45,8 @@ let print_features connection = let stream = Client.Typed_rpc.call - (Grpc_protoc_plugin.client_rpc (module RouteGuide.ListFeatures)) + (Grpc_protoc_plugin.Client_rpc.server_streaming + (module RouteGuide.ListFeatures)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler:(Client.Typed_rpc.server_streaming rectangle ~f:Fun.id) () @@ -75,7 +76,8 @@ let run_record_route connection = let response = Client.Typed_rpc.call - (Grpc_protoc_plugin.client_rpc (module RouteGuide.RecordRoute)) + (Grpc_protoc_plugin.Client_rpc.client_streaming + (module RouteGuide.RecordRoute)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Typed_rpc.client_streaming ~f:(fun f response -> @@ -135,7 +137,8 @@ let run_route_chat clock connection = in let result = Client.Typed_rpc.call - (Grpc_protoc_plugin.client_rpc (module RouteGuide.RouteChat)) + (Grpc_protoc_plugin.Client_rpc.bidirectional_streaming + (module RouteGuide.RouteChat)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Typed_rpc.bidirectional_streaming ~f:(fun writer reader -> diff --git a/examples/routeguide/src/server.ml b/examples/routeguide/src/server.ml index 91f88a6..557e323 100644 --- a/examples/routeguide/src/server.ml +++ b/examples/routeguide/src/server.ml @@ -75,7 +75,7 @@ let calc_distance (p1 : Point.t) (p2 : Point.t) : int = (* $MDX part-begin=server-get-feature *) let get_feature (t : t) = Grpc_eio.Server.Typed_rpc.unary - (Grpc_protoc_plugin.server_rpc (module RouteGuide.GetFeature)) + (Grpc_protoc_plugin.Server_rpc.unary (module RouteGuide.GetFeature)) ~f:(fun point -> Eio.traceln "GetFeature = {:%s}" (Point.show point); @@ -100,7 +100,8 @@ let get_feature (t : t) = (* $MDX part-begin=server-list-features *) let list_features (t : t) = Grpc_eio.Server.Typed_rpc.server_streaming - (Grpc_protoc_plugin.server_rpc (module RouteGuide.ListFeatures)) + (Grpc_protoc_plugin.Server_rpc.server_streaming + (module RouteGuide.ListFeatures)) ~f:(fun rectangle f -> (* Lookup and reply with features found. *) let () = @@ -116,7 +117,8 @@ let list_features (t : t) = (* $MDX part-begin=server-record-route *) let record_route (t : t) (clock : _ Eio.Time.clock) = Grpc_eio.Server.Typed_rpc.client_streaming - (Grpc_protoc_plugin.server_rpc (module RouteGuide.RecordRoute)) + (Grpc_protoc_plugin.Server_rpc.client_streaming + (module RouteGuide.RecordRoute)) ~f:(fun (stream : Point.t Seq.t) -> Eio.traceln "RecordRoute"; @@ -162,7 +164,8 @@ let record_route (t : t) (clock : _ Eio.Time.clock) = (* $MDX part-begin=server-route-chat *) let route_chat (_ : t) = Grpc_eio.Server.Typed_rpc.bidirectional_streaming - (Grpc_protoc_plugin.server_rpc (module RouteGuide.RouteChat)) + (Grpc_protoc_plugin.Server_rpc.bidirectional_streaming + (module RouteGuide.RouteChat)) ~f:(fun (stream : RouteNote.t Seq.t) (f : RouteNote.t -> unit) -> Printf.printf "RouteChat\n"; @@ -179,7 +182,7 @@ let route_chat (_ : t) = (* $MDX part-begin=server-grpc *) let server t clock = Server.Typed_rpc.server - (Handlers + (Grpc_protoc_plugin.handlers [ get_feature t; list_features t; record_route t clock; route_chat t ]) (* $MDX part-end *) diff --git a/lib/grpc-eio/client.ml b/lib/grpc-eio/client.ml index a8480c5..4210579 100644 --- a/lib/grpc-eio/client.ml +++ b/lib/grpc-eio/client.ml @@ -106,14 +106,19 @@ module Rpc = struct end module Typed_rpc = struct - type ('request, 'response, 'a) handler = - ('request, 'response) Grpc.Rpc.Client_rpc.t -> + type ('request, 'request_mode, 'response, 'response_mode, 'a) handler = + ('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t -> H2.Body.Writer.t -> H2.Body.Reader.t -> 'a let unary (type request response) ~f (request : request) - (rpc : (request, response) Grpc.Rpc.Client_rpc.t) = + (rpc : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t) = let request = rpc.encode_request request in let f response = let response = response |> Option.map rpc.decode_response in @@ -122,7 +127,12 @@ module Typed_rpc = struct Rpc.unary ~f request let server_streaming (type request response) ~f (request : request) - (rpc : (request, response) Grpc.Rpc.Client_rpc.t) = + (rpc : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t) = let request = rpc.encode_request request in let f responses = let responses = Seq.map rpc.decode_response responses in @@ -131,7 +141,12 @@ module Typed_rpc = struct Rpc.server_streaming ~f request let client_streaming (type request response) ~f - (rpc : (request, response) Grpc.Rpc.Client_rpc.t) = + (rpc : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t) = let f requests response = let requests_reader, requests' = Seq.create_reader_writer () in let response', response_u = Eio.Promise.create () in @@ -153,7 +168,12 @@ module Typed_rpc = struct Rpc.client_streaming ~f let bidirectional_streaming (type request response) ~f - (rpc : (request, response) Grpc.Rpc.Client_rpc.t) = + (rpc : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t) = let f requests responses = let requests_reader, requests' = Seq.create_reader_writer () in let responses' = Seq.map rpc.decode_response responses in @@ -167,9 +187,12 @@ module Typed_rpc = struct in Rpc.bidirectional_streaming ~f - let call (type request response a) - (rpc : (request, response) Grpc.Rpc.Client_rpc.t) ?scheme - ~(handler : (request, response, a) handler) ~do_request ?headers () = + let call (type request request_mode response response_mode a) + (rpc : + (request, request_mode, response, response_mode) Grpc.Rpc.Client_rpc.t) + ?scheme + ~(handler : (request, request_mode, response, response_mode, a) handler) + ~do_request ?headers () = call ~service:(Grpc.Rpc.Client_rpc.packaged_service_name rpc) ~rpc:rpc.rpc_name ?scheme ~handler:(handler rpc) ~do_request ?headers () diff --git a/lib/grpc-eio/client.mli b/lib/grpc-eio/client.mli index 023f866..45396f1 100644 --- a/lib/grpc-eio/client.mli +++ b/lib/grpc-eio/client.mli @@ -55,29 +55,53 @@ module Typed_rpc : sig - use the service and RPC names provided by the rpc specification to call the services with their expected names. *) - type ('request, 'response, 'a) handler + type ('request, 'request_mode, 'response, 'response_mode, 'a) handler (** The next functions are meant to be used by the client to handle call to RPCs. *) val bidirectional_streaming : f:('request Seq.writer -> 'response Seq.t -> 'a) -> - ('request, 'response, 'a) handler + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + 'a ) + handler val client_streaming : f:('request Seq.writer -> 'response option Eio.Promise.t -> 'a) -> - ('request, 'response, 'a) handler + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + 'a ) + handler val server_streaming : - f:('response Seq.t -> 'a) -> 'request -> ('request, 'response, 'a) handler + f:('response Seq.t -> 'a) -> + 'request -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + 'a ) + handler val unary : - f:('response option -> 'a) -> 'request -> ('request, 'response, 'a) handler + f:('response option -> 'a) -> + 'request -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + 'a ) + handler val call : - ('request, 'response) Grpc.Rpc.Client_rpc.t -> + ('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t -> ?scheme:string -> - handler:('request, 'response, 'a) handler -> + handler:('request, 'request_mode, 'response, 'response_mode, 'a) handler -> do_request:do_request -> ?headers:H2.Headers.t -> unit -> diff --git a/lib/grpc-eio/server.ml b/lib/grpc-eio/server.ml index b8da679..bf0b037 100644 --- a/lib/grpc-eio/server.ml +++ b/lib/grpc-eio/server.ml @@ -145,39 +145,29 @@ module Typed_rpc = struct type 'service_spec t = | T : { - rpc_spec : ('request, 'response, 'service_spec) Grpc.Rpc.Server_rpc.t; + rpc_spec : + ( 'request, + 'request_mode, + 'response, + 'response_mode, + 'service_spec ) + Grpc.Rpc.Server_rpc.t; rpc_impl : Rpc.t; } -> 'service_spec t - module Handlers = struct - type 'service_spec rpc = 'service_spec t - - type t = - | Handlers : Grpc.Rpc.Service_spec.t rpc list -> t - | With_service_spec : { - package : string list; - service_name : string; - handlers : unit rpc list; - } - -> t - end - let server handlers : server = let ts = - match (handlers : Handlers.t) with - | Handlers ts -> ts - | With_service_spec { package; service_name; handlers = ts } -> + match (handlers : _ Grpc.Rpc.Handlers.t) with + | Handlers { handlers = ts } -> ts + | With_service_spec { service_spec; handlers = ts } -> List.map (fun (T t) -> T { t with rpc_spec = - { - t.rpc_spec with - service_spec = Some { package; service_name }; - }; + { t.rpc_spec with service_spec = Some service_spec }; }) ts in @@ -205,7 +195,13 @@ module Typed_rpc = struct Service.handle_request service) let unary (type request response) - (rpc_spec : (request, response, _) Grpc.Rpc.Server_rpc.t) ~f:handler = + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.unary, + _ ) + Grpc.Rpc.Server_rpc.t) ~f:handler = let handler buffer = let status, response = handler (rpc_spec.decode_request buffer) in (status, Option.map rpc_spec.encode_response response) @@ -213,7 +209,13 @@ module Typed_rpc = struct T { rpc_spec; rpc_impl = Rpc.Unary handler } let server_streaming (type request response) - (rpc_spec : (request, response, _) Grpc.Rpc.Server_rpc.t) ~f:handler = + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.stream, + _ ) + Grpc.Rpc.Server_rpc.t) ~f:handler = let handler buffer f = handler (rpc_spec.decode_request buffer) (fun response -> f (rpc_spec.encode_response response)) @@ -221,7 +223,13 @@ module Typed_rpc = struct T { rpc_spec; rpc_impl = Rpc.Server_streaming handler } let client_streaming (type request response) - (rpc_spec : (request, response, _) Grpc.Rpc.Server_rpc.t) ~f:handler = + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.unary, + _ ) + Grpc.Rpc.Server_rpc.t) ~f:handler = let handler requests = let requests = Seq.map rpc_spec.decode_request requests in let status, response = handler requests in @@ -230,7 +238,13 @@ module Typed_rpc = struct T { rpc_spec; rpc_impl = Rpc.Client_streaming handler } let bidirectional_streaming (type request response) - (rpc_spec : (request, response, _) Grpc.Rpc.Server_rpc.t) ~f:handler = + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.stream, + _ ) + Grpc.Rpc.Server_rpc.t) ~f:handler = let handler requests f = let requests = Seq.map rpc_spec.decode_request requests in handler requests (fun response -> f (rpc_spec.encode_response response)) diff --git a/lib/grpc-eio/server.mli b/lib/grpc-eio/server.mli index 4aa8bbe..76c000e 100644 --- a/lib/grpc-eio/server.mli +++ b/lib/grpc-eio/server.mli @@ -88,39 +88,46 @@ module Typed_rpc : sig file. *) val unary : - ('request, 'response, 'service_spec) Grpc.Rpc.Server_rpc.t -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> f:('request, 'response) unary -> 'service_spec t val client_streaming : - ('request, 'response, 'service_spec) Grpc.Rpc.Server_rpc.t -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> f:('request, 'response) client_streaming -> 'service_spec t val server_streaming : - ('request, 'response, 'service_spec) Grpc.Rpc.Server_rpc.t -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> f:('request, 'response) server_streaming -> 'service_spec t val bidirectional_streaming : - ('request, 'response, 'service_spec) Grpc.Rpc.Server_rpc.t -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> f:('request, 'response) bidirectional_streaming -> 'service_spec t - module Handlers : sig - type 'service_spec rpc := 'service_spec t - - type t = - | Handlers : Grpc.Rpc.Service_spec.t rpc list -> t - | With_service_spec : { - package : string list; - service_name : string; - handlers : unit rpc list; - } - -> t - end - - val server : Handlers.t -> server + val server : (Grpc.Rpc.Service_spec.t t, unit t) Grpc.Rpc.Handlers.t -> server (** Having built a list of RPCs you will use this function to package them up into a server that is ready to be served over the network. This function takes care of registering the services based on the names provided by the diff --git a/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml b/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml index a06d5f0..baea692 100644 --- a/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml +++ b/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml @@ -24,22 +24,46 @@ let service_spec (type request response) service_name = R.service_name; } -let client_rpc (type request response) - (module R : S with type Request.t = request and type Response.t = response) - = - { - Grpc.Rpc.Client_rpc.service_spec = service_spec (module R); - rpc_name = R.method_name; - encode_request = encode (module R.Request); - decode_response = decode (module R.Response); - } +module Client_rpc = struct + let make (type request response) + (module R : S with type Request.t = request and type Response.t = response) + ~request_mode ~response_mode = + { + Grpc.Rpc.Client_rpc.service_spec = service_spec (module R); + rpc_name = R.method_name; + encode_request = encode (module R.Request); + decode_response = decode (module R.Response); + request_mode; + response_mode; + } -let server_rpc (type request response) - (module R : S with type Request.t = request and type Response.t = response) - = - { - Grpc.Rpc.Server_rpc.service_spec = Some (service_spec (module R)); - rpc_name = R.method_name; - decode_request = decode (module R.Request); - encode_response = encode (module R.Response); - } + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +module Server_rpc = struct + let make (type request response) + (module R : S with type Request.t = request and type Response.t = response) + ~request_mode ~response_mode = + { + Grpc.Rpc.Server_rpc.service_spec = Some (service_spec (module R)); + rpc_name = R.method_name; + decode_request = decode (module R.Request); + encode_response = encode (module R.Response); + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +let handlers handlers = Grpc.Rpc.Handlers.Handlers { handlers } diff --git a/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli b/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli index 33a8056..7bd147f 100644 --- a/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli +++ b/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli @@ -1,13 +1,75 @@ module type S = Ocaml_protoc_plugin.Service.Rpc -val client_rpc : - (module Ocaml_protoc_plugin.Service.Rpc - with type Request.t = 'request - and type Response.t = 'response) -> - ('request, 'response) Grpc.Rpc.Client_rpc.t - -val server_rpc : - (module Ocaml_protoc_plugin.Service.Rpc - with type Request.t = 'request - and type Response.t = 'response) -> - ('request, 'response, Grpc.Rpc.Service_spec.t) Grpc.Rpc.Server_rpc.t +module Client_rpc : sig + val unary : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val client_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val server_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t + + val bidirectional_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t +end + +module Server_rpc : sig + val unary : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t + + val client_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t + + val server_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t + + val bidirectional_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t +end + +val handlers : 'a list -> ('a, _) Grpc.Rpc.Handlers.t diff --git a/lib/grpc-protoc/grpc_protoc.ml b/lib/grpc-protoc/grpc_protoc.ml index b2567fa..5ba88aa 100644 --- a/lib/grpc-protoc/grpc_protoc.ml +++ b/lib/grpc-protoc/grpc_protoc.ml @@ -7,21 +7,49 @@ let decode (type a) (decode : Pbrt.Decoder.t -> a) buffer = let decoder = Pbrt.Decoder.of_string buffer in decode decoder -let client_rpc (type request response) - (rpc : (request, _, response, _) Pbrt_services.Client.rpc) = - { - Grpc.Rpc.Client_rpc.service_spec = - { package = rpc.package; service_name = rpc.service_name }; - rpc_name = rpc.rpc_name; - encode_request = encode rpc.encode_pb_req; - decode_response = decode rpc.decode_pb_res; - } - -let server_rpc (type request response) - (rpc : (request, _, response, _) Pbrt_services.Server.rpc) = - { - Grpc.Rpc.Server_rpc.service_spec = None; - rpc_name = rpc.name; - decode_request = decode rpc.decode_pb_req; - encode_response = encode rpc.encode_pb_res; - } +module Client_rpc = struct + let make (type request response) + (rpc : (request, _, response, _) Pbrt_services.Client.rpc) ~request_mode + ~response_mode = + { + Grpc.Rpc.Client_rpc.service_spec = + { package = rpc.package; service_name = rpc.service_name }; + rpc_name = rpc.rpc_name; + encode_request = encode rpc.encode_pb_req; + decode_response = decode rpc.decode_pb_res; + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +module Server_rpc = struct + let make (type request response) + (rpc : (request, _, response, _) Pbrt_services.Server.rpc) ~request_mode + ~response_mode = + { + Grpc.Rpc.Server_rpc.service_spec = None; + rpc_name = rpc.name; + decode_request = decode rpc.decode_pb_req; + encode_response = encode rpc.encode_pb_res; + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +let handlers { Pbrt_services.Server.package; service_name; handlers } = + Grpc.Rpc.Handlers.With_service_spec + { service_spec = { package; service_name }; handlers } diff --git a/lib/grpc-protoc/grpc_protoc.mli b/lib/grpc-protoc/grpc_protoc.mli index 8356284..6390a10 100644 --- a/lib/grpc-protoc/grpc_protoc.mli +++ b/lib/grpc-protoc/grpc_protoc.mli @@ -1,7 +1,105 @@ -val client_rpc : - ('request, _, 'response, _) Pbrt_services.Client.rpc -> - ('request, 'response) Grpc.Rpc.Client_rpc.t +module Client_rpc : sig + val unary : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t -val server_rpc : - ('request, _, 'response, _) Pbrt_services.Server.rpc -> - ('request, 'response, unit) Grpc.Rpc.Server_rpc.t + val client_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val server_streaming : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t + + val bidirectional_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t +end + +module Server_rpc : sig + val unary : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + unit ) + Grpc.Rpc.Server_rpc.t + + val client_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + unit ) + Grpc.Rpc.Server_rpc.t + + val server_streaming : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + unit ) + Grpc.Rpc.Server_rpc.t + + val bidirectional_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + unit ) + Grpc.Rpc.Server_rpc.t +end + +val handlers : 'a Pbrt_services.Server.t -> (_, 'a) Grpc.Rpc.Handlers.t diff --git a/lib/grpc/rpc.ml b/lib/grpc/rpc.ml index 3191c5c..c303b7c 100644 --- a/lib/grpc/rpc.ml +++ b/lib/grpc/rpc.ml @@ -1,5 +1,11 @@ type buffer = string +module Value_mode = struct + type unary + type stream + type _ t = Unary : unary t | Stream : stream t +end + module Service_spec = struct type t = { package : string list; service_name : string } @@ -8,12 +14,20 @@ module Service_spec = struct ^ t.service_name end +module Handlers = struct + type ('a, 'b) t = + | Handlers of { handlers : 'a list } + | With_service_spec of { handlers : 'b list; service_spec : Service_spec.t } +end + module Client_rpc = struct - type ('request, 'response) t = { + type ('request, 'request_mode, 'response, 'response_mode) t = { service_spec : Service_spec.t; rpc_name : string; encode_request : 'request -> buffer; decode_response : buffer -> 'response; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; } let packaged_service_name t = @@ -25,10 +39,12 @@ module Server_rpc = struct type 'a t = None : unit t | Some : Service_spec.t -> Service_spec.t t end - type ('request, 'response, 'service_spec) t = { + type ('request, 'request_mode, 'response, 'response_mode, 'service_spec) t = { service_spec : 'service_spec Service_spec.t; rpc_name : string; decode_request : buffer -> 'request; encode_response : 'response -> buffer; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; } end diff --git a/lib/grpc/rpc.mli b/lib/grpc/rpc.mli index 442a4e7..02db146 100644 --- a/lib/grpc/rpc.mli +++ b/lib/grpc/rpc.mli @@ -2,18 +2,32 @@ type buffer = string (** Exploring a separate client/server api that works better with [ocaml-protoc]. *) +module Value_mode : sig + type unary + type stream + type _ t = Unary : unary t | Stream : stream t +end + module Service_spec : sig type t = { package : string list; service_name : string } val packaged_service_name : t -> string end +module Handlers : sig + type ('a, 'b) t = + | Handlers of { handlers : 'a list } + | With_service_spec of { handlers : 'b list; service_spec : Service_spec.t } +end + module Client_rpc : sig - type ('request, 'response) t = { + type ('request, 'request_mode, 'response, 'response_mode) t = { service_spec : Service_spec.t; rpc_name : string; encode_request : 'request -> buffer; decode_response : buffer -> 'response; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; } val packaged_service_name : _ t -> string @@ -24,10 +38,12 @@ module Server_rpc : sig type 'a t = None : unit t | Some : Service_spec.t -> Service_spec.t t end - type ('request, 'response, 'service_spec) t = { + type ('request, 'request_mode, 'response, 'response_mode, 'service_spec) t = { service_spec : 'service_spec Service_spec.t; rpc_name : string; decode_request : buffer -> 'request; encode_response : 'response -> buffer; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; } end