Skip to content

Commit badcdb6

Browse files
committed
enhance doc and fix test
1 parent 8cb0b57 commit badcdb6

File tree

4 files changed

+10
-11
lines changed

4 files changed

+10
-11
lines changed

datafusion/expr/src/async_udf.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use std::sync::Arc;
3333
/// Note this is less efficient than the ScalarUDFImpl, but it can be used
3434
/// to register remote functions in the context.
3535
///
36-
/// The name is chosen to mirror ScalarUDFImpl
36+
/// The name is chosen to mirror ScalarUDFImpl
3737
#[async_trait]
3838
pub trait AsyncScalarUDFImpl: Debug + Send + Sync {
3939
/// the function cast as any
@@ -49,6 +49,9 @@ pub trait AsyncScalarUDFImpl: Debug + Send + Sync {
4949
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType>;
5050

5151
/// The ideal batch size for this function.
52+
///
53+
/// This is used to determine how much data should be evaluated at once.
54+
/// If None, the whole batch will be evaluated at once.
5255
fn ideal_batch_size(&self) -> Option<usize> {
5356
None
5457
}
@@ -65,10 +68,6 @@ pub trait AsyncScalarUDFImpl: Debug + Send + Sync {
6568
///
6669
/// Note this is not meant to be used directly, but is meant to be an implementation detail
6770
/// for AsyncUDFImpl.
68-
///
69-
/// This is used to register remote functions in the context. The function
70-
/// should not be invoked by DataFusion. It's only used to generate the logical
71-
/// plan and unparsed them to SQL.
7271
#[derive(Debug)]
7372
pub struct AsyncScalarUDF {
7473
inner: Arc<dyn AsyncScalarUDFImpl>,

datafusion/physical-expr/src/async_scalar_function.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use std::fmt::Display;
2929
use std::hash::{Hash, Hasher};
3030
use std::sync::Arc;
3131

32-
/// Wrapper for a Async function that can be used in a DataFusion query
32+
/// Wrapper around a scalar function that can be evaluated asynchronously
3333
#[derive(Debug, Clone, Eq)]
3434
pub struct AsyncFuncExpr {
3535
/// The name of the output column this function will generate
@@ -206,7 +206,7 @@ impl PhysicalExpr for AsyncFuncExpr {
206206
}
207207

208208
fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
209-
// TODO: implement this
209+
// TODO: implement this for scalar value input
210210
not_impl_err!("AsyncFuncExpr.evaluate")
211211
}
212212

datafusion/physical-plan/src/async_func.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,16 @@ use log::trace;
3535
use std::any::Any;
3636
use std::sync::Arc;
3737

38-
/// This structure evaluates a set of async expressions on a record
38+
/// This structure evaluates a set of async expressions on a record
3939
/// batch producing a new record batch
4040
///
41-
/// This is similar to a ProjectionExec except that the functions can be async
42-
///
4341
/// The schema of the output of the AsyncFuncExec is:
4442
/// Input columns followed by one column for each async expression
4543
#[derive(Debug)]
4644
pub struct AsyncFuncExec {
4745
/// The async expressions to evaluate
4846
async_exprs: Vec<Arc<AsyncFuncExpr>>,
4947
input: Arc<dyn ExecutionPlan>,
50-
/// Cache holding plan properties like equivalences, output partitioning etc.
5148
cache: PlanProperties,
5249
metrics: ExecutionPlanMetricsSet,
5350
}

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ physical_plan after EnforceSorting SAME TEXT AS ABOVE
241241
physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
242242
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
243243
physical_plan after coalesce_batches SAME TEXT AS ABOVE
244+
physical_plan after coalesce_async_exec_input SAME TEXT AS ABOVE
244245
physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
245246
physical_plan after LimitAggregation SAME TEXT AS ABOVE
246247
physical_plan after LimitPushdown SAME TEXT AS ABOVE
@@ -315,6 +316,7 @@ physical_plan after EnforceSorting SAME TEXT AS ABOVE
315316
physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
316317
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
317318
physical_plan after coalesce_batches SAME TEXT AS ABOVE
319+
physical_plan after coalesce_async_exec_input SAME TEXT AS ABOVE
318320
physical_plan after OutputRequirements
319321
01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
320322
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
@@ -355,6 +357,7 @@ physical_plan after EnforceSorting SAME TEXT AS ABOVE
355357
physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
356358
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
357359
physical_plan after coalesce_batches SAME TEXT AS ABOVE
360+
physical_plan after coalesce_async_exec_input SAME TEXT AS ABOVE
358361
physical_plan after OutputRequirements
359362
01)GlobalLimitExec: skip=0, fetch=10
360363
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet

0 commit comments

Comments
 (0)