diff --git a/dune-project b/dune-project index 9856ec7..cf3a55e 100644 --- a/dune-project +++ b/dune-project @@ -29,7 +29,8 @@ (synopsis "A modular gRPC library") (description "This library builds some of the signatures and implementations of gRPC functionality. This is used in the more specialised package `grpc-lwt` which has more machinery, however this library can also be used to do some bits yourself.") - (tags (network rpc serialisation)) + (tags + (network rpc serialisation)) (depends (ocaml (>= 4.08)) @@ -44,7 +45,8 @@ (synopsis "An Lwt implementation of gRPC") (description "Functionality for building gRPC services and rpcs with `lwt`.") - (tags (network rpc serialisation)) + (tags + (network rpc serialisation)) (depends (grpc (= :version)) @@ -57,31 +59,61 @@ (synopsis "An Async implementation of gRPC") (description "Functionality for building gRPC services and rpcs with `async`.") - (tags (network rpc serialisation)) + (tags + (network rpc serialisation)) (depends (ocaml (>= 4.11)) (grpc (= :version)) - (async (>= v0.16)) + (async + (>= v0.16)) stringext)) (package (name grpc-eio) (synopsis "An Eio implementation of gRPC") (description - "Functionality for building gRPC services and rpcs with `eio`.") + "Functionality for building gRPC services and rpcs with `eio`.") (depends (grpc (= :version)) - (eio (>= 0.12)) - stringext)) + (eio + (>= 0.12)) + stringext)) + +(package + (name grpc-protoc-plugin) + (synopsis "An implementation of gRPC using ocaml-protoc-plugin") + (description + "Functionality for building gRPC services and rpcs with `ocaml-protoc-plugin`") + (depends + (grpc + (= :version)) + (ocaml-protoc-plugin + (>= 4.5)))) + +(package + (name grpc-protoc) + (synopsis "An implementation of gRPC using ocaml-protoc") + (description + "Functionality for building gRPC services and rpcs with `ocaml-protoc`") + (depends + (grpc + (= :version)) + (ocaml-protoc + (>= 3.0)) + (pbrt + (>= 3.0)) + (pbrt_services + (>= 3.0)))) (package (name grpc-examples) (synopsis "Various gRPC examples") (description "Various gRPC examples.") - (tags (network rpc serialisation)) + (tags + (network rpc serialisation)) (depends grpc-lwt h2-lwt-unix @@ -89,25 +121,36 @@ h2-async grpc-eio h2-eio - (ocaml-protoc-plugin (>= 4.5)) + (ocaml-protoc-plugin + (>= 4.5)) ppx_deriving_yojson conduit-lwt-unix cohttp-lwt-unix tls-async - (lwt_ssl (>= 1.2.0)) - (mdx (and (>= 2.2.1) :with-test)) - (eio_main (>= 0.12)) + (lwt_ssl + (>= 1.2.0)) + (mdx + (and + (>= 2.2.1) + :with-test)) + (eio_main + (>= 0.12)) stringext)) (package (name grpc-bench) (synopsis "Benchmarking package for gRPC") (description "Benchmarking package for gRPC.") - (tags (network rpc serialisation benchmark)) + (tags + (network rpc serialisation benchmark)) (depends grpc - (bechamel(>= 0.4.0)) + (bechamel + (>= 0.4.0)) notty - (bechamel-notty (>= 0.4.0)) - (bigstringaf (>= 0.9.1)) - (notty (>= 0.2.3)))) + (bechamel-notty + (>= 0.4.0)) + (bigstringaf + (>= 0.9.1)) + (notty + (>= 0.2.3)))) diff --git a/examples/greeter-client-eio/dune b/examples/greeter-client-eio/dune index 37f97bc..40151b3 100644 --- a/examples/greeter-client-eio/dune +++ b/examples/greeter-client-eio/dune @@ -1,3 +1,3 @@ (executable (name greeter_client_eio) - (libraries grpc grpc-eio ocaml-protoc-plugin eio_main greeter h2 h2-eio)) + (libraries grpc grpc-eio grpc-protoc-plugin eio_main greeter h2 h2-eio)) diff --git a/examples/greeter-client-eio/greeter_client_eio.ml b/examples/greeter-client-eio/greeter_client_eio.ml index c8b0530..bd19265 100644 --- a/examples/greeter-client-eio/greeter_client_eio.ml +++ b/examples/greeter-client-eio/greeter_client_eio.ml @@ -19,31 +19,23 @@ let main env = H2_eio.Client.create_connection ~sw ~error_handler:ignore socket in - let open Ocaml_protoc_plugin in let open Greeter.Mypackage in - let encode, decode = Service.make_client_functions Greeter.sayHello in - let encoded_request = - HelloRequest.make ~name () |> encode |> Writer.contents - in + let request = HelloRequest.make ~name () in - let f decoder = - match decoder with - | Some decoder -> ( - Reader.create decoder |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) + let f response = + match response with + | Some response -> response | None -> Greeter.SayHello.Response.make () in let result = - Grpc_eio.Client.call ~service:"mypackage.Greeter" ~rpc:"SayHello" + Grpc_eio.Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.unary (module Greeter.SayHello)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) - ~handler:(Grpc_eio.Client.Rpc.unary encoded_request ~f) + ~handler:(Grpc_eio.Client.Typed_rpc.unary request ~f) () in + Eio.Promise.await (H2_eio.Client.shutdown connection); result in diff --git a/examples/greeter-protoc-client-eio/dune b/examples/greeter-protoc-client-eio/dune new file mode 100644 index 0000000..9ba8a85 --- /dev/null +++ b/examples/greeter-protoc-client-eio/dune @@ -0,0 +1,3 @@ +(executable + (name greeter_client_eio) + (libraries grpc grpc-eio grpc-protoc eio_main greeter_protoc h2 h2-eio)) diff --git a/examples/greeter-protoc-client-eio/greeter_client_eio.ml b/examples/greeter-protoc-client-eio/greeter_client_eio.ml new file mode 100644 index 0000000..5fc1521 --- /dev/null +++ b/examples/greeter-protoc-client-eio/greeter_client_eio.ml @@ -0,0 +1,48 @@ +let main env = + let name = if Array.length Sys.argv > 1 then Sys.argv.(1) else "anonymous" in + let host = "localhost" in + let port = "8080" in + let network = Eio.Stdenv.net env in + let run sw = + let inet, port = + Eio_unix.run_in_systhread (fun () -> + Unix.getaddrinfo host port [ Unix.(AI_FAMILY PF_INET) ]) + |> List.filter_map (fun (addr : Unix.addr_info) -> + match addr.ai_addr with + | Unix.ADDR_UNIX _ -> None + | ADDR_INET (addr, port) -> Some (addr, port)) + |> List.hd + in + let addr = `Tcp (Eio_unix.Net.Ipaddr.of_unix inet, port) in + let socket = Eio.Net.connect ~sw network addr in + let connection = + H2_eio.Client.create_connection ~sw ~error_handler:ignore socket + in + + let request = Greeter_protoc.Greeter.default_hello_request ~name () in + + let f response = + match response with + | Some response -> response + | None -> Greeter_protoc.Greeter.default_hello_reply () + in + + let result = + Grpc_eio.Client.Typed_rpc.call + (Grpc_protoc.Client_rpc.unary + Greeter_protoc.Greeter.Greeter.Client.sayHello) + ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) + ~handler:(Grpc_eio.Client.Typed_rpc.unary request ~f) + () + in + + Eio.Promise.await (H2_eio.Client.shutdown connection); + result + in + Eio.Switch.run run + +let () = + match Eio_main.run main with + | Ok (reply, status) -> + Eio.traceln "%s: %s" (Grpc.Status.show status) reply.message + | Error err -> Eio.traceln "Error: %a" H2.Status.pp_hum err diff --git a/examples/greeter-protoc-server-eio/dune b/examples/greeter-protoc-server-eio/dune new file mode 100644 index 0000000..b4cb2b0 --- /dev/null +++ b/examples/greeter-protoc-server-eio/dune @@ -0,0 +1,3 @@ +(executable + (name greeter_server_eio) + (libraries grpc grpc-eio grpc-protoc eio_main greeter_protoc h2 h2-eio)) diff --git a/examples/greeter-protoc-server-eio/greeter_server_eio.ml b/examples/greeter-protoc-server-eio/greeter_server_eio.ml new file mode 100644 index 0000000..ac11d5d --- /dev/null +++ b/examples/greeter-protoc-server-eio/greeter_server_eio.ml @@ -0,0 +1,59 @@ +open Grpc_eio + +let sayHello rpc = + Grpc_eio.Server.Typed_rpc.unary (Grpc_protoc.Server_rpc.unary rpc) + ~f:(fun (request : Greeter_protoc.Greeter.hello_request) -> + let message = + if request.name = "" then "You forgot your name!" + else Format.sprintf "Hello, %s!" request.name + in + let reply = Greeter_protoc.Greeter.default_hello_reply ~message () in + (Grpc.Status.(v OK), Some reply)) + +let connection_handler server sw = + let error_handler client_address ?request:_ _error start_response = + Eio.traceln "Error in request from:%a" Eio.Net.Sockaddr.pp client_address; + let response_body = start_response H2.Headers.empty in + H2.Body.Writer.write_string response_body + "There was an error handling your request.\n"; + H2.Body.Writer.close response_body + in + let request_handler client_address request_descriptor = + Eio.traceln "Handling a request from:%a" Eio.Net.Sockaddr.pp client_address; + Eio.Fiber.fork ~sw (fun () -> + Grpc_eio.Server.handle_request server request_descriptor) + in + fun socket addr -> + H2_eio.Server.create_connection_handler ?config:None ~request_handler + ~error_handler addr ~sw socket + +let serve server env = + let port = 8080 in + let net = Eio.Stdenv.net env in + let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in + Eio.Switch.run @@ fun sw -> + let handler = connection_handler server sw in + let server_socket = + Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 addr + in + let rec listen () = + Eio.Net.accept_fork ~sw server_socket + ~on_error:(fun exn -> Eio.traceln "%s" (Printexc.to_string exn)) + handler; + listen () + in + Printf.printf "Listening on port %i for grpc requests\n" port; + print_endline ""; + print_endline "Try running:"; + print_endline ""; + print_endline + {| dune exec -- examples/greeter-protoc-client-eio/greeter_client_eio.exe |}; + listen () + +let () = + let server = + Greeter_protoc.Greeter.Greeter.Server.make ~sayHello () + |> Grpc_protoc.handlers |> Server.Typed_rpc.server + in + + Eio_main.run (serve server) diff --git a/examples/greeter-protoc/dune b/examples/greeter-protoc/dune new file mode 100644 index 0000000..9ebdcdb --- /dev/null +++ b/examples/greeter-protoc/dune @@ -0,0 +1,14 @@ +(library + (name greeter_protoc) + (package grpc-examples) + (libraries ocaml-protoc pbrt pbrt_services)) + +(rule + (copy ../greeter/greeter.proto greeter.proto)) + +(rule + (targets greeter.ml greeter.mli) + (deps + (:proto greeter.proto)) + (action + (run ocaml-protoc %{proto} --binary --pp --services --ml_out .))) diff --git a/examples/greeter-server-eio/dune b/examples/greeter-server-eio/dune index 8108aa6..05f400e 100644 --- a/examples/greeter-server-eio/dune +++ b/examples/greeter-server-eio/dune @@ -1,3 +1,3 @@ (executable (name greeter_server_eio) - (libraries grpc grpc-eio ocaml-protoc-plugin eio_main greeter h2 h2-eio)) + (libraries grpc grpc-eio grpc-protoc-plugin eio_main greeter h2 h2-eio)) diff --git a/examples/greeter-server-eio/greeter_server_eio.ml b/examples/greeter-server-eio/greeter_server_eio.ml index 16aaba0..bd2a636 100644 --- a/examples/greeter-server-eio/greeter_server_eio.ml +++ b/examples/greeter-server-eio/greeter_server_eio.ml @@ -1,22 +1,16 @@ open Grpc_eio -let say_hello buffer = - let open Ocaml_protoc_plugin in - let open Greeter.Mypackage in - let decode, encode = Service.make_service_functions Greeter.sayHello in - let request = - Reader.create buffer |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" (Result.show_error e)) - in - let message = - if request = "" then "You forgot your name!" - else Format.sprintf "Hello, %s!" request - in - let reply = Greeter.SayHello.Response.make ~message () in - (Grpc.Status.(v OK), Some (encode reply |> Writer.contents)) +let say_hello = + let module SayHello = Greeter.Mypackage.Greeter.SayHello in + Grpc_eio.Server.Typed_rpc.unary + (Grpc_protoc_plugin.Server_rpc.unary (module SayHello)) + ~f:(fun request -> + let message = + if request = "" then "You forgot your name!" + else Format.sprintf "Hello, %s!" request + in + let reply = SayHello.Response.make ~message () in + (Grpc.Status.(v OK), Some reply)) let connection_handler server sw = let error_handler client_address ?request:_ _error start_response = @@ -59,12 +53,8 @@ let serve server env = listen () let () = - let greeter_service = - Server.Service.( - v () |> add_rpc ~name:"SayHello" ~rpc:(Unary say_hello) |> handle_request) - in let server = - Server.( - v () |> add_service ~name:"mypackage.Greeter" ~service:greeter_service) + Server.Typed_rpc.server (Grpc_protoc_plugin.handlers [ say_hello ]) in + Eio_main.run (serve server) diff --git a/examples/routeguide-protoc/proto/dune b/examples/routeguide-protoc/proto/dune new file mode 100644 index 0000000..6d00d81 --- /dev/null +++ b/examples/routeguide-protoc/proto/dune @@ -0,0 +1,24 @@ +(library + (name routeguide_protoc) + (package grpc-examples) + (preprocess + (pps ppx_deriving.show ppx_deriving.eq)) + (libraries ocaml-protoc pbrt pbrt_services)) + +(rule + (copy ../../routeguide/proto/route_guide.proto route_guide.proto)) + +(rule + (targets route_guide.ml route_guide.mli) + (deps + (:proto route_guide.proto)) + (action + (run + ocaml-protoc + %{proto} + --binary + --ocaml_all_types_ppx + "deriving show { with_path = false }, eq" + --services + --ml_out + .))) diff --git a/examples/routeguide-protoc/src/client.ml b/examples/routeguide-protoc/src/client.ml new file mode 100644 index 0000000..860da38 --- /dev/null +++ b/examples/routeguide-protoc/src/client.ml @@ -0,0 +1,198 @@ +open Grpc_eio +module Route_guide = Routeguide_protoc.Route_guide + +(* $MDX part-begin=client-h2 *) +let client ~sw host port network = + let inet, port = + Eio_unix.run_in_systhread (fun () -> + Unix.getaddrinfo host port [ Unix.(AI_FAMILY PF_INET) ]) + |> List.filter_map (fun (addr : Unix.addr_info) -> + match addr.ai_addr with + | Unix.ADDR_UNIX _ -> None + | ADDR_INET (addr, port) -> Some (addr, port)) + |> List.hd + in + let addr = `Tcp (Eio_unix.Net.Ipaddr.of_unix inet, port) in + let socket = Eio.Net.connect ~sw network addr in + H2_eio.Client.create_connection ~sw ~error_handler:ignore socket + +(* $MDX part-end *) +(* $MDX part-begin=client-get-feature *) +let call_get_feature connection point = + let response = + Client.Typed_rpc.call + (Grpc_protoc.Client_rpc.unary Route_guide.RouteGuide.Client.getFeature) + ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) + ~handler: + (Client.Typed_rpc.unary point ~f:(function + | Some feature -> feature + | None -> Route_guide.default_feature ())) + () + in + match response with + | Ok (res, _ok) -> + Format.printf "RESPONSE = {%s}" (Route_guide.show_feature res) + | Error _ -> Printf.printf "an error occurred" + +(* $MDX part-end *) +(* $MDX part-begin=client-list-features *) +let print_features connection = + let rectangle = + Route_guide.default_rectangle + ~lo: + (Routeguide_protoc.Route_guide.default_point ~latitude:400000000l + ~longitude:(-750000000l) () + |> Option.some) + ~hi: + (Routeguide_protoc.Route_guide.default_point ~latitude:420000000l + ~longitude:(-730000000l) () + |> Option.some) + () + in + + let stream = + Client.Typed_rpc.call + (Grpc_protoc.Client_rpc.server_streaming + Route_guide.RouteGuide.Client.listFeatures) + ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) + ~handler:(Client.Typed_rpc.server_streaming rectangle ~f:Fun.id) + () + in + match stream with + | Ok (results, _ok) -> + Seq.iter + (fun f -> Format.printf "RESPONSE = {%a}" Route_guide.pp_feature f) + results + | Error e -> + failwith (Printf.sprintf "HTTP2 error: %s" (H2.Status.to_string e)) + +(* $MDX part-end *) +(* $MDX part-begin=client-random-point *) +let random_point () : Route_guide.point = + let latitude = (Random.int 180 - 90) * 10000000 |> Int32.of_int in + let longitude = (Random.int 360 - 180) * 10000000 |> Int32.of_int in + Route_guide.default_point ~latitude ~longitude () + +(* $MDX part-end *) +(* $MDX part-begin=client-record-route *) +let run_record_route connection = + let points = + Random.int 100 + |> Seq.unfold (function 0 -> None | x -> Some (random_point (), x - 1)) + in + + let response = + Client.Typed_rpc.call + (Grpc_protoc.Client_rpc.client_streaming + Route_guide.RouteGuide.Client.recordRoute) + ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) + ~handler: + (Client.Typed_rpc.client_streaming ~f:(fun f response -> + (* Stream points to server. *) + Seq.iter (fun point -> Seq.write f point) points; + + (* Signal we have finished sending points. *) + Seq.close_writer f; + + (* Decode RouteSummary responses. *) + Eio.Promise.await response |> function + | Some summary -> summary + | None -> failwith (Printf.sprintf "No RouteSummary received."))) + () + in + match response with + | Ok (result, _ok) -> + Format.printf "SUMMARY = {%a}" Route_guide.pp_route_summary result + | Error e -> + failwith (Printf.sprintf "HTTP2 error: %s" (H2.Status.to_string e)) + +(* $MDX part-end *) +(* $MDX part-begin=client-route-chat-1 *) +let run_route_chat clock connection = + (* Generate locations. *) + let location_count = 5 in + Printf.printf "Generating %i locations\n" location_count; + let route_notes = + location_count + |> Seq.unfold (function + | 0 -> None + | x -> + Some + ( Route_guide.default_route_note + ~location:(random_point () |> Option.some) + ~message:(Printf.sprintf "Random Message %i" x) + (), + x - 1 )) + in + (* $MDX part-end *) + (* $MDX part-begin=client-route-chat-2 *) + let rec go writer reader notes = + match Seq.uncons notes with + | None -> + Seq.close_writer writer (* Signal no more notes from the client. *) + | Some (route_note, xs) -> ( + Seq.write writer route_note; + + (* Yield and sleep, waiting for server reply. *) + Eio.Time.sleep clock 1.0; + Eio.Fiber.yield (); + + match Seq.uncons reader with + | None -> failwith "Expecting response" + | Some (route_note, reader') -> + Format.printf "NOTE = {%s}\n" + (Route_guide.show_route_note route_note); + go writer reader' xs) + in + let result = + Client.Typed_rpc.call + (Grpc_protoc.Client_rpc.bidirectional_streaming + Route_guide.RouteGuide.Client.routeChat) + ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) + ~handler: + (Client.Typed_rpc.bidirectional_streaming ~f:(fun writer reader -> + go writer reader route_notes)) + () + in + match result with + | Ok ((), _ok) -> () + | Error e -> + failwith (Printf.sprintf "HTTP2 error: %s" (H2.Status.to_string e)) + +(* $MDX part-end *) +(* $MDX part-begin=client-main *) + +let main env = + let port = "8080" in + let host = "localhost" in + let clock = Eio.Stdenv.clock env in + let network = Eio.Stdenv.net env in + let () = Random.self_init () in + + let run sw = + let connection = client ~sw host port network in + + Printf.printf "*** SIMPLE RPC ***\n"; + let request = + Route_guide.default_point ~latitude:409146138l ~longitude:(-746188906l) () + in + let result = call_get_feature connection request in + + Printf.printf "\n*** SERVER STREAMING ***\n"; + print_features connection; + + Printf.printf "\n*** CLIENT STREAMING ***\n"; + run_record_route connection; + + Printf.printf "\n*** BIDIRECTIONAL STREAMING ***\n"; + run_route_chat clock connection; + + Eio.Promise.await (H2_eio.Client.shutdown connection); + result + in + + Eio.Switch.run run + +let () = Eio_main.run main + +(* $MDX part-end *) diff --git a/examples/routeguide-protoc/src/dune b/examples/routeguide-protoc/src/dune new file mode 100644 index 0000000..b0b91be --- /dev/null +++ b/examples/routeguide-protoc/src/dune @@ -0,0 +1,14 @@ +(executables + (names server client) + (package grpc-examples) + (public_names routeguide-protoc-server routeguide-protoc-client) + (libraries + grpc-eio + grpc-protoc + eio_main + h2-eio + routeguide_protoc + yojson + ppx_deriving_yojson.runtime) + (preprocess + (pps ppx_deriving_yojson ppx_deriving.show ppx_deriving.eq))) diff --git a/examples/routeguide-protoc/src/server.ml b/examples/routeguide-protoc/src/server.ml new file mode 100644 index 0000000..3c18fb3 --- /dev/null +++ b/examples/routeguide-protoc/src/server.ml @@ -0,0 +1,245 @@ +open Grpc_eio +module Route_guide = Routeguide_protoc.Route_guide + +(* Derived data types to make reading JSON data easier. *) +type location = { latitude : int; longitude : int } [@@deriving yojson] +type feature = { location : location; name : string } [@@deriving yojson] +type feature_list = feature list [@@deriving yojson] + +(* This will act as a master state that the server is serving over RPC. *) +type t = { features : Route_guide.feature list } + +module RouteNotesMap = Hashtbl.Make (struct + type t = Route_guide.point + + let equal = Route_guide.equal_point + let hash s = Hashtbl.hash s +end) + +(** Load route_guide data from a JSON file. *) +let load_features path : Route_guide.feature list = + let json = Yojson.Safe.from_file path in + match feature_list_of_yojson json with + | Ok v -> + List.map + (fun feature -> + Route_guide.default_feature ~name:feature.name + ~location: + (Route_guide.default_point + ~longitude:(feature.location.longitude |> Int32.of_int) + ~latitude:(feature.location.latitude |> Int32.of_int) + () + |> Option.some) + ()) + v + | Error err -> failwith err + +let in_range (point : Route_guide.point) (rect : Route_guide.rectangle) : bool = + let lo = Option.get rect.lo in + let hi = Option.get rect.hi in + + let left = Int32.min lo.longitude hi.longitude in + let right = Int32.max lo.longitude hi.longitude in + let top = Int32.max lo.latitude hi.latitude in + let bottom = Int32.min lo.latitude hi.latitude in + + point.longitude >= left && point.longitude <= right + && point.latitude >= bottom && point.latitude <= top + +let pi = 4. *. atan 1. +let radians_of_degrees = ( *. ) (pi /. 180.) + +(* Calculates the distance between two points using the "haversine" formula. *) +(* This code was taken from http://www.movable-type.co.uk/scripts/latlong.html. *) +let calc_distance (p1 : Route_guide.point) (p2 : Route_guide.point) : int = + let cord_factor = 1e7 in + let r = 6_371_000.0 in + (* meters *) + let lat1 = Int32.to_float p1.latitude /. cord_factor in + let lat2 = Int32.to_float p2.latitude /. cord_factor in + let lng1 = Int32.to_float p1.longitude /. cord_factor in + let lng2 = Int32.to_float p2.longitude /. cord_factor in + + let lat_rad1 = radians_of_degrees lat1 in + let lat_rad2 = radians_of_degrees lat2 in + + let delta_lat = radians_of_degrees (lat2 -. lat1) in + let delta_lng = radians_of_degrees (lng2 -. lng1) in + + let a = + (sin (delta_lat /. 2.0) *. sin (delta_lat /. 2.0)) + +. cos lat_rad1 *. cos lat_rad2 + *. sin (delta_lng /. 2.0) + *. sin (delta_lng /. 2.0) + in + let c = 2.0 *. atan2 (sqrt a) (sqrt (1.0 -. a)) in + Float.to_int (r *. c) + +(* $MDX part-begin=server-get-feature *) +let get_feature (t : t) rpc = + Grpc_eio.Server.Typed_rpc.unary (Grpc_protoc.Server_rpc.unary rpc) + ~f:(fun point -> + Eio.traceln "GetFeature = {:%a}" Route_guide.pp_point point; + + (* Lookup the feature and if found return it. *) + let feature = + List.find_opt + (fun (f : Route_guide.feature) -> + match (f.location, point) with + | Some p1, p2 -> Route_guide.equal_point p1 p2 + | _, _ -> false) + t.features + in + Eio.traceln "Found feature %s" + (feature + |> Option.map Route_guide.show_feature + |> Option.value ~default:"Missing"); + match feature with + | Some feature -> (Grpc.Status.(v OK), Some feature) + | None -> + (* No feature was found, return an unnamed feature. *) + ( Grpc.Status.(v OK), + Some (Route_guide.default_feature ~location:(Some point) ()) )) + +(* $MDX part-end *) +(* $MDX part-begin=server-list-features *) +let list_features (t : t) rpc = + Grpc_eio.Server.Typed_rpc.server_streaming + (Grpc_protoc.Server_rpc.server_streaming rpc) ~f:(fun rectangle f -> + (* Lookup and reply with features found. *) + let () = + List.iter + (fun (feature : Route_guide.feature) -> + if in_range (Option.get feature.location) rectangle then f feature + else ()) + t.features + in + Grpc.Status.(v OK)) + +(* $MDX part-end *) +(* $MDX part-begin=server-record-route *) +let record_route (t : t) (clock : _ Eio.Time.clock) rpc = + Grpc_eio.Server.Typed_rpc.client_streaming + (Grpc_protoc.Server_rpc.client_streaming rpc) + ~f:(fun (stream : Route_guide.point Seq.t) -> + Eio.traceln "RecordRoute"; + + let last_point = ref None in + let start = Eio.Time.now clock in + + let point_count, feature_count, distance = + Seq.fold_left + (fun (point_count, feature_count, distance) point -> + Eio.traceln " ==> Point = {%s}" (Route_guide.show_point point); + + (* Increment the point count *) + let point_count = point_count + 1 in + + (* Find features *) + let feature_count = + List.find_all + (fun (feature : Route_guide.feature) -> + Route_guide.equal_point (Option.get feature.location) point) + t.features + |> fun x -> List.length x + feature_count + in + + (* Calculate the distance *) + let distance = + match !last_point with + | Some last_point -> calc_distance last_point point + | None -> distance + in + last_point := Some point; + (point_count, feature_count, distance)) + (0, 0, 0) stream + in + let stop = Eio.Time.now clock in + let elapsed_time = int_of_float (stop -. start) in + let summary = + Route_guide.default_route_summary + ~point_count:(point_count |> Int32.of_int) + ~feature_count:(feature_count |> Int32.of_int) + ~distance:(distance |> Int32.of_int) + ~elapsed_time:(elapsed_time |> Int32.of_int) + () + in + Eio.traceln "RecordRoute exit\n"; + (Grpc.Status.(v OK), Some summary)) + +(* $MDX part-end *) +(* $MDX part-begin=server-route-chat *) +let route_chat (_ : t) rpc = + Grpc_eio.Server.Typed_rpc.bidirectional_streaming + (Grpc_protoc.Server_rpc.bidirectional_streaming rpc) + ~f:(fun + (stream : Route_guide.route_note Seq.t) + (f : Route_guide.route_note -> unit) + -> + Printf.printf "RouteChat\n"; + + Seq.iter + (fun note -> + Printf.printf " ==> Note = {%s}\n" (Route_guide.show_route_note note); + f note) + stream; + + Printf.printf "RouteChat exit\n"; + Grpc.Status.(v OK)) + +(* $MDX part-end *) +(* $MDX part-begin=server-grpc *) +let server t clock = + Route_guide.RouteGuide.Server.make ~getFeature:(get_feature t) + ~listFeatures:(list_features t) ~recordRoute:(record_route t clock) + ~routeChat:(route_chat t) () + |> Grpc_protoc.handlers |> Server.Typed_rpc.server + +(* $MDX part-end *) +let connection_handler server ~sw = + let error_handler client_address ?request:_ _error start_response = + Eio.traceln "Error in request from:%a" Eio.Net.Sockaddr.pp client_address; + let response_body = start_response H2.Headers.empty in + H2.Body.Writer.write_string response_body + "There was an error handling your request.\n"; + H2.Body.Writer.close response_body + in + let request_handler _client_address request_descriptor = + Eio.Fiber.fork ~sw (fun () -> + Grpc_eio.Server.handle_request server request_descriptor) + in + fun socket addr -> + H2_eio.Server.create_connection_handler ?config:None ~request_handler + ~error_handler addr socket ~sw + +(* $MDX part-begin=server-main *) +let serve t env = + let port = 8080 in + let net = Eio.Stdenv.net env in + let clock = Eio.Stdenv.clock env in + let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in + Eio.Switch.run @@ fun sw -> + let handler = connection_handler ~sw (server t clock) in + let server_socket = + Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 addr + in + let rec listen () = + Eio.Net.accept_fork ~sw server_socket + ~on_error:(fun exn -> Eio.traceln "%s" (Printexc.to_string exn)) + handler; + listen () + in + Eio.traceln "Listening on port %i for grpc requests\n" port; + listen () + +let () = + let path = + if Array.length Sys.argv > 1 then Sys.argv.(1) + else failwith "Path to datafile required." + in + + (* Load features. *) + let t = { features = load_features path } in + + Eio_main.run (serve t) +(* $MDX part-end *) diff --git a/examples/routeguide-tutorial.md b/examples/routeguide-tutorial.md index 96128be..b99e3b3 100644 --- a/examples/routeguide-tutorial.md +++ b/examples/routeguide-tutorial.md @@ -192,20 +192,10 @@ The individual service functions from our proto definition are implemented using ```ocaml -let route_guide_service clock = - Server.Service.( - v () - |> add_rpc ~name:"GetFeature" ~rpc:(Unary get_feature) - |> add_rpc ~name:"ListFeatures" ~rpc:(Server_streaming list_features) - |> add_rpc ~name:"RecordRoute" ~rpc:(Client_streaming (record_route clock)) - |> add_rpc ~name:"RouteChat" ~rpc:(Bidirectional_streaming route_chat) - |> handle_request) - -let server clock = - Server.( - v () - |> add_service ~name:"routeguide.RouteGuide" - ~service:(route_guide_service clock)) +let server t clock = + Server.Typed_rpc.server + (Grpc_protoc_plugin.handlers + [ get_feature t; list_features t; record_route t clock; route_chat t ]) ``` ### Simple RPC @@ -214,36 +204,28 @@ Let's look at the simplest type first, `GetFeature` which just gets a `Point` fr ```ocaml -let get_feature (buffer : string) = - let decode, encode = Service.make_service_functions RouteGuide.getFeature in - (* Decode the request. *) - let point = - Reader.create buffer |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" (Result.show_error e)) - in - Eio.traceln "GetFeature = {:%s}" (Point.show point); - - (* Lookup the feature and if found return it. *) - let feature = - List.find_opt - (fun (f : Feature.t) -> - match (f.location, point) with - | Some p1, p2 -> Point.equal p1 p2 - | _, _ -> false) - !features - in - Eio.traceln "Found feature %s" - (feature |> Option.map Feature.show |> Option.value ~default:"Missing"); - match feature with - | Some feature -> - (Grpc.Status.(v OK), Some (feature |> encode |> Writer.contents)) - | None -> - (* No feature was found, return an unnamed feature. *) - ( Grpc.Status.(v OK), - Some (Feature.make ~location:point () |> encode |> Writer.contents) ) +let get_feature (t : t) = + Grpc_eio.Server.Typed_rpc.unary + (Grpc_protoc_plugin.Server_rpc.unary (module RouteGuide.GetFeature)) + ~f:(fun point -> + Eio.traceln "GetFeature = {:%s}" (Point.show point); + + (* Lookup the feature and if found return it. *) + let feature = + List.find_opt + (fun (f : Feature.t) -> + match (f.location, point) with + | Some p1, p2 -> Point.equal p1 p2 + | _, _ -> false) + t.features + in + Eio.traceln "Found feature %s" + (feature |> Option.map Feature.show |> Option.value ~default:"Missing"); + match feature with + | Some feature -> (Grpc.Status.(v OK), Some feature) + | None -> + (* No feature was found, return an unnamed feature. *) + (Grpc.Status.(v OK), Some (Feature.make ~location:point ()))) ``` The method is passed the client's `Point` protocol buffer request. It decodes the request into a `Point.t` and uses that to look up the feature. It returns a `Feature` protocol buffer object with the response information indicating the successful response, based on the feature found or an unnamed default feature. @@ -254,27 +236,20 @@ Now let's look at one of our streaming RPCs. `list_features` is a server-side st ```ocaml -let list_features (buffer : string) (f : string -> unit) = - (* Decode request. *) - let decode, encode = Service.make_service_functions RouteGuide.listFeatures in - let rectangle = - Reader.create buffer |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" (Result.show_error e)) - in - - (* Lookup and reply with features found. *) - let () = - List.iter - (fun (feature : Feature.t) -> - if in_range (Option.get feature.location) rectangle then - encode feature |> Writer.contents |> f - else ()) - !features - in - Grpc.Status.(v OK) +let list_features (t : t) = + Grpc_eio.Server.Typed_rpc.server_streaming + (Grpc_protoc_plugin.Server_rpc.server_streaming + (module RouteGuide.ListFeatures)) + ~f:(fun rectangle f -> + (* Lookup and reply with features found. *) + let () = + List.iter + (fun (feature : Feature.t) -> + if in_range (Option.get feature.location) rectangle then f feature + else ()) + t.features + in + Grpc.Status.(v OK)) ``` Like `get_feature` `list_feature`'s input is a single message. A `Rectangle` that is decoded from a string buffer. The `f: (string -> unit)` function is for writing the encoded responses back to the client. In the function we decode the request, lookup any matching features and stream them back to the client as we find them using `f`. Once we've looked at all the `features` we respond with an `OK` indicating the streaming has finished successfully. @@ -285,55 +260,50 @@ Now let's look at something a little more complicated: the client-side streaming ```ocaml -let record_route (clock : _ Eio.Time.clock) (stream : string Seq.t) = - Eio.traceln "RecordRoute"; - - let last_point = ref None in - let start = Eio.Time.now clock in - let decode, encode = Service.make_service_functions RouteGuide.recordRoute in - - let point_count, feature_count, distance = - Seq.fold_left - (fun (point_count, feature_count, distance) i -> - let point = - Reader.create i |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in - Eio.traceln " ==> Point = {%s}" (Point.show point); - - (* Increment the point count *) - let point_count = point_count + 1 in - - (* Find features *) - let feature_count = - List.find_all - (fun (feature : Feature.t) -> - Point.equal (Option.get feature.location) point) - !features - |> fun x -> List.length x + feature_count - in - - (* Calculate the distance *) - let distance = - match !last_point with - | Some last_point -> calc_distance last_point point - | None -> distance - in - last_point := Some point; - (point_count, feature_count, distance)) - (0, 0, 0) stream - in - let stop = Eio.Time.now clock in - let elapsed_time = int_of_float (stop -. start) in - let summary = - RouteSummary.make ~point_count ~feature_count ~distance ~elapsed_time () - in - Eio.traceln "RecordRoute exit\n"; - (Grpc.Status.(v OK), Some (encode summary |> Writer.contents)) +let record_route (t : t) (clock : _ Eio.Time.clock) = + Grpc_eio.Server.Typed_rpc.client_streaming + (Grpc_protoc_plugin.Server_rpc.client_streaming + (module RouteGuide.RecordRoute)) + ~f:(fun (stream : Point.t Seq.t) -> + Eio.traceln "RecordRoute"; + + let last_point = ref None in + let start = Eio.Time.now clock in + + let point_count, feature_count, distance = + Seq.fold_left + (fun (point_count, feature_count, distance) point -> + Eio.traceln " ==> Point = {%s}" (Point.show point); + + (* Increment the point count *) + let point_count = point_count + 1 in + + (* Find features *) + let feature_count = + List.find_all + (fun (feature : Feature.t) -> + Point.equal (Option.get feature.location) point) + t.features + |> fun x -> List.length x + feature_count + in + + (* Calculate the distance *) + let distance = + match !last_point with + | Some last_point -> calc_distance last_point point + | None -> distance + in + last_point := Some point; + (point_count, feature_count, distance)) + (0, 0, 0) stream + in + let stop = Eio.Time.now clock in + let elapsed_time = int_of_float (stop -. start) in + let summary = + RouteSummary.make ~point_count ~feature_count ~distance ~elapsed_time () + in + Eio.traceln "RecordRoute exit\n"; + (Grpc.Status.(v OK), Some summary)) ``` ### Bidirectional streaming RPCs @@ -342,26 +312,21 @@ Finally, let's look at our bidirectional streaming RPC `route_chat`, which recei ```ocaml -let route_chat (stream : string Seq.t) (f : string -> unit) = - Printf.printf "RouteChat\n"; - - let decode, encode = Service.make_service_functions RouteGuide.routeChat in - Seq.iter - (fun i -> - let note = - Reader.create i |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in - Printf.printf " ==> Note = {%s}\n" (RouteNote.show note); - encode note |> Writer.contents |> f) - stream; +let route_chat (_ : t) = + Grpc_eio.Server.Typed_rpc.bidirectional_streaming + (Grpc_protoc_plugin.Server_rpc.bidirectional_streaming + (module RouteGuide.RouteChat)) + ~f:(fun (stream : RouteNote.t Seq.t) (f : RouteNote.t -> unit) -> + Printf.printf "RouteChat\n"; + + Seq.iter + (fun note -> + Printf.printf " ==> Note = {%s}\n" (RouteNote.show note); + f note) + stream; - Printf.printf "RouteChat exit\n"; - Grpc.Status.(v OK) + Printf.printf "RouteChat exit\n"; + Grpc.Status.(v OK)) ``` `route_chat` receives a `string Seq.t` of requests which it decodes, logs to stdout to show it has received the note, and then encodes again to send back to the client. Finally it responds with an `OK` indicating it has finished. The logic is we receive one `RouteNote` and respond directly with the same `RouteNote` using the `f` function supplied. @@ -372,13 +337,13 @@ Once we've implemented all our functions, we also need to startup a gRPC server ```ocaml -let serve server env = +let serve t env = let port = 8080 in let net = Eio.Stdenv.net env in let clock = Eio.Stdenv.clock env in let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in Eio.Switch.run @@ fun sw -> - let handler = connection_handler ~sw (server clock) in + let handler = connection_handler ~sw (server t clock) in let server_socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 addr in @@ -398,9 +363,9 @@ let () = in (* Load features. *) - features := load path; + let t = { features = load_features path } in - Eio_main.run (serve server) + Eio_main.run (serve t) ``` To handle requests we use `h2-lwt-unix`, an implementation of the HTTP/2 specification entirely in OCaml. What that means is we can swap in other h2 implementations like MirageOS to run in a Unikernel or Async to use JaneStreet's alternatve async implementation. Furthermore we can add TLS or SSL encryptionon to our HTTP/2 stack. @@ -437,23 +402,14 @@ Calling the simple RPC `get_feature` requires building up a `Client.call` repres ```ocaml let call_get_feature connection point = - let encode, decode = Service.make_client_functions RouteGuide.getFeature in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.unary (module RouteGuide.GetFeature)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: - (Client.Rpc.unary - (encode point |> Writer.contents) - ~f:(fun response -> - match response with - | Some response -> ( - Reader.create response |> decode |> function - | Ok feature -> feature - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) - | None -> Feature.make ())) + (Client.Typed_rpc.unary point ~f:(function + | Some feature -> feature + | None -> Feature.make ())) () in match response with @@ -474,26 +430,12 @@ let print_features connection = () in - let encode, decode = Service.make_client_functions RouteGuide.listFeatures in let stream = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.server_streaming + (module RouteGuide.ListFeatures)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) - ~handler: - (Client.Rpc.server_streaming - (encode rectangle |> Writer.contents) - ~f:(fun responses -> - let stream = - Seq.map - (fun str -> - Reader.create str |> decode |> function - | Ok feature -> feature - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) - responses - in - stream)) + ~handler:(Client.Typed_rpc.server_streaming rectangle ~f:Fun.id) () in match stream with @@ -526,30 +468,22 @@ let run_record_route connection = |> Seq.unfold (function 0 -> None | x -> Some (random_point (), x - 1)) in - let encode, decode = Service.make_client_functions RouteGuide.recordRoute in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.client_streaming + (module RouteGuide.RecordRoute)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: - (Client.Rpc.client_streaming ~f:(fun f response -> + (Client.Typed_rpc.client_streaming ~f:(fun f response -> (* Stream points to server. *) - Seq.iter - (fun point -> - encode point |> Writer.contents |> fun x -> Seq.write f x) - points; + Seq.iter (fun point -> Seq.write f point) points; (* Signal we have finished sending points. *) Seq.close_writer f; (* Decode RouteSummary responses. *) Eio.Promise.await response |> function - | Some str -> ( - Reader.create str |> decode |> function - | Ok feature -> feature - | Error err -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error err))) + | Some summary -> summary | None -> failwith (Printf.sprintf "No RouteSummary received."))) () in @@ -587,14 +521,12 @@ let run_route_chat clock connection = We start by generating a short sequence of locations, similar to how we did for `record_route`. ```ocaml - let encode, decode = Service.make_client_functions RouteGuide.routeChat in let rec go writer reader notes = match Seq.uncons notes with | None -> Seq.close_writer writer (* Signal no more notes from the client. *) | Some (route_note, xs) -> ( - encode route_note |> Writer.contents |> fun x -> - Seq.write writer x; + Seq.write writer route_note; (* Yield and sleep, waiting for server reply. *) Eio.Time.sleep clock 1.0; @@ -602,23 +534,17 @@ We start by generating a short sequence of locations, similar to how we did for match Seq.uncons reader with | None -> failwith "Expecting response" - | Some (response, reader') -> - let route_note = - Reader.create response |> decode |> function - | Ok route_note -> route_note - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in + | Some (route_note, reader') -> Printf.printf "NOTE = {%s}\n" (RouteNote.show route_note); go writer reader' xs) in let result = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.bidirectional_streaming + (module RouteGuide.RouteChat)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: - (Client.Rpc.bidirectional_streaming ~f:(fun writer reader -> + (Client.Typed_rpc.bidirectional_streaming ~f:(fun writer reader -> go writer reader route_notes)) () in diff --git a/examples/routeguide/src/client.ml b/examples/routeguide/src/client.ml index 47d8dba..94c8167 100644 --- a/examples/routeguide/src/client.ml +++ b/examples/routeguide/src/client.ml @@ -1,6 +1,5 @@ open Grpc_eio open Routeguide.Route_guide.Routeguide -open Ocaml_protoc_plugin (* $MDX part-begin=client-h2 *) let client ~sw host port network = @@ -20,23 +19,14 @@ let client ~sw host port network = (* $MDX part-end *) (* $MDX part-begin=client-get-feature *) let call_get_feature connection point = - let encode, decode = Service.make_client_functions RouteGuide.getFeature in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.unary (module RouteGuide.GetFeature)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: - (Client.Rpc.unary - (encode point |> Writer.contents) - ~f:(fun response -> - match response with - | Some response -> ( - Reader.create response |> decode |> function - | Ok feature -> feature - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) - | None -> Feature.make ())) + (Client.Typed_rpc.unary point ~f:(function + | Some feature -> feature + | None -> Feature.make ())) () in match response with @@ -53,26 +43,12 @@ let print_features connection = () in - let encode, decode = Service.make_client_functions RouteGuide.listFeatures in let stream = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.server_streaming + (module RouteGuide.ListFeatures)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) - ~handler: - (Client.Rpc.server_streaming - (encode rectangle |> Writer.contents) - ~f:(fun responses -> - let stream = - Seq.map - (fun str -> - Reader.create str |> decode |> function - | Ok feature -> feature - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) - responses - in - stream)) + ~handler:(Client.Typed_rpc.server_streaming rectangle ~f:Fun.id) () in match stream with @@ -98,30 +74,22 @@ let run_record_route connection = |> Seq.unfold (function 0 -> None | x -> Some (random_point (), x - 1)) in - let encode, decode = Service.make_client_functions RouteGuide.recordRoute in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.client_streaming + (module RouteGuide.RecordRoute)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: - (Client.Rpc.client_streaming ~f:(fun f response -> + (Client.Typed_rpc.client_streaming ~f:(fun f response -> (* Stream points to server. *) - Seq.iter - (fun point -> - encode point |> Writer.contents |> fun x -> Seq.write f x) - points; + Seq.iter (fun point -> Seq.write f point) points; (* Signal we have finished sending points. *) Seq.close_writer f; (* Decode RouteSummary responses. *) Eio.Promise.await response |> function - | Some str -> ( - Reader.create str |> decode |> function - | Ok feature -> feature - | Error err -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error err))) + | Some summary -> summary | None -> failwith (Printf.sprintf "No RouteSummary received."))) () in @@ -150,14 +118,12 @@ let run_route_chat clock connection = in (* $MDX part-end *) (* $MDX part-begin=client-route-chat-2 *) - let encode, decode = Service.make_client_functions RouteGuide.routeChat in let rec go writer reader notes = match Seq.uncons notes with | None -> Seq.close_writer writer (* Signal no more notes from the client. *) | Some (route_note, xs) -> ( - encode route_note |> Writer.contents |> fun x -> - Seq.write writer x; + Seq.write writer route_note; (* Yield and sleep, waiting for server reply. *) Eio.Time.sleep clock 1.0; @@ -165,23 +131,17 @@ let run_route_chat clock connection = match Seq.uncons reader with | None -> failwith "Expecting response" - | Some (response, reader') -> - let route_note = - Reader.create response |> decode |> function - | Ok route_note -> route_note - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in + | Some (route_note, reader') -> Printf.printf "NOTE = {%s}\n" (RouteNote.show route_note); go writer reader' xs) in let result = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.bidirectional_streaming + (module RouteGuide.RouteChat)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: - (Client.Rpc.bidirectional_streaming ~f:(fun writer reader -> + (Client.Typed_rpc.bidirectional_streaming ~f:(fun writer reader -> go writer reader route_notes)) () in diff --git a/examples/routeguide/src/dune b/examples/routeguide/src/dune index 9c5afaf..5da330a 100644 --- a/examples/routeguide/src/dune +++ b/examples/routeguide/src/dune @@ -4,9 +4,9 @@ (public_names routeguide-server routeguide-client) (libraries grpc-eio + grpc-protoc-plugin eio_main h2-eio - ocaml-protoc-plugin routeguide yojson ppx_deriving_yojson.runtime) diff --git a/examples/routeguide/src/server.ml b/examples/routeguide/src/server.ml index bfa30d9..557e323 100644 --- a/examples/routeguide/src/server.ml +++ b/examples/routeguide/src/server.ml @@ -1,13 +1,13 @@ open Grpc_eio open Routeguide.Route_guide.Routeguide -open Ocaml_protoc_plugin (* Derived data types to make reading JSON data easier. *) type location = { latitude : int; longitude : int } [@@deriving yojson] type feature = { location : location; name : string } [@@deriving yojson] type feature_list = feature list [@@deriving yojson] -let features : Feature.t list ref = ref [] +(* This will act as a master state that the server is serving over RPC. *) +type t = { features : Feature.t list } module RouteNotesMap = Hashtbl.Make (struct type t = Point.t @@ -17,7 +17,7 @@ module RouteNotesMap = Hashtbl.Make (struct end) (** Load route_guide data from a JSON file. *) -let load path : Feature.t list = +let load_features path : Feature.t list = let json = Yojson.Safe.from_file path in match feature_list_of_yojson json with | Ok v -> @@ -73,152 +73,117 @@ let calc_distance (p1 : Point.t) (p2 : Point.t) : int = Float.to_int (r *. c) (* $MDX part-begin=server-get-feature *) -let get_feature (buffer : string) = - let decode, encode = Service.make_service_functions RouteGuide.getFeature in - (* Decode the request. *) - let point = - Reader.create buffer |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" (Result.show_error e)) - in - Eio.traceln "GetFeature = {:%s}" (Point.show point); - - (* Lookup the feature and if found return it. *) - let feature = - List.find_opt - (fun (f : Feature.t) -> - match (f.location, point) with - | Some p1, p2 -> Point.equal p1 p2 - | _, _ -> false) - !features - in - Eio.traceln "Found feature %s" - (feature |> Option.map Feature.show |> Option.value ~default:"Missing"); - match feature with - | Some feature -> - (Grpc.Status.(v OK), Some (feature |> encode |> Writer.contents)) - | None -> - (* No feature was found, return an unnamed feature. *) - ( Grpc.Status.(v OK), - Some (Feature.make ~location:point () |> encode |> Writer.contents) ) +let get_feature (t : t) = + Grpc_eio.Server.Typed_rpc.unary + (Grpc_protoc_plugin.Server_rpc.unary (module RouteGuide.GetFeature)) + ~f:(fun point -> + Eio.traceln "GetFeature = {:%s}" (Point.show point); + + (* Lookup the feature and if found return it. *) + let feature = + List.find_opt + (fun (f : Feature.t) -> + match (f.location, point) with + | Some p1, p2 -> Point.equal p1 p2 + | _, _ -> false) + t.features + in + Eio.traceln "Found feature %s" + (feature |> Option.map Feature.show |> Option.value ~default:"Missing"); + match feature with + | Some feature -> (Grpc.Status.(v OK), Some feature) + | None -> + (* No feature was found, return an unnamed feature. *) + (Grpc.Status.(v OK), Some (Feature.make ~location:point ()))) (* $MDX part-end *) (* $MDX part-begin=server-list-features *) -let list_features (buffer : string) (f : string -> unit) = - (* Decode request. *) - let decode, encode = Service.make_service_functions RouteGuide.listFeatures in - let rectangle = - Reader.create buffer |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" (Result.show_error e)) - in - - (* Lookup and reply with features found. *) - let () = - List.iter - (fun (feature : Feature.t) -> - if in_range (Option.get feature.location) rectangle then - encode feature |> Writer.contents |> f - else ()) - !features - in - Grpc.Status.(v OK) +let list_features (t : t) = + Grpc_eio.Server.Typed_rpc.server_streaming + (Grpc_protoc_plugin.Server_rpc.server_streaming + (module RouteGuide.ListFeatures)) + ~f:(fun rectangle f -> + (* Lookup and reply with features found. *) + let () = + List.iter + (fun (feature : Feature.t) -> + if in_range (Option.get feature.location) rectangle then f feature + else ()) + t.features + in + Grpc.Status.(v OK)) (* $MDX part-end *) (* $MDX part-begin=server-record-route *) -let record_route (clock : _ Eio.Time.clock) (stream : string Seq.t) = - Eio.traceln "RecordRoute"; - - let last_point = ref None in - let start = Eio.Time.now clock in - let decode, encode = Service.make_service_functions RouteGuide.recordRoute in - - let point_count, feature_count, distance = - Seq.fold_left - (fun (point_count, feature_count, distance) i -> - let point = - Reader.create i |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in - Eio.traceln " ==> Point = {%s}" (Point.show point); - - (* Increment the point count *) - let point_count = point_count + 1 in - - (* Find features *) - let feature_count = - List.find_all - (fun (feature : Feature.t) -> - Point.equal (Option.get feature.location) point) - !features - |> fun x -> List.length x + feature_count - in - - (* Calculate the distance *) - let distance = - match !last_point with - | Some last_point -> calc_distance last_point point - | None -> distance - in - last_point := Some point; - (point_count, feature_count, distance)) - (0, 0, 0) stream - in - let stop = Eio.Time.now clock in - let elapsed_time = int_of_float (stop -. start) in - let summary = - RouteSummary.make ~point_count ~feature_count ~distance ~elapsed_time () - in - Eio.traceln "RecordRoute exit\n"; - (Grpc.Status.(v OK), Some (encode summary |> Writer.contents)) +let record_route (t : t) (clock : _ Eio.Time.clock) = + Grpc_eio.Server.Typed_rpc.client_streaming + (Grpc_protoc_plugin.Server_rpc.client_streaming + (module RouteGuide.RecordRoute)) + ~f:(fun (stream : Point.t Seq.t) -> + Eio.traceln "RecordRoute"; + + let last_point = ref None in + let start = Eio.Time.now clock in + + let point_count, feature_count, distance = + Seq.fold_left + (fun (point_count, feature_count, distance) point -> + Eio.traceln " ==> Point = {%s}" (Point.show point); + + (* Increment the point count *) + let point_count = point_count + 1 in + + (* Find features *) + let feature_count = + List.find_all + (fun (feature : Feature.t) -> + Point.equal (Option.get feature.location) point) + t.features + |> fun x -> List.length x + feature_count + in + + (* Calculate the distance *) + let distance = + match !last_point with + | Some last_point -> calc_distance last_point point + | None -> distance + in + last_point := Some point; + (point_count, feature_count, distance)) + (0, 0, 0) stream + in + let stop = Eio.Time.now clock in + let elapsed_time = int_of_float (stop -. start) in + let summary = + RouteSummary.make ~point_count ~feature_count ~distance ~elapsed_time () + in + Eio.traceln "RecordRoute exit\n"; + (Grpc.Status.(v OK), Some summary)) (* $MDX part-end *) (* $MDX part-begin=server-route-chat *) -let route_chat (stream : string Seq.t) (f : string -> unit) = - Printf.printf "RouteChat\n"; - - let decode, encode = Service.make_service_functions RouteGuide.routeChat in - Seq.iter - (fun i -> - let note = - Reader.create i |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in - Printf.printf " ==> Note = {%s}\n" (RouteNote.show note); - encode note |> Writer.contents |> f) - stream; - - Printf.printf "RouteChat exit\n"; - Grpc.Status.(v OK) +let route_chat (_ : t) = + Grpc_eio.Server.Typed_rpc.bidirectional_streaming + (Grpc_protoc_plugin.Server_rpc.bidirectional_streaming + (module RouteGuide.RouteChat)) + ~f:(fun (stream : RouteNote.t Seq.t) (f : RouteNote.t -> unit) -> + Printf.printf "RouteChat\n"; + + Seq.iter + (fun note -> + Printf.printf " ==> Note = {%s}\n" (RouteNote.show note); + f note) + stream; + + Printf.printf "RouteChat exit\n"; + Grpc.Status.(v OK)) (* $MDX part-end *) (* $MDX part-begin=server-grpc *) -let route_guide_service clock = - Server.Service.( - v () - |> add_rpc ~name:"GetFeature" ~rpc:(Unary get_feature) - |> add_rpc ~name:"ListFeatures" ~rpc:(Server_streaming list_features) - |> add_rpc ~name:"RecordRoute" ~rpc:(Client_streaming (record_route clock)) - |> add_rpc ~name:"RouteChat" ~rpc:(Bidirectional_streaming route_chat) - |> handle_request) - -let server clock = - Server.( - v () - |> add_service ~name:"routeguide.RouteGuide" - ~service:(route_guide_service clock)) +let server t clock = + Server.Typed_rpc.server + (Grpc_protoc_plugin.handlers + [ get_feature t; list_features t; record_route t clock; route_chat t ]) (* $MDX part-end *) let connection_handler server ~sw = @@ -238,13 +203,13 @@ let connection_handler server ~sw = ~error_handler addr socket ~sw (* $MDX part-begin=server-main *) -let serve server env = +let serve t env = let port = 8080 in let net = Eio.Stdenv.net env in let clock = Eio.Stdenv.clock env in let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in Eio.Switch.run @@ fun sw -> - let handler = connection_handler ~sw (server clock) in + let handler = connection_handler ~sw (server t clock) in let server_socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 addr in @@ -264,7 +229,7 @@ let () = in (* Load features. *) - features := load path; + let t = { features = load_features path } in - Eio_main.run (serve server) + Eio_main.run (serve t) (* $MDX part-end *) diff --git a/grpc-protoc-plugin.opam b/grpc-protoc-plugin.opam new file mode 100644 index 0000000..1ddae22 --- /dev/null +++ b/grpc-protoc-plugin.opam @@ -0,0 +1,40 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "An implementation of gRPC using ocaml-protoc-plugin" +description: + "Functionality for building gRPC services and rpcs with `ocaml-protoc-plugin`" +maintainer: ["Daniel Quernheim "] +authors: [ + "Andrew Jeffery " + "Daniel Quernheim " + "Michael Bacarella " + "Sven Anderson " + "Tim McGilchrist " + "Wojtek Czekalski " + "dimitris.mostrous " +] +license: "BSD-3-Clause" +homepage: "https://github.com/dialohq/ocaml-grpc" +doc: "https://dialohq.github.io/ocaml-grpc" +bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" +depends: [ + "dune" {>= "3.7"} + "grpc" {= version} + "ocaml-protoc-plugin" {>= "4.5"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git" diff --git a/grpc-protoc.opam b/grpc-protoc.opam new file mode 100644 index 0000000..fde3dfd --- /dev/null +++ b/grpc-protoc.opam @@ -0,0 +1,42 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "An implementation of gRPC using ocaml-protoc" +description: + "Functionality for building gRPC services and rpcs with `ocaml-protoc`" +maintainer: ["Daniel Quernheim "] +authors: [ + "Andrew Jeffery " + "Daniel Quernheim " + "Michael Bacarella " + "Sven Anderson " + "Tim McGilchrist " + "Wojtek Czekalski " + "dimitris.mostrous " +] +license: "BSD-3-Clause" +homepage: "https://github.com/dialohq/ocaml-grpc" +doc: "https://dialohq.github.io/ocaml-grpc" +bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" +depends: [ + "dune" {>= "3.7"} + "grpc" {= version} + "ocaml-protoc" {>= "3.0"} + "pbrt" {>= "3.0"} + "pbrt_services" {>= "3.0"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git" diff --git a/lib/grpc-eio/client.ml b/lib/grpc-eio/client.ml index 4efe5cd..00e93c8 100644 --- a/lib/grpc-eio/client.ml +++ b/lib/grpc-eio/client.ml @@ -104,3 +104,96 @@ module Rpc = struct let response = Seq.read_and_exhaust responses in f response) end + +module Typed_rpc = struct + type ('request, 'request_mode, 'response, 'response_mode, 'a) handler = + ('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t -> + H2.Body.Writer.t -> + H2.Body.Reader.t -> + 'a + + let unary (type request response) ~f (request : request) + (rpc : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t) = + let request = rpc.encode_request request in + let f response = + let response = response |> Option.map rpc.decode_response in + f response + in + Rpc.unary ~f request + + let server_streaming (type request response) ~f (request : request) + (rpc : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t) = + let request = rpc.encode_request request in + let f responses = + let responses = Seq.map rpc.decode_response responses in + f responses + in + Rpc.server_streaming ~f request + + let client_streaming (type request response) ~f + (rpc : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t) = + let f requests response = + let requests_reader, requests' = Seq.create_reader_writer () in + let response', response_u = Eio.Promise.create () in + Eio.Switch.run @@ fun sw -> + Eio.Fiber.fork ~sw (fun () -> + Eio.Fiber.both + (fun () -> + let response = + Eio.Promise.await response |> Option.map rpc.decode_response + in + Eio.Promise.resolve response_u response) + (fun () -> + Seq.iter + (fun request -> Seq.write requests (rpc.encode_request request)) + requests_reader; + Seq.close_writer requests)); + f requests' response' + in + Rpc.client_streaming ~f + + let bidirectional_streaming (type request response) ~f + (rpc : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t) = + let f requests responses = + let requests_reader, requests' = Seq.create_reader_writer () in + let responses' = Seq.map rpc.decode_response responses in + Eio.Switch.run @@ fun sw -> + Eio.Fiber.fork ~sw (fun () -> + Seq.iter + (fun request -> Seq.write requests (rpc.encode_request request)) + requests_reader; + Seq.close_writer requests); + f requests' responses' + in + Rpc.bidirectional_streaming ~f + + let call (type request request_mode response response_mode a) + (rpc : + (request, request_mode, response, response_mode) Grpc.Rpc.Client_rpc.t) + ?scheme + ~(handler : (request, request_mode, response, response_mode, a) handler) + ~do_request ?headers () = + call + ~service:(Grpc.Rpc.Service_spec.packaged_service_name rpc.service_spec) + ~rpc:rpc.rpc_name ?scheme ~handler:(handler rpc) ~do_request ?headers () +end diff --git a/lib/grpc-eio/client.mli b/lib/grpc-eio/client.mli index 745d33c..45396f1 100644 --- a/lib/grpc-eio/client.mli +++ b/lib/grpc-eio/client.mli @@ -46,3 +46,67 @@ val call : (** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given by [service] and [rpc] using the [do_request] function. The [handler] is called when this request is set up to send and receive data. *) + +module Typed_rpc : sig + (** This is an experimental API to call RPC from the client side. Compared to + {Rpc}, this interface will: + + - handle the coding/decoding of messages for you under the hood; + - use the service and RPC names provided by the rpc specification to + call the services with their expected names. *) + + type ('request, 'request_mode, 'response, 'response_mode, 'a) handler + + (** The next functions are meant to be used by the client to handle + call to RPCs. *) + + val bidirectional_streaming : + f:('request Seq.writer -> 'response Seq.t -> 'a) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + 'a ) + handler + + val client_streaming : + f:('request Seq.writer -> 'response option Eio.Promise.t -> 'a) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + 'a ) + handler + + val server_streaming : + f:('response Seq.t -> 'a) -> + 'request -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + 'a ) + handler + + val unary : + f:('response option -> 'a) -> + 'request -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + 'a ) + handler + + val call : + ('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t -> + ?scheme:string -> + handler:('request, 'request_mode, 'response, 'response_mode, 'a) handler -> + do_request:do_request -> + ?headers:H2.Headers.t -> + unit -> + ('a * Grpc.Status.t, H2.Status.t) result + (** The rpc specification must be provided as it is used to handle + coding/decoding of messages as well as allows referring to the service + and RPC names specified in the [.proto] file. *) +end diff --git a/lib/grpc-eio/server.ml b/lib/grpc-eio/server.ml index ffd850c..0de65b0 100644 --- a/lib/grpc-eio/server.ml +++ b/lib/grpc-eio/server.ml @@ -127,3 +127,126 @@ module Service = struct | None -> respond_with `Not_found else respond_with `Not_found end + +module Typed_rpc = struct + type server = t + + type ('request, 'response) unary = + 'request -> Grpc.Status.t * 'response option + + type ('request, 'response) client_streaming = + 'request Seq.t -> Grpc.Status.t * 'response option + + type ('request, 'response) server_streaming = + 'request -> ('response -> unit) -> Grpc.Status.t + + type ('request, 'response) bidirectional_streaming = + 'request Seq.t -> ('response -> unit) -> Grpc.Status.t + + type 'service_spec t = + | T : { + rpc_spec : + ( 'request, + 'request_mode, + 'response, + 'response_mode, + 'service_spec ) + Grpc.Rpc.Server_rpc.t; + rpc_impl : Rpc.t; + } + -> 'service_spec t + + let rec make_handlers handlers = + match (handlers : _ Grpc.Rpc.Handlers.t) with + | a :: tl -> List.concat (make_handlers a :: List.map make_handlers tl) + | Handlers { handlers = ts } -> ts + | With_service_spec { service_spec; handlers = ts } -> + List.map + (fun (T t) -> + T + { + t with + rpc_spec = { t.rpc_spec with service_spec = Some service_spec }; + }) + ts + + let server handlers : server = + let handlers = make_handlers handlers in + List.fold_left + (fun map (T t as packed) -> + let service_name = + match t.rpc_spec.service_spec with + | Some service_spec -> + Grpc.Rpc.Service_spec.packaged_service_name service_spec + in + let rpc_impl = + ServiceMap.find_opt service_name map |> Option.value ~default:[] + in + ServiceMap.add service_name (packed :: rpc_impl) map) + ServiceMap.empty handlers + |> ServiceMap.map (fun ts -> + let service = + List.fold_left + (fun acc (T t) -> + Service.add_rpc ~name:t.rpc_spec.rpc_name ~rpc:t.rpc_impl acc) + (Service.v ()) ts + in + Service.handle_request service) + + let unary (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.unary, + _ ) + Grpc.Rpc.Server_rpc.t) ~f:handler = + let handler buffer = + let status, response = handler (rpc_spec.decode_request buffer) in + (status, Option.map rpc_spec.encode_response response) + in + T { rpc_spec; rpc_impl = Rpc.Unary handler } + + let server_streaming (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.stream, + _ ) + Grpc.Rpc.Server_rpc.t) ~f:handler = + let handler buffer f = + handler (rpc_spec.decode_request buffer) (fun response -> + f (rpc_spec.encode_response response)) + in + T { rpc_spec; rpc_impl = Rpc.Server_streaming handler } + + let client_streaming (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.unary, + _ ) + Grpc.Rpc.Server_rpc.t) ~f:handler = + let handler requests = + let requests = Seq.map rpc_spec.decode_request requests in + let status, response = handler requests in + (status, Option.map rpc_spec.encode_response response) + in + T { rpc_spec; rpc_impl = Rpc.Client_streaming handler } + + let bidirectional_streaming (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.stream, + _ ) + Grpc.Rpc.Server_rpc.t) ~f:handler = + let handler requests f = + let requests = Seq.map rpc_spec.decode_request requests in + handler requests (fun response -> f (rpc_spec.encode_response response)) + in + T { rpc_spec; rpc_impl = Rpc.Bidirectional_streaming handler } +end diff --git a/lib/grpc-eio/server.mli b/lib/grpc-eio/server.mli index 40961f5..76c000e 100644 --- a/lib/grpc-eio/server.mli +++ b/lib/grpc-eio/server.mli @@ -48,3 +48,88 @@ module Service : sig val handle_request : t -> H2.Reqd.t -> unit (** [handle_request t reqd] handles routing [reqd] to the correct rpc if available in [t]. *) end + +module Typed_rpc : sig + (** This is an experimental API to build RPCs on the server side. Compared to + {Rpc}, this interface will: + + - handle the coding/decoding of messages for you under the hood; + - use the service and RPC names provided by the rpc specification to + register the services with their expected names. *) + + type server := t + + type ('request, 'response) unary = + 'request -> Grpc.Status.t * 'response option + (** [unary] is the type for a unary grpc rpc, one request, one response. *) + + type ('request, 'response) client_streaming = + 'request Seq.t -> Grpc.Status.t * 'response option + (** [client_streaming] is the type for an rpc where the client streams the + requests and the server responds once. *) + + type ('request, 'response) server_streaming = + 'request -> ('response -> unit) -> Grpc.Status.t + (** [server_streaming] is the type for an rpc where the client sends one + request and the server sends multiple responses. *) + + type ('request, 'response) bidirectional_streaming = + 'request Seq.t -> ('response -> unit) -> Grpc.Status.t + (** [bidirectional_streaming] is the type for an rpc where both the client and + server can send multiple messages. *) + + type 'service_spec t + (** [t] represents an implementation for an RPC on the server side. *) + + (** The next functions are meant to be used by the server to create RPC + implementations. The rpc specification that the function implements must + be provided as it is used to handle coding/decoding of messages. It also + allows to refer to the service and RPC names specified in the [.proto] + file. *) + + val unary : + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) unary -> + 'service_spec t + + val client_streaming : + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) client_streaming -> + 'service_spec t + + val server_streaming : + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) server_streaming -> + 'service_spec t + + val bidirectional_streaming : + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) bidirectional_streaming -> + 'service_spec t + + val server : (Grpc.Rpc.Service_spec.t t, unit t) Grpc.Rpc.Handlers.t -> server + (** Having built a list of RPCs you will use this function to package them up + into a server that is ready to be served over the network. This function + takes care of registering the services based on the names provided by the + protoc specification. *) +end diff --git a/lib/grpc-protoc-plugin/dune b/lib/grpc-protoc-plugin/dune new file mode 100644 index 0000000..900987e --- /dev/null +++ b/lib/grpc-protoc-plugin/dune @@ -0,0 +1,4 @@ +(library + (name grpc_protoc_plugin) + (public_name grpc-protoc-plugin) + (libraries grpc ocaml-protoc-plugin)) diff --git a/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml b/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml new file mode 100644 index 0000000..baea692 --- /dev/null +++ b/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml @@ -0,0 +1,69 @@ +module type S = Ocaml_protoc_plugin.Service.Rpc + +let encode (type a) + (module M : Ocaml_protoc_plugin.Runtime.Runtime'.Service.Message + with type t = a) (a : a) = + a |> M.to_proto |> Ocaml_protoc_plugin.Runtime.Runtime'.Writer.contents + +let decode (type a) + (module M : Ocaml_protoc_plugin.Runtime.Runtime'.Service.Message + with type t = a) buffer = + buffer |> Ocaml_protoc_plugin.Runtime.Runtime'.Reader.create |> M.from_proto + |> function + | Ok r -> r + | Error e -> + failwith + (Printf.sprintf "Could not decode request: %s" + (Ocaml_protoc_plugin.Result.show_error e)) + +let service_spec (type request response) + (module R : S with type Request.t = request and type Response.t = response) + = + { + Grpc.Rpc.Service_spec.package = R.package_name |> Option.to_list; + service_name = R.service_name; + } + +module Client_rpc = struct + let make (type request response) + (module R : S with type Request.t = request and type Response.t = response) + ~request_mode ~response_mode = + { + Grpc.Rpc.Client_rpc.service_spec = service_spec (module R); + rpc_name = R.method_name; + encode_request = encode (module R.Request); + decode_response = decode (module R.Response); + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +module Server_rpc = struct + let make (type request response) + (module R : S with type Request.t = request and type Response.t = response) + ~request_mode ~response_mode = + { + Grpc.Rpc.Server_rpc.service_spec = Some (service_spec (module R)); + rpc_name = R.method_name; + decode_request = decode (module R.Request); + encode_response = encode (module R.Response); + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +let handlers handlers = Grpc.Rpc.Handlers.Handlers { handlers } diff --git a/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli b/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli new file mode 100644 index 0000000..7bd147f --- /dev/null +++ b/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli @@ -0,0 +1,75 @@ +module type S = Ocaml_protoc_plugin.Service.Rpc + +module Client_rpc : sig + val unary : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val client_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val server_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t + + val bidirectional_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t +end + +module Server_rpc : sig + val unary : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t + + val client_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t + + val server_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t + + val bidirectional_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t +end + +val handlers : 'a list -> ('a, _) Grpc.Rpc.Handlers.t diff --git a/lib/grpc-protoc/dune b/lib/grpc-protoc/dune new file mode 100644 index 0000000..14a98b6 --- /dev/null +++ b/lib/grpc-protoc/dune @@ -0,0 +1,4 @@ +(library + (name grpc_protoc) + (public_name grpc-protoc) + (libraries grpc ocaml-protoc pbrt pbrt_services)) diff --git a/lib/grpc-protoc/grpc_protoc.ml b/lib/grpc-protoc/grpc_protoc.ml new file mode 100644 index 0000000..5ba88aa --- /dev/null +++ b/lib/grpc-protoc/grpc_protoc.ml @@ -0,0 +1,55 @@ +let encode (type a) (encode : a -> Pbrt.Encoder.t -> unit) (a : a) = + let encoder = Pbrt.Encoder.create () in + encode a encoder; + Pbrt.Encoder.to_string encoder + +let decode (type a) (decode : Pbrt.Decoder.t -> a) buffer = + let decoder = Pbrt.Decoder.of_string buffer in + decode decoder + +module Client_rpc = struct + let make (type request response) + (rpc : (request, _, response, _) Pbrt_services.Client.rpc) ~request_mode + ~response_mode = + { + Grpc.Rpc.Client_rpc.service_spec = + { package = rpc.package; service_name = rpc.service_name }; + rpc_name = rpc.rpc_name; + encode_request = encode rpc.encode_pb_req; + decode_response = decode rpc.decode_pb_res; + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +module Server_rpc = struct + let make (type request response) + (rpc : (request, _, response, _) Pbrt_services.Server.rpc) ~request_mode + ~response_mode = + { + Grpc.Rpc.Server_rpc.service_spec = None; + rpc_name = rpc.name; + decode_request = decode rpc.decode_pb_req; + encode_response = encode rpc.encode_pb_res; + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +let handlers { Pbrt_services.Server.package; service_name; handlers } = + Grpc.Rpc.Handlers.With_service_spec + { service_spec = { package; service_name }; handlers } diff --git a/lib/grpc-protoc/grpc_protoc.mli b/lib/grpc-protoc/grpc_protoc.mli new file mode 100644 index 0000000..6390a10 --- /dev/null +++ b/lib/grpc-protoc/grpc_protoc.mli @@ -0,0 +1,105 @@ +module Client_rpc : sig + val unary : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val client_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val server_streaming : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t + + val bidirectional_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t +end + +module Server_rpc : sig + val unary : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + unit ) + Grpc.Rpc.Server_rpc.t + + val client_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + unit ) + Grpc.Rpc.Server_rpc.t + + val server_streaming : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + unit ) + Grpc.Rpc.Server_rpc.t + + val bidirectional_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + unit ) + Grpc.Rpc.Server_rpc.t +end + +val handlers : 'a Pbrt_services.Server.t -> (_, 'a) Grpc.Rpc.Handlers.t diff --git a/lib/grpc/grpc.ml b/lib/grpc/grpc.ml index 00ca697..c84744b 100644 --- a/lib/grpc/grpc.ml +++ b/lib/grpc/grpc.ml @@ -2,3 +2,4 @@ module Server = Server module Status = Status module Message = Message module Buffer = Buffer +module Rpc = Rpc diff --git a/lib/grpc/rpc.ml b/lib/grpc/rpc.ml new file mode 100644 index 0000000..487cf7e --- /dev/null +++ b/lib/grpc/rpc.ml @@ -0,0 +1,47 @@ +type buffer = string + +module Value_mode = struct + type unary + type stream + type _ t = Unary : unary t | Stream : stream t +end + +module Service_spec = struct + type t = { package : string list; service_name : string } + + let packaged_service_name t = + String.concat "." (t.package @ [ t.service_name ]) +end + +module Handlers = struct + type ('a, 'b) t = + | Handlers of { handlers : 'a list } + | With_service_spec of { handlers : 'b list; service_spec : Service_spec.t } + | ( :: ) of ('a, 'b) t * ('a, 'b) t list +end + +module Client_rpc = struct + type ('request, 'request_mode, 'response, 'response_mode) t = { + service_spec : Service_spec.t; + rpc_name : string; + encode_request : 'request -> buffer; + decode_response : buffer -> 'response; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; + } +end + +module Server_rpc = struct + module Service_spec = struct + type 'a t = None : unit t | Some : Service_spec.t -> Service_spec.t t + end + + type ('request, 'request_mode, 'response, 'response_mode, 'service_spec) t = { + service_spec : 'service_spec Service_spec.t; + rpc_name : string; + decode_request : buffer -> 'request; + encode_response : 'response -> buffer; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; + } +end diff --git a/lib/grpc/rpc.mli b/lib/grpc/rpc.mli new file mode 100644 index 0000000..4f86232 --- /dev/null +++ b/lib/grpc/rpc.mli @@ -0,0 +1,48 @@ +type buffer = string + +(** Exploring a separate client/server api that works better with [ocaml-protoc]. *) + +module Value_mode : sig + type unary + type stream + type _ t = Unary : unary t | Stream : stream t +end + +module Service_spec : sig + type t = { package : string list; service_name : string } + + val packaged_service_name : t -> string +end + +module Handlers : sig + type ('a, 'b) t = + | Handlers of { handlers : 'a list } + | With_service_spec of { handlers : 'b list; service_spec : Service_spec.t } + | ( :: ) of ('a, 'b) t * ('a, 'b) t list +end + +module Client_rpc : sig + type ('request, 'request_mode, 'response, 'response_mode) t = { + service_spec : Service_spec.t; + rpc_name : string; + encode_request : 'request -> buffer; + decode_response : buffer -> 'response; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; + } +end + +module Server_rpc : sig + module Service_spec : sig + type 'a t = None : unit t | Some : Service_spec.t -> Service_spec.t t + end + + type ('request, 'request_mode, 'response, 'response_mode, 'service_spec) t = { + service_spec : 'service_spec Service_spec.t; + rpc_name : string; + decode_request : buffer -> 'request; + encode_response : 'response -> buffer; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; + } +end