Skip to content

Deprecate invoke and invoke_no_args in favor of invoke_batch #13174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

31 changes: 29 additions & 2 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ impl ScalarUDF {
/// See [`ScalarUDFImpl::invoke`] for more details.
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
pub fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.inner.invoke(args)
}

Expand All @@ -218,6 +219,7 @@ impl ScalarUDF {
/// See [`ScalarUDFImpl::invoke_no_args`] for more details.
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
pub fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.inner.invoke_no_args(number_rows)
}

Expand All @@ -226,6 +228,7 @@ impl ScalarUDF {
#[deprecated(since = "42.0.0", note = "Use `invoke_batch` instead")]
pub fn fun(&self) -> ScalarFunctionImplementation {
let captured = Arc::clone(&self.inner);
#[allow(deprecated)]
Arc::new(move |args| captured.invoke(args))
}

Expand Down Expand Up @@ -480,6 +483,7 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
/// to arrays, which will likely be simpler code, but be slower.
///
/// [invoke_no_args]: ScalarUDFImpl::invoke_no_args
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
not_impl_err!(
"Function {} does not implement invoke but called",
Expand All @@ -489,19 +493,40 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {

/// Invoke the function with `args` and the number of rows,
/// returning the appropriate result.
///
/// The function will be invoked with the slice of [`ColumnarValue`]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

/// (either scalar or array).
///
/// # Performance
///
/// For the best performance, the implementations should handle the common case
/// when one or more of their arguments are constant values (aka
/// [`ColumnarValue::Scalar`]).
///
/// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments
/// to arrays, which will likely be simpler code, but be slower.
fn invoke_batch(
&self,
args: &[ColumnarValue],
number_rows: usize,
) -> Result<ColumnarValue> {
match args.is_empty() {
true => self.invoke_no_args(number_rows),
false => self.invoke(args),
true =>
{
#[allow(deprecated)]
self.invoke_no_args(number_rows)
}
false =>
{
#[allow(deprecated)]
self.invoke(args)
}
}
}

/// Invoke the function without `args`, instead the number of rows are provided,
/// returning the appropriate result.
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
fn invoke_no_args(&self, _number_rows: usize) -> Result<ColumnarValue> {
not_impl_err!(
"Function {} does not implement invoke_no_args but called",
Expand Down Expand Up @@ -725,10 +750,12 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.inner.invoke(args)
}

fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.inner.invoke_no_args(number_rows)
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/functions/benches/random.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("random_1M_rows_batch_8192", |b| {
b.iter(|| {
for _ in 0..iterations {
black_box(random_func.invoke_no_args(8192).unwrap());
black_box(random_func.invoke_batch(&[], 8192).unwrap());
}
})
});
Expand All @@ -39,7 +39,7 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("random_1M_rows_batch_128", |b| {
b.iter(|| {
for _ in 0..iterations_128 {
black_box(random_func.invoke_no_args(128).unwrap());
black_box(random_func.invoke_batch(&[], 128).unwrap());
}
})
});
Expand Down
2 changes: 2 additions & 0 deletions datafusion/functions/src/datetime/date_bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ mod tests {
use chrono::TimeDelta;

#[test]
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
fn test_date_bin() {
let res = DateBinFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
Expand Down Expand Up @@ -781,6 +782,7 @@ mod tests {
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = DateBinFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
Expand Down
2 changes: 2 additions & 0 deletions datafusion/functions/src/datetime/date_trunc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,7 @@ mod tests {
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = DateTruncFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::from("day")),
Expand Down Expand Up @@ -882,6 +883,7 @@ mod tests {
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = DateTruncFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::from("hour")),
Expand Down
2 changes: 2 additions & 0 deletions datafusion/functions/src/datetime/from_unixtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ mod test {
fn test_without_timezone() {
let args = [ColumnarValue::Scalar(Int64(Some(1729900800)))];

#[allow(deprecated)] // TODO use invoke_batch
let result = FromUnixtimeFunc::new().invoke(&args).unwrap();

match result {
Expand All @@ -181,6 +182,7 @@ mod test {
))),
];

#[allow(deprecated)] // TODO use invoke_batch
let result = FromUnixtimeFunc::new().invoke(&args).unwrap();

match result {
Expand Down
8 changes: 8 additions & 0 deletions datafusion/functions/src/datetime/make_date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ mod tests {

#[test]
fn test_make_date() {
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::Int32(Some(2024))),
Expand All @@ -248,6 +249,7 @@ mod tests {
panic!("Expected a scalar value")
}

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::Int64(Some(2024))),
Expand All @@ -262,6 +264,7 @@ mod tests {
panic!("Expected a scalar value")
}

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::Utf8(Some("2024".to_string()))),
Expand All @@ -279,6 +282,7 @@ mod tests {
let years = Arc::new((2021..2025).map(Some).collect::<Int64Array>());
let months = Arc::new((1..5).map(Some).collect::<Int32Array>());
let days = Arc::new((11..15).map(Some).collect::<UInt32Array>());
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
.invoke(&[
ColumnarValue::Array(years),
Expand All @@ -304,6 +308,7 @@ mod tests {
//

// invalid number of arguments
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
.invoke(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))]);
assert_eq!(
Expand All @@ -312,6 +317,7 @@ mod tests {
);

// invalid type
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
Expand All @@ -323,6 +329,7 @@ mod tests {
);

// overflow of month
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))),
ColumnarValue::Scalar(ScalarValue::UInt64(Some(u64::MAX))),
Expand All @@ -334,6 +341,7 @@ mod tests {
);

// overflow of day
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))),
ColumnarValue::Scalar(ScalarValue::Int32(Some(22))),
Expand Down
6 changes: 6 additions & 0 deletions datafusion/functions/src/datetime/to_char.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ mod tests {
];

for (value, format, expected) in scalar_data {
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new()
.invoke(&[ColumnarValue::Scalar(value), ColumnarValue::Scalar(format)])
.expect("that to_char parsed values without error");
Expand Down Expand Up @@ -458,6 +459,7 @@ mod tests {
];

for (value, format, expected) in scalar_array_data {
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new()
.invoke(&[
ColumnarValue::Scalar(value),
Expand Down Expand Up @@ -583,6 +585,7 @@ mod tests {
];

for (value, format, expected) in array_scalar_data {
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new()
.invoke(&[
ColumnarValue::Array(value as ArrayRef),
Expand All @@ -599,6 +602,7 @@ mod tests {
}

for (value, format, expected) in array_array_data {
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new()
.invoke(&[
ColumnarValue::Array(value),
Expand All @@ -619,6 +623,7 @@ mod tests {
//

// invalid number of arguments
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new()
.invoke(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))]);
assert_eq!(
Expand All @@ -627,6 +632,7 @@ mod tests {
);

// invalid type
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::Int32(Some(1))),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
Expand Down
8 changes: 8 additions & 0 deletions datafusion/functions/src/datetime/to_date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ mod tests {
}

fn test_scalar(sv: ScalarValue, tc: &TestCase) {
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result = ToDateFunc::new().invoke(&[ColumnarValue::Scalar(sv)]);

match to_date_result {
Expand All @@ -233,6 +234,7 @@ mod tests {
A: From<Vec<&'static str>> + Array + 'static,
{
let date_array = A::from(vec![tc.date_str]);
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result =
ToDateFunc::new().invoke(&[ColumnarValue::Array(Arc::new(date_array))]);

Expand Down Expand Up @@ -323,6 +325,7 @@ mod tests {
fn test_scalar(sv: ScalarValue, tc: &TestCase) {
let format_scalar = ScalarValue::Utf8(Some(tc.format_str.to_string()));

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result = ToDateFunc::new().invoke(&[
ColumnarValue::Scalar(sv),
ColumnarValue::Scalar(format_scalar),
Expand All @@ -347,6 +350,7 @@ mod tests {
let date_array = A::from(vec![tc.formatted_date]);
let format_array = A::from(vec![tc.format_str]);

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result = ToDateFunc::new().invoke(&[
ColumnarValue::Array(Arc::new(date_array)),
ColumnarValue::Array(Arc::new(format_array)),
Expand Down Expand Up @@ -382,6 +386,7 @@ mod tests {
let format1_scalar = ScalarValue::Utf8(Some("%Y-%m-%d".into()));
let format2_scalar = ScalarValue::Utf8(Some("%Y/%m/%d".into()));

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result = ToDateFunc::new().invoke(&[
ColumnarValue::Scalar(formatted_date_scalar),
ColumnarValue::Scalar(format1_scalar),
Expand Down Expand Up @@ -410,6 +415,7 @@ mod tests {
for date_str in test_cases {
let formatted_date_scalar = ScalarValue::Utf8(Some(date_str.into()));

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result =
ToDateFunc::new().invoke(&[ColumnarValue::Scalar(formatted_date_scalar)]);

Expand All @@ -428,6 +434,7 @@ mod tests {
let date_str = "20241231";
let date_scalar = ScalarValue::Utf8(Some(date_str.into()));

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result =
ToDateFunc::new().invoke(&[ColumnarValue::Scalar(date_scalar)]);

Expand All @@ -449,6 +456,7 @@ mod tests {
let date_str = "202412311";
let date_scalar = ScalarValue::Utf8(Some(date_str.into()));

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result =
ToDateFunc::new().invoke(&[ColumnarValue::Scalar(date_scalar)]);

Expand Down
5 changes: 3 additions & 2 deletions datafusion/functions/src/datetime/to_local_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ mod tests {

fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
let res = ToLocalTimeFunc::new()
.invoke(&[ColumnarValue::Scalar(input)])
.invoke_batch(&[ColumnarValue::Scalar(input)], 1)
.unwrap();
match res {
ColumnarValue::Scalar(res) => {
Expand Down Expand Up @@ -616,8 +616,9 @@ mod tests {
.iter()
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>();
let batch_size = input.len();
let result = ToLocalTimeFunc::new()
.invoke(&[ColumnarValue::Array(Arc::new(input))])
.invoke_batch(&[ColumnarValue::Array(Arc::new(input))], batch_size)
.unwrap();
if let ColumnarValue::Array(result) = result {
assert_eq!(
Expand Down
5 changes: 2 additions & 3 deletions datafusion/functions/src/datetime/to_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,6 @@ mod tests {
use arrow::array::{ArrayRef, Int64Array, StringBuilder};
use arrow::datatypes::TimeUnit;
use chrono::Utc;

use datafusion_common::{assert_contains, DataFusionError, ScalarValue};
use datafusion_expr::ScalarFunctionImplementation;

Expand Down Expand Up @@ -1011,7 +1010,7 @@ mod tests {
assert!(matches!(rt, Timestamp(_, Some(_))));

let res = udf
.invoke(&[array.clone()])
.invoke_batch(&[array.clone()], 1)
.expect("that to_timestamp parsed values without error");
let array = match res {
ColumnarValue::Array(res) => res,
Expand Down Expand Up @@ -1054,7 +1053,7 @@ mod tests {
assert!(matches!(rt, Timestamp(_, None)));

let res = udf
.invoke(&[array.clone()])
.invoke_batch(&[array.clone()], 1)
.expect("that to_timestamp parsed values without error");
let array = match res {
ColumnarValue::Array(res) => res,
Expand Down
Loading