
Commit abce0e9

goldmedal authored and alamb committed
introduce binary_as_string parquet option
1 parent 711bf93 commit abce0e9

File tree: 16 files changed, +452 −154 lines


benchmarks/src/clickbench.rs

Lines changed: 9 additions & 6 deletions
@@ -116,12 +116,15 @@ impl RunOpt {
             None => queries.min_query_id()..=queries.max_query_id(),
         };
 
+        // configure parquet options
         let mut config = self.common.config();
-        config
-            .options_mut()
-            .execution
-            .parquet
-            .schema_force_view_types = self.common.force_view_types;
+        {
+            let parquet_options = &mut config.options_mut().execution.parquet;
+            parquet_options.schema_force_view_types = self.common.force_view_types;
+            // The hits_partitioned dataset specifies string columns
+            // as binary due to how it was written. Force it to strings
+            parquet_options.binary_as_string = true;
+        }
 
         let ctx = SessionContext::new_with_config(config);
         self.register_hits(&ctx).await?;
@@ -149,7 +152,7 @@ impl RunOpt {
         Ok(())
     }
 
-    /// Registrs the `hits.parquet` as a table named `hits`
+    /// Registers the `hits.parquet` as a table named `hits`
    async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
        let options = Default::default();
        let path = self.path.as_os_str().to_str().unwrap();
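For readers outside the benchmark harness: the same pair of options can be set on any session. A minimal sketch, assuming only the public `SessionConfig`/`SessionContext` API (nothing here besides `binary_as_string` is new in this commit):

use datafusion::prelude::{SessionConfig, SessionContext};

// Enable the new flag the same way the benchmark does above
let mut config = SessionConfig::new();
config.options_mut().execution.parquet.binary_as_string = true;
let ctx = SessionContext::new_with_config(config);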

datafusion/common/src/config.rs

Lines changed: 8 additions & 0 deletions
@@ -384,6 +384,14 @@ config_namespace! {
         /// and `Binary/BinaryLarge` with `BinaryView`.
         pub schema_force_view_types: bool, default = false
 
+        /// (reading) If true, parquet reader will read columns of
+        /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
+        ///
+        /// Parquet files generated by some legacy writers do not correctly set
+        /// the UTF8 flag for strings, causing string columns to be loaded as
+        /// BLOB instead.
+        pub binary_as_string: bool, default = false
+
         // The following options affect writing to parquet files
         // and map to parquet::file::properties::WriterProperties
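Options declared via `config_namespace!` are also reachable through the string-keyed config API; a sketch, under the assumption that the new entry follows the same `datafusion.execution.parquet.*` key naming as its siblings:

use datafusion_common::config::ConfigOptions;

fn main() -> datafusion_common::Result<()> {
    let mut options = ConfigOptions::new();
    // Key name assumed from the config_namespace! conventions
    options.set("datafusion.execution.parquet.binary_as_string", "true")?;
    assert!(options.execution.parquet.binary_as_string);
    Ok(())
}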

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 3 additions & 0 deletions
@@ -176,6 +176,7 @@ impl ParquetOptions {
             maximum_buffered_record_batches_per_stream: _,
             bloom_filter_on_read: _, // reads not used for writer props
             schema_force_view_types: _,
+            binary_as_string: _, // not used for writer props
         } = self;
 
         let mut builder = WriterProperties::builder()
@@ -442,6 +443,7 @@ mod tests {
                 .maximum_buffered_record_batches_per_stream,
             bloom_filter_on_read: defaults.bloom_filter_on_read,
             schema_force_view_types: defaults.schema_force_view_types,
+            binary_as_string: defaults.binary_as_string,
         }
     }
 
@@ -543,6 +545,7 @@ mod tests {
                 .maximum_buffered_record_batches_per_stream,
             bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
             schema_force_view_types: global_options_defaults.schema_force_view_types,
+            binary_as_string: global_options_defaults.binary_as_string,
         },
        column_specific_options,
        key_value_metadata,

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 81 additions & 0 deletions
@@ -302,6 +302,87 @@ pub(crate) fn coerce_file_schema_to_view_type(
     ))
 }
 
+/// Transform a schema to force binary types to be strings
+pub fn transform_binary_to_string(schema: &Schema) -> Schema {
+    let transformed_fields: Vec<Arc<Field>> = schema
+        .fields
+        .iter()
+        .map(|field| match field.data_type() {
+            DataType::Binary => Arc::new(
+                Field::new(field.name(), DataType::Utf8, field.is_nullable())
+                    .with_metadata(field.metadata().to_owned()),
+            ),
+            DataType::LargeBinary => Arc::new(
+                Field::new(field.name(), DataType::LargeUtf8, field.is_nullable())
+                    .with_metadata(field.metadata().to_owned()),
+            ),
+            _ => field.clone(),
+        })
+        .collect();
+    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
+}
+
+/// If the table schema uses a string type, coerce the file schema to use a string type.
+pub(crate) fn coerce_file_schema_to_string_type(
+    table_schema: &Schema,
+    file_schema: &Schema,
+) -> Option<Schema> {
+    let mut transform = false;
+    let table_fields: HashMap<_, _> = table_schema
+        .fields
+        .iter()
+        .map(|f| (f.name(), f.data_type()))
+        .collect();
+    let transformed_fields: Vec<Arc<Field>> = file_schema
+        .fields
+        .iter()
+        .map(
+            |field| match (table_fields.get(field.name()), field.data_type()) {
+                (Some(DataType::Utf8), DataType::Binary) => {
+                    transform = true;
+                    Arc::new(Field::new(
+                        field.name(),
+                        DataType::Utf8,
+                        field.is_nullable(),
+                    ))
+                }
+                (Some(DataType::LargeUtf8), DataType::LargeBinary) => {
+                    transform = true;
+                    Arc::new(Field::new(
+                        field.name(),
+                        DataType::LargeUtf8,
+                        field.is_nullable(),
+                    ))
+                }
+                // If `schema_force_view_types` is enabled, the actual data could be `Binary` or `LargeBinary`
+                // because we will first change the table schema for binary-to-string coercion, then apply the
+                // string-to-view transformation. So we need all binary types to be coerced to `Utf8View` here.
+                (
+                    Some(DataType::Utf8View),
+                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
+                ) => {
+                    transform = true;
+                    Arc::new(Field::new(
+                        field.name(),
+                        DataType::Utf8View,
+                        field.is_nullable(),
+                    ))
+                }
+                _ => field.clone(),
+            },
+        )
+        .collect();
+
+    if !transform {
+        None
+    } else {
+        Some(Schema::new_with_metadata(
+            transformed_fields,
+            file_schema.metadata.clone(),
+        ))
+    }
+}
+
 #[cfg(test)]
 pub(crate) mod test_util {
     use std::ops::Range;
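To make the new transformation concrete, here is a hedged sketch of its effect on a schema. The field names are invented for the example, and the import path assumes `transform_binary_to_string` stays `pub` at this location:

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::file_format::transform_binary_to_string;

fn main() {
    // Hypothetical file schema whose string data was written as Binary
    let file_schema = Schema::new(vec![
        Field::new("url", DataType::Binary, true),       // -> Utf8
        Field::new("body", DataType::LargeBinary, true), // -> LargeUtf8
        Field::new("id", DataType::Int64, false),        // left unchanged
    ]);
    let coerced = transform_binary_to_string(&file_schema);
    assert_eq!(coerced.field(0).data_type(), &DataType::Utf8);
    assert_eq!(coerced.field(1).data_type(), &DataType::LargeUtf8);
    assert_eq!(coerced.field(2).data_type(), &DataType::Int64);
}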

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 26 additions & 2 deletions
@@ -26,8 +26,9 @@ use std::sync::Arc;
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
 use super::{
-    coerce_file_schema_to_view_type, transform_schema_to_view, FileFormat,
-    FileFormatFactory, FilePushdownSupport, FileScanConfig,
+    coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
+    transform_binary_to_string, transform_schema_to_view, FileFormat, FileFormatFactory,
+    FilePushdownSupport, FileScanConfig,
 };
 use crate::arrow::array::RecordBatch;
 use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
@@ -260,6 +261,19 @@ impl ParquetFormat {
         self.options.global.schema_force_view_types = use_views;
         self
     }
+
+    /// Return `true` if binary type will be read as string.
+    pub fn binary_as_string(&self) -> bool {
+        self.options.global.binary_as_string
+    }
+
+    /// If true, will read binary type as string.
+    ///
+    /// Refer to [`Self::binary_as_string`].
+    pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
+        self.options.global.binary_as_string = binary_as_string;
+        self
+    }
 }
 
 /// Clears all metadata (Schema level and field level) on an iterator
@@ -350,6 +364,12 @@ impl FileFormat for ParquetFormat {
             Schema::try_merge(schemas)
         }?;
 
+        let schema = if self.binary_as_string() {
+            transform_binary_to_string(&schema)
+        } else {
+            schema
+        };
+
         let schema = if self.force_view_types() {
             transform_schema_to_view(&schema)
         } else {
@@ -552,6 +572,10 @@ pub fn statistics_from_parquet_meta_calc(
         file_metadata.schema_descr(),
         file_metadata.key_value_metadata(),
     )?;
+    if let Some(merged) = coerce_file_schema_to_string_type(&table_schema, &file_schema) {
+        file_schema = merged;
+    }
+
    if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &file_schema) {
        file_schema = merged;
    }
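The new builder-style setter composes with the existing ones; a sketch of wiring it into a listing table (the file extension is a placeholder, not part of this commit):

use std::sync::Arc;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;

// A ParquetFormat that coerces Binary columns to Utf8 during schema inference
let format = ParquetFormat::default().with_binary_as_string(true);
let listing_options =
    ListingOptions::new(Arc::new(format)).with_file_extension(".parquet");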

datafusion/core/src/datasource/physical_plan/parquet/opener.rs

Lines changed: 16 additions & 7 deletions
@@ -17,7 +17,9 @@
 
 //! [`ParquetOpener`] for opening Parquet files
 
-use crate::datasource::file_format::coerce_file_schema_to_view_type;
+use crate::datasource::file_format::{
+    coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
+};
 use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::{
@@ -80,7 +82,7 @@ pub(super) struct ParquetOpener {
 }
 
 impl FileOpener for ParquetOpener {
-    fn open(&self, file_meta: FileMeta) -> datafusion_common::Result<FileOpenFuture> {
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
         let file_range = file_meta.range.clone();
         let extensions = file_meta.extensions.clone();
         let file_name = file_meta.location().to_string();
@@ -121,7 +123,14 @@ impl FileOpener for ParquetOpener {
             let mut metadata_timer = file_metrics.metadata_load_time.timer();
             let metadata =
                 ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
-            let mut schema = metadata.schema().clone();
+            let mut schema = Arc::clone(metadata.schema());
+
+            if let Some(merged) =
+                coerce_file_schema_to_string_type(&table_schema, &schema)
+            {
+                schema = Arc::new(merged);
+            }
+
             // read with view types
             if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema)
             {
@@ -130,16 +139,16 @@ impl FileOpener for ParquetOpener {
 
             let options = ArrowReaderOptions::new()
                 .with_page_index(enable_page_index)
-                .with_schema(schema.clone());
+                .with_schema(Arc::clone(&schema));
             let metadata =
-                ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;
+                ArrowReaderMetadata::try_new(Arc::clone(metadata.metadata()), options)?;
 
             metadata_timer.stop();
 
             let mut builder =
                 ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);
 
-            let file_schema = builder.schema().clone();
+            let file_schema = Arc::clone(builder.schema());
 
             let (schema_mapping, adapted_projections) =
                 schema_adapter.map_schema(&file_schema)?;
@@ -177,7 +186,7 @@ impl FileOpener for ParquetOpener {
 
             // Determine which row groups to actually read. The idea is to skip
             // as many row groups as possible based on the metadata and query
-            let file_metadata = builder.metadata().clone();
+            let file_metadata = Arc::clone(builder.metadata());
            let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
            let rg_metadata = file_metadata.row_groups();
            // track which row groups to actually read

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 1 addition & 0 deletions
@@ -494,6 +494,7 @@ message ParquetOptions {
   bool bloom_filter_on_read = 26; // default = true
   bool bloom_filter_on_write = 27; // default = false
   bool schema_force_view_types = 28; // default = false
+  bool binary_as_string = 29; // default = false
 
   oneof metadata_size_hint_opt {
     uint64 metadata_size_hint = 4;

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 6 additions & 5 deletions
@@ -897,7 +897,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
             pruning: value.pruning,
             skip_metadata: value.skip_metadata,
             metadata_size_hint: value
-                .metadata_size_hint_opt.clone()
+                .metadata_size_hint_opt
                 .map(|opt| match opt {
                     protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => Some(v as usize),
                 })
@@ -958,6 +958,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
             maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
             maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
             schema_force_view_types: value.schema_force_view_types,
+            binary_as_string: value.binary_as_string,
         })
     }
 }
@@ -979,7 +980,7 @@ impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
                 })
                 .unwrap_or(None),
             max_statistics_size: value
-                .max_statistics_size_opt.clone()
+                .max_statistics_size_opt
                 .map(|opt| match opt {
                     protobuf::parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v) => Some(v as usize),
                 })
@@ -990,18 +991,18 @@ impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
                 protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
             })
             .unwrap_or(None),
-            bloom_filter_enabled: value.bloom_filter_enabled_opt.clone().map(|opt| match opt {
+            bloom_filter_enabled: value.bloom_filter_enabled_opt.map(|opt| match opt {
                 protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
             })
             .unwrap_or(None),
             bloom_filter_fpp: value
-                .bloom_filter_fpp_opt.clone()
+                .bloom_filter_fpp_opt
                 .map(|opt| match opt {
                     protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
                 })
                 .unwrap_or(None),
             bloom_filter_ndv: value
-                .bloom_filter_ndv_opt.clone()
+                .bloom_filter_ndv_opt
                .map(|opt| match opt {
                    protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
                })
