Skip to content

Commit 30c4fd7

Browse files
Port ArrayDistinct to functions-array subcrate (#9549)
* Issue-9545 - Port ArrayDistinct to functions-array subcrate * Issue-9545 - Add test coverage on roundtrip_logical_plan * Issue-9545 - Address review comments
1 parent 96669de commit 30c4fd7

File tree

14 files changed

+174
-60
lines changed

14 files changed

+174
-60
lines changed

datafusion/expr/src/built_in_function.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,6 @@ pub enum BuiltinScalarFunction {
114114
ArrayPopFront,
115115
/// array_pop_back
116116
ArrayPopBack,
117-
/// array_distinct
118-
ArrayDistinct,
119117
/// array_element
120118
ArrayElement,
121119
/// array_position
@@ -325,7 +323,6 @@ impl BuiltinScalarFunction {
325323
BuiltinScalarFunction::Tan => Volatility::Immutable,
326324
BuiltinScalarFunction::Tanh => Volatility::Immutable,
327325
BuiltinScalarFunction::Trunc => Volatility::Immutable,
328-
BuiltinScalarFunction::ArrayDistinct => Volatility::Immutable,
329326
BuiltinScalarFunction::ArrayElement => Volatility::Immutable,
330327
BuiltinScalarFunction::ArrayExcept => Volatility::Immutable,
331328
BuiltinScalarFunction::ArrayPopFront => Volatility::Immutable,
@@ -416,7 +413,6 @@ impl BuiltinScalarFunction {
416413
// the return type of the built in function.
417414
// Some built-in functions' return type depends on the incoming type.
418415
match self {
419-
BuiltinScalarFunction::ArrayDistinct => Ok(input_expr_types[0].clone()),
420416
BuiltinScalarFunction::ArrayElement => match &input_expr_types[0] {
421417
List(field)
422418
| LargeList(field)
@@ -658,7 +654,6 @@ impl BuiltinScalarFunction {
658654
Signature::array_and_index(self.volatility())
659655
}
660656
BuiltinScalarFunction::ArrayExcept => Signature::any(2, self.volatility()),
661-
BuiltinScalarFunction::ArrayDistinct => Signature::array(self.volatility()),
662657
BuiltinScalarFunction::ArrayPosition => {
663658
Signature::array_and_element_and_optional_index(self.volatility())
664659
}
@@ -1073,7 +1068,6 @@ impl BuiltinScalarFunction {
10731068
BuiltinScalarFunction::SHA256 => &["sha256"],
10741069
BuiltinScalarFunction::SHA384 => &["sha384"],
10751070
BuiltinScalarFunction::SHA512 => &["sha512"],
1076-
BuiltinScalarFunction::ArrayDistinct => &["array_distinct", "list_distinct"],
10771071
BuiltinScalarFunction::ArrayElement => &[
10781072
"array_element",
10791073
"array_extract",

datafusion/expr/src/expr_fn.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -612,12 +612,6 @@ scalar_expr!(
612612
first_array second_array,
613613
"Returns an array of the elements that appear in the first array but not in the second."
614614
);
615-
scalar_expr!(
616-
ArrayDistinct,
617-
array_distinct,
618-
array,
619-
"return distinct values from the array after removing duplicates."
620-
);
621615
scalar_expr!(
622616
ArrayPosition,
623617
array_position,

datafusion/functions-array/src/kernels.rs

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,20 @@ use arrow::compute;
2828
use arrow::datatypes::Field;
2929
use arrow::datatypes::UInt64Type;
3030
use arrow::datatypes::{DataType, Date32Type, IntervalMonthDayNanoType};
31+
use arrow::row::{RowConverter, SortField};
3132
use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
33+
use arrow_schema::FieldRef;
3234
use arrow_schema::SortOptions;
35+
3336
use datafusion_common::cast::{
3437
as_date32_array, as_generic_list_array, as_generic_string_array, as_int64_array,
3538
as_interval_mdn_array, as_large_list_array, as_list_array, as_null_array,
3639
as_string_array,
3740
};
38-
use datafusion_common::{exec_err, not_impl_datafusion_err, DataFusionError, Result};
41+
use datafusion_common::{
42+
exec_err, internal_err, not_impl_datafusion_err, DataFusionError, Result,
43+
};
44+
use itertools::Itertools;
3945
use std::any::type_name;
4046
use std::sync::Arc;
4147

@@ -865,3 +871,65 @@ pub fn flatten(args: &[ArrayRef]) -> Result<ArrayRef> {
865871
}
866872
}
867873
}
874+
875+
/// array_distinct SQL function
876+
/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4]
877+
pub fn array_distinct(args: &[ArrayRef]) -> Result<ArrayRef> {
878+
if args.len() != 1 {
879+
return exec_err!("array_distinct needs one argument");
880+
}
881+
882+
// handle null
883+
if args[0].data_type() == &DataType::Null {
884+
return Ok(args[0].clone());
885+
}
886+
887+
// handle for list & largelist
888+
match args[0].data_type() {
889+
DataType::List(field) => {
890+
let array = as_list_array(&args[0])?;
891+
general_array_distinct(array, field)
892+
}
893+
DataType::LargeList(field) => {
894+
let array = as_large_list_array(&args[0])?;
895+
general_array_distinct(array, field)
896+
}
897+
array_type => exec_err!("array_distinct does not support type '{array_type:?}'"),
898+
}
899+
}
900+
901+
pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
902+
array: &GenericListArray<OffsetSize>,
903+
field: &FieldRef,
904+
) -> Result<ArrayRef> {
905+
let dt = array.value_type();
906+
let mut offsets = Vec::with_capacity(array.len());
907+
offsets.push(OffsetSize::usize_as(0));
908+
let mut new_arrays = Vec::with_capacity(array.len());
909+
let converter = RowConverter::new(vec![SortField::new(dt)])?;
910+
// distinct for each list in ListArray
911+
for arr in array.iter().flatten() {
912+
let values = converter.convert_columns(&[arr])?;
913+
// sort elements in list and remove duplicates
914+
let rows = values.iter().sorted().dedup().collect::<Vec<_>>();
915+
let last_offset: OffsetSize = offsets.last().copied().unwrap();
916+
offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
917+
let arrays = converter.convert_rows(rows)?;
918+
let array = match arrays.first() {
919+
Some(array) => array.clone(),
920+
None => {
921+
return internal_err!("array_distinct: failed to get array from rows")
922+
}
923+
};
924+
new_arrays.push(array);
925+
}
926+
let offsets = OffsetBuffer::new(offsets.into());
927+
let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
928+
let values = compute::concat(&new_arrays_ref)?;
929+
Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
930+
field.clone(),
931+
offsets,
932+
values,
933+
None,
934+
)?))
935+
}

datafusion/functions-array/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub mod expr_fn {
5151
pub use super::concat::array_prepend;
5252
pub use super::make_array::make_array;
5353
pub use super::udf::array_dims;
54+
pub use super::udf::array_distinct;
5455
pub use super::udf::array_empty;
5556
pub use super::udf::array_length;
5657
pub use super::udf::array_ndims;
@@ -84,6 +85,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
8485
udf::array_length_udf(),
8586
udf::flatten_udf(),
8687
udf::array_sort_udf(),
88+
udf::array_distinct_udf(),
8789
];
8890
functions.into_iter().try_for_each(|udf| {
8991
let existing_udf = registry.register_udf(udf)?;

datafusion/functions-array/src/udf.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,3 +709,67 @@ impl ScalarUDFImpl for Flatten {
709709
&self.aliases
710710
}
711711
}
712+
713+
// Declares the `array_distinct` UDF through `make_udf_function!`.
// The crate's lib.rs re-exports `udf::array_distinct` and registers
// `udf::array_distinct_udf()`; presumably both items come from this expansion
// (the macro definition is not visible here — confirm against its source).
make_udf_function!(
714+
ArrayDistinct,
715+
array_distinct,
716+
array,
717+
"return distinct values from the array after removing duplicates.",
718+
array_distinct_udf
719+
);
720+
721+
/// State for the `array_distinct` scalar UDF: its signature and its aliases.
#[derive(Debug)]
722+
pub(super) struct ArrayDistinct {
723+
// Handed out by `signature()`.
signature: Signature,
724+
// Handed out by `aliases()`.
aliases: Vec<String>,
725+
}
726+
727+
impl crate::udf::ArrayDistinct {
728+
pub fn new() -> Self {
729+
Self {
730+
signature: Signature::array(Volatility::Immutable),
731+
aliases: vec!["array_distinct".to_string(), "list_distinct".to_string()],
732+
}
733+
}
734+
}
735+
736+
impl ScalarUDFImpl for crate::udf::ArrayDistinct {
737+
fn as_any(&self) -> &dyn Any {
738+
self
739+
}
740+
fn name(&self) -> &str {
741+
"array_distinct"
742+
}
743+
744+
fn signature(&self) -> &Signature {
745+
&self.signature
746+
}
747+
748+
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
749+
use DataType::*;
750+
match &arg_types[0] {
751+
List(field) | FixedSizeList(field, _) => Ok(List(Arc::new(Field::new(
752+
"item",
753+
field.data_type().clone(),
754+
true,
755+
)))),
756+
LargeList(field) => Ok(LargeList(Arc::new(Field::new(
757+
"item",
758+
field.data_type().clone(),
759+
true,
760+
)))),
761+
_ => exec_err!(
762+
"Not reachable, data_type should be List, LargeList or FixedSizeList"
763+
),
764+
}
765+
}
766+
767+
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
768+
let args = ColumnarValue::values_to_arrays(args)?;
769+
crate::kernels::array_distinct(&args).map(ColumnarValue::Array)
770+
}
771+
772+
fn aliases(&self) -> &[String] {
773+
&self.aliases
774+
}
775+
}

datafusion/physical-expr/src/array_expressions.rs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,32 +1539,6 @@ pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
15391539
)?))
15401540
}
15411541

1542-
/// array_distinct SQL function
1543-
/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4]
1544-
pub fn array_distinct(args: &[ArrayRef]) -> Result<ArrayRef> {
1545-
if args.len() != 1 {
1546-
return exec_err!("array_distinct needs one argument");
1547-
}
1548-
1549-
// handle null
1550-
if args[0].data_type() == &DataType::Null {
1551-
return Ok(args[0].clone());
1552-
}
1553-
1554-
// handle for list & largelist
1555-
match args[0].data_type() {
1556-
DataType::List(field) => {
1557-
let array = as_list_array(&args[0])?;
1558-
general_array_distinct(array, field)
1559-
}
1560-
DataType::LargeList(field) => {
1561-
let array = as_large_list_array(&args[0])?;
1562-
general_array_distinct(array, field)
1563-
}
1564-
array_type => exec_err!("array_distinct does not support type '{array_type:?}'"),
1565-
}
1566-
}
1567-
15681542
/// array_resize SQL function
15691543
pub fn array_resize(arg: &[ArrayRef]) -> Result<ArrayRef> {
15701544
if arg.len() < 2 || arg.len() > 3 {

datafusion/physical-expr/src/functions.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,9 +302,6 @@ pub fn create_physical_fun(
302302
}
303303

304304
// array functions
305-
BuiltinScalarFunction::ArrayDistinct => Arc::new(|args| {
306-
make_scalar_function_inner(array_expressions::array_distinct)(args)
307-
}),
308305
BuiltinScalarFunction::ArrayElement => Arc::new(|args| {
309306
make_scalar_function_inner(array_expressions::array_element)(args)
310307
}),

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,7 @@ enum ScalarFunction {
676676
SubstrIndex = 126;
677677
FindInSet = 127;
678678
/// 128 was ArraySort
679-
ArrayDistinct = 129;
679+
/// 129 was ArrayDistinct
680680
ArrayResize = 130;
681681
EndsWith = 131;
682682
/// 132 was InStr

datafusion/proto/src/generated/pbjson.rs

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 1 addition & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/logical_plan/from_proto.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ use datafusion_common::{
4747
use datafusion_expr::expr::Unnest;
4848
use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by};
4949
use datafusion_expr::{
50-
acosh, array_distinct, array_element, array_except, array_intersect, array_pop_back,
51-
array_pop_front, array_position, array_positions, array_remove, array_remove_all,
52-
array_remove_n, array_repeat, array_replace, array_replace_all, array_replace_n,
53-
array_resize, array_slice, array_union, ascii, asinh, atan, atan2, atanh, bit_length,
54-
btrim, cbrt, ceil, character_length, chr, coalesce, concat_expr, concat_ws_expr, cos,
55-
cosh, cot, current_date, current_time, degrees, digest, ends_with, exp,
50+
acosh, array_element, array_except, array_intersect, array_pop_back, array_pop_front,
51+
array_position, array_positions, array_remove, array_remove_all, array_remove_n,
52+
array_repeat, array_replace, array_replace_all, array_replace_n, array_resize,
53+
array_slice, array_union, ascii, asinh, atan, atan2, atanh, bit_length, btrim, cbrt,
54+
ceil, character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, cot,
55+
current_date, current_time, degrees, digest, ends_with, exp,
5656
expr::{self, InList, Sort, WindowFunction},
5757
factorial, find_in_set, floor, from_unixtime, gcd, initcap, iszero, lcm, left,
5858
levenshtein, ln, log, log10, log2,
@@ -475,7 +475,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
475475
ScalarFunction::Ltrim => Self::Ltrim,
476476
ScalarFunction::Rtrim => Self::Rtrim,
477477
ScalarFunction::ArrayExcept => Self::ArrayExcept,
478-
ScalarFunction::ArrayDistinct => Self::ArrayDistinct,
479478
ScalarFunction::ArrayElement => Self::ArrayElement,
480479
ScalarFunction::ArrayPopFront => Self::ArrayPopFront,
481480
ScalarFunction::ArrayPopBack => Self::ArrayPopBack,
@@ -1463,9 +1462,6 @@ pub fn parse_expr(
14631462
parse_expr(&args[2], registry, codec)?,
14641463
parse_expr(&args[3], registry, codec)?,
14651464
)),
1466-
ScalarFunction::ArrayDistinct => {
1467-
Ok(array_distinct(parse_expr(&args[0], registry, codec)?))
1468-
}
14691465
ScalarFunction::ArrayElement => Ok(array_element(
14701466
parse_expr(&args[0], registry, codec)?,
14711467
parse_expr(&args[1], registry, codec)?,

datafusion/proto/src/logical_plan/to_proto.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1457,7 +1457,6 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
14571457
BuiltinScalarFunction::Rtrim => Self::Rtrim,
14581458
BuiltinScalarFunction::ToChar => Self::ToChar,
14591459
BuiltinScalarFunction::ArrayExcept => Self::ArrayExcept,
1460-
BuiltinScalarFunction::ArrayDistinct => Self::ArrayDistinct,
14611460
BuiltinScalarFunction::ArrayElement => Self::ArrayElement,
14621461
BuiltinScalarFunction::ArrayPopFront => Self::ArrayPopFront,
14631462
BuiltinScalarFunction::ArrayPopBack => Self::ArrayPopBack,

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,7 @@ async fn roundtrip_expr_api() -> Result<()> {
613613
lit("desc"),
614614
lit("NULLS LAST"),
615615
),
616+
array_distinct(make_array(vec![lit(1), lit(3), lit(3), lit(2), lit(2)])),
616617
];
617618

618619
// ensure expressions created with the expr api can be round tripped

0 commit comments

Comments
 (0)