Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
334 changes: 92 additions & 242 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] }
parking_lot = { version = "0.12.3", features = ["serde"] }
parquet = { version = "55.1", features = ["async"] }
pin-project = "1.1.10"
prost = "0.13"
prost-types = "0.13.5"
prost = "0.14"
prost-types = "0.14"
regex = "1.11.1"
regex-syntax = "0.8.5"
roaring = "0.10.6"
Expand All @@ -47,9 +47,9 @@ tantivy = "0.22.0"
thiserror = "1.0.69"
tokio = { version = "1.41", features = ["fs", "macros", "rt-multi-thread", "time", "io-util", "signal"] }
tokio-util = "0.7.12"
tonic = "0.12"
tonic-health = "0.12.3"
tower = { version = "0.4.13", features = ["discover"] }
tonic = "0.14"
tonic-health = "0.14"
tower = { version = "0.5.2", features = ["discover"] }
backon = "1.3.0"
tracing = { version = "0.1" }
tracing-bunyan-formatter = "0.3"
Expand Down
2 changes: 1 addition & 1 deletion rust/garbage_collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl chroma_types::chroma_proto::garbage_collector_server::GarbageCollector
}

pub async fn garbage_collector_service_entrypoint() -> Result<(), Box<dyn std::error::Error>> {
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
let (health_reporter, health_service) = tonic_health::server::health_reporter();
health_reporter
.set_not_serving::<GarbageCollectorServer<GarbageCollectorService>>()
.await;
Expand Down
2 changes: 1 addition & 1 deletion rust/log-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2243,7 +2243,7 @@ impl LogServerWrapper {
let addr = format!("[::]:{}", log_server.config.port).parse().unwrap();
println!("Log listening on {}", addr);

let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
let (health_reporter, health_service) = tonic_health::server::health_reporter();
health_reporter
.set_serving::<chroma_types::chroma_proto::log_service_server::LogServiceServer<Self>>()
.await;
Expand Down
4 changes: 2 additions & 2 deletions rust/memberlist/src/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use std::{
sync::Arc,
};
use thiserror::Error;
use tonic::transport::{Channel, Endpoint};
use tower::{discover::Change, ServiceBuilder};
use tonic::transport::{channel::Change, Channel, Endpoint};
use tower::ServiceBuilder;

#[derive(Debug, Clone)]
pub struct ClientAssigner<T> {
Expand Down
40 changes: 21 additions & 19 deletions rust/rust-sysdb/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,33 +74,35 @@ impl SysdbService {

tracing::info!("Sysdb service listening on {}", addr);

let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
let (health_reporter, health_service) = tonic_health::server::health_reporter();

// TODO(Sanket): More sophisticated is_ready logic.
health_reporter
.set_serving::<SysDbServer<SysdbService>>()
.await;

let backends = self.backends.clone();
Server::builder()
.layer(chroma_tracing::GrpcServerTraceLayer)
.add_service(health_service)
.add_service(SysDbServer::new(self))
.serve_with_shutdown(addr, async move {
// TODO(Sanket): Drain existing requests before shutting down.
select! {
_ = sigterm.recv() => {
backends.close().await;
tracing::info!("Received SIGTERM, shutting down server");
Box::pin(
Server::builder()
.layer(chroma_tracing::GrpcServerTraceLayer)
.add_service(health_service)
.add_service(SysDbServer::new(self))
.serve_with_shutdown(addr, async move {
// TODO(Sanket): Drain existing requests before shutting down.
select! {
_ = sigterm.recv() => {
backends.close().await;
tracing::info!("Received SIGTERM, shutting down server");
}
_ = sigint.recv() => {
backends.close().await;
tracing::info!("Received SIGINT, shutting down server");
}
}
_ = sigint.recv() => {
backends.close().await;
tracing::info!("Received SIGINT, shutting down server");
}
}
})
.await
.expect("Server failed");
}),
)
.await
.expect("Server failed");
}
}

Expand Down
6 changes: 3 additions & 3 deletions rust/spanner-migrations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ chroma-config = { workspace = true }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
google-cloud-spanner = "0.33"
google-cloud-googleapis = { version = "0.16", features = ["spanner"] }
google-cloud-gax = "0.19"
google-cloud-spanner = { workspace = true }
google-cloud-googleapis = { workspace = true }
google-cloud-gax = { workspace = true }
rust-embed = { version = "8.0", features = ["include-exclude"] }
regex = "1.0"
sha2 = "0.10"
Expand Down
1 change: 1 addition & 0 deletions rust/spanner-migrations/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ impl MigrationRunner {
statements: vec![sql.to_string()],
operation_id: String::new(),
proto_descriptors: Vec::new(),
throughput_mode: false,
};

let mut operation = self
Expand Down
4 changes: 2 additions & 2 deletions rust/tracing/src/grpc_client_trace_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tonic::{body::BoxBody, transport::Error, Code};
use tonic::{body::Body, transport::Error, Code};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical

[Logic] In tonic 0.14, tonic::body::Body is typically the trait (re-exported from http-body). Using it as a concrete type in Response = http::Response<Body> will likely cause a compilation error because traits cannot be used as concrete types without dyn or Box.

You likely intend to use the concrete body struct tonic::transport::Body (which replaces BoxBody usage in many cases).

Suggested change
use tonic::{body::Body, transport::Error, Code};
use tonic::{transport::{Body, Error}, Code};
Context for Agents
In `tonic` 0.14, `tonic::body::Body` is typically the **trait** (re-exported from `http-body`). Using it as a concrete type in `Response = http::Response<Body>` will likely cause a compilation error because traits cannot be used as concrete types without `dyn` or `Box`.

You likely intend to use the concrete body struct `tonic::transport::Body` (which replaces `BoxBody` usage in many cases).

```suggestion
use tonic::{transport::{Body, Error}, Code};
```

File: rust/tracing/src/grpc_client_trace_layer.rs
Line: 8

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the suggestion. Code compiles. CI passes. I'm going to disregard you.

use tower::{Layer, Service};
use tracing::{field::Empty, info_span, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
Expand All @@ -32,7 +32,7 @@ pub struct GrpcClientTraceService<S> {

impl<S, ReqBody> Service<http::Request<ReqBody>> for GrpcClientTraceService<S>
where
S: Service<http::Request<ReqBody>, Response = http::Response<BoxBody>, Error = Error>
S: Service<http::Request<ReqBody>, Response = http::Response<Body>, Error = Error>
+ Clone
+ Send
+ 'static,
Expand Down
3 changes: 2 additions & 1 deletion rust/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ prost-types = { workspace = true }
roaring = { workspace = true }
thiserror = { workspace = true }
tonic = { workspace = true }
tonic-prost = "0.14"
uuid = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand All @@ -37,7 +38,7 @@ chroma-error = { workspace = true, features = ["tonic", "validator"] }
itertools.workspace = true

[build-dependencies]
tonic-build = "0.10"
tonic-prost-build = "0.14"
criterion = { workspace = true }

[features]
Expand Down
4 changes: 2 additions & 2 deletions rust/types/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
proto_paths.push("idl/chromadb/proto/debug.proto");
}

tonic_build::configure()
tonic_prost_build::configure()
.emit_rerun_if_changed(true)
.compile(&proto_paths, &["idl/"])?;
.compile_protos(&proto_paths, &["idl/"])?;

// Note: Operator constants are now generated via bin/generate_operator_constants.sh
// and committed to git as rust/types/src/functions.rs.
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/compactor/compaction_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl CompactionServer {
let addr = format!("[::]:{}", self.port).parse().unwrap();
tracing::info!("Compaction server listening at {addr}");

let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
let (health_reporter, health_service) = tonic_health::server::health_reporter();
health_reporter
.set_not_serving::<CompactorServer<Self>>()
.await;
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl WorkerServer {
let addr = format!("[::]:{}", worker.port).parse().unwrap();
println!("Worker listening on {}", addr);

let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
let (health_reporter, health_service) = tonic_health::server::health_reporter();

let server = Server::builder()
.layer(chroma_tracing::GrpcServerTraceLayer)
Expand Down
Loading