Skip to content

Commit 07a754c

Browse files
committed
Add ArrowToParquetSchemaConverter, deprecate arrow_to_parquet_schema et al
1 parent 93ce75c commit 07a754c

File tree

4 files changed

+115
-41
lines changed

4 files changed

+115
-41
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,10 @@ use arrow_array::types::*;
3030
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
3131
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
3232

33-
use super::schema::{
34-
add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
35-
arrow_to_parquet_schema_with_root, decimal_length_from_precision,
36-
};
33+
use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
3734

3835
use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
36+
use crate::arrow::ArrowToParquetSchemaConverter;
3937
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
4038
use crate::column::writer::encoder::ColumnValueEncoder;
4139
use crate::column::writer::{
@@ -181,10 +179,12 @@ impl<W: Write + Send> ArrowWriter<W> {
181179
options: ArrowWriterOptions,
182180
) -> Result<Self> {
183181
let mut props = options.properties;
184-
let schema = match options.schema_root {
185-
Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?,
186-
None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?,
187-
};
182+
let mut converter = ArrowToParquetSchemaConverter::new(&arrow_schema)
183+
.with_coerce_types(props.coerce_types());
184+
if let Some(s) = &options.schema_root {
185+
converter = converter.schema_root(s);
186+
}
187+
let schema = converter.build()?;
188188
if !options.skip_arrow_metadata {
189189
// add serialized arrow schema
190190
add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);

parquet/src/arrow/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,13 @@ pub use self::async_writer::AsyncArrowWriter;
116116
use crate::schema::types::SchemaDescriptor;
117117
use arrow_schema::{FieldRef, Schema};
118118

119+
// continue to until functions are removed
120+
#[allow(deprecated)]
121+
pub use self::schema::arrow_to_parquet_schema;
122+
119123
pub use self::schema::{
120-
arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
121-
parquet_to_arrow_schema_by_columns, FieldLevels,
124+
parquet_to_arrow_field_levels, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
125+
ArrowToParquetSchemaConverter, FieldLevels,
122126
};
123127

124128
/// Schema metadata key used to store serialized Arrow IPC schema

parquet/src/arrow/schema/mod.rs

Lines changed: 93 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -225,29 +225,99 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
225225
}
226226
}
227227

228+
/// Converter for arrow schema to parquet schema
229+
///
230+
/// Example:
231+
/// ```
232+
/// # use arrow_schema::{Field, Schema, DataType};
233+
/// use parquet::arrow::ArrowToParquetSchemaConverter;
234+
/// let schema = Schema::new(vec![
235+
/// Field::new("a", DataType::Int64, false),
236+
/// Field::new("b", DataType::Date32, false),
237+
/// ];
238+
///
239+
/// let parquet_schema = ArrowToParquetSchemaConverter::new(&schema)
240+
/// .build()
241+
/// .unwrap();
242+
///
243+
///
244+
/// ```
245+
#[derive(Debug)]
246+
pub struct ArrowToParquetSchemaConverter<'a> {
247+
/// The schema to convert
248+
schema: &'a Schema,
249+
/// Name of the root schema in Parquet
250+
schema_root: &'a str,
251+
/// Should we Coerce arrow types to compatible Parquet types?
252+
///
253+
/// See docs on [Self::with_coerce_types]`
254+
coerce_types: bool
255+
}
256+
257+
impl <'a> ArrowToParquetSchemaConverter<'a> {
258+
/// Create a new converter
259+
pub fn new(schema: &'a Schema) -> Self {
260+
Self {
261+
schema,
262+
schema_root: "arrow_schema",
263+
coerce_types: false,
264+
}
265+
}
266+
267+
/// Should arrow types be coerced into parquet native types (default false).
268+
///
269+
/// Setting this option to `true` will result in parquet files that can be
270+
/// read by more readers, but may lose precision for arrow types such as
271+
/// [`DataType::Date64`] which have no direct corresponding Parquet type.
272+
///
273+
/// # Discussion
274+
///
275+
/// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
276+
/// corresponding Parquet logical type. Thus, they can not be losslessly
277+
/// round-tripped when stored using the appropriate Parquet logical type.
278+
///
279+
/// For example, some Date64 values may be truncated when stored with
280+
/// parquet's native 32 bit date type.
281+
///
282+
/// By default, the arrow writer does not coerce to native parquet types. It
283+
/// writes data in such a way that it can be lossless round tripped.
284+
/// However, this means downstream readers must be aware of and correctly
285+
/// interpret the embedded Arrow schema.
286+
pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
287+
self.coerce_types = coerce_types;
288+
self
289+
}
290+
291+
/// Set the root schema element name (defaults to `"arrow_schema"`).
292+
pub fn schema_root(mut self, schema_root: &'a str) -> Self {
293+
self.schema_root = schema_root;
294+
self
295+
}
296+
297+
/// Build the desired parquet [`SchemaDescriptor`]
298+
pub fn build(self) -> Result<SchemaDescriptor> {
299+
let Self { schema, schema_root: root_schema_name, coerce_types } = self;
300+
let fields = schema
301+
.fields()
302+
.iter()
303+
.map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new))
304+
.collect::<Result<_>>()?;
305+
let group = Type::group_type_builder(root_schema_name).with_fields(fields).build()?;
306+
Ok(SchemaDescriptor::new(Arc::new(group)))
307+
}
308+
}
309+
228310
/// Convert arrow schema to parquet schema
229311
///
230312
/// The name of the root schema element defaults to `"arrow_schema"`, this can be
231313
/// overridden with [`arrow_to_parquet_schema_with_root`]
232-
pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) -> Result<SchemaDescriptor> {
233-
arrow_to_parquet_schema_with_root(schema, "arrow_schema", coerce_types)
234-
}
314+
#[deprecated(since = "54.0.0", note = "Use `ArrowToParquetSchemaConverter` instead")]
315+
pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
235316

236-
/// Convert arrow schema to parquet schema specifying the name of the root schema element
237-
pub fn arrow_to_parquet_schema_with_root(
238-
schema: &Schema,
239-
root: &str,
240-
coerce_types: bool,
241-
) -> Result<SchemaDescriptor> {
242-
let fields = schema
243-
.fields()
244-
.iter()
245-
.map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new))
246-
.collect::<Result<_>>()?;
247-
let group = Type::group_type_builder(root).with_fields(fields).build()?;
248-
Ok(SchemaDescriptor::new(Arc::new(group)))
317+
ArrowToParquetSchemaConverter::new(schema).build()
249318
}
250319

320+
251321
fn parse_key_value_metadata(
252322
key_value_metadata: Option<&Vec<KeyValue>>,
253323
) -> Option<HashMap<String, String>> {
@@ -1569,7 +1639,7 @@ mod tests {
15691639
Field::new("decimal256", DataType::Decimal256(39, 2), false),
15701640
];
15711641
let arrow_schema = Schema::new(arrow_fields);
1572-
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
1642+
let converted_arrow_schema = ArrowToParquetSchemaConverter::new(&arrow_schema).build().unwrap();
15731643

15741644
assert_eq!(
15751645
parquet_schema.columns().len(),
@@ -1606,9 +1676,10 @@ mod tests {
16061676
false,
16071677
)];
16081678
let arrow_schema = Schema::new(arrow_fields);
1609-
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true);
1679+
let converted_arrow_schema = ArrowToParquetSchemaConverter::new(&arrow_schema)
1680+
.with_coerce_types(true)
1681+
.build();
16101682

1611-
assert!(converted_arrow_schema.is_err());
16121683
converted_arrow_schema.unwrap();
16131684
}
16141685

@@ -1878,7 +1949,9 @@ mod tests {
18781949
// don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
18791950
let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
18801951

1881-
let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema, true)?;
1952+
let parq_schema_descr = crate::arrow::ArrowToParquetSchemaConverter::new(&arrow_schema)
1953+
.with_coerce_types(true)
1954+
.build()?;
18821955
let parq_fields = parq_schema_descr.root_schema().get_fields();
18831956
assert_eq!(parq_fields.len(), 2);
18841957
assert_eq!(parq_fields[0].get_basic_info().id(), 1);

parquet/src/file/properties.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616
// under the License.
1717

1818
//! Configuration via [`WriterProperties`] and [`ReaderProperties`]
19-
use std::str::FromStr;
20-
use std::{collections::HashMap, sync::Arc};
21-
2219
use crate::basic::{Compression, Encoding};
2320
use crate::compression::{CodecOptions, CodecOptionsBuilder};
2421
use crate::file::metadata::KeyValue;
2522
use crate::format::SortingColumn;
2623
use crate::schema::types::ColumnPath;
24+
use std::str::FromStr;
25+
use std::{collections::HashMap, sync::Arc};
2726

2827
/// Default value for [`WriterProperties::data_page_size_limit`]
2928
pub const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
@@ -287,15 +286,13 @@ impl WriterProperties {
287286
self.statistics_truncate_length
288287
}
289288

290-
/// Returns `coerce_types` boolean
289+
/// Should the writer coerce types to parquet native types.
290+
///
291+
/// Setting this option to `true` will result in parquet files that can be
292+
/// read by more readers, but may lose precision for arrow types such as
293+
/// [`DataType::Date64`] which have no direct corresponding Parquet type.
291294
///
292-
/// Some Arrow types do not have a corresponding Parquet logical type.
293-
/// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
294-
/// Writers have the option to coerce these into native Parquet types. Type
295-
/// coercion allows for meaningful representations that do not require
296-
/// downstream readers to consider the embedded Arrow schema. However, type
297-
/// coercion also prevents the data from being losslessly round-tripped. This method
298-
/// returns `true` if type coercion enabled.
295+
/// See [`ArrowToParquetSchemaConverter::with_coerce_types`] for more details
299296
pub fn coerce_types(&self) -> bool {
300297
self.coerce_types
301298
}

0 commit comments

Comments
 (0)