Skip to content

Commit ee6040c

Browse files
test: add new distributed TPCH example
1 parent 61223f3 commit ee6040c

File tree

26 files changed

+789
-5
lines changed

26 files changed

+789
-5
lines changed

dev/generate_tpch_parquet.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ if ! command -v tpchgen-cli >/dev/null 2>&1; then
3333
exit 1
3434
fi
3535

36-
echo "Generating TPCH Parquet data (SF=0.1) into $OUT_DIR"
36+
echo "Generating TPCH Parquet data (SF=1) into $OUT_DIR"
3737

3838
# Generate all tables, single file per table, Parquet format
39-
tpchgen-cli --scale-factor 0.1 --format parquet --output-dir "$OUT_DIR"
39+
tpchgen-cli --scale-factor 1 --format parquet --output-dir "$OUT_DIR"
4040

4141
echo "TPCH Parquet generation complete. Files in $OUT_DIR:"
4242
ls -1 "$OUT_DIR" | sed 's/^/ - /'

examples/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ workspace = true
3535
name = "otlp"
3636
path = "otlp.rs"
3737

38+
[[example]]
39+
name = "distributed_otlp"
40+
path = "distributed_otlp.rs"
41+
3842
[dependencies]
3943
datafusion = { workspace = true, features = ["parquet", "nested_expressions"] }
4044
integration-utils = { path = "../integration-utils" }

examples/distributed_otlp.rs

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
//
18+
// This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc.
19+
20+
//! # Distributed OTLP Example
21+
//!
22+
//! This example demonstrates OpenTelemetry integration with DataFusion for distributed tracing.
23+
//! It shows how to:
24+
//!
25+
//! - Configure OpenTelemetry with an OTLP exporter
26+
//! - Set up service metadata and tracing layers
27+
//! - Instrument DataFusion query execution for distributed plans
28+
//! - Execute multiple queries with separate contexts
29+
//!
30+
//! ## Prerequisites
31+
//!
32+
//! Before running this example, you'll need an OpenTelemetry collector. For local development,
33+
//! Jaeger is recommended:
34+
//!
35+
//! ```bash
36+
//! docker run --rm --name jaeger \
37+
//! -p 16686:16686 \
38+
//! -p 4317:4317 \
39+
//! jaegertracing/jaeger:2.7.0
40+
//! ```
41+
//!
42+
//! After starting Jaeger, you can view traces at http://localhost:16686
43+
//!
44+
//! ## Running the Example
45+
//!
46+
//! ```bash
47+
//! cargo run --example distributed_otlp
48+
//! ```
49+
//!
50+
//! This example executes all 22 TPCH benchmark queries with distributed query execution enabled.
51+
52+
use std::time::Duration;
53+
54+
use datafusion::{common::internal_datafusion_err, error::Result};
55+
use integration_utils::{init_session, run_traced_query};
56+
use opentelemetry::{KeyValue, trace::TracerProvider};
57+
use opentelemetry_otlp::WithExportConfig;
58+
use opentelemetry_sdk::{Resource, trace::Sampler};
59+
use tracing::{Instrument, Level};
60+
use tracing_subscriber::{Registry, fmt, prelude::*};
61+
62+
#[tokio::main]
63+
async fn main() -> Result<()> {
64+
// Initialize tracing infrastructure and obtain a tracer provider.
65+
let tracer_provider = init_tracing()?;
66+
67+
// Run the example under the root span.
68+
run_distributed_otlp_example().await?;
69+
70+
// Properly shutdown tracing to ensure all data is flushed.
71+
tracer_provider
72+
.shutdown()
73+
.map_err(|e| internal_datafusion_err!("Tracer shutdown error: {}", e))
74+
}
75+
76+
async fn run_distributed_otlp_example() -> Result<()> {
77+
// Loop over all 22 TPCH queries
78+
for i in 1..=22 {
79+
let query_name = format!("tpch/q{}", i);
80+
81+
// Create a new root span for each query to ensure independent traces.
82+
// This span will be the root of a new trace tree.
83+
let span = tracing::info_span!("tpch_query", query = %query_name, query_num = i);
84+
85+
// Execute the query within the new root span context.
86+
async {
87+
tracing::info!("Running TPCH query: {}", query_name);
88+
89+
// Initialize a distinct DataFusion session context for each query.
90+
let ctx = init_session(false, true, 5, true, true).await?;
91+
92+
// Run the SQL query with tracing enabled.
93+
run_traced_query(&ctx, &query_name).await
94+
}
95+
.instrument(span)
96+
.await?;
97+
}
98+
99+
Ok(())
100+
}
101+
102+
/// Initializes OpenTelemetry and tracing infrastructure to enable tracing of query execution.
103+
fn init_tracing() -> Result<opentelemetry_sdk::trace::SdkTracerProvider> {
104+
// Set service metadata for tracing.
105+
let resource = Resource::builder()
106+
.with_attribute(KeyValue::new("service.name", "datafusion-tracing"))
107+
.build();
108+
109+
// Configure an OTLP exporter to send tracing data.
110+
let exporter = opentelemetry_otlp::SpanExporter::builder()
111+
.with_tonic()
112+
.with_endpoint("http://localhost:4317") // Endpoint for OTLP collector.
113+
.with_timeout(Duration::from_secs(10))
114+
.build()
115+
.map_err(|e| internal_datafusion_err!("OTLP exporter error: {}", e))?;
116+
117+
// Create a tracer provider configured with the exporter and sampling strategy.
118+
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
119+
.with_batch_exporter(exporter)
120+
.with_resource(resource)
121+
.with_sampler(Sampler::AlwaysOn)
122+
.build();
123+
124+
// Obtain a tracer instance for recording tracing information.
125+
let tracer = tracer_provider.tracer("datafusion-tracing-query");
126+
127+
// Create a telemetry layer using the tracer to collect and filter tracing data at INFO level.
128+
let telemetry_layer = tracing_opentelemetry::layer()
129+
.with_tracer(tracer)
130+
.with_filter(tracing::level_filters::LevelFilter::INFO);
131+
132+
// Create a formatting layer to output logs to stdout, including thread IDs and names.
133+
let fmt_layer = fmt::layer()
134+
.with_thread_ids(true)
135+
.with_thread_names(true)
136+
.with_writer(std::io::stdout.with_max_level(Level::INFO));
137+
138+
// Combine the telemetry and formatting layers into a tracing subscriber and initialize it.
139+
Registry::default()
140+
.with(telemetry_layer)
141+
.with(fmt_layer)
142+
.init();
143+
144+
// Return the configured tracer provider
145+
Ok(tracer_provider)
146+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
select
2+
l_returnflag,
3+
l_linestatus,
4+
sum(l_quantity) as sum_qty,
5+
sum(l_extendedprice) as sum_base_price,
6+
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
7+
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
8+
avg(l_quantity) as avg_qty,
9+
avg(l_extendedprice) as avg_price,
10+
avg(l_discount) as avg_disc,
11+
count(*) as count_order
12+
from
13+
lineitem
14+
where
15+
l_shipdate <= date '1998-09-02'
16+
group by
17+
l_returnflag,
18+
l_linestatus
19+
order by
20+
l_returnflag,
21+
l_linestatus;
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
select
2+
c_custkey,
3+
c_name,
4+
sum(l_extendedprice * (1 - l_discount)) as revenue,
5+
c_acctbal,
6+
n_name,
7+
c_address,
8+
c_phone,
9+
c_comment
10+
from
11+
customer,
12+
orders,
13+
lineitem,
14+
nation
15+
where
16+
c_custkey = o_custkey
17+
and l_orderkey = o_orderkey
18+
and o_orderdate >= date '1993-10-01'
19+
and o_orderdate < date '1994-01-01'
20+
and l_returnflag = 'R'
21+
and c_nationkey = n_nationkey
22+
group by
23+
c_custkey,
24+
c_name,
25+
c_acctbal,
26+
c_phone,
27+
n_name,
28+
c_address,
29+
c_comment
30+
order by
31+
revenue desc;
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
select
2+
ps_partkey,
3+
sum(ps_supplycost * ps_availqty) as value
4+
from
5+
partsupp,
6+
supplier,
7+
nation
8+
where
9+
ps_suppkey = s_suppkey
10+
and s_nationkey = n_nationkey
11+
and n_name = 'GERMANY'
12+
group by
13+
ps_partkey having
14+
sum(ps_supplycost * ps_availqty) > (
15+
select
16+
sum(ps_supplycost * ps_availqty) * 0.0001
17+
from
18+
partsupp,
19+
supplier,
20+
nation
21+
where
22+
ps_suppkey = s_suppkey
23+
and s_nationkey = n_nationkey
24+
and n_name = 'GERMANY'
25+
)
26+
order by
27+
value desc;
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
select
2+
l_shipmode,
3+
sum(case
4+
when o_orderpriority = '1-URGENT'
5+
or o_orderpriority = '2-HIGH'
6+
then 1
7+
else 0
8+
end) as high_line_count,
9+
sum(case
10+
when o_orderpriority <> '1-URGENT'
11+
and o_orderpriority <> '2-HIGH'
12+
then 1
13+
else 0
14+
end) as low_line_count
15+
from
16+
lineitem
17+
join
18+
orders
19+
on
20+
l_orderkey = o_orderkey
21+
where
22+
l_shipmode in ('MAIL', 'SHIP')
23+
and l_commitdate < l_receiptdate
24+
and l_shipdate < l_commitdate
25+
and l_receiptdate >= date '1994-01-01'
26+
and l_receiptdate < date '1995-01-01'
27+
group by
28+
l_shipmode
29+
order by
30+
l_shipmode;
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
select
2+
c_count,
3+
count(*) as custdist
4+
from
5+
(
6+
select
7+
c_custkey,
8+
count(o_orderkey)
9+
from
10+
customer left outer join orders on
11+
c_custkey = o_custkey
12+
and o_comment not like '%special%requests%'
13+
group by
14+
c_custkey
15+
) as c_orders (c_custkey, c_count)
16+
group by
17+
c_count
18+
order by
19+
custdist desc,
20+
c_count desc;
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
select
2+
100.00 * sum(case
3+
when p_type like 'PROMO%'
4+
then l_extendedprice * (1 - l_discount)
5+
else 0
6+
end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
7+
from
8+
lineitem,
9+
part
10+
where
11+
l_partkey = p_partkey
12+
and l_shipdate >= date '1995-09-01'
13+
and l_shipdate < date '1995-10-01';
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
with revenue0 (supplier_no, total_revenue) as (
2+
select
3+
l_suppkey,
4+
sum(l_extendedprice * (1 - l_discount))
5+
from
6+
lineitem
7+
where
8+
l_shipdate >= date '1996-01-01'
9+
and l_shipdate < date '1996-01-01' + interval '3' month
10+
group by
11+
l_suppkey
12+
)
13+
select
14+
s_suppkey,
15+
s_name,
16+
s_address,
17+
s_phone,
18+
total_revenue
19+
from
20+
supplier,
21+
revenue0
22+
where
23+
s_suppkey = supplier_no
24+
and total_revenue = (
25+
select
26+
max(total_revenue)
27+
from
28+
revenue0
29+
)
30+
order by
31+
s_suppkey;

0 commit comments

Comments
 (0)