Enhance Schema adapter to accommodate evolving struct #15295

Draft: wants to merge 156 commits into base: main

Commits (156):
c8236ed
feat: implement NestedStructSchemaAdapter for handling schema evoluti…
kosiew Mar 18, 2025
afbe1ed
feat: enhance NestedStructSchemaAdapter with schema mapping capabilities
kosiew Mar 18, 2025
c774cab
test: add schema mapping test for NestedStructSchemaAdapter
kosiew Mar 18, 2025
5f5cd45
feat: implement NestedStructSchemaAdapterFactory for handling nested …
kosiew Mar 18, 2025
6065bc1
test: add unit test for NestedStructSchemaAdapterFactory to validate …
kosiew Mar 18, 2025
410f8d7
test: refactor test_create_appropriate_adapter for clarity and effici…
kosiew Mar 18, 2025
50cf134
feat: enhance create_appropriate_adapter to support nested schema tra…
kosiew Mar 18, 2025
3f52617
refactor: simplify create_appropriate_adapter logic for nested schema…
kosiew Mar 18, 2025
ad74d3a
refactor: remove redundant default adapter test in nested schema adapter
kosiew Mar 18, 2025
134dace
feat: enhance NestedStructSchemaAdapter to support additional table s…
kosiew Mar 18, 2025
aa89671
refactor: simplify test_nested_struct_evolution
kosiew Mar 18, 2025
f361311
refactor: streamline schema creation in nested schema adapter tests
kosiew Mar 18, 2025
a914a6b
Fix clippy errors
kosiew Mar 18, 2025
d8eb3eb
test: add async test for schema evolution with compaction in NestedSt…
kosiew Mar 21, 2025
1735b45
refactor: add missing imports and clean up test code in nested_schema…
kosiew Mar 21, 2025
72aee85
Rollback to before adding test_datafusion_schema_evolution_with_compa…
kosiew Mar 21, 2025
772fbce
feat: add nested_struct.rs to test nested schema evolution test with …
kosiew Mar 21, 2025
20af2c0
chore: remove nested_struct.rs example file to streamline repository …
kosiew Mar 21, 2025
3c0844c
feat: Add nested_struct.rs async function for schema evolution with c…
kosiew Mar 21, 2025
ad09e60
feat: Enhance logging in nested_struct.rs for better traceability 📜✨
kosiew Mar 21, 2025
61f1f6e
created helper functions
kosiew Mar 21, 2025
16a47d3
map batch1 to schema2
kosiew Mar 21, 2025
7b7183e
feat: Enhance NestedStructSchemaAdapter with custom schema mapping fo…
kosiew Mar 21, 2025
84ab195
feat: Add debug print statements to map_batch for tracing execution f…
kosiew Mar 21, 2025
51dacc5
fix: Refactor nested schema mapping for improved error handling and c…
kosiew Mar 21, 2025
aa5128a
refactor: Remove debug print statements for cleaner code execution 🧹✨
kosiew Mar 21, 2025
839bf61
nested_struct - plug adapter into ListingTableConfig
kosiew Mar 24, 2025
2e99158
feat: Add optional schema adapter factory to ListingTableConfig for e…
kosiew Mar 24, 2025
fe7ff84
feat: Add optional schema adapter factory to FileScanConfig for enhan…
kosiew Mar 24, 2025
3689140
feat: Enhance ListingTableConfig to support schema adapter factory fo…
kosiew Mar 24, 2025
76fbc6f
struct NestedStructSchemaMapping - remove table_schema, file_schema
kosiew Mar 25, 2025
f2d6b60
refactor: Remove nested_struct.rs example for schema evolution and co…
kosiew Mar 25, 2025
6b7fed9
style: Fix comment tests in ListingOptions documentation 📜✨
kosiew Mar 25, 2025
2cef654
Merge branch 'main' into test-merge
kosiew Mar 25, 2025
565ad5c
SchemaMapping remove table_schema, nested_schema_adapter remove map_p…
kosiew Mar 25, 2025
778da1e
docs: Update comments for schema_adapter_factory in ListingTableConfi…
kosiew Mar 25, 2025
f066e59
refactor: Extract schema adapter preservation logic into a helper fun…
kosiew Mar 25, 2025
4cc5f77
refactor: Extract schema adapter application logic into a dedicated f…
kosiew Mar 25, 2025
b6a828c
docs: Enhance adapt_fields documentation with performance considerati…
kosiew Mar 25, 2025
41fb40c
docs: Add detailed documentation for RecordBatch mapping in NestedStr…
kosiew Mar 25, 2025
3133cd7
refactor: Add missing import for FileSource in ListingTable implement…
kosiew Mar 25, 2025
5ad6287
refactor: Update license documentation comments for NestedSchemaAdapt…
kosiew Mar 25, 2025
8fa34da
refactor: Remove unused file_scan_exec.rs to clean up the codebase 🗑️✨
kosiew Mar 25, 2025
d229dd3
refactor: Remove unused file_scan_config.rs to streamline the codebas…
kosiew Mar 25, 2025
ff41c43
Moved the adapt_column method from NestedStructSchemaMapping to a sta…
kosiew Mar 25, 2025
2df74b6
Fix Clippy errors
kosiew Mar 25, 2025
bb4a5de
docs: Correct the struct names in documentation for NestedStructSchem…
kosiew Mar 25, 2025
a8cce59
Merge branch 'main' into schema-adapter
kosiew Mar 25, 2025
f547355
fix: remove unnecessary clone in create_physical_plan call for Listin…
kosiew Mar 25, 2025
fa7c17f
refactor: rename preserve_schema_adapter_factory to preserve_conf_sch…
kosiew Mar 25, 2025
e9c93d6
refactor: rename create_appropriate_adapter to create_adapter for cla…
kosiew Mar 25, 2025
64a4e3f
feature gate parquet
kosiew Mar 26, 2025
dd9f66d
Trigger CI
kosiew Mar 26, 2025
ca511df
refactor: mod tests, add user_infos
kosiew Mar 26, 2025
54590f4
feat: expose nested schema adapter and source for improved data handl…
kosiew Mar 26, 2025
50f67cb
Merge branch 'main' into schema-adapter
kosiew Apr 3, 2025
18a368e
Resolve merge conflict
kosiew Apr 3, 2025
42bb782
Refactor schema adapter application in ListingTable
kosiew Apr 3, 2025
0f52160
Merge branch 'main' into schema-adapter
kosiew Apr 8, 2025
52044da
Merge branch 'main' into schema-adapter
kosiew Apr 14, 2025
81d2a25
trigger ci
kosiew Apr 21, 2025
30db2d7
Merge branch 'main' into schema-adapter
kosiew May 7, 2025
90d260b
feat: add column statistics mapping for NestedStructSchemaMapping
kosiew May 7, 2025
f07dfdc
feat: add column statistics mapping for NestedStructSchemaMapping
kosiew May 7, 2025
0d7728f
add tests
kosiew May 7, 2025
6314b24
test: add helper functions for readability
kosiew May 7, 2025
eee5566
refactor: simplify DataType usage in NestedStructSchemaAdapter
kosiew May 7, 2025
5cf3a3c
fix: update timestamp array casting to include timezone metadata
kosiew May 7, 2025
25af310
streamline the tests to ensure no duplicate
kosiew May 7, 2025
5167825
verify_column_statistics - include expected_sum
kosiew May 7, 2025
38e4dc5
Merge branch 'schema-adapter-2' into schema-adapter
kosiew May 7, 2025
09d4b65
Copy license header
kosiew May 7, 2025
81f7ea5
Merge branch 'main' into schema-adapter
kosiew May 7, 2025
bd207e6
fix clippy errors
kosiew May 7, 2025
5257b44
Add nested_struct to test schema adaptation
kosiew May 12, 2025
c130bc6
fix: correct adapter creation method in schema evolution test
kosiew May 12, 2025
759e678
fix: update schema references in schema evolution test
kosiew May 12, 2025
039306e
amend create_batch to create a batch with fields as per schema withou…
kosiew May 12, 2025
b30e76f
fix: remove unnecessary Arc wrapping in create_array_for_field
kosiew May 12, 2025
c9192b5
feat: enhance logging in schema evolution test for better traceability
kosiew May 12, 2025
9bb3a5f
refactor: rename test function and remove old schema4 definition for …
kosiew May 12, 2025
4752b2c
feat: add logging for field names in create_batch and enhance timesta…
kosiew May 12, 2025
64d3a56
fix: replace create_batch2 with create_batch in schema evolution test
kosiew May 12, 2025
5243f7a
refactor: update schema adapter creation and mapping in schema evolut…
kosiew May 12, 2025
71ae846
pass adapter_factory to listing table config
kosiew May 12, 2025
41b6edd
refactor: streamline schema evolution test by creating a helper funct…
kosiew May 12, 2025
d4cdf2d
feat: add debug logging for column counts in PartitionColumnProjector
kosiew May 14, 2025
b6df0b3
Fix clippy errors
kosiew May 14, 2025
3d381c1
Merge branch 'main' into schema-adapter
kosiew May 14, 2025
a71c6f0
refactor: reorder test file paths
kosiew May 15, 2025
2834842
add jobs.parquet, nested_struct2.rs
kosiew May 15, 2025
b951294
feat: add debug logging for column mismatch in PartitionColumnProjector
kosiew May 15, 2025
9b53a88
fix: replace debug logging with println for column mismatch in Partit…
kosiew May 15, 2025
27a32bc
remove compacting section, test with select * query
kosiew May 15, 2025
67e080a
refactor: remove compacted parquet file writing and update SQL query …
kosiew May 15, 2025
65cfdc3
add jobs.parquet, amend nested_struct2 not to delete it
kosiew May 15, 2025
63132b2
remove adapter_factory
kosiew May 15, 2025
854f2ce
refactor: remove schema adapter factory and reorder test file paths
kosiew May 15, 2025
087c815
add adapter_factory
kosiew May 15, 2025
ee41f12
Merge branch 'main' into schema-adapter
kosiew May 16, 2025
8692877
fix: ListingTableConfig remove schema
kosiew May 16, 2025
37ecc57
fix: Simplify paths in test_datafusion_schema_evolution and add resul…
kosiew May 16, 2025
efda93d
fix: Enhance schema adaptation for projection in nested struct fields…
kosiew May 16, 2025
02f7c33
nested_struct2 use adapter_factory
kosiew May 16, 2025
034499f
fix cargo fmt error
kosiew May 16, 2025
f241545
fix: Add NestedStructSchemaAdapterFactory import in nested_struct2 ex…
kosiew May 16, 2025
3b50aa9
fix: amend create_schema4
kosiew May 16, 2025
54f90a0
fix: add query results display in schema evolution test
kosiew May 16, 2025
4ecc450
chore: remove unused nested_struct and nested_struct2 examples, and d…
kosiew May 17, 2025
df66922
fix: remove debug print statements from file_scan_config.rs
kosiew May 17, 2025
e238c10
refactor fn map_schema in schema_adapter.rs, nested_schema_adapter.rs…
kosiew May 17, 2025
ed6d2c3
fix: add missing Field import in schema_adapter.rs
kosiew May 17, 2025
c2264d3
refactor: extract can_cast_field helper function to improve code read…
kosiew May 19, 2025
6018c24
refactor: remove unused create_schema_mapping function to clean up code
kosiew May 19, 2025
091bb6a
test: amend create_nested_schema to include original user and timesta…
kosiew May 19, 2025
2b7f9df
Merge branch 'main' into test-merge
kosiew May 20, 2025
57d8671
doc: enhance documentation for with_schema_adapter_factory in Listing…
kosiew May 20, 2025
b28eb92
feat: add schema evolution support for FileSource with extension trait
kosiew May 20, 2025
1903158
refactor: remove unused imports in nested_schema_adapter.rs
kosiew May 20, 2025
3cb816b
refactor: remove adapt_fields function and related schema adaptation …
kosiew May 20, 2025
792fc20
refactor: remove adapt_schema tests from NestedStructSchemaAdapter
kosiew May 20, 2025
008e6ad
fix: correct cloning of self in with_schema_adapter_factory method
kosiew May 20, 2025
dc11478
refactor: enhance with_schema_adapter method for dynamic schema adapt…
kosiew May 20, 2025
7eeba38
fully qualif <Arc<dyn FileSource> as FileSourceExt>
kosiew May 20, 2025
baef480
FileSourceExt-change self to source
kosiew May 20, 2025
61226a0
refactor: use fully qualified syntax for with_schema_adapter method i…
kosiew May 20, 2025
afc87cd
refactor: simplify with_schema_adapter_factory method by using mutabl…
kosiew May 20, 2025
581379a
refactor: update with_schema_adapter method to use self instead of so…
kosiew May 20, 2025
5494de1
refactor: cast self to Arc<dyn FileSource> for compatibility in FileS…
kosiew May 20, 2025
7d9d038
refactor: simplify with_schema_adapter method by removing explicit Ar…
kosiew May 20, 2025
2b80a63
refactor: enhance ListingTableConfig by implementing Default trait an…
kosiew May 20, 2025
1b0f83c
refactor: removing apply_schema_adapter_to_source function
kosiew May 20, 2025
4f9aba6
refactor: simplify schema adapter factory handling in ParquetSource
kosiew May 20, 2025
085ae46
refactor: impl FileSourceExt for dyn FileSource
kosiew May 20, 2025
2a37983
refactor: remove apply_schema_adapter_to_source function, integrate i…
kosiew May 20, 2025
54acd98
refactor: rename with_factory to with_schema_adapter_factory for clarity
kosiew May 20, 2025
3018b02
refactor: update schema adapter factory methods to use Option type fo…
kosiew May 20, 2025
7154234
refactor: remove with_schema_adapter_factory_opt method
kosiew May 20, 2025
64c1691
refactor: enhance schema adapter factory handling in ParquetSource
kosiew May 20, 2025
5eede31
refactor: simplify SchemaMapping instantiation in DefaultSchemaAdapter
kosiew May 21, 2025
d99556b
refactor: improve documentation for create_field_mapping and SchemaMa…
kosiew May 21, 2025
c8d642f
test: add unit tests for schema mapping happy and error paths
kosiew May 21, 2025
3243ab7
refactor: add with_schema_adapter_factor directly to FileSource
kosiew May 22, 2025
84f0991
refactor: add Sized constraint to with_schema_adapter_factory method …
kosiew May 22, 2025
58dd0d9
refactor: update with_schema_adapter_factory method to indicate defau…
kosiew May 22, 2025
3ef15c1
revert to before add with_schema_adapter to FileSource
kosiew May 22, 2025
1804498
refactor: FileSource implement with_schema_adapter_factory
kosiew May 22, 2025
f69b80e
refactor: add schema_adapter_factory support to CsvSource
kosiew May 22, 2025
0a5db8b
refactor: reintroduce From implementation for ParquetSource and add g…
kosiew May 22, 2025
9aeaacc
Revert "refactor: reintroduce From implementation for ParquetSource a…
kosiew May 22, 2025
9dc95be
refactor: add as_file_source helper function for FileSource conversion
kosiew May 22, 2025
a56b05c
refactor: implement From trait for CsvSource to use as_file_source he…
kosiew May 22, 2025
8e744f6
refactor: enhance JsonSource with schema adapter factory support and …
kosiew May 22, 2025
cd65627
refactor: remove unused ParquetSource import from table.rs
kosiew May 22, 2025
3aab6ec
refactor: add schema adapter factory support to ArrowSource
kosiew May 22, 2025
248e276
refactor: update TestSource to support schema adapter factory
kosiew May 22, 2025
44 changes: 36 additions & 8 deletions datafusion/core/src/datasource/listing/table.rs
@@ -32,6 +32,7 @@ use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
@@ -60,7 +61,7 @@ use object_store::ObjectStore;
/// Configuration for creating a [`ListingTable`]
///
///
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
pub struct ListingTableConfig {
/// Paths on the `ObjectStore` for creating `ListingTable`.
/// They should share the same schema and object store.
@@ -73,16 +74,16 @@ pub struct ListingTableConfig {
///
/// See details on [`ListingTableConfig::with_listing_options`]
pub options: Option<ListingOptions>,
/// Optional [`SchemaAdapterFactory`] used to handle schema evolution of fields over time
pub schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl ListingTableConfig {
/// Creates new [`ListingTableConfig`] for reading the specified URL
pub fn new(table_path: ListingTableUrl) -> Self {
-let table_paths = vec![table_path];
Self {
-table_paths,
-file_schema: None,
-options: None,
+table_paths: vec![table_path],
+..Default::default()
}
}

@@ -92,8 +93,7 @@ impl ListingTableConfig {
pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
Self {
table_paths,
-file_schema: None,
-options: None,
+..Default::default()
}
}
/// Set the `schema` for the overall [`ListingTable`]
@@ -111,6 +111,7 @@ impl ListingTableConfig {
table_paths: self.table_paths,
file_schema: Some(schema),
options: self.options,
schema_adapter_factory: self.schema_adapter_factory,
}
}

@@ -123,9 +124,23 @@ impl ListingTableConfig {
table_paths: self.table_paths,
file_schema: self.file_schema,
options: Some(listing_options),
schema_adapter_factory: self.schema_adapter_factory,
}
}

/// Add a schema adapter factory to the [`ListingTableConfig`]
///
/// Schema adapters handle schema evolution over time, allowing the table to adapt
/// to changes in file schemas. This is particularly useful for handling nested fields
/// in formats like Parquet where the schema may evolve.
pub fn with_schema_adapter_factory(
Review comment (Contributor): At a high level, it makes a lot of sense to provide the schema adapter factory to the listing table.

mut self,
factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
self.schema_adapter_factory = Some(factory);
self
}

/// Returns a tuple of `(file_extension, optional compression_extension)`
///
/// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))`
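The doc comment on `with_schema_adapter_factory` says that schema adapters let a table cope with file schemas that change over time. As a rough conceptual sketch of that idea (not DataFusion's implementation), the snippet below maps a batch read from an older file onto the current table schema, assuming that missing columns are filled with nulls and unknown columns are dropped. The `Batch` alias and `adapt_batch` function are hypothetical names introduced only for illustration.

```rust
use std::collections::HashMap;

/// A toy "record batch": column name -> nullable integer values.
/// Hypothetical stand-in for Arrow's `RecordBatch`.
type Batch = HashMap<String, Vec<Option<i64>>>;

/// Map a batch read from a file onto the table schema (hypothetical helper).
/// Columns missing from the file are filled with nulls; columns that the
/// table schema does not list are dropped.
fn adapt_batch(table_schema: &[&str], file_batch: &Batch, num_rows: usize) -> Batch {
    table_schema
        .iter()
        .map(|name| {
            let column = file_batch
                .get(*name)
                .cloned()
                // The file predates this column: synthesize an all-null column.
                .unwrap_or_else(|| vec![None; num_rows]);
            (name.to_string(), column)
        })
        .collect()
}
```

A nested-struct adapter such as the one this PR proposes would presumably apply the same kind of mapping recursively to the fields inside a struct column; that part is not shown here.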
@@ -201,6 +216,7 @@ impl ListingTableConfig {
table_paths: self.table_paths,
file_schema: self.file_schema,
options: Some(listing_options),
schema_adapter_factory: self.schema_adapter_factory,
})
}

@@ -224,6 +240,7 @@ impl ListingTableConfig {
table_paths: self.table_paths,
file_schema: Some(schema),
options: Some(options),
schema_adapter_factory: self.schema_adapter_factory,
})
}
None => internal_err!("No `ListingOptions` set for inferring schema"),
@@ -264,6 +281,7 @@ impl ListingTableConfig {
table_paths: self.table_paths,
file_schema: self.file_schema,
options: Some(options),
schema_adapter_factory: self.schema_adapter_factory,
})
}
None => config_err!("No `ListingOptions` set for inferring schema"),
@@ -750,6 +768,8 @@ pub struct ListingTable {
collected_statistics: FileStatisticsCache,
constraints: Constraints,
column_defaults: HashMap<String, Expr>,
/// Optional [`SchemaAdapterFactory`] used to handle schema evolution of fields over time
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl ListingTable {
@@ -786,6 +806,7 @@ impl ListingTable {
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
constraints: Constraints::empty(),
column_defaults: HashMap::new(),
schema_adapter_factory: config.schema_adapter_factory,
};

Ok(table)
@@ -942,6 +963,13 @@ impl TableProvider for ListingTable {
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};

let mut source = self.options.format.file_source();

// Apply schema adapter to source if available
if let Some(factory) = &self.schema_adapter_factory {
source = source.with_schema_adapter_factory(Arc::clone(factory));
}

// create the execution plan
self.options
.format
@@ -950,7 +978,7 @@ impl TableProvider for ListingTable {
FileScanConfigBuilder::new(
object_store_url,
Arc::clone(&self.file_schema),
-self.options.format.file_source(),
+source,
)
.with_file_groups(partitioned_file_lists)
.with_constraints(self.constraints.clone())
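The changes to `table.rs` above combine three things: `#[derive(Default)]` with struct update syntax in the constructors, a consuming `with_schema_adapter_factory` builder, and an optional factory that is only applied when present. The following is a minimal, self-contained sketch of that shape using simplified stand-in types. `AdapterFactory`, `NestedFactory`, `TableConfig`, and `adapter_name` are hypothetical and are not part of DataFusion.

```rust
use std::fmt::Debug;
use std::sync::Arc;

/// Hypothetical stand-in for the `SchemaAdapterFactory` trait.
trait AdapterFactory: Debug {
    fn name(&self) -> &str;
}

/// Hypothetical factory used only for this illustration.
#[derive(Debug)]
struct NestedFactory;

impl AdapterFactory for NestedFactory {
    fn name(&self) -> &str {
        "nested"
    }
}

/// Simplified stand-in for `ListingTableConfig`.
#[derive(Debug, Clone, Default)]
struct TableConfig {
    table_paths: Vec<String>,
    file_schema: Option<Vec<String>>,
    schema_adapter_factory: Option<Arc<dyn AdapterFactory>>,
}

impl TableConfig {
    /// Only the path is set explicitly; every other field comes from `Default`,
    /// so adding a new optional field does not require touching this constructor.
    fn new(table_path: &str) -> Self {
        Self {
            table_paths: vec![table_path.to_string()],
            ..Default::default()
        }
    }

    /// Consuming builder method in the same `mut self` style as the diff.
    fn with_schema_adapter_factory(mut self, factory: Arc<dyn AdapterFactory>) -> Self {
        self.schema_adapter_factory = Some(factory);
        self
    }
}

/// Report which adapter a scan would use. The "default" label is an
/// assumption made for this sketch.
fn adapter_name(config: &TableConfig) -> String {
    match &config.schema_adapter_factory {
        Some(factory) => factory.name().to_string(),
        None => "default".to_string(),
    }
}
```

With these stand-ins, `TableConfig::new("path/").with_schema_adapter_factory(Arc::new(NestedFactory))` yields a config whose `adapter_name` is `"nested"`, while a config built with `new` alone reports `"default"`.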
9 changes: 4 additions & 5 deletions datafusion/core/src/datasource/mod.rs
@@ -42,6 +42,7 @@ pub use datafusion_catalog::default_table_source;
pub use datafusion_catalog::memory;
pub use datafusion_catalog::stream;
pub use datafusion_catalog::view;
pub use datafusion_datasource::nested_schema_adapter;
pub use datafusion_datasource::schema_adapter;
pub use datafusion_datasource::sink;
pub use datafusion_datasource::source;
@@ -52,7 +53,7 @@ pub use datafusion_physical_expr::create_ordering;
mod tests {

use crate::prelude::SessionContext;

use datafusion_datasource::file::FileSource;
use std::fs;
use std::sync::Arc;

@@ -124,10 +125,8 @@ mod tests {
let f2 = Field::new("extra_column", DataType::Utf8, true);

let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
-let source = Arc::new(
-ParquetSource::default()
-.with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})),
-);
+let source = ParquetSource::default()
+.with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
let base_conf = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
schema,
22 changes: 22 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -25,8 +25,10 @@ use arrow::buffer::Buffer;
use arrow::datatypes::SchemaRef;
use arrow_ipc::reader::FileDecoder;
use datafusion_common::Statistics;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

use futures::StreamExt;
@@ -39,6 +41,12 @@ use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
pub struct ArrowSource {
metrics: ExecutionPlanMetricsSet,
projected_statistics: Option<Statistics>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}
impl From<ArrowSource> for Arc<dyn FileSource> {
fn from(source: ArrowSource) -> Self {
as_file_source(source)
}
}

impl FileSource for ArrowSource {
@@ -89,6 +97,20 @@ impl FileSource for ArrowSource {
fn file_type(&self) -> &str {
"arrow"
}

fn with_schema_adapter_factory(
&self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Arc<dyn FileSource> {
Arc::new(Self {
schema_adapter_factory: Some(schema_adapter_factory),
..self.clone()
})
}

fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.clone()
}
}
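The `ArrowSource` changes above follow a pattern that the other sources in this PR repeat: `with_schema_adapter_factory` takes `&self`, clones the source, replaces one field, and returns a type-erased `Arc<dyn FileSource>`, while `From<...> for Arc<dyn FileSource>` delegates to an `as_file_source` helper. The sketch below reproduces that pattern with simplified stand-ins. `Factory`, `Source`, `ArrowLikeSource`, and `as_source` are hypothetical, and the body of `as_source` is an assumption about what a helper like `as_file_source` does.

```rust
use std::sync::Arc;

/// Hypothetical, heavily simplified stand-in for a schema adapter factory.
#[derive(Debug, PartialEq)]
struct Factory(&'static str);

/// Simplified stand-in for the `FileSource` trait.
trait Source {
    fn file_type(&self) -> &str;
    fn batch_size(&self) -> Option<usize>;
    /// Returns a new source with the factory set; `self` is left untouched.
    fn with_schema_adapter_factory(&self, factory: Arc<Factory>) -> Arc<dyn Source>;
    fn schema_adapter_factory(&self) -> Option<Arc<Factory>>;
}

#[derive(Clone, Default)]
struct ArrowLikeSource {
    batch_size: Option<usize>,
    schema_adapter_factory: Option<Arc<Factory>>,
}

impl Source for ArrowLikeSource {
    fn file_type(&self) -> &str {
        "arrow"
    }

    fn batch_size(&self) -> Option<usize> {
        self.batch_size
    }

    fn with_schema_adapter_factory(&self, factory: Arc<Factory>) -> Arc<dyn Source> {
        // Clone everything, then override only the factory field.
        Arc::new(Self {
            schema_adapter_factory: Some(factory),
            ..self.clone()
        })
    }

    fn schema_adapter_factory(&self) -> Option<Arc<Factory>> {
        self.schema_adapter_factory.clone()
    }
}

/// Assumed shape of an `as_file_source`-style helper: erase the concrete type.
fn as_source<T: Source + 'static>(source: T) -> Arc<dyn Source> {
    Arc::new(source)
}

impl From<ArrowLikeSource> for Arc<dyn Source> {
    fn from(source: ArrowLikeSource) -> Self {
        as_source(source)
    }
}
```

Because the method returns `Arc<dyn Source>` directly, callers no longer need to wrap the result in `Arc::new(...)` themselves, which matches the simplification made to the test in `mod.rs`.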

/// The struct that implements the [`FileOpener`] trait for Arrow files
20 changes: 19 additions & 1 deletion datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -31,7 +31,6 @@ use arrow::{array::RecordBatch, compute::concat_batches};
use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr};
use datafusion_common::{config::ConfigOptions, Statistics};
use datafusion_common::{internal_err, Result};
-use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::file_stream::FileOpenFuture;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::{
@@ -40,6 +39,9 @@ use datafusion_datasource::{
use datafusion_datasource::{
file_meta::FileMeta, schema_adapter::DefaultSchemaAdapterFactory, PartitionedFile,
};
+use datafusion_datasource::{
+file_scan_config::FileScanConfigBuilder, schema_adapter::SchemaAdapterFactory,
+};
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
@@ -119,6 +121,7 @@ pub struct TestSource {
schema: Option<SchemaRef>,
metrics: ExecutionPlanMetricsSet,
projection: Option<Vec<usize>>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl TestSource {
@@ -132,6 +135,7 @@ impl TestSource {
projection: None,
metrics: ExecutionPlanMetricsSet::new(),
batches,
schema_adapter_factory: None,
}
}
}
@@ -243,6 +247,20 @@ impl FileSource for TestSource {
Ok(FilterPushdownPropagation::unsupported(filters))
}
}

fn with_schema_adapter_factory(
&self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Arc<dyn FileSource> {
Arc::new(Self {
schema_adapter_factory: Some(schema_adapter_factory),
..self.clone()
})
}

fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.clone()
}
}

#[derive(Debug, Clone)]
25 changes: 24 additions & 1 deletion datafusion/datasource-csv/src/source.rs
@@ -28,7 +28,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{
-calculate_range, FileRange, ListingTableUrl, RangeCalculation,
+as_file_source, calculate_range, FileRange, ListingTableUrl, RangeCalculation,
};

use arrow::csv;
@@ -37,6 +37,7 @@ use datafusion_common::{DataFusionError, Result, Statistics};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{
@@ -91,6 +92,7 @@ pub struct CsvSource {
comment: Option<u8>,
metrics: ExecutionPlanMetricsSet,
projected_statistics: Option<Statistics>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl CsvSource {
@@ -212,6 +214,12 @@ impl CsvOpener {
}
}

impl From<CsvSource> for Arc<dyn FileSource> {
fn from(source: CsvSource) -> Self {
as_file_source(source)
}
}

impl FileSource for CsvSource {
fn create_file_opener(
&self,
@@ -254,6 +262,21 @@ impl FileSource for CsvSource {
Arc::new(conf)
}

fn with_schema_adapter_factory(
&self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Arc<dyn FileSource> {
// CSV does not apply the schema adapter factory when reading yet; store it on a
// copy of this source so it can be retrieved via `schema_adapter_factory()`.
Arc::new(Self {
schema_adapter_factory: Some(schema_adapter_factory),
..self.clone()
})
}

fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.clone()
}

fn metrics(&self) -> &ExecutionPlanMetricsSet {
&self.metrics
}
26 changes: 25 additions & 1 deletion datafusion/datasource-json/src/source.rs
@@ -30,7 +30,10 @@ use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
-use datafusion_datasource::{calculate_range, ListingTableUrl, RangeCalculation};
+use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use datafusion_datasource::{
+as_file_source, calculate_range, ListingTableUrl, RangeCalculation,
+};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use arrow::json::ReaderBuilder;
@@ -77,6 +80,7 @@ pub struct JsonSource {
batch_size: Option<usize>,
metrics: ExecutionPlanMetricsSet,
projected_statistics: Option<Statistics>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl JsonSource {
@@ -86,6 +90,12 @@ impl JsonSource {
}
}

impl From<JsonSource> for Arc<dyn FileSource> {
fn from(source: JsonSource) -> Self {
as_file_source(source)
}
}

impl FileSource for JsonSource {
fn create_file_opener(
&self,
@@ -140,6 +150,20 @@ impl FileSource for JsonSource {
fn file_type(&self) -> &str {
"json"
}

fn with_schema_adapter_factory(
&self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Arc<dyn FileSource> {
Arc::new(Self {
schema_adapter_factory: Some(schema_adapter_factory),
..self.clone()
})
}

fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.clone()
}
}

impl FileOpener for JsonOpener {