Skip to content

Commit c8b609b

Browse files
committed
add the async scalar udf in udfs doc
1 parent 784f0bb commit c8b609b

File tree

1 file changed

+123
-11
lines changed

1 file changed

+123
-11
lines changed

docs/source/library-user-guide/adding-udfs.md

Lines changed: 123 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@ User Defined Functions (UDFs) are functions that can be used in the context of D
2323

2424
This page covers how to add UDFs to DataFusion. In particular, it covers how to add Scalar, Window, and Aggregate UDFs.
2525

26-
| UDF Type | Description | Example |
27-
| --------- | ---------------------------------------------------------------------------------------------------------- | ------------------- |
28-
| Scalar | A function that takes a row of data and returns a single value. | [simple_udf.rs][1] |
29-
| Window | A function that takes a row of data and returns a single value, but also has access to the rows around it. | [simple_udwf.rs][2] |
30-
| Aggregate | A function that takes a group of rows and returns a single value. | [simple_udaf.rs][3] |
31-
| Table | A function that takes parameters and returns a `TableProvider` to be used in an query plan. | [simple_udtf.rs][4] |
26+
| UDF Type | Description | Example |
27+
| ------------ | ---------------------------------------------------------------------------------------------------------- | ------------------- |
28+
| Scalar | A function that takes a row of data and returns a single value. | [simple_udf.rs][1] |
29+
| Window | A function that takes a row of data and returns a single value, but also has access to the rows around it. | [simple_udwf.rs][2] |
30+
| Aggregate | A function that takes a group of rows and returns a single value. | [simple_udaf.rs][3] |
31+
| Table | A function that takes parameters and returns a `TableProvider` to be used in an query plan. | [simple_udtf.rs][4] |
32+
| Async Scalar | A scalar function that natively supports asynchronous execution, allowing you to perform async operations (such as network or I/O calls) within the UDF. | [async_udf.rs][5] |
3233

3334
First we'll talk about adding an Scalar UDF end-to-end, then we'll talk about the differences between the different
3435
types of UDFs.
@@ -344,6 +345,122 @@ async fn main() {
344345
}
345346
```
346347

348+
## Adding a Scalar Async UDF
349+
350+
A Scalar Async UDF allows you to implement user-defined functions that support asynchronous execution, such as performing network or I/O operations within the UDF.
351+
352+
To add a Scalar Async UDF, you need to:
353+
354+
1. Implement the `AsyncScalarUDFImpl` trait to define your async function logic, signature, and types.
355+
2. Wrap your implementation with `AsyncScalarUDF::new` and register it with the `SessionContext`.
356+
357+
### Adding by `impl AsyncScalarUDFImpl`
358+
359+
```rust
360+
use arrow::array::{ArrayIter, ArrayRef, AsArray, StringArray};
361+
use arrow_schema::DataType;
362+
use async_trait::async_trait;
363+
use datafusion::common::error::Result;
364+
use datafusion::common::internal_err;
365+
use datafusion::common::types::logical_string;
366+
use datafusion::config::ConfigOptions;
367+
use datafusion::logical_expr::async_udf::{
368+
AsyncScalarFunctionArgs, AsyncScalarUDFImpl,
369+
};
370+
use datafusion::logical_expr::{
371+
ColumnarValue, Signature, TypeSignature, TypeSignatureClass, Volatility,
372+
};
373+
use datafusion::logical_expr_common::signature::Coercion;
374+
use log::trace;
375+
use std::any::Any;
376+
use std::sync::Arc;
377+
378+
#[derive(Debug)]
379+
pub struct AsyncUpper {
380+
signature: Signature,
381+
}
382+
383+
impl Default for AsyncUpper {
384+
fn default() -> Self {
385+
Self::new()
386+
}
387+
}
388+
389+
impl AsyncUpper {
390+
pub fn new() -> Self {
391+
Self {
392+
signature: Signature::new(
393+
TypeSignature::Coercible(vec![Coercion::Exact {
394+
desired_type: TypeSignatureClass::Native(logical_string()),
395+
}]),
396+
Volatility::Volatile,
397+
),
398+
}
399+
}
400+
}
401+
402+
#[async_trait]
403+
impl AsyncScalarUDFImpl for AsyncUpper {
404+
fn as_any(&self) -> &dyn Any {
405+
self
406+
}
407+
408+
fn name(&self) -> &str {
409+
"async_upper"
410+
}
411+
412+
fn signature(&self) -> &Signature {
413+
&self.signature
414+
}
415+
416+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
417+
Ok(DataType::Utf8)
418+
}
419+
420+
fn ideal_batch_size(&self) -> Option<usize> {
421+
Some(10)
422+
}
423+
424+
async fn invoke_async_with_args(
425+
&self,
426+
args: AsyncScalarFunctionArgs,
427+
_option: &ConfigOptions,
428+
) -> Result<ArrayRef> {
429+
trace!("Invoking async_upper with args: {:?}", args);
430+
let value = &args.args[0];
431+
let result = match value {
432+
ColumnarValue::Array(array) => {
433+
let string_array = array.as_string::<i32>();
434+
let iter = ArrayIter::new(string_array);
435+
let result = iter
436+
.map(|string| string.map(|s| s.to_uppercase()))
437+
.collect::<StringArray>();
438+
Arc::new(result) as ArrayRef
439+
}
440+
_ => return internal_err!("Expected a string argument, got {:?}", value),
441+
};
442+
Ok(result)
443+
}
444+
}
445+
```
446+
447+
We can now transfer the async UDF into the normal scalar using `into_scalar_udf` to register the function with DataFusion so that it can be used in the context of a query.
448+
449+
```rust
450+
let async_upper = AsyncUpper::new();
451+
let udf = AsyncScalarUDF::new(Arc::new(async_upper));
452+
ctx.register_udf(udf.into_scalar_udf());
453+
```
454+
455+
After registration, you can use these async UDFs directly in SQL queries, for example:
456+
457+
```sql
458+
SELECT async_upper('datafusion');
459+
```
460+
461+
For async UDF implementation details, see [`async_udf.rs`](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/async_udf.rs).
462+
463+
347464
[`scalarudf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html
348465
[`create_udf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udf.html
349466
[`process_scalar_func_inputs`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/functions/fn.process_scalar_func_inputs.html
@@ -1244,8 +1361,3 @@ async fn main() -> Result<()> {
12441361
Ok(())
12451362
}
12461363
```
1247-
1248-
[1]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/simple_udf.rs
1249-
[2]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/simple_udwf.rs
1250-
[3]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/simple_udaf.rs
1251-
[4]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/simple_udtf.rs

0 commit comments

Comments
 (0)