Skip to content

Commit 0979c06

Browse files
authored
Switch to async exporters (#232)
1 parent 6e10eb2 commit 0979c06

File tree

30 files changed

+364
-530
lines changed

30 files changed

+364
-530
lines changed

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ edition = "2018"
1818

1919
[dependencies]
2020
async-std = { version = "1.6", features = ["unstable"], optional = true }
21+
async-trait = { version = "0.1", optional = true }
2122
base64 = { version = "0.12", optional = true }
2223
bincode = { version = "1.2", optional = true }
2324
dashmap = { version = "4.0.0-rc6", optional = true }
@@ -42,7 +43,7 @@ tokio = { version = "0.2", features = ["full"] }
4243
[features]
4344
default = ["metrics", "trace"]
4445
base64_format = ["base64", "binary_propagator"]
45-
trace = ["rand", "pin-project"]
46+
trace = ["rand", "pin-project", "async-trait"]
4647
metrics = ["thiserror", "dashmap", "fnv"]
4748
serialize = ["serde", "bincode"]
4849
binary_propagator = []
@@ -78,4 +79,4 @@ harness = false
7879

7980
[[bench]]
8081
name = "ddsketch"
81-
harness = false
82+
harness = false

examples/basic/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ edition = "2018"
66
[dependencies]
77
futures = "0.3"
88
lazy_static = "1.4"
9-
opentelemetry = { path = "../../", features = ["serialize"] }
10-
opentelemetry-jaeger = { path = "../../opentelemetry-jaeger" }
9+
opentelemetry = { path = "../../", features = ["serialize", "tokio"] }
10+
opentelemetry-jaeger = { path = "../../opentelemetry-jaeger", features = ["tokio"] }
1111
serde_json = "1.0"
1212
thrift = "0.13"
1313
tokio = { version = "0.2", features = ["full"] }

examples/stdout.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
use opentelemetry::exporter::trace::stdout;
2-
use opentelemetry::{api::Tracer, sdk};
1+
use opentelemetry::{
2+
api::Tracer,
3+
exporter::trace::stdout,
4+
sdk::{trace, Sampler},
5+
};
36

47
fn main() {
58
// Install stdout exporter pipeline to be able to retrieve collected spans.
69
// For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. In a production
710
// application, use `Sampler::ParentBased` or `Sampler::TraceIdRatioBased` with a desired ratio.
811
let (tracer, _uninstall) = stdout::new_pipeline()
9-
.with_trace_config(sdk::Config {
10-
default_sampler: Box::new(sdk::Sampler::AlwaysOn),
11-
..Default::default()
12-
})
12+
.with_trace_config(trace::config().with_default_sampler(Sampler::AlwaysOn))
1313
.install();
1414

1515
tracer.in_span("operation", |_cx| {});

opentelemetry-contrib/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@ edition = "2018"
1818

1919
[features]
2020
default = []
21-
datadog = ["indexmap", "reqwest", "rmp"]
21+
datadog = ["indexmap", "reqwest", "rmp", "async-trait"]
2222

2323
[dependencies]
24+
async-trait = { version = "0.1", optional = true }
2425
indexmap = { version = "1.6.0", optional = true }
2526
opentelemetry = { version = "0.8.0", path = ".." }
26-
reqwest = { version = "0.10", features = ["blocking"], optional = true }
27+
reqwest = { version = "0.10", optional = true }
2728
rmp = { version = "0.8", optional = true }
2829
lazy_static = "1.4"
2930

opentelemetry-contrib/src/datadog/mod.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ mod model;
8585

8686
pub use model::ApiVersion;
8787

88+
use async_trait::async_trait;
8889
use opentelemetry::{api::TracerProvider, exporter::trace, global, sdk};
8990
use reqwest::header::CONTENT_TYPE;
9091
use reqwest::Url;
@@ -100,7 +101,7 @@ const DEFAULT_SERVICE_NAME: &str = "OpenTelemetry";
100101
/// Datadog span exporter
101102
#[derive(Debug)]
102103
pub struct DatadogExporter {
103-
client: reqwest::blocking::Client,
104+
client: reqwest::Client,
104105
request_url: Url,
105106
service_name: String,
106107
version: ApiVersion,
@@ -112,7 +113,7 @@ impl DatadogExporter {
112113
request_url.set_path(version.path());
113114

114115
DatadogExporter {
115-
client: reqwest::blocking::Client::new(),
116+
client: reqwest::Client::new(),
116117
request_url,
117118
service_name,
118119
version,
@@ -190,9 +191,10 @@ impl DatadogPipelineBuilder {
190191
}
191192
}
192193

194+
#[async_trait]
193195
impl trace::SpanExporter for DatadogExporter {
194196
/// Export spans to datadog-agent
195-
fn export(&self, batch: Vec<Arc<trace::SpanData>>) -> trace::ExportResult {
197+
async fn export(&self, batch: &[Arc<trace::SpanData>]) -> trace::ExportResult {
196198
let data = match self.version.encode(&self.service_name, batch) {
197199
Ok(data) => data,
198200
Err(_) => return trace::ExportResult::FailedNotRetryable,
@@ -203,7 +205,8 @@ impl trace::SpanExporter for DatadogExporter {
203205
.post(self.request_url.clone())
204206
.header(CONTENT_TYPE, self.version.content_type())
205207
.body(data)
206-
.send();
208+
.send()
209+
.await;
207210

208211
match resp {
209212
Ok(response) if response.status().is_success() => trace::ExportResult::Success,

opentelemetry-contrib/src/datadog/model/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl ApiVersion {
5353
pub(crate) fn encode(
5454
self,
5555
service_name: &str,
56-
spans: Vec<Arc<trace::SpanData>>,
56+
spans: &[Arc<trace::SpanData>],
5757
) -> Result<Vec<u8>, Error> {
5858
match self {
5959
Self::Version03 => v03::encode(service_name, spans),
@@ -115,7 +115,7 @@ mod tests {
115115
#[test]
116116
fn test_encode_v03() -> Result<(), Box<dyn std::error::Error>> {
117117
let spans = get_spans();
118-
let encoded = base64::encode(ApiVersion::Version03.encode("service_name", spans)?);
118+
let encoded = base64::encode(ApiVersion::Version03.encode("service_name", &spans)?);
119119

120120
assert_eq!(encoded.as_str(), "kZGLpHR5cGWjd2Vip3NlcnZpY2Wsc2VydmljZV9uYW1lpG5hbWWpY29tcG9uZW50qHJlc291cmNlqHJlc291cmNlqHRyYWNlX2lkzwAAAAAAAAAHp3NwYW5faWTPAAAAAAAAAGOpcGFyZW50X2lkzwAAAAAAAAABpXN0YXJ00wAAAAAAAAAAqGR1cmF0aW9u0wAAAAA7msoApWVycm9y0gAAAACkbWV0YYGpc3Bhbi50eXBlo3dlYg==");
121121

@@ -125,7 +125,7 @@ mod tests {
125125
#[test]
126126
fn test_encode_v05() -> Result<(), Box<dyn std::error::Error>> {
127127
let spans = get_spans();
128-
let encoded = base64::encode(ApiVersion::Version05.encode("service_name", spans)?);
128+
let encoded = base64::encode(ApiVersion::Version05.encode("service_name", &spans)?);
129129

130130
assert_eq!(encoded.as_str(), "kpWsc2VydmljZV9uYW1lo3dlYqljb21wb25lbnSocmVzb3VyY2Wpc3Bhbi50eXBlkZGczgAAAADOAAAAAs4AAAADzwAAAAAAAAAHzwAAAAAAAABjzwAAAAAAAAAB0wAAAAAAAAAA0wAAAAA7msoA0gAAAACBzgAAAATOAAAAAYDOAAAAAQ==");
131131

opentelemetry-contrib/src/datadog/model/v03.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@ use opentelemetry::exporter::trace;
44
use std::sync::Arc;
55
use std::time::SystemTime;
66

7-
pub(crate) fn encode(
8-
service_name: &str,
9-
spans: Vec<Arc<trace::SpanData>>,
10-
) -> Result<Vec<u8>, Error> {
7+
pub(crate) fn encode(service_name: &str, spans: &[Arc<trace::SpanData>]) -> Result<Vec<u8>, Error> {
118
let mut encoded = Vec::new();
129
rmp::encode::write_array_len(&mut encoded, spans.len() as u32)?;
1310

opentelemetry-contrib/src/datadog/model/v05.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@ use std::time::SystemTime;
5050
//
5151
// The dictionary in this case would be []string{""}, having only the empty string at index 0.
5252
//
53-
pub(crate) fn encode(
54-
service_name: &str,
55-
spans: Vec<Arc<trace::SpanData>>,
56-
) -> Result<Vec<u8>, Error> {
53+
pub(crate) fn encode(service_name: &str, spans: &[Arc<trace::SpanData>]) -> Result<Vec<u8>, Error> {
5754
let mut interner = StringInterner::new();
5855
let mut encoded_spans = encode_spans(&mut interner, service_name, spans)?;
5956

@@ -73,7 +70,7 @@ pub(crate) fn encode(
7370
fn encode_spans(
7471
interner: &mut StringInterner,
7572
service_name: &str,
76-
spans: Vec<Arc<trace::SpanData>>,
73+
spans: &[Arc<trace::SpanData>],
7774
) -> Result<Vec<u8>, Error> {
7875
let mut encoded = Vec::new();
7976
rmp::encode::write_array_len(&mut encoded, spans.len() as u32)?;

opentelemetry-jaeger/Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@ license = "Apache-2.0"
1616
edition = "2018"
1717

1818
[dependencies]
19+
async-std = { version = "1.6", optional = true }
20+
async-trait = "0.1"
21+
http = { version = "0.2", optional = true }
22+
isahc = { version = "0.9", default-features = false, optional = true }
1923
opentelemetry = { version = "0.8", default-features = false, features = ["trace"], path = ".." }
20-
ureq = { version = "1.4", optional = true }
2124
thrift = "0.13"
25+
tokio = { version = "0.2", features = ["udp", "sync"], optional = true }
2226

2327
[features]
2428
default = []
25-
collector_client = ["ureq"]
29+
collector_client = ["isahc", "http"]

opentelemetry-jaeger/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ for either executor when you install the pipeline.
4444
```toml
4545
[dependencies]
4646
opentelemetry = { version = "*", features = ["tokio"] }
47-
opentelemetry-jaeger = "*"
47+
opentelemetry-jaeger = { version = "*", features = ["tokio"] }
4848
```
4949

5050
[`tokio`]: https://tokio.rs

opentelemetry-jaeger/src/agent.rs

Lines changed: 96 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,120 @@
11
//! # UDP Jaeger Agent Client
2-
use crate::thrift::{agent, jaeger, zipkincore};
3-
use crate::transport::TUdpChannel;
2+
use crate::thrift::{
3+
agent::{self, TAgentSyncClient},
4+
jaeger,
5+
};
6+
use crate::transport::TNoopChannel;
47
use std::fmt;
5-
use std::net::ToSocketAddrs;
6-
use thrift::protocol;
7-
use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol};
8-
use thrift::transport::{ReadHalf, TIoChannel, WriteHalf};
8+
use std::net::{ToSocketAddrs, UdpSocket};
9+
use std::sync::Mutex;
10+
use thrift::{
11+
protocol::{TCompactInputProtocol, TCompactOutputProtocol},
12+
transport::{ReadHalf, TBufferChannel, TIoChannel, WriteHalf},
13+
};
914

10-
/// The max size of UDP packet we want to send, synced with jaeger-agent.
11-
const UDP_MAX_PACKET_SIZE: usize = 65000;
12-
13-
/// `AgentSyncClientUDP` implements the `TAgentSyncClient` interface over UDP
14-
pub(crate) struct AgentSyncClientUDP {
15+
struct BufferClient {
16+
buffer: ReadHalf<TBufferChannel>,
1517
client: agent::AgentSyncClient<
16-
TCompactInputProtocol<ReadHalf<TUdpChannel>>,
17-
TCompactOutputProtocol<WriteHalf<TUdpChannel>>,
18+
TCompactInputProtocol<TNoopChannel>,
19+
TCompactOutputProtocol<WriteHalf<TBufferChannel>>,
1820
>,
1921
}
2022

21-
impl fmt::Debug for AgentSyncClientUDP {
23+
impl fmt::Debug for BufferClient {
2224
/// Debug info
2325
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
24-
fmt.debug_struct("AgentClientUDP")
26+
fmt.debug_struct("BufferClient")
27+
.field("buffer", &self.buffer)
2528
.field("client", &"AgentSyncClient")
2629
.finish()
2730
}
2831
}
2932

30-
impl AgentSyncClientUDP {
33+
/// `AgentAsyncClientUDP` implements an async version of the `TAgentSyncClient`
34+
/// interface over UDP.
35+
#[derive(Debug)]
36+
pub(crate) struct AgentAsyncClientUDP {
37+
#[cfg(all(not(feature = "async-std"), not(feature = "tokio")))]
38+
conn: UdpSocket,
39+
#[cfg(feature = "tokio")]
40+
conn: tokio::sync::Mutex<tokio::net::UdpSocket>,
41+
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
42+
conn: async_std::sync::Mutex<async_std::net::UdpSocket>,
43+
buffer_client: Mutex<BufferClient>,
44+
}
45+
46+
impl AgentAsyncClientUDP {
3147
/// Create a new UDP agent client
32-
pub(crate) fn new<T: ToSocketAddrs>(
33-
host_port: T,
34-
max_packet_size: Option<usize>,
35-
) -> thrift::Result<Self> {
36-
let transport = TUdpChannel::new(host_port, max_packet_size.or(Some(UDP_MAX_PACKET_SIZE)))?;
37-
let (read, write) = transport.split()?;
48+
pub(crate) fn new<T: ToSocketAddrs>(host_port: T) -> thrift::Result<Self> {
49+
let (buffer, write) = TBufferChannel::with_capacity(0, 512).split()?;
3850
let client = agent::AgentSyncClient::new(
39-
protocol::TCompactInputProtocol::new(read),
40-
protocol::TCompactOutputProtocol::new(write),
51+
TCompactInputProtocol::new(TNoopChannel),
52+
TCompactOutputProtocol::new(write),
4153
);
4254

43-
Ok(AgentSyncClientUDP { client })
44-
}
45-
}
55+
let conn = UdpSocket::bind("0.0.0.0:0")?;
56+
conn.connect(host_port)?;
4657

47-
impl agent::TAgentSyncClient for AgentSyncClientUDP {
48-
/// Emit zipkin batch (Deprecated)
49-
fn emit_zipkin_batch(&mut self, spans: Vec<zipkincore::Span>) -> thrift::Result<()> {
50-
self.client.emit_zipkin_batch(spans)
58+
Ok(AgentAsyncClientUDP {
59+
#[cfg(all(not(feature = "async-std"), not(feature = "tokio")))]
60+
conn,
61+
#[cfg(feature = "tokio")]
62+
conn: tokio::sync::Mutex::new(tokio::net::UdpSocket::from_std(conn)?),
63+
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
64+
conn: async_std::sync::Mutex::new(async_std::net::UdpSocket::from(conn)),
65+
buffer_client: Mutex::new(BufferClient { buffer, client }),
66+
})
5167
}
5268

5369
/// Emit standard Jaeger batch
54-
fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()> {
55-
self.client.emit_batch(batch)
70+
pub(crate) async fn emit_batch(&self, batch: jaeger::Batch) -> thrift::Result<()> {
71+
// Write payload to buffer
72+
let payload = self
73+
.buffer_client
74+
.lock()
75+
.map_err(|err| {
76+
thrift::Error::from(std::io::Error::new(
77+
std::io::ErrorKind::Other,
78+
err.to_string(),
79+
))
80+
})
81+
.and_then(|mut buffer_client| {
82+
// Write to tmp buffer
83+
buffer_client.client.emit_batch(batch)?;
84+
// extract written payload
85+
let payload = buffer_client.buffer.write_bytes();
86+
// reset
87+
buffer_client.buffer.empty_write_buffer();
88+
89+
Ok(payload)
90+
})?;
91+
92+
// Write async to socket, reading from buffer
93+
write_to_socket(self, payload).await?;
94+
95+
Ok(())
5696
}
5797
}
98+
99+
#[cfg(all(not(feature = "async-std"), not(feature = "tokio")))]
100+
async fn write_to_socket(client: &AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
101+
client.conn.send(&payload)?;
102+
103+
Ok(())
104+
}
105+
106+
#[cfg(feature = "tokio")]
107+
async fn write_to_socket(client: &AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
108+
let mut conn = client.conn.lock().await;
109+
conn.send(&payload).await?;
110+
111+
Ok(())
112+
}
113+
114+
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
115+
async fn write_to_socket(client: &AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
116+
let conn = client.conn.lock().await;
117+
conn.send(&payload).await?;
118+
119+
Ok(())
120+
}

0 commit comments

Comments
 (0)