
Commit e430e36

feat: Update to Datafusion 47.0.0
Signed-off-by: Andrew Lamb <[email protected]>
1 parent 494e0b0 commit e430e36

31 files changed (+181, -178 lines)


Cargo.toml

Lines changed: 41 additions & 26 deletions
@@ -26,37 +26,38 @@ debug = true
 debug = "line-tables-only"
 
 [workspace.dependencies]
-delta_kernel = { version = "0.9.0", features = [
-  "arrow_54",
+# https://github.com/delta-io/delta-kernel-rs/pull/831
+delta_kernel = { git = "https://github.com/alamb/delta-kernel-rs.git", rev = "f6d0f7ffb14d7d2e10b68b770de740f79215c250", features = [
+  "arrow_55",
   "developer-visibility",
 ] }
 
 # arrow
-arrow = { version = "54" }
-arrow-arith = { version = "54" }
-arrow-array = { version = "54", features = ["chrono-tz"] }
-arrow-buffer = { version = "54" }
-arrow-cast = { version = "54" }
-arrow-ipc = { version = "54" }
-arrow-json = { version = "54" }
-arrow-ord = { version = "54" }
-arrow-row = { version = "54" }
-arrow-schema = { version = "54" }
-arrow-select = { version = "54" }
-object_store = { version = "0.11.2" }
-parquet = { version = "54" }
+arrow = { version = "55" }
+arrow-arith = { version = "55" }
+arrow-array = { version = "55", features = ["chrono-tz"] }
+arrow-buffer = { version = "55" }
+arrow-cast = { version = "55" }
+arrow-ipc = { version = "55" }
+arrow-json = { version = "55" }
+arrow-ord = { version = "55" }
+arrow-row = { version = "55" }
+arrow-schema = { version = "55" }
+arrow-select = { version = "55" }
+object_store = { version = "0.12.0" }
+parquet = { version = "55" }
 
 # datafusion
-datafusion = "46"
-datafusion-expr = "46"
-datafusion-common = "46"
-datafusion-ffi = "46"
-datafusion-functions = "46"
-datafusion-functions-aggregate = "46"
-datafusion-physical-expr = "46"
-datafusion-physical-plan = "46"
-datafusion-proto = "46"
-datafusion-sql = "46"
+datafusion = "47.0.0"
+datafusion-expr = "47.0.0"
+datafusion-common = "47.0.0"
+datafusion-ffi = "47.0.0"
+datafusion-functions = "47.0.0"
+datafusion-functions-aggregate = "47.0.0"
+datafusion-physical-expr = "47.0.0"
+datafusion-physical-plan = "47.0.0"
+datafusion-proto = "47.0.0"
+datafusion-sql = "47.0.0"
 
 # serde
 serde = { version = "1.0.194", features = ["derive"] }
@@ -65,7 +66,7 @@ strum = { version = "0.26" }
 
 # "stdlib"
 bytes = { version = "1" }
-chrono = { version = "=0.4.39", default-features = false, features = ["clock"] }
+chrono = { version = "0.4.40", default-features = false, features = ["clock"] }
 tracing = { version = "0.1", features = ["log"] }
 regex = { version = "1" }
 thiserror = { version = "2" }
@@ -78,3 +79,17 @@ async-trait = { version = "0.1" }
 futures = { version = "0.3" }
 tokio = { version = "1" }
 num_cpus = { version = "1" }
+
+
+[patch.crates-io]
+# https://github.com/apache/datafusion/pull/15466 (arrow 55)
+datafusion = { git = "https://github.com/apache/datafusion.git", branch = "branch-47" }
+datafusion-expr = { git = "https://github.com/apache/datafusion.git", branch = "branch-47" }
+datafusion-common = { git = "https://github.com/apache/datafusion.git", branch = "branch-47" }
+datafusion-ffi = { git = "https://github.com/apache/datafusion.git", branch = "branch-47" }
+datafusion-functions = { git = "https://github.com/apache/datafusion.git", branch = "branch-47" }
+datafusion-functions-aggregate = { git = "https://github.com/apache/datafusion.git", branch = "branch-47" }
+datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", branch = "branch-47" }
+datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", branch = "branch-47" }
+datafusion-proto = { git = "https://github.com/apache/datafusion.git", branch = "branch-47" }
+datafusion-sql = { git = "https://github.com/apache/datafusion.git", branch = "branch-47" }

crates/aws/src/storage.rs

Lines changed: 3 additions & 3 deletions
@@ -358,7 +358,7 @@ impl ObjectStore for S3StorageBackend {
         self.inner.get_opts(location, options).await
     }
 
-    async fn get_range(&self, location: &Path, range: Range<usize>) -> ObjectStoreResult<Bytes> {
+    async fn get_range(&self, location: &Path, range: Range<u64>) -> ObjectStoreResult<Bytes> {
         self.inner.get_range(location, range).await
     }
 
@@ -370,15 +370,15 @@ impl ObjectStore for S3StorageBackend {
         self.inner.delete(location).await
     }
 
-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
         self.inner.list(prefix)
     }
 
     fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+    ) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
        self.inner.list_with_offset(prefix, offset)
     }
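
The object_store 0.12 changes pinned above drive these signature updates: byte ranges are now Range<u64> and the list methods return owned 'static streams. A minimal call-site sketch, not part of this commit; the store, path, and range values are illustrative.

// A minimal sketch (not from this commit) of how call sites adapt to the
// object_store 0.12 API: get_range takes a Range<u64>, and `list` returns a
// BoxStream<'static, _> that owns whatever it needs from the store.
use std::ops::Range;

use futures::StreamExt;
use object_store::{path::Path, ObjectStore};

async fn read_and_list(
    store: &dyn ObjectStore,
    location: &Path,
    legacy_range: Range<usize>,
) -> object_store::Result<()> {
    // Widen the old usize range to the u64 range expected by 0.12.
    let range = legacy_range.start as u64..legacy_range.end as u64;
    let bytes = store.get_range(location, range).await?;
    println!("fetched {} bytes", bytes.len());

    // `list` now yields an owned ('static) stream of ObjectMeta results.
    let mut entries = store.list(Some(location));
    while let Some(meta) = entries.next().await {
        println!("{}", meta?.location);
    }
    Ok(())
}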

crates/aws/tests/repair_s3_rename_test.rs

Lines changed: 3 additions & 3 deletions
@@ -187,7 +187,7 @@ impl ObjectStore for DelayedObjectStore {
         self.inner.get_opts(location, options).await
     }
 
-    async fn get_range(&self, location: &Path, range: Range<usize>) -> ObjectStoreResult<Bytes> {
+    async fn get_range(&self, location: &Path, range: Range<u64>) -> ObjectStoreResult<Bytes> {
         self.inner.get_range(location, range).await
     }
 
@@ -199,15 +199,15 @@ impl ObjectStore for DelayedObjectStore {
         self.inner.delete(location).await
     }
 
-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
         self.inner.list(prefix)
     }
 
     fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+    ) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
         self.inner.list_with_offset(prefix, offset)
     }

crates/azure/tests/integration.rs

Lines changed: 2 additions & 1 deletion
@@ -85,7 +85,8 @@ async fn read_write_test_onelake(context: &IntegrationContext, path: &Path) -> T
     assert_eq!(expected, fetched);
 
     for range in [0..10, 3..5, 0..expected.len()] {
-        let data = delta_store.get_range(path, range.clone()).await.unwrap();
+        let range_u64 = range.start as u64..range.end as u64;
+        let data = delta_store.get_range(path, range_u64).await.unwrap();
         assert_eq!(&data[..], &expected[range])
     }

crates/core/src/delta_datafusion/cdf/scan_utils.rs

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ pub fn create_partition_values<F: FileAction>(
     let part = PartitionedFile {
         object_meta: ObjectMeta {
            location: Path::parse(action.path().as_str())?,
-            size: action.size()?,
+            size: action.size()? as u64,
            e_tag: None,
            last_modified: chrono::Utc.timestamp_nanos(0),
            version: None,
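
The cast here (and the similar ones below) exists because ObjectMeta::size is a u64 in object_store 0.12, where it previously was usize. A small illustrative sketch, not from this commit; the function name and the i64 input are assumptions.

// A minimal sketch (not from this commit): building an ObjectMeta from a
// Delta-log file size, widening it the same way the diff above does.
use chrono::TimeZone;
use object_store::{path::Path, ObjectMeta};

fn to_object_meta(location: Path, size_from_log: i64) -> ObjectMeta {
    ObjectMeta {
        location,
        size: size_from_log as u64, // ObjectMeta::size is u64 in object_store 0.12
        e_tag: None,
        last_modified: chrono::Utc.timestamp_nanos(0),
        version: None,
    }
}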

crates/core/src/delta_datafusion/logical.rs

Lines changed: 0 additions & 9 deletions
@@ -50,15 +50,6 @@ impl UserDefinedLogicalNodeCore for MetricObserver {
         write!(f, "MetricObserver id={}", self.id)
     }
 
-    fn from_template(
-        &self,
-        exprs: &[datafusion_expr::Expr],
-        inputs: &[datafusion_expr::LogicalPlan],
-    ) -> Self {
-        self.with_exprs_and_inputs(exprs.to_vec(), inputs.to_vec())
-            .unwrap()
-    }
-
     fn with_exprs_and_inputs(
         &self,
         _exprs: Vec<datafusion_expr::Expr>,
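
DataFusion 47 drops the deprecated from_template method from UserDefinedLogicalNodeCore, leaving only the fallible with_exprs_and_inputs shown here. A minimal sketch, not from this commit, of how a caller that went through from_template migrates; the generic node and slices are illustrative.

// A minimal sketch (not from this commit): rebuilding a user-defined logical
// node without the removed from_template.
use datafusion_common::Result;
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};

fn rebuild_node<N: UserDefinedLogicalNodeCore>(
    node: &N,
    exprs: &[Expr],
    inputs: &[LogicalPlan],
) -> Result<N> {
    // from_template used to wrap this call and unwrap the error; callers now
    // propagate the Result themselves.
    node.with_exprs_and_inputs(exprs.to_vec(), inputs.to_vec())
}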

crates/core/src/delta_datafusion/mod.rs

Lines changed: 19 additions & 19 deletions
@@ -37,10 +37,12 @@ use arrow_schema::{
 use arrow_select::concat::concat_batches;
 use async_trait::async_trait;
 use chrono::{DateTime, TimeZone, Utc};
+use datafusion::catalog::memory::DataSourceExec;
 use datafusion::catalog::{Session, TableProviderFactory};
 use datafusion::config::TableParquetOptions;
 use datafusion::datasource::physical_plan::{
-    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, ParquetSource,
+    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileGroup, FileScanConfigBuilder,
+    ParquetSource,
 };
 use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
 use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
@@ -671,7 +673,7 @@ impl<'a> DeltaScanBuilder<'a> {
            }
        };
 
-        let file_scan_config = FileScanConfig::new(
+        let file_scan_config = FileScanConfigBuilder::new(
            self.log_store.object_store_url(),
            file_schema,
            Arc::new(file_source),
@@ -682,15 +684,16 @@ impl<'a> DeltaScanBuilder<'a> {
            //
            // See https://github.com/apache/datafusion/issues/11322
            if file_groups.is_empty() {
-                vec![vec![]]
+                vec![FileGroup::from(vec![])]
            } else {
-                file_groups.into_values().collect()
+                file_groups.into_values().map(FileGroup::from).collect()
            },
        )
        .with_statistics(stats)
        .with_projection(self.projection.cloned())
        .with_limit(self.limit)
-        .with_table_partition_cols(table_partition_cols);
+        .with_table_partition_cols(table_partition_cols)
+        .build();
 
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics)
@@ -702,7 +705,7 @@ impl<'a> DeltaScanBuilder<'a> {
 
        Ok(DeltaScan {
            table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
-            parquet_scan: file_scan_config.build(),
+            parquet_scan: DataSourceExec::from_data_source(file_scan_config),
            config,
            logical_schema,
            metrics,
@@ -1974,6 +1977,7 @@ mod tests {
    use bytes::Bytes;
    use chrono::{TimeZone, Utc};
    use datafusion::assert_batches_sorted_eq;
+    use datafusion::datasource::physical_plan::FileScanConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::physical_plan::empty::EmptyExec;
    use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
@@ -2731,10 +2735,6 @@ mod tests {
        visit_execution_plan(&scan, &mut visitor).unwrap();
 
        assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
-        assert_eq!(
-            visitor.pruning_predicate.unwrap().orig_expr().to_string(),
-            "a@0 = s"
-        );
    }
 
    #[tokio::test]
@@ -2766,7 +2766,6 @@ mod tests {
        visit_execution_plan(&scan, &mut visitor).unwrap();
 
        assert!(visitor.predicate.is_none());
-        assert!(visitor.pruning_predicate.is_none());
    }
 
    #[tokio::test]
@@ -2801,7 +2800,6 @@ mod tests {
    #[derive(Default)]
    struct ParquetVisitor {
        predicate: Option<Arc<dyn PhysicalExpr>>,
-        pruning_predicate: Option<Arc<PruningPredicate>>,
        options: Option<TableParquetOptions>,
    }
 
@@ -2828,7 +2826,6 @@ mod tests {
        {
            self.options = Some(parquet_source.table_parquet_options().clone());
            self.predicate = parquet_source.predicate().cloned();
-            self.pruning_predicate = parquet_source.pruning_predicate().cloned();
        }
 
        Ok(true)
@@ -2974,8 +2971,8 @@ mod tests {
 
    #[derive(Debug, PartialEq)]
    enum ObjectStoreOperation {
-        GetRanges(LocationType, Vec<Range<usize>>),
-        GetRange(LocationType, Range<usize>),
+        GetRanges(LocationType, Vec<Range<u64>>),
+        GetRange(LocationType, Range<u64>),
        GetOpts(LocationType),
        Get(LocationType),
    }
@@ -3054,7 +3051,7 @@ mod tests {
    async fn get_range(
        &self,
        location: &Path,
-        range: Range<usize>,
+        range: Range<u64>,
    ) -> object_store::Result<Bytes> {
        self.operations
            .send(ObjectStoreOperation::GetRange(
@@ -3068,7 +3065,7 @@ mod tests {
    async fn get_ranges(
        &self,
        location: &Path,
-        ranges: &[Range<usize>],
+        ranges: &[Range<u64>],
    ) -> object_store::Result<Vec<Bytes>> {
        self.operations
            .send(ObjectStoreOperation::GetRanges(
@@ -3094,15 +3091,18 @@ mod tests {
        self.inner.delete_stream(locations)
    }
 
-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+    fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        self.inner.list(prefix)
    }
 
    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
-    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        self.inner.list_with_offset(prefix, offset)
    }
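
The scan construction above follows the DataFusion 47 pattern: FileScanConfig::new(...).build() becomes FileScanConfigBuilder plus DataSourceExec::from_data_source, and file groups are wrapped in FileGroup. A minimal sketch, not from this commit; the schema, URL, and file list are illustrative.

// A minimal sketch (not from this commit) of the DataFusion 47 scan pattern
// used in DeltaScanBuilder above.
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_plan::ExecutionPlan;

fn build_parquet_scan() -> Arc<dyn ExecutionPlan> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
    let file = PartitionedFile::new("part-00000.parquet".to_string(), 1024);

    let config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        schema,
        Arc::new(ParquetSource::default()),
    )
    // Files are grouped via FileGroup rather than Vec<Vec<PartitionedFile>>.
    .with_file_groups(vec![FileGroup::from(vec![file])])
    .build();

    // The built config is a DataSource feeding a DataSourceExec node.
    DataSourceExec::from_data_source(config)
}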

crates/core/src/delta_datafusion/schema_adapter.rs

Lines changed: 0 additions & 19 deletions
@@ -59,7 +59,6 @@ impl SchemaAdapter for DeltaSchemaAdapter {
         Ok((
             Arc::new(SchemaMapping {
                 projected_schema: self.projected_table_schema.clone(),
-                table_schema: self.table_schema.clone(),
             }),
             projection,
         ))
@@ -69,29 +68,11 @@
 #[derive(Debug)]
 pub(crate) struct SchemaMapping {
     projected_schema: SchemaRef,
-    table_schema: SchemaRef,
 }
 
 impl SchemaMapper for SchemaMapping {
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
         let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?;
         Ok(record_batch)
     }
-
-    fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
-        let partial_table_schema = Arc::new(Schema::new(
-            batch
-                .schema()
-                .fields()
-                .iter()
-                .filter_map(|batch_field| {
-                    self.table_schema.field_with_name(batch_field.name()).ok()
-                })
-                .cloned()
-                .collect::<Vec<_>>(),
-        ));
-
-        let record_batch = cast_record_batch(&batch, partial_table_schema, false, true)?;
-        Ok(record_batch)
-    }
 }
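
In DataFusion 47 the SchemaMapper trait no longer has map_partial_batch, so map_batch is the only mapping an adapter needs, as the remaining impl above shows. A minimal sketch, not from this commit; IdentityMapping is hypothetical and the import path is an assumption for DataFusion 47.

// A minimal sketch (not from this commit): a SchemaMapper now only implements
// map_batch. This identity mapper is hypothetical; delta-rs' real SchemaMapping
// casts each batch to the projected Delta schema instead.
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::datasource::schema_adapter::SchemaMapper;

#[derive(Debug)]
struct IdentityMapping;

impl SchemaMapper for IdentityMapping {
    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
        // A real mapper would cast/reorder columns to the target schema here.
        Ok(batch)
    }
}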

crates/core/src/kernel/snapshot/log_data.rs

Lines changed: 1 addition & 1 deletion
@@ -391,7 +391,7 @@ impl<'a> TryFrom<&LogicalFile<'a>> for ObjectMeta {
     fn try_from(file_stats: &LogicalFile<'a>) -> Result<Self, Self::Error> {
         Ok(ObjectMeta {
             location: file_stats.object_store_path(),
-            size: file_stats.size() as usize,
+            size: file_stats.size() as u64,
             last_modified: file_stats.modification_datetime()?,
             version: None,
             e_tag: None,

crates/core/src/kernel/snapshot/log_segment.rs

Lines changed: 4 additions & 3 deletions
@@ -300,7 +300,8 @@ impl LogSegment {
             let store = store.clone();
             let read_schema = read_schema.clone();
             async move {
-                let mut reader = ParquetObjectReader::new(store, meta);
+                let mut reader =
+                    ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
                 let options = ArrowReaderOptions::new();
                 let reader_meta = ArrowReaderMetadata::load_async(&mut reader, options).await?;
 
@@ -413,7 +414,7 @@ impl LogSegment {
         let bytes = commit.get_bytes()?;
         let meta = ObjectMeta {
             location: path,
-            size: bytes.len(),
+            size: bytes.len() as u64,
             last_modified: Utc::now(),
             e_tag: None,
             version: None,
@@ -777,7 +778,7 @@ pub(super) mod tests {
         self.store.delete(location).await
     }
 
-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         std::thread::sleep(std::time::Duration::from_secs(1));
         self.store.list(prefix)
     }
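
The reader change above reflects parquet 55: ParquetObjectReader is built from the object Path rather than a full ObjectMeta, with the known file size supplied via with_file_size. A minimal sketch, not from this commit; the stream handling is illustrative.

// A minimal sketch (not from this commit): opening a Parquet file with
// parquet 55's ParquetObjectReader, constructed from a Path plus a known size.
use std::sync::Arc;

use futures::StreamExt;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

async fn count_rows(
    store: Arc<dyn ObjectStore>,
    meta: ObjectMeta,
) -> parquet::errors::Result<usize> {
    // Path + with_file_size replaces the old ObjectMeta-based constructor.
    let reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader).await?.build()?;

    let mut rows = 0;
    while let Some(batch) = stream.next().await {
        rows += batch?.num_rows();
    }
    Ok(rows)
}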
