@@ -24,8 +24,9 @@ use arrow_schema::{ArrowError, DataType, Field};
24
24
use crate :: {
25
25
builder:: StringRunBuilder ,
26
26
make_array,
27
+ run_iterator:: RunArrayIter ,
27
28
types:: { Int16Type , Int32Type , Int64Type , RunEndIndexType } ,
28
- Array , ArrayRef , PrimitiveArray ,
29
+ Array , ArrayAccessor , ArrayRef , PrimitiveArray ,
29
30
} ;
30
31
31
32
///
@@ -121,6 +122,27 @@ impl<R: RunEndIndexType> RunArray<R> {
121
122
pub fn values ( & self ) -> & ArrayRef {
122
123
& self . values
123
124
}
125
+
126
+ /// Downcast this [`RunArray`] to a [`TypedRunArray`]
127
+ ///
128
+ /// ```
129
+ /// use arrow_array::{Array, ArrayAccessor, RunArray, StringArray, types::Int32Type};
130
+ ///
131
+ /// let orig = [Some("a"), Some("b"), None];
132
+ /// let run_array = RunArray::<Int32Type>::from_iter(orig);
133
+ /// let typed = run_array.downcast::<StringArray>().unwrap();
134
+ /// assert_eq!(typed.value(0), "a");
135
+ /// assert_eq!(typed.value(1), "b");
136
+ /// assert!(typed.values().is_null(2));
137
+ /// ```
138
+ ///
139
+ pub fn downcast < V : ' static > ( & self ) -> Option < TypedRunArray < ' _ , R , V > > {
140
+ let values = self . values . as_any ( ) . downcast_ref ( ) ?;
141
+ Some ( TypedRunArray {
142
+ run_array : self ,
143
+ values,
144
+ } )
145
+ }
124
146
}
125
147
126
148
impl < R : RunEndIndexType > From < ArrayData > for RunArray < R > {
@@ -274,15 +296,195 @@ pub type Int32RunArray = RunArray<Int32Type>;
274
296
/// ```
275
297
pub type Int64RunArray = RunArray < Int64Type > ;
276
298
299
+ /// A strongly-typed wrapper around a [`RunArray`] that implements [`ArrayAccessor`]
300
+ /// and [`IntoIterator`] allowing fast access to its elements
301
+ ///
302
+ /// ```
303
+ /// use arrow_array::{RunArray, StringArray, types::Int32Type};
304
+ ///
305
+ /// let orig = ["a", "b", "a", "b"];
306
+ /// let ree_array = RunArray::<Int32Type>::from_iter(orig);
307
+ ///
308
+ /// // `TypedRunArray` allows you to access the values directly
309
+ /// let typed = ree_array.downcast::<StringArray>().unwrap();
310
+ ///
311
+ /// for (maybe_val, orig) in typed.into_iter().zip(orig) {
312
+ /// assert_eq!(maybe_val.unwrap(), orig)
313
+ /// }
314
+ /// ```
315
+ pub struct TypedRunArray < ' a , R : RunEndIndexType , V > {
316
+ /// The run array
317
+ run_array : & ' a RunArray < R > ,
318
+
319
+ /// The values of the run_array
320
+ values : & ' a V ,
321
+ }
322
+
323
+ // Manually implement `Clone` to avoid `V: Clone` type constraint
324
+ impl < ' a , R : RunEndIndexType , V > Clone for TypedRunArray < ' a , R , V > {
325
+ fn clone ( & self ) -> Self {
326
+ Self {
327
+ run_array : self . run_array ,
328
+ values : self . values ,
329
+ }
330
+ }
331
+ }
332
+
333
+ impl < ' a , R : RunEndIndexType , V > Copy for TypedRunArray < ' a , R , V > { }
334
+
335
+ impl < ' a , R : RunEndIndexType , V > std:: fmt:: Debug for TypedRunArray < ' a , R , V > {
336
+ fn fmt ( & self , f : & mut std:: fmt:: Formatter ) -> std:: fmt:: Result {
337
+ writeln ! ( f, "TypedRunArray({:?})" , self . run_array)
338
+ }
339
+ }
340
+
341
+ impl < ' a , R : RunEndIndexType , V > TypedRunArray < ' a , R , V > {
342
+ /// Returns the run_ends of this [`TypedRunArray`]
343
+ pub fn run_ends ( & self ) -> & ' a PrimitiveArray < R > {
344
+ self . run_array . run_ends ( )
345
+ }
346
+
347
+ /// Returns the values of this [`TypedRunArray`]
348
+ pub fn values ( & self ) -> & ' a V {
349
+ self . values
350
+ }
351
+
352
+ /// Returns index to the physcial array for the given index to the logical array.
353
+ /// Performs a binary search on the run_ends array for the input index.
354
+ #[ inline]
355
+ pub fn get_physical_index ( & self , logical_index : usize ) -> Option < usize > {
356
+ if logical_index >= self . run_array . len ( ) {
357
+ return None ;
358
+ }
359
+ let mut st: usize = 0 ;
360
+ let mut en: usize = self . run_ends ( ) . len ( ) ;
361
+ while st + 1 < en {
362
+ let mid: usize = ( st + en) / 2 ;
363
+ if logical_index
364
+ < unsafe {
365
+ // Safety:
366
+ // The value of mid will always be between 1 and len - 1,
367
+ // where len is length of run ends array.
368
+ // This is based on the fact that `st` starts with 0 and
369
+ // `en` starts with len. The condition `st + 1 < en` ensures
370
+ // `st` and `en` differs atleast by two. So the value of `mid`
371
+ // will never be either `st` or `en`
372
+ self . run_ends ( ) . value_unchecked ( mid - 1 ) . as_usize ( )
373
+ }
374
+ {
375
+ en = mid
376
+ } else {
377
+ st = mid
378
+ }
379
+ }
380
+ Some ( st)
381
+ }
382
+ }
383
+
384
+ impl < ' a , R : RunEndIndexType , V : Sync > Array for TypedRunArray < ' a , R , V > {
385
+ fn as_any ( & self ) -> & dyn Any {
386
+ self . run_array
387
+ }
388
+
389
+ fn data ( & self ) -> & ArrayData {
390
+ & self . run_array . data
391
+ }
392
+
393
+ fn into_data ( self ) -> ArrayData {
394
+ self . run_array . into_data ( )
395
+ }
396
+ }
397
+
398
+ // Array accessor converts the index of logical array to the index of the physical array
399
+ // using binary search. The time complexity is O(log N) where N is number of runs.
400
+ impl < ' a , R , V > ArrayAccessor for TypedRunArray < ' a , R , V >
401
+ where
402
+ R : RunEndIndexType ,
403
+ V : Sync + Send ,
404
+ & ' a V : ArrayAccessor ,
405
+ <& ' a V as ArrayAccessor >:: Item : Default ,
406
+ {
407
+ type Item = <& ' a V as ArrayAccessor >:: Item ;
408
+
409
+ fn value ( & self , logical_index : usize ) -> Self :: Item {
410
+ assert ! (
411
+ logical_index < self . len( ) ,
412
+ "Trying to access an element at index {} from a TypedRunArray of length {}" ,
413
+ logical_index,
414
+ self . len( )
415
+ ) ;
416
+ unsafe { self . value_unchecked ( logical_index) }
417
+ }
418
+
419
+ unsafe fn value_unchecked ( & self , logical_index : usize ) -> Self :: Item {
420
+ let physical_index = self . get_physical_index ( logical_index) . unwrap ( ) ;
421
+ self . values ( ) . value_unchecked ( physical_index)
422
+ }
423
+ }
424
+
425
+ impl < ' a , R , V > IntoIterator for TypedRunArray < ' a , R , V >
426
+ where
427
+ R : RunEndIndexType ,
428
+ V : Sync + Send ,
429
+ & ' a V : ArrayAccessor ,
430
+ <& ' a V as ArrayAccessor >:: Item : Default ,
431
+ {
432
+ type Item = Option < <& ' a V as ArrayAccessor >:: Item > ;
433
+ type IntoIter = RunArrayIter < ' a , R , V > ;
434
+
435
+ fn into_iter ( self ) -> Self :: IntoIter {
436
+ RunArrayIter :: new ( self )
437
+ }
438
+ }
439
+
277
440
#[ cfg( test) ]
278
441
mod tests {
279
442
use std:: sync:: Arc ;
280
443
444
+ use rand:: seq:: SliceRandom ;
445
+ use rand:: thread_rng;
446
+ use rand:: Rng ;
447
+
281
448
use super :: * ;
282
449
use crate :: builder:: PrimitiveRunBuilder ;
283
450
use crate :: types:: { Int16Type , Int32Type , Int8Type , UInt32Type } ;
284
451
use crate :: { Array , Int16Array , Int32Array , StringArray } ;
285
452
453
+ fn build_input_array ( approx_size : usize ) -> Vec < Option < i32 > > {
454
+ // The input array is created by shuffling and repeating
455
+ // the seed values random number of times.
456
+ let mut seed: Vec < Option < i32 > > = vec ! [
457
+ None ,
458
+ None ,
459
+ Some ( 1 ) ,
460
+ Some ( 2 ) ,
461
+ Some ( 3 ) ,
462
+ Some ( 4 ) ,
463
+ Some ( 5 ) ,
464
+ Some ( 6 ) ,
465
+ ] ;
466
+ let mut ix = 0 ;
467
+ let mut result: Vec < Option < i32 > > = Vec :: with_capacity ( approx_size) ;
468
+ let mut rng = thread_rng ( ) ;
469
+ while result. len ( ) < approx_size {
470
+ // shuffle the seed array if all the values are iterated.
471
+ if ix == 0 {
472
+ seed. shuffle ( & mut rng) ;
473
+ }
474
+ // repeat the items between 1 and 7 times.
475
+ let num = rand:: thread_rng ( ) . gen_range ( 1 ..8 ) ;
476
+ for _ in 0 ..num {
477
+ result. push ( seed[ ix] ) ;
478
+ }
479
+ ix += 1 ;
480
+ if ix == 8 {
481
+ ix = 0
482
+ }
483
+ }
484
+ println ! ( "Size of input array: {}" , result. len( ) ) ;
485
+ result
486
+ }
487
+
286
488
#[ test]
287
489
fn test_run_array ( ) {
288
490
// Construct a value array
@@ -504,4 +706,26 @@ mod tests {
504
706
let a = RunArray :: < Int32Type > :: from_iter ( [ "32" ] ) ;
505
707
let _ = RunArray :: < Int64Type > :: from ( a. into_data ( ) ) ;
506
708
}
709
+
710
+ #[ test]
711
+ fn test_ree_array_accessor ( ) {
712
+ let input_array = build_input_array ( 256 ) ;
713
+
714
+ // Encode the input_array to ree_array
715
+ let mut builder =
716
+ PrimitiveRunBuilder :: < Int16Type , Int32Type > :: with_capacity ( input_array. len ( ) ) ;
717
+ builder. extend ( input_array. iter ( ) . copied ( ) ) ;
718
+ let run_array = builder. finish ( ) ;
719
+ let typed = run_array. downcast :: < PrimitiveArray < Int32Type > > ( ) . unwrap ( ) ;
720
+
721
+ for ( i, inp_val) in input_array. iter ( ) . enumerate ( ) {
722
+ if let Some ( val) = inp_val {
723
+ let actual = typed. value ( i) ;
724
+ assert_eq ! ( * val, actual)
725
+ } else {
726
+ let physical_ix = typed. get_physical_index ( i) . unwrap ( ) ;
727
+ assert ! ( typed. values( ) . is_null( physical_ix) ) ;
728
+ } ;
729
+ }
730
+ }
507
731
}
0 commit comments