Skip to content

Commit 8e4572d

Browse files
authored
Merge branch 'main' into dev
2 parents a039419 + 555fc2e commit 8e4572d

File tree

319 files changed

+15417
-8377
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

319 files changed

+15417
-8377
lines changed

.asf.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ github:
5050
main:
5151
required_pull_request_reviews:
5252
required_approving_review_count: 1
53+
pull_requests:
54+
# enable updating head branches of pull requests
55+
allow_update_branch: true
5356

5457
# publishes the content of the `asf-site` branch to
5558
# https://datafusion.apache.org/

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ members = [
4848
"datafusion/proto-common",
4949
"datafusion/proto-common/gen",
5050
"datafusion/session",
51+
"datafusion/spark",
5152
"datafusion/sql",
5253
"datafusion/sqllogictest",
5354
"datafusion/substrait",
@@ -138,6 +139,7 @@ datafusion-physical-plan = { path = "datafusion/physical-plan", version = "47.0.
138139
datafusion-proto = { path = "datafusion/proto", version = "47.0.0" }
139140
datafusion-proto-common = { path = "datafusion/proto-common", version = "47.0.0" }
140141
datafusion-session = { path = "datafusion/session", version = "47.0.0" }
142+
datafusion-spark = { path = "datafusion/spark", version = "47.0.0" }
141143
datafusion-sql = { path = "datafusion/sql", version = "47.0.0" }
142144
doc-comment = "0.3"
143145
env_logger = "0.11"

NOTICE.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Apache DataFusion
2-
Copyright 2019-2024 The Apache Software Foundation
2+
Copyright 2019-2025 The Apache Software Foundation
33

44
This product includes software developed at
5-
The Apache Software Foundation (http://www.apache.org/).
5+
The Apache Software Foundation (http://www.apache.org/).

benchmarks/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ snmalloc-rs = { version = "0.3", optional = true }
5151
structopt = { version = "0.3", default-features = false }
5252
test-utils = { path = "../test-utils/", version = "0.1.0" }
5353
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
54-
tokio-util = { version = "0.7.14" }
54+
tokio-util = { version = "0.7.15" }
5555

5656
[dev-dependencies]
5757
datafusion-proto = { workspace = true }

benchmarks/queries/clickbench/README.md

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ Results look like
112112
Note this query is somewhat synthetic as "WatchID" is almost unique (there are a few duplicates)
113113

114114
```sql
115-
SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, APPROX_PERCENTILE_CONT("ResponseStartTiming", 0.95) tp95, MAX("ResponseStartTiming") tmax
115+
SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, APPROX_PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY "ResponseStartTiming") tp95, MAX("ResponseStartTiming") tmax
116116
FROM 'hits.parquet'
117117
WHERE "JavaEnable" = 0 -- filters to 32M of 100M rows
118118
GROUP BY "ClientIP", "WatchID"
@@ -132,6 +132,7 @@ Results look like
132132
```
133133

134134
### Q6: How many social shares meet complex multi-stage filtering criteria?
135+
135136
**Question**: What is the count of sharing actions from iPhone mobile users on specific social networks, within common timezones, participating in seasonal campaigns, with high screen resolutions and closely matched UTM parameters?
136137
**Important Query Properties**: Simple filter with high-selectivity, Costly string matching, A large number of filters with high overhead are positioned relatively later in the process
137138

@@ -159,6 +160,37 @@ WHERE
159160
```
160161
Result is empty, since it has already been filtered by `"SocialAction" = 'share'`.
161162

163+
### Q7: Device Resolution and Refresh Behavior Analysis
164+
165+
**Question**: For the 10 largest WatchIDs (in descending WatchID order), what are the minimum and maximum "ResolutionWidth" and the total refresh count (sum of "IsRefresh")?
166+
167+
**Important Query Properties**: Primitive aggregation functions, group by single primitive column, high cardinality grouping
168+
169+
```sql
170+
SELECT "WatchID", MIN("ResolutionWidth") as wmin, MAX("ResolutionWidth") as wmax, SUM("IsRefresh") as srefresh
171+
FROM hits
172+
GROUP BY "WatchID"
173+
ORDER BY "WatchID" DESC
174+
LIMIT 10;
175+
```
176+
177+
Results look like
178+
```
179+
+---------------------+------+------+----------+
180+
| WatchID | wmin | wmax | srefresh |
181+
+---------------------+------+------+----------+
182+
| 9223372033328793741 | 1368 | 1368 | 0 |
183+
| 9223371941779979288 | 1479 | 1479 | 0 |
184+
| 9223371906781104763 | 1638 | 1638 | 0 |
185+
| 9223371803397398692 | 1990 | 1990 | 0 |
186+
| 9223371799215233959 | 1638 | 1638 | 0 |
187+
| 9223371785975219972 | 0 | 0 | 0 |
188+
| 9223371776706839366 | 1368 | 1368 | 0 |
189+
| 9223371740707848038 | 1750 | 1750 | 0 |
190+
| 9223371715190479830 | 1368 | 1368 | 0 |
191+
| 9223371620124912624 | 1828 | 1828 | 0 |
192+
+---------------------+------+------+----------+
193+
```
162194

163195
## Data Notes
164196

benchmarks/queries/clickbench/extended.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ SELECT COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserCountry"), COUNT(DISTI
33
SELECT "BrowserCountry", COUNT(DISTINCT "SocialNetwork"), COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserLanguage"), COUNT(DISTINCT "SocialAction") FROM hits GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
44
SELECT "SocialSourceNetworkID", "RegionID", COUNT(*), AVG("Age"), AVG("ParamPrice"), STDDEV("ParamPrice") as s, VAR("ParamPrice") FROM hits GROUP BY "SocialSourceNetworkID", "RegionID" HAVING s IS NOT NULL ORDER BY s DESC LIMIT 10;
55
SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, MAX("ResponseStartTiming") tmax FROM hits WHERE "JavaEnable" = 0 GROUP BY "ClientIP", "WatchID" HAVING c > 1 ORDER BY tmed DESC LIMIT 10;
6-
SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, APPROX_PERCENTILE_CONT("ResponseStartTiming", 0.95) tp95, MAX("ResponseStartTiming") tmax FROM 'hits' WHERE "JavaEnable" = 0 GROUP BY "ClientIP", "WatchID" HAVING c > 1 ORDER BY tp95 DESC LIMIT 10;
6+
SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, APPROX_PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY "ResponseStartTiming") tp95, MAX("ResponseStartTiming") tmax FROM 'hits' WHERE "JavaEnable" = 0 GROUP BY "ClientIP", "WatchID" HAVING c > 1 ORDER BY tp95 DESC LIMIT 10;
77
SELECT COUNT(*) AS ShareCount FROM hits WHERE "IsMobile" = 1 AND "MobilePhoneModel" LIKE 'iPhone%' AND "SocialAction" = 'share' AND "SocialSourceNetworkID" IN (5, 12) AND "ClientTimeZone" BETWEEN -5 AND 5 AND regexp_match("Referer", '\/campaign\/(spring|summer)_promo') IS NOT NULL AND CASE WHEN split_part(split_part("URL", 'resolution=', 2), '&', 1) ~ '^\d+$' THEN split_part(split_part("URL", 'resolution=', 2), '&', 1)::INT ELSE 0 END > 1920 AND levenshtein(CAST("UTMSource" AS STRING), CAST("UTMCampaign" AS STRING)) < 3;
8+
SELECT "WatchID", MIN("ResolutionWidth") as wmin, MAX("ResolutionWidth") as wmax, SUM("IsRefresh") as srefresh FROM hits GROUP BY "WatchID" ORDER BY "WatchID" DESC LIMIT 10;

datafusion-cli/src/command.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,15 @@ impl Command {
6464
let command_batch = all_commands_info();
6565
let schema = command_batch.schema();
6666
let num_rows = command_batch.num_rows();
67-
print_options.print_batches(schema, &[command_batch], now, num_rows)
67+
let task_ctx = ctx.task_ctx();
68+
let config = &task_ctx.session_config().options().format;
69+
print_options.print_batches(
70+
schema,
71+
&[command_batch],
72+
now,
73+
num_rows,
74+
config,
75+
)
6876
}
6977
Self::ListTables => {
7078
exec_and_print(ctx, print_options, "SHOW TABLES".into()).await

datafusion-cli/src/exec.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,8 @@ pub(super) async fn exec_and_print(
216216
) -> Result<()> {
217217
let now = Instant::now();
218218
let task_ctx = ctx.task_ctx();
219-
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
219+
let options = task_ctx.session_config().options();
220+
let dialect = &options.sql_parser.dialect;
220221
let dialect = dialect_from_str(dialect).ok_or_else(|| {
221222
plan_datafusion_err!(
222223
"Unsupported SQL dialect: {dialect}. Available dialects: \
@@ -250,7 +251,9 @@ pub(super) async fn exec_and_print(
250251
// As the input stream comes, we can generate results.
251252
// However, memory safety is not guaranteed.
252253
let stream = execute_stream(physical_plan, task_ctx.clone())?;
253-
print_options.print_stream(stream, now).await?;
254+
print_options
255+
.print_stream(stream, now, &options.format)
256+
.await?;
254257
} else {
255258
// Bounded stream; collected results size is limited by the maxrows option
256259
let schema = physical_plan.schema();
@@ -273,9 +276,13 @@ pub(super) async fn exec_and_print(
273276
}
274277
row_count += curr_num_rows;
275278
}
276-
adjusted
277-
.into_inner()
278-
.print_batches(schema, &results, now, row_count)?;
279+
adjusted.into_inner().print_batches(
280+
schema,
281+
&results,
282+
now,
283+
row_count,
284+
&options.format,
285+
)?;
279286
reservation.free();
280287
}
281288
}

datafusion-cli/src/main.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -177,13 +177,14 @@ async fn main_inner() -> Result<()> {
177177

178178
// set disk limit
179179
if let Some(disk_limit) = args.disk_limit {
180-
let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
180+
let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
181181

182-
let disk_manager = Arc::try_unwrap(disk_manager)
183-
.expect("DiskManager should be a single instance")
184-
.with_max_temp_directory_size(disk_limit.try_into().unwrap())?;
182+
DiskManager::set_arc_max_temp_directory_size(
183+
&mut disk_manager,
184+
disk_limit.try_into().unwrap(),
185+
)?;
185186

186-
let disk_config = DiskManagerConfig::new_existing(Arc::new(disk_manager));
187+
let disk_config = DiskManagerConfig::new_existing(disk_manager);
187188
rt_builder = rt_builder.with_disk_manager(disk_config);
188189
}
189190

@@ -265,6 +266,11 @@ fn get_session_config(args: &Args) -> Result<SessionConfig> {
265266
config_options.explain.format = String::from("tree");
266267
}
267268

269+
// in the CLI, we want to show NULL values rather than empty strings
270+
if env::var_os("DATAFUSION_FORMAT_NULL").is_none() {
271+
config_options.format.null = String::from("NULL");
272+
}
273+
268274
let session_config =
269275
SessionConfig::from(config_options).with_information_schema(true);
270276
Ok(session_config)

datafusion-cli/src/print_format.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
2626
use arrow::json::{ArrayWriter, LineDelimitedWriter};
2727
use arrow::record_batch::RecordBatch;
2828
use arrow::util::pretty::pretty_format_batches_with_options;
29-
use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS;
29+
use datafusion::config::FormatOptions;
3030
use datafusion::error::Result;
3131

3232
/// Allow records to be printed in different formats
@@ -110,7 +110,10 @@ fn format_batches_with_maxrows<W: std::io::Write>(
110110
writer: &mut W,
111111
batches: &[RecordBatch],
112112
maxrows: MaxRows,
113+
format_options: &FormatOptions,
113114
) -> Result<()> {
115+
let options: arrow::util::display::FormatOptions = format_options.try_into()?;
116+
114117
match maxrows {
115118
MaxRows::Limited(maxrows) => {
116119
// Filter batches to meet the maxrows condition
@@ -131,10 +134,8 @@ fn format_batches_with_maxrows<W: std::io::Write>(
131134
}
132135
}
133136

134-
let formatted = pretty_format_batches_with_options(
135-
&filtered_batches,
136-
&DEFAULT_CLI_FORMAT_OPTIONS,
137-
)?;
137+
let formatted =
138+
pretty_format_batches_with_options(&filtered_batches, &options)?;
138139
if over_limit {
139140
let mut formatted_str = format!("{}", formatted);
140141
formatted_str = keep_only_maxrows(&formatted_str, maxrows);
@@ -144,8 +145,7 @@ fn format_batches_with_maxrows<W: std::io::Write>(
144145
}
145146
}
146147
MaxRows::Unlimited => {
147-
let formatted =
148-
pretty_format_batches_with_options(batches, &DEFAULT_CLI_FORMAT_OPTIONS)?;
148+
let formatted = pretty_format_batches_with_options(batches, &options)?;
149149
writeln!(writer, "{}", formatted)?;
150150
}
151151
}
@@ -162,6 +162,7 @@ impl PrintFormat {
162162
batches: &[RecordBatch],
163163
maxrows: MaxRows,
164164
with_header: bool,
165+
format_options: &FormatOptions,
165166
) -> Result<()> {
166167
// filter out any empty batches
167168
let batches: Vec<_> = batches
@@ -170,7 +171,7 @@ impl PrintFormat {
170171
.cloned()
171172
.collect();
172173
if batches.is_empty() {
173-
return self.print_empty(writer, schema);
174+
return self.print_empty(writer, schema, format_options);
174175
}
175176

176177
match self {
@@ -182,7 +183,7 @@ impl PrintFormat {
182183
if maxrows == MaxRows::Limited(0) {
183184
return Ok(());
184185
}
185-
format_batches_with_maxrows(writer, &batches, maxrows)
186+
format_batches_with_maxrows(writer, &batches, maxrows, format_options)
186187
}
187188
Self::Json => batches_to_json!(ArrayWriter, writer, &batches),
188189
Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches),
@@ -194,15 +195,17 @@ impl PrintFormat {
194195
&self,
195196
writer: &mut W,
196197
schema: SchemaRef,
198+
format_options: &FormatOptions,
197199
) -> Result<()> {
198200
match self {
199201
// Print column headers for Table format
200202
Self::Table if !schema.fields().is_empty() => {
203+
let format_options: arrow::util::display::FormatOptions =
204+
format_options.try_into()?;
205+
201206
let empty_batch = RecordBatch::new_empty(schema);
202-
let formatted = pretty_format_batches_with_options(
203-
&[empty_batch],
204-
&DEFAULT_CLI_FORMAT_OPTIONS,
205-
)?;
207+
let formatted =
208+
pretty_format_batches_with_options(&[empty_batch], &format_options)?;
206209
writeln!(writer, "{}", formatted)?;
207210
}
208211
_ => {}
@@ -644,6 +647,7 @@ mod tests {
644647
&self.batches,
645648
self.maxrows,
646649
with_header,
650+
&FormatOptions::default(),
647651
)
648652
.unwrap();
649653
String::from_utf8(buffer).unwrap()

datafusion-cli/src/print_options.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use datafusion::common::DataFusionError;
2929
use datafusion::error::Result;
3030
use datafusion::physical_plan::RecordBatchStream;
3131

32+
use datafusion::config::FormatOptions;
3233
use futures::StreamExt;
3334

3435
#[derive(Debug, Clone, PartialEq, Copy)]
@@ -103,12 +104,19 @@ impl PrintOptions {
103104
batches: &[RecordBatch],
104105
query_start_time: Instant,
105106
row_count: usize,
107+
format_options: &FormatOptions,
106108
) -> Result<()> {
107109
let stdout = std::io::stdout();
108110
let mut writer = stdout.lock();
109111

110-
self.format
111-
.print_batches(&mut writer, schema, batches, self.maxrows, true)?;
112+
self.format.print_batches(
113+
&mut writer,
114+
schema,
115+
batches,
116+
self.maxrows,
117+
true,
118+
format_options,
119+
)?;
112120

113121
let formatted_exec_details = get_execution_details_formatted(
114122
row_count,
@@ -132,6 +140,7 @@ impl PrintOptions {
132140
&self,
133141
mut stream: Pin<Box<dyn RecordBatchStream>>,
134142
query_start_time: Instant,
143+
format_options: &FormatOptions,
135144
) -> Result<()> {
136145
if self.format == PrintFormat::Table {
137146
return Err(DataFusionError::External(
@@ -154,6 +163,7 @@ impl PrintOptions {
154163
&[batch],
155164
MaxRows::Unlimited,
156165
with_header,
166+
format_options,
157167
)?;
158168
with_header = false;
159169
}

datafusion-cli/tests/cli_integration.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ fn init() {
6969
// can choose the old explain format too
7070
["--command", "EXPLAIN FORMAT indent SELECT 123"],
7171
)]
72+
#[case::change_format_version(
73+
"change_format_version",
74+
["--file", "tests/sql/types_format.sql", "-q"],
75+
)]
7276
#[test]
7377
fn cli_quick_test<'a>(
7478
#[case] snapshot_name: &'a str,

0 commit comments

Comments
 (0)