Skip to content

Commit

Permalink
Wait for traces and metrics on close
Browse files Browse the repository at this point in the history
Because we're using the batch provider, and span information is sent
when the span *exits*, if we just let the process exit immediately, we
might lose some tracing data;

The [recommended pattern](open-telemetry/opentelemetry-rust#1961 (comment)) is to hold onto the providers and shut them down manually
as the process exits. This will wait for any spans to finish shipping
and avoid losing data.

Note, that we might want another pass at this in the future:
 - integrate it into the panic handler that I added in another branch
 - integrate something like [Tokio Graceful Shutdown](https://docs.rs/tokio-graceful-shutdown/latest/tokio_graceful_shutdown/) to intercept ctrl+C and the like
 - add a timeout, so that a stalled metrics writer doesn't wait forever

I kept it simple for this PR, but just something we should keep in mind
  • Loading branch information
Quantumplation committed Jan 1, 2025
1 parent e230b96 commit 9325e41
Showing 1 changed file with 41 additions and 22 deletions.
63 changes: 41 additions & 22 deletions crates/amaru/src/bin/amaru/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use clap::{Parser, Subcommand};
use miette::IntoDiagnostic;
use opentelemetry::metrics::Counter;
use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::TracerProvider};
use panic::panic_handler;
use std::env;

Expand Down Expand Up @@ -32,17 +34,21 @@ struct Cli {
async fn main() -> miette::Result<()> {
panic_handler();

let counter = setup_tracing();
let (tracing, metrics, counter) = setup_tracing();

let args = Cli::parse();

match args.command {
let result = match args.command {
Command::Daemon(args) => cmd::daemon::run(args, counter).await,
Command::Import(args) => cmd::import::run(args).await,
}
};

// TODO: we might also want to integrate this into a graceful shutdown system, and into a panic hook
finish_tracing(tracing, metrics)?;
result
}

pub fn setup_tracing() -> Counter<u64> {
pub fn setup_tracing() -> (TracerProvider, SdkMeterProvider, Counter<u64>) {
use opentelemetry::{metrics::MeterProvider, trace::TracerProvider as _, KeyValue};
use opentelemetry_sdk::{metrics::Temporality, Resource};
use tracing_subscriber::{prelude::*, *};
Expand Down Expand Up @@ -72,22 +78,19 @@ pub fn setup_tracing() -> Counter<u64> {
let resource = Resource::new(vec![KeyValue::new("service.name", SERVICE_NAME)]);

// Traces & span
let opentelemetry_layer = tracing_opentelemetry::layer()
.with_tracer(
opentelemetry_sdk::trace::TracerProvider::builder()
.with_resource(resource.clone())
.with_batch_exporter(
opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.build()
.unwrap_or_else(|e| {
panic!("failed to setup opentelemetry span exporter: {e}")
}),
opentelemetry_sdk::runtime::Tokio,
)
let opentelemetry_provider = opentelemetry_sdk::trace::TracerProvider::builder()
.with_resource(resource.clone())
.with_batch_exporter(
opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.build()
.tracer(SERVICE_NAME),
.unwrap_or_else(|e| panic!("failed to setup opentelemetry span exporter: {e}")),
opentelemetry_sdk::runtime::Tokio,
)
.build();
let opentelemetry_tracer = opentelemetry_provider.tracer(SERVICE_NAME);
let opentelemetry_layer = tracing_opentelemetry::layer()
.with_tracer(opentelemetry_tracer)
.with_filter(filter(AMARU_LOG));

// Metrics
Expand All @@ -105,14 +108,14 @@ pub fn setup_tracing() -> Counter<u64> {
)
.build();

let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
let metrics_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
.with_reader(metric_reader)
.with_resource(resource)
.build();

let meter = provider.meter("amaru");
let meter = metrics_provider.meter("amaru");

opentelemetry::global::set_meter_provider(provider);
opentelemetry::global::set_meter_provider(metrics_provider.clone());

// Subscriber
tracing_subscriber::registry()
Expand All @@ -127,5 +130,21 @@ pub fn setup_tracing() -> Counter<u64> {

let counter = meter.u64_counter("block.count").build();

counter
(opentelemetry_provider, metrics_provider, counter)
}

pub fn finish_tracing(tracing: TracerProvider, metrics: SdkMeterProvider) -> miette::Result<()> {
// Shut down the providers so that it flushes any remaining spans
// TODO: we might also want to wrap this in a timeout, so we don't hold the process open forever?
tracing.shutdown().into_diagnostic()?;
metrics.shutdown().into_diagnostic()?;
// This appears to be a deprecated method that will be removed soon
// and just *releases* a reference to it, but doesn't actually call shutdown
// still, we call it just in case until it gets removed
// See:
// https://github.com/tokio-rs/tracing-opentelemetry/issues/159
// https://github.com/tokio-rs/tracing-opentelemetry/pull/175
// https://github.com/open-telemetry/opentelemetry-rust/issues/1961
opentelemetry::global::shutdown_tracer_provider();
Ok(())
}

0 comments on commit 9325e41

Please sign in to comment.