Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ clap = { version = "4.6.0", features = ["derive", "env"] }
criterion = { workspace = true, features = ["html_reports"] }
datafusion = { workspace = true, default-features = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-common-runtime = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
libmimalloc-sys = { version = "0.1", optional = true }
Expand Down
259 changes: 20 additions & 239 deletions benchmarks/benches/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,13 @@
//! Cargo, for example: `BENCH_NAME=tpch cargo bench --bench sql`.

use clap::Parser;
use criterion::{Criterion, SamplingMode, criterion_group, criterion_main};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_benchmarks::sql_benchmark::SqlBenchmark;
use datafusion_benchmarks::util::{CommonOpt, print_memory_stats};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion_benchmarks::sql_benchmark_runner::{
BenchmarkFilter, SqlRunConfig, default_sql_benchmark_directory,
run_criterion_benchmarks_impl,
};
use datafusion_benchmarks::util::CommonOpt;
use datafusion_common::instant::Instant;
use log::{debug, info};
use std::collections::BTreeMap;
use std::fs;
use std::sync::LazyLock;
use tokio::runtime::Runtime;

/// Absolute location of the `sql_benchmarks` directory, built once on first
/// access from the crate's manifest directory and the platform path separator.
static SQL_BENCHMARK_DIRECTORY: LazyLock<String> = LazyLock::new(|| {
    let mut directory = String::from(env!("CARGO_MANIFEST_DIR"));
    directory.push(std::path::MAIN_SEPARATOR);
    directory.push_str("sql_benchmarks");
    directory
});

#[cfg(feature = "snmalloc")]
#[global_allocator]
Expand Down Expand Up @@ -90,234 +77,28 @@ pub fn sql(c: &mut Criterion) {

let start = Instant::now();
let args = EnvParser::parse();
let rt = make_tokio_runtime();
let config = SqlRunConfig {
common: args.options,
filter: BenchmarkFilter {
name: args.name,
subgroup: args.subgroup,
query: args.query,
},
persist_results: args.persist_results,
validate_results: args.validate,
output: None,
};

println!("Loading benchmarks...");

let benchmarks = rt.block_on(async {
let ctx = make_ctx(&args).expect("SessionContext creation failed");

load_benchmarks(&args, &ctx, &SQL_BENCHMARK_DIRECTORY)
.await
.unwrap_or_else(|err| panic!("failed load benchmarks: {err:?}"))
});
run_criterion_benchmarks_impl(&default_sql_benchmark_directory(), &config, c)
.unwrap_or_else(|err| panic!("failed to run SQL benchmarks: {err:?}"));

println!(
"Loaded benchmarks in {} ms ...",
"Completed benchmarks in {} ms ...",
start.elapsed().as_millis()
);

for (group, benchmarks) in benchmarks {
let mut group = c.benchmark_group(group);
group.sample_size(10);
group.sampling_mode(SamplingMode::Flat);

for mut benchmark in benchmarks {
// create a context
let ctx = make_ctx(&args).expect("SessionContext creation failed");

// initialize the benchmark. This parses the benchmark file and does any pre-execution
// work such as loading data into tables
rt.block_on(async {
benchmark
.initialize(&ctx)
.await
.expect("initialization failed");

// run assertions
benchmark.assert(&ctx).await.expect("assertion failed");
});

let mut name = benchmark.name().to_string();
if !benchmark.subgroup().is_empty() {
name.push('_');
name.push_str(benchmark.subgroup());
}

if args.persist_results {
handle_persist(&rt, &ctx, &name, &mut benchmark);
} else if args.validate {
handle_verify(&rt, &ctx, &name, &mut benchmark);
} else {
info!("Running benchmark {name} ...");

let name = name.clone();
group.bench_function(name.clone(), |b| {
b.iter(|| handle_run(&rt, &ctx, &args, &mut benchmark, &name))
});

print_memory_stats();

info!("Benchmark {name} completed");
}

// run cleanup
rt.block_on(async {
benchmark.cleanup(&ctx).await.expect("Cleanup failed");
});
}

group.finish();
}
}

/// Drives a single execution of `benchmark` to completion on `rt`.
///
/// The `validate` flag from `args` is forwarded to the benchmark's `run`.
///
/// # Panics
///
/// Panics with the benchmark `name` and the error if the run fails.
fn handle_run(
    rt: &Runtime,
    ctx: &SessionContext,
    args: &EnvParser,
    benchmark: &mut SqlBenchmark,
    name: &str,
) {
    let outcome = rt.block_on(async { benchmark.run(ctx, args.validate).await });

    if let Err(err) = outcome {
        panic!("Failed to run benchmark {name}: {err:?}");
    }
}

/// Persists the results of `benchmark` by blocking on its `persist` call,
/// logging progress before, during and after.
///
/// # Panics
///
/// Panics if persisting the results fails.
fn handle_persist(
    rt: &Runtime,
    ctx: &SessionContext,
    name: &str,
    benchmark: &mut SqlBenchmark,
) {
    info!("Running benchmark {name} prior to persisting results ...");

    let persisted = rt.block_on(async {
        info!("Persisting benchmark {name} ...");

        benchmark.persist(ctx).await
    });
    persisted.expect("Failed to persist results");

    info!("Persisted benchmark {name} successfully");
}

/// Runs `benchmark` with its second `run` argument set to `true` and then
/// calls its `verify` step, logging before and after.
///
/// # Panics
///
/// Panics if either the run or the verification step returns an error.
fn handle_verify(
    rt: &Runtime,
    ctx: &SessionContext,
    name: &str,
    benchmark: &mut SqlBenchmark,
) {
    info!("Verifying benchmark {name} results ...");

    rt.block_on(async {
        // The run must succeed before verification is attempted.
        if let Err(err) = benchmark.run(ctx, true).await {
            panic!("Failed to run benchmark {name}: {err:?}");
        }

        if let Err(err) = benchmark.verify(ctx).await {
            panic!("Verification failed: {err:?}");
        }
    });

    info!("Verified benchmark {name} results successfully");
}

// Register `sql` as the sole target of the `benches` Criterion group, then
// generate the bench binary's entry point from that group.
criterion_group!(benches, sql);
criterion_main!(benches);

/// Builds the multi-threaded Tokio runtime, with all drivers enabled, that
/// the benchmark functions block on.
///
/// # Panics
///
/// Panics if the runtime cannot be built.
fn make_tokio_runtime() -> Runtime {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.enable_all();
    builder.build().unwrap()
}

/// Creates a fresh `SessionContext` from the session configuration and
/// runtime produced by the common options in `args`.
///
/// # Errors
///
/// Returns the error from building either the configuration or the runtime.
fn make_ctx(args: &EnvParser) -> Result<SessionContext> {
    let options = &args.options;
    let session_config = options.config()?;
    let runtime_env = options.build_runtime()?;

    Ok(SessionContext::new_with_config_rt(
        session_config,
        runtime_env,
    ))
}

/// Visits every file beneath `path`, depth first, handing each file's full
/// path to `callback`.
///
/// Entries of each directory are processed in sorted path order, so the
/// traversal is deterministic. Entries that cannot be read are skipped.
///
/// # Panics
///
/// Panics if `path`, or any directory below it, cannot be opened for reading.
pub fn list_files<F>(path: &str, callback: &mut F)
where
    F: FnMut(&str),
{
    let mut children: Vec<_> = fs::read_dir(path)
        .unwrap()
        .filter_map(Result::ok)
        .map(|entry| entry.path())
        .collect();
    children.sort();

    for child in &children {
        let child_str = child.to_string_lossy();
        if child.is_dir() {
            // Descend into the sub-directory before moving to the next sibling
            list_files(&child_str, callback);
        } else {
            // Plain file: report its full path
            callback(&child_str);
        }
    }
}

/// Discovers every `.benchmark` file beneath `path`, builds a `SqlBenchmark`
/// from each one and buckets the results by benchmark group.
///
/// The grouped benchmarks are then narrowed with `filter_benchmarks` using
/// `args`, and every remaining group is sorted by benchmark name.
///
/// # Errors
///
/// Returns the first error raised while constructing a `SqlBenchmark`.
async fn load_benchmarks(
    args: &EnvParser,
    ctx: &SessionContext,
    path: &str,
) -> Result<BTreeMap<String, Vec<SqlBenchmark>>> {
    // Collect candidate files first; construction below is async and fallible.
    let mut paths: Vec<String> = Vec::new();
    list_files(path, &mut |candidate: &str| {
        if candidate.ends_with(".benchmark") {
            paths.push(candidate.to_string());
        }
    });

    let mut grouped: BTreeMap<String, Vec<SqlBenchmark>> = BTreeMap::new();
    for path in paths {
        debug!("Loading benchmark from {path}");

        let benchmark = SqlBenchmark::new(ctx, &path, &*SQL_BENCHMARK_DIRECTORY).await?;
        let group_name = benchmark.group().to_string();
        grouped.entry(group_name).or_default().push(benchmark);
    }

    let mut selected = filter_benchmarks(args, grouped);
    for members in selected.values_mut() {
        members.sort_by(|lhs, rhs| lhs.name().cmp(rhs.name()));
    }

    Ok(selected)
}

/// Narrows `benchmarks` to what was requested in `args`.
///
/// When no benchmark name is given the map is returned untouched (subgroup
/// and query filters are only applied together with a name). Otherwise only
/// the group whose key matches the name case-insensitively is kept, and its
/// benchmarks are further restricted by subgroup and by query when present.
fn filter_benchmarks(
    args: &EnvParser,
    benchmarks: BTreeMap<String, Vec<SqlBenchmark>>,
) -> BTreeMap<String, Vec<SqlBenchmark>> {
    let Some(bench_name) = &args.name else {
        return benchmarks;
    };

    // Accept `1`, `01`, `6a`, `Q06a`, ... case-insensitively.
    // Bench names are canonical, e.g. `Q01`, `Q06a`.
    let wanted_query = args.query.as_ref().map(|query| {
        let stripped = query.trim_start_matches(['Q', 'q']);
        let digits_end = stripped
            .find(|c: char| !c.is_ascii_digit())
            .unwrap_or(stripped.len());
        let (num, suffix) = stripped.split_at(digits_end);
        format!("Q{num:0>2}{suffix}")
    });

    let mut selected = BTreeMap::new();
    for (group, mut members) in benchmarks {
        if !group.eq_ignore_ascii_case(bench_name) {
            continue;
        }
        if let Some(subgroup) = &args.subgroup {
            members.retain(|bench| bench.subgroup().eq_ignore_ascii_case(subgroup));
        }
        if let Some(normalized) = &wanted_query {
            members.retain(|bench| bench.name().eq_ignore_ascii_case(normalized));
        }
        selected.insert(group, members);
    }
    selected
}
39 changes: 39 additions & 0 deletions benchmarks/src/bin/benchmark_runner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! DataFusion SQL benchmark runner.

use datafusion_benchmarks::sql_benchmark_runner;

#[cfg(feature = "snmalloc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

// `cargo clippy --all-features` enables both allocator features, so prefer
// `snmalloc` in that case and fall back to `mimalloc` otherwise.
#[cfg(all(not(feature = "snmalloc"), feature = "mimalloc"))]
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

#[tokio::main]
async fn main() {
env_logger::init();
if let Err(error) = sql_benchmark_runner::run_cli().await {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit -- it might make sense to move run_cli and the related code (argument processing, main, etc.) into the actual binary rather than keeping it as a function in the crate.

I don't think the argument processing will be used by anything other than this binary

eprintln!("Error: {error}");
std::process::exit(1);
}
}
1 change: 1 addition & 0 deletions benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod smj;
pub mod sort_pushdown;
pub mod sort_tpch;
pub mod sql_benchmark;
pub mod sql_benchmark_runner;
pub mod tpcds;
pub mod tpch;
pub mod util;
Loading
Loading