From 66e6c1092231e7d50d9ec6f72f4fff1e3adfdb88 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 15 Jul 2025 12:44:03 +0530 Subject: [PATCH 01/13] Add tonic transport --- codegen/src/main.rs | 14 + grpc/Cargo.toml | 17 +- grpc/proto/echo/echo.proto | 43 ++ grpc/src/client/mod.rs | 1 + grpc/src/client/name_resolution/dns/test.rs | 8 + grpc/src/client/transport/mod.rs | 43 ++ grpc/src/client/transport/registry.rs | 65 +++ grpc/src/client/transport/tonic/mod.rs | 303 +++++++++++ grpc/src/client/transport/tonic/test.rs | 175 +++++++ grpc/src/codec.rs | 58 +++ grpc/src/generated/echo_fds.rs | 61 +++ grpc/src/generated/grpc_examples_echo.rs | 547 ++++++++++++++++++++ grpc/src/lib.rs | 8 + grpc/src/rt/hyper_wrapper.rs | 165 ++++++ grpc/src/rt/mod.rs | 23 +- grpc/src/rt/tokio/mod.rs | 68 ++- grpc/src/service.rs | 4 +- 17 files changed, 1598 insertions(+), 5 deletions(-) create mode 100644 grpc/proto/echo/echo.proto create mode 100644 grpc/src/client/transport/mod.rs create mode 100644 grpc/src/client/transport/registry.rs create mode 100644 grpc/src/client/transport/tonic/mod.rs create mode 100644 grpc/src/client/transport/tonic/test.rs create mode 100644 grpc/src/codec.rs create mode 100644 grpc/src/generated/echo_fds.rs create mode 100644 grpc/src/generated/grpc_examples_echo.rs create mode 100644 grpc/src/rt/hyper_wrapper.rs diff --git a/codegen/src/main.rs b/codegen/src/main.rs index 9dd7f322a..776e67608 100644 --- a/codegen/src/main.rs +++ b/codegen/src/main.rs @@ -62,6 +62,20 @@ fn main() { false, false, ); + + // grpc + codegen( + &PathBuf::from(std::env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap() + .join("grpc"), + &["proto/echo/echo.proto"], + &["proto"], + &PathBuf::from("src/generated"), + &PathBuf::from("src/generated/echo_fds.rs"), + true, + true, + ); } fn codegen( diff --git a/grpc/Cargo.toml b/grpc/Cargo.toml index 5438e7176..4544083d3 100644 --- a/grpc/Cargo.toml +++ b/grpc/Cargo.toml @@ -16,9 +16,24 @@ hickory-resolver = { version = "0.25.1", optional = true } rand = "0.9" parking_lot = "0.12.4" bytes = "1.10.1" +futures = "0.3.31" +hyper-util = "0.1.14" +hyper = { version = "1.6.0", features = ["client", "http2"] } +pin-project-lite = "0.2.16" +tokio-stream = "0.1.17" +http = "1.1.0" +tower = { version = "0.5.2", features = ["buffer", "limit", "util"] } +tower-service = "0.3.3" +socket2 = "0.5.10" [dev-dependencies] hickory-server = "0.25.2" +prost = "0.14" +async-stream = "0.3.6" +tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["prost", "server", "router"] } + +[build-dependencies] +tonic-build = { path = "../tonic-build" } [features] default = ["dns"] @@ -28,4 +43,4 @@ dns = ["dep:hickory-resolver"] allowed_external_types = [ "tonic::*", "futures_core::stream::Stream", -] \ No newline at end of file +] diff --git a/grpc/proto/echo/echo.proto b/grpc/proto/echo/echo.proto new file mode 100644 index 000000000..1ed1207f0 --- /dev/null +++ b/grpc/proto/echo/echo.proto @@ -0,0 +1,43 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +syntax = "proto3"; + +package grpc.examples.echo; + +// EchoRequest is the request for echo. +message EchoRequest { + string message = 1; +} + +// EchoResponse is the response for echo. +message EchoResponse { + string message = 1; +} + +// Echo is the echo service. +service Echo { + // UnaryEcho is unary echo. + rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} + // ServerStreamingEcho is server side streaming. + rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} + // ClientStreamingEcho is client side streaming. + rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} + // BidirectionalStreamingEcho is bidi streaming. + rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {} +} diff --git a/grpc/src/client/mod.rs b/grpc/src/client/mod.rs index a7b722556..d3006d58c 100644 --- a/grpc/src/client/mod.rs +++ b/grpc/src/client/mod.rs @@ -29,6 +29,7 @@ pub(crate) mod load_balancing; pub(crate) mod name_resolution; pub mod service; pub mod service_config; +pub(crate) mod transport; /// A representation of the current state of a gRPC channel, also used for the /// state of subchannels (individual connections within the channel). diff --git a/grpc/src/client/name_resolution/dns/test.rs b/grpc/src/client/name_resolution/dns/test.rs index 952e30f86..4b5d6226f 100644 --- a/grpc/src/client/name_resolution/dns/test.rs +++ b/grpc/src/client/name_resolution/dns/test.rs @@ -293,6 +293,14 @@ impl rt::Runtime for FakeRuntime { fn sleep(&self, duration: std::time::Duration) -> Pin> { self.inner.sleep(duration) } + + fn tcp_stream( + &self, + target: std::net::SocketAddr, + opts: rt::TcpOptions, + ) -> Pin, String>> + Send>> { + self.inner.tcp_stream(target, opts) + } } #[tokio::test] diff --git a/grpc/src/client/transport/mod.rs b/grpc/src/client/transport/mod.rs new file mode 100644 index 000000000..e79ea58da --- /dev/null +++ b/grpc/src/client/transport/mod.rs @@ -0,0 +1,43 @@ +use std::{sync::Arc, time::Duration}; + +use crate::{rt::Runtime, service::Service}; + +mod registry; +mod tonic; + +use ::tonic::async_trait; +use tokio::sync::oneshot; + +pub(crate) struct ConnectedTransport { + pub service: Box, + pub disconnection_listener: oneshot::Receiver>, +} + +// TODO: The following options are specific to HTTP/2. We should +// instead pass an `Attribute` like struct to the connect method instead which +// can hold config relevant to a particular transport. +#[derive(Default)] +pub(crate) struct TransportOptions { + pub init_stream_window_size: Option, + pub init_connection_window_size: Option, + pub http2_keep_alive_interval: Option, + pub http2_keep_alive_timeout: Option, + pub http2_keep_alive_while_idle: Option, + pub http2_max_header_list_size: Option, + pub http2_adaptive_window: Option, + pub concurrency_limit: Option, + pub rate_limit: Option<(u64, Duration)>, + pub tcp_keepalive: Option, + pub tcp_nodelay: bool, + pub connect_timeout: Option, +} + +#[async_trait] +pub(crate) trait Transport: Send + Sync { + async fn connect( + &self, + address: String, + runtime: Arc, + opts: &TransportOptions, + ) -> Result; +} diff --git a/grpc/src/client/transport/registry.rs b/grpc/src/client/transport/registry.rs new file mode 100644 index 000000000..fde977077 --- /dev/null +++ b/grpc/src/client/transport/registry.rs @@ -0,0 +1,65 @@ +use std::{ + collections::HashMap, + sync::{Arc, Mutex, OnceLock}, +}; + +use super::Transport; + +/// A registry to store and retrieve transports. Transports are indexed by +/// the address type they are intended to handle. +#[derive(Clone)] +pub struct TransportRegistry { + m: Arc>>>, +} + +impl std::fmt::Debug for TransportRegistry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let m = self.m.lock().unwrap(); + for key in m.keys() { + write!(f, "k: {:?}", key)? + } + Ok(()) + } +} + +impl TransportRegistry { + /// Construct an empty name resolver registry. + pub fn new() -> Self { + Self { m: Arc::default() } + } + + /// Add a name resolver into the registry. + pub fn add_transport(&self, address_type: &str, transport: impl Transport + 'static) { + self.m + .lock() + .unwrap() + .insert(address_type.to_string(), Arc::new(transport)); + } + + /// Retrieve a name resolver from the registry, or None if not found. + pub fn get_transport(&self, address_type: &str) -> Result, String> { + self.m + .lock() + .unwrap() + .get(address_type) + .ok_or(format!( + "no transport found for address type {address_type}" + )) + .cloned() + } +} + +impl Default for TransportRegistry { + fn default() -> Self { + Self::new() + } +} + +/// The registry used if a local registry is not provided to a channel or if it +/// does not exist in the local registry. +static GLOBAL_TRANSPORT_REGISTRY: OnceLock = OnceLock::new(); + +/// Global registry for resolver builders. +pub fn global_registry() -> &'static TransportRegistry { + GLOBAL_TRANSPORT_REGISTRY.get_or_init(TransportRegistry::new) +} diff --git a/grpc/src/client/transport/tonic/mod.rs b/grpc/src/client/transport/tonic/mod.rs new file mode 100644 index 000000000..e989f816b --- /dev/null +++ b/grpc/src/client/transport/tonic/mod.rs @@ -0,0 +1,303 @@ +use crate::client::transport::registry::global_registry; +use crate::client::transport::ConnectedTransport; +use crate::client::transport::Transport; +use crate::client::transport::TransportOptions; +use crate::codec::BytesCodec; +use crate::rt::TcpOptions; +use crate::service::Message; +use crate::service::Request as GrpcRequest; +use crate::service::Response as GrpcResponse; +use bytes::Bytes; +use futures::stream::StreamExt; +use futures::Stream; +use http::uri::PathAndQuery; +use http::Request as HttpRequest; +use http::Response as HttpResponse; +use http::Uri; +use hyper::client::conn::http2::Builder; +use hyper::client::conn::http2::SendRequest; +use std::any::Any; +use std::{ + error::Error, + future::Future, + net::SocketAddr, + pin::Pin, + str::FromStr, + sync::Arc, + task::{Context, Poll}, +}; +use tonic::Request as TonicRequest; +use tonic::Response as TonicResponse; +use tonic::Streaming; +use tower::{ + buffer::{future::ResponseFuture as BufferResponseFuture, Buffer}, + limit::{ConcurrencyLimitLayer, RateLimitLayer}, + util::BoxService, + ServiceBuilder, +}; +use tower_service::Service as TowerService; + +use crate::{ + client::name_resolution::TCP_IP_NETWORK_TYPE, + rt::{ + self, + hyper_wrapper::{HyperCompatExec, HyperCompatTimer, HyperStream}, + }, + service::Service, +}; +use tokio::sync::oneshot; +use tonic::client::GrpcService; +use tonic::{async_trait, body::Body, client::Grpc, Status}; + +#[cfg(test)] +mod test; + +const DEFAULT_BUFFER_SIZE: usize = 1024; +pub(crate) type BoxError = Box; + +type BoxFuture<'a, T> = Pin + Send + 'a>>; + +pub(crate) fn reg() { + global_registry().add_transport(TCP_IP_NETWORK_TYPE, TransportBuilder {}); +} + +struct TransportBuilder {} + +struct TonicTransport { + grpc: Grpc, + task_handle: Box, +} + +impl Drop for TonicTransport { + fn drop(&mut self) { + self.task_handle.abort(); + } +} + +#[async_trait] +impl Service for TonicTransport { + async fn call(&self, method: String, request: GrpcRequest) -> GrpcResponse { + let mut grpc = self.grpc.clone(); + if let Err(e) = grpc.ready().await { + let err = Status::unknown(format!("Service was not ready: {}", e.to_string())); + return create_error_response(err); + }; + let path = if let Ok(p) = PathAndQuery::from_maybe_shared(method) { + p + } else { + let err = Status::internal("Failed to parse path"); + return create_error_response(err); + }; + let request = convert_request(request); + let response = grpc.streaming(request, path, BytesCodec {}).await; + convert_response(response) + } +} + +/// Helper function to create an error response stream. +fn create_error_response(status: Status) -> GrpcResponse { + let stream = futures::stream::once(async { Err(status) }); + TonicResponse::new(Box::pin(stream)) +} + +fn convert_request(req: GrpcRequest) -> TonicRequest + Send>>> { + let (metadata, extensions) = (req.metadata().clone(), req.extensions().clone()); + let stream = req.into_inner(); + + let bytes_stream = Box::pin(stream.filter_map(|msg| async { + let downcast_result = (msg as Box).downcast::(); + + match downcast_result { + Ok(boxed_bytes) => Some(*boxed_bytes), + + // If it fails, log the error and return None to filter it out. + Err(_) => { + eprintln!("A message could not be downcast to Bytes and was skipped."); + None + } + } + })); + + let mut new_req = TonicRequest::new(bytes_stream as _); + *new_req.metadata_mut() = metadata; + *new_req.extensions_mut() = extensions; + new_req +} + +fn convert_response(res: Result>, Status>) -> GrpcResponse { + let response = match res { + Ok(s) => s, + Err(e) => { + let stream = futures::stream::once(async { Err(e) }); + return TonicResponse::new(Box::pin(stream)); + } + }; + let (metadata, extensions) = (response.metadata().clone(), response.extensions().clone()); + let stream = response.into_inner(); + let message_stream: Pin, Status>> + Send>> = + Box::pin(stream.map(|msg| { + let res = msg.map(|b| { + let msg: Box = Box::new(b); + msg + }); + res + })); + let mut new_res = TonicResponse::new(message_stream); + *new_res.metadata_mut() = metadata; + *new_res.extensions_mut() = extensions; + new_res +} + +#[async_trait] +impl Transport for TransportBuilder { + async fn connect( + &self, + address: String, + runtime: Arc, + opts: &TransportOptions, + ) -> Result { + let runtime = runtime.clone(); + let mut settings = Builder::::new(HyperCompatExec { + inner: runtime.clone(), + }) + .timer(HyperCompatTimer { + inner: runtime.clone(), + }) + .initial_stream_window_size(opts.init_stream_window_size) + .initial_connection_window_size(opts.init_connection_window_size) + .keep_alive_interval(opts.http2_keep_alive_interval) + .clone(); + + if let Some(val) = opts.http2_keep_alive_timeout { + settings.keep_alive_timeout(val); + } + + if let Some(val) = opts.http2_keep_alive_while_idle { + settings.keep_alive_while_idle(val); + } + + if let Some(val) = opts.http2_adaptive_window { + settings.adaptive_window(val); + } + + if let Some(val) = opts.http2_max_header_list_size { + settings.max_header_list_size(val); + } + + let addr: SocketAddr = SocketAddr::from_str(&address).map_err(|err| err.to_string())?; + let tcp_stream_fut = runtime.tcp_stream( + addr, + TcpOptions { + enable_nodelay: opts.tcp_nodelay, + keepalive: opts.tcp_keepalive, + }, + ); + let tcp_stream = if let Some(timeout) = opts.connect_timeout { + tokio::select! { + _ = runtime.sleep(timeout) => { + return Err("timed out waiting for TCP stream to connect".to_string()) + } + tcp_stream = tcp_stream_fut => { tcp_stream? } + } + } else { + tcp_stream_fut.await? + }; + let tcp_stream = HyperStream::new(tcp_stream); + + let (sender, connection) = settings + .handshake(tcp_stream) + .await + .map_err(|err| err.to_string())?; + let (tx, rx) = oneshot::channel(); + + let task_handle = runtime.spawn(Box::pin(async move { + if let Err(err) = connection.await { + let _ = tx.send(Err(err.to_string())); + } else { + let _ = tx.send(Ok(())); + } + })); + let sender = SendRequestWrapper::from(sender); + + let service = ServiceBuilder::new() + .option_layer(opts.concurrency_limit.map(ConcurrencyLimitLayer::new)) + .option_layer(opts.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d))) + .map_err(Into::::into) + .service(sender); + + let service = BoxService::new(service); + let (service, worker) = Buffer::pair(service, DEFAULT_BUFFER_SIZE); + runtime.spawn(Box::pin(worker)); + let uri = + Uri::from_maybe_shared(format!("http://{}", &address)).map_err(|e| e.to_string())?; // TODO: err msg + let grpc = Grpc::with_origin(TonicService { inner: service }, uri); + + let service = TonicTransport { grpc, task_handle }; + Ok(ConnectedTransport { + service: Box::new(service), + disconnection_listener: rx, + }) + } +} + +struct SendRequestWrapper { + inner: SendRequest, +} + +impl From> for SendRequestWrapper { + fn from(inner: SendRequest) -> Self { + Self { inner } + } +} + +impl TowerService> for SendRequestWrapper { + type Response = HttpResponse; + type Error = BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + let fut = self.inner.send_request(req); + + Box::pin(async move { fut.await.map_err(Into::into).map(|res| res.map(Body::new)) }) + } +} + +#[derive(Clone)] +struct TonicService { + inner: Buffer, BoxFuture<'static, Result, BoxError>>>, +} + +impl GrpcService for TonicService { + type ResponseBody = Body; + type Error = BoxError; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + tower::Service::poll_ready(&mut self.inner, cx) + } + + fn call(&mut self, request: http::Request) -> Self::Future { + ResponseFuture { + inner: tower::Service::call(&mut self.inner, request), + } + } +} + +/// A future that resolves to an HTTP response. +/// +/// This is returned by the `Service::call` on [`Channel`]. +pub struct ResponseFuture { + inner: BufferResponseFuture, BoxError>>>, +} + +impl Future for ResponseFuture { + type Output = Result, BoxError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.inner).poll(cx) + } +} diff --git a/grpc/src/client/transport/tonic/test.rs b/grpc/src/client/transport/tonic/test.rs new file mode 100644 index 000000000..9cc5694c8 --- /dev/null +++ b/grpc/src/client/transport/tonic/test.rs @@ -0,0 +1,175 @@ +use crate::client::name_resolution::TCP_IP_NETWORK_TYPE; +use crate::client::transport::registry::global_registry; +use crate::echo_pb::echo_server::{Echo, EchoServer}; +use crate::service::Request as GrpcRequest; + +use crate::echo_pb::{EchoRequest, EchoResponse}; +use crate::service::Message; + +use bytes::Bytes; +use prost::Message as ProstMessage; +use std::any::Any; +use std::{pin::Pin, sync::Arc, time::Duration}; +use tokio::net::TcpListener; +use tokio::sync::{mpsc, oneshot, Notify}; +use tokio::time::timeout; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; + +use tonic::async_trait; +use tonic::{transport::Server, Request, Response, Status}; + +use crate::{client::transport::TransportOptions, rt::tokio::TokioRuntime}; + +const DEFAULT_TEST_DURATION: Duration = Duration::from_secs(10); +const DEFAULT_TEST_SHORT_DURATION: Duration = Duration::from_millis(10); + +// Tests the tonic transport by creating a bi-di stream with a tonic server. +#[tokio::test] +pub async fn tonic_transport_rpc() { + super::reg(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); // get the assigned address + let shutdown_notify = Arc::new(Notify::new()); + let shutdown_notify_copy = shutdown_notify.clone(); + println!("EchoServer listening on: {}", addr); + let server_handle = tokio::spawn(async move { + let echo_server = EchoService {}; + let svc = EchoServer::new(echo_server); + let _ = Server::builder() + .add_service(svc) + .serve_with_incoming_shutdown( + tokio_stream::wrappers::TcpListenerStream::new(listener), + shutdown_notify_copy.notified(), + ) + .await; + }); + + let builder = global_registry() + .get_transport(TCP_IP_NETWORK_TYPE) + .unwrap(); + let config = Arc::new(TransportOptions::default()); + let mut connected_transport = builder + .connect(addr.to_string(), Arc::new(TokioRuntime {}), &config) + .await + .unwrap(); + let conn = connected_transport.service; + + let (tx, rx) = mpsc::channel::>(1); + + // Convert the mpsc receiver into a Stream + let outbound: GrpcRequest = + Request::new(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))); + + let mut inbound = conn + .call( + "/grpc.examples.echo.Echo/BidirectionalStreamingEcho".to_string(), + outbound, + ) + .await + .into_inner(); + + // Spawn a sender task + let client_handle = tokio::spawn(async move { + for i in 0..5 { + let message = format!("message {}", i); + let request = EchoRequest { + message: message.clone(), + }; + + let bytes = Bytes::from(request.encode_to_vec()); + + println!("Sent request: {:?}", request); + if let Err(_) = tx.send(Box::new(bytes)).await { + panic!("Receiver dropped"); + } + + // Wait for the reply + match inbound.next().await { + Some(Ok(resp)) => { + let bytes = (resp as Box).downcast::().unwrap(); + let echo_reponse = EchoResponse::decode(bytes).unwrap(); + println!("Got response: {:?}", echo_reponse); + assert_eq!(echo_reponse.message, message); + } + Some(Err(status)) => { + panic!("Error from server: {:?}", status); + } + None => { + panic!("Server closed the stream"); + } + } + } + }); + + client_handle.await.unwrap(); + // The connection should break only after the server is stopped. + assert_eq!( + connected_transport.disconnection_listener.try_recv(), + Err(oneshot::error::TryRecvError::Empty), + ); + shutdown_notify.notify_waiters(); + let res = timeout( + DEFAULT_TEST_DURATION, + connected_transport.disconnection_listener, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(res, Ok(())); + server_handle.await.unwrap(); +} + +#[derive(Debug)] +pub struct EchoService {} + +#[async_trait] +impl Echo for EchoService { + async fn unary_echo( + &self, + _: tonic::Request, + ) -> std::result::Result, tonic::Status> { + unimplemented!() + } + + type ServerStreamingEchoStream = ReceiverStream>; + + async fn server_streaming_echo( + &self, + _: tonic::Request, + ) -> std::result::Result, tonic::Status> { + unimplemented!() + } + + async fn client_streaming_echo( + &self, + _: tonic::Request>, + ) -> std::result::Result, tonic::Status> { + unimplemented!() + } + type BidirectionalStreamingEchoStream = + Pin> + Send + 'static>>; + + async fn bidirectional_streaming_echo( + &self, + request: tonic::Request>, + ) -> std::result::Result, tonic::Status> + { + let mut inbound = request.into_inner(); + + // Map each request to a corresponding EchoResponse + let outbound = async_stream::try_stream! { + while let Some(req) = inbound.next().await { + let req = req?; // Return Err(Status) if stream item is error + let reply = EchoResponse { + message: req.message.clone(), + }; + yield reply; + } + println!("Server closing stream"); + }; + + Ok(Response::new( + Box::pin(outbound) as Self::BidirectionalStreamingEchoStream + )) + } +} diff --git a/grpc/src/codec.rs b/grpc/src/codec.rs new file mode 100644 index 000000000..dab008a85 --- /dev/null +++ b/grpc/src/codec.rs @@ -0,0 +1,58 @@ +use bytes::{Buf, BufMut, Bytes}; +use tonic::{ + codec::{Codec, Decoder, EncodeBuf, Encoder}, + Status, +}; + +/// An adapter for sending and receiving messages as bytes using tonic. +/// Coding/decoding is handled within gRPC. +/// TODO: Remove this when tonic allows access to bytes without requiring a +/// codec. +pub(crate) struct BytesCodec {} + +impl Codec for BytesCodec { + type Encode = Bytes; + + type Decode = Bytes; + + type Encoder = BytesEncoder; + + type Decoder = BytesDecoder; + + fn encoder(&mut self) -> Self::Encoder { + BytesEncoder {} + } + + fn decoder(&mut self) -> Self::Decoder { + BytesDecoder {} + } +} + +pub struct BytesEncoder {} + +impl Encoder for BytesEncoder { + type Item = Bytes; + + type Error = Status; + + fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error> { + dst.put_slice(&item); + Ok(()) + } +} + +#[derive(Debug)] +pub struct BytesDecoder {} + +impl Decoder for BytesDecoder { + type Item = Bytes; + + type Error = Status; + + fn decode( + &mut self, + src: &mut tonic::codec::DecodeBuf<'_>, + ) -> Result, Self::Error> { + Ok(Some(src.copy_to_bytes(src.remaining()))) + } +} diff --git a/grpc/src/generated/echo_fds.rs b/grpc/src/generated/echo_fds.rs new file mode 100644 index 000000000..9833d2636 --- /dev/null +++ b/grpc/src/generated/echo_fds.rs @@ -0,0 +1,61 @@ +// This file is @generated by codegen. +// +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// +/// Byte encoded FILE_DESCRIPTOR_SET. +pub const FILE_DESCRIPTOR_SET: &[u8] = &[ + 10u8, 246u8, 3u8, 10u8, 15u8, 101u8, 99u8, 104u8, 111u8, 47u8, 101u8, 99u8, 104u8, + 111u8, 46u8, 112u8, 114u8, 111u8, 116u8, 111u8, 18u8, 18u8, 103u8, 114u8, 112u8, + 99u8, 46u8, 101u8, 120u8, 97u8, 109u8, 112u8, 108u8, 101u8, 115u8, 46u8, 101u8, 99u8, + 104u8, 111u8, 34u8, 39u8, 10u8, 11u8, 69u8, 99u8, 104u8, 111u8, 82u8, 101u8, 113u8, + 117u8, 101u8, 115u8, 116u8, 18u8, 24u8, 10u8, 7u8, 109u8, 101u8, 115u8, 115u8, 97u8, + 103u8, 101u8, 24u8, 1u8, 32u8, 1u8, 40u8, 9u8, 82u8, 7u8, 109u8, 101u8, 115u8, 115u8, + 97u8, 103u8, 101u8, 34u8, 40u8, 10u8, 12u8, 69u8, 99u8, 104u8, 111u8, 82u8, 101u8, + 115u8, 112u8, 111u8, 110u8, 115u8, 101u8, 18u8, 24u8, 10u8, 7u8, 109u8, 101u8, 115u8, + 115u8, 97u8, 103u8, 101u8, 24u8, 1u8, 32u8, 1u8, 40u8, 9u8, 82u8, 7u8, 109u8, 101u8, + 115u8, 115u8, 97u8, 103u8, 101u8, 50u8, 243u8, 2u8, 10u8, 4u8, 69u8, 99u8, 104u8, + 111u8, 18u8, 78u8, 10u8, 9u8, 85u8, 110u8, 97u8, 114u8, 121u8, 69u8, 99u8, 104u8, + 111u8, 18u8, 31u8, 46u8, 103u8, 114u8, 112u8, 99u8, 46u8, 101u8, 120u8, 97u8, 109u8, + 112u8, 108u8, 101u8, 115u8, 46u8, 101u8, 99u8, 104u8, 111u8, 46u8, 69u8, 99u8, 104u8, + 111u8, 82u8, 101u8, 113u8, 117u8, 101u8, 115u8, 116u8, 26u8, 32u8, 46u8, 103u8, + 114u8, 112u8, 99u8, 46u8, 101u8, 120u8, 97u8, 109u8, 112u8, 108u8, 101u8, 115u8, + 46u8, 101u8, 99u8, 104u8, 111u8, 46u8, 69u8, 99u8, 104u8, 111u8, 82u8, 101u8, 115u8, + 112u8, 111u8, 110u8, 115u8, 101u8, 18u8, 90u8, 10u8, 19u8, 83u8, 101u8, 114u8, 118u8, + 101u8, 114u8, 83u8, 116u8, 114u8, 101u8, 97u8, 109u8, 105u8, 110u8, 103u8, 69u8, + 99u8, 104u8, 111u8, 18u8, 31u8, 46u8, 103u8, 114u8, 112u8, 99u8, 46u8, 101u8, 120u8, + 97u8, 109u8, 112u8, 108u8, 101u8, 115u8, 46u8, 101u8, 99u8, 104u8, 111u8, 46u8, 69u8, + 99u8, 104u8, 111u8, 82u8, 101u8, 113u8, 117u8, 101u8, 115u8, 116u8, 26u8, 32u8, 46u8, + 103u8, 114u8, 112u8, 99u8, 46u8, 101u8, 120u8, 97u8, 109u8, 112u8, 108u8, 101u8, + 115u8, 46u8, 101u8, 99u8, 104u8, 111u8, 46u8, 69u8, 99u8, 104u8, 111u8, 82u8, 101u8, + 115u8, 112u8, 111u8, 110u8, 115u8, 101u8, 48u8, 1u8, 18u8, 90u8, 10u8, 19u8, 67u8, + 108u8, 105u8, 101u8, 110u8, 116u8, 83u8, 116u8, 114u8, 101u8, 97u8, 109u8, 105u8, + 110u8, 103u8, 69u8, 99u8, 104u8, 111u8, 18u8, 31u8, 46u8, 103u8, 114u8, 112u8, 99u8, + 46u8, 101u8, 120u8, 97u8, 109u8, 112u8, 108u8, 101u8, 115u8, 46u8, 101u8, 99u8, + 104u8, 111u8, 46u8, 69u8, 99u8, 104u8, 111u8, 82u8, 101u8, 113u8, 117u8, 101u8, + 115u8, 116u8, 26u8, 32u8, 46u8, 103u8, 114u8, 112u8, 99u8, 46u8, 101u8, 120u8, 97u8, + 109u8, 112u8, 108u8, 101u8, 115u8, 46u8, 101u8, 99u8, 104u8, 111u8, 46u8, 69u8, 99u8, + 104u8, 111u8, 82u8, 101u8, 115u8, 112u8, 111u8, 110u8, 115u8, 101u8, 40u8, 1u8, 18u8, + 99u8, 10u8, 26u8, 66u8, 105u8, 100u8, 105u8, 114u8, 101u8, 99u8, 116u8, 105u8, 111u8, + 110u8, 97u8, 108u8, 83u8, 116u8, 114u8, 101u8, 97u8, 109u8, 105u8, 110u8, 103u8, + 69u8, 99u8, 104u8, 111u8, 18u8, 31u8, 46u8, 103u8, 114u8, 112u8, 99u8, 46u8, 101u8, + 120u8, 97u8, 109u8, 112u8, 108u8, 101u8, 115u8, 46u8, 101u8, 99u8, 104u8, 111u8, + 46u8, 69u8, 99u8, 104u8, 111u8, 82u8, 101u8, 113u8, 117u8, 101u8, 115u8, 116u8, 26u8, + 32u8, 46u8, 103u8, 114u8, 112u8, 99u8, 46u8, 101u8, 120u8, 97u8, 109u8, 112u8, 108u8, + 101u8, 115u8, 46u8, 101u8, 99u8, 104u8, 111u8, 46u8, 69u8, 99u8, 104u8, 111u8, 82u8, + 101u8, 115u8, 112u8, 111u8, 110u8, 115u8, 101u8, 40u8, 1u8, 48u8, 1u8, 98u8, 6u8, + 112u8, 114u8, 111u8, 116u8, 111u8, 51u8, +]; diff --git a/grpc/src/generated/grpc_examples_echo.rs b/grpc/src/generated/grpc_examples_echo.rs new file mode 100644 index 000000000..5ab660c2a --- /dev/null +++ b/grpc/src/generated/grpc_examples_echo.rs @@ -0,0 +1,547 @@ +// This file is @generated by prost-build. +/// EchoRequest is the request for echo. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct EchoRequest { + #[prost(string, tag = "1")] + pub message: ::prost::alloc::string::String, +} +/// EchoResponse is the response for echo. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct EchoResponse { + #[prost(string, tag = "1")] + pub message: ::prost::alloc::string::String, +} +/// Generated client implementations. +pub mod echo_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// Echo is the echo service. + #[derive(Debug, Clone)] + pub struct EchoClient { + inner: tonic::client::Grpc, + } + impl EchoClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> EchoClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + EchoClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// UnaryEcho is unary echo. + pub async fn unary_echo( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/UnaryEcho", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("grpc.examples.echo.Echo", "UnaryEcho")); + self.inner.unary(req, path, codec).await + } + /// ServerStreamingEcho is server side streaming. + pub async fn server_streaming_echo( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/ServerStreamingEcho", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("grpc.examples.echo.Echo", "ServerStreamingEcho"), + ); + self.inner.server_streaming(req, path, codec).await + } + /// ClientStreamingEcho is client side streaming. + pub async fn client_streaming_echo( + &mut self, + request: impl tonic::IntoStreamingRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/ClientStreamingEcho", + ); + let mut req = request.into_streaming_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("grpc.examples.echo.Echo", "ClientStreamingEcho"), + ); + self.inner.client_streaming(req, path, codec).await + } + /// BidirectionalStreamingEcho is bidi streaming. + pub async fn bidirectional_streaming_echo( + &mut self, + request: impl tonic::IntoStreamingRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/BidirectionalStreamingEcho", + ); + let mut req = request.into_streaming_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "grpc.examples.echo.Echo", + "BidirectionalStreamingEcho", + ), + ); + self.inner.streaming(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod echo_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with EchoServer. + #[async_trait] + pub trait Echo: std::marker::Send + std::marker::Sync + 'static { + /// UnaryEcho is unary echo. + async fn unary_echo( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the ServerStreamingEcho method. + type ServerStreamingEchoStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// ServerStreamingEcho is server side streaming. + async fn server_streaming_echo( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// ClientStreamingEcho is client side streaming. + async fn client_streaming_echo( + &self, + request: tonic::Request>, + ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the BidirectionalStreamingEcho method. + type BidirectionalStreamingEchoStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// BidirectionalStreamingEcho is bidi streaming. + async fn bidirectional_streaming_echo( + &self, + request: tonic::Request>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + /// Echo is the echo service. + #[derive(Debug)] + pub struct EchoServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl EchoServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for EchoServer + where + T: Echo, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/grpc.examples.echo.Echo/UnaryEcho" => { + #[allow(non_camel_case_types)] + struct UnaryEchoSvc(pub Arc); + impl tonic::server::UnaryService + for UnaryEchoSvc { + type Response = super::EchoResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::unary_echo(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = UnaryEchoSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/grpc.examples.echo.Echo/ServerStreamingEcho" => { + #[allow(non_camel_case_types)] + struct ServerStreamingEchoSvc(pub Arc); + impl< + T: Echo, + > tonic::server::ServerStreamingService + for ServerStreamingEchoSvc { + type Response = super::EchoResponse; + type ResponseStream = T::ServerStreamingEchoStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::server_streaming_echo(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ServerStreamingEchoSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/grpc.examples.echo.Echo/ClientStreamingEcho" => { + #[allow(non_camel_case_types)] + struct ClientStreamingEchoSvc(pub Arc); + impl< + T: Echo, + > tonic::server::ClientStreamingService + for ClientStreamingEchoSvc { + type Response = super::EchoResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request>, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::client_streaming_echo(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ClientStreamingEchoSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.client_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/grpc.examples.echo.Echo/BidirectionalStreamingEcho" => { + #[allow(non_camel_case_types)] + struct BidirectionalStreamingEchoSvc(pub Arc); + impl tonic::server::StreamingService + for BidirectionalStreamingEchoSvc { + type Response = super::EchoResponse; + type ResponseStream = T::BidirectionalStreamingEchoStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request>, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::bidirectional_streaming_echo(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = BidirectionalStreamingEchoSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new( + tonic::body::Body::default(), + ); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for EchoServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "grpc.examples.echo.Echo"; + impl tonic::server::NamedService for EchoServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/grpc/src/lib.rs b/grpc/src/lib.rs index 567925131..650219d16 100644 --- a/grpc/src/lib.rs +++ b/grpc/src/lib.rs @@ -38,3 +38,11 @@ pub mod service; pub(crate) mod attributes; pub(crate) mod byte_str; +pub(crate) mod codec; +#[cfg(test)] +pub(crate) mod echo_pb { + include!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/src/generated/grpc_examples_echo.rs" + )); +} diff --git a/grpc/src/rt/hyper_wrapper.rs b/grpc/src/rt/hyper_wrapper.rs new file mode 100644 index 000000000..fc2b921de --- /dev/null +++ b/grpc/src/rt/hyper_wrapper.rs @@ -0,0 +1,165 @@ +use hyper::rt::{Executor, Timer}; +use pin_project_lite::pin_project; +use std::{ + future::Future, + io, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Instant, +}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +use super::{Runtime, TcpStream}; + +/// Adapts a runtime to a hyper compatible executor. +#[derive(Clone)] +pub(crate) struct HyperCompatExec { + pub(crate) inner: Arc, +} + +impl Executor for HyperCompatExec +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + fn execute(&self, fut: F) { + self.inner.spawn(Box::pin(async { + let _ = fut.await; + })); + } +} + +struct HyperCompatSleep { + inner: Pin>, +} + +impl Future for HyperCompatSleep { + type Output = (); + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.inner.as_mut().poll(cx) + } +} + +impl hyper::rt::Sleep for HyperCompatSleep {} + +/// Adapts a runtime to a hyper compatible timer. +pub(crate) struct HyperCompatTimer { + pub(crate) inner: Arc, +} + +impl Timer for HyperCompatTimer { + fn sleep(&self, duration: std::time::Duration) -> Pin> { + let sleep = self.inner.sleep(duration); + Box::pin(HyperCompatSleep { inner: sleep }) + } + + fn sleep_until(&self, deadline: Instant) -> Pin> { + let now = Instant::now(); + let duration = deadline.saturating_duration_since(now); + self.sleep(duration) + } +} + +// The following adapters are copied from hyper: +// https://github.com/hyperium/hyper/blob/v1.6.0/benches/support/tokiort.rs + +pin_project! { + /// A wrapper to make any `TcpStream` compatible with Hyper. It implements + /// Tokio's async IO traits. + pub(crate) struct HyperStream { + #[pin] + inner: Box, + } +} + +impl HyperStream { + /// Creates a new `HyperStream` from a type implementing `TcpStream`. + pub fn new(stream: Box) -> Self { + Self { inner: stream } + } +} + +impl AsyncRead for HyperStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + // Delegate the poll_read call to the inner stream. + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl AsyncWrite for HyperStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } +} + +impl hyper::rt::Read for HyperStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +impl hyper::rt::Write for HyperStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + AsyncWrite::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + AsyncWrite::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } +} diff --git a/grpc/src/rt/mod.rs b/grpc/src/rt/mod.rs index 0cf4c3361..0569bd8b8 100644 --- a/grpc/src/rt/mod.rs +++ b/grpc/src/rt/mod.rs @@ -22,9 +22,12 @@ * */ -use std::{future::Future, pin::Pin}; +use std::{future::Future, net::SocketAddr, pin::Pin, time::Duration}; -pub mod tokio; +use ::tokio::io::{AsyncRead, AsyncWrite}; + +pub(crate) mod hyper_wrapper; +pub(crate) mod tokio; /// An abstraction over an asynchronous runtime. /// @@ -47,6 +50,14 @@ pub(super) trait Runtime: Send + Sync { /// Returns a future that completes after the specified duration. fn sleep(&self, duration: std::time::Duration) -> Pin>; + + /// Establishes a TCP connection to the given `target` address with the + /// specified `opts`. + fn tcp_stream( + &self, + target: SocketAddr, + opts: TcpOptions, + ) -> Pin, String>> + Send>>; } /// A future that resolves after a specified duration. @@ -73,3 +84,11 @@ pub(super) struct ResolverOptions { /// system's default DNS server will be used. pub(super) server_addr: Option, } + +#[derive(Default)] +pub(crate) struct TcpOptions { + pub(crate) enable_nodelay: bool, + pub(crate) keepalive: Option, +} + +pub(crate) trait TcpStream: AsyncRead + AsyncWrite + Send + Unpin {} diff --git a/grpc/src/rt/tokio/mod.rs b/grpc/src/rt/tokio/mod.rs index f33886fb8..118ee05bd 100644 --- a/grpc/src/rt/tokio/mod.rs +++ b/grpc/src/rt/tokio/mod.rs @@ -29,7 +29,11 @@ use std::{ time::Duration, }; -use tokio::task::JoinHandle; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + net::TcpStream, + task::JoinHandle, +}; use super::{DnsResolver, ResolverOptions, Runtime, Sleep, TaskHandle}; @@ -92,6 +96,28 @@ impl Runtime for TokioRuntime { fn sleep(&self, duration: Duration) -> Pin> { Box::pin(tokio::time::sleep(duration)) } + + fn tcp_stream( + &self, + target: SocketAddr, + opts: super::TcpOptions, + ) -> Pin, String>> + Send>> { + Box::pin(async move { + let stream = TcpStream::connect(target) + .await + .map_err(|err| err.to_string())?; + if let Some(duration) = opts.keepalive { + let sock_ref = socket2::SockRef::from(&stream); + let mut ka = socket2::TcpKeepalive::new(); + ka = ka.with_time(duration); + sock_ref + .set_tcp_keepalive(&ka) + .map_err(|err| err.to_string())?; + } + let stream: Box = Box::new(TokioTcpStream { inner: stream }); + Ok(stream) + }) + } } impl TokioDefaultDnsResolver { @@ -103,6 +129,46 @@ impl TokioDefaultDnsResolver { } } +struct TokioTcpStream { + inner: TcpStream, +} + +impl AsyncRead for TokioTcpStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl AsyncWrite for TokioTcpStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } +} + +impl super::TcpStream for TokioTcpStream {} + #[cfg(test)] mod tests { use super::{DnsResolver, ResolverOptions, Runtime, TokioDefaultDnsResolver, TokioRuntime}; diff --git a/grpc/src/service.rs b/grpc/src/service.rs index 7b9401ed9..429293356 100644 --- a/grpc/src/service.rs +++ b/grpc/src/service.rs @@ -29,7 +29,7 @@ use tonic::{async_trait, Request as TonicRequest, Response as TonicResponse, Sta pub type Request = TonicRequest> + Send + Sync>>>; pub type Response = - TonicResponse, Status>> + Send + Sync>>>; + TonicResponse, Status>> + Send>>>; #[async_trait] pub trait Service: Send + Sync { @@ -38,3 +38,5 @@ pub trait Service: Send + Sync { // TODO: define methods that will allow serialization/deserialization. pub trait Message: Any + Send + Sync {} + +impl Message for T where T: Any + Send + Sync {} From b113136bd1b472402a44a35760b90d861460aa91 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 15 Jul 2025 19:13:29 +0530 Subject: [PATCH 02/13] Fix udeps and msrv --- grpc/Cargo.toml | 1 - grpc/src/client/transport/tonic/mod.rs | 3 +-- grpc/src/service.rs | 13 +++++++++++-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/grpc/Cargo.toml b/grpc/Cargo.toml index 4544083d3..195a25dc6 100644 --- a/grpc/Cargo.toml +++ b/grpc/Cargo.toml @@ -17,7 +17,6 @@ rand = "0.9" parking_lot = "0.12.4" bytes = "1.10.1" futures = "0.3.31" -hyper-util = "0.1.14" hyper = { version = "1.6.0", features = ["client", "http2"] } pin-project-lite = "0.2.16" tokio-stream = "0.1.17" diff --git a/grpc/src/client/transport/tonic/mod.rs b/grpc/src/client/transport/tonic/mod.rs index e989f816b..780733e3d 100644 --- a/grpc/src/client/transport/tonic/mod.rs +++ b/grpc/src/client/transport/tonic/mod.rs @@ -16,7 +16,6 @@ use http::Response as HttpResponse; use http::Uri; use hyper::client::conn::http2::Builder; use hyper::client::conn::http2::SendRequest; -use std::any::Any; use std::{ error::Error, future::Future, @@ -105,7 +104,7 @@ fn convert_request(req: GrpcRequest) -> TonicRequest).downcast::(); + let downcast_result = msg.as_any().downcast::(); match downcast_result { Ok(boxed_bytes) => Some(*boxed_bytes), diff --git a/grpc/src/service.rs b/grpc/src/service.rs index 429293356..cd237162b 100644 --- a/grpc/src/service.rs +++ b/grpc/src/service.rs @@ -37,6 +37,15 @@ pub trait Service: Send + Sync { } // TODO: define methods that will allow serialization/deserialization. -pub trait Message: Any + Send + Sync {} +pub trait Message: Any + Send + Sync { + fn as_any(self: Box) -> Box; +} -impl Message for T where T: Any + Send + Sync {} +impl Message for T +where + T: Any + Send + Sync, +{ + fn as_any(self: Box) -> Box { + self + } +} From 13f162808e104fb2ef1ca516e50cb065112ce9d0 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 23 Jul 2025 20:29:58 +0530 Subject: [PATCH 03/13] Clippy fixes --- grpc/src/client/name_resolution/registry.rs | 2 +- grpc/src/client/transport/tonic/mod.rs | 7 +++---- grpc/src/inmemory/mod.rs | 7 ++----- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/grpc/src/client/name_resolution/registry.rs b/grpc/src/client/name_resolution/registry.rs index aeb0331c9..3cd0c3ac5 100644 --- a/grpc/src/client/name_resolution/registry.rs +++ b/grpc/src/client/name_resolution/registry.rs @@ -74,7 +74,7 @@ impl ResolverRegistry { .lock() .unwrap() .insert(scheme.to_string(), Arc::from(builder)); - return Ok(()); + Ok(()) } /// Returns the resolver builder registered for the given scheme, if any. diff --git a/grpc/src/client/transport/tonic/mod.rs b/grpc/src/client/transport/tonic/mod.rs index 83099f290..7bb6dfd4e 100644 --- a/grpc/src/client/transport/tonic/mod.rs +++ b/grpc/src/client/transport/tonic/mod.rs @@ -78,7 +78,7 @@ impl Service for TonicTransport { async fn call(&self, method: String, request: GrpcRequest) -> GrpcResponse { let mut grpc = self.grpc.clone(); if let Err(e) = grpc.ready().await { - let err = Status::unknown(format!("Service was not ready: {}", e.to_string())); + let err = Status::unknown(format!("Service was not ready: {}", e)); return create_error_response(err); }; let path = if let Ok(p) = PathAndQuery::from_maybe_shared(method) { @@ -135,11 +135,10 @@ fn convert_response(res: Result>, Status>) -> Grp let stream = response.into_inner(); let message_stream: Pin, Status>> + Send>> = Box::pin(stream.map(|msg| { - let res = msg.map(|b| { + msg.map(|b| { let msg: Box = Box::new(b); msg - }); - res + }) })); let mut new_res = TonicResponse::new(message_stream); *new_res.metadata_mut() = metadata; diff --git a/grpc/src/inmemory/mod.rs b/grpc/src/inmemory/mod.rs index 888526fce..26150dfde 100644 --- a/grpc/src/inmemory/mod.rs +++ b/grpc/src/inmemory/mod.rs @@ -85,11 +85,8 @@ impl crate::server::Listener for Arc { async fn accept(&self) -> Option { let mut recv = self.r.lock().await; let r = recv.recv().await; - if r.is_none() { - // Listener was closed. - return None; - } - r.unwrap() + // Listener may be closed. + r? } } From e69cb49f2a2f89949c22ad47ccdb4e9c1610a8b4 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 23 Jul 2025 21:23:17 +0530 Subject: [PATCH 04/13] Address review --- grpc/src/client/subchannel.rs | 8 ++++---- grpc/src/client/transport/registry.rs | 10 ++-------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/grpc/src/client/subchannel.rs b/grpc/src/client/subchannel.rs index 29a527757..5f994cb7d 100644 --- a/grpc/src/client/subchannel.rs +++ b/grpc/src/client/subchannel.rs @@ -61,7 +61,7 @@ struct InternalSubchannelConnectingState { } struct InternalSubchannelReadyState { - abort_handle: Option, + abort_handle: Option>, svc: SharedService, } @@ -395,7 +395,7 @@ impl InternalSubchannel { }); let state_machine_tx = self.state_machine_event_sender.clone(); - let disconnect_task = tokio::task::spawn(async move { + let task_handle = self.runtime.spawn(Box::pin(async move { // TODO(easwars): Does it make sense for disconnected() to return an // error string containing information about why the connection // terminated? But what can we do with that error other than logging @@ -404,10 +404,10 @@ impl InternalSubchannel { eprintln!("Transport closed with error: {}", e.to_string()) }; let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionTerminated); - }); + })); let mut inner = self.inner.lock().unwrap(); inner.state = InternalSubchannelState::Ready(InternalSubchannelReadyState { - abort_handle: Some(disconnect_task.abort_handle()), + abort_handle: Some(task_handle), svc: svc2.clone(), }); } diff --git a/grpc/src/client/transport/registry.rs b/grpc/src/client/transport/registry.rs index e290c098e..5e4c45a8d 100644 --- a/grpc/src/client/transport/registry.rs +++ b/grpc/src/client/transport/registry.rs @@ -8,7 +8,7 @@ use std::{ /// A registry to store and retrieve transports. Transports are indexed by /// the address type they are intended to handle. -#[derive(Clone)] +#[derive(Default, Clone)] pub(crate) struct TransportRegistry { m: Arc>>>, } @@ -26,7 +26,7 @@ impl Debug for TransportRegistry { impl TransportRegistry { /// Construct an empty name resolver registry. pub(crate) fn new() -> Self { - Self { m: Arc::default() } + Self::default() } /// Add a transport into the registry. @@ -50,12 +50,6 @@ impl TransportRegistry { } } -impl Default for TransportRegistry { - fn default() -> Self { - Self::new() - } -} - /// The registry used if a local registry is not provided to a channel or if it /// does not exist in the local registry. pub static GLOBAL_TRANSPORT_REGISTRY: Lazy = Lazy::new(TransportRegistry::new); From b01eca5dabcb0d45ea6993cd9ff1c95584699bb1 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 23 Jul 2025 21:28:11 +0530 Subject: [PATCH 05/13] Use trait down casting --- grpc/src/client/transport/tonic/mod.rs | 3 ++- grpc/src/service.rs | 13 ++----------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/grpc/src/client/transport/tonic/mod.rs b/grpc/src/client/transport/tonic/mod.rs index 7bb6dfd4e..447901159 100644 --- a/grpc/src/client/transport/tonic/mod.rs +++ b/grpc/src/client/transport/tonic/mod.rs @@ -16,6 +16,7 @@ use http::Response as HttpResponse; use http::Uri; use hyper::client::conn::http2::Builder; use hyper::client::conn::http2::SendRequest; +use std::any::Any; use std::{ error::Error, future::Future, @@ -104,7 +105,7 @@ fn convert_request(req: GrpcRequest) -> TonicRequest(); + let downcast_result = (msg as Box).downcast::(); match downcast_result { Ok(boxed_bytes) => Some(*boxed_bytes), diff --git a/grpc/src/service.rs b/grpc/src/service.rs index cd237162b..429293356 100644 --- a/grpc/src/service.rs +++ b/grpc/src/service.rs @@ -37,15 +37,6 @@ pub trait Service: Send + Sync { } // TODO: define methods that will allow serialization/deserialization. -pub trait Message: Any + Send + Sync { - fn as_any(self: Box) -> Box; -} +pub trait Message: Any + Send + Sync {} -impl Message for T -where - T: Any + Send + Sync, -{ - fn as_any(self: Box) -> Box { - self - } -} +impl Message for T where T: Any + Send + Sync {} From 13bb667d9227a6812c8bb958ed041bb93b216bc9 Mon Sep 17 00:00:00 2001 From: Arjan Singh Bal <46515553+arjan-bal@users.noreply.github.com> Date: Thu, 24 Jul 2025 13:31:38 +0530 Subject: [PATCH 06/13] Update grpc/src/codec.rs Co-authored-by: Doug Fawley --- grpc/src/codec.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/grpc/src/codec.rs b/grpc/src/codec.rs index dab008a85..eb9cc03e7 100644 --- a/grpc/src/codec.rs +++ b/grpc/src/codec.rs @@ -12,11 +12,8 @@ pub(crate) struct BytesCodec {} impl Codec for BytesCodec { type Encode = Bytes; - type Decode = Bytes; - type Encoder = BytesEncoder; - type Decoder = BytesDecoder; fn encoder(&mut self) -> Self::Encoder { @@ -32,7 +29,6 @@ pub struct BytesEncoder {} impl Encoder for BytesEncoder { type Item = Bytes; - type Error = Status; fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error> { @@ -46,7 +42,6 @@ pub struct BytesDecoder {} impl Decoder for BytesDecoder { type Item = Bytes; - type Error = Status; fn decode( From e89323862063a5c20faed69d414fb1d10fa64099 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Thu, 24 Jul 2025 13:30:02 +0530 Subject: [PATCH 07/13] Sort dependencies --- grpc/Cargo.toml | 57 ++++++++++++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/grpc/Cargo.toml b/grpc/Cargo.toml index 1d365e150..0fa8aebfb 100644 --- a/grpc/Cargo.toml +++ b/grpc/Cargo.toml @@ -5,12 +5,20 @@ edition = "2021" authors = ["gRPC Authors"] license = "MIT" +[package.metadata.cargo_check_external_types] +allowed_external_types = [ + "tonic::*", + "futures_core::stream::Stream", + "tokio::sync::oneshot::Sender", +] + +[features] +default = ["dns"] +dns = ["dep:hickory-resolver"] + [dependencies] bytes = "1.10.1" futures = "0.3.31" -tower = { version = "0.5.2", features = ["buffer", "limit", "util"] } -tower-service = "0.3.3" -socket2 = "0.5.10" futures-core = "0.3.31" futures-util = "0.3.31" hickory-resolver = { version = "0.25.1", optional = true } @@ -24,28 +32,33 @@ pin-project-lite = "0.2.16" rand = "0.9" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" -tokio = { version = "1.37.0", features = ["sync", "rt", "net", "time", "macros"] } +socket2 = "0.5.10" +tokio = { version = "1.37.0", features = [ + "sync", + "rt", + "net", + "time", + "macros", +] } tokio-stream = "0.1.17" -tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["codegen", "transport"] } +tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = [ + "codegen", + "transport", +] } +tower = { version = "0.5.2", features = ["buffer", "limit", "util"] } +tower-service = "0.3.3" url = "2.5.0" -[dev-dependencies] -async-stream = "0.3.6" -tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["prost", "server", "router"] } -hickory-server = "0.25.2" -prost = "0.14" - [build-dependencies] -tonic-build = { path = "../tonic-build" } prost = "0.14" +tonic-build = { path = "../tonic-build" } -[features] -default = ["dns"] -dns = ["dep:hickory-resolver"] - -[package.metadata.cargo_check_external_types] -allowed_external_types = [ - "tonic::*", - "futures_core::stream::Stream", - "tokio::sync::oneshot::Sender", -] +[dev-dependencies] +async-stream = "0.3.6" +hickory-server = "0.25.2" +prost = "0.14" +tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = [ + "prost", + "server", + "router", +] } From ae686c86321a72be82335d6d32e83c9d33455b53 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Fri, 25 Jul 2025 01:19:36 +0530 Subject: [PATCH 08/13] Fail build if tokio spawn is used outside runtime --- grpc/Cargo.toml | 35 +++++++++------ grpc/src/client/channel.rs | 16 ++++--- .../client/load_balancing/child_manager.rs | 9 +++- grpc/src/client/load_balancing/mod.rs | 2 + grpc/src/client/load_balancing/pick_first.rs | 7 ++- grpc/src/client/subchannel.rs | 17 ++++--- grpc/src/client/transport/mod.rs | 4 ++ grpc/src/rt/mod.rs | 45 ++++++++++++++++++- 8 files changed, 101 insertions(+), 34 deletions(-) diff --git a/grpc/Cargo.toml b/grpc/Cargo.toml index 0fa8aebfb..4cb6c50e7 100644 --- a/grpc/Cargo.toml +++ b/grpc/Cargo.toml @@ -13,39 +13,46 @@ allowed_external_types = [ ] [features] -default = ["dns"] -dns = ["dep:hickory-resolver"] +default = ["dns", "_runtime-tokio"] +dns = ["dep:hickory-resolver", "_runtime-tokio"] +# The following feature is used to ensure all modules use the runtime +# abstraction instead of using tokio directly. +# Using tower/buffer enables tokio's rt feature even though it's possible to +# create Buffers with a user provided executor. +_runtime-tokio = [ + "tokio/rt", + "tokio/net", + "dep:futures", + "dep:socket2", + "dep:tower", +] [dependencies] bytes = "1.10.1" -futures = "0.3.31" +futures = { version = "0.3.31", optional = true } futures-core = "0.3.31" futures-util = "0.3.31" hickory-resolver = { version = "0.25.1", optional = true } http = "1.1.0" http-body = "1.0.1" hyper = { version = "1.6.0", features = ["client", "http2"] } -hyper-util = "0.1.14" once_cell = "1.19.0" parking_lot = "0.12.4" pin-project-lite = "0.2.16" rand = "0.9" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" -socket2 = "0.5.10" -tokio = { version = "1.37.0", features = [ - "sync", - "rt", - "net", - "time", - "macros", -] } +socket2 = { version = "0.5.10", optional = true } +tokio = { version = "1.37.0", features = ["sync", "time", "macros"] } tokio-stream = "0.1.17" tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = [ "codegen", - "transport", ] } -tower = { version = "0.5.2", features = ["buffer", "limit", "util"] } +tower = { version = "0.5.2", features = [ + "limit", + "util", + "buffer", +], optional = true } tower-service = "0.3.3" url = "2.5.0" diff --git a/grpc/src/client/channel.rs b/grpc/src/client/channel.rs index 91278d8ab..edbd55131 100644 --- a/grpc/src/client/channel.rs +++ b/grpc/src/client/channel.rs @@ -37,17 +37,16 @@ use std::{ }; use tokio::sync::{mpsc, oneshot, watch, Notify}; -use tokio::task::AbortHandle; use serde_json::json; use tonic::async_trait; use url::Url; // NOTE: http::Uri requires non-empty authority portion of URI -use crate::credentials::Credentials; +use crate::attributes::Attributes; use crate::rt; use crate::service::{Request, Response, Service}; -use crate::{attributes::Attributes, rt::tokio::TokioRuntime}; use crate::{client::ConnectivityState, rt::Runtime}; +use crate::{credentials::Credentials, rt::default_runtime}; use super::service_config::ServiceConfig; use super::transport::{TransportRegistry, GLOBAL_TRANSPORT_REGISTRY}; @@ -156,7 +155,7 @@ impl Channel { inner: Arc::new(PersistentChannel::new( target, credentials, - Arc::new(rt::tokio::TokioRuntime {}), + default_runtime(), options, )), } @@ -280,7 +279,7 @@ impl ActiveChannel { let resolver_opts = name_resolution::ResolverOptions { authority, work_scheduler, - runtime: Arc::new(TokioRuntime {}), + runtime: runtime.clone(), }; let resolver = rb.build(&target, resolver_opts); @@ -373,7 +372,7 @@ impl InternalChannelController { connectivity_state: Arc>, runtime: Arc, ) -> Self { - let lb = Arc::new(GracefulSwitchBalancer::new(wqtx.clone())); + let lb = Arc::new(GracefulSwitchBalancer::new(wqtx.clone(), runtime.clone())); Self { lb, @@ -459,6 +458,7 @@ pub(super) struct GracefulSwitchBalancer { policy_builder: Mutex>>, work_scheduler: WorkQueueTx, pending: Mutex, + runtime: Arc, } impl WorkScheduler for GracefulSwitchBalancer { @@ -483,12 +483,13 @@ impl WorkScheduler for GracefulSwitchBalancer { } impl GracefulSwitchBalancer { - fn new(work_scheduler: WorkQueueTx) -> Self { + fn new(work_scheduler: WorkQueueTx, runtime: Arc) -> Self { Self { policy_builder: Mutex::default(), policy: Mutex::default(), // new(None::>), work_scheduler, pending: Mutex::default(), + runtime, } } @@ -506,6 +507,7 @@ impl GracefulSwitchBalancer { let builder = GLOBAL_LB_REGISTRY.get_policy(policy_name).unwrap(); let newpol = builder.build(LbPolicyOptions { work_scheduler: self.clone(), + runtime: self.runtime.clone(), }); *self.policy_builder.lock().unwrap() = Some(builder); *p = Some(newpol); diff --git a/grpc/src/client/load_balancing/child_manager.rs b/grpc/src/client/load_balancing/child_manager.rs index 0d4af6542..8086a842d 100644 --- a/grpc/src/client/load_balancing/child_manager.rs +++ b/grpc/src/client/load_balancing/child_manager.rs @@ -38,6 +38,7 @@ use crate::client::load_balancing::{ WeakSubchannel, WorkScheduler, }; use crate::client::name_resolution::{Address, ResolverUpdate}; +use crate::rt::Runtime; use super::{Subchannel, SubchannelState}; @@ -47,6 +48,7 @@ pub struct ChildManager { children: Vec>, update_sharder: Box>, pending_work: Arc>>, + runtime: Arc, } struct Child { @@ -81,12 +83,16 @@ pub trait ResolverUpdateSharder: Send { impl ChildManager { /// Creates a new ChildManager LB policy. shard_update is called whenever a /// resolver_update operation occurs. - pub fn new(update_sharder: Box>) -> Self { + pub fn new( + update_sharder: Box>, + runtime: Arc, + ) -> Self { Self { update_sharder, subchannel_child_map: Default::default(), children: Default::default(), pending_work: Default::default(), + runtime, } } @@ -197,6 +203,7 @@ impl LbPolicy for ChildManager }); let policy = builder.build(LbPolicyOptions { work_scheduler: work_scheduler.clone(), + runtime: self.runtime.clone(), }); let state = LbState::initial(); self.children.push(Child { diff --git a/grpc/src/client/load_balancing/mod.rs b/grpc/src/client/load_balancing/mod.rs index 16b4cafbe..fa5ad8c40 100644 --- a/grpc/src/client/load_balancing/mod.rs +++ b/grpc/src/client/load_balancing/mod.rs @@ -41,6 +41,7 @@ use tonic::{metadata::MetadataMap, Status}; use crate::{ client::channel::WorkQueueTx, + rt::Runtime, service::{Request, Response, Service}, }; @@ -64,6 +65,7 @@ pub struct LbPolicyOptions { /// A hook into the channel's work scheduler that allows the LbPolicy to /// request the ability to perform operations on the ChannelController. pub work_scheduler: Arc, + pub runtime: Arc, } /// Used to asynchronously request a call into the LbPolicy's work method if diff --git a/grpc/src/client/load_balancing/pick_first.rs b/grpc/src/client/load_balancing/pick_first.rs index ed7ae76f6..ed8781451 100644 --- a/grpc/src/client/load_balancing/pick_first.rs +++ b/grpc/src/client/load_balancing/pick_first.rs @@ -13,6 +13,7 @@ use crate::{ name_resolution::{Address, ResolverUpdate}, subchannel, ConnectivityState, }, + rt::Runtime, service::Request, }; @@ -31,6 +32,7 @@ impl LbPolicyBuilder for Builder { work_scheduler: options.work_scheduler, subchannel: None, next_addresses: Vec::default(), + runtime: options.runtime, }) } @@ -47,6 +49,7 @@ struct PickFirstPolicy { work_scheduler: Arc, subchannel: Option>, next_addresses: Vec
, + runtime: Arc, } impl LbPolicy for PickFirstPolicy { @@ -73,10 +76,10 @@ impl LbPolicy for PickFirstPolicy { self.next_addresses = addresses; let work_scheduler = self.work_scheduler.clone(); // TODO: Implement Drop that cancels this task. - tokio::task::spawn(async move { + self.runtime.spawn(Box::pin(async move { sleep(Duration::from_millis(200)).await; work_scheduler.schedule_work(); - }); + })); // TODO: return a picker that queues RPCs. Ok(()) } diff --git a/grpc/src/client/subchannel.rs b/grpc/src/client/subchannel.rs index 5f994cb7d..f7886c29d 100644 --- a/grpc/src/client/subchannel.rs +++ b/grpc/src/client/subchannel.rs @@ -24,7 +24,6 @@ use std::{ }; use tokio::{ sync::{mpsc, oneshot, watch, Notify}, - task::{AbortHandle, JoinHandle}, time::{Duration, Instant}, }; use tonic::async_trait; @@ -66,7 +65,7 @@ struct InternalSubchannelReadyState { } struct InternalSubchannelTransientFailureState { - abort_handle: Option, + task_handle: Option>, error: String, } @@ -168,7 +167,7 @@ impl Drop for InternalSubchannelState { } } Self::TransientFailure(st) => { - if let Some(ah) = &st.abort_handle { + if let Some(ah) = &st.task_handle { ah.abort(); } } @@ -189,8 +188,8 @@ pub(crate) struct InternalSubchannel { struct InnerSubchannel { state: InternalSubchannelState, watchers: Vec>, // TODO(easwars): Revisit the choice for this data structure. - backoff_task: Option>, - disconnect_task: Option>, + backoff_task: Option>, + disconnect_task: Option>, } #[async_trait] @@ -417,7 +416,7 @@ impl InternalSubchannel { let mut inner = self.inner.lock().unwrap(); inner.state = InternalSubchannelState::TransientFailure( InternalSubchannelTransientFailureState { - abort_handle: None, + task_handle: None, error: err.clone(), }, ); @@ -431,14 +430,14 @@ impl InternalSubchannel { let backoff_interval = self.backoff.backoff_until(); let state_machine_tx = self.state_machine_event_sender.clone(); - let backoff_task = tokio::task::spawn(async move { + let backoff_task = self.runtime.spawn(Box::pin(async move { tokio::time::sleep_until(backoff_interval).await; let _ = state_machine_tx.send(SubchannelStateMachineEvent::BackoffExpired); - }); + })); let mut inner = self.inner.lock().unwrap(); inner.state = InternalSubchannelState::TransientFailure(InternalSubchannelTransientFailureState { - abort_handle: Some(backoff_task.abort_handle()), + task_handle: Some(backoff_task), error: err.clone(), }); } diff --git a/grpc/src/client/transport/mod.rs b/grpc/src/client/transport/mod.rs index 8eb867b45..e598e4eca 100644 --- a/grpc/src/client/transport/mod.rs +++ b/grpc/src/client/transport/mod.rs @@ -2,6 +2,10 @@ use crate::{rt::Runtime, service::Service}; use std::{sync::Arc, time::Duration}; mod registry; + +// Using tower/buffer enables tokio's rt feature even though it's possible to +// create Buffers with a user provided executor. +#[cfg(feature = "_runtime-tokio")] mod tonic; use ::tonic::async_trait; diff --git a/grpc/src/rt/mod.rs b/grpc/src/rt/mod.rs index fd9a8e1de..b6dba9393 100644 --- a/grpc/src/rt/mod.rs +++ b/grpc/src/rt/mod.rs @@ -23,9 +23,10 @@ */ use ::tokio::io::{AsyncRead, AsyncWrite}; -use std::{future::Future, net::SocketAddr, pin::Pin, time::Duration}; +use std::{future::Future, net::SocketAddr, pin::Pin, sync::Arc, time::Duration}; pub(crate) mod hyper_wrapper; +#[cfg(feature = "_runtime-tokio")] pub(crate) mod tokio; /// An abstraction over an asynchronous runtime. @@ -91,3 +92,45 @@ pub(crate) struct TcpOptions { } pub(crate) trait TcpStream: AsyncRead + AsyncWrite + Send + Unpin {} + +/// A fake runtime to satisfy the compiler when no runtime is enabled. This will +/// +/// # Panics +/// +/// Panics if any of its functions are called. +#[derive(Default)] +pub(crate) struct NoOpRuntime {} + +impl Runtime for NoOpRuntime { + fn spawn( + &self, + task: Pin + Send + 'static>>, + ) -> Box { + unimplemented!() + } + + fn get_dns_resolver(&self, opts: ResolverOptions) -> Result, String> { + unimplemented!() + } + + fn sleep(&self, duration: std::time::Duration) -> Pin> { + unimplemented!() + } + + fn tcp_stream( + &self, + target: SocketAddr, + opts: TcpOptions, + ) -> Pin, String>> + Send>> { + unimplemented!() + } +} + +pub(crate) fn default_runtime() -> Arc { + #[cfg(feature = "_runtime-tokio")] + { + return Arc::new(tokio::TokioRuntime {}); + } + #[allow(unreachable_code)] + Arc::new(NoOpRuntime::default()) +} From 73880137655f9ba85c7475034758f0723e921f1c Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Fri, 25 Jul 2025 02:33:33 +0530 Subject: [PATCH 09/13] Use connect deadline instead of timeout --- grpc/src/client/transport/mod.rs | 3 ++- grpc/src/client/transport/tonic/mod.rs | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/grpc/src/client/transport/mod.rs b/grpc/src/client/transport/mod.rs index e598e4eca..145d3fe3e 100644 --- a/grpc/src/client/transport/mod.rs +++ b/grpc/src/client/transport/mod.rs @@ -1,4 +1,5 @@ use crate::{rt::Runtime, service::Service}; +use std::time::Instant; use std::{sync::Arc, time::Duration}; mod registry; @@ -34,7 +35,7 @@ pub(crate) struct TransportOptions { pub rate_limit: Option<(u64, Duration)>, pub tcp_keepalive: Option, pub tcp_nodelay: bool, - pub connect_timeout: Option, + pub connect_deadline: Option, } #[async_trait] diff --git a/grpc/src/client/transport/tonic/mod.rs b/grpc/src/client/transport/tonic/mod.rs index 447901159..f51496c98 100644 --- a/grpc/src/client/transport/tonic/mod.rs +++ b/grpc/src/client/transport/tonic/mod.rs @@ -17,6 +17,7 @@ use http::Uri; use hyper::client::conn::http2::Builder; use hyper::client::conn::http2::SendRequest; use std::any::Any; +use std::time::Instant; use std::{ error::Error, future::Future, @@ -191,7 +192,8 @@ impl Transport for TransportBuilder { keepalive: opts.tcp_keepalive, }, ); - let tcp_stream = if let Some(timeout) = opts.connect_timeout { + let tcp_stream = if let Some(deadline) = opts.connect_deadline { + let timeout = deadline.saturating_duration_since(Instant::now()); tokio::select! { _ = runtime.sleep(timeout) => { return Err("timed out waiting for TCP stream to connect".to_string()) From 169bd72eaecea86c5063b63d5fd2ee8eaef6851f Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 29 Jul 2025 17:06:14 +0530 Subject: [PATCH 10/13] Review suggestions --- grpc/src/client/transport/tonic/mod.rs | 42 +++++++++---------------- grpc/src/client/transport/tonic/test.rs | 27 ++++++++-------- 2 files changed, 28 insertions(+), 41 deletions(-) diff --git a/grpc/src/client/transport/tonic/mod.rs b/grpc/src/client/transport/tonic/mod.rs index f51496c98..52414c964 100644 --- a/grpc/src/client/transport/tonic/mod.rs +++ b/grpc/src/client/transport/tonic/mod.rs @@ -78,15 +78,13 @@ impl Drop for TonicTransport { #[async_trait] impl Service for TonicTransport { async fn call(&self, method: String, request: GrpcRequest) -> GrpcResponse { - let mut grpc = self.grpc.clone(); - if let Err(e) = grpc.ready().await { - let err = Status::unknown(format!("Service was not ready: {}", e)); + let Ok(path) = PathAndQuery::from_maybe_shared(method) else { + let err = Status::internal("Failed to parse path"); return create_error_response(err); }; - let path = if let Ok(p) = PathAndQuery::from_maybe_shared(method) { - p - } else { - let err = Status::internal("Failed to parse path"); + let mut grpc = self.grpc.clone(); + if let Err(e) = grpc.ready().await { + let err = Status::unknown(format!("Service was not ready: {e}")); return create_error_response(err); }; let request = convert_request(request); @@ -102,27 +100,19 @@ fn create_error_response(status: Status) -> GrpcResponse { } fn convert_request(req: GrpcRequest) -> TonicRequest + Send>>> { - let (metadata, extensions) = (req.metadata().clone(), req.extensions().clone()); - let stream = req.into_inner(); + let (metadata, extensions, stream) = req.into_parts(); let bytes_stream = Box::pin(stream.filter_map(|msg| async { - let downcast_result = (msg as Box).downcast::(); - - match downcast_result { - Ok(boxed_bytes) => Some(*boxed_bytes), - + if let Ok(bytes) = (msg as Box).downcast::() { + Some(*bytes) + } else { // If it fails, log the error and return None to filter it out. - Err(_) => { - eprintln!("A message could not be downcast to Bytes and was skipped."); - None - } + eprintln!("A message could not be downcast to Bytes and was skipped."); + None } })); - let mut new_req = TonicRequest::new(bytes_stream as _); - *new_req.metadata_mut() = metadata; - *new_req.extensions_mut() = extensions; - new_req + TonicRequest::from_parts(metadata, extensions, bytes_stream as _) } fn convert_response(res: Result>, Status>) -> GrpcResponse { @@ -133,8 +123,7 @@ fn convert_response(res: Result>, Status>) -> Grp return TonicResponse::new(Box::pin(stream)); } }; - let (metadata, extensions) = (response.metadata().clone(), response.extensions().clone()); - let stream = response.into_inner(); + let (metadata, stream, extensions) = response.into_parts(); let message_stream: Pin, Status>> + Send>> = Box::pin(stream.map(|msg| { msg.map(|b| { @@ -142,10 +131,7 @@ fn convert_response(res: Result>, Status>) -> Grp msg }) })); - let mut new_res = TonicResponse::new(message_stream); - *new_res.metadata_mut() = metadata; - *new_res.extensions_mut() = extensions; - new_res + TonicResponse::from_parts(metadata, message_stream, extensions) } #[async_trait] diff --git a/grpc/src/client/transport/tonic/test.rs b/grpc/src/client/transport/tonic/test.rs index fbbff21b4..61cdde4f1 100644 --- a/grpc/src/client/transport/tonic/test.rs +++ b/grpc/src/client/transport/tonic/test.rs @@ -7,13 +7,13 @@ use crate::echo_pb::{EchoRequest, EchoResponse}; use crate::service::Message; use bytes::Bytes; -use prost::Message as ProstMessage; use std::any::Any; use std::{pin::Pin, sync::Arc, time::Duration}; use tokio::net::TcpListener; use tokio::sync::{mpsc, oneshot, Notify}; use tokio::time::timeout; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; +use tonic_prost::prost::Message as ProstMessage; use tonic::async_trait; use tonic::{transport::Server, Request, Response, Status}; @@ -31,7 +31,7 @@ pub async fn tonic_transport_rpc() { let addr = listener.local_addr().unwrap(); // get the assigned address let shutdown_notify = Arc::new(Notify::new()); let shutdown_notify_copy = shutdown_notify.clone(); - println!("EchoServer listening on: {}", addr); + println!("EchoServer listening on: {addr}"); let server_handle = tokio::spawn(async move { let echo_server = EchoService {}; let svc = EchoServer::new(echo_server); @@ -71,31 +71,32 @@ pub async fn tonic_transport_rpc() { // Spawn a sender task let client_handle = tokio::spawn(async move { for i in 0..5 { - let message = format!("message {}", i); + let message = format!("message {i}"); let request = EchoRequest { message: message.clone(), }; let bytes = Bytes::from(request.encode_to_vec()); - println!("Sent request: {:?}", request); - if let Err(_) = tx.send(Box::new(bytes)).await { + println!("Sent request: {request:?}"); + if tx.send(Box::new(bytes)).await.is_err() { panic!("Receiver dropped"); } // Wait for the reply - match inbound.next().await { - Some(Ok(resp)) => { + match inbound + .next() + .await + .expect("server unexpectedly closed the stream!") + { + Ok(resp) => { let bytes = (resp as Box).downcast::().unwrap(); let echo_reponse = EchoResponse::decode(bytes).unwrap(); - println!("Got response: {:?}", echo_reponse); + println!("Got response: {echo_reponse:?}"); assert_eq!(echo_reponse.message, message); } - Some(Err(status)) => { - panic!("Error from server: {:?}", status); - } - None => { - panic!("Server closed the stream"); + Err(status) => { + panic!("Error from server: {status:?}"); } } } From e6afa5f87784d90d7023fd469b3041bc64d83bb0 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 30 Jul 2025 19:35:41 +0530 Subject: [PATCH 11/13] Fix udeps check --- examples/Cargo.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index b2f6d0ccf..361ec8fcd 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -263,7 +263,6 @@ tracing = ["dep:tracing", "dep:tracing-subscriber"] uds = ["dep:tokio-stream", "tokio-stream?/net", "dep:tower", "dep:hyper", "dep:hyper-util"] streaming = ["dep:tokio-stream", "dep:h2"] mock = ["dep:tokio-stream", "dep:tower", "dep:hyper-util"] -tower = ["dep:tower", "dep:http"] json-codec = ["dep:serde", "dep:serde_json", "dep:bytes"] compression = ["tonic/gzip"] tls = ["tonic/tls-ring"] @@ -273,7 +272,7 @@ types = ["dep:tonic-types"] h2c = ["dep:hyper", "dep:tower", "dep:http", "dep:hyper-util"] cancellation = ["dep:tokio-util"] -full = ["gcp", "routeguide", "reflection", "autoreload", "health", "grpc-web", "tracing", "uds", "streaming", "mock", "tower", "json-codec", "compression", "tls", "tls-rustls", "tls-client-auth", "types", "cancellation", "h2c"] +full = ["gcp", "routeguide", "reflection", "autoreload", "health", "grpc-web", "tracing", "uds", "streaming", "mock", "json-codec", "compression", "tls", "tls-rustls", "tls-client-auth", "types", "cancellation", "h2c"] default = ["full"] [dependencies] From da64ac54660ed52f32a7246eb40faff718b990a0 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Thu, 31 Jul 2025 14:25:23 +0530 Subject: [PATCH 12/13] Address review --- grpc/Cargo.toml | 9 +++----- grpc/examples/inmemory.rs | 2 +- grpc/examples/multiaddr.rs | 2 +- grpc/src/client/load_balancing/pick_first.rs | 4 ++-- grpc/src/client/subchannel.rs | 15 ++++++------ grpc/src/client/transport/mod.rs | 24 ++++++++++---------- grpc/src/client/transport/registry.rs | 8 +++---- grpc/src/client/transport/tonic/mod.rs | 10 ++++---- grpc/src/inmemory/mod.rs | 1 + grpc/src/service.rs | 2 +- 10 files changed, 38 insertions(+), 39 deletions(-) diff --git a/grpc/Cargo.toml b/grpc/Cargo.toml index d108f90b3..cde9efcfa 100644 --- a/grpc/Cargo.toml +++ b/grpc/Cargo.toml @@ -22,16 +22,13 @@ dns = ["dep:hickory-resolver", "_runtime-tokio"] _runtime-tokio = [ "tokio/rt", "tokio/net", - "dep:futures", + "tokio/time", "dep:socket2", "dep:tower", ] [dependencies] bytes = "1.10.1" -futures = { version = "0.3.31", optional = true } -futures-core = "0.3.31" -futures-util = "0.3.31" hickory-resolver = { version = "0.25.1", optional = true } http = "1.1.0" http-body = "1.0.1" @@ -43,8 +40,8 @@ rand = "0.9" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" socket2 = { version = "0.5.10", optional = true } -tokio = { version = "1.37.0", features = ["sync", "time", "macros"] } -tokio-stream = "0.1.17" +tokio = { version = "1.37.0", features = ["sync", "macros"] } +tokio-stream = { version = "0.1.17", default-features = false } tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = [ "codegen", ] } diff --git a/grpc/examples/inmemory.rs b/grpc/examples/inmemory.rs index d7f96d5a3..88b17ee01 100644 --- a/grpc/examples/inmemory.rs +++ b/grpc/examples/inmemory.rs @@ -1,8 +1,8 @@ use std::any::Any; -use futures_util::stream::StreamExt; use grpc::service::{Message, Request, Response, Service}; use grpc::{client::ChannelOptions, inmemory}; +use tokio_stream::StreamExt; use tonic::async_trait; struct Handler {} diff --git a/grpc/examples/multiaddr.rs b/grpc/examples/multiaddr.rs index 4c2466de0..c631d33c5 100644 --- a/grpc/examples/multiaddr.rs +++ b/grpc/examples/multiaddr.rs @@ -1,8 +1,8 @@ use std::any::Any; -use futures_util::StreamExt; use grpc::service::{Message, Request, Response, Service}; use grpc::{client::ChannelOptions, inmemory}; +use tokio_stream::StreamExt; use tonic::async_trait; struct Handler { diff --git a/grpc/src/client/load_balancing/pick_first.rs b/grpc/src/client/load_balancing/pick_first.rs index ed8781451..d88cd904f 100644 --- a/grpc/src/client/load_balancing/pick_first.rs +++ b/grpc/src/client/load_balancing/pick_first.rs @@ -4,7 +4,6 @@ use std::{ time::Duration, }; -use tokio::time::sleep; use tonic::metadata::MetadataMap; use crate::{ @@ -75,9 +74,10 @@ impl LbPolicy for PickFirstPolicy { self.next_addresses = addresses; let work_scheduler = self.work_scheduler.clone(); + let runtime = self.runtime.clone(); // TODO: Implement Drop that cancels this task. self.runtime.spawn(Box::pin(async move { - sleep(Duration::from_millis(200)).await; + runtime.sleep(Duration::from_millis(200)).await; work_scheduler.schedule_work(); })); // TODO: return a picker that queues RPCs. diff --git a/grpc/src/client/subchannel.rs b/grpc/src/client/subchannel.rs index f2bf0c355..c1db6fb20 100644 --- a/grpc/src/client/subchannel.rs +++ b/grpc/src/client/subchannel.rs @@ -15,6 +15,7 @@ use crate::{ service::{Request, Response, Service}, }; use core::panic; +use std::time::{Duration, Instant}; use std::{ collections::BTreeMap, error::Error, @@ -22,10 +23,7 @@ use std::{ ops::Sub, sync::{Arc, Mutex, RwLock, Weak}, }; -use tokio::{ - sync::{mpsc, oneshot, watch, Notify}, - time::{Duration, Instant}, -}; +use tokio::sync::{mpsc, oneshot, watch, Notify}; use tonic::async_trait; type SharedService = Arc; @@ -358,7 +356,7 @@ impl InternalSubchannel { let connect_task = self.runtime.spawn(Box::pin(async move { tokio::select! { - _ = tokio::time::sleep(min_connect_timeout) => { + _ = runtime.sleep(min_connect_timeout) => { let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionTimedOut); } result = transport.connect(address.to_string().clone(), runtime, &transport_opts) => { @@ -400,7 +398,7 @@ impl InternalSubchannel { // terminated? But what can we do with that error other than logging // it, which the transport can do as well? if let Err(e) = closed_rx.await { - eprintln!("Transport closed with error: {}", e.to_string()) + eprintln!("Transport closed with error: {e}",) }; let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionTerminated); })); @@ -430,8 +428,11 @@ impl InternalSubchannel { let backoff_interval = self.backoff.backoff_until(); let state_machine_tx = self.state_machine_event_sender.clone(); + let runtime = self.runtime.clone(); let backoff_task = self.runtime.spawn(Box::pin(async move { - tokio::time::sleep_until(backoff_interval).await; + runtime + .sleep(backoff_interval.saturating_duration_since(Instant::now())) + .await; let _ = state_machine_tx.send(SubchannelStateMachineEvent::BackoffExpired); })); let mut inner = self.inner.lock().unwrap(); diff --git a/grpc/src/client/transport/mod.rs b/grpc/src/client/transport/mod.rs index 145d3fe3e..411a2954b 100644 --- a/grpc/src/client/transport/mod.rs +++ b/grpc/src/client/transport/mod.rs @@ -24,18 +24,18 @@ pub(crate) struct ConnectedTransport { // can hold config relevant to a particular transport. #[derive(Default)] pub(crate) struct TransportOptions { - pub init_stream_window_size: Option, - pub init_connection_window_size: Option, - pub http2_keep_alive_interval: Option, - pub http2_keep_alive_timeout: Option, - pub http2_keep_alive_while_idle: Option, - pub http2_max_header_list_size: Option, - pub http2_adaptive_window: Option, - pub concurrency_limit: Option, - pub rate_limit: Option<(u64, Duration)>, - pub tcp_keepalive: Option, - pub tcp_nodelay: bool, - pub connect_deadline: Option, + pub(crate) init_stream_window_size: Option, + pub(crate) init_connection_window_size: Option, + pub(crate) http2_keep_alive_interval: Option, + pub(crate) http2_keep_alive_timeout: Option, + pub(crate) http2_keep_alive_while_idle: Option, + pub(crate) http2_max_header_list_size: Option, + pub(crate) http2_adaptive_window: Option, + pub(crate) concurrency_limit: Option, + pub(crate) rate_limit: Option<(u64, Duration)>, + pub(crate) tcp_keepalive: Option, + pub(crate) tcp_nodelay: bool, + pub(crate) connect_deadline: Option, } #[async_trait] diff --git a/grpc/src/client/transport/registry.rs b/grpc/src/client/transport/registry.rs index c86b613d5..cd92ca81a 100644 --- a/grpc/src/client/transport/registry.rs +++ b/grpc/src/client/transport/registry.rs @@ -10,12 +10,12 @@ use std::{ /// the address type they are intended to handle. #[derive(Default, Clone)] pub(crate) struct TransportRegistry { - m: Arc>>>, + inner: Arc>>>, } impl Debug for TransportRegistry { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let m = self.m.lock().unwrap(); + let m = self.inner.lock().unwrap(); for key in m.keys() { write!(f, "k: {key:?}")? } @@ -31,7 +31,7 @@ impl TransportRegistry { /// Add a transport into the registry. pub(crate) fn add_transport(&self, address_type: &str, transport: impl Transport + 'static) { - self.m + self.inner .lock() .unwrap() .insert(address_type.to_string(), Arc::new(transport)); @@ -39,7 +39,7 @@ impl TransportRegistry { /// Retrieve a name resolver from the registry, or None if not found. pub(crate) fn get_transport(&self, address_type: &str) -> Result, String> { - self.m + self.inner .lock() .unwrap() .get(address_type) diff --git a/grpc/src/client/transport/tonic/mod.rs b/grpc/src/client/transport/tonic/mod.rs index 52414c964..ce0d8fca9 100644 --- a/grpc/src/client/transport/tonic/mod.rs +++ b/grpc/src/client/transport/tonic/mod.rs @@ -8,8 +8,6 @@ use crate::service::Message; use crate::service::Request as GrpcRequest; use crate::service::Response as GrpcResponse; use bytes::Bytes; -use futures::stream::StreamExt; -use futures::Stream; use http::uri::PathAndQuery; use http::Request as HttpRequest; use http::Response as HttpResponse; @@ -27,6 +25,8 @@ use std::{ sync::Arc, task::{Context, Poll}, }; +use tokio_stream::Stream; +use tokio_stream::StreamExt; use tonic::Request as TonicRequest; use tonic::Response as TonicResponse; use tonic::Streaming; @@ -95,14 +95,14 @@ impl Service for TonicTransport { /// Helper function to create an error response stream. fn create_error_response(status: Status) -> GrpcResponse { - let stream = futures::stream::once(async { Err(status) }); + let stream = tokio_stream::once(Err(status)); TonicResponse::new(Box::pin(stream)) } fn convert_request(req: GrpcRequest) -> TonicRequest + Send>>> { let (metadata, extensions, stream) = req.into_parts(); - let bytes_stream = Box::pin(stream.filter_map(|msg| async { + let bytes_stream = Box::pin(stream.filter_map(|msg| { if let Ok(bytes) = (msg as Box).downcast::() { Some(*bytes) } else { @@ -119,7 +119,7 @@ fn convert_response(res: Result>, Status>) -> Grp let response = match res { Ok(s) => s, Err(e) => { - let stream = futures::stream::once(async { Err(e) }); + let stream = tokio_stream::once(Err(e)); return TonicResponse::new(Box::pin(stream)); } }; diff --git a/grpc/src/inmemory/mod.rs b/grpc/src/inmemory/mod.rs index 26150dfde..00e7af289 100644 --- a/grpc/src/inmemory/mod.rs +++ b/grpc/src/inmemory/mod.rs @@ -128,6 +128,7 @@ static INMEMORY_NETWORK_TYPE: &str = "inmemory"; pub fn reg() { GLOBAL_TRANSPORT_REGISTRY.add_transport(INMEMORY_NETWORK_TYPE, ClientTransport::new()); + global_registry().add_builder(Box::new(InMemoryResolverBuilder)); } struct InMemoryResolverBuilder; diff --git a/grpc/src/service.rs b/grpc/src/service.rs index 429293356..0f9a79901 100644 --- a/grpc/src/service.rs +++ b/grpc/src/service.rs @@ -24,7 +24,7 @@ use std::{any::Any, pin::Pin}; -use futures_core::Stream; +use tokio_stream::Stream; use tonic::{async_trait, Request as TonicRequest, Response as TonicResponse, Status}; pub type Request = TonicRequest> + Send + Sync>>>; From 4ac449fb65705cf3bcfbd454488955a1104ce0ef Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Thu, 31 Jul 2025 17:03:47 +0530 Subject: [PATCH 13/13] Reformat imports --- grpc/src/client/transport/tonic/mod.rs | 43 ++++++++----------------- grpc/src/client/transport/tonic/test.rs | 10 ++---- grpc/src/rt/hyper_wrapper.rs | 13 ++------ 3 files changed, 20 insertions(+), 46 deletions(-) diff --git a/grpc/src/client/transport/tonic/mod.rs b/grpc/src/client/transport/tonic/mod.rs index ce0d8fca9..4ac5493b5 100644 --- a/grpc/src/client/transport/tonic/mod.rs +++ b/grpc/src/client/transport/tonic/mod.rs @@ -3,10 +3,14 @@ use crate::client::transport::ConnectedTransport; use crate::client::transport::Transport; use crate::client::transport::TransportOptions; use crate::codec::BytesCodec; +use crate::rt::hyper_wrapper::{HyperCompatExec, HyperCompatTimer, HyperStream}; +use crate::rt::Runtime; +use crate::rt::TaskHandle; use crate::rt::TcpOptions; use crate::service::Message; use crate::service::Request as GrpcRequest; use crate::service::Response as GrpcResponse; +use crate::{client::name_resolution::TCP_IP_NETWORK_TYPE, service::Service}; use bytes::Bytes; use http::uri::PathAndQuery; use http::Request as HttpRequest; @@ -15,40 +19,21 @@ use http::Uri; use hyper::client::conn::http2::Builder; use hyper::client::conn::http2::SendRequest; use std::any::Any; +use std::task::{Context, Poll}; use std::time::Instant; -use std::{ - error::Error, - future::Future, - net::SocketAddr, - pin::Pin, - str::FromStr, - sync::Arc, - task::{Context, Poll}, -}; +use std::{error::Error, future::Future, net::SocketAddr, pin::Pin, str::FromStr, sync::Arc}; +use tokio::sync::oneshot; use tokio_stream::Stream; use tokio_stream::StreamExt; +use tonic::client::GrpcService; use tonic::Request as TonicRequest; use tonic::Response as TonicResponse; use tonic::Streaming; -use tower::{ - buffer::{future::ResponseFuture as BufferResponseFuture, Buffer}, - limit::{ConcurrencyLimitLayer, RateLimitLayer}, - util::BoxService, - ServiceBuilder, -}; -use tower_service::Service as TowerService; - -use crate::{ - client::name_resolution::TCP_IP_NETWORK_TYPE, - rt::{ - self, - hyper_wrapper::{HyperCompatExec, HyperCompatTimer, HyperStream}, - }, - service::Service, -}; -use tokio::sync::oneshot; -use tonic::client::GrpcService; use tonic::{async_trait, body::Body, client::Grpc, Status}; +use tower::buffer::{future::ResponseFuture as BufferResponseFuture, Buffer}; +use tower::limit::{ConcurrencyLimitLayer, RateLimitLayer}; +use tower::{util::BoxService, ServiceBuilder}; +use tower_service::Service as TowerService; #[cfg(test)] mod test; @@ -66,7 +51,7 @@ struct TransportBuilder {} struct TonicTransport { grpc: Grpc, - task_handle: Box, + task_handle: Box, } impl Drop for TonicTransport { @@ -139,7 +124,7 @@ impl Transport for TransportBuilder { async fn connect( &self, address: String, - runtime: Arc, + runtime: Arc, opts: &TransportOptions, ) -> Result { let runtime = runtime.clone(); diff --git a/grpc/src/client/transport/tonic/test.rs b/grpc/src/client/transport/tonic/test.rs index 61cdde4f1..095e72e3f 100644 --- a/grpc/src/client/transport/tonic/test.rs +++ b/grpc/src/client/transport/tonic/test.rs @@ -1,11 +1,10 @@ use crate::client::name_resolution::TCP_IP_NETWORK_TYPE; use crate::client::transport::registry::GLOBAL_TRANSPORT_REGISTRY; use crate::echo_pb::echo_server::{Echo, EchoServer}; -use crate::service::Request as GrpcRequest; - use crate::echo_pb::{EchoRequest, EchoResponse}; use crate::service::Message; - +use crate::service::Request as GrpcRequest; +use crate::{client::transport::TransportOptions, rt::tokio::TokioRuntime}; use bytes::Bytes; use std::any::Any; use std::{pin::Pin, sync::Arc, time::Duration}; @@ -13,12 +12,9 @@ use tokio::net::TcpListener; use tokio::sync::{mpsc, oneshot, Notify}; use tokio::time::timeout; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; -use tonic_prost::prost::Message as ProstMessage; - use tonic::async_trait; use tonic::{transport::Server, Request, Response, Status}; - -use crate::{client::transport::TransportOptions, rt::tokio::TokioRuntime}; +use tonic_prost::prost::Message as ProstMessage; const DEFAULT_TEST_DURATION: Duration = Duration::from_secs(10); const DEFAULT_TEST_SHORT_DURATION: Duration = Duration::from_millis(10); diff --git a/grpc/src/rt/hyper_wrapper.rs b/grpc/src/rt/hyper_wrapper.rs index fc2b921de..6bdaad48f 100644 --- a/grpc/src/rt/hyper_wrapper.rs +++ b/grpc/src/rt/hyper_wrapper.rs @@ -1,17 +1,10 @@ +use super::{Runtime, TcpStream}; use hyper::rt::{Executor, Timer}; use pin_project_lite::pin_project; -use std::{ - future::Future, - io, - pin::Pin, - sync::Arc, - task::{Context, Poll}, - time::Instant, -}; +use std::task::{Context, Poll}; +use std::{future::Future, io, pin::Pin, sync::Arc, time::Instant}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use super::{Runtime, TcpStream}; - /// Adapts a runtime to a hyper compatible executor. #[derive(Clone)] pub(crate) struct HyperCompatExec {