Skip to content

Commit af777cd

Browse files
authored
Add support StringView / BinaryView in interleave kernel (#6779)
* add byteview specific interleave * clippy * test * more clippy * more test coverage * enable assertion, remove explicit vector capacity * add new test, address comments
1 parent db20a81 commit af777cd

File tree

1 file changed

+244
-0
lines changed

1 file changed

+244
-0
lines changed

arrow-select/src/interleave.rs

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use arrow_array::types::*;
2424
use arrow_array::*;
2525
use arrow_buffer::{ArrowNativeType, MutableBuffer, NullBuffer, NullBufferBuilder, OffsetBuffer};
2626
use arrow_data::transform::MutableArrayData;
27+
use arrow_data::ByteView;
2728
use arrow_schema::{ArrowError, DataType};
29+
use std::collections::HashMap;
2830
use std::sync::Arc;
2931

3032
macro_rules! primitive_helper {
@@ -97,6 +99,8 @@ pub fn interleave(
9799
DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
98100
DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
99101
DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
102+
DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
103+
DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
100104
DataType::Dictionary(k, _) => downcast_integer! {
101105
k.as_ref() => (dict_helper, values, indices),
102106
_ => unreachable!("illegal dictionary key type {k}")
@@ -231,6 +235,41 @@ fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
231235
Ok(Arc::new(array))
232236
}
233237

238+
fn interleave_views<T: ByteViewType>(
239+
values: &[&dyn Array],
240+
indices: &[(usize, usize)],
241+
) -> Result<ArrayRef, ArrowError> {
242+
let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
243+
let mut views_builder = BufferBuilder::new(indices.len());
244+
let mut buffers = Vec::new();
245+
246+
// (input array_index, input buffer_index) -> output buffer_index
247+
let mut buffer_lookup: HashMap<(usize, u32), u32> = HashMap::new();
248+
for (array_idx, value_idx) in indices {
249+
let array = interleaved.arrays[*array_idx];
250+
let raw_view = array.views().get(*value_idx).unwrap();
251+
let view_len = *raw_view as u32;
252+
if view_len <= 12 {
253+
views_builder.append(*raw_view);
254+
continue;
255+
}
256+
// value is big enough to be in a variadic buffer
257+
let view = ByteView::from(*raw_view);
258+
let new_buffer_idx: &mut u32 = buffer_lookup
259+
.entry((*array_idx, view.buffer_index))
260+
.or_insert_with(|| {
261+
buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
262+
(buffers.len() - 1) as u32
263+
});
264+
views_builder.append(view.with_buffer_index(*new_buffer_idx).into());
265+
}
266+
267+
let array = unsafe {
268+
GenericByteViewArray::<T>::new_unchecked(views_builder.into(), buffers, interleaved.nulls)
269+
};
270+
Ok(Arc::new(array))
271+
}
272+
234273
/// Fallback implementation of interleave using [`MutableArrayData`]
235274
fn interleave_fallback(
236275
values: &[&dyn Array],
@@ -461,4 +500,209 @@ mod tests {
461500
DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
462501
assert_eq!(array.as_ref(), &expected)
463502
}
503+
504+
#[test]
/// Interleaving two string view arrays keeps only the data buffers that are
/// referenced by a non-inlined value, and yields the same values as the
/// fallback implementation.
fn test_interleave_views() {
    let source_a = StringArray::from_iter_values([
        "hello",
        "world_long_string_not_inlined",
        "foo",
        "bar",
        "baz",
    ]);
    let view_a = StringViewArray::from(&source_a);

    let source_b = StringArray::from_iter_values([
        "test",
        "data",
        "more_long_string_not_inlined",
        "views",
        "here",
    ]);
    let view_b = StringViewArray::from(&source_b);

    let indices = &[
        (0, 2), // "foo"
        (1, 0), // "test"
        (0, 4), // "baz"
        (1, 3), // "views"
        (0, 1), // "world_long_string_not_inlined"
    ];

    // Owned copies of the values make the comparisons below straightforward.
    let to_owned_strings = |array: &StringViewArray| -> Vec<Option<String>> {
        array.iter().map(|v| v.map(String::from)).collect()
    };

    // Specialized implementation: only the single referenced buffer is kept.
    let specialized = interleave(&[&view_a, &view_b], indices).unwrap();
    let result = specialized.as_string_view();
    assert_eq!(result.data_buffers().len(), 1);

    // The fallback keeps 2 buffers even though only one selected string is
    // long enough to need one.
    let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
    let fallback_result = fallback.as_string_view();
    assert_eq!(fallback_result.data_buffers().len(), 2);

    let collected = to_owned_strings(result);
    let fallback_collected = to_owned_strings(fallback_result);
    assert_eq!(&collected, &fallback_collected);

    let expected: Vec<Option<String>> = [
        "foo",
        "test",
        "baz",
        "views",
        "world_long_string_not_inlined",
    ]
    .into_iter()
    .map(|s| Some(s.to_string()))
    .collect();
    assert_eq!(collected, expected);
}
563+
564+
#[test]
/// Null entries are carried through interleaving, and the result matches the
/// fallback implementation value for value.
fn test_interleave_views_with_nulls() {
    let source_a = StringArray::from_iter([
        Some("hello"),
        None,
        Some("foo_long_string_not_inlined"),
        Some("bar"),
        None,
    ]);
    let view_a = StringViewArray::from(&source_a);

    let source_b = StringArray::from_iter([
        Some("test"),
        Some("data_long_string_not_inlined"),
        None,
        None,
        Some("here"),
    ]);
    let view_b = StringViewArray::from(&source_b);

    let indices = &[
        (0, 1), // null
        (1, 2), // null
        (0, 2), // "foo_long_string_not_inlined"
        (1, 3), // null
        (0, 4), // null
    ];

    // Owned copies of the values make the comparisons below straightforward.
    let to_owned_strings = |array: &StringViewArray| -> Vec<Option<String>> {
        array.iter().map(|v| v.map(String::from)).collect()
    };

    // Specialized implementation: one long value selected, one buffer kept.
    let specialized = interleave(&[&view_a, &view_b], indices).unwrap();
    let result = specialized.as_string_view();
    assert_eq!(result.data_buffers().len(), 1);

    let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
    let fallback_result = fallback.as_string_view();

    let collected = to_owned_strings(result);
    let fallback_collected = to_owned_strings(fallback_result);
    assert_eq!(&collected, &fallback_collected);

    let expected: Vec<Option<String>> = vec![
        None,
        None,
        Some(String::from("foo_long_string_not_inlined")),
        None,
        None,
    ];
    assert_eq!(collected, expected);
}
621+
622+
#[test]
/// Arrays with several data buffers each: every referenced buffer is carried
/// over exactly once, in order of first use, and repeated references reuse the
/// already assigned output slot.
fn test_interleave_views_multiple_buffers() {
    // Builds a view for a long value stored at offset 0 of the given buffer.
    let long_view = |bytes: &[u8], buffer_index: u32| -> u128 {
        ByteView::new(bytes.len() as u32, &bytes[..4])
            .with_buffer_index(buffer_index)
            .with_offset(0)
            .as_u128()
    };

    let bytes_a0 = "very_long_string_from_first_buffer".as_bytes();
    let bytes_a1 = "very_long_string_from_second_buffer".as_bytes();
    let buffer_a0 = bytes_a0.to_vec().into();
    let buffer_a1 = bytes_a1.to_vec().into();
    let views_a = vec![long_view(bytes_a0, 0), long_view(bytes_a1, 1)];
    let view_a =
        StringViewArray::try_new(views_a.into(), vec![buffer_a0, buffer_a1], None).unwrap();

    let bytes_b0 = "another_very_long_string_buffer_three".as_bytes();
    let bytes_b1 = "different_long_string_in_buffer_four".as_bytes();
    let buffer_b0 = bytes_b0.to_vec().into();
    let buffer_b1 = bytes_b1.to_vec().into();
    let views_b = vec![long_view(bytes_b0, 0), long_view(bytes_b1, 1)];
    let view_b =
        StringViewArray::try_new(views_b.into(), vec![buffer_b0, buffer_b1], None).unwrap();

    let indices = &[
        (0, 0), // String from first buffer of array A
        (1, 0), // String from first buffer of array B
        (0, 1), // String from second buffer of array A
        (1, 1), // String from second buffer of array B
        (0, 0), // String from first buffer of array A again
        (1, 1), // String from second buffer of array B again
    ];

    let interleaved = interleave(&[&view_a, &view_b], indices).unwrap();
    let result = interleaved.as_string_view();

    assert_eq!(
        result.data_buffers().len(),
        4,
        "Expected four buffers (two from each input array)"
    );

    let result_strings: Vec<Option<String>> =
        result.iter().map(|v| v.map(String::from)).collect();
    let expected_strings: Vec<Option<String>> = [
        "very_long_string_from_first_buffer",
        "another_very_long_string_buffer_three",
        "very_long_string_from_second_buffer",
        "different_long_string_in_buffer_four",
        "very_long_string_from_first_buffer",
        "different_long_string_in_buffer_four",
    ]
    .into_iter()
    .map(|s| Some(s.to_string()))
    .collect();
    assert_eq!(result_strings, expected_strings);

    let buffer_indices: Vec<_> = result
        .views()
        .iter()
        .map(|raw_view| ByteView::from(*raw_view).buffer_index)
        .collect();

    assert_eq!(
        buffer_indices,
        vec![
            0, // First buffer from array A
            1, // First buffer from array B
            2, // Second buffer from array A
            3, // Second buffer from array B
            0, // First buffer from array A (reused)
            3, // Second buffer from array B (reused)
        ]
    );
}
464708
}

0 commit comments

Comments
 (0)