Skip to content

Commit 6a88408

Browse files
committed
#17411 Relax constraint that file sort order must only reference individual columns
1 parent 2ba2f1c commit 6a88408

File tree

9 files changed

+211
-88
lines changed

9 files changed

+211
-88
lines changed

datafusion/catalog/src/stream.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
3434
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3535
use datafusion_expr::dml::InsertOp;
3636
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
37-
use datafusion_physical_expr::create_ordering;
37+
use datafusion_physical_expr::create_lex_ordering;
3838
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
3939
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
4040
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
@@ -321,17 +321,21 @@ impl TableProvider for StreamTable {
321321

322322
async fn scan(
323323
&self,
324-
_state: &dyn Session,
324+
state: &dyn Session,
325325
projection: Option<&Vec<usize>>,
326326
_filters: &[Expr],
327327
limit: Option<usize>,
328328
) -> Result<Arc<dyn ExecutionPlan>> {
329329
let projected_schema = match projection {
330330
Some(p) => {
331-
let projected = self.0.source.schema().project(p)?;
332-
create_ordering(&projected, &self.0.order)?
331+
let projected = Arc::new(self.0.source.schema().project(p)?);
332+
create_lex_ordering(&projected, &self.0.order, state.execution_props())?
333333
}
334-
None => create_ordering(self.0.source.schema(), &self.0.order)?,
334+
None => create_lex_ordering(
335+
self.0.source.schema(),
336+
&self.0.order,
337+
state.execution_props(),
338+
)?,
335339
};
336340

337341
Ok(Arc::new(StreamingTableExec::try_new(
@@ -351,7 +355,8 @@ impl TableProvider for StreamTable {
351355
_insert_op: InsertOp,
352356
) -> Result<Arc<dyn ExecutionPlan>> {
353357
let schema = self.0.source.schema();
354-
let orders = create_ordering(schema, &self.0.order)?;
358+
let orders =
359+
create_lex_ordering(schema, &self.0.order, _state.execution_props())?;
355360
// It is sufficient to pass only one of the equivalent orderings:
356361
let ordering = orders.into_iter().next().map(Into::into);
357362

datafusion/core/src/datasource/listing/table.rs

Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use super::{
2323
};
2424
use crate::{
2525
datasource::file_format::{file_compression_type::FileCompressionType, FileFormat},
26-
datasource::{create_ordering, physical_plan::FileSinkConfig},
26+
datasource::physical_plan::FileSinkConfig,
2727
execution::context::SessionState,
2828
};
2929
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
@@ -45,16 +45,19 @@ use datafusion_execution::{
4545
cache::{cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache},
4646
config::SessionConfig,
4747
};
48+
use datafusion_expr::execution_props::ExecutionProps;
4849
use datafusion_expr::{
4950
dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
5051
};
52+
use datafusion_physical_expr::create_lex_ordering;
5153
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
5254
use datafusion_physical_expr_common::sort_expr::LexOrdering;
5355
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
5456
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
5557
use itertools::Itertools;
5658
use object_store::ObjectStore;
5759
use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};
60+
5861
/// Indicates the source of the schema for a [`ListingTable`]
5962
// PartialEq required for assert_eq! in tests
6063
#[derive(Debug, Clone, Copy, PartialEq, Default)]
@@ -1129,8 +1132,15 @@ impl ListingTable {
11291132
}
11301133

11311134
/// If file_sort_order is specified, creates the appropriate physical expressions
1132-
fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
1133-
create_ordering(&self.table_schema, &self.options.file_sort_order)
1135+
fn try_create_output_ordering(
1136+
&self,
1137+
execution_props: &ExecutionProps,
1138+
) -> Result<Vec<LexOrdering>> {
1139+
create_lex_ordering(
1140+
&self.table_schema,
1141+
&self.options.file_sort_order,
1142+
execution_props,
1143+
)
11341144
}
11351145
}
11361146

@@ -1219,7 +1229,7 @@ impl TableProvider for ListingTable {
12191229
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
12201230
}
12211231

1222-
let output_ordering = self.try_create_output_ordering()?;
1232+
let output_ordering = self.try_create_output_ordering(state.execution_props())?;
12231233
match state
12241234
.config_options()
12251235
.execution
@@ -1359,7 +1369,7 @@ impl TableProvider for ListingTable {
13591369
file_extension: self.options().format.get_ext(),
13601370
};
13611371

1362-
let orderings = self.try_create_output_ordering()?;
1372+
let orderings = self.try_create_output_ordering(state.execution_props())?;
13631373
// It is sufficient to pass only one of the equivalent orderings:
13641374
let order_requirements = orderings.into_iter().next().map(Into::into);
13651375

@@ -1587,6 +1597,7 @@ mod tests {
15871597
SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
15881598
};
15891599
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
1600+
use datafusion_physical_expr::expressions::binary;
15901601
use datafusion_physical_expr::PhysicalSortExpr;
15911602
use datafusion_physical_plan::{collect, ExecutionPlanProperties};
15921603
use rstest::rstest;
@@ -1719,29 +1730,44 @@ mod tests {
17191730

17201731
use crate::datasource::file_format::parquet::ParquetFormat;
17211732
use datafusion_physical_plan::expressions::col as physical_col;
1733+
use datafusion_physical_plan::expressions::lit as physical_lit;
17221734
use std::ops::Add;
17231735

17241736
// (file_sort_order, expected_result)
17251737
let cases = vec![
1726-
(vec![], Ok(Vec::<LexOrdering>::new())),
1738+
(
1739+
vec![],
1740+
Ok::<Vec<LexOrdering>, DataFusionError>(Vec::<LexOrdering>::new()),
1741+
),
17271742
// sort expr, but non column
17281743
(
1729-
vec![vec![
1730-
col("int_col").add(lit(1)).sort(true, true),
1731-
]],
1732-
Err("Expected single column reference in sort_order[0][0], got int_col + Int32(1)"),
1744+
vec![vec![col("int_col").add(lit(1)).sort(true, true)]],
1745+
Ok(vec![[PhysicalSortExpr {
1746+
expr: binary(
1747+
physical_col("int_col", &schema).unwrap(),
1748+
Operator::Plus,
1749+
physical_lit(1),
1750+
&schema,
1751+
)
1752+
.unwrap(),
1753+
options: SortOptions {
1754+
descending: false,
1755+
nulls_first: true,
1756+
},
1757+
}]
1758+
.into()]),
17331759
),
17341760
// ok with one column
17351761
(
17361762
vec![vec![col("string_col").sort(true, false)]],
17371763
Ok(vec![[PhysicalSortExpr {
1738-
expr: physical_col("string_col", &schema).unwrap(),
1739-
options: SortOptions {
1740-
descending: false,
1741-
nulls_first: false,
1742-
},
1743-
}].into(),
1744-
])
1764+
expr: physical_col("string_col", &schema).unwrap(),
1765+
options: SortOptions {
1766+
descending: false,
1767+
nulls_first: false,
1768+
},
1769+
}]
1770+
.into()]),
17451771
),
17461772
// ok with two columns, different options
17471773
(
@@ -1750,14 +1776,18 @@ mod tests {
17501776
col("int_col").sort(false, true),
17511777
]],
17521778
Ok(vec![[
1753-
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
1754-
.asc()
1755-
.nulls_last(),
1756-
PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
1757-
.desc()
1758-
.nulls_first()
1759-
].into(),
1760-
])
1779+
PhysicalSortExpr::new_default(
1780+
physical_col("string_col", &schema).unwrap(),
1781+
)
1782+
.asc()
1783+
.nulls_last(),
1784+
PhysicalSortExpr::new_default(
1785+
physical_col("int_col", &schema).unwrap(),
1786+
)
1787+
.desc()
1788+
.nulls_first(),
1789+
]
1790+
.into()]),
17611791
),
17621792
];
17631793

@@ -1770,7 +1800,8 @@ mod tests {
17701800

17711801
let table =
17721802
ListingTable::try_new(config.clone()).expect("Creating the table");
1773-
let ordering_result = table.try_create_output_ordering();
1803+
let ordering_result =
1804+
table.try_create_output_ordering(state.execution_props());
17741805

17751806
match (expected_result, ordering_result) {
17761807
(Ok(expected), Ok(result)) => {

datafusion/datasource/src/file_scan_config.rs

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,6 @@
1818
//! [`FileScanConfig`] to configure scanning of possibly partitioned
1919
//! file sources.
2020
21-
use std::{
22-
any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
23-
fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
24-
};
25-
2621
use crate::file_groups::FileGroup;
2722
#[allow(unused_imports)]
2823
use crate::schema_adapter::SchemaAdapterFactory;
@@ -52,7 +47,7 @@ use datafusion_physical_expr::{expressions::Column, utils::reassign_predicate_co
5247
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
5348
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
5449
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
55-
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
50+
use datafusion_physical_expr_common::sort_expr::LexOrdering;
5651
use datafusion_physical_plan::projection::ProjectionExpr;
5752
use datafusion_physical_plan::{
5853
display::{display_orderings, ProjectSchemaDisplay},
@@ -63,7 +58,12 @@ use datafusion_physical_plan::{
6358
use datafusion_physical_plan::{
6459
filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation,
6560
};
61+
use std::{
62+
any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
63+
fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
64+
};
6665

66+
use datafusion_physical_expr::equivalence::project_orderings;
6767
use datafusion_physical_plan::coop::cooperative;
6868
use datafusion_physical_plan::execution_plan::SchedulingType;
6969
use log::{debug, warn};
@@ -1384,30 +1384,11 @@ fn get_projected_output_ordering(
13841384
base_config: &FileScanConfig,
13851385
projected_schema: &SchemaRef,
13861386
) -> Vec<LexOrdering> {
1387-
let mut all_orderings = vec![];
1388-
for output_ordering in &base_config.output_ordering {
1389-
let mut new_ordering = vec![];
1390-
for PhysicalSortExpr { expr, options } in output_ordering.iter() {
1391-
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
1392-
let name = col.name();
1393-
if let Some((idx, _)) = projected_schema.column_with_name(name) {
1394-
// Compute the new sort expression (with correct index) after projection:
1395-
new_ordering.push(PhysicalSortExpr::new(
1396-
Arc::new(Column::new(name, idx)),
1397-
*options,
1398-
));
1399-
continue;
1400-
}
1401-
}
1402-
// Cannot find expression in the projected_schema, stop iterating
1403-
// since rest of the orderings are violated
1404-
break;
1405-
}
1406-
1407-
let Some(new_ordering) = LexOrdering::new(new_ordering) else {
1408-
continue;
1409-
};
1387+
let projected_orderings =
1388+
project_orderings(&base_config.output_ordering, projected_schema);
14101389

1390+
let mut all_orderings = vec![];
1391+
for new_ordering in projected_orderings {
14111392
// Check if any file groups are not sorted
14121393
if base_config.file_groups.iter().any(|group| {
14131394
if group.len() <= 1 {
@@ -1480,6 +1461,7 @@ mod tests {
14801461
use datafusion_expr::{Operator, SortExpr};
14811462
use datafusion_physical_expr::create_physical_sort_expr;
14821463
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
1464+
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
14831465

14841466
/// Returns the column names on the schema
14851467
pub fn columns(schema: &Schema) -> Vec<String> {

datafusion/datasource/src/memory.rs

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,7 @@ use arrow::array::{RecordBatch, RecordBatchOptions};
3030
use arrow::datatypes::{Schema, SchemaRef};
3131
use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue};
3232
use datafusion_execution::TaskContext;
33-
use datafusion_physical_expr::equivalence::{
34-
OrderingEquivalenceClass, ProjectionMapping,
35-
};
36-
use datafusion_physical_expr::expressions::Column;
33+
use datafusion_physical_expr::equivalence::project_orderings;
3734
use datafusion_physical_expr::utils::collect_columns;
3835
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
3936
use datafusion_physical_plan::memory::MemoryStream;
@@ -433,22 +430,9 @@ impl MemorySourceConfig {
433430
}
434431

435432
// If there is a projection on the source, we also need to project orderings
436-
if let Some(projection) = &self.projection {
437-
let base_schema = self.original_schema();
438-
let proj_exprs = projection.iter().map(|idx| {
439-
let name = base_schema.field(*idx).name();
440-
(Arc::new(Column::new(name, *idx)) as _, name.to_string())
441-
});
442-
let projection_mapping =
443-
ProjectionMapping::try_new(proj_exprs, &base_schema)?;
444-
let base_eqp = EquivalenceProperties::new_with_orderings(
445-
Arc::clone(&base_schema),
446-
sort_information,
447-
);
448-
let proj_eqp =
449-
base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
450-
let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
451-
sort_information = oeq_class.into();
433+
if self.projection.is_some() {
434+
sort_information =
435+
project_orderings(&sort_information, &self.projected_schema);
452436
}
453437

454438
self.sort_information = sort_information;

datafusion/physical-expr/src/equivalence/class.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,11 @@ impl EquivalenceGroup {
590590
aug_mapping: &AugmentedMapping,
591591
expr: &Arc<dyn PhysicalExpr>,
592592
) -> Option<Arc<dyn PhysicalExpr>> {
593+
// Literals don't need to be projected
594+
if expr.as_any().downcast_ref::<Literal>().is_some() {
595+
return Some(Arc::clone(expr));
596+
}
597+
593598
// The given expression is not inside the mapping, so we try to project
594599
// indirectly using equivalence classes.
595600
for (targets, eq_class) in aug_mapping.values() {

datafusion/physical-expr/src/equivalence/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ mod properties;
3030

3131
pub use class::{AcrossPartitions, ConstExpr, EquivalenceClass, EquivalenceGroup};
3232
pub use ordering::OrderingEquivalenceClass;
33-
pub use projection::ProjectionMapping;
33+
pub use projection::{project_orderings, ProjectionMapping};
3434
pub use properties::{
3535
calculate_union, join_equivalence_properties, EquivalenceProperties,
3636
};

0 commit comments

Comments
 (0)