User Defined Table Function (udtf) support #2177
@@ -99,6 +99,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
    Expr::ScalarUDF { fun, .. } => {
        self.visit_volatility(fun.signature.volatility)
    }
    Expr::TableUDF { fun, .. } => self.visit_volatility(fun.signature.volatility),
I recommend writing it like this:
Expr::ScalarUDF { fun, .. } | Expr::TableUDF { fun, .. } => {
self.visit_volatility(fun.signature.volatility)
}
Good point; however, it doesn't work in this case, because the binding fun has different types for TableUDF and ScalarUDF.
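To illustrate the reply above: Rust accepts an or-pattern such as `A { fun, .. } | B { fun, .. }` only when `fun` is bound at the same type in every alternative. The sketch below uses simplified, hypothetical stand-ins for `Expr`, `ScalarUDF`, `TableUDF`, `Signature` and `Volatility` (they are not DataFusion's actual definitions), and `volatility_of` is an illustrative helper, so the two arms have to stay separate:

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-ins; DataFusion's real types are richer.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Volatility {
    Immutable,
    Volatile,
}

struct Signature {
    volatility: Volatility,
}

struct ScalarUDF {
    signature: Signature,
}

struct TableUDF {
    signature: Signature,
}

enum Expr {
    ScalarUDF { fun: Arc<ScalarUDF> },
    TableUDF { fun: Arc<TableUDF> },
}

fn volatility_of(expr: &Expr) -> Volatility {
    match expr {
        // Merging these arms as `Expr::ScalarUDF { fun } | Expr::TableUDF { fun }`
        // is rejected by the compiler: `fun` would be `&Arc<ScalarUDF>` in one
        // alternative and `&Arc<TableUDF>` in the other.
        Expr::ScalarUDF { fun } => fun.signature.volatility,
        Expr::TableUDF { fun } => fun.signature.volatility,
    }
}
```

If both function types exposed the signature through a shared trait or a common field type, the arms could be merged; that would be a design change beyond what this sketch assumes.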
@@ -381,6 +381,7 @@ impl<'a> ConstEvaluator<'a> {
    | Expr::QualifiedWildcard { .. } => false,
    Expr::ScalarFunction { fun, .. } => Self::volatility_ok(fun.volatility()),
    Expr::ScalarUDF { fun, .. } => Self::volatility_ok(fun.signature.volatility),
    Expr::TableUDF { .. } => false,
ditto
BTW, from clippy:
|
Hmm, is
It shouldn't be an expression, right? |
Thank you @gandronchik -- sorry for the delay in review. I think this PR is looking quite good 👌
Epic first PR
Would it be possible to add a test for a table function that gets no arguments (as there is code to handle that case, but I don't see coverage)?
I also had one relatively minor question related to zero argument handling; Really nice.
Also it would be nice to add a note about supporting Table Functions in https://github.com/apache/arrow-datafusion/blob/master/docs/source/user-guide/sql/sql_status.md (but we can do so as a follow on PR)
Does anyone else have questions or concerns about merging this PR?
// specific language governing permissions and limitations
// under the License.

//! UDTF support
Suggested change:
- //! UDTF support
+ //! User Defined Table Function (UDTF) support
// specific language governing permissions and limitations
// under the License.

//! Udtf module contains foundational types that are used to represent UDTFs in DataFusion.
Suggested change:
- //! Udtf module contains foundational types that are used to represent UDTFs in DataFusion.
+ //! Contains foundational types that are used to represent User Defined Table Functions (UDTFs) in DataFusion.
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
    // evaluate the arguments, if there are no arguments we'll instead pass in a null array
    // indicating the batch size (as a convention)
    let inputs = match (self.args.len(), self.name.parse::<BuiltinScalarFunction>()) {
I don't understand why we are parsing the table function name using BuiltinScalarFunction? Don't we already have self.fun?
Hmmmm...I have some problems about this pr. |
I had the same question. I'm not sure I understand how this is different from a scalar function. It seems like a table function should produce |
I agree it should definitely produce |
what about
|
That's essentially a You could have
pub type TableFunctionImplementation =
Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>;
// This is a terrible name but this would be analogous to ReturnTypeFunction/StateTypeFunction
pub type TableSchemaFunction =
Arc<dyn Fn(&[DataType]) -> Result<SchemaRef> + Send + Sync>; |
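As a rough sketch of how the pair of aliases suggested above could fit together, the following uses hypothetical stand-ins for `ColumnarValue`, `DataType` and `Result` so that it compiles without Arrow or DataFusion. The schema function returns a plain list of `(name, type)` pairs in place of a `SchemaRef`, and `number_and_label` is an invented example function, not something from this PR:

```rust
use std::sync::Arc;

// Hypothetical stand-ins so the sketch compiles without Arrow or DataFusion.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int64,
    Utf8,
}

#[derive(Clone, Debug, PartialEq)]
enum ColumnarValue {
    Int64Array(Vec<i64>),
    Utf8Array(Vec<String>),
}

type Result<T> = std::result::Result<T, String>;

/// Takes the argument columns and returns one value per output column.
type TableFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>;

/// Maps argument types to output columns (a stand-in for returning `SchemaRef`).
type TableSchemaFunction =
    Arc<dyn Fn(&[DataType]) -> Result<Vec<(String, DataType)>> + Send + Sync>;

/// Invented example: for each input integer `n`, emit the row `(n, "n")`.
fn number_and_label() -> (TableFunctionImplementation, TableSchemaFunction) {
    let imp: TableFunctionImplementation = Arc::new(|args: &[ColumnarValue]| match args {
        [ColumnarValue::Int64Array(values)] => Ok(vec![
            ColumnarValue::Int64Array(values.clone()),
            ColumnarValue::Utf8Array(values.iter().map(|v| v.to_string()).collect()),
        ]),
        _ => Err("expected exactly one Int64 argument".to_string()),
    });
    let schema: TableSchemaFunction = Arc::new(|arg_types: &[DataType]| match arg_types {
        [DataType::Int64] => Ok(vec![
            ("n".to_string(), DataType::Int64),
            ("label".to_string(), DataType::Utf8),
        ]),
        _ => Err("expected exactly one Int64 argument".to_string()),
    });
    (imp, schema)
}
```

Keeping the schema function separate lets a planner learn the output columns from the argument types alone, before any data is evaluated.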
@alamb @thinkharderdev @doki23 I met the same problem in #2343. If we treat it as a
/// Evaluate an expression against a RecordBatch
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
pub enum ColumnarValue {
    /// Array of values
    Array(ArrayRef),
    /// A single value
    Scalar(ScalarValue),
}
Because it returns ColumnarValue, we cannot return the result as a table, am I right? Should I implement a
fn evaluate(&self, batch: &RecordBatch) -> Result<Vec<ColumnarValue>>; |
I left some thoughts in |
I plan to give this a more careful review tomorrow |
@@ -39,6 +40,10 @@ use std::sync::Arc;
pub type ScalarFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;

/// Table function. Second tuple
pub type TableFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;
Since ArrayRef is one of the ColumnarValue variants,
pub enum ColumnarValue {
    /// Array of values
    Array(ArrayRef),
    /// A single value
    Scalar(ScalarValue),
}
I think TableFunctionImplementation is the same as ScalarFunctionImplementation, and it can only generate an N*1 table. If we use the signature suggested in #2177 (comment):
Arc<dyn Fn(&[ColumnarValue], usize) -> Result<(Vec<ColumnarValue>, Vec<usize>)> + Send + Sync>;
we could generate an N*M table.
If I'm wrong, please correct me.
Or in this case it can generate N*M table
I am also a little mystified by this signature. It looks like "Second tuple" was the start of a thought that didn't get finished? I also don't understand what the usize in the tuple represents -- perhaps you can add some comments explaining its purpose?
Also, I agree with @Ted-Jiang's analysis -- I would expect this signature to return a "table" (aka a RecordBatch, or a Vec<ColumnarValue> if preferred).
Perhaps something like
Arc<dyn Fn(&[ColumnarValue]) -> Result<RecordBatch> + Send + Sync>;
or
Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>;
I guess that @gandronchik wants to chain each result (ArrayRef) of TableFunctionImplementation into a multi-column result (see the code in TableFunStream::batch), which may mean the table UDF consists of multiple exprs. The reason is probably that trait PhysicalExpr only provides fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>. But I agree that Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync> is more appropriate. So I believe the approach may be either to invoke the table UDF directly in TableFunStream without implementing trait PhysicalExpr for it, or to add fn evaluate(&self, batch: &RecordBatch) -> Result<Vec<ColumnarValue>> to PhysicalExpr.
I updated the PR description. I hope it is clear enough now :)
First of all, again thank you @gandronchik for this contribution
If you are implementing a table function I would expect it to be able to return multiple rows and columns. I think this PR only implements a table function that produces multiple rows out
It may be that I have a different understanding of "table function" than you are trying to implement. A writeup of what you are trying to do (not how you are implementing it) would likely help move this conversation forward.
As I am familiar with Table Functions, they are a little tricky as they can change the cardinality and schema of their input, and thus database systems restrict where in queries they may appear.
I think typical uses are in the FROM clause and in the SELECT clause. I wonder if that sounds similar to what you are trying to do?
let result = plan_and_collect(&ctx, "SELECT integer_series(1,5)").await?;

let expected = vec![
    "+-----------------------------------+",
    "| integer_series(Int64(1),Int64(5)) |",
    "+-----------------------------------+",
    "| 1                                 |",
    "| 2                                 |",
    "| 3                                 |",
    "| 4                                 |",
    "| 5                                 |",
    "+-----------------------------------+",
];
This is a good example of a UDTF producing more rows than went in 👍
Would it be possible to write an example that also produces a different number of columns than went in? I think that is what @Ted-Jiang and I are pointing out in our comments below
I didn't support it. You can use structures for that
assert_batches_eq!(expected, &result);

let result =
    plan_and_collect(&ctx, "SELECT * from integer_series(1,5) pos(n)").await?;
Can you explain what this test is supposed to be demonstrating? I am not quite sure what it shows
I have just explained it in the PR description. I hope I made it clear enough :)
I cannot agree. The result of a table function represents a temporary table. Since it's a table, it shouldn't be limited to one column. Of course, one column of type structure can solve the problem, but it's different. We cannot directly execute |
@gandronchik thank you for the explanation in this PR's description. It helps, though I will admit I still don't fully understand what is going on. I agree with @doki23 -- I expect a table function to logically return a table (that is, something with both rows and columns)
The way the rest of DataFusion avoids buffering all the intermediate results at once in memory is with
If you wanted a streaming solution, that would mean the signature might look something like the following (maybe)
Arc<dyn Fn(Box<dyn SendableRecordBatchStream>) -> Result<Box<dyn SendableRecordBatchStream>> + Send + Sync>; |
Looks like I got the title wrong. I have implemented a function that returns many rows, probably it is not a table function. If I rename it, will it be fine? Regarding the function signature, I think my solution is a compromise between vec and streaming. Actually, I don't think that function can return so many rows. However, of course, I will rewrite it if you want. So which solution do we choose: current |
I think adding UDTFs (aka user defined table functions) that produce a 2 dimensional table output (aka I think Spark calls these "table value functions": Postgres calls them table functions: https://www.postgresql.org/docs/7.3/xfunc-tablefunctions.html However, this PR does not implement table functions that I can see. I still don't fully understand the usecase for the code in this PR for a function that returns a single column of values and I don't know of any other system that implements such functions. Thus I feel that this PR adds a feature that is not widely usable to DataFusion users as a whole, and so I don't feel I can approve it. If others (users or maintainers) have a perspective on this issue, I would love to hear them too. If there is broader support for this feature, I won't oppose merging it. |
marking as draft until we figure out what to do with this |
@alamb Hello! Sorry for the late response. I am sorry for such a big PR with such a bad description. I will now try to explain what is happening here. As far as I know, DataFusion is oriented toward PostgreSQL behavior. So the functionality I provide here is Postgres functionality. We already use it in Cube.js. We implemented several functions:
Please take a closer look at my PR. I am ready to improve it, rename some structures, etc. Below, I provide the implementation of the generate_series function (a real Postgres function):
|
Thanks @gandronchik -- I will try and find time to re-review this PR over the next few days in light of the information above. |
@alamb Hello! Have you already had time to check the PR? |
Hi @gandronchik sadly I have not had a chance. I apologize for my lack of bandwidth but it is hard to find sufficient contiguous time to review such large PRs when I don't have the background context.
My core problem is that I don't understand (despite your admirable attempts to clarify) what this PR is trying to implement, so it is very hard to evaluate the code to see if it is implementing what is desired (because I don't understand what is desired).
For example, all the examples of "set returning functions" in the links you shared in postgres appear to use those functions as elements in the
select * from unnest(ARRAY[1,2], ARRAY['foo','bar','baz']) as x(a,b) →
So I am struggling to understand examples you share in the PR's description that show using these functions in combination with a column 🤔
select table_fun(1, col) from (select 2 col union all select 3 col) t;
So what would you think about implementing more general user defined table functions (that can return RecordBatches / streams as we have discussed above)? I think others would also likely use such functionality and it seems like it would satisfy the usecases from cube.js (?) |
@alamb Hello! I think it will be easier to understand what I implemented here if you check how
Before these changes, DataFusion had only UDFs (which return exactly one row per input row) and UDAFs (which return one row for any number of input rows). My changes allow returning multiple rows per input row. |
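The three cardinalities described in the comment above can be sketched with plain iterators. This is only an illustration over `i64` slices; the function names and the choice of "double", "sum" and "series 1..=n" are assumptions made for the example, not code from the PR:

```rust
/// Scalar UDF shape: exactly one output row per input row.
fn scalar_udf(input: &[i64]) -> Vec<i64> {
    input.iter().map(|v| v * 2).collect()
}

/// Aggregate UDF (UDAF) shape: one output row for any number of input rows.
fn aggregate_udf(input: &[i64]) -> i64 {
    input.iter().sum()
}

/// Set-returning shape discussed in this PR: zero or more output rows per
/// input row. Here each input `n` expands to the series 1..=n.
fn set_returning_udf(input: &[i64]) -> Vec<i64> {
    input.iter().flat_map(|&n| 1..=n).collect()
}
```

For the input `[2, 3]`, the scalar shape yields two rows, the aggregate shape yields one, and the set-returning shape yields five.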
This PR is more than 6 months old, so closing it down for now to clean up the PR list. Please reopen if this is a mistake and you plan to work on it more |
UDTF support (User-defined functions returning table)
In my understanding, a table function returns multiple rows. For now, we have only UDFs, which return a scalar value.
I don't think it should return multiple columns; structures are usually used for this.
We have the following cases:
1. select table_fun(1, 5);
This is the easiest scenario. The function just returns a vec of values.
2. select table_fun(1, col) from (select 2 col union all select 3 col) t;
The function returns a batch.
3. select col, table_fun(1, col) from (select 2 col union all select 3 col) t;
This is the most difficult case. Here we have to transform the data flow because, as you can see from the result, we have to duplicate col for each row of the table_fun result.
4. select * from table_fun(1, 5);
In this case, the result is the same as in the first case. However, we have a different plan structure here.
5. select * from table_fun(1, 5) t(n);
This looks the same as the previous case; however, the plan is slightly different in order to support the alias (the table_fun node does not support aliases, so we have to add a projection).
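Case 3 in the list above is the one that changes the data flow: every other projected column has to be repeated once per row its table function call produced. A minimal sketch of that step follows, assuming the per-row counts are already known; `repeat_by_sections` is a hypothetical helper, not a function from the PR:

```rust
/// Repeat each input value once per row that its table function call produced.
/// `section_sizes[i]` is the number of rows generated for input row `i`.
fn repeat_by_sections(col: &[i64], section_sizes: &[usize]) -> Vec<i64> {
    col.iter()
        .zip(section_sizes)
        .flat_map(|(&value, &size)| std::iter::repeat(value).take(size))
        .collect()
}
```

If `table_fun(1, col)` behaved like an inclusive integer series (an assumption; the description does not spell out its semantics), the inputs `col = [2, 3]` would produce sections of sizes `[2, 3]`, so `col` would be expanded to `[2, 2, 3, 3, 3]` next to the function output `[1, 2, 1, 2, 3]`.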
Regarding the signature, I decided to use a single vector plus a vector with the sizes of the sections, instead of a vec of vecs, for better performance. If we use Vec, this will require a lot of memory when millions of rows are requested.
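The flattened layout described above (one values buffer plus one length per input row) can be sketched as follows. Both functions are hypothetical illustrations over `i64`; the PR itself uses an `ArrayRef` together with a `Vec<usize>`:

```rust
/// Hypothetical series function in the flattened layout: for each (start, end)
/// pair, append start..=end to one shared buffer and record the section length.
fn integer_series_flat(ranges: &[(i64, i64)]) -> (Vec<i64>, Vec<usize>) {
    let mut values = Vec::new();
    let mut section_sizes = Vec::with_capacity(ranges.len());
    for &(start, end) in ranges {
        let before = values.len();
        values.extend(start..=end);
        section_sizes.push(values.len() - before);
    }
    (values, section_sizes)
}

/// Recover the per-input-row slices from the flattened layout without copying.
fn sections<'a>(values: &'a [i64], section_sizes: &[usize]) -> Vec<&'a [i64]> {
    let mut offset = 0;
    section_sizes
        .iter()
        .map(|&size| {
            let section = &values[offset..offset + size];
            offset += size;
            section
        })
        .collect()
}
```

Compared with a `Vec<Vec<i64>>`, this keeps all output values in one contiguous buffer rather than one heap allocation per input row, which is presumably the saving the description has in mind.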