Commit ff11e08

feat: Update to Datafusion 47.0.0

Signed-off-by: Andrew Lamb <[email protected]>
1 parent 64f6369

File tree

30 files changed: +141, -147 lines

Cargo.toml

Lines changed: 41 additions & 25 deletions

```diff
@@ -26,35 +26,37 @@ debug = true
 debug = "line-tables-only"

 [workspace.dependencies]
-delta_kernel = { version = "0.8.0", features = ["arrow_54"] }
+#delta_kernel = { version = "0.8.0", features = ["arrow_54"] }
 #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }
+# https://github.com/delta-io/delta-kernel-rs/pull/831
+delta_kernel = { git = "https://github.com/alamb/delta-kernel-rs.git", rev = "e0c211a2feaf60ff7be1f7102292c6d93dbb8ecd", features = ["arrow_55"] }

 # arrow
-arrow = { version = "54" }
-arrow-arith = { version = "54" }
-arrow-array = { version = "54", features = ["chrono-tz"] }
-arrow-buffer = { version = "54" }
-arrow-cast = { version = "54" }
-arrow-ipc = { version = "54" }
-arrow-json = { version = "54" }
-arrow-ord = { version = "54" }
-arrow-row = { version = "54" }
-arrow-schema = { version = "54" }
-arrow-select = { version = "54" }
-object_store = { version = "0.11.2" }
-parquet = { version = "54" }
+arrow = { version = "55" }
+arrow-arith = { version = "55" }
+arrow-array = { version = "55", features = ["chrono-tz"] }
+arrow-buffer = { version = "55" }
+arrow-cast = { version = "55" }
+arrow-ipc = { version = "55" }
+arrow-json = { version = "55" }
+arrow-ord = { version = "55" }
+arrow-row = { version = "55" }
+arrow-schema = { version = "55" }
+arrow-select = { version = "55" }
+object_store = { version = "0.12.0" }
+parquet = { version = "55" }

 # datafusion
-datafusion = "46"
-datafusion-expr = "46"
-datafusion-common = "46"
-datafusion-ffi = "46"
-datafusion-functions = "46"
-datafusion-functions-aggregate = "46"
-datafusion-physical-expr = "46"
-datafusion-physical-plan = "46"
-datafusion-proto = "46"
-datafusion-sql = "46"
+datafusion = "46.0.1"
+datafusion-expr = "46.0.1"
+datafusion-common = "46.0.1"
+datafusion-ffi = "46.0.1"
+datafusion-functions = "46.0.1"
+datafusion-functions-aggregate = "46.0.1"
+datafusion-physical-expr = "46.0.1"
+datafusion-physical-plan = "46.0.1"
+datafusion-proto = "46.0.1"
+datafusion-sql = "46.0.1"

 # serde
 serde = { version = "1.0.194", features = ["derive"] }
@@ -64,7 +66,7 @@ strum = { version = "0.26"}

 # "stdlib"
 bytes = { version = "1" }
-chrono = { version = "=0.4.39", default-features = false, features = ["clock"] }
+chrono = { version = "0.4.40", default-features = false, features = ["clock"] }
 tracing = { version = "0.1", features = ["log"] }
 regex = { version = "1" }
 thiserror = { version = "2" }
@@ -77,3 +79,17 @@ async-trait = { version = "0.1" }
 futures = { version = "0.3" }
 tokio = { version = "1" }
 num_cpus = { version = "1" }
+
+
+[patch.crates-io]
+# https://github.com/apache/datafusion/pull/15466 (arrow 55)
+datafusion = { git = "https://github.com/apache/datafusion.git", rev = "1f0711e6f33903226d835808e6b178032f38e178" }
+datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "1f0711e6f33903226d835808e6b178032f38e178" }
+datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "1f0711e6f33903226d835808e6b178032f38e178" }
+datafusion-ffi = { git = "https://github.com/apache/datafusion.git", rev = "1f0711e6f33903226d835808e6b178032f38e178" }
+datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "1f0711e6f33903226d835808e6b178032f38e178" }
+datafusion-functions-aggregate = { git = "https://github.com/apache/datafusion.git", rev = "1f0711e6f33903226d835808e6b178032f38e178" }
+datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "1f0711e6f33903226d835808e6b178032f38e178" }
+datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "1f0711e6f33903226d835808e6b178032f38e178" }
+datafusion-proto = { git = "https://github.com/apache/datafusion.git", rev = "1f0711e6f33903226d835808e6b178032f38e178" }
+datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "1f0711e6f33903226d835808e6b178032f38e178" }
```

The `[patch.crates-io]` section redirects all ten DataFusion crates to a single git revision carrying the arrow 55 upgrade (apache/datafusion#15466), overriding the 46.0.1 releases pinned above until a crates.io release with arrow 55 support is available.

crates/aws/src/storage.rs

Lines changed: 3 additions & 3 deletions

```diff
@@ -368,7 +368,7 @@ impl ObjectStore for S3StorageBackend {
         self.inner.get_opts(location, options).await
     }

-    async fn get_range(&self, location: &Path, range: Range<usize>) -> ObjectStoreResult<Bytes> {
+    async fn get_range(&self, location: &Path, range: Range<u64>) -> ObjectStoreResult<Bytes> {
         self.inner.get_range(location, range).await
     }

@@ -380,15 +380,15 @@ impl ObjectStore for S3StorageBackend {
         self.inner.delete(location).await
     }

-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
         self.inner.list(prefix)
     }

     fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+    ) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
         self.inner.list_with_offset(prefix, offset)
     }
```
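The driver for both hunks is object_store 0.12: byte ranges moved from `Range<usize>` to `Range<u64>`, and `list`/`list_with_offset` now return streams with a `'static` lifetime instead of borrowing `&self`. A minimal sketch of the new shapes, assuming only object_store 0.12, bytes, and futures; the two helper functions are illustrative, not part of this crate:

```rust
use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::{path::Path, ObjectMeta, ObjectStore};

// Ranges are u64-based in 0.12, so offsets past 4 GiB work even on
// 32-bit targets.
async fn read_first_kib(
    store: &dyn ObjectStore,
    location: &Path,
) -> object_store::Result<Bytes> {
    store.get_range(location, 0..1024).await
}

// The listing stream is 'static: it no longer borrows the store, so it
// can be moved into spawned tasks without lifetime gymnastics.
fn list_all(store: &dyn ObjectStore) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
    store.list(None)
}
```

Because the stream owns whatever it needs (typically an internal `Arc`), delegating wrappers like `S3StorageBackend` can forward `self.inner.list(prefix)` unchanged once the signature matches.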

crates/core/src/delta_datafusion/cdf/scan_utils.rs

Lines changed: 1 addition & 1 deletion

```diff
@@ -75,7 +75,7 @@ pub fn create_partition_values<F: FileAction>(
     let part = PartitionedFile {
         object_meta: ObjectMeta {
             location: Path::parse(action.path().as_str())?,
-            size: action.size()?,
+            size: action.size()? as u64,
             e_tag: None,
             last_modified: chrono::Utc.timestamp_nanos(0),
             version: None,
```

crates/core/src/delta_datafusion/logical.rs

Lines changed: 0 additions & 9 deletions

```diff
@@ -50,15 +50,6 @@ impl UserDefinedLogicalNodeCore for MetricObserver {
         write!(f, "MetricObserver id={}", self.id)
     }

-    fn from_template(
-        &self,
-        exprs: &[datafusion_expr::Expr],
-        inputs: &[datafusion_expr::LogicalPlan],
-    ) -> Self {
-        self.with_exprs_and_inputs(exprs.to_vec(), inputs.to_vec())
-            .unwrap()
-    }
-
     fn with_exprs_and_inputs(
         &self,
         _exprs: Vec<datafusion_expr::Expr>,
```
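Upstream removed `from_template` from `UserDefinedLogicalNodeCore`, so the shim above (which just unwrapped `with_exprs_and_inputs`) becomes dead code; `with_exprs_and_inputs` is now the single, fallible reconstruction hook. A hedged sketch of calling it generically; the helper name is made up for illustration:

```rust
use datafusion_common::Result;
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};

// Rebuild any user-defined logical node from new expressions and inputs.
// Errors now propagate instead of hiding behind the `.unwrap()` that the
// old infallible `from_template` signature forced on implementers.
fn rebuild_node<N: UserDefinedLogicalNodeCore>(
    node: &N,
    exprs: Vec<Expr>,
    inputs: Vec<LogicalPlan>,
) -> Result<N> {
    node.with_exprs_and_inputs(exprs, inputs)
}
```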

crates/core/src/delta_datafusion/mod.rs

Lines changed: 13 additions & 16 deletions

```diff
@@ -40,7 +40,8 @@ use chrono::{DateTime, TimeZone, Utc};
 use datafusion::catalog::{Session, TableProviderFactory};
 use datafusion::config::TableParquetOptions;
 use datafusion::datasource::physical_plan::{
-    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, ParquetSource,
+    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileGroup, FileScanConfig,
+    ParquetSource,
 };
 use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
 use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
@@ -682,9 +683,9 @@ impl<'a> DeltaScanBuilder<'a> {
                 //
                 // See https://github.com/apache/datafusion/issues/11322
                 if file_groups.is_empty() {
-                    vec![vec![]]
+                    vec![FileGroup::from(vec![])]
                 } else {
-                    file_groups.into_values().collect()
+                    file_groups.into_values().map(FileGroup::from).collect()
                 },
             )
             .with_statistics(stats)
@@ -2731,10 +2732,6 @@ mod tests {
         visit_execution_plan(&scan, &mut visitor).unwrap();

         assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
-        assert_eq!(
-            visitor.pruning_predicate.unwrap().orig_expr().to_string(),
-            "a@0 = s"
-        );
     }

     #[tokio::test]
@@ -2766,7 +2763,6 @@ mod tests {
         visit_execution_plan(&scan, &mut visitor).unwrap();

         assert!(visitor.predicate.is_none());
-        assert!(visitor.pruning_predicate.is_none());
     }

     #[tokio::test]
@@ -2801,7 +2797,6 @@ mod tests {
     #[derive(Default)]
     struct ParquetVisitor {
         predicate: Option<Arc<dyn PhysicalExpr>>,
-        pruning_predicate: Option<Arc<PruningPredicate>>,
         options: Option<TableParquetOptions>,
     }

@@ -2828,7 +2823,6 @@ mod tests {
         {
             self.options = Some(parquet_source.table_parquet_options().clone());
             self.predicate = parquet_source.predicate().cloned();
-            self.pruning_predicate = parquet_source.pruning_predicate().cloned();
         }

         Ok(true)
@@ -2974,8 +2968,8 @@ mod tests {

     #[derive(Debug, PartialEq)]
     enum ObjectStoreOperation {
-        GetRanges(LocationType, Vec<Range<usize>>),
-        GetRange(LocationType, Range<usize>),
+        GetRanges(LocationType, Vec<Range<u64>>),
+        GetRange(LocationType, Range<u64>),
         GetOpts(LocationType),
         Get(LocationType),
     }
@@ -3054,7 +3048,7 @@ mod tests {
     async fn get_range(
         &self,
         location: &Path,
-        range: Range<usize>,
+        range: Range<u64>,
     ) -> object_store::Result<Bytes> {
         self.operations
             .send(ObjectStoreOperation::GetRange(
@@ -3068,7 +3062,7 @@ mod tests {
     async fn get_ranges(
         &self,
         location: &Path,
-        ranges: &[Range<usize>],
+        ranges: &[Range<u64>],
     ) -> object_store::Result<Vec<Bytes>> {
         self.operations
             .send(ObjectStoreOperation::GetRanges(
@@ -3094,15 +3088,18 @@ mod tests {
         self.inner.delete_stream(locations)
     }

-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+    fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
         self.inner.list(prefix)
     }

     fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
         self.inner.list_with_offset(prefix, offset)
     }
```
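The import and scan-builder hunks track a DataFusion 47 API change: `FileScanConfig` now takes each partition's files as a `FileGroup` value rather than a bare `Vec<PartitionedFile>`. A sketch of the wrapping; the `HashMap` key type here is an illustrative stand-in for however the caller groups files (delta-rs groups by partition values), while the `FileGroup::from` conversions mirror the diff above:

```rust
use std::collections::HashMap;

use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::FileGroup;

fn to_file_groups(
    files_by_partition: HashMap<Vec<String>, Vec<PartitionedFile>>,
) -> Vec<FileGroup> {
    if files_by_partition.is_empty() {
        // An empty scan still needs one (empty) group so the plan
        // reports a schema: https://github.com/apache/datafusion/issues/11322
        vec![FileGroup::from(vec![])]
    } else {
        files_by_partition
            .into_values()
            .map(FileGroup::from)
            .collect()
    }
}
```

The remaining test hunks follow from the same release: `ParquetSource` no longer exposes a `pruning_predicate` accessor, and the mock object store picks up the object_store 0.12 `Range<u64>`/`BoxStream<'static>` signatures already seen in crates/aws/src/storage.rs.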

crates/core/src/delta_datafusion/schema_adapter.rs

Lines changed: 0 additions & 17 deletions

```diff
@@ -77,21 +77,4 @@ impl SchemaMapper for SchemaMapping {
         let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?;
         Ok(record_batch)
     }
-
-    fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
-        let partial_table_schema = Arc::new(Schema::new(
-            batch
-                .schema()
-                .fields()
-                .iter()
-                .filter_map(|batch_field| {
-                    self.table_schema.field_with_name(batch_field.name()).ok()
-                })
-                .cloned()
-                .collect::<Vec<_>>(),
-        ));
-
-        let record_batch = cast_record_batch(&batch, partial_table_schema, false, true)?;
-        Ok(record_batch)
-    }
 }
```
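With `map_partial_batch` removed from DataFusion's `SchemaMapper` trait, `map_batch` is the only mapping hook this impl needs. A minimal sketch against the DataFusion revision pinned in this commit's Cargo.toml, where `map_batch` appears to be the sole required method; `IdentityMapping` is a hypothetical no-op mapper, not this crate's `SchemaMapping`:

```rust
use arrow_array::RecordBatch;
use datafusion::datasource::schema_adapter::SchemaMapper;

#[derive(Debug)]
struct IdentityMapping;

impl SchemaMapper for IdentityMapping {
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        // Real mappers cast and reorder columns to the projected table
        // schema; the identity mapping passes the batch through unchanged.
        Ok(batch)
    }
}
```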

crates/core/src/kernel/snapshot/log_data.rs

Lines changed: 1 addition & 1 deletion

```diff
@@ -391,7 +391,7 @@ impl<'a> TryFrom<&LogicalFile<'a>> for ObjectMeta {
     fn try_from(file_stats: &LogicalFile<'a>) -> Result<Self, Self::Error> {
         Ok(ObjectMeta {
             location: file_stats.object_store_path(),
-            size: file_stats.size() as usize,
+            size: file_stats.size() as u64,
             last_modified: file_stats.modification_datetime()?,
             version: None,
             e_tag: None,
```

crates/core/src/kernel/snapshot/log_segment.rs

Lines changed: 4 additions & 3 deletions

```diff
@@ -282,7 +282,8 @@ impl LogSegment {
             let store = store.clone();
             let read_schema = read_schema.clone();
             async move {
-                let mut reader = ParquetObjectReader::new(store, meta);
+                let mut reader =
+                    ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
                 let options = ArrowReaderOptions::new();
                 let reader_meta = ArrowReaderMetadata::load_async(&mut reader, options).await?;

@@ -395,7 +396,7 @@ impl LogSegment {
         let bytes = commit.get_bytes()?;
         let meta = ObjectMeta {
             location: path,
-            size: bytes.len(),
+            size: bytes.len() as u64,
             last_modified: Utc::now(),
             e_tag: None,
             version: None,
@@ -759,7 +760,7 @@ pub(super) mod tests {
         self.store.delete(location).await
     }

-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         std::thread::sleep(std::time::Duration::from_secs(1));
         self.store.list(prefix)
     }
```
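parquet 55 changed `ParquetObjectReader::new` to take the object's path rather than a full `ObjectMeta`; a known file size is supplied via `with_file_size` so the reader can fetch the footer directly instead of issuing an extra metadata request. The same migration appears below in convert_to_delta.rs. A self-contained sketch of the new construction, assuming parquet 55 and object_store 0.12; the function is illustrative:

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use futures::TryStreamExt;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

async fn read_all(
    store: Arc<dyn ObjectStore>,
    meta: ObjectMeta,
) -> parquet::errors::Result<Vec<RecordBatch>> {
    // Path in, size via builder method: the parquet 55 shape of the API.
    let reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
    let stream = ParquetRecordBatchStreamBuilder::new(reader).await?.build()?;
    stream.try_collect().await
}
```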

crates/core/src/kernel/snapshot/serde.rs

Lines changed: 4 additions & 4 deletions

```diff
@@ -29,7 +29,7 @@ impl Serialize for LogSegment {
             .iter()
             .map(|f| FileInfo {
                 path: f.location.to_string(),
-                size: f.size,
+                size: f.size as usize,
                 last_modified: f.last_modified.timestamp_nanos_opt().unwrap(),
                 e_tag: f.e_tag.clone(),
                 version: f.version.clone(),
@@ -40,7 +40,7 @@ impl Serialize for LogSegment {
             .iter()
             .map(|f| FileInfo {
                 path: f.location.to_string(),
-                size: f.size,
+                size: f.size as usize,
                 last_modified: f.last_modified.timestamp_nanos_opt().unwrap(),
                 e_tag: f.e_tag.clone(),
                 version: f.version.clone(),
@@ -88,7 +88,7 @@ impl<'de> Visitor<'de> for LogSegmentVisitor {
             let nano_seconds = (f.last_modified % 1_000_000_000) as u32;
             ObjectMeta {
                 location: f.path.into(),
-                size: f.size,
+                size: f.size as u64,
                 last_modified: Utc.timestamp_opt(seconds, nano_seconds).single().unwrap(),
                 version: f.version,
                 e_tag: f.e_tag,
@@ -99,7 +99,7 @@ impl<'de> Visitor<'de> for LogSegmentVisitor {
             .into_iter()
             .map(|f| ObjectMeta {
                 location: f.path.into(),
-                size: f.size,
+                size: f.size as u64,
                 last_modified: DateTime::from_timestamp_millis(f.last_modified).unwrap(),

                 version: None,
```
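These four hunks are one pattern: the serialized `FileInfo` struct keeps `size: usize` while object_store 0.12's `ObjectMeta` now stores `size: u64`, so values convert at each boundary. A hedged note on the casts; the helpers are illustrative, not from this crate:

```rust
// `as usize` (used in the diff above) silently truncates on targets where
// usize is 32 bits; `try_from` surfaces the truncation instead.
fn object_meta_size_to_file_info(size: u64) -> usize {
    usize::try_from(size).expect("file size exceeds usize on this target")
}

// The reverse direction is lossless on both 32- and 64-bit targets.
fn file_info_size_to_object_meta(size: usize) -> u64 {
    size as u64
}
```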

crates/core/src/logstore/default_logstore.rs

Lines changed: 1 addition & 0 deletions

```diff
@@ -19,6 +19,7 @@ fn put_options() -> &'static PutOptions {
         mode: object_store::PutMode::Create, // Creates if file doesn't exists yet
         tags: TagSet::default(),
         attributes: Attributes::default(),
+        extensions: Default::default(),
     })
 }
```
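object_store 0.12 added an `extensions` field (carrying request-scoped `http::Extensions`) to `PutOptions`, which is why this exhaustive literal needs one more line. A sketch of an alternative construction that stays source-compatible when upstream adds fields, assuming object_store 0.12; the function name is illustrative:

```rust
use object_store::{PutMode, PutOptions};

fn create_only_put_options() -> PutOptions {
    PutOptions {
        mode: PutMode::Create,  // fail if the object already exists
        ..Default::default()    // tags, attributes, extensions, and any future fields
    }
}
```

The diff's explicit `extensions: Default::default()` has the opposite trade-off: the exhaustive literal fails loudly at compile time whenever upstream adds a field, rather than silently defaulting it.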

crates/core/src/operations/convert_to_delta.rs

Lines changed: 5 additions & 5 deletions

```diff
@@ -352,11 +352,11 @@ impl ConvertToDeltaBuilder {
             subpath = iter.next();
         }

-        let batch_builder = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
-            object_store.clone(),
-            file.clone(),
-        ))
-        .await?;
+        let object_reader =
+            ParquetObjectReader::new(object_store.clone(), file.location.clone())
+                .with_file_size(file.size);
+
+        let batch_builder = ParquetRecordBatchStreamBuilder::new(object_reader).await?;

         // Fetch the stats
         let parquet_metadata = batch_builder.metadata();
```
