Skip to content

feat(grpc): Add tonic transport #2339

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions codegen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ fn main() {
false,
false,
);

// grpc
codegen(
&PathBuf::from(std::env!("CARGO_MANIFEST_DIR"))
.parent()
.unwrap()
.join("grpc"),
&["proto/echo/echo.proto"],
&["proto"],
&PathBuf::from("src/generated"),
&PathBuf::from("src/generated/echo_fds.rs"),
true,
true,
);
}

fn codegen(
Expand Down
14 changes: 9 additions & 5 deletions grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ license = "MIT"

[dependencies]
bytes = "1.10.1"
futures = "0.3.31"
tower = { version = "0.5.2", features = ["buffer", "limit", "util"] }
tower-service = "0.3.3"
socket2 = "0.5.10"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please keep these sorted so the diffs are easier to read? E.g. socket2 and tower-service are already present at the same number.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorted them using cargo-sort and formatted it using taplo.

futures-core = "0.3.31"
futures-util = "0.3.31"
hickory-resolver = { version = "0.25.1", optional = true }
Expand All @@ -20,19 +24,20 @@ pin-project-lite = "0.2.16"
rand = "0.9"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
socket2 = "0.5.10"
tokio = { version = "1.37.0", features = ["sync", "rt", "net", "time", "macros"] }
tokio-stream = "0.1.17"
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["codegen", "transport"] }
tower = "0.5.2"
tower-service = "0.3.3"
url = "2.5.0"

[dev-dependencies]
async-stream = "0.3.6"
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = ["prost", "server", "router"] }
hickory-server = "0.25.2"
prost = "0.13.5"
prost = "0.14"

[build-dependencies]
tonic-build = { path = "../tonic-build" }
prost = "0.14"

[features]
default = ["dns"]
Expand All @@ -43,5 +48,4 @@ allowed_external_types = [
"tonic::*",
"futures_core::stream::Stream",
"tokio::sync::oneshot::Sender",
"once_cell::sync::Lazy",
]
3 changes: 0 additions & 3 deletions grpc/examples/inmemory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@ struct Handler {}
#[derive(Debug)]
struct MyReqMessage(String);

impl Message for MyReqMessage {}

#[derive(Debug)]
struct MyResMessage(String);
impl Message for MyResMessage {}

#[async_trait]
impl Service for Handler {
Expand Down
3 changes: 0 additions & 3 deletions grpc/examples/multiaddr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@ struct Handler {
#[derive(Debug)]
struct MyReqMessage(String);

impl Message for MyReqMessage {}

#[derive(Debug)]
struct MyResMessage(String);
impl Message for MyResMessage {}

#[async_trait]
impl Service for Handler {
Expand Down
43 changes: 43 additions & 0 deletions grpc/proto/echo/echo.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

syntax = "proto3";

package grpc.examples.echo;

// EchoRequest is the request for echo. It is the input type (single or
// streamed) of every RPC in the Echo service.
message EchoRequest {
// message is the string payload carried by the request.
string message = 1;
}

// EchoResponse is the response for echo. It is the output type (single or
// streamed) of every RPC in the Echo service.
message EchoResponse {
// message is the string payload carried by the response.
string message = 1;
}

// Echo is the echo service. It declares one RPC for each of the four gRPC
// call shapes (unary, server streaming, client streaming, bidirectional
// streaming); all of them exchange EchoRequest and EchoResponse messages.
service Echo {
// UnaryEcho is unary echo: one EchoRequest in, one EchoResponse out.
rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
// ServerStreamingEcho is server side streaming: one EchoRequest in, a
// stream of EchoResponse messages out.
rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
// ClientStreamingEcho is client side streaming: a stream of EchoRequest
// messages in, one EchoResponse out.
rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
// BidirectionalStreamingEcho is bidi streaming: a stream of EchoRequest
// messages in, a stream of EchoResponse messages out.
rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}
5 changes: 5 additions & 0 deletions grpc/src/client/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ impl ActiveChannel {
tx.clone(),
picker.clone(),
connectivity_state.clone(),
runtime.clone(),
);

let resolver_helper = Box::new(tx.clone());
Expand Down Expand Up @@ -360,6 +361,7 @@ pub(crate) struct InternalChannelController {
wqtx: WorkQueueTx,
picker: Arc<Watcher<Arc<dyn Picker>>>,
connectivity_state: Arc<Watcher<ConnectivityState>>,
runtime: Arc<dyn Runtime>,
}

impl InternalChannelController {
Expand All @@ -369,6 +371,7 @@ impl InternalChannelController {
wqtx: WorkQueueTx,
picker: Arc<Watcher<Arc<dyn Picker>>>,
connectivity_state: Arc<Watcher<ConnectivityState>>,
runtime: Arc<dyn Runtime>,
) -> Self {
let lb = Arc::new(GracefulSwitchBalancer::new(wqtx.clone()));

Expand All @@ -380,6 +383,7 @@ impl InternalChannelController {
wqtx,
picker,
connectivity_state,
runtime,
}
}

Expand Down Expand Up @@ -429,6 +433,7 @@ impl load_balancing::ChannelController for InternalChannelController {
Box::new(move |k: SubchannelKey| {
scp.unregister_subchannel(&k);
}),
self.runtime.clone(),
);
let _ = self.subchannel_pool.register_subchannel(&key, isc.clone());
self.new_esc_for_isc(isc)
Expand Down
3 changes: 1 addition & 2 deletions grpc/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ pub mod channel;
pub(crate) mod load_balancing;
pub(crate) mod name_resolution;
pub mod service_config;
pub mod transport;

mod subchannel;
pub(crate) mod transport;
pub use channel::Channel;
pub use channel::ChannelOptions;

Expand Down
8 changes: 8 additions & 0 deletions grpc/src/client/name_resolution/dns/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ impl rt::Runtime for FakeRuntime {
/// Returns the sleep future produced by `self.inner` for `duration`.
///
/// The call is forwarded unchanged; this method adds no behavior of its own.
fn sleep(&self, duration: std::time::Duration) -> Pin<Box<dyn rt::Sleep>> {
self.inner.sleep(duration)
}

/// Forwards the TCP connection request to `self.inner`.
///
/// `target` and `opts` are passed through unchanged, and the future returned
/// by `self.inner.tcp_stream` is handed back to the caller as-is. It resolves
/// to a boxed `rt::TcpStream` on success or a `String` error otherwise.
fn tcp_stream(
&self,
target: std::net::SocketAddr,
opts: rt::TcpOptions,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn rt::TcpStream>, String>> + Send>> {
self.inner.tcp_stream(target, opts)
}
}

#[tokio::test]
Expand Down
50 changes: 32 additions & 18 deletions grpc/src/client/subchannel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ use super::{
channel::{InternalChannelController, WorkQueueTx},
load_balancing::{self, ExternalSubchannel, Picker, Subchannel, SubchannelState},
name_resolution::Address,
transport::{self, ConnectedTransport, Transport, TransportRegistry},
transport::{self, Transport, TransportRegistry},
ConnectivityState,
};
use crate::{
client::{channel::WorkQueueItem, subchannel},
client::{
channel::WorkQueueItem,
subchannel,
transport::{ConnectedTransport, TransportOptions},
},
rt::{Runtime, TaskHandle},
service::{Request, Response, Service},
};
use core::panic;
Expand All @@ -18,13 +23,13 @@ use std::{
sync::{Arc, Mutex, RwLock, Weak},
};
use tokio::{
sync::{mpsc, watch, Notify},
sync::{mpsc, oneshot, watch, Notify},
task::{AbortHandle, JoinHandle},
time::{Duration, Instant},
};
use tonic::async_trait;

type SharedService = Arc<dyn ConnectedTransport>;
type SharedService = Arc<dyn Service>;

pub trait Backoff: Send + Sync {
fn backoff_until(&self) -> Instant;
Expand Down Expand Up @@ -52,7 +57,7 @@ enum InternalSubchannelState {
}

struct InternalSubchannelConnectingState {
abort_handle: Option<AbortHandle>,
abort_handle: Option<Box<dyn TaskHandle>>,
}

struct InternalSubchannelReadyState {
Expand Down Expand Up @@ -178,6 +183,7 @@ pub(crate) struct InternalSubchannel {
unregister_fn: Option<Box<dyn FnOnce(SubchannelKey) + Send + Sync>>,
state_machine_event_sender: mpsc::UnboundedSender<SubchannelStateMachineEvent>,
inner: Mutex<InnerSubchannel>,
runtime: Arc<dyn Runtime>,
}

struct InnerSubchannel {
Expand All @@ -204,7 +210,7 @@ impl Service for InternalSubchannel {

enum SubchannelStateMachineEvent {
ConnectionRequested,
ConnectionSucceeded(SharedService),
ConnectionSucceeded(SharedService, oneshot::Receiver<Result<(), String>>),
ConnectionTimedOut,
ConnectionFailed(String),
ConnectionTerminated,
Expand All @@ -214,7 +220,7 @@ impl Debug for SubchannelStateMachineEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ConnectionRequested => write!(f, "ConnectionRequested"),
Self::ConnectionSucceeded(_) => write!(f, "ConnectionSucceeded"),
Self::ConnectionSucceeded(_, _) => write!(f, "ConnectionSucceeded"),
Self::ConnectionTimedOut => write!(f, "ConnectionTimedOut"),
Self::ConnectionFailed(_) => write!(f, "ConnectionFailed"),
Self::ConnectionTerminated => write!(f, "ConnectionTerminated"),
Expand All @@ -229,6 +235,7 @@ impl InternalSubchannel {
transport: Arc<dyn Transport>,
backoff: Arc<dyn Backoff>,
unregister_fn: Box<dyn FnOnce(SubchannelKey) + Send + Sync>,
runtime: Arc<dyn Runtime>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious:

Given that we will have the same runtime for all the different gRPC components that require a runtime, did we consider something like a singleton that is initialized at init time, and all the components can use a getter to retrieve and use the singleton instead of the runtime being passed to every component that needs it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Different grpc channels could theoretically use different runtimes. Maybe that isn't something we need to support, but it's pretty easily attained - it just requires passing around the runtime a bit more than if it were global.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++ passes the event engine through channel args. In my opinion passing the runtime through a function param allows for cleaner dependency injection. It also enforces that the runtime is set during channel creation, before RPCs are made.

Having a singleton runtime will force all gRPC channels in a binary to use the same runtime. I don't know if this is a con though. We can discuss this in the team meeting.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine, though all of these Arc<dyn ...> should have newtype wrappers to clean this up. I think passing a runtime handle around is totally fine as long as it's cheap to clone. We likely do not want users to have to shuffle a runtime around though.

) -> Arc<InternalSubchannel> {
println!("creating new internal subchannel for: {:?}", &key);
let (tx, mut rx) = mpsc::unbounded_channel::<SubchannelStateMachineEvent>();
Expand All @@ -244,23 +251,24 @@ impl InternalSubchannel {
backoff_task: None,
disconnect_task: None,
}),
runtime: runtime.clone(),
});

// This long running task implements the subchannel state machine. When
// the subchannel is dropped, the channel from which this task reads is
// closed, and therefore this task exits because rx.recv() returns None
// in that case.
let arc_to_self = Arc::clone(&isc);
tokio::task::spawn(async move {
runtime.spawn(Box::pin(async move {
println!("starting subchannel state machine for: {:?}", &key);
while let Some(m) = rx.recv().await {
println!("subchannel {:?} received event {:?}", &key, &m);
match m {
SubchannelStateMachineEvent::ConnectionRequested => {
arc_to_self.move_to_connecting();
}
SubchannelStateMachineEvent::ConnectionSucceeded(svc) => {
arc_to_self.move_to_ready(svc);
SubchannelStateMachineEvent::ConnectionSucceeded(svc, rx) => {
arc_to_self.move_to_ready(svc, rx);
}
SubchannelStateMachineEvent::ConnectionTimedOut => {
arc_to_self.move_to_transient_failure("connect timeout expired".into());
Expand All @@ -277,7 +285,7 @@ impl InternalSubchannel {
}
}
println!("exiting work queue task in subchannel");
});
}));
isc
}

Expand Down Expand Up @@ -345,30 +353,34 @@ impl InternalSubchannel {
let transport = self.transport.clone();
let address = self.address().address;
let state_machine_tx = self.state_machine_event_sender.clone();
let connect_task = tokio::task::spawn(async move {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have some kind of vet equivalent to ensure that task spawning (and other features provided by the runtime) are always only used from the runtime and not from other places (like tokio or the standard library)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately (pre-1.0) we want to not have any tokio runtime crates/features listed in Cargo.toml, except if you are using a tokio feature flag. That would prevent such a thing.

Copy link
Collaborator Author

@arjan-bal arjan-bal Jul 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent some time looking into this. I found two approaches:

  1. Use clippy's disallowed_methods, disallowed_macros, etc. lints to block tokio symbols like tokio::spawn, tokio::task::spawn, etc. The problem with this approach is that we need to list all the items we want to block; there's no glob (*) operator available. It's also easy to miss the clippy warnings since they don't block PR submission.
  2. Introduce a separate crate, say grpc-runtime-tokio, for the default runtime implementation, and disable tokio's runtime features in the main grpc crate. If a function in the grpc crate tries to call tokio::spawn, it will fail to compile as the required feature will be disabled. The concern with this approach is that we need to export the runtime trait (and related types) which are unstable.

@LucioFranco would like to get your thoughts on this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cargo unifies features across the dependency graph, so a feature ends up enabled if any (transitive) dependency enables it. I wasn't seeing any compilation failures even after removing the tokio/rt feature from the Cargo.toml. I tracked down the dependency to tower's buffer feature:

cargo tree -i tokio -e features --edges=normal -p grpc --no-default-features
tokio v1.46.1
├── tokio feature "bytes"
│   └── tokio feature "io-util"
│       └── h2 v0.4.11
│           └── h2 feature "default"
│               └── hyper v1.6.0
│                   ├── hyper feature "client"
│                   │   └── grpc v0.9.0-alpha.1 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/grpc)
│                   ├── hyper feature "default"
│                   │   └── grpc v0.9.0-alpha.1 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/grpc)
│                   └── hyper feature "http2"
│                       └── grpc v0.9.0-alpha.1 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/grpc)
├── tokio feature "default"
│   ├── grpc v0.9.0-alpha.1 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/grpc)
│   ├── h2 v0.4.11 (*)
│   ├── hyper v1.6.0 (*)
│   ├── tokio-stream v0.1.17
│   │   └── tonic v0.14.0 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/tonic)
│   │       └── tonic feature "codegen"
│   │           └── grpc v0.9.0-alpha.1 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/grpc)
│   │   ├── tokio-stream feature "default"
│   │   │   └── grpc v0.9.0-alpha.1 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/grpc)
│   │   └── tokio-stream feature "time"
│   │       └── tokio-stream feature "default" (*)
│   ├── tokio-util v0.7.15
│   │   └── tower v0.5.2
│   │       ├── tower feature "__common"
│   │       │   ├── tower feature "buffer"
│   │       │   │   └── grpc v0.9.0-alpha.1 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/grpc)
│   │       │   ├── tower feature "limit"
│   │       │   │   └── grpc v0.9.0-alpha.1 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/grpc)
│   │       │   └── tower feature "util"
│   │       │       └── grpc v0.9.0-alpha.1 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/grpc)
│   │       ├── tower feature "buffer" (*)
│   │       ├── tower feature "default"
│   │       │   └── grpc v0.9.0-alpha.1 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/grpc)
│   │       ├── tower feature "futures-core"
│   │       │   └── tower feature "__common" (*)
│   │       ├── tower feature "futures-util"
│   │       │   └── tower feature "util" (*)
│   │       ├── tower feature "limit" (*)
│   │       ├── tower feature "pin-project-lite"
│   │       │   ├── tower feature "__common" (*)
│   │       │   └── tower feature "util" (*)
│   │       ├── tower feature "sync_wrapper"
│   │       │   └── tower feature "util" (*)
│   │       ├── tower feature "tokio"
│   │       │   ├── tower feature "buffer" (*)
│   │       │   └── tower feature "limit" (*)
│   │       ├── tower feature "tokio-util"
│   │       │   ├── tower feature "buffer" (*)
│   │       │   └── tower feature "limit" (*)
│   │       ├── tower feature "tracing"
│   │       │   ├── tower feature "buffer" (*)
│   │       │   └── tower feature "limit" (*)
│   │       └── tower feature "util" (*)
│   │   ├── tokio-util feature "codec"
│   │   │   └── h2 v0.4.11 (*)
│   │   ├── tokio-util feature "default"
│   │   │   └── h2 v0.4.11 (*)
│   │   └── tokio-util feature "io"
│   │       └── h2 v0.4.11 (*)
│   └── tower v0.5.2 (*)
├── tokio feature "io-util" (*)
├── tokio feature "rt"
│   └── tower feature "buffer" (*)
├── tokio feature "sync"
│   ├── grpc v0.9.0-alpha.1 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/grpc)
│   ├── hyper v1.6.0 (*)
│   ├── tokio-stream v0.1.17 (*)
│   ├── tokio-util v0.7.15 (*)
│   └── tower v0.5.2 (*)
│   ├── tower feature "buffer" (*)
│   └── tower feature "limit" (*)
└── tokio feature "time"
    └── grpc v0.9.0-alpha.1 (/usr/local/google/home/arjansbal/Development/tonic/grpc-tonic-transport-1/grpc)
    ├── tokio-stream feature "time" (*)
    └── tower feature "limit" (*)

Buffer has a constructor that uses tokio as the default executor. We're not using this constructor though.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a default private feature for the tokio runtime that enables the tokio/rt feature flag. Due to this, tokio::spawn should not be usable outside the grpc::rt::tokio module. If tokio::spawn is used outside this module, the build will fail with default feature flags disabled, failing CI.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed this yesterday, for now this is fine, we can rely on tokio initially until we make some more overall progress.

// TODO: All these options to be configured by users.
let transport_opts = TransportOptions::default();
let runtime = self.runtime.clone();

let connect_task = self.runtime.spawn(Box::pin(async move {
tokio::select! {
_ = tokio::time::sleep(min_connect_timeout) => {
let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionTimedOut);
}
result = transport.connect(address.to_string().clone()) => {
result = transport.connect(address.to_string().clone(), runtime, &transport_opts) => {
match result {
Ok(s) => {
let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionSucceeded(Arc::from(s)));
let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionSucceeded(Arc::from(s.service), s.disconnection_listener));
}
Err(e) => {
let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionFailed(e));
}
}
},
}
});
}));
let mut inner = self.inner.lock().unwrap();
inner.state = InternalSubchannelState::Connecting(InternalSubchannelConnectingState {
abort_handle: Some(connect_task.abort_handle()),
abort_handle: Some(connect_task),
});
}

fn move_to_ready(&self, svc: SharedService) {
fn move_to_ready(&self, svc: SharedService, closed_rx: oneshot::Receiver<Result<(), String>>) {
let svc2 = svc.clone();
{
let mut inner = self.inner.lock().unwrap();
Expand All @@ -388,7 +400,9 @@ impl InternalSubchannel {
// error string containing information about why the connection
// terminated? But what can we do with that error other than logging
// it, which the transport can do as well?
svc.disconnected().await;
if let Err(e) = closed_rx.await {
eprintln!("Transport closed with error: {}", e.to_string())
};
let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionTerminated);
});
let mut inner = self.inner.lock().unwrap();
Expand Down
Loading
Loading