Skip to content

Commit 195b699

Browse files
authored
Add metrics for Limit and Projection, and CoalesceBatches (#1004)
* Add metrics for Limit and Projection, and CoalesceBatches * remove duplication
1 parent 22fcb3d commit 195b699

File tree

4 files changed

+142
-39
lines changed

4 files changed

+142
-39
lines changed

datafusion/src/physical_plan/coalesce_batches.rs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ use async_trait::async_trait;
3737
use futures::stream::{Stream, StreamExt};
3838
use log::debug;
3939

40-
use super::Statistics;
40+
use super::metrics::{BaselineMetrics, MetricsSet};
41+
use super::{metrics::ExecutionPlanMetricsSet, Statistics};
4142

4243
/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
4344
/// vectorized processing by upstream operators.
@@ -47,6 +48,8 @@ pub struct CoalesceBatchesExec {
4748
input: Arc<dyn ExecutionPlan>,
4849
/// Minimum number of rows for coalesces batches
4950
target_batch_size: usize,
51+
/// Execution metrics
52+
metrics: ExecutionPlanMetricsSet,
5053
}
5154

5255
impl CoalesceBatchesExec {
@@ -55,6 +58,7 @@ impl CoalesceBatchesExec {
5558
Self {
5659
input,
5760
target_batch_size,
61+
metrics: ExecutionPlanMetricsSet::new(),
5862
}
5963
}
6064

@@ -115,6 +119,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
115119
buffer: Vec::new(),
116120
buffered_rows: 0,
117121
is_closed: false,
122+
baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
118123
}))
119124
}
120125

@@ -134,6 +139,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
134139
}
135140
}
136141

142+
fn metrics(&self) -> Option<MetricsSet> {
143+
Some(self.metrics.clone_inner())
144+
}
145+
137146
fn statistics(&self) -> Statistics {
138147
self.input.statistics()
139148
}
@@ -152,6 +161,8 @@ struct CoalesceBatchesStream {
152161
buffered_rows: usize,
153162
/// Whether the stream has finished returning all of its data or not
154163
is_closed: bool,
164+
/// Execution metrics
165+
baseline_metrics: BaselineMetrics,
155166
}
156167

157168
impl Stream for CoalesceBatchesStream {
@@ -161,6 +172,26 @@ impl Stream for CoalesceBatchesStream {
161172
mut self: Pin<&mut Self>,
162173
cx: &mut Context<'_>,
163174
) -> Poll<Option<Self::Item>> {
175+
let poll = self.poll_next_inner(cx);
176+
self.baseline_metrics.record_poll(poll)
177+
}
178+
179+
fn size_hint(&self) -> (usize, Option<usize>) {
180+
// we can't predict the size of incoming batches so re-use the size hint from the input
181+
self.input.size_hint()
182+
}
183+
}
184+
185+
impl CoalesceBatchesStream {
186+
fn poll_next_inner(
187+
self: &mut Pin<&mut Self>,
188+
cx: &mut Context<'_>,
189+
) -> Poll<Option<ArrowResult<RecordBatch>>> {
190+
// Get a clone (uses same underlying atomic) as self gets borrowed below
191+
let cloned_time = self.baseline_metrics.elapsed_compute().clone();
192+
// records time on drop
193+
let _timer = cloned_time.timer();
194+
164195
if self.is_closed {
165196
return Poll::Ready(None);
166197
}
@@ -221,11 +252,6 @@ impl Stream for CoalesceBatchesStream {
221252
}
222253
}
223254
}
224-
225-
fn size_hint(&self) -> (usize, Option<usize>) {
226-
// we can't predict the size of incoming batches so re-use the size hint from the input
227-
self.input.size_hint()
228-
}
229255
}
230256

231257
impl RecordBatchStream for CoalesceBatchesStream {

datafusion/src/physical_plan/limit.rs

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ use arrow::datatypes::SchemaRef;
3535
use arrow::error::Result as ArrowResult;
3636
use arrow::record_batch::RecordBatch;
3737

38-
use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
38+
use super::{
39+
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
40+
RecordBatchStream, SendableRecordBatchStream, Statistics,
41+
};
3942

4043
use async_trait::async_trait;
4144

@@ -46,12 +49,18 @@ pub struct GlobalLimitExec {
4649
input: Arc<dyn ExecutionPlan>,
4750
/// Maximum number of rows to return
4851
limit: usize,
52+
/// Execution metrics
53+
metrics: ExecutionPlanMetricsSet,
4954
}
5055

5156
impl GlobalLimitExec {
5257
/// Create a new GlobalLimitExec
5358
pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
54-
GlobalLimitExec { input, limit }
59+
GlobalLimitExec {
60+
input,
61+
limit,
62+
metrics: ExecutionPlanMetricsSet::new(),
63+
}
5564
}
5665

5766
/// Input execution plan
@@ -120,8 +129,13 @@ impl ExecutionPlan for GlobalLimitExec {
120129
));
121130
}
122131

132+
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
123133
let stream = self.input.execute(0).await?;
124-
Ok(Box::pin(LimitStream::new(stream, self.limit)))
134+
Ok(Box::pin(LimitStream::new(
135+
stream,
136+
self.limit,
137+
baseline_metrics,
138+
)))
125139
}
126140

127141
fn fmt_as(
@@ -136,6 +150,10 @@ impl ExecutionPlan for GlobalLimitExec {
136150
}
137151
}
138152

153+
fn metrics(&self) -> Option<MetricsSet> {
154+
Some(self.metrics.clone_inner())
155+
}
156+
139157
fn statistics(&self) -> Statistics {
140158
let input_stats = self.input.statistics();
141159
match input_stats {
@@ -165,12 +183,18 @@ pub struct LocalLimitExec {
165183
input: Arc<dyn ExecutionPlan>,
166184
/// Maximum number of rows to return
167185
limit: usize,
186+
/// Execution metrics
187+
metrics: ExecutionPlanMetricsSet,
168188
}
169189

170190
impl LocalLimitExec {
171191
/// Create a new LocalLimitExec partition
172192
pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
173-
Self { input, limit }
193+
Self {
194+
input,
195+
limit,
196+
metrics: ExecutionPlanMetricsSet::new(),
197+
}
174198
}
175199

176200
/// Input execution plan
@@ -219,8 +243,13 @@ impl ExecutionPlan for LocalLimitExec {
219243
}
220244

221245
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
246+
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
222247
let stream = self.input.execute(partition).await?;
223-
Ok(Box::pin(LimitStream::new(stream, self.limit)))
248+
Ok(Box::pin(LimitStream::new(
249+
stream,
250+
self.limit,
251+
baseline_metrics,
252+
)))
224253
}
225254

226255
fn fmt_as(
@@ -235,6 +264,10 @@ impl ExecutionPlan for LocalLimitExec {
235264
}
236265
}
237266

267+
fn metrics(&self) -> Option<MetricsSet> {
268+
Some(self.metrics.clone_inner())
269+
}
270+
238271
fn statistics(&self) -> Statistics {
239272
let input_stats = self.input.statistics();
240273
match input_stats {
@@ -280,20 +313,29 @@ struct LimitStream {
280313
schema: SchemaRef,
281314
// the current number of rows which have been produced
282315
current_len: usize,
316+
/// Execution time metrics
317+
baseline_metrics: BaselineMetrics,
283318
}
284319

285320
impl LimitStream {
286-
fn new(input: SendableRecordBatchStream, limit: usize) -> Self {
321+
fn new(
322+
input: SendableRecordBatchStream,
323+
limit: usize,
324+
baseline_metrics: BaselineMetrics,
325+
) -> Self {
287326
let schema = input.schema();
288327
Self {
289328
limit,
290329
input: Some(input),
291330
schema,
292331
current_len: 0,
332+
baseline_metrics,
293333
}
294334
}
295335

296336
fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
337+
// records time on drop
338+
let _timer = self.baseline_metrics.elapsed_compute().timer();
297339
if self.current_len == self.limit {
298340
self.input = None; // clear input so it can be dropped early
299341
None
@@ -316,14 +358,16 @@ impl Stream for LimitStream {
316358
mut self: Pin<&mut Self>,
317359
cx: &mut Context<'_>,
318360
) -> Poll<Option<Self::Item>> {
319-
match &mut self.input {
361+
let poll = match &mut self.input {
320362
Some(input) => input.poll_next_unpin(cx).map(|x| match x {
321363
Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(),
322364
other => other,
323365
}),
324366
// input has been cleared
325367
None => Poll::Ready(None),
326-
}
368+
};
369+
370+
self.baseline_metrics.record_poll(poll)
327371
}
328372
}
329373

@@ -394,7 +438,8 @@ mod tests {
394438

395439
// limit of six needs to consume the entire first record batch
396440
// (5 rows) and 1 row from the second (1 row)
397-
let limit_stream = LimitStream::new(Box::pin(input), 6);
441+
let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
442+
let limit_stream = LimitStream::new(Box::pin(input), 6, baseline_metrics);
398443
assert_eq!(index.value(), 0);
399444

400445
let results = collect(Box::pin(limit_stream)).await.unwrap();

datafusion/src/physical_plan/projection.rs

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use arrow::error::Result as ArrowResult;
3434
use arrow::record_batch::RecordBatch;
3535

3636
use super::expressions::Column;
37+
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
3738
use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
3839
use async_trait::async_trait;
3940

@@ -49,6 +50,8 @@ pub struct ProjectionExec {
4950
schema: SchemaRef,
5051
/// The input plan
5152
input: Arc<dyn ExecutionPlan>,
53+
/// Execution metrics
54+
metrics: ExecutionPlanMetricsSet,
5255
}
5356

5457
impl ProjectionExec {
@@ -76,6 +79,7 @@ impl ProjectionExec {
7679
expr,
7780
schema,
7881
input: input.clone(),
82+
metrics: ExecutionPlanMetricsSet::new(),
7983
})
8084
}
8185

@@ -131,6 +135,7 @@ impl ExecutionPlan for ProjectionExec {
131135
schema: self.schema.clone(),
132136
expr: self.expr.iter().map(|x| x.0.clone()).collect(),
133137
input: self.input.execute(partition).await?,
138+
baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
134139
}))
135140
}
136141

@@ -159,6 +164,10 @@ impl ExecutionPlan for ProjectionExec {
159164
}
160165
}
161166

167+
fn metrics(&self) -> Option<MetricsSet> {
168+
Some(self.metrics.clone_inner())
169+
}
170+
162171
fn statistics(&self) -> Statistics {
163172
stats_projection(
164173
self.input.statistics(),
@@ -194,27 +203,28 @@ fn stats_projection(
194203
}
195204
}
196205

197-
fn batch_project(
198-
batch: &RecordBatch,
199-
expressions: &[Arc<dyn PhysicalExpr>],
200-
schema: &SchemaRef,
201-
) -> ArrowResult<RecordBatch> {
202-
expressions
203-
.iter()
204-
.map(|expr| expr.evaluate(batch))
205-
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
206-
.collect::<Result<Vec<_>>>()
207-
.map_or_else(
208-
|e| Err(DataFusionError::into_arrow_external_error(e)),
209-
|arrays| RecordBatch::try_new(schema.clone(), arrays),
210-
)
206+
impl ProjectionStream {
207+
fn batch_project(&self, batch: &RecordBatch) -> ArrowResult<RecordBatch> {
208+
// records time on drop
209+
let _timer = self.baseline_metrics.elapsed_compute().timer();
210+
self.expr
211+
.iter()
212+
.map(|expr| expr.evaluate(batch))
213+
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
214+
.collect::<Result<Vec<_>>>()
215+
.map_or_else(
216+
|e| Err(DataFusionError::into_arrow_external_error(e)),
217+
|arrays| RecordBatch::try_new(self.schema.clone(), arrays),
218+
)
219+
}
211220
}
212221

213222
/// Projection iterator
214223
struct ProjectionStream {
215224
schema: SchemaRef,
216225
expr: Vec<Arc<dyn PhysicalExpr>>,
217226
input: SendableRecordBatchStream,
227+
baseline_metrics: BaselineMetrics,
218228
}
219229

220230
impl Stream for ProjectionStream {
@@ -224,10 +234,12 @@ impl Stream for ProjectionStream {
224234
mut self: Pin<&mut Self>,
225235
cx: &mut Context<'_>,
226236
) -> Poll<Option<Self::Item>> {
227-
self.input.poll_next_unpin(cx).map(|x| match x {
228-
Some(Ok(batch)) => Some(batch_project(&batch, &self.expr, &self.schema)),
237+
let poll = self.input.poll_next_unpin(cx).map(|x| match x {
238+
Some(Ok(batch)) => Some(self.batch_project(&batch)),
229239
other => other,
230-
})
240+
});
241+
242+
self.baseline_metrics.record_poll(poll)
231243
}
232244

233245
fn size_hint(&self) -> (usize, Option<usize>) {

0 commit comments

Comments
 (0)