Skip to content

Commit

Permalink
Align schemas for DataFusion plan and stream (#829)
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya authored Dec 20, 2024
1 parent 982db0e commit c8f5d91
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
5 changes: 1 addition & 4 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ pub(crate) struct IcebergTableScan {
table: Table,
/// Snapshot of the table to scan.
snapshot_id: Option<i64>,
- /// A reference-counted arrow `Schema`.
- schema: ArrowSchemaRef,
/// Stores certain, often expensive to compute,
/// plan properties used in query optimization.
plan_properties: PlanProperties,
Expand Down Expand Up @@ -76,7 +74,6 @@ impl IcebergTableScan {
Self {
table,
snapshot_id,
- schema,
plan_properties,
projection,
predicates,
Expand Down Expand Up @@ -134,7 +131,7 @@ impl ExecutionPlan for IcebergTableScan {
let stream = futures::stream::once(fut).try_flatten();

Ok(Box::pin(RecordBatchStreamAdapter::new(
- self.schema.clone(),
+ self.schema(),
stream,
)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use std::sync::Arc;
use std::vec;

use datafusion::arrow::array::{Array, StringArray};
- use datafusion::arrow::datatypes::DataType;
+ use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion::execution::context::SessionContext;
+ use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use iceberg::io::FileIOBuilder;
use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};
Expand Down Expand Up @@ -83,7 +84,7 @@ fn get_table_creation(
}

#[tokio::test]
- async fn test_provider_get_table_schema() -> Result<()> {
+ async fn test_provider_plan_stream_schema() -> Result<()> {
let iceberg_catalog = get_iceberg_catalog();
let namespace = NamespaceIdent::new("test_provider_get_table_schema".to_string());
set_test_namespace(&iceberg_catalog, &namespace).await?;
Expand Down Expand Up @@ -111,6 +112,26 @@ async fn test_provider_get_table_schema() -> Result<()> {
assert!(!field.is_nullable())
}

+ let df = ctx
+ .sql("select foo2 from catalog.test_provider_get_table_schema.my_table")
+ .await
+ .unwrap();
+
+ let task_ctx = Arc::new(df.task_ctx());
+ let plan = df.create_physical_plan().await.unwrap();
+ let stream = plan.execute(1, task_ctx).unwrap();
+
+ // Ensure both the plan and the stream conform to the same schema
+ assert_eq!(plan.schema(), stream.schema());
+ assert_eq!(
+ stream.schema().as_ref(),
+ &ArrowSchema::new(vec![Field::new("foo2", DataType::Utf8, false)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "2".to_string(),
+ )]))]),
+ );

Ok(())
}

Expand Down

0 comments on commit c8f5d91

Please sign in to comment.