From def0f742337f51a6e6205bb7bb00c9edda364a59 Mon Sep 17 00:00:00 2001 From: Wojtek Czekalski Date: Mon, 15 Apr 2024 12:09:48 +0200 Subject: [PATCH] Refactor gRPC eio packages to improve modularity --- .ocamlformat | 2 +- dune | 3 + dune-project | 79 ++++++- examples/greeter-client-eio/dune | 2 +- .../greeter-client-eio/greeter_client_eio.ml | 59 ++--- examples/greeter-server-eio/dune | 2 +- .../greeter-server-eio/greeter_server_eio.ml | 67 ++---- examples/routeguide/src/client.ml | 191 +++++---------- examples/routeguide/src/dune | 8 +- examples/routeguide/src/server.ml | 86 +++---- flake.lock | 118 +++++++++- flake.nix | 218 +++++++++++++----- grpc-async.opam | 2 +- grpc-client-eio.opam | 39 ++++ grpc-client.opam | 41 ++++ grpc-core-eio.opam | 40 ++++ grpc-eio-net-client-h2.opam | 39 ++++ grpc-eio-net-server-h2.opam | 40 ++++ grpc-lwt.opam | 2 +- grpc-eio.opam => grpc-server-eio.opam | 5 +- grpc-server.opam | 41 ++++ grpc.opam | 3 +- lib/{grpc-async => async}/client.ml | 0 lib/{grpc-async => async}/client.mli | 0 lib/{grpc-async => async}/connection.ml | 0 lib/{grpc-async => async}/dune | 0 lib/{grpc-async => async}/grpc_async.ml | 0 lib/{grpc-async => async}/server.ml | 0 lib/{grpc-async => async}/server.mli | 0 lib/eio/client/client.ml | 129 +++++++++++ lib/eio/client/client.mli | 69 ++++++ lib/eio/client/dune | 5 + lib/eio/client/net.ml | 38 +++ lib/eio/core/dune | 4 + lib/{grpc-eio => eio/core}/seq.ml | 0 lib/{grpc-eio => eio/core}/seq.mli | 0 lib/eio/core/stream.ml | 26 +++ lib/eio/core/stream.mli | 11 + lib/eio/net-client-h2/dune | 4 + .../net-client-h2/grpc_eio_net_client_h2.ml | 138 +++++++++++ .../net-client-h2/grpc_eio_net_client_h2.mli | 19 ++ lib/eio/net-server-h2/dune | 4 + .../net-server-h2/grpc_eio_net_server_h2.ml | 91 ++++++++ .../net-server-h2/grpc_eio_net_server_h2.mli | 14 ++ lib/eio/server/dune | 8 + lib/eio/server/grpc_server_eio.ml | 72 ++++++ lib/eio/server/grpc_server_eio.mli | 35 +++ lib/eio/server/net.ml | 23 ++ lib/{grpc-eio => eio/server}/readme.md | 0 lib/grpc-client/dune | 4 + lib/grpc-client/grpc_client.ml | 31 +++ lib/grpc-client/grpc_client.mli | 8 + lib/grpc-eio/client.ml | 94 -------- lib/grpc-eio/client.mli | 54 ----- lib/grpc-eio/connection.ml | 43 ---- lib/grpc-eio/dune | 4 - lib/grpc-eio/grpc_eio.ml | 3 - lib/grpc-eio/server.ml | 114 --------- lib/grpc-eio/server.mli | 49 ---- lib/grpc-eio/stream.ml | 43 ---- lib/grpc-eio/stream.mli | 10 - lib/grpc-server/dune | 4 + lib/grpc-server/grpc_server.ml | 117 ++++++++++ lib/grpc-server/grpc_server.mli | 64 +++++ lib/grpc/dune | 2 +- lib/grpc/grpc.ml | 1 - lib/grpc/message.ml | 15 +- lib/grpc/message.mli | 5 + lib/grpc/server.ml | 14 -- lib/grpc/status.ml | 6 +- lib/grpc/status.mli | 4 +- lib/{grpc-lwt => lwt}/client.ml | 0 lib/{grpc-lwt => lwt}/client.mli | 0 lib/{grpc-lwt => lwt}/connection.ml | 0 lib/{grpc-lwt => lwt}/dune | 0 lib/{grpc-lwt => lwt}/grpc_lwt.ml | 0 lib/{grpc-lwt => lwt}/server.ml | 0 lib/{grpc-lwt => lwt}/server.mli | 0 78 files changed, 1678 insertions(+), 788 deletions(-) create mode 100644 dune create mode 100644 grpc-client-eio.opam create mode 100644 grpc-client.opam create mode 100644 grpc-core-eio.opam create mode 100644 grpc-eio-net-client-h2.opam create mode 100644 grpc-eio-net-server-h2.opam rename grpc-eio.opam => grpc-server-eio.opam (92%) create mode 100644 grpc-server.opam rename lib/{grpc-async => async}/client.ml (100%) rename lib/{grpc-async => async}/client.mli (100%) rename lib/{grpc-async => async}/connection.ml (100%) rename lib/{grpc-async => async}/dune (100%) rename lib/{grpc-async => async}/grpc_async.ml (100%) rename lib/{grpc-async => async}/server.ml (100%) rename lib/{grpc-async => async}/server.mli (100%) create mode 100644 lib/eio/client/client.ml create mode 100644 lib/eio/client/client.mli create mode 100644 lib/eio/client/dune create mode 100644 lib/eio/client/net.ml create mode 100644 lib/eio/core/dune rename lib/{grpc-eio => eio/core}/seq.ml (100%) rename lib/{grpc-eio => eio/core}/seq.mli (100%) create mode 100644 lib/eio/core/stream.ml create mode 100644 lib/eio/core/stream.mli create mode 100644 lib/eio/net-client-h2/dune create mode 100644 lib/eio/net-client-h2/grpc_eio_net_client_h2.ml create mode 100644 lib/eio/net-client-h2/grpc_eio_net_client_h2.mli create mode 100644 lib/eio/net-server-h2/dune create mode 100644 lib/eio/net-server-h2/grpc_eio_net_server_h2.ml create mode 100644 lib/eio/net-server-h2/grpc_eio_net_server_h2.mli create mode 100644 lib/eio/server/dune create mode 100644 lib/eio/server/grpc_server_eio.ml create mode 100644 lib/eio/server/grpc_server_eio.mli create mode 100644 lib/eio/server/net.ml rename lib/{grpc-eio => eio/server}/readme.md (100%) create mode 100644 lib/grpc-client/dune create mode 100644 lib/grpc-client/grpc_client.ml create mode 100644 lib/grpc-client/grpc_client.mli delete mode 100644 lib/grpc-eio/client.ml delete mode 100644 lib/grpc-eio/client.mli delete mode 100644 lib/grpc-eio/connection.ml delete mode 100644 lib/grpc-eio/dune delete mode 100644 lib/grpc-eio/grpc_eio.ml delete mode 100644 lib/grpc-eio/server.ml delete mode 100644 lib/grpc-eio/server.mli delete mode 100644 lib/grpc-eio/stream.ml delete mode 100644 lib/grpc-eio/stream.mli create mode 100644 lib/grpc-server/dune create mode 100644 lib/grpc-server/grpc_server.ml create mode 100644 lib/grpc-server/grpc_server.mli delete mode 100644 lib/grpc/server.ml rename lib/{grpc-lwt => lwt}/client.ml (100%) rename lib/{grpc-lwt => lwt}/client.mli (100%) rename lib/{grpc-lwt => lwt}/connection.ml (100%) rename lib/{grpc-lwt => lwt}/dune (100%) rename lib/{grpc-lwt => lwt}/grpc_lwt.ml (100%) rename lib/{grpc-lwt => lwt}/server.ml (100%) rename lib/{grpc-lwt => lwt}/server.mli (100%) diff --git a/.ocamlformat b/.ocamlformat index 3e21906..d782af1 100644 --- a/.ocamlformat +++ b/.ocamlformat @@ -1,2 +1,2 @@ -version=0.26.1 +# version=0.26.1 ocaml-version=4.08 diff --git a/dune b/dune new file mode 100644 index 0000000..494744c --- /dev/null +++ b/dune @@ -0,0 +1,3 @@ +(dirs bench examples lib) + +(vendored_dirs ocaml-h2 gluten) diff --git a/dune-project b/dune-project index 32c4d3a..ae55528 100644 --- a/dune-project +++ b/dune-project @@ -28,7 +28,7 @@ (name grpc) (synopsis "A modular gRPC library") (description - "This library builds some of the signatures and implementations of gRPC functionality. This is used in the more specialised package `grpc-lwt` which has more machinery, however this library can also be used to do some bits yourself.") + "This library contains the implementation of (de)serialization of gRPC messages and statuses.") (tags (network rpc serialisation)) (depends @@ -36,12 +36,46 @@ (>= 4.08)) (bigstringaf (>= 0.9.1)) - (h2 - (>= 0.9.0)) ppx_deriving (uri (>= 4.0.0)))) +(package + (name grpc-server) + (synopsis "Reusable logic for server side gRPC") + (description + "All modules are networking-layer and concurrency-layer agnostic.") + (tags + (network rpc serialisation)) + (depends + (ocaml + (>= 4.08)) + (grpc (= :version)))) + +(package + (name grpc-client) + (synopsis "Reusable logic for client side gRPC") + (description + "All modules are networking-layer and concurrency-layer agnostic.") + (tags + (network rpc serialisation)) + (depends + (ocaml + (>= 4.08)) + (grpc (= :version)))) + +(package + (name grpc-core-eio) + (synopsis "Shared logic for gRPC clients and servers based on eio.") + (description + "All modules are networking-layer agnostic.") + (tags + (network rpc serialisation)) + (depends + (eio + (>= 0.12)) + (grpc (= :version)))) + (package (name grpc-lwt) (synopsis "An Lwt implementation of gRPC") @@ -50,7 +84,7 @@ (tags (network rpc serialisation)) (depends - (grpc + (grpc-server (= :version)) (h2 (>= 0.9.0)) @@ -70,7 +104,7 @@ (>= 4.11)) (async (>= v0.16)) - (grpc + (grpc-server (= :version)) (h2 (>= 0.9.0)) @@ -79,19 +113,48 @@ stringext)) (package - (name grpc-eio) - (synopsis "An Eio implementation of gRPC") + (name grpc-server-eio) + (deprecated_package_names grpc-eio) + (synopsis "An Eio implementation of gRPC server") (description "Functionality for building gRPC services and rpcs with `eio`.") (depends (eio (>= 0.12)) - (grpc + (grpc-server + (= :version)) + stringext)) + +(package + (name grpc-eio-net-server-h2) + (synopsis "An h2 implementation of gRPC networking layer for eio based servers.") + (depends + (grpc-server-eio (= :version)) (h2 (>= 0.9.0)) stringext)) +(package + (name grpc-client-eio) + (synopsis "An Eio implementation of gRPC client") + (description + "Functionality for building gRPC services and rpcs with `eio`.") + (depends + (eio + (>= 0.12)) + (grpc-client + (= :version)))) + +(package + (name grpc-eio-net-client-h2) + (synopsis "An h2 implementation of gRPC networking layer for eio based clients.") + (depends + (grpc-client-eio + (= :version)) + (h2 + (>= 0.9.0)))) + (package (name grpc-examples) (synopsis "Various gRPC examples") diff --git a/examples/greeter-client-eio/dune b/examples/greeter-client-eio/dune index 37f97bc..795ebad 100644 --- a/examples/greeter-client-eio/dune +++ b/examples/greeter-client-eio/dune @@ -1,3 +1,3 @@ (executable (name greeter_client_eio) - (libraries grpc grpc-eio ocaml-protoc-plugin eio_main greeter h2 h2-eio)) + (libraries grpc-client-eio ocaml-protoc-plugin eio_main greeter grpc-eio-net-client-h2)) diff --git a/examples/greeter-client-eio/greeter_client_eio.ml b/examples/greeter-client-eio/greeter_client_eio.ml index c8b0530..1ed0bda 100644 --- a/examples/greeter-client-eio/greeter_client_eio.ml +++ b/examples/greeter-client-eio/greeter_client_eio.ml @@ -1,56 +1,33 @@ let main env = let name = if Array.length Sys.argv > 1 then Sys.argv.(1) else "anonymous" in - let host = "localhost" in - let port = "8080" in let network = Eio.Stdenv.net env in let run sw = - let inet, port = - Eio_unix.run_in_systhread (fun () -> - Unix.getaddrinfo host port [ Unix.(AI_FAMILY PF_INET) ]) - |> List.filter_map (fun (addr : Unix.addr_info) -> - match addr.ai_addr with - | Unix.ADDR_UNIX _ -> None - | ADDR_INET (addr, port) -> Some (addr, port)) - |> List.hd - in - let addr = `Tcp (Eio_unix.Net.Ipaddr.of_unix inet, port) in - let socket = Eio.Net.connect ~sw network addr in - let connection = - H2_eio.Client.create_connection ~sw ~error_handler:ignore socket - in - let open Ocaml_protoc_plugin in let open Greeter.Mypackage in let encode, decode = Service.make_client_functions Greeter.sayHello in - let encoded_request = - HelloRequest.make ~name () |> encode |> Writer.contents - in - let f decoder = - match decoder with - | Some decoder -> ( - Reader.create decoder |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) - | None -> Greeter.SayHello.Response.make () + let net = + Grpc_eio_net_client_h2.create_client ~sw ~net:network + "http://localhost:8080" in let result = - Grpc_eio.Client.call ~service:"mypackage.Greeter" ~rpc:"SayHello" - ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) - ~handler:(Grpc_eio.Client.Rpc.unary encoded_request ~f) - () + Grpc_client_eio.Client.unary ~sw ~net ~service:"mypackage.Greeter" + ~method_name:"SayHello" + ~encode:(fun x -> x |> encode |> Writer.contents) + ~decode:(fun x -> Reader.create x |> decode) + ~headers:(Grpc_client.make_request_headers `Proto) + (HelloRequest.make ~name ()) in - Eio.Promise.await (H2_eio.Client.shutdown connection); - result + match result with + | Ok message -> Eio.traceln "%s" message + | Error (`Rpc (response, status)) -> + Eio.traceln "Error: %a, %a" H2.Status.pp_hum response.status + Grpc.Status.pp status + | Error (`Connection _err) -> Eio.traceln "Connection error" + | Error (`Decoding err) -> + Eio.traceln "Decoding error: %a" Ocaml_protoc_plugin.Result.pp_error err in Eio.Switch.run run -let () = - match Eio_main.run main with - | Ok (message, status) -> - Eio.traceln "%s: %s" (Grpc.Status.show status) message - | Error err -> Eio.traceln "Error: %a" H2.Status.pp_hum err +let () = Eio_main.run main diff --git a/examples/greeter-server-eio/dune b/examples/greeter-server-eio/dune index 8108aa6..576aef5 100644 --- a/examples/greeter-server-eio/dune +++ b/examples/greeter-server-eio/dune @@ -1,3 +1,3 @@ (executable (name greeter_server_eio) - (libraries grpc grpc-eio ocaml-protoc-plugin eio_main greeter h2 h2-eio)) + (libraries eio grpc-server-eio ocaml-protoc-plugin eio_main greeter grpc-eio-net-server-h2)) diff --git a/examples/greeter-server-eio/greeter_server_eio.ml b/examples/greeter-server-eio/greeter_server_eio.ml index 16aaba0..0d41123 100644 --- a/examples/greeter-server-eio/greeter_server_eio.ml +++ b/examples/greeter-server-eio/greeter_server_eio.ml @@ -1,6 +1,7 @@ -open Grpc_eio +module Server = Grpc_server_eio +module Net = Grpc_eio_net_server_h2 -let say_hello buffer = +let say_hello env buffer = let open Ocaml_protoc_plugin in let open Greeter.Mypackage in let decode, encode = Service.make_service_functions Greeter.sayHello in @@ -16,55 +17,35 @@ let say_hello buffer = else Format.sprintf "Hello, %s!" request in let reply = Greeter.SayHello.Response.make ~message () in - (Grpc.Status.(v OK), Some (encode reply |> Writer.contents)) - -let connection_handler server sw = - let error_handler client_address ?request:_ _error start_response = - Eio.traceln "Error in request from:%a" Eio.Net.Sockaddr.pp client_address; - let response_body = start_response H2.Headers.empty in - H2.Body.Writer.write_string response_body - "There was an error handling your request.\n"; - H2.Body.Writer.close response_body - in - let request_handler client_address request_descriptor = - Eio.traceln "Handling a request from:%a" Eio.Net.Sockaddr.pp client_address; - Eio.Fiber.fork ~sw (fun () -> - Grpc_eio.Server.handle_request server request_descriptor) - in - fun socket addr -> - H2_eio.Server.create_connection_handler ?config:None ~request_handler - ~error_handler addr ~sw socket + Eio.Time.sleep env#clock 10.0; + (Grpc_server.trailers_with_code OK, Some (encode reply |> Writer.contents)) let serve server env = let port = 8080 in let net = Eio.Stdenv.net env in let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in Eio.Switch.run @@ fun sw -> - let handler = connection_handler server sw in let server_socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 addr in - let rec listen () = - Eio.Net.accept_fork ~sw server_socket - ~on_error:(fun exn -> Eio.traceln "%s" (Printexc.to_string exn)) - handler; - listen () + let connection_handler client_addr socket = + Eio.Switch.run (fun sw -> + Net.connection_handler ~sw server client_addr socket) in - Printf.printf "Listening on port %i for grpc requests\n" port; - print_endline ""; - print_endline "Try running:"; - print_endline ""; - print_endline - {| dune exec -- examples/greeter-client-eio/greeter_client_eio.exe |}; - listen () + Eio.Net.run_server + ~on_error:(fun exn -> Eio.traceln "%s" (Printexc.to_string exn)) + server_socket connection_handler -let () = - let greeter_service = - Server.Service.( - v () |> add_rpc ~name:"SayHello" ~rpc:(Unary say_hello) |> handle_request) - in - let server = - Server.( - v () |> add_service ~name:"mypackage.Greeter" ~service:greeter_service) - in - Eio_main.run (serve server) +let mk_handler f = + { Grpc_server_eio.Rpc.headers = (fun _ -> Grpc_server.headers `Proto); f } + +let server env = + let add_rpc = Server.Service.add_rpc in + let open Server.Rpc in + let service = + Server.Service.v () + |> add_rpc ~name:"SayHello" ~rpc:(mk_handler (unary (say_hello env))) + in + Server.(make () |> add_service ~name:"mypackage.Greeter" ~service) + +let () = Eio_main.run (fun env -> serve (server env) env) diff --git a/examples/routeguide/src/client.ml b/examples/routeguide/src/client.ml index 92269dc..2b8941a 100644 --- a/examples/routeguide/src/client.ml +++ b/examples/routeguide/src/client.ml @@ -1,51 +1,26 @@ -open Grpc_eio open Routeguide.Route_guide.Routeguide open Ocaml_protoc_plugin - -(* $MDX part-begin=client-h2 *) -let client ~sw host port network = - let inet, port = - Eio_unix.run_in_systhread (fun () -> - Unix.getaddrinfo host port [ Unix.(AI_FAMILY PF_INET) ]) - |> List.filter_map (fun (addr : Unix.addr_info) -> - match addr.ai_addr with - | Unix.ADDR_UNIX _ -> None - | ADDR_INET (addr, port) -> Some (addr, port)) - |> List.hd - in - let addr = `Tcp (Eio_unix.Net.Ipaddr.of_unix inet, port) in - let socket = Eio.Net.connect ~sw network addr in - H2_eio.Client.create_connection ~sw ~error_handler:ignore socket +module Client = Grpc_client_eio.Client (* $MDX part-end *) (* $MDX part-begin=client-get-feature *) -let call_get_feature connection point = +let call_get_feature sw net point = let encode, decode = Service.make_client_functions RouteGuide.getFeature in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" - ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) - ~handler: - (Client.Rpc.unary - (encode point |> Writer.contents) - ~f:(fun response -> - match response with - | Some response -> ( - Reader.create response |> decode |> function - | Ok feature -> feature - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) - | None -> Feature.make ())) - () + Client.unary ~sw + ~headers:(Grpc_client.make_request_headers `Proto) + ~service:"routeguide.RouteGuide" ~method_name:"GetFeature" ~net + ~encode:(fun point -> encode point |> Writer.contents) + ~decode:(fun str -> decode (Reader.create str)) + point in match response with - | Ok (res, _ok) -> Printf.printf "RESPONSE = {%s}" (Feature.show res) + | Ok res -> Printf.printf "RESPONSE = {%s}" (Feature.show res) | Error _ -> Printf.printf "an error occurred" (* $MDX part-end *) (* $MDX part-begin=client-list-features *) -let print_features connection = +let print_features sw net = let rectangle = Rectangle.make ~lo:(Point.make ~latitude:400000000 ~longitude:(-750000000) ()) @@ -55,33 +30,23 @@ let print_features connection = let encode, decode = Service.make_client_functions RouteGuide.listFeatures in let stream = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" - ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) - ~handler: - (Client.Rpc.server_streaming - (encode rectangle |> Writer.contents) - ~f:(fun responses -> - let stream = - Seq.map - (fun str -> - Reader.create str |> decode |> function - | Ok feature -> feature - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) - (Grpc_eio.Stream.to_seq responses) - in - stream)) - () + Client.server_streaming ~sw ~net ~service:"routeguide.RouteGuide" + ~method_name:"ListFeatures" + ~encode:(fun rectangle -> encode rectangle |> Writer.contents) + ~decode:(fun str -> decode (Reader.create str)) + ~headers:(Grpc_client.make_request_headers `Proto) + rectangle + (fun ~read -> + let rec loop () = + match read () with + | None -> () + | Some f -> + Printf.printf "RESPONSE = {%s}" (Feature.show f); + loop () + in + loop ()) in - match stream with - | Ok (results, _ok) -> - Seq.iter - (fun f -> Printf.printf "RESPONSE = {%s}" (Feature.show f)) - results - | Error e -> - failwith (Printf.sprintf "HTTP2 error: %s" (H2.Status.to_string e)) + match stream with Ok () -> () | Error _e -> failwith "an erra" (* $MDX part-end *) (* $MDX part-begin=client-random-point *) @@ -92,7 +57,7 @@ let random_point () : Point.t = (* $MDX part-end *) (* $MDX part-begin=client-record-route *) -let run_record_route connection = +let run_record_route sw net = let points = Random.int 100 |> Seq.unfold (function 0 -> None | x -> Some (random_point (), x - 1)) @@ -100,39 +65,20 @@ let run_record_route connection = let encode, decode = Service.make_client_functions RouteGuide.recordRoute in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" - ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) - ~handler: - (Client.Rpc.client_streaming ~f:(fun ~send ~close response -> - (* Stream points to server. *) - Seq.iter - (fun point -> encode point |> Writer.contents |> send) - points; - - (* Signal we have finished sending points. *) - close (); - - (* Decode RouteSummary responses. *) - Eio.Promise.await response |> function - | Some str -> ( - Reader.create str |> decode |> function - | Ok feature -> feature - | Error err -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error err))) - | None -> failwith (Printf.sprintf "No RouteSummary received."))) - () + Client.client_streaming ~net ~sw ~service:"routeguide.RouteGuide" + ~method_name:"RecordRoute" + ~encode:(fun point -> encode point |> Writer.contents) + ~decode:(fun str -> decode (Reader.create str)) + ~headers:(Grpc_client.make_request_headers `Proto) + (fun ~write -> Seq.iter write points) in match response with - | Ok (result, _ok) -> - Printf.printf "SUMMARY = {%s}" (RouteSummary.show result) - | Error e -> - failwith (Printf.sprintf "HTTP2 error: %s" (H2.Status.to_string e)) + | Ok result -> Printf.printf "SUMMARY = {%s}" (RouteSummary.show result) + | Error _e -> failwith "Error occured" (* $MDX part-end *) (* $MDX part-begin=client-route-chat-1 *) -let run_route_chat clock connection = +let run_route_chat clock net sw = (* Generate locations. *) let location_count = 5 in Printf.printf "Generating %i locations\n" location_count; @@ -152,72 +98,59 @@ let run_route_chat clock connection = let encode, decode = Service.make_client_functions RouteGuide.routeChat in let rec go ~send ~close reader notes = match Seq.uncons notes with - | None -> close () (* Signal no more notes from the client. *) + | None -> close () (* Signal no more notes from the server. *) | Some (route_note, xs) -> ( - encode route_note |> Writer.contents |> fun x -> - send x; + send route_note; - (* Yield and sleep, waiting for server reply. *) Eio.Time.sleep clock 1.0; - Eio.Fiber.yield (); - match Seq.uncons reader with + match reader () with | None -> failwith "Expecting response" - | Some (response, reader') -> - let route_note = - Reader.create response |> decode |> function - | Ok route_note -> route_note - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in - Printf.printf "NOTE = {%s}\n" (RouteNote.show route_note); - go ~send ~close reader' xs) + | Some route_note -> + Printf.printf "NOTE = {%s}\n%!" (RouteNote.show route_note); + go ~send ~close reader xs) in let result = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" - ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) - ~handler: - (Client.Rpc.bidirectional_streaming ~f:(fun ~send ~close reader -> - go ~send ~close (Stream.to_seq reader) route_notes)) - () + Client.bidirectional_streaming ~service:"routeguide.RouteGuide" + ~method_name:"RouteChat" ~net ~sw + ~headers:(Grpc_client.make_request_headers `Proto) + ~encode:(fun x -> encode x |> Writer.contents) + ~decode:(fun x -> decode (Reader.create x)) + (fun ~writer ~take -> + go ~send:writer.Client.write ~close:writer.close take route_notes) in - match result with - | Ok ((), _ok) -> () - | Error e -> - failwith (Printf.sprintf "HTTP2 error: %s" (H2.Status.to_string e)) + match result with Ok () -> () | Error _e -> failwith "Error" (* $MDX part-end *) (* $MDX part-begin=client-main *) let main env = - let port = "8080" in - let host = "localhost" in let clock = Eio.Stdenv.clock env in let network = Eio.Stdenv.net env in let () = Random.self_init () in let run sw = - let connection = client ~sw host port network in + let net = + Grpc_eio_net_client_h2.create_client ~net:network ~sw + "http://localhost:8080" + in - Printf.printf "*** SIMPLE RPC ***\n"; + Printf.printf "*** SIMPLE RPC ***\n%!"; let request = RouteGuide.GetFeature.Request.make ~latitude:409146138 ~longitude:(-746188906) () in - let result = call_get_feature connection request in + let result = call_get_feature sw net request in - Printf.printf "\n*** SERVER STREAMING ***\n"; - print_features connection; + Printf.printf "\n*** SERVER STREAMING ***\n%!"; + print_features sw net; - Printf.printf "\n*** CLIENT STREAMING ***\n"; - run_record_route connection; + Printf.printf "\n*** CLIENT STREAMING ***\n%!"; + run_record_route sw net; - Printf.printf "\n*** BIDIRECTIONAL STREAMING ***\n"; - run_route_chat clock connection; + Printf.printf "\n*** BIDIRECTIONAL STREAMING ***\n%!"; + run_route_chat clock net sw; - Eio.Promise.await (H2_eio.Client.shutdown connection); result in diff --git a/examples/routeguide/src/dune b/examples/routeguide/src/dune index 9c5afaf..6a048d7 100644 --- a/examples/routeguide/src/dune +++ b/examples/routeguide/src/dune @@ -3,12 +3,14 @@ (package grpc-examples) (public_names routeguide-server routeguide-client) (libraries - grpc-eio + grpc-server-eio + grpc-client-eio eio_main - h2-eio ocaml-protoc-plugin routeguide yojson - ppx_deriving_yojson.runtime) + ppx_deriving_yojson.runtime + grpc-eio-net-server-h2 + grpc-eio-net-client-h2) (preprocess (pps ppx_deriving_yojson ppx_deriving.eq))) diff --git a/examples/routeguide/src/server.ml b/examples/routeguide/src/server.ml index 44113d0..f6add2f 100644 --- a/examples/routeguide/src/server.ml +++ b/examples/routeguide/src/server.ml @@ -1,6 +1,6 @@ -open Grpc_eio open Routeguide.Route_guide.Routeguide open Ocaml_protoc_plugin +module Server = Grpc_server_eio (* Derived data types to make reading JSON data easier. *) type location = { latitude : int; longitude : int } [@@deriving yojson] @@ -98,15 +98,17 @@ let get_feature (buffer : string) = (feature |> Option.map Feature.show |> Option.value ~default:"Missing"); match feature with | Some feature -> - (Grpc.Status.(v OK), Some (feature |> encode |> Writer.contents)) + ( Grpc_server.trailers_with_code Grpc.Status.OK, + Some (feature |> encode |> Writer.contents) ) | None -> (* No feature was found, return an unnamed feature. *) - ( Grpc.Status.(v OK), + ( Grpc_server.trailers_with_code Grpc.Status.OK, Some (Feature.make ~location:point () |> encode |> Writer.contents) ) (* $MDX part-end *) (* $MDX part-begin=server-list-features *) let list_features (buffer : string) (f : string -> unit) = + Eio.traceln "ListFeatures"; (* Decode request. *) let decode, encode = Service.make_service_functions RouteGuide.listFeatures in let rectangle = @@ -126,11 +128,11 @@ let list_features (buffer : string) (f : string -> unit) = else ()) !features in - Grpc.Status.(v OK) + Grpc_server.trailers_with_code OK (* $MDX part-end *) (* $MDX part-begin=server-record-route *) -let record_route (clock : #Eio.Time.clock) stream = +let record_route (clock : _ Eio.Time.clock) stream = Eio.traceln "RecordRoute"; let last_point = ref None in let start = Eio.Time.now clock in @@ -170,7 +172,7 @@ let record_route (clock : #Eio.Time.clock) stream = last_point := Some point; (point_count, feature_count, distance)) (0, 0, 0) - (Grpc_eio.Stream.to_seq stream) + (Grpc_core_eio.Stream.to_seq stream) in let stop = Eio.Time.now clock in let elapsed_time = int_of_float (stop -. start) in @@ -178,12 +180,12 @@ let record_route (clock : #Eio.Time.clock) stream = RouteSummary.make ~point_count ~feature_count ~distance ~elapsed_time () in Eio.traceln "RecordRoute exit\n"; - (Grpc.Status.(v OK), Some (encode summary |> Writer.contents)) + (Grpc_server.trailers_with_code OK, Some (encode summary |> Writer.contents)) (* $MDX part-end *) (* $MDX part-begin=server-route-chat *) -let route_chat stream (f : string -> unit) = - Printf.printf "RouteChat\n"; +let route_chat stream (write : string -> unit) = + Printf.printf "RouteChat\n%!"; let decode, encode = Service.make_service_functions RouteGuide.routeChat in Seq.iter @@ -196,66 +198,54 @@ let route_chat stream (f : string -> unit) = (Printf.sprintf "Could not decode request: %s" (Result.show_error e)) in - Printf.printf " ==> Note = {%s}\n" (RouteNote.show note); - encode note |> Writer.contents |> f) - (Grpc_eio.Stream.to_seq stream); + Printf.printf " ==> Note = {%s}\n%!" (RouteNote.show note); + encode note |> Writer.contents |> write) + (Grpc_core_eio.Stream.to_seq stream); - Printf.printf "RouteChat exit\n"; - Grpc.Status.(v OK) + Printf.printf "RouteChat exit\n%!"; + Grpc_server.trailers_with_code OK (* $MDX part-end *) (* $MDX part-begin=server-grpc *) + +let mk_handler f = + { Grpc_server_eio.Rpc.headers = (fun _ -> Grpc_server.headers `Proto); f } + let route_guide_service clock = - Server.Service.( - v () - |> add_rpc ~name:"GetFeature" ~rpc:(Unary get_feature) - |> add_rpc ~name:"ListFeatures" ~rpc:(Server_streaming list_features) - |> add_rpc ~name:"RecordRoute" ~rpc:(Client_streaming (record_route clock)) - |> add_rpc ~name:"RouteChat" ~rpc:(Bidirectional_streaming route_chat) - |> handle_request) + let add_rpc = Server.Service.add_rpc in + let open Server.Rpc in + Server.Service.v () + |> add_rpc ~name:"GetFeature" ~rpc:(mk_handler (unary get_feature)) + |> add_rpc ~name:"ListFeatures" + ~rpc:(mk_handler (server_streaming list_features)) + |> add_rpc ~name:"RecordRoute" + ~rpc:(mk_handler (client_streaming (record_route clock))) + |> add_rpc ~name:"RouteChat" ~rpc:(mk_handler route_chat) let server clock = Server.( - v () + make () |> add_service ~name:"routeguide.RouteGuide" ~service:(route_guide_service clock)) (* $MDX part-end *) -let connection_handler server ~sw = - let error_handler client_address ?request:_ _error start_response = - Eio.traceln "Error in request from:%a" Eio.Net.Sockaddr.pp client_address; - let response_body = start_response H2.Headers.empty in - H2.Body.Writer.write_string response_body - "There was an error handling your request.\n"; - H2.Body.Writer.close response_body - in - let request_handler _client_address request_descriptor = - Eio.Fiber.fork ~sw (fun () -> - Grpc_eio.Server.handle_request server request_descriptor) - in - fun socket addr -> - H2_eio.Server.create_connection_handler ?config:None ~request_handler - ~error_handler addr socket ~sw (* $MDX part-begin=server-main *) -let serve server env = +let serve server env : unit = let port = 8080 in let net = Eio.Stdenv.net env in - let clock = Eio.Stdenv.clock env in let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in Eio.Switch.run @@ fun sw -> - let handler = connection_handler ~sw (server clock) in let server_socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 addr in - let rec listen () = - Eio.Net.accept_fork ~sw server_socket - ~on_error:(fun exn -> Eio.traceln "%s" (Printexc.to_string exn)) - handler; - listen () + let connection_handler client_addr socket = + Eio.Switch.run (fun sw -> + Grpc_eio_net_server_h2.connection_handler ~sw server client_addr socket) in - Eio.traceln "Listening on port %i for grpc requests\n" port; - listen () + Eio.Net.run_server + ~on_error:(fun exn -> Eio.traceln "%s" (Printexc.to_string exn)) + server_socket connection_handler let () = let path = @@ -266,5 +256,5 @@ let () = (* Load features. *) features := load path; - Eio_main.run (serve server) + Eio_main.run (fun env -> serve (server (Eio.Stdenv.clock env)) env) (* $MDX part-end *) diff --git a/flake.lock b/flake.lock index 6d4b0fc..565f608 100644 --- a/flake.lock +++ b/flake.lock @@ -1,25 +1,131 @@ { "nodes": { + "flake-parts": { + "inputs": { + "nixpkgs-lib": "nixpkgs-lib" + }, + "locked": { + "lastModified": 1712014858, + "narHash": "sha256-sB4SWl2lX95bExY2gMFG5HIzvva5AVMJd4Igm+GpZNw=", + "owner": "hercules-ci", + "repo": "flake-parts", + "rev": "9126214d0a59633752a136528f5f3b9aa8565b7d", + "type": "github" + }, + "original": { + "owner": "hercules-ci", + "repo": "flake-parts", + "type": "github" + } + }, + "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1710146030, + "narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nix-filter": { + "locked": { + "lastModified": 1710156097, + "narHash": "sha256-1Wvk8UP7PXdf8bCCaEoMnOT1qe5/Duqgj+rL8sRQsSM=", + "owner": "numtide", + "repo": "nix-filter", + "rev": "3342559a24e85fc164b295c3444e8a139924675b", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "nix-filter", + "type": "github" + } + }, "nixpkgs": { + "inputs": { + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs_2" + }, "locked": { - "lastModified": 1609870929, - "narHash": "sha256-aVGF0O3T+Xg4avzyCWhkZG6DvqItK6u/1Y4yY7jnj80=", - "owner": "sternenseemann", + "lastModified": 1712561094, + "narHash": "sha256-cRvbal29hZjqtu9/hpQo4fGCH2YGKn+Kqo3apDOf5bo=", + "owner": "nix-ocaml", + "repo": "nix-overlays", + "rev": "bf4dbbb8793e72575f07489e317cc6309bca7f17", + "type": "github" + }, + "original": { + "owner": "nix-ocaml", + "repo": "nix-overlays", + "rev": "bf4dbbb8793e72575f07489e317cc6309bca7f17", + "type": "github" + } + }, + "nixpkgs-lib": { + "locked": { + "dir": "lib", + "lastModified": 1711703276, + "narHash": "sha256-iMUFArF0WCatKK6RzfUJknjem0H9m4KgorO/p3Dopkk=", + "owner": "NixOS", "repo": "nixpkgs", - "rev": "2de4f7dab09871fd05856ffde8f8e3bd40635579", + "rev": "d8fe5e6c92d0d190646fb9f1056741a229980089", "type": "github" }, "original": { - "owner": "sternenseemann", - "ref": "ppx_deriving-5.1", + "dir": "lib", + "owner": "NixOS", + "ref": "nixos-unstable", "repo": "nixpkgs", "type": "github" } }, + "nixpkgs_2": { + "locked": { + "lastModified": 1712514290, + "narHash": "sha256-Uvy+mgMdqRhuazAXwMQHVELi+yPGNj6+VTppWTurxRE=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "274e6aa01f2c2266e1cd8debdb82863cd83e2ff7", + "type": "github" + }, + "original": { + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "274e6aa01f2c2266e1cd8debdb82863cd83e2ff7", + "type": "github" + } + }, "root": { "inputs": { + "flake-parts": "flake-parts", + "nix-filter": "nix-filter", "nixpkgs": "nixpkgs" } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } } }, "root": "root", diff --git a/flake.nix b/flake.nix index 5b5a73c..0b4beb8 100644 --- a/flake.nix +++ b/flake.nix @@ -1,78 +1,174 @@ { - description = "A modular gRPC library"; + description = "Description for the project"; inputs = { nixpkgs = { - url = "github:sternenseemann/nixpkgs/ppx_deriving-5.1"; + url = + "github:nix-ocaml/nix-overlays/bf4dbbb8793e72575f07489e317cc6309bca7f17"; }; + flake-parts.url = "github:hercules-ci/flake-parts"; + nix-filter.url = "github:numtide/nix-filter"; }; - outputs = { self, nixpkgs }: - with import nixpkgs { system = "x86_64-linux"; }; - let - h2-src = fetchFromGitHub { - owner = "jeffa5"; - repo = "ocaml-h2"; - rev = "36bd7bfa46fb0eb2bce184413f663a46a5e0dd3b"; - sha256 = "sha256-8vsRpx0JVN6KHOVfKit6LhlQqGTO1ofRhfyDgJ7dGz0="; - }; + outputs = inputs@{ flake-parts, nix-filter, ... }: + flake-parts.lib.mkFlake { inherit inputs; } { + systems = + [ "x86_64-linux" "aarch64-linux" "x86_64-darwin" "aarch64-darwin" ]; - hpack = ocamlPackages.buildDunePackage { - pname = "hpack"; - version = "0.2.0"; - src = h2-src; - useDune2 = true; - buildInputs = (with ocamlPackages; [ angstrom faraday ]); - }; + imports = [ inputs.flake-parts.flakeModules.easyOverlay ]; - h2 = ocamlPackages.buildDunePackage { - pname = "h2"; - version = "0.7.0"; - src = h2-src; - useDune2 = true; - buildInputs = (with ocamlPackages; [ hpack result httpaf psq base64 ]); - }; - in - { - packages.x86_64-linux = rec { - grpc = - ocamlPackages.buildDunePackage { - pname = "grpc"; - version = "0.1.0"; - src = self; - useDune2 = true; - doCheck = true; - buildInputs = (with ocamlPackages; [ uri h2 ppx_deriving ]); + perSystem = { config, self', inputs', system, ... }: + let + pkgs = (import inputs.nixpkgs { + inherit system; + config.allowUnfree = true; + overlays = [ ]; + }).extend (self: super: { + ocamlPackages = super.ocaml-ng.ocamlPackages_5_1; + }); + camlPkgs = pkgs.ocamlPackages; + bechamel-notty = camlPkgs.buildDunePackage { + pname = "bechamel-notty"; + version = "0.5.0"; + duneVersion = "3"; + propagatedBuildInputs = + [ camlPkgs.notty camlPkgs.fmt camlPkgs.bechamel ]; + src = pkgs.fetchFromGitHub { + owner = "mirage"; + repo = "bechamel"; + rev = "v0.5.0"; + sha256 = "sha256-aTz80gjVi+ITqi8TXH1NjWPECuTcLFvTEDC7BoRo+6M="; + fetchSubmodules = true; + }; }; - - grpc-lwt = - ocamlPackages.buildDunePackage { - pname = "grpc-lwt"; + dialo-ocaml-protoc-plugin = camlPkgs.buildDunePackage { + pname = "ocaml-protoc-plugin"; version = "0.1.0"; - src = self; - useDune2 = true; - doCheck = true; - buildInputs = (with ocamlPackages; [ ocaml-protoc lwt stringext h2 grpc ]); - }; - }; - - defaultPackage.x86_64-linux = self.packages.x86_64-linux.grpc; + duneVersion = "3"; - devShell.x86_64-linux = mkShell { - buildInputs = [ - ocaml - opam + INCLUDE_GOOGLE_PROTOBUF = "${pkgs.protobuf}/include"; - m4 - pkgconfig + nativeBuildInputs = [ pkgs.protobuf ]; + propagatedBuildInputs = [ pkgs.protobuf pkgs.pkg-config ]; + buildInputs = with camlPkgs; [ lwt stringext ]; + src = pkgs.fetchFromGitHub { + owner = "dialohq"; + repo = "ocaml-protoc-plugin"; + rev = "b814b305520563fff58388682cb360660cc29c47"; + sha256 = "sha256-NgFvc+HTJXc17GwyfA0VqlWXx9R35FJ6CSEQrQ52Jds="; + fetchSubmodules = true; + }; + }; - nixpkgs-fmt - rnix-lsp - ]; + in { + devShells.default = pkgs.mkShell { + inputsFrom = [ + self'.packages.grpc + self'.packages.grpc-lwt + self'.packages.grpc-async + self'.packages.grpc-eio + self'.packages.grpc-examples + self'.packages.grpc-bench + ]; + nativeBuildInputs = with pkgs; [ + nil + nixfmt + camlPkgs.ocaml-lsp + camlPkgs.ocamlformat + ]; + }; - shellHook = '' - eval $(opam env) - ''; - }; + packages = { + grpc-bench = camlPkgs.buildDunePackage { + pname = "grpc-bench"; + version = "0.1.0"; + duneVersion = "3"; + buildInputs = with camlPkgs; [ + self'.packages.grpc + self'.packages.grpc-lwt + self'.packages.grpc-async + self'.packages.grpc-eio + bechamel-notty + bigstringaf + ]; + src = nix-filter.lib.filter { + root = ./.; + include = [ "dune-project" "examples" ]; + }; + }; + grpc-examples = camlPkgs.buildDunePackage { + pname = "grpc-examples"; + version = "0.1.0"; + duneVersion = "3"; + nativeBuildInputs = with camlPkgs; [ + dialo-ocaml-protoc-plugin + ppx_jane + ppx_deriving + ppx_deriving_yojson + ]; + buildInputs = with camlPkgs; [ + h2-lwt-unix + conduit-lwt-unix + core_unix + ppx_deriving_yojson + cohttp-lwt-unix + h2-eio + h2-async + tls-async + self'.packages.grpc + self'.packages.grpc-lwt + self'.packages.grpc-async + self'.packages.grpc-eio + ]; + src = nix-filter.lib.filter { + root = ./.; + include = [ "dune-project" "examples" ]; + }; + }; + grpc = camlPkgs.buildDunePackage { + pname = "grpc"; + version = "0.1.0"; + duneVersion = "3"; + nativeBuildInputs = with camlPkgs; [ mdx ]; + propagatedBuildInputs = with camlPkgs; [ ppxlib ]; + buildInputs = with camlPkgs; [ uri h2 ppx_deriving ]; + src = nix-filter.lib.filter { + root = ./.; + include = [ "dune-project" "lib/grpc" ]; + }; + }; + grpc-lwt = camlPkgs.buildDunePackage { + pname = "grpc-lwt"; + version = "0.1.0"; + duneVersion = "3"; + buildInputs = with camlPkgs; [ self'.packages.grpc lwt ]; + src = nix-filter.lib.filter { + root = ./.; + include = [ "dune-project" "lib/grpc-lwt" ]; + }; + }; + grpc-async = camlPkgs.buildDunePackage { + pname = "grpc-async"; + version = "0.1.0"; + duneVersion = "3"; + buildInputs = with camlPkgs; [ self'.packages.grpc async ]; + src = nix-filter.lib.filter { + root = ./.; + include = [ "dune-project" "lib/grpc-async" ]; + }; + }; + grpc-eio = camlPkgs.buildDunePackage { + pname = "grpc-eio"; + version = "0.1.0"; + duneVersion = "3"; + buildInputs = with camlPkgs; [ self'.packages.grpc eio ]; + src = nix-filter.lib.filter { + root = ./.; + include = [ "dune-project" "lib/grpc-eio" ]; + }; + }; + }; + packages.default = self'.packages.grpc; + }; }; } diff --git a/grpc-async.opam b/grpc-async.opam index fd4a3b4..702cac6 100644 --- a/grpc-async.opam +++ b/grpc-async.opam @@ -22,7 +22,7 @@ depends: [ "dune" {>= "3.7"} "ocaml" {>= "4.11"} "async" {>= "v0.16"} - "grpc" {= version} + "grpc-server" {= version} "h2" {>= "0.9.0"} "ppx_jane" {>= "v0.16.0"} "stringext" diff --git a/grpc-client-eio.opam b/grpc-client-eio.opam new file mode 100644 index 0000000..1ad0b77 --- /dev/null +++ b/grpc-client-eio.opam @@ -0,0 +1,39 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "An Eio implementation of gRPC client" +description: "Functionality for building gRPC services and rpcs with `eio`." +maintainer: ["Daniel Quernheim "] +authors: [ + "Andrew Jeffery " + "Daniel Quernheim " + "Michael Bacarella " + "Sven Anderson " + "Tim McGilchrist " + "Wojtek Czekalski " + "dimitris.mostrous " +] +license: "BSD-3-Clause" +homepage: "https://github.com/dialohq/ocaml-grpc" +doc: "https://dialohq.github.io/ocaml-grpc" +bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" +depends: [ + "dune" {>= "3.7"} + "eio" {>= "0.12"} + "grpc-client" {= version} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git" diff --git a/grpc-client.opam b/grpc-client.opam new file mode 100644 index 0000000..90a909b --- /dev/null +++ b/grpc-client.opam @@ -0,0 +1,41 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Reusable logic for client side gRPC" +description: + "All modules are networking-layer and concurrency-layer agnostic." +maintainer: ["Daniel Quernheim "] +authors: [ + "Andrew Jeffery " + "Daniel Quernheim " + "Michael Bacarella " + "Sven Anderson " + "Tim McGilchrist " + "Wojtek Czekalski " + "dimitris.mostrous " +] +license: "BSD-3-Clause" +tags: ["network" "rpc" "serialisation"] +homepage: "https://github.com/dialohq/ocaml-grpc" +doc: "https://dialohq.github.io/ocaml-grpc" +bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" +depends: [ + "dune" {>= "3.7"} + "ocaml" {>= "4.08"} + "grpc" {= version} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git" diff --git a/grpc-core-eio.opam b/grpc-core-eio.opam new file mode 100644 index 0000000..9583e54 --- /dev/null +++ b/grpc-core-eio.opam @@ -0,0 +1,40 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Shared logic for gRPC clients and servers based on eio." +description: "All modules are networking-layer agnostic." +maintainer: ["Daniel Quernheim "] +authors: [ + "Andrew Jeffery " + "Daniel Quernheim " + "Michael Bacarella " + "Sven Anderson " + "Tim McGilchrist " + "Wojtek Czekalski " + "dimitris.mostrous " +] +license: "BSD-3-Clause" +tags: ["network" "rpc" "serialisation"] +homepage: "https://github.com/dialohq/ocaml-grpc" +doc: "https://dialohq.github.io/ocaml-grpc" +bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" +depends: [ + "dune" {>= "3.7"} + "eio" {>= "0.12"} + "grpc" {= version} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git" diff --git a/grpc-eio-net-client-h2.opam b/grpc-eio-net-client-h2.opam new file mode 100644 index 0000000..eeb77af --- /dev/null +++ b/grpc-eio-net-client-h2.opam @@ -0,0 +1,39 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: + "An h2 implementation of gRPC networking layer for eio based clients." +maintainer: ["Daniel Quernheim "] +authors: [ + "Andrew Jeffery " + "Daniel Quernheim " + "Michael Bacarella " + "Sven Anderson " + "Tim McGilchrist " + "Wojtek Czekalski " + "dimitris.mostrous " +] +license: "BSD-3-Clause" +homepage: "https://github.com/dialohq/ocaml-grpc" +doc: "https://dialohq.github.io/ocaml-grpc" +bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" +depends: [ + "dune" {>= "3.7"} + "grpc-client-eio" {= version} + "h2" {>= "0.9.0"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git" diff --git a/grpc-eio-net-server-h2.opam b/grpc-eio-net-server-h2.opam new file mode 100644 index 0000000..ad29f06 --- /dev/null +++ b/grpc-eio-net-server-h2.opam @@ -0,0 +1,40 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: + "An h2 implementation of gRPC networking layer for eio based servers." +maintainer: ["Daniel Quernheim "] +authors: [ + "Andrew Jeffery " + "Daniel Quernheim " + "Michael Bacarella " + "Sven Anderson " + "Tim McGilchrist " + "Wojtek Czekalski " + "dimitris.mostrous " +] +license: "BSD-3-Clause" +homepage: "https://github.com/dialohq/ocaml-grpc" +doc: "https://dialohq.github.io/ocaml-grpc" +bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" +depends: [ + "dune" {>= "3.7"} + "grpc-server-eio" {= version} + "h2" {>= "0.9.0"} + "stringext" + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git" diff --git a/grpc-lwt.opam b/grpc-lwt.opam index e6797ef..f87d5c5 100644 --- a/grpc-lwt.opam +++ b/grpc-lwt.opam @@ -19,7 +19,7 @@ doc: "https://dialohq.github.io/ocaml-grpc" bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" depends: [ "dune" {>= "3.7"} - "grpc" {= version} + "grpc-server" {= version} "h2" {>= "0.9.0"} "lwt" {>= "5.3.0"} "stringext" diff --git a/grpc-eio.opam b/grpc-server-eio.opam similarity index 92% rename from grpc-eio.opam rename to grpc-server-eio.opam index 7f00944..29bc4dc 100644 --- a/grpc-eio.opam +++ b/grpc-server-eio.opam @@ -1,6 +1,6 @@ # This file is generated by dune, edit dune-project instead opam-version: "2.0" -synopsis: "An Eio implementation of gRPC" +synopsis: "An Eio implementation of gRPC server" description: "Functionality for building gRPC services and rpcs with `eio`." maintainer: ["Daniel Quernheim "] authors: [ @@ -19,8 +19,7 @@ bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" depends: [ "dune" {>= "3.7"} "eio" {>= "0.12"} - "grpc" {= version} - "h2" {>= "0.9.0"} + "grpc-server" {= version} "stringext" "odoc" {with-doc} ] diff --git a/grpc-server.opam b/grpc-server.opam new file mode 100644 index 0000000..94abdb8 --- /dev/null +++ b/grpc-server.opam @@ -0,0 +1,41 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Reusable logic for server side gRPC" +description: + "All modules are networking-layer and concurrency-layer agnostic." +maintainer: ["Daniel Quernheim "] +authors: [ + "Andrew Jeffery " + "Daniel Quernheim " + "Michael Bacarella " + "Sven Anderson " + "Tim McGilchrist " + "Wojtek Czekalski " + "dimitris.mostrous " +] +license: "BSD-3-Clause" +tags: ["network" "rpc" "serialisation"] +homepage: "https://github.com/dialohq/ocaml-grpc" +doc: "https://dialohq.github.io/ocaml-grpc" +bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" +depends: [ + "dune" {>= "3.7"} + "ocaml" {>= "4.08"} + "grpc" {= version} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git" diff --git a/grpc.opam b/grpc.opam index 8355be4..1002cdb 100644 --- a/grpc.opam +++ b/grpc.opam @@ -2,7 +2,7 @@ opam-version: "2.0" synopsis: "A modular gRPC library" description: - "This library builds some of the signatures and implementations of gRPC functionality. This is used in the more specialised package `grpc-lwt` which has more machinery, however this library can also be used to do some bits yourself." + "This library contains the implementation of (de)serialization of gRPC messages and statuses." maintainer: ["Daniel Quernheim "] authors: [ "Andrew Jeffery " @@ -22,7 +22,6 @@ depends: [ "dune" {>= "3.7"} "ocaml" {>= "4.08"} "bigstringaf" {>= "0.9.1"} - "h2" {>= "0.9.0"} "ppx_deriving" "uri" {>= "4.0.0"} "odoc" {with-doc} diff --git a/lib/grpc-async/client.ml b/lib/async/client.ml similarity index 100% rename from lib/grpc-async/client.ml rename to lib/async/client.ml diff --git a/lib/grpc-async/client.mli b/lib/async/client.mli similarity index 100% rename from lib/grpc-async/client.mli rename to lib/async/client.mli diff --git a/lib/grpc-async/connection.ml b/lib/async/connection.ml similarity index 100% rename from lib/grpc-async/connection.ml rename to lib/async/connection.ml diff --git a/lib/grpc-async/dune b/lib/async/dune similarity index 100% rename from lib/grpc-async/dune rename to lib/async/dune diff --git a/lib/grpc-async/grpc_async.ml b/lib/async/grpc_async.ml similarity index 100% rename from lib/grpc-async/grpc_async.ml rename to lib/async/grpc_async.ml diff --git a/lib/grpc-async/server.ml b/lib/async/server.ml similarity index 100% rename from lib/grpc-async/server.ml rename to lib/async/server.ml diff --git a/lib/grpc-async/server.mli b/lib/async/server.mli similarity index 100% rename from lib/grpc-async/server.mli rename to lib/async/server.mli diff --git a/lib/eio/client/client.ml b/lib/eio/client/client.ml new file mode 100644 index 0000000..a25878f --- /dev/null +++ b/lib/eio/client/client.ml @@ -0,0 +1,129 @@ +type 'a writer = { write : 'a -> unit; close : unit -> unit } + +module Stream = Grpc_core_eio.Stream + +type ('response, 'conn_error) connection = { + writer : Net.writer; + recv : ('response * Grpc_core_eio.Stream.t, 'conn_error) result Eio.Promise.t; + grpc_status : Grpc.Status.t Eio.Promise.t; +} + +type ('decoding_error, 'connection_error, 'response) error = + [ `Decoding of 'decoding_error + | `Rpc of 'response * Grpc.Status.t + | `Connection of 'connection_error ] + +type ('ok, 'decoding_error, 'connection_error, 'net_response) rpc_result = + ('ok, ('decoding_error, 'connection_error, 'net_response) error) result + +let call (type headers response conn_error) ~sw + ~(net : (headers, response, conn_error) Net.t) ~service ~method_name + ~(headers : Grpc_client.request_headers) () : + ((response, conn_error) connection, conn_error) result = + let (module Net) = net in + let path = Grpc_client.make_path ~service ~method_name in + match Net.send_request ~headers path with + | Error conn_error -> Error conn_error + | Ok (writer, recv_net) -> + let status, status_notify = Eio.Promise.create () in + let recv, recv_notify = Eio.Promise.create () in + let () = + Eio.Fiber.fork ~sw (fun () -> + Eio.Promise.resolve recv_notify + (match Eio.Promise.await recv_net with + | Error conn_error -> + Eio.Promise.resolve status_notify + (Grpc.Status.v ~message:"Connection error" + Grpc.Status.Unknown); + Error conn_error + | Ok { response; body_reader; trailers } -> + Eio.Fiber.fork ~sw (fun () -> + Eio.Promise.resolve status_notify + (Grpc_client.status_of_trailers + ~get_header: + (Net.Headers.get (Eio.Promise.await trailers)))); + Ok (response, body_reader))) + in + Ok { writer; recv; grpc_status = status } + +let bidirectional_streaming (type headers response conn_error) ~sw + ~(net : (headers, response, conn_error) Net.t) ~service ~method_name ~decode + ~encode ~headers f : + ('response, 'decoding_error, conn_error, response) rpc_result = + match call ~sw ~net ~service ~method_name ~headers () with + | Ok { writer; recv; grpc_status } -> ( + match Eio.Promise.await recv with + | Ok (response, reader) -> + let (module Net) = net in + if Net.Response.is_ok response then ( + let decoding_error = ref None in + let closed = ref false in + let writer = + { + write = (fun msg -> writer.write (encode msg)); + close = + (fun () -> + writer.close (); + closed := true); + } + in + + let () = + f ~writer ~take:(fun () -> + match Eio.Stream.take reader with + | None -> None + | Some t -> ( + match decode t with + | Ok t -> Some t + | Error e -> + let () = decoding_error := Some e in + None)) + in + if not !closed then writer.close (); + match !decoding_error with + | Some error -> Error (`Decoding error) + | None -> ( + let status = Eio.Promise.await grpc_status in + match Grpc.Status.code status with + | Grpc.Status.OK -> Ok () + | _ -> Error (`Rpc (response, status)))) + else Error (`Rpc (response, Eio.Promise.await grpc_status)) + | Error e -> Error (`Connection e)) + | Error e -> Error (`Connection e) + +let server_streaming ~sw ~net ~service ~method_name ~decode ~encode ~headers + request f = + bidirectional_streaming ~sw ~net ~service ~method_name ~headers ~decode + ~encode (fun ~writer ~take -> + writer.write request; + writer.close (); + f ~read:(fun () -> take ())) + +let client_streaming (type headers response conn_error) ~sw + ~(net : (headers, response, conn_error) Net.t) ~service ~method_name ~decode + ~encode ~headers f = + match call ~sw ~net ~service ~method_name ~headers () with + | Error e -> Error (`Connection e) + | Ok { writer; recv; grpc_status } -> ( + f ~write:(fun msg -> writer.write (encode msg)); + writer.close (); + match Eio.Promise.await recv with + | Error e -> Error (`Connection e) + | Ok (response, reader) -> ( + let resp = Eio.Stream.take reader |> Option.map decode in + let grpc_status = Eio.Promise.await grpc_status in + match (Grpc.Status.code grpc_status, resp) with + | Grpc.Status.OK, Some (Ok decoded) -> Ok decoded + | Grpc.Status.OK, Some (Error e) -> Error (`Decoding e) + | Grpc.Status.OK, None -> + Error + (`Rpc + ( response, + Grpc.Status.v + ~message:"HTTP response is OK but body is empty" + Grpc.Status.Internal )) + | _ -> Error (`Rpc (response, grpc_status)))) + +let unary ~sw ~net ~service ~method_name ~decode ~encode ~headers request = + client_streaming ~sw ~net ~service ~method_name ~decode ~encode ~headers + (fun ~write -> write request) diff --git a/lib/eio/client/client.mli b/lib/eio/client/client.mli new file mode 100644 index 0000000..1a2e846 --- /dev/null +++ b/lib/eio/client/client.mli @@ -0,0 +1,69 @@ +type ('response, 'conn_error) connection = { + writer : Net.writer; + recv : ('response * Grpc_core_eio.Stream.t, 'conn_error) result Eio.Promise.t; + grpc_status : Grpc.Status.t Eio.Promise.t; +} + +type ('decoding_error, 'connection_error, 'response) error = + [ `Decoding of 'decoding_error + | `Rpc of 'response * Grpc.Status.t + | `Connection of 'connection_error ] + +type ('ok, 'decoding_error, 'connection_error, 'net_response) rpc_result = + ('ok, ('decoding_error, 'connection_error, 'net_response) error) result + +val call : + sw:Eio.Switch.t -> + net:('headers, 'response, 'connection_error) Net.t -> + service:string -> + method_name:string -> + headers:Grpc_client.request_headers -> + unit -> + (('response, 'connection_error) connection, 'connection_error) result + +val unary : + sw:Eio.Switch.t -> + net:('headers, 'net_response, 'conn_error) Net.t -> + service:string -> + method_name:string -> + decode:(string -> ('response, 'decoding_error) result) -> + encode:('request -> string) -> + headers:Grpc_client.request_headers -> + 'request -> + ('response, 'decoding_error, 'conn_error, 'net_response) rpc_result + +val client_streaming : + sw:Eio.Switch.t -> + net:('headers, 'net_response, 'conn_error) Net.t -> + service:string -> + method_name:string -> + decode:(string -> ('response, 'decoding_error) result) -> + encode:('request -> string) -> + headers:Grpc_client.request_headers -> + (write:('request -> unit) -> unit) -> + ('response, 'decoding_error, 'conn_error, 'net_response) rpc_result + +val server_streaming : + sw:Eio.Switch.t -> + net:('headers, 'net_response, 'conn_error) Net.t -> + service:string -> + method_name:string -> + decode:(string -> ('response, 'decoding_error) result) -> + encode:('request -> string) -> + headers:Grpc_client.request_headers -> + 'request -> + (read:(unit -> 'response option) -> unit) -> + (unit, 'decoding_error, 'conn_error, 'net_response) rpc_result + +type 'a writer = { write : 'a -> unit; close : unit -> unit } + +val bidirectional_streaming : + sw:Eio.Switch.t -> + net:('headers, 'net_response, 'conn_error) Net.t -> + service:string -> + method_name:string -> + decode:(string -> ('response, 'decoding_error) result) -> + encode:('request -> string) -> + headers:Grpc_client.request_headers -> + (writer:'request writer -> take:(unit -> 'response option) -> unit) -> + (unit, 'decoding_error, 'conn_error, 'net_response) rpc_result diff --git a/lib/eio/client/dune b/lib/eio/client/dune new file mode 100644 index 0000000..79e4a44 --- /dev/null +++ b/lib/eio/client/dune @@ -0,0 +1,5 @@ +(library + (name grpc_client_eio) + (public_name grpc-client-eio) + (libraries grpc grpc-core-eio eio grpc-client)) + diff --git a/lib/eio/client/net.ml b/lib/eio/client/net.ml new file mode 100644 index 0000000..c61c21a --- /dev/null +++ b/lib/eio/client/net.ml @@ -0,0 +1,38 @@ +type writer = { write : string -> unit; close : unit -> unit } + +type ('response, 'headers) reader = { + response : 'response; + body_reader : string option Eio.Stream.t; + trailers : 'headers Eio.Promise.t; +} + +module type S = sig + module Headers : sig + type t + + val get : t -> string -> string option + end + + module Response : sig + type t + + val is_ok : t -> bool + val headers : t -> Headers.t + end + + type connection_error + + val send_request : + headers:Grpc_client.request_headers -> + string -> + ( writer + * ((Response.t, Headers.t) reader, connection_error) result Eio.Promise.t, + connection_error ) + result +end + +type ('headers, 'response, 'connection_error) t = + (module S + with type Response.t = 'response + and type Headers.t = 'headers + and type connection_error = 'connection_error) diff --git a/lib/eio/core/dune b/lib/eio/core/dune new file mode 100644 index 0000000..b65bfa8 --- /dev/null +++ b/lib/eio/core/dune @@ -0,0 +1,4 @@ +(library + (name grpc_core_eio) + (public_name grpc-core-eio) + (libraries grpc eio)) diff --git a/lib/grpc-eio/seq.ml b/lib/eio/core/seq.ml similarity index 100% rename from lib/grpc-eio/seq.ml rename to lib/eio/core/seq.ml diff --git a/lib/grpc-eio/seq.mli b/lib/eio/core/seq.mli similarity index 100% rename from lib/grpc-eio/seq.mli rename to lib/eio/core/seq.mli diff --git a/lib/eio/core/stream.ml b/lib/eio/core/stream.ml new file mode 100644 index 0000000..4f2de9d --- /dev/null +++ b/lib/eio/core/stream.ml @@ -0,0 +1,26 @@ +type t = string option Eio.Stream.t + +let make + ~(schedule_read_raw : + on_eof:(unit -> unit) -> + on_read:(Bigstringaf.t -> off:int -> len:int -> unit) -> + unit) = + let buffer = Grpc.Buffer.v () in + let stream = Eio.Stream.create max_int in + let on_msg msg = Eio.Stream.add stream (Some msg) in + let on_eof () = Eio.Stream.add stream None in + let rec on_read src ~off ~len = + Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src ~dst:buffer ~length:len; + Grpc.Message.extract_all on_msg buffer; + schedule_read_raw ~on_read ~on_eof + in + schedule_read_raw ~on_read ~on_eof; + stream + +let to_seq t = + let rec seq () = + match Eio.Stream.take t with + | Some msg -> Seq.Cons (msg, seq) + | None -> Seq.Nil + in + seq diff --git a/lib/eio/core/stream.mli b/lib/eio/core/stream.mli new file mode 100644 index 0000000..a01f716 --- /dev/null +++ b/lib/eio/core/stream.mli @@ -0,0 +1,11 @@ +type t = string option Eio.Stream.t + +(* Stream of string-encoded grpc messages *) +val make : + schedule_read_raw: + (on_eof:(unit -> unit) -> + on_read:(Bigstringaf.t -> off:int -> len:int -> unit) -> + unit) -> + string option Eio.Stream.t + +val to_seq : string option Eio.Stream.t -> string Seq.t diff --git a/lib/eio/net-client-h2/dune b/lib/eio/net-client-h2/dune new file mode 100644 index 0000000..886b9cf --- /dev/null +++ b/lib/eio/net-client-h2/dune @@ -0,0 +1,4 @@ +(library + (name grpc_eio_net_client_h2) + (public_name grpc-eio-net-client-h2) + (libraries grpc-client-eio h2-eio)) diff --git a/lib/eio/net-client-h2/grpc_eio_net_client_h2.ml b/lib/eio/net-client-h2/grpc_eio_net_client_h2.ml new file mode 100644 index 0000000..12793e9 --- /dev/null +++ b/lib/eio/net-client-h2/grpc_eio_net_client_h2.ml @@ -0,0 +1,138 @@ +module type Client = sig + val connection : H2_eio.Client.t Eio.Promise.t + val host : string + val scheme : string +end + +exception Network_error_todo_remove of H2.Client_connection.error + +module Response = struct + type t = H2.Response.t + + let is_ok response = response.H2.Response.status = `OK + let headers response = response.H2.Response.headers +end + +module Headers = struct + type t = H2.Headers.t + + let get headers key = H2.Headers.get headers key +end + +type connection_error = H2.Client_connection.error + +module Make_net (Client : Client) : + Grpc_client_eio.Net.S + with type Response.t = H2.Response.t + and type Headers.t = H2.Headers.t + and type connection_error = connection_error = struct + module Response = Response + module Headers = Headers + + type nonrec connection_error = connection_error + + let send_request ~(headers : Grpc_client.request_headers) target = + (* We are flushing headers immediately but potentially for the + unary and server streaming cases we shouldn't do it + *) + let reader_a, reader_u = Eio.Promise.create () in + let trailers_a, trailers_u = Eio.Promise.create () in + let trailers_handler trailers = Eio.Promise.resolve trailers_u trailers in + let response_handler response body = + let body_reader = + Grpc_core_eio.Stream.make + ~schedule_read_raw:(fun ~on_eof:on_eof_stream ~on_read -> + let on_eof () = + if not (Eio.Promise.is_resolved trailers_a) then + Eio.Promise.resolve trailers_u response.H2.Response.headers; + on_eof_stream () + in + H2.Body.Reader.schedule_read body ~on_eof ~on_read) + in + Eio.Promise.resolve_ok reader_u + { Grpc_client_eio.Net.response; body_reader; trailers = trailers_a } + in + let error_handler error = + if Eio.Promise.is_resolved reader_a then + raise (Network_error_todo_remove error) + else Eio.Promise.resolve_error reader_u error + in + let request = + H2.Request.create ~scheme:Client.scheme `POST target + ~headers: + (H2.Headers.of_list + [ + (":authority", Client.host); + ("te", headers.te); + ("content-type", headers.content_type); + ]) + in + let body_writer = + H2_eio.Client.request ~flush_headers_immediately:true + (Eio.Promise.await Client.connection) + ~trailers_handler ~response_handler ~error_handler request + in + let writer = + { + Grpc_client_eio.Net.write = + (fun input -> + H2.Body.Writer.write_string body_writer (Grpc.Message.make input)); + close = (fun () -> H2.Body.Writer.close body_writer); + } + in + Ok (writer, reader_a) +end + +module Expert = struct + (* TODO: Connection management *) + let create_client ~(net : Eio_unix.Net.t) ~sw ~scheme ~host ~port : + (Headers.t, Response.t, connection_error) Grpc_client_eio.Net.t = + let inet, port = + Eio_unix.run_in_systhread (fun () -> + Unix.getaddrinfo host (string_of_int port) + [ Unix.(AI_FAMILY PF_INET) ]) + |> List.filter_map (fun (addr : Unix.addr_info) -> + match addr.ai_addr with + | Unix.ADDR_UNIX _ -> None + | ADDR_INET (addr, port) -> Some (addr, port)) + |> List.hd + in + let addr = `Tcp (Eio_unix.Net.Ipaddr.of_unix inet, port) in + let socket = Eio.Net.connect ~sw net addr in + let connection, connection_resolve = Eio.Promise.create () in + Eio.Fiber.fork_daemon ~sw (fun () -> + Eio.Switch.run (fun sw' -> + let conn = + H2_eio.Client.create_connection ~sw:sw' ~error_handler:ignore + socket + in + Eio.Switch.on_release sw' (fun () -> + Eio.Promise.await (H2_eio.Client.shutdown conn)); + (* For now we're ignoring the errors, we should probably inject them into grpc handlers to let them handle it *) + Eio.Promise.resolve connection_resolve conn); + `Stop_daemon); + (module Make_net (struct + let connection = connection + let host = host + let scheme = scheme + end)) +end + +let create_client ~net ~sw addr = + let uri = Uri.of_string addr in + let scheme = Uri.scheme uri |> Option.value ~default:"http" in + let host = + match Uri.host uri with + | None -> invalid_arg "No host in uri" + | Some host -> host + in + let port = + Uri.port uri + |> Option.value + ~default: + (match scheme with + | "http" -> 80 + | "https" -> 443 + | _ -> failwith "Don't know default port for this scheme") + in + Expert.create_client ~net ~sw ~scheme ~host ~port diff --git a/lib/eio/net-client-h2/grpc_eio_net_client_h2.mli b/lib/eio/net-client-h2/grpc_eio_net_client_h2.mli new file mode 100644 index 0000000..63fa3bc --- /dev/null +++ b/lib/eio/net-client-h2/grpc_eio_net_client_h2.mli @@ -0,0 +1,19 @@ +exception Network_error_todo_remove of H2.Client_connection.error + +type connection_error = H2.Client_connection.error + +val create_client : + net:Eio_unix.Net.t -> + sw:Eio.Switch.t -> + string -> + (H2.Headers.t, H2.Response.t, connection_error) Grpc_client_eio.Net.t + +module Expert : sig + val create_client : + net:Eio_unix.Net.t -> + sw:Eio.Switch.t -> + scheme:string -> + host:string -> + port:int -> + (H2.Headers.t, H2.Response.t, connection_error) Grpc_client_eio.Net.t +end diff --git a/lib/eio/net-server-h2/dune b/lib/eio/net-server-h2/dune new file mode 100644 index 0000000..07684df --- /dev/null +++ b/lib/eio/net-server-h2/dune @@ -0,0 +1,4 @@ +(library + (name grpc_eio_net_server_h2) + (public_name grpc-eio-net-server-h2) + (libraries grpc-server-eio h2-eio)) diff --git a/lib/eio/net-server-h2/grpc_eio_net_server_h2.ml b/lib/eio/net-server-h2/grpc_eio_net_server_h2.ml new file mode 100644 index 0000000..1db0dc3 --- /dev/null +++ b/lib/eio/net-server-h2/grpc_eio_net_server_h2.ml @@ -0,0 +1,91 @@ +module Net = struct + module Request = struct + type t = Eio.Net.Sockaddr.stream * H2.Reqd.t * H2.Request.t + + let is_post (_, _, req) = + match req with { H2.Request.meth = `POST; _ } -> true | _ -> false + + let target (_, _, req) = req.H2.Request.target + let get_header (_, _, req) name = H2.Headers.get req.H2.Request.headers name + + let read_body (_, reqd, _) = + let body = H2.Reqd.request_body reqd in + Grpc_core_eio.Stream.make + ~schedule_read_raw:(H2.Body.Reader.schedule_read body) + end + + let write_trailers reqd (trailers : Grpc_server.trailers) = + try + H2.Reqd.schedule_trailers reqd + (H2.Headers.of_list + (("grpc-status", string_of_int trailers.grpc_status) + :: + (match trailers.grpc_message with + | None -> trailers.extra + | Some msg -> ("grpc-message", msg) :: trailers.extra))) + with + | ((Failure "h2.Reqd.schedule_trailers: stream already closed") + [@warning "-52"] (* https://github.com/anmonteiro/ocaml-h2/issues/175 *)) + -> + () + + let respond_streaming ~headers (_, reqd, _) = + let body_writer = + H2.Reqd.respond_with_streaming ~flush_headers_immediately:true reqd + (H2.Response.create + ~headers: + (H2.Headers.of_list + (("content-type", headers.Grpc_server.content_type) + :: headers.extra)) + `OK) + in + let close () = H2.Body.Writer.close body_writer in + let on_msg input = + + H2.Body.Writer.write_string body_writer input + in + let write_trailers = write_trailers reqd in + { Grpc_server_eio.Net.close; on_msg; write_trailers } + + let respond_error (_, reqd, _) (error : Grpc_server.error) = + let respond_with code = + H2.Reqd.respond_with_string reqd (H2.Response.create code) "" + in + match error with + | `Not_found _ -> respond_with `Not_found + | `Unsupported_media_type -> respond_with `Unsupported_media_type + | `Not_acceptable -> respond_with `Not_acceptable + | `Bad_request -> respond_with `Bad_request +end + +include Net + +let net = + (module Net : Grpc_server_eio.Net.S with type Request.t = Net.Request.t) + +let connection_handler ~sw ?config ?error_handler server : + 'a Eio.Net.connection_handler = + fun socket addr -> + let error_handler client_address ?request error respond = + (* Report internal error via headers *) + let () = + match error_handler with + | Some f -> f client_address ?request error + | None -> () + in + let writer = + respond + (H2.Headers.of_list + [ + ( "grpc-status", + string_of_int (Grpc.Status.int_of_code Grpc.Status.Internal) ); + ]) + in + H2.Body.Writer.close writer + in + H2_eio.Server.create_connection_handler ?config + ~request_handler:(fun client_addr reqd -> + Eio.Fiber.fork ~sw (fun () -> + Grpc_server_eio.handle_request ~net server + (client_addr, reqd, H2.Reqd.request reqd))) + ~error_handler addr socket ~sw diff --git a/lib/eio/net-server-h2/grpc_eio_net_server_h2.mli b/lib/eio/net-server-h2/grpc_eio_net_server_h2.mli new file mode 100644 index 0000000..a5e1bc4 --- /dev/null +++ b/lib/eio/net-server-h2/grpc_eio_net_server_h2.mli @@ -0,0 +1,14 @@ +include + Grpc_server_eio.Net.S + with type Request.t = Eio.Net.Sockaddr.stream * H2.Reqd.t * H2.Request.t + +val connection_handler : + sw:Eio.Switch.t -> + ?config:H2.Config.t -> + ?error_handler: + (Eio.Net.Sockaddr.stream -> + ?request:H2.Request.t -> + H2.Server_connection.error -> + unit) -> + Request.t Grpc_server_eio.Rpc.handler Grpc_server.t -> + 'a Eio.Net.connection_handler diff --git a/lib/eio/server/dune b/lib/eio/server/dune new file mode 100644 index 0000000..009ca1c --- /dev/null +++ b/lib/eio/server/dune @@ -0,0 +1,8 @@ +(library + (name grpc_server_eio) + (public_name grpc-server-eio) + (libraries grpc grpc-core-eio eio grpc-server)) + +(deprecated_library_name + (old_public_name grpc-eio) + (new_public_name grpc-server-eio)) diff --git a/lib/eio/server/grpc_server_eio.ml b/lib/eio/server/grpc_server_eio.ml new file mode 100644 index 0000000..bba63d3 --- /dev/null +++ b/lib/eio/server/grpc_server_eio.ml @@ -0,0 +1,72 @@ +module Net = Net + +module Rpc = struct + type stream = Grpc_core_eio.Stream.t + type unary = string -> Grpc_server.trailers * string option + type client_streaming = stream -> Grpc_server.trailers * string option + type server_streaming = string -> (string -> unit) -> Grpc_server.trailers + + type bidirectional_streaming = + stream -> (string -> unit) -> Grpc_server.trailers + + type rpc_impl = stream -> (string -> unit) -> Grpc_server.trailers + + type 'request handler = { + headers : 'request -> Grpc_server.headers; + f : stream -> (string -> unit) -> Grpc_server.trailers; + } + + module Stream = Grpc_core_eio.Stream + + let unary (unary_handler : unary) = + fun request_stream respond -> + match Eio.Stream.take request_stream with + | Some request -> + let status, response = unary_handler request in + Option.iter respond response; + status + | None -> Grpc_server.make_trailers Grpc.Status.(v OK) + + let client_streaming (client_streaming_handler : client_streaming) = + fun request_stream respond -> + let status, response = client_streaming_handler request_stream in + Option.iter respond response; + status + + let server_streaming (server_streaming_handler : server_streaming) = + fun requests respond -> + match Eio.Stream.take requests with + | Some request -> server_streaming_handler request respond + | None -> Grpc_server.make_trailers Grpc.Status.(v OK) +end + +module Service = Grpc_server.Service +module G = Grpc_server + +type 'request t = 'request Rpc.handler Grpc_server.t + +let make = G.v +let add_service = G.add_service + +type 'request net = (module Net.S with type Request.t = 'request) + +let handle_request (type request) ~(net : request net) server request = + let module Net' = (val net) in + match + G.handle_request server + ~is_post_request:(Net'.Request.is_post request) + ~get_header:(fun header -> Net'.Request.get_header request header) + ~path:(Net'.Request.target request) + with + | Ok handler -> + let request_stream = Net'.Request.read_body request in + let { Net.on_msg; write_trailers; close } = + let headers = handler.Rpc.headers request in + Net'.respond_streaming ~headers request + in + let trailers = + handler.f request_stream (fun input -> on_msg (Grpc.Message.make input)) + in + write_trailers trailers; + close () + | Error e -> Net'.respond_error request e diff --git a/lib/eio/server/grpc_server_eio.mli b/lib/eio/server/grpc_server_eio.mli new file mode 100644 index 0000000..3666eb1 --- /dev/null +++ b/lib/eio/server/grpc_server_eio.mli @@ -0,0 +1,35 @@ +module Net = Net + +module Rpc : sig + type stream = Grpc_core_eio.Stream.t + + type 'request handler = { + headers : 'request -> Grpc_server.headers; + f : stream -> (string -> unit) -> Grpc_server.trailers; + } + + type rpc_impl = stream -> (string -> unit) -> Grpc_server.trailers + (** [handler] represents the most general signature of a gRPC handler. *) + + type unary = string -> Grpc_server.trailers * string option + type client_streaming = stream -> Grpc_server.trailers * string option + type server_streaming = string -> (string -> unit) -> Grpc_server.trailers + type bidirectional_streaming = rpc_impl + + val unary : unary -> rpc_impl + val client_streaming : client_streaming -> rpc_impl + val server_streaming : server_streaming -> rpc_impl +end + +module Service = Grpc_server.Service + +type 'request t = 'request Rpc.handler Grpc_server.t + +val make : unit -> 'a t + +val add_service : + name:string -> service:'a Rpc.handler Service.t -> 'a t -> 'a t + +(* TODO: add context *) +val handle_request : + net:'request Net.t -> 'request Rpc.handler Grpc_server.t -> 'request -> unit diff --git a/lib/eio/server/net.ml b/lib/eio/server/net.ml new file mode 100644 index 0000000..c4cb900 --- /dev/null +++ b/lib/eio/server/net.ml @@ -0,0 +1,23 @@ +type streaming_writer = { + on_msg : string -> unit; + close : unit -> unit; + write_trailers : Grpc_server.trailers -> unit; +} + +module type S = sig + module Request : sig + type t + + val read_body : t -> string option Eio.Stream.t + val is_post : t -> bool + val target : t -> string + val get_header : t -> string -> string option + end + + val respond_streaming : + headers:Grpc_server.headers -> Request.t -> streaming_writer + + val respond_error : Request.t -> Grpc_server.error -> unit +end + +type 'request t = (module S with type Request.t = 'request) diff --git a/lib/grpc-eio/readme.md b/lib/eio/server/readme.md similarity index 100% rename from lib/grpc-eio/readme.md rename to lib/eio/server/readme.md diff --git a/lib/grpc-client/dune b/lib/grpc-client/dune new file mode 100644 index 0000000..119484d --- /dev/null +++ b/lib/grpc-client/dune @@ -0,0 +1,4 @@ +(library + (name grpc_client) + (public_name grpc-client) + (libraries grpc)) diff --git a/lib/grpc-client/grpc_client.ml b/lib/grpc-client/grpc_client.ml new file mode 100644 index 0000000..bc14dff --- /dev/null +++ b/lib/grpc-client/grpc_client.ml @@ -0,0 +1,31 @@ +type request_headers = { content_type : string; te : string } + +let make_request_headers ?(te = []) format = + { + content_type = Grpc.Message.format_to_content_type format; + te = + (match te with + | [] -> "trailers" + | te -> Printf.sprintf "trailers; %s" (String.concat "; " te)); + } + +let make_path ~service ~method_name = + Printf.sprintf "/%s/%s" service method_name + +let status_of_trailers ~get_header = + match get_header "grpc-status" with + | None -> + Grpc.Status.v ~message:"Server did not return grpc-status" + Grpc.Status.Unknown + | Some s -> ( + match Option.bind (int_of_string_opt s) Grpc.Status.code_of_int with + | None -> + Grpc.Status.v + ~message: + (Printf.sprintf "Server returned an invalid grpc-status %s" s) + Grpc.Status.Unknown + | Some status -> Grpc.Status.v ?message:(get_header "grpc-message") status + ) + +let trailers_missing_status = + Grpc.Status.v ~message:"Trailers missing" Grpc.Status.Unknown diff --git a/lib/grpc-client/grpc_client.mli b/lib/grpc-client/grpc_client.mli new file mode 100644 index 0000000..93b2deb --- /dev/null +++ b/lib/grpc-client/grpc_client.mli @@ -0,0 +1,8 @@ +type request_headers = { content_type : string; te : string } + +val make_request_headers : + ?te:string list -> Grpc.Message.format -> request_headers + +val make_path : service:string -> method_name:string -> string +val status_of_trailers : get_header:(string -> string option) -> Grpc.Status.t +val trailers_missing_status : Grpc.Status.t diff --git a/lib/grpc-eio/client.ml b/lib/grpc-eio/client.ml deleted file mode 100644 index 3ab1164..0000000 --- a/lib/grpc-eio/client.ml +++ /dev/null @@ -1,94 +0,0 @@ -type response_handler = H2.Client_connection.response_handler - -type do_request = - ?flush_headers_immediately:bool -> - ?trailers_handler:(H2.Headers.t -> unit) -> - H2.Request.t -> - response_handler:response_handler -> - H2.Body.Writer.t - -let make_request ~scheme ~service ~rpc ~headers = - H2.Request.create ~scheme `POST ("/" ^ service ^ "/" ^ rpc) ~headers - -let default_headers = - H2.Headers.of_list - [ ("te", "trailers"); ("content-type", "application/grpc+proto") ] - -let make_trailers_handler () = - let status, status_notify = Eio.Promise.create () in - let trailers_handler headers = - let code = - match H2.Headers.get headers "grpc-status" with - | None -> None - | Some s -> Option.bind (int_of_string_opt s) Grpc.Status.code_of_int - in - match (code, Eio.Promise.is_resolved status) with - | Some code, false -> - let message = H2.Headers.get headers "grpc-message" in - let status = Grpc.Status.v ?message code in - Eio.Promise.resolve status_notify status - | Some _, true (* This should never happen, but just in case. *) | _ -> () - in - (status, trailers_handler) - -let get_response_and_bodies request = - let response, response_notify = Eio.Promise.create () in - let read_body, read_body_notify = Eio.Promise.create () in - let response_handler response body = - Eio.Promise.resolve response_notify response; - Eio.Promise.resolve read_body_notify body - in - let write_body = request ~response_handler in - let response = Eio.Promise.await response in - let read_body = Eio.Promise.await read_body in - (response, read_body, write_body) - -let call ~service ~rpc ?(scheme = "https") ~handler ~(do_request : do_request) - ?(headers = default_headers) () = - let request = make_request ~service ~rpc ~scheme ~headers in - let status, trailers_handler = make_trailers_handler () in - let response, read_body, write_body = - get_response_and_bodies - (do_request ~flush_headers_immediately:true request ~trailers_handler) - in - match response.status with - | `OK -> - trailers_handler response.headers; - let result = handler write_body read_body in - let status = - match Eio.Promise.is_resolved status with - (* In case no grpc-status appears in headers or trailers. *) - | true -> Eio.Promise.await status - | false -> - Grpc.Status.v ~message:"Server did not return grpc-status" - Grpc.Status.Unknown - in - Ok (result, status) - | error_status -> Error error_status - -module Rpc = struct - type 'a handler = H2.Body.Writer.t -> H2.Body.Reader.t -> 'a - - let bidirectional_streaming ~f write_body read_body = - let send, close = Connection.grpc_send_streaming_client write_body in - let response_reader = Connection.grpc_recv_streaming read_body in - f ~send ~close response_reader - - let client_streaming ~f = - bidirectional_streaming ~f:(fun ~send ~close responses -> - let response = Stream.take responses in - f ~send ~close response) - - let server_streaming ~f request = - bidirectional_streaming ~f:(fun ~send ~close responses -> - send request; - close (); - f responses) - - let unary ~f request = - bidirectional_streaming ~f:(fun ~send ~close responses -> - send request; - close (); - let response = Stream.take responses |> Eio.Promise.await in - f response) -end diff --git a/lib/grpc-eio/client.mli b/lib/grpc-eio/client.mli deleted file mode 100644 index 46b9cf5..0000000 --- a/lib/grpc-eio/client.mli +++ /dev/null @@ -1,54 +0,0 @@ -module Rpc : sig - type 'a handler = H2.Body.Writer.t -> H2.Body.Reader.t -> 'a - - val bidirectional_streaming : - f:(send:(string -> unit) -> close:(unit -> unit) -> Stream.t -> 'a) -> - 'a handler - (** [bidirectional_streaming ~f write read] sets up the sending and receiving - logic using [write] and [read], then calls [f] with a push function for - requests and a stream of responses. *) - - val client_streaming : - f: - (send:(string -> unit) -> - close:(unit -> unit) -> - string option Eio.Promise.t -> - 'a) -> - 'a handler - (** [client_streaming ~f write read] sets up the sending and receiving - logic using [write] and [read], then calls [f] with a push function for - requests and promise for the response. *) - - val server_streaming : f:(Stream.t -> 'a) -> string -> 'a handler - (** [server_streaming ~f enc write read] sets up the sending and receiving - logic using [write] and [read], then sends [enc] and calls [f] with a - stream of responses. *) - - val unary : f:(string option -> 'a) -> string -> 'a handler - (** [unary ~f enc write read] sets up the sending and receiving - logic using [write] and [read], then sends [enc] and calls [f] with a - promise for the response. *) -end - -type response_handler = H2.Client_connection.response_handler - -type do_request = - ?flush_headers_immediately:bool -> - ?trailers_handler:(H2.Headers.t -> unit) -> - H2.Request.t -> - response_handler:response_handler -> - H2.Body.Writer.t -(** [do_request] is the type of a function that performs the request *) - -val call : - service:string -> - rpc:string -> - ?scheme:string -> - handler:'a Rpc.handler -> - do_request:do_request -> - ?headers:H2.Headers.t -> - unit -> - ('a * Grpc.Status.t, H2.Status.t) result -(** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given - by [service] and [rpc] using the [do_request] function. The [handler] is - called when this request is set up to send and receive data. *) diff --git a/lib/grpc-eio/connection.ml b/lib/grpc-eio/connection.ml deleted file mode 100644 index 5d26f85..0000000 --- a/lib/grpc-eio/connection.ml +++ /dev/null @@ -1,43 +0,0 @@ -let grpc_recv_streaming body = Stream.(of_h2_body body) - -let grpc_send_streaming_client body = - let send input = - let payload = Grpc.Message.make input in - H2.Body.Writer.write_string body payload - in - let close () = H2.Body.Writer.close body in - (send, close) - -let grpc_send_streaming request = - let body = - H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request - (H2.Response.create - ~headers: - (H2.Headers.of_list [ ("content-type", "application/grpc+proto") ]) - `OK) - in - let on_msg input = - let payload = Grpc.Message.make input in - H2.Body.Writer.write_string body payload - in - let on_eof status = - (try - H2.Reqd.schedule_trailers request - (H2.Headers.of_list - ([ - ( "grpc-status", - string_of_int - (Grpc.Status.int_of_code (Grpc.Status.code status)) ); - ] - @ - match Grpc.Status.message status with - | None -> [] - | Some message -> [ ("grpc-message", message) ])) - with - | ((Failure "h2.Reqd.schedule_trailers: stream already closed") - [@warning "-52"] (* https://github.com/anmonteiro/ocaml-h2/issues/175 *)) - -> - ()); - H2.Body.Writer.close body - in - (on_msg, on_eof) diff --git a/lib/grpc-eio/dune b/lib/grpc-eio/dune deleted file mode 100644 index 39ce5ea..0000000 --- a/lib/grpc-eio/dune +++ /dev/null @@ -1,4 +0,0 @@ -(library - (name grpc_eio) - (public_name grpc-eio) - (libraries grpc h2 eio)) diff --git a/lib/grpc-eio/grpc_eio.ml b/lib/grpc-eio/grpc_eio.ml deleted file mode 100644 index 7d2273e..0000000 --- a/lib/grpc-eio/grpc_eio.ml +++ /dev/null @@ -1,3 +0,0 @@ -module Server = Server -module Client = Client -module Stream = Stream diff --git a/lib/grpc-eio/server.ml b/lib/grpc-eio/server.ml deleted file mode 100644 index cda712a..0000000 --- a/lib/grpc-eio/server.ml +++ /dev/null @@ -1,114 +0,0 @@ -module ServiceMap = Map.Make (String) - -type service = H2.Reqd.t -> unit -type t = service ServiceMap.t - -let v () = ServiceMap.empty -let add_service ~name ~service t = ServiceMap.add name service t - -let handle_request t reqd = - let request = H2.Reqd.request reqd in - let respond_with code = - H2.Reqd.respond_with_string reqd (H2.Response.create code) "" - in - let route () = - let parts = String.split_on_char '/' request.target in - if List.length parts > 1 then - (* allow for arbitrary prefixes *) - let service_name = List.nth parts (List.length parts - 2) in - let service = ServiceMap.find_opt service_name t in - match service with - | Some service -> service reqd - | None -> respond_with `Not_found - else respond_with `Not_found - in - match request.meth with - | `POST -> ( - match H2.Headers.get request.headers "content-type" with - | Some s -> - if - Stringext.chop_prefix s ~prefix:"application/grpc" |> Option.is_some - then - match H2.Headers.get request.headers "grpc-encoding" with - | None | Some "identity" -> ( - match H2.Headers.get request.headers "grpc-accept-encoding" with - | None -> route () - | Some encodings -> - let encodings = String.split_on_char ',' encodings in - if List.mem "identity" encodings then route () - else respond_with `Not_acceptable) - | Some _ -> - (* TODO: not sure if there is a specific way to handle this in grpc *) - respond_with `Bad_request - else respond_with `Unsupported_media_type - | None -> respond_with `Unsupported_media_type) - | _ -> respond_with `Not_found - -module Rpc = struct - type unary = string -> Grpc.Status.t * string option - type client_streaming = Stream.t -> Grpc.Status.t * string option - type server_streaming = string -> (string -> unit) -> Grpc.Status.t - type bidirectional_streaming = Stream.t -> (string -> unit) -> Grpc.Status.t - - type t = - | Unary of unary - | Client_streaming of client_streaming - | Server_streaming of server_streaming - | Bidirectional_streaming of bidirectional_streaming - - let bidirectional_streaming ~f reqd = - let body = H2.Reqd.request_body reqd in - let request_stream = Connection.grpc_recv_streaming body in - let on_msg, on_eof = Connection.grpc_send_streaming reqd in - let status = f request_stream on_msg in - on_eof status - - let client_streaming ~f reqd = - bidirectional_streaming reqd ~f:(fun requests respond -> - let status, response = f requests in - Option.iter respond response; - status) - - let server_streaming ~f reqd = - bidirectional_streaming reqd ~f:(fun requests respond -> - match Stream.take requests |> Eio.Promise.await with - | Some request -> f request respond - | None -> Grpc.Status.(v OK)) - - let unary ~f reqd = - bidirectional_streaming reqd ~f:(fun requests respond -> - match Stream.take requests |> Eio.Promise.await with - | Some request -> - let status, response = f request in - Option.iter respond response; - status - | None -> Grpc.Status.(v OK)) -end - -module Service = struct - module RpcMap = Map.Make (String) - - type t = Rpc.t RpcMap.t - - let v () = RpcMap.empty - let add_rpc ~name ~rpc t = RpcMap.add name rpc t - - let handle_request (t : t) reqd = - let request = H2.Reqd.request reqd in - let respond_with code = - H2.Reqd.respond_with_string reqd (H2.Response.create code) "" - in - let parts = String.split_on_char '/' request.target in - if List.length parts > 1 then - let rpc_name = List.nth parts (List.length parts - 1) in - let rpc = RpcMap.find_opt rpc_name t in - match rpc with - | Some rpc -> ( - match rpc with - | Unary f -> Rpc.unary ~f reqd - | Client_streaming f -> Rpc.client_streaming ~f reqd - | Server_streaming f -> Rpc.server_streaming ~f reqd - | Bidirectional_streaming f -> Rpc.bidirectional_streaming ~f reqd) - | None -> respond_with `Not_found - else respond_with `Not_found -end diff --git a/lib/grpc-eio/server.mli b/lib/grpc-eio/server.mli deleted file mode 100644 index b0571d6..0000000 --- a/lib/grpc-eio/server.mli +++ /dev/null @@ -1,49 +0,0 @@ -include Grpc.Server.S - -module Rpc : sig - type unary = string -> Grpc.Status.t * string option - (** [unary] is the type for a unary grpc rpc, one request, one response. *) - - type client_streaming = Stream.t -> Grpc.Status.t * string option - (** [client_streaming] is the type for an rpc where the client streams the requests and the server responds once. *) - - type server_streaming = string -> (string -> unit) -> Grpc.Status.t - (** [server_streaming] is the type for an rpc where the client sends one request and the server sends multiple responses. *) - - type bidirectional_streaming = Stream.t -> (string -> unit) -> Grpc.Status.t - (** [bidirectional_streaming] is the type for an rpc where both the client and server can send multiple messages. *) - - type t = - | Unary of unary - | Client_streaming of client_streaming - | Server_streaming of server_streaming - | Bidirectional_streaming of bidirectional_streaming - - (** [t] represents the types of rpcs available in gRPC. *) - - val unary : f:unary -> H2.Reqd.t -> unit - (** [unary ~f reqd] calls [f] with the request obtained from [reqd] and handles sending the response. *) - - val client_streaming : f:client_streaming -> H2.Reqd.t -> unit - (** [client_streaming ~f reqd] calls [f] with a stream to pull requests from and handles sending the response. *) - - val server_streaming : f:server_streaming -> H2.Reqd.t -> unit - (** [server_streaming ~f reqd] calls [f] with the request optained from [reqd] and handles sending the responses pushed out. *) - - val bidirectional_streaming : f:bidirectional_streaming -> H2.Reqd.t -> unit - (** [bidirectional_streaming ~f reqd] calls [f] with a stream to pull requests from and andles sending the responses pushed out. *) -end - -module Service : sig - type t - (** [t] represents a gRPC service with potentially multiple rpcs and the information needed to route to them. *) - - val v : unit -> t - (** [v ()] creates a new service *) - - val add_rpc : name:string -> rpc:Rpc.t -> t -> t - (** [add_rpc ~name ~rpc t] adds [rpc] to [t] and ensures that [t] can route to it with [name]. *) - - val handle_request : t -> H2.Reqd.t -> unit - (** [handle_request t reqd] handles routing [reqd] to the correct rpc if available in [t]. *) -end diff --git a/lib/grpc-eio/stream.ml b/lib/grpc-eio/stream.ml deleted file mode 100644 index 92ad8c4..0000000 --- a/lib/grpc-eio/stream.ml +++ /dev/null @@ -1,43 +0,0 @@ -type t = { body : H2.Body.Reader.t; buffer : Grpc.Buffer.t } - -let of_h2_body body = - let buffer = Grpc.Buffer.v () in - { body; buffer } - -let read_loop ~on_msg ~on_eof { body; buffer } = - let rec on_read src ~off ~len = - Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src ~dst:buffer ~length:len; - Grpc.Message.extract_all on_msg buffer; - H2.Body.Reader.schedule_read body ~on_read ~on_eof - in - H2.Body.Reader.schedule_read body ~on_read ~on_eof - -let read_once ~on_msg ~on_eof { body; buffer } = - let rec read () = - match Grpc.Message.extract buffer with - | Some message -> on_msg message - | None -> - let on_read src ~off ~len = - Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src ~dst:buffer - ~length:len; - read () - in - H2.Body.Reader.schedule_read body ~on_read ~on_eof - in - read () - -let schedule_read = read_once - -let take stream = - let promise, resolver = Eio.Promise.create () in - let on_msg msg = Eio.Promise.resolve resolver (Some msg) in - let on_eof () = Eio.Promise.resolve resolver None in - read_once ~on_msg ~on_eof stream; - promise - -let to_seq stream = - let reader, writer = Seq.create_reader_writer () in - let on_eof () = Seq.close_writer writer in - let on_msg msg = Seq.write writer msg in - read_loop ~on_msg ~on_eof stream; - reader diff --git a/lib/grpc-eio/stream.mli b/lib/grpc-eio/stream.mli deleted file mode 100644 index 5fe366a..0000000 --- a/lib/grpc-eio/stream.mli +++ /dev/null @@ -1,10 +0,0 @@ -type t - -val of_h2_body : H2.Body.Reader.t -> t - -val schedule_read : - on_msg:(string -> unit) -> on_eof:(unit -> unit) -> t -> unit - -val read_loop : on_msg:(string -> unit) -> on_eof:(unit -> unit) -> t -> unit -val take : t -> string option Eio.Promise.t -val to_seq : t -> string Seq.t diff --git a/lib/grpc-server/dune b/lib/grpc-server/dune new file mode 100644 index 0000000..ab421c0 --- /dev/null +++ b/lib/grpc-server/dune @@ -0,0 +1,4 @@ +(library + (name grpc_server) + (public_name grpc-server) + (libraries grpc)) diff --git a/lib/grpc-server/grpc_server.ml b/lib/grpc-server/grpc_server.ml new file mode 100644 index 0000000..2bcda43 --- /dev/null +++ b/lib/grpc-server/grpc_server.ml @@ -0,0 +1,117 @@ +module StringMap = Map.Make (String) + +module Service = struct + module RpcMap = StringMap + + type 'handler t = 'handler RpcMap.t + + let v () = RpcMap.empty + let add_rpc ~name ~rpc t = RpcMap.add name rpc t +end + +module ServiceMap = StringMap + +type error = + [ `Not_found of + [ `Service_not_found + | `Rpc_not_found of string + | `Invalid_url + | `Bad_method ] + | `Unsupported_media_type + | `Not_acceptable + | `Bad_request ] + +type 'handler rpc_handler = string -> ('handler, error) result +type 'handler t = 'handler rpc_handler ServiceMap.t + +let v () = ServiceMap.empty + +module Expert = struct + type nonrec 'a rpc_handler = 'a rpc_handler + + let add_service ~name ~service (t : 'a t) = ServiceMap.add name service t +end + +let add_service ~name ~service t = + Expert.add_service ~name + ~service:(fun rpc_name -> + match StringMap.find_opt rpc_name service with + | Some rpc -> Ok rpc + | None -> Error (`Not_found (`Rpc_not_found rpc_name))) + t + +let rec service_name_and_method = function + | [] -> None + | [ _ ] -> None + | [ service_name; method_name ] -> Some (service_name, method_name) + | _ :: tl -> service_name_and_method tl + +let handle_request (t : 'handler t) ~is_post_request ~get_header ~path : + ('handler, error) result = + let route () = + let parts = String.split_on_char '/' path in + match service_name_and_method parts with + | Some (service, rpc) -> ( + match ServiceMap.find_opt service t with + | Some service -> service rpc + | None -> Error (`Not_found `Service_not_found)) + | None -> Error (`Not_found `Invalid_url) + in + match is_post_request with + | true -> ( + match get_header "content-type" with + | Some s -> + if + Stringext.chop_prefix s ~prefix:"application/grpc" |> Option.is_some + then + match get_header "grpc-encoding" with + | None | Some "identity" -> ( + match get_header "grpc-accept-encoding" with + | None -> route () + | Some encodings -> + let encodings = String.split_on_char ',' encodings in + if List.mem "identity" encodings then route () + else + (* TODO: respond with unimplemented *) + Error `Not_acceptable) + | Some _ -> + (* TODO: not sure if there is a specific way to handle this in grpc *) + Error `Bad_request + else Error `Unsupported_media_type + | None -> Error `Unsupported_media_type) + | _ -> Error (`Not_found `Bad_method) + +type headers = { content_type : string; extra : (string * string) list } +type format = [ `Json | `Proto | `Other of string ] + +let headers ?(extra = []) (format : format) = + { + content_type = + (match format with + | `Json -> "application/grpc+json" + | `Proto -> "application/grpc+proto" + | `Other s -> Printf.sprintf "application/grpc+%s" s); + extra; + } + +let headers_grpc_proto = headers `Proto + +type trailers = { + grpc_status : int; + grpc_message : string option; + extra : (string * string) list; +} + +let make_trailers ?(extra = []) status = + { + grpc_status = Grpc.Status.int_of_code (Grpc.Status.code status); + grpc_message = Grpc.Status.message status; + extra; + } + +let trailers_with_code code = + { + grpc_status = Grpc.Status.int_of_code code; + grpc_message = None; + extra = []; + } diff --git a/lib/grpc-server/grpc_server.mli b/lib/grpc-server/grpc_server.mli new file mode 100644 index 0000000..33d907c --- /dev/null +++ b/lib/grpc-server/grpc_server.mli @@ -0,0 +1,64 @@ +module Service : sig + type 'handler t + (** [t] represents a service. *) + + val v : unit -> 'handler t + (** [v ()] creates a new service. *) + + val add_rpc : name:string -> rpc:'handler -> 'handler t -> 'handler t + (** [add_rpc ~name ~rpc t] adds [rpc] to [t] and ensures that it is routable + via [name]. *) +end + +type error = + [ `Not_found of + [ `Service_not_found + | `Rpc_not_found of string + | `Invalid_url + | `Bad_method ] + | `Unsupported_media_type + | `Not_acceptable + | `Bad_request ] + +type 'handler t +(** [t] represents a server and its associated services and routing information. *) + +val v : unit -> 'handler t +(** [v ()] creates a new server. *) + +val add_service : + name:string -> service:'handler Service.t -> 'handler t -> 'handler t +(** [add_service ~name ~service t] adds [service] to [t] and ensures that it is + routable via [name]. *) + +val handle_request : + 'handler t -> + is_post_request:bool -> + get_header:(string -> string option) -> + path:string -> + ('handler, error) result +(** [handle_request t handler] handles a request using [handler] and the + services registered in [t]. *) + +(** Expert functionality. *) +module Expert : sig + type 'handler rpc_handler = string -> ('handler, error) result + + val add_service : name:string -> service:'a rpc_handler -> 'a t -> 'a t + (** [add_rpc ~name ~rpc t] adds [service] to [t] and ensures that it is + routable via [name]. *) +end + +type headers = { content_type : string; extra : (string * string) list } + +val headers : ?extra:(string * string) list -> Grpc.Message.format -> headers +val headers_grpc_proto : headers + +type trailers = { + grpc_status : int; + grpc_message : string option; + extra : (string * string) list; +} + +val make_trailers : ?extra:(string * string) list -> Grpc.Status.t -> trailers +val trailers_with_code : Grpc.Status.code -> trailers diff --git a/lib/grpc/dune b/lib/grpc/dune index 1170fe5..32defe6 100644 --- a/lib/grpc/dune +++ b/lib/grpc/dune @@ -3,4 +3,4 @@ (public_name grpc) (preprocess (pps ppx_deriving.show)) - (libraries h2 bigstringaf uri)) + (libraries bigstringaf uri)) diff --git a/lib/grpc/grpc.ml b/lib/grpc/grpc.ml index 00ca697..3ae2efa 100644 --- a/lib/grpc/grpc.ml +++ b/lib/grpc/grpc.ml @@ -1,4 +1,3 @@ -module Server = Server module Status = Status module Message = Message module Buffer = Buffer diff --git a/lib/grpc/message.ml b/lib/grpc/message.ml index 7c848fa..81e949b 100644 --- a/lib/grpc/message.ml +++ b/lib/grpc/message.ml @@ -12,8 +12,8 @@ let make content = Bytes.blit_string content 0 payload 5 content_len; Bytes.to_string payload -(** [extract_message buf] extracts the grpc message starting in [buf] - in the buffer if there is one *) +(** [extract_message buf] extracts the grpc message starting in [buf] in the + buffer if there is one *) let extract_message buf = if Buffer.length buf >= 5 then ( let compressed = @@ -36,8 +36,8 @@ let extract_message buf = else None) else None -(** [get_message_and_shift buf] tries to extract the first grpc message - from [buf] and if successful shifts these bytes out of the buffer *) +(** [get_message_and_shift buf] tries to extract the first grpc message from + [buf] and if successful shifts these bytes out of the buffer *) let get_message_and_shift buf = match extract_message buf with | None -> None @@ -57,3 +57,10 @@ let extract_all f buf = loop () in loop () + +type format = [ `Json | `Proto | `Other of string ] + +let format_to_content_type = function + | `Json -> "application/grpc+json" + | `Proto -> "application/grpc+proto" + | `Other s -> Printf.sprintf "application/grpc+%s" s diff --git a/lib/grpc/message.mli b/lib/grpc/message.mli index 05e5608..0900f8e 100644 --- a/lib/grpc/message.mli +++ b/lib/grpc/message.mli @@ -6,3 +6,8 @@ val extract : Buffer.t -> string option val extract_all : (string -> unit) -> Buffer.t -> unit (** [extract_all f b] extracts and calls [f] on all gRPC messages from [b]. *) + +type format = [ `Json | `Proto | `Other of string ] + +val format_to_content_type : format -> string +(** [format_to_content_type f] returns the content type for [f]. *) diff --git a/lib/grpc/server.ml b/lib/grpc/server.ml deleted file mode 100644 index aaea758..0000000 --- a/lib/grpc/server.ml +++ /dev/null @@ -1,14 +0,0 @@ -(** The type of a Server *) -module type S = sig - type t - (** [t] represents a server and its associated services and routing information. *) - - val v : unit -> t - (** [v ()] creates a new server. *) - - val add_service : name:string -> service:(H2.Reqd.t -> unit) -> t -> t - (** [add_service ~name ~service t] adds [service] to [t] and ensures that it is routable via [name]. *) - - val handle_request : t -> H2.Reqd.t -> unit - (** [handle_request t reqd] routes [reqd] to the appropriate service in [t] if available. *) -end diff --git a/lib/grpc/status.ml b/lib/grpc/status.ml index 91bbd39..636da56 100644 --- a/lib/grpc/status.ml +++ b/lib/grpc/status.ml @@ -63,9 +63,9 @@ let v ?message code = { code; message } let code t = t.code let message t = Option.map (fun message -> Uri.pct_encode message) t.message -let extract_status headers = +let extract_status ~get_header = let code, message = - match H2.Headers.get headers "grpc-status" with + match get_header "grpc-status" with | None -> (Unknown, Some "Expected gprc-status header, got nothing") | Some s -> ( match int_of_string_opt s with @@ -81,6 +81,6 @@ let extract_status headers = Printf.sprintf "Expected valid gprc-status code, got %i" i in (Unknown, Some msg) - | Some c -> (c, H2.Headers.get headers "grpc-message"))) + | Some c -> (c, get_header "grpc-message"))) in v ?message code diff --git a/lib/grpc/status.mli b/lib/grpc/status.mli index 5494327..6c0c4f3 100644 --- a/lib/grpc/status.mli +++ b/lib/grpc/status.mli @@ -38,6 +38,6 @@ val code : t -> code val message : t -> string option (** [message t] returns the message associated with [t], if there is one. *) -val extract_status : H2.Headers.t -> t -(** [extract_status headers] returns the status embedded in the headers, or a default +val extract_status : get_header:(string -> string option) -> t +(** [extract_status ~get_header] returns the status embedded in the headers, or a default when the status is invalid or missing. *) diff --git a/lib/grpc-lwt/client.ml b/lib/lwt/client.ml similarity index 100% rename from lib/grpc-lwt/client.ml rename to lib/lwt/client.ml diff --git a/lib/grpc-lwt/client.mli b/lib/lwt/client.mli similarity index 100% rename from lib/grpc-lwt/client.mli rename to lib/lwt/client.mli diff --git a/lib/grpc-lwt/connection.ml b/lib/lwt/connection.ml similarity index 100% rename from lib/grpc-lwt/connection.ml rename to lib/lwt/connection.ml diff --git a/lib/grpc-lwt/dune b/lib/lwt/dune similarity index 100% rename from lib/grpc-lwt/dune rename to lib/lwt/dune diff --git a/lib/grpc-lwt/grpc_lwt.ml b/lib/lwt/grpc_lwt.ml similarity index 100% rename from lib/grpc-lwt/grpc_lwt.ml rename to lib/lwt/grpc_lwt.ml diff --git a/lib/grpc-lwt/server.ml b/lib/lwt/server.ml similarity index 100% rename from lib/grpc-lwt/server.ml rename to lib/lwt/server.ml diff --git a/lib/grpc-lwt/server.mli b/lib/lwt/server.mli similarity index 100% rename from lib/grpc-lwt/server.mli rename to lib/lwt/server.mli