Description
Is your feature request related to a problem or challenge?
I've had to write a couple of `ExecutionPlanVisitor`s recently (see below). When I started, I looked for documentation on this but couldn't find any. I think newcomers would benefit from seeing a couple of examples of `ExecutionPlanVisitor` in the docs.
Describe the solution you'd like
A section in the docs with some example implementations of `ExecutionPlanVisitor`.
Describe alternatives you've considered
No response
Additional context
These are the `ExecutionPlanVisitor`s I wrote. I would be happy to write docs based on them.
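```rust
use std::sync::Arc;

// Import paths vary between DataFusion versions.
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};

/// Captures the `FileScanConfig` of a `ParquetExec` in the plan.
/// If the plan contains several `ParquetExec` nodes, the last one visited wins.
struct FileScanVisitor {
    file_scan_config: Option<FileScanConfig>,
}

impl ExecutionPlanVisitor for FileScanVisitor {
    type Error = anyhow::Error;

    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
        // Downcast the trait object to check whether this node is a Parquet scan.
        if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
            self.file_scan_config = Some(parquet_exec.base_config().clone());
        }
        // Keep visiting the rest of the plan.
        Ok(true)
    }
}

fn get_file_scan_config(plan: Arc<dyn ExecutionPlan>) -> Option<FileScanConfig> {
    let mut visitor = FileScanVisitor {
        file_scan_config: None,
    };
    visit_execution_plan(plan.as_ref(), &mut visitor).unwrap();
    visitor.file_scan_config
}
```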
```rust
use std::fmt::Write;

// Import paths vary between DataFusion versions.
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanVisitor};

/// Prints the `bytes_scanned` metric of every `ParquetExec` in the plan.
#[derive(Debug)]
struct ParquetVisitor;

impl ExecutionPlanVisitor for ParquetVisitor {
    type Error = DataFusionError;

    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
        // Get the one-line representation of the ExecutionPlan, something like this:
        // ParquetExec: file_groups=[...], ...
        let mut buf = String::new();
        write!(&mut buf, "{}", displayable(plan).one_line()).map_err(|e| {
            DataFusionError::Internal(format!("Error while collecting metrics: {e}"))
        })?;

        // Keep everything before the first colon.
        // This is a hack to extract a human-readable representation of the ExecutionPlan's type.
        // We would prefer if `ExecutionPlan` had a `name` method, but this will do,
        // since every physical operator seems to follow this convention.
        // If a node doesn't, we just skip collecting its metrics, and no harm is done.
        let plan_type = match buf.split_once(':') {
            None => {
                println!("execution plan has unexpected display format: {buf}");
                return Ok(true);
            }
            Some((name, _)) => name.to_string(),
        };

        // Only ParquetExec nodes are of interest; all other nodes are skipped.
        if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
            // `bytes_scanned` is recorded while the plan executes,
            // so visit the plan after it has run.
            if let Some(metrics) = parquet_exec.metrics() {
                let bytes_scanned = metrics.sum_by_name("bytes_scanned");
                println!("{plan_type} bytes scanned: {bytes_scanned:?}");
            }
        }
        Ok(true)
    }
}
```
I had two examples in mind: one that gets information from Parquet scans (I could probably combine the two above into one) and one that tracks data across all nodes (maybe `output_rows`).
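A rough sketch of the second example could look like the following. To keep it dependency-free it does not use DataFusion at all. `PlanNode`, `PlanVisitor`, `visit`, `OutputRowsVisitor` and `collect_output_rows` are hypothetical stand-ins that only mirror the shape of `ExecutionPlan`, `ExecutionPlanVisitor` and `visit_execution_plan`. The visitor records each node's `output_rows` in pre-order and keeps a running total. Nodes without a count are skipped, the same way `ParquetVisitor` skips nodes without metrics.

```rust
// Hypothetical stand-in for an `ExecutionPlan` node: an operator name, the number
// of rows it produced (`None` if no count was recorded), and its inputs.
struct PlanNode {
    name: String,
    output_rows: Option<usize>,
    children: Vec<PlanNode>,
}

// Hypothetical trait with the same method signatures as `ExecutionPlanVisitor`.
trait PlanVisitor {
    type Error;
    fn pre_visit(&mut self, node: &PlanNode) -> Result<bool, Self::Error>;
    fn post_visit(&mut self, _node: &PlanNode) -> Result<bool, Self::Error> {
        Ok(true)
    }
}

// Depth-first, pre-order traversal. In this sketch, a callback returning
// `Ok(false)` stops the whole traversal, and errors are propagated with `?`.
fn visit<V: PlanVisitor>(node: &PlanNode, visitor: &mut V) -> Result<bool, V::Error> {
    if !visitor.pre_visit(node)? {
        return Ok(false);
    }
    for child in &node.children {
        if !visit(child, visitor)? {
            return Ok(false);
        }
    }
    visitor.post_visit(node)
}

// Records `output_rows` for every node that has a count and keeps a running total.
#[derive(Debug, Default)]
struct OutputRowsVisitor {
    per_node: Vec<(String, usize)>,
    total_rows: usize,
}

impl PlanVisitor for OutputRowsVisitor {
    // This visitor cannot fail.
    type Error = std::convert::Infallible;

    fn pre_visit(&mut self, node: &PlanNode) -> Result<bool, Self::Error> {
        // Nodes without a row count are skipped rather than treated as errors.
        if let Some(rows) = node.output_rows {
            self.per_node.push((node.name.clone(), rows));
            self.total_rows += rows;
        }
        Ok(true)
    }
}

// Hypothetical helper, analogous to `get_file_scan_config` above.
fn collect_output_rows(plan: &PlanNode) -> OutputRowsVisitor {
    let mut visitor = OutputRowsVisitor::default();
    // The error type is `Infallible`, so this `unwrap` cannot panic.
    visit(plan, &mut visitor).unwrap();
    visitor
}
```

In a real DataFusion visitor, the per-node count would presumably come from `plan.metrics()` (an `Option<MetricsSet>`) and `MetricsSet::output_rows()`, read after the plan has executed.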