-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(grpc): Add tonic transport #2339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
1417e9d
to
66e6c10
Compare
Hi @dfawley and @LucioFranco, could you please review this PR when you have a moment? |
fe1436e
to
957377c
Compare
@@ -229,6 +235,7 @@ impl InternalSubchannel { | |||
transport: Arc<dyn Transport>, | |||
backoff: Arc<dyn Backoff>, | |||
unregister_fn: Box<dyn FnOnce(SubchannelKey) + Send + Sync>, | |||
runtime: Arc<dyn Runtime>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious:
Given that we will have the same runtime for all the different gRPC components that require a runtime, did we consider something like a singleton that is initialized at init time, and all the components can use a getter to retrieve and use the singleton instead of the runtime being passed to every component that needs it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Different grpc channels could theoretically use different runtimes. Maybe that isn't something we need to support, but it's pretty easily attained - it just requires passing around the runtime a bit more than if it were global.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
C++ passes the event engine through channel args. In my opinion passing the runtime through a function param allows for cleaner dependency injection. It also enforces that the runtime is set during channel creation, before RPCs are made.
Having a singleton runtime will force all gRPC channels in a binary to use the same runtime. I don't know if this is a con though. We can discuss this in the team meeting.
@@ -345,30 +353,34 @@ impl InternalSubchannel { | |||
let transport = self.transport.clone(); | |||
let address = self.address().address; | |||
let state_machine_tx = self.state_machine_event_sender.clone(); | |||
let connect_task = tokio::task::spawn(async move { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have some kind of vet
equivalent to ensure that task spawning (and other features provided by the runtime) are always only used from the runtime and not from other places (like tokio or the standard library)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ultimately (pre-1.0) we want to not have any tokio runtime crates/features listed in Cargo.toml, except if you are using a tokio feature flag. That would prevent such a thing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I spent some time looking into this. I found two approaches:
- Use clippy disallowed_method, disallowed_macros, etc. to block tokio symbols like
tokio::spawn
,tokio::task::spawn
, etc. The problem with this approach is that we need to list all the types we want to block, there's not glob (*
) operator available. It's also easy to miss the clippy warnings since they don't block PR submission. - Introduce a separate crate, say
grpc-runtime-tokio
, for the default runtime implementation, and disable tokio's runtime features in the main grpc crate. If a function in thegrpc
crate tries to calltokio::spawn
, it will fail to compile as the required feature will be disabled. The concern with this approach is that we need to export the runtime trait (and related types) which are unstable.
@LucioFranco would like to get your thoughts on this.
self.m | ||
.lock() | ||
.unwrap() | ||
.insert(address_type.to_string(), Arc::new(transport)); | ||
} | ||
|
||
/// Retrieve a name resolver from the registry, or None if not found. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: this comments needs updating.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed the comment.
@@ -26,20 +25,20 @@ impl std::fmt::Debug for TransportRegistry { | |||
|
|||
impl TransportRegistry { | |||
/// Construct an empty name resolver registry. | |||
pub fn new() -> Self { | |||
pub(crate) fn new() -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also implement Default
trait for this type by inheriting it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an explicit Default
implementation below. I removed it and used the derive
macro instead.
@@ -388,7 +400,9 @@ impl InternalSubchannel { | |||
// error string containing information about why the connection | |||
// terminated? But what can we do with that error other than logging | |||
// it, which the transport can do as well? | |||
svc.disconnected().await; | |||
if let Err(e) = closed_rx.await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the above task spawn be on the runtime as well instead of directly using tokio
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, used the runtime here.
futures = "0.3.31" | ||
tower = { version = "0.5.2", features = ["buffer", "limit", "util"] } | ||
tower-service = "0.3.3" | ||
socket2 = "0.5.10" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please keep these sorted so the diffs are easier to read? E.g. socket2
and tower-service
are already present at the same number.
@@ -229,6 +235,7 @@ impl InternalSubchannel { | |||
transport: Arc<dyn Transport>, | |||
backoff: Arc<dyn Backoff>, | |||
unregister_fn: Box<dyn FnOnce(SubchannelKey) + Send + Sync>, | |||
runtime: Arc<dyn Runtime>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Different grpc channels could theoretically use different runtimes. Maybe that isn't something we need to support, but it's pretty easily attained - it just requires passing around the runtime a bit more than if it were global.
@@ -345,30 +353,34 @@ impl InternalSubchannel { | |||
let transport = self.transport.clone(); | |||
let address = self.address().address; | |||
let state_machine_tx = self.state_machine_event_sender.clone(); | |||
let connect_task = tokio::task::spawn(async move { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ultimately (pre-1.0) we want to not have any tokio runtime crates/features listed in Cargo.toml, except if you are using a tokio feature flag. That would prevent such a thing.
// TODO: The following options are specific to HTTP/2. We should | ||
// instead pass an `Attribute` like struct to the connect method instead which | ||
// can hold config relevant to a particular transport. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I think it's good to eventually have a "common http2 configuration" struct that multiple h2 transports can share for configuration knobs common across any/most h2-based transport implementations. We can also have transport-specific configuration for tonic. And all of those should be in a generic transport configuration, attribute-like struct.
pub rate_limit: Option<(u64, Duration)>, | ||
pub tcp_keepalive: Option<Duration>, | ||
pub tcp_nodelay: bool, | ||
pub connect_timeout: Option<Duration>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deadline (Instant
) instead?
grpc/src/service.rs
Outdated
|
||
#[async_trait] | ||
pub trait Service: Send + Sync { | ||
async fn call(&self, method: String, request: Request) -> Response; | ||
} | ||
|
||
// TODO: define methods that will allow serialization/deserialization. | ||
pub trait Message: Any + Send + Sync {} | ||
pub trait Message: Any + Send + Sync { | ||
fn as_any(self: Box<Self>) -> Box<dyn Any>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these can be removed again, right? Because we're using trait upcasting elsewhere now.
Co-authored-by: Doug Fawley <[email protected]>
This PR includes the following:
tonic/src/transport
, required code is copied over.Bytes
. This is a temporary workaround until tonic supports bypassing the codec and receiving bytes.