From f0c12b02853afb970ad8d34f71f1ec0752482b46 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 13 Feb 2024 12:12:10 -0500 Subject: [PATCH 1/7] Consolidate array_to_string --- datafusion/functions-array/src/lib.rs | 16 +++- datafusion/functions-array/src/macros.rs | 4 +- .../src/{kernels.rs => to_string.rs} | 59 ++++++++++++- datafusion/functions-array/src/udf.rs | 85 ------------------- 4 files changed, 71 insertions(+), 93 deletions(-) rename datafusion/functions-array/src/{kernels.rs => to_string.rs} (84%) delete mode 100644 datafusion/functions-array/src/udf.rs diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs index 84997ed10e32..bca440b035c1 100644 --- a/datafusion/functions-array/src/lib.rs +++ b/datafusion/functions-array/src/lib.rs @@ -28,23 +28,31 @@ #[macro_use] pub mod macros; -mod kernels; -mod udf; +mod to_string; use datafusion_common::Result; use datafusion_execution::FunctionRegistry; use datafusion_expr::ScalarUDF; use log::debug; use std::sync::Arc; +use to_string::ArrayToString; + +// Create static instances of ScalarUDFs for each function +make_udf_function!(ArrayToString, + array_to_string, + array delimiter, // arg name + "converts each element to its text representation.", // doc + ®array_to_string_udf // internal function name +); /// Fluent-style API for creating `Expr`s pub mod expr_fn { - pub use super::udf::array_to_string; + pub use super::array_to_string; } /// Registers all enabled packages with a [`FunctionRegistry`] pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> { - let functions: Vec> = vec![udf::array_to_string_udf()]; + let functions: Vec> = vec![array_to_string_udf()]; functions.into_iter().try_for_each(|udf| { let existing_udf = registry.register_udf(udf)?; if let Some(existing_udf) = existing_udf { diff --git a/datafusion/functions-array/src/macros.rs b/datafusion/functions-array/src/macros.rs index c503fde05b18..aca307fcee49 100644 --- a/datafusion/functions-array/src/macros.rs +++ b/datafusion/functions-array/src/macros.rs @@ -49,8 +49,8 @@ macro_rules! make_udf_function { paste::paste! { // "fluent expr_fn" style function #[doc = $DOC] - pub fn $EXPR_FN($($arg: Expr),*) -> Expr { - Expr::ScalarFunction(ScalarFunction::new_udf( + pub fn $EXPR_FN($($arg: datafusion_expr::Expr),*) -> datafusion_expr::Expr { + datafusion_expr::Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction::new_udf( $SCALAR_UDF_FN(), vec![$($arg),*], )) diff --git a/datafusion/functions-array/src/kernels.rs b/datafusion/functions-array/src/to_string.rs similarity index 84% rename from datafusion/functions-array/src/kernels.rs rename to datafusion/functions-array/src/to_string.rs index 1b96e01d8b9a..8c51f5c2d3d7 100644 --- a/datafusion/functions-array/src/kernels.rs +++ b/datafusion/functions-array/src/to_string.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! implementation kernels for array functions +//! implementation of array_to_string function use arrow::array::{ Array, ArrayRef, BooleanArray, Float32Array, Float64Array, GenericListArray, @@ -24,10 +24,65 @@ use arrow::array::{ }; use arrow::datatypes::DataType; use datafusion_common::cast::{as_large_list_array, as_list_array, as_string_array}; -use datafusion_common::{exec_err, DataFusionError}; +use datafusion_common::{exec_err, plan_err, DataFusionError}; use std::any::type_name; use std::sync::Arc; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use std::any::Any; + +#[derive(Debug)] +pub(super) struct ArrayToString { + signature: Signature, + aliases: Vec, +} + +impl ArrayToString { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + aliases: vec![ + String::from("array_to_string"), + String::from("list_to_string"), + String::from("array_join"), + String::from("list_join"), + ], + } + } +} + +impl ScalarUDFImpl for ArrayToString { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "array_to_string" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { + use DataType::*; + Ok(match arg_types[0] { + List(_) | LargeList(_) | FixedSizeList(_, _) => Utf8, + _ => { + return plan_err!("The array_to_string function can only accept List/LargeList/FixedSizeList."); + } + }) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(args)?; + crate::to_string::array_to_string(&args).map(ColumnarValue::Array) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + macro_rules! downcast_arg { ($ARG:expr, $ARRAY_TYPE:ident) => {{ $ARG.as_any().downcast_ref::<$ARRAY_TYPE>().ok_or_else(|| { diff --git a/datafusion/functions-array/src/udf.rs b/datafusion/functions-array/src/udf.rs deleted file mode 100644 index b7f9d2497fb7..000000000000 --- a/datafusion/functions-array/src/udf.rs +++ /dev/null @@ -1,85 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [`ScalarUDFImpl`] definitions for array functions. - -use arrow::datatypes::DataType; -use datafusion_common::{plan_err, DataFusionError}; -use datafusion_expr::expr::ScalarFunction; -use datafusion_expr::Expr; -use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; -use std::any::Any; - -// Create static instances of ScalarUDFs for each function -make_udf_function!(ArrayToString, - array_to_string, - array delimiter, // arg name - "converts each element to its text representation.", // doc - array_to_string_udf // internal function name -); - -#[derive(Debug)] -pub(super) struct ArrayToString { - signature: Signature, - aliases: Vec, -} - -impl ArrayToString { - pub fn new() -> Self { - Self { - signature: Signature::variadic_any(Volatility::Immutable), - aliases: vec![ - String::from("array_to_string"), - String::from("list_to_string"), - String::from("array_join"), - String::from("list_join"), - ], - } - } -} - -impl ScalarUDFImpl for ArrayToString { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { - "array_to_string" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { - use DataType::*; - Ok(match arg_types[0] { - List(_) | LargeList(_) | FixedSizeList(_, _) => Utf8, - _ => { - return plan_err!("The array_to_string function can only accept List/LargeList/FixedSizeList."); - } - }) - } - - fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { - let args = ColumnarValue::values_to_arrays(args)?; - crate::kernels::array_to_string(&args).map(ColumnarValue::Array) - } - - fn aliases(&self) -> &[String] { - &self.aliases - } -} From df0f2aefca3ae664ad3847e140200a325761d066 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 13 Feb 2024 14:31:25 -0500 Subject: [PATCH 2/7] make_array --- datafusion/functions-array/src/lib.rs | 15 +- datafusion/functions-array/src/make_array.rs | 205 +++++++++++ datafusion/functions-array/src/set_ops.rs | 90 +++++ datafusion/functions-array/src/to_string.rs | 347 +++++++++--------- .../physical-expr/src/array_expressions.rs | 114 ------ 5 files changed, 475 insertions(+), 296 deletions(-) create mode 100644 datafusion/functions-array/src/make_array.rs create mode 100644 datafusion/functions-array/src/set_ops.rs diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs index bca440b035c1..38157931bf9f 100644 --- a/datafusion/functions-array/src/lib.rs +++ b/datafusion/functions-array/src/lib.rs @@ -28,31 +28,24 @@ #[macro_use] pub mod macros; +mod make_array; mod to_string; +mod set_ops; use datafusion_common::Result; use datafusion_execution::FunctionRegistry; use datafusion_expr::ScalarUDF; use log::debug; use std::sync::Arc; -use to_string::ArrayToString; - -// Create static instances of ScalarUDFs for each function -make_udf_function!(ArrayToString, - array_to_string, - array delimiter, // arg name - "converts each element to its text representation.", // doc - ®array_to_string_udf // internal function name -); /// Fluent-style API for creating `Expr`s pub mod expr_fn { - pub use super::array_to_string; + pub use super::{make_array::make_array, to_string::array_to_string}; } /// Registers all enabled packages with a [`FunctionRegistry`] pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> { - let functions: Vec> = vec![array_to_string_udf()]; + let functions: Vec> = vec![to_string::udf(), make_array::udf()]; functions.into_iter().try_for_each(|udf| { let existing_udf = registry.register_udf(udf)?; if let Some(existing_udf) = existing_udf { diff --git a/datafusion/functions-array/src/make_array.rs b/datafusion/functions-array/src/make_array.rs new file mode 100644 index 000000000000..6da1e738a7a1 --- /dev/null +++ b/datafusion/functions-array/src/make_array.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! implementation of make_array function + +use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Utf8}; +use datafusion_common::{plan_err, DataFusionError, Result}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use std::any::Any; +use std::sync::Arc; +use arrow::array::{Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData, new_null_array, NullArray, OffsetSizeTrait}; +use arrow::buffer::OffsetBuffer; +use datafusion_common::utils::array_into_list_array; + +// Create static instances of ScalarUDFs for each function +make_udf_function!( + MakeArray, make_array, xx, // arg name + "yyy", // The name of the function to create the ScalarUDF + udf +); + +#[derive(Debug)] +pub(super) struct MakeArray { + signature: Signature, + aliases: Vec, +} + +impl MakeArray { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + aliases: vec![ + String::from("array_to_string"), + String::from("list_to_string"), + String::from("array_join"), + String::from("list_join"), + ], + } + } +} + +impl ScalarUDFImpl for MakeArray { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "array_to_string" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + use DataType::*; + Ok(match arg_types[0] { + List(_) | LargeList(_) | FixedSizeList(_, _) => Utf8, + _ => { + return plan_err!("The array_to_string function can only accept List/LargeList/FixedSizeList."); + } + }) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + Ok(make_array_inner(&ColumnarValue::values_to_arrays(args)?) + .map(ColumnarValue::Array)?) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + + +/// `make_array` SQL function +fn make_array_inner(arrays: &[ArrayRef]) -> Result { + let mut data_type = DataType::Null; + for arg in arrays { + let arg_data_type = arg.data_type(); + if !arg_data_type.equals_datatype(&DataType::Null) { + data_type = arg_data_type.clone(); + break; + } + } + + match data_type { + // Either an empty array or all nulls: + DataType::Null => { + let array = + new_null_array(&DataType::Null, arrays.iter().map(|a| a.len()).sum()); + Ok(Arc::new(array_into_list_array(array))) + } + DataType::LargeList(..) => array_array::(arrays, data_type), + _ => array_array::(arrays, data_type), + } +} + + + +/// Convert one or more [`ArrayRef`] of the same type into a +/// `ListArray` or 'LargeListArray' depending on the offset size. +/// +/// # Example (non nested) +/// +/// Calling `array(col1, col2)` where col1 and col2 are non nested +/// would return a single new `ListArray`, where each row was a list +/// of 2 elements: +/// +/// ```text +/// ┌─────────┐ ┌─────────┐ ┌──────────────┐ +/// │ ┌─────┐ │ │ ┌─────┐ │ │ ┌──────────┐ │ +/// │ │ A │ │ │ │ X │ │ │ │ [A, X] │ │ +/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │ +/// │ │NULL │ │ │ │ Y │ │──────────▶│ │[NULL, Y] │ │ +/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │ +/// │ │ C │ │ │ │ Z │ │ │ │ [C, Z] │ │ +/// │ └─────┘ │ │ └─────┘ │ │ └──────────┘ │ +/// └─────────┘ └─────────┘ └──────────────┘ +/// col1 col2 output +/// ``` +/// +/// # Example (nested) +/// +/// Calling `array(col1, col2)` where col1 and col2 are lists +/// would return a single new `ListArray`, where each row was a list +/// of the corresponding elements of col1 and col2. +/// +/// ``` text +/// ┌──────────────┐ ┌──────────────┐ ┌─────────────────────────────┐ +/// │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌────────────────────────┐ │ +/// │ │ [A, X] │ │ │ │ [] │ │ │ │ [[A, X], []] │ │ +/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────┤ │ +/// │ │[NULL, Y] │ │ │ │[Q, R, S] │ │───────▶│ │ [[NULL, Y], [Q, R, S]] │ │ +/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────│ │ +/// │ │ [C, Z] │ │ │ │ NULL │ │ │ │ [[C, Z], NULL] │ │ +/// │ └──────────┘ │ │ └──────────┘ │ │ └────────────────────────┘ │ +/// └──────────────┘ └──────────────┘ └─────────────────────────────┘ +/// col1 col2 output +/// ``` +fn array_array( + args: &[ArrayRef], + data_type: DataType, +) -> Result { + // do not accept 0 arguments. + if args.is_empty() { + return plan_err!("Array requires at least one argument"); + } + + let mut data = vec![]; + let mut total_len = 0; + for arg in args { + let arg_data = if arg.as_any().is::() { + ArrayData::new_empty(&data_type) + } else { + arg.to_data() + }; + total_len += arg_data.len(); + data.push(arg_data); + } + + let mut offsets: Vec = Vec::with_capacity(total_len); + offsets.push(O::usize_as(0)); + + let capacity = Capacities::Array(total_len); + let data_ref = data.iter().collect::>(); + let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity); + + let num_rows = args[0].len(); + for row_idx in 0..num_rows { + for (arr_idx, arg) in args.iter().enumerate() { + if !arg.as_any().is::() + && !arg.is_null(row_idx) + && arg.is_valid(row_idx) + { + mutable.extend(arr_idx, row_idx, row_idx + 1); + } else { + mutable.extend_nulls(1); + } + } + offsets.push(O::usize_as(mutable.len())); + } + let data = mutable.freeze(); + + Ok(Arc::new(GenericListArray::::try_new( + Arc::new(Field::new("item", data_type, true)), + OffsetBuffer::new(offsets.into()), + arrow::array::make_array(data), + None, + )?)) +} diff --git a/datafusion/functions-array/src/set_ops.rs b/datafusion/functions-array/src/set_ops.rs new file mode 100644 index 000000000000..ffa66c9b820d --- /dev/null +++ b/datafusion/functions-array/src/set_ops.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! implementation of make_array function + +use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Utf8}; +use datafusion_common::{plan_err, DataFusionError, Result}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use std::any::Any; +use std::sync::Arc; +use arrow::array::{Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData, new_null_array, NullArray, OffsetSizeTrait}; +use arrow::buffer::OffsetBuffer; +use datafusion_common::utils::array_into_list_array; + +// Create static instances of ScalarUDFs for each function +make_udf_function!( + MakeArray, make_array, xx, // arg name + "yyy", // The name of the function to create the ScalarUDF + udf +); + +#[derive(Debug)] +pub(super) struct MakeArray { + signature: Signature, + aliases: Vec, +} + +impl MakeArray { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + aliases: vec![ + String::from("array_to_string"), + String::from("list_to_string"), + String::from("array_join"), + String::from("list_join"), + ], + } + } +} + +impl ScalarUDFImpl for MakeArray { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "array_to_string" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + use DataType::*; + Ok(match arg_types[0] { + List(_) | LargeList(_) | FixedSizeList(_, _) => Utf8, + _ => { + return plan_err!("The array_to_string function can only accept List/LargeList/FixedSizeList."); + } + }) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + Ok(make_array_inner(&ColumnarValue::values_to_arrays(args)?) + .map(ColumnarValue::Array)?) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + + + diff --git a/datafusion/functions-array/src/to_string.rs b/datafusion/functions-array/src/to_string.rs index 8c51f5c2d3d7..75a7356fe3b8 100644 --- a/datafusion/functions-array/src/to_string.rs +++ b/datafusion/functions-array/src/to_string.rs @@ -31,57 +31,14 @@ use std::sync::Arc; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; use std::any::Any; -#[derive(Debug)] -pub(super) struct ArrayToString { - signature: Signature, - aliases: Vec, -} - -impl ArrayToString { - pub fn new() -> Self { - Self { - signature: Signature::variadic_any(Volatility::Immutable), - aliases: vec![ - String::from("array_to_string"), - String::from("list_to_string"), - String::from("array_join"), - String::from("list_join"), - ], - } - } -} - -impl ScalarUDFImpl for ArrayToString { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { - "array_to_string" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { - use DataType::*; - Ok(match arg_types[0] { - List(_) | LargeList(_) | FixedSizeList(_, _) => Utf8, - _ => { - return plan_err!("The array_to_string function can only accept List/LargeList/FixedSizeList."); - } - }) - } - - fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { - let args = ColumnarValue::values_to_arrays(args)?; - crate::to_string::array_to_string(&args).map(ColumnarValue::Array) - } - - fn aliases(&self) -> &[String] { - &self.aliases - } -} +// Create static instances of ScalarUDFs for each function +make_udf_function!(ArrayToString, + array_to_string, + array delimiter, // arg name + "converts each element to its text representation.", // doc + // name of the function to create (just) the `ScalarUDF` + udf +); macro_rules! downcast_arg { ($ARG:expr, $ARRAY_TYPE:ident) => {{ @@ -155,155 +112,203 @@ macro_rules! call_array_function { }}; } -/// Array_to_string SQL function -pub(super) fn array_to_string(args: &[ArrayRef]) -> datafusion_common::Result { - if args.len() < 2 || args.len() > 3 { - return exec_err!("array_to_string expects two or three arguments"); +#[derive(Debug)] +pub(super) struct ArrayToString { + signature: Signature, + aliases: Vec, +} + +impl ArrayToString { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + aliases: vec![ + String::from("array_to_string"), + String::from("list_to_string"), + String::from("array_join"), + String::from("list_join"), + ], + } } +} - let arr = &args[0]; +impl ScalarUDFImpl for ArrayToString { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "array_to_string" + } - let delimiters = as_string_array(&args[1])?; - let delimiters: Vec> = delimiters.iter().collect(); + fn signature(&self) -> &Signature { + &self.signature + } - let mut null_string = String::from(""); - let mut with_null_string = false; - if args.len() == 3 { - null_string = as_string_array(&args[2])?.value(0).to_string(); - with_null_string = true; + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { + use DataType::*; + Ok(match arg_types[0] { + List(_) | LargeList(_) | FixedSizeList(_, _) => Utf8, + _ => { + return plan_err!("The array_to_string function can only accept List/LargeList/FixedSizeList."); + } + }) } - fn compute_array_to_string( - arg: &mut String, - arr: ArrayRef, - delimiter: String, - null_string: String, - with_null_string: bool, - ) -> datafusion_common::Result<&mut String> { - match arr.data_type() { - DataType::List(..) => { - let list_array = as_list_array(&arr)?; - for i in 0..list_array.len() { - compute_array_to_string( - arg, - list_array.value(i), - delimiter.clone(), - null_string.clone(), - with_null_string, - )?; + fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { + let args = ColumnarValue::values_to_arrays(args)?; + + if args.len() < 2 || args.len() > 3 { + return exec_err!("array_to_string expects two or three arguments"); + } + + let arr = &args[0]; + + let delimiters = as_string_array(&args[1])?; + let delimiters: Vec> = delimiters.iter().collect(); + + let mut null_string = String::from(""); + let mut with_null_string = false; + if args.len() == 3 { + null_string = as_string_array(&args[2])?.value(0).to_string(); + with_null_string = true; + } + + fn compute_array_to_string( + arg: &mut String, + arr: ArrayRef, + delimiter: String, + null_string: String, + with_null_string: bool, + ) -> datafusion_common::Result<&mut String> { + match arr.data_type() { + DataType::List(..) => { + let list_array = as_list_array(&arr)?; + for i in 0..list_array.len() { + compute_array_to_string( + arg, + list_array.value(i), + delimiter.clone(), + null_string.clone(), + with_null_string, + )?; + } + + Ok(arg) } + DataType::LargeList(..) => { + let list_array = as_large_list_array(&arr)?; + for i in 0..list_array.len() { + compute_array_to_string( + arg, + list_array.value(i), + delimiter.clone(), + null_string.clone(), + with_null_string, + )?; + } - Ok(arg) + Ok(arg) + } + DataType::Null => Ok(arg), + data_type => { + macro_rules! array_function { + ($ARRAY_TYPE:ident) => { + to_string!( + arg, + arr, + &delimiter, + &null_string, + with_null_string, + $ARRAY_TYPE + ) + }; + } + call_array_function!(data_type, false) + } } - DataType::LargeList(..) => { - let list_array = as_large_list_array(&arr)?; - for i in 0..list_array.len() { - compute_array_to_string( - arg, - list_array.value(i), - delimiter.clone(), + } + + fn generate_string_array( + list_arr: &GenericListArray, + delimiters: Vec>, + null_string: String, + with_null_string: bool, + ) -> datafusion_common::Result { + let mut res: Vec> = Vec::new(); + for (arr, &delimiter) in list_arr.iter().zip(delimiters.iter()) { + if let (Some(arr), Some(delimiter)) = (arr, delimiter) { + let mut arg = String::from(""); + let s = compute_array_to_string( + &mut arg, + arr, + delimiter.to_string(), null_string.clone(), with_null_string, - )?; - } + )? + .clone(); - Ok(arg) - } - DataType::Null => Ok(arg), - data_type => { - macro_rules! array_function { - ($ARRAY_TYPE:ident) => { - to_string!( - arg, - arr, - &delimiter, - &null_string, - with_null_string, - $ARRAY_TYPE - ) - }; + if let Some(s) = s.strip_suffix(delimiter) { + res.push(Some(s.to_string())); + } else { + res.push(Some(s)); + } + } else { + res.push(None); } - call_array_function!(data_type, false) } + + Ok(StringArray::from(res)) } - } - fn generate_string_array( - list_arr: &GenericListArray, - delimiters: Vec>, - null_string: String, - with_null_string: bool, - ) -> datafusion_common::Result { - let mut res: Vec> = Vec::new(); - for (arr, &delimiter) in list_arr.iter().zip(delimiters.iter()) { - if let (Some(arr), Some(delimiter)) = (arr, delimiter) { + let arr_type = arr.data_type(); + let string_arr = match arr_type { + DataType::List(_) | DataType::FixedSizeList(_, _) => { + let list_array = as_list_array(&arr)?; + generate_string_array::( + list_array, + delimiters, + null_string, + with_null_string, + )? + } + DataType::LargeList(_) => { + let list_array = as_large_list_array(&arr)?; + generate_string_array::( + list_array, + delimiters, + null_string, + with_null_string, + )? + } + _ => { let mut arg = String::from(""); + let mut res: Vec> = Vec::new(); + // delimiter length is 1 + assert_eq!(delimiters.len(), 1); + let delimiter = delimiters[0].unwrap(); let s = compute_array_to_string( &mut arg, - arr, + arr.clone(), delimiter.to_string(), - null_string.clone(), + null_string, with_null_string, )? .clone(); - if let Some(s) = s.strip_suffix(delimiter) { - res.push(Some(s.to_string())); + if !s.is_empty() { + let s = s.strip_suffix(delimiter).unwrap().to_string(); + res.push(Some(s)); } else { res.push(Some(s)); } - } else { - res.push(None); + StringArray::from(res) } - } + }; - Ok(StringArray::from(res)) + Ok(ColumnarValue::Array(Arc::new(string_arr))) } - let arr_type = arr.data_type(); - let string_arr = match arr_type { - DataType::List(_) | DataType::FixedSizeList(_, _) => { - let list_array = as_list_array(&arr)?; - generate_string_array::( - list_array, - delimiters, - null_string, - with_null_string, - )? - } - DataType::LargeList(_) => { - let list_array = as_large_list_array(&arr)?; - generate_string_array::( - list_array, - delimiters, - null_string, - with_null_string, - )? - } - _ => { - let mut arg = String::from(""); - let mut res: Vec> = Vec::new(); - // delimiter length is 1 - assert_eq!(delimiters.len(), 1); - let delimiter = delimiters[0].unwrap(); - let s = compute_array_to_string( - &mut arg, - arr.clone(), - delimiter.to_string(), - null_string, - with_null_string, - )? - .clone(); - - if !s.is_empty() { - let s = s.strip_suffix(delimiter).unwrap().to_string(); - res.push(Some(s)); - } else { - res.push(Some(s)); - } - StringArray::from(res) - } - }; - - Ok(Arc::new(string_arr)) + fn aliases(&self) -> &[String] { + &self.aliases + } } diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 0468b3e54294..71657eb29cb1 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -229,120 +229,6 @@ fn check_datatypes(name: &str, args: &[&ArrayRef]) -> Result<()> { Ok(()) } -/// Convert one or more [`ArrayRef`] of the same type into a -/// `ListArray` or 'LargeListArray' depending on the offset size. -/// -/// # Example (non nested) -/// -/// Calling `array(col1, col2)` where col1 and col2 are non nested -/// would return a single new `ListArray`, where each row was a list -/// of 2 elements: -/// -/// ```text -/// ┌─────────┐ ┌─────────┐ ┌──────────────┐ -/// │ ┌─────┐ │ │ ┌─────┐ │ │ ┌──────────┐ │ -/// │ │ A │ │ │ │ X │ │ │ │ [A, X] │ │ -/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │ -/// │ │NULL │ │ │ │ Y │ │──────────▶│ │[NULL, Y] │ │ -/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │ -/// │ │ C │ │ │ │ Z │ │ │ │ [C, Z] │ │ -/// │ └─────┘ │ │ └─────┘ │ │ └──────────┘ │ -/// └─────────┘ └─────────┘ └──────────────┘ -/// col1 col2 output -/// ``` -/// -/// # Example (nested) -/// -/// Calling `array(col1, col2)` where col1 and col2 are lists -/// would return a single new `ListArray`, where each row was a list -/// of the corresponding elements of col1 and col2. -/// -/// ``` text -/// ┌──────────────┐ ┌──────────────┐ ┌─────────────────────────────┐ -/// │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌────────────────────────┐ │ -/// │ │ [A, X] │ │ │ │ [] │ │ │ │ [[A, X], []] │ │ -/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────┤ │ -/// │ │[NULL, Y] │ │ │ │[Q, R, S] │ │───────▶│ │ [[NULL, Y], [Q, R, S]] │ │ -/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────│ │ -/// │ │ [C, Z] │ │ │ │ NULL │ │ │ │ [[C, Z], NULL] │ │ -/// │ └──────────┘ │ │ └──────────┘ │ │ └────────────────────────┘ │ -/// └──────────────┘ └──────────────┘ └─────────────────────────────┘ -/// col1 col2 output -/// ``` -fn array_array( - args: &[ArrayRef], - data_type: DataType, -) -> Result { - // do not accept 0 arguments. - if args.is_empty() { - return plan_err!("Array requires at least one argument"); - } - - let mut data = vec![]; - let mut total_len = 0; - for arg in args { - let arg_data = if arg.as_any().is::() { - ArrayData::new_empty(&data_type) - } else { - arg.to_data() - }; - total_len += arg_data.len(); - data.push(arg_data); - } - - let mut offsets: Vec = Vec::with_capacity(total_len); - offsets.push(O::usize_as(0)); - - let capacity = Capacities::Array(total_len); - let data_ref = data.iter().collect::>(); - let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity); - - let num_rows = args[0].len(); - for row_idx in 0..num_rows { - for (arr_idx, arg) in args.iter().enumerate() { - if !arg.as_any().is::() - && !arg.is_null(row_idx) - && arg.is_valid(row_idx) - { - mutable.extend(arr_idx, row_idx, row_idx + 1); - } else { - mutable.extend_nulls(1); - } - } - offsets.push(O::usize_as(mutable.len())); - } - let data = mutable.freeze(); - - Ok(Arc::new(GenericListArray::::try_new( - Arc::new(Field::new("item", data_type, true)), - OffsetBuffer::new(offsets.into()), - arrow_array::make_array(data), - None, - )?)) -} - -/// `make_array` SQL function -pub fn make_array(arrays: &[ArrayRef]) -> Result { - let mut data_type = DataType::Null; - for arg in arrays { - let arg_data_type = arg.data_type(); - if !arg_data_type.equals_datatype(&DataType::Null) { - data_type = arg_data_type.clone(); - break; - } - } - - match data_type { - // Either an empty array or all nulls: - DataType::Null => { - let array = - new_null_array(&DataType::Null, arrays.iter().map(|a| a.len()).sum()); - Ok(Arc::new(array_into_list_array(array))) - } - DataType::LargeList(..) => array_array::(arrays, data_type), - _ => array_array::(arrays, data_type), - } -} fn general_array_element( array: &GenericListArray, From ae74afb1e51f2905f7e70556394b79329a0f9fd2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 13 Feb 2024 14:38:56 -0500 Subject: [PATCH 3/7] basic scaffolding for union/intersect --- datafusion/expr/src/built_in_function.rs | 22 ----- datafusion/expr/src/expr_fn.rs | 5 -- datafusion/functions-array/src/lib.rs | 15 +++- datafusion/functions-array/src/make_array.rs | 50 ++++++----- datafusion/functions-array/src/set_ops.rs | 85 +++++++++++++++---- .../physical-expr/src/array_expressions.rs | 1 - 6 files changed, 110 insertions(+), 68 deletions(-) diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index e4a9471f07a8..a05fad8f4c70 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -185,8 +185,6 @@ pub enum BuiltinScalarFunction { Cardinality, /// array_resize ArrayResize, - /// construct an array from columns - MakeArray, /// Flatten Flatten, /// Range @@ -439,7 +437,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayResize => Volatility::Immutable, BuiltinScalarFunction::Range => Volatility::Immutable, BuiltinScalarFunction::Cardinality => Volatility::Immutable, - BuiltinScalarFunction::MakeArray => Volatility::Immutable, BuiltinScalarFunction::Ascii => Volatility::Immutable, BuiltinScalarFunction::BitLength => Volatility::Immutable, BuiltinScalarFunction::Btrim => Volatility::Immutable, @@ -661,20 +658,6 @@ impl BuiltinScalarFunction { } } BuiltinScalarFunction::Cardinality => Ok(UInt64), - BuiltinScalarFunction::MakeArray => match input_expr_types.len() { - 0 => Ok(List(Arc::new(Field::new("item", Null, true)))), - _ => { - let mut expr_type = Null; - for input_expr_type in input_expr_types { - if !input_expr_type.equals_datatype(&Null) { - expr_type = input_expr_type.clone(); - break; - } - } - - Ok(List(Arc::new(Field::new("item", expr_type, true)))) - } - }, BuiltinScalarFunction::Ascii => Ok(Int32), BuiltinScalarFunction::BitLength => { utf8_to_int_type(&input_expr_types[0], "bit_length") @@ -937,10 +920,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayAppend => { Signature::array_and_element(self.volatility()) } - BuiltinScalarFunction::MakeArray => { - // 0 or more arguments of arbitrary type - Signature::one_of(vec![VariadicEqual, Any(0)], self.volatility()) - } BuiltinScalarFunction::ArrayPopFront => Signature::any(1, self.volatility()), BuiltinScalarFunction::ArrayPopBack => Signature::any(1, self.volatility()), BuiltinScalarFunction::ArrayConcat => { @@ -1641,7 +1620,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"], BuiltinScalarFunction::Cardinality => &["cardinality"], BuiltinScalarFunction::ArrayResize => &["array_resize", "list_resize"], - BuiltinScalarFunction::MakeArray => &["make_array", "make_list"], BuiltinScalarFunction::ArrayIntersect => { &["array_intersect", "list_intersect"] } diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index ad0eae898534..0414d024ef9b 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -756,11 +756,6 @@ scalar_expr!( "returns an array with the specified size filled with the given value." ); -nary_scalar_expr!( - MakeArray, - array, - "returns an Arrow array using the specified input expressions." -); scalar_expr!( ArrayIntersect, array_intersect, diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs index 38157931bf9f..ec56ec1d91c8 100644 --- a/datafusion/functions-array/src/lib.rs +++ b/datafusion/functions-array/src/lib.rs @@ -29,8 +29,8 @@ pub mod macros; mod make_array; -mod to_string; mod set_ops; +mod to_string; use datafusion_common::Result; use datafusion_execution::FunctionRegistry; @@ -40,12 +40,21 @@ use std::sync::Arc; /// Fluent-style API for creating `Expr`s pub mod expr_fn { - pub use super::{make_array::make_array, to_string::array_to_string}; + pub use super::{ + make_array::make_array, + set_ops::{array_intersect, array_union}, + to_string::array_to_string, + }; } /// Registers all enabled packages with a [`FunctionRegistry`] pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> { - let functions: Vec> = vec![to_string::udf(), make_array::udf()]; + let functions: Vec> = vec![ + to_string::udf(), + make_array::udf(), + set_ops::union_udf(), + set_ops::intersect_udf(), + ]; functions.into_iter().try_for_each(|udf| { let existing_udf = registry.register_udf(udf)?; if let Some(existing_udf) = existing_udf { diff --git a/datafusion/functions-array/src/make_array.rs b/datafusion/functions-array/src/make_array.rs index 6da1e738a7a1..f9fcd343fdf4 100644 --- a/datafusion/functions-array/src/make_array.rs +++ b/datafusion/functions-array/src/make_array.rs @@ -17,20 +17,28 @@ //! implementation of make_array function -use arrow::datatypes::{DataType, Field}; +use arrow::array::{ + new_null_array, Array, ArrayData, ArrayRef, Capacities, GenericListArray, + MutableArrayData, NullArray, OffsetSizeTrait, +}; +use arrow::buffer::OffsetBuffer; use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Utf8}; +use arrow::datatypes::{DataType, Field}; +use datafusion_common::utils::array_into_list_array; use datafusion_common::{plan_err, DataFusionError, Result}; -use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use datafusion_expr::{ + BuiltinScalarFunction, ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, + Volatility, +}; use std::any::Any; use std::sync::Arc; -use arrow::array::{Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData, new_null_array, NullArray, OffsetSizeTrait}; -use arrow::buffer::OffsetBuffer; -use datafusion_common::utils::array_into_list_array; // Create static instances of ScalarUDFs for each function make_udf_function!( - MakeArray, make_array, xx, // arg name - "yyy", // The name of the function to create the ScalarUDF + MakeArray, + make_array, + array, + "returns an Arrow array using the specified input expressions.", udf ); @@ -43,12 +51,11 @@ pub(super) struct MakeArray { impl MakeArray { pub fn new() -> Self { Self { - signature: Signature::variadic_any(Volatility::Immutable), + signature: // 0 or more arguments of arbitrary type + Signature::one_of(vec![TypeSignature::VariadicEqual, TypeSignature::Any(0)], + Volatility::Immutable), aliases: vec![ - String::from("array_to_string"), - String::from("list_to_string"), - String::from("array_join"), - String::from("list_join"), + "make_list".to_string(), ], } } @@ -68,12 +75,20 @@ impl ScalarUDFImpl for MakeArray { fn return_type(&self, arg_types: &[DataType]) -> Result { use DataType::*; - Ok(match arg_types[0] { - List(_) | LargeList(_) | FixedSizeList(_, _) => Utf8, + match arg_types.len() { + 0 => Ok(List(Arc::new(Field::new("item", Null, true)))), _ => { - return plan_err!("The array_to_string function can only accept List/LargeList/FixedSizeList."); + let mut expr_type = Null; + for input_expr_type in arg_types { + if !input_expr_type.equals_datatype(&Null) { + expr_type = input_expr_type.clone(); + break; + } + } + + Ok(List(Arc::new(Field::new("item", expr_type, true)))) } - }) + } } fn invoke(&self, args: &[ColumnarValue]) -> Result { @@ -86,7 +101,6 @@ impl ScalarUDFImpl for MakeArray { } } - /// `make_array` SQL function fn make_array_inner(arrays: &[ArrayRef]) -> Result { let mut data_type = DataType::Null; @@ -110,8 +124,6 @@ fn make_array_inner(arrays: &[ArrayRef]) -> Result { } } - - /// Convert one or more [`ArrayRef`] of the same type into a /// `ListArray` or 'LargeListArray' depending on the offset size. /// diff --git a/datafusion/functions-array/src/set_ops.rs b/datafusion/functions-array/src/set_ops.rs index ffa66c9b820d..f8a9d58689d5 100644 --- a/datafusion/functions-array/src/set_ops.rs +++ b/datafusion/functions-array/src/set_ops.rs @@ -17,49 +17,56 @@ //! implementation of make_array function -use arrow::datatypes::{DataType, Field}; +use arrow::array::{ + new_null_array, Array, ArrayData, ArrayRef, Capacities, GenericListArray, + MutableArrayData, NullArray, OffsetSizeTrait, +}; +use arrow::buffer::OffsetBuffer; use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Utf8}; +use arrow::datatypes::{DataType, Field}; +use datafusion_common::utils::array_into_list_array; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; use std::any::Any; use std::sync::Arc; -use arrow::array::{Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData, new_null_array, NullArray, OffsetSizeTrait}; -use arrow::buffer::OffsetBuffer; -use datafusion_common::utils::array_into_list_array; // Create static instances of ScalarUDFs for each function make_udf_function!( - MakeArray, make_array, xx, // arg name + ArrayUnion, + array_union, + xx, // arg name "yyy", // The name of the function to create the ScalarUDF - udf + union_udf +); +make_udf_function!( + ArrayIntersect, + array_intersect, + xx, // arg name + "yyy", // The name of the function to create the ScalarUDF + intersect_udf ); #[derive(Debug)] -pub(super) struct MakeArray { +pub(super) struct ArrayUnion { signature: Signature, aliases: Vec, } -impl MakeArray { +impl ArrayUnion { pub fn new() -> Self { Self { signature: Signature::variadic_any(Volatility::Immutable), - aliases: vec![ - String::from("array_to_string"), - String::from("list_to_string"), - String::from("array_join"), - String::from("list_join"), - ], + aliases: vec![], } } } -impl ScalarUDFImpl for MakeArray { +impl ScalarUDFImpl for ArrayUnion { fn as_any(&self) -> &dyn Any { self } fn name(&self) -> &str { - "array_to_string" + "array_union" } fn signature(&self) -> &Signature { @@ -77,8 +84,7 @@ impl ScalarUDFImpl for MakeArray { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - Ok(make_array_inner(&ColumnarValue::values_to_arrays(args)?) - .map(ColumnarValue::Array)?) + todo!() } fn aliases(&self) -> &[String] { @@ -86,5 +92,48 @@ impl ScalarUDFImpl for MakeArray { } } +#[derive(Debug)] +pub(super) struct ArrayIntersect { + signature: Signature, + aliases: Vec, +} + +impl ArrayIntersect { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + aliases: vec![], + } + } +} + +impl ScalarUDFImpl for ArrayIntersect { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "array_union" + } + + fn signature(&self) -> &Signature { + &self.signature + } + fn return_type(&self, arg_types: &[DataType]) -> Result { + use DataType::*; + Ok(match arg_types[0] { + List(_) | LargeList(_) | FixedSizeList(_, _) => Utf8, + _ => { + return plan_err!("The array_to_string function can only accept List/LargeList/FixedSizeList."); + } + }) + } + fn invoke(&self, args: &[ColumnarValue]) -> Result { + todo!() + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 71657eb29cb1..7032ffa8f223 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -229,7 +229,6 @@ fn check_datatypes(name: &str, args: &[&ArrayRef]) -> Result<()> { Ok(()) } - fn general_array_element( array: &GenericListArray, indexes: &Int64Array, From e93f996b8333892baf742d2d756bc4b915d65c02 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 13 Feb 2024 14:58:12 -0500 Subject: [PATCH 4/7] distinct --- datafusion/functions-array/Cargo.toml | 1 + datafusion/functions-array/src/lib.rs | 3 +- datafusion/functions-array/src/make_array.rs | 2 +- datafusion/functions-array/src/set_ops.rs | 300 +++++++++++++++++- .../physical-expr/src/array_expressions.rs | 228 ------------- 5 files changed, 296 insertions(+), 238 deletions(-) diff --git a/datafusion/functions-array/Cargo.toml b/datafusion/functions-array/Cargo.toml index 9cf769bf294e..a69cbc0bf1f6 100644 --- a/datafusion/functions-array/Cargo.toml +++ b/datafusion/functions-array/Cargo.toml @@ -43,3 +43,4 @@ datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } log = "0.4.20" paste = "1.0.14" +itertools = "0.12.1" diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs index ec56ec1d91c8..1ef8c180f722 100644 --- a/datafusion/functions-array/src/lib.rs +++ b/datafusion/functions-array/src/lib.rs @@ -42,7 +42,7 @@ use std::sync::Arc; pub mod expr_fn { pub use super::{ make_array::make_array, - set_ops::{array_intersect, array_union}, + set_ops::{array_intersect, array_union, array_distinct}, to_string::array_to_string, }; } @@ -54,6 +54,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> { make_array::udf(), set_ops::union_udf(), set_ops::intersect_udf(), + set_ops::distinct_udf(), ]; functions.into_iter().try_for_each(|udf| { let existing_udf = registry.register_udf(udf)?; diff --git a/datafusion/functions-array/src/make_array.rs b/datafusion/functions-array/src/make_array.rs index f9fcd343fdf4..f4d7fcf8520c 100644 --- a/datafusion/functions-array/src/make_array.rs +++ b/datafusion/functions-array/src/make_array.rs @@ -102,7 +102,7 @@ impl ScalarUDFImpl for MakeArray { } /// `make_array` SQL function -fn make_array_inner(arrays: &[ArrayRef]) -> Result { +pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result { let mut data_type = DataType::Null; for arg in arrays { let arg_data_type = arg.data_type(); diff --git a/datafusion/functions-array/src/set_ops.rs b/datafusion/functions-array/src/set_ops.rs index f8a9d58689d5..e06a8f1bcf9f 100644 --- a/datafusion/functions-array/src/set_ops.rs +++ b/datafusion/functions-array/src/set_ops.rs @@ -17,18 +17,22 @@ //! implementation of make_array function -use arrow::array::{ - new_null_array, Array, ArrayData, ArrayRef, Capacities, GenericListArray, - MutableArrayData, NullArray, OffsetSizeTrait, -}; +use arrow::array::{new_null_array, Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData, NullArray, OffsetSizeTrait, new_empty_array, make_array}; use arrow::buffer::OffsetBuffer; use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Utf8}; -use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::{DataType, Field, FieldRef}; +use itertools::Itertools; use datafusion_common::utils::array_into_list_array; -use datafusion_common::{plan_err, DataFusionError, Result}; +use datafusion_common::{plan_err, DataFusionError, Result, internal_err, exec_err}; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; use std::any::Any; +use std::collections::HashSet; +use std::fmt::{Display, Formatter}; use std::sync::Arc; +use arrow::compute; +use arrow::row::{RowConverter, SortField}; +use datafusion_common::cast::{as_large_list_array, as_list_array}; +use crate::make_array::make_array_inner; // Create static instances of ScalarUDFs for each function make_udf_function!( @@ -45,6 +49,13 @@ make_udf_function!( "yyy", // The name of the function to create the ScalarUDF intersect_udf ); +make_udf_function!( + ArrayDistinct, + array_distinct, + xx, // arg name + "yyy", // The name of the function to create the ScalarUDF + distinct_udf +); #[derive(Debug)] pub(super) struct ArrayUnion { @@ -84,7 +95,15 @@ impl ScalarUDFImpl for ArrayUnion { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - todo!() + if args.len() != 2 { + return exec_err!("array_union needs two arguments"); + } + let args = ColumnarValue::values_to_arrays(args)?; + let array1 = &args[0]; + let array2 = &args[1]; + + general_set_op(array1, array2, SetOp::Union) + .map(ColumnarValue::Array) } fn aliases(&self) -> &[String] { @@ -130,10 +149,275 @@ impl ScalarUDFImpl for ArrayIntersect { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - todo!() + if args.len() != 2 { + return exec_err!("array_intersect needs two arguments"); + } + let args = ColumnarValue::values_to_arrays(args)?; + + let array1 = &args[0]; + let array2 = &args[1]; + + general_set_op(array1, array2, SetOp::Intersect) + .map(ColumnarValue::Array) } fn aliases(&self) -> &[String] { &self.aliases } } + + + +#[derive(Debug)] +pub(super) struct ArrayDistinct { + signature: Signature, + aliases: Vec, +} + +impl ArrayDistinct { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + aliases: vec![], + } + } +} + +impl ScalarUDFImpl for ArrayDistinct { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "array_distinct" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + use DataType::*; + Ok(match arg_types[0] { + List(_) | LargeList(_) | FixedSizeList(_, _) => Utf8, + _ => { + return plan_err!("The array_to_string function can only accept List/LargeList/FixedSizeList."); + } + }) + } + + /// array_distinct SQL function + /// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4] + fn invoke(&self, args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return exec_err!("array_distinct needs one argument"); + } + // handle null + if args[0].data_type() == DataType::Null { + return Ok(args[0].clone()); + } + let args = ColumnarValue::values_to_arrays(args)?; + + + // handle for list & largelist + match args[0].data_type() { + DataType::List(field) => { + let array = as_list_array(&args[0])?; + general_array_distinct(array, field) + } + DataType::LargeList(field) => { + let array = as_large_list_array(&args[0])?; + general_array_distinct(array, field) + } + array_type => exec_err!("array_distinct does not support type '{array_type:?}'"), + }.map(ColumnarValue::Array) + + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + + + +#[derive(Debug, PartialEq)] +enum SetOp { + Union, + Intersect, +} + +impl Display for SetOp { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + SetOp::Union => write!(f, "array_union"), + SetOp::Intersect => write!(f, "array_intersect"), + } + } +} + +fn generic_set_lists( + l: &GenericListArray, + r: &GenericListArray, + field: Arc, + set_op: SetOp, +) -> Result { + if matches!(l.value_type(), DataType::Null) { + let field = Arc::new(Field::new("item", r.value_type(), true)); + return general_array_distinct::(r, &field); + } else if matches!(r.value_type(), DataType::Null) { + let field = Arc::new(Field::new("item", l.value_type(), true)); + return general_array_distinct::(l, &field); + } + + if l.value_type() != r.value_type() { + return internal_err!("{set_op:?} is not implemented for '{l:?}' and '{r:?}'"); + } + + let dt = l.value_type(); + + let mut offsets = vec![OffsetSize::usize_as(0)]; + let mut new_arrays = vec![]; + + let converter = RowConverter::new(vec![SortField::new(dt)])?; + for (first_arr, second_arr) in l.iter().zip(r.iter()) { + if let (Some(first_arr), Some(second_arr)) = (first_arr, second_arr) { + let l_values = converter.convert_columns(&[first_arr])?; + let r_values = converter.convert_columns(&[second_arr])?; + + let l_iter = l_values.iter().sorted().dedup(); + let values_set: HashSet<_> = l_iter.clone().collect(); + let mut rows = if set_op == SetOp::Union { + l_iter.collect::>() + } else { + vec![] + }; + for r_val in r_values.iter().sorted().dedup() { + match set_op { + SetOp::Union => { + if !values_set.contains(&r_val) { + rows.push(r_val); + } + } + SetOp::Intersect => { + if values_set.contains(&r_val) { + rows.push(r_val); + } + } + } + } + + let last_offset = match offsets.last().copied() { + Some(offset) => offset, + None => return internal_err!("offsets should not be empty"), + }; + offsets.push(last_offset + OffsetSize::usize_as(rows.len())); + let arrays = converter.convert_rows(rows)?; + let array = match arrays.first() { + Some(array) => array.clone(), + None => { + return internal_err!("{set_op}: failed to get array from rows"); + } + }; + new_arrays.push(array); + } + } + + let offsets = OffsetBuffer::new(offsets.into()); + let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); + let values = compute::concat(&new_arrays_ref)?; + let arr = GenericListArray::::try_new(field, offsets, values, None)?; + Ok(Arc::new(arr)) +} + +fn general_set_op( + array1: &ArrayRef, + array2: &ArrayRef, + set_op: SetOp, +) -> Result { + match (array1.data_type(), array2.data_type()) { + (DataType::Null, DataType::List(field)) => { + if set_op == SetOp::Intersect { + return Ok(new_empty_array(&DataType::Null)); + } + let array = as_list_array(&array2)?; + general_array_distinct::(array, field) + } + + (DataType::List(field), DataType::Null) => { + if set_op == SetOp::Intersect { + return make_array_inner(&[]); + } + let array = as_list_array(&array1)?; + general_array_distinct::(array, field) + } + (DataType::Null, DataType::LargeList(field)) => { + if set_op == SetOp::Intersect { + return Ok(new_empty_array(&DataType::Null)); + } + let array = as_large_list_array(&array2)?; + general_array_distinct::(array, field) + } + (DataType::LargeList(field), DataType::Null) => { + if set_op == SetOp::Intersect { + return make_array_inner(&[]); + } + let array = as_large_list_array(&array1)?; + general_array_distinct::(array, field) + } + (DataType::Null, DataType::Null) => Ok(new_empty_array(&DataType::Null)), + + (DataType::List(field), DataType::List(_)) => { + let array1 = as_list_array(&array1)?; + let array2 = as_list_array(&array2)?; + generic_set_lists::(array1, array2, field.clone(), set_op) + } + (DataType::LargeList(field), DataType::LargeList(_)) => { + let array1 = as_large_list_array(&array1)?; + let array2 = as_large_list_array(&array2)?; + generic_set_lists::(array1, array2, field.clone(), set_op) + } + (data_type1, data_type2) => { + internal_err!( + "{set_op} does not support types '{data_type1:?}' and '{data_type2:?}'" + ) + } + } +} + + + +fn general_array_distinct( + array: &GenericListArray, + field: &FieldRef, +) -> Result { + let dt = array.value_type(); + let mut offsets = Vec::with_capacity(array.len()); + offsets.push(OffsetSize::usize_as(0)); + let mut new_arrays = Vec::with_capacity(array.len()); + let converter = RowConverter::new(vec![SortField::new(dt)])?; + // distinct for each list in ListArray + for arr in array.iter().flatten() { + let values = converter.convert_columns(&[arr])?; + // sort elements in list and remove duplicates + let rows = values.iter().sorted().dedup().collect::>(); + let last_offset: OffsetSize = offsets.last().copied().unwrap(); + offsets.push(last_offset + OffsetSize::usize_as(rows.len())); + let arrays = converter.convert_rows(rows)?; + let array = match arrays.first() { + Some(array) => array.clone(), + None => { + return internal_err!("array_distinct: failed to get array from rows") + } + }; + new_arrays.push(array); + } + let offsets = OffsetBuffer::new(offsets.into()); + let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); + let values = compute::concat(&new_arrays_ref)?; + Ok(Arc::new(GenericListArray::::try_new( + field.clone(), + offsets, + values, + None, + )?)) +} diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 7032ffa8f223..a81c71171ff3 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -1715,173 +1715,6 @@ pub fn array_replace_all(args: &[ArrayRef]) -> Result { } } -#[derive(Debug, PartialEq)] -enum SetOp { - Union, - Intersect, -} - -impl Display for SetOp { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - SetOp::Union => write!(f, "array_union"), - SetOp::Intersect => write!(f, "array_intersect"), - } - } -} - -fn generic_set_lists( - l: &GenericListArray, - r: &GenericListArray, - field: Arc, - set_op: SetOp, -) -> Result { - if matches!(l.value_type(), DataType::Null) { - let field = Arc::new(Field::new("item", r.value_type(), true)); - return general_array_distinct::(r, &field); - } else if matches!(r.value_type(), DataType::Null) { - let field = Arc::new(Field::new("item", l.value_type(), true)); - return general_array_distinct::(l, &field); - } - - if l.value_type() != r.value_type() { - return internal_err!("{set_op:?} is not implemented for '{l:?}' and '{r:?}'"); - } - - let dt = l.value_type(); - - let mut offsets = vec![OffsetSize::usize_as(0)]; - let mut new_arrays = vec![]; - - let converter = RowConverter::new(vec![SortField::new(dt)])?; - for (first_arr, second_arr) in l.iter().zip(r.iter()) { - if let (Some(first_arr), Some(second_arr)) = (first_arr, second_arr) { - let l_values = converter.convert_columns(&[first_arr])?; - let r_values = converter.convert_columns(&[second_arr])?; - - let l_iter = l_values.iter().sorted().dedup(); - let values_set: HashSet<_> = l_iter.clone().collect(); - let mut rows = if set_op == SetOp::Union { - l_iter.collect::>() - } else { - vec![] - }; - for r_val in r_values.iter().sorted().dedup() { - match set_op { - SetOp::Union => { - if !values_set.contains(&r_val) { - rows.push(r_val); - } - } - SetOp::Intersect => { - if values_set.contains(&r_val) { - rows.push(r_val); - } - } - } - } - - let last_offset = match offsets.last().copied() { - Some(offset) => offset, - None => return internal_err!("offsets should not be empty"), - }; - offsets.push(last_offset + OffsetSize::usize_as(rows.len())); - let arrays = converter.convert_rows(rows)?; - let array = match arrays.first() { - Some(array) => array.clone(), - None => { - return internal_err!("{set_op}: failed to get array from rows"); - } - }; - new_arrays.push(array); - } - } - - let offsets = OffsetBuffer::new(offsets.into()); - let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); - let values = compute::concat(&new_arrays_ref)?; - let arr = GenericListArray::::try_new(field, offsets, values, None)?; - Ok(Arc::new(arr)) -} - -fn general_set_op( - array1: &ArrayRef, - array2: &ArrayRef, - set_op: SetOp, -) -> Result { - match (array1.data_type(), array2.data_type()) { - (DataType::Null, DataType::List(field)) => { - if set_op == SetOp::Intersect { - return Ok(new_empty_array(&DataType::Null)); - } - let array = as_list_array(&array2)?; - general_array_distinct::(array, field) - } - - (DataType::List(field), DataType::Null) => { - if set_op == SetOp::Intersect { - return make_array(&[]); - } - let array = as_list_array(&array1)?; - general_array_distinct::(array, field) - } - (DataType::Null, DataType::LargeList(field)) => { - if set_op == SetOp::Intersect { - return Ok(new_empty_array(&DataType::Null)); - } - let array = as_large_list_array(&array2)?; - general_array_distinct::(array, field) - } - (DataType::LargeList(field), DataType::Null) => { - if set_op == SetOp::Intersect { - return make_array(&[]); - } - let array = as_large_list_array(&array1)?; - general_array_distinct::(array, field) - } - (DataType::Null, DataType::Null) => Ok(new_empty_array(&DataType::Null)), - - (DataType::List(field), DataType::List(_)) => { - let array1 = as_list_array(&array1)?; - let array2 = as_list_array(&array2)?; - generic_set_lists::(array1, array2, field.clone(), set_op) - } - (DataType::LargeList(field), DataType::LargeList(_)) => { - let array1 = as_large_list_array(&array1)?; - let array2 = as_large_list_array(&array2)?; - generic_set_lists::(array1, array2, field.clone(), set_op) - } - (data_type1, data_type2) => { - internal_err!( - "{set_op} does not support types '{data_type1:?}' and '{data_type2:?}'" - ) - } - } -} - -/// Array_union SQL function -pub fn array_union(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return exec_err!("array_union needs two arguments"); - } - let array1 = &args[0]; - let array2 = &args[1]; - - general_set_op(array1, array2, SetOp::Union) -} - -/// array_intersect SQL function -pub fn array_intersect(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return exec_err!("array_intersect needs two arguments"); - } - - let array1 = &args[0]; - let array2 = &args[1]; - - general_set_op(array1, array2, SetOp::Intersect) -} - /// Cardinality SQL function pub fn cardinality(args: &[ArrayRef]) -> Result { if args.len() != 1 { @@ -2301,67 +2134,6 @@ pub fn string_to_array(args: &[ArrayRef]) -> Result( - array: &GenericListArray, - field: &FieldRef, -) -> Result { - let dt = array.value_type(); - let mut offsets = Vec::with_capacity(array.len()); - offsets.push(OffsetSize::usize_as(0)); - let mut new_arrays = Vec::with_capacity(array.len()); - let converter = RowConverter::new(vec![SortField::new(dt)])?; - // distinct for each list in ListArray - for arr in array.iter().flatten() { - let values = converter.convert_columns(&[arr])?; - // sort elements in list and remove duplicates - let rows = values.iter().sorted().dedup().collect::>(); - let last_offset: OffsetSize = offsets.last().copied().unwrap(); - offsets.push(last_offset + OffsetSize::usize_as(rows.len())); - let arrays = converter.convert_rows(rows)?; - let array = match arrays.first() { - Some(array) => array.clone(), - None => { - return internal_err!("array_distinct: failed to get array from rows") - } - }; - new_arrays.push(array); - } - let offsets = OffsetBuffer::new(offsets.into()); - let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); - let values = compute::concat(&new_arrays_ref)?; - Ok(Arc::new(GenericListArray::::try_new( - field.clone(), - offsets, - values, - None, - )?)) -} - -/// array_distinct SQL function -/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4] -pub fn array_distinct(args: &[ArrayRef]) -> Result { - if args.len() != 1 { - return exec_err!("array_distinct needs one argument"); - } - - // handle null - if args[0].data_type() == &DataType::Null { - return Ok(args[0].clone()); - } - - // handle for list & largelist - match args[0].data_type() { - DataType::List(field) => { - let array = as_list_array(&args[0])?; - general_array_distinct(array, field) - } - DataType::LargeList(field) => { - let array = as_large_list_array(&args[0])?; - general_array_distinct(array, field) - } - array_type => exec_err!("array_distinct does not support type '{array_type:?}'"), - } -} /// array_resize SQL function pub fn array_resize(arg: &[ArrayRef]) -> Result { From 2a8bc781450c9c82012aef1a91711e32e22c65a7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 13 Feb 2024 15:13:47 -0500 Subject: [PATCH 5/7] port metadata --- datafusion/expr/src/built_in_function.rs | 38 +----- datafusion/expr/src/expr_fn.rs | 15 +-- datafusion/functions-array/src/lib.rs | 2 +- datafusion/functions-array/src/make_array.rs | 4 +- datafusion/functions-array/src/set_ops.rs | 117 ++++++++---------- .../physical-expr/src/array_expressions.rs | 1 - 6 files changed, 55 insertions(+), 122 deletions(-) diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index a05fad8f4c70..978a69de4328 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -141,8 +141,6 @@ pub enum BuiltinScalarFunction { ArrayPopBack, /// array_dims ArrayDims, - /// array_distinct - ArrayDistinct, /// array_element ArrayElement, /// array_empty @@ -175,10 +173,6 @@ pub enum BuiltinScalarFunction { ArrayReverse, /// array_slice ArraySlice, - /// array_intersect - ArrayIntersect, - /// array_union - ArrayUnion, /// array_except ArrayExcept, /// cardinality @@ -412,7 +406,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayHasAny => Volatility::Immutable, BuiltinScalarFunction::ArrayHas => Volatility::Immutable, BuiltinScalarFunction::ArrayDims => Volatility::Immutable, - BuiltinScalarFunction::ArrayDistinct => Volatility::Immutable, BuiltinScalarFunction::ArrayElement => Volatility::Immutable, BuiltinScalarFunction::ArrayExcept => Volatility::Immutable, BuiltinScalarFunction::ArrayLength => Volatility::Immutable, @@ -432,8 +425,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayReverse => Volatility::Immutable, BuiltinScalarFunction::Flatten => Volatility::Immutable, BuiltinScalarFunction::ArraySlice => Volatility::Immutable, - BuiltinScalarFunction::ArrayIntersect => Volatility::Immutable, - BuiltinScalarFunction::ArrayUnion => Volatility::Immutable, BuiltinScalarFunction::ArrayResize => Volatility::Immutable, BuiltinScalarFunction::Range => Volatility::Immutable, BuiltinScalarFunction::Cardinality => Volatility::Immutable, @@ -596,7 +587,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayDims => { Ok(List(Arc::new(Field::new("item", UInt64, true)))) } - BuiltinScalarFunction::ArrayDistinct => Ok(input_expr_types[0].clone()), BuiltinScalarFunction::ArrayElement => match &input_expr_types[0] { List(field) | LargeList(field) @@ -628,24 +618,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayReverse => Ok(input_expr_types[0].clone()), BuiltinScalarFunction::ArraySlice => Ok(input_expr_types[0].clone()), BuiltinScalarFunction::ArrayResize => Ok(input_expr_types[0].clone()), - BuiltinScalarFunction::ArrayIntersect => { - match (input_expr_types[0].clone(), input_expr_types[1].clone()) { - (DataType::Null, DataType::Null) | (DataType::Null, _) => { - Ok(DataType::Null) - } - (_, DataType::Null) => { - Ok(List(Arc::new(Field::new("item", Null, true)))) - } - (dt, _) => Ok(dt), - } - } - BuiltinScalarFunction::ArrayUnion => { - match (input_expr_types[0].clone(), input_expr_types[1].clone()) { - (DataType::Null, dt) => Ok(dt), - (dt, DataType::Null) => Ok(dt), - (dt, _) => Ok(dt), - } - } + BuiltinScalarFunction::Range => { Ok(List(Arc::new(Field::new("item", Int64, true)))) } @@ -942,7 +915,6 @@ impl BuiltinScalarFunction { Signature::variadic_any(self.volatility()) } BuiltinScalarFunction::ArrayNdims => Signature::any(1, self.volatility()), - BuiltinScalarFunction::ArrayDistinct => Signature::any(1, self.volatility()), BuiltinScalarFunction::ArrayPosition => { Signature::variadic_any(self.volatility()) } @@ -969,9 +941,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArraySlice => { Signature::variadic_any(self.volatility()) } - - BuiltinScalarFunction::ArrayIntersect => Signature::any(2, self.volatility()), - BuiltinScalarFunction::ArrayUnion => Signature::any(2, self.volatility()), BuiltinScalarFunction::Cardinality => Signature::any(1, self.volatility()), BuiltinScalarFunction::ArrayResize => { Signature::variadic_any(self.volatility()) @@ -1566,7 +1535,6 @@ impl BuiltinScalarFunction { &["array_concat", "array_cat", "list_concat", "list_cat"] } BuiltinScalarFunction::ArrayDims => &["array_dims", "list_dims"], - BuiltinScalarFunction::ArrayDistinct => &["array_distinct", "list_distinct"], BuiltinScalarFunction::ArrayEmpty => &["empty"], BuiltinScalarFunction::ArrayElement => &[ "array_element", @@ -1617,12 +1585,8 @@ impl BuiltinScalarFunction { } BuiltinScalarFunction::ArrayReverse => &["array_reverse", "list_reverse"], BuiltinScalarFunction::ArraySlice => &["array_slice", "list_slice"], - BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"], BuiltinScalarFunction::Cardinality => &["cardinality"], BuiltinScalarFunction::ArrayResize => &["array_resize", "list_resize"], - BuiltinScalarFunction::ArrayIntersect => { - &["array_intersect", "list_intersect"] - } BuiltinScalarFunction::OverLay => &["overlay"], BuiltinScalarFunction::Range => &["range", "generate_series"], diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 0414d024ef9b..a7226cf0fff7 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -662,12 +662,7 @@ scalar_expr!( array, "returns the number of dimensions of the array." ); -scalar_expr!( - ArrayDistinct, - array_distinct, - array, - "return distinct values from the array after removing duplicates." -); + scalar_expr!( ArrayPosition, array_position, @@ -740,7 +735,6 @@ scalar_expr!( array begin end stride, "returns a slice of the array." ); -scalar_expr!(ArrayUnion, array_union, array1 array2, "returns an array of the elements in the union of array1 and array2 without duplicates."); scalar_expr!( Cardinality, @@ -756,13 +750,6 @@ scalar_expr!( "returns an array with the specified size filled with the given value." ); -scalar_expr!( - ArrayIntersect, - array_intersect, - first_array second_array, - "Returns an array of the elements in the intersection of array1 and array2." -); - nary_scalar_expr!( Range, gen_range, diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs index 1ef8c180f722..f897a257482b 100644 --- a/datafusion/functions-array/src/lib.rs +++ b/datafusion/functions-array/src/lib.rs @@ -42,7 +42,7 @@ use std::sync::Arc; pub mod expr_fn { pub use super::{ make_array::make_array, - set_ops::{array_intersect, array_union, array_distinct}, + set_ops::{array_distinct, array_intersect, array_union}, to_string::array_to_string, }; } diff --git a/datafusion/functions-array/src/make_array.rs b/datafusion/functions-array/src/make_array.rs index f4d7fcf8520c..c815dfeabda9 100644 --- a/datafusion/functions-array/src/make_array.rs +++ b/datafusion/functions-array/src/make_array.rs @@ -22,13 +22,11 @@ use arrow::array::{ MutableArrayData, NullArray, OffsetSizeTrait, }; use arrow::buffer::OffsetBuffer; -use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Utf8}; use arrow::datatypes::{DataType, Field}; use datafusion_common::utils::array_into_list_array; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_expr::{ - BuiltinScalarFunction, ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, - Volatility, + ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; use std::any::Any; use std::sync::Arc; diff --git a/datafusion/functions-array/src/set_ops.rs b/datafusion/functions-array/src/set_ops.rs index e06a8f1bcf9f..602b598e8918 100644 --- a/datafusion/functions-array/src/set_ops.rs +++ b/datafusion/functions-array/src/set_ops.rs @@ -17,43 +17,41 @@ //! implementation of make_array function -use arrow::array::{new_null_array, Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData, NullArray, OffsetSizeTrait, new_empty_array, make_array}; +use crate::make_array::make_array_inner; +use arrow::array::{new_empty_array, Array, ArrayRef, GenericListArray, OffsetSizeTrait}; use arrow::buffer::OffsetBuffer; -use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Utf8}; +use arrow::compute; use arrow::datatypes::{DataType, Field, FieldRef}; -use itertools::Itertools; -use datafusion_common::utils::array_into_list_array; -use datafusion_common::{plan_err, DataFusionError, Result, internal_err, exec_err}; +use arrow::row::{RowConverter, SortField}; +use datafusion_common::cast::{as_large_list_array, as_list_array}; +use datafusion_common::{exec_err, internal_err, DataFusionError, Result}; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use itertools::Itertools; use std::any::Any; use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::sync::Arc; -use arrow::compute; -use arrow::row::{RowConverter, SortField}; -use datafusion_common::cast::{as_large_list_array, as_list_array}; -use crate::make_array::make_array_inner; // Create static instances of ScalarUDFs for each function make_udf_function!( ArrayUnion, array_union, - xx, // arg name - "yyy", // The name of the function to create the ScalarUDF + array1 array2, + "returns an array of the elements in the union of array1 and array2 without duplicates.", union_udf ); make_udf_function!( ArrayIntersect, array_intersect, - xx, // arg name - "yyy", // The name of the function to create the ScalarUDF + first_array second_array, + "Returns an array of the elements in the intersection of array1 and array2.", intersect_udf ); make_udf_function!( ArrayDistinct, array_distinct, - xx, // arg name - "yyy", // The name of the function to create the ScalarUDF + array, + "return distinct values from the array after removing duplicates.", distinct_udf ); @@ -66,8 +64,8 @@ pub(super) struct ArrayUnion { impl ArrayUnion { pub fn new() -> Self { Self { - signature: Signature::variadic_any(Volatility::Immutable), - aliases: vec![], + signature: Signature::any(2, Volatility::Immutable), + aliases: vec!["list_union".to_string()], } } } @@ -85,13 +83,11 @@ impl ScalarUDFImpl for ArrayUnion { } fn return_type(&self, arg_types: &[DataType]) -> Result { - use DataType::*; - Ok(match arg_types[0] { - List(_) | LargeList(_) | FixedSizeList(_, _) => Utf8, - _ => { - return plan_err!("The array_to_string function can only accept List/LargeList/FixedSizeList."); - } - }) + match (&arg_types[0], &arg_types[1]) { + (&DataType::Null, dt) => Ok(dt.clone()), + (dt, DataType::Null) => Ok(dt.clone()), + (dt, _) => Ok(dt.clone()), + } } fn invoke(&self, args: &[ColumnarValue]) -> Result { @@ -102,8 +98,7 @@ impl ScalarUDFImpl for ArrayUnion { let array1 = &args[0]; let array2 = &args[1]; - general_set_op(array1, array2, SetOp::Union) - .map(ColumnarValue::Array) + general_set_op(array1, array2, SetOp::Union).map(ColumnarValue::Array) } fn aliases(&self) -> &[String] { @@ -120,8 +115,8 @@ pub(super) struct ArrayIntersect { impl ArrayIntersect { pub fn new() -> Self { Self { - signature: Signature::variadic_any(Volatility::Immutable), - aliases: vec![], + signature: Signature::any(2, Volatility::Immutable), + aliases: vec!["list_intersect".to_string()], } } } @@ -131,7 +126,7 @@ impl ScalarUDFImpl for ArrayIntersect { self } fn name(&self) -> &str { - "array_union" + "array_intersect" } fn signature(&self) -> &Signature { @@ -139,26 +134,27 @@ impl ScalarUDFImpl for ArrayIntersect { } fn return_type(&self, arg_types: &[DataType]) -> Result { - use DataType::*; - Ok(match arg_types[0] { - List(_) | LargeList(_) | FixedSizeList(_, _) => Utf8, - _ => { - return plan_err!("The array_to_string function can only accept List/LargeList/FixedSizeList."); - } - }) + match (arg_types[0].clone(), arg_types[1].clone()) { + (DataType::Null, DataType::Null) | (DataType::Null, _) => Ok(DataType::Null), + (_, DataType::Null) => Ok(DataType::List(Arc::new(Field::new( + "item", + DataType::Null, + true, + )))), + (dt, _) => Ok(dt), + } } fn invoke(&self, args: &[ColumnarValue]) -> Result { - if args.len() != 2 { - return exec_err!("array_intersect needs two arguments"); - } - let args = ColumnarValue::values_to_arrays(args)?; + if args.len() != 2 { + return exec_err!("array_intersect needs two arguments"); + } + let args = ColumnarValue::values_to_arrays(args)?; - let array1 = &args[0]; - let array2 = &args[1]; + let array1 = &args[0]; + let array2 = &args[1]; - general_set_op(array1, array2, SetOp::Intersect) - .map(ColumnarValue::Array) + general_set_op(array1, array2, SetOp::Intersect).map(ColumnarValue::Array) } fn aliases(&self) -> &[String] { @@ -166,8 +162,6 @@ impl ScalarUDFImpl for ArrayIntersect { } } - - #[derive(Debug)] pub(super) struct ArrayDistinct { signature: Signature, @@ -177,8 +171,8 @@ pub(super) struct ArrayDistinct { impl ArrayDistinct { pub fn new() -> Self { Self { - signature: Signature::variadic_any(Volatility::Immutable), - aliases: vec![], + signature: Signature::any(1, Volatility::Immutable), + aliases: vec!["list_distinct".to_string()], } } } @@ -196,28 +190,21 @@ impl ScalarUDFImpl for ArrayDistinct { } fn return_type(&self, arg_types: &[DataType]) -> Result { - use DataType::*; - Ok(match arg_types[0] { - List(_) | LargeList(_) | FixedSizeList(_, _) => Utf8, - _ => { - return plan_err!("The array_to_string function can only accept List/LargeList/FixedSizeList."); - } - }) + Ok(arg_types[0].clone()) } /// array_distinct SQL function /// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4] fn invoke(&self, args: &[ColumnarValue]) -> Result { - if args.len() != 1 { - return exec_err!("array_distinct needs one argument"); - } + if args.len() != 1 { + return exec_err!("array_distinct needs one argument"); + } // handle null if args[0].data_type() == DataType::Null { return Ok(args[0].clone()); } let args = ColumnarValue::values_to_arrays(args)?; - // handle for list & largelist match args[0].data_type() { DataType::List(field) => { @@ -228,9 +215,11 @@ impl ScalarUDFImpl for ArrayDistinct { let array = as_large_list_array(&args[0])?; general_array_distinct(array, field) } - array_type => exec_err!("array_distinct does not support type '{array_type:?}'"), - }.map(ColumnarValue::Array) - + array_type => { + exec_err!("array_distinct does not support type '{array_type:?}'") + } + } + .map(ColumnarValue::Array) } fn aliases(&self) -> &[String] { @@ -238,8 +227,6 @@ impl ScalarUDFImpl for ArrayDistinct { } } - - #[derive(Debug, PartialEq)] enum SetOp { Union, @@ -384,8 +371,6 @@ fn general_set_op( } } - - fn general_array_distinct( array: &GenericListArray, field: &FieldRef, diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index a81c71171ff3..bdcdf76e79a0 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -2134,7 +2134,6 @@ pub fn string_to_array(args: &[ArrayRef]) -> Result Result { if arg.len() < 2 || arg.len() > 3 { From d8c89ff696ef40e234f8561d3d4f7a291cea35b9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 13 Feb 2024 15:35:37 -0500 Subject: [PATCH 6/7] Untangle make_array --- datafusion/expr/src/expr_fn.rs | 1 - datafusion/functions-array/src/make_array.rs | 4 +- .../optimizer/src/analyzer/rewrite_expr.rs | 17 ++++ .../optimizer/src/analyzer/type_coercion.rs | 91 +------------------ .../physical-expr/src/array_expressions.rs | 3 +- datafusion/physical-expr/src/functions.rs | 12 --- .../physical-expr/src/scalar_function.rs | 29 ++---- datafusion/sql/src/expr/function.rs | 17 ++-- datafusion/sql/src/expr/value.rs | 19 ++-- 9 files changed, 50 insertions(+), 143 deletions(-) diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index a7226cf0fff7..d5361ed41b9b 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -1437,7 +1437,6 @@ mod test { test_scalar_expr!(ArrayReplaceN, array_replace_n, array, from, to, max); test_scalar_expr!(ArrayReplaceAll, array_replace_all, array, from, to); test_unary_scalar_expr!(Cardinality, cardinality); - test_nary_scalar_expr!(MakeArray, array, input); test_unary_scalar_expr!(ArrowTypeof, arrow_typeof); test_nary_scalar_expr!(OverLay, overlay, string, characters, position, len); diff --git a/datafusion/functions-array/src/make_array.rs b/datafusion/functions-array/src/make_array.rs index c815dfeabda9..8858296f095e 100644 --- a/datafusion/functions-array/src/make_array.rs +++ b/datafusion/functions-array/src/make_array.rs @@ -90,8 +90,8 @@ impl ScalarUDFImpl for MakeArray { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - Ok(make_array_inner(&ColumnarValue::values_to_arrays(args)?) - .map(ColumnarValue::Array)?) + make_array_inner(&ColumnarValue::values_to_arrays(args)?) + .map(ColumnarValue::Array) } fn aliases(&self) -> &[String] { diff --git a/datafusion/optimizer/src/analyzer/rewrite_expr.rs b/datafusion/optimizer/src/analyzer/rewrite_expr.rs index eedfc40a7f80..036cdf31a22a 100644 --- a/datafusion/optimizer/src/analyzer/rewrite_expr.rs +++ b/datafusion/optimizer/src/analyzer/rewrite_expr.rs @@ -154,6 +154,23 @@ fn rewrite_array_concat_operator_to_func( return None; } + fn is_make_array(expr: &Expr) -> bool { + if let Expr::ScalarFunction(ScalarFunction { func_def, .. }) = expr { + func_def.name() == "make_array" + } else { + false + } + } + + fn is_array_concat(expr: &Expr) -> bool { + if let Expr::ScalarFunction(ScalarFunction { func_def, .. }) = expr { + func_def.name() == "array_concat" + } else { + false + } + } + + // TODO figure out how to generalize this so it isn't hard coded by name match (left, right) { // Chain concat operator (a || b) || array, // (arry concat, array append, array prepend) || array -> array concat diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 662e0fc7c258..b45d04fd2fb3 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -45,7 +45,7 @@ use datafusion_expr::type_coercion::{is_datetime, is_utf8_or_large_utf8}; use datafusion_expr::utils::merge_schema; use datafusion_expr::{ is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown, not, - type_coercion, AggregateFunction, BuiltinScalarFunction, Expr, ExprSchemable, + type_coercion, AggregateFunction, Expr, ExprSchemable, LogicalPlan, Operator, Projection, ScalarFunctionDefinition, Signature, WindowFrame, WindowFrameBound, WindowFrameUnits, }; @@ -333,11 +333,6 @@ impl TreeNodeRewriter for TypeCoercionRewriter { &self.schema, &fun.signature(), )?; - let new_args = coerce_arguments_for_fun( - new_args.as_slice(), - &self.schema, - &fun, - )?; Ok(Expr::ScalarFunction(ScalarFunction::new(fun, new_args))) } ScalarFunctionDefinition::UDF(fun) => { @@ -584,35 +579,6 @@ fn coerce_arguments_for_signature( .collect::>>() } -fn coerce_arguments_for_fun( - expressions: &[Expr], - schema: &DFSchema, - fun: &BuiltinScalarFunction, -) -> Result> { - if expressions.is_empty() { - return Ok(vec![]); - } - let mut expressions: Vec = expressions.to_vec(); - - // Cast Fixedsizelist to List for array functions - if *fun == BuiltinScalarFunction::MakeArray { - expressions = expressions - .into_iter() - .map(|expr| { - let data_type = expr.get_type(schema).unwrap(); - if let DataType::FixedSizeList(field, _) = data_type { - let field = field.as_ref().clone(); - let to_type = DataType::List(Arc::new(field)); - expr.cast_to(&to_type, schema) - } else { - Ok(expr) - } - }) - .collect::>>()?; - } - - Ok(expressions) -} /// Cast `expr` to the specified type, if possible fn cast_expr(expr: &Expr, to_type: &DataType, schema: &DFSchema) -> Result { @@ -762,10 +728,8 @@ mod test { use std::any::Any; use std::sync::{Arc, OnceLock}; - use arrow::array::{FixedSizeListArray, Int32Array}; use arrow::datatypes::{DataType, TimeUnit}; - use arrow::datatypes::Field; use datafusion_common::tree_node::TreeNode; use datafusion_common::{DFField, DFSchema, DFSchemaRef, Result, ScalarValue}; use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction}; @@ -783,7 +747,7 @@ mod test { use datafusion_physical_expr::expressions::AvgAccumulator; use crate::analyzer::type_coercion::{ - cast_expr, coerce_case_expression, TypeCoercion, TypeCoercionRewriter, + coerce_case_expression, TypeCoercion, TypeCoercionRewriter, }; use crate::test::assert_analyzed_plan_eq; @@ -1263,57 +1227,6 @@ mod test { Ok(()) } - #[test] - fn test_casting_for_fixed_size_list() -> Result<()> { - let val = lit(ScalarValue::FixedSizeList(Arc::new( - FixedSizeListArray::new( - Arc::new(Field::new("item", DataType::Int32, true)), - 3, - Arc::new(Int32Array::from(vec![1, 2, 3])), - None, - ), - ))); - let expr = Expr::ScalarFunction(ScalarFunction::new( - BuiltinScalarFunction::MakeArray, - vec![val.clone()], - )); - let schema = Arc::new(DFSchema::new_with_metadata( - vec![DFField::new_unqualified( - "item", - DataType::FixedSizeList( - Arc::new(Field::new("a", DataType::Int32, true)), - 3, - ), - true, - )], - std::collections::HashMap::new(), - )?); - let mut rewriter = TypeCoercionRewriter { schema }; - let result = expr.rewrite(&mut rewriter)?; - - let schema = Arc::new(DFSchema::new_with_metadata( - vec![DFField::new_unqualified( - "item", - DataType::List(Arc::new(Field::new("a", DataType::Int32, true))), - true, - )], - std::collections::HashMap::new(), - )?); - let expected_casted_expr = cast_expr( - &val, - &DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), - &schema, - )?; - - let expected = Expr::ScalarFunction(ScalarFunction::new( - BuiltinScalarFunction::MakeArray, - vec![expected_casted_expr], - )); - - assert_eq!(result, expected); - Ok(()) - } - #[test] fn test_type_coercion_rewrite() -> Result<()> { // gt diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index bdcdf76e79a0..97db51e3e14b 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -19,7 +19,6 @@ use std::any::type_name; use std::collections::HashSet; -use std::fmt::{Display, Formatter}; use std::sync::Arc; use arrow::array::*; @@ -34,7 +33,7 @@ use datafusion_common::cast::{ as_generic_list_array, as_generic_string_array, as_int64_array, as_large_list_array, as_list_array, as_null_array, as_string_array, }; -use datafusion_common::utils::{array_into_list_array, list_ndims}; +use datafusion_common::utils::list_ndims; use datafusion_common::{ exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DataFusionError, Result, ScalarValue, diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index ca073516986c..26d49d861a71 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -351,9 +351,6 @@ pub fn create_physical_fun( BuiltinScalarFunction::ArrayDims => Arc::new(|args| { make_scalar_function_inner(array_expressions::array_dims)(args) }), - BuiltinScalarFunction::ArrayDistinct => Arc::new(|args| { - make_scalar_function_inner(array_expressions::array_distinct)(args) - }), BuiltinScalarFunction::ArrayElement => Arc::new(|args| { make_scalar_function_inner(array_expressions::array_element)(args) }), @@ -411,9 +408,6 @@ pub fn create_physical_fun( BuiltinScalarFunction::ArraySlice => Arc::new(|args| { make_scalar_function_inner(array_expressions::array_slice)(args) }), - BuiltinScalarFunction::ArrayIntersect => Arc::new(|args| { - make_scalar_function_inner(array_expressions::array_intersect)(args) - }), BuiltinScalarFunction::Range => Arc::new(|args| { make_scalar_function_inner(array_expressions::gen_range)(args) }), @@ -423,12 +417,6 @@ pub fn create_physical_fun( BuiltinScalarFunction::ArrayResize => Arc::new(|args| { make_scalar_function_inner(array_expressions::array_resize)(args) }), - BuiltinScalarFunction::MakeArray => Arc::new(|args| { - make_scalar_function_inner(array_expressions::make_array)(args) - }), - BuiltinScalarFunction::ArrayUnion => Arc::new(|args| { - make_scalar_function_inner(array_expressions::array_union)(args) - }), // struct functions BuiltinScalarFunction::Struct => Arc::new(struct_expressions::struct_expr), diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index b73626aa4340..3c53c8dee6d3 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -43,7 +43,7 @@ use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::Result; use datafusion_expr::{ - expr_vec_fmt, BuiltinScalarFunction, ColumnarValue, FuncMonotonicity, + expr_vec_fmt, ColumnarValue, FuncMonotonicity, ScalarFunctionImplementation, }; @@ -142,30 +142,15 @@ impl PhysicalExpr for ScalarFunctionExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { // evaluate the arguments, if there are no arguments we'll instead pass in a null array // indicating the batch size (as a convention) - let inputs = match ( - self.args.is_empty(), - self.name.parse::(), - ) { - // MakeArray support zero argument but has the different behavior from the array with one null. - (true, Ok(scalar_fun)) - if scalar_fun - .signature() - .type_signature - .supports_zero_argument() - && scalar_fun != BuiltinScalarFunction::MakeArray => - { - vec![ColumnarValue::create_null_array(batch.num_rows())] - } - // If the function supports zero argument, we pass in a null array indicating the batch size. - // This is for user-defined functions. - (true, Err(_)) if self.supports_zero_argument => { - vec![ColumnarValue::create_null_array(batch.num_rows())] - } - _ => self + let inputs = if self.args.is_empty() + { + vec![ColumnarValue::create_null_array(batch.num_rows())] + } else { + self .args .iter() .map(|e| e.evaluate(batch)) - .collect::>>()?, + .collect::>>()? }; // evaluate the function diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 3187f26dcc5d..8b3edcea3779 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -80,14 +80,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { return exec_err!("unnest() requires at least one argument"); } 1 => { - if let Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn( - BuiltinScalarFunction::MakeArray, - ), - .. - }) = exprs[0] - { + if is_make_array(&exprs[0]) { // valid } else if let Expr::Column(_) = exprs[0] { // valid @@ -312,3 +305,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .collect::>>() } } + +fn is_make_array(expr: &Expr) -> bool { + if let Expr::ScalarFunction(ScalarFunction { func_def, .. }) = expr { + func_def.name() == "make_array" + } else { + false + } +} diff --git a/datafusion/sql/src/expr/value.rs b/datafusion/sql/src/expr/value.rs index c0870cc54106..53c000f29455 100644 --- a/datafusion/sql/src/expr/value.rs +++ b/datafusion/sql/src/expr/value.rs @@ -20,11 +20,10 @@ use arrow::compute::kernels::cast_utils::parse_interval_month_day_nano; use arrow::datatypes::DECIMAL128_MAX_PRECISION; use arrow_schema::DataType; use datafusion_common::{ - not_impl_err, plan_err, DFSchema, DataFusionError, Result, ScalarValue, + not_impl_err, plan_datafusion_err, plan_err, DFSchema, DataFusionError, Result, + ScalarValue, }; -use datafusion_expr::expr::ScalarFunction; use datafusion_expr::expr::{BinaryExpr, Placeholder}; -use datafusion_expr::BuiltinScalarFunction; use datafusion_expr::{lit, Expr, Operator}; use log::debug; use sqlparser::ast::{BinaryOperator, Expr as SQLExpr, Interval, Value}; @@ -142,10 +141,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }) .collect::>>()?; - Ok(Expr::ScalarFunction(ScalarFunction::new( - BuiltinScalarFunction::MakeArray, - values, - ))) + let make_array = self + .context_provider + .get_function_meta("make_array") + .ok_or_else(|| { + plan_datafusion_err!( + "Can not plan array literal: could not find make_array function" + ) + })?; + + Ok(make_array.call(values)) } /// Convert a SQL interval expression to a DataFusion logical plan From 55915ec260dd0654a5c0cbda21bcef46931040d0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 13 Feb 2024 16:13:30 -0500 Subject: [PATCH 7/7] more detangling --- datafusion/functions-array/src/lib.rs | 2 +- datafusion/functions-array/src/make_array.rs | 100 ++++++++++++++- .../optimizer/src/analyzer/rewrite_expr.rs | 121 +++--------------- .../optimizer/src/analyzer/type_coercion.rs | 7 +- .../physical-expr/src/array_expressions.rs | 52 +------- .../physical-expr/src/scalar_function.rs | 9 +- datafusion/proto/proto/datafusion.proto | 8 +- datafusion/proto/src/generated/pbjson.rs | 12 -- datafusion/proto/src/generated/prost.rs | 16 +-- .../proto/src/logical_plan/from_proto.rs | 39 ++---- datafusion/proto/src/logical_plan/to_proto.rs | 4 - 11 files changed, 141 insertions(+), 229 deletions(-) diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs index f897a257482b..f6535495011b 100644 --- a/datafusion/functions-array/src/lib.rs +++ b/datafusion/functions-array/src/lib.rs @@ -41,7 +41,7 @@ use std::sync::Arc; /// Fluent-style API for creating `Expr`s pub mod expr_fn { pub use super::{ - make_array::make_array, + make_array::{array, make_array}, set_ops::{array_distinct, array_intersect, array_union}, to_string::array_to_string, }; diff --git a/datafusion/functions-array/src/make_array.rs b/datafusion/functions-array/src/make_array.rs index 8858296f095e..5f4011bdedc0 100644 --- a/datafusion/functions-array/src/make_array.rs +++ b/datafusion/functions-array/src/make_array.rs @@ -26,7 +26,7 @@ use arrow::datatypes::{DataType, Field}; use datafusion_common::utils::array_into_list_array; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_expr::{ - ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility, + ColumnarValue, Expr, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; use std::any::Any; use std::sync::Arc; @@ -40,6 +40,11 @@ make_udf_function!( udf ); +/// Create a new array from a list of expressions +pub fn array(args: Vec) -> Expr { + udf().call(args) +} + #[derive(Debug)] pub(super) struct MakeArray { signature: Signature, @@ -64,7 +69,7 @@ impl ScalarUDFImpl for MakeArray { self } fn name(&self) -> &str { - "array_to_string" + "make_array" } fn signature(&self) -> &Signature { @@ -213,3 +218,94 @@ fn array_array( None, )?)) } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::ListArray; + use arrow::datatypes::Int64Type; + use datafusion_common::cast::as_list_array; + + /// Only test internal functions, array-related sql functions will be tested in sqllogictest `array.slt` + #[test] + fn test_align_array_dimensions() { + let array1d_1 = + Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), Some(3)]), + Some(vec![Some(4), Some(5)]), + ])); + let array1d_2 = + Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(6), Some(7), Some(8)]), + ])); + + let array2d_1 = Arc::new(array_into_list_array(array1d_1.clone())) as ArrayRef; + let array2d_2 = Arc::new(array_into_list_array(array1d_2.clone())) as ArrayRef; + + let res = align_array_dimensions::(vec![ + array1d_1.to_owned(), + array2d_2.to_owned(), + ]) + .unwrap(); + + let expected = as_list_array(&array2d_1).unwrap(); + let expected_dim = datafusion_common::utils::list_ndims(array2d_1.data_type()); + assert_ne!(as_list_array(&res[0]).unwrap(), expected); + assert_eq!( + datafusion_common::utils::list_ndims(res[0].data_type()), + expected_dim + ); + + let array3d_1 = Arc::new(array_into_list_array(array2d_1)) as ArrayRef; + let array3d_2 = array_into_list_array(array2d_2.to_owned()); + let res = + align_array_dimensions::(vec![array1d_1, Arc::new(array3d_2.clone())]) + .unwrap(); + + let expected = as_list_array(&array3d_1).unwrap(); + let expected_dim = datafusion_common::utils::list_ndims(array3d_1.data_type()); + assert_ne!(as_list_array(&res[0]).unwrap(), expected); + assert_eq!( + datafusion_common::utils::list_ndims(res[0].data_type()), + expected_dim + ); + } + + fn align_array_dimensions( + args: Vec, + ) -> Result> { + let args_ndim = args + .iter() + .map(|arg| datafusion_common::utils::list_ndims(arg.data_type())) + .collect::>(); + let max_ndim = args_ndim.iter().max().unwrap_or(&0); + + // Align the dimensions of the arrays + let aligned_args: Result> = args + .into_iter() + .zip(args_ndim.iter()) + .map(|(array, ndim)| { + if ndim < max_ndim { + let mut aligned_array = array.clone(); + for _ in 0..(max_ndim - ndim) { + let data_type = aligned_array.data_type().to_owned(); + let array_lengths = vec![1; aligned_array.len()]; + let offsets = OffsetBuffer::::from_lengths(array_lengths); + + aligned_array = Arc::new(GenericListArray::::try_new( + Arc::new(Field::new("item", data_type, true)), + offsets, + aligned_array, + None, + )?) + } + Ok(aligned_array) + } else { + Ok(array.clone()) + } + }) + .collect(); + + aligned_args + } +} diff --git a/datafusion/optimizer/src/analyzer/rewrite_expr.rs b/datafusion/optimizer/src/analyzer/rewrite_expr.rs index 036cdf31a22a..be99d0736b7f 100644 --- a/datafusion/optimizer/src/analyzer/rewrite_expr.rs +++ b/datafusion/optimizer/src/analyzer/rewrite_expr.rs @@ -140,9 +140,9 @@ impl TreeNodeRewriter for OperatorToFunctionRewriter { /// /// 3) scalar || array -> array prepend /// -/// 4) (arry concat, array append, array prepend) || array -> array concat +/// 4) (aray concat, array append, array prepend) || array -> array concat /// -/// 5) (arry concat, array append, array prepend) || scalar -> array append +/// 5) (array concat, array append, array prepend) || scalar -> array append fn rewrite_array_concat_operator_to_func( left: &Expr, op: Operator, @@ -154,120 +154,35 @@ fn rewrite_array_concat_operator_to_func( return None; } - fn is_make_array(expr: &Expr) -> bool { + fn fn_name(expr: &Expr) -> Option<&str> { if let Expr::ScalarFunction(ScalarFunction { func_def, .. }) = expr { - func_def.name() == "make_array" + Some(func_def.name()) } else { - false - } - } - - fn is_array_concat(expr: &Expr) -> bool { - if let Expr::ScalarFunction(ScalarFunction { func_def, .. }) = expr { - func_def.name() == "array_concat" - } else { - false + None } } // TODO figure out how to generalize this so it isn't hard coded by name - match (left, right) { + match (fn_name(left), fn_name(right)) { // Chain concat operator (a || b) || array, - // (arry concat, array append, array prepend) || array -> array concat + // (array concat, array append, array prepend) || array -> array concat ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayConcat), - args: _left_args, - }), - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), - args: _right_args, - }), - ) - | ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayAppend), - args: _left_args, - }), - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), - args: _right_args, - }), - ) - | ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayPrepend), - args: _left_args, - }), - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), - args: _right_args, - }), + Some("array_concat") | Some("array_append") | Some("array_prepend"), + Some("make_array"), ) => Some(BuiltinScalarFunction::ArrayConcat), // Chain concat operator (a || b) || scalar, - // (arry concat, array append, array prepend) || scalar -> array append - ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayConcat), - args: _left_args, - }), - _scalar, - ) - | ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayAppend), - args: _left_args, - }), - _scalar, - ) - | ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayPrepend), - args: _left_args, - }), - _scalar, - ) => Some(BuiltinScalarFunction::ArrayAppend), + // (array concat, array append, array prepend) || scalar -> array append + (Some("array_concat") | Some("array_append") | Some("array_prepend"), _) => { + Some(BuiltinScalarFunction::ArrayAppend) + } // array || array -> array concat - ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), - args: _left_args, - }), - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), - args: _right_args, - }), - ) => Some(BuiltinScalarFunction::ArrayConcat), + (Some("make_array"), Some("make_array")) => { + Some(BuiltinScalarFunction::ArrayConcat) + } // array || scalar -> array append - ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), - args: _left_args, - }), - _right_scalar, - ) => Some(BuiltinScalarFunction::ArrayAppend), + (Some("make_array"), _) => Some(BuiltinScalarFunction::ArrayAppend), // scalar || array -> array prepend - ( - _left_scalar, - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), - args: _right_args, - }), - ) => Some(BuiltinScalarFunction::ArrayPrepend), - + (_, Some("make_array")) => Some(BuiltinScalarFunction::ArrayPrepend), _ => None, } } diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index b45d04fd2fb3..f2e0ad36726d 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -45,9 +45,9 @@ use datafusion_expr::type_coercion::{is_datetime, is_utf8_or_large_utf8}; use datafusion_expr::utils::merge_schema; use datafusion_expr::{ is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown, not, - type_coercion, AggregateFunction, Expr, ExprSchemable, - LogicalPlan, Operator, Projection, ScalarFunctionDefinition, Signature, WindowFrame, - WindowFrameBound, WindowFrameUnits, + type_coercion, AggregateFunction, Expr, ExprSchemable, LogicalPlan, Operator, + Projection, ScalarFunctionDefinition, Signature, WindowFrame, WindowFrameBound, + WindowFrameUnits, }; use crate::analyzer::AnalyzerRule; @@ -579,7 +579,6 @@ fn coerce_arguments_for_signature( .collect::>>() } - /// Cast `expr` to the specified type, if possible fn cast_expr(expr: &Expr, to_type: &DataType, schema: &DFSchema) -> Result { expr.clone().cast_to(to_type, schema) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 97db51e3e14b..3e15c709e05b 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -751,6 +751,7 @@ where for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() { let start = offset_window[0].to_usize().unwrap(); let end = offset_window[1].to_usize().unwrap(); + if is_append { mutable.extend(values_index, start, end); mutable.extend(element_index, row_index, row_index + 1); @@ -2295,54 +2296,3 @@ where Some(nulls.into()), )?)) } - -#[cfg(test)] -mod tests { - use super::*; - use arrow::datatypes::Int64Type; - - /// Only test internal functions, array-related sql functions will be tested in sqllogictest `array.slt` - #[test] - fn test_align_array_dimensions() { - let array1d_1 = - Arc::new(ListArray::from_iter_primitive::(vec![ - Some(vec![Some(1), Some(2), Some(3)]), - Some(vec![Some(4), Some(5)]), - ])); - let array1d_2 = - Arc::new(ListArray::from_iter_primitive::(vec![ - Some(vec![Some(6), Some(7), Some(8)]), - ])); - - let array2d_1 = Arc::new(array_into_list_array(array1d_1.clone())) as ArrayRef; - let array2d_2 = Arc::new(array_into_list_array(array1d_2.clone())) as ArrayRef; - - let res = align_array_dimensions::(vec![ - array1d_1.to_owned(), - array2d_2.to_owned(), - ]) - .unwrap(); - - let expected = as_list_array(&array2d_1).unwrap(); - let expected_dim = datafusion_common::utils::list_ndims(array2d_1.data_type()); - assert_ne!(as_list_array(&res[0]).unwrap(), expected); - assert_eq!( - datafusion_common::utils::list_ndims(res[0].data_type()), - expected_dim - ); - - let array3d_1 = Arc::new(array_into_list_array(array2d_1)) as ArrayRef; - let array3d_2 = array_into_list_array(array2d_2.to_owned()); - let res = - align_array_dimensions::(vec![array1d_1, Arc::new(array3d_2.clone())]) - .unwrap(); - - let expected = as_list_array(&array3d_1).unwrap(); - let expected_dim = datafusion_common::utils::list_ndims(array3d_1.data_type()); - assert_ne!(as_list_array(&res[0]).unwrap(), expected); - assert_eq!( - datafusion_common::utils::list_ndims(res[0].data_type()), - expected_dim - ); - } -} diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 3c53c8dee6d3..4111167bc321 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -43,8 +43,7 @@ use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::Result; use datafusion_expr::{ - expr_vec_fmt, ColumnarValue, FuncMonotonicity, - ScalarFunctionImplementation, + expr_vec_fmt, ColumnarValue, FuncMonotonicity, ScalarFunctionImplementation, }; /// Physical expression of a scalar function @@ -142,12 +141,10 @@ impl PhysicalExpr for ScalarFunctionExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { // evaluate the arguments, if there are no arguments we'll instead pass in a null array // indicating the batch size (as a convention) - let inputs = if self.args.is_empty() - { + let inputs = if self.args.is_empty() { vec![ColumnarValue::create_null_array(batch.num_rows())] } else { - self - .args + self.args .iter() .map(|e| e.evaluate(batch)) .collect::>>()? diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 3c8bd4ef305b..92a478fc716a 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -564,7 +564,7 @@ enum ScalarFunction { Sqrt = 17; Tan = 18; Trunc = 19; - Array = 20; + // 20 was Array RegexpMatch = 21; BitLength = 22; Btrim = 23; @@ -661,8 +661,8 @@ enum ScalarFunction { ArrayPopBack = 116; StringToArray = 117; ToTimestampNanos = 118; - ArrayIntersect = 119; - ArrayUnion = 120; + // 119 was ArrayIntersect + // 120 was ArrayUnion OverLay = 121; Range = 122; ArrayExcept = 123; @@ -671,7 +671,7 @@ enum ScalarFunction { SubstrIndex = 126; FindInSet = 127; ArraySort = 128; - ArrayDistinct = 129; + // 129 was ArrayDistinct ArrayResize = 130; EndsWith = 131; InStr = 132; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 89e170d3ec26..6a3452bbcc6f 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -22323,7 +22323,6 @@ impl serde::Serialize for ScalarFunction { Self::Sqrt => "Sqrt", Self::Tan => "Tan", Self::Trunc => "Trunc", - Self::Array => "Array", Self::RegexpMatch => "RegexpMatch", Self::BitLength => "BitLength", Self::Btrim => "Btrim", @@ -22419,8 +22418,6 @@ impl serde::Serialize for ScalarFunction { Self::ArrayPopBack => "ArrayPopBack", Self::StringToArray => "StringToArray", Self::ToTimestampNanos => "ToTimestampNanos", - Self::ArrayIntersect => "ArrayIntersect", - Self::ArrayUnion => "ArrayUnion", Self::OverLay => "OverLay", Self::Range => "Range", Self::ArrayExcept => "ArrayExcept", @@ -22429,7 +22426,6 @@ impl serde::Serialize for ScalarFunction { Self::SubstrIndex => "SubstrIndex", Self::FindInSet => "FindInSet", Self::ArraySort => "ArraySort", - Self::ArrayDistinct => "ArrayDistinct", Self::ArrayResize => "ArrayResize", Self::EndsWith => "EndsWith", Self::InStr => "InStr", @@ -22468,7 +22464,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Sqrt", "Tan", "Trunc", - "Array", "RegexpMatch", "BitLength", "Btrim", @@ -22564,8 +22559,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "ArrayPopBack", "StringToArray", "ToTimestampNanos", - "ArrayIntersect", - "ArrayUnion", "OverLay", "Range", "ArrayExcept", @@ -22574,7 +22567,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "SubstrIndex", "FindInSet", "ArraySort", - "ArrayDistinct", "ArrayResize", "EndsWith", "InStr", @@ -22642,7 +22634,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Sqrt" => Ok(ScalarFunction::Sqrt), "Tan" => Ok(ScalarFunction::Tan), "Trunc" => Ok(ScalarFunction::Trunc), - "Array" => Ok(ScalarFunction::Array), "RegexpMatch" => Ok(ScalarFunction::RegexpMatch), "BitLength" => Ok(ScalarFunction::BitLength), "Btrim" => Ok(ScalarFunction::Btrim), @@ -22738,8 +22729,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "ArrayPopBack" => Ok(ScalarFunction::ArrayPopBack), "StringToArray" => Ok(ScalarFunction::StringToArray), "ToTimestampNanos" => Ok(ScalarFunction::ToTimestampNanos), - "ArrayIntersect" => Ok(ScalarFunction::ArrayIntersect), - "ArrayUnion" => Ok(ScalarFunction::ArrayUnion), "OverLay" => Ok(ScalarFunction::OverLay), "Range" => Ok(ScalarFunction::Range), "ArrayExcept" => Ok(ScalarFunction::ArrayExcept), @@ -22748,7 +22737,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "SubstrIndex" => Ok(ScalarFunction::SubstrIndex), "FindInSet" => Ok(ScalarFunction::FindInSet), "ArraySort" => Ok(ScalarFunction::ArraySort), - "ArrayDistinct" => Ok(ScalarFunction::ArrayDistinct), "ArrayResize" => Ok(ScalarFunction::ArrayResize), "EndsWith" => Ok(ScalarFunction::EndsWith), "InStr" => Ok(ScalarFunction::InStr), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 679df2b8d937..05617909c87f 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2651,7 +2651,7 @@ pub enum ScalarFunction { Sqrt = 17, Tan = 18, Trunc = 19, - Array = 20, + /// 20 was Array RegexpMatch = 21, BitLength = 22, Btrim = 23, @@ -2748,8 +2748,8 @@ pub enum ScalarFunction { ArrayPopBack = 116, StringToArray = 117, ToTimestampNanos = 118, - ArrayIntersect = 119, - ArrayUnion = 120, + /// 119 was ArrayIntersect + /// 120 was ArrayUnion OverLay = 121, Range = 122, ArrayExcept = 123, @@ -2758,7 +2758,7 @@ pub enum ScalarFunction { SubstrIndex = 126, FindInSet = 127, ArraySort = 128, - ArrayDistinct = 129, + /// 129 was ArrayDistinct ArrayResize = 130, EndsWith = 131, InStr = 132, @@ -2794,7 +2794,6 @@ impl ScalarFunction { ScalarFunction::Sqrt => "Sqrt", ScalarFunction::Tan => "Tan", ScalarFunction::Trunc => "Trunc", - ScalarFunction::Array => "Array", ScalarFunction::RegexpMatch => "RegexpMatch", ScalarFunction::BitLength => "BitLength", ScalarFunction::Btrim => "Btrim", @@ -2890,8 +2889,6 @@ impl ScalarFunction { ScalarFunction::ArrayPopBack => "ArrayPopBack", ScalarFunction::StringToArray => "StringToArray", ScalarFunction::ToTimestampNanos => "ToTimestampNanos", - ScalarFunction::ArrayIntersect => "ArrayIntersect", - ScalarFunction::ArrayUnion => "ArrayUnion", ScalarFunction::OverLay => "OverLay", ScalarFunction::Range => "Range", ScalarFunction::ArrayExcept => "ArrayExcept", @@ -2900,7 +2897,6 @@ impl ScalarFunction { ScalarFunction::SubstrIndex => "SubstrIndex", ScalarFunction::FindInSet => "FindInSet", ScalarFunction::ArraySort => "ArraySort", - ScalarFunction::ArrayDistinct => "ArrayDistinct", ScalarFunction::ArrayResize => "ArrayResize", ScalarFunction::EndsWith => "EndsWith", ScalarFunction::InStr => "InStr", @@ -2933,7 +2929,6 @@ impl ScalarFunction { "Sqrt" => Some(Self::Sqrt), "Tan" => Some(Self::Tan), "Trunc" => Some(Self::Trunc), - "Array" => Some(Self::Array), "RegexpMatch" => Some(Self::RegexpMatch), "BitLength" => Some(Self::BitLength), "Btrim" => Some(Self::Btrim), @@ -3029,8 +3024,6 @@ impl ScalarFunction { "ArrayPopBack" => Some(Self::ArrayPopBack), "StringToArray" => Some(Self::StringToArray), "ToTimestampNanos" => Some(Self::ToTimestampNanos), - "ArrayIntersect" => Some(Self::ArrayIntersect), - "ArrayUnion" => Some(Self::ArrayUnion), "OverLay" => Some(Self::OverLay), "Range" => Some(Self::Range), "ArrayExcept" => Some(Self::ArrayExcept), @@ -3039,7 +3032,6 @@ impl ScalarFunction { "SubstrIndex" => Some(Self::SubstrIndex), "FindInSet" => Some(Self::FindInSet), "ArraySort" => Some(Self::ArraySort), - "ArrayDistinct" => Some(Self::ArrayDistinct), "ArrayResize" => Some(Self::ArrayResize), "EndsWith" => Some(Self::EndsWith), "InStr" => Some(Self::InStr), diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 3a2494c01568..42cc84eb027a 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -47,15 +47,15 @@ use datafusion_common::{ use datafusion_expr::expr::Unnest; use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by}; use datafusion_expr::{ - abs, acos, acosh, array, array_append, array_concat, array_dims, array_distinct, - array_element, array_empty, array_except, array_has, array_has_all, array_has_any, - array_intersect, array_length, array_ndims, array_pop_back, array_pop_front, - array_position, array_positions, array_prepend, array_remove, array_remove_all, - array_remove_n, array_repeat, array_replace, array_replace_all, array_replace_n, - array_resize, array_slice, array_sort, array_union, arrow_typeof, ascii, asin, asinh, - atan, atan2, atanh, bit_length, btrim, cardinality, cbrt, ceil, character_length, - chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, cot, current_date, - current_time, date_bin, date_part, date_trunc, degrees, digest, ends_with, exp, + abs, acos, acosh, array_append, array_concat, array_dims, array_element, array_empty, + array_except, array_has, array_has_all, array_has_any, array_length, array_ndims, + array_pop_back, array_pop_front, array_position, array_positions, array_prepend, + array_remove, array_remove_all, array_remove_n, array_repeat, array_replace, + array_replace_all, array_replace_n, array_resize, array_slice, array_sort, + arrow_typeof, ascii, asin, asinh, atan, atan2, atanh, bit_length, btrim, cardinality, + cbrt, ceil, character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, + cot, current_date, current_time, date_bin, date_part, date_trunc, degrees, digest, + ends_with, exp, expr::{self, InList, Sort, WindowFunction}, factorial, find_in_set, flatten, floor, from_unixtime, gcd, gen_range, initcap, instr, isnan, iszero, lcm, left, levenshtein, ln, log, log10, log2, @@ -487,7 +487,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::ArrayHasAny => Self::ArrayHasAny, ScalarFunction::ArrayHas => Self::ArrayHas, ScalarFunction::ArrayDims => Self::ArrayDims, - ScalarFunction::ArrayDistinct => Self::ArrayDistinct, ScalarFunction::ArrayElement => Self::ArrayElement, ScalarFunction::Flatten => Self::Flatten, ScalarFunction::ArrayLength => Self::ArrayLength, @@ -506,12 +505,9 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::ArrayReplaceAll => Self::ArrayReplaceAll, ScalarFunction::ArrayReverse => Self::ArrayReverse, ScalarFunction::ArraySlice => Self::ArraySlice, - ScalarFunction::ArrayIntersect => Self::ArrayIntersect, - ScalarFunction::ArrayUnion => Self::ArrayUnion, ScalarFunction::ArrayResize => Self::ArrayResize, ScalarFunction::Range => Self::Range, ScalarFunction::Cardinality => Self::Cardinality, - ScalarFunction::Array => Self::MakeArray, ScalarFunction::NullIf => Self::NullIf, ScalarFunction::DatePart => Self::DatePart, ScalarFunction::DateTrunc => Self::DateTrunc, @@ -1360,12 +1356,6 @@ pub fn parse_expr( ScalarFunction::Acos => Ok(acos(parse_expr(&args[0], registry)?)), ScalarFunction::Asinh => Ok(asinh(parse_expr(&args[0], registry)?)), ScalarFunction::Acosh => Ok(acosh(parse_expr(&args[0], registry)?)), - ScalarFunction::Array => Ok(array( - args.to_owned() - .iter() - .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?, - )), ScalarFunction::ArrayAppend => Ok(array_append( parse_expr(&args[0], registry)?, parse_expr(&args[1], registry)?, @@ -1407,10 +1397,6 @@ pub fn parse_expr( parse_expr(&args[0], registry)?, parse_expr(&args[1], registry)?, )), - ScalarFunction::ArrayIntersect => Ok(array_intersect( - parse_expr(&args[0], registry)?, - parse_expr(&args[1], registry)?, - )), ScalarFunction::ArrayPosition => Ok(array_position( parse_expr(&args[0], registry)?, parse_expr(&args[1], registry)?, @@ -1478,9 +1464,6 @@ pub fn parse_expr( ScalarFunction::ArrayDims => { Ok(array_dims(parse_expr(&args[0], registry)?)) } - ScalarFunction::ArrayDistinct => { - Ok(array_distinct(parse_expr(&args[0], registry)?)) - } ScalarFunction::ArrayElement => Ok(array_element( parse_expr(&args[0], registry)?, parse_expr(&args[1], registry)?, @@ -1491,10 +1474,6 @@ pub fn parse_expr( ScalarFunction::ArrayNdims => { Ok(array_ndims(parse_expr(&args[0], registry)?)) } - ScalarFunction::ArrayUnion => Ok(array_union( - parse_expr(&args[0], registry)?, - parse_expr(&args[1], registry)?, - )), ScalarFunction::ArrayResize => Ok(array_resize( parse_expr(&args[0], registry)?, parse_expr(&args[1], registry)?, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 9510846f7063..017978bf7811 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1467,7 +1467,6 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::ArrayHasAny => Self::ArrayHasAny, BuiltinScalarFunction::ArrayHas => Self::ArrayHas, BuiltinScalarFunction::ArrayDims => Self::ArrayDims, - BuiltinScalarFunction::ArrayDistinct => Self::ArrayDistinct, BuiltinScalarFunction::ArrayElement => Self::ArrayElement, BuiltinScalarFunction::Flatten => Self::Flatten, BuiltinScalarFunction::ArrayLength => Self::ArrayLength, @@ -1487,11 +1486,8 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::ArrayReplaceAll => Self::ArrayReplaceAll, BuiltinScalarFunction::ArrayReverse => Self::ArrayReverse, BuiltinScalarFunction::ArraySlice => Self::ArraySlice, - BuiltinScalarFunction::ArrayIntersect => Self::ArrayIntersect, - BuiltinScalarFunction::ArrayUnion => Self::ArrayUnion, BuiltinScalarFunction::Range => Self::Range, BuiltinScalarFunction::Cardinality => Self::Cardinality, - BuiltinScalarFunction::MakeArray => Self::Array, BuiltinScalarFunction::NullIf => Self::NullIf, BuiltinScalarFunction::DatePart => Self::DatePart, BuiltinScalarFunction::DateTrunc => Self::DateTrunc,