Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ jobs:
sudo apt-get update -qq
sudo apt-get install -y -qq clang
- name: Setup wasm-pack
uses: taiki-e/install-action@cfdb446e391c69574ebc316dfb7d7849ec12b940 # v2.68.8
uses: taiki-e/install-action@0e76c5c569f13f7eb21e8e5b26fe710062b57b62 # v2.65.13
with:
tool: wasm-pack
- name: Run tests with headless mode
Expand Down Expand Up @@ -537,7 +537,7 @@ jobs:
# command cannot be run for all the .slt files. Run it for just one that works (limit.slt)
# until most of the tickets in https://github.com/apache/datafusion/issues/16248 are addressed
# and this command can be run without filters.
run: cargo test --test sqllogictests -- --substrait-round-trip limit.slt
run: cargo test --test sqllogictests --features substrait -- --substrait-round-trip limit.slt

# Temporarily commenting out the Windows flow, the reason is enormously slow running build
# Waiting for new Windows 2025 github runner
Expand Down
32 changes: 17 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/catalog-listing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ log = { workspace = true }
object_store = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
datafusion-datasource-parquet = { workspace = true }

# Note: add additional linter rules in lib.rs.
Expand Down
142 changes: 134 additions & 8 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,17 +340,25 @@ fn filter_partitions(
Ok(None)
}

/// Returns `Ok(None)` when the file is not inside a valid partition path
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

b4dbb6a Skip files outside partition structure in hive-partitioned listing tables (#51)

/// (e.g. a stale file in the table root directory). Such files are skipped
/// because hive-style partition values are never null and there is no valid
/// value to assign for non-partitioned files.
fn try_into_partitioned_file(
object_meta: ObjectMeta,
partition_cols: &[(String, DataType)],
table_path: &ListingTableUrl,
) -> Result<PartitionedFile> {
) -> Result<Option<PartitionedFile>> {
let cols = partition_cols.iter().map(|(name, _)| name.as_str());
let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);

let Some(parsed) = parsed else {
// parse_partitions_for_path already logs a debug message
return Ok(None);
};

let partition_values = parsed
.into_iter()
.flatten()
.zip(partition_cols)
.map(|(parsed, (_, datatype))| {
ScalarValue::try_from_string(parsed.to_string(), datatype)
Expand All @@ -360,7 +368,7 @@ fn try_into_partitioned_file(
let mut pf: PartitionedFile = object_meta.into();
pf.partition_values = partition_values;

Ok(pf)
Ok(Some(pf))
}

/// Discover the partitions on the given path and prune out files
Expand Down Expand Up @@ -405,13 +413,15 @@ pub async fn pruned_partition_list<'a>(
)?;

Ok(objects
.map_ok(|object_meta| {
try_into_partitioned_file(object_meta, partition_cols, table_path)
.try_filter_map(|object_meta| {
futures::future::ready(try_into_partitioned_file(
object_meta,
partition_cols,
table_path,
))
})
.try_filter_map(move |pf| {
futures::future::ready(
pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
)
futures::future::ready(filter_partitions(pf, filters, &df_schema))
})
.boxed())
}
Expand Down Expand Up @@ -574,6 +584,122 @@ mod tests {
);
}

#[test]
fn test_try_into_partitioned_file_valid_partition() {
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
let meta = ObjectMeta {
location: Path::from("bucket/mytable/year_month=2024-01/data.parquet"),
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
size: 100,
e_tag: None,
version: None,
};

let result =
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
assert!(result.is_some());
let pf = result.unwrap();
assert_eq!(pf.partition_values.len(), 1);
assert_eq!(
pf.partition_values[0],
ScalarValue::Utf8(Some("2024-01".to_string()))
);
}

#[test]
fn test_try_into_partitioned_file_root_file_skipped() {
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
let meta = ObjectMeta {
location: Path::from("bucket/mytable/data.parquet"),
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
size: 100,
e_tag: None,
version: None,
};

let result =
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
assert!(
result.is_none(),
"Files outside partition structure should be skipped"
);
}

#[test]
fn test_try_into_partitioned_file_wrong_partition_name() {
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
let partition_cols = vec![("year_month".to_string(), DataType::Utf8)];
let meta = ObjectMeta {
location: Path::from("bucket/mytable/wrong_col=2024-01/data.parquet"),
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
size: 100,
e_tag: None,
version: None,
};

let result =
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
assert!(
result.is_none(),
"Files with wrong partition column name should be skipped"
);
}

#[test]
fn test_try_into_partitioned_file_multiple_partitions() {
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
let partition_cols = vec![
("year".to_string(), DataType::Utf8),
("month".to_string(), DataType::Utf8),
];
let meta = ObjectMeta {
location: Path::from("bucket/mytable/year=2024/month=01/data.parquet"),
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
size: 100,
e_tag: None,
version: None,
};

let result =
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
assert!(result.is_some());
let pf = result.unwrap();
assert_eq!(pf.partition_values.len(), 2);
assert_eq!(
pf.partition_values[0],
ScalarValue::Utf8(Some("2024".to_string()))
);
assert_eq!(
pf.partition_values[1],
ScalarValue::Utf8(Some("01".to_string()))
);
}

#[test]
fn test_try_into_partitioned_file_partial_partition_skipped() {
let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap();
let partition_cols = vec![
("year".to_string(), DataType::Utf8),
("month".to_string(), DataType::Utf8),
];
let meta = ObjectMeta {
location: Path::from("bucket/mytable/year=2024/data.parquet"),
last_modified: chrono::DateTime::from(std::time::SystemTime::UNIX_EPOCH),
size: 100,
e_tag: None,
version: None,
};

let result =
try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap();
assert!(
result.is_none(),
"Files with incomplete partition structure should be skipped"
);
}

#[test]
fn test_expr_applicable_for_cols() {
assert!(expr_applicable_for_cols(
Expand Down
Loading
Loading