
Commit d1c78c2

Add ArrowToParquetSchemaConverter, deprecate arrow_to_parquet_schema et al
1 parent 93ce75c commit d1c78c2

4 files changed: +137 −41 lines

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 8 additions & 8 deletions
@@ -30,12 +30,10 @@ use arrow_array::types::*;
 use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
 use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

-use super::schema::{
-    add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
-    arrow_to_parquet_schema_with_root, decimal_length_from_precision,
-};
+use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

 use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
+use crate::arrow::ArrowToParquetSchemaConverter;
 use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
 use crate::column::writer::encoder::ColumnValueEncoder;
 use crate::column::writer::{
@@ -181,10 +179,12 @@ impl<W: Write + Send> ArrowWriter<W> {
         options: ArrowWriterOptions,
     ) -> Result<Self> {
         let mut props = options.properties;
-        let schema = match options.schema_root {
-            Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?,
-            None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?,
-        };
+        let mut converter = ArrowToParquetSchemaConverter::new(&arrow_schema)
+            .with_coerce_types(props.coerce_types());
+        if let Some(s) = &options.schema_root {
+            converter = converter.schema_root(s);
+        }
+        let schema = converter.build()?;
         if !options.skip_arrow_metadata {
             // add serialized arrow schema
             add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
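
For callers migrating off `arrow_to_parquet_schema_with_root`, the writer change above translates to roughly the following standalone usage. This is an editor's sketch based on the API added in this commit, not part of the diff; the field name and root name are arbitrary.

use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowToParquetSchemaConverter;

fn main() -> parquet::errors::Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

    // Before this commit: arrow_to_parquet_schema_with_root(&schema, "my_root", true)
    let descriptor = ArrowToParquetSchemaConverter::new(&schema)
        .schema_root("my_root")
        .with_coerce_types(true)
        .build()?;

    // The custom root name replaces the default "arrow_schema"
    assert_eq!(descriptor.root_schema().name(), "my_root");
    Ok(())
}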

parquet/src/arrow/mod.rs

Lines changed: 6 additions & 2 deletions
@@ -116,9 +116,13 @@ pub use self::async_writer::AsyncArrowWriter;
 use crate::schema::types::SchemaDescriptor;
 use arrow_schema::{FieldRef, Schema};

+// continue to export until these functions are removed
+#[allow(deprecated)]
+pub use self::schema::arrow_to_parquet_schema;
+
 pub use self::schema::{
-    arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
-    parquet_to_arrow_schema_by_columns, FieldLevels,
+    parquet_to_arrow_field_levels, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
+    ArrowToParquetSchemaConverter, FieldLevels,
 };

 /// Schema metadata key used to store serialized Arrow IPC schema
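
From a downstream crate's point of view, the deprecated export keeps compiling while the converter is reachable from the same module. A minimal sketch of the two paths (editor's illustration; the wrapper function is hypothetical):

use arrow_schema::Schema;

fn both_paths(schema: &Schema) -> parquet::errors::Result<()> {
    // Old path: still re-exported, but now warns with
    // "Use `ArrowToParquetSchemaConverter` instead" (note the former
    // coerce_types argument is gone from the deprecated signature)
    #[allow(deprecated)]
    let _old = parquet::arrow::arrow_to_parquet_schema(schema)?;

    // New path, re-exported alongside the other schema helpers
    let _new = parquet::arrow::ArrowToParquetSchemaConverter::new(schema).build()?;
    Ok(())
}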

parquet/src/arrow/schema/mod.rs

Lines changed: 115 additions & 20 deletions
@@ -225,29 +225,121 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
     }
 }

+/// Converter for arrow schema to parquet schema
+///
+/// Example:
+/// ```
+/// # use std::sync::Arc;
+/// use arrow_schema::{Field, Schema, DataType};
+/// use parquet::arrow::ArrowToParquetSchemaConverter;
+/// use parquet::schema::types::{SchemaDescriptor, Type};
+/// use parquet::basic;
+/// let arrow_schema = Schema::new(vec![
+///     Field::new("a", DataType::Int64, true),
+///     Field::new("b", DataType::Date32, true),
+/// ]);
+///
+/// let parquet_schema = ArrowToParquetSchemaConverter::new(&arrow_schema)
+///     .build()
+///     .unwrap();
+///
+/// let expected_parquet_schema = SchemaDescriptor::new(
+///     Arc::new(
+///         Type::group_type_builder("arrow_schema")
+///             .with_fields(vec![
+///                 Arc::new(
+///                     Type::primitive_type_builder("a", basic::Type::INT64)
+///                         .build().unwrap()
+///                 ),
+///                 Arc::new(
+///                     Type::primitive_type_builder("b", basic::Type::INT32)
+///                         .with_converted_type(basic::ConvertedType::DATE)
+///                         .with_logical_type(Some(basic::LogicalType::Date))
+///                         .build().unwrap()
+///                 ),
+///             ])
+///             .build().unwrap()
+///     )
+/// );
+///
+/// assert_eq!(parquet_schema, expected_parquet_schema);
+/// ```
+#[derive(Debug)]
+pub struct ArrowToParquetSchemaConverter<'a> {
+    /// The schema to convert
+    schema: &'a Schema,
+    /// Name of the root schema in Parquet
+    schema_root: &'a str,
+    /// Should we coerce arrow types to compatible Parquet types?
+    ///
+    /// See docs on [`Self::with_coerce_types`]
+    coerce_types: bool,
+}
+
+impl<'a> ArrowToParquetSchemaConverter<'a> {
+    /// Create a new converter
+    pub fn new(schema: &'a Schema) -> Self {
+        Self {
+            schema,
+            schema_root: "arrow_schema",
+            coerce_types: false,
+        }
+    }
+
+    /// Should arrow types be coerced into parquet native types (default false).
+    ///
+    /// Setting this option to `true` will result in parquet files that can be
+    /// read by more readers, but may lose precision for arrow types such as
+    /// [`DataType::Date64`] which have no direct corresponding Parquet type.
+    ///
+    /// # Discussion
+    ///
+    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
+    /// corresponding Parquet logical type. Thus, they can not be losslessly
+    /// round-tripped when stored using the appropriate Parquet logical type.
+    ///
+    /// For example, some Date64 values may be truncated when stored with
+    /// parquet's native 32 bit date type.
+    ///
+    /// By default, the arrow writer does not coerce to native parquet types. It
+    /// writes data in such a way that it can be losslessly round tripped.
+    /// However, this means downstream readers must be aware of and correctly
+    /// interpret the embedded Arrow schema.
+    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
+        self.coerce_types = coerce_types;
+        self
+    }
+
+    /// Set the root schema element name (defaults to `"arrow_schema"`).
+    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
+        self.schema_root = schema_root;
+        self
+    }
+
+    /// Build the desired parquet [`SchemaDescriptor`]
+    pub fn build(self) -> Result<SchemaDescriptor> {
+        let Self { schema, schema_root: root_schema_name, coerce_types } = self;
+        let fields = schema
+            .fields()
+            .iter()
+            .map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new))
+            .collect::<Result<_>>()?;
+        let group = Type::group_type_builder(root_schema_name).with_fields(fields).build()?;
+        Ok(SchemaDescriptor::new(Arc::new(group)))
+    }
+}
+
 /// Convert arrow schema to parquet schema
 ///
 /// The name of the root schema element defaults to `"arrow_schema"`, this can be
 /// overridden with [`arrow_to_parquet_schema_with_root`]
-pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) -> Result<SchemaDescriptor> {
-    arrow_to_parquet_schema_with_root(schema, "arrow_schema", coerce_types)
-}
+#[deprecated(since = "54.0.0", note = "Use `ArrowToParquetSchemaConverter` instead")]
+pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {

-/// Convert arrow schema to parquet schema specifying the name of the root schema element
-pub fn arrow_to_parquet_schema_with_root(
-    schema: &Schema,
-    root: &str,
-    coerce_types: bool,
-) -> Result<SchemaDescriptor> {
-    let fields = schema
-        .fields()
-        .iter()
-        .map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new))
-        .collect::<Result<_>>()?;
-    let group = Type::group_type_builder(root).with_fields(fields).build()?;
-    Ok(SchemaDescriptor::new(Arc::new(group)))
+    ArrowToParquetSchemaConverter::new(schema).build()
 }

+
 fn parse_key_value_metadata(
     key_value_metadata: Option<&Vec<KeyValue>>,
 ) -> Option<HashMap<String, String>> {
@@ -1569,7 +1661,7 @@ mod tests {
             Field::new("decimal256", DataType::Decimal256(39, 2), false),
         ];
         let arrow_schema = Schema::new(arrow_fields);
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
+        let converted_arrow_schema = ArrowToParquetSchemaConverter::new(&arrow_schema).build().unwrap();

         assert_eq!(
             parquet_schema.columns().len(),
@@ -1606,9 +1698,10 @@ mod tests {
             false,
         )];
         let arrow_schema = Schema::new(arrow_fields);
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true);
+        let converted_arrow_schema = ArrowToParquetSchemaConverter::new(&arrow_schema)
+            .with_coerce_types(true)
+            .build();

-        assert!(converted_arrow_schema.is_err());
         converted_arrow_schema.unwrap();
     }

@@ -1878,7 +1971,9 @@ mod tests {
         // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
         let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;

-        let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema, true)?;
+        let parq_schema_descr = crate::arrow::ArrowToParquetSchemaConverter::new(&arrow_schema)
+            .with_coerce_types(true)
+            .build()?;
         let parq_fields = parq_schema_descr.root_schema().get_fields();
         assert_eq!(parq_fields.len(), 2);
         assert_eq!(parq_fields[0].get_basic_info().id(), 1);
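
To see what `with_coerce_types` changes, the sketch below converts a schema containing `Date64` both ways. It is an editor's illustration (not part of the diff) that assumes, per the doc comments added above, that the default keeps a lossless 64-bit physical column while coercion produces Parquet's native 32-bit DATE:

use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowToParquetSchemaConverter;
use parquet::basic;

fn main() -> parquet::errors::Result<()> {
    let schema = Schema::new(vec![Field::new("d", DataType::Date64, false)]);

    // Default (coerce_types = false): lossless representation, relying on the
    // embedded Arrow schema for round-tripping (assumed 64-bit physical type)
    let lossless = ArrowToParquetSchemaConverter::new(&schema).build()?;
    assert_eq!(lossless.column(0).physical_type(), basic::Type::INT64);

    // Coerced: parquet's native 32-bit DATE, readable by more tools, but
    // sub-day precision in Date64 values may be truncated
    let coerced = ArrowToParquetSchemaConverter::new(&schema)
        .with_coerce_types(true)
        .build()?;
    assert_eq!(coerced.column(0).physical_type(), basic::Type::INT32);
    Ok(())
}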

parquet/src/file/properties.rs

Lines changed: 8 additions & 11 deletions
@@ -16,14 +16,13 @@
 // under the License.

 //! Configuration via [`WriterProperties`] and [`ReaderProperties`]
-use std::str::FromStr;
-use std::{collections::HashMap, sync::Arc};
-
 use crate::basic::{Compression, Encoding};
 use crate::compression::{CodecOptions, CodecOptionsBuilder};
 use crate::file::metadata::KeyValue;
 use crate::format::SortingColumn;
 use crate::schema::types::ColumnPath;
+use std::str::FromStr;
+use std::{collections::HashMap, sync::Arc};

 /// Default value for [`WriterProperties::data_page_size_limit`]
 pub const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
@@ -287,15 +286,13 @@ impl WriterProperties {
         self.statistics_truncate_length
     }

-    /// Returns `coerce_types` boolean
+    /// Should the writer coerce types to parquet native types.
+    ///
+    /// Setting this option to `true` will result in parquet files that can be
+    /// read by more readers, but may lose precision for arrow types such as
+    /// [`DataType::Date64`] which have no direct corresponding Parquet type.
     ///
-    /// Some Arrow types do not have a corresponding Parquet logical type.
-    /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
-    /// Writers have the option to coerce these into native Parquet types. Type
-    /// coercion allows for meaningful representations that do not require
-    /// downstream readers to consider the embedded Arrow schema. However, type
-    /// coercion also prevents the data from being losslessly round-tripped. This method
-    /// returns `true` if type coercion enabled.
+    /// See [`ArrowToParquetSchemaConverter::with_coerce_types`] for more details
     pub fn coerce_types(&self) -> bool {
         self.coerce_types
     }
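
At the properties level, the flag returned by `coerce_types()` would normally be set on the builder. The sketch below is an editor's illustration that assumes a matching `set_coerce_types` setter on `WriterPropertiesBuilder`, which is not shown in this diff:

use parquet::file::properties::WriterProperties;

fn main() {
    // set_coerce_types is assumed to be the builder-side counterpart of the
    // coerce_types() getter documented above
    let props = WriterProperties::builder()
        .set_coerce_types(true)
        .build();
    assert!(props.coerce_types());
}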
