Implement schema adapter support for FileSource and add integration tests #16148

Merged: 41 commits merged on May 30, 2025

Changes from all commits

Commits (41):
b23cd58
Implement schema adapter factory support for file sources
kosiew May 22, 2025
cd51b5a
Add schema adapter factory support to file sources
kosiew May 22, 2025
2233456
Add SchemaAdapterFactory import to file source module
kosiew May 22, 2025
60ff7e6
Add schema_adapter_factory field to JsonOpener and JsonSource structs
kosiew May 22, 2025
4d6f8f7
Add missing import for as_file_source in source.rs
kosiew May 22, 2025
011ce03
Fix formatting in ArrowSource implementation by removing extra newlines
kosiew May 22, 2025
7fde396
Add integration and unit tests for schema adapter factory functionality
kosiew May 22, 2025
dc9b8ac
fix tests
kosiew May 22, 2025
08b3ce0
Refactor adapt method signature and improve test assertions for schem…
kosiew May 22, 2025
aef5dd3
Simplify constructor in TestSource by removing redundant function def…
kosiew May 22, 2025
f964947
Remove redundant import of SchemaAdapterFactory in util.rs
kosiew May 22, 2025
d8720f0
fix tests: refactor schema_adapter_factory methods in TestSource for …
kosiew May 22, 2025
652fbaf
feat: add macro for schema adapter methods in FileSource implementation
kosiew May 22, 2025
fbd8c99
feat: use macro implement schema adapter methods for various FileSour…
kosiew May 22, 2025
7e9f070
refactor: clean up unused schema adapter factory methods in ParquetSo…
kosiew May 22, 2025
4c23e82
feat: add macro for generating schema adapter methods in FileSource i…
kosiew May 22, 2025
e91eb1b
refactor: re-export impl_schema_adapter_methods from crate root
kosiew May 22, 2025
9416efb
refactor: update macro usage and documentation for schema adapter met…
kosiew May 22, 2025
5fb40df
refactor: clean up import statements in datasource module
kosiew May 22, 2025
413ebe1
refactor: reorganize and clean up import statements in util.rs
kosiew May 22, 2025
a3fc370
Merge branch 'main' into file-source-merge
kosiew May 23, 2025
f11134a
Resolve merge conflict
kosiew May 23, 2025
c6ff4d5
Export macro with local inner macros for improved encapsulation
kosiew May 23, 2025
cb27246
fix clippy error
kosiew May 23, 2025
613d115
fix doc tests
kosiew May 23, 2025
d2027f1
fix CI error
kosiew May 23, 2025
727032b
Add metrics initialization to TestSource constructor
kosiew May 23, 2025
148148c
Add comment for test_multi_source_schema_adapter_reuse
kosiew May 26, 2025
d3b1680
reduce test files, move non-redundant tests, consolidate in one file
kosiew May 26, 2025
79a56f6
test_schema_adapter - add comments
kosiew May 26, 2025
55dc418
remove redundant tests
kosiew May 26, 2025
e8f8df4
Refactor schema adapter application to use ParquetSource method directly
kosiew May 26, 2025
6154b2d
Refactor apply_schema_adapter usage to call method directly on Parque…
kosiew May 26, 2025
208b1cc
remove macro
kosiew May 26, 2025
fd6dd78
Revert "remove macro"
kosiew May 26, 2025
ee07b69
FileSource - provide default implementations for schema_adapter_facto…
kosiew May 26, 2025
16eb25d
Revert "FileSource - provide default implementations for schema_adapt…
kosiew May 26, 2025
f890e8d
Remove unused import of SchemaAdapterFactory from file_format.rs
kosiew May 26, 2025
35036ec
Merge branch 'main' into file-source-with-adapter-factory
kosiew May 26, 2025
999e0cd
Refactor imports in apply_schema_adapter_tests.rs for improved readab…
kosiew May 26, 2025
befc171
Merge branch 'main' into file-source-with-adapter-factory
kosiew May 26, 2025

39 changes: 17 additions & 22 deletions datafusion/core/src/datasource/mod.rs
@@ -51,28 +51,26 @@ pub use datafusion_physical_expr::create_ordering;
#[cfg(all(test, feature = "parquet"))]
mod tests {

use crate::prelude::SessionContext;

use std::fs;
use std::sync::Arc;

use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::test_util::batches_to_sort_string;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};
use datafusion_datasource::PartitionedFile;
use datafusion_datasource_parquet::source::ParquetSource;

use datafusion_common::record_batch;

use ::object_store::path::Path;
use ::object_store::ObjectMeta;
use datafusion_datasource::source::DataSourceExec;
use crate::prelude::SessionContext;
use arrow::{
array::{Int32Array, StringArray},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_common::{record_batch, test_util::batches_to_sort_string};
use datafusion_datasource::{
file::FileSource, file_scan_config::FileScanConfigBuilder,
source::DataSourceExec, PartitionedFile,
};
use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::collect;
use object_store::{path::Path, ObjectMeta};
use std::{fs, sync::Arc};
use tempfile::TempDir;

#[tokio::test]
@@ -81,7 +79,6 @@ mod tests {
// record batches returned from parquet. This can be useful for schema evolution
// where older files may not have all columns.

use datafusion_execution::object_store::ObjectStoreUrl;
let tmp_dir = TempDir::new().unwrap();
let table_dir = tmp_dir.path().join("parquet_test");
fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
@@ -124,10 +121,8 @@ mod tests {
let f2 = Field::new("extra_column", DataType::Utf8, true);

let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
let source = Arc::new(
ParquetSource::default()
.with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})),
);
let source = ParquetSource::default()
.with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
let base_conf = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
schema,
11 changes: 11 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -20,6 +20,8 @@ use std::sync::Arc;

use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use crate::error::Result;
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{as_file_source, impl_schema_adapter_methods};

use arrow::buffer::Buffer;
use arrow::datatypes::SchemaRef;
@@ -39,6 +41,13 @@ use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
pub struct ArrowSource {
metrics: ExecutionPlanMetricsSet,
projected_statistics: Option<Statistics>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl From<ArrowSource> for Arc<dyn FileSource> {
fn from(source: ArrowSource) -> Self {
as_file_source(source)
}
}

impl FileSource for ArrowSource {
@@ -89,6 +98,8 @@ impl FileSource for ArrowSource {
fn file_type(&self) -> &str {
"arrow"
}

impl_schema_adapter_methods!();
}

/// The struct arrow that implements `[FileOpener]` trait
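For context, the two helpers pulled in by this diff, as_file_source and impl_schema_adapter_methods!, are what give every FileSource implementation the same Arc<dyn FileSource> conversion and schema-adapter-factory accessors. The sketch below shows one plausible shape for them; the exact signatures and bodies are assumptions for illustration, not code copied from the PR's datafusion_datasource crate.

use std::sync::Arc;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;

/// Hypothetical helper used by the new `From` impls: wrap a concrete
/// source type in an `Arc<dyn FileSource>` trait object.
pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> {
    Arc::new(source)
}

/// Hypothetical shape of `impl_schema_adapter_methods!()`. It assumes the
/// surrounding struct derives `Clone`, has a
/// `schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>` field,
/// and that `Arc`, `FileSource`, and `SchemaAdapterFactory` are in scope at
/// the expansion site (as they are in `arrow_file.rs` above).
#[macro_export]
macro_rules! impl_schema_adapter_methods {
    () => {
        // Builder-style setter: returns a new source wrapped as a trait object.
        fn with_schema_adapter_factory(
            &self,
            schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
        ) -> Arc<dyn FileSource> {
            Arc::new(Self {
                schema_adapter_factory: Some(schema_adapter_factory),
                ..self.clone()
            })
        }

        // Getter: report the currently configured factory, if any.
        fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
            self.schema_adapter_factory.clone()
        }
    };
}

Under that assumed return type, ParquetSource::default().with_schema_adapter_factory(...) already yields an Arc<dyn FileSource>, which is consistent with the mod.rs change above that drops the explicit Arc::new(...) wrapper.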
@@ -0,0 +1,260 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Integration test for schema adapter factory functionality

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::arrow_file::ArrowSource;
use datafusion::prelude::*;
use datafusion_common::Result;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory};
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::PartitionedFile;
use std::sync::Arc;
use tempfile::TempDir;

#[cfg(feature = "parquet")]
use datafusion_datasource_parquet::ParquetSource;
#[cfg(feature = "parquet")]
use parquet::arrow::ArrowWriter;
#[cfg(feature = "parquet")]
use parquet::file::properties::WriterProperties;

#[cfg(feature = "csv")]
use datafusion_datasource_csv::CsvSource;

/// A schema adapter factory that transforms column names to uppercase
Contributor: this is very cool

#[derive(Debug)]
struct UppercaseAdapterFactory {}

impl SchemaAdapterFactory for UppercaseAdapterFactory {
fn create(&self, schema: &Schema) -> Result<Box<dyn SchemaAdapter>> {
Ok(Box::new(UppercaseAdapter {
input_schema: Arc::new(schema.clone()),
}))
}
}

/// Schema adapter that transforms column names to uppercase
#[derive(Debug)]
struct UppercaseAdapter {
input_schema: SchemaRef,
}

impl SchemaAdapter for UppercaseAdapter {
fn adapt(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
// In a real adapter, we might transform the data too
// For this test, we're just passing through the batch
Ok(record_batch)
}

fn output_schema(&self) -> SchemaRef {
let fields = self
.input_schema
.fields()
.iter()
.map(|f| {
Field::new(
f.name().to_uppercase().as_str(),
f.data_type().clone(),
f.is_nullable(),
)
})
.collect();

Arc::new(Schema::new(fields))
}
}

#[cfg(feature = "parquet")]
#[tokio::test]
async fn test_parquet_integration_with_schema_adapter() -> Result<()> {
// Create a temporary directory for our test file
let tmp_dir = TempDir::new()?;
let file_path = tmp_dir.path().join("test.parquet");
let file_path_str = file_path.to_str().unwrap();

// Create test data
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, true),
]));

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
],
)?;

// Write test parquet file
let file = std::fs::File::create(file_path_str)?;
let props = WriterProperties::builder().build();
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
writer.write(&batch)?;
writer.close()?;

// Create a session context
let ctx = SessionContext::new();

// Create a ParquetSource with the adapter factory
let source = ParquetSource::default()
.with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}));

// Create a scan config
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::parse(&format!("file://{}", file_path_str))?,
schema.clone(),
)
.with_source(source)
.build();

// Create a data source executor
let exec = DataSourceExec::from_data_source(config);

// Collect results
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx)?;
let batches = datafusion::physical_plan::common::collect(stream).await?;

// There should be one batch
assert_eq!(batches.len(), 1);

// Verify the schema has uppercase column names
let result_schema = batches[0].schema();
assert_eq!(result_schema.field(0).name(), "ID");
assert_eq!(result_schema.field(1).name(), "NAME");

Ok(())
}

#[tokio::test]
async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
Contributor: it is not entirely clear to me what this test is verifying

Contributor Author: The test checks that:

  1. The UppercaseAdapterFactory can be applied to different source types (ArrowSource, ParquetSource, CsvSource)
  2. After applying the factory, each source correctly reports having a schema adapter factory
  3. The factory reference is properly maintained across different source instances

// This test verifies that the same schema adapter factory can be reused
// across different file source types. This is important for ensuring that:
// 1. The schema adapter factory interface works uniformly across all source types
// 2. The factory can be shared and cloned efficiently using Arc
// 3. Various data source implementations correctly implement the schema adapter factory pattern

// Create a test factory
let factory = Arc::new(UppercaseAdapterFactory {});

// Apply the same adapter to different source types
let arrow_source =
ArrowSource::default().with_schema_adapter_factory(factory.clone());

#[cfg(feature = "parquet")]
let parquet_source =
ParquetSource::default().with_schema_adapter_factory(factory.clone());

#[cfg(feature = "csv")]
let csv_source = CsvSource::default().with_schema_adapter_factory(factory.clone());

// Verify adapters were properly set
assert!(arrow_source.schema_adapter_factory().is_some());

#[cfg(feature = "parquet")]
assert!(parquet_source.schema_adapter_factory().is_some());

#[cfg(feature = "csv")]
assert!(csv_source.schema_adapter_factory().is_some());

Ok(())
}

// Helper function to test From<T> for Arc<dyn FileSource> implementations
fn test_from_impl<T: Into<Arc<dyn FileSource>> + Default>(expected_file_type: &str) {
let source = T::default();
let file_source: Arc<dyn FileSource> = source.into();
assert_eq!(file_source.file_type(), expected_file_type);
}

#[test]
fn test_from_implementations() {
// Test From implementation for various sources
test_from_impl::<ArrowSource>("arrow");

#[cfg(feature = "parquet")]
test_from_impl::<ParquetSource>("parquet");

#[cfg(feature = "csv")]
test_from_impl::<CsvSource>("csv");

#[cfg(feature = "json")]
test_from_impl::<datafusion_datasource_json::JsonSource>("json");
}

/// A simple test schema adapter factory that doesn't modify the schema
#[derive(Debug)]
struct TestSchemaAdapterFactory {}

impl SchemaAdapterFactory for TestSchemaAdapterFactory {
fn create(&self, schema: &Schema) -> Result<Box<dyn SchemaAdapter>> {
Ok(Box::new(TestSchemaAdapter {
input_schema: Arc::new(schema.clone()),
}))
}
}

/// A test schema adapter that passes through data unmodified
#[derive(Debug)]
struct TestSchemaAdapter {
input_schema: SchemaRef,
}

impl SchemaAdapter for TestSchemaAdapter {
fn adapt(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
// Just pass through the batch unmodified
Ok(record_batch)
}

fn output_schema(&self) -> SchemaRef {
self.input_schema.clone()
}
}

#[cfg(feature = "parquet")]
#[test]
fn test_schema_adapter_preservation() {
// Create a test schema
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, true),
]));

// Create source with schema adapter factory
let source = ParquetSource::default();
let factory = Arc::new(TestSchemaAdapterFactory {});
let file_source = source.with_schema_adapter_factory(factory);

// Create a FileScanConfig with the source
let config_builder =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema.clone())
.with_source(file_source.clone())
// Add a file to make it valid
.with_file(PartitionedFile::new("test.parquet", 100));

let config = config_builder.build();

// Verify the schema adapter factory is present in the file source
assert!(config.source().schema_adapter_factory().is_some());
}