
Commit 2808625

alamb, etseidl, and tustvold authored
Add ArrowToParquetSchemaConverter, deprecate arrow_to_parquet_schema (#6840)
* Add ArrowToParquetSchemaConverter, deprecate `arrow_to_parquet_schema` et al
* Fmt
* update test
* Update parquet/src/arrow/schema/mod.rs
  Co-authored-by: Ed Seidl <[email protected]>
* Apply suggestions from code review
  Co-authored-by: Ed Seidl <[email protected]>
* Improve comments
* Add more detail to WriterPropertiesBuilder docs
* Update parquet/src/file/properties.rs
  Co-authored-by: Ed Seidl <[email protected]>
* Fix some more capitalization and add a link to Parquet date spec
* Update parquet/src/arrow/schema/mod.rs
  Co-authored-by: Raphael Taylor-Davies <[email protected]>
* Revert "Update parquet/src/arrow/schema/mod.rs"
  This reverts commit bd4e2d5.
* rename to ArrowSchemaConverter
* change from build --> convert
* update doc
* fix fmt

---------

Co-authored-by: Ed Seidl <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
1 parent 9ffa065 commit 2808625
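
For callers, the migration this commit implies looks roughly like the sketch below (an assumption-laden illustration, not part of the commit: it presumes parquet 54+ with the `arrow` feature enabled, and the one-column schema is made up for the example):

```rust
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowSchemaConverter;
use parquet::errors::Result;

fn example() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

    // Before this commit (now deprecated):
    // let parquet_schema = arrow_to_parquet_schema(&schema, true)?;

    // After this commit: the builder-style converter
    let parquet_schema = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&schema)?;

    // The root schema element keeps its default name
    assert_eq!(parquet_schema.root_schema().name(), "arrow_schema");
    Ok(())
}

fn main() {
    example().unwrap();
}
```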

File tree: 4 files changed, +180 −63 lines


parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 14 additions & 12 deletions
@@ -30,12 +30,10 @@ use arrow_array::types::*;
 use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
 use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
 
-use super::schema::{
-    add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
-    arrow_to_parquet_schema_with_root, decimal_length_from_precision,
-};
+use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
 
 use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
+use crate::arrow::ArrowSchemaConverter;
 use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
 use crate::column::writer::encoder::ColumnValueEncoder;
 use crate::column::writer::{
@@ -181,10 +179,11 @@ impl<W: Write + Send> ArrowWriter<W> {
         options: ArrowWriterOptions,
     ) -> Result<Self> {
         let mut props = options.properties;
-        let schema = match options.schema_root {
-            Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?,
-            None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?,
-        };
+        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
+        if let Some(schema_root) = &options.schema_root {
+            converter = converter.schema_root(schema_root);
+        }
+        let schema = converter.convert(&arrow_schema)?;
         if !options.skip_arrow_metadata {
             // add serialized arrow schema
             add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
@@ -390,9 +389,9 @@ impl ArrowWriterOptions {
     }
 
     /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
-    pub fn with_schema_root(self, name: String) -> Self {
+    pub fn with_schema_root(self, schema_root: String) -> Self {
         Self {
-            schema_root: Some(name),
+            schema_root: Some(schema_root),
             ..self
         }
     }
@@ -538,7 +537,7 @@ impl ArrowColumnChunk {
 /// # use std::sync::Arc;
 /// # use arrow_array::*;
 /// # use arrow_schema::*;
-/// # use parquet::arrow::arrow_to_parquet_schema;
+/// # use parquet::arrow::ArrowSchemaConverter;
 /// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
 /// # use parquet::file::properties::WriterProperties;
 /// # use parquet::file::writer::SerializedFileWriter;
@@ -550,7 +549,10 @@ impl ArrowColumnChunk {
 ///
 /// // Compute the parquet schema
 /// let props = Arc::new(WriterProperties::default());
-/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref(), props.coerce_types()).unwrap();
+/// let parquet_schema = ArrowSchemaConverter::new()
+///     .with_coerce_types(props.coerce_types())
+///     .convert(&schema)
+///     .unwrap();
 ///
 /// // Create writers for each of the leaf columns
 /// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
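
The `ArrowWriter` constructor above now routes both the default and custom-root cases through the converter. A sketch of exercising the renamed `with_schema_root` option end to end (assumes the `arrow` feature; `"my_root"` is an illustrative name, not anything the commit prescribes):

```rust
use std::sync::Arc;
use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    // A custom root name now flows through ArrowSchemaConverter::schema_root
    // inside try_new_with_options, per the diff above.
    let options = ArrowWriterOptions::new().with_schema_root("my_root".to_string());
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new_with_options(&mut buffer, schema, options)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```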

parquet/src/arrow/mod.rs

Lines changed: 6 additions & 2 deletions
@@ -116,9 +116,13 @@ pub use self::async_writer::AsyncArrowWriter;
 use crate::schema::types::SchemaDescriptor;
 use arrow_schema::{FieldRef, Schema};
 
+// continue to export deprecated methods until they are removed
+#[allow(deprecated)]
+pub use self::schema::arrow_to_parquet_schema;
+
 pub use self::schema::{
-    arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
-    parquet_to_arrow_schema_by_columns, FieldLevels,
+    parquet_to_arrow_field_levels, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
+    ArrowSchemaConverter, FieldLevels,
 };
 
 /// Schema metadata key used to store serialized Arrow IPC schema
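
Because the deprecated function is still re-exported (under `#[allow(deprecated)]`), downstream code continues to compile, just with a warning. A sketch of a caller opting into the old path during migration (the function name `legacy_convert` is hypothetical; note the deprecated signature no longer takes a `coerce_types` flag):

```rust
use arrow_schema::{DataType, Field, Schema};
use parquet::errors::Result;
use parquet::schema::types::SchemaDescriptor;

// Still compiles after this commit; without the allow, rustc emits the
// deprecation warning carrying the note from the diff below.
#[allow(deprecated)]
fn legacy_convert(schema: &Schema) -> Result<SchemaDescriptor> {
    parquet::arrow::arrow_to_parquet_schema(schema)
}

fn main() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let descr = legacy_convert(&schema).unwrap();
    assert_eq!(descr.num_columns(), 1);
}
```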

parquet/src/arrow/schema/mod.rs

Lines changed: 141 additions & 31 deletions
@@ -15,13 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Provides API for converting parquet schema to arrow schema and vice versa.
-//!
-//! The main interfaces for converting parquet schema to arrow schema are
-//! `parquet_to_arrow_schema`, `parquet_to_arrow_schema_by_columns` and
-//! `parquet_to_arrow_field`.
-//!
-//! The interfaces for converting arrow schema to parquet schema is coming.
+//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [parquet_to_arrow_schema]
 
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
@@ -226,27 +220,134 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
     }
 }
 
-/// Convert arrow schema to parquet schema
+/// Converter for Arrow schema to Parquet schema
 ///
-/// The name of the root schema element defaults to `"arrow_schema"`, this can be
-/// overridden with [`arrow_to_parquet_schema_with_root`]
-pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) -> Result<SchemaDescriptor> {
-    arrow_to_parquet_schema_with_root(schema, "arrow_schema", coerce_types)
+/// Example:
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_schema::{Field, Schema, DataType};
+/// # use parquet::arrow::ArrowSchemaConverter;
+/// use parquet::schema::types::{SchemaDescriptor, Type};
+/// use parquet::basic; // note there are two `Type`s in the following example
+/// // create an Arrow Schema
+/// let arrow_schema = Schema::new(vec![
+///   Field::new("a", DataType::Int64, true),
+///   Field::new("b", DataType::Date32, true),
+/// ]);
+/// // convert the Arrow schema to a Parquet schema
+/// let parquet_schema = ArrowSchemaConverter::new()
+///   .convert(&arrow_schema)
+///   .unwrap();
+///
+/// let expected_parquet_schema = SchemaDescriptor::new(
+///   Arc::new(
+///     Type::group_type_builder("arrow_schema")
+///       .with_fields(vec![
+///         Arc::new(
+///           Type::primitive_type_builder("a", basic::Type::INT64)
+///             .build().unwrap()
+///         ),
+///         Arc::new(
+///           Type::primitive_type_builder("b", basic::Type::INT32)
+///             .with_converted_type(basic::ConvertedType::DATE)
+///             .with_logical_type(Some(basic::LogicalType::Date))
+///             .build().unwrap()
+///         ),
+///       ])
+///       .build().unwrap()
+///   )
+/// );
+/// assert_eq!(parquet_schema, expected_parquet_schema);
+/// ```
+#[derive(Debug)]
+pub struct ArrowSchemaConverter<'a> {
+    /// Name of the root schema in Parquet
+    schema_root: &'a str,
+    /// Should we coerce Arrow types to compatible Parquet types?
+    ///
+    /// See docs on [Self::with_coerce_types]`
+    coerce_types: bool,
 }
 
-/// Convert arrow schema to parquet schema specifying the name of the root schema element
-pub fn arrow_to_parquet_schema_with_root(
-    schema: &Schema,
-    root: &str,
-    coerce_types: bool,
-) -> Result<SchemaDescriptor> {
-    let fields = schema
-        .fields()
-        .iter()
-        .map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new))
-        .collect::<Result<_>>()?;
-    let group = Type::group_type_builder(root).with_fields(fields).build()?;
-    Ok(SchemaDescriptor::new(Arc::new(group)))
+impl Default for ArrowSchemaConverter<'_> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<'a> ArrowSchemaConverter<'a> {
+    /// Create a new converter
+    pub fn new() -> Self {
+        Self {
+            schema_root: "arrow_schema",
+            coerce_types: false,
+        }
+    }
+
+    /// Should Arrow types be coerced into Parquet native types (default `false`).
+    ///
+    /// Setting this option to `true` will result in Parquet files that can be
+    /// read by more readers, but may lose precision for Arrow types such as
+    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
+    ///
+    /// By default, this converter does not coerce to native Parquet types. Enabling type
+    /// coercion allows for meaningful representations that do not require
+    /// downstream readers to consider the embedded Arrow schema, and can allow
+    /// for greater compatibility with other Parquet implementations. However,
+    /// type coercion also prevents data from being losslessly round-tripped.
+    ///
+    /// # Discussion
+    ///
+    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
+    /// corresponding Parquet logical type. Thus, they can not be losslessly
+    /// round-tripped when stored using the appropriate Parquet logical type.
+    /// For example, some Date64 values may be truncated when stored with
+    /// parquet's native 32 bit date type.
+    ///
+    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
+    /// schema elements to have specific names (earlier versions of the spec
+    /// were somewhat ambiguous on this point). Type coercion will use the names
+    /// prescribed by the Parquet specification, potentially losing naming
+    /// metadata from the Arrow schema.
+    ///
+    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
+    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
+    ///
+    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
+        self.coerce_types = coerce_types;
+        self
+    }
+
+    /// Set the root schema element name (defaults to `"arrow_schema"`).
+    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
+        self.schema_root = schema_root;
+        self
+    }
+
+    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
+    ///
+    /// See example in [`ArrowSchemaConverter`]
+    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
+        let fields = schema
+            .fields()
+            .iter()
+            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
+            .collect::<Result<_>>()?;
+        let group = Type::group_type_builder(self.schema_root)
+            .with_fields(fields)
+            .build()?;
+        Ok(SchemaDescriptor::new(Arc::new(group)))
+    }
+}
+
+/// Convert arrow schema to parquet schema
+///
+/// The name of the root schema element defaults to `"arrow_schema"`, this can be
+/// overridden with [`ArrowSchemaConverter`]
+#[deprecated(since = "54.0.0", note = "Use `ArrowToParquetSchemaConverter` instead")]
+pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
+    ArrowSchemaConverter::new().convert(schema)
 }
 
 fn parse_key_value_metadata(
@@ -1488,7 +1589,10 @@ mod tests {
         ";
         let parquet_group_type = parse_message_type(message_type).unwrap();
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true).unwrap();
+        let converted_arrow_schema = ArrowSchemaConverter::new()
+            .with_coerce_types(true)
+            .convert(&arrow_schema)
+            .unwrap();
         assert_eq!(
             parquet_schema.columns().len(),
             converted_arrow_schema.columns().len()
@@ -1512,7 +1616,10 @@
         ";
         let parquet_group_type = parse_message_type(message_type).unwrap();
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
+        let converted_arrow_schema = ArrowSchemaConverter::new()
+            .with_coerce_types(false)
+            .convert(&arrow_schema)
+            .unwrap();
         assert_eq!(
             parquet_schema.columns().len(),
             converted_arrow_schema.columns().len()
@@ -1668,7 +1775,7 @@
             Field::new("decimal256", DataType::Decimal256(39, 2), false),
         ];
         let arrow_schema = Schema::new(arrow_fields);
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
+        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
 
         assert_eq!(
             parquet_schema.columns().len(),
@@ -1705,9 +1812,10 @@
             false,
         )];
         let arrow_schema = Schema::new(arrow_fields);
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true);
+        let converted_arrow_schema = ArrowSchemaConverter::new()
+            .with_coerce_types(true)
+            .convert(&arrow_schema);
 
-        assert!(converted_arrow_schema.is_err());
         converted_arrow_schema.unwrap();
     }
 
@@ -1978,7 +2086,9 @@
         // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
 
-        let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema, true)?;
+        let parq_schema_descr = ArrowSchemaConverter::new()
+            .with_coerce_types(true)
+            .convert(&arrow_schema)?;
         let parq_fields = parq_schema_descr.root_schema().get_fields();
         assert_eq!(parq_fields.len(), 2);
         assert_eq!(parq_fields[0].get_basic_info().id(), 1);
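
The behavior the new `with_coerce_types` docs describe can be checked directly against the converter. A sketch, assuming the `Date64` mapping described in the doc comments above (64-bit milliseconds without coercion, Parquet's native 32-bit `DATE` with it):

```rust
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowSchemaConverter;
use parquet::basic::Type as PhysicalType;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Schema::new(vec![Field::new("d", DataType::Date64, false)]);

    // Default (coerce_types = false): keep the full 64-bit millisecond value.
    let lossless = ArrowSchemaConverter::new().convert(&schema)?;
    assert_eq!(lossless.column(0).physical_type(), PhysicalType::INT64);

    // coerce_types = true: store as Parquet's 32-bit DATE (may truncate).
    let coerced = ArrowSchemaConverter::new()
        .with_coerce_types(true)
        .convert(&schema)?;
    assert_eq!(coerced.column(0).physical_type(), PhysicalType::INT32);
    Ok(())
}
```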

parquet/src/file/properties.rs

Lines changed: 19 additions & 18 deletions
@@ -16,14 +16,13 @@
 // under the License.
 
 //! Configuration via [`WriterProperties`] and [`ReaderProperties`]
-use std::str::FromStr;
-use std::{collections::HashMap, sync::Arc};
-
 use crate::basic::{Compression, Encoding};
 use crate::compression::{CodecOptions, CodecOptionsBuilder};
 use crate::file::metadata::KeyValue;
 use crate::format::SortingColumn;
 use crate::schema::types::ColumnPath;
+use std::str::FromStr;
+use std::{collections::HashMap, sync::Arc};
 
 /// Default value for [`WriterProperties::data_page_size_limit`]
 pub const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
@@ -780,22 +779,24 @@ impl WriterPropertiesBuilder {
         self
     }
 
-    /// Sets flag to control if type coercion is enabled (defaults to `false`).
+    /// Should the writer coerce types to parquet native types (defaults to `false`).
     ///
-    /// # Notes
-    /// Some Arrow types do not have a corresponding Parquet logical type.
-    /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
-    /// Also, for [`List`] and [`Map`] types, Parquet expects certain schema elements
-    /// to have specific names to be considered fully compliant.
-    /// Writers have the option to coerce these types and names to match those required
-    /// by the Parquet specification.
-    /// This type coercion allows for meaningful representations that do not require
-    /// downstream readers to consider the embedded Arrow schema, and can allow for greater
-    /// compatibility with other Parquet implementations. However, type
-    /// coercion also prevents the data from being losslessly round-tripped.
-    ///
-    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
+    /// Leaving this option the default `false` will ensure the exact same data
+    /// written to parquet using this library will be read.
+    ///
+    /// Setting this option to `true` will result in parquet files that can be
+    /// read by more readers, but potentially lose information in the process.
+    ///
+    /// * Types such as [`DataType::Date64`], which have no direct corresponding
+    ///   Parquet type, may be stored with lower precision.
+    ///
+    /// * The internal field names of `List` and `Map` types will be renamed if
+    ///   necessary to match what is required by the newest Parquet specification.
+    ///
+    /// See [`ArrowToParquetSchemaConverter::with_coerce_types`] for more details
+    ///
+    /// [`DataType::Date64`]: arrow_schema::DataType::Date64
+    /// [`ArrowToParquetSchemaConverter::with_coerce_types`]: crate::arrow::ArrowSchemaConverter::with_coerce_types
     pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
        self.coerce_types = coerce_types;
        self
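
A small sketch of the revised `set_coerce_types` knob in use; its getter, `coerce_types()`, is what `ArrowWriter::try_new_with_options` hands to the converter per the first diff above (the values here are illustrative):

```rust
use parquet::file::properties::WriterProperties;

fn main() {
    // Defaults to false: lossless Arrow round-trips win over portability.
    let default_props = WriterProperties::default();
    assert!(!default_props.coerce_types());

    // Opt in to coercion for maximum reader compatibility.
    let props = WriterProperties::builder()
        .set_coerce_types(true)
        .build();
    assert!(props.coerce_types());
}
```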
