diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 61e34baebe7c..8d8aa991e942 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -32,6 +32,7 @@ use datafusion_catalog::TableProvider; use datafusion_common::{config_err, DataFusionError, Result}; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; +use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_expr::dml::InsertOp; use datafusion_expr::{Expr, TableProviderFilterPushDown}; use datafusion_expr::{SortExpr, TableType}; @@ -60,7 +61,7 @@ use object_store::ObjectStore; /// Configuration for creating a [`ListingTable`] /// /// -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct ListingTableConfig { /// Paths on the `ObjectStore` for creating `ListingTable`. /// They should share the same schema and object store. @@ -73,16 +74,16 @@ pub struct ListingTableConfig { /// /// See details on [`ListingTableConfig::with_listing_options`] pub options: Option, + /// schema_adapter to handle schema evolution of fields over time + pub schema_adapter_factory: Option>, } impl ListingTableConfig { /// Creates new [`ListingTableConfig`] for reading the specified URL pub fn new(table_path: ListingTableUrl) -> Self { - let table_paths = vec![table_path]; Self { - table_paths, - file_schema: None, - options: None, + table_paths: vec![table_path], + ..Default::default() } } @@ -92,8 +93,7 @@ impl ListingTableConfig { pub fn new_with_multi_paths(table_paths: Vec) -> Self { Self { table_paths, - file_schema: None, - options: None, + ..Default::default() } } /// Set the `schema` for the overall [`ListingTable`] @@ -111,6 +111,7 @@ impl ListingTableConfig { table_paths: self.table_paths, file_schema: Some(schema), options: self.options, + schema_adapter_factory: self.schema_adapter_factory, } } @@ -123,9 +124,23 @@ impl ListingTableConfig { table_paths: self.table_paths, file_schema: self.file_schema, options: Some(listing_options), + schema_adapter_factory: self.schema_adapter_factory, } } + /// Add a schema adapter factory to the [`ListingTableConfig`] + /// + /// Schema adapters handle schema evolution over time, allowing the table to adapt + /// to changes in file schemas. This is particularly useful for handling nested fields + /// in formats like Parquet where the schema may evolve. 
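To make the config hook described above concrete, here is a minimal usage sketch (not part of this diff). The helper function and table path are hypothetical; `NestedStructSchemaAdapterFactory` comes from the `nested_schema_adapter` module added later in this PR, which is re-exported under `datafusion::datasource`. The `with_schema_adapter_factory` builder method itself is defined just below.

use std::sync::Arc;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::nested_schema_adapter::NestedStructSchemaAdapterFactory;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn nested_aware_table(ctx: &SessionContext) -> Result<Arc<ListingTable>> {
    // Hypothetical table location used only for illustration.
    let table_path = ListingTableUrl::parse("file:///data/events/")?;
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
    let config = ListingTableConfig::new(table_path)
        .with_listing_options(options)
        // New in this diff: the factory is stored on the config and later
        // applied to the format's FileSource when the table is scanned.
        .with_schema_adapter_factory(Arc::new(NestedStructSchemaAdapterFactory))
        .infer_schema(&ctx.state())
        .await?;
    Ok(Arc::new(ListingTable::try_new(config)?))
}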
+ pub fn with_schema_adapter_factory( + mut self, + factory: Arc, + ) -> Self { + self.schema_adapter_factory = Some(factory); + self + } + /// Returns a tuple of `(file_extension, optional compression_extension)` /// /// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))` @@ -201,6 +216,7 @@ impl ListingTableConfig { table_paths: self.table_paths, file_schema: self.file_schema, options: Some(listing_options), + schema_adapter_factory: self.schema_adapter_factory, }) } @@ -224,6 +240,7 @@ impl ListingTableConfig { table_paths: self.table_paths, file_schema: Some(schema), options: Some(options), + schema_adapter_factory: self.schema_adapter_factory, }) } None => internal_err!("No `ListingOptions` set for inferring schema"), @@ -264,6 +281,7 @@ impl ListingTableConfig { table_paths: self.table_paths, file_schema: self.file_schema, options: Some(options), + schema_adapter_factory: self.schema_adapter_factory, }) } None => config_err!("No `ListingOptions` set for inferring schema"), @@ -750,6 +768,8 @@ pub struct ListingTable { collected_statistics: FileStatisticsCache, constraints: Constraints, column_defaults: HashMap, + /// schema_adapter to handle schema evolution of fields over time + schema_adapter_factory: Option>, } impl ListingTable { @@ -786,6 +806,7 @@ impl ListingTable { collected_statistics: Arc::new(DefaultFileStatisticsCache::default()), constraints: Constraints::empty(), column_defaults: HashMap::new(), + schema_adapter_factory: config.schema_adapter_factory, }; Ok(table) @@ -942,6 +963,13 @@ impl TableProvider for ListingTable { return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); }; + let mut source = self.options.format.file_source(); + + // Apply schema adapter to source if available + if let Some(factory) = &self.schema_adapter_factory { + source = source.with_schema_adapter_factory(Arc::clone(factory)); + } + // create the execution plan self.options .format @@ -950,7 +978,7 @@ impl TableProvider for ListingTable { FileScanConfigBuilder::new( object_store_url, Arc::clone(&self.file_schema), - self.options.format.file_source(), + source, ) .with_file_groups(partitioned_file_lists) .with_constraints(self.constraints.clone()) diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 674541ff73a5..f99db86c26b7 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -42,6 +42,7 @@ pub use datafusion_catalog::default_table_source; pub use datafusion_catalog::memory; pub use datafusion_catalog::stream; pub use datafusion_catalog::view; +pub use datafusion_datasource::nested_schema_adapter; pub use datafusion_datasource::schema_adapter; pub use datafusion_datasource::sink; pub use datafusion_datasource::source; @@ -52,7 +53,7 @@ pub use datafusion_physical_expr::create_ordering; mod tests { use crate::prelude::SessionContext; - + use datafusion_datasource::file::FileSource; use std::fs; use std::sync::Arc; @@ -124,10 +125,8 @@ mod tests { let f2 = Field::new("extra_column", DataType::Utf8, true); let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()])); - let source = Arc::new( - ParquetSource::default() - .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})), - ); + let source = ParquetSource::default() + .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})); let base_conf = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), schema, diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs 
b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 897d1c04471c..56cd0072dab7 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -25,8 +25,10 @@ use arrow::buffer::Buffer; use arrow::datatypes::SchemaRef; use arrow_ipc::reader::FileDecoder; use datafusion_common::Statistics; +use datafusion_datasource::as_file_source; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::StreamExt; @@ -39,6 +41,12 @@ use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; pub struct ArrowSource { metrics: ExecutionPlanMetricsSet, projected_statistics: Option, + schema_adapter_factory: Option>, +} +impl From for Arc { + fn from(source: ArrowSource) -> Self { + as_file_source(source) + } } impl FileSource for ArrowSource { @@ -89,6 +97,20 @@ impl FileSource for ArrowSource { fn file_type(&self) -> &str { "arrow" } + + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc, + ) -> Arc { + Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + }) + } + + fn schema_adapter_factory(&self) -> Option> { + self.schema_adapter_factory.clone() + } } /// The struct arrow that implements `[FileOpener]` trait diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 393322a7f3e2..df27acd2c507 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -31,7 +31,6 @@ use arrow::{array::RecordBatch, compute::concat_batches}; use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr}; use datafusion_common::{config::ConfigOptions, Statistics}; use datafusion_common::{internal_err, Result}; -use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::file_stream::FileOpenFuture; use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::{ @@ -40,6 +39,9 @@ use datafusion_datasource::{ use datafusion_datasource::{ file_meta::FileMeta, schema_adapter::DefaultSchemaAdapterFactory, PartitionedFile, }; +use datafusion_datasource::{ + file_scan_config::FileScanConfigBuilder, schema_adapter::SchemaAdapterFactory, +}; use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_optimizer::PhysicalOptimizerRule; @@ -119,6 +121,7 @@ pub struct TestSource { schema: Option, metrics: ExecutionPlanMetricsSet, projection: Option>, + schema_adapter_factory: Option>, } impl TestSource { @@ -132,6 +135,7 @@ impl TestSource { projection: None, metrics: ExecutionPlanMetricsSet::new(), batches, + schema_adapter_factory: None, } } } @@ -243,6 +247,20 @@ impl FileSource for TestSource { Ok(FilterPushdownPropagation::unsupported(filters)) } } + + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc, + ) -> Arc { + Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + }) + } + + fn schema_adapter_factory(&self) -> Option> { + self.schema_adapter_factory.clone() + } } #[derive(Debug, Clone)] diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 
cbadb5dd91af..e49c38c556e3 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -28,7 +28,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::{ - calculate_range, FileRange, ListingTableUrl, RangeCalculation, + as_file_source, calculate_range, FileRange, ListingTableUrl, RangeCalculation, }; use arrow::csv; @@ -37,6 +37,7 @@ use datafusion_common::{DataFusionError, Result, Statistics}; use datafusion_common_runtime::JoinSet; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_execution::TaskContext; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::{ @@ -91,6 +92,7 @@ pub struct CsvSource { comment: Option, metrics: ExecutionPlanMetricsSet, projected_statistics: Option, + schema_adapter_factory: Option>, } impl CsvSource { @@ -212,6 +214,12 @@ impl CsvOpener { } } +impl From for Arc { + fn from(source: CsvSource) -> Self { + as_file_source(source) + } +} + impl FileSource for CsvSource { fn create_file_opener( &self, @@ -254,6 +262,21 @@ impl FileSource for CsvSource { Arc::new(conf) } + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc, + ) -> Arc { + // For CSV, we don't have schema adapter factory support yet, so just return self + Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + }) + } + + fn schema_adapter_factory(&self) -> Option> { + self.schema_adapter_factory.clone() + } + fn metrics(&self) -> &ExecutionPlanMetricsSet { &self.metrics } diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 982b799556ab..5cd06aa77673 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -30,7 +30,10 @@ use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer}; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; -use datafusion_datasource::{calculate_range, ListingTableUrl, RangeCalculation}; +use datafusion_datasource::schema_adapter::SchemaAdapterFactory; +use datafusion_datasource::{ + as_file_source, calculate_range, ListingTableUrl, RangeCalculation, +}; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use arrow::json::ReaderBuilder; @@ -77,6 +80,7 @@ pub struct JsonSource { batch_size: Option, metrics: ExecutionPlanMetricsSet, projected_statistics: Option, + schema_adapter_factory: Option>, } impl JsonSource { @@ -86,6 +90,12 @@ impl JsonSource { } } +impl From for Arc { + fn from(source: JsonSource) -> Self { + as_file_source(source) + } +} + impl FileSource for JsonSource { fn create_file_opener( &self, @@ -140,6 +150,20 @@ impl FileSource for JsonSource { fn file_type(&self) -> &str { "json" } + + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc, + ) -> Arc { + Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + }) + } + + fn schema_adapter_factory(&self) -> Option> { + self.schema_adapter_factory.clone() + } } impl FileOpener for JsonOpener { diff --git a/datafusion/datasource-parquet/src/file_format.rs 
b/datafusion/datasource-parquet/src/file_format.rs index 83013e5c9783..2e629814378e 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -421,8 +421,11 @@ impl FileFormat for ParquetFormat { source = source.with_metadata_size_hint(metadata_size_hint) } + // Apply schema adapter factory before building the new config + let file_source = apply_schema_adapter(source, &conf); + let conf = FileScanConfigBuilder::from(conf) - .with_source(Arc::new(source)) + .with_source(file_source) .build(); Ok(DataSourceExec::from_data_source(conf)) } @@ -1576,3 +1579,27 @@ fn create_max_min_accs( .collect(); (max_values, min_values) } + +/// Converts a ParquetSource to an Arc and applies the schema adapter factory +/// from the FileScanConfig if present. +/// +/// # Arguments +/// * `source` - The ParquetSource to convert +/// * `conf` - FileScanConfig that may contain a schema adapter factory +/// +/// # Returns +/// The converted FileSource with schema adapter factory applied if provided +fn apply_schema_adapter( + source: ParquetSource, + conf: &FileScanConfig, +) -> Arc { + // Convert the ParquetSource to Arc + let file_source: Arc = source.into(); + + // If the FileScanConfig.file_source() has a schema adapter factory, apply it + if let Some(factory) = conf.file_source().schema_adapter_factory() { + file_source.with_schema_adapter_factory(factory.clone()) + } else { + file_source + } +} diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 13684db8ea15..1df2f859f5b4 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -26,17 +26,17 @@ use crate::opener::ParquetOpener; use crate::row_filter::can_expr_be_pushed_down_with_schemas; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; -use datafusion_common::config::ConfigOptions; -use datafusion_datasource::file_stream::FileOpener; -use datafusion_datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, SchemaAdapterFactory, -}; - use arrow::datatypes::{SchemaRef, TimeUnit}; +use datafusion_common::config::ConfigOptions; use datafusion_common::config::TableParquetOptions; use datafusion_common::{DataFusionError, Statistics}; +use datafusion_datasource::file::as_file_source; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::file_stream::FileOpener; +use datafusion_datasource::schema_adapter::{ + DefaultSchemaAdapterFactory, SchemaAdapterFactory, +}; use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; @@ -348,20 +348,6 @@ impl ParquetSource { self.schema_adapter_factory.as_ref() } - /// Set optional schema adapter factory. - /// - /// [`SchemaAdapterFactory`] allows user to specify how fields from the - /// parquet file get mapped to that of the table schema. The default schema - /// adapter uses arrow's cast library to map the parquet fields to the table - /// schema. - pub fn with_schema_adapter_factory( - mut self, - schema_adapter_factory: Arc, - ) -> Self { - self.schema_adapter_factory = Some(schema_adapter_factory); - self - } - /// If true, the predicate will be used during the parquet scan. 
/// Defaults to false /// @@ -446,6 +432,13 @@ pub(crate) fn parse_coerce_int96_string( } } +/// Allows easy conversion from ParquetSource to Arc +impl From for Arc { + fn from(source: ParquetSource) -> Self { + as_file_source(source) + } +} + impl FileSource for ParquetSource { fn create_file_opener( &self, @@ -656,4 +649,24 @@ impl FileSource for ParquetSource { ); Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source)) } + + /// Set optional schema adapter factory. + /// + /// [`SchemaAdapterFactory`] allows user to specify how fields from the + /// parquet file get mapped to that of the table schema. The default schema + /// adapter uses arrow's cast library to map the parquet fields to the table + /// schema. + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc, + ) -> Arc { + Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + }) + } + + fn schema_adapter_factory(&self) -> Option> { + self.schema_adapter_factory.clone() + } } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index c9b5c416f0c0..04b6b6975a76 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use crate::file_groups::FileGroupPartitioner; use crate::file_scan_config::FileScanConfig; use crate::file_stream::FileOpener; +use crate::schema_adapter::SchemaAdapterFactory; use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{Result, Statistics}; @@ -35,6 +36,11 @@ use datafusion_physical_plan::DisplayFormatType; use object_store::ObjectStore; +/// Helper function to convert any type implementing FileSource to Arc +pub fn as_file_source(source: T) -> Arc { + Arc::new(source) +} + /// file format specific behaviors for elements in [`DataSource`] /// /// See more details on specific implementations: @@ -116,4 +122,17 @@ pub trait FileSource: Send + Sync { ) -> Result>> { Ok(FilterPushdownPropagation::unsupported(filters)) } + + /// Set optional schema adapter factory. + /// + /// [`SchemaAdapterFactory`] allows user to specify how fields from the + /// file get mapped to that of the table schema. The default implementation + /// returns the original source. + fn with_schema_adapter_factory( + &self, + factory: Arc, + ) -> Arc; + + /// Returns the current schema adapter factory if set + fn schema_adapter_factory(&self) -> Option>; } diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 9e83adc6b9fe..c99e570ea1b8 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -38,6 +38,7 @@ pub mod file_scan_config; pub mod file_sink_config; pub mod file_stream; pub mod memory; +pub mod nested_schema_adapter; pub mod schema_adapter; pub mod sink; pub mod source; @@ -48,6 +49,7 @@ pub mod test_util; pub mod url; pub mod write; +pub use self::file::as_file_source; pub use self::url::ListingTableUrl; use crate::file_groups::FileGroup; use chrono::TimeZone; diff --git a/datafusion/datasource/src/nested_schema_adapter.rs b/datafusion/datasource/src/nested_schema_adapter.rs new file mode 100644 index 000000000000..4af80b1c061e --- /dev/null +++ b/datafusion/datasource/src/nested_schema_adapter.rs @@ -0,0 +1,943 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`NestedStructSchemaAdapter`] and [`NestedStructSchemaAdapterFactory`] to adapt file-level record batches to a table schema. +//! +//! Adapter provides a method of translating the RecordBatches that come out of the +//! physical format into how they should be used by DataFusion. For instance, a schema +//! can be stored external to a parquet file that maps parquet logical types to arrow types. + +use arrow::datatypes::{DataType::Struct, Field, Schema, SchemaRef}; +use datafusion_common::{ColumnStatistics, Result}; +use std::sync::Arc; + +use crate::schema_adapter::{ + create_field_mapping, DefaultSchemaAdapterFactory, SchemaAdapter, + SchemaAdapterFactory, SchemaMapper, +}; +use arrow::array::{Array, ArrayRef, StructArray}; +use arrow::compute::cast; +use arrow::record_batch::{RecordBatch, RecordBatchOptions}; +use datafusion_common::arrow::array::new_null_array; + +/// Factory for creating [`NestedStructSchemaAdapter`] +/// +/// This factory creates schema adapters that properly handle schema evolution +/// for nested struct fields, allowing new fields to be added to struct columns +/// over time. +#[derive(Debug, Clone, Default)] +pub struct NestedStructSchemaAdapterFactory; + +impl SchemaAdapterFactory for NestedStructSchemaAdapterFactory { + fn create( + &self, + projected_table_schema: SchemaRef, + table_schema: SchemaRef, + ) -> Box { + Box::new(NestedStructSchemaAdapter::new( + projected_table_schema, + table_schema, + )) + } +} + +impl NestedStructSchemaAdapterFactory { + /// Create a new factory for mapping batches from a file schema to a table + /// schema with support for nested struct evolution. + /// + /// This is a convenience method that handles nested struct fields properly. + pub fn from_schema(table_schema: SchemaRef) -> Box { + Self.create(Arc::clone(&table_schema), table_schema) + } + + /// Determines if a schema contains nested struct fields that would benefit + /// from special handling during schema evolution + pub fn has_nested_structs(schema: &Schema) -> bool { + schema + .fields() + .iter() + .any(|field| matches!(field.data_type(), Struct(_))) + } + + /// Create an appropriate schema adapter based on schema characteristics. + /// Returns a NestedStructSchemaAdapter if the projected schema contains nested structs, + /// otherwise returns a DefaultSchemaAdapter. 
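Before `create_adapter` continues below, a small usage sketch of the factory's selection logic (not part of this diff; schemas and the `main` wrapper are illustrative only):

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use datafusion_datasource::nested_schema_adapter::NestedStructSchemaAdapterFactory;
use datafusion_datasource::schema_adapter::SchemaAdapter;

fn main() -> datafusion_common::Result<()> {
    // Table schema with a struct column, so the nested adapter is selected.
    let table_schema = Arc::new(Schema::new(vec![Field::new(
        "info",
        DataType::Struct(Fields::from(vec![
            Field::new("location", DataType::Utf8, true),
            Field::new("level", DataType::Float64, true),
        ])),
        true,
    )]));
    assert!(NestedStructSchemaAdapterFactory::has_nested_structs(&table_schema));

    // An older file whose struct is missing the newer "level" child field.
    let file_schema = Schema::new(vec![Field::new(
        "info",
        DataType::Struct(Fields::from(vec![Field::new(
            "location",
            DataType::Utf8,
            true,
        )])),
        true,
    )]);

    let adapter = NestedStructSchemaAdapterFactory::create_adapter(
        Arc::clone(&table_schema),
        table_schema,
    );
    // Succeeds even though the file's struct lacks "level"; the mapping
    // null-fills that child when batches are adapted.
    let (_mapper, projection) = adapter.map_schema(&file_schema)?;
    assert_eq!(projection, vec![0]);
    Ok(())
}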
+ pub fn create_adapter( + projected_table_schema: SchemaRef, + table_schema: SchemaRef, + ) -> Box { + // Use nested adapter if target has nested structs + if Self::has_nested_structs(table_schema.as_ref()) { + NestedStructSchemaAdapterFactory.create(projected_table_schema, table_schema) + } else { + // Default case for simple schemas + DefaultSchemaAdapterFactory.create(projected_table_schema, table_schema) + } + } +} + +/// A SchemaAdapter that handles schema evolution for nested struct types +#[derive(Debug, Clone)] +pub struct NestedStructSchemaAdapter { + /// The schema for the table, projected to include only the fields being output (projected) by the + /// associated ParquetSource + projected_table_schema: SchemaRef, + /// The entire table schema for the table we're using this to adapt. + /// + /// This is used to evaluate any filters pushed down into the scan + /// which may refer to columns that are not referred to anywhere + /// else in the plan. + table_schema: SchemaRef, +} + +impl NestedStructSchemaAdapter { + /// Create a new NestedStructSchemaAdapter with the target schema + pub fn new(projected_table_schema: SchemaRef, table_schema: SchemaRef) -> Self { + Self { + projected_table_schema, + table_schema, + } + } + + pub fn projected_table_schema(&self) -> &Schema { + self.projected_table_schema.as_ref() + } + + pub fn table_schema(&self) -> &Schema { + self.table_schema.as_ref() + } +} + +impl SchemaAdapter for NestedStructSchemaAdapter { + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { + let field_name = self.table_schema.field(index).name(); + file_schema.index_of(field_name).ok() + } + + fn map_schema( + &self, + file_schema: &Schema, + ) -> Result<(Arc, Vec)> { + let (field_mappings, projection) = create_field_mapping( + file_schema, + &self.projected_table_schema, + |file_field, table_field| { + // Special handling for struct fields - always include them even if the + // internal structure differs, as we'll adapt them later + match (file_field.data_type(), table_field.data_type()) { + (Struct(_), Struct(_)) => Ok(true), + _ => { + // For non-struct fields, use the regular cast check + crate::schema_adapter::can_cast_field(file_field, table_field) + } + } + }, + )?; + + Ok(( + Arc::new(NestedStructSchemaMapping::new( + Arc::clone(&self.projected_table_schema), + field_mappings, + )), + projection, + )) + } +} + +/// A SchemaMapping implementation specifically for nested structs +#[derive(Debug)] +struct NestedStructSchemaMapping { + /// The schema for the table, projected to include only the fields being output + projected_table_schema: SchemaRef, + /// Field mappings from projected table to file schema + field_mappings: Vec>, +} + +impl NestedStructSchemaMapping { + /// Create a new nested struct schema mapping + pub fn new( + projected_table_schema: SchemaRef, + field_mappings: Vec>, + ) -> Self { + Self { + projected_table_schema, + field_mappings, + } + } +} + +/// Maps a `RecordBatch` to a new `RecordBatch` according to the schema mapping defined in `NestedStructSchemaMapping`. +/// +/// # Arguments +/// +/// * `batch` - The input `RecordBatch` to be mapped. +/// +/// # Returns +/// +/// A `Result` containing the new `RecordBatch` with columns adapted according to the schema mapping, or an error if the mapping fails. +/// +/// # Behavior +/// +/// - For each field in the projected table schema, the corresponding column in the input batch is adapted. 
+/// - If a field does not exist in the input batch, a null array of the appropriate data type and length is created and used in the output batch. +/// - If a field exists in the input batch, the column is adapted to handle potential nested struct adaptation. +/// +/// # Errors +/// +/// Returns an error if the column adaptation fails or if the new `RecordBatch` cannot be created. +impl SchemaMapper for NestedStructSchemaMapping { + fn map_batch(&self, batch: RecordBatch) -> Result { + let batch_rows = batch.num_rows(); + let batch_cols = batch.columns().to_vec(); + + let cols = self + .projected_table_schema + .fields() + .iter() + .zip(&self.field_mappings) + .map(|(field, file_idx)| { + file_idx.map_or_else( + // If field doesn't exist in file, return null array + || Ok(new_null_array(field.data_type(), batch_rows)), + // If field exists, handle potential nested struct adaptation + |batch_idx| adapt_column(&batch_cols[batch_idx], field), + ) + }) + .collect::, _>>()?; + + // Create record batch with adapted columns + let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); + let schema = Arc::clone(&self.projected_table_schema); + let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; + Ok(record_batch) + } + + /// Adapts file-level column `Statistics` to match the `table_schema` + /// + /// Maps statistics from the file schema to the projected table schema using field mappings. + /// For fields not present in the file schema, uses unknown statistics. + fn map_column_statistics( + &self, + file_col_statistics: &[ColumnStatistics], + ) -> Result> { + let mut table_col_statistics = vec![]; + + // Map statistics for each field based on field_mappings + for (_, file_col_idx) in self + .projected_table_schema + .fields() + .iter() + .zip(&self.field_mappings) + { + if let Some(file_col_idx) = file_col_idx { + // Use statistics from file if available, otherwise default + table_col_statistics.push( + file_col_statistics + .get(*file_col_idx) + .cloned() + .unwrap_or_default(), + ); + } else { + // Field doesn't exist in file schema, use unknown statistics + table_col_statistics.push(ColumnStatistics::new_unknown()); + } + } + + Ok(table_col_statistics) + } +} + +// Helper methods for the NestedStructSchemaMapping +/// Adapt a column to match the target field type, handling nested structs specially +fn adapt_column(source_col: &ArrayRef, target_field: &Field) -> Result { + match target_field.data_type() { + Struct(target_fields) => { + // For struct arrays, we need to handle them specially + if let Some(struct_array) = source_col.as_any().downcast_ref::() + { + // Create a vector to store field-array pairs with the correct type + let mut children: Vec<(Arc, Arc)> = Vec::new(); + let num_rows = source_col.len(); + + // For each field in the target schema + for target_child_field in target_fields { + // Create Arc directly (not Arc>) + let field_arc = Arc::clone(target_child_field); + + // Try to find corresponding field in source + match struct_array.column_by_name(target_child_field.name()) { + Some(source_child_col) => { + // Field exists in source, adapt it + let adapted_child = + adapt_column(source_child_col, target_child_field)?; + children.push((field_arc, adapted_child)); + } + None => { + // Field doesn't exist in source, add null array + children.push(( + field_arc, + new_null_array(target_child_field.data_type(), num_rows), + )); + } + } + } + + // Create new struct array with all target fields + let struct_array = 
StructArray::from(children); + Ok(Arc::new(struct_array)) + } else { + // Not a struct array, but target expects struct - return nulls + Ok(new_null_array(target_field.data_type(), source_col.len())) + } + } + // For non-struct types, just cast + _ => Ok(cast(source_col, target_field.data_type())?), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Array, StringBuilder, StructArray, TimestampMillisecondArray}; + use arrow::datatypes::{ + DataType::{Float64, Int16, Int32, Timestamp, Utf8}, + TimeUnit::Millisecond, + }; + use datafusion_common::ScalarValue; + + // ================================ + // Schema Creation Helper Functions + // ================================ + + /// Helper function to create a basic nested schema with additionalInfo + fn create_basic_nested_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + create_additional_info_field(false), // without reason field + ])) + } + + /// Helper function to create a deeply nested schema with additionalInfo including reason field + fn create_deep_nested_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + create_additional_info_field(true), // with reason field + ])) + } + + /// Helper function to create the additionalInfo field with or without the reason subfield + fn create_additional_info_field(with_reason: bool) -> Field { + let mut field_children = vec![ + Field::new("location", Utf8, true), + Field::new("timestamp_utc", Timestamp(Millisecond, None), true), + ]; + + // Add the reason field if requested (for target schema) + if with_reason { + field_children.push(create_reason_field()); + } + + Field::new("additionalInfo", Struct(field_children.into()), true) + } + + /// Helper function to create the reason nested field with its details subfield + fn create_reason_field() -> Field { + Field::new( + "reason", + Struct( + vec![ + Field::new("_level", Float64, true), + // Inline the details field creation + Field::new( + "details", + Struct( + vec![ + Field::new("rurl", Utf8, true), + Field::new("s", Float64, true), + Field::new("t", Utf8, true), + ] + .into(), + ), + true, + ), + ] + .into(), + ), + true, + ) + } + + // ================================ + // Schema Evolution Tests + // ================================ + + #[test] + fn test_adapter_factory_selection() -> Result<()> { + // Test schemas for adapter selection logic + let simple_schema = Arc::new(Schema::new(vec![ + Field::new("id", Int32, false), + Field::new("name", Utf8, true), + Field::new("age", Int16, true), + ])); + + let nested_schema = Arc::new(Schema::new(vec![ + Field::new("id", Int32, false), + Field::new( + "metadata", + Struct( + vec![ + Field::new("created", Utf8, true), + Field::new("modified", Utf8, true), + ] + .into(), + ), + true, + ), + ])); + + // Source schema with missing field + let source_schema = Arc::new(Schema::new(vec![ + Field::new("id", Int32, false), + Field::new( + "metadata", + Struct( + vec![ + Field::new("created", Utf8, true), + // "modified" field is missing + ] + .into(), + ), + true, + ), + ])); + + // Test struct detection + assert!( + !NestedStructSchemaAdapterFactory::has_nested_structs(&simple_schema), + "Simple schema should not be detected as having nested structs" + ); + assert!( + NestedStructSchemaAdapterFactory::has_nested_structs(&nested_schema), + "Nested schema should be detected as having nested structs" + ); + + // Test adapter behavior with schema evolution + let default_adapter = DefaultSchemaAdapterFactory + .create(nested_schema.clone(), nested_schema.clone()); + let nested_adapter = 
NestedStructSchemaAdapterFactory + .create(nested_schema.clone(), nested_schema.clone()); + + // Default adapter should fail with schema evolution + assert!(default_adapter.map_schema(&source_schema).is_err()); + + // Nested adapter should handle schema evolution + assert!( + nested_adapter.map_schema(&source_schema).is_ok(), + "Nested adapter should handle schema with missing fields" + ); + + // Test factory selection logic + let adapter = NestedStructSchemaAdapterFactory::create_adapter( + nested_schema.clone(), + nested_schema.clone(), + ); + + assert!( + adapter.map_schema(&source_schema).is_ok(), + "Factory should select appropriate adapter that handles schema evolution" + ); + + Ok(()) + } + + #[test] + fn test_adapt_struct_with_added_nested_fields() -> Result<()> { + // Create test schemas + let (file_schema, table_schema) = create_test_schemas_with_nested_fields(); + + // Create batch with test data + let batch = create_test_batch_with_struct_data(&file_schema)?; + + // Create adapter and apply it + let mapped_batch = + adapt_batch_with_nested_schema_adapter(&file_schema, &table_schema, batch)?; + + // Verify the results + verify_adapted_batch_with_nested_fields(&mapped_batch, &table_schema)?; + + Ok(()) + } + + /// Create file and table schemas for testing nested field evolution + fn create_test_schemas_with_nested_fields() -> (SchemaRef, SchemaRef) { + // Create file schema with just location and timestamp_utc + let file_schema = Arc::new(Schema::new(vec![Field::new( + "info", + Struct( + vec![ + Field::new("location", Utf8, true), + Field::new( + "timestamp_utc", + Timestamp(Millisecond, Some("UTC".into())), + true, + ), + ] + .into(), + ), + true, + )])); + + // Create table schema with additional nested reason field + let table_schema = Arc::new(Schema::new(vec![Field::new( + "info", + Struct( + vec![ + Field::new("location", Utf8, true), + Field::new( + "timestamp_utc", + Timestamp(Millisecond, Some("UTC".into())), + true, + ), + Field::new( + "reason", + Struct( + vec![ + Field::new("_level", Float64, true), + Field::new( + "details", + Struct( + vec![ + Field::new("rurl", Utf8, true), + Field::new("s", Float64, true), + Field::new("t", Utf8, true), + ] + .into(), + ), + true, + ), + ] + .into(), + ), + true, + ), + ] + .into(), + ), + true, + )])); + + (file_schema, table_schema) + } + + /// Create a test RecordBatch with struct data matching the file schema + fn create_test_batch_with_struct_data( + file_schema: &SchemaRef, + ) -> Result { + let mut location_builder = StringBuilder::new(); + location_builder.append_value("San Francisco"); + location_builder.append_value("New York"); + + // Create timestamp array + let timestamp_array = TimestampMillisecondArray::from(vec![ + Some(1640995200000), // 2022-01-01 + Some(1641081600000), // 2022-01-02 + ]); + + // Create data type with UTC timezone to match the schema + let timestamp_type = Timestamp(Millisecond, Some("UTC".into())); + + // Cast the timestamp array to include the timezone metadata + let timestamp_array = cast(×tamp_array, ×tamp_type)?; + + let info_struct = StructArray::from(vec![ + ( + Arc::new(Field::new("location", Utf8, true)), + Arc::new(location_builder.finish()) as Arc, + ), + ( + Arc::new(Field::new("timestamp_utc", timestamp_type, true)), + timestamp_array, + ), + ]); + + Ok(RecordBatch::try_new( + Arc::clone(file_schema), + vec![Arc::new(info_struct)], + )?) 
+ } + + /// Apply the nested schema adapter to the batch + fn adapt_batch_with_nested_schema_adapter( + file_schema: &SchemaRef, + table_schema: &SchemaRef, + batch: RecordBatch, + ) -> Result { + let adapter = NestedStructSchemaAdapter::new( + Arc::clone(table_schema), + Arc::clone(table_schema), + ); + + let (mapper, _) = adapter.map_schema(file_schema.as_ref())?; + mapper.map_batch(batch) + } + + /// Verify the adapted batch has the expected structure and data + fn verify_adapted_batch_with_nested_fields( + mapped_batch: &RecordBatch, + table_schema: &SchemaRef, + ) -> Result<()> { + // Verify the mapped batch structure and data + assert_eq!(mapped_batch.schema(), *table_schema); + assert_eq!(mapped_batch.num_rows(), 2); + + // Extract and verify the info struct column + let info_col = mapped_batch.column(0); + let info_array = info_col + .as_any() + .downcast_ref::() + .expect("Expected info column to be a StructArray"); + + // Verify the original fields are preserved + verify_preserved_fields(info_array)?; + + // Verify the reason field exists with correct structure + verify_reason_field_structure(info_array)?; + + Ok(()) + } + + /// Verify the original fields from file schema are preserved in the adapted batch + fn verify_preserved_fields(info_array: &StructArray) -> Result<()> { + // Verify location field + let location_col = info_array + .column_by_name("location") + .expect("Expected location field in struct"); + let location_array = location_col + .as_any() + .downcast_ref::() + .expect("Expected location to be a StringArray"); + + // Verify the location values are preserved + assert_eq!(location_array.value(0), "San Francisco"); + assert_eq!(location_array.value(1), "New York"); + + // Verify timestamp field + let timestamp_col = info_array + .column_by_name("timestamp_utc") + .expect("Expected timestamp_utc field in struct"); + let timestamp_array = timestamp_col + .as_any() + .downcast_ref::() + .expect("Expected timestamp_utc to be a TimestampMillisecondArray"); + + assert_eq!(timestamp_array.value(0), 1640995200000); + assert_eq!(timestamp_array.value(1), 1641081600000); + + Ok(()) + } + + /// Verify the added reason field structure and null values + fn verify_reason_field_structure(info_array: &StructArray) -> Result<()> { + // Verify the reason field exists and is null + let reason_col = info_array + .column_by_name("reason") + .expect("Expected reason field in struct"); + let reason_array = reason_col + .as_any() + .downcast_ref::() + .expect("Expected reason to be a StructArray"); + + // Verify reason has correct structure + assert_eq!(reason_array.fields().len(), 2); + assert!(reason_array.column_by_name("_level").is_some()); + assert!(reason_array.column_by_name("details").is_some()); + + // Verify details field has correct nested structure + let details_col = reason_array + .column_by_name("details") + .expect("Expected details field in reason struct"); + let details_array = details_col + .as_any() + .downcast_ref::() + .expect("Expected details to be a StructArray"); + + assert_eq!(details_array.fields().len(), 3); + assert!(details_array.column_by_name("rurl").is_some()); + assert!(details_array.column_by_name("s").is_some()); + assert!(details_array.column_by_name("t").is_some()); + + // Verify all added fields are null + for i in 0..2 { + assert!(reason_array.is_null(i), "reason field should be null"); + } + + Ok(()) + } + + // ================================ + // Data Mapping Tests + // ================================ + + // Helper function to verify column 
statistics match expected values + fn verify_column_statistics( + stats: &ColumnStatistics, + expected_null_count: Option, + expected_distinct_count: Option, + expected_min: Option, + expected_max: Option, + expected_sum: Option, + ) { + if let Some(count) = expected_null_count { + assert_eq!( + stats.null_count, + datafusion_common::stats::Precision::Exact(count), + "Null count should match expected value" + ); + } + + if let Some(count) = expected_distinct_count { + assert_eq!( + stats.distinct_count, + datafusion_common::stats::Precision::Exact(count), + "Distinct count should match expected value" + ); + } + + if let Some(min) = expected_min { + assert_eq!( + stats.min_value, + datafusion_common::stats::Precision::Exact(min), + "Min value should match expected value" + ); + } + + if let Some(max) = expected_max { + assert_eq!( + stats.max_value, + datafusion_common::stats::Precision::Exact(max), + "Max value should match expected value" + ); + } + + if let Some(sum) = expected_sum { + assert_eq!( + stats.sum_value, + datafusion_common::stats::Precision::Exact(sum), + "Sum value should match expected value" + ); + } + } + + // Helper to create test column statistics + fn create_test_column_statistics( + null_count: usize, + distinct_count: usize, + min_value: Option, + max_value: Option, + sum_value: Option, + ) -> ColumnStatistics { + ColumnStatistics { + null_count: datafusion_common::stats::Precision::Exact(null_count), + distinct_count: datafusion_common::stats::Precision::Exact(distinct_count), + min_value: min_value.map_or_else( + || datafusion_common::stats::Precision::Absent, + datafusion_common::stats::Precision::Exact, + ), + max_value: max_value.map_or_else( + || datafusion_common::stats::Precision::Absent, + datafusion_common::stats::Precision::Exact, + ), + sum_value: sum_value.map_or_else( + || datafusion_common::stats::Precision::Absent, + datafusion_common::stats::Precision::Exact, + ), + } + } + + #[test] + fn test_map_column_statistics_basic() -> Result<()> { + // Test statistics mapping with a simple schema + let file_schema = create_basic_nested_schema(); + let table_schema = create_deep_nested_schema(); + + let adapter = NestedStructSchemaAdapter::new( + Arc::clone(&table_schema), + Arc::clone(&table_schema), + ); + + let (mapper, _) = adapter.map_schema(file_schema.as_ref())?; + + // Create test statistics for additionalInfo column + let file_stats = vec![create_test_column_statistics( + 5, + 100, + Some(ScalarValue::Utf8(Some("min_value".to_string()))), + Some(ScalarValue::Utf8(Some("max_value".to_string()))), + Some(ScalarValue::Utf8(Some("sum_value".to_string()))), + )]; + + // Map statistics + let table_stats = mapper.map_column_statistics(&file_stats)?; + + // Verify count and content + assert_eq!( + table_stats.len(), + 1, + "Should have stats for one struct column" + ); + verify_column_statistics( + &table_stats[0], + Some(5), + Some(100), + Some(ScalarValue::Utf8(Some("min_value".to_string()))), + Some(ScalarValue::Utf8(Some("max_value".to_string()))), + Some(ScalarValue::Utf8(Some("sum_value".to_string()))), + ); + + Ok(()) + } + + #[test] + fn test_map_column_statistics_empty() -> Result<()> { + // Test statistics mapping with empty input + let file_schema = create_basic_nested_schema(); + let table_schema = create_deep_nested_schema(); + + let adapter = NestedStructSchemaAdapter::new( + Arc::clone(&table_schema), + Arc::clone(&table_schema), + ); + + let (mapper, _) = adapter.map_schema(file_schema.as_ref())?; + + // Test with missing statistics + let 
empty_stats = vec![]; + let mapped_empty_stats = mapper.map_column_statistics(&empty_stats)?; + + assert_eq!( + mapped_empty_stats.len(), + 1, + "Should have stats for one column even with empty input" + ); + + assert_eq!( + mapped_empty_stats[0], + ColumnStatistics::new_unknown(), + "Empty input should result in unknown statistics" + ); + + Ok(()) + } + + #[test] + fn test_map_column_statistics_multiple_columns() -> Result<()> { + // Create schemas with multiple columns + let file_schema = Arc::new(Schema::new(vec![ + Field::new("id", Int32, false), + Field::new( + "additionalInfo", + Struct( + vec![ + Field::new("location", Utf8, true), + Field::new( + "timestamp_utc", + Timestamp(Millisecond, Some("UTC".into())), + true, + ), + ] + .into(), + ), + true, + ), + ])); + + let table_schema = Arc::new(Schema::new(vec![ + Field::new("id", Int32, false), + Field::new( + "additionalInfo", + Struct( + vec![ + Field::new("location", Utf8, true), + Field::new( + "timestamp_utc", + Timestamp(Millisecond, Some("UTC".into())), + true, + ), + Field::new( + "reason", + Struct(vec![Field::new("_level", Float64, true)].into()), + true, + ), + ] + .into(), + ), + true, + ), + Field::new("status", Utf8, true), // Extra column in table schema + ])); + + // Create adapter and mapping + let adapter = NestedStructSchemaAdapter::new( + Arc::clone(&table_schema), + Arc::clone(&table_schema), + ); + + let (mapper, _) = adapter.map_schema(file_schema.as_ref())?; + + // Create file column statistics + let file_stats = vec![ + create_test_column_statistics( + 0, + 100, + Some(ScalarValue::Int32(Some(1))), + Some(ScalarValue::Int32(Some(100))), + Some(ScalarValue::Int32(Some(5100))), + ), + create_test_column_statistics(10, 50, None, None, None), + ]; + + // Map statistics + let table_stats = mapper.map_column_statistics(&file_stats)?; + + // Verify mapped statistics + assert_eq!( + table_stats.len(), + 3, + "Should have stats for all 3 columns in table schema" + ); + + // Verify ID column stats + verify_column_statistics( + &table_stats[0], + Some(0), + Some(100), + Some(ScalarValue::Int32(Some(1))), + Some(ScalarValue::Int32(Some(100))), + Some(ScalarValue::Int32(Some(5100))), + ); + + // Verify additionalInfo column stats + verify_column_statistics(&table_stats[1], Some(10), Some(50), None, None, None); + + // Verify status column has unknown stats + assert_eq!( + table_stats[2], + ColumnStatistics::new_unknown(), + "Missing column should have unknown statistics" + ); + + Ok(()) + } +} diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index bacec7f4f9f0..061486871737 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -23,7 +23,7 @@ use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions}; use arrow::compute::{can_cast_types, cast}; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{Field, Schema, SchemaRef}; use datafusion_common::{plan_err, ColumnStatistics}; use std::fmt::Debug; use std::sync::Arc; @@ -225,6 +225,25 @@ pub(crate) struct DefaultSchemaAdapter { projected_table_schema: SchemaRef, } +/// Checks if a file field can be cast to a table field +/// +/// Returns Ok(true) if casting is possible, or an error explaining why casting is not possible +pub(crate) fn can_cast_field( + file_field: &Field, + table_field: &Field, +) -> datafusion_common::Result { + if can_cast_types(file_field.data_type(), table_field.data_type()) { + Ok(true) + } else { + plan_err!( + "Cannot 
cast file schema field {} of type {:?} to table schema field of type {:?}", + file_field.name(), + file_field.data_type(), + table_field.data_type() + ) + } +} + impl SchemaAdapter for DefaultSchemaAdapter { /// Map a column index in the table schema to a column index in a particular /// file schema @@ -248,40 +267,53 @@ impl SchemaAdapter for DefaultSchemaAdapter { &self, file_schema: &Schema, ) -> datafusion_common::Result<(Arc, Vec)> { - let mut projection = Vec::with_capacity(file_schema.fields().len()); - let mut field_mappings = vec![None; self.projected_table_schema.fields().len()]; - - for (file_idx, file_field) in file_schema.fields.iter().enumerate() { - if let Some((table_idx, table_field)) = - self.projected_table_schema.fields().find(file_field.name()) - { - match can_cast_types(file_field.data_type(), table_field.data_type()) { - true => { - field_mappings[table_idx] = Some(projection.len()); - projection.push(file_idx); - } - false => { - return plan_err!( - "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", - file_field.name(), - file_field.data_type(), - table_field.data_type() - ) - } - } - } - } + let (field_mappings, projection) = create_field_mapping( + file_schema, + &self.projected_table_schema, + can_cast_field, + )?; Ok(( - Arc::new(SchemaMapping { - projected_table_schema: Arc::clone(&self.projected_table_schema), + Arc::new(SchemaMapping::new( + Arc::clone(&self.projected_table_schema), field_mappings, - }), + )), projection, )) } } +/// Helper function that creates field mappings between file schema and table schema +/// +/// Maps columns from the file schema to their corresponding positions in the table schema, +/// applying type compatibility checking via the provided predicate function. +/// +/// Returns field mappings (for column reordering) and a projection (for field selection). +pub(crate) fn create_field_mapping( + file_schema: &Schema, + projected_table_schema: &SchemaRef, + can_map_field: F, +) -> datafusion_common::Result<(Vec>, Vec)> +where + F: Fn(&Field, &Field) -> datafusion_common::Result, +{ + let mut projection = Vec::with_capacity(file_schema.fields().len()); + let mut field_mappings = vec![None; projected_table_schema.fields().len()]; + + for (file_idx, file_field) in file_schema.fields.iter().enumerate() { + if let Some((table_idx, table_field)) = + projected_table_schema.fields().find(file_field.name()) + { + if can_map_field(file_field, table_field)? { + field_mappings[table_idx] = Some(projection.len()); + projection.push(file_idx); + } + } + } + + Ok((field_mappings, projection)) +} + /// The SchemaMapping struct holds a mapping from the file schema to the table /// schema and any necessary type conversions. /// @@ -304,6 +336,21 @@ pub struct SchemaMapping { field_mappings: Vec>, } +impl SchemaMapping { + /// Creates a new SchemaMapping instance + /// + /// Initializes the field mappings needed to transform file data to the projected table schema + pub fn new( + projected_table_schema: SchemaRef, + field_mappings: Vec>, + ) -> Self { + Self { + projected_table_schema, + field_mappings, + } + } +} + impl SchemaMapper for SchemaMapping { /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and /// conversions. 
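For context on how the refactored helpers above (`can_cast_field`, `create_field_mapping`, `SchemaMapping::new`) are exercised through the unchanged public entry points, a minimal sketch (not part of this diff; the wrapper function is illustrative):

use std::sync::Arc;
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use datafusion_datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaMapper,
};

fn adapt_to_table_schema(
    batch: RecordBatch,
    table_schema: Arc<Schema>,
) -> datafusion_common::Result<RecordBatch> {
    let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
    // map_schema now delegates to create_field_mapping with can_cast_field.
    let (mapper, projection) = adapter.map_schema(batch.schema().as_ref())?;
    // Drop file columns the table does not know about, then reorder, cast and
    // null-fill the remaining ones to match the table schema.
    let projected = batch.project(&projection)?;
    mapper.map_batch(projected)
}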
@@ -462,4 +509,80 @@ mod tests { assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown(),); assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown(),); } + + #[test] + fn test_map_schema_happy_path() { + // Create table schema (a, b, c) + let table_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + ])); + + // Create file schema with compatible types but different order and a missing column + let file_schema = Schema::new(vec![ + Field::new("b", DataType::Utf8, true), + Field::new("a", DataType::Int32, true), + // c is missing + ]); + + // Create SchemaAdapter + let adapter = DefaultSchemaAdapter { + projected_table_schema: Arc::clone(&table_schema), + }; + + // Get mapper and projection - This should succeed + let (mapper, projection) = adapter.map_schema(&file_schema).unwrap(); + + // Should project columns 0,1 from file + assert_eq!(projection, vec![0, 1]); + + // Check field mappings in the SchemaMapping struct + if let Some(schema_mapping) = mapper.downcast_ref::() { + assert_eq!(schema_mapping.field_mappings.len(), 3); + assert_eq!(schema_mapping.field_mappings[0], Some(1)); // a maps to file index 1 + assert_eq!(schema_mapping.field_mappings[1], Some(0)); // b maps to file index 0 + assert_eq!(schema_mapping.field_mappings[2], None); // c is missing + } else { + panic!("Expected mapper to be a SchemaMapping"); + } + } + + #[test] + fn test_map_schema_error_path() { + // Create table schema with specific types + let table_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + ])); + + // Create file schema with incompatible type for column b + let file_schema = Schema::new(vec![ + Field::new("a", DataType::Int32, true), + // Boolean cannot be cast to Utf8 + Field::new("b", DataType::Boolean, true), + ]); + + // Create SchemaAdapter + let adapter = DefaultSchemaAdapter { + projected_table_schema: Arc::clone(&table_schema), + }; + + // map_schema should return an error + let result = adapter.map_schema(&file_schema); + assert!(result.is_err()); + + // Verify error message contains expected information + let error_msg = result.unwrap_err().to_string(); + assert!( + error_msg.contains("Cannot cast file schema field b"), + "Error message '{}' does not contain expected text", + error_msg + ); + assert!( + error_msg.contains("Boolean") && error_msg.contains("Utf8"), + "Error message '{}' does not mention the incompatible types", + error_msg + ); + } }
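Finally, a short sketch of the new `FileSource` trait surface this PR introduces (not part of the diff; the wrapper function is illustrative, and the `ParquetSource` re-export path is assumed). Because `with_schema_adapter_factory` and `schema_adapter_factory` are now trait methods, the factory can be attached uniformly to any source type rather than only to `ParquetSource`:

use std::sync::Arc;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::nested_schema_adapter::NestedStructSchemaAdapterFactory;
use datafusion_datasource_parquet::ParquetSource;

fn parquet_source_with_nested_adapter() -> Arc<dyn FileSource> {
    // The new From impl converts the concrete source into Arc<dyn FileSource>.
    let source: Arc<dyn FileSource> = ParquetSource::default().into();
    // with_schema_adapter_factory is a FileSource trait method now, so the
    // same call works for Parquet, CSV, JSON and Arrow sources alike.
    source.with_schema_adapter_factory(Arc::new(NestedStructSchemaAdapterFactory))
}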