diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs index 0e39cd288340..c2bb64d19f10 100644 --- a/arrow-array/src/array/run_array.rs +++ b/arrow-array/src/array/run_array.rs @@ -24,8 +24,9 @@ use arrow_schema::{ArrowError, DataType, Field}; use crate::{ builder::StringRunBuilder, make_array, + run_iterator::RunArrayIter, types::{Int16Type, Int32Type, Int64Type, RunEndIndexType}, - Array, ArrayRef, PrimitiveArray, + Array, ArrayAccessor, ArrayRef, PrimitiveArray, }; /// @@ -121,6 +122,27 @@ impl RunArray { pub fn values(&self) -> &ArrayRef { &self.values } + + /// Downcast this [`RunArray`] to a [`TypedRunArray`] + /// + /// ``` + /// use arrow_array::{Array, ArrayAccessor, RunArray, StringArray, types::Int32Type}; + /// + /// let orig = [Some("a"), Some("b"), None]; + /// let run_array = RunArray::::from_iter(orig); + /// let typed = run_array.downcast::().unwrap(); + /// assert_eq!(typed.value(0), "a"); + /// assert_eq!(typed.value(1), "b"); + /// assert!(typed.values().is_null(2)); + /// ``` + /// + pub fn downcast(&self) -> Option> { + let values = self.values.as_any().downcast_ref()?; + Some(TypedRunArray { + run_array: self, + values, + }) + } } impl From for RunArray { @@ -274,15 +296,195 @@ pub type Int32RunArray = RunArray; /// ``` pub type Int64RunArray = RunArray; +/// A strongly-typed wrapper around a [`RunArray`] that implements [`ArrayAccessor`] +/// and [`IntoIterator`] allowing fast access to its elements +/// +/// ``` +/// use arrow_array::{RunArray, StringArray, types::Int32Type}; +/// +/// let orig = ["a", "b", "a", "b"]; +/// let ree_array = RunArray::::from_iter(orig); +/// +/// // `TypedRunArray` allows you to access the values directly +/// let typed = ree_array.downcast::().unwrap(); +/// +/// for (maybe_val, orig) in typed.into_iter().zip(orig) { +/// assert_eq!(maybe_val.unwrap(), orig) +/// } +/// ``` +pub struct TypedRunArray<'a, R: RunEndIndexType, V> { + /// The run array + run_array: &'a RunArray, + + /// The values of the run_array + values: &'a V, +} + +// Manually implement `Clone` to avoid `V: Clone` type constraint +impl<'a, R: RunEndIndexType, V> Clone for TypedRunArray<'a, R, V> { + fn clone(&self) -> Self { + Self { + run_array: self.run_array, + values: self.values, + } + } +} + +impl<'a, R: RunEndIndexType, V> Copy for TypedRunArray<'a, R, V> {} + +impl<'a, R: RunEndIndexType, V> std::fmt::Debug for TypedRunArray<'a, R, V> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!(f, "TypedRunArray({:?})", self.run_array) + } +} + +impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> { + /// Returns the run_ends of this [`TypedRunArray`] + pub fn run_ends(&self) -> &'a PrimitiveArray { + self.run_array.run_ends() + } + + /// Returns the values of this [`TypedRunArray`] + pub fn values(&self) -> &'a V { + self.values + } + + /// Returns index to the physcial array for the given index to the logical array. + /// Performs a binary search on the run_ends array for the input index. + #[inline] + pub fn get_physical_index(&self, logical_index: usize) -> Option { + if logical_index >= self.run_array.len() { + return None; + } + let mut st: usize = 0; + let mut en: usize = self.run_ends().len(); + while st + 1 < en { + let mid: usize = (st + en) / 2; + if logical_index + < unsafe { + // Safety: + // The value of mid will always be between 1 and len - 1, + // where len is length of run ends array. + // This is based on the fact that `st` starts with 0 and + // `en` starts with len. The condition `st + 1 < en` ensures + // `st` and `en` differs atleast by two. So the value of `mid` + // will never be either `st` or `en` + self.run_ends().value_unchecked(mid - 1).as_usize() + } + { + en = mid + } else { + st = mid + } + } + Some(st) + } +} + +impl<'a, R: RunEndIndexType, V: Sync> Array for TypedRunArray<'a, R, V> { + fn as_any(&self) -> &dyn Any { + self.run_array + } + + fn data(&self) -> &ArrayData { + &self.run_array.data + } + + fn into_data(self) -> ArrayData { + self.run_array.into_data() + } +} + +// Array accessor converts the index of logical array to the index of the physical array +// using binary search. The time complexity is O(log N) where N is number of runs. +impl<'a, R, V> ArrayAccessor for TypedRunArray<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + type Item = <&'a V as ArrayAccessor>::Item; + + fn value(&self, logical_index: usize) -> Self::Item { + assert!( + logical_index < self.len(), + "Trying to access an element at index {} from a TypedRunArray of length {}", + logical_index, + self.len() + ); + unsafe { self.value_unchecked(logical_index) } + } + + unsafe fn value_unchecked(&self, logical_index: usize) -> Self::Item { + let physical_index = self.get_physical_index(logical_index).unwrap(); + self.values().value_unchecked(physical_index) + } +} + +impl<'a, R, V> IntoIterator for TypedRunArray<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + type Item = Option<<&'a V as ArrayAccessor>::Item>; + type IntoIter = RunArrayIter<'a, R, V>; + + fn into_iter(self) -> Self::IntoIter { + RunArrayIter::new(self) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; + use rand::seq::SliceRandom; + use rand::thread_rng; + use rand::Rng; + use super::*; use crate::builder::PrimitiveRunBuilder; use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type}; use crate::{Array, Int16Array, Int32Array, StringArray}; + fn build_input_array(approx_size: usize) -> Vec> { + // The input array is created by shuffling and repeating + // the seed values random number of times. + let mut seed: Vec> = vec![ + None, + None, + Some(1), + Some(2), + Some(3), + Some(4), + Some(5), + Some(6), + ]; + let mut ix = 0; + let mut result: Vec> = Vec::with_capacity(approx_size); + let mut rng = thread_rng(); + while result.len() < approx_size { + // shuffle the seed array if all the values are iterated. + if ix == 0 { + seed.shuffle(&mut rng); + } + // repeat the items between 1 and 7 times. + let num = rand::thread_rng().gen_range(1..8); + for _ in 0..num { + result.push(seed[ix]); + } + ix += 1; + if ix == 8 { + ix = 0 + } + } + println!("Size of input array: {}", result.len()); + result + } + #[test] fn test_run_array() { // Construct a value array @@ -504,4 +706,26 @@ mod tests { let a = RunArray::::from_iter(["32"]); let _ = RunArray::::from(a.into_data()); } + + #[test] + fn test_ree_array_accessor() { + let input_array = build_input_array(256); + + // Encode the input_array to ree_array + let mut builder = + PrimitiveRunBuilder::::with_capacity(input_array.len()); + builder.extend(input_array.iter().copied()); + let run_array = builder.finish(); + let typed = run_array.downcast::>().unwrap(); + + for (i, inp_val) in input_array.iter().enumerate() { + if let Some(val) = inp_val { + let actual = typed.value(i); + assert_eq!(*val, actual) + } else { + let physical_ix = typed.get_physical_index(i).unwrap(); + assert!(typed.values().is_null(physical_ix)); + }; + } + } } diff --git a/arrow-array/src/builder/generic_byte_run_builder.rs b/arrow-array/src/builder/generic_byte_run_builder.rs index c1ecbcb5ddec..c6dbb82ff6eb 100644 --- a/arrow-array/src/builder/generic_byte_run_builder.rs +++ b/arrow-array/src/builder/generic_byte_run_builder.rs @@ -44,15 +44,14 @@ use arrow_buffer::ArrowNativeType; /// /// let mut builder = /// GenericByteRunBuilder::::new(); -/// builder.append_value(b"abc"); -/// builder.append_value(b"abc"); -/// builder.append_null(); +/// builder.extend([Some(b"abc"), Some(b"abc"), None, Some(b"def")].into_iter()); /// builder.append_value(b"def"); +/// builder.append_null(); /// let array = builder.finish(); /// /// assert_eq!( /// array.run_ends(), -/// &Int16Array::from(vec![Some(2), Some(3), Some(4)]) +/// &Int16Array::from(vec![Some(2), Some(3), Some(5), Some(6)]) /// ); /// /// let av = array.values(); @@ -60,6 +59,7 @@ use arrow_buffer::ArrowNativeType; /// assert!(!av.is_null(0)); /// assert!(av.is_null(1)); /// assert!(!av.is_null(2)); +/// assert!(av.is_null(3)); /// /// // Values are polymorphic and so require a downcast. /// let ava: &BinaryArray = as_generic_binary_array(av.as_ref()); @@ -299,6 +299,19 @@ where } } +impl Extend> for GenericByteRunBuilder +where + R: RunEndIndexType, + V: ByteArrayType, + S: AsRef, +{ + fn extend>>(&mut self, iter: T) { + for elem in iter { + self.append_option(elem); + } + } +} + /// Array builder for [`RunArray`] that encodes strings ([`Utf8Type`]). /// /// ``` @@ -315,9 +328,7 @@ where /// // The builder builds the dictionary value by value /// builder.append_value("abc"); /// builder.append_null(); -/// builder.append_value("def"); -/// builder.append_value("def"); -/// builder.append_value("abc"); +/// builder.extend([Some("def"), Some("def"), Some("abc")]); /// let array = builder.finish(); /// /// assert_eq!( @@ -356,9 +367,7 @@ pub type LargeStringRunBuilder = GenericByteRunBuilder; /// // The builder builds the dictionary value by value /// builder.append_value(b"abc"); /// builder.append_null(); -/// builder.append_value(b"def"); -/// builder.append_value(b"def"); -/// builder.append_value(b"abc"); +/// builder.extend([Some(b"def"), Some(b"def"), Some(b"abc")]); /// let array = builder.finish(); /// /// assert_eq!( @@ -387,7 +396,9 @@ mod tests { use super::*; use crate::array::Array; - use crate::types::Int16Type; + use crate::cast::as_primitive_array; + use crate::cast::as_string_array; + use crate::types::{Int16Type, Int32Type}; use crate::GenericByteArray; use crate::Int16Array; use crate::Int16RunArray; @@ -516,4 +527,24 @@ mod tests { fn test_binary_run_buider_finish_cloned() { test_bytes_run_buider_finish_cloned::(vec![b"abc", b"def", b"ghi"]); } + + #[test] + fn test_extend() { + let mut builder = StringRunBuilder::::new(); + builder.extend(["a", "a", "a", "", "", "b", "b"].into_iter().map(Some)); + builder.extend(["b", "cupcakes", "cupcakes"].into_iter().map(Some)); + let array = builder.finish(); + + assert_eq!(array.len(), 10); + assert_eq!( + as_primitive_array::(array.run_ends()).values(), + &[3, 5, 8, 10] + ); + + let str_array = as_string_array(array.values().as_ref()); + assert_eq!(str_array.value(0), "a"); + assert_eq!(str_array.value(1), ""); + assert_eq!(str_array.value(2), "b"); + assert_eq!(str_array.value(3), "cupcakes"); + } } diff --git a/arrow-array/src/builder/primitive_run_builder.rs b/arrow-array/src/builder/primitive_run_builder.rs index 82c46abfa053..41066228390d 100644 --- a/arrow-array/src/builder/primitive_run_builder.rs +++ b/arrow-array/src/builder/primitive_run_builder.rs @@ -253,6 +253,18 @@ where } } +impl Extend> for PrimitiveRunBuilder +where + R: RunEndIndexType, + V: ArrowPrimitiveType, +{ + fn extend>>(&mut self, iter: T) { + for elem in iter { + self.append_option(elem); + } + } +} + #[cfg(test)] mod tests { use crate::builder::PrimitiveRunBuilder; @@ -291,4 +303,23 @@ mod tests { assert_eq!(ava, &UInt32Array::from(vec![Some(1234), None, Some(5678)])); } + + #[test] + fn test_extend() { + let mut builder = PrimitiveRunBuilder::::new(); + builder.extend([1, 2, 2, 5, 5, 4, 4].into_iter().map(Some)); + builder.extend([4, 4, 6, 2].into_iter().map(Some)); + let array = builder.finish(); + + assert_eq!(array.len(), 11); + assert_eq!(array.null_count(), 0); + assert_eq!( + as_primitive_array::(array.run_ends()).values(), + &[1, 3, 5, 9, 10, 11] + ); + assert_eq!( + as_primitive_array::(array.values().as_ref()).values(), + &[1, 2, 5, 4, 6, 2] + ); + } } diff --git a/arrow-array/src/lib.rs b/arrow-array/src/lib.rs index d6a9ab30b85b..d8dc6efe25be 100644 --- a/arrow-array/src/lib.rs +++ b/arrow-array/src/lib.rs @@ -178,6 +178,7 @@ pub mod cast; mod delta; pub mod iterator; mod raw_pointer; +pub mod run_iterator; pub mod temporal_conversions; pub mod timezone; mod trusted_len; diff --git a/arrow-array/src/run_iterator.rs b/arrow-array/src/run_iterator.rs new file mode 100644 index 000000000000..6a7b785fe1c6 --- /dev/null +++ b/arrow-array/src/run_iterator.rs @@ -0,0 +1,273 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Idiomatic iterator for [`RunArray`](crate::Array) + +use arrow_buffer::ArrowNativeType; + +use crate::{array::ArrayAccessor, types::RunEndIndexType, Array, TypedRunArray}; + +/// The [`RunArrayIter`] provides an idiomatic way to iterate over the run array. +/// It returns Some(T) if there is a value or None if the value is null. +/// +/// The iterator comes with a cost as it has to iterate over three arrays to determine +/// the value to be returned. The run_ends array is used to determine the index of the value. +/// The nulls array is used to determine if the value is null and the values array is used to +/// get the value. +/// +/// Unlike other iterators in this crate, [`RunArrayIter`] does not use [`ArrayAccessor`] +/// because the run array accessor does binary search to access each value which is too slow. +/// The run array iterator can determine the next value in constant time. +/// +#[derive(Debug)] +pub struct RunArrayIter<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + array: TypedRunArray<'a, R, V>, + current_logical: usize, + current_physical: usize, + current_end_logical: usize, + current_end_physical: usize, +} + +impl<'a, R, V> RunArrayIter<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + /// create a new iterator + pub fn new(array: TypedRunArray<'a, R, V>) -> Self { + let logical_len = array.len(); + let physical_len: usize = array.values().len(); + RunArrayIter { + array, + current_logical: 0, + current_physical: 0, + current_end_logical: logical_len, + current_end_physical: physical_len, + } + } +} + +impl<'a, R, V> Iterator for RunArrayIter<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + type Item = Option<<&'a V as ArrayAccessor>::Item>; + + #[inline] + fn next(&mut self) -> Option { + if self.current_logical == self.current_end_logical { + return None; + } + // If current logical index is greater than current run end index then increment + // the physical index. + if self.current_logical + >= self + .array + .run_ends() + .value(self.current_physical) + .as_usize() + { + // As the run_ends is expected to be strictly increasing, there + // should be at least one logical entry in one physical entry. Because of this + // reason the next value can be accessed by incrementing physical index once. + self.current_physical += 1; + } + if self.array.values().is_null(self.current_physical) { + self.current_logical += 1; + Some(None) + } else { + self.current_logical += 1; + // Safety: + // The self.current_physical is kept within bounds of self.current_logical. + // The self.current_logical will not go out of bounds because of the check + // `self.current_logical = self.current_end_logical` above. + unsafe { + Some(Some( + self.array.values().value_unchecked(self.current_physical), + )) + } + } + } + + fn size_hint(&self) -> (usize, Option) { + ( + self.current_end_logical - self.current_logical, + Some(self.current_end_logical - self.current_logical), + ) + } +} + +impl<'a, R, V> DoubleEndedIterator for RunArrayIter<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ + fn next_back(&mut self) -> Option { + if self.current_end_logical == self.current_logical { + return None; + } + + self.current_end_logical -= 1; + + if self.current_end_physical > 0 + && self.current_end_logical + < self + .array + .run_ends() + .value(self.current_end_physical - 1) + .as_usize() + { + // As the run_ends is expected to be strictly increasing, there + // should be at least one logical entry in one physical entry. Because of this + // reason the next value can be accessed by decrementing physical index once. + self.current_end_physical -= 1; + } + + Some(if self.array.values().is_null(self.current_end_physical) { + None + } else { + // Safety: + // The check `self.current_end_physical > 0` ensures the value will not underflow. + // Also self.current_end_physical starts with array.len() and + // decrements based on the bounds of self.current_end_logical. + unsafe { + Some( + self.array + .values() + .value_unchecked(self.current_end_physical), + ) + } + }) + } +} + +/// all arrays have known size. +impl<'a, R, V> ExactSizeIterator for RunArrayIter<'a, R, V> +where + R: RunEndIndexType, + V: Sync + Send, + &'a V: ArrayAccessor, + <&'a V as ArrayAccessor>::Item: Default, +{ +} + +#[cfg(test)] +mod tests { + use crate::{ + array::{Int32Array, StringArray}, + builder::PrimitiveRunBuilder, + types::Int32Type, + Int64RunArray, + }; + + #[test] + fn test_primitive_array_iter_round_trip() { + let mut input_vec = vec![ + Some(32), + Some(32), + None, + Some(64), + Some(64), + Some(64), + Some(72), + ]; + let mut builder = PrimitiveRunBuilder::::new(); + builder.extend(input_vec.clone().into_iter()); + let ree_array = builder.finish(); + let ree_array = ree_array.downcast::().unwrap(); + + let output_vec: Vec> = ree_array.into_iter().collect(); + assert_eq!(input_vec, output_vec); + + let rev_output_vec: Vec> = ree_array.into_iter().rev().collect(); + input_vec.reverse(); + assert_eq!(input_vec, rev_output_vec); + } + + #[test] + fn test_double_ended() { + let input_vec = vec![ + Some(32), + Some(32), + None, + Some(64), + Some(64), + Some(64), + Some(72), + ]; + let mut builder = PrimitiveRunBuilder::::new(); + builder.extend(input_vec.into_iter()); + let ree_array = builder.finish(); + let ree_array = ree_array.downcast::().unwrap(); + + let mut iter = ree_array.into_iter(); + assert_eq!(Some(Some(32)), iter.next()); + assert_eq!(Some(Some(72)), iter.next_back()); + assert_eq!(Some(Some(32)), iter.next()); + assert_eq!(Some(Some(64)), iter.next_back()); + assert_eq!(Some(None), iter.next()); + assert_eq!(Some(Some(64)), iter.next_back()); + assert_eq!(Some(Some(64)), iter.next()); + assert_eq!(None, iter.next_back()); + assert_eq!(None, iter.next()); + } + + #[test] + fn test_string_array_iter_round_trip() { + let input_vec = vec!["ab", "ab", "ba", "cc", "cc"]; + let input_ree_array: Int64RunArray = input_vec.into_iter().collect(); + let string_ree_array = input_ree_array.downcast::().unwrap(); + + // to and from iter, with a +1 + let result: Vec> = string_ree_array + .into_iter() + .map(|e| { + e.map(|e| { + let mut a = e.to_string(); + a.push('b'); + a + }) + }) + .collect(); + + let result_asref: Vec> = + result.iter().map(|f| f.as_deref()).collect(); + + let expected_vec = vec![ + Some("abb"), + Some("abb"), + Some("bab"), + Some("ccb"), + Some("ccb"), + ]; + + assert_eq!(expected_vec, result_asref); + } +} diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index ee926ee52868..decfeb949a08 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -237,6 +237,10 @@ required-features = ["test_utils"] name = "string_dictionary_builder" harness = false +[[bench]] +name = "string_run_builder" +harness = false + [[bench]] name = "substring_kernels" harness = false diff --git a/arrow/benches/string_run_builder.rs b/arrow/benches/string_run_builder.rs new file mode 100644 index 000000000000..2f0401bbef48 --- /dev/null +++ b/arrow/benches/string_run_builder.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::StringRunBuilder; +use arrow::datatypes::Int32Type; +use criterion::{criterion_group, criterion_main, Criterion}; +use rand::{thread_rng, Rng}; + +fn build_strings( + physical_array_len: usize, + logical_array_len: usize, + string_len: usize, +) -> Vec { + let mut rng = thread_rng(); + let run_len = logical_array_len / physical_array_len; + let mut values: Vec = (0..physical_array_len) + .map(|_| (0..string_len).map(|_| rng.gen::()).collect()) + .flat_map(|s| std::iter::repeat(s).take(run_len)) + .collect(); + while values.len() < logical_array_len { + let last_val = values[values.len() - 1].clone(); + values.push(last_val); + } + values +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("string_run_builder"); + + let mut do_bench = |physical_array_len: usize, + logical_array_len: usize, + string_len: usize| { + group.bench_function( + format!( + "(run_array_len:{logical_array_len}, physical_array_len:{physical_array_len}, string_len: {string_len})", + ), + |b| { + let strings = + build_strings(physical_array_len, logical_array_len, string_len); + b.iter(|| { + let mut builder = StringRunBuilder::::with_capacity( + physical_array_len, + (string_len + 1) * physical_array_len, + ); + + for val in &strings { + builder.append_value(val); + } + + builder.finish(); + }) + }, + ); + }; + + do_bench(20, 1000, 5); + do_bench(100, 1000, 5); + do_bench(100, 1000, 10); + do_bench(100, 10000, 10); + do_bench(100, 10000, 100); + + group.finish(); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches);