Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
landonxjames committed Nov 18, 2024
1 parent d27317c commit 71cd2ef
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 91 deletions.
38 changes: 16 additions & 22 deletions rust-runtime/aws-smithy-observability-otel/src/attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ pub(crate) fn option_attr_from_kv(input: &[KeyValue]) -> Option<Attributes> {
impl From<AttributesWrap> for Vec<KeyValue> {
fn from(value: AttributesWrap) -> Self {
value
.attributes()
.iter()
.0
.into_attributes()
.map(|(k, v)| {
KeyValue::new(
k.clone(),
k,
match v {
AttributeValue::I64(val) => Value::I64(*val),
AttributeValue::F64(val) => Value::F64(*val),
AttributeValue::String(val) => Value::String(val.clone().into()),
AttributeValue::Bool(val) => Value::Bool(*val),
AttributeValue::I64(val) => Value::I64(val),
AttributeValue::F64(val) => Value::F64(val),
AttributeValue::String(val) => Value::String(val.into()),
AttributeValue::Bool(val) => Value::Bool(val),
_ => Value::String("UNSUPPORTED ATTRIBUTE VALUE TYPE".into()),
},
)
Expand All @@ -68,7 +68,7 @@ impl From<&[KeyValue]> for AttributesWrap {

value.iter().for_each(|kv| {
attrs.set(
kv.key.clone().into(),
kv.key.clone(),
match &kv.value {
Value::Bool(val) => AttributeValue::Bool(*val),
Value::I64(val) => AttributeValue::I64(*val),
Expand Down Expand Up @@ -96,13 +96,10 @@ mod tests {
#[test]
fn attr_to_kv() {
let mut attrs = Attributes::new();
attrs.set("I64".into(), AttributeValue::I64(64));
attrs.set("F64".into(), AttributeValue::F64(64.0));
attrs.set(
"String".into(),
AttributeValue::String("I AM A STRING".into()),
);
attrs.set("Bool".into(), AttributeValue::Bool(true));
attrs.set("I64", AttributeValue::I64(64));
attrs.set("F64", AttributeValue::F64(64.0));
attrs.set("String", AttributeValue::String("I AM A STRING".into()));
attrs.set("Bool", AttributeValue::Bool(true));

let kv = kv_from_option_attr(Some(&attrs));

Expand Down Expand Up @@ -130,15 +127,12 @@ mod tests {
];

let attrs = option_attr_from_kv(&kvs).unwrap();
assert_eq!(attrs.get("Bool").unwrap(), &AttributeValue::Bool(true));
assert_eq!(
attrs.get("Bool".into()).unwrap(),
&AttributeValue::Bool(true)
);
assert_eq!(
attrs.get("String".into()).unwrap(),
attrs.get("String").unwrap(),
&AttributeValue::String("I AM A STRING".into())
);
assert_eq!(attrs.get("I64".into()).unwrap(), &AttributeValue::I64(64));
assert_eq!(attrs.get("F64".into()).unwrap(), &AttributeValue::F64(64.0));
assert_eq!(attrs.get("I64").unwrap(), &AttributeValue::I64(64));
assert_eq!(attrs.get("F64").unwrap(), &AttributeValue::F64(64.0));
}
}
28 changes: 17 additions & 11 deletions rust-runtime/aws-smithy-observability-otel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ pub mod meter;
mod tests {

use crate::meter::AwsSdkOtelMeterProvider;
use aws_smithy_observability::global::{
get_global_telemetry_provider, set_global_telemetry_provider,
};
use aws_smithy_observability::global::{get_telemetry_provider, set_telemetry_provider};
use aws_smithy_observability::provider::TelemetryProvider;
use opentelemetry_sdk::metrics::{data::Sum, PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::runtime::Tokio;
Expand All @@ -44,13 +42,11 @@ mod tests {

// Create the SDK metrics types from the OTel objects
let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp);
let sdk_tp = TelemetryProvider::builder()
.meter_provider(Box::new(sdk_mp))
.build();
let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();

// Set the global TelemetryProvider and then get it back out
let _ = set_global_telemetry_provider(Some(sdk_tp));
let global_tp = get_global_telemetry_provider();
let _ = set_telemetry_provider(sdk_tp);
let global_tp = get_telemetry_provider();

// Create an instrument and record a value
let global_meter = global_tp
Expand All @@ -62,7 +58,13 @@ mod tests {
mono_counter.add(4, None, None);

// Flush metric pipeline and extract metrics from exporter
global_tp.meter_provider().flush().unwrap();
global_tp
.meter_provider()
.as_any()
.downcast_ref::<AwsSdkOtelMeterProvider>()
.unwrap()
.shutdown()
.unwrap();
let finished_metrics = exporter.get_finished_metrics().unwrap();

let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0]
Expand All @@ -75,7 +77,11 @@ mod tests {
assert_eq!(extracted_mono_counter_data, &4);

// Get the OTel TP out and shut it down
let otel_tp = set_global_telemetry_provider(None);
otel_tp.meter_provider().shutdown().unwrap();
let foo = global_tp
.meter_provider()
.as_any()
.downcast_ref::<AwsSdkOtelMeterProvider>()
.unwrap();
foo.shutdown().unwrap();
}
}
51 changes: 31 additions & 20 deletions rust-runtime/aws-smithy-observability-otel/src/meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,28 +266,34 @@ impl AwsSdkOtelMeterProvider {
meter_provider: otel_meter_provider,
}
}
}

impl MeterProvider for AwsSdkOtelMeterProvider {
fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Box<dyn Meter> {
Box::new(MeterWrap(self.meter_provider.meter(scope)))
}

fn flush(&self) -> Result<(), ObservabilityError> {
/// Flush the metric pipeline.
pub fn flush(&self) -> Result<(), ObservabilityError> {
match self.meter_provider.force_flush() {
Ok(_) => Ok(()),
Err(err) => Err(ObservabilityError::new(ErrorKind::MetricsFlush, err)),
}
}

fn shutdown(&self) -> Result<(), ObservabilityError> {
/// Gracefully shutdown the metric pipeline.
pub fn shutdown(&self) -> Result<(), ObservabilityError> {
match self.meter_provider.force_flush() {
Ok(_) => Ok(()),
Err(err) => Err(ObservabilityError::new(ErrorKind::MetricsShutdown, err)),
}
}
}

impl MeterProvider for AwsSdkOtelMeterProvider {
fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Box<dyn Meter> {
Box::new(MeterWrap(self.meter_provider.meter(scope)))
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
}

#[cfg(test)]
mod tests {

Expand All @@ -313,9 +319,7 @@ mod tests {

// Create the SDK metrics types from the OTel objects
let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp);
let sdk_tp = TelemetryProvider::builder()
.meter_provider(Box::new(sdk_mp))
.build();
let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();

// Get the dyn versions of the SDK metrics objects
let dyn_sdk_mp = sdk_tp.meter_provider();
Expand All @@ -332,7 +336,12 @@ mod tests {
histogram.record(1.234, None, None);

// Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
dyn_sdk_mp.shutdown().unwrap();
dyn_sdk_mp
.as_any()
.downcast_ref::<AwsSdkOtelMeterProvider>()
.unwrap()
.shutdown()
.unwrap();

// Extract the metrics from the exporter and assert that they are what we expect
let finished_metrics = exporter.get_finished_metrics().unwrap();
Expand Down Expand Up @@ -373,9 +382,7 @@ mod tests {

// Create the SDK metrics types from the OTel objects
let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp);
let sdk_tp = TelemetryProvider::builder()
.meter_provider(Box::new(sdk_mp))
.build();
let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();

// Get the dyn versions of the SDK metrics objects
let dyn_sdk_mp = sdk_tp.meter_provider();
Expand All @@ -388,7 +395,7 @@ mod tests {
Box::new(|measurement: &dyn AsyncMeasurement<Value = f64>| {
let mut attrs = Attributes::new();
attrs.set(
"TestGaugeAttr".into(),
"TestGaugeAttr",
AttributeValue::String("TestGaugeAttr".into()),
);
measurement.record(6.789, Some(&attrs), None);
Expand All @@ -403,7 +410,7 @@ mod tests {
Box::new(|measurement: &dyn AsyncMeasurement<Value = i64>| {
let mut attrs = Attributes::new();
attrs.set(
"TestAsyncUpDownCounterAttr".into(),
"TestAsyncUpDownCounterAttr",
AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
);
measurement.record(12, Some(&attrs), None);
Expand All @@ -418,7 +425,7 @@ mod tests {
Box::new(|measurement: &dyn AsyncMeasurement<Value = u64>| {
let mut attrs = Attributes::new();
attrs.set(
"TestAsyncMonoCounterAttr".into(),
"TestAsyncMonoCounterAttr",
AttributeValue::String("TestAsyncMonoCounterAttr".into()),
);
measurement.record(123, Some(&attrs), None);
Expand All @@ -429,8 +436,12 @@ mod tests {
async_mono_counter.record(4, None, None);

// Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
dyn_sdk_mp.flush().unwrap();
dyn_sdk_mp.shutdown().unwrap();
dyn_sdk_mp
.as_any()
.downcast_ref::<AwsSdkOtelMeterProvider>()
.unwrap()
.shutdown()
.unwrap();

// Extract the metrics from the exporter
let finished_metrics = exporter.get_finished_metrics().unwrap();
Expand Down
13 changes: 9 additions & 4 deletions rust-runtime/aws-smithy-observability/src/attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,24 @@ impl Attributes {
}

/// Set an attribute.
pub fn set(&mut self, key: String, value: AttributeValue) {
self.attrs.insert(key, value);
pub fn set(&mut self, key: impl Into<String>, value: impl Into<AttributeValue>) {
self.attrs.insert(key.into(), value.into());
}

/// Get an attribute.
pub fn get(&self, key: String) -> Option<&AttributeValue> {
self.attrs.get(&key)
pub fn get(&self, key: impl Into<String>) -> Option<&AttributeValue> {
self.attrs.get(&key.into())
}

/// Get all of the attribute key value pairs.
pub fn attributes(&self) -> &HashMap<String, AttributeValue> {
&self.attrs
}

/// Get an owned [Iterator] of ([String], [AttributeValue]).
pub fn into_attributes(self) -> impl Iterator<Item = (String, AttributeValue)> {
self.attrs.into_iter()
}
}

impl Default for Attributes {
Expand Down
29 changes: 12 additions & 17 deletions rust-runtime/aws-smithy-observability/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,26 @@ use crate::provider::{GlobalTelemetryProvider, TelemetryProvider};
static GLOBAL_TELEMETRY_PROVIDER: Lazy<RwLock<GlobalTelemetryProvider>> =
Lazy::new(|| RwLock::new(GlobalTelemetryProvider::new(TelemetryProvider::default())));

/// Set the current global [TelemetryProvider]. If [None] is supplied then a noop provider is set.
/// The previous [TelemetryProvider] is returned in an [Arc] so appropriate cleanup can be done if necessary.
pub fn set_global_telemetry_provider(
new_provider: Option<TelemetryProvider>,
) -> Arc<TelemetryProvider> {
/// Set the current global [TelemetryProvider].
///
/// This is meant to be run once at the beginning of an application. It will panic if two threads
/// attempt to call it at the same time.
pub fn set_telemetry_provider(new_provider: TelemetryProvider) {
// TODO(smithyObservability): would probably be nicer to return a Result here, but the Guard held by the error from
// .try_write is not Send so I struggled to build an ObservabilityError from it
let mut old_provider = GLOBAL_TELEMETRY_PROVIDER
.try_write()
.expect("GLOBAL_TELEMETRY_PROVIDER RwLock Poisoned");

let new_global_provider = if let Some(tp) = new_provider {
GlobalTelemetryProvider::new(tp)
} else {
GlobalTelemetryProvider::new(TelemetryProvider::default())
};
let new_global_provider = GlobalTelemetryProvider::new(new_provider);

mem::replace(&mut *old_provider, new_global_provider).telemetry_provider
let _ = mem::replace(&mut *old_provider, new_global_provider);
}

/// Get an [Arc] reference to the current global [TelemetryProvider].
pub fn get_global_telemetry_provider() -> Arc<TelemetryProvider> {
///
/// This can panic if called when another thread is calling [set_telemetry_provider].
pub fn get_telemetry_provider() -> Arc<TelemetryProvider> {
// TODO(smithyObservability): would probably be nicer to return a Result here, but the Guard held by the error from
// .try_read is not Send so I struggled to build an ObservabilityError from it
GLOBAL_TELEMETRY_PROVIDER
Expand All @@ -62,16 +60,13 @@ mod tests {
let my_provider = TelemetryProvider::default();

// Set the new counter and get a reference to the old one
let old_provider = set_global_telemetry_provider(Some(my_provider));

// Call shutdown on the old meter provider
let _old_meter = old_provider.meter_provider().shutdown().unwrap();
set_telemetry_provider(my_provider);
}

#[test]
#[serial]
fn can_get_global_telemetry_provider() {
let curr_provider = get_global_telemetry_provider();
let curr_provider = get_telemetry_provider();

// Use the global provider to create an instrument and record a value with it
let curr_mp = curr_provider.meter_provider();
Expand Down
5 changes: 4 additions & 1 deletion rust-runtime/aws-smithy-observability/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
)]

//! Smithy Observability
//TODO(smithyobservability): once we have finalized everything and integrated metrics with our runtime
// TODO(smithyobservability): once we have finalized everything and integrated metrics with our runtime
// libraries update this with detailed usage docs and examples

pub mod attributes;
pub use attributes::{AttributeValue, Attributes};
pub mod error;
pub use error::{ErrorKind, ObservabilityError};
pub mod global;
pub mod meter;
mod noop;
pub mod provider;
pub use provider::{TelemetryProvider, TelemetryProviderBuilder};
12 changes: 2 additions & 10 deletions rust-runtime/aws-smithy-observability/src/meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,14 @@
//! real time.
use crate::attributes::{Attributes, Context};
use crate::error::ObservabilityError;

/// Provides named instances of [Meter].
pub trait MeterProvider {
/// Get or create a named [Meter].
fn get_meter(&self, scope: &'static str, attributes: Option<&Attributes>) -> Box<dyn Meter>;

/// Optional method to flush the metrics pipeline, default is noop
fn flush(&self) -> Result<(), ObservabilityError> {
Ok(())
}

/// Optional method to shutdown the metrics provider, default is noop
fn shutdown(&self) -> Result<(), ObservabilityError> {
Ok(())
}
/// Foo
fn as_any(&self) -> &dyn std::any::Any;
}

/// The entry point to creating instruments. A grouping of related metrics.
Expand Down
4 changes: 4 additions & 0 deletions rust-runtime/aws-smithy-observability/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ impl MeterProvider for NoopMeterProvider {
fn get_meter(&self, _scope: &'static str, _attributes: Option<&Attributes>) -> Box<dyn Meter> {
Box::new(NoopMeter)
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
}

pub(crate) struct NoopMeter;
Expand Down
Loading

0 comments on commit 71cd2ef

Please sign in to comment.