From f608a33864ecaf457827b606a64a56d6f3ca75ba Mon Sep 17 00:00:00 2001 From: Matthijs Brobbel Date: Mon, 24 Mar 2025 21:21:20 +0100 Subject: [PATCH 1/4] Bump `object_store` to `0.12.0` --- parquet/Cargo.toml | 4 ++-- parquet/src/arrow/async_reader/store.rs | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index e9495437bea2..8a5845f875ca 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -45,7 +45,7 @@ arrow-data = { workspace = true, optional = true } arrow-schema = { workspace = true, optional = true } arrow-select = { workspace = true, optional = true } arrow-ipc = { workspace = true, optional = true } -object_store = { version = "0.11.0", default-features = false, optional = true } +object_store = { version = "0.12.0", default-features = false, optional = true } bytes = { version = "1.1", default-features = false, features = ["std"] } thrift = { version = "0.17", default-features = false } @@ -86,7 +86,7 @@ serde_json = { version = "1.0", features = ["std"], default-features = false } arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] } tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } -object_store = { version = "0.11.0", default-features = false, features = ["azure"] } +object_store = { version = "0.12.0", default-features = false, features = ["azure", "fs"] } [package.metadata.docs.rs] all-features = true diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index a1e94efd1451..ec81c99cad8a 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -147,6 +147,7 @@ impl ParquetObjectReader { impl AsyncFileReader for ParquetObjectReader { fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + let range = range.start as u64..range.end as u64; self.spawn(|store, path| store.get_range(path, range)) } @@ -154,6 +155,10 @@ impl AsyncFileReader for ParquetObjectReader { where Self: Send, { + let ranges = ranges + .into_iter() + .map(|range| range.start as u64..range.end as u64) + .collect::>(); self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed()) } @@ -165,7 +170,7 @@ impl AsyncFileReader for ParquetObjectReader { // `Self::get_bytes`. fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { Box::pin(async move { - let file_size = self.meta.size; + let file_size = self.meta.size.try_into().expect("overflow"); let metadata = ParquetMetaDataReader::new() .with_column_indexes(self.preload_column_index) .with_offset_indexes(self.preload_offset_index) @@ -181,7 +186,7 @@ impl AsyncFileReader for ParquetObjectReader { options: &'a ArrowReaderOptions, ) -> BoxFuture<'a, Result>> { Box::pin(async move { - let file_size = self.meta.size; + let file_size = self.meta.size.try_into().expect("overflow"); let metadata = ParquetMetaDataReader::new() .with_column_indexes(self.preload_column_index) .with_offset_indexes(self.preload_offset_index) From dd7336ad3c3c52945f5c37b442cbf8d0407b722d Mon Sep 17 00:00:00 2001 From: Matthijs Brobbel Date: Mon, 31 Mar 2025 17:14:20 +0200 Subject: [PATCH 2/4] Fix some tests --- parquet/src/arrow/async_reader/store.rs | 10 +++++----- parquet/tests/arrow_reader/encryption_async.rs | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 042631d551b0..6f15776602a1 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -274,7 +274,7 @@ mod tests { async fn test_simple() { let (meta, store) = get_meta_store().await; let object_reader = - ParquetObjectReader::new(store, meta.location).with_file_size(meta.size); + ParquetObjectReader::new(store, meta.location).with_file_size(meta.size.try_into().unwrap()); let builder = ParquetRecordBatchStreamBuilder::new(object_reader) .await @@ -305,7 +305,7 @@ mod tests { meta.location = Path::from("I don't exist.parquet"); let object_reader = - ParquetObjectReader::new(store, meta.location).with_file_size(meta.size); + ParquetObjectReader::new(store, meta.location).with_file_size(meta.size.try_into().unwrap()); // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug match ParquetRecordBatchStreamBuilder::new(object_reader).await { Ok(_) => panic!("expected failure"), @@ -339,7 +339,7 @@ mod tests { let initial_actions = num_actions.load(Ordering::Relaxed); let reader = ParquetObjectReader::new(store, meta.location) - .with_file_size(meta.size) + .with_file_size(meta.size.try_into().unwrap()) .with_runtime(rt.handle().clone()); let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); @@ -367,7 +367,7 @@ mod tests { let (meta, store) = get_meta_store().await; let reader = ParquetObjectReader::new(store, meta.location) - .with_file_size(meta.size) + .with_file_size(meta.size.try_into().unwrap()) .with_runtime(rt.handle().clone()); let current_id = std::thread::current().id(); @@ -392,7 +392,7 @@ mod tests { let (meta, store) = get_meta_store().await; let mut reader = ParquetObjectReader::new(store, meta.location) - .with_file_size(meta.size) + .with_file_size(meta.size.try_into().unwrap()) .with_runtime(rt.handle().clone()); rt.shutdown_background(); diff --git a/parquet/tests/arrow_reader/encryption_async.rs b/parquet/tests/arrow_reader/encryption_async.rs index 9c9b3245823d..e29f7c403add 100644 --- a/parquet/tests/arrow_reader/encryption_async.rs +++ b/parquet/tests/arrow_reader/encryption_async.rs @@ -276,7 +276,7 @@ async fn test_read_encrypted_file_from_object_store() { .unwrap(); let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); - let mut reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size); + let mut reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size.try_into().unwrap()); let metadata = reader.get_metadata_with_options(&options).await.unwrap(); let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options) .await From 020c444511f063db70047fed1e755cb35f3310ad Mon Sep 17 00:00:00 2001 From: Matthijs Brobbel Date: Mon, 31 Mar 2025 17:17:05 +0200 Subject: [PATCH 3/4] Rustfmt --- parquet/src/arrow/async_reader/store.rs | 8 ++++---- parquet/tests/arrow_reader/encryption_async.rs | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 6f15776602a1..b16b83a5d57d 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -273,8 +273,8 @@ mod tests { #[tokio::test] async fn test_simple() { let (meta, store) = get_meta_store().await; - let object_reader = - ParquetObjectReader::new(store, meta.location).with_file_size(meta.size.try_into().unwrap()); + let object_reader = ParquetObjectReader::new(store, meta.location) + .with_file_size(meta.size.try_into().unwrap()); let builder = ParquetRecordBatchStreamBuilder::new(object_reader) .await @@ -304,8 +304,8 @@ mod tests { let (mut meta, store) = get_meta_store().await; meta.location = Path::from("I don't exist.parquet"); - let object_reader = - ParquetObjectReader::new(store, meta.location).with_file_size(meta.size.try_into().unwrap()); + let object_reader = ParquetObjectReader::new(store, meta.location) + .with_file_size(meta.size.try_into().unwrap()); // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug match ParquetRecordBatchStreamBuilder::new(object_reader).await { Ok(_) => panic!("expected failure"), diff --git a/parquet/tests/arrow_reader/encryption_async.rs b/parquet/tests/arrow_reader/encryption_async.rs index e29f7c403add..68313cb9c877 100644 --- a/parquet/tests/arrow_reader/encryption_async.rs +++ b/parquet/tests/arrow_reader/encryption_async.rs @@ -276,7 +276,8 @@ async fn test_read_encrypted_file_from_object_store() { .unwrap(); let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); - let mut reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size.try_into().unwrap()); + let mut reader = ParquetObjectReader::new(store, meta.location) + .with_file_size(meta.size.try_into().unwrap()); let metadata = reader.get_metadata_with_options(&options).await.unwrap(); let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options) .await From 22e875ae32deb9b9643636f14267cd8e6d7805b4 Mon Sep 17 00:00:00 2001 From: Matthijs Brobbel Date: Mon, 31 Mar 2025 18:23:21 +0200 Subject: [PATCH 4/4] Fix doctest --- parquet/src/arrow/async_reader/store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index b16b83a5d57d..b876bca5c5c6 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -46,7 +46,7 @@ use tokio::runtime::Handle; /// println!("Found Blob with {}B at {}", meta.size, meta.location); /// /// // Show Parquet metadata -/// let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size); +/// let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size.try_into().unwrap()); /// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); /// print_parquet_metadata(&mut stdout(), builder.metadata()); /// # }