From 55cc9f13e12938b9a30e1ff37dc827828363bbd2 Mon Sep 17 00:00:00 2001 From: Pi Lanningham Date: Sat, 28 Dec 2024 16:00:44 -0500 Subject: [PATCH] Wait for traces and metrics on close Because we're using the batch provider, and span information is sent when the span *exits*, if we just let the process exit immediately, we might lose some tracing data; The [recommended pattern](https://github.com/open-telemetry/opentelemetry-rust/issues/1961#issuecomment-2494004249) is to hold onto the providers and shut them down manually as the process exits. This will wait for any spans to finish shipping and avoid losing data. Note, that we might want another pass at this in the future: - integrate it into the panic handler that I added in another branch - integrate something like [Tokio Graceful Shutdown](https://docs.rs/tokio-graceful-shutdown/latest/tokio_graceful_shutdown/) to intercept ctrl+C and the like - add a timeout, so that a stalled metrics writer doesn't wait forever I kept it simple for this PR, but just something we should keep in mind --- crates/amaru/src/bin/amaru/main.rs | 66 ++++++++++++++++++++---------- 1 file changed, 44 insertions(+), 22 deletions(-) diff --git a/crates/amaru/src/bin/amaru/main.rs b/crates/amaru/src/bin/amaru/main.rs index f64706f..de12015 100644 --- a/crates/amaru/src/bin/amaru/main.rs +++ b/crates/amaru/src/bin/amaru/main.rs @@ -1,5 +1,7 @@ use clap::{Parser, Subcommand}; +use miette::IntoDiagnostic; use opentelemetry::metrics::Counter; +use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::TracerProvider}; use panic::panic_handler; use std::env; @@ -32,17 +34,22 @@ struct Cli { async fn main() -> miette::Result<()> { panic_handler(); - let counter = setup_tracing(); + let (tracing, metrics, counter) = setup_tracing(); let args = Cli::parse(); - match args.command { + let result = match args.command { Command::Daemon(args) => cmd::daemon::run(args, counter).await, Command::Import(args) => cmd::import::run(args).await, - } + }; + + // TODO: we might also want to integrate this into a graceful shutdown system, and into a panic hook + teardown_tracing(tracing, metrics)?; + + result } -pub fn setup_tracing() -> Counter { +pub fn setup_tracing() -> (TracerProvider, SdkMeterProvider, Counter) { use opentelemetry::{metrics::MeterProvider, trace::TracerProvider as _, KeyValue}; use opentelemetry_sdk::{metrics::Temporality, Resource}; use tracing_subscriber::{prelude::*, *}; @@ -72,22 +79,19 @@ pub fn setup_tracing() -> Counter { let resource = Resource::new(vec![KeyValue::new("service.name", SERVICE_NAME)]); // Traces & span - let opentelemetry_layer = tracing_opentelemetry::layer() - .with_tracer( - opentelemetry_sdk::trace::TracerProvider::builder() - .with_resource(resource.clone()) - .with_batch_exporter( - opentelemetry_otlp::SpanExporter::builder() - .with_tonic() - .build() - .unwrap_or_else(|e| { - panic!("failed to setup opentelemetry span exporter: {e}") - }), - opentelemetry_sdk::runtime::Tokio, - ) + let opentelemetry_provider = opentelemetry_sdk::trace::TracerProvider::builder() + .with_resource(resource.clone()) + .with_batch_exporter( + opentelemetry_otlp::SpanExporter::builder() + .with_tonic() .build() - .tracer(SERVICE_NAME), + .unwrap_or_else(|e| panic!("failed to setup opentelemetry span exporter: {e}")), + opentelemetry_sdk::runtime::Tokio, ) + .build(); + let opentelemetry_tracer = opentelemetry_provider.tracer(SERVICE_NAME); + let opentelemetry_layer = tracing_opentelemetry::layer() + .with_tracer(opentelemetry_tracer) .with_filter(filter(AMARU_LOG)); // Metrics @@ -105,14 +109,14 @@ pub fn setup_tracing() -> Counter { ) .build(); - let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder() + let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder() .with_reader(metric_reader) .with_resource(resource) .build(); - let meter = provider.meter("amaru"); + let meter = metrics_provider.meter("amaru"); - opentelemetry::global::set_meter_provider(provider); + opentelemetry::global::set_meter_provider(metrics_provider.clone()); // Subscriber tracing_subscriber::registry() @@ -127,5 +131,23 @@ pub fn setup_tracing() -> Counter { let counter = meter.u64_counter("block.count").build(); - counter + (opentelemetry_provider, metrics_provider, counter) +} + +pub fn teardown_tracing(tracing: TracerProvider, metrics: SdkMeterProvider) -> miette::Result<()> { + // Shut down the providers so that it flushes any remaining spans + // TODO: we might also want to wrap this in a timeout, so we don't hold the process open forever? + tracing.shutdown().into_diagnostic()?; + metrics.shutdown().into_diagnostic()?; + + // This appears to be a deprecated method that will be removed soon + // and just *releases* a reference to it, but doesn't actually call shutdown + // still, we call it just in case until it gets removed + // See: + // https://github.com/tokio-rs/tracing-opentelemetry/issues/159 + // https://github.com/tokio-rs/tracing-opentelemetry/pull/175 + // https://github.com/open-telemetry/opentelemetry-rust/issues/1961 + opentelemetry::global::shutdown_tracer_provider(); + + Ok(()) }