Skip to content

Commit a3f3553

Browse files
feat: draw the rest of the owl
1 parent 1b2ff9a commit a3f3553

File tree

107 files changed

+4645
-813
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

107 files changed

+4645
-813
lines changed

Cargo.lock

Lines changed: 7 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 9 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -132,19 +132,20 @@ A more complete example can be found in the [examples directory](https://github.
132132

133133
Always register the instrumentation rule last in your physical optimizer chain.
134134

135-
- Many optimizer rules identify nodes using `as_any().downcast_ref::<ConcreteExec>()`.
136-
Since instrumentation wraps each node in a private `InstrumentedExec`, those downcasts
137-
won’t match if instrumentation runs first, causing rules to be skipped or, in code
138-
that assumes success, to panic.
139-
- Some rules may rewrite parts of the plan after instrumentation. While `InstrumentedExec`
135+
- `InstrumentedExec` is transparent for downcasting: its `as_any()` delegates to the wrapped
136+
node, so `as_any().downcast_ref::<ConcreteExec>()` works through the wrapper. This allows
137+
rules and execution-time code to identify wrapped nodes correctly.
138+
- However, some rules may rewrite parts of the plan after instrumentation. While `InstrumentedExec`
140139
re-wraps many common mutations, placing the rule last guarantees full, consistent
141-
coverage regardless of other rules behaviors.
140+
coverage regardless of other rules' behaviors.
142141

143142
Why is `InstrumentedExec` private?
144143

145-
- To prevent downstream code from downcasting to or unwrapping the wrapper, which would be
144+
- To prevent downstream code from directly accessing or unwrapping the wrapper, which would be
146145
brittle and force long-term compatibility constraints on its internals. The public
147-
contract is the optimizer rule, not the concrete node.
146+
contract is the optimizer rule and the transparent downcasting behavior, not the concrete node.
147+
- Downcasting works transparently (via delegated `as_any()`), so you can check node types
148+
without knowing about the wrapper.
148149

149150
How to ensure it is last:
150151

datafusion-tracing/src/instrument_rule.rs

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -68,19 +68,19 @@ impl PhysicalOptimizerRule for InstrumentRule {
6868
plan: Arc<dyn ExecutionPlan>,
6969
_config: &ConfigOptions,
7070
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
71-
// Iterate over the plan and wrap each node with InstrumentedExec
71+
// Wrap each node in the plan tree with InstrumentedExec.
72+
// Since InstrumentedExec delegates as_any() to its inner node, it's transparent
73+
// for downcasting - other code can still identify wrapped nodes by their actual type.
74+
//
75+
// Note: This will wrap every node, even if the rule is called multiple times on
76+
// the same plan (unusual but possible). The resulting double-wrapping creates
77+
// duplicate telemetry but remains functionally correct.
7278
plan.transform(|plan| {
73-
if plan.as_any().downcast_ref::<InstrumentedExec>().is_none() {
74-
// Node is not InstrumentedExec; wrap it
75-
Ok(Transformed::yes(Arc::new(InstrumentedExec::new(
76-
plan,
77-
self.span_create_fn.clone(),
78-
&self.options,
79-
))))
80-
} else {
81-
// Node is already InstrumentedExec; do not wrap again
82-
Ok(Transformed::no(plan))
83-
}
79+
Ok(Transformed::yes(Arc::new(InstrumentedExec::new(
80+
plan,
81+
self.span_create_fn.clone(),
82+
&self.options,
83+
))))
8484
})
8585
.data()
8686
}

datafusion-tracing/src/instrumented.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -347,8 +347,9 @@ impl ExecutionPlan for InstrumentedExec {
347347
Some(self.with_new_inner(new_inner))
348348
}
349349

350+
/// Delegate to the inner plan for downcasting.
350351
fn as_any(&self) -> &dyn Any {
351-
self
352+
self.inner.as_any()
352353
}
353354

354355
/// Executes the plan for a given partition and context, instrumented with tracing and metrics recording.

examples/distributed_otlp.rs

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -136,8 +136,8 @@ async fn main() -> Result<()> {
136136
async fn run_distributed_otlp_example() -> Result<()> {
137137
// Test both distributed execution modes
138138
let modes = [
139-
(DistributedMode::Memory, "Memory"),
140139
(DistributedMode::Localhost, "Localhost"),
140+
(DistributedMode::Memory, "Memory"),
141141
];
142142

143143
for (mode, mode_name) in modes {
@@ -187,6 +187,13 @@ async fn run_distributed_otlp_example() -> Result<()> {
187187

188188
/// Initializes OpenTelemetry and tracing infrastructure to enable tracing of query execution.
189189
fn init_tracing() -> Result<opentelemetry_sdk::trace::SdkTracerProvider> {
190+
// Set up the global text map propagator for trace context propagation across gRPC boundaries.
191+
// This is essential for distributed tracing to work - without it, worker spans won't be
192+
// linked to parent traces.
193+
opentelemetry::global::set_text_map_propagator(
194+
opentelemetry_sdk::propagation::TraceContextPropagator::new(),
195+
);
196+
190197
// Set service metadata for tracing.
191198
let resource = Resource::builder()
192199
.with_attribute(KeyValue::new("service.name", "datafusion-tracing"))

integration-utils/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,16 +36,21 @@ arrow-flight = "56.1"
3636
async-trait = "0.1"
3737
dashmap = "6.1"
3838
datafusion = { workspace = true, features = ["parquet", "nested_expressions"] }
39-
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed", branch = "main" }
39+
datafusion-distributed = { git = "https://github.com/geoffreyclaude/datafusion-distributed", branch = "feat/tracing" }
40+
# datafusion-distributed = { path = "../../datafusion-distributed" }
4041
datafusion-tracing = { workspace = true }
4142
futures = { workspace = true }
43+
http = "1.0"
4244
hyper-util = "0.1"
4345
instrumented-object-store = { workspace = true }
4446
object_store = { version = "0.12.1", default-features = false }
47+
opentelemetry = { version = "0.30", features = ["trace"] }
48+
pin-project-lite = "0.2"
4549
structopt = "0.3"
4650
tokio = { workspace = true, features = ["full"] }
4751
tokio-stream = "0.1"
4852
tonic = { version = "0.13", features = ["transport"] }
4953
tower = "0.5"
5054
tracing = { workspace = true }
55+
tracing-opentelemetry = { version = "0.31" }
5156
url = { version = "2.5" }

integration-utils/src/distributed/in_memory.rs

Lines changed: 16 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,9 @@ use datafusion_distributed::{
2828
};
2929
use hyper_util::rt::TokioIo;
3030
use tonic::transport::{Endpoint, Server};
31+
use tower::ServiceBuilder;
32+
33+
use super::trace_middleware::{TracingClientLayer, TracingServerLayer};
3134

3235
const DUMMY_URL: &str = "http://localhost:50051";
3336
const MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GB
@@ -55,6 +58,11 @@ impl InMemoryChannelResolver {
5558
async move { Ok::<_, std::io::Error>(TokioIo::new(client)) }
5659
}));
5760

61+
// Wrap the channel with tracing middleware to inject trace context
62+
let channel = ServiceBuilder::new()
63+
.layer(TracingClientLayer)
64+
.service(channel);
65+
5866
let this = Self {
5967
channel: FlightServiceClient::new(BoxCloneSyncChannel::new(channel))
6068
.max_decoding_message_size(MAX_MESSAGE_SIZE)
@@ -76,12 +84,15 @@ impl InMemoryChannelResolver {
7684
.unwrap();
7785

7886
tokio::spawn(async move {
87+
// Wrap the FlightService with tracing middleware to extract trace context
88+
let service = ServiceBuilder::new().layer(TracingServerLayer).service(
89+
FlightServiceServer::new(endpoint)
90+
.max_decoding_message_size(MAX_MESSAGE_SIZE)
91+
.max_encoding_message_size(MAX_MESSAGE_SIZE),
92+
);
93+
7994
Server::builder()
80-
.add_service(
81-
FlightServiceServer::new(endpoint)
82-
.max_decoding_message_size(MAX_MESSAGE_SIZE)
83-
.max_encoding_message_size(MAX_MESSAGE_SIZE),
84-
)
95+
.add_service(service)
8596
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
8697
.await
8798
});

0 commit comments

Comments
 (0)