Implement schema adapter support for FileSource and add integration tests #16148
Conversation
Thank you @kosiew -- I had a few comments -- mostly about the tests.
Please let me know what you think
#[cfg(feature = "csv")]
use datafusion_datasource_csv::CsvSource;

/// A schema adapter factory that transforms column names to uppercase
this is very cool
}

#[tokio::test]
async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
it is not entirely clear to me what this test is verifying
The test checks that:
- The UppercaseAdapterFactory can be applied to different source types (ArrowSource, ParquetSource, CsvSource)
- After applying the factory, each source correctly reports having a schema adapter factory
- The factory reference is properly maintained across different source instances
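The reuse the test verifies can be sketched with a few stand-in types. This is an illustrative sketch, not DataFusion's actual API: the trait and source definitions below are trimmed stand-ins, and `UppercaseAdapterFactory` here is only a marker type.

```rust
use std::sync::Arc;

// Minimal stand-ins for the traits under discussion (illustrative only).
trait SchemaAdapterFactory: Send + Sync {}

struct UppercaseAdapterFactory;
impl SchemaAdapterFactory for UppercaseAdapterFactory {}

// Two toy "sources" that each store an optional factory, as the real
// ParquetSource/CsvSource do after this PR.
struct ParquetSource {
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}
struct CsvSource {
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl ParquetSource {
    fn with_schema_adapter_factory(mut self, f: Arc<dyn SchemaAdapterFactory>) -> Self {
        self.schema_adapter_factory = Some(f);
        self
    }
}
impl CsvSource {
    fn with_schema_adapter_factory(mut self, f: Arc<dyn SchemaAdapterFactory>) -> Self {
        self.schema_adapter_factory = Some(f);
        self
    }
}

// Returns true when both sources report the very same factory instance.
fn factory_is_shared() -> bool {
    let factory: Arc<dyn SchemaAdapterFactory> = Arc::new(UppercaseAdapterFactory);
    let parquet = ParquetSource { schema_adapter_factory: None }
        .with_schema_adapter_factory(Arc::clone(&factory));
    let csv = CsvSource { schema_adapter_factory: None }
        .with_schema_adapter_factory(Arc::clone(&factory));
    Arc::ptr_eq(parquet.schema_adapter_factory.as_ref().unwrap(), &factory)
        && Arc::ptr_eq(csv.schema_adapter_factory.as_ref().unwrap(), &factory)
}

fn main() {
    // One Arc-ed factory is attached to both source types and remains shared.
    assert!(factory_is_shared());
}
```

`Arc::ptr_eq` is what makes "the factory reference is properly maintained" checkable: it compares the actual shared allocation rather than factory behavior.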
@@ -0,0 +1,208 @@
// Licensed to the Apache Software Foundation (ASF) under one
I am not sure what additional coverage datafusion/core/tests/schema_adapter_factory_tests.rs adds on top of the integration test in datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs.
Also, if it does add additional coverage, can you please include it as part of the other core_integration tests?
Each new file in datafusion/core/tests results in a new binary, and each binary takes 10s of MB.
For example, I built this to check, and the binary is 57 MB on my machine (it is even more with the normal dev profile):
$ cargo test --profile=ci --test schema_adapter_factory_tests
...
Running tests/schema_adapter_factory_tests.rs (target/ci/deps/schema_adapter_factory_tests-b2997559eccc9857)
...
$ du -h target/ci/deps/schema_adapter_factory_tests-b2997559eccc9857
57M target/ci/deps/schema_adapter_factory_tests-b2997559eccc9857
Field::new("extra", DataType::Int64, true),
]);

// Create a TestSource
What is this test covering? I don't understand what additional coverage it is adding
test_schema_adapter validates:
- Creating and attaching a schema adapter factory to a file source
- Creating a schema adapter using the factory
- The schema adapter's ability to map column indices between a table schema and a file schema
- The schema adapter's ability to create a projection that selects only the columns from the file schema that are present in the table schema
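The last two behaviors (index mapping and projection) can be sketched without any DataFusion types. This is an illustrative re-implementation of the idea, not the actual SchemaAdapter API; the schemas are modeled as plain column-name slices and `map_and_project` is a hypothetical helper.

```rust
// Sketch of the two behaviors the test exercises: mapping each table column
// to its position in the file schema, and projecting only the file columns
// that also appear in the table schema.
fn map_and_project(table: &[&str], file: &[&str]) -> (Vec<Option<usize>>, Vec<usize>) {
    // For each table column, find its index in the file schema (None = missing).
    let mapping = table
        .iter()
        .map(|col| file.iter().position(|f| f == col))
        .collect();
    // Keep only the file-column indices whose names exist in the table schema.
    let projection = file
        .iter()
        .enumerate()
        .filter(|(_, f)| table.contains(f))
        .map(|(i, _)| i)
        .collect();
    (mapping, projection)
}

fn main() {
    // Table has (a, b, c); the file has (b, c) plus an "extra" column.
    let (mapping, projection) = map_and_project(&["a", "b", "c"], &["b", "c", "extra"]);
    assert_eq!(mapping, vec![None, Some(0), Some(1)]);
    assert_eq!(projection, vec![0, 1]);
}
```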
}

#[test]
fn test_test_source_schema_adapter_factory() {
same comment here -- I am not sure what extra coverage this is adding and it adds a new binary
Good catch!
The tests in this file are covered elsewhere already.
Will remove this file.
// Implementation of apply_schema_adapter for testing purposes
// This mimics the private function in the datafusion-parquet crate
fn apply_schema_adapter(
If we left the method on ParquetSource it wouldn't have to be replicated 🤔
I like your suggestion!
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
fn file_type(&self) -> &str {
    "mock"
}

impl_schema_adapter_methods!();
I would personally suggest not adding this macro, as I think people will likely just have their IDE fill it out or let the compiler tell them what to do.
What I suggest we do is change the with_schema_adapter_factory() method's return signature to return Result, and provide default implementations in the trait that return Error(NotYetImplemented).
That way users won't need to change their implementations if they don't use schema adapters at all.
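The suggested shape might look roughly like the following. This is a hedged sketch, not the real trait: `SchemaAdapterFactory`, `FileSource`, and the `NotImplemented` error type are all simplified stand-ins for whatever DataFusion would actually use.

```rust
use std::sync::Arc;

// Stand-ins for the real DataFusion types (illustrative only).
trait SchemaAdapterFactory: Send + Sync {}
struct UppercaseAdapterFactory;
impl SchemaAdapterFactory for UppercaseAdapterFactory {}

// Placeholder for a "not yet implemented" error variant.
#[derive(Debug)]
struct NotImplemented;

// The proposal: fallible defaults on the trait, so implementors that never
// use schema adapters need not override anything and still compile.
trait FileSource {
    fn with_schema_adapter_factory(
        &self,
        _factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>, NotImplemented> {
        Err(NotImplemented)
    }
    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        None
    }
}

// An implementation that opts out: no extra code required.
struct MockSource;
impl FileSource for MockSource {}

fn main() {
    // Attaching a factory to a source that doesn't support adapters errors
    // at runtime instead of failing to compile.
    let result = MockSource.with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory));
    assert!(result.is_err());
}
```

The trade-off discussed below follows directly from this shape: the "not implemented" case moves from compile time to runtime.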
Hey, really appreciate the suggestion! Turning these two methods into trait defaults is tempting, but we run into some frustrating object-safety and cloning issues:
- Fallible signature (Result)
Switching to
fn with_schema_adapter_factory(...) -> Result<Arc<dyn FileSource>, _>
has drawbacks:
It forces every callsite—even the 99% that never “fail”—to handle a Result. That’s a lot of boilerplate up front.
Pushing the “not implemented” case to runtime means we only discover missing overrides via panics or errors in production, instead of compile-time feedback.
- Trait-object vs. Clone
A default like
fn with_schema_adapter_factory(
&self,
_factory: Arc<dyn SchemaAdapterFactory>,
) -> Arc<dyn FileSource> {
Arc::new(self.clone())
}
can’t compile because:
self is a &Self, so self.clone() gives you another &Self, producing an Arc<&Self>, and &Self isn’t a FileSource.
To make it work you’d need Self: Sized + Clone on the default—but then that method isn’t even available on dyn FileSource, defeating trait-object use.
So, I am leaning towards keeping the macro because:
- Object-safe clone: by generating Arc::new(Self { …, ..self.clone() }) inside each impl, the macro leverages the concrete type’s Clone impl without polluting the trait itself.
- Single maintenance point: if we ever tweak the method signature, we update the macro once and every impl site gets fixed automatically.
- Compile-time assurance: missing impl_schema_adapter_methods!() on a type immediately fails to compile, alerting the author they need to opt in.
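The object-safe clone pattern described above can be sketched as follows. This is an illustrative guess at roughly what the macro generates inside each concrete impl; the trait, struct, and field names are stand-ins, not the macro's actual expansion.

```rust
use std::sync::Arc;

// Stand-ins for the real types (illustrative only).
trait SchemaAdapterFactory: Send + Sync {}
struct UppercaseAdapterFactory;
impl SchemaAdapterFactory for UppercaseAdapterFactory {}

trait FileSource {
    fn with_schema_adapter_factory(
        &self,
        factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Arc<dyn FileSource>;
    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>>;
}

#[derive(Clone, Default)]
struct MockSource {
    // Stand-in for whatever other state a real source carries.
    batch_size: usize,
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl FileSource for MockSource {
    // Roughly the macro-generated body: clone the concrete type and swap in
    // the factory. Clone is used inside the impl on the concrete Self type,
    // so the FileSource trait itself stays object-safe.
    fn with_schema_adapter_factory(
        &self,
        factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Arc<dyn FileSource> {
        Arc::new(Self {
            schema_adapter_factory: Some(factory),
            ..self.clone()
        })
    }
    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}

fn main() {
    let source = MockSource::default();
    let adapted = source.with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory));
    assert!(adapted.schema_adapter_factory().is_some());
}
```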
It forces every callsite—even the 99% that never “fail”—to handle a Result. That’s a lot of boilerplate up front.
In my opinion the boilerplate would be relatively minimal (likely it would require ? in a few places).
Pushing the “not implemented” case to runtime means we only discover missing overrides via panics or errors in production, instead of compile-time feedback.
This argument makes sense to me. However, another drawback is that after this PR, DataSource implementations are required to handle the schema adapter, rather than being allowed to return a runtime error if they don't.
I think that is probably fine, but in the future I can still see a use case for a fallible Result.
If we implement the new methods for all structs in DataFusion, won't users who upgrade get a compile error because of the missing methods, and thus be forced to make them a no-op or unimplemented!()? That seems reasonable to me.
compile error because of the missing methods and thus be forced to make them a no-op or unimplemented!()?
Yes, that is my understanding as well
This is what this PR does, right? Or are you suggesting a change?
It does it via macros, right? I'm basically saying that instead of providing a macro that implements the functions for you, I would force users to implement the functions and (if necessary) provide helpers they can call from within their implementation.
This would be my preference too -- the macro is a nice way to reduce the boilerplate (and @kosiew has documented it super well), but I think it then adds a bit more cognitive load, and it would be better to have a little more duplication to be explicit.
Thanks for the discussion on this.
What do you think about implementing the schema adapter support via an opt-in trait, to avoid breaking changes to the FileSource trait?
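One possible shape for such an opt-in trait is sketched below. This is purely illustrative: the trait name `SchemaAdapterSupport`, the `ParquetLikeSource` type, and all signatures are hypothetical, chosen only to show how the existing FileSource trait could stay untouched.

```rust
use std::sync::Arc;

// Stand-ins for the real types (illustrative only).
trait SchemaAdapterFactory: Send + Sync {}
struct UppercaseAdapterFactory;
impl SchemaAdapterFactory for UppercaseAdapterFactory {}

// The existing trait stays exactly as it is today.
trait FileSource {
    fn file_type(&self) -> &str;
}

// Hypothetical extension trait: only sources that want schema evolution
// implement it; every other FileSource keeps compiling unchanged.
trait SchemaAdapterSupport: FileSource {
    fn with_schema_adapter_factory(
        &self,
        factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Arc<dyn SchemaAdapterSupport>;
    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>>;
}

#[derive(Clone)]
struct ParquetLikeSource {
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl FileSource for ParquetLikeSource {
    fn file_type(&self) -> &str {
        "parquet"
    }
}

impl SchemaAdapterSupport for ParquetLikeSource {
    fn with_schema_adapter_factory(
        &self,
        factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Arc<dyn SchemaAdapterSupport> {
        Arc::new(Self {
            schema_adapter_factory: Some(factory),
        })
    }
    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}

fn main() {
    let source = ParquetLikeSource { schema_adapter_factory: None };
    let adapted = source.with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory));
    // Supertrait methods remain available through the extension trait object.
    assert_eq!(adapted.file_type(), "parquet");
    assert!(adapted.schema_adapter_factory().is_some());
}
```

The cost of this design is that callers holding only an `Arc<dyn FileSource>` cannot reach the adapter methods without a downcast, which is likely part of why the PR went with methods on FileSource itself.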
Here is my proposal for how to handle this API:
I think it is fairly simple and follows the existing pattern in this codebase
fyi @adriangb
Thank you @kosiew -- I think this PR looks good enough to merge to me. Thank you for your patience and thoroughness
It would be great to avoid a new file datafusion/core/tests/test_adapter_updated.rs
if possible but we can do that as a follow on too
//! Macros for the datafusion-datasource crate

/// Helper macro to generate schema adapter methods for FileSource implementations
this is super well documented. I am still not a huge fan of adding this new macro as I think it makes implementing DataSources that much more complicated, but I can see the rationale for not adding a default on #16148 (comment)
So let's go with this approach and see how it goes
@@ -0,0 +1,214 @@
// Licensed to the Apache Software Foundation (ASF) under one
Due to:
- The previously mentioned reason that more targets result in longer compile times
- It will be harder to find tests for the same feature if they are in different files

I think we should remove this new file datafusion/core/tests/test_adapter_updated.rs and put the test in datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs instead.
However, we can do that as a follow-on PR too.
Thank you @alamb for the review and feedback.
Which issue does this PR close?
This is part of a series of PRs re-implementing #15295 to close #14657 by adding schema-evolution support in DataFusion.

Rationale for this change
To enable customizable schema evolution during file scans, we introduce a SchemaAdapterFactory hook into all FileSource implementations. This allows users to adapt column mappings and perform transformations (e.g., renaming, casting, adding defaults) without forking core scan logic.

What changes are included in this PR?
Core API additions
- with_schema_adapter_factory and schema_adapter_factory methods added to the FileSource trait
- impl_schema_adapter_methods!() macro to reduce boilerplate in each FileSource implementation
- as_file_source helper to convert concrete sources into Arc<dyn FileSource>

Datasource crate updates
- Updated FileSource implementations to store and honor an optional schema_adapter_factory

Testing
- Unit tests: schema_adapter_factory_tests.rs, test_adapter_updated.rs, test_source_adapter_tests.rs. These cover factory wiring, column index mapping, schema transformation logic, and source behavior.
- Integration tests: schema_adapter_integration_tests.rs, apply_schema_adapter_tests.rs. These validate adapter behavior in real-world scenarios such as scanning Parquet files.

Are these changes tested?
Yes. This PR includes comprehensive new tests.

Are there any user-facing changes?
Yes:
- New methods on the FileSource trait
- New impl_schema_adapter_methods!() macro for downstream implementors

These changes are additive and backward-compatible. Developers implementing custom FileSource types must either use the macro or provide the new methods to support schema adapters.