Skip to content

Commit b8fd432

Browse files
authored
Don't hydrate string dictionaries when writing to parquet (#1764) (#2322)
1 parent 6859efa commit b8fd432

File tree

5 files changed

+147
-107
lines changed

5 files changed

+147
-107
lines changed

arrow/src/array/array_dictionary.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,14 +421,25 @@ impl<T: ArrowPrimitiveType> fmt::Debug for DictionaryArray<T> {
421421
/// assert_eq!(maybe_val.unwrap(), orig)
422422
/// }
423423
/// ```
424-
#[derive(Copy, Clone)]
425424
pub struct TypedDictionaryArray<'a, K: ArrowPrimitiveType, V> {
426425
/// The dictionary array
427426
dictionary: &'a DictionaryArray<K>,
428427
/// The values of the dictionary
429428
values: &'a V,
430429
}
431430

431+
// Manually implement `Clone` to avoid `V: Clone` type constraint
432+
impl<'a, K: ArrowPrimitiveType, V> Clone for TypedDictionaryArray<'a, K, V> {
433+
fn clone(&self) -> Self {
434+
Self {
435+
dictionary: self.dictionary,
436+
values: self.values,
437+
}
438+
}
439+
}
440+
441+
impl<'a, K: ArrowPrimitiveType, V> Copy for TypedDictionaryArray<'a, K, V> {}
442+
432443
impl<'a, K: ArrowPrimitiveType, V> fmt::Debug for TypedDictionaryArray<'a, K, V> {
433444
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
434445
writeln!(f, "TypedDictionaryArray({:?})", self.dictionary)

arrow/src/util/data_gen.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,17 @@ pub fn create_random_array(
143143
})
144144
.collect::<Result<Vec<(&str, ArrayRef)>>>()?,
145145
)?),
146+
d @ Dictionary(_, value_type)
147+
if crate::compute::can_cast_types(value_type, d) =>
148+
{
149+
let f = Field::new(
150+
field.name(),
151+
value_type.as_ref().clone(),
152+
field.is_nullable(),
153+
);
154+
let v = create_random_array(&f, size, null_density, true_density)?;
155+
crate::compute::cast(&v, d)?
156+
}
146157
other => {
147158
return Err(ArrowError::NotYetImplemented(format!(
148159
"Generating random arrays not yet implemented for {:?}",

parquet/benches/arrow_writer.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,25 @@ fn create_string_bench_batch(
9292
)?)
9393
}
9494

95+
fn create_string_dictionary_bench_batch(
96+
size: usize,
97+
null_density: f32,
98+
true_density: f32,
99+
) -> Result<RecordBatch> {
100+
let fields = vec![Field::new(
101+
"_1",
102+
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
103+
true,
104+
)];
105+
let schema = Schema::new(fields);
106+
Ok(create_random_batch(
107+
Arc::new(schema),
108+
size,
109+
null_density,
110+
true_density,
111+
)?)
112+
}
113+
95114
fn create_string_bench_batch_non_null(
96115
size: usize,
97116
null_density: f32,
@@ -346,6 +365,18 @@ fn bench_primitive_writer(c: &mut Criterion) {
346365
b.iter(|| write_batch(&batch).unwrap())
347366
});
348367

368+
let batch = create_string_dictionary_bench_batch(4096, 0.25, 0.75).unwrap();
369+
group.throughput(Throughput::Bytes(
370+
batch
371+
.columns()
372+
.iter()
373+
.map(|f| f.get_array_memory_size() as u64)
374+
.sum(),
375+
));
376+
group.bench_function("4096 values string dictionary", |b| {
377+
b.iter(|| write_batch(&batch).unwrap())
378+
});
379+
349380
let batch = create_string_bench_batch_non_null(4096, 0.25, 0.75).unwrap();
350381
group.throughput(Throughput::Bytes(
351382
batch

parquet/src/arrow/arrow_writer/byte_array.rs

Lines changed: 63 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
// under the License.
1717

1818
use crate::arrow::arrow_writer::levels::LevelInfo;
19-
use crate::arrow::arrow_writer::ArrayWriter;
2019
use crate::basic::Encoding;
2120
use crate::column::page::PageWriter;
2221
use crate::column::writer::encoder::{
@@ -33,11 +32,38 @@ use crate::schema::types::ColumnDescPtr;
3332
use crate::util::bit_util::num_required_bits;
3433
use crate::util::interner::{Interner, Storage};
3534
use arrow::array::{
36-
Array, ArrayAccessor, ArrayRef, BinaryArray, LargeBinaryArray, LargeStringArray,
37-
StringArray,
35+
Array, ArrayAccessor, ArrayRef, BinaryArray, DictionaryArray, LargeBinaryArray,
36+
LargeStringArray, StringArray,
3837
};
3938
use arrow::datatypes::DataType;
4039

40+
macro_rules! downcast_dict_impl {
41+
($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
42+
$op($array
43+
.as_any()
44+
.downcast_ref::<DictionaryArray<arrow::datatypes::$key>>()
45+
.unwrap()
46+
.downcast_dict::<$val>()
47+
.unwrap()$(, $arg)*)
48+
}};
49+
}
50+
51+
macro_rules! downcast_dict_op {
52+
($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
53+
match $key_type.as_ref() {
54+
DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
55+
DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
56+
DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
57+
DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
58+
DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
59+
DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
60+
DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
61+
DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
62+
_ => unreachable!(),
63+
}
64+
};
65+
}
66+
4167
macro_rules! downcast_op {
4268
($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
4369
match $data_type {
@@ -51,36 +77,44 @@ macro_rules! downcast_op {
5177
DataType::LargeBinary => {
5278
$op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
5379
}
54-
d => unreachable!("cannot downcast {} to byte array", d)
80+
DataType::Dictionary(key, value) => match value.as_ref() {
81+
DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
82+
DataType::LargeUtf8 => {
83+
downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
84+
}
85+
DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
86+
DataType::LargeBinary => {
87+
downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
88+
}
89+
d => unreachable!("cannot downcast {} dictionary value to byte array", d),
90+
},
91+
d => unreachable!("cannot downcast {} to byte array", d),
5592
}
5693
};
5794
}
5895

59-
/// Returns an [`ArrayWriter`] for byte or string arrays
60-
pub(super) fn make_byte_array_writer<'a>(
61-
descr: ColumnDescPtr,
62-
data_type: DataType,
63-
props: WriterPropertiesPtr,
64-
page_writer: Box<dyn PageWriter + 'a>,
65-
on_close: OnCloseColumnChunk<'a>,
66-
) -> Box<dyn ArrayWriter + 'a> {
67-
Box::new(ByteArrayWriter {
68-
writer: Some(GenericColumnWriter::new(descr, props, page_writer)),
69-
on_close: Some(on_close),
70-
data_type,
71-
})
72-
}
73-
74-
/// An [`ArrayWriter`] for [`ByteArray`]
75-
struct ByteArrayWriter<'a> {
76-
writer: Option<GenericColumnWriter<'a, ByteArrayEncoder>>,
96+
/// A writer for byte array types
97+
pub(super) struct ByteArrayWriter<'a> {
98+
writer: GenericColumnWriter<'a, ByteArrayEncoder>,
7799
on_close: Option<OnCloseColumnChunk<'a>>,
78-
data_type: DataType,
79100
}
80101

81-
impl<'a> ArrayWriter for ByteArrayWriter<'a> {
82-
fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
83-
self.writer.as_mut().unwrap().write_batch_internal(
102+
impl<'a> ByteArrayWriter<'a> {
103+
/// Returns a new [`ByteArrayWriter`]
104+
pub fn new(
105+
descr: ColumnDescPtr,
106+
props: &'a WriterPropertiesPtr,
107+
page_writer: Box<dyn PageWriter + 'a>,
108+
on_close: OnCloseColumnChunk<'a>,
109+
) -> Result<Self> {
110+
Ok(Self {
111+
writer: GenericColumnWriter::new(descr, props.clone(), page_writer),
112+
on_close: Some(on_close),
113+
})
114+
}
115+
116+
pub fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
117+
self.writer.write_batch_internal(
84118
array,
85119
Some(levels.non_null_indices()),
86120
levels.def_levels(),
@@ -92,11 +126,11 @@ impl<'a> ArrayWriter for ByteArrayWriter<'a> {
92126
Ok(())
93127
}
94128

95-
fn close(&mut self) -> Result<()> {
129+
pub fn close(self) -> Result<()> {
96130
let (bytes_written, rows_written, metadata, column_index, offset_index) =
97-
self.writer.take().unwrap().close()?;
131+
self.writer.close()?;
98132

99-
if let Some(on_close) = self.on_close.take() {
133+
if let Some(on_close) = self.on_close {
100134
on_close(
101135
bytes_written,
102136
rows_written,

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 30 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -33,70 +33,18 @@ use super::schema::{
3333
decimal_length_from_precision,
3434
};
3535

36-
use crate::column::writer::{get_column_writer, ColumnWriter, ColumnWriterImpl};
36+
use crate::arrow::arrow_writer::byte_array::ByteArrayWriter;
37+
use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
3738
use crate::errors::{ParquetError, Result};
3839
use crate::file::metadata::RowGroupMetaDataPtr;
3940
use crate::file::properties::WriterProperties;
40-
use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
41+
use crate::file::writer::SerializedRowGroupWriter;
4142
use crate::{data_type::*, file::writer::SerializedFileWriter};
4243
use levels::{calculate_array_levels, LevelInfo};
4344

4445
mod byte_array;
4546
mod levels;
4647

47-
/// An object-safe API for writing an [`ArrayRef`]
48-
trait ArrayWriter {
49-
fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()>;
50-
51-
fn close(&mut self) -> Result<()>;
52-
}
53-
54-
/// Fallback implementation for writing an [`ArrayRef`] that uses [`SerializedColumnWriter`]
55-
struct ColumnArrayWriter<'a>(Option<SerializedColumnWriter<'a>>);
56-
57-
impl<'a> ArrayWriter for ColumnArrayWriter<'a> {
58-
fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
59-
write_leaf(self.0.as_mut().unwrap().untyped(), array, levels)?;
60-
Ok(())
61-
}
62-
63-
fn close(&mut self) -> Result<()> {
64-
self.0.take().unwrap().close()
65-
}
66-
}
67-
68-
fn get_writer<'a, W: Write>(
69-
row_group_writer: &'a mut SerializedRowGroupWriter<'_, W>,
70-
data_type: &ArrowDataType,
71-
) -> Result<Box<dyn ArrayWriter + 'a>> {
72-
let array_writer = row_group_writer
73-
.next_column_with_factory(
74-
|descr, props, page_writer, on_close| match data_type {
75-
ArrowDataType::Utf8
76-
| ArrowDataType::LargeUtf8
77-
| ArrowDataType::Binary
78-
| ArrowDataType::LargeBinary => Ok(byte_array::make_byte_array_writer(
79-
descr,
80-
data_type.clone(),
81-
props.clone(),
82-
page_writer,
83-
on_close,
84-
)),
85-
_ => {
86-
let column_writer =
87-
get_column_writer(descr, props.clone(), page_writer);
88-
89-
let serialized_writer =
90-
SerializedColumnWriter::new(column_writer, Some(on_close));
91-
92-
Ok(Box::new(ColumnArrayWriter(Some(serialized_writer))))
93-
}
94-
},
95-
)?
96-
.expect("Unable to get column writer");
97-
Ok(array_writer)
98-
}
99-
10048
/// Arrow writer
10149
///
10250
/// Writes Arrow `RecordBatch`es to a Parquet writer, buffering up `RecordBatch` in order
@@ -314,22 +262,24 @@ fn write_leaves<W: Write>(
314262
| ArrowDataType::Time64(_)
315263
| ArrowDataType::Duration(_)
316264
| ArrowDataType::Interval(_)
317-
| ArrowDataType::LargeBinary
318-
| ArrowDataType::Binary
319-
| ArrowDataType::Utf8
320-
| ArrowDataType::LargeUtf8
321265
| ArrowDataType::Decimal128(_, _)
322266
| ArrowDataType::Decimal256(_, _)
323267
| ArrowDataType::FixedSizeBinary(_) => {
324-
let mut writer = get_writer(row_group_writer, &data_type)?;
268+
let mut col_writer = row_group_writer.next_column()?.unwrap();
325269
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
326-
writer.write(
327-
array,
328-
levels.pop().expect("Levels exhausted"),
329-
)?;
270+
write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
330271
}
331-
writer.close()?;
332-
Ok(())
272+
col_writer.close()
273+
}
274+
ArrowDataType::LargeBinary
275+
| ArrowDataType::Binary
276+
| ArrowDataType::Utf8
277+
| ArrowDataType::LargeUtf8 => {
278+
let mut col_writer = row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap();
279+
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
280+
col_writer.write(array, levels.pop().expect("Levels exhausted"))?;
281+
}
282+
col_writer.close()
333283
}
334284
ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
335285
let arrays: Vec<_> = arrays.iter().map(|array|{
@@ -380,18 +330,21 @@ fn write_leaves<W: Write>(
380330
write_leaves(row_group_writer, &values, levels)?;
381331
Ok(())
382332
}
383-
ArrowDataType::Dictionary(_, value_type) => {
384-
let mut writer = get_writer(row_group_writer, value_type)?;
385-
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
386-
// cast dictionary to a primitive
387-
let array = arrow::compute::cast(array, value_type)?;
388-
writer.write(
389-
&array,
390-
levels.pop().expect("Levels exhausted"),
391-
)?;
333+
ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
334+
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
335+
let mut col_writer = row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap();
336+
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
337+
col_writer.write(array, levels.pop().expect("Levels exhausted"))?;
338+
}
339+
col_writer.close()
340+
}
341+
_ => {
342+
let mut col_writer = row_group_writer.next_column()?.unwrap();
343+
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
344+
write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
345+
}
346+
col_writer.close()
392347
}
393-
writer.close()?;
394-
Ok(())
395348
}
396349
ArrowDataType::Float16 => Err(ParquetError::ArrowError(
397350
"Float16 arrays not supported".to_string(),

0 commit comments

Comments
 (0)