From 10ab1f990c579c56394790d0f39ca979517548d9 Mon Sep 17 00:00:00 2001 From: Mathieu Barbin Date: Tue, 9 Jan 2024 09:53:44 +0100 Subject: [PATCH] Implement typed-rpc for Eio - Mark `Grpc_eio.Client.call` as deprecated --- .../greeter-client-eio/greeter_client_eio.ml | 2 +- examples/routeguide-tutorial.md | 8 +- examples/routeguide/src/client.ml | 8 +- lib/grpc-eio/client.ml | 109 ++++++++- lib/grpc-eio/client.mli | 111 ++++++++- lib/grpc-eio/connection.ml | 14 +- lib/grpc-eio/connection.mli | 12 + lib/grpc-eio/server.ml | 224 ++++++++++++++---- lib/grpc-eio/server.mli | 90 +++++++ 9 files changed, 496 insertions(+), 82 deletions(-) create mode 100644 lib/grpc-eio/connection.mli diff --git a/examples/greeter-client-eio/greeter_client_eio.ml b/examples/greeter-client-eio/greeter_client_eio.ml index c8b0530..a95e3ca 100644 --- a/examples/greeter-client-eio/greeter_client_eio.ml +++ b/examples/greeter-client-eio/greeter_client_eio.ml @@ -39,7 +39,7 @@ let main env = in let result = - Grpc_eio.Client.call ~service:"mypackage.Greeter" ~rpc:"SayHello" + Grpc_eio.Client.Rpc.call ~service:"mypackage.Greeter" ~rpc:"SayHello" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler:(Grpc_eio.Client.Rpc.unary encoded_request ~f) () diff --git a/examples/routeguide-tutorial.md b/examples/routeguide-tutorial.md index 96128be..da1c5b4 100644 --- a/examples/routeguide-tutorial.md +++ b/examples/routeguide-tutorial.md @@ -439,7 +439,7 @@ Calling the simple RPC `get_feature` requires building up a `Client.call` repres let call_get_feature connection point = let encode, decode = Service.make_client_functions RouteGuide.getFeature in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.unary @@ -476,7 +476,7 @@ let print_features connection = let encode, decode = Service.make_client_functions RouteGuide.listFeatures in let stream = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.server_streaming @@ -528,7 +528,7 @@ let run_record_route connection = let encode, decode = Service.make_client_functions RouteGuide.recordRoute in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.client_streaming ~f:(fun f response -> @@ -615,7 +615,7 @@ We start by generating a short sequence of locations, similar to how we did for go writer reader' xs) in let result = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.bidirectional_streaming ~f:(fun writer reader -> diff --git a/examples/routeguide/src/client.ml b/examples/routeguide/src/client.ml index 47d8dba..d3b5ab5 100644 --- a/examples/routeguide/src/client.ml +++ b/examples/routeguide/src/client.ml @@ -22,7 +22,7 @@ let client ~sw host port network = let call_get_feature connection point = let encode, decode = Service.make_client_functions RouteGuide.getFeature in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.unary @@ -55,7 +55,7 @@ let print_features connection = let encode, decode = Service.make_client_functions RouteGuide.listFeatures in let stream = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.server_streaming @@ -100,7 +100,7 @@ let run_record_route connection = let encode, decode = Service.make_client_functions RouteGuide.recordRoute in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.client_streaming ~f:(fun f response -> @@ -178,7 +178,7 @@ let run_route_chat clock connection = go writer reader' xs) in let result = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" + Client.Rpc.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: (Client.Rpc.bidirectional_streaming ~f:(fun writer reader -> diff --git a/lib/grpc-eio/client.ml b/lib/grpc-eio/client.ml index 4efe5cd..c3039a5 100644 --- a/lib/grpc-eio/client.ml +++ b/lib/grpc-eio/client.ml @@ -43,8 +43,8 @@ let get_response_and_bodies request = let read_body = Eio.Promise.await read_body in (response, read_body, write_body) -let call ~service ~rpc ?(scheme = "https") ~handler ~(do_request : do_request) - ?(headers = default_headers) () = +let call_internal ~service ~rpc ?(scheme = "https") ~handler + ~(do_request : do_request) ?(headers = default_headers) () = let request = make_request ~service ~rpc ~scheme ~headers in let status, trailers_handler = make_trailers_handler () in let response, read_body, write_body = @@ -66,20 +66,98 @@ let call ~service ~rpc ?(scheme = "https") ~handler ~(do_request : do_request) Ok (result, status) | error_status -> Error error_status +let make_handler ~encode_request ~decode_response ~f write_body read_body = + let response_reader, response_writer = Seq.create_reader_writer () in + let request_reader, request_writer = Seq.create_reader_writer () in + Connection.grpc_recv_streaming ~decode:decode_response read_body + response_writer; + let res, res_notify = Eio.Promise.create () in + Eio.Fiber.both + (fun () -> + Eio.Promise.resolve res_notify (f request_writer response_reader)) + (fun () -> + Connection.grpc_send_streaming_client ~encode:encode_request write_body + request_reader); + Eio.Promise.await res + +module Typed_rpc = struct + type ('request, 'request_mode, 'response, 'response_mode, 'a) handler = + ('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t -> + H2.Body.Writer.t -> + H2.Body.Reader.t -> + 'a + + let make_handler (type request response) + ~(rpc : (request, _, response, _) Grpc.Rpc.Client_rpc.t) ~f = + make_handler ~encode_request:rpc.encode_request + ~decode_response:rpc.decode_response ~f + + let bidirectional_streaming (type request response) ~f + (rpc : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t) = + make_handler ~rpc ~f + + let client_streaming (type request response) ~f + (rpc : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t) = + make_handler ~rpc ~f:(fun request_writer responses -> + let response, response_resolver = Eio.Promise.create () in + Eio.Fiber.pair + (fun () -> f request_writer response) + (fun () -> + Eio.Promise.resolve response_resolver + (Seq.read_and_exhaust responses)) + |> fst) + + let server_streaming (type request response) ~f (request : request) + (rpc : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t) = + make_handler ~rpc ~f:(fun request_writer responses -> + Seq.write request_writer request; + Seq.close_writer request_writer; + f responses) + + let unary (type request response) ~f (request : request) + (rpc : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t) = + make_handler ~rpc ~f:(fun request_writer responses -> + Seq.write request_writer request; + Seq.close_writer request_writer; + let response = Seq.read_and_exhaust responses in + f response) + + let call (type request request_mode response response_mode a) + (rpc : + (request, request_mode, response, response_mode) Grpc.Rpc.Client_rpc.t) + ?scheme + ~(handler : (request, request_mode, response, response_mode, a) handler) + ~do_request ?headers () = + call_internal + ~service:(Grpc.Rpc.Service_spec.packaged_service_name rpc.service_spec) + ~rpc:rpc.rpc_name ?scheme ~handler:(handler rpc) ~do_request ?headers () +end + module Rpc = struct type 'a handler = H2.Body.Writer.t -> H2.Body.Reader.t -> 'a - let bidirectional_streaming ~f write_body read_body = - let response_reader, response_writer = Seq.create_reader_writer () in - let request_reader, request_writer = Seq.create_reader_writer () in - Connection.grpc_recv_streaming read_body response_writer; - let res, res_notify = Eio.Promise.create () in - Eio.Fiber.both - (fun () -> - Eio.Promise.resolve res_notify (f request_writer response_reader)) - (fun () -> - Connection.grpc_send_streaming_client write_body request_reader); - Eio.Promise.await res + let bidirectional_streaming ~f = + make_handler ~encode_request:Fun.id ~decode_response:Fun.id ~f let client_streaming ~f = bidirectional_streaming ~f:(fun request_writer responses -> @@ -103,4 +181,9 @@ module Rpc = struct Seq.close_writer request_writer; let response = Seq.read_and_exhaust responses in f response) + + let call = call_internal end + +(* Deprecated in the mli. *) +let call = call_internal diff --git a/lib/grpc-eio/client.mli b/lib/grpc-eio/client.mli index 745d33c..935c6ca 100644 --- a/lib/grpc-eio/client.mli +++ b/lib/grpc-eio/client.mli @@ -1,3 +1,82 @@ +type response_handler = H2.Client_connection.response_handler + +type do_request = + ?flush_headers_immediately:bool -> + ?trailers_handler:(H2.Headers.t -> unit) -> + H2.Request.t -> + response_handler:response_handler -> + H2.Body.Writer.t +(** [do_request] is the type of a function that performs the request *) + +(** {1 Typed API} *) + +module Typed_rpc : sig + (** A typed interface to call RPC from the client side. + + Compared to {!module:Rpc}, this interface will: + + - handle the coding/decoding of messages for you under the hood; + - use the service and RPC names provided by the rpc specification to + call the services with their expected names. *) + + type ('request, 'request_mode, 'response, 'response_mode, 'a) handler + + (** The next functions are meant to be used by the client to handle + call to RPCs. *) + + val bidirectional_streaming : + f:('request Seq.writer -> 'response Seq.t -> 'a) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + 'a ) + handler + + val client_streaming : + f:('request Seq.writer -> 'response option Eio.Promise.t -> 'a) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + 'a ) + handler + + val server_streaming : + f:('response Seq.t -> 'a) -> + 'request -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + 'a ) + handler + + val unary : + f:('response option -> 'a) -> + 'request -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + 'a ) + handler + + val call : + ('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t -> + ?scheme:string -> + handler:('request, 'request_mode, 'response, 'response_mode, 'a) handler -> + do_request:do_request -> + ?headers:H2.Headers.t -> + unit -> + ('a * Grpc.Status.t, H2.Status.t) result + (** [call rpc ~handler ~do_request ()] calls the rpc endpoint given + by [service] and [rpc] using the [do_request] function. The [handler] is + called when this request is set up to send and receive data. *) +end + +(** {1 Untyped API} *) + module Rpc : sig type 'a handler = H2.Body.Writer.t -> H2.Body.Reader.t -> 'a @@ -22,17 +101,20 @@ module Rpc : sig (** [unary ~f enc write read] sets up the sending and receiving logic using [write] and [read], then sends [enc] and calls [f] with a promise for the response. *) -end - -type response_handler = H2.Client_connection.response_handler -type do_request = - ?flush_headers_immediately:bool -> - ?trailers_handler:(H2.Headers.t -> unit) -> - H2.Request.t -> - response_handler:response_handler -> - H2.Body.Writer.t -(** [do_request] is the type of a function that performs the request *) + val call : + service:string -> + rpc:string -> + ?scheme:string -> + handler:'a handler -> + do_request:do_request -> + ?headers:H2.Headers.t -> + unit -> + ('a * Grpc.Status.t, H2.Status.t) result + (** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given + by [service] and [rpc] using the [do_request] function. The [handler] is + called when this request is set up to send and receive data. *) +end val call : service:string -> @@ -43,6 +125,9 @@ val call : ?headers:H2.Headers.t -> unit -> ('a * Grpc.Status.t, H2.Status.t) result -(** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given - by [service] and [rpc] using the [do_request] function. The [handler] is - called when this request is set up to send and receive data. *) +[@@ocaml.alert + deprecated "This function was renamed [Grpc_eio.Client.Rpc.call]."] +(** Deprecating this function makes the distinction between the typed and + untyped API more clear. Also, it frees up the name [call] at top level, + which we could use in the future if we decide to "promote" the typed API + to the toplevel scope of this module. *) diff --git a/lib/grpc-eio/connection.ml b/lib/grpc-eio/connection.ml index 3de3965..31f6930 100644 --- a/lib/grpc-eio/connection.ml +++ b/lib/grpc-eio/connection.ml @@ -1,23 +1,25 @@ -let grpc_recv_streaming body message_buffer_writer = +let grpc_recv_streaming ~decode body message_buffer_writer = let request_buffer = Grpc.Buffer.v () in let on_eof () = Seq.close_writer message_buffer_writer in let rec on_read buffer ~off ~len = Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:buffer ~dst:request_buffer ~length:len; - Grpc.Message.extract_all (Seq.write message_buffer_writer) request_buffer; + Grpc.Message.extract_all + (fun message -> Seq.write message_buffer_writer (decode message)) + request_buffer; H2.Body.Reader.schedule_read body ~on_read ~on_eof in H2.Body.Reader.schedule_read body ~on_read ~on_eof -let grpc_send_streaming_client body encoder_stream = +let grpc_send_streaming_client ~encode body encoder_stream = Seq.iter (fun encoder -> - let payload = Grpc.Message.make encoder in + let payload = Grpc.Message.make (encode encoder) in H2.Body.Writer.write_string body payload) encoder_stream; H2.Body.Writer.close body -let grpc_send_streaming request encoder_stream status_promise = +let grpc_send_streaming ~encode request encoder_stream status_promise = let body = H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request (H2.Response.create @@ -27,7 +29,7 @@ let grpc_send_streaming request encoder_stream status_promise = in Seq.iter (fun input -> - let payload = Grpc.Message.make input in + let payload = Grpc.Message.make (encode input) in H2.Body.Writer.write_string body payload; H2.Body.Writer.flush body (fun () -> ())) encoder_stream; diff --git a/lib/grpc-eio/connection.mli b/lib/grpc-eio/connection.mli new file mode 100644 index 0000000..fee84be --- /dev/null +++ b/lib/grpc-eio/connection.mli @@ -0,0 +1,12 @@ +val grpc_recv_streaming : + decode:(string -> 'a) -> H2.Body.Reader.t -> 'a Seq.writer -> unit + +val grpc_send_streaming_client : + encode:('a -> string) -> H2.Body.Writer.t -> 'a Seq.reader -> unit + +val grpc_send_streaming : + encode:('a -> string) -> + H2.Reqd.t -> + 'a Seq.reader -> + Grpc.Status.t Eio.Promise.t -> + unit diff --git a/lib/grpc-eio/server.ml b/lib/grpc-eio/server.ml index ffd850c..fff6503 100644 --- a/lib/grpc-eio/server.ml +++ b/lib/grpc-eio/server.ml @@ -1,6 +1,7 @@ module ServiceMap = Map.Make (String) -type service = H2.Reqd.t -> unit +type reqd_handler = H2.Reqd.t -> unit +type service = reqd_handler type t = service ServiceMap.t let v () = ServiceMap.empty @@ -44,6 +45,176 @@ let handle_request t reqd = | None -> respond_with `Unsupported_media_type) | _ -> respond_with `Not_found +let implement_rpc ~decode_request ~encode_response ~f reqd = + let body = H2.Reqd.request_body reqd in + let request_reader, request_writer = Seq.create_reader_writer () in + let response_reader, response_writer = Seq.create_reader_writer () in + Connection.grpc_recv_streaming ~decode:decode_request body request_writer; + let status_promise, status_notify = Eio.Promise.create () in + Eio.Fiber.both + (fun () -> + let respond = Seq.write response_writer in + let status = f request_reader respond in + Seq.close_writer response_writer; + Eio.Promise.resolve status_notify status) + (fun () -> + try + Connection.grpc_send_streaming ~encode:encode_response reqd + response_reader status_promise + with exn -> + (* https://github.com/anmonteiro/ocaml-h2/issues/175 *) + Eio.traceln "%s" (Printexc.to_string exn)) + +module Typed_rpc = struct + module Service = struct + module RpcMap = Map.Make (String) + + type t = reqd_handler RpcMap.t + + let v () = RpcMap.empty + let add_rpc ~name ~rpc t = RpcMap.add name rpc t + + let handle_request (t : t) reqd = + let request = H2.Reqd.request reqd in + let respond_with code = + H2.Reqd.respond_with_string reqd (H2.Response.create code) "" + in + let parts = String.split_on_char '/' request.target in + if List.length parts > 1 then + let rpc_name = List.nth parts (List.length parts - 1) in + match RpcMap.find_opt rpc_name t with + | Some rpc -> rpc reqd + | None -> respond_with `Not_found + else respond_with `Not_found + end + + type server = t + + type ('request, 'response) unary = + 'request -> Grpc.Status.t * 'response option + + type ('request, 'response) client_streaming = + 'request Seq.t -> Grpc.Status.t * 'response option + + type ('request, 'response) server_streaming = + 'request -> ('response -> unit) -> Grpc.Status.t + + type ('request, 'response) bidirectional_streaming = + 'request Seq.t -> ('response -> unit) -> Grpc.Status.t + + type 'service_spec t = + | T : { + rpc_spec : + ( 'request, + 'request_mode, + 'response, + 'response_mode, + 'service_spec ) + Grpc.Rpc.Server_rpc.t; + rpc_impl : reqd_handler; + } + -> 'service_spec t + + let rec make_handlers handlers = + match (handlers : _ Grpc.Rpc.Handlers.t) with + | a :: tl -> List.concat (make_handlers a :: List.map make_handlers tl) + | Handlers { handlers = ts } -> ts + | With_service_spec { service_spec; handlers = ts } -> + List.map + (fun (T t) -> + T + { + t with + rpc_spec = { t.rpc_spec with service_spec = Some service_spec }; + }) + ts + + let server handlers : server = + let handlers = make_handlers handlers in + List.fold_left + (fun map (T t as packed) -> + let service_name = + match t.rpc_spec.service_spec with + | Some service_spec -> + Grpc.Rpc.Service_spec.packaged_service_name service_spec + in + let rpc_impl = + ServiceMap.find_opt service_name map |> Option.value ~default:[] + in + ServiceMap.add service_name (packed :: rpc_impl) map) + ServiceMap.empty handlers + |> ServiceMap.map (fun ts -> + let service = + List.fold_left + (fun acc (T t) -> + Service.add_rpc ~name:t.rpc_spec.rpc_name ~rpc:t.rpc_impl acc) + (Service.v ()) ts + in + Service.handle_request service) + + let implement_rpc (type request response) + ~(rpc_spec : (request, _, response, _, _) Grpc.Rpc.Server_rpc.t) ~f = + let rpc_impl = + implement_rpc ~decode_request:rpc_spec.decode_request + ~encode_response:rpc_spec.encode_response ~f + in + T { rpc_spec; rpc_impl } + + let bidirectional_streaming (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.stream, + _ ) + Grpc.Rpc.Server_rpc.t) ~f = + implement_rpc ~rpc_spec ~f + + let unary (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.unary, + _ ) + Grpc.Rpc.Server_rpc.t) ~f = + implement_rpc ~rpc_spec ~f:(fun requests respond -> + match Seq.read_and_exhaust requests with + | None -> Grpc.Status.(v OK) + | Some request -> + let status, response = f request in + (match response with + | None -> () + | Some response -> respond response); + status) + + let server_streaming (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.stream, + _ ) + Grpc.Rpc.Server_rpc.t) ~f = + implement_rpc ~rpc_spec ~f:(fun requests respond -> + match Seq.read_and_exhaust requests with + | None -> Grpc.Status.(v OK) + | Some request -> f request respond) + + let client_streaming (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.unary, + _ ) + Grpc.Rpc.Server_rpc.t) ~f = + implement_rpc ~rpc_spec ~f:(fun requests respond -> + let status, response = f requests in + (match response with None -> () | Some response -> respond response); + status) +end + module Rpc = struct type unary = string -> Grpc.Status.t * string option type client_streaming = string Seq.t -> Grpc.Status.t * string option @@ -59,22 +230,7 @@ module Rpc = struct | Bidirectional_streaming of bidirectional_streaming let bidirectional_streaming ~f reqd = - let body = H2.Reqd.request_body reqd in - let request_reader, request_writer = Seq.create_reader_writer () in - let response_reader, response_writer = Seq.create_reader_writer () in - Connection.grpc_recv_streaming body request_writer; - let status_promise, status_notify = Eio.Promise.create () in - Eio.Fiber.both - (fun () -> - let respond = Seq.write response_writer in - let status = f request_reader respond in - Seq.close_writer response_writer; - Eio.Promise.resolve status_notify status) - (fun () -> - try Connection.grpc_send_streaming reqd response_reader status_promise - with exn -> - (* https://github.com/anmonteiro/ocaml-h2/issues/175 *) - Eio.traceln "%s" (Printexc.to_string exn)) + implement_rpc ~decode_request:Fun.id ~encode_response:Fun.id ~f reqd let client_streaming ~f reqd = bidirectional_streaming reqd ~f:(fun requests respond -> @@ -101,29 +257,15 @@ module Rpc = struct end module Service = struct - module RpcMap = Map.Make (String) - - type t = Rpc.t RpcMap.t - - let v () = RpcMap.empty - let add_rpc ~name ~rpc t = RpcMap.add name rpc t + include Typed_rpc.Service - let handle_request (t : t) reqd = - let request = H2.Reqd.request reqd in - let respond_with code = - H2.Reqd.respond_with_string reqd (H2.Response.create code) "" - in - let parts = String.split_on_char '/' request.target in - if List.length parts > 1 then - let rpc_name = List.nth parts (List.length parts - 1) in - let rpc = RpcMap.find_opt rpc_name t in - match rpc with - | Some rpc -> ( - match rpc with - | Unary f -> Rpc.unary ~f reqd - | Client_streaming f -> Rpc.client_streaming ~f reqd - | Server_streaming f -> Rpc.server_streaming ~f reqd - | Bidirectional_streaming f -> Rpc.bidirectional_streaming ~f reqd) - | None -> respond_with `Not_found - else respond_with `Not_found + let add_rpc ~name ~rpc t = + add_rpc ~name + ~rpc: + (match rpc with + | Rpc.Unary f -> Rpc.unary ~f + | Client_streaming f -> Rpc.client_streaming ~f + | Server_streaming f -> Rpc.server_streaming ~f + | Bidirectional_streaming f -> Rpc.bidirectional_streaming ~f) + t end diff --git a/lib/grpc-eio/server.mli b/lib/grpc-eio/server.mli index 40961f5..715cd3a 100644 --- a/lib/grpc-eio/server.mli +++ b/lib/grpc-eio/server.mli @@ -1,5 +1,95 @@ include Grpc.Server.S +(** {1 Typed API} *) + +module Typed_rpc : sig + (** A typed interface to build RPCs on the server side. + + Compared to {!module:Rpc}, this interface will: + + - handle the coding/decoding of messages for you under the hood; + - use the service and RPC names provided by the rpc specification to + register the services with their expected names. *) + + type server := t + + type ('request, 'response) unary = + 'request -> Grpc.Status.t * 'response option + (** [unary] is the type for a unary grpc rpc, one request, one response. *) + + type ('request, 'response) client_streaming = + 'request Seq.t -> Grpc.Status.t * 'response option + (** [client_streaming] is the type for an rpc where the client streams the + requests and the server responds once. *) + + type ('request, 'response) server_streaming = + 'request -> ('response -> unit) -> Grpc.Status.t + (** [server_streaming] is the type for an rpc where the client sends one + request and the server sends multiple responses. *) + + type ('request, 'response) bidirectional_streaming = + 'request Seq.t -> ('response -> unit) -> Grpc.Status.t + (** [bidirectional_streaming] is the type for an rpc where both the client and + server can send multiple messages. *) + + type 'service_spec t + (** [t] represents an implementation for an RPC on the server side. *) + + (** The next functions are meant to be used by the server to create RPC + implementations. The rpc specification that the function implements must + be provided as it is used to handle coding/decoding of messages. It also + allows to refer to the service and RPC names specified in the [.proto] + file. *) + + val unary : + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) unary -> + 'service_spec t + + val client_streaming : + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) client_streaming -> + 'service_spec t + + val server_streaming : + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) server_streaming -> + 'service_spec t + + val bidirectional_streaming : + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) bidirectional_streaming -> + 'service_spec t + + val server : (Grpc.Rpc.Service_spec.t t, unit t) Grpc.Rpc.Handlers.t -> server + (** Having built a list of RPCs you will use this function to package them up + into a server that is ready to be served over the network. This function + takes care of registering the services based on the names provided by the + protoc specification. *) +end + +(** {1 Untyped API} *) + module Rpc : sig type unary = string -> Grpc.Status.t * string option (** [unary] is the type for a unary grpc rpc, one request, one response. *)