Skip to content

Commit 6e10eb2

Browse files
authored
Add pipeline uninstall mechanism (#229)
In order to address the issues raised in #225, the following changes are proposed: 1) Fix the current bug where futures are being dropped instead of properly awaited by updating: ```diff - let spawn = |future| tokio::task::spawn_blocking(|| future); + let spawn = |future| tokio::task::spawn_blocking(|| futures::executor::block_on(future)); ``` 2) Because `spawn_blocking` will now block the program from exiting until the future completes, add a drop guard to pipeline install methods which uninstalls the global tracer provider (and in the otlp case in the future also the meter provider) so the exporters properly shut down, causing the future to resolve. Alternatives considered here could be: a) let users unset the global(s) themselves (but they would likely forget, causing the same blocking behavior) b) return a method that could be called instead of a drop guard (similar problems) c) Current impl uses custom uninstall objects per exporter to make the struct `opentelemetry_jaeger::Uninstall` rather than a more generic `global::TracerProviderGuard` or `global::UninstallPipeline`, alternative name suggestions could be helpful if they improve clarity. c) Return a pipeline object that contains a `Tracer`, and could be the drop guard itself, maybe cleaner API but less explicit, possibly a better choice. 3) Because `Tracer`s currently retain strong references to their `TracerProvider`, and shutdown is called on the inner's drop, it is still never called. To fix this, update the `Tracer` to instead contain a `Weak<TracerProviderInner>` reference, which can be upgraded when needed. This does force other logic like span creation to consider the case where the `TracerProvider` has been dropped on shutdown, but making this case explicit seems beneficial in most cases. 4) Additional clarity/consistency tweaks: * Rename `global::set_provider` to `global::set_tracer_provider` * Rename `global::trace_provider` to `global::tracer_provider` * Rename `ProviderInner` to `TracerProviderInner`
1 parent 1689c73 commit 6e10eb2

File tree

31 files changed

+203
-143
lines changed

31 files changed

+203
-143
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ and metrics from your application. You can analyze them using [Prometheus], [Jae
2020
use opentelemetry::{api::TracerGenerics, global, sdk};
2121

2222
fn main() {
23-
global::set_provider(sdk::Provider::default());
23+
let _guard = global::set_tracer_provider(sdk::Provider::default());
2424

2525
global::tracer("component-a").in_span("foo", |_context| {
2626
global::tracer("component-b").in_span("bar", |_context| {

examples/actix-http/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use opentelemetry::api::{Key, TraceContextExt, Tracer};
66
use opentelemetry::{global, sdk};
77
use std::error::Error;
88

9-
fn init_tracer() -> Result<sdk::Tracer, Box<dyn Error>> {
9+
fn init_tracer() -> Result<(sdk::Tracer, opentelemetry_jaeger::Uninstall), Box<dyn Error>> {
1010
opentelemetry_jaeger::new_pipeline()
1111
.with_collector_endpoint("http://127.0.0.1:14268/api/traces")
1212
.with_service_name("trace-http-demo")
@@ -25,7 +25,7 @@ async fn index() -> &'static str {
2525
async fn main() -> std::io::Result<()> {
2626
std::env::set_var("RUST_LOG", "debug");
2727
env_logger::init();
28-
let tracer = init_tracer().expect("Failed to initialise tracer.");
28+
let (tracer, _uninstall) = init_tracer().expect("Failed to initialise tracer.");
2929

3030
HttpServer::new(move || {
3131
let tracer = tracer.clone();

examples/actix-udp/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use opentelemetry::api::{Key, TraceContextExt, Tracer};
66
use opentelemetry::{global, sdk};
77
use std::error::Error;
88

9-
fn init_tracer() -> Result<sdk::Tracer, Box<dyn Error>> {
9+
fn init_tracer() -> Result<(sdk::Tracer, opentelemetry_jaeger::Uninstall), Box<dyn Error>> {
1010
opentelemetry_jaeger::new_pipeline()
1111
.with_agent_endpoint("localhost:6831")
1212
.with_service_name("trace-udp-demo")
@@ -25,7 +25,7 @@ async fn index() -> &'static str {
2525
async fn main() -> std::io::Result<()> {
2626
std::env::set_var("RUST_LOG", "debug");
2727
env_logger::init();
28-
init_tracer().expect("Failed to initialise tracer.");
28+
let _uninstall = init_tracer().expect("Failed to initialise tracer.");
2929

3030
HttpServer::new(|| {
3131
App::new()

examples/async/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,15 @@ async fn run(addr: &SocketAddr) -> io::Result<usize> {
5050
write(&mut stream).with_context(cx).await
5151
}
5252

53-
fn init_tracer() -> Result<sdk::Tracer, Box<dyn Error>> {
53+
fn init_tracer() -> Result<(sdk::Tracer, opentelemetry_jaeger::Uninstall), Box<dyn Error>> {
5454
opentelemetry_jaeger::new_pipeline()
5555
.with_service_name("trace-demo")
5656
.install()
5757
}
5858

5959
#[tokio::main]
6060
pub async fn main() -> Result<(), Box<dyn Error>> {
61-
let tracer = init_tracer()?;
61+
let (tracer, _uninstall) = init_tracer()?;
6262
let addr = "127.0.0.1:6142".parse()?;
6363
let addr2 = "127.0.0.1:6143".parse()?;
6464
let span = tracer.start("root");

examples/aws-xray/src/client.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ use opentelemetry::api::{Context, TraceContextExt, Tracer};
33
use opentelemetry::{api, exporter::trace::stdout, global, sdk};
44
use opentelemetry_contrib::{XrayIdGenerator, XrayTraceContextPropagator};
55

6-
fn init_tracer() {
6+
fn init_tracer() -> (sdk::Tracer, stdout::Uninstall) {
7+
global::set_text_map_propagator(XrayTraceContextPropagator::new());
8+
79
// Install stdout exporter pipeline to be able to retrieve the collected spans.
810
// For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. In a production
911
// application, use `Sampler::ParentBased` or `Sampler::TraceIdRatioBased` with a desired ratio.
@@ -13,14 +15,12 @@ fn init_tracer() {
1315
id_generator: Box::new(XrayIdGenerator::default()),
1416
..Default::default()
1517
})
16-
.install();
17-
18-
global::set_text_map_propagator(XrayTraceContextPropagator::new());
18+
.install()
1919
}
2020

2121
#[tokio::main]
2222
async fn main() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
23-
init_tracer();
23+
let (_tracer, _guard) = init_tracer();
2424

2525
let client = Client::new();
2626
let span = global::tracer("example/client").start("say hello");

examples/aws-xray/src/server.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ async fn handle(req: Request<Body>) -> Result<Response<Body>, Infallible> {
2727
))
2828
}
2929

30-
fn init_tracer() {
30+
fn init_tracer() -> (sdk::Tracer, stdout::Uninstall) {
31+
global::set_text_map_propagator(XrayTraceContextPropagator::new());
32+
3133
// Install stdout exporter pipeline to be able to retrieve the collected spans.
3234
// For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. In a production
3335
// application, use `Sampler::ParentBased` or `Sampler::TraceIdRatioBased` with a desired ratio.
@@ -37,14 +39,12 @@ fn init_tracer() {
3739
id_generator: Box::new(XrayIdGenerator::default()),
3840
..Default::default()
3941
})
40-
.install();
41-
42-
global::set_text_map_propagator(XrayTraceContextPropagator::new());
42+
.install()
4343
}
4444

4545
#[tokio::main]
4646
async fn main() {
47-
init_tracer();
47+
let _guard = init_tracer();
4848
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
4949

5050
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

examples/basic-otlp/src/main.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ use opentelemetry::api::{BaggageExt, Context, Key, KeyValue, TraceContextExt, Tr
44
use opentelemetry::exporter;
55
use opentelemetry::sdk::metrics::PushController;
66
use opentelemetry::{global, sdk};
7+
use std::error::Error;
78
use std::time::Duration;
89

9-
fn init_tracer() -> Result<sdk::Tracer, Box<dyn std::error::Error>> {
10+
fn init_tracer() -> Result<(sdk::Tracer, opentelemetry_otlp::Uninstall), Box<dyn Error>> {
1011
opentelemetry_otlp::new_pipeline().install()
1112
}
1213

@@ -41,8 +42,8 @@ lazy_static::lazy_static! {
4142
}
4243

4344
#[tokio::main]
44-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
45-
let _ = init_tracer()?;
45+
async fn main() -> Result<(), Box<dyn Error>> {
46+
let _guard = init_tracer()?;
4647
let _started = init_meter()?;
4748

4849
let tracer = global::tracer("ex.com/basic");

examples/basic/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use opentelemetry::{global, sdk};
77
use std::error::Error;
88
use std::time::Duration;
99

10-
fn init_tracer() -> Result<sdk::Tracer, Box<dyn Error>> {
10+
fn init_tracer() -> Result<(sdk::Tracer, opentelemetry_jaeger::Uninstall), Box<dyn Error>> {
1111
opentelemetry_jaeger::new_pipeline()
1212
.with_service_name("trace-demo")
1313
.with_tags(vec![
@@ -49,7 +49,7 @@ lazy_static::lazy_static! {
4949

5050
#[tokio::main]
5151
async fn main() -> Result<(), Box<dyn Error>> {
52-
let _ = init_tracer()?;
52+
let _uninstall = init_tracer()?;
5353
let _started = init_meter()?;
5454

5555
let tracer = global::tracer("ex.com/basic");

examples/datadog/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use opentelemetry::api::{Key, Span, TraceContextExt, Tracer};
22
use opentelemetry::global;
3+
use opentelemetry_contrib::datadog::ApiVersion;
34
use std::thread;
45
use std::time::Duration;
5-
use opentelemetry_contrib::datadog::ApiVersion;
66

77
fn bar() {
88
let tracer = global::tracer("component-bar");
@@ -14,7 +14,7 @@ fn bar() {
1414
}
1515

1616
fn main() -> Result<(), Box<dyn std::error::Error>> {
17-
let tracer = opentelemetry_contrib::datadog::new_pipeline()
17+
let (tracer, _uninstall) = opentelemetry_contrib::datadog::new_pipeline()
1818
.with_service_name("trace-demo")
1919
.with_version(ApiVersion::Version05)
2020
.install()?;

examples/grpc/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@ pub mod hello_world {
1010
tonic::include_proto!("helloworld");
1111
}
1212

13-
fn tracing_init() -> Result<sdk::Tracer, Box<dyn Error>> {
13+
fn tracing_init() -> Result<(sdk::Tracer, opentelemetry_jaeger::Uninstall), Box<dyn Error>> {
1414
opentelemetry_jaeger::new_pipeline()
1515
.with_service_name("grpc-client")
1616
.install()
1717
}
1818

1919
#[tokio::main]
2020
async fn main() -> Result<(), Box<dyn std::error::Error>> {
21-
let tracer = tracing_init()?;
21+
let (tracer, _uninstall) = tracing_init()?;
2222
let mut client = GreeterClient::connect("http://[::1]:50051").await?;
2323
let propagator = TraceContextPropagator::new();
2424
let span = tracer.start("client-request");

examples/grpc/src/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ impl Greeter for MyGreeter {
3434
}
3535
}
3636

37-
fn tracing_init() -> Result<sdk::Tracer, Box<dyn Error>> {
37+
fn tracing_init() -> Result<(sdk::Tracer, opentelemetry_jaeger::Uninstall), Box<dyn Error>> {
3838
opentelemetry_jaeger::new_pipeline()
3939
.with_service_name("grpc-server")
4040
.install()
4141
}
4242

4343
#[tokio::main]
4444
async fn main() -> Result<(), Box<dyn Error>> {
45-
let _ = tracing_init()?;
45+
let _uninstall = tracing_init()?;
4646
let addr = "[::1]:50051".parse()?;
4747
let greeter = MyGreeter::default();
4848

examples/http/src/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use hyper::{body::Body, Client};
22
use opentelemetry::api::{Context, TextMapFormat, TraceContextExt, Tracer};
33
use opentelemetry::{api, exporter::trace::stdout, global, sdk};
44

5-
fn init_tracer() {
5+
fn init_tracer() -> (sdk::Tracer, stdout::Uninstall) {
66
// Install stdout exporter pipeline to be able to retrieve the collected spans.
77
// For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. In a production
88
// application, use `Sampler::ParentBased` or `Sampler::TraceIdRatioBased` with a desired ratio.
@@ -11,12 +11,12 @@ fn init_tracer() {
1111
default_sampler: Box::new(sdk::Sampler::AlwaysOn),
1212
..Default::default()
1313
})
14-
.install();
14+
.install()
1515
}
1616

1717
#[tokio::main]
1818
async fn main() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
19-
init_tracer();
19+
let _guard = init_tracer();
2020

2121
let client = Client::new();
2222
let propagator = api::TraceContextPropagator::new();

examples/http/src/server.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,22 @@ async fn handle(req: Request<Body>) -> Result<Response<Body>, Infallible> {
1616
Ok(Response::new("Hello, World!".into()))
1717
}
1818

19-
fn init_tracer() {
19+
fn init_tracer() -> (sdk::Tracer, stdout::Uninstall) {
2020
// Install stdout exporter pipeline to be able to retrieve the collected spans.
21+
2122
// For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. In a production
2223
// application, use `Sampler::ParentBased` or `Sampler::TraceIdRatioBased` with a desired ratio.
2324
stdout::new_pipeline()
2425
.with_trace_config(sdk::Config {
2526
default_sampler: Box::new(sdk::Sampler::AlwaysOn),
2627
..Default::default()
2728
})
28-
.install();
29+
.install()
2930
}
3031

3132
#[tokio::main]
3233
async fn main() {
33-
init_tracer();
34+
let _guard = init_tracer();
3435
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
3536

3637
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

examples/stdout.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ fn main() {
55
// Install stdout exporter pipeline to be able to retrieve collected spans.
66
// For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. In a production
77
// application, use `Sampler::ParentBased` or `Sampler::TraceIdRatioBased` with a desired ratio.
8-
let tracer = stdout::new_pipeline()
8+
let (tracer, _uninstall) = stdout::new_pipeline()
99
.with_trace_config(sdk::Config {
1010
default_sampler: Box::new(sdk::Sampler::AlwaysOn),
1111
..Default::default()

examples/zipkin/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ fn bar() {
1111
}
1212

1313
fn main() -> Result<(), Box<dyn std::error::Error>> {
14-
let tracer = opentelemetry_zipkin::new_pipeline()
14+
let (tracer, _uninstall) = opentelemetry_zipkin::new_pipeline()
1515
.with_service_name("trace-demo")
1616
.install()?;
1717

opentelemetry-contrib/src/datadog/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
//! use opentelemetry::sdk::{trace, IdGenerator, Resource, Sampler};
6060
//!
6161
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
62-
//! let tracer = opentelemetry_contrib::datadog::new_pipeline()
62+
//! let (tracer, _uninstall) = opentelemetry_contrib::datadog::new_pipeline()
6363
//! .with_service_name("my_app")
6464
//! .with_version(opentelemetry_contrib::datadog::ApiVersion::Version05)
6565
//! .with_agent_endpoint("http://localhost:8126")
@@ -147,7 +147,7 @@ impl Default for DatadogPipelineBuilder {
147147

148148
impl DatadogPipelineBuilder {
149149
/// Create `ExporterConfig` struct from current `ExporterConfigBuilder`
150-
pub fn install(mut self) -> Result<sdk::Tracer, Box<dyn Error>> {
150+
pub fn install(mut self) -> Result<(sdk::Tracer, Uninstall), Box<dyn Error>> {
151151
let exporter = DatadogExporter::new(
152152
self.service_name.clone(),
153153
self.agent_endpoint.parse()?,
@@ -160,9 +160,9 @@ impl DatadogPipelineBuilder {
160160
}
161161
let provider = provider_builder.build();
162162
let tracer = provider.get_tracer("opentelemetry-datadog", Some(env!("CARGO_PKG_VERSION")));
163-
global::set_provider(provider);
163+
let provider_guard = global::set_tracer_provider(provider);
164164

165-
Ok(tracer)
165+
Ok((tracer, Uninstall(provider_guard)))
166166
}
167167

168168
/// Assign the service name under which to group traces
@@ -211,3 +211,7 @@ impl trace::SpanExporter for DatadogExporter {
211211
}
212212
}
213213
}
214+
215+
/// Uninstalls the Datadog pipeline on drop
216+
#[derive(Debug)]
217+
pub struct Uninstall(global::TracerProviderGuard);

opentelemetry-contrib/src/datadog/model/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,10 @@ impl ApiVersion {
6565
#[cfg(test)]
6666
mod tests {
6767
use super::*;
68+
use opentelemetry::api::Key;
6869
use opentelemetry::sdk::InstrumentationLibrary;
6970
use opentelemetry::{api, sdk};
7071
use std::time::{Duration, SystemTime};
71-
use opentelemetry::api::Key;
7272

7373
fn get_spans() -> Vec<Arc<trace::SpanData>> {
7474
let parent_span_id = 1;

opentelemetry-contrib/src/datadog/model/v03.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use crate::datadog::model::Error;
2+
use opentelemetry::api::{Key, Value};
23
use opentelemetry::exporter::trace;
34
use std::sync::Arc;
45
use std::time::SystemTime;
5-
use opentelemetry::api::{Key, Value};
66

77
pub(crate) fn encode(
88
service_name: &str,

opentelemetry-jaeger/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ exporting telemetry:
2222
use opentelemetry::api::Tracer;
2323

2424
fn main() -> Result<(), Box<dyn std::error::Error>> {
25-
let tracer = opentelemetry_jaeger::new_pipeline().install()?;
25+
let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline().install()?;
2626

2727
tracer.in_span("doing_work", |cx| {
2828
// Traced app logic here...
@@ -64,7 +64,7 @@ use opentelemetry::api::Tracer;
6464

6565
fn main() -> Result<(), Box<dyn std::error::Error>> {
6666
// export OTEL_SERVICE_NAME=my-service-name
67-
let tracer = opentelemetry_jaeger::new_pipeline().from_env().install()?;
67+
let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline().from_env().install()?;
6868

6969
tracer.in_span("doing_work", |cx| {
7070
// Traced app logic here...
@@ -94,7 +94,7 @@ Then you can use the [`with_collector_endpoint`] method to specify the endpoint:
9494
use opentelemetry::api::Tracer;
9595

9696
fn main() -> Result<(), Box<dyn std::error::Error>> {
97-
let tracer = opentelemetry_jaeger::new_pipeline()
97+
let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline()
9898
.with_collector_endpoint("http://localhost:14268/api/traces")
9999
// optionally set username and password as well.
100100
.with_collector_username("username")
@@ -121,7 +121,7 @@ use opentelemetry::api::{KeyValue, Tracer};
121121
use opentelemetry::sdk::{trace, IdGenerator, Resource, Sampler};
122122

123123
fn main() -> Result<(), Box<dyn std::error::Error>> {
124-
let tracer = opentelemetry_jaeger::new_pipeline()
124+
let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline()
125125
.from_env()
126126
.with_agent_endpoint("localhost:6831")
127127
.with_service_name("my_app")

0 commit comments

Comments
 (0)