diff --git a/dune-project b/dune-project index 32c4d3a..1374a21 100644 --- a/dune-project +++ b/dune-project @@ -38,6 +38,10 @@ (>= 0.9.1)) (h2 (>= 0.9.0)) + (odoc + (and + (>= 2.4.0) + :with-doc)) ppx_deriving (uri (>= 4.0.0)))) @@ -92,6 +96,32 @@ (>= 0.9.0)) stringext)) +(package + (name grpc-protoc-plugin) + (synopsis "An implementation of gRPC using ocaml-protoc-plugin") + (description + "Functionality for building gRPC services and rpcs with `ocaml-protoc-plugin`") + (depends + (grpc + (= :version)) + (ocaml-protoc-plugin + (>= 4.5)))) + +(package + (name grpc-protoc) + (synopsis "An implementation of gRPC using ocaml-protoc") + (description + "Functionality for building gRPC services and rpcs with `ocaml-protoc`") + (depends + (grpc + (= :version)) + (ocaml-protoc + (>= 3.0)) + (pbrt + (>= 3.0)) + (pbrt_services + (>= 3.0)))) + (package (name grpc-examples) (synopsis "Various gRPC examples") diff --git a/examples/greeter-client-eio/greeter_client_eio.ml b/examples/greeter-client-eio/greeter_client_eio.ml index c8b0530..a95e3ca 100644 --- a/examples/greeter-client-eio/greeter_client_eio.ml +++ b/examples/greeter-client-eio/greeter_client_eio.ml @@ -39,7 +39,7 @@ let main env = in let result = - Grpc_eio.Client.call ~service:"mypackage.Greeter" ~rpc:"SayHello" + Grpc_eio.Client.Rpc.call ~service:"mypackage.Greeter" ~rpc:"SayHello" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler:(Grpc_eio.Client.Rpc.unary encoded_request ~f) () diff --git a/examples/routeguide-tutorial.md b/examples/routeguide-tutorial.md index 96128be..da1c5b4 100644 --- a/examples/routeguide-tutorial.md +++ b/examples/routeguide-tutorial.md @@ -439,7 +439,7 @@ Calling the simple RPC `get_feature` requires building up a `Client.call` repres let call_get_feature connection point = let encode, decode = Service.make_client_functions RouteGuide.getFeature in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.unary @@ -476,7 +476,7 @@ let print_features connection = let encode, decode = Service.make_client_functions RouteGuide.listFeatures in let stream = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.server_streaming @@ -528,7 +528,7 @@ let run_record_route connection = let encode, decode = Service.make_client_functions RouteGuide.recordRoute in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.client_streaming ~f:(fun f response -> @@ -615,7 +615,7 @@ We start by generating a short sequence of locations, similar to how we did for go writer reader' xs) in let result = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.bidirectional_streaming ~f:(fun writer reader -> diff --git a/examples/routeguide/src/client.ml b/examples/routeguide/src/client.ml index 47d8dba..d3b5ab5 100644 --- a/examples/routeguide/src/client.ml +++ b/examples/routeguide/src/client.ml @@ -22,7 +22,7 @@ let client ~sw host port network = let call_get_feature connection point = let encode, decode = Service.make_client_functions RouteGuide.getFeature in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.unary @@ -55,7 +55,7 @@ let print_features connection = let encode, decode = Service.make_client_functions RouteGuide.listFeatures in let stream = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.server_streaming @@ -100,7 +100,7 @@ let run_record_route connection = let encode, decode = Service.make_client_functions RouteGuide.recordRoute in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.client_streaming ~f:(fun f response -> @@ -178,7 +178,7 @@ let run_route_chat clock connection = go writer reader' xs) in let result = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.bidirectional_streaming ~f:(fun writer reader -> diff --git a/grpc-protoc-plugin.opam b/grpc-protoc-plugin.opam new file mode 100644 index 0000000..1ddae22 --- /dev/null +++ b/grpc-protoc-plugin.opam @@ -0,0 +1,40 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "An implementation of gRPC using ocaml-protoc-plugin" +description: + "Functionality for building gRPC services and rpcs with `ocaml-protoc-plugin`" +maintainer: ["Daniel Quernheim "] +authors: [ + "Andrew Jeffery " + "Daniel Quernheim " + "Michael Bacarella " + "Sven Anderson " + "Tim McGilchrist " + "Wojtek Czekalski " + "dimitris.mostrous " +] +license: "BSD-3-Clause" +homepage: "https://github.com/dialohq/ocaml-grpc" +doc: "https://dialohq.github.io/ocaml-grpc" +bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" +depends: [ + "dune" {>= "3.7"} + "grpc" {= version} + "ocaml-protoc-plugin" {>= "4.5"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git" diff --git a/grpc-protoc.opam b/grpc-protoc.opam new file mode 100644 index 0000000..fde3dfd --- /dev/null +++ b/grpc-protoc.opam @@ -0,0 +1,42 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "An implementation of gRPC using ocaml-protoc" +description: + "Functionality for building gRPC services and rpcs with `ocaml-protoc`" +maintainer: ["Daniel Quernheim "] +authors: [ + "Andrew Jeffery " + "Daniel Quernheim " + "Michael Bacarella " + "Sven Anderson " + "Tim McGilchrist " + "Wojtek Czekalski " + "dimitris.mostrous " +] +license: "BSD-3-Clause" +homepage: "https://github.com/dialohq/ocaml-grpc" +doc: "https://dialohq.github.io/ocaml-grpc" +bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" +depends: [ + "dune" {>= "3.7"} + "grpc" {= version} + "ocaml-protoc" {>= "3.0"} + "pbrt" {>= "3.0"} + "pbrt_services" {>= "3.0"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git" diff --git a/grpc.opam b/grpc.opam index 8355be4..a071e05 100644 --- a/grpc.opam +++ b/grpc.opam @@ -23,9 +23,9 @@ depends: [ "ocaml" {>= "4.08"} "bigstringaf" {>= "0.9.1"} "h2" {>= "0.9.0"} + "odoc" {>= "2.4.0" & with-doc} "ppx_deriving" "uri" {>= "4.0.0"} - "odoc" {with-doc} ] build: [ ["dune" "subst"] {dev} diff --git a/lib/grpc-eio/client.ml b/lib/grpc-eio/client.ml index 4efe5cd..c3039a5 100644 --- a/lib/grpc-eio/client.ml +++ b/lib/grpc-eio/client.ml @@ -43,8 +43,8 @@ let get_response_and_bodies request = let read_body = Eio.Promise.await read_body in (response, read_body, write_body) -let call ~service ~rpc ?(scheme = "https") ~handler ~(do_request : do_request) - ?(headers = default_headers) () = +let call_internal ~service ~rpc ?(scheme = "https") ~handler + ~(do_request : do_request) ?(headers = default_headers) () = let request = make_request ~service ~rpc ~scheme ~headers in let status, trailers_handler = make_trailers_handler () in let response, read_body, write_body = @@ -66,20 +66,98 @@ let call ~service ~rpc ?(scheme = "https") ~handler ~(do_request : do_request) Ok (result, status) | error_status -> Error error_status +let make_handler ~encode_request ~decode_response ~f write_body read_body = + let response_reader, response_writer = Seq.create_reader_writer () in + let request_reader, request_writer = Seq.create_reader_writer () in + Connection.grpc_recv_streaming ~decode:decode_response read_body + response_writer; + let res, res_notify = Eio.Promise.create () in + Eio.Fiber.both + (fun () -> + Eio.Promise.resolve res_notify (f request_writer response_reader)) + (fun () -> + Connection.grpc_send_streaming_client ~encode:encode_request write_body + request_reader); + Eio.Promise.await res + +module Typed_rpc = struct + type ('request, 'request_mode, 'response, 'response_mode, 'a) handler = + ('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t -> + H2.Body.Writer.t -> + H2.Body.Reader.t -> + 'a + + let make_handler (type request response) + ~(rpc : (request, _, response, _) Grpc.Rpc.Client_rpc.t) ~f = + make_handler ~encode_request:rpc.encode_request + ~decode_response:rpc.decode_response ~f + + let bidirectional_streaming (type request response) ~f + (rpc : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t) = + make_handler ~rpc ~f + + let client_streaming (type request response) ~f + (rpc : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t) = + make_handler ~rpc ~f:(fun request_writer responses -> + let response, response_resolver = Eio.Promise.create () in + Eio.Fiber.pair + (fun () -> f request_writer response) + (fun () -> + Eio.Promise.resolve response_resolver + (Seq.read_and_exhaust responses)) + |> fst) + + let server_streaming (type request response) ~f (request : request) + (rpc : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t) = + make_handler ~rpc ~f:(fun request_writer responses -> + Seq.write request_writer request; + Seq.close_writer request_writer; + f responses) + + let unary (type request response) ~f (request : request) + (rpc : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t) = + make_handler ~rpc ~f:(fun request_writer responses -> + Seq.write request_writer request; + Seq.close_writer request_writer; + let response = Seq.read_and_exhaust responses in + f response) + + let call (type request request_mode response response_mode a) + (rpc : + (request, request_mode, response, response_mode) Grpc.Rpc.Client_rpc.t) + ?scheme + ~(handler : (request, request_mode, response, response_mode, a) handler) + ~do_request ?headers () = + call_internal + ~service:(Grpc.Rpc.Service_spec.packaged_service_name rpc.service_spec) + ~rpc:rpc.rpc_name ?scheme ~handler:(handler rpc) ~do_request ?headers () +end + module Rpc = struct type 'a handler = H2.Body.Writer.t -> H2.Body.Reader.t -> 'a - let bidirectional_streaming ~f write_body read_body = - let response_reader, response_writer = Seq.create_reader_writer () in - let request_reader, request_writer = Seq.create_reader_writer () in - Connection.grpc_recv_streaming read_body response_writer; - let res, res_notify = Eio.Promise.create () in - Eio.Fiber.both - (fun () -> - Eio.Promise.resolve res_notify (f request_writer response_reader)) - (fun () -> - Connection.grpc_send_streaming_client write_body request_reader); - Eio.Promise.await res + let bidirectional_streaming ~f = + make_handler ~encode_request:Fun.id ~decode_response:Fun.id ~f let client_streaming ~f = bidirectional_streaming ~f:(fun request_writer responses -> @@ -103,4 +181,9 @@ module Rpc = struct Seq.close_writer request_writer; let response = Seq.read_and_exhaust responses in f response) + + let call = call_internal end + +(* Deprecated in the mli. *) +let call = call_internal diff --git a/lib/grpc-eio/client.mli b/lib/grpc-eio/client.mli index 745d33c..935c6ca 100644 --- a/lib/grpc-eio/client.mli +++ b/lib/grpc-eio/client.mli @@ -1,3 +1,82 @@ +type response_handler = H2.Client_connection.response_handler + +type do_request = + ?flush_headers_immediately:bool -> + ?trailers_handler:(H2.Headers.t -> unit) -> + H2.Request.t -> + response_handler:response_handler -> + H2.Body.Writer.t +(** [do_request] is the type of a function that performs the request *) + +(** {1 Typed API} *) + +module Typed_rpc : sig + (** A typed interface to call RPC from the client side. + + Compared to {!module:Rpc}, this interface will: + + - handle the coding/decoding of messages for you under the hood; + - use the service and RPC names provided by the rpc specification to + call the services with their expected names. *) + + type ('request, 'request_mode, 'response, 'response_mode, 'a) handler + + (** The next functions are meant to be used by the client to handle + call to RPCs. *) + + val bidirectional_streaming : + f:('request Seq.writer -> 'response Seq.t -> 'a) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + 'a ) + handler + + val client_streaming : + f:('request Seq.writer -> 'response option Eio.Promise.t -> 'a) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + 'a ) + handler + + val server_streaming : + f:('response Seq.t -> 'a) -> + 'request -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + 'a ) + handler + + val unary : + f:('response option -> 'a) -> + 'request -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + 'a ) + handler + + val call : + ('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t -> + ?scheme:string -> + handler:('request, 'request_mode, 'response, 'response_mode, 'a) handler -> + do_request:do_request -> + ?headers:H2.Headers.t -> + unit -> + ('a * Grpc.Status.t, H2.Status.t) result + (** [call rpc ~handler ~do_request ()] calls the rpc endpoint given + by [service] and [rpc] using the [do_request] function. The [handler] is + called when this request is set up to send and receive data. *) +end + +(** {1 Untyped API} *) + module Rpc : sig type 'a handler = H2.Body.Writer.t -> H2.Body.Reader.t -> 'a @@ -22,17 +101,20 @@ module Rpc : sig (** [unary ~f enc write read] sets up the sending and receiving logic using [write] and [read], then sends [enc] and calls [f] with a promise for the response. *) -end - -type response_handler = H2.Client_connection.response_handler -type do_request = - ?flush_headers_immediately:bool -> - ?trailers_handler:(H2.Headers.t -> unit) -> - H2.Request.t -> - response_handler:response_handler -> - H2.Body.Writer.t -(** [do_request] is the type of a function that performs the request *) + val call : + service:string -> + rpc:string -> + ?scheme:string -> + handler:'a handler -> + do_request:do_request -> + ?headers:H2.Headers.t -> + unit -> + ('a * Grpc.Status.t, H2.Status.t) result + (** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given + by [service] and [rpc] using the [do_request] function. The [handler] is + called when this request is set up to send and receive data. *) +end val call : service:string -> @@ -43,6 +125,9 @@ val call : ?headers:H2.Headers.t -> unit -> ('a * Grpc.Status.t, H2.Status.t) result -(** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given - by [service] and [rpc] using the [do_request] function. The [handler] is - called when this request is set up to send and receive data. *) +[@@ocaml.alert + deprecated "This function was renamed [Grpc_eio.Client.Rpc.call]."] +(** Deprecating this function makes the distinction between the typed and + untyped API more clear. Also, it frees up the name [call] at top level, + which we could use in the future if we decide to "promote" the typed API + to the toplevel scope of this module. *) diff --git a/lib/grpc-eio/connection.ml b/lib/grpc-eio/connection.ml index 3de3965..31f6930 100644 --- a/lib/grpc-eio/connection.ml +++ b/lib/grpc-eio/connection.ml @@ -1,23 +1,25 @@ -let grpc_recv_streaming body message_buffer_writer = +let grpc_recv_streaming ~decode body message_buffer_writer = let request_buffer = Grpc.Buffer.v () in let on_eof () = Seq.close_writer message_buffer_writer in let rec on_read buffer ~off ~len = Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:buffer ~dst:request_buffer ~length:len; - Grpc.Message.extract_all (Seq.write message_buffer_writer) request_buffer; + Grpc.Message.extract_all + (fun message -> Seq.write message_buffer_writer (decode message)) + request_buffer; H2.Body.Reader.schedule_read body ~on_read ~on_eof in H2.Body.Reader.schedule_read body ~on_read ~on_eof -let grpc_send_streaming_client body encoder_stream = +let grpc_send_streaming_client ~encode body encoder_stream = Seq.iter (fun encoder -> - let payload = Grpc.Message.make encoder in + let payload = Grpc.Message.make (encode encoder) in H2.Body.Writer.write_string body payload) encoder_stream; H2.Body.Writer.close body -let grpc_send_streaming request encoder_stream status_promise = +let grpc_send_streaming ~encode request encoder_stream status_promise = let body = H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request (H2.Response.create @@ -27,7 +29,7 @@ let grpc_send_streaming request encoder_stream status_promise = in Seq.iter (fun input -> - let payload = Grpc.Message.make input in + let payload = Grpc.Message.make (encode input) in H2.Body.Writer.write_string body payload; H2.Body.Writer.flush body (fun () -> ())) encoder_stream; diff --git a/lib/grpc-eio/connection.mli b/lib/grpc-eio/connection.mli new file mode 100644 index 0000000..fee84be --- /dev/null +++ b/lib/grpc-eio/connection.mli @@ -0,0 +1,12 @@ +val grpc_recv_streaming : + decode:(string -> 'a) -> H2.Body.Reader.t -> 'a Seq.writer -> unit + +val grpc_send_streaming_client : + encode:('a -> string) -> H2.Body.Writer.t -> 'a Seq.reader -> unit + +val grpc_send_streaming : + encode:('a -> string) -> + H2.Reqd.t -> + 'a Seq.reader -> + Grpc.Status.t Eio.Promise.t -> + unit diff --git a/lib/grpc-eio/server.ml b/lib/grpc-eio/server.ml index ffd850c..fff6503 100644 --- a/lib/grpc-eio/server.ml +++ b/lib/grpc-eio/server.ml @@ -1,6 +1,7 @@ module ServiceMap = Map.Make (String) -type service = H2.Reqd.t -> unit +type reqd_handler = H2.Reqd.t -> unit +type service = reqd_handler type t = service ServiceMap.t let v () = ServiceMap.empty @@ -44,6 +45,176 @@ let handle_request t reqd = | None -> respond_with `Unsupported_media_type) | _ -> respond_with `Not_found +let implement_rpc ~decode_request ~encode_response ~f reqd = + let body = H2.Reqd.request_body reqd in + let request_reader, request_writer = Seq.create_reader_writer () in + let response_reader, response_writer = Seq.create_reader_writer () in + Connection.grpc_recv_streaming ~decode:decode_request body request_writer; + let status_promise, status_notify = Eio.Promise.create () in + Eio.Fiber.both + (fun () -> + let respond = Seq.write response_writer in + let status = f request_reader respond in + Seq.close_writer response_writer; + Eio.Promise.resolve status_notify status) + (fun () -> + try + Connection.grpc_send_streaming ~encode:encode_response reqd + response_reader status_promise + with exn -> + (* https://github.com/anmonteiro/ocaml-h2/issues/175 *) + Eio.traceln "%s" (Printexc.to_string exn)) + +module Typed_rpc = struct + module Service = struct + module RpcMap = Map.Make (String) + + type t = reqd_handler RpcMap.t + + let v () = RpcMap.empty + let add_rpc ~name ~rpc t = RpcMap.add name rpc t + + let handle_request (t : t) reqd = + let request = H2.Reqd.request reqd in + let respond_with code = + H2.Reqd.respond_with_string reqd (H2.Response.create code) "" + in + let parts = String.split_on_char '/' request.target in + if List.length parts > 1 then + let rpc_name = List.nth parts (List.length parts - 1) in + match RpcMap.find_opt rpc_name t with + | Some rpc -> rpc reqd + | None -> respond_with `Not_found + else respond_with `Not_found + end + + type server = t + + type ('request, 'response) unary = + 'request -> Grpc.Status.t * 'response option + + type ('request, 'response) client_streaming = + 'request Seq.t -> Grpc.Status.t * 'response option + + type ('request, 'response) server_streaming = + 'request -> ('response -> unit) -> Grpc.Status.t + + type ('request, 'response) bidirectional_streaming = + 'request Seq.t -> ('response -> unit) -> Grpc.Status.t + + type 'service_spec t = + | T : { + rpc_spec : + ( 'request, + 'request_mode, + 'response, + 'response_mode, + 'service_spec ) + Grpc.Rpc.Server_rpc.t; + rpc_impl : reqd_handler; + } + -> 'service_spec t + + let rec make_handlers handlers = + match (handlers : _ Grpc.Rpc.Handlers.t) with + | a :: tl -> List.concat (make_handlers a :: List.map make_handlers tl) + | Handlers { handlers = ts } -> ts + | With_service_spec { service_spec; handlers = ts } -> + List.map + (fun (T t) -> + T + { + t with + rpc_spec = { t.rpc_spec with service_spec = Some service_spec }; + }) + ts + + let server handlers : server = + let handlers = make_handlers handlers in + List.fold_left + (fun map (T t as packed) -> + let service_name = + match t.rpc_spec.service_spec with + | Some service_spec -> + Grpc.Rpc.Service_spec.packaged_service_name service_spec + in + let rpc_impl = + ServiceMap.find_opt service_name map |> Option.value ~default:[] + in + ServiceMap.add service_name (packed :: rpc_impl) map) + ServiceMap.empty handlers + |> ServiceMap.map (fun ts -> + let service = + List.fold_left + (fun acc (T t) -> + Service.add_rpc ~name:t.rpc_spec.rpc_name ~rpc:t.rpc_impl acc) + (Service.v ()) ts + in + Service.handle_request service) + + let implement_rpc (type request response) + ~(rpc_spec : (request, _, response, _, _) Grpc.Rpc.Server_rpc.t) ~f = + let rpc_impl = + implement_rpc ~decode_request:rpc_spec.decode_request + ~encode_response:rpc_spec.encode_response ~f + in + T { rpc_spec; rpc_impl } + + let bidirectional_streaming (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.stream, + _ ) + Grpc.Rpc.Server_rpc.t) ~f = + implement_rpc ~rpc_spec ~f + + let unary (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.unary, + _ ) + Grpc.Rpc.Server_rpc.t) ~f = + implement_rpc ~rpc_spec ~f:(fun requests respond -> + match Seq.read_and_exhaust requests with + | None -> Grpc.Status.(v OK) + | Some request -> + let status, response = f request in + (match response with + | None -> () + | Some response -> respond response); + status) + + let server_streaming (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.stream, + _ ) + Grpc.Rpc.Server_rpc.t) ~f = + implement_rpc ~rpc_spec ~f:(fun requests respond -> + match Seq.read_and_exhaust requests with + | None -> Grpc.Status.(v OK) + | Some request -> f request respond) + + let client_streaming (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.unary, + _ ) + Grpc.Rpc.Server_rpc.t) ~f = + implement_rpc ~rpc_spec ~f:(fun requests respond -> + let status, response = f requests in + (match response with None -> () | Some response -> respond response); + status) +end + module Rpc = struct type unary = string -> Grpc.Status.t * string option type client_streaming = string Seq.t -> Grpc.Status.t * string option @@ -59,22 +230,7 @@ module Rpc = struct | Bidirectional_streaming of bidirectional_streaming let bidirectional_streaming ~f reqd = - let body = H2.Reqd.request_body reqd in - let request_reader, request_writer = Seq.create_reader_writer () in - let response_reader, response_writer = Seq.create_reader_writer () in - Connection.grpc_recv_streaming body request_writer; - let status_promise, status_notify = Eio.Promise.create () in - Eio.Fiber.both - (fun () -> - let respond = Seq.write response_writer in - let status = f request_reader respond in - Seq.close_writer response_writer; - Eio.Promise.resolve status_notify status) - (fun () -> - try Connection.grpc_send_streaming reqd response_reader status_promise - with exn -> - (* https://github.com/anmonteiro/ocaml-h2/issues/175 *) - Eio.traceln "%s" (Printexc.to_string exn)) + implement_rpc ~decode_request:Fun.id ~encode_response:Fun.id ~f reqd let client_streaming ~f reqd = bidirectional_streaming reqd ~f:(fun requests respond -> @@ -101,29 +257,15 @@ module Rpc = struct end module Service = struct - module RpcMap = Map.Make (String) - - type t = Rpc.t RpcMap.t - - let v () = RpcMap.empty - let add_rpc ~name ~rpc t = RpcMap.add name rpc t + include Typed_rpc.Service - let handle_request (t : t) reqd = - let request = H2.Reqd.request reqd in - let respond_with code = - H2.Reqd.respond_with_string reqd (H2.Response.create code) "" - in - let parts = String.split_on_char '/' request.target in - if List.length parts > 1 then - let rpc_name = List.nth parts (List.length parts - 1) in - let rpc = RpcMap.find_opt rpc_name t in - match rpc with - | Some rpc -> ( - match rpc with - | Unary f -> Rpc.unary ~f reqd - | Client_streaming f -> Rpc.client_streaming ~f reqd - | Server_streaming f -> Rpc.server_streaming ~f reqd - | Bidirectional_streaming f -> Rpc.bidirectional_streaming ~f reqd) - | None -> respond_with `Not_found - else respond_with `Not_found + let add_rpc ~name ~rpc t = + add_rpc ~name + ~rpc: + (match rpc with + | Rpc.Unary f -> Rpc.unary ~f + | Client_streaming f -> Rpc.client_streaming ~f + | Server_streaming f -> Rpc.server_streaming ~f + | Bidirectional_streaming f -> Rpc.bidirectional_streaming ~f) + t end diff --git a/lib/grpc-eio/server.mli b/lib/grpc-eio/server.mli index 40961f5..715cd3a 100644 --- a/lib/grpc-eio/server.mli +++ b/lib/grpc-eio/server.mli @@ -1,5 +1,95 @@ include Grpc.Server.S +(** {1 Typed API} *) + +module Typed_rpc : sig + (** A typed interface to build RPCs on the server side. + + Compared to {!module:Rpc}, this interface will: + + - handle the coding/decoding of messages for you under the hood; + - use the service and RPC names provided by the rpc specification to + register the services with their expected names. *) + + type server := t + + type ('request, 'response) unary = + 'request -> Grpc.Status.t * 'response option + (** [unary] is the type for a unary grpc rpc, one request, one response. *) + + type ('request, 'response) client_streaming = + 'request Seq.t -> Grpc.Status.t * 'response option + (** [client_streaming] is the type for an rpc where the client streams the + requests and the server responds once. *) + + type ('request, 'response) server_streaming = + 'request -> ('response -> unit) -> Grpc.Status.t + (** [server_streaming] is the type for an rpc where the client sends one + request and the server sends multiple responses. *) + + type ('request, 'response) bidirectional_streaming = + 'request Seq.t -> ('response -> unit) -> Grpc.Status.t + (** [bidirectional_streaming] is the type for an rpc where both the client and + server can send multiple messages. *) + + type 'service_spec t + (** [t] represents an implementation for an RPC on the server side. *) + + (** The next functions are meant to be used by the server to create RPC + implementations. The rpc specification that the function implements must + be provided as it is used to handle coding/decoding of messages. It also + allows to refer to the service and RPC names specified in the [.proto] + file. *) + + val unary : + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) unary -> + 'service_spec t + + val client_streaming : + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) client_streaming -> + 'service_spec t + + val server_streaming : + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) server_streaming -> + 'service_spec t + + val bidirectional_streaming : + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) bidirectional_streaming -> + 'service_spec t + + val server : (Grpc.Rpc.Service_spec.t t, unit t) Grpc.Rpc.Handlers.t -> server + (** Having built a list of RPCs you will use this function to package them up + into a server that is ready to be served over the network. This function + takes care of registering the services based on the names provided by the + protoc specification. *) +end + +(** {1 Untyped API} *) + module Rpc : sig type unary = string -> Grpc.Status.t * string option (** [unary] is the type for a unary grpc rpc, one request, one response. *) diff --git a/lib/grpc-protoc-plugin/dune b/lib/grpc-protoc-plugin/dune new file mode 100644 index 0000000..900987e --- /dev/null +++ b/lib/grpc-protoc-plugin/dune @@ -0,0 +1,4 @@ +(library + (name grpc_protoc_plugin) + (public_name grpc-protoc-plugin) + (libraries grpc ocaml-protoc-plugin)) diff --git a/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml b/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml new file mode 100644 index 0000000..baea692 --- /dev/null +++ b/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml @@ -0,0 +1,69 @@ +module type S = Ocaml_protoc_plugin.Service.Rpc + +let encode (type a) + (module M : Ocaml_protoc_plugin.Runtime.Runtime'.Service.Message + with type t = a) (a : a) = + a |> M.to_proto |> Ocaml_protoc_plugin.Runtime.Runtime'.Writer.contents + +let decode (type a) + (module M : Ocaml_protoc_plugin.Runtime.Runtime'.Service.Message + with type t = a) buffer = + buffer |> Ocaml_protoc_plugin.Runtime.Runtime'.Reader.create |> M.from_proto + |> function + | Ok r -> r + | Error e -> + failwith + (Printf.sprintf "Could not decode request: %s" + (Ocaml_protoc_plugin.Result.show_error e)) + +let service_spec (type request response) + (module R : S with type Request.t = request and type Response.t = response) + = + { + Grpc.Rpc.Service_spec.package = R.package_name |> Option.to_list; + service_name = R.service_name; + } + +module Client_rpc = struct + let make (type request response) + (module R : S with type Request.t = request and type Response.t = response) + ~request_mode ~response_mode = + { + Grpc.Rpc.Client_rpc.service_spec = service_spec (module R); + rpc_name = R.method_name; + encode_request = encode (module R.Request); + decode_response = decode (module R.Response); + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +module Server_rpc = struct + let make (type request response) + (module R : S with type Request.t = request and type Response.t = response) + ~request_mode ~response_mode = + { + Grpc.Rpc.Server_rpc.service_spec = Some (service_spec (module R)); + rpc_name = R.method_name; + decode_request = decode (module R.Request); + encode_response = encode (module R.Response); + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +let handlers handlers = Grpc.Rpc.Handlers.Handlers { handlers } diff --git a/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli b/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli new file mode 100644 index 0000000..b549936 --- /dev/null +++ b/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli @@ -0,0 +1,92 @@ +(** A utility library for constructing gRPC specifications using + [Ocaml_protoc_plugin]. + + This module is designed to work alongside [Ocaml_protoc_plugin] to generate + gRPC stubs, as outlined in {!module:Grpc.Rpc}. It offers a collection of + helper functions that construct gRPC specifications from the code produced + by [Ocaml_protoc_plugin] based on the services defined in *.proto files. *) + +module type S = Ocaml_protoc_plugin.Service.Rpc +(** For each service delineated in *.proto files, [Ocaml_protoc_plugin] + generates a module that conforms to the type interface [S]. This module + serves as the entry point for this library to create the corresponding + gRPC specifications on the client and server sides. It is to be supplied + to the corresponding helper as a first class module parameter. *) + +(** {1 Client side} *) + +module Client_rpc : sig + val unary : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val client_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val server_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t + + val bidirectional_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t +end + +(** {1 Server side} *) + +module Server_rpc : sig + val unary : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t + + val client_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t + + val server_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t + + val bidirectional_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t +end + +val handlers : 'a list -> ('a, _) Grpc.Rpc.Handlers.t diff --git a/lib/grpc-protoc/dune b/lib/grpc-protoc/dune new file mode 100644 index 0000000..14a98b6 --- /dev/null +++ b/lib/grpc-protoc/dune @@ -0,0 +1,4 @@ +(library + (name grpc_protoc) + (public_name grpc-protoc) + (libraries grpc ocaml-protoc pbrt pbrt_services)) diff --git a/lib/grpc-protoc/grpc_protoc.ml b/lib/grpc-protoc/grpc_protoc.ml new file mode 100644 index 0000000..5ba88aa --- /dev/null +++ b/lib/grpc-protoc/grpc_protoc.ml @@ -0,0 +1,55 @@ +let encode (type a) (encode : a -> Pbrt.Encoder.t -> unit) (a : a) = + let encoder = Pbrt.Encoder.create () in + encode a encoder; + Pbrt.Encoder.to_string encoder + +let decode (type a) (decode : Pbrt.Decoder.t -> a) buffer = + let decoder = Pbrt.Decoder.of_string buffer in + decode decoder + +module Client_rpc = struct + let make (type request response) + (rpc : (request, _, response, _) Pbrt_services.Client.rpc) ~request_mode + ~response_mode = + { + Grpc.Rpc.Client_rpc.service_spec = + { package = rpc.package; service_name = rpc.service_name }; + rpc_name = rpc.rpc_name; + encode_request = encode rpc.encode_pb_req; + decode_response = decode rpc.decode_pb_res; + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +module Server_rpc = struct + let make (type request response) + (rpc : (request, _, response, _) Pbrt_services.Server.rpc) ~request_mode + ~response_mode = + { + Grpc.Rpc.Server_rpc.service_spec = None; + rpc_name = rpc.name; + decode_request = decode rpc.decode_pb_req; + encode_response = encode rpc.encode_pb_res; + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +let handlers { Pbrt_services.Server.package; service_name; handlers } = + Grpc.Rpc.Handlers.With_service_spec + { service_spec = { package; service_name }; handlers } diff --git a/lib/grpc-protoc/grpc_protoc.mli b/lib/grpc-protoc/grpc_protoc.mli new file mode 100644 index 0000000..8486af6 --- /dev/null +++ b/lib/grpc-protoc/grpc_protoc.mli @@ -0,0 +1,116 @@ +(** A utility library for constructing gRPC specifications using [Ocaml_protoc]. + + This module is designed to work alongside [Ocaml_protoc] to generate gRPC + stubs, as outlined in {!module:Grpc.Rpc}. It offers a collection of helper + functions that construct gRPC specifications from the code produced by + [Ocaml_protoc] based on the services defined in *.proto files. *) + +(** {1 Client side} *) + +module Client_rpc : sig + val unary : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val client_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val server_streaming : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t + + val bidirectional_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t +end + +(** {1 Server side} *) + +module Server_rpc : sig + val unary : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + unit ) + Grpc.Rpc.Server_rpc.t + + val client_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + unit ) + Grpc.Rpc.Server_rpc.t + + val server_streaming : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + unit ) + Grpc.Rpc.Server_rpc.t + + val bidirectional_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + unit ) + Grpc.Rpc.Server_rpc.t +end + +val handlers : 'a Pbrt_services.Server.t -> (_, 'a) Grpc.Rpc.Handlers.t diff --git a/lib/grpc/grpc.ml b/lib/grpc/grpc.ml index 00ca697..c84744b 100644 --- a/lib/grpc/grpc.ml +++ b/lib/grpc/grpc.ml @@ -2,3 +2,4 @@ module Server = Server module Status = Status module Message = Message module Buffer = Buffer +module Rpc = Rpc diff --git a/lib/grpc/rpc.ml b/lib/grpc/rpc.ml new file mode 100644 index 0000000..75859ac --- /dev/null +++ b/lib/grpc/rpc.ml @@ -0,0 +1,47 @@ +module Value_mode = struct + type unary + type stream + type _ t = Unary : unary t | Stream : stream t +end + +module Service_spec = struct + type t = { package : string list; service_name : string } + + let packaged_service_name { package; service_name } = + String.concat "." (package @ [ service_name ]) +end + +type buffer = string + +module Client_rpc = struct + type ('request, 'request_mode, 'response, 'response_mode) t = { + service_spec : Service_spec.t; + rpc_name : string; + encode_request : 'request -> buffer; + decode_response : buffer -> 'response; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; + } +end + +module Server_rpc = struct + module Service_spec = struct + type 'a t = None : unit t | Some : Service_spec.t -> Service_spec.t t + end + + type ('request, 'request_mode, 'response, 'response_mode, 'service_spec) t = { + service_spec : 'service_spec Service_spec.t; + rpc_name : string; + decode_request : buffer -> 'request; + encode_response : 'response -> buffer; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; + } +end + +module Handlers = struct + type ('a, 'b) t = + | Handlers of { handlers : 'a list } + | With_service_spec of { handlers : 'b list; service_spec : Service_spec.t } + | ( :: ) of ('a, 'b) t * ('a, 'b) t list +end diff --git a/lib/grpc/rpc.mli b/lib/grpc/rpc.mli new file mode 100644 index 0000000..3d87b4a --- /dev/null +++ b/lib/grpc/rpc.mli @@ -0,0 +1,116 @@ +(** Creating typed specification for RPCs. + + This module provides the functionality to create typed specifications for + RPCs. It defines abstractions for both client and server sides to utilize + the typed interfaces of Grpc. These abstractions can be constructed + directly by advanced users or via helper libraries for commonly used + protoc providers supported by Grpc. *) + +module Value_mode : sig + (** A type used to differentiate between unary and stream values. + + Grpc supports the definition of RPCs that either take and return a single + value or a stream of values. The table below illustrates the four types of + RPCs that can be defined, distinguished by the {!type:Value_mode.t} of + their [request_mode] and [response_mode] fields. + + {t + | request_mode | response_mode | rpc kind | + | :----------: | :------------:|:-----------------------:| + | Unary | Unary | unary | + | Unary | Stream | server_streaming | + | Stream | Unary | client_streaming | + | Stream | Stream | bidirectional_streaming | + } *) + + type unary + type stream + type _ t = Unary : unary t | Stream : stream t +end + +module Service_spec : sig + (** The complete name used to identify a service. *) + + type t = { package : string list; service_name : string } + (** Services can be qualified by a list of {!field:package} names in addition + to their {!field:service_name}. Values of this type are typically + auto-generated from the service interfaces defined in *.proto files. *) + + val packaged_service_name : t -> string + (** This function constructs a canonical service name that acts as a key to + identify and retrieve the correct service at runtime. The convention is + to concatenate the package and service names, separated by a dot. *) +end + +type buffer = string +(** The {!type:buffer} type represents the messages exchanged by the low-level + transport layer of Grpc. The typed specification includes transformation + functions to convert to and from this wire encoding. Depending on the + specification's construction, this string may represent messages in JSON + or Protobuf format. *) + +(** {1 Client side} *) + +module Client_rpc : sig + (** RPC specification used by clients when calling gRPCs. *) + + type ('request, 'request_mode, 'response, 'response_mode) t = { + service_spec : Service_spec.t; + rpc_name : string; + encode_request : 'request -> buffer; + decode_response : buffer -> 'response; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; + } +end + +(** {1 Server side} *) + +module Server_rpc : sig + (** RPC specification used by server when implementing gRPCs. *) + + module Service_spec : sig + (** This type indicates whether a {!Service_spec.t} is available in the + server-side specification. + + Grpc supports several protoc providers (ocaml-protoc & ocaml-protoc-plugin), + which differ in the information available in their server-side handlers. + + {t + | protoc library | service_spec in handler | + | :-----------------: | :----------------------:| + | ocaml-protoc | No | + | ocaml-protoc-plugin | Yes | + } *) + type 'a t = None : unit t | Some : Service_spec.t -> Service_spec.t t + end + + type ('request, 'request_mode, 'response, 'response_mode, 'service_spec) t = { + service_spec : 'service_spec Service_spec.t; + rpc_name : string; + decode_request : buffer -> 'request; + encode_response : 'response -> buffer; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; + } +end + +module Handlers : sig + (** This type helps distinguish between server handlers that do or do not + contain the specification of the service they implement. The type is + parameterized as it is shared by libraries that depend on different + concurrency libraries, causing the actual type of handlers to vary. *) + type ('a, 'b) t = + | Handlers of { handlers : 'a list } + (** This representation is used when each handler contains a reference + to the service spec, such as when they are built with + [ocaml_protoc_plugin]. *) + | With_service_spec of { handlers : 'b list; service_spec : Service_spec.t } + (** If the service spec is not represented by each handler, it must be + passed along with all handlers that implement an RPC for that + service. *) + | ( :: ) of ('a, 'b) t * ('a, 'b) t list + (** This constructor allows multiple services' handlers to be + implemented on the same server, supplying them grouped using list + syntax. *) +end